diff --git a/apps/agent-worker/src/env.ts b/apps/agent-worker/src/env.ts index 3c7c079..d3433da 100644 --- a/apps/agent-worker/src/env.ts +++ b/apps/agent-worker/src/env.ts @@ -46,6 +46,8 @@ export const env = { process.env.SPOON_WORKER_TOKEN?.trim() ?? '', terminalIdleMs: intEnv('SPOON_AGENT_TERMINAL_IDLE_MS', 1_800_000), + // How long a per-user box container survives with no active jobs/terminals. + boxIdleMs: intEnv('SPOON_AGENT_BOX_IDLE_MS', 1_800_000), workdir: process.env.SPOON_AGENT_WORKDIR?.trim() ?? '.local/agent-work', hostWorkdir: process.env.SPOON_AGENT_HOST_WORKDIR?.trim(), network: process.env.SPOON_AGENT_NETWORK?.trim(), diff --git a/apps/agent-worker/src/runtime/docker.ts b/apps/agent-worker/src/runtime/docker.ts index ac5a98f..8bdc1f9 100644 --- a/apps/agent-worker/src/runtime/docker.ts +++ b/apps/agent-worker/src/runtime/docker.ts @@ -1,4 +1,5 @@ import path from 'node:path'; +import type { Readable } from 'node:stream'; import { execa } from 'execa'; import { env } from '../env'; @@ -225,6 +226,67 @@ export const execInWorkspaceContainer = async (args: { }; }; +// Shared line-streaming + result normalization for a started subprocess +// (used by both `docker run` and `docker exec` paths). +type StreamingSubprocess = { + stdout: Readable | null; + stderr: Readable | null; +} & Promise<{ exitCode?: number; shortMessage?: string; all?: string }>; + +const streamSubprocess = async ( + subprocess: StreamingSubprocess, + redact: (value: string) => string, + onStdoutLine?: (line: string) => Promise, + onStderrLine?: (line: string) => Promise, +): Promise => { + let stdoutBuffer = ''; + let stderrBuffer = ''; + const output: string[] = []; + let lineHandlers = Promise.resolve(); + const consume = async ( + chunk: Buffer, + source: 'stdout' | 'stderr', + handler?: (line: string) => Promise, + ) => { + output.push(chunk.toString('utf8')); + const next = `${source === 'stdout' ? stdoutBuffer : stderrBuffer}${chunk.toString('utf8')}`; + const lines = next.split(/\r?\n/); + const remainder = lines.pop() ?? ''; + if (source === 'stdout') stdoutBuffer = remainder; + else stderrBuffer = remainder; + for (const line of lines) { + if (handler) await handler(redact(line)); + } + }; + subprocess.stdout?.on('data', (chunk: Buffer) => { + lineHandlers = lineHandlers.then(() => + consume(chunk, 'stdout', onStdoutLine), + ); + }); + subprocess.stderr?.on('data', (chunk: Buffer) => { + lineHandlers = lineHandlers.then(() => + consume(chunk, 'stderr', onStderrLine), + ); + }); + let result: Awaited; + try { + result = await subprocess; + } catch (error) { + await lineHandlers; + const outputText = output.join(''); + const message = + error instanceof Error ? error.message : 'Container command failed.'; + return { + exitCode: 1, + output: redact(`${outputText}${outputText ? '\n' : ''}${message}`), + }; + } + await lineHandlers; + if (stdoutBuffer && onStdoutLine) await onStdoutLine(redact(stdoutBuffer)); + if (stderrBuffer && onStderrLine) await onStderrLine(redact(stderrBuffer)); + return normalizeRunResult(result, output.join(''), redact); +}; + export const streamInJobContainer = async (args: { workdir: string; containerHome?: string; @@ -262,58 +324,110 @@ export const streamInJobContainer = async (args: { timeout: args.timeoutMs, }, ); - let stdoutBuffer = ''; - let stderrBuffer = ''; - const output: string[] = []; - let lineHandlers = Promise.resolve(); - const consume = async ( - chunk: Buffer, - source: 'stdout' | 'stderr', - handler?: (line: string) => Promise, - ) => { - output.push(chunk.toString('utf8')); - const next = `${source === 'stdout' ? stdoutBuffer : stderrBuffer}${chunk.toString('utf8')}`; - const lines = next.split(/\r?\n/); - const remainder = lines.pop() ?? ''; - if (source === 'stdout') stdoutBuffer = remainder; - else stderrBuffer = remainder; - for (const line of lines) { - if (handler) { - await handler(args.redact(line)); - } - } - }; - subprocess.stdout.on('data', (chunk: Buffer) => { - lineHandlers = lineHandlers.then(() => - consume(chunk, 'stdout', args.onStdoutLine), - ); - }); - subprocess.stderr.on('data', (chunk: Buffer) => { - lineHandlers = lineHandlers.then(() => - consume(chunk, 'stderr', args.onStderrLine), - ); - }); - let result: Awaited; - try { - result = await subprocess; - } catch (error) { - await lineHandlers; - const outputText = output.join(''); - const message = - error instanceof Error ? error.message : 'Container command failed.'; - return { - exitCode: 1, - output: args.redact(`${outputText}${outputText ? '\n' : ''}${message}`), - }; - } - await lineHandlers; - if (stdoutBuffer && args.onStdoutLine) { - await args.onStdoutLine(args.redact(stdoutBuffer)); - } - if (stderrBuffer && args.onStderrLine) { - await args.onStderrLine(args.redact(stderrBuffer)); - } - return normalizeRunResult(result, output.join(''), args.redact); + return streamSubprocess( + subprocess, + args.redact, + args.onStdoutLine, + args.onStderrLine, + ); +}; + +// Per-user persistent "box" container that all of a user's threads exec into +// (Phase 2). Started once, reused; the home volume persists state across stops. +export const userContainerName = (username: string) => + `spoon-box-${username.replace(/[^a-zA-Z0-9_.-]/g, '-')}`; + +export const ensureUserContainer = async (args: { + username: string; + workdir: string; + containerHome: string; +}): Promise => { + await ensureJobImagePulled(); + const name = userContainerName(args.username); + const inspect = await execa( + containerRuntime(), + ['inspect', '-f', '{{.State.Running}}', name], + { reject: false, stdin: 'ignore' }, + ); + if (inspect.exitCode === 0 && inspect.stdout.trim() === 'true') return name; + // Not running: remove any stale container, then start fresh. + await execa(containerRuntime(), ['rm', '-f', name], { reject: false }); + await execa( + containerRuntime(), + [ + 'run', + '-d', + '--name', + name, + '--memory', + '4g', + '--cpus', + '2', + ...networkArgs(), + '-v', + jobWorkspaceVolumeSpec(args.workdir, args.containerHome), + '-w', + args.containerHome, + env.jobImage, + 'sleep', + 'infinity', + ], + { stdin: 'ignore' }, + ); + return name; +}; + +export const streamExecInContainer = async (args: { + containerName: string; + command: string[]; + environment: Record; + containerCwd: string; + redact: (value: string) => string; + timeoutMs: number; + onStdoutLine?: (line: string) => Promise; + onStderrLine?: (line: string) => Promise; +}): Promise => { + const subprocess = execa( + containerRuntime(), + [ + 'exec', + ...environmentArgs(args.environment), + '-w', + args.containerCwd, + args.containerName, + ...args.command, + ], + { all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs }, + ); + return streamSubprocess( + subprocess, + args.redact, + args.onStdoutLine, + args.onStderrLine, + ); +}; + +export const runExecInContainer = async (args: { + containerName: string; + command: string[]; + environment: Record; + containerCwd: string; + redact: (value: string) => string; + timeoutMs: number; +}): Promise => { + const result = await execa( + containerRuntime(), + [ + 'exec', + ...environmentArgs(args.environment), + '-w', + args.containerCwd, + args.containerName, + ...args.command, + ], + { all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs }, + ); + return normalizeRunResult(result, result.all, args.redact); }; export const stopWorkspaceContainer = async (containerName: string) => { diff --git a/apps/agent-worker/src/terminal.ts b/apps/agent-worker/src/terminal.ts index 598a4a7..4949cbf 100644 --- a/apps/agent-worker/src/terminal.ts +++ b/apps/agent-worker/src/terminal.ts @@ -5,69 +5,12 @@ import Docker from 'dockerode'; import { WebSocketServer } from 'ws'; import { env } from './env'; -import { containerVolumeSuffix, hostWorkspacePath } from './runtime/docker'; import { verifyTerminalToken } from './terminal-token'; +import { acquireUserBox, releaseUserBox } from './user-container'; import { getTerminalWorkspace } from './worker'; -const TERMINAL_IMAGE = env.terminalImage; -const IDLE_STOP_MS = env.terminalIdleMs; - const docker = new Docker(); -const containerName = (jobId: string) => - `spoon-agent-term-${jobId.replace(/[^a-zA-Z0-9_.-]/g, '-')}`; - -type Session = { connections: number; idleTimer?: NodeJS.Timeout }; -const sessions = new Map(); - -const ensureTerminalContainer = async ( - jobId: string, - workdir: string, - containerHome: string, -) => { - const name = containerName(jobId); - const container = docker.getContainer(name); - const info = await container.inspect().catch(() => null); - if (info?.State.Running) return container; - if (info && !info.State.Running) { - await container.remove({ force: true }).catch(() => undefined); - } - const suffix = containerVolumeSuffix(); - const source = hostWorkspacePath(workdir); - const created = await docker.createContainer({ - name, - Image: TERMINAL_IMAGE, - Cmd: ['sleep', 'infinity'], - WorkingDir: containerHome, - Tty: false, - Labels: { 'spoon.agent.terminal': jobId }, - HostConfig: { - Binds: [`${source}:${containerHome}${suffix ? `:${suffix}` : ''}`], - NetworkMode: env.network, - Memory: 4 * 1024 * 1024 * 1024, - AutoRemove: false, - }, - }); - await created.start(); - return created; -}; - -const stopTerminalContainer = async (jobId: string) => { - await docker - .getContainer(containerName(jobId)) - .remove({ force: true }) - .catch(() => undefined); - sessions.delete(jobId); -}; - -const scheduleIdleStop = (jobId: string) => { - const session = sessions.get(jobId); - if (!session || session.connections > 0) return; - session.idleTimer = setTimeout(() => { - void stopTerminalContainer(jobId); - }, IDLE_STOP_MS); -}; - const bridge = async (ws: WebSocket, jobId: string) => { const workspace = getTerminalWorkspace(jobId); if (!workspace) { @@ -75,21 +18,27 @@ const bridge = async (ws: WebSocket, jobId: string) => { return; } - const session = sessions.get(jobId) ?? { connections: 0 }; - if (session.idleTimer) clearTimeout(session.idleTimer); - session.idleTimer = undefined; - session.connections += 1; - sessions.set(jobId, session); + // Hold the per-user box open while this terminal is connected; the agent and + // the terminal share the exact same container (Phase 2). + let boxName: string; + try { + boxName = await acquireUserBox({ + username: workspace.username, + workdir: workspace.workdir, + containerHome: workspace.containerHome, + }); + } catch (error) { + ws.close( + 1011, + `Failed to start terminal: ${error instanceof Error ? error.message : 'unknown error'}`, + ); + return; + } let stream: Duplex | undefined; let exec: Docker.Exec | undefined; try { - const container = await ensureTerminalContainer( - jobId, - workspace.workdir, - workspace.containerHome, - ); - exec = await container.exec({ + exec = await docker.getContainer(boxName).exec({ // Reattach a persistent tmux session across reconnects when tmux is // available; otherwise fall back to a plain login shell. Cmd: [ @@ -114,8 +63,7 @@ const bridge = async (ws: WebSocket, jobId: string) => { 1011, `Failed to start terminal: ${error instanceof Error ? error.message : 'unknown error'}`, ); - session.connections -= 1; - scheduleIdleStop(jobId); + releaseUserBox(workspace.username); return; } @@ -151,13 +99,12 @@ const bridge = async (ws: WebSocket, jobId: string) => { activeStream.write(data); }); + let released = false; const cleanup = () => { + if (released) return; + released = true; activeStream.end(); - const current = sessions.get(jobId); - if (current) { - current.connections = Math.max(0, current.connections - 1); - scheduleIdleStop(jobId); - } + releaseUserBox(workspace.username); }; ws.on('close', cleanup); ws.on('error', cleanup); diff --git a/apps/agent-worker/src/user-container.ts b/apps/agent-worker/src/user-container.ts new file mode 100644 index 0000000..c4bb374 --- /dev/null +++ b/apps/agent-worker/src/user-container.ts @@ -0,0 +1,40 @@ +import { env } from './env'; +import { + ensureUserContainer, + stopWorkspaceContainer, + userContainerName, +} from './runtime/docker'; + +// Phase 2: one persistent "box" container per user that all of their threads +// (agent turns + terminal + commands) exec into. Reference-counted so it stays +// up while any thread workspace is active or a terminal is connected, and is +// reaped after an idle period once nothing holds it. +type Box = { refs: number; idleTimer?: NodeJS.Timeout }; +const boxes = new Map(); + +export const acquireUserBox = async (args: { + username: string; + workdir: string; + containerHome: string; +}): Promise => { + const name = await ensureUserContainer(args); + const box = boxes.get(args.username) ?? { refs: 0 }; + if (box.idleTimer) { + clearTimeout(box.idleTimer); + box.idleTimer = undefined; + } + box.refs += 1; + boxes.set(args.username, box); + return name; +}; + +export const releaseUserBox = (username: string) => { + const box = boxes.get(username); + if (!box) return; + box.refs = Math.max(0, box.refs - 1); + if (box.refs > 0) return; + box.idleTimer = setTimeout(() => { + void stopWorkspaceContainer(userContainerName(username)); + boxes.delete(username); + }, env.boxIdleMs); +}; diff --git a/apps/agent-worker/src/user-environment.ts b/apps/agent-worker/src/user-environment.ts index 40c63d6..19f6a30 100644 --- a/apps/agent-worker/src/user-environment.ts +++ b/apps/agent-worker/src/user-environment.ts @@ -7,7 +7,7 @@ import type { Id } from '@spoon/backend/convex/_generated/dataModel.js'; import { api } from '@spoon/backend/convex/_generated/api.js'; import { env } from './env'; -import { runInJobContainer } from './runtime/docker'; +import { runExecInContainer } from './runtime/docker'; const client = new ConvexHttpClient(env.convexUrl); @@ -51,10 +51,11 @@ const safeHomeJoin = (homeDir: string, relPath: string) => { export const materializeUserHome = async (args: { homeDir: string; containerHome: string; + boxName: string; userEnv: UserEnvironment; redact: (value: string) => string; }): Promise => { - const { homeDir, containerHome, userEnv, redact } = args; + const { homeDir, containerHome, boxName, userEnv, redact } = args; await mkdir(homeDir, { recursive: true }); // A mounted home has no /etc/skel, so ensure login shells source ~/.bashrc. @@ -95,11 +96,10 @@ export const materializeUserHome = async (args: { ] .filter(Boolean) .join('\n'); - await runInJobContainer({ - workdir: homeDir, - containerHome, - containerCwd: containerHome, + await runExecInContainer({ + containerName: boxName, command: ['bash', '-lc', script], + containerCwd: containerHome, environment: { HOME: containerHome }, redact, timeoutMs: env.jobTimeoutMs, diff --git a/apps/agent-worker/src/worker.ts b/apps/agent-worker/src/worker.ts index a9a6b53..d5025ad 100644 --- a/apps/agent-worker/src/worker.ts +++ b/apps/agent-worker/src/worker.ts @@ -36,11 +36,12 @@ import { import { createRedactor, truncate } from './redact'; import { listWorkspaceContainerNames, - runInJobContainer, + runExecInContainer, startWorkspaceContainer, stopWorkspaceContainer, - streamInJobContainer, + streamExecInContainer, } from './runtime/docker'; +import { acquireUserBox, releaseUserBox } from './user-container'; import { fetchUserEnvironment, materializeUserHome } from './user-environment'; type Claim = { @@ -104,6 +105,8 @@ type ActiveWorkspace = { containerHome: string; containerRepo: string; repoDir: string; + // Phase 2: the per-user box container this thread execs into. + boxName: string; githubToken: string; redact: (value: string) => string; runtimeMode?: 'opencode_server' | 'codex_exec' | 'legacy_cli'; @@ -760,9 +763,8 @@ const runCodexTurn = async (args: { const secretEnv = Object.fromEntries( workspace.claim.secrets.map((secret) => [secret.name, secret.value]), ); - const result = await streamInJobContainer({ - workdir: workspace.workdir, - containerHome: workspace.containerHome, + const result = await streamExecInContainer({ + containerName: workspace.boxName, containerCwd: workspace.containerRepo, command, environment: { @@ -1006,27 +1008,29 @@ const runProjectCommand = async (args: { command: string; phase: 'install' | 'check' | 'test'; claim: Claim; - workdir: string; + boxName: string; + containerHome: string; + containerCwd: string; repoDir: string; redact: (value: string) => string; }) => { await appendEvent(args.claim.job._id, 'info', args.phase, args.command); + const secretEnv = Object.fromEntries( + args.claim.secrets.map((secret) => [secret.name, secret.value]), + ); const result = env.runtime === 'docker' - ? await runInJobContainer({ - workdir: args.workdir, + ? await runExecInContainer({ + containerName: args.boxName, command: commandToShell(args.command), - environment: Object.fromEntries( - args.claim.secrets.map((secret) => [secret.name, secret.value]), - ), + containerCwd: args.containerCwd, + environment: { HOME: args.containerHome, ...secretEnv }, redact: args.redact, timeoutMs: env.jobTimeoutMs, }) : await run('bash', ['-lc', args.command], { cwd: args.repoDir, - env: Object.fromEntries( - args.claim.secrets.map((secret) => [secret.name, secret.value]), - ), + env: secretEnv, redact: args.redact, timeoutMs: env.jobTimeoutMs, }); @@ -1292,6 +1296,7 @@ const runClaim = async (claim: Claim) => { ...claim.secrets.map((secret) => secret.value), ].filter(Boolean); const redact = createRedactor(secretValues); + let acquiredBoxUser: string | undefined; try { if ((claim.job.runtime ?? 'opencode') !== 'opencode') { throw new Error('Legacy OpenAI direct jobs are no longer supported.'); @@ -1320,6 +1325,15 @@ const runClaim = async (claim: Claim) => { branchSlug, ); + // Start (or reuse) the persistent per-user box that this thread — and the + // terminal — exec into. It mounts the home, so the clone below is visible. + const boxName = await acquireUserBox({ + username, + workdir: homeDir, + containerHome, + }); + acquiredBoxUser = username; + const repoDir = await cloneRepository({ workdir: checkoutParent, dirName: branchSlug, @@ -1339,6 +1353,7 @@ const runClaim = async (claim: Claim) => { containerHome, containerRepo, repoDir, + boxName, githubToken, redact, }; @@ -1349,7 +1364,13 @@ const runClaim = async (claim: Claim) => { 'clone', 'Applying your dotfiles and environment.', ); - await materializeUserHome({ homeDir, containerHome, userEnv, redact }); + await materializeUserHome({ + homeDir, + containerHome, + boxName, + userEnv, + redact, + }); } if (isCodexLoginProfile(claim)) { await prepareCodexAuth(workspace); @@ -1412,6 +1433,7 @@ const runClaim = async (claim: Claim) => { ).catch((stopError: unknown) => { console.error(stopError); }); + if (acquiredBoxUser) releaseUserBox(acquiredBoxUser); } }; @@ -1497,7 +1519,9 @@ export const runWorkspaceCommand = async (jobId: string, command: string) => { command, phase: command.includes('test') ? 'test' : 'check', claim: workspace.claim, - workdir: workspace.workdir, + boxName: workspace.boxName, + containerHome: workspace.containerHome, + containerCwd: workspace.containerRepo, repoDir: workspace.repoDir, redact: workspace.redact, }); @@ -1831,7 +1855,8 @@ export const openWorkspacePullRequest = async (jobId: string) => { } activeWorkspaces.delete(jobId); // The persistent per-user home + ~/Code checkouts survive across sessions; - // only the container is stopped. + // release the box (reaped once no other thread/terminal holds it). + releaseUserBox(workspace.username); return { pullRequestUrl: pullRequest.html_url, pullRequestNumber: pullRequest.number, @@ -1847,7 +1872,8 @@ export const stopWorkspace = async (jobId: string) => { } activeWorkspaces.delete(jobId); // The persistent per-user home + ~/Code checkouts survive across sessions; - // only the container is stopped. + // release the box (reaped once no other thread/terminal holds it). + releaseUserBox(workspace.username); return { success: true }; };