BID · Console
Baseline · Intelligence · Decision
src/orchestrator.ts 13,557 bytes · typescript
/**
 * The orchestrator is the only component that:
 *  - walks the pipeline (Std 6)
 *  - calls agents and reads their AgentResult (Std 11, Std 12)
 *  - performs repository write-back per each agent's declared surface (Std 10)
 *  - logs escalations and stops on terminal failure (Std 9, Std 12)
 *
 * Agents themselves never persist, never recurse implicitly, and never
 * call each other directly — all coordination is here.
 */

import { jobRequestSchema, makeContext, type JobRequest } from './types.js';
import type { ExecutionContext, FailureObject, Handoff, Lineage } from './types.js';
import {
  type AgentResult,
  type AgentStandardsContract,
  type HITLEscalation,
  STANDARDS_SUMMARY,
} from './standards.js';
import { Repository } from './repository.js';
import { pipeline, type PipelineStep } from './pipeline.js';
import {
  runSourceExtraction,
  sourceExtractionContract,
} from './agents/baseline/source-extraction/index.js';
import {
  runNormalization,
  normalizationContract,
} from './agents/baseline/normalization/index.js';
import {
  runResolution,
  resolutionContract,
} from './agents/baseline/resolution/index.js';
import {
  runAnalyticalTable,
  analyticalTableContract,
} from './agents/intelligence/analytical-table/index.js';
import {
  runPerformanceMetrics,
  performanceMetricsContract,
} from './agents/intelligence/performance-metrics/index.js';
import {
  runComparisonsSynthesis,
  comparisonsSynthesisContract,
} from './agents/intelligence/comparisons-synthesis/index.js';
import {
  runInsightSynthesis,
  insightSynthesisContract,
} from './agents/intelligence/insight-synthesis/index.js';
import {
  runOutputIngestion,
  outputIngestionContract,
} from './agents/decision/output-ingestion/index.js';
import {
  runVisualization,
  visualizationContract,
} from './agents/decision/visualization/index.js';
import {
  runDeliveryDistribution,
  deliveryDistributionContract,
} from './agents/decision/delivery-distribution/index.js';

export interface OrchestrationResult {
  analysisId: string;
  ok: boolean;
  finalHandoff: Handoff<unknown> | null;
  failure: FailureObject | null;
  escalations: HITLEscalation[];
  repositorySnapshot: ReturnType<Repository['snapshot']>;
  trace: ExecutionContext['trace'];
  standards: typeof STANDARDS_SUMMARY;
  pipeline: PipelineStep[];
}

/** Registry of every known agent contract (Std 1/4/5/6/8/10 declarations). */
const CONTRACTS: Record<string, AgentStandardsContract> = {
  'baseline.source-extraction': sourceExtractionContract,
  'baseline.normalization': normalizationContract,
  'baseline.resolution': resolutionContract,
  'intelligence.analytical-table': analyticalTableContract,
  'intelligence.performance-metrics': performanceMetricsContract,
  'intelligence.comparisons-synthesis': comparisonsSynthesisContract,
  'intelligence.insight-synthesis': insightSynthesisContract,
  'decision.output-ingestion': outputIngestionContract,
  'decision.visualization': visualizationContract,
  'decision.delivery-distribution': deliveryDistributionContract,
};

function banner(line: string): void {
  // eslint-disable-next-line no-console
  console.log(`\n=== ${line} ===`);
}

/** Std 10: persist exactly what the agent's contract declared. */
function persistHandoff(
  repo: Repository,
  contract: AgentStandardsContract,
  result: { handoff: Handoff<unknown>; escalations: readonly HITLEscalation[] },
): void {
  if (contract.writeBack.structuredOutputs) repo.writeHandoff(result.handoff);
  if (contract.writeBack.learnedRules) {
    const payload = result.handoff.payload as unknown;
    if (payload && typeof payload === 'object' && 'learnedRules' in payload) {
      const learned = (payload as { learnedRules?: { key: string; value: string }[] }).learnedRules ?? [];
      for (const lr of learned) repo.writeLearnedRule(contract.agentName, lr.key, lr.value);
    }
  }
  for (const esc of result.escalations) repo.writeEscalation(esc);
}

export async function runAnalysis(rawJob: unknown): Promise<OrchestrationResult> {
  // Std 2: validate the JobRequest at the boundary.
  const parsed = jobRequestSchema.safeParse(rawJob);
  if (!parsed.success) {
    const lineage: Lineage = {
      sourceUrl: null,
      capturedAt: new Date().toISOString(),
      effectiveAs: null,
      agentVersion: 'orchestrator/0.1.0',
      upstream: [],
    };
    return {
      analysisId: 'unknown',
      ok: false,
      finalHandoff: null,
      failure: {
        agent: 'orchestrator',
        agentVersion: '0.1.0',
        category: 'invalid-input',
        reason: 'JobRequest failed schema validation.',
        context: { issues: parsed.error.issues },
        lineage,
        attempts: 0,
        recursionDepth: 0,
        occurredAt: new Date().toISOString(),
      },
      escalations: [],
      repositorySnapshot: new Repository().snapshot(),
      trace: [],
      standards: STANDARDS_SUMMARY,
      pipeline,
    };
  }
  const job: JobRequest = parsed.data;

  const ctx = makeContext(job.analysisId, `corr-${job.analysisId}-${Date.now()}`);
  const repo = new Repository();
  const allEscalations: HITLEscalation[] = [];

  banner(`BID Orchestrator — analysis=${job.analysisId}`);
  // eslint-disable-next-line no-console
  console.log(`Question: ${job.question}`);
  // eslint-disable-next-line no-console
  console.log(`Entities: ${job.entities.map(e => e.id).join(', ')} | Period: ${job.period} | Sources: ${job.sources.join(',')}`);
  // eslint-disable-next-line no-console
  console.log(`Pipeline: ${pipeline.map(s => (s.kind === 'agent' ? `${s.pillar}/${s.agent}` : `[${s.name}]`)).join(' → ')}`);

  let lastHandoff: Handoff<unknown> | null = null;
  let lastLineage: Lineage = {
    sourceUrl: null,
    capturedAt: new Date().toISOString(),
    effectiveAs: null,
    agentVersion: 'orchestrator/0.1.0',
    upstream: [],
  };
  const accumulatedUnresolved: ExecutionContext['trace'] = []; // unused — kept for symmetry
  void accumulatedUnresolved;
  let unresolvedAccumulator: { category: string; detail: string; blocking: boolean }[] = [];

  /* Std 4 cross-check support: when Performance Metrics produces its
   * computed metrics, capture the metricKey set so Insight Synthesis
   * (further downstream) can validate that every claim's metric
   * reference points at a real metric the pipeline actually computed. */
  let upstreamMetricKeys: readonly string[] = [];

  for (const step of pipeline) {
    if (step.kind === 'checkpoint') {
      banner(`Checkpoint: ${step.name} (HITL pause — not implemented in POC)`);
      continue;
    }
    const fqn = `${step.pillar}.${step.agent}`;
    const contract = CONTRACTS[fqn];
    if (!contract) {
      const failure: FailureObject = {
        agent: 'orchestrator',
        agentVersion: '0.1.0',
        category: 'tool-unavailable',
        reason: `No contract registered for pipeline step "${fqn}".`,
        context: { step },
        lineage: lastLineage,
        attempts: 0,
        recursionDepth: ctx.recursionDepth,
        occurredAt: new Date().toISOString(),
      };
      repo.writeFailure(failure);
      return finalize(false, null, failure, allEscalations, repo, ctx);
    }

    banner(`Agent: ${contract.agentName} v${contract.agentVersion}`);
    // eslint-disable-next-line no-console
    console.log(`  Objective: ${contract.objective.does}`);
    // eslint-disable-next-line no-console
    console.log(`  Capabilities: ${contract.capabilities.join(', ')}`);
    // eslint-disable-next-line no-console
    console.log(`  Forbidden (baseline): ${contract.rules.pillarSpecificForbidden.join(', ')}`);

    let result: AgentResult<unknown>;
    if (fqn === 'baseline.source-extraction') {
      result = (await runSourceExtraction(job, ctx)) as AgentResult<unknown>;
    } else if (fqn === 'baseline.normalization') {
      // Std 11: pass upstream lineage explicitly — downstream never reconstructs.
      result = (await runNormalization(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'baseline.resolution') {
      const payload = (lastHandoff?.payload as { records: unknown[] } | undefined) ?? { records: [] };
      result = (await runResolution(
        { records: payload.records ?? [], unresolvedIssues: unresolvedAccumulator },
        { upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'intelligence.analytical-table') {
      const payload = (lastHandoff?.payload as { records: unknown[] } | undefined) ?? { records: [] };
      result = (await runAnalyticalTable(
        { records: payload.records ?? [] },
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'intelligence.performance-metrics') {
      result = (await runPerformanceMetrics(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'intelligence.comparisons-synthesis') {
      result = (await runComparisonsSynthesis(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'intelligence.insight-synthesis') {
      /* Pass the comparisons payload AND the metric keys from the
       * previous handoff up the chain so the agent's Std-4 cross-check
       * can validate metric refs. */
      const metricKeys = upstreamMetricKeys;
      result = (await runInsightSynthesis(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage, upstreamMetricKeys: metricKeys },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'decision.output-ingestion') {
      result = (await runOutputIngestion(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'decision.visualization') {
      result = (await runVisualization(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else if (fqn === 'decision.delivery-distribution') {
      result = (await runDeliveryDistribution(
        lastHandoff?.payload ?? null,
        { jobRequest: job, upstreamLineage: lastLineage },
        ctx,
      )) as AgentResult<unknown>;
    } else {
      const failure: FailureObject = {
        agent: 'orchestrator',
        agentVersion: '0.1.0',
        category: 'tool-unavailable',
        reason: `Step "${fqn}" registered in pipeline but no executor wired.`,
        context: { step },
        lineage: lastLineage,
        attempts: 0,
        recursionDepth: ctx.recursionDepth,
        occurredAt: new Date().toISOString(),
      };
      repo.writeFailure(failure);
      return finalize(false, null, failure, allEscalations, repo, ctx);
    }

    if (!result.ok) {
      // Std 12: terminal failure. Persist + return; do not continue silently.
      repo.writeFailure(result.failure);
      for (const esc of result.escalations) {
        repo.writeEscalation(esc);
        allEscalations.push(esc);
      }
      // eslint-disable-next-line no-console
      console.log(`  [orchestrator][Std 12] FAILURE category=${result.failure.category}: ${result.failure.reason}`);
      return finalize(false, lastHandoff, result.failure, allEscalations, repo, ctx);
    }

    // Std 10: orchestrator does the write-back per declared surface.
    persistHandoff(repo, contract, result);
    for (const esc of result.escalations) allEscalations.push(esc);

    lastHandoff = result.handoff;
    lastLineage = result.handoff.lineage;
    unresolvedAccumulator = [
      ...unresolvedAccumulator,
      ...result.handoff.unresolvedIssues.map(u => ({
        category: u.category,
        detail: u.detail,
        blocking: u.blocking,
      })),
    ];

    /* Std 4 hop: snapshot metricKeys after Performance Metrics so
     * Insight Synthesis can cross-check. */
    if (fqn === 'intelligence.performance-metrics') {
      const payload = result.handoff.payload as { metrics?: { metricKey: string }[] };
      upstreamMetricKeys = Array.from(new Set((payload.metrics ?? []).map(m => m.metricKey)));
    }

    // eslint-disable-next-line no-console
    console.log(
      `  [orchestrator][Std 7] handoff validation=${result.handoff.validation.status} ` +
        `confidence=${result.handoff.confidence.tier} (${result.handoff.confidence.value.toFixed(2)})`,
    );
    // eslint-disable-next-line no-console
    console.log(
      `  [orchestrator][Std 10] persisted (structuredOutputs=${contract.writeBack.structuredOutputs}, ` +
        `learnedRules=${contract.writeBack.learnedRules})`,
    );
  }

  return finalize(true, lastHandoff, null, allEscalations, repo, ctx);
}

function finalize(
  ok: boolean,
  lastHandoff: Handoff<unknown> | null,
  failure: FailureObject | null,
  escalations: HITLEscalation[],
  repo: Repository,
  ctx: ExecutionContext,
): OrchestrationResult {
  // eslint-disable-next-line no-console
  console.log(`\n=== Done. ok=${ok} escalations=${escalations.length} ===\n`);
  return {
    analysisId: ctx.analysisId,
    ok,
    finalHandoff: lastHandoff,
    failure,
    escalations,
    repositorySnapshot: repo.snapshot(),
    trace: ctx.trace,
    standards: STANDARDS_SUMMARY,
    pipeline,
  };
}