diff --git a/apps/agent-worker/src/runtime/docker.ts b/apps/agent-worker/src/runtime/docker.ts index 8bdc1f9..9d79176 100644 --- a/apps/agent-worker/src/runtime/docker.ts +++ b/apps/agent-worker/src/runtime/docker.ts @@ -1,3 +1,4 @@ +import { mkdir } from 'node:fs/promises'; import path from 'node:path'; import type { Readable } from 'node:stream'; import { execa } from 'execa'; @@ -350,6 +351,10 @@ export const ensureUserContainer = async (args: { { reject: false, stdin: 'ignore' }, ); if (inspect.exitCode === 0 && inspect.stdout.trim() === 'true') return name; + // The box mounts the per-user home, but it's created before the thread's clone + // populates it — ensure it exists first, since podman (unlike docker) refuses to + // bind-mount a missing source directory (statfs: no such file or directory). + await mkdir(args.workdir, { recursive: true }); // Not running: remove any stale container, then start fresh. await execa(containerRuntime(), ['rm', '-f', name], { reject: false }); await execa( diff --git a/apps/agent-worker/src/terminal.ts b/apps/agent-worker/src/terminal.ts index 4949cbf..1a8d3be 100644 --- a/apps/agent-worker/src/terminal.ts +++ b/apps/agent-worker/src/terminal.ts @@ -1,7 +1,7 @@ +import { spawn } from 'node:child_process'; +import type { ChildProcessWithoutNullStreams } from 'node:child_process'; import type { Server } from 'node:http'; -import type { Duplex } from 'node:stream'; import type { WebSocket } from 'ws'; -import Docker from 'dockerode'; import { WebSocketServer } from 'ws'; import { env } from './env'; @@ -9,7 +9,14 @@ import { verifyTerminalToken } from './terminal-token'; import { acquireUserBox, releaseUserBox } from './user-container'; import { getTerminalWorkspace } from './worker'; -const docker = new Docker(); +const clampDimension = (value: unknown) => { + const n = Math.trunc(Number(value)); + if (!Number.isFinite(n)) return undefined; + return Math.min(Math.max(n, 1), 1000); +}; + +// Single-quote a string for a POSIX shell. +const shellQuote = (value: string) => `'${value.replaceAll("'", `'\\''`)}'`; const bridge = async (ws: WebSocket, jobId: string) => { const workspace = getTerminalWorkspace(jobId); @@ -18,6 +25,58 @@ const bridge = async (ws: WebSocket, jobId: string) => { return; } + // bun can't load node-pty (native ABI mismatch) and dockerode can't attach to + // podman, so we drive the runtime CLI (` exec -i`) and allocate the PTY + // *inside* the container with `script`, bridging the plain pipes to the socket. + // + // Register the message handler immediately and buffer input/size until the exec + // is ready (acquiring the box can take seconds on first connect), so the initial + // resize and early keystrokes aren't dropped. + const procHolder: { current?: ChildProcessWithoutNullStreams } = {}; + const pendingInput: Buffer[] = []; + let cols = 80; + let rows = 24; + + ws.on('message', (data: Buffer, isBinary: boolean) => { + if (!isBinary) { + // Text frames are control messages (resize); anything else is raw input. + try { + const message = JSON.parse(data.toString('utf8')) as { + type?: string; + cols?: number; + rows?: number; + }; + if (message.type === 'resize') { + const c = clampDimension(message.cols); + const r = clampDimension(message.rows); + if (c && r) { + cols = c; + rows = r; + } + return; + } + } catch { + // fall through: treat as raw input + } + } + if (procHolder.current) procHolder.current.stdin.write(data); + else pendingInput.push(data); + }); + + let acquired = false; + let released = false; + // Read through a function so TS doesn't narrow `released` to a constant — the + // cleanup handler flips it asynchronously when the socket closes. + const isReleased = () => released; + const cleanup = () => { + if (released) return; + released = true; + procHolder.current?.kill(); + if (acquired) releaseUserBox(workspace.username); + }; + ws.on('close', cleanup); + ws.on('error', cleanup); + // Hold the per-user box open while this terminal is connected; the agent and // the terminal share the exact same container (Phase 2). let boxName: string; @@ -27,6 +86,7 @@ const bridge = async (ws: WebSocket, jobId: string) => { workdir: workspace.workdir, containerHome: workspace.containerHome, }); + acquired = true; } catch (error) { ws.close( 1011, @@ -35,79 +95,58 @@ const bridge = async (ws: WebSocket, jobId: string) => { return; } - let stream: Duplex | undefined; - let exec: Docker.Exec | undefined; - try { - exec = await docker.getContainer(boxName).exec({ - // Reattach a persistent tmux session across reconnects when tmux is - // available; otherwise fall back to a plain login shell. - Cmd: [ - '/bin/bash', - '-lc', - 'exec tmux new-session -A -s spoon 2>/dev/null || exec bash -l', - ], - AttachStdin: true, - AttachStdout: true, - AttachStderr: true, - Tty: true, - WorkingDir: workspace.containerRepo, - Env: [ - 'TERM=xterm-256color', - `HOME=${workspace.containerHome}`, - ...workspace.secrets.map((s) => `${s.name}=${s.value}`), - ], - }); - stream = await exec.start({ hijack: true, stdin: true, Tty: true }); - } catch (error) { - ws.close( - 1011, - `Failed to start terminal: ${error instanceof Error ? error.message : 'unknown error'}`, - ); - releaseUserBox(workspace.username); - return; - } + if (isReleased()) return; // client disconnected during startup; cleanup ran - const activeStream = stream; - const activeExec = exec; + // Reattach a persistent tmux session across reconnects when available, else a + // plain login shell. `stty` sizes the PTY to the client's viewport up front. + const launcher = + `stty rows ${rows} cols ${cols} 2>/dev/null; ` + + // Reattach a persistent tmux session when tmux is present; otherwise fall back + // to an interactive login shell (`-i` so it prints a prompt and line-edits). + // Check with `command -v` rather than `exec tmux || …`: a failed `exec` makes a + // non-interactive shell exit before the `||`, so the fallback never runs. + 'if command -v tmux >/dev/null 2>&1; then exec tmux new-session -A -s spoon; ' + + 'else exec bash -il; fi'; + const envFlags = [ + '-e', + 'TERM=xterm-256color', + '-e', + `HOME=${workspace.containerHome}`, + ...workspace.secrets.flatMap((s) => ['-e', `${s.name}=${s.value}`]), + ]; - activeStream.on('data', (chunk: Buffer) => { + const proc = spawn( + env.containerRuntime, + [ + 'exec', + '-i', + ...envFlags, + '-w', + workspace.containerRepo, + boxName, + '/bin/bash', + '-lc', + `exec script -qfc ${shellQuote(launcher)} /dev/null`, + ], + { stdio: ['pipe', 'pipe', 'pipe'] }, + ); + procHolder.current = proc; + + // Replay any keystrokes the client sent before the process was ready. + for (const buffered of pendingInput) proc.stdin.write(buffered); + pendingInput.length = 0; + + const forward = (chunk: Buffer) => { if (ws.readyState === ws.OPEN) ws.send(chunk, { binary: true }); - }); - activeStream.on('end', () => ws.close()); - activeStream.on('error', () => ws.close()); - - ws.on('message', (data: Buffer, isBinary: boolean) => { - if (isBinary) { - activeStream.write(data); - return; - } - // Text frames are control messages (resize); anything else is treated as - // input for resilience. - try { - const message = JSON.parse(data.toString('utf8')) as { - type?: string; - cols?: number; - rows?: number; - }; - if (message.type === 'resize' && message.cols && message.rows) { - void activeExec.resize({ w: message.cols, h: message.rows }); - return; - } - } catch { - // fall through: treat as raw input - } - activeStream.write(data); - }); - - let released = false; - const cleanup = () => { - if (released) return; - released = true; - activeStream.end(); - releaseUserBox(workspace.username); }; - ws.on('close', cleanup); - ws.on('error', cleanup); + proc.stdout.on('data', forward); + proc.stderr.on('data', forward); + proc.on('exit', () => { + if (ws.readyState === ws.OPEN) ws.close(); + }); + proc.on('error', () => { + if (ws.readyState === ws.OPEN) ws.close(); + }); }; /**