Add features & update project

This commit is contained in:
Gabriel Brown
2026-06-23 01:46:08 -04:00
parent 930fbf5965
commit fe72fc2957
39 changed files with 3106 additions and 178 deletions
+231
View File
@@ -0,0 +1,231 @@
export type NormalizedAgentEvent =
| { kind: 'assistant_delta'; content: string; externalMessageId?: string }
| {
kind: 'assistant_completed';
content?: string;
externalMessageId?: string;
}
| {
kind: 'tool_started';
name: string;
input?: string;
externalMessageId?: string;
}
| {
kind: 'tool_completed';
name: string;
output?: string;
externalMessageId?: string;
}
| { kind: 'file_edited'; path: string }
| {
kind: 'command_executed';
command: string;
exitCode?: number;
output?: string;
}
| {
kind: 'permission_requested';
externalRequestId: string;
title: string;
body: string;
metadata?: string;
}
| {
kind: 'question_requested';
externalRequestId: string;
title: string;
body: string;
options?: string[];
metadata?: string;
}
| { kind: 'session'; sessionId: string }
| { kind: 'status'; status: string; metadata?: string }
| { kind: 'error'; message: string; metadata?: string };
const stringify = (value: unknown) => {
if (typeof value === 'string') return value;
if (value === undefined || value === null) return '';
if (
typeof value === 'number' ||
typeof value === 'boolean' ||
typeof value === 'bigint'
) {
return value.toString();
}
try {
return JSON.stringify(value, null, 2);
} catch {
return '';
}
};
const asRecord = (value: unknown): Record<string, unknown> | null =>
value && typeof value === 'object'
? (value as Record<string, unknown>)
: null;
const textFromPart = (part: Record<string, unknown>) => {
const text = part.text ?? part.content ?? part.delta;
return typeof text === 'string' ? text : '';
};
export const normalizeCodexJsonLine = (
line: string,
): NormalizedAgentEvent[] => {
if (!line.trim()) return [];
let parsed: unknown;
try {
parsed = JSON.parse(line) as unknown;
} catch {
return [{ kind: 'status', status: line }];
}
const event = asRecord(parsed);
if (!event) return [];
const type = stringify(event.type ?? event.event);
const id = event.id ?? event.session_id ?? event.sessionId;
const sessionId =
typeof id === 'string' && type.toLowerCase().includes('session')
? id
: undefined;
const events: NormalizedAgentEvent[] = sessionId
? [{ kind: 'session', sessionId }]
: [];
const message = asRecord(event.message);
const item = asRecord(event.item);
const data = asRecord(event.data);
const part = asRecord(event.part);
const delta = event.delta ?? data?.delta;
if (typeof delta === 'string') {
events.push({ kind: 'assistant_delta', content: delta });
}
const text =
(part ? textFromPart(part) : '') ||
(message ? stringify(message.content ?? message.text) : '') ||
(item ? stringify(item.content ?? item.text) : '');
if (
text &&
(type.includes('message') ||
type.includes('response.output_text') ||
type.includes('agent_message'))
) {
events.push({ kind: 'assistant_delta', content: text });
}
const command = event.command ?? data?.command;
if (typeof command === 'string') {
events.push({
kind: 'command_executed',
command,
output: stringify(event.output ?? data?.output),
});
}
const file = event.file ?? event.path ?? data?.file ?? data?.path;
if (typeof file === 'string' && type.includes('file')) {
events.push({ kind: 'file_edited', path: file });
}
if (type.includes('error')) {
events.push({
kind: 'error',
message: stringify(event.message ?? event.error ?? data),
});
}
if (type.includes('completed') || type.includes('turn.done')) {
events.push({ kind: 'assistant_completed' });
}
if (events.length === 0) {
events.push({ kind: 'status', status: type || 'codex_event' });
}
return events;
};
export const normalizeOpenCodeEvent = (
input: unknown,
): NormalizedAgentEvent[] => {
const event = asRecord(input);
if (!event) return [];
const type = stringify(event.type);
const properties = asRecord(event.properties) ?? asRecord(event.data) ?? event;
const events: NormalizedAgentEvent[] = [];
const sessionId = properties.sessionID ?? properties.sessionId;
if (typeof sessionId === 'string' && type.includes('session')) {
events.push({ kind: 'session', sessionId });
}
if (type === 'message.part.delta') {
const part = asRecord(properties.part) ?? properties;
const text = textFromPart(part);
if (text) {
events.push({
kind: 'assistant_delta',
content: text,
externalMessageId: stringify(properties.messageID),
});
}
}
if (type === 'message.updated' || type === 'message.part.updated') {
const part = asRecord(properties.part);
const text = part ? textFromPart(part) : stringify(properties.message);
if (text) {
events.push({
kind: 'assistant_delta',
content: text,
externalMessageId: stringify(properties.messageID),
});
}
}
if (type.includes('tool.started')) {
events.push({
kind: 'tool_started',
name: stringify(properties.tool ?? properties.name ?? 'tool'),
input: stringify(properties.input),
externalMessageId: stringify(properties.messageID),
});
}
if (type.includes('tool.finished') || type.includes('tool.completed')) {
events.push({
kind: 'tool_completed',
name: stringify(properties.tool ?? properties.name ?? 'tool'),
output: stringify(properties.output ?? properties.result),
externalMessageId: stringify(properties.messageID),
});
}
if (type === 'file.edited') {
const file = properties.file;
if (typeof file === 'string') events.push({ kind: 'file_edited', path: file });
}
if (type === 'command.executed') {
events.push({
kind: 'command_executed',
command: stringify(properties.command),
output: stringify(properties.output),
});
}
if (type.includes('permission') && type.includes('asked')) {
events.push({
kind: 'permission_requested',
externalRequestId: stringify(properties.permissionID ?? properties.id),
title: 'Permission requested',
body: stringify(properties.permission ?? properties.message ?? properties),
metadata: stringify(properties),
});
}
if (type.includes('question') && type.includes('asked')) {
events.push({
kind: 'question_requested',
externalRequestId: stringify(properties.requestID ?? properties.id),
title: 'Agent question',
body: stringify(properties.question ?? properties.message ?? properties),
metadata: stringify(properties),
});
}
if (type === 'session.idle') events.push({ kind: 'assistant_completed' });
if (type === 'session.error') {
events.push({
kind: 'error',
message: stringify(properties.error ?? properties.message ?? properties),
});
}
if (events.length === 0 && type) {
events.push({ kind: 'status', status: type, metadata: stringify(properties) });
}
return events;
};
+8
View File
@@ -19,6 +19,14 @@ export const env = {
workerToken: requiredEnv('SPOON_WORKER_TOKEN'),
workerId: process.env.SPOON_AGENT_WORKER_ID?.trim() ?? 'local-worker',
runtime: process.env.SPOON_AGENT_RUNTIME?.trim() ?? 'docker',
containerRuntime:
process.env.SPOON_AGENT_CONTAINER_RUNTIME?.trim() ??
process.env.SPOON_CONTAINER_RUNTIME?.trim() ??
'docker',
containerAccess:
process.env.SPOON_AGENT_CONTAINER_ACCESS?.trim() === 'host_port'
? 'host_port'
: 'network',
jobImage:
process.env.SPOON_AGENT_JOB_IMAGE?.trim() ?? 'spoon-agent-job:latest',
workdir: process.env.SPOON_AGENT_WORKDIR?.trim() ?? '.local/agent-work',
+126
View File
@@ -0,0 +1,126 @@
import { createOpencodeClient } from '@opencode-ai/sdk';
import type { OpencodeClient } from '@opencode-ai/sdk';
import type { NormalizedAgentEvent } from './agent-events';
import { normalizeOpenCodeEvent } from './agent-events';
export type OpenCodeSession = {
client: OpencodeClient;
sessionId: string;
close: () => void;
};
const basicAuth = (username: string, password: string) =>
`Basic ${Buffer.from(`${username}:${password}`).toString('base64')}`;
const modelParts = (model: string) => {
const [rawProviderId, ...rest] = model.split('/');
const providerID =
rawProviderId && rawProviderId.length > 0 ? rawProviderId : 'openai';
const modelID = rest.length > 0 ? rest.join('/') : model;
return {
providerID,
modelID,
};
};
export const createOpenCodeSession = async (args: {
baseUrl: string;
password: string;
directory: string;
title: string;
onEvent: (event: NormalizedAgentEvent) => Promise<void>;
}) => {
const abortController = new AbortController();
const client = createOpencodeClient({
baseUrl: args.baseUrl,
directory: args.directory,
headers: {
authorization: basicAuth('opencode', args.password),
},
});
const created = await client.session.create({
query: { directory: args.directory },
body: { title: args.title },
});
if (!created.data) {
throw new Error('OpenCode session could not be created.');
}
const sessionId = created.data.id;
void (async () => {
const events = await client.event.subscribe({
signal: abortController.signal,
query: { directory: args.directory },
onSseEvent: (event) => {
for (const normalized of normalizeOpenCodeEvent(event.data)) {
void args.onEvent(normalized);
}
},
onSseError: (error) => {
void args.onEvent({
kind: 'error',
message: error instanceof Error ? error.message : String(error),
});
},
});
for await (const event of events.stream) {
for (const normalized of normalizeOpenCodeEvent(event)) {
await args.onEvent(normalized);
}
}
})().catch((error: unknown) => {
if (!abortController.signal.aborted) {
void args.onEvent({
kind: 'error',
message: error instanceof Error ? error.message : String(error),
});
}
});
return {
client,
sessionId,
close: () => abortController.abort(),
} satisfies OpenCodeSession;
};
export const promptOpenCodeSession = async (args: {
session: OpenCodeSession;
prompt: string;
model: string;
directory: string;
}) => {
const model = modelParts(args.model);
const result = await args.session.client.session.promptAsync({
path: { id: args.session.sessionId },
query: { directory: args.directory },
body: {
model,
parts: [{ type: 'text', text: args.prompt }],
},
});
if (result.error) {
throw new Error('OpenCode prompt was rejected.');
}
};
export const abortOpenCodeSession = async (session: OpenCodeSession) => {
await session.client.session.abort({
path: { id: session.sessionId },
});
};
export const replyOpenCodePermission = async (args: {
session: OpenCodeSession;
permissionId: string;
response: 'once' | 'always' | 'reject';
directory: string;
}) => {
const result = await args.session.client.postSessionIdPermissionsPermissionId({
path: { id: args.session.sessionId, permissionID: args.permissionId },
query: { directory: args.directory },
body: { response: args.response },
});
if (result.error) {
throw new Error('OpenCode permission response was rejected.');
}
};
+218 -9
View File
@@ -2,20 +2,30 @@ import { execa } from 'execa';
import { env } from '../env';
type CommandResult = {
exitCode: number;
output: string;
};
const environmentArgs = (environment: Record<string, string>) =>
Object.entries(environment).flatMap(([name, value]) => [
'-e',
`${name}=${value}`,
]);
const networkArgs = () => (env.network ? ['--network', env.network] : []);
const containerRuntime = () => env.containerRuntime;
export const runInJobContainer = async (args: {
workdir: string;
command: string[];
environment: Record<string, string>;
redact: (value: string) => string;
timeoutMs: number;
}) => {
const envArgs = Object.entries(args.environment).flatMap(([name, value]) => [
'-e',
`${name}=${value}`,
]);
const networkArgs = env.network ? ['--network', env.network] : [];
}): Promise<CommandResult> => {
const result = await execa(
'docker',
containerRuntime(),
[
'run',
'--rm',
@@ -23,8 +33,8 @@ export const runInJobContainer = async (args: {
'4g',
'--cpus',
'2',
...networkArgs,
...envArgs,
...networkArgs(),
...environmentArgs(args.environment),
'-v',
`${args.workdir}:/workspace`,
'-w',
@@ -43,3 +53,202 @@ export const runInJobContainer = async (args: {
output: args.redact(result.all),
};
};
export const startWorkspaceContainer = async (args: {
workdir: string;
containerName: string;
environment: Record<string, string>;
command?: string[];
publishTcpPort?: number;
}) => {
await execa(
containerRuntime(),
[
'rm',
'-f',
args.containerName,
],
{ reject: false },
);
const result = await execa(
containerRuntime(),
[
'run',
'-d',
'--name',
args.containerName,
'--memory',
'4g',
'--cpus',
'2',
...networkArgs(),
...(args.publishTcpPort
? ['-p', `127.0.0.1::${args.publishTcpPort}`]
: []),
...environmentArgs(args.environment),
'-v',
`${args.workdir}:/workspace`,
'-w',
'/workspace/repo',
env.jobImage,
...(args.command ?? ['sleep', 'infinity']),
],
{ all: true },
);
return {
containerId: result.stdout.trim(),
containerName: args.containerName,
hostPort: args.publishTcpPort
? await getPublishedPort(args.containerName, args.publishTcpPort)
: undefined,
};
};
const getPublishedPort = async (containerName: string, containerPort: number) => {
const result = await execa(
containerRuntime(),
['port', containerName, `${containerPort}/tcp`],
{ all: true, reject: false },
);
const output = result.all.trim();
const match = /:(\d+)\s*$/.exec(output);
if (!match?.[1]) {
throw new Error(
`Could not determine published port for ${containerName}:${containerPort}.`,
);
}
return match[1];
};
export const execInWorkspaceContainer = async (args: {
containerName: string;
command: string[];
environment?: Record<string, string>;
redact: (value: string) => string;
timeoutMs: number;
}): Promise<CommandResult> => {
const result = await execa(
containerRuntime(),
[
'exec',
...(args.environment ? environmentArgs(args.environment) : []),
args.containerName,
...args.command,
],
{
all: true,
reject: false,
timeout: args.timeoutMs,
},
);
return {
exitCode: result.exitCode ?? 0,
output: args.redact(result.all),
};
};
export const streamInJobContainer = async (args: {
workdir: string;
command: string[];
environment: Record<string, string>;
redact: (value: string) => string;
timeoutMs: number;
onStdoutLine?: (line: string) => Promise<void>;
onStderrLine?: (line: string) => Promise<void>;
}): Promise<CommandResult> => {
const subprocess = execa(
containerRuntime(),
[
'run',
'--rm',
'--memory',
'4g',
'--cpus',
'2',
...networkArgs(),
...environmentArgs(args.environment),
'-v',
`${args.workdir}:/workspace`,
'-w',
'/workspace/repo',
env.jobImage,
...args.command,
],
{
all: true,
reject: false,
timeout: args.timeoutMs,
},
);
let stdoutBuffer = '';
let stderrBuffer = '';
const output: string[] = [];
const consume = async (
chunk: Buffer,
source: 'stdout' | 'stderr',
handler?: (line: string) => Promise<void>,
) => {
output.push(chunk.toString('utf8'));
const next = `${source === 'stdout' ? stdoutBuffer : stderrBuffer}${chunk.toString('utf8')}`;
const lines = next.split(/\r?\n/);
const remainder = lines.pop() ?? '';
if (source === 'stdout') stdoutBuffer = remainder;
else stderrBuffer = remainder;
for (const line of lines) {
if (handler) {
await handler(args.redact(line));
}
}
};
subprocess.stdout.on('data', (chunk: Buffer) => {
void consume(chunk, 'stdout', args.onStdoutLine);
});
subprocess.stderr.on('data', (chunk: Buffer) => {
void consume(chunk, 'stderr', args.onStderrLine);
});
const result = await subprocess;
if (stdoutBuffer && args.onStdoutLine) {
await args.onStdoutLine(args.redact(stdoutBuffer));
}
if (stderrBuffer && args.onStderrLine) {
await args.onStderrLine(args.redact(stderrBuffer));
}
return {
exitCode: result.exitCode ?? 0,
output: args.redact(output.join('')),
};
};
export const stopWorkspaceContainer = async (containerName: string) => {
await execa(containerRuntime(), ['rm', '-f', containerName], {
reject: false,
});
};
export const inspectWorkspaceContainer = async (containerName: string) => {
const result = await execa(
containerRuntime(),
['inspect', containerName],
{
all: true,
reject: false,
},
);
return {
exists: result.exitCode === 0,
output: result.all,
};
};
export const listWorkspaceContainerNames = async (prefix: string) => {
const result = await execa(
containerRuntime(),
['ps', '-a', '--format', '{{.Names}}'],
{ all: true, reject: false },
);
if (result.exitCode !== 0) return [];
return result.all
.split('\n')
.map((line) => line.trim())
.filter((line) => line.startsWith(prefix));
};
+54 -9
View File
@@ -1,12 +1,19 @@
import { createServer } from 'node:http';
import type { IncomingMessage, ServerResponse } from 'node:http';
import type { Id } from '@spoon/backend/convex/_generated/dataModel.js';
import { env } from './env';
import {
abortWorkspaceAgent,
cleanupOrphanedWorkspaces,
getWorkerHealth,
getWorkspaceAgentStatus,
getWorkspaceDiff,
listWorkspaceTree,
openWorkspacePullRequest,
readWorkspaceFile,
replyToInteraction,
runWorkspaceCommand,
sendWorkspaceMessage,
stopWorkspace,
@@ -43,7 +50,7 @@ const requireAuth = (request: IncomingMessage) => {
};
const jobRoute = (pathname: string) => {
const match = /^\/jobs\/([^/]+)\/([^/]+)$/.exec(pathname);
const match = /^\/jobs\/([^/]+)\/(.+)$/.exec(pathname);
if (!match?.[1] || !match[2]) return null;
return { jobId: decodeURIComponent(match[1]), action: match[2] };
};
@@ -57,8 +64,12 @@ export const startWorkerServer = () => {
request.url ?? '/',
`http://localhost:${env.httpPort}`,
);
if (url.pathname === '/health') {
sendJson(response, 200, { ok: true, workerId: env.workerId });
if (url.pathname === '/health' && request.method === 'GET') {
sendJson(response, 200, await getWorkerHealth());
return;
}
if (url.pathname === '/cleanup' && request.method === 'POST') {
sendJson(response, 200, await cleanupOrphanedWorkspaces());
return;
}
const route = jobRoute(url.pathname);
@@ -108,6 +119,34 @@ export const startWorkerServer = () => {
sendJson(response, 200, { success: true });
return;
}
if (request.method === 'GET' && route.action === 'agent/status') {
sendJson(response, 200, getWorkspaceAgentStatus(route.jobId));
return;
}
if (request.method === 'POST' && route.action === 'agent/abort') {
sendJson(response, 200, await abortWorkspaceAgent(route.jobId));
return;
}
const interactionMatch =
/^interactions\/([^/]+)\/reply$/.exec(route.action);
if (request.method === 'POST' && interactionMatch?.[1]) {
const body = await parseJson<{
externalRequestId?: string;
response?: string;
}>(request);
sendJson(
response,
200,
await replyToInteraction(route.jobId, {
interactionId: decodeURIComponent(
interactionMatch[1],
) as Id<'agentInteractionRequests'>,
externalRequestId: body.externalRequestId ?? '',
response: body.response ?? 'once',
}),
);
return;
}
if (request.method === 'POST' && route.action === 'run-command') {
const body = await parseJson<{ command?: string }>(request);
sendJson(
@@ -126,12 +165,18 @@ export const startWorkerServer = () => {
return;
}
sendJson(response, 404, { error: 'Not found' });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
sendJson(response, message === 'Unauthorized' ? 401 : 500, {
error: message,
});
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const status =
message === 'Unauthorized'
? 401
: message.includes('not supported')
? 409
: 500;
sendJson(response, status, {
error: message,
});
}
})();
});
server.listen(env.httpPort, () => {
+602 -59
View File
@@ -7,12 +7,15 @@ import {
stat,
writeFile,
} from 'node:fs/promises';
import { randomBytes } from 'node:crypto';
import path from 'node:path';
import { ConvexHttpClient } from 'convex/browser';
import type { Id } from '@spoon/backend/convex/_generated/dataModel.js';
import { api } from '@spoon/backend/convex/_generated/api.js';
import type { NormalizedAgentEvent } from './agent-events';
import { normalizeCodexJsonLine } from './agent-events';
import { env } from './env';
import {
cloneRepository,
@@ -22,8 +25,21 @@ import {
run,
} from './git';
import { getInstallationToken, openDraftPullRequest } from './github';
import type { OpenCodeSession } from './opencode-session';
import {
abortOpenCodeSession,
createOpenCodeSession,
promptOpenCodeSession,
replyOpenCodePermission,
} from './opencode-session';
import { createRedactor, truncate } from './redact';
import { runInJobContainer } from './runtime/docker';
import {
listWorkspaceContainerNames,
runInJobContainer,
startWorkspaceContainer,
stopWorkspaceContainer,
streamInJobContainer,
} from './runtime/docker';
type Claim = {
job: {
@@ -81,6 +97,14 @@ type ActiveWorkspace = {
repoDir: string;
githubToken: string;
redact: (value: string) => string;
runtimeMode?: 'opencode_server' | 'codex_exec' | 'legacy_cli';
containerName?: string;
containerId?: string;
opencodePassword?: string;
opencodeSession?: OpenCodeSession;
codexSessionId?: string;
agentTurnActive?: boolean;
resolveTurn?: () => void;
};
type FileTreeNode = {
@@ -225,6 +249,70 @@ const appendMessage = async (args: {
...args,
});
const updateMessage = async (args: {
messageId: Id<'agentJobMessages'>;
content?: string;
status?: 'queued' | 'streaming' | 'completed' | 'failed';
metadata?: string;
}) =>
await client.mutation(api.agentJobs.updateMessage, {
workerToken: env.workerToken,
workerId: env.workerId,
...args,
});
const setRuntimeSession = async (args: {
jobId: Id<'agentJobs'>;
agentRuntimeMode: 'opencode_server' | 'codex_exec' | 'legacy_cli';
opencodeSessionId?: string;
codexSessionId?: string;
containerId?: string;
}) =>
await client.mutation(api.agentJobs.setRuntimeSession, {
workerToken: env.workerToken,
workerId: env.workerId,
...args,
});
const setCodexSessionId = async (
jobId: Id<'agentJobs'>,
codexSessionId: string,
) =>
await client.mutation(api.agentJobs.setCodexSessionId, {
workerToken: env.workerToken,
workerId: env.workerId,
jobId,
codexSessionId,
});
const createInteractionRequest = async (args: {
jobId: Id<'agentJobs'>;
runtime: 'opencode' | 'codex';
externalRequestId: string;
kind: 'question' | 'permission' | 'tool_confirmation';
title: string;
body: string;
options?: string[];
metadata?: string;
}) =>
await client.mutation(api.agentJobs.createInteractionRequest, {
workerToken: env.workerToken,
workerId: env.workerId,
...args,
});
const patchInteractionRequest = async (args: {
interactionId: Id<'agentInteractionRequests'>;
status: 'pending' | 'answered' | 'approved' | 'rejected' | 'expired';
response?: string;
metadata?: string;
}) =>
await client.mutation(api.agentJobs.patchInteractionRequest, {
workerToken: env.workerToken,
workerId: env.workerId,
...args,
});
const recordWorkspaceChange = async (args: {
jobId: Id<'agentJobs'>;
path: string;
@@ -240,6 +328,9 @@ const recordWorkspaceChange = async (args: {
const commandToShell = (command: string) => ['bash', '-lc', command];
const workspaceContainerName = (jobId: string) =>
`spoon-agent-job-${jobId.replace(/[^a-zA-Z0-9_.-]/g, '-')}`;
const isCodexLoginProfile = (claim: Claim) =>
claim.aiProviderProfile?.provider === 'opencode_openai_login' ||
claim.aiProviderProfile?.authType === 'opencode_auth_json';
@@ -373,20 +464,305 @@ const prepareCodexAuth = async (workspace: ActiveWorkspace) => {
);
};
const agentCommand = (claim: Claim, prompt: string) => {
if (isCodexLoginProfile(claim)) {
return commandToShell(
`codex exec --model ${quoteShell(codexModel(claim))} --sandbox workspace-write ${quoteShell(prompt)}`,
);
}
return commandToShell(
`opencode run --model ${quoteShell(opencodeModel(claim))} ${quoteShell(prompt)}`,
);
};
const agentFailurePrefix = (claim: Claim) =>
isCodexLoginProfile(claim) ? 'codex failed' : 'opencode failed';
const handleAgentEvent = async (args: {
workspace: ActiveWorkspace;
event: NormalizedAgentEvent;
assistantMessageId: Id<'agentJobMessages'>;
assistantContent: { value: string };
}) => {
const { workspace, event, assistantMessageId, assistantContent } = args;
const jobId = workspace.claim.job._id;
if (event.kind === 'assistant_delta') {
assistantContent.value = truncate(
`${assistantContent.value}${event.content}`,
40_000,
);
await updateMessage({
messageId: assistantMessageId,
content: assistantContent.value,
status: 'streaming',
metadata: event.externalMessageId
? JSON.stringify({ externalMessageId: event.externalMessageId })
: undefined,
});
return;
}
if (event.kind === 'assistant_completed') {
workspace.agentTurnActive = false;
workspace.resolveTurn?.();
workspace.resolveTurn = undefined;
if (event.content) {
assistantContent.value = truncate(
`${assistantContent.value}${event.content}`,
40_000,
);
}
await updateMessage({
messageId: assistantMessageId,
content: assistantContent.value,
status: 'completed',
});
return;
}
if (event.kind === 'session') {
if (workspace.runtimeMode === 'codex_exec') {
workspace.codexSessionId = event.sessionId;
await setCodexSessionId(jobId, event.sessionId);
}
return;
}
if (event.kind === 'tool_started' || event.kind === 'tool_completed') {
const detail =
event.kind === 'tool_started' ? event.input : event.output;
await appendMessage({
jobId,
role: 'tool',
status: event.kind === 'tool_started' ? 'streaming' : 'completed',
content: truncate(
`${event.name}${detail ? `\n\n${detail}` : ''}`,
20_000,
),
metadata: JSON.stringify({
kind: event.kind,
externalMessageId: event.externalMessageId,
}),
});
return;
}
if (event.kind === 'file_edited') {
const diff = await getWorktreeDiff(workspace.repoDir, workspace.redact);
await recordWorkspaceChange({
jobId,
path: event.path,
source: 'agent',
changeType: await fileChangedType(workspace.repoDir, event.path),
diff: truncate(diff.output, 50_000),
});
await appendEvent(jobId, 'info', 'edit', `Agent edited ${event.path}.`);
return;
}
if (event.kind === 'command_executed') {
await appendEvent(
jobId,
event.exitCode && event.exitCode !== 0 ? 'warn' : 'info',
'check',
event.command,
event.output ? truncate(event.output, 10_000) : undefined,
);
return;
}
if (
event.kind === 'permission_requested' ||
event.kind === 'question_requested'
) {
await createInteractionRequest({
jobId,
runtime: workspace.runtimeMode === 'codex_exec' ? 'codex' : 'opencode',
externalRequestId: event.externalRequestId,
kind: event.kind === 'permission_requested' ? 'permission' : 'question',
title: event.title,
body: truncate(event.body, 20_000),
options: event.kind === 'question_requested' ? event.options : undefined,
metadata: event.metadata,
});
await appendMessage({
jobId,
role: 'system',
status: 'completed',
content: `${event.title}\n\n${truncate(event.body, 20_000)}`,
metadata: JSON.stringify({ kind: event.kind }),
});
return;
}
if (event.kind === 'status') {
await appendEvent(
jobId,
'debug',
'plan',
event.status,
event.metadata ? truncate(event.metadata, 10_000) : undefined,
);
return;
}
await appendEvent(jobId, 'error', 'plan', truncate(event.message, 20_000));
};
const ensureOpenCodeSession = async (workspace: ActiveWorkspace) => {
if (workspace.opencodeSession) return workspace.opencodeSession;
const containerName = workspaceContainerName(workspace.claim.job._id);
const password = randomBytes(24).toString('hex');
const aiEnv = providerEnvironment(workspace.claim);
const secretEnv = Object.fromEntries(
workspace.claim.secrets.map((secret) => [secret.name, secret.value]),
);
const container = await startWorkspaceContainer({
workdir: workspace.workdir,
containerName,
environment: {
...aiEnv,
...secretEnv,
OPENCODE_SERVER_PASSWORD: password,
OPENCODE_SERVER_USERNAME: 'opencode',
},
command: ['opencode', 'serve', '--hostname', '0.0.0.0', '--port', '4096'],
publishTcpPort: env.containerAccess === 'host_port' ? 4096 : undefined,
});
const baseUrl =
env.containerAccess === 'host_port'
? `http://127.0.0.1:${container.hostPort}`
: `http://${containerName}:4096`;
workspace.containerName = container.containerName;
workspace.containerId = container.containerId;
workspace.opencodePassword = password;
workspace.runtimeMode = 'opencode_server';
await setRuntimeSession({
jobId: workspace.claim.job._id,
agentRuntimeMode: 'opencode_server',
containerId: container.containerId,
});
let lastError: unknown;
for (let attempt = 0; attempt < 20; attempt += 1) {
try {
const session = await createOpenCodeSession({
baseUrl,
password,
directory: '/workspace/repo',
title: workspace.claim.job.prompt.slice(0, 80) || 'Spoon workspace',
onEvent: async (event) => {
const messageId = workspaceCurrentMessage.get(workspace.claim.job._id);
if (!messageId) return;
await handleAgentEvent({
workspace,
event,
assistantMessageId: messageId,
assistantContent:
workspaceCurrentContent.get(workspace.claim.job._id) ?? {
value: '',
},
});
},
});
workspace.opencodeSession = session;
await setRuntimeSession({
jobId: workspace.claim.job._id,
agentRuntimeMode: 'opencode_server',
opencodeSessionId: session.sessionId,
containerId: container.containerId,
});
return session;
} catch (error) {
lastError = error;
await sleep(500);
}
}
throw lastError instanceof Error
? lastError
: new Error('OpenCode server did not become ready.');
};
const workspaceCurrentMessage = new Map<string, Id<'agentJobMessages'>>();
const workspaceCurrentContent = new Map<
string,
{
value: string;
}
>();
const runCodexTurn = async (args: {
workspace: ActiveWorkspace;
prompt: string;
assistantMessageId: Id<'agentJobMessages'>;
assistantContent: { value: string };
}) => {
const { workspace, prompt, assistantMessageId, assistantContent } = args;
workspace.runtimeMode = 'codex_exec';
await setRuntimeSession({
jobId: workspace.claim.job._id,
agentRuntimeMode: 'codex_exec',
codexSessionId: workspace.codexSessionId,
});
const command = workspace.codexSessionId
? commandToShell(
`codex exec resume --json --model ${quoteShell(
codexModel(workspace.claim),
)} ${quoteShell(workspace.codexSessionId)} ${quoteShell(prompt)}`,
)
: commandToShell(
`codex exec --json --model ${quoteShell(
codexModel(workspace.claim),
)} --sandbox workspace-write ${quoteShell(prompt)}`,
);
const aiEnv = providerEnvironment(workspace.claim, jobContainerWorkspace);
const secretEnv = Object.fromEntries(
workspace.claim.secrets.map((secret) => [secret.name, secret.value]),
);
const result = await streamInJobContainer({
workdir: workspace.workdir,
command,
environment: {
...aiEnv,
...secretEnv,
},
redact: workspace.redact,
timeoutMs: env.jobTimeoutMs,
onStdoutLine: async (line) => {
for (const event of normalizeCodexJsonLine(line)) {
await handleAgentEvent({
workspace,
event,
assistantMessageId,
assistantContent,
});
}
},
onStderrLine: async (line) => {
if (line.trim()) {
await appendEvent(
workspace.claim.job._id,
'debug',
'plan',
truncate(line, 10_000),
);
}
},
});
if (result.exitCode !== 0) {
throw new Error(`codex failed:\n${result.output}`);
}
};
const runOpenCodeTurn = async (args: {
workspace: ActiveWorkspace;
prompt: string;
assistantMessageId: Id<'agentJobMessages'>;
assistantContent: { value: string };
}) => {
const { workspace, prompt, assistantMessageId, assistantContent } = args;
workspaceCurrentMessage.set(workspace.claim.job._id, assistantMessageId);
workspaceCurrentContent.set(workspace.claim.job._id, assistantContent);
const session = await ensureOpenCodeSession(workspace);
const turnDone = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
workspace.resolveTurn = undefined;
reject(new Error('OpenCode turn timed out.'));
}, env.jobTimeoutMs);
workspace.resolveTurn = () => {
clearTimeout(timeout);
resolve();
};
});
await promptOpenCodeSession({
session,
prompt,
model: opencodeModel(workspace.claim),
directory: '/workspace/repo',
});
await turnDone;
};
const systemPromptForJob = (claim: Claim) => {
const base = [
`Spoon: ${claim.spoon.name}`,
@@ -759,8 +1135,8 @@ const runClaim = async (claim: Claim) => {
await appendEvent(jobId, 'info', 'plan', 'Interactive workspace is ready.');
await sendWorkspaceMessage(jobId, systemPromptForJob(claim));
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await appendEvent(
jobId,
'error',
@@ -888,9 +1264,80 @@ export const runWorkspaceCommand = async (jobId: string, command: string) => {
return { success: true };
};
export const getWorkspaceAgentStatus = (jobId: string) => {
const workspace = resolveWorkspace(jobId);
return {
runtimeMode: workspace.runtimeMode ?? 'legacy_cli',
opencodeSessionId: workspace.opencodeSession?.sessionId,
codexSessionId: workspace.codexSessionId,
containerId: workspace.containerId,
active: Boolean(workspace.agentTurnActive),
};
};
export const abortWorkspaceAgent = async (jobId: string) => {
const workspace = resolveWorkspace(jobId);
if (workspace.opencodeSession) {
await abortOpenCodeSession(workspace.opencodeSession);
workspace.agentTurnActive = false;
workspace.resolveTurn?.();
workspace.resolveTurn = undefined;
await appendEvent(workspace.claim.job._id, 'warn', 'cleanup', 'Agent turn aborted.');
return { success: true };
}
if (workspace.runtimeMode === 'codex_exec') {
throw new Error('Codex agent turns cannot be aborted from Spoon yet.');
}
return { success: true };
};
export const replyToInteraction = async (
jobId: string,
args: {
interactionId: Id<'agentInteractionRequests'>;
externalRequestId: string;
response: string;
},
) => {
const workspace = resolveWorkspace(jobId);
if (workspace.runtimeMode === 'codex_exec') {
throw new Error('Codex interaction replies are not supported yet.');
}
if (!workspace.opencodeSession) {
throw new Error('OpenCode session is not active.');
}
const mapped =
args.response === 'reject'
? 'reject'
: args.response === 'always'
? 'always'
: 'once';
await replyOpenCodePermission({
session: workspace.opencodeSession,
permissionId: args.externalRequestId,
response: mapped,
directory: '/workspace/repo',
});
await patchInteractionRequest({
interactionId: args.interactionId,
status: mapped === 'reject' ? 'rejected' : 'approved',
response: mapped,
});
await appendMessage({
jobId: workspace.claim.job._id,
role: 'system',
status: 'completed',
content: `Interaction ${mapped === 'reject' ? 'rejected' : 'approved'}.`,
});
return { success: true };
};
export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
const workspace = resolveWorkspace(jobId);
const { claim, repoDir, redact, workdir } = workspace;
const { claim, redact } = workspace;
if (workspace.agentTurnActive) {
throw new Error('Wait for the current agent turn to finish or abort it.');
}
await appendMessage({
jobId: claim.job._id,
role: 'user',
@@ -903,50 +1350,62 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
if ((claim.job.runtime ?? 'opencode') !== 'opencode') {
throw new Error('Legacy OpenAI direct jobs are no longer supported.');
}
const aiEnv = providerEnvironment(
claim,
env.runtime === 'docker' ? jobContainerWorkspace : workdir,
);
const secretEnv = Object.fromEntries(
claim.secrets.map((secret) => [secret.name, secret.value]),
);
const command = agentCommand(claim, prompt);
const result =
env.runtime === 'docker'
? await runInJobContainer({
workdir,
command,
environment: {
...aiEnv,
...secretEnv,
},
redact,
timeoutMs: env.jobTimeoutMs,
})
: await run(
'bash',
command.slice(1),
{
cwd: repoDir,
env: {
...aiEnv,
...secretEnv,
},
redact,
timeoutMs: env.jobTimeoutMs,
},
);
await appendMessage({
workspace.agentTurnActive = true;
const assistantMessageId = await appendMessage({
jobId: claim.job._id,
role: 'assistant',
status: result.exitCode === 0 ? 'completed' : 'failed',
content: truncate(result.output, 40_000),
status: 'streaming',
content: '',
});
if (result.exitCode !== 0) {
throw new Error(`${agentFailurePrefix(claim)}:\n${result.output}`);
const assistantContent = { value: '' };
if (isCodexLoginProfile(claim)) {
await runCodexTurn({
workspace,
prompt,
assistantMessageId,
assistantContent,
});
} else if (env.runtime === 'docker') {
await runOpenCodeTurn({
workspace,
prompt,
assistantMessageId,
assistantContent,
});
} else {
const aiEnv = providerEnvironment(claim);
const secretEnv = Object.fromEntries(
claim.secrets.map((secret) => [secret.name, secret.value]),
);
const result = await run('bash', ['-lc', `opencode run --format json --model ${quoteShell(opencodeModel(claim))} ${quoteShell(prompt)}`], {
cwd: workspace.repoDir,
env: {
...aiEnv,
...secretEnv,
},
redact,
timeoutMs: env.jobTimeoutMs,
});
await updateMessage({
messageId: assistantMessageId,
status: result.exitCode === 0 ? 'completed' : 'failed',
content: truncate(result.output, 40_000),
});
if (result.exitCode !== 0) {
throw new Error(`${agentFailurePrefix(claim)}:\n${result.output}`);
}
}
if (claim.job.jobType === 'maintenance_review') {
const decision = parseMaintenanceDecision(result.output);
if (isCodexLoginProfile(claim)) {
await updateMessage({
messageId: assistantMessageId,
status: 'completed',
content: assistantContent.value,
});
workspace.agentTurnActive = false;
}
workspace.agentTurnActive = false;
if (claim.job.jobType === 'maintenance_review') {
const decision = parseMaintenanceDecision(assistantContent.value);
if (decision) {
await addArtifact({
jobId: claim.job._id,
@@ -959,11 +1418,11 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
} else {
await updateStatus(claim.job._id, 'changes_ready', {
summary:
'OpenCode completed the review, but Spoon could not parse a structured maintenance decision.',
'The agent completed the review, but Spoon could not parse a structured maintenance decision.',
});
}
}
const diff = await getWorktreeDiff(repoDir, redact);
const diff = await getWorktreeDiff(workspace.repoDir, redact);
await addArtifact({
jobId: claim.job._id,
kind: 'diff',
@@ -978,8 +1437,11 @@ export const sendWorkspaceMessage = async (jobId: string, prompt: string) => {
changeType: 'modified',
diff: truncate(diff.output, 50_000),
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
} catch (error) {
workspace.agentTurnActive = false;
workspace.resolveTurn?.();
workspace.resolveTurn = undefined;
const message = error instanceof Error ? error.message : String(error);
await appendEvent(
claim.job._id,
'error',
@@ -1059,6 +1521,10 @@ export const openWorkspacePullRequest = async (jobId: string) => {
summary: 'Draft PR opened from interactive workspace.',
});
await markWorkspaceStopped(claim.job._id);
workspace.opencodeSession?.close();
if (workspace.containerName) {
await stopWorkspaceContainer(workspace.containerName);
}
activeWorkspaces.delete(jobId);
await rm(workspace.workdir, { recursive: true, force: true });
return {
@@ -1070,11 +1536,88 @@ export const openWorkspacePullRequest = async (jobId: string) => {
export const stopWorkspace = async (jobId: string) => {
const workspace = resolveWorkspace(jobId);
await markWorkspaceStopped(workspace.claim.job._id);
workspace.opencodeSession?.close();
if (workspace.containerName) {
await stopWorkspaceContainer(workspace.containerName);
}
activeWorkspaces.delete(jobId);
await rm(workspace.workdir, { recursive: true, force: true });
return { success: true };
};
export const getWorkerHealth = async () => {
const active = [...activeWorkspaces.entries()].map(([jobId, workspace]) => ({
jobId,
runtimeMode: workspace.runtimeMode ?? 'legacy_cli',
containerName: workspace.containerName,
workdir: workspace.workdir,
agentTurnActive: Boolean(workspace.agentTurnActive),
}));
const containerNames = await listWorkspaceContainerNames('spoon-agent-job-');
return {
ok: true,
workerId: env.workerId,
convexUrl: env.convexUrl,
runtime: env.runtime,
containerRuntime: env.containerRuntime,
containerAccess: env.containerAccess,
jobImage: env.jobImage,
workdir: env.workdir,
network: env.network,
httpPort: env.httpPort,
maxConcurrentJobs: env.maxConcurrentJobs,
jobTimeoutMs: env.jobTimeoutMs,
activeWorkspaceCount: active.length,
activeWorkspaces: active,
workspaceContainers: containerNames,
};
};
export const cleanupOrphanedWorkspaces = async () => {
const activeContainers = new Set(
[...activeWorkspaces.values()]
.map((workspace) => workspace.containerName)
.filter((value): value is string => Boolean(value)),
);
const activeWorkdirs = new Set(
[...activeWorkspaces.values()].map((workspace) =>
path.resolve(workspace.workdir),
),
);
const removedContainers: string[] = [];
for (const containerName of await listWorkspaceContainerNames(
'spoon-agent-job-',
)) {
if (activeContainers.has(containerName)) continue;
await stopWorkspaceContainer(containerName);
removedContainers.push(containerName);
}
const removedWorkdirs: string[] = [];
const root = path.resolve(env.workdir);
try {
const entries = await readdir(root, { withFileTypes: true });
for (const entry of entries) {
if (!entry.isDirectory() || entry.name.startsWith('.')) continue;
const target = path.resolve(root, entry.name);
if (activeWorkdirs.has(target)) continue;
await rm(target, { recursive: true, force: true });
removedWorkdirs.push(target);
}
} catch (error) {
const code = error && typeof error === 'object' ? 'code' in error : false;
if (!code || (error as { code?: string }).code !== 'ENOENT') {
throw error;
}
}
return {
success: true,
removedContainers,
removedWorkdirs,
};
};
export const startWorker = async () => {
console.log(`Spoon agent worker ${env.workerId} polling ${env.convexUrl}`);
for (;;) {