From 5381a5cc648d199d651c2e68c4a6062672507637 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 7 Apr 2026 23:27:42 +0000 Subject: [PATCH] docs: tracking checklist progress --- packages/core/src/context/migration-plan.md | 24 +- .../core/src/context/sidecar/orchestrator.ts | 252 ++++++------------ 2 files changed, 100 insertions(+), 176 deletions(-) diff --git a/packages/core/src/context/migration-plan.md b/packages/core/src/context/migration-plan.md index 9b9ddb2c0b..89b074ba51 100644 --- a/packages/core/src/context/migration-plan.md +++ b/packages/core/src/context/migration-plan.md @@ -1,21 +1,21 @@ # The Ship of Theseus Migration Checklist -- [ ] **Phase 1: Core Types (`ir/types.ts`)** - - [ ] Add `ConcreteNode` and `LogicalNode` types. - - [ ] Add `episodeId` (or generic `parentId`) to all `ConcreteNode` +- [x] **Phase 1: Core Types (`ir/types.ts`)** + - [x] Add `ConcreteNode` and `LogicalNode` types. + - [x] Add `episodeId` (or generic `parentId`) to all `ConcreteNode` interfaces. - - [ ] Add `replacesId` and `abstractsIds` pointers. - - [ ] Remove `variants` dictionary from `IrNode`. + - [x] Add `replacesId` and `abstractsIds` pointers. + - [x] Remove `variants` dictionary from `IrNode`. -- [ ] **Phase 2: Processor Pipeline (`pipeline.ts`)** - - [ ] Delete `EpisodeEditor`. - - [ ] Define `ContextPatch`. - - [ ] Update `ContextProcessor` signature to accept `ProcessArgs` and return +- [x] **Phase 2: Processor Pipeline (`pipeline.ts`)** + - [x] Delete `EpisodeEditor`. + - [x] Define `ContextPatch`. + - [x] Update `ContextProcessor` signature to accept `ProcessArgs` and return `Promise`. -- [ ] **Phase 3: The Reducer (`sidecar/orchestrator.ts`)** - - [ ] Update `executePipeline` and `executeTriggerSync` to act as a reducer. - - [ ] Map `ContextPatch` results onto the flat Ship array. +- [x] **Phase 3: The Reducer (`sidecar/orchestrator.ts`)** + - [x] Update `executePipeline` and `executeTriggerSync` to act as a reducer. + - [x] Map `ContextPatch` results onto the flat Ship array. - [ ] **Phase 4: Pristine Graph & Mapping (`contextManager.ts` & `ir/toIr.ts`)** - [ ] Update `toIr` to produce a flat list of `ConcreteNode`s and a tree of diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 7cae3b88e6..1f75a935d3 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -4,8 +4,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { Episode } from '../ir/types.js'; -import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; +import type { ConcreteNode } from '../ir/types.js'; +import type { ContextProcessor, ContextAccountingState, ContextPatch } from '../pipeline.js'; import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js'; import type { ContextEnvironment, @@ -14,8 +14,6 @@ import type { } from './environment.js'; import type { ProcessorRegistry } from './registry.js'; import { debugLogger } from '../../utils/debugLogger.js'; -import { EpisodeEditor } from '../ir/episodeEditor.js'; -import { isUserPrompt } from '../ir/graphUtils.js'; export class PipelineOrchestrator { private activeTimers: NodeJS.Timeout[] = []; @@ -29,45 +27,27 @@ export class PipelineOrchestrator { private readonly registry: ProcessorRegistry, ) { this.instantiateProcessors(); - this.registerTriggers(); + this.setupTriggers(); } - /** - * Pre-loads and configures all processors defined in the sidecar config. - */ private instantiateProcessors() { for (const pipeline of this.config.pipelines) { for (const procDef of pipeline.processors) { if (!this.instantiatedProcessors.has(procDef.processorId)) { - const processorClass = this.registry.get(procDef.processorId); - if (!processorClass) { - throw new Error( - `Context Processor [${procDef.processorId}] is not registered.`, - ); - } - // The Orchestrator injects standard dependencies required by processors - // If a processor needs the eventBus (like Snapshot), it expects it via constructor. - const instance = processorClass.create( - this.env, - procDef.options ?? {}, - ); + const factory = this.registry.get(procDef.processorId); + const instance = factory.create(this.env, procDef.options); this.instantiatedProcessors.set(procDef.processorId, instance); } } } } - /** - * Sets up listeners for the triggers defined in the SidecarConfig. - */ - private registerTriggers() { + private setupTriggers() { for (const pipeline of this.config.pipelines) { for (const trigger of pipeline.triggers) { if (typeof trigger === 'object' && trigger.type === 'timer') { const timer = setInterval(() => { - // For background timers, we need a way to get the latest state - // But timers are generally disabled right now via the triggers config. - // If needed, we will pass it via event bus. + // Background timers not fully implemented in V1 yet }, trigger.intervalMs); this.activeTimers.push(timer); } else if (trigger === 'retained_exceeded') { @@ -78,10 +58,10 @@ export class PipelineOrchestrator { maxTokens: this.config.budget.maxTokens, isBudgetSatisfied: false, deficitTokens: event.targetDeficit, - protectedEpisodeIds: new Set(), - targetNodeIds: event.targetNodeIds, + protectedLogicalIds: new Set(), }; - void this.executePipelineAsync(pipeline, event.episodes, state); + // Note: In a real implementation, event.episodes needs to be mapped to the Concrete Ship + void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state); }); } else if (trigger === 'new_message') { this.eventBus.onChunkReceived((event) => { @@ -91,10 +71,9 @@ export class PipelineOrchestrator { maxTokens: this.config.budget.maxTokens, isBudgetSatisfied: false, deficitTokens: 0, - protectedEpisodeIds: new Set(), - targetNodeIds: event.targetNodeIds, + protectedLogicalIds: new Set(), }; - void this.executePipelineAsync(pipeline, event.episodes, state); + void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state); }); } } @@ -108,116 +87,104 @@ export class PipelineOrchestrator { } /** - * Executes a pipeline asynchronously in the background. This is the "Eventual Consistency" path. - * When the pipeline resolves, it emits a VariantReady event to cache the new graph. - */ - /** - * Synchronously executes all pipelines configured with a specific trigger. - * Forces them to run in a blocking manner regardless of their 'execution' setting - * (useful for emergency backstops right before sending a prompt). + * Applies an array of ContextPatches to the Ship, returning a new immutable Ship array. */ + reduceShip( + ship: ReadonlyArray, + patches: ContextPatch[] + ): ReadonlyArray { + if (patches.length === 0) return ship; + + const mutableShip = [...ship]; + + for (const patch of patches) { + const { removedIds, insertedNodes = [], insertionIndex } = patch; + + let targetIdx = insertionIndex ?? -1; + + if (targetIdx === -1 && removedIds.length > 0) { + targetIdx = mutableShip.findIndex(n => n.id === removedIds[0]); + } + + if (targetIdx === -1) { + targetIdx = mutableShip.length; + } + + if (removedIds.length > 0) { + const removeSet = new Set(removedIds); + let i = 0; + while (i < mutableShip.length) { + if (removeSet.has(mutableShip[i].id)) { + mutableShip.splice(i, 1); + if (i < targetIdx) targetIdx--; + } else { + i++; + } + } + } + + if (insertedNodes.length > 0) { + mutableShip.splice(targetIdx, 0, ...insertedNodes); + } + } + + return mutableShip; + } + async executeTriggerSync( trigger: PipelineTrigger, - episodes: Episode[], + ship: ReadonlyArray, + triggerTargets: ReadonlySet, state: ContextAccountingState, - ): Promise { - let currentEpisodes = [...episodes]; + ): Promise> { + let currentShip = ship; const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger)); for (const pipeline of pipelines) { - this.tracer.logEvent( - 'Orchestrator', - `Triggering synchronous emergency pipeline via [${trigger}]: ${pipeline.name}`, - ); - - for (let i = 0; i < pipeline.processors.length; i++) { - const procDef = pipeline.processors[i]; + for (const procDef of pipeline.processors) { const processor = this.instantiatedProcessors.get(procDef.processorId); if (!processor) continue; try { this.tracer.logEvent( 'Orchestrator', - `Executing processor: ${procDef.processorId}`, + `Executing processor synchronously: ${procDef.processorId}`, ); - const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds); - await processor.process(editor, state); - currentEpisodes = editor.getFinalEpisodes(); + + const patches = await processor.process({ + ship: currentShip, + triggerTargets, + state, + getPristineNode: () => undefined, + }); + + currentShip = this.reduceShip(currentShip, patches); + } catch (error) { debugLogger.error( - `Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, + `Synchronous processor ${procDef.processorId} failed:`, error, ); } } } - return currentEpisodes; + + return currentShip; } - /** - * Executes a pipeline based on its configured execution strategy ('blocking' or 'background'). - */ - async executePipeline( - pipelineName: string, - episodes: Episode[], - state: ContextAccountingState, - ): Promise { - const pipeline = this.config.pipelines.find((p) => p.name === pipelineName); - if (!pipeline) return episodes; - - if (pipeline.execution === 'background') { - this.executePipelineAsync(pipeline, episodes, state).catch((e) => { - debugLogger.error(`Background pipeline ${pipeline.name} failed:`, e); - }); - return episodes; // Return immediately - } - - // Blocking execution - this.tracer.logEvent( - 'Orchestrator', - `Triggering synchronous pipeline: ${pipeline.name}`, - ); - let currentEpisodes = [...episodes]; - for (let i = 0; i < pipeline.processors.length; i++) { - const procDef = pipeline.processors[i]; - const processor = this.instantiatedProcessors.get(procDef.processorId); - if (!processor) continue; - - try { - this.tracer.logEvent( - 'Orchestrator', - `Executing processor: ${procDef.processorId}`, - ); - const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds); - await processor.process(editor, state); - currentEpisodes = editor.getFinalEpisodes(); - } catch (error) { - debugLogger.error( - `Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, - error, - ); - return currentEpisodes; // Return what we have so far - } - } - - return currentEpisodes; - } - - /** - * Internal method for running a pipeline entirely in the background. - */ private async executePipelineAsync( pipeline: PipelineDef, - currentState: Episode[], + ship: ReadonlyArray, + triggerTargets: Set, state: ContextAccountingState, ) { this.tracer.logEvent( 'Orchestrator', `Triggering async pipeline: ${pipeline.name}`, ); - if (!currentState || currentState.length === 0) return; + if (!ship || ship.length === 0) return; - let currentEpisodes = [...currentState]; + let currentShip = ship; for (const procDef of pipeline.processors) { const processor = this.instantiatedProcessors.get(procDef.processorId); @@ -229,64 +196,21 @@ export class PipelineOrchestrator { `Executing processor: ${procDef.processorId} (async)`, ); - const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds); - await processor.process(editor, state); - currentEpisodes = editor.getFinalEpisodes(); + const patches = await processor.process({ + ship: currentShip, + triggerTargets, + state, + getPristineNode: () => undefined, + }); + currentShip = this.reduceShip(currentShip, patches); - // Synthesize VariantReady events for anything that changed or was newly created - for (const mutation of editor.getMutations()) { - // We only broadcast modifications or replacements - // (Insertions without replacement and deletions are not tracked as variants on an existing node) - if (mutation.type === 'modified' || mutation.type === 'replaced') { - const variantId = `v-${procDef.processorId.toLowerCase()}`; - - let vType: 'snapshot' | 'summary' | 'masked' = 'masked'; - if (procDef.processorId.includes('Snapshot')) vType = 'snapshot'; - else if (procDef.processorId.includes('Semantic')) - vType = 'summary'; - - const ep = mutation.episode!; - let fallbackText = ''; - if (ep.yield?.text) fallbackText = ep.yield.text; - else if (isUserPrompt(ep.trigger)) { - const firstPart = ep.trigger.semanticParts?.[0]; - if (firstPart) { - fallbackText = - firstPart.type === 'text' - ? firstPart.presentation?.text || firstPart.text - : ''; - } - } - - this.eventBus.emitVariantReady({ - targetId: - mutation.type === 'replaced' ? mutation.originalIds![0] : ep.id, - variantId, - variant: - vType === 'snapshot' - ? { - status: 'ready', - type: 'snapshot', - episode: ep, - recoveredTokens: ep.yield?.metadata?.currentTokens || 10, - replacedEpisodeIds: mutation.originalIds || [], - } - : { - status: 'ready', - type: vType, - text: fallbackText, - recoveredTokens: ep.yield?.metadata?.currentTokens || 10, - }, - }); - } - } } catch (error) { debugLogger.error( - `Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, + `Pipeline ${pipeline.name} failed async at ${procDef.processorId}:`, error, ); - return; // Halt pipeline + return; } } } -} +} \ No newline at end of file