Phase 2: single per-user box container for every thread

Every thread (agent turns + terminal + project commands) now execs into one
persistent per-user container (spoon-box-{username}) instead of ephemeral
docker run --rm — so the agent and terminal share the exact same running
environment, filesystem, and in-session installs.

- docker.ts: ensureUserContainer (persistent box) + streamExecInContainer/
  runExecInContainer (docker exec, streaming) sharing a factored streamSubprocess
- user-container.ts: reference-counted box lifecycle (held while any thread
  workspace is active or a terminal is connected; idle-reaped after
  SPOON_AGENT_BOX_IDLE_MS, default 30m)
- worker.ts: runClaim acquires the box; codex turn + runProjectCommand exec into
  it; release on stop/PR/failure
- terminal.ts: execs into the shared box (dockerode TTY) instead of a per-job
  container; materializeUserHome runs the dotfiles setup in the box
- Verified: agent + terminal run in the same box, share fs, dotfiles + tmux load
This commit is contained in:
Gabriel Brown
2026-06-24 10:30:40 -04:00
parent c103430c7d
commit c6b27063a4
6 changed files with 281 additions and 152 deletions
+2
View File
@@ -46,6 +46,8 @@ export const env = {
process.env.SPOON_WORKER_TOKEN?.trim() ?? process.env.SPOON_WORKER_TOKEN?.trim() ??
'', '',
terminalIdleMs: intEnv('SPOON_AGENT_TERMINAL_IDLE_MS', 1_800_000), terminalIdleMs: intEnv('SPOON_AGENT_TERMINAL_IDLE_MS', 1_800_000),
// How long a per-user box container survives with no active jobs/terminals.
boxIdleMs: intEnv('SPOON_AGENT_BOX_IDLE_MS', 1_800_000),
workdir: process.env.SPOON_AGENT_WORKDIR?.trim() ?? '.local/agent-work', workdir: process.env.SPOON_AGENT_WORKDIR?.trim() ?? '.local/agent-work',
hostWorkdir: process.env.SPOON_AGENT_HOST_WORKDIR?.trim(), hostWorkdir: process.env.SPOON_AGENT_HOST_WORKDIR?.trim(),
network: process.env.SPOON_AGENT_NETWORK?.trim(), network: process.env.SPOON_AGENT_NETWORK?.trim(),
+166 -52
View File
@@ -1,4 +1,5 @@
import path from 'node:path'; import path from 'node:path';
import type { Readable } from 'node:stream';
import { execa } from 'execa'; import { execa } from 'execa';
import { env } from '../env'; import { env } from '../env';
@@ -225,6 +226,67 @@ export const execInWorkspaceContainer = async (args: {
}; };
}; };
// Shared line-streaming + result normalization for a started subprocess
// (used by both `docker run` and `docker exec` paths).
type StreamingSubprocess = {
stdout: Readable | null;
stderr: Readable | null;
} & Promise<{ exitCode?: number; shortMessage?: string; all?: string }>;
const streamSubprocess = async (
subprocess: StreamingSubprocess,
redact: (value: string) => string,
onStdoutLine?: (line: string) => Promise<void>,
onStderrLine?: (line: string) => Promise<void>,
): Promise<CommandResult> => {
let stdoutBuffer = '';
let stderrBuffer = '';
const output: string[] = [];
let lineHandlers = Promise.resolve();
const consume = async (
chunk: Buffer,
source: 'stdout' | 'stderr',
handler?: (line: string) => Promise<void>,
) => {
output.push(chunk.toString('utf8'));
const next = `${source === 'stdout' ? stdoutBuffer : stderrBuffer}${chunk.toString('utf8')}`;
const lines = next.split(/\r?\n/);
const remainder = lines.pop() ?? '';
if (source === 'stdout') stdoutBuffer = remainder;
else stderrBuffer = remainder;
for (const line of lines) {
if (handler) await handler(redact(line));
}
};
subprocess.stdout?.on('data', (chunk: Buffer) => {
lineHandlers = lineHandlers.then(() =>
consume(chunk, 'stdout', onStdoutLine),
);
});
subprocess.stderr?.on('data', (chunk: Buffer) => {
lineHandlers = lineHandlers.then(() =>
consume(chunk, 'stderr', onStderrLine),
);
});
let result: Awaited<StreamingSubprocess>;
try {
result = await subprocess;
} catch (error) {
await lineHandlers;
const outputText = output.join('');
const message =
error instanceof Error ? error.message : 'Container command failed.';
return {
exitCode: 1,
output: redact(`${outputText}${outputText ? '\n' : ''}${message}`),
};
}
await lineHandlers;
if (stdoutBuffer && onStdoutLine) await onStdoutLine(redact(stdoutBuffer));
if (stderrBuffer && onStderrLine) await onStderrLine(redact(stderrBuffer));
return normalizeRunResult(result, output.join(''), redact);
};
export const streamInJobContainer = async (args: { export const streamInJobContainer = async (args: {
workdir: string; workdir: string;
containerHome?: string; containerHome?: string;
@@ -262,58 +324,110 @@ export const streamInJobContainer = async (args: {
timeout: args.timeoutMs, timeout: args.timeoutMs,
}, },
); );
let stdoutBuffer = ''; return streamSubprocess(
let stderrBuffer = ''; subprocess,
const output: string[] = []; args.redact,
let lineHandlers = Promise.resolve(); args.onStdoutLine,
const consume = async ( args.onStderrLine,
chunk: Buffer, );
source: 'stdout' | 'stderr', };
handler?: (line: string) => Promise<void>,
) => { // Per-user persistent "box" container that all of a user's threads exec into
output.push(chunk.toString('utf8')); // (Phase 2). Started once, reused; the home volume persists state across stops.
const next = `${source === 'stdout' ? stdoutBuffer : stderrBuffer}${chunk.toString('utf8')}`; export const userContainerName = (username: string) =>
const lines = next.split(/\r?\n/); `spoon-box-${username.replace(/[^a-zA-Z0-9_.-]/g, '-')}`;
const remainder = lines.pop() ?? '';
if (source === 'stdout') stdoutBuffer = remainder; export const ensureUserContainer = async (args: {
else stderrBuffer = remainder; username: string;
for (const line of lines) { workdir: string;
if (handler) { containerHome: string;
await handler(args.redact(line)); }): Promise<string> => {
} await ensureJobImagePulled();
} const name = userContainerName(args.username);
}; const inspect = await execa(
subprocess.stdout.on('data', (chunk: Buffer) => { containerRuntime(),
lineHandlers = lineHandlers.then(() => ['inspect', '-f', '{{.State.Running}}', name],
consume(chunk, 'stdout', args.onStdoutLine), { reject: false, stdin: 'ignore' },
); );
}); if (inspect.exitCode === 0 && inspect.stdout.trim() === 'true') return name;
subprocess.stderr.on('data', (chunk: Buffer) => { // Not running: remove any stale container, then start fresh.
lineHandlers = lineHandlers.then(() => await execa(containerRuntime(), ['rm', '-f', name], { reject: false });
consume(chunk, 'stderr', args.onStderrLine), await execa(
); containerRuntime(),
}); [
let result: Awaited<typeof subprocess>; 'run',
try { '-d',
result = await subprocess; '--name',
} catch (error) { name,
await lineHandlers; '--memory',
const outputText = output.join(''); '4g',
const message = '--cpus',
error instanceof Error ? error.message : 'Container command failed.'; '2',
return { ...networkArgs(),
exitCode: 1, '-v',
output: args.redact(`${outputText}${outputText ? '\n' : ''}${message}`), jobWorkspaceVolumeSpec(args.workdir, args.containerHome),
}; '-w',
} args.containerHome,
await lineHandlers; env.jobImage,
if (stdoutBuffer && args.onStdoutLine) { 'sleep',
await args.onStdoutLine(args.redact(stdoutBuffer)); 'infinity',
} ],
if (stderrBuffer && args.onStderrLine) { { stdin: 'ignore' },
await args.onStderrLine(args.redact(stderrBuffer)); );
} return name;
return normalizeRunResult(result, output.join(''), args.redact); };
export const streamExecInContainer = async (args: {
containerName: string;
command: string[];
environment: Record<string, string>;
containerCwd: string;
redact: (value: string) => string;
timeoutMs: number;
onStdoutLine?: (line: string) => Promise<void>;
onStderrLine?: (line: string) => Promise<void>;
}): Promise<CommandResult> => {
const subprocess = execa(
containerRuntime(),
[
'exec',
...environmentArgs(args.environment),
'-w',
args.containerCwd,
args.containerName,
...args.command,
],
{ all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs },
);
return streamSubprocess(
subprocess,
args.redact,
args.onStdoutLine,
args.onStderrLine,
);
};
export const runExecInContainer = async (args: {
containerName: string;
command: string[];
environment: Record<string, string>;
containerCwd: string;
redact: (value: string) => string;
timeoutMs: number;
}): Promise<CommandResult> => {
const result = await execa(
containerRuntime(),
[
'exec',
...environmentArgs(args.environment),
'-w',
args.containerCwd,
args.containerName,
...args.command,
],
{ all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs },
);
return normalizeRunResult(result, result.all, args.redact);
}; };
export const stopWorkspaceContainer = async (containerName: string) => { export const stopWorkspaceContainer = async (containerName: string) => {
+23 -76
View File
@@ -5,69 +5,12 @@ import Docker from 'dockerode';
import { WebSocketServer } from 'ws'; import { WebSocketServer } from 'ws';
import { env } from './env'; import { env } from './env';
import { containerVolumeSuffix, hostWorkspacePath } from './runtime/docker';
import { verifyTerminalToken } from './terminal-token'; import { verifyTerminalToken } from './terminal-token';
import { acquireUserBox, releaseUserBox } from './user-container';
import { getTerminalWorkspace } from './worker'; import { getTerminalWorkspace } from './worker';
const TERMINAL_IMAGE = env.terminalImage;
const IDLE_STOP_MS = env.terminalIdleMs;
const docker = new Docker(); const docker = new Docker();
const containerName = (jobId: string) =>
`spoon-agent-term-${jobId.replace(/[^a-zA-Z0-9_.-]/g, '-')}`;
type Session = { connections: number; idleTimer?: NodeJS.Timeout };
const sessions = new Map<string, Session>();
const ensureTerminalContainer = async (
jobId: string,
workdir: string,
containerHome: string,
) => {
const name = containerName(jobId);
const container = docker.getContainer(name);
const info = await container.inspect().catch(() => null);
if (info?.State.Running) return container;
if (info && !info.State.Running) {
await container.remove({ force: true }).catch(() => undefined);
}
const suffix = containerVolumeSuffix();
const source = hostWorkspacePath(workdir);
const created = await docker.createContainer({
name,
Image: TERMINAL_IMAGE,
Cmd: ['sleep', 'infinity'],
WorkingDir: containerHome,
Tty: false,
Labels: { 'spoon.agent.terminal': jobId },
HostConfig: {
Binds: [`${source}:${containerHome}${suffix ? `:${suffix}` : ''}`],
NetworkMode: env.network,
Memory: 4 * 1024 * 1024 * 1024,
AutoRemove: false,
},
});
await created.start();
return created;
};
const stopTerminalContainer = async (jobId: string) => {
await docker
.getContainer(containerName(jobId))
.remove({ force: true })
.catch(() => undefined);
sessions.delete(jobId);
};
const scheduleIdleStop = (jobId: string) => {
const session = sessions.get(jobId);
if (!session || session.connections > 0) return;
session.idleTimer = setTimeout(() => {
void stopTerminalContainer(jobId);
}, IDLE_STOP_MS);
};
const bridge = async (ws: WebSocket, jobId: string) => { const bridge = async (ws: WebSocket, jobId: string) => {
const workspace = getTerminalWorkspace(jobId); const workspace = getTerminalWorkspace(jobId);
if (!workspace) { if (!workspace) {
@@ -75,21 +18,27 @@ const bridge = async (ws: WebSocket, jobId: string) => {
return; return;
} }
const session = sessions.get(jobId) ?? { connections: 0 }; // Hold the per-user box open while this terminal is connected; the agent and
if (session.idleTimer) clearTimeout(session.idleTimer); // the terminal share the exact same container (Phase 2).
session.idleTimer = undefined; let boxName: string;
session.connections += 1; try {
sessions.set(jobId, session); boxName = await acquireUserBox({
username: workspace.username,
workdir: workspace.workdir,
containerHome: workspace.containerHome,
});
} catch (error) {
ws.close(
1011,
`Failed to start terminal: ${error instanceof Error ? error.message : 'unknown error'}`,
);
return;
}
let stream: Duplex | undefined; let stream: Duplex | undefined;
let exec: Docker.Exec | undefined; let exec: Docker.Exec | undefined;
try { try {
const container = await ensureTerminalContainer( exec = await docker.getContainer(boxName).exec({
jobId,
workspace.workdir,
workspace.containerHome,
);
exec = await container.exec({
// Reattach a persistent tmux session across reconnects when tmux is // Reattach a persistent tmux session across reconnects when tmux is
// available; otherwise fall back to a plain login shell. // available; otherwise fall back to a plain login shell.
Cmd: [ Cmd: [
@@ -114,8 +63,7 @@ const bridge = async (ws: WebSocket, jobId: string) => {
1011, 1011,
`Failed to start terminal: ${error instanceof Error ? error.message : 'unknown error'}`, `Failed to start terminal: ${error instanceof Error ? error.message : 'unknown error'}`,
); );
session.connections -= 1; releaseUserBox(workspace.username);
scheduleIdleStop(jobId);
return; return;
} }
@@ -151,13 +99,12 @@ const bridge = async (ws: WebSocket, jobId: string) => {
activeStream.write(data); activeStream.write(data);
}); });
let released = false;
const cleanup = () => { const cleanup = () => {
if (released) return;
released = true;
activeStream.end(); activeStream.end();
const current = sessions.get(jobId); releaseUserBox(workspace.username);
if (current) {
current.connections = Math.max(0, current.connections - 1);
scheduleIdleStop(jobId);
}
}; };
ws.on('close', cleanup); ws.on('close', cleanup);
ws.on('error', cleanup); ws.on('error', cleanup);
+40
View File
@@ -0,0 +1,40 @@
import { env } from './env';
import {
ensureUserContainer,
stopWorkspaceContainer,
userContainerName,
} from './runtime/docker';
// Phase 2: one persistent "box" container per user that all of their threads
// (agent turns + terminal + commands) exec into. Reference-counted so it stays
// up while any thread workspace is active or a terminal is connected, and is
// reaped after an idle period once nothing holds it.
type Box = { refs: number; idleTimer?: NodeJS.Timeout };
const boxes = new Map<string, Box>();
export const acquireUserBox = async (args: {
username: string;
workdir: string;
containerHome: string;
}): Promise<string> => {
const name = await ensureUserContainer(args);
const box = boxes.get(args.username) ?? { refs: 0 };
if (box.idleTimer) {
clearTimeout(box.idleTimer);
box.idleTimer = undefined;
}
box.refs += 1;
boxes.set(args.username, box);
return name;
};
export const releaseUserBox = (username: string) => {
const box = boxes.get(username);
if (!box) return;
box.refs = Math.max(0, box.refs - 1);
if (box.refs > 0) return;
box.idleTimer = setTimeout(() => {
void stopWorkspaceContainer(userContainerName(username));
boxes.delete(username);
}, env.boxIdleMs);
};
+6 -6
View File
@@ -7,7 +7,7 @@ import type { Id } from '@spoon/backend/convex/_generated/dataModel.js';
import { api } from '@spoon/backend/convex/_generated/api.js'; import { api } from '@spoon/backend/convex/_generated/api.js';
import { env } from './env'; import { env } from './env';
import { runInJobContainer } from './runtime/docker'; import { runExecInContainer } from './runtime/docker';
const client = new ConvexHttpClient(env.convexUrl); const client = new ConvexHttpClient(env.convexUrl);
@@ -51,10 +51,11 @@ const safeHomeJoin = (homeDir: string, relPath: string) => {
export const materializeUserHome = async (args: { export const materializeUserHome = async (args: {
homeDir: string; homeDir: string;
containerHome: string; containerHome: string;
boxName: string;
userEnv: UserEnvironment; userEnv: UserEnvironment;
redact: (value: string) => string; redact: (value: string) => string;
}): Promise<void> => { }): Promise<void> => {
const { homeDir, containerHome, userEnv, redact } = args; const { homeDir, containerHome, boxName, userEnv, redact } = args;
await mkdir(homeDir, { recursive: true }); await mkdir(homeDir, { recursive: true });
// A mounted home has no /etc/skel, so ensure login shells source ~/.bashrc. // A mounted home has no /etc/skel, so ensure login shells source ~/.bashrc.
@@ -95,11 +96,10 @@ export const materializeUserHome = async (args: {
] ]
.filter(Boolean) .filter(Boolean)
.join('\n'); .join('\n');
await runInJobContainer({ await runExecInContainer({
workdir: homeDir, containerName: boxName,
containerHome,
containerCwd: containerHome,
command: ['bash', '-lc', script], command: ['bash', '-lc', script],
containerCwd: containerHome,
environment: { HOME: containerHome }, environment: { HOME: containerHome },
redact, redact,
timeoutMs: env.jobTimeoutMs, timeoutMs: env.jobTimeoutMs,
+44 -18
View File
@@ -36,11 +36,12 @@ import {
import { createRedactor, truncate } from './redact'; import { createRedactor, truncate } from './redact';
import { import {
listWorkspaceContainerNames, listWorkspaceContainerNames,
runInJobContainer, runExecInContainer,
startWorkspaceContainer, startWorkspaceContainer,
stopWorkspaceContainer, stopWorkspaceContainer,
streamInJobContainer, streamExecInContainer,
} from './runtime/docker'; } from './runtime/docker';
import { acquireUserBox, releaseUserBox } from './user-container';
import { fetchUserEnvironment, materializeUserHome } from './user-environment'; import { fetchUserEnvironment, materializeUserHome } from './user-environment';
type Claim = { type Claim = {
@@ -104,6 +105,8 @@ type ActiveWorkspace = {
containerHome: string; containerHome: string;
containerRepo: string; containerRepo: string;
repoDir: string; repoDir: string;
// Phase 2: the per-user box container this thread execs into.
boxName: string;
githubToken: string; githubToken: string;
redact: (value: string) => string; redact: (value: string) => string;
runtimeMode?: 'opencode_server' | 'codex_exec' | 'legacy_cli'; runtimeMode?: 'opencode_server' | 'codex_exec' | 'legacy_cli';
@@ -760,9 +763,8 @@ const runCodexTurn = async (args: {
const secretEnv = Object.fromEntries( const secretEnv = Object.fromEntries(
workspace.claim.secrets.map((secret) => [secret.name, secret.value]), workspace.claim.secrets.map((secret) => [secret.name, secret.value]),
); );
const result = await streamInJobContainer({ const result = await streamExecInContainer({
workdir: workspace.workdir, containerName: workspace.boxName,
containerHome: workspace.containerHome,
containerCwd: workspace.containerRepo, containerCwd: workspace.containerRepo,
command, command,
environment: { environment: {
@@ -1006,27 +1008,29 @@ const runProjectCommand = async (args: {
command: string; command: string;
phase: 'install' | 'check' | 'test'; phase: 'install' | 'check' | 'test';
claim: Claim; claim: Claim;
workdir: string; boxName: string;
containerHome: string;
containerCwd: string;
repoDir: string; repoDir: string;
redact: (value: string) => string; redact: (value: string) => string;
}) => { }) => {
await appendEvent(args.claim.job._id, 'info', args.phase, args.command); await appendEvent(args.claim.job._id, 'info', args.phase, args.command);
const secretEnv = Object.fromEntries(
args.claim.secrets.map((secret) => [secret.name, secret.value]),
);
const result = const result =
env.runtime === 'docker' env.runtime === 'docker'
? await runInJobContainer({ ? await runExecInContainer({
workdir: args.workdir, containerName: args.boxName,
command: commandToShell(args.command), command: commandToShell(args.command),
environment: Object.fromEntries( containerCwd: args.containerCwd,
args.claim.secrets.map((secret) => [secret.name, secret.value]), environment: { HOME: args.containerHome, ...secretEnv },
),
redact: args.redact, redact: args.redact,
timeoutMs: env.jobTimeoutMs, timeoutMs: env.jobTimeoutMs,
}) })
: await run('bash', ['-lc', args.command], { : await run('bash', ['-lc', args.command], {
cwd: args.repoDir, cwd: args.repoDir,
env: Object.fromEntries( env: secretEnv,
args.claim.secrets.map((secret) => [secret.name, secret.value]),
),
redact: args.redact, redact: args.redact,
timeoutMs: env.jobTimeoutMs, timeoutMs: env.jobTimeoutMs,
}); });
@@ -1292,6 +1296,7 @@ const runClaim = async (claim: Claim) => {
...claim.secrets.map((secret) => secret.value), ...claim.secrets.map((secret) => secret.value),
].filter(Boolean); ].filter(Boolean);
const redact = createRedactor(secretValues); const redact = createRedactor(secretValues);
let acquiredBoxUser: string | undefined;
try { try {
if ((claim.job.runtime ?? 'opencode') !== 'opencode') { if ((claim.job.runtime ?? 'opencode') !== 'opencode') {
throw new Error('Legacy OpenAI direct jobs are no longer supported.'); throw new Error('Legacy OpenAI direct jobs are no longer supported.');
@@ -1320,6 +1325,15 @@ const runClaim = async (claim: Claim) => {
branchSlug, branchSlug,
); );
// Start (or reuse) the persistent per-user box that this thread — and the
// terminal — exec into. It mounts the home, so the clone below is visible.
const boxName = await acquireUserBox({
username,
workdir: homeDir,
containerHome,
});
acquiredBoxUser = username;
const repoDir = await cloneRepository({ const repoDir = await cloneRepository({
workdir: checkoutParent, workdir: checkoutParent,
dirName: branchSlug, dirName: branchSlug,
@@ -1339,6 +1353,7 @@ const runClaim = async (claim: Claim) => {
containerHome, containerHome,
containerRepo, containerRepo,
repoDir, repoDir,
boxName,
githubToken, githubToken,
redact, redact,
}; };
@@ -1349,7 +1364,13 @@ const runClaim = async (claim: Claim) => {
'clone', 'clone',
'Applying your dotfiles and environment.', 'Applying your dotfiles and environment.',
); );
await materializeUserHome({ homeDir, containerHome, userEnv, redact }); await materializeUserHome({
homeDir,
containerHome,
boxName,
userEnv,
redact,
});
} }
if (isCodexLoginProfile(claim)) { if (isCodexLoginProfile(claim)) {
await prepareCodexAuth(workspace); await prepareCodexAuth(workspace);
@@ -1412,6 +1433,7 @@ const runClaim = async (claim: Claim) => {
).catch((stopError: unknown) => { ).catch((stopError: unknown) => {
console.error(stopError); console.error(stopError);
}); });
if (acquiredBoxUser) releaseUserBox(acquiredBoxUser);
} }
}; };
@@ -1497,7 +1519,9 @@ export const runWorkspaceCommand = async (jobId: string, command: string) => {
command, command,
phase: command.includes('test') ? 'test' : 'check', phase: command.includes('test') ? 'test' : 'check',
claim: workspace.claim, claim: workspace.claim,
workdir: workspace.workdir, boxName: workspace.boxName,
containerHome: workspace.containerHome,
containerCwd: workspace.containerRepo,
repoDir: workspace.repoDir, repoDir: workspace.repoDir,
redact: workspace.redact, redact: workspace.redact,
}); });
@@ -1831,7 +1855,8 @@ export const openWorkspacePullRequest = async (jobId: string) => {
} }
activeWorkspaces.delete(jobId); activeWorkspaces.delete(jobId);
// The persistent per-user home + ~/Code checkouts survive across sessions; // The persistent per-user home + ~/Code checkouts survive across sessions;
// only the container is stopped. // release the box (reaped once no other thread/terminal holds it).
releaseUserBox(workspace.username);
return { return {
pullRequestUrl: pullRequest.html_url, pullRequestUrl: pullRequest.html_url,
pullRequestNumber: pullRequest.number, pullRequestNumber: pullRequest.number,
@@ -1847,7 +1872,8 @@ export const stopWorkspace = async (jobId: string) => {
} }
activeWorkspaces.delete(jobId); activeWorkspaces.delete(jobId);
// The persistent per-user home + ~/Code checkouts survive across sessions; // The persistent per-user home + ~/Code checkouts survive across sessions;
// only the container is stopped. // release the box (reaped once no other thread/terminal holds it).
releaseUserBox(workspace.username);
return { success: true }; return { success: true };
}; };