fix worker forreal
This commit is contained in:
@@ -82,8 +82,10 @@ const toolNameFromRecord = (record: Record<string, unknown> | null) =>
|
||||
record?.toolName ??
|
||||
record?.name ??
|
||||
record?.function ??
|
||||
record?.type ??
|
||||
record?.command ??
|
||||
(stringify(record?.type).toLowerCase().includes('exec') ||
|
||||
record?.command
|
||||
? 'Command'
|
||||
: record?.type) ??
|
||||
'tool',
|
||||
);
|
||||
|
||||
@@ -103,9 +105,15 @@ const toolOutputFromRecord = (
|
||||
) =>
|
||||
stringify(
|
||||
record?.output ??
|
||||
record?.aggregated_output ??
|
||||
record?.stdout ??
|
||||
record?.stderr ??
|
||||
record?.result ??
|
||||
record?.content ??
|
||||
record?.text ??
|
||||
(record?.exit_code !== undefined
|
||||
? `exit code: ${stringify(record.exit_code)}`
|
||||
: undefined) ??
|
||||
fallback,
|
||||
);
|
||||
|
||||
@@ -121,11 +129,17 @@ const recordLooksLikeTool = (
|
||||
recordType.includes('tool') ||
|
||||
recordType.includes('function_call') ||
|
||||
recordType.includes('local_shell_call') ||
|
||||
recordType.includes('exec_command') ||
|
||||
recordType.includes('command') ||
|
||||
recordType.includes('mcp') ||
|
||||
Boolean(record?.tool ?? record?.tool_name ?? record?.name)
|
||||
Boolean(record?.tool ?? record?.tool_name ?? record?.name ?? record?.command)
|
||||
);
|
||||
};
|
||||
|
||||
const isCodexConfigWarning = (message: string) =>
|
||||
message.includes('`[features].codex_hooks` is deprecated') ||
|
||||
message.includes('Use `[features].hooks` instead');
|
||||
|
||||
export const normalizeCodexJsonLine = (
|
||||
line: string,
|
||||
): NormalizedAgentEvent[] => {
|
||||
@@ -205,13 +219,22 @@ export const normalizeCodexJsonLine = (
|
||||
itemType.includes('message') ||
|
||||
itemType.includes('agent_message'))
|
||||
) {
|
||||
events.push({ kind: 'assistant_delta', content: text });
|
||||
events.push({
|
||||
kind: 'assistant_delta',
|
||||
content: itemType.includes('agent_message') ? `${text.trim()}\n\n` : text,
|
||||
externalMessageId: stringify(item?.id ?? event.id),
|
||||
});
|
||||
}
|
||||
const error = event.error ?? item?.error;
|
||||
if (error || itemType === 'error') {
|
||||
const message = stringify(error ?? item?.message ?? event.message);
|
||||
if (isCodexConfigWarning(message)) {
|
||||
events.push({ kind: 'status', status: message });
|
||||
return events;
|
||||
}
|
||||
events.push({
|
||||
kind: 'error',
|
||||
message: stringify(error ?? item?.message ?? event.message),
|
||||
message,
|
||||
});
|
||||
}
|
||||
const command =
|
||||
|
||||
@@ -126,12 +126,42 @@ export const getDiff = async (
|
||||
export const getWorktreeDiff = async (
|
||||
repoDir: string,
|
||||
redact: (value: string) => string,
|
||||
) =>
|
||||
await run('git', ['diff', '--', '.'], {
|
||||
) => {
|
||||
const trackedDiff = await run('git', ['diff', '--', '.'], {
|
||||
cwd: repoDir,
|
||||
redact,
|
||||
timeoutMs: 60_000,
|
||||
});
|
||||
const untracked = await run(
|
||||
'git',
|
||||
['ls-files', '--others', '--exclude-standard'],
|
||||
{
|
||||
cwd: repoDir,
|
||||
redact,
|
||||
timeoutMs: 60_000,
|
||||
},
|
||||
);
|
||||
const untrackedDiffs: string[] = [];
|
||||
for (const filePath of untracked.output.split('\n').filter(Boolean)) {
|
||||
const diff = await run(
|
||||
'git',
|
||||
['diff', '--no-index', '--', '/dev/null', filePath],
|
||||
{
|
||||
cwd: repoDir,
|
||||
redact,
|
||||
timeoutMs: 60_000,
|
||||
},
|
||||
);
|
||||
if (diff.output.trim()) untrackedDiffs.push(diff.output);
|
||||
}
|
||||
return {
|
||||
exitCode:
|
||||
trackedDiff.exitCode === 0 && untracked.exitCode === 0 ? 0 : 1,
|
||||
output: [trackedDiff.output, ...untrackedDiffs]
|
||||
.filter((part) => part.trim())
|
||||
.join('\n'),
|
||||
};
|
||||
};
|
||||
|
||||
export const getStatus = async (
|
||||
repoDir: string,
|
||||
|
||||
@@ -54,6 +54,7 @@ export const runInJobContainer = async (args: {
|
||||
{
|
||||
all: true,
|
||||
reject: false,
|
||||
stdin: 'ignore',
|
||||
timeout: args.timeoutMs,
|
||||
},
|
||||
);
|
||||
@@ -102,7 +103,7 @@ export const startWorkspaceContainer = async (args: {
|
||||
env.jobImage,
|
||||
...(args.command ?? ['sleep', 'infinity']),
|
||||
],
|
||||
{ all: true },
|
||||
{ all: true, stdin: 'ignore' },
|
||||
);
|
||||
return {
|
||||
containerId: result.stdout.trim(),
|
||||
@@ -117,7 +118,7 @@ const getPublishedPort = async (containerName: string, containerPort: number) =>
|
||||
const result = await execa(
|
||||
containerRuntime(),
|
||||
['port', containerName, `${containerPort}/tcp`],
|
||||
{ all: true, reject: false },
|
||||
{ all: true, reject: false, stdin: 'ignore' },
|
||||
);
|
||||
const output = result.all.trim();
|
||||
const match = /:(\d+)\s*$/.exec(output);
|
||||
@@ -147,6 +148,7 @@ export const execInWorkspaceContainer = async (args: {
|
||||
{
|
||||
all: true,
|
||||
reject: false,
|
||||
stdin: 'ignore',
|
||||
timeout: args.timeoutMs,
|
||||
},
|
||||
);
|
||||
@@ -186,6 +188,7 @@ export const streamInJobContainer = async (args: {
|
||||
{
|
||||
all: true,
|
||||
reject: false,
|
||||
stdin: 'ignore',
|
||||
timeout: args.timeoutMs,
|
||||
},
|
||||
);
|
||||
@@ -220,7 +223,19 @@ export const streamInJobContainer = async (args: {
|
||||
consume(chunk, 'stderr', args.onStderrLine),
|
||||
);
|
||||
});
|
||||
const result = await subprocess;
|
||||
let result: Awaited<typeof subprocess>;
|
||||
try {
|
||||
result = await subprocess;
|
||||
} catch (error) {
|
||||
await lineHandlers;
|
||||
const outputText = output.join('');
|
||||
const message =
|
||||
error instanceof Error ? error.message : 'Container command failed.';
|
||||
return {
|
||||
exitCode: 1,
|
||||
output: args.redact(`${outputText}${outputText ? '\n' : ''}${message}`),
|
||||
};
|
||||
}
|
||||
await lineHandlers;
|
||||
if (stdoutBuffer && args.onStdoutLine) {
|
||||
await args.onStdoutLine(args.redact(stdoutBuffer));
|
||||
|
||||
@@ -110,6 +110,7 @@ type ActiveWorkspace = {
|
||||
codexSessionId?: string;
|
||||
agentTurnActive?: boolean;
|
||||
resolveTurn?: () => void;
|
||||
lastRecordedDiffSignature?: string;
|
||||
};
|
||||
|
||||
type FileTreeNode = {
|
||||
@@ -430,8 +431,8 @@ const codexModel = (claim: Claim) => {
|
||||
return model.includes('/') ? model.split('/').at(-1) ?? model : model;
|
||||
};
|
||||
|
||||
const codexModelFlag = (claim: Claim) =>
|
||||
isCodexLoginProfile(claim) ? '' : ` --model ${quoteShell(codexModel(claim))}`;
|
||||
const codexModelArgs = (claim: Claim) =>
|
||||
isCodexLoginProfile(claim) ? [] : ['--model', codexModel(claim)];
|
||||
|
||||
const writeJsonFile = async (filePath: string, content: string) => {
|
||||
let normalized = content.trim();
|
||||
@@ -696,20 +697,26 @@ const runCodexTurn = async (args: {
|
||||
codexSessionId: workspace.codexSessionId,
|
||||
});
|
||||
const command = workspace.codexSessionId
|
||||
? commandToShell(
|
||||
`codex exec resume --json${codexModelFlag(
|
||||
workspace.claim,
|
||||
)} --dangerously-bypass-approvals-and-sandbox ${quoteShell(
|
||||
? [
|
||||
'codex',
|
||||
'exec',
|
||||
'resume',
|
||||
'--json',
|
||||
...codexModelArgs(workspace.claim),
|
||||
'--dangerously-bypass-approvals-and-sandbox',
|
||||
workspace.codexSessionId,
|
||||
)} ${quoteShell(prompt)}`,
|
||||
)
|
||||
: commandToShell(
|
||||
`codex exec --json${codexModelFlag(
|
||||
workspace.claim,
|
||||
)} --dangerously-bypass-approvals-and-sandbox --cd ${quoteShell(
|
||||
prompt,
|
||||
]
|
||||
: [
|
||||
'codex',
|
||||
'exec',
|
||||
'--json',
|
||||
...codexModelArgs(workspace.claim),
|
||||
'--dangerously-bypass-approvals-and-sandbox',
|
||||
'--cd',
|
||||
codexContainerRepo,
|
||||
)} ${quoteShell(prompt)}`,
|
||||
);
|
||||
prompt,
|
||||
];
|
||||
const aiEnv = providerEnvironment(workspace.claim, codexContainerWorkspace);
|
||||
const secretEnv = Object.fromEntries(
|
||||
workspace.claim.secrets.map((secret) => [secret.name, secret.value]),
|
||||
@@ -734,12 +741,17 @@ const runCodexTurn = async (args: {
|
||||
}
|
||||
},
|
||||
onStderrLine: async (line) => {
|
||||
if (line.trim()) {
|
||||
const trimmed = line.trim();
|
||||
if (
|
||||
trimmed &&
|
||||
trimmed !== 'Reading additional input from stdin...' &&
|
||||
!trimmed.includes('`[features].codex_hooks` is deprecated')
|
||||
) {
|
||||
await appendEvent(
|
||||
workspace.claim.job._id,
|
||||
'info',
|
||||
'plan',
|
||||
truncate(line, 10_000),
|
||||
truncate(trimmed, 10_000),
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -1034,6 +1046,9 @@ const recordChangedFiles = async (
|
||||
workspace.repoDir,
|
||||
workspace.redact,
|
||||
);
|
||||
const signature = JSON.stringify({ diff, changes });
|
||||
if (signature === workspace.lastRecordedDiffSignature) return;
|
||||
workspace.lastRecordedDiffSignature = signature;
|
||||
for (const change of changes) {
|
||||
await recordWorkspaceChange({
|
||||
jobId: workspace.claim.job._id,
|
||||
@@ -1235,7 +1250,9 @@ const runClaim = async (claim: Claim) => {
|
||||
});
|
||||
await appendEvent(jobId, 'info', 'plan', 'Interactive workspace is ready.');
|
||||
|
||||
await sendWorkspaceMessage(jobId, systemPromptForJob(claim));
|
||||
await sendWorkspaceMessage(jobId, systemPromptForJob(claim), {
|
||||
recordUserMessage: false,
|
||||
});
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
await appendEvent(
|
||||
@@ -1428,18 +1445,24 @@ export const replyToInteraction = async (
|
||||
return { success: true };
|
||||
};
|
||||
|
||||
export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
|
||||
export const sendWorkspaceMessage = async (
|
||||
jobId: string,
|
||||
prompt: string,
|
||||
options: { recordUserMessage?: boolean } = {},
|
||||
) => {
|
||||
const workspace = resolveWorkspace(jobId);
|
||||
const { claim, redact } = workspace;
|
||||
if (workspace.agentTurnActive) {
|
||||
throw new Error('Wait for the current agent turn to finish or abort it.');
|
||||
}
|
||||
if (options.recordUserMessage ?? true) {
|
||||
await appendMessage({
|
||||
jobId: claim.job._id,
|
||||
role: 'user',
|
||||
status: 'completed',
|
||||
content: prompt,
|
||||
});
|
||||
}
|
||||
await appendEvent(claim.job._id, 'info', 'plan', 'Sending message to agent.');
|
||||
|
||||
let assistantMessageId: Id<'agentJobMessages'> | undefined;
|
||||
|
||||
@@ -95,7 +95,8 @@ describe('agent event normalization', () => {
|
||||
),
|
||||
).toContainEqual({
|
||||
kind: 'assistant_delta',
|
||||
content: 'I updated the auth provider.',
|
||||
content: 'I updated the auth provider.\n\n',
|
||||
externalMessageId: 'item-1',
|
||||
});
|
||||
|
||||
expect(
|
||||
@@ -125,6 +126,24 @@ describe('agent event normalization', () => {
|
||||
kind: 'error',
|
||||
message: '{\n "message": "request failed"\n}',
|
||||
});
|
||||
|
||||
expect(
|
||||
normalizeCodexJsonLine(
|
||||
JSON.stringify({
|
||||
type: 'item.completed',
|
||||
item: {
|
||||
id: 'item-warning',
|
||||
type: 'error',
|
||||
message:
|
||||
'`[features].codex_hooks` is deprecated. Use `[features].hooks` instead.',
|
||||
},
|
||||
}),
|
||||
),
|
||||
).toContainEqual({
|
||||
kind: 'status',
|
||||
status:
|
||||
'`[features].codex_hooks` is deprecated. Use `[features].hooks` instead.',
|
||||
});
|
||||
});
|
||||
|
||||
test('normalizes Codex tool item lifecycle events', () => {
|
||||
@@ -141,7 +160,7 @@ describe('agent event normalization', () => {
|
||||
),
|
||||
).toContainEqual({
|
||||
kind: 'tool_started',
|
||||
name: 'local_shell_call',
|
||||
name: 'Command',
|
||||
input: 'bash -lc rg Authentik',
|
||||
externalMessageId: 'tool-1',
|
||||
});
|
||||
@@ -160,10 +179,30 @@ describe('agent event normalization', () => {
|
||||
),
|
||||
).toContainEqual({
|
||||
kind: 'tool_completed',
|
||||
name: 'local_shell_call',
|
||||
name: 'Command',
|
||||
output: 'apps/web/auth.ts',
|
||||
externalMessageId: 'tool-1',
|
||||
});
|
||||
|
||||
expect(
|
||||
normalizeCodexJsonLine(
|
||||
JSON.stringify({
|
||||
type: 'item.completed',
|
||||
item: {
|
||||
id: 'tool-2',
|
||||
type: 'exec_command',
|
||||
command: 'cat package.json',
|
||||
aggregated_output: '{"scripts":{"build":"turbo build"}}',
|
||||
exit_code: 0,
|
||||
},
|
||||
}),
|
||||
),
|
||||
).toContainEqual({
|
||||
kind: 'tool_completed',
|
||||
name: 'Command',
|
||||
output: '{"scripts":{"build":"turbo build"}}',
|
||||
externalMessageId: 'tool-2',
|
||||
});
|
||||
});
|
||||
|
||||
test('normalizes OpenCode assistant, tool, and permission events', () => {
|
||||
|
||||
Reference in New Issue
Block a user