This commit is contained in:
Your Name
2026-04-07 00:47:39 +00:00
parent d3d6b9403d
commit f423affe6d
5 changed files with 390 additions and 1 deletions
@@ -134,7 +134,43 @@ export class PipelineOrchestrator {
try {
this.tracer.logEvent('Orchestrator', `Executing processor: ${procDef.processorId} (async)`);
// Before running, capture the state so we know what changed
const beforeMap = new Map(currentEpisodes.map(ep => [ep.id, ep]));
currentEpisodes = await processor.process(currentEpisodes, state);
// Synthesize VariantReady events for anything that changed or was newly created
for (const ep of currentEpisodes) {
const original = beforeMap.get(ep.id);
// If an episode was transformed, or if it's a completely new synthetic episode (like a Snapshot)
// we need to broadcast it so the ContextManager can cache it as a variant.
if (!original || original !== ep) {
const variantId = `v-${procDef.processorId.toLowerCase()}`;
// Determine variant type. StateSnapshot generates full 'snapshot' replacement nodes.
// Masking/Squashing generate 'masked' or 'summary' in-place variants.
let vType: 'snapshot' | 'summary' | 'masked' = 'masked';
if (procDef.processorId.includes('Snapshot')) vType = 'snapshot';
else if (procDef.processorId.includes('Semantic')) vType = 'summary';
this.eventBus.emitVariantReady({
targetId: ep.id, // The ID of the modified or new episode
variantId,
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
variant: {
status: 'ready',
type: vType,
episode: vType === 'snapshot' ? ep : undefined,
text: vType !== 'snapshot' ? (ep.yield?.text || (ep.trigger as any)?.semanticParts?.[0]?.presentation?.text || '') : undefined,
recoveredTokens: ep.yield?.metadata?.currentTokens || 10,
// For snapshots, we look at the transformations metadata to see what it replaced
replacedEpisodeIds: vType === 'snapshot' ? currentState.map(c => c.id).filter(id => id !== ep.id && !currentEpisodes.find(ce => ce.id === id)) : undefined,
} as any
});
}
}
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error);
return; // Halt pipeline