/**
 * Live-run streamer.
 *
 * Spawns `node --import tsx scripts/run-demo.ts "<question>"` as a child
 * process and streams its stdout / stderr to any clients connected
 * via Server-Sent Events. When the child finishes, the streamer takes
 * the new run id from the demo's "Audit trail written" log line (falling
 * back to scanning the output/ directory for the newest audit file) and
 * notifies clients with a `done` event carrying that run id.
 *
 * No queueing: each run is independent. The web server's environment,
 * including ANTHROPIC_API_KEY, is forwarded to the child so the demo can
 * call Anthropic.
 */
import { spawn, type ChildProcess } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { readdir, stat } from 'node:fs/promises';
import path from 'node:path';
import type { ServerResponse } from 'node:http';
/**
 * Serialisable snapshot of one live run, as produced by `getRunStatus`.
 * Dates are ISO-8601 strings rather than `Date` objects.
 */
export interface RunStatus {
/** Identifier of the live run (the UUID it is tracked under), not the audit-file id. */
readonly runId: string;
/** `running` until the child finishes; then `completed` for exit code 0, otherwise `failed`. */
readonly state: 'running' | 'completed' | 'failed';
/** ISO timestamp taken when the run was started. */
readonly startedAt: string;
/** ISO timestamp taken when the child finished, or `null` while it is still running. */
readonly endedAt: string | null;
/** The question that was passed to the demo script. */
readonly question: string;
/**
 * Buffered output lines joined with `\n`. Despite the name this contains
 * both stdout and stderr lines, and only the most recent lines are kept
 * once the buffer cap is reached.
 */
readonly stdout: string;
/** The `<stamp>` of the `run-<stamp>.json` audit file the demo wrote, or `null` if none was identified. */
readonly resultRunId: string | null;
/** Exit code of the child process, or `null` when no exit code is available. */
readonly exitCode: number | null;
}
/**
 * Mutable in-memory bookkeeping for one spawned demo run. Unlike
 * `RunStatus` this holds live handles (the child process and the attached
 * SSE responses) and is never sent to clients directly.
 */
interface ActiveRun {
/** Identifier of the live run; also the key under which it is stored in `runs`. */
readonly runId: string;
/** Handle of the spawned demo child process. */
readonly proc: ChildProcess;
/** `running` until the child finishes; then `completed` for exit code 0, otherwise `failed`. */
state: 'running' | 'completed' | 'failed';
/** When the run was started. */
startedAt: Date;
/** When the child finished, or `null` while it is still running. */
endedAt: Date | null;
/** The question passed to the demo script as its argument. */
question: string;
/** Output lines (stdout and stderr interleaved) kept for replay; capped at the 4000 most recent lines. */
buffer: string[];
/** The `<stamp>` of the `run-<stamp>.json` audit file written by this run, once known. */
resultRunId: string | null;
/** Exit code of the child process, or `null` when no exit code is available. */
exitCode: number | null;
/** SSE responses currently attached to this run and receiving live events. */
clients: Set<ServerResponse>;
/**
 * Newest mtime (in ms) among the `output/run-*.json` files that already
 * existed when the run started (0 if there were none). Files with a newer
 * mtime are candidates for this run's audit file.
 */
startMtime: number;
}
/* Registry of every run started by this module, keyed by runId. Entries
 * are never removed in the code shown here. */
const runs = new Map<string, ActiveRun>();
/**
 * Send one Server-Sent Event to every client attached to `run`.
 *
 * The frame consists of an `event:` line naming the event, a single
 * `data:` line holding the JSON-serialised `data`, and a terminating blank
 * line. A client whose `write` throws is dropped from the run's client
 * set; the remaining clients are still served.
 *
 * @param run   Run whose attached clients receive the event.
 * @param event SSE event name.
 * @param data  Payload; serialised with `JSON.stringify`.
 */
function broadcast(run: ActiveRun, event: string, data: unknown): void {
  const frame = ['event: ' + event, 'data: ' + JSON.stringify(data), '', ''].join('\n');
  run.clients.forEach((client) => {
    try {
      client.write(frame);
    } catch {
      /* Writing failed: stop tracking this client. */
      run.clients.delete(client);
    }
  });
}
/**
 * Record one line of child output for `run` and forward it to clients.
 *
 * The line is appended to the replay buffer (which keeps at most 4000
 * lines, discarding the oldest), sent to attached clients as a `log`
 * event, and inspected for the demo's audit-trail announcement.
 *
 * @param run  Run the line belongs to.
 * @param line One line of output, without its trailing newline.
 */
function pushLine(run: ActiveRun, line: string): void {
  const lines = run.buffer;
  lines.push(line);
  if (lines.length > 4000) {
    lines.shift();
  }

  broadcast(run, 'log', { line });

  /* The demo announces "Audit trail written: output/run-<stamp>.json"
   * after serialising the audit. Remember <stamp> so the UI can jump
   * straight to the run-detail page. */
  const match = /Audit trail written:\s+(.*run-([^/\\]+)\.json)/.exec(line);
  if (match !== null) {
    run.resultRunId = match[2];
  }
}
/**
 * Find the most recently modified `output/run-<id>.json` audit file under
 * `rootDir` whose mtime is strictly greater than `afterMtime`.
 *
 * Best-effort: a missing/unreadable output directory yields `null`, and a
 * file that cannot be stat'ed (e.g. removed mid-scan) is skipped instead
 * of aborting the whole scan.
 *
 * @param rootDir    Project root containing the `output/` directory.
 * @param afterMtime Only files modified after this time (ms) are considered.
 * @returns The `<id>` part of the file name and its mtime, or `null`.
 */
async function newestAuditFile(
  rootDir: string,
  afterMtime: number,
): Promise<{ id: string; mtime: number } | null> {
  const outputDir = path.join(rootDir, 'output');
  let names: string[];
  try {
    names = await readdir(outputDir);
  } catch {
    /* output dir might not exist yet */
    return null;
  }
  let best: { id: string; mtime: number } | null = null;
  for (const name of names) {
    const m = /^run-(.+)\.json$/.exec(name);
    if (!m) continue;
    let mtime: number;
    try {
      mtime = (await stat(path.join(outputDir, name))).mtimeMs;
    } catch {
      continue;
    }
    if (mtime <= afterMtime) continue;
    if (!best || mtime > best.mtime) best = { id: m[1], mtime };
  }
  return best;
}

/**
 * Start a demo run for `question` and begin streaming its output.
 *
 * Spawns `node --import tsx scripts/run-demo.ts <question>` with `rootDir`
 * as working directory and the current environment (colour disabled),
 * registers the run in the module-level registry, and returns immediately;
 * the child keeps running in the background. Every stdout/stderr line is
 * recorded and broadcast as a `log` event. When the child has finished and
 * its output pipes are closed, a `done` event with the final state, exit
 * code and audit-file run id is broadcast, and attached clients are closed
 * one second later.
 *
 * @param question Question forwarded verbatim as the script's argument
 *                 (passed as an argv element, never through a shell).
 * @param rootDir  Project root containing `scripts/run-demo.ts` and `output/`.
 * @returns The id of the new live run, for use with `attachStream` /
 *          `getRunStatus`.
 * @throws Rejects if `spawn` itself throws synchronously (e.g. invalid
 *         arguments). Asynchronous spawn failures do not reject; they mark
 *         the run as `failed`.
 */
export async function startRun(question: string, rootDir: string): Promise<string> {
  const runId = randomUUID();

  /* Record the newest existing audit-file mtime so the new run's file can
   * be told apart if the log-line extraction misses (defensive). */
  const startMtime = (await newestAuditFile(rootDir, 0))?.mtime ?? 0;

  const proc = spawn(process.execPath, [
    /* tsx via node's --import for compatibility with the loader API. */
    '--import', 'tsx',
    path.join(rootDir, 'scripts', 'run-demo.ts'),
    question,
  ], {
    cwd: rootDir,
    env: {
      ...process.env,
      FORCE_COLOR: '0',
    },
    stdio: ['ignore', 'pipe', 'pipe'],
  });

  const run: ActiveRun = {
    runId,
    proc,
    state: 'running',
    startedAt: new Date(),
    endedAt: null,
    question,
    buffer: [],
    resultRunId: null,
    exitCode: null,
    clients: new Set(),
    startMtime,
  };
  runs.set(runId, run);

  const onLineFrom = (stream: NodeJS.ReadableStream): void => {
    /* Let the stream decode UTF-8 itself: it carries incomplete multi-byte
     * sequences over to the next chunk, whereas decoding each Buffer
     * separately would corrupt characters split across chunks. */
    stream.setEncoding('utf8');
    let leftover = '';
    stream.on('data', (chunk: string) => {
      /* Accept both LF and CRLF line endings. */
      const parts = (leftover + chunk).split(/\r?\n/);
      leftover = parts.pop() ?? '';
      for (const line of parts) pushLine(run, line);
    });
    stream.on('end', () => {
      /* Flush a final line that had no trailing newline. */
      if (leftover) pushLine(run, leftover);
      leftover = '';
    });
  };
  if (proc.stdout) onLineFrom(proc.stdout);
  if (proc.stderr) onLineFrom(proc.stderr);

  /* Finalisation can be triggered by 'close' or by 'error'; run it once. */
  let finished = false;
  const finish = async (code: number | null): Promise<void> => {
    if (finished) return;
    finished = true;

    /* If the run id was not captured from the output, fall back to the
     * newest audit file written since start. This happens BEFORE the state
     * is published so a client attaching meanwhile is still treated as
     * live and receives the complete `done` event below. */
    if (!run.resultRunId) {
      const newest = await newestAuditFile(rootDir, run.startMtime);
      if (newest) run.resultRunId = newest.id;
    }

    run.exitCode = code;
    run.endedAt = new Date();
    run.state = code === 0 ? 'completed' : 'failed';
    broadcast(run, 'done', {
      state: run.state,
      exitCode: code,
      resultRunId: run.resultRunId,
    });

    /* Drop clients after a grace period so the SSE close is visible
     * client-side. */
    setTimeout(() => {
      for (const res of run.clients) { try { res.end(); } catch { /* */ } }
      run.clients.clear();
    }, 1000);
  };

  /* Without an 'error' listener a failed spawn would surface as an
   * unhandled 'error' event and the run would stay "running" forever. */
  proc.on('error', (err: Error) => {
    pushLine(run, `[spawn error] ${err.message}`);
    void finish(null);
  });

  /* 'close' (unlike 'exit') fires only after the stdio pipes are closed, so
   * every output line -- including the audit-trail announcement -- has been
   * processed before `done` is sent. */
  proc.on('close', (code) => {
    void finish(code);
  });

  return runId;
}
/**
 * Attach an HTTP response to a run as a Server-Sent Events stream.
 *
 * Unknown run ids get a 404 JSON error. Otherwise the SSE headers and a
 * 2-second reconnect hint are sent, followed by a replay of the buffered
 * output lines (only the most recent lines are retained in that buffer).
 * For a run that is still in progress the response is then registered for
 * live events and unregistered again when the connection closes; for a
 * finished run a final `done` event is sent and the response is ended.
 *
 * @param runId Id returned by `startRun`.
 * @param res   Response to stream to; this function takes over writing it.
 */
export function attachStream(runId: string, res: ServerResponse): void {
  const run = runs.get(runId);
  if (run === undefined) {
    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: 'unknown runId' }));
    return;
  }

  const sseHeaders = {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  };
  res.writeHead(200, sseHeaders);
  res.write(`retry: 2000\n\n`);

  /* Replay what has been buffered so a late client catches up. */
  run.buffer.forEach((line) => {
    res.write(`event: log\ndata: ${JSON.stringify({ line })}\n\n`);
  });

  if (run.state === 'running') {
    /* Still in progress: keep the connection open for live events. */
    run.clients.add(res);
    res.on('close', () => {
      run.clients.delete(res);
    });
    return;
  }

  /* Already finished: report the outcome and close immediately. */
  const outcome = {
    state: run.state,
    exitCode: run.exitCode,
    resultRunId: run.resultRunId,
  };
  res.write(`event: done\ndata: ${JSON.stringify(outcome)}\n\n`);
  res.end();
}
/**
 * Build a serialisable snapshot of a run for polling clients.
 *
 * @param runId Id returned by `startRun`.
 * @returns The run's current status with dates rendered as ISO strings and
 *          the buffered output lines joined by newlines, or `null` when the
 *          id is unknown.
 */
export function getRunStatus(runId: string): RunStatus | null {
  const run = runs.get(runId);
  if (run === undefined) {
    return null;
  }

  const { state, question, resultRunId, exitCode, startedAt, endedAt, buffer } = run;
  const status: RunStatus = {
    runId: run.runId,
    state,
    startedAt: startedAt.toISOString(),
    endedAt: endedAt === null ? null : endedAt.toISOString(),
    question,
    stdout: buffer.join('\n'),
    resultRunId,
    exitCode,
  };
  return status;
}