diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index c4ad407bfc..631a89bf89 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -41,6 +41,7 @@ export interface ContextWorker { readonly name: string; readonly triggers: { onNodesAdded?: boolean; + onNodesAgedOut?: boolean; onInboxTopics?: string[]; }; execute(args: { diff --git a/packages/core/src/context/processors/stateSnapshotWorker.ts b/packages/core/src/context/processors/stateSnapshotWorker.ts index ac4c4952ac..4d637d8229 100644 --- a/packages/core/src/context/processors/stateSnapshotWorker.ts +++ b/packages/core/src/context/processors/stateSnapshotWorker.ts @@ -30,7 +30,7 @@ export class StateSnapshotWorker implements ContextWorker { // Triggers when nodes exceed retained threshold (via retained_exceeded in Orchestrator) readonly triggers = { - onNodesAdded: true, + onNodesAgedOut: true, }; constructor(env: ContextEnvironment, options: StateSnapshotWorkerOptions) { diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index c7f1f3e42b..9dd78e0614 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -125,7 +125,7 @@ export class PipelineOrchestrator { this.eventBus.onConsolidationNeeded((event) => { void this.executePipelineAsync( pipeline, - [], + event.ship, event.targetNodeIds, new Set(), // protected IDs ); @@ -134,7 +134,7 @@ export class PipelineOrchestrator { this.eventBus.onChunkReceived((event) => { void this.executePipelineAsync( pipeline, - [], + event.ship, event.targetNodeIds, new Set(), // protected IDs ); @@ -143,7 +143,7 @@ export class PipelineOrchestrator { } } - // 2. Worker Triggers (onNodesAdded is roughly onChunkReceived for now) + // 2. Worker Triggers (onNodesAdded / onNodesAgedOut) this.eventBus.onChunkReceived((event) => { // Fire all workers that care about new nodes for (const worker of this.instantiatedWorkers.values()) { @@ -151,14 +151,31 @@ export class PipelineOrchestrator { const inboxSnapshot = new InboxSnapshotImpl( this.env.inbox?.getMessages() || [], ); + const targets = event.ship.filter(n => event.targetNodeIds.has(n.id)); // Fire and forget - worker.execute({ targets: [], inbox: inboxSnapshot }).catch((e) => { + worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => { debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e); }); } } }); + this.eventBus.onConsolidationNeeded((event) => { + // Fire all workers that care about aged out nodes + for (const worker of this.instantiatedWorkers.values()) { + if (worker.triggers.onNodesAgedOut) { + const inboxSnapshot = new InboxSnapshotImpl( + this.env.inbox?.getMessages() || [], + ); + const targets = event.ship.filter(n => event.targetNodeIds.has(n.id)); + // Fire and forget + worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => { + debugLogger.error(`Worker ${worker.name} failed onNodesAgedOut:`, e); + }); + } + } + }); + // We don't have a formal event bus for inbox publish yet, but we will soon. // For now the workers are just registered. } diff --git a/packages/core/src/context/sidecar/profiles.ts b/packages/core/src/context/sidecar/profiles.ts index 1f7f0205e0..eafc3845bf 100644 --- a/packages/core/src/context/sidecar/profiles.ts +++ b/packages/core/src/context/sidecar/profiles.ts @@ -15,6 +15,14 @@ export const defaultSidecarProfile: SidecarConfig = { retainedTokens: 65000, maxTokens: 150000, }, + workers: [ + { + workerId: 'StateSnapshotWorker', + options: { + type: 'accumulate', + }, + }, + ], pipelines: [ { name: 'Immediate Sanitization', @@ -41,7 +49,6 @@ export const defaultSidecarProfile: SidecarConfig = { processorId: 'SemanticCompressionProcessor', options: { nodeThresholdTokens: 5000 }, }, - { processorId: 'StateSnapshotProcessor', options: {} }, ], }, { @@ -49,7 +56,10 @@ export const defaultSidecarProfile: SidecarConfig = { triggers: ['gc_backstop'], execution: 'blocking', processors: [ - { processorId: 'StateSnapshotProcessor', options: {} }, + { + processorId: 'StateSnapshotProcessor', + options: { target: 'max' } + }, { processorId: 'EmergencyTruncationProcessor', options: {} }, ], }, diff --git a/packages/core/src/context/sidecar/schema.ts b/packages/core/src/context/sidecar/schema.ts index b3fd83bd5d..03cee327c4 100644 --- a/packages/core/src/context/sidecar/schema.ts +++ b/packages/core/src/context/sidecar/schema.ts @@ -32,6 +32,18 @@ export function getSidecarConfigSchema(registry: ProcessorRegistry) { }, }, }, + workers: { + type: 'array', + description: 'Background workers that proactively accumulate context.', + items: { + type: 'object', + required: ['workerId'], + properties: { + workerId: { type: 'string' }, + options: { type: 'object' }, + }, + }, + }, pipelines: { type: 'array', description: 'The execution graphs for context manipulation.',