docs: tracking checklist progress

This commit is contained in:
Your Name
2026-04-07 23:27:42 +00:00
parent b1d62a0b9d
commit 5381a5cc64
2 changed files with 100 additions and 176 deletions
+12 -12
View File
@@ -1,21 +1,21 @@
# The Ship of Theseus Migration Checklist
- [ ] **Phase 1: Core Types (`ir/types.ts`)**
- [ ] Add `ConcreteNode` and `LogicalNode` types.
- [ ] Add `episodeId` (or generic `parentId`) to all `ConcreteNode`
- [x] **Phase 1: Core Types (`ir/types.ts`)**
- [x] Add `ConcreteNode` and `LogicalNode` types.
- [x] Add `episodeId` (or generic `parentId`) to all `ConcreteNode`
interfaces.
- [ ] Add `replacesId` and `abstractsIds` pointers.
- [ ] Remove `variants` dictionary from `IrNode`.
- [x] Add `replacesId` and `abstractsIds` pointers.
- [x] Remove `variants` dictionary from `IrNode`.
- [ ] **Phase 2: Processor Pipeline (`pipeline.ts`)**
- [ ] Delete `EpisodeEditor`.
- [ ] Define `ContextPatch`.
- [ ] Update `ContextProcessor` signature to accept `ProcessArgs` and return
- [x] **Phase 2: Processor Pipeline (`pipeline.ts`)**
- [x] Delete `EpisodeEditor`.
- [x] Define `ContextPatch`.
- [x] Update `ContextProcessor` signature to accept `ProcessArgs` and return
`Promise<ContextPatch[]>`.
- [ ] **Phase 3: The Reducer (`sidecar/orchestrator.ts`)**
- [ ] Update `executePipeline` and `executeTriggerSync` to act as a reducer.
- [ ] Map `ContextPatch` results onto the flat Ship array.
- [x] **Phase 3: The Reducer (`sidecar/orchestrator.ts`)**
- [x] Update `executePipeline` and `executeTriggerSync` to act as a reducer.
- [x] Map `ContextPatch` results onto the flat Ship array.
- [ ] **Phase 4: Pristine Graph & Mapping (`contextManager.ts` & `ir/toIr.ts`)**
- [ ] Update `toIr` to produce a flat list of `ConcreteNode`s and a tree of
+88 -164
View File
@@ -4,8 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import type { Episode } from '../ir/types.js';
import type { ContextProcessor, ContextAccountingState } from '../pipeline.js';
import type { ConcreteNode } from '../ir/types.js';
import type { ContextProcessor, ContextAccountingState, ContextPatch } from '../pipeline.js';
import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js';
import type {
ContextEnvironment,
@@ -14,8 +14,6 @@ import type {
} from './environment.js';
import type { ProcessorRegistry } from './registry.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { EpisodeEditor } from '../ir/episodeEditor.js';
import { isUserPrompt } from '../ir/graphUtils.js';
export class PipelineOrchestrator {
private activeTimers: NodeJS.Timeout[] = [];
@@ -29,45 +27,27 @@ export class PipelineOrchestrator {
private readonly registry: ProcessorRegistry,
) {
this.instantiateProcessors();
this.registerTriggers();
this.setupTriggers();
}
/**
* Pre-loads and configures all processors defined in the sidecar config.
*/
private instantiateProcessors() {
for (const pipeline of this.config.pipelines) {
for (const procDef of pipeline.processors) {
if (!this.instantiatedProcessors.has(procDef.processorId)) {
const processorClass = this.registry.get(procDef.processorId);
if (!processorClass) {
throw new Error(
`Context Processor [${procDef.processorId}] is not registered.`,
);
}
// The Orchestrator injects standard dependencies required by processors
// If a processor needs the eventBus (like Snapshot), it expects it via constructor.
const instance = processorClass.create(
this.env,
procDef.options ?? {},
);
const factory = this.registry.get(procDef.processorId);
const instance = factory.create(this.env, procDef.options);
this.instantiatedProcessors.set(procDef.processorId, instance);
}
}
}
}
/**
* Sets up listeners for the triggers defined in the SidecarConfig.
*/
private registerTriggers() {
private setupTriggers() {
for (const pipeline of this.config.pipelines) {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {
// For background timers, we need a way to get the latest state
// But timers are generally disabled right now via the triggers config.
// If needed, we will pass it via event bus.
// Background timers not fully implemented in V1 yet
}, trigger.intervalMs);
this.activeTimers.push(timer);
} else if (trigger === 'retained_exceeded') {
@@ -78,10 +58,10 @@ export class PipelineOrchestrator {
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: event.targetDeficit,
protectedEpisodeIds: new Set(),
targetNodeIds: event.targetNodeIds,
protectedLogicalIds: new Set(),
};
void this.executePipelineAsync(pipeline, event.episodes, state);
// Note: In a real implementation, event.episodes needs to be mapped to the Concrete Ship
void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state);
});
} else if (trigger === 'new_message') {
this.eventBus.onChunkReceived((event) => {
@@ -91,10 +71,9 @@ export class PipelineOrchestrator {
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: 0,
protectedEpisodeIds: new Set(),
targetNodeIds: event.targetNodeIds,
protectedLogicalIds: new Set(),
};
void this.executePipelineAsync(pipeline, event.episodes, state);
void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state);
});
}
}
@@ -108,116 +87,104 @@ export class PipelineOrchestrator {
}
/**
* Executes a pipeline asynchronously in the background. This is the "Eventual Consistency" path.
* When the pipeline resolves, it emits a VariantReady event to cache the new graph.
*/
/**
* Synchronously executes all pipelines configured with a specific trigger.
* Forces them to run in a blocking manner regardless of their 'execution' setting
* (useful for emergency backstops right before sending a prompt).
* Applies an array of ContextPatches to the Ship, returning a new immutable Ship array.
*/
reduceShip(
ship: ReadonlyArray<ConcreteNode>,
patches: ContextPatch[]
): ReadonlyArray<ConcreteNode> {
if (patches.length === 0) return ship;
const mutableShip = [...ship];
for (const patch of patches) {
const { removedIds, insertedNodes = [], insertionIndex } = patch;
let targetIdx = insertionIndex ?? -1;
if (targetIdx === -1 && removedIds.length > 0) {
targetIdx = mutableShip.findIndex(n => n.id === removedIds[0]);
}
if (targetIdx === -1) {
targetIdx = mutableShip.length;
}
if (removedIds.length > 0) {
const removeSet = new Set(removedIds);
let i = 0;
while (i < mutableShip.length) {
if (removeSet.has(mutableShip[i].id)) {
mutableShip.splice(i, 1);
if (i < targetIdx) targetIdx--;
} else {
i++;
}
}
}
if (insertedNodes.length > 0) {
mutableShip.splice(targetIdx, 0, ...insertedNodes);
}
}
return mutableShip;
}
async executeTriggerSync(
trigger: PipelineTrigger,
episodes: Episode[],
ship: ReadonlyArray<ConcreteNode>,
triggerTargets: ReadonlySet<string>,
state: ContextAccountingState,
): Promise<Episode[]> {
let currentEpisodes = [...episodes];
): Promise<ReadonlyArray<ConcreteNode>> {
let currentShip = ship;
const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger));
for (const pipeline of pipelines) {
this.tracer.logEvent(
'Orchestrator',
`Triggering synchronous emergency pipeline via [${trigger}]: ${pipeline.name}`,
);
for (let i = 0; i < pipeline.processors.length; i++) {
const procDef = pipeline.processors[i];
for (const procDef of pipeline.processors) {
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
try {
this.tracer.logEvent(
'Orchestrator',
`Executing processor: ${procDef.processorId}`,
`Executing processor synchronously: ${procDef.processorId}`,
);
const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds);
await processor.process(editor, state);
currentEpisodes = editor.getFinalEpisodes();
const patches = await processor.process({
ship: currentShip,
triggerTargets,
state,
getPristineNode: () => undefined,
});
currentShip = this.reduceShip(currentShip, patches);
} catch (error) {
debugLogger.error(
`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`,
`Synchronous processor ${procDef.processorId} failed:`,
error,
);
}
}
}
return currentEpisodes;
return currentShip;
}
/**
* Executes a pipeline based on its configured execution strategy ('blocking' or 'background').
*/
async executePipeline(
pipelineName: string,
episodes: Episode[],
state: ContextAccountingState,
): Promise<Episode[]> {
const pipeline = this.config.pipelines.find((p) => p.name === pipelineName);
if (!pipeline) return episodes;
if (pipeline.execution === 'background') {
this.executePipelineAsync(pipeline, episodes, state).catch((e) => {
debugLogger.error(`Background pipeline ${pipeline.name} failed:`, e);
});
return episodes; // Return immediately
}
// Blocking execution
this.tracer.logEvent(
'Orchestrator',
`Triggering synchronous pipeline: ${pipeline.name}`,
);
let currentEpisodes = [...episodes];
for (let i = 0; i < pipeline.processors.length; i++) {
const procDef = pipeline.processors[i];
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
try {
this.tracer.logEvent(
'Orchestrator',
`Executing processor: ${procDef.processorId}`,
);
const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds);
await processor.process(editor, state);
currentEpisodes = editor.getFinalEpisodes();
} catch (error) {
debugLogger.error(
`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`,
error,
);
return currentEpisodes; // Return what we have so far
}
}
return currentEpisodes;
}
/**
* Internal method for running a pipeline entirely in the background.
*/
private async executePipelineAsync(
pipeline: PipelineDef,
currentState: Episode[],
ship: ReadonlyArray<ConcreteNode>,
triggerTargets: Set<string>,
state: ContextAccountingState,
) {
this.tracer.logEvent(
'Orchestrator',
`Triggering async pipeline: ${pipeline.name}`,
);
if (!currentState || currentState.length === 0) return;
if (!ship || ship.length === 0) return;
let currentEpisodes = [...currentState];
let currentShip = ship;
for (const procDef of pipeline.processors) {
const processor = this.instantiatedProcessors.get(procDef.processorId);
@@ -229,64 +196,21 @@ export class PipelineOrchestrator {
`Executing processor: ${procDef.processorId} (async)`,
);
const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds);
await processor.process(editor, state);
currentEpisodes = editor.getFinalEpisodes();
const patches = await processor.process({
ship: currentShip,
triggerTargets,
state,
getPristineNode: () => undefined,
});
currentShip = this.reduceShip(currentShip, patches);
// Synthesize VariantReady events for anything that changed or was newly created
for (const mutation of editor.getMutations()) {
// We only broadcast modifications or replacements
// (Insertions without replacement and deletions are not tracked as variants on an existing node)
if (mutation.type === 'modified' || mutation.type === 'replaced') {
const variantId = `v-${procDef.processorId.toLowerCase()}`;
let vType: 'snapshot' | 'summary' | 'masked' = 'masked';
if (procDef.processorId.includes('Snapshot')) vType = 'snapshot';
else if (procDef.processorId.includes('Semantic'))
vType = 'summary';
const ep = mutation.episode!;
let fallbackText = '';
if (ep.yield?.text) fallbackText = ep.yield.text;
else if (isUserPrompt(ep.trigger)) {
const firstPart = ep.trigger.semanticParts?.[0];
if (firstPart) {
fallbackText =
firstPart.type === 'text'
? firstPart.presentation?.text || firstPart.text
: '';
}
}
this.eventBus.emitVariantReady({
targetId:
mutation.type === 'replaced' ? mutation.originalIds![0] : ep.id,
variantId,
variant:
vType === 'snapshot'
? {
status: 'ready',
type: 'snapshot',
episode: ep,
recoveredTokens: ep.yield?.metadata?.currentTokens || 10,
replacedEpisodeIds: mutation.originalIds || [],
}
: {
status: 'ready',
type: vType,
text: fallbackText,
recoveredTokens: ep.yield?.metadata?.currentTokens || 10,
},
});
}
}
} catch (error) {
debugLogger.error(
`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`,
`Pipeline ${pipeline.name} failed async at ${procDef.processorId}:`,
error,
);
return; // Halt pipeline
return;
}
}
}
}
}