BID · Console
Baseline · Intelligence · Decision
web/runner.ts 6,354 bytes · typescript
/**
 * Live-run streamer.
 *
 * Spawns `npx tsx scripts/run-demo.ts "<question>"` as a child
 * process and streams its stdout / stderr to any clients connected
 * via Server-Sent Events. When the child exits, the streamer scans
 * the output/ directory for the audit file the demo just wrote and
 * notifies clients with a `done` event carrying the new run id.
 *
 * No queueing: each run is independent. The web ANTHROPIC_API_KEY is
 * forwarded to the child so the demo can call Anthropic.
 */

import { spawn, type ChildProcess } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { readdir, stat } from 'node:fs/promises';
import path from 'node:path';
import type { ServerResponse } from 'node:http';

export interface RunStatus {
  readonly runId: string;
  readonly state: 'running' | 'completed' | 'failed';
  readonly startedAt: string;
  readonly endedAt: string | null;
  readonly question: string;
  readonly stdout: string;
  readonly resultRunId: string | null;
  readonly exitCode: number | null;
}

interface ActiveRun {
  readonly runId: string;
  readonly proc: ChildProcess;
  state: 'running' | 'completed' | 'failed';
  startedAt: Date;
  endedAt: Date | null;
  question: string;
  buffer: string[];
  resultRunId: string | null;
  exitCode: number | null;
  clients: Set<ServerResponse>;
  startMtime: number;
}

const runs = new Map<string, ActiveRun>();

function broadcast(run: ActiveRun, event: string, data: unknown): void {
  const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  for (const res of run.clients) {
    try { res.write(payload); } catch { run.clients.delete(res); }
  }
}

function pushLine(run: ActiveRun, line: string): void {
  run.buffer.push(line);
  if (run.buffer.length > 4000) run.buffer.shift();
  broadcast(run, 'log', { line });
  /* The demo prints "Audit trail written: output/run-<stamp>.json"
   * once it finishes serialising the audit. Capture the stamp so the
   * UI can navigate directly to the run-detail page. */
  const m = /Audit trail written:\s+(.*run-([^/\\]+)\.json)/.exec(line);
  if (m) run.resultRunId = m[2];
}

export async function startRun(question: string, rootDir: string): Promise<string> {
  const runId = randomUUID();
  /* Record current latest output mtime to disambiguate the new run's
   * audit file if the regex extraction misses (defensive). */
  let startMtime = 0;
  try {
    const dir = await readdir(path.join(rootDir, 'output'));
    for (const f of dir) {
      if (!/^run-.+\.json$/.test(f)) continue;
      const s = await stat(path.join(rootDir, 'output', f));
      if (s.mtimeMs > startMtime) startMtime = s.mtimeMs;
    }
  } catch { /* output dir might not exist yet */ }

  const proc = spawn(process.execPath, [
    /* tsx via node's --import for compatibility with the loader API. */
    '--import', 'tsx',
    path.join(rootDir, 'scripts', 'run-demo.ts'),
    question,
  ], {
    cwd: rootDir,
    env: {
      ...process.env,
      FORCE_COLOR: '0',
    },
    stdio: ['ignore', 'pipe', 'pipe'],
  });

  const run: ActiveRun = {
    runId,
    proc,
    state: 'running',
    startedAt: new Date(),
    endedAt: null,
    question,
    buffer: [],
    resultRunId: null,
    exitCode: null,
    clients: new Set(),
    startMtime,
  };
  runs.set(runId, run);

  const onLineFrom = (stream: NodeJS.ReadableStream): void => {
    let leftover = '';
    stream.on('data', (chunk: Buffer) => {
      const text = leftover + chunk.toString('utf8');
      const parts = text.split('\n');
      leftover = parts.pop() ?? '';
      for (const line of parts) pushLine(run, line);
    });
    stream.on('end', () => {
      if (leftover) pushLine(run, leftover);
    });
  };

  if (proc.stdout) onLineFrom(proc.stdout);
  if (proc.stderr) onLineFrom(proc.stderr);

  proc.on('exit', async (code) => {
    run.exitCode = code;
    run.endedAt = new Date();
    run.state = code === 0 ? 'completed' : 'failed';
    /* If we didn't capture the run id from stdout, find the newest
     * audit file written since start. */
    if (!run.resultRunId) {
      try {
        const dir = await readdir(path.join(rootDir, 'output'));
        let best: { id: string; mtime: number } | null = null;
        for (const f of dir) {
          const m = /^run-(.+)\.json$/.exec(f);
          if (!m) continue;
          const s = await stat(path.join(rootDir, 'output', f));
          if (s.mtimeMs <= run.startMtime) continue;
          if (!best || s.mtimeMs > best.mtime) best = { id: m[1], mtime: s.mtimeMs };
        }
        if (best) run.resultRunId = best.id;
      } catch { /* ignore */ }
    }
    broadcast(run, 'done', {
      state: run.state,
      exitCode: code,
      resultRunId: run.resultRunId,
    });
    /* Drop clients after a grace period so the SSE close is visible
     * client-side. */
    setTimeout(() => {
      for (const res of run.clients) { try { res.end(); } catch { /* */ } }
      run.clients.clear();
    }, 1000);
  });

  return runId;
}

export function attachStream(runId: string, res: ServerResponse): void {
  const run = runs.get(runId);
  if (!run) {
    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: 'unknown runId' }));
    return;
  }
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-transform',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  });
  res.write(`retry: 2000\n\n`);
  /* Replay buffered output so a late client still sees everything. */
  for (const line of run.buffer) {
    res.write(`event: log\ndata: ${JSON.stringify({ line })}\n\n`);
  }
  if (run.state !== 'running') {
    res.write(`event: done\ndata: ${JSON.stringify({
      state: run.state,
      exitCode: run.exitCode,
      resultRunId: run.resultRunId,
    })}\n\n`);
    res.end();
    return;
  }
  run.clients.add(res);
  res.on('close', () => { run.clients.delete(res); });
}

export function getRunStatus(runId: string): RunStatus | null {
  const r = runs.get(runId);
  if (!r) return null;
  return {
    runId: r.runId,
    state: r.state,
    startedAt: r.startedAt.toISOString(),
    endedAt: r.endedAt ? r.endedAt.toISOString() : null,
    question: r.question,
    stdout: r.buffer.join('\n'),
    resultRunId: r.resultRunId,
    exitCode: r.exitCode,
  };
}