Fix worker

This commit is contained in:
Gabriel Brown
2026-06-23 20:35:01 -04:00
parent 5567a4be95
commit c3d265d428
7 changed files with 249 additions and 35 deletions
+9 -2
View File
@@ -139,9 +139,16 @@ export const normalizeCodexJsonLine = (
const event = asRecord(parsed);
if (!event) return [];
const type = stringify(event.type ?? event.event);
const id = event.id ?? event.session_id ?? event.sessionId;
const id =
event.id ??
event.session_id ??
event.sessionId ??
event.thread_id ??
event.threadId;
const sessionId =
typeof id === 'string' && type.toLowerCase().includes('session')
typeof id === 'string' &&
(type.toLowerCase().includes('session') ||
type.toLowerCase().includes('thread.started'))
? id
: undefined;
const events: NormalizedAgentEvent[] = sessionId
+8 -2
View File
@@ -192,6 +192,7 @@ export const streamInJobContainer = async (args: {
let stdoutBuffer = '';
let stderrBuffer = '';
const output: string[] = [];
let lineHandlers = Promise.resolve();
const consume = async (
chunk: Buffer,
source: 'stdout' | 'stderr',
@@ -210,12 +211,17 @@ export const streamInJobContainer = async (args: {
}
};
subprocess.stdout.on('data', (chunk: Buffer) => {
void consume(chunk, 'stdout', args.onStdoutLine);
lineHandlers = lineHandlers.then(() =>
consume(chunk, 'stdout', args.onStdoutLine),
);
});
subprocess.stderr.on('data', (chunk: Buffer) => {
void consume(chunk, 'stderr', args.onStderrLine);
lineHandlers = lineHandlers.then(() =>
consume(chunk, 'stderr', args.onStderrLine),
);
});
const result = await subprocess;
await lineHandlers;
if (stdoutBuffer && args.onStdoutLine) {
await args.onStdoutLine(args.redact(stdoutBuffer));
}
+36 -12
View File
@@ -430,6 +430,9 @@ const codexModel = (claim: Claim) => {
return model.includes('/') ? model.split('/').at(-1) ?? model : model;
};
const codexModelFlag = (claim: Claim) =>
isCodexLoginProfile(claim) ? '' : ` --model ${quoteShell(codexModel(claim))}`;
const writeJsonFile = async (filePath: string, content: string) => {
let normalized = content.trim();
try {
@@ -694,15 +697,15 @@ const runCodexTurn = async (args: {
});
const command = workspace.codexSessionId
? commandToShell(
`codex exec resume --json --model ${quoteShell(
codexModel(workspace.claim),
`codex exec resume --json${codexModelFlag(
workspace.claim,
)} --dangerously-bypass-approvals-and-sandbox ${quoteShell(
workspace.codexSessionId,
)} ${quoteShell(prompt)}`,
)
: commandToShell(
`codex exec --json --model ${quoteShell(
codexModel(workspace.claim),
`codex exec --json${codexModelFlag(
workspace.claim,
)} --dangerously-bypass-approvals-and-sandbox --cd ${quoteShell(
codexContainerRepo,
)} ${quoteShell(prompt)}`,
@@ -734,7 +737,7 @@ const runCodexTurn = async (args: {
if (line.trim()) {
await appendEvent(
workspace.claim.job._id,
'debug',
'info',
'plan',
truncate(line, 10_000),
);
@@ -1439,9 +1442,10 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
});
await appendEvent(claim.job._id, 'info', 'plan', 'Sending message to agent.');
let assistantMessageId: Id<'agentJobMessages'> | undefined;
try {
workspace.agentTurnActive = true;
const assistantMessageId = await appendMessage({
assistantMessageId = await appendMessage({
jobId: claim.job._id,
role: 'assistant',
status: 'streaming',
@@ -1449,6 +1453,12 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
});
const assistantContent = { value: '' };
if (isCodexLoginProfile(claim)) {
await appendEvent(
claim.job._id,
'info',
'plan',
'Starting Codex CLI turn with the configured login profile.',
);
await runCodexTurn({
workspace,
prompt,
@@ -1456,6 +1466,12 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
assistantContent,
});
} else if (env.runtime === 'docker') {
await appendEvent(
claim.job._id,
'info',
'plan',
'Starting OpenCode server turn with the configured API provider.',
);
await runOpenCodeTurn({
workspace,
prompt,
@@ -1532,12 +1548,20 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
'cleanup',
truncate(redact(message), 20_000),
);
await appendMessage({
jobId: claim.job._id,
role: 'assistant',
status: 'failed',
content: truncate(redact(message), 40_000),
});
if (assistantMessageId) {
await updateMessage({
messageId: assistantMessageId,
status: 'failed',
content: truncate(redact(message), 40_000),
});
} else {
await appendMessage({
jobId: claim.job._id,
role: 'assistant',
status: 'failed',
content: truncate(redact(message), 40_000),
});
}
throw error;
}
};
@@ -26,6 +26,36 @@ describe('agent event normalization', () => {
).toContainEqual({ kind: 'assistant_delta', content: 'hello' });
});
test('normalizes Codex CLI thread lifecycle events', () => {
expect(
normalizeCodexJsonLine(
JSON.stringify({
type: 'thread.started',
thread_id: '019ef701-f7d7-76a0-a96b-15c059631dd9',
}),
),
).toContainEqual({
kind: 'session',
sessionId: '019ef701-f7d7-76a0-a96b-15c059631dd9',
});
expect(
normalizeCodexJsonLine(
JSON.stringify({
type: 'turn.started',
}),
),
).toContainEqual({ kind: 'status', status: 'turn.started' });
expect(
normalizeCodexJsonLine(
JSON.stringify({
type: 'turn.completed',
}),
),
).toContainEqual({ kind: 'assistant_completed' });
});
test('normalizes Codex command and file events', () => {
expect(
normalizeCodexJsonLine(