import { execa } from 'execa'; import path from 'node:path'; import { env } from '../env'; type CommandResult = { exitCode: number; output: string; }; const environmentArgs = (environment: Record) => Object.entries(environment).flatMap(([name, value]) => [ '-e', `${name}=${value}`, ]); const networkArgs = () => (env.network ? ['--network', env.network] : []); const containerRuntime = () => env.containerRuntime; // `docker run` reuses a stale local `:latest` forever, so without an explicit // pull the job image never updates in production. Pull once per worker process // (i.e. once per deploy/restart) so a fresh worker always runs a fresh job // image. Best-effort: if the registry is unreachable we fall back to whatever // image is present locally rather than failing the job. let jobImagePullPromise: Promise | undefined; export const ensureJobImagePulled = () => { jobImagePullPromise ??= (async () => { try { await execa(containerRuntime(), ['pull', env.jobImage], { reject: false, stdin: 'ignore', }); } catch { // Ignore: keep running with the locally cached image. } })(); return jobImagePullPromise; }; // execa with `reject: false` resolves (does not throw) even when the runtime // binary is missing (ENOENT) — `exitCode` is then `undefined`. Coercing that to // 0 makes a failed spawn look like a successful empty run, which is exactly how // a worker image without a `docker` CLI silently produced empty agent // responses. Normalize so any spawn failure is a non-zero exit carrying the // real reason. export const normalizeRunResult = ( // Declared nullable on purpose: execa's types claim these are always present, // but on a spawn failure (e.g. missing `docker` binary) `exitCode`/`all` are // actually undefined at runtime. result: { exitCode?: number; shortMessage?: string }, output: string | undefined, redact: (value: string) => string, ): CommandResult => { const text = output ?? ''; if (result.exitCode == null) { const reason = result.shortMessage ?? 'container runtime failed to start'; return { exitCode: 1, output: redact(`${text}${text ? '\n' : ''}${reason}`), }; } return { exitCode: result.exitCode, output: redact(text) }; }; const hostWorkspacePath = (workdir: string) => { if (!env.hostWorkdir) return workdir; const workerRoot = path.resolve(env.workdir); const resolvedWorkdir = path.resolve(workdir); const relative = path.relative(workerRoot, resolvedWorkdir); if (relative.startsWith('..') || path.isAbsolute(relative)) { return workdir; } return path.join(env.hostWorkdir, relative); }; export const jobWorkspaceVolumeSpec = (workdir: string) => { const volumeOptions = env.containerVolumeOptions ?? (containerRuntime().endsWith('podman') ? 'Z' : undefined); const source = hostWorkspacePath(workdir); return volumeOptions ? `${source}:/workspace:${volumeOptions}` : `${source}:/workspace`; }; export const runInJobContainer = async (args: { workdir: string; command: string[]; environment: Record; redact: (value: string) => string; timeoutMs: number; }): Promise => { await ensureJobImagePulled(); const result = await execa( containerRuntime(), [ 'run', '--rm', '--memory', '4g', '--cpus', '2', ...networkArgs(), ...environmentArgs(args.environment), '-v', jobWorkspaceVolumeSpec(args.workdir), '-w', '/workspace/repo', env.jobImage, ...args.command, ], { all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs, }, ); return normalizeRunResult(result, result.all, args.redact); }; export const startWorkspaceContainer = async (args: { workdir: string; containerName: string; environment: Record; command?: string[]; publishTcpPort?: number; }) => { await ensureJobImagePulled(); await execa( containerRuntime(), [ 'rm', '-f', args.containerName, ], { reject: false }, ); const result = await execa( containerRuntime(), [ 'run', '-d', '--name', args.containerName, '--memory', '4g', '--cpus', '2', ...networkArgs(), ...(args.publishTcpPort ? ['-p', `127.0.0.1::${args.publishTcpPort}`] : []), ...environmentArgs(args.environment), '-v', jobWorkspaceVolumeSpec(args.workdir), '-w', '/workspace/repo', env.jobImage, ...(args.command ?? ['sleep', 'infinity']), ], { all: true, stdin: 'ignore' }, ); return { containerId: result.stdout.trim(), containerName: args.containerName, hostPort: args.publishTcpPort ? await getPublishedPort(args.containerName, args.publishTcpPort) : undefined, }; }; const getPublishedPort = async (containerName: string, containerPort: number) => { const result = await execa( containerRuntime(), ['port', containerName, `${containerPort}/tcp`], { all: true, reject: false, stdin: 'ignore' }, ); const output = result.all.trim(); const match = /:(\d+)\s*$/.exec(output); if (!match?.[1]) { throw new Error( `Could not determine published port for ${containerName}:${containerPort}.`, ); } return match[1]; }; export const execInWorkspaceContainer = async (args: { containerName: string; command: string[]; environment?: Record; redact: (value: string) => string; timeoutMs: number; }): Promise => { const result = await execa( containerRuntime(), [ 'exec', ...(args.environment ? environmentArgs(args.environment) : []), args.containerName, ...args.command, ], { all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs, }, ); return { exitCode: result.exitCode ?? 0, output: args.redact(result.all), }; }; export const streamInJobContainer = async (args: { workdir: string; command: string[]; environment: Record; redact: (value: string) => string; timeoutMs: number; onStdoutLine?: (line: string) => Promise; onStderrLine?: (line: string) => Promise; }): Promise => { await ensureJobImagePulled(); const subprocess = execa( containerRuntime(), [ 'run', '--rm', '--memory', '4g', '--cpus', '2', ...networkArgs(), ...environmentArgs(args.environment), '-v', jobWorkspaceVolumeSpec(args.workdir), '-w', '/workspace/repo', env.jobImage, ...args.command, ], { all: true, reject: false, stdin: 'ignore', timeout: args.timeoutMs, }, ); let stdoutBuffer = ''; let stderrBuffer = ''; const output: string[] = []; let lineHandlers = Promise.resolve(); const consume = async ( chunk: Buffer, source: 'stdout' | 'stderr', handler?: (line: string) => Promise, ) => { output.push(chunk.toString('utf8')); const next = `${source === 'stdout' ? stdoutBuffer : stderrBuffer}${chunk.toString('utf8')}`; const lines = next.split(/\r?\n/); const remainder = lines.pop() ?? ''; if (source === 'stdout') stdoutBuffer = remainder; else stderrBuffer = remainder; for (const line of lines) { if (handler) { await handler(args.redact(line)); } } }; subprocess.stdout.on('data', (chunk: Buffer) => { lineHandlers = lineHandlers.then(() => consume(chunk, 'stdout', args.onStdoutLine), ); }); subprocess.stderr.on('data', (chunk: Buffer) => { lineHandlers = lineHandlers.then(() => consume(chunk, 'stderr', args.onStderrLine), ); }); let result: Awaited; try { result = await subprocess; } catch (error) { await lineHandlers; const outputText = output.join(''); const message = error instanceof Error ? error.message : 'Container command failed.'; return { exitCode: 1, output: args.redact(`${outputText}${outputText ? '\n' : ''}${message}`), }; } await lineHandlers; if (stdoutBuffer && args.onStdoutLine) { await args.onStdoutLine(args.redact(stdoutBuffer)); } if (stderrBuffer && args.onStderrLine) { await args.onStderrLine(args.redact(stderrBuffer)); } return normalizeRunResult(result, output.join(''), args.redact); }; export const stopWorkspaceContainer = async (containerName: string) => { await execa(containerRuntime(), ['rm', '-f', containerName], { reject: false, }); }; export const inspectWorkspaceContainer = async (containerName: string) => { const result = await execa( containerRuntime(), ['inspect', containerName], { all: true, reject: false, }, ); return { exists: result.exitCode === 0, output: result.all, }; }; export const listWorkspaceContainerNames = async (prefix: string) => { const result = await execa( containerRuntime(), ['ps', '-a', '--format', '{{.Names}}'], { all: true, reject: false }, ); if (result.exitCode !== 0) return []; return result.all .split('\n') .map((line) => line.trim()) .filter((line) => line.startsWith(prefix)); };