/**
* The orchestrator is the only component that:
* - walks the pipeline (Std 6)
* - calls agents and reads their AgentResult (Std 11, Std 12)
* - performs repository write-back per each agent's declared surface (Std 10)
* - logs escalations and stops on terminal failure (Std 9, Std 12)
*
* Agents themselves never persist, never recurse implicitly, and never
* call each other directly — all coordination is here.
*/
import { jobRequestSchema, makeContext, type JobRequest } from './types.js';
import type { ExecutionContext, FailureObject, Handoff, Lineage } from './types.js';
import {
type AgentResult,
type AgentStandardsContract,
type HITLEscalation,
STANDARDS_SUMMARY,
} from './standards.js';
import { Repository } from './repository.js';
import { pipeline, type PipelineStep } from './pipeline.js';
import {
runSourceExtraction,
sourceExtractionContract,
} from './agents/baseline/source-extraction/index.js';
import {
runNormalization,
normalizationContract,
} from './agents/baseline/normalization/index.js';
import {
runResolution,
resolutionContract,
} from './agents/baseline/resolution/index.js';
import {
runAnalyticalTable,
analyticalTableContract,
} from './agents/intelligence/analytical-table/index.js';
import {
runPerformanceMetrics,
performanceMetricsContract,
} from './agents/intelligence/performance-metrics/index.js';
import {
runComparisonsSynthesis,
comparisonsSynthesisContract,
} from './agents/intelligence/comparisons-synthesis/index.js';
import {
runInsightSynthesis,
insightSynthesisContract,
} from './agents/intelligence/insight-synthesis/index.js';
import {
runOutputIngestion,
outputIngestionContract,
} from './agents/decision/output-ingestion/index.js';
import {
runVisualization,
visualizationContract,
} from './agents/decision/visualization/index.js';
import {
runDeliveryDistribution,
deliveryDistributionContract,
} from './agents/decision/delivery-distribution/index.js';
/**
 * Everything a caller receives from one `runAnalysis` invocation, whether the
 * pipeline ran to completion or stopped early on a failure.
 */
export interface OrchestrationResult {
/** Analysis id taken from the execution context; the literal 'unknown' when the job failed schema validation. */
analysisId: string;
/** True only when every pipeline step finished without a failure. */
ok: boolean;
/**
 * On success: the last agent's handoff. On an agent failure: the last handoff
 * produced before the failing step (may be null). Null when the job failed
 * validation or when the orchestrator itself raised the failure.
 */
finalHandoff: Handoff<unknown> | null;
/** The failure that stopped the run, or null on success. */
failure: FailureObject | null;
/** Escalations collected from every agent result seen during the run, in order. */
escalations: HITLEscalation[];
/** Result of `Repository.snapshot()` taken when the run ended. */
repositorySnapshot: ReturnType<Repository['snapshot']>;
/** The execution context's trace; an empty array when the job failed validation. */
trace: ExecutionContext['trace'];
/** The shared standards summary, echoed back unchanged. */
standards: typeof STANDARDS_SUMMARY;
/** The pipeline definition the orchestrator walked, echoed back unchanged. */
pipeline: PipelineStep[];
}
/**
 * Registry of every known agent contract (Std 1/4/5/6/8/10 declarations).
 *
 * Keys have the form `<pillar>.<agent>` — the same string the orchestrator
 * builds from each agent step of the pipeline to look its contract up. A step
 * whose key is missing here stops the run with a `tool-unavailable` failure.
 *
 * Registering a contract is not enough to make an agent runnable: the
 * orchestrator's dispatch must also have a branch for the same key, otherwise
 * the run stops with a "no executor wired" failure.
 */
const CONTRACTS: Record<string, AgentStandardsContract> = {
'baseline.source-extraction': sourceExtractionContract,
'baseline.normalization': normalizationContract,
'baseline.resolution': resolutionContract,
'intelligence.analytical-table': analyticalTableContract,
'intelligence.performance-metrics': performanceMetricsContract,
'intelligence.comparisons-synthesis': comparisonsSynthesisContract,
'intelligence.insight-synthesis': insightSynthesisContract,
'decision.output-ingestion': outputIngestionContract,
'decision.visualization': visualizationContract,
'decision.delivery-distribution': deliveryDistributionContract,
};
/**
 * Prints a section header to stdout: a blank line followed by `=== <line> ===`.
 *
 * @param line - Text shown between the `===` markers.
 */
function banner(line: string): void {
  const header = `\n=== ${line} ===`;
  // eslint-disable-next-line no-console
  console.log(header);
}
/**
 * Std 10: persist exactly what the agent's contract declared.
 *
 * - `writeBack.structuredOutputs` → the handoff itself is written.
 * - `writeBack.learnedRules` → every well-formed `{ key, value }` entry found
 *   under `payload.learnedRules` is written under the agent's name.
 * - Escalations are always written, regardless of the write-back flags.
 *
 * The payload is untyped at this point, so `learnedRules` is validated before
 * use: a non-array value, or an entry whose `key`/`value` is not a string, is
 * skipped rather than persisted as a garbage rule.
 *
 * @param repo - Repository that receives the writes.
 * @param contract - Contract of the agent that produced `result`; its
 *   `writeBack` flags decide what is persisted.
 * @param result - The agent's handoff and the escalations it raised.
 */
function persistHandoff(
  repo: Repository,
  contract: AgentStandardsContract,
  result: { handoff: Handoff<unknown>; escalations: readonly HITLEscalation[] },
): void {
  if (contract.writeBack.structuredOutputs) repo.writeHandoff(result.handoff);

  if (contract.writeBack.learnedRules) {
    const payload: unknown = result.handoff.payload;
    if (typeof payload === 'object' && payload !== null) {
      const learned: unknown = (payload as { learnedRules?: unknown }).learnedRules;
      // Only a real array is iterated; a string would otherwise be walked
      // character by character and a number would throw.
      if (Array.isArray(learned)) {
        for (const entry of learned as unknown[]) {
          const { key, value } = (entry ?? {}) as { key?: unknown; value?: unknown };
          if (typeof key === 'string' && typeof value === 'string') {
            repo.writeLearnedRule(contract.agentName, key, value);
          }
        }
      }
    }
  }

  for (const esc of result.escalations) repo.writeEscalation(esc);
}
/** Builds the placeholder lineage the orchestrator uses before any agent has produced one. */
function orchestratorLineage(): Lineage {
  return {
    sourceUrl: null,
    capturedAt: new Date().toISOString(),
    effectiveAs: null,
    agentVersion: 'orchestrator/0.1.0',
    upstream: [],
  };
}

/** Builds a FailureObject attributed to the orchestrator itself (zero attempts, stamped now). */
function orchestratorFailure(
  category: FailureObject['category'],
  reason: string,
  context: FailureObject['context'],
  lineage: Lineage,
  recursionDepth: FailureObject['recursionDepth'],
): FailureObject {
  return {
    agent: 'orchestrator',
    agentVersion: '0.1.0',
    category,
    reason,
    context,
    lineage,
    attempts: 0,
    recursionDepth,
    occurredAt: new Date().toISOString(),
  };
}

/**
 * Calls the executor wired for `fqn` with the inputs that agent expects, or
 * resolves to null when no executor is wired for that key.
 */
async function invokeAgent(
  fqn: string,
  job: JobRequest,
  ctx: ExecutionContext,
  upstream: {
    handoff: Handoff<unknown> | null;
    lineage: Lineage;
    unresolvedIssues: { category: string; detail: string; blocking: boolean }[];
    metricKeys: readonly string[];
  },
): Promise<AgentResult<unknown> | null> {
  const payload = upstream.handoff?.payload ?? null;
  const upstreamLineage = upstream.lineage;
  // Std 11: pass upstream lineage explicitly — downstream never reconstructs.
  const standardInputs = { jobRequest: job, upstreamLineage };
  // Agents that consume a record list get [] when the previous payload has none.
  const records = (payload as { records?: unknown[] } | null)?.records ?? [];

  switch (fqn) {
    case 'baseline.source-extraction':
      return (await runSourceExtraction(job, ctx)) as AgentResult<unknown>;
    case 'baseline.normalization':
      return (await runNormalization(payload, standardInputs, ctx)) as AgentResult<unknown>;
    case 'baseline.resolution':
      return (await runResolution(
        { records, unresolvedIssues: upstream.unresolvedIssues },
        { upstreamLineage },
        ctx,
      )) as AgentResult<unknown>;
    case 'intelligence.analytical-table':
      return (await runAnalyticalTable({ records }, standardInputs, ctx)) as AgentResult<unknown>;
    case 'intelligence.performance-metrics':
      return (await runPerformanceMetrics(payload, standardInputs, ctx)) as AgentResult<unknown>;
    case 'intelligence.comparisons-synthesis':
      return (await runComparisonsSynthesis(payload, standardInputs, ctx)) as AgentResult<unknown>;
    case 'intelligence.insight-synthesis':
      /* Pass the comparisons payload AND the metric keys captured after
       * Performance Metrics so the agent's Std-4 cross-check can validate
       * metric refs. */
      return (await runInsightSynthesis(
        payload,
        { jobRequest: job, upstreamLineage, upstreamMetricKeys: upstream.metricKeys },
        ctx,
      )) as AgentResult<unknown>;
    case 'decision.output-ingestion':
      return (await runOutputIngestion(payload, standardInputs, ctx)) as AgentResult<unknown>;
    case 'decision.visualization':
      return (await runVisualization(payload, standardInputs, ctx)) as AgentResult<unknown>;
    case 'decision.delivery-distribution':
      return (await runDeliveryDistribution(payload, standardInputs, ctx)) as AgentResult<unknown>;
    default:
      return null;
  }
}

/**
 * Validates a raw job request and walks the pipeline once, front to back.
 *
 * For each agent step the orchestrator looks up the contract, calls the
 * agent with the previous handoff's payload and lineage, persists the result
 * per the contract's declared write-back surface, and carries the new handoff
 * forward. Checkpoint steps are only logged. The run stops at the first
 * failure: an invalid job, a step with no contract or no executor, or an
 * agent result with `ok === false`.
 *
 * Failures are reported through the returned result, not thrown. Exceptions
 * thrown by an agent are not caught here and reject the returned promise.
 *
 * @param rawJob - Untrusted job request; checked against `jobRequestSchema`.
 * @returns The outcome of the run, including the repository snapshot and all
 *   escalations collected up to the point the run ended.
 */
export async function runAnalysis(rawJob: unknown): Promise<OrchestrationResult> {
  // Std 2: validate the JobRequest at the boundary.
  const parsed = jobRequestSchema.safeParse(rawJob);
  if (!parsed.success) {
    const lineage = orchestratorLineage();
    // No context or repository exists yet, so the result is built by hand and
    // carries the snapshot of a fresh repository.
    return {
      analysisId: 'unknown',
      ok: false,
      finalHandoff: null,
      failure: orchestratorFailure(
        'invalid-input',
        'JobRequest failed schema validation.',
        { issues: parsed.error.issues },
        lineage,
        0,
      ),
      escalations: [],
      repositorySnapshot: new Repository().snapshot(),
      trace: [],
      standards: STANDARDS_SUMMARY,
      pipeline,
    };
  }

  const job: JobRequest = parsed.data;
  const ctx = makeContext(job.analysisId, `corr-${job.analysisId}-${Date.now()}`);
  const repo = new Repository();
  const allEscalations: HITLEscalation[] = [];

  banner(`BID Orchestrator — analysis=${job.analysisId}`);
  // eslint-disable-next-line no-console
  console.log(`Question: ${job.question}`);
  // eslint-disable-next-line no-console
  console.log(`Entities: ${job.entities.map(e => e.id).join(', ')} | Period: ${job.period} | Sources: ${job.sources.join(',')}`);
  // eslint-disable-next-line no-console
  console.log(`Pipeline: ${pipeline.map(s => (s.kind === 'agent' ? `${s.pillar}/${s.agent}` : `[${s.name}]`)).join(' → ')}`);

  // State threaded from one step to the next.
  let lastHandoff: Handoff<unknown> | null = null;
  let lastLineage: Lineage = orchestratorLineage();
  let unresolvedAccumulator: { category: string; detail: string; blocking: boolean }[] = [];
  /* Std 4 cross-check support: when Performance Metrics produces its
   * computed metrics, capture the metricKey set so Insight Synthesis
   * (further downstream) can validate that every claim's metric
   * reference points at a real metric the pipeline actually computed. */
  let upstreamMetricKeys: readonly string[] = [];

  for (const step of pipeline) {
    if (step.kind === 'checkpoint') {
      banner(`Checkpoint: ${step.name} (HITL pause — not implemented in POC)`);
      continue;
    }

    const fqn = `${step.pillar}.${step.agent}`;
    const contract = CONTRACTS[fqn];
    if (!contract) {
      const failure = orchestratorFailure(
        'tool-unavailable',
        `No contract registered for pipeline step "${fqn}".`,
        { step },
        lastLineage,
        ctx.recursionDepth,
      );
      repo.writeFailure(failure);
      return finalize(false, null, failure, allEscalations, repo, ctx);
    }

    banner(`Agent: ${contract.agentName} v${contract.agentVersion}`);
    // eslint-disable-next-line no-console
    console.log(` Objective: ${contract.objective.does}`);
    // eslint-disable-next-line no-console
    console.log(` Capabilities: ${contract.capabilities.join(', ')}`);
    // NOTE(review): the label says "(baseline)" for every pillar while the
    // value printed is `pillarSpecificForbidden` — confirm the intended label.
    // eslint-disable-next-line no-console
    console.log(` Forbidden (baseline): ${contract.rules.pillarSpecificForbidden.join(', ')}`);

    const result = await invokeAgent(fqn, job, ctx, {
      handoff: lastHandoff,
      lineage: lastLineage,
      unresolvedIssues: unresolvedAccumulator,
      metricKeys: upstreamMetricKeys,
    });

    if (result === null) {
      const failure = orchestratorFailure(
        'tool-unavailable',
        `Step "${fqn}" registered in pipeline but no executor wired.`,
        { step },
        lastLineage,
        ctx.recursionDepth,
      );
      repo.writeFailure(failure);
      return finalize(false, null, failure, allEscalations, repo, ctx);
    }

    if (!result.ok) {
      // Std 12: terminal failure. Persist + return; do not continue silently.
      repo.writeFailure(result.failure);
      for (const esc of result.escalations) {
        repo.writeEscalation(esc);
        allEscalations.push(esc);
      }
      // eslint-disable-next-line no-console
      console.log(` [orchestrator][Std 12] FAILURE category=${result.failure.category}: ${result.failure.reason}`);
      return finalize(false, lastHandoff, result.failure, allEscalations, repo, ctx);
    }

    // Std 10: orchestrator does the write-back per declared surface.
    persistHandoff(repo, contract, result);
    for (const esc of result.escalations) allEscalations.push(esc);

    lastHandoff = result.handoff;
    lastLineage = result.handoff.lineage;
    unresolvedAccumulator = [
      ...unresolvedAccumulator,
      ...result.handoff.unresolvedIssues.map(u => ({
        category: u.category,
        detail: u.detail,
        blocking: u.blocking,
      })),
    ];

    /* Std 4 hop: snapshot metricKeys after Performance Metrics so
     * Insight Synthesis can cross-check. The payload is untyped here, so a
     * null payload or a missing `metrics` list yields an empty key set. */
    if (fqn === 'intelligence.performance-metrics') {
      const metricsPayload = result.handoff.payload as
        | { metrics?: { metricKey: string }[] }
        | null
        | undefined;
      upstreamMetricKeys = Array.from(
        new Set((metricsPayload?.metrics ?? []).map(m => m.metricKey)),
      );
    }

    // eslint-disable-next-line no-console
    console.log(
      ` [orchestrator][Std 7] handoff validation=${result.handoff.validation.status} ` +
        `confidence=${result.handoff.confidence.tier} (${result.handoff.confidence.value.toFixed(2)})`,
    );
    // eslint-disable-next-line no-console
    console.log(
      ` [orchestrator][Std 10] persisted (structuredOutputs=${contract.writeBack.structuredOutputs}, ` +
        `learnedRules=${contract.writeBack.learnedRules})`,
    );
  }

  return finalize(true, lastHandoff, null, allEscalations, repo, ctx);
}
/**
 * Logs the closing banner and packages the run's final state into an
 * OrchestrationResult.
 *
 * @param ok - Whether the pipeline completed without a failure.
 * @param lastHandoff - Handoff to report as the final one, or null.
 * @param failure - Failure that ended the run, or null on success.
 * @param escalations - Escalations collected during the run.
 * @param repo - Repository whose snapshot is included in the result.
 * @param ctx - Execution context supplying the analysis id and trace.
 * @returns The assembled result, including the standards summary and pipeline.
 */
function finalize(
  ok: boolean,
  lastHandoff: Handoff<unknown> | null,
  failure: FailureObject | null,
  escalations: HITLEscalation[],
  repo: Repository,
  ctx: ExecutionContext,
): OrchestrationResult {
  // eslint-disable-next-line no-console
  console.log(`\n=== Done. ok=${ok} escalations=${escalations.length} ===\n`);

  const outcome: OrchestrationResult = {
    analysisId: ctx.analysisId,
    ok,
    finalHandoff: lastHandoff,
    failure,
    escalations,
    // Snapshot is taken after the closing banner, at the very end of the run.
    repositorySnapshot: repo.snapshot(),
    trace: ctx.trace,
    standards: STANDARDS_SUMMARY,
    pipeline,
  };
  return outcome;
}