Fix agent empty-response in prod: workdir mount, image freshness, error surfacing
- Pin codex@0.142.0 + opencode-ai@1.17.9 in the job image (was @latest, causing dev/prod drift)
- Worker now pulls the job image once per process so prod stops running a stale Codex
- Surface Codex error/turn.failed events instead of swallowing them, so the real failure reason is reported rather than 'no assistant response'
- Harden the Codex JSON parser to also handle the legacy msg-wrapped shape
- Fix the docker-in-docker workdir: bind-mount identical host:container path and set SPOON_AGENT_HOST_WORKDIR (a named volume can't be mounted by sibling job containers)
- Add docs/compose.prod.yml as a documented reference deployment
This commit is contained in:
@@ -140,6 +140,57 @@ const isCodexConfigWarning = (message: string) =>
|
||||
message.includes('`[features].codex_hooks` is deprecated') ||
|
||||
message.includes('Use `[features].hooks` instead');
|
||||
|
||||
/**
 * Translates one legacy `codex-rs` event — the inner `msg` of a
 * `{ id, msg: { type, ... } }` envelope — into normalized agent events.
 *
 * @param msg - The unwrapped `msg` record; its `type` is compared lower-cased.
 * @param envelope - The outer record, used only as a fallback source for the
 *   session id (`session_id`, then `id`).
 * @returns The normalized events for this message, or an empty array when the
 *   type is not recognized or the relevant payload is empty.
 */
const normalizeCodexMsgEvent = (
  msg: Record<string, unknown>,
  envelope: Record<string, unknown>,
): NormalizedAgentEvent[] => {
  const eventType = stringify(msg.type).toLowerCase();

  switch (eventType) {
    case 'agent_message_delta':
    case 'agent_reasoning_delta': {
      const chunk = stringify(msg.delta ?? msg.text);
      return chunk ? [{ kind: 'assistant_delta', content: chunk }] : [];
    }

    case 'agent_message': {
      const body = stringify(msg.message ?? msg.text);
      // A complete message is emitted as a delta terminated by a blank line.
      return body
        ? [{ kind: 'assistant_delta', content: `${body.trim()}\n\n` }]
        : [];
    }

    case 'exec_command_begin':
      return [
        {
          kind: 'tool_started',
          name: 'Command',
          input: commandString(msg.command),
        },
      ];

    case 'exec_command_end':
      return [
        {
          kind: 'tool_completed',
          name: 'Command',
          output: toolOutputFromRecord(msg),
        },
      ];

    case 'error':
    case 'turn_failed':
    case 'task_error': {
      const reason = stringify(msg.message ?? msg.error ?? msg);
      // Known config deprecation warnings are downgraded to a status update.
      return isCodexConfigWarning(reason)
        ? [{ kind: 'status', status: reason }]
        : [{ kind: 'error', message: reason }];
    }

    case 'task_complete':
    case 'turn_complete':
      return [{ kind: 'assistant_completed' }];

    default:
      break;
  }

  // Any remaining type mentioning "session" (including `session_configured`)
  // is treated as a session announcement.
  if (!eventType.includes('session')) return [];

  const sessionId = stringify(
    msg.session_id ?? envelope.session_id ?? envelope.id,
  );
  return sessionId ? [{ kind: 'session', sessionId }] : [];
};
|
||||
|
||||
export const normalizeCodexJsonLine = (
|
||||
line: string,
|
||||
): NormalizedAgentEvent[] => {
|
||||
@@ -152,6 +203,15 @@ export const normalizeCodexJsonLine = (
|
||||
}
|
||||
const event = asRecord(parsed);
|
||||
if (!event) return [];
|
||||
// Older Codex (`codex-rs`) protocol wraps events as `{ id, msg: { type, ... } }`
|
||||
// instead of the newer `{ type, item: { ... } }` shape. Unwrap it so version
|
||||
// skew between the pinned image and an upstream build degrades gracefully
|
||||
// instead of silently producing an empty assistant response.
|
||||
const msg = asRecord(event.msg);
|
||||
if (msg) {
|
||||
const msgEvents = normalizeCodexMsgEvent(msg, event);
|
||||
if (msgEvents.length > 0) return msgEvents;
|
||||
}
|
||||
const type = stringify(event.type ?? event.event);
|
||||
const id =
|
||||
event.id ??
|
||||
|
||||
@@ -18,6 +18,26 @@ const networkArgs = () => (env.network ? ['--network', env.network] : []);
|
||||
|
||||
const containerRuntime = () => env.containerRuntime;
|
||||
|
||||
// `docker run` reuses a stale local `:latest` forever, so without an explicit
// pull the job image never updates in production. Pull once per worker process
// (i.e. once per deploy/restart) so a fresh worker always runs a fresh job
// image. Best-effort: if the registry is unreachable we fall back to whatever
// image is present locally rather than failing the job.
let jobImagePullPromise: Promise<void> | undefined;

/**
 * Pulls the configured job image at most once per worker process.
 *
 * The first call starts the pull and memoizes its promise; later calls (and
 * concurrent callers) await that same promise, so the pull is never repeated
 * until the process restarts.
 *
 * The pull is best-effort and never rejects. A failed pull is logged as a
 * warning so that running on a stale cached image is visible in the worker
 * logs instead of being silently ignored.
 *
 * @returns A promise that resolves once the pull attempt has finished,
 *   whether or not it succeeded.
 */
export const ensureJobImagePulled = () => {
  jobImagePullPromise ??= (async () => {
    try {
      // With `reject: false` a failing pull does not throw; the failure is
      // reported on the result object and has to be checked explicitly.
      const result = await execa(containerRuntime(), ['pull', env.jobImage], {
        reject: false,
        stdin: 'ignore',
      });
      if (result.failed) {
        const detail = String(result.stderr ?? '').trim();
        console.warn(
          `Failed to pull job image ${env.jobImage} (exit code ${
            result.exitCode ?? 'unknown'
          }); continuing with the locally cached image.${
            detail ? `\n${detail}` : ''
          }`,
        );
      }
    } catch (error: unknown) {
      // Still best-effort: keep running with the locally cached image, but
      // leave a trace of why the pull did not happen.
      const reason = error instanceof Error ? error.message : String(error);
      console.warn(
        `Failed to pull job image ${env.jobImage}; continuing with the locally cached image.\n${reason}`,
      );
    }
  })();
  return jobImagePullPromise;
};
|
||||
|
||||
const hostWorkspacePath = (workdir: string) => {
|
||||
if (!env.hostWorkdir) return workdir;
|
||||
const workerRoot = path.resolve(env.workdir);
|
||||
@@ -46,6 +66,7 @@ export const runInJobContainer = async (args: {
|
||||
redact: (value: string) => string;
|
||||
timeoutMs: number;
|
||||
}): Promise<CommandResult> => {
|
||||
await ensureJobImagePulled();
|
||||
const result = await execa(
|
||||
containerRuntime(),
|
||||
[
|
||||
@@ -84,6 +105,7 @@ export const startWorkspaceContainer = async (args: {
|
||||
command?: string[];
|
||||
publishTcpPort?: number;
|
||||
}) => {
|
||||
await ensureJobImagePulled();
|
||||
await execa(
|
||||
containerRuntime(),
|
||||
[
|
||||
@@ -180,6 +202,7 @@ export const streamInJobContainer = async (args: {
|
||||
onStdoutLine?: (line: string) => Promise<void>;
|
||||
onStderrLine?: (line: string) => Promise<void>;
|
||||
}): Promise<CommandResult> => {
|
||||
await ensureJobImagePulled();
|
||||
const subprocess = execa(
|
||||
containerRuntime(),
|
||||
[
|
||||
|
||||
@@ -111,6 +111,10 @@ type ActiveWorkspace = {
|
||||
agentTurnActive?: boolean;
|
||||
resolveTurn?: () => void;
|
||||
lastRecordedDiffSignature?: string;
|
||||
// Captures the most recent Codex `error`/`turn.failed` event for the active
|
||||
// turn so the failure surfaces the real reason instead of a generic
|
||||
// "no assistant response" message.
|
||||
codexTurnError?: string;
|
||||
};
|
||||
|
||||
type FileTreeNode = {
|
||||
@@ -599,6 +603,11 @@ const handleAgentEvent = async (args: {
|
||||
);
|
||||
return;
|
||||
}
|
||||
// event.kind === 'error'
|
||||
// Record the real Codex failure reason on the workspace so the turn can
|
||||
// surface it (Codex can emit `error`/`turn.failed` events and still exit 0
|
||||
// in some versions, which otherwise looks like an empty response).
|
||||
workspace.codexTurnError = event.message;
|
||||
await appendEvent(jobId, 'error', 'plan', truncate(event.message, 20_000));
|
||||
};
|
||||
|
||||
@@ -683,6 +692,12 @@ const workspaceCurrentContent = new Map<
|
||||
}
|
||||
>();
|
||||
|
||||
/**
 * Returns the Codex error currently recorded on the workspace, if any.
 *
 * This is deliberately a separate function: `runCodexTurn` resets the field
 * synchronously and the stream event handler sets it later, so reading it
 * through a call keeps TypeScript from narrowing the value to `undefined`
 * after that reset.
 */
const readCodexTurnError = (workspace: ActiveWorkspace) => {
  const { codexTurnError } = workspace;
  return codexTurnError;
};
|
||||
|
||||
const runCodexTurn = async (args: {
|
||||
workspace: ActiveWorkspace;
|
||||
prompt: string;
|
||||
@@ -691,6 +706,7 @@ const runCodexTurn = async (args: {
|
||||
}) => {
|
||||
const { workspace, prompt, assistantMessageId, assistantContent } = args;
|
||||
workspace.runtimeMode = 'codex_exec';
|
||||
workspace.codexTurnError = undefined;
|
||||
await setRuntimeSession({
|
||||
jobId: workspace.claim.job._id,
|
||||
agentRuntimeMode: 'codex_exec',
|
||||
@@ -813,6 +829,15 @@ const runCodexTurn = async (args: {
|
||||
);
|
||||
}
|
||||
}
|
||||
// Codex can report a failure via a JSON `error`/`turn.failed` event while
|
||||
// still exiting 0. If the turn produced no assistant text but did report an
|
||||
// error, surface that real reason rather than a generic empty response.
|
||||
// Read through a helper so it is not narrowed away by the reset above (the
|
||||
// field is mutated asynchronously inside the stream handler).
|
||||
const codexTurnError = readCodexTurnError(workspace);
|
||||
if (!assistantContent.value.trim() && codexTurnError) {
|
||||
throw new Error(`codex failed:\n${codexTurnError}`);
|
||||
}
|
||||
};
|
||||
|
||||
const runOpenCodeTurn = async (args: {
|
||||
@@ -1593,7 +1618,9 @@ export const sendWorkspaceMessage = async (
|
||||
`Codex completed without producing an assistant response for job ${claim.job._id}.`,
|
||||
);
|
||||
throw new Error(
|
||||
'Codex completed without producing an assistant response.',
|
||||
workspace.codexTurnError
|
||||
? `Codex failed: ${workspace.codexTurnError}`
|
||||
: 'Codex completed without producing an assistant response.',
|
||||
);
|
||||
}
|
||||
await updateMessage({
|
||||
|
||||
@@ -26,6 +26,32 @@ describe('agent event normalization', () => {
|
||||
).toContainEqual({ kind: 'assistant_delta', content: 'hello' });
|
||||
});
|
||||
|
||||
// Legacy codex-rs lines arrive as `{ id, msg: { type, ... } }`; each case pairs
// one such line with the normalized event it must produce.
test('normalizes legacy codex-rs msg-wrapped events', () => {
  const cases = [
    {
      line: { id: '0', msg: { type: 'agent_message', message: 'hello there' } },
      expected: { kind: 'assistant_delta', content: 'hello there\n\n' },
    },
    {
      line: { id: '1', msg: { type: 'error', message: 'usage limit reached' } },
      expected: { kind: 'error', message: 'usage limit reached' },
    },
    {
      line: { id: '2', msg: { type: 'task_complete' } },
      expected: { kind: 'assistant_completed' },
    },
  ];

  for (const { line, expected } of cases) {
    expect(normalizeCodexJsonLine(JSON.stringify(line))).toContainEqual(
      expected,
    );
  }
});
|
||||
|
||||
test('normalizes Codex CLI thread lifecycle events', () => {
|
||||
expect(
|
||||
normalizeCodexJsonLine(
|
||||
|
||||
Reference in New Issue
Block a user