diff --git a/apps/agent-worker/src/agent-events.ts b/apps/agent-worker/src/agent-events.ts index d20c05f..2b88590 100644 --- a/apps/agent-worker/src/agent-events.ts +++ b/apps/agent-worker/src/agent-events.ts @@ -139,9 +139,16 @@ export const normalizeCodexJsonLine = ( const event = asRecord(parsed); if (!event) return []; const type = stringify(event.type ?? event.event); - const id = event.id ?? event.session_id ?? event.sessionId; + const id = + event.id ?? + event.session_id ?? + event.sessionId ?? + event.thread_id ?? + event.threadId; const sessionId = - typeof id === 'string' && type.toLowerCase().includes('session') + typeof id === 'string' && + (type.toLowerCase().includes('session') || + type.toLowerCase().includes('thread.started')) ? id : undefined; const events: NormalizedAgentEvent[] = sessionId diff --git a/apps/agent-worker/src/runtime/docker.ts b/apps/agent-worker/src/runtime/docker.ts index 5e01af8..9d7e3d0 100644 --- a/apps/agent-worker/src/runtime/docker.ts +++ b/apps/agent-worker/src/runtime/docker.ts @@ -192,6 +192,7 @@ export const streamInJobContainer = async (args: { let stdoutBuffer = ''; let stderrBuffer = ''; const output: string[] = []; + let lineHandlers = Promise.resolve(); const consume = async ( chunk: Buffer, source: 'stdout' | 'stderr', @@ -210,12 +211,17 @@ export const streamInJobContainer = async (args: { } }; subprocess.stdout.on('data', (chunk: Buffer) => { - void consume(chunk, 'stdout', args.onStdoutLine); + lineHandlers = lineHandlers.then(() => + consume(chunk, 'stdout', args.onStdoutLine), + ); }); subprocess.stderr.on('data', (chunk: Buffer) => { - void consume(chunk, 'stderr', args.onStderrLine); + lineHandlers = lineHandlers.then(() => + consume(chunk, 'stderr', args.onStderrLine), + ); }); const result = await subprocess; + await lineHandlers; if (stdoutBuffer && args.onStdoutLine) { await args.onStdoutLine(args.redact(stdoutBuffer)); } diff --git a/apps/agent-worker/src/worker.ts b/apps/agent-worker/src/worker.ts index 2016b5e..02568ef 100644 --- a/apps/agent-worker/src/worker.ts +++ b/apps/agent-worker/src/worker.ts @@ -430,6 +430,9 @@ const codexModel = (claim: Claim) => { return model.includes('/') ? model.split('/').at(-1) ?? model : model; }; +const codexModelFlag = (claim: Claim) => + isCodexLoginProfile(claim) ? '' : ` --model ${quoteShell(codexModel(claim))}`; + const writeJsonFile = async (filePath: string, content: string) => { let normalized = content.trim(); try { @@ -694,15 +697,15 @@ const runCodexTurn = async (args: { }); const command = workspace.codexSessionId ? commandToShell( - `codex exec resume --json --model ${quoteShell( - codexModel(workspace.claim), + `codex exec resume --json${codexModelFlag( + workspace.claim, )} --dangerously-bypass-approvals-and-sandbox ${quoteShell( workspace.codexSessionId, )} ${quoteShell(prompt)}`, ) : commandToShell( - `codex exec --json --model ${quoteShell( - codexModel(workspace.claim), + `codex exec --json${codexModelFlag( + workspace.claim, )} --dangerously-bypass-approvals-and-sandbox --cd ${quoteShell( codexContainerRepo, )} ${quoteShell(prompt)}`, @@ -734,7 +737,7 @@ const runCodexTurn = async (args: { if (line.trim()) { await appendEvent( workspace.claim.job._id, - 'debug', + 'info', 'plan', truncate(line, 10_000), ); @@ -1439,9 +1442,10 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => { }); await appendEvent(claim.job._id, 'info', 'plan', 'Sending message to agent.'); + let assistantMessageId: Id<'agentJobMessages'> | undefined; try { workspace.agentTurnActive = true; - const assistantMessageId = await appendMessage({ + assistantMessageId = await appendMessage({ jobId: claim.job._id, role: 'assistant', status: 'streaming', @@ -1449,6 +1453,12 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => { }); const assistantContent = { value: '' }; if (isCodexLoginProfile(claim)) { + await appendEvent( + claim.job._id, + 'info', + 'plan', + 'Starting Codex CLI turn with the configured login profile.', + ); await runCodexTurn({ workspace, prompt, @@ -1456,6 +1466,12 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => { assistantContent, }); } else if (env.runtime === 'docker') { + await appendEvent( + claim.job._id, + 'info', + 'plan', + 'Starting OpenCode server turn with the configured API provider.', + ); await runOpenCodeTurn({ workspace, prompt, @@ -1532,12 +1548,20 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => { 'cleanup', truncate(redact(message), 20_000), ); - await appendMessage({ - jobId: claim.job._id, - role: 'assistant', - status: 'failed', - content: truncate(redact(message), 40_000), - }); + if (assistantMessageId) { + await updateMessage({ + messageId: assistantMessageId, + status: 'failed', + content: truncate(redact(message), 40_000), + }); + } else { + await appendMessage({ + jobId: claim.job._id, + role: 'assistant', + status: 'failed', + content: truncate(redact(message), 40_000), + }); + } throw error; } }; diff --git a/apps/agent-worker/tests/unit/agent-events.test.ts b/apps/agent-worker/tests/unit/agent-events.test.ts index 7b61c71..e11063f 100644 --- a/apps/agent-worker/tests/unit/agent-events.test.ts +++ b/apps/agent-worker/tests/unit/agent-events.test.ts @@ -26,6 +26,36 @@ describe('agent event normalization', () => { ).toContainEqual({ kind: 'assistant_delta', content: 'hello' }); }); + test('normalizes Codex CLI thread lifecycle events', () => { + expect( + normalizeCodexJsonLine( + JSON.stringify({ + type: 'thread.started', + thread_id: '019ef701-f7d7-76a0-a96b-15c059631dd9', + }), + ), + ).toContainEqual({ + kind: 'session', + sessionId: '019ef701-f7d7-76a0-a96b-15c059631dd9', + }); + + expect( + normalizeCodexJsonLine( + JSON.stringify({ + type: 'turn.started', + }), + ), + ).toContainEqual({ kind: 'status', status: 'turn.started' }); + + expect( + normalizeCodexJsonLine( + JSON.stringify({ + type: 'turn.completed', + }), + ), + ).toContainEqual({ kind: 'assistant_completed' }); + }); + test('normalizes Codex command and file events', () => { expect( normalizeCodexJsonLine( diff --git a/apps/next/src/components/agent-workspace/agent-thread.tsx b/apps/next/src/components/agent-workspace/agent-thread.tsx index 245ca80..55c9c8c 100644 --- a/apps/next/src/components/agent-workspace/agent-thread.tsx +++ b/apps/next/src/components/agent-workspace/agent-thread.tsx @@ -21,7 +21,7 @@ type ActivityFilter = 'all' | 'chat' | 'activity' | 'files' | 'errors'; const filters: { value: ActivityFilter; label: string }[] = [ { value: 'all', label: 'All' }, { value: 'chat', label: 'Chat' }, - { value: 'activity', label: 'Activity' }, + { value: 'activity', label: 'Tools' }, { value: 'files', label: 'Files' }, { value: 'errors', label: 'Errors' }, ]; @@ -68,22 +68,42 @@ export const AgentThread = ({ const [replying, setReplying] = useState(); const [filter, setFilter] = useState('all'); const scrollRef = useRef(null); - const failedMessages = useMemo( - () => messages.filter((message) => message.status === 'failed'), + const chatMessages = useMemo( + () => + messages.filter((message) => { + if (message.role === 'system') return false; + if (message.role === 'tool') return false; + if (message.role === 'assistant' && !message.content.trim()) { + return message.status === 'streaming' && agentTurnActive; + } + return true; + }), + [agentTurnActive, messages], + ); + const toolMessages = useMemo( + () => + messages.filter( + (message) => message.role === 'tool' && message.content.trim(), + ), [messages], ); + const failedMessages = useMemo( + () => chatMessages.filter((message) => message.status === 'failed'), + [chatMessages], + ); + const errorEvents = useMemo( + () => events.filter((event) => event.level === 'error'), + [events], + ); const visibleMessages = filter === 'activity' || filter === 'files' || filter === 'errors' ? filter === 'errors' ? failedMessages : [] - : messages; - const visibleEvents = - filter === 'chat' || filter === 'files' - ? [] - : filter === 'errors' - ? events.filter((event) => event.level === 'error') - : events; + : chatMessages; + const visibleToolMessages = + filter === 'all' || filter === 'activity' ? toolMessages : []; + const visibleEvents = filter === 'errors' ? errorEvents : []; const visibleChanges = filter === 'chat' || filter === 'activity' || filter === 'errors' ? [] @@ -260,15 +280,19 @@ export const AgentThread = ({ } >
- {message.role} - - {message.status} - + + {message.role === 'assistant' ? 'Agent' : 'You'} + + {message.status === 'failed' || message.status === 'streaming' ? ( + + {message.status === 'streaming' ? 'Working' : 'Failed'} + + ) : null}

{message.content || @@ -276,6 +300,23 @@ export const AgentThread = ({

))} + {visibleToolMessages.map((message) => ( +
+
+ + Tool + {message.status === 'streaming' ? ( + Running + ) : null} +
+
+              {message.content}
+            
+
+ ))} {visibleChanges.map((change) => (
))} {visibleMessages.length === 0 && + visibleToolMessages.length === 0 && visibleEvents.length === 0 && visibleChanges.length === 0 && (filter !== 'chat' || interactions.length === 0) ? ( diff --git a/apps/next/src/components/agent-workspace/agent-workspace-shell.tsx b/apps/next/src/components/agent-workspace/agent-workspace-shell.tsx index 6455dd6..3965e03 100644 --- a/apps/next/src/components/agent-workspace/agent-workspace-shell.tsx +++ b/apps/next/src/components/agent-workspace/agent-workspace-shell.tsx @@ -114,9 +114,14 @@ export const AgentWorkspaceShell = ({ jobId }: { jobId: Id<'agentJobs'> }) => { const response = await fetch(`/api/agent-jobs/${jobId}/agent/status`); if (!response.ok) { setAgentTurnActive(false); + const body = await response.text(); + if (body.includes('workspace is not active')) { + setWorkspaceError(body); + } return; } const data = (await response.json()) as { active?: boolean }; + setWorkspaceError(undefined); setAgentTurnActive(Boolean(data.active)); }, [jobId]); diff --git a/apps/next/tests/component/render.test.tsx b/apps/next/tests/component/render.test.tsx index edd7df1..b6e8cc1 100644 --- a/apps/next/tests/component/render.test.tsx +++ b/apps/next/tests/component/render.test.tsx @@ -115,6 +115,106 @@ describe('component test harness', () => { expect(onOpenFile).toHaveBeenCalledWith('apps/web/auth.ts'); }); + it('keeps the workspace thread focused on user, agent, and tool content', () => { + render( + , + ); + + expect(screen.queryByText('Workspace is ready.')).not.toBeInTheDocument(); + expect( + screen.queryByText('Sending message to agent.'), + ).not.toBeInTheDocument(); + expect(screen.queryByText('Assistant')).not.toBeInTheDocument(); + expect( + screen.getByText('Use Authentik as the only provider.'), + ).toBeInTheDocument(); + expect( + screen.getByText('I found the Auth.js provider configuration.'), + ).toBeInTheDocument(); + expect(screen.getByText('rg Authentik')).toBeInTheDocument(); + }); + it('renders thread workspaces on the canonical thread route', () => { mockUseParams.mockReturnValue({ threadId: 'thread-1' }); mockUseQuery.mockReturnValue({