From 30a17196f54ca3aa1b09082a5db7706ef7b87b8e Mon Sep 17 00:00:00 2001 From: Gabriel Brown Date: Tue, 23 Jun 2026 21:38:41 -0400 Subject: [PATCH] fix worker forreal --- apps/agent-worker/src/agent-events.ts | 33 ++++++-- apps/agent-worker/src/git.ts | 34 ++++++++- apps/agent-worker/src/runtime/docker.ts | 21 +++++- apps/agent-worker/src/worker.ts | 75 ++++++++++++------- .../tests/unit/agent-events.test.ts | 45 ++++++++++- 5 files changed, 169 insertions(+), 39 deletions(-) diff --git a/apps/agent-worker/src/agent-events.ts b/apps/agent-worker/src/agent-events.ts index 2b88590..9505b4d 100644 --- a/apps/agent-worker/src/agent-events.ts +++ b/apps/agent-worker/src/agent-events.ts @@ -82,8 +82,10 @@ const toolNameFromRecord = (record: Record | null) => record?.toolName ?? record?.name ?? record?.function ?? - record?.type ?? - record?.command ?? + (stringify(record?.type).toLowerCase().includes('exec') || + record?.command + ? 'Command' + : record?.type) ?? 'tool', ); @@ -103,9 +105,15 @@ const toolOutputFromRecord = ( ) => stringify( record?.output ?? + record?.aggregated_output ?? + record?.stdout ?? + record?.stderr ?? record?.result ?? record?.content ?? record?.text ?? + (record?.exit_code !== undefined + ? `exit code: ${stringify(record.exit_code)}` + : undefined) ?? fallback, ); @@ -121,11 +129,17 @@ const recordLooksLikeTool = ( recordType.includes('tool') || recordType.includes('function_call') || recordType.includes('local_shell_call') || + recordType.includes('exec_command') || + recordType.includes('command') || recordType.includes('mcp') || - Boolean(record?.tool ?? record?.tool_name ?? record?.name) + Boolean(record?.tool ?? record?.tool_name ?? record?.name ?? record?.command) ); }; +const isCodexConfigWarning = (message: string) => + message.includes('`[features].codex_hooks` is deprecated') || + message.includes('Use `[features].hooks` instead'); + export const normalizeCodexJsonLine = ( line: string, ): NormalizedAgentEvent[] => { @@ -205,13 +219,22 @@ export const normalizeCodexJsonLine = ( itemType.includes('message') || itemType.includes('agent_message')) ) { - events.push({ kind: 'assistant_delta', content: text }); + events.push({ + kind: 'assistant_delta', + content: itemType.includes('agent_message') ? `${text.trim()}\n\n` : text, + externalMessageId: stringify(item?.id ?? event.id), + }); } const error = event.error ?? item?.error; if (error || itemType === 'error') { + const message = stringify(error ?? item?.message ?? event.message); + if (isCodexConfigWarning(message)) { + events.push({ kind: 'status', status: message }); + return events; + } events.push({ kind: 'error', - message: stringify(error ?? item?.message ?? event.message), + message, }); } const command = diff --git a/apps/agent-worker/src/git.ts b/apps/agent-worker/src/git.ts index f07380d..6a7a3f3 100644 --- a/apps/agent-worker/src/git.ts +++ b/apps/agent-worker/src/git.ts @@ -126,12 +126,42 @@ export const getDiff = async ( export const getWorktreeDiff = async ( repoDir: string, redact: (value: string) => string, -) => - await run('git', ['diff', '--', '.'], { +) => { + const trackedDiff = await run('git', ['diff', '--', '.'], { cwd: repoDir, redact, timeoutMs: 60_000, }); + const untracked = await run( + 'git', + ['ls-files', '--others', '--exclude-standard'], + { + cwd: repoDir, + redact, + timeoutMs: 60_000, + }, + ); + const untrackedDiffs: string[] = []; + for (const filePath of untracked.output.split('\n').filter(Boolean)) { + const diff = await run( + 'git', + ['diff', '--no-index', '--', '/dev/null', filePath], + { + cwd: repoDir, + redact, + timeoutMs: 60_000, + }, + ); + if (diff.output.trim()) untrackedDiffs.push(diff.output); + } + return { + exitCode: + trackedDiff.exitCode === 0 && untracked.exitCode === 0 ? 0 : 1, + output: [trackedDiff.output, ...untrackedDiffs] + .filter((part) => part.trim()) + .join('\n'), + }; +}; export const getStatus = async ( repoDir: string, diff --git a/apps/agent-worker/src/runtime/docker.ts b/apps/agent-worker/src/runtime/docker.ts index 9d7e3d0..eb339ce 100644 --- a/apps/agent-worker/src/runtime/docker.ts +++ b/apps/agent-worker/src/runtime/docker.ts @@ -54,6 +54,7 @@ export const runInJobContainer = async (args: { { all: true, reject: false, + stdin: 'ignore', timeout: args.timeoutMs, }, ); @@ -102,7 +103,7 @@ export const startWorkspaceContainer = async (args: { env.jobImage, ...(args.command ?? ['sleep', 'infinity']), ], - { all: true }, + { all: true, stdin: 'ignore' }, ); return { containerId: result.stdout.trim(), @@ -117,7 +118,7 @@ const getPublishedPort = async (containerName: string, containerPort: number) => const result = await execa( containerRuntime(), ['port', containerName, `${containerPort}/tcp`], - { all: true, reject: false }, + { all: true, reject: false, stdin: 'ignore' }, ); const output = result.all.trim(); const match = /:(\d+)\s*$/.exec(output); @@ -147,6 +148,7 @@ export const execInWorkspaceContainer = async (args: { { all: true, reject: false, + stdin: 'ignore', timeout: args.timeoutMs, }, ); @@ -186,6 +188,7 @@ export const streamInJobContainer = async (args: { { all: true, reject: false, + stdin: 'ignore', timeout: args.timeoutMs, }, ); @@ -220,7 +223,19 @@ export const streamInJobContainer = async (args: { consume(chunk, 'stderr', args.onStderrLine), ); }); - const result = await subprocess; + let result: Awaited; + try { + result = await subprocess; + } catch (error) { + await lineHandlers; + const outputText = output.join(''); + const message = + error instanceof Error ? error.message : 'Container command failed.'; + return { + exitCode: 1, + output: args.redact(`${outputText}${outputText ? '\n' : ''}${message}`), + }; + } await lineHandlers; if (stdoutBuffer && args.onStdoutLine) { await args.onStdoutLine(args.redact(stdoutBuffer)); diff --git a/apps/agent-worker/src/worker.ts b/apps/agent-worker/src/worker.ts index 02568ef..73a1c1e 100644 --- a/apps/agent-worker/src/worker.ts +++ b/apps/agent-worker/src/worker.ts @@ -110,6 +110,7 @@ type ActiveWorkspace = { codexSessionId?: string; agentTurnActive?: boolean; resolveTurn?: () => void; + lastRecordedDiffSignature?: string; }; type FileTreeNode = { @@ -430,8 +431,8 @@ const codexModel = (claim: Claim) => { return model.includes('/') ? model.split('/').at(-1) ?? model : model; }; -const codexModelFlag = (claim: Claim) => - isCodexLoginProfile(claim) ? '' : ` --model ${quoteShell(codexModel(claim))}`; +const codexModelArgs = (claim: Claim) => + isCodexLoginProfile(claim) ? [] : ['--model', codexModel(claim)]; const writeJsonFile = async (filePath: string, content: string) => { let normalized = content.trim(); @@ -696,20 +697,26 @@ const runCodexTurn = async (args: { codexSessionId: workspace.codexSessionId, }); const command = workspace.codexSessionId - ? commandToShell( - `codex exec resume --json${codexModelFlag( - workspace.claim, - )} --dangerously-bypass-approvals-and-sandbox ${quoteShell( - workspace.codexSessionId, - )} ${quoteShell(prompt)}`, - ) - : commandToShell( - `codex exec --json${codexModelFlag( - workspace.claim, - )} --dangerously-bypass-approvals-and-sandbox --cd ${quoteShell( - codexContainerRepo, - )} ${quoteShell(prompt)}`, - ); + ? [ + 'codex', + 'exec', + 'resume', + '--json', + ...codexModelArgs(workspace.claim), + '--dangerously-bypass-approvals-and-sandbox', + workspace.codexSessionId, + prompt, + ] + : [ + 'codex', + 'exec', + '--json', + ...codexModelArgs(workspace.claim), + '--dangerously-bypass-approvals-and-sandbox', + '--cd', + codexContainerRepo, + prompt, + ]; const aiEnv = providerEnvironment(workspace.claim, codexContainerWorkspace); const secretEnv = Object.fromEntries( workspace.claim.secrets.map((secret) => [secret.name, secret.value]), @@ -734,12 +741,17 @@ const runCodexTurn = async (args: { } }, onStderrLine: async (line) => { - if (line.trim()) { + const trimmed = line.trim(); + if ( + trimmed && + trimmed !== 'Reading additional input from stdin...' && + !trimmed.includes('`[features].codex_hooks` is deprecated') + ) { await appendEvent( workspace.claim.job._id, 'info', 'plan', - truncate(line, 10_000), + truncate(trimmed, 10_000), ); } }, @@ -1034,6 +1046,9 @@ const recordChangedFiles = async ( workspace.repoDir, workspace.redact, ); + const signature = JSON.stringify({ diff, changes }); + if (signature === workspace.lastRecordedDiffSignature) return; + workspace.lastRecordedDiffSignature = signature; for (const change of changes) { await recordWorkspaceChange({ jobId: workspace.claim.job._id, @@ -1235,7 +1250,9 @@ const runClaim = async (claim: Claim) => { }); await appendEvent(jobId, 'info', 'plan', 'Interactive workspace is ready.'); - await sendWorkspaceMessage(jobId, systemPromptForJob(claim)); + await sendWorkspaceMessage(jobId, systemPromptForJob(claim), { + recordUserMessage: false, + }); } catch (error) { const message = error instanceof Error ? error.message : String(error); await appendEvent( @@ -1428,18 +1445,24 @@ export const replyToInteraction = async ( return { success: true }; }; -export const sendWorkspaceMessage = async (jobId: string, prompt: string) => { +export const sendWorkspaceMessage = async ( + jobId: string, + prompt: string, + options: { recordUserMessage?: boolean } = {}, +) => { const workspace = resolveWorkspace(jobId); const { claim, redact } = workspace; if (workspace.agentTurnActive) { throw new Error('Wait for the current agent turn to finish or abort it.'); } - await appendMessage({ - jobId: claim.job._id, - role: 'user', - status: 'completed', - content: prompt, - }); + if (options.recordUserMessage ?? true) { + await appendMessage({ + jobId: claim.job._id, + role: 'user', + status: 'completed', + content: prompt, + }); + } await appendEvent(claim.job._id, 'info', 'plan', 'Sending message to agent.'); let assistantMessageId: Id<'agentJobMessages'> | undefined; diff --git a/apps/agent-worker/tests/unit/agent-events.test.ts b/apps/agent-worker/tests/unit/agent-events.test.ts index e11063f..f12c880 100644 --- a/apps/agent-worker/tests/unit/agent-events.test.ts +++ b/apps/agent-worker/tests/unit/agent-events.test.ts @@ -95,7 +95,8 @@ describe('agent event normalization', () => { ), ).toContainEqual({ kind: 'assistant_delta', - content: 'I updated the auth provider.', + content: 'I updated the auth provider.\n\n', + externalMessageId: 'item-1', }); expect( @@ -125,6 +126,24 @@ describe('agent event normalization', () => { kind: 'error', message: '{\n "message": "request failed"\n}', }); + + expect( + normalizeCodexJsonLine( + JSON.stringify({ + type: 'item.completed', + item: { + id: 'item-warning', + type: 'error', + message: + '`[features].codex_hooks` is deprecated. Use `[features].hooks` instead.', + }, + }), + ), + ).toContainEqual({ + kind: 'status', + status: + '`[features].codex_hooks` is deprecated. Use `[features].hooks` instead.', + }); }); test('normalizes Codex tool item lifecycle events', () => { @@ -141,7 +160,7 @@ describe('agent event normalization', () => { ), ).toContainEqual({ kind: 'tool_started', - name: 'local_shell_call', + name: 'Command', input: 'bash -lc rg Authentik', externalMessageId: 'tool-1', }); @@ -160,10 +179,30 @@ describe('agent event normalization', () => { ), ).toContainEqual({ kind: 'tool_completed', - name: 'local_shell_call', + name: 'Command', output: 'apps/web/auth.ts', externalMessageId: 'tool-1', }); + + expect( + normalizeCodexJsonLine( + JSON.stringify({ + type: 'item.completed', + item: { + id: 'tool-2', + type: 'exec_command', + command: 'cat package.json', + aggregated_output: '{"scripts":{"build":"turbo build"}}', + exit_code: 0, + }, + }), + ), + ).toContainEqual({ + kind: 'tool_completed', + name: 'Command', + output: '{"scripts":{"build":"turbo build"}}', + externalMessageId: 'tool-2', + }); }); test('normalizes OpenCode assistant, tool, and permission events', () => {