From 28068431d9486645f4445987d1995c28ee875b27 Mon Sep 17 00:00:00 2001 From: Christian Gunderman Date: Fri, 10 Apr 2026 17:35:15 -0700 Subject: [PATCH] Simplify. --- packages/core/src/context/contextManager.ts | 264 +++++++++++++----- packages/core/src/context/eventBus.ts | 52 ---- packages/core/src/context/historyObserver.ts | 88 ------ packages/core/src/context/ir/projector.ts | 6 +- packages/core/src/context/pipeline.ts | 16 +- .../core/src/context/pipeline/environment.ts | 8 +- .../context/pipeline/environmentImpl.test.ts | 62 ++-- .../src/context/pipeline/environmentImpl.ts | 9 +- .../core/src/context/pipeline/inbox.test.ts | 48 ---- packages/core/src/context/pipeline/inbox.ts | 64 ----- .../src/context/pipeline/orchestrator.test.ts | 224 --------------- .../core/src/context/pipeline/orchestrator.ts | 218 --------------- .../src/context/pipeline/snapshotCache.ts | 30 ++ .../stateSnapshotAsyncProcessor.test.ts | 48 ++-- .../processors/stateSnapshotAsyncProcessor.ts | 27 +- .../processors/stateSnapshotProcessor.test.ts | 131 ++++----- .../processors/stateSnapshotProcessor.ts | 16 +- .../context/system-tests/simulationHarness.ts | 33 +-- .../src/context/testing/contextTestUtils.ts | 52 +--- 19 files changed, 354 insertions(+), 1042 deletions(-) delete mode 100644 packages/core/src/context/eventBus.ts delete mode 100644 packages/core/src/context/historyObserver.ts delete mode 100644 packages/core/src/context/pipeline/inbox.test.ts delete mode 100644 packages/core/src/context/pipeline/inbox.ts delete mode 100644 packages/core/src/context/pipeline/orchestrator.test.ts delete mode 100644 packages/core/src/context/pipeline/orchestrator.ts create mode 100644 packages/core/src/context/pipeline/snapshotCache.ts diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 818b0c6bc0..3da85471f3 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -5,147 +5,257 @@ */ import type { Content } from '@google/genai'; -import type { AgentChatHistory } from '../core/agentChatHistory.js'; +import type { AgentChatHistory, HistoryEvent } from '../core/agentChatHistory.js'; import type { ConcreteNode } from './ir/types.js'; -import type { ContextEventBus } from './eventBus.js'; import type { ContextTracer } from './tracer.js'; import type { ContextEnvironment } from './pipeline/environment.js'; import type { ContextProfile } from './config/profiles.js'; -import type { PipelineOrchestrator } from './pipeline/orchestrator.js'; -import { HistoryObserver } from './historyObserver.js'; import { IrProjector } from './ir/projector.js'; import { ContextWorkingBufferImpl } from './pipeline/contextWorkingBuffer.js'; +import type { PipelineDef, AsyncPipelineDef, PipelineTrigger } from './config/types.js'; +import { debugLogger } from '../utils/debugLogger.js'; export class ContextManager { - // The master state containing the pristine graph and current active graph. - private buffer: ContextWorkingBufferImpl = - ContextWorkingBufferImpl.initialize([]); + private buffer: ContextWorkingBufferImpl = ContextWorkingBufferImpl.initialize([]); private pristineNodes: readonly ConcreteNode[] = []; - - private readonly eventBus: ContextEventBus; - - // Internal sub-components - private readonly orchestrator: PipelineOrchestrator; - private readonly historyObserver: HistoryObserver; + + private unsubscribeHistory?: () => void; + private seenNodeIds = new Set(); + private activeTimers: NodeJS.Timeout[] = []; + + private pipelines: PipelineDef[] = []; + private asyncPipelines: AsyncPipelineDef[] = []; constructor( private readonly sidecar: ContextProfile, private readonly env: ContextEnvironment, private readonly tracer: ContextTracer, - orchestrator: PipelineOrchestrator, - chatHistory: AgentChatHistory, + private readonly chatHistory: AgentChatHistory, ) { - this.eventBus = env.eventBus; - this.orchestrator = orchestrator; + this.pipelines = sidecar.buildPipelines(env); + this.asyncPipelines = sidecar.buildAsyncPipelines(env); + this.setupTriggers(); + this.startHistoryObserver(); + } - this.historyObserver = new HistoryObserver( - chatHistory, - this.env.eventBus, - this.tracer, - this.env.tokenCalculator, - this.env.irMapper, - ); - this.historyObserver.start(); + private startHistoryObserver() { + this.unsubscribeHistory = this.chatHistory.subscribe((_event: HistoryEvent) => { + const pristineEpisodes = this.env.irMapper.toIr( + this.chatHistory.get(), + this.env.tokenCalculator, + ); - this.eventBus.onPristineHistoryUpdated((event) => { - this.pristineNodes = event.nodes; + const nodes: ConcreteNode[] = []; + for (const ep of pristineEpisodes) { + if (ep.concreteNodes) { + for (const child of ep.concreteNodes) { + nodes.push(child); + } + } + } + + const newNodes = new Set(); + for (const node of nodes) { + if (!this.seenNodeIds.has(node.id)) { + newNodes.add(node.id); + this.seenNodeIds.add(node.id); + } + } + + this.tracer.logEvent( + 'ContextManager', + 'Rebuilt pristine graph from chat history update', + { nodesSize: nodes.length, newNodesCount: newNodes.size }, + ); + + this.pristineNodes = nodes; const existingIds = new Set(this.buffer.nodes.map((n) => n.id)); - const addedNodes = event.nodes.filter((n) => !existingIds.has(n.id)); + const addedNodes = nodes.filter((n) => !existingIds.has(n.id)); if (addedNodes.length > 0) { this.buffer = this.buffer.appendPristineNodes(addedNodes); } - this.evaluateTriggers(event.newNodes); + this.evaluateTriggers(newNodes); }); } - /** - * Safely stops background async pipelines and clears event listeners. - */ - shutdown() { - this.orchestrator.shutdown(); - this.historyObserver.stop(); + private setupTriggers() { + // In V1, background timers were set up here. + for (const pipeline of this.pipelines) { + for (const trigger of pipeline.triggers) { + if (typeof trigger === 'object' && trigger.type === 'timer') { + const timer = setInterval(() => {}, trigger.intervalMs); + this.activeTimers.push(timer); + } + } + } } - /** - * Evaluates if the current working buffer exceeds configured budget thresholds, - * firing consolidation events if necessary. - */ private evaluateTriggers(newNodes: Set) { if (!this.sidecar.config.budget) return; if (newNodes.size > 0) { - this.eventBus.emitChunkReceived({ - nodes: this.buffer.nodes, - targetNodeIds: newNodes, - }); + this.executeTrigger('new_message', newNodes); + this.executeTrigger('nodes_added', newNodes); } - const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens( - this.buffer.nodes, - ); + const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens(this.buffer.nodes); if (currentTokens > this.sidecar.config.budget.retainedTokens) { const agedOutNodes = new Set(); let rollingTokens = 0; - // Walk backwards finding nodes that fall out of the retained budget for (let i = this.buffer.nodes.length - 1; i >= 0; i--) { const node = this.buffer.nodes[i]; - rollingTokens += this.env.tokenCalculator.calculateConcreteListTokens([ - node, - ]); + rollingTokens += this.env.tokenCalculator.calculateConcreteListTokens([node]); if (rollingTokens > this.sidecar.config.budget.retainedTokens) { agedOutNodes.add(node.id); } } if (agedOutNodes.size > 0) { - this.eventBus.emitConsolidationNeeded({ - nodes: this.buffer.nodes, - targetDeficit: - currentTokens - this.sidecar.config.budget.retainedTokens, - targetNodeIds: agedOutNodes, - }); + this.executeTrigger('retained_exceeded', agedOutNodes); + this.executeTrigger('nodes_aged_out', agedOutNodes); } } } - /** - * Retrieves the raw, uncompressed Episodic IR graph. - * Useful for internal tool rendering (like the trace viewer). - * Note: This is an expensive, deep clone operation. - */ + private executeTrigger(trigger: PipelineTrigger, targetNodeIds: ReadonlySet) { + const triggerPipelines = this.pipelines.filter(p => p.triggers.includes(trigger)); + for (const pipeline of triggerPipelines) { + void this.executePipelineAsync(pipeline, this.buffer.nodes, targetNodeIds, new Set()); + } + + const triggerAsyncPipelines = this.asyncPipelines.filter(p => p.triggers.includes(trigger)); + for (const pipeline of triggerAsyncPipelines) { + const targets = this.buffer.nodes.filter(n => targetNodeIds.has(n.id)); + for (const processor of pipeline.processors) { + processor.process({ + targets, + snapshotCache: this.env.snapshotCache, + buffer: ContextWorkingBufferImpl.initialize(this.buffer.nodes), + }).catch(e => debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e)); + } + } + } + + async executeTriggerSync( + trigger: PipelineTrigger, + nodes: readonly ConcreteNode[], + triggerTargets: ReadonlySet, + protectedLogicalIds: ReadonlySet = new Set(), + ): Promise { + let currentBuffer = ContextWorkingBufferImpl.initialize(nodes); + const triggerPipelines = this.pipelines.filter((p) => p.triggers.includes(trigger)); + + for (const pipeline of triggerPipelines) { + for (const processor of pipeline.processors) { + try { + this.tracer.logEvent('ContextManager', `Executing processor synchronously: ${processor.id}`); + + const allowedTargets = currentBuffer.nodes.filter((n) => + this.isNodeAllowed(n, triggerTargets, protectedLogicalIds) + ); + + const returnedNodes = await processor.process({ + buffer: currentBuffer, + targets: allowedTargets, + snapshotCache: this.env.snapshotCache, + }); + + currentBuffer = currentBuffer.applyProcessorResult( + processor.id, + allowedTargets, + returnedNodes, + ); + } catch (error) { + debugLogger.error(`Synchronous processor ${processor.id} failed:`, error); + } + } + } + + return currentBuffer.nodes; + } + + private async executePipelineAsync( + pipeline: PipelineDef, + nodes: readonly ConcreteNode[], + triggerTargets: ReadonlySet, + protectedLogicalIds: ReadonlySet = new Set(), + ) { + this.tracer.logEvent('ContextManager', `Triggering async pipeline: ${pipeline.name}`); + if (!nodes || nodes.length === 0) return; + + let currentBuffer = ContextWorkingBufferImpl.initialize(nodes); + + for (const processor of pipeline.processors) { + try { + this.tracer.logEvent('ContextManager', `Executing processor: ${processor.id} (async)`); + + const allowedTargets = currentBuffer.nodes.filter((n) => + this.isNodeAllowed(n, triggerTargets, protectedLogicalIds) + ); + + const returnedNodes = await processor.process({ + buffer: currentBuffer, + targets: allowedTargets, + snapshotCache: this.env.snapshotCache, + }); + + currentBuffer = currentBuffer.applyProcessorResult( + processor.id, + allowedTargets, + returnedNodes, + ); + } catch (error) { + debugLogger.error(`Pipeline ${pipeline.name} failed async at ${processor.id}:`, error); + return; + } + } + + // Push the state to buffer + this.buffer = currentBuffer; + } + + private isNodeAllowed( + node: ConcreteNode, + triggerTargets: ReadonlySet, + protectedLogicalIds: ReadonlySet = new Set(), + ): boolean { + return ( + triggerTargets.has(node.id) && + !protectedLogicalIds.has(node.id) && + (!node.logicalParentId || !protectedLogicalIds.has(node.logicalParentId)) + ); + } + + shutdown() { + for (const timer of this.activeTimers) { + clearInterval(timer); + } + if (this.unsubscribeHistory) { + this.unsubscribeHistory(); + this.unsubscribeHistory = undefined; + } + } + getPristineGraph(): readonly ConcreteNode[] { return [...this.pristineNodes]; } - /** - * Generates a virtual view of the pristine graph, substituting in variants - * up to the configured token budget. - * This is the view that will eventually be projected back to the LLM. - */ getNodes(): readonly ConcreteNode[] { return [...this.buffer.nodes]; } - /** - * Executes the final 'gc_backstop' pipeline if necessary, enforcing the token budget, - * and maps the Episodic IR back into a raw Gemini Content[] array for transmission. - * This is the primary method called by the agent framework before sending a request. - */ async projectCompressedHistory( activeTaskIds: Set = new Set(), ): Promise { - this.tracer.logEvent( - 'ContextManager', - 'Starting projection to LLM context', - ); - // Apply final GC Backstop pressure barrier synchronously before mapping + this.tracer.logEvent('ContextManager', 'Starting projection to LLM context'); + const finalHistory = await IrProjector.project( this.buffer.nodes, - this.orchestrator, + this, this.sidecar, this.tracer, this.env, diff --git a/packages/core/src/context/eventBus.ts b/packages/core/src/context/eventBus.ts deleted file mode 100644 index 67e27fb895..0000000000 --- a/packages/core/src/context/eventBus.ts +++ /dev/null @@ -1,52 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -import { EventEmitter } from 'node:events'; -import type { ConcreteNode } from './ir/types.js'; - -export interface PristineHistoryUpdatedEvent { - nodes: readonly ConcreteNode[]; - newNodes: Set; -} - -export interface ContextConsolidationEvent { - nodes: readonly ConcreteNode[]; - targetDeficit: number; - targetNodeIds: Set; -} - -export interface IrChunkReceivedEvent { - nodes: readonly ConcreteNode[]; - targetNodeIds: Set; -} - -export class ContextEventBus extends EventEmitter { - emitPristineHistoryUpdated(event: PristineHistoryUpdatedEvent) { - this.emit('PRISTINE_HISTORY_UPDATED', event); - } - - onPristineHistoryUpdated( - listener: (event: PristineHistoryUpdatedEvent) => void, - ) { - this.on('PRISTINE_HISTORY_UPDATED', listener); - } - - emitChunkReceived(event: IrChunkReceivedEvent) { - this.emit('IR_CHUNK_RECEIVED', event); - } - - onChunkReceived(listener: (event: IrChunkReceivedEvent) => void) { - this.on('IR_CHUNK_RECEIVED', listener); - } - - emitConsolidationNeeded(event: ContextConsolidationEvent) { - this.emit('BUDGET_RETAINED_CROSSED', event); - } - - onConsolidationNeeded(listener: (event: ContextConsolidationEvent) => void) { - this.on('BUDGET_RETAINED_CROSSED', listener); - } -} diff --git a/packages/core/src/context/historyObserver.ts b/packages/core/src/context/historyObserver.ts deleted file mode 100644 index 89fa9caa45..0000000000 --- a/packages/core/src/context/historyObserver.ts +++ /dev/null @@ -1,88 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -import type { - AgentChatHistory, - HistoryEvent, -} from '../core/agentChatHistory.js'; -import type { IrMapper } from './ir/mapper.js'; -import type { ContextTokenCalculator } from './utils/contextTokenCalculator.js'; -import type { ContextEventBus } from './eventBus.js'; -import type { ContextTracer } from './tracer.js'; - -import type { ConcreteNode } from './ir/types.js'; - -/** - * Connects the raw AgentChatHistory to the ContextManager. - * It maps raw messages into Episodic Intermediate Representation (IR) - * and evaluates background triggers whenever history changes. - */ -export class HistoryObserver { - private unsubscribeHistory?: () => void; - - private seenNodeIds = new Set(); - - constructor( - private readonly chatHistory: AgentChatHistory, - private readonly eventBus: ContextEventBus, - private readonly tracer: ContextTracer, - private readonly tokenCalculator: ContextTokenCalculator, - private readonly irMapper: IrMapper, - ) {} - - start() { - if (this.unsubscribeHistory) { - this.unsubscribeHistory(); - } - - this.unsubscribeHistory = this.chatHistory.subscribe( - (_event: HistoryEvent) => { - // Rebuild the pristine IR graph from the full source history on every change. - // Wait, toIr still returns an Episode[]. - // We actually need to map the Episode[] to a flat ConcreteNode[] here to form the 'nodes'. - const pristineEpisodes = this.irMapper.toIr( - this.chatHistory.get(), - this.tokenCalculator, - ); - - const nodes: ConcreteNode[] = []; - for (const ep of pristineEpisodes) { - if (ep.concreteNodes) { - for (const child of ep.concreteNodes) { - nodes.push(child); - } - } - } - - const newNodes = new Set(); - for (const node of nodes) { - if (!this.seenNodeIds.has(node.id)) { - newNodes.add(node.id); - this.seenNodeIds.add(node.id); - } - } - - this.tracer.logEvent( - 'HistoryObserver', - 'Rebuilt pristine graph from chat history update', - { nodesSize: nodes.length, newNodesCount: newNodes.size }, - ); - - this.eventBus.emitPristineHistoryUpdated({ - nodes, - newNodes, - }); - }, - ); - } - - stop() { - if (this.unsubscribeHistory) { - this.unsubscribeHistory(); - this.unsubscribeHistory = undefined; - } - } -} diff --git a/packages/core/src/context/ir/projector.ts b/packages/core/src/context/ir/projector.ts index 651656fe6c..e1cace0e20 100644 --- a/packages/core/src/context/ir/projector.ts +++ b/packages/core/src/context/ir/projector.ts @@ -11,7 +11,7 @@ import type { ContextEnvironment, ContextTracer, } from '../pipeline/environment.js'; -import type { PipelineOrchestrator } from '../pipeline/orchestrator.js'; +import type { ContextManager } from '../contextManager.js'; import type { ContextProfile } from '../config/profiles.js'; export class IrProjector { @@ -21,7 +21,7 @@ export class IrProjector { */ static async project( nodes: readonly ConcreteNode[], - orchestrator: PipelineOrchestrator, + contextManager: ContextManager, sidecar: ContextProfile, tracer: ContextTracer, env: ContextEnvironment, @@ -84,7 +84,7 @@ export class IrProjector { } } - const processedNodes = await orchestrator.executeTriggerSync( + const processedNodes = await contextManager.executeTriggerSync( 'gc_backstop', nodes, agedOutNodes, diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index 7cfcbcd4fc..e3f78577c9 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -6,16 +6,18 @@ import type { ConcreteNode } from './ir/types.js'; -export interface InboxMessage { +export interface SnapshotProposal { id: string; - topic: string; - payload: T; + newText: string; + consumedIds: string[]; + type: string; timestamp: number; } -export interface InboxSnapshot { - getMessages(topic: string): ReadonlyArray>; - consume(messageId: string): void; +export interface SnapshotCache { + getProposals(): ReadonlyArray; + consume(id: string): void; + publish(proposal: Omit, idGenerator: { generateId(): string }): void; } export interface GraphMutation { @@ -35,7 +37,7 @@ export interface ContextWorkingBuffer { export interface ProcessArgs { readonly buffer: ContextWorkingBuffer; readonly targets: readonly ConcreteNode[]; - readonly inbox: InboxSnapshot; + readonly snapshotCache: SnapshotCache; } /** diff --git a/packages/core/src/context/pipeline/environment.ts b/packages/core/src/context/pipeline/environment.ts index 459fc89091..c81f7d00c6 100644 --- a/packages/core/src/context/pipeline/environment.ts +++ b/packages/core/src/context/pipeline/environment.ts @@ -4,16 +4,15 @@ * SPDX-License-Identifier: Apache-2.0 */ import type { BaseLlmClient } from '../../core/baseLlmClient.js'; -import type { ContextEventBus } from '../eventBus.js'; import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; import type { ContextTracer } from '../tracer.js'; import type { IFileSystem } from '../system/IFileSystem.js'; import type { IIdGenerator } from '../system/IIdGenerator.js'; -import type { LiveInbox } from './inbox.js'; +import type { SnapshotCache } from '../pipeline.js'; import type { IrNodeBehaviorRegistry } from '../ir/behaviorRegistry.js'; import type { IrMapper } from '../ir/mapper.js'; -export type { ContextTracer, ContextEventBus }; +export type { ContextTracer }; export interface ContextEnvironment { readonly llmClient: BaseLlmClient; @@ -26,8 +25,7 @@ export interface ContextEnvironment { readonly tokenCalculator: ContextTokenCalculator; readonly fileSystem: IFileSystem; readonly idGenerator: IIdGenerator; - readonly eventBus: ContextEventBus; - readonly inbox: LiveInbox; + readonly snapshotCache: SnapshotCache; readonly behaviorRegistry: IrNodeBehaviorRegistry; readonly irMapper: IrMapper; } diff --git a/packages/core/src/context/pipeline/environmentImpl.test.ts b/packages/core/src/context/pipeline/environmentImpl.test.ts index eab5c32923..4478745532 100644 --- a/packages/core/src/context/pipeline/environmentImpl.test.ts +++ b/packages/core/src/context/pipeline/environmentImpl.test.ts @@ -5,68 +5,44 @@ */ import { describe, it, expect } from 'vitest'; import { ContextEnvironmentImpl } from './environmentImpl.js'; +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; import { ContextTracer } from '../tracer.js'; -import { ContextEventBus } from '../eventBus.js'; -import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js'; -import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js'; -import { createMockLlmClient } from '../testing/contextTestUtils.js'; describe('ContextEnvironmentImpl', () => { - it('should initialize with defaults correctly', () => { - const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'mock' }); - const eventBus = new ContextEventBus(); - const mockLlmClient = createMockLlmClient(); + it('should initialize with provided dependencies and default optional ones', () => { + // Mock required dependencies + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const llmClient = {} as BaseLlmClient; + const tracer = new ContextTracer({ + targetDir: '/tmp', + sessionId: 'test-session', + }); const env = new ContextEnvironmentImpl( - mockLlmClient, - 'mock-session', - 'mock-prompt', + llmClient, + 'test-session', + 'test-prompt-id', '/tmp/trace', '/tmp/temp', tracer, 4, - eventBus, ); - expect(env.llmClient).toBe(mockLlmClient); - expect(env.sessionId).toBe('mock-session'); - expect(env.promptId).toBe('mock-prompt'); + // Verify injected properties + expect(env.llmClient).toBe(llmClient); + expect(env.sessionId).toBe('test-session'); + expect(env.promptId).toBe('test-prompt-id'); expect(env.traceDir).toBe('/tmp/trace'); expect(env.projectTempDir).toBe('/tmp/temp'); expect(env.tracer).toBe(tracer); expect(env.charsPerToken).toBe(4); - expect(env.eventBus).toBe(eventBus); - // Default internals - expect(env.behaviorRegistry).toBeDefined(); + // Verify default initialized properties expect(env.tokenCalculator).toBeDefined(); expect(env.fileSystem).toBeDefined(); expect(env.idGenerator).toBeDefined(); - expect(env.inbox).toBeDefined(); + expect(env.snapshotCache).toBeDefined(); + expect(env.behaviorRegistry).toBeDefined(); expect(env.irMapper).toBeDefined(); }); - - it('should initialize with provided overrides', () => { - const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'mock' }); - const eventBus = new ContextEventBus(); - const mockLlmClient = createMockLlmClient(); - const fileSystem = new InMemoryFileSystem(); - const idGenerator = new DeterministicIdGenerator('test-'); - - const env = new ContextEnvironmentImpl( - mockLlmClient, - 'mock-session', - 'mock-prompt', - '/tmp/trace', - '/tmp/temp', - tracer, - 4, - eventBus, - fileSystem, - idGenerator, - ); - - expect(env.fileSystem).toBe(fileSystem); - expect(env.idGenerator).toBe(idGenerator); - }); }); diff --git a/packages/core/src/context/pipeline/environmentImpl.ts b/packages/core/src/context/pipeline/environmentImpl.ts index 32a16adbff..84ac4b32bb 100644 --- a/packages/core/src/context/pipeline/environmentImpl.ts +++ b/packages/core/src/context/pipeline/environmentImpl.ts @@ -7,22 +7,22 @@ import type { BaseLlmClient } from '../../core/baseLlmClient.js'; import type { ContextTracer } from '../tracer.js'; import type { ContextEnvironment } from './environment.js'; -import type { ContextEventBus } from '../eventBus.js'; import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; import type { IFileSystem } from '../system/IFileSystem.js'; import { NodeFileSystem } from '../system/NodeFileSystem.js'; import type { IIdGenerator } from '../system/IIdGenerator.js'; import { NodeIdGenerator } from '../system/NodeIdGenerator.js'; -import { LiveInbox } from './inbox.js'; +import { LiveSnapshotCache } from './snapshotCache.js'; import { IrNodeBehaviorRegistry } from '../ir/behaviorRegistry.js'; import { registerBuiltInBehaviors } from '../ir/builtinBehaviors.js'; import { IrMapper } from '../ir/mapper.js'; +import type { SnapshotCache } from '../pipeline.js'; export class ContextEnvironmentImpl implements ContextEnvironment { readonly tokenCalculator: ContextTokenCalculator; readonly fileSystem: IFileSystem; readonly idGenerator: IIdGenerator; - readonly inbox: LiveInbox; + readonly snapshotCache: SnapshotCache; readonly behaviorRegistry: IrNodeBehaviorRegistry; readonly irMapper: IrMapper; @@ -34,7 +34,6 @@ export class ContextEnvironmentImpl implements ContextEnvironment { readonly projectTempDir: string, readonly tracer: ContextTracer, readonly charsPerToken: number, - readonly eventBus: ContextEventBus, fileSystem?: IFileSystem, idGenerator?: IIdGenerator, ) { @@ -46,7 +45,7 @@ export class ContextEnvironmentImpl implements ContextEnvironment { ); this.fileSystem = fileSystem || new NodeFileSystem(); this.idGenerator = idGenerator || new NodeIdGenerator(); - this.inbox = new LiveInbox(); + this.snapshotCache = new LiveSnapshotCache(); this.irMapper = new IrMapper(this.behaviorRegistry, this.idGenerator); } } diff --git a/packages/core/src/context/pipeline/inbox.test.ts b/packages/core/src/context/pipeline/inbox.test.ts deleted file mode 100644 index 3c327d2198..0000000000 --- a/packages/core/src/context/pipeline/inbox.test.ts +++ /dev/null @@ -1,48 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ -import { describe, it, expect } from 'vitest'; -import { LiveInbox, InboxSnapshotImpl } from './inbox.js'; -import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js'; - -describe('Inbox', () => { - it('should publish messages and provide snapshots', () => { - const inbox = new LiveInbox(); - const idGenerator = new DeterministicIdGenerator('mock-uuid-'); - - inbox.publish('test-topic', { data: 'hello' }, idGenerator); - inbox.publish('other-topic', { data: 'world' }, idGenerator); - - const messages = inbox.getMessages(); - expect(messages.length).toBe(2); - expect(messages[0].topic).toBe('test-topic'); - expect(messages[0].payload).toEqual({ data: 'hello' }); - }); - - it('should drain consumed messages from the snapshot', () => { - const inbox = new LiveInbox(); - const idGenerator = new DeterministicIdGenerator('mock-uuid-'); - - inbox.publish('test-topic', { data: 'hello' }, idGenerator); - inbox.publish('other-topic', { data: 'world' }, idGenerator); - - const messages = inbox.getMessages(); - const snapshot = new InboxSnapshotImpl(messages); - - const filtered = snapshot.getMessages<{ data: string }>('test-topic'); - expect(filtered.length).toBe(1); - expect(filtered[0].payload.data).toBe('hello'); - - // Consume the message - snapshot.consume(filtered[0].id); - - // Provide the consumed IDs to the real inbox to drain them - inbox.drainConsumed(snapshot.getConsumedIds()); - - const finalMessages = inbox.getMessages(); - expect(finalMessages.length).toBe(1); - expect(finalMessages[0].topic).toBe('other-topic'); - }); -}); diff --git a/packages/core/src/context/pipeline/inbox.ts b/packages/core/src/context/pipeline/inbox.ts deleted file mode 100644 index 87c32c2b59..0000000000 --- a/packages/core/src/context/pipeline/inbox.ts +++ /dev/null @@ -1,64 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ -import type { InboxMessage, InboxSnapshot } from '../pipeline.js'; - -export class LiveInbox { - private messages: InboxMessage[] = []; - - publish( - topic: string, - payload: T, - idGenerator: { generateId(): string }, - ): void { - this.messages.push({ - id: idGenerator.generateId(), - topic, - payload, - timestamp: Date.now(), - }); - } - - getMessages(): readonly InboxMessage[] { - return [...this.messages]; - } - - drainConsumed(consumedIds: Set): void { - this.messages = this.messages.filter((m) => !consumedIds.has(m.id)); - } -} - -export class InboxSnapshotImpl implements InboxSnapshot { - private messages: readonly InboxMessage[]; - private consumedIds = new Set(); - - constructor(messages: readonly InboxMessage[]) { - this.messages = messages; - } - - getMessages(topic: string): ReadonlyArray> { - const raw = this.messages.filter((m) => m.topic === topic); - /* - * Architectural Justification for Unchecked Cast: - * The Inbox is a heterogeneous event bus designed to support arbitrary, declarative - * routing via configuration files (where topics are just strings). Because TypeScript - * completely erases generic type information () at runtime, the central array - * can only hold `unknown` payloads. To enforce strict type safety without a central - * registry (which would break decoupling) or heavy runtime validation (Zod schemas), - * we must assert the type boundary here. The contract relies on the async pipeline and Processor - * agreeing on the payload structure associated with the configured topic string. - */ - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - return raw as ReadonlyArray>; - } - - consume(messageId: string): void { - this.consumedIds.add(messageId); - } - - getConsumedIds(): Set { - return this.consumedIds; - } -} diff --git a/packages/core/src/context/pipeline/orchestrator.test.ts b/packages/core/src/context/pipeline/orchestrator.test.ts deleted file mode 100644 index f3647000a1..0000000000 --- a/packages/core/src/context/pipeline/orchestrator.test.ts +++ /dev/null @@ -1,224 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -import assert from 'node:assert'; -import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; -import { PipelineOrchestrator } from './orchestrator.js'; -import { - createMockEnvironment, - createDummyNode, -} from '../testing/contextTestUtils.js'; -import type { ContextEnvironment } from './environment.js'; -import type { - ContextProcessor, - AsyncContextProcessor, - ProcessArgs, -} from '../pipeline.js'; -import type { PipelineDef, AsyncPipelineDef } from '../config/types.js'; -import type { ContextEventBus } from '../eventBus.js'; -import type { ConcreteNode, UserPrompt } from '../ir/types.js'; - -// A realistic mock processor that modifies the text of the first target node -function createModifyingProcessor(id: string): ContextProcessor { - return { - id, - name: 'ModifyingProcessor', - process: async (args: ProcessArgs) => { - const newTargets = [...args.targets]; - if (newTargets.length > 0 && newTargets[0].type === 'USER_PROMPT') { - const prompt = newTargets[0]; - const newParts = [...prompt.semanticParts]; - if (newParts.length > 0 && newParts[0].type === 'text') { - newParts[0] = { - ...newParts[0], - text: newParts[0].text + ' [modified]', - }; - } - newTargets[0] = { - ...prompt, - id: prompt.id + '-modified', - replacesId: prompt.id, - semanticParts: newParts, - }; - } - return newTargets; - }, - }; -} - -// A processor that just throws an error -function createThrowingProcessor(id: string): ContextProcessor { - return { - id, - name: 'Throwing', - process: async (): Promise => { - throw new Error('Processor failed intentionally'); - }, - }; -} - -// A mock async processor that signals it ran -function createMockAsyncProcessor( - id: string, - executeSpy: ReturnType, -): AsyncContextProcessor { - return { - id, - name: 'MockAsyncProcessor', - process: async (args: ProcessArgs) => { - executeSpy(args); - }, - }; -} - -describe('PipelineOrchestrator (Component)', () => { - let env: ContextEnvironment; - let eventBus: ContextEventBus; - - beforeEach(() => { - env = createMockEnvironment(); - eventBus = env.eventBus; - }); - - afterEach(() => { - vi.restoreAllMocks(); - }); - - const setupOrchestrator = ( - pipelines: PipelineDef[], - asyncPipelines: AsyncPipelineDef[] = [], - ) => { - const orchestrator = new PipelineOrchestrator( - pipelines, - asyncPipelines, - env, - eventBus, - env.tracer, - ); - return orchestrator; - }; - - describe('Synchronous Pipeline Execution', () => { - it('applies processors in sequence on matching trigger', async () => { - const pipelines: PipelineDef[] = [ - { - name: 'TestPipeline', - triggers: ['new_message'], - processors: [createModifyingProcessor('Mod')], - }, - ]; - - const orchestrator = setupOrchestrator(pipelines); - const originalNode = createDummyNode('ep1', 'USER_PROMPT', 50, { - semanticParts: [{ type: 'text', text: 'Original' }], - }); - - const processed = await orchestrator.executeTriggerSync( - 'new_message', - [originalNode], - new Set([originalNode.id]), - new Set(), - ); - - expect(processed.length).toBe(1); - const resultingNode = processed[0] as UserPrompt; - assert(resultingNode.semanticParts[0].type === 'text'); - expect(resultingNode.semanticParts[0].text).toBe('Original [modified]'); - expect(resultingNode.replacesId).toBe(originalNode.id); - }); - - it('bypasses pipelines that do not match the trigger', async () => { - const pipelines: PipelineDef[] = [ - { - name: 'TestPipeline', - triggers: ['gc_backstop'], // Different trigger - processors: [createModifyingProcessor('Mod')], - }, - ]; - - const orchestrator = setupOrchestrator(pipelines); - const originalNode = createDummyNode('ep1', 'USER_PROMPT', 50, { - semanticParts: [{ type: 'text', text: 'Original' }], - }); - - const processed = await orchestrator.executeTriggerSync( - 'new_message', - [originalNode], - new Set([originalNode.id]), - new Set(), - ); - - expect(processed).toEqual([originalNode]); // Untouched - }); - - it('gracefully handles a failing processor without crashing the pipeline', async () => { - const pipelines: PipelineDef[] = [ - { - name: 'FailingPipeline', - triggers: ['new_message'], - processors: [ - createThrowingProcessor('Thrower'), - createModifyingProcessor('Mod'), - ], - }, - ]; - - const orchestrator = setupOrchestrator(pipelines); - const originalNode = createDummyNode('ep1', 'USER_PROMPT', 50, { - semanticParts: [{ type: 'text', text: 'Original' }], - }); - - // The throwing processor should be caught and logged, allowing Mod to still run. - const processed = await orchestrator.executeTriggerSync( - 'new_message', - [originalNode], - new Set([originalNode.id]), - new Set(), - ); - - expect(processed.length).toBe(1); - const resultingNode = processed[0] as UserPrompt; - assert(resultingNode.semanticParts[0].type === 'text'); - expect(resultingNode.semanticParts[0].text).toBe('Original [modified]'); - }); - }); - - describe('Asynchronous async pipeline Events', () => { - it('routes emitChunkReceived to async pipelines with nodes_added trigger', async () => { - const executeSpy = vi.fn(); - const asyncProcessor = createMockAsyncProcessor( - 'MyAsyncProcessor', - executeSpy, - ); - - setupOrchestrator( - [], - [ - { - name: 'TestAsync', - triggers: ['nodes_added'], - processors: [asyncProcessor], - }, - ], - ); - - const node1 = createDummyNode('ep1', 'USER_PROMPT', 10); - const node2 = createDummyNode('ep1', 'AGENT_THOUGHT', 20); - - eventBus.emitChunkReceived({ - nodes: [node1, node2], - targetNodeIds: new Set([node2.id]), - }); - - // Yield event loop - await new Promise((resolve) => setTimeout(resolve, 0)); - - expect(executeSpy).toHaveBeenCalledTimes(1); - const callArgs = executeSpy.mock.calls[0][0]; - expect(callArgs.targets).toEqual([node2]); // AsyncProcessors only get the target nodes - }); - }); -}); diff --git a/packages/core/src/context/pipeline/orchestrator.ts b/packages/core/src/context/pipeline/orchestrator.ts deleted file mode 100644 index ee9a6f5e39..0000000000 --- a/packages/core/src/context/pipeline/orchestrator.ts +++ /dev/null @@ -1,218 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -import type { ConcreteNode } from '../ir/types.js'; -import type { - AsyncPipelineDef, - PipelineDef, - PipelineTrigger, -} from '../config/types.js'; -import type { - ContextEnvironment, - ContextEventBus, - ContextTracer, -} from './environment.js'; -import { debugLogger } from '../../utils/debugLogger.js'; -import { InboxSnapshotImpl } from './inbox.js'; -import { ContextWorkingBufferImpl } from './contextWorkingBuffer.js'; - -export class PipelineOrchestrator { - private activeTimers: NodeJS.Timeout[] = []; - - constructor( - private readonly pipelines: PipelineDef[], - private readonly asyncPipelines: AsyncPipelineDef[], - private readonly env: ContextEnvironment, - private readonly eventBus: ContextEventBus, - private readonly tracer: ContextTracer, - ) { - this.setupTriggers(); - } - - private isNodeAllowed( - node: ConcreteNode, - triggerTargets: ReadonlySet, - protectedLogicalIds: ReadonlySet = new Set(), - ): boolean { - return ( - triggerTargets.has(node.id) && - !protectedLogicalIds.has(node.id) && - (!node.logicalParentId || !protectedLogicalIds.has(node.logicalParentId)) - ); - } - - private setupTriggers() { - const bindTriggers =

( - pipelines: P[], - executeFn: ( - pipeline: P, - nodes: readonly ConcreteNode[], - targets: ReadonlySet, - protectedIds: ReadonlySet, - ) => void, - ) => { - for (const pipeline of pipelines) { - for (const trigger of pipeline.triggers) { - if (typeof trigger === 'object' && trigger.type === 'timer') { - const timer = setInterval(() => { - // Background timers not fully implemented in V1 yet - }, trigger.intervalMs); - this.activeTimers.push(timer); - } else if ( - trigger === 'retained_exceeded' || - trigger === 'nodes_aged_out' - ) { - this.eventBus.onConsolidationNeeded((event) => { - executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); - }); - } else if (trigger === 'new_message' || trigger === 'nodes_added') { - this.eventBus.onChunkReceived((event) => { - executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); - }); - } - } - } - }; - - bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => { - void this.executePipelineAsync( - pipeline, - nodes, - new Set(targets), - new Set(protectedIds), - ); - }); - - bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => { - const inboxSnapshot = new InboxSnapshotImpl( - this.env.inbox.getMessages() || [], - ); - const targets = nodes.filter((n) => targetIds.has(n.id)); - for (const processor of pipeline.processors) { - processor - .process({ - targets, - inbox: inboxSnapshot, - buffer: ContextWorkingBufferImpl.initialize(nodes), - }) - .catch((e: unknown) => - debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e), - ); - } - }); - } - - shutdown() { - for (const timer of this.activeTimers) { - clearInterval(timer); - } - } - - async executeTriggerSync( - trigger: PipelineTrigger, - nodes: readonly ConcreteNode[], - triggerTargets: ReadonlySet, - protectedLogicalIds: ReadonlySet = new Set(), - ): Promise { - let currentBuffer = ContextWorkingBufferImpl.initialize(nodes); - const triggerPipelines = this.pipelines.filter((p) => - p.triggers.includes(trigger), - ); - - // Freeze the inbox for this pipeline run - const inboxSnapshot = new InboxSnapshotImpl( - this.env.inbox.getMessages() || [], - ); - - for (const pipeline of triggerPipelines) { - for (const processor of pipeline.processors) { - try { - this.tracer.logEvent( - 'Orchestrator', - `Executing processor synchronously: ${processor.id}`, - ); - - const allowedTargets = currentBuffer.nodes.filter((n) => - this.isNodeAllowed(n, triggerTargets, protectedLogicalIds), - ); - - const returnedNodes = await processor.process({ - buffer: currentBuffer, - targets: allowedTargets, - inbox: inboxSnapshot, - }); - - currentBuffer = currentBuffer.applyProcessorResult( - processor.id, - allowedTargets, - returnedNodes, - ); - } catch (error) { - debugLogger.error( - `Synchronous processor ${processor.id} failed:`, - error, - ); - } - } - } - - // Success! Drain consumed messages - this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds()); - - return currentBuffer.nodes; - } - - private async executePipelineAsync( - pipeline: PipelineDef, - nodes: readonly ConcreteNode[], - triggerTargets: Set, - protectedLogicalIds: ReadonlySet = new Set(), - ) { - this.tracer.logEvent( - 'Orchestrator', - `Triggering async pipeline: ${pipeline.name}`, - ); - if (!nodes || nodes.length === 0) return; - - let currentBuffer = ContextWorkingBufferImpl.initialize(nodes); - const inboxSnapshot = new InboxSnapshotImpl( - this.env.inbox.getMessages() || [], - ); - - for (const processor of pipeline.processors) { - try { - this.tracer.logEvent( - 'Orchestrator', - `Executing processor: ${processor.id} (async)`, - ); - - const allowedTargets = currentBuffer.nodes.filter((n) => - this.isNodeAllowed(n, triggerTargets, protectedLogicalIds), - ); - - const returnedNodes = await processor.process({ - buffer: currentBuffer, - targets: allowedTargets, - inbox: inboxSnapshot, - }); - - currentBuffer = currentBuffer.applyProcessorResult( - processor.id, - allowedTargets, - returnedNodes, - ); - } catch (error) { - debugLogger.error( - `Pipeline ${pipeline.name} failed async at ${processor.id}:`, - error, - ); - return; - } - } - - this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds()); - } -} diff --git a/packages/core/src/context/pipeline/snapshotCache.ts b/packages/core/src/context/pipeline/snapshotCache.ts new file mode 100644 index 0000000000..5027231767 --- /dev/null +++ b/packages/core/src/context/pipeline/snapshotCache.ts @@ -0,0 +1,30 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ +import type { SnapshotProposal, SnapshotCache } from '../pipeline.js'; + +export class LiveSnapshotCache implements SnapshotCache { + private proposals: SnapshotProposal[] = []; + private consumedIds = new Set(); + + publish( + proposal: Omit, + idGenerator: { generateId(): string }, + ): void { + this.proposals.push({ + ...proposal, + id: idGenerator.generateId(), + timestamp: Date.now(), + }); + } + + getProposals(): readonly SnapshotProposal[] { + return this.proposals.filter((p) => !this.consumedIds.has(p.id)); + } + + consume(proposalId: string): void { + this.consumedIds.add(proposalId); + } +} diff --git a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts index b1217f0166..a5307720bf 100644 --- a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts @@ -10,15 +10,11 @@ import { createDummyNode, createMockProcessArgs, } from '../testing/contextTestUtils.js'; -import type { InboxMessage } from '../pipeline.js'; -import type { InboxSnapshotImpl } from '../pipeline/inbox.js'; describe('StateSnapshotAsyncProcessor', () => { - it('should generate a snapshot and publish it to the inbox', async () => { + it('should generate a snapshot and publish it to the cache', async () => { const env = createMockEnvironment(); - // Spy on the publish method - const publishSpy = vi.spyOn(env.inbox, 'publish'); - + const worker = createStateSnapshotAsyncProcessor( 'StateSnapshotAsyncProcessor', env, @@ -29,14 +25,16 @@ describe('StateSnapshotAsyncProcessor', () => { const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); const targets = [nodeA, nodeB]; - await worker.process(createMockProcessArgs(targets, targets, [])); + const args = createMockProcessArgs(targets, targets, []); + const publishSpy = vi.spyOn(args.snapshotCache, 'publish'); + + await worker.process(args); // Ensure generateContent was called expect(env.llmClient.generateContent).toHaveBeenCalled(); - // Verify it published to the inbox + // Verify it published to the cache expect(publishSpy).toHaveBeenCalledWith( - 'PROPOSED_SNAPSHOT', expect.objectContaining({ newText: 'Mock LLM summary response', consumedIds: ['node-A', 'node-B'], @@ -46,11 +44,8 @@ describe('StateSnapshotAsyncProcessor', () => { ); }); - it('should pull previous accumulate snapshot from inbox and append new targets', async () => { + it('should pull previous accumulate snapshot from cache and append new targets', async () => { const env = createMockEnvironment(); - const publishSpy = vi.spyOn(env.inbox, 'publish'); - const drainSpy = vi.spyOn(env.inbox, 'drainConsumed'); - const worker = createStateSnapshotAsyncProcessor( 'StateSnapshotAsyncProcessor', env, @@ -60,32 +55,27 @@ describe('StateSnapshotAsyncProcessor', () => { const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); const targets = [nodeC]; - const inboxMessages: InboxMessage[] = [ + const proposals = [ { id: 'draft-1', - topic: 'PROPOSED_SNAPSHOT', timestamp: Date.now() - 1000, - payload: { - consumedIds: ['node-A', 'node-B'], - newText: '', - type: 'accumulate', - }, + consumedIds: ['node-A', 'node-B'], + newText: '', + type: 'accumulate', }, ]; - const args = createMockProcessArgs(targets, targets, inboxMessages); + const args = createMockProcessArgs(targets, targets, proposals); + const publishSpy = vi.spyOn(args.snapshotCache, 'publish'); + const consumeSpy = vi.spyOn(args.snapshotCache, 'consume'); await worker.process(args); // The old draft should be consumed - expect( - (args.inbox as InboxSnapshotImpl).getConsumedIds().has('draft-1'), - ).toBe(true); - expect(drainSpy).toHaveBeenCalledWith(expect.any(Set)); + expect(consumeSpy).toHaveBeenCalledWith('draft-1'); // The new publish should contain ALL consumed IDs (old + new) expect(publishSpy).toHaveBeenCalledWith( - 'PROPOSED_SNAPSHOT', expect.objectContaining({ newText: 'Mock LLM summary response', consumedIds: ['node-A', 'node-B', 'node-C'], // Aggregated! @@ -112,14 +102,16 @@ describe('StateSnapshotAsyncProcessor', () => { it('should ignore empty targets', async () => { const env = createMockEnvironment(); - const publishSpy = vi.spyOn(env.inbox, 'publish'); const worker = createStateSnapshotAsyncProcessor( 'StateSnapshotAsyncProcessor', env, { type: 'accumulate' }, ); - await worker.process(createMockProcessArgs([], [], [])); + const args = createMockProcessArgs([], [], []); + const publishSpy = vi.spyOn(args.snapshotCache, 'publish'); + + await worker.process(args); expect(env.llmClient.generateContent).not.toHaveBeenCalled(); expect(publishSpy).not.toHaveBeenCalled(); diff --git a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts index c37e4ae059..405b338894 100644 --- a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts @@ -24,7 +24,7 @@ export function createStateSnapshotAsyncProcessor( return { id, name: 'StateSnapshotAsyncProcessor', - process: async ({ targets, inbox }: ProcessArgs): Promise => { + process: async ({ targets, snapshotCache }: ProcessArgs): Promise => { if (targets.length === 0) return; try { @@ -33,14 +33,10 @@ export function createStateSnapshotAsyncProcessor( const processorType = options.type ?? 'point-in-time'; if (processorType === 'accumulate') { - // Look for the most recent unconsumed accumulate snapshot in the inbox - const proposedSnapshots = inbox.getMessages<{ - newText: string; - consumedIds: string[]; - type: string; - }>('PROPOSED_SNAPSHOT'); + // Look for the most recent unconsumed accumulate snapshot in the cache + const proposedSnapshots = snapshotCache.getProposals(); const accumulateSnapshots = proposedSnapshots.filter( - (s) => s.payload.type === 'accumulate', + (s) => s.type === 'accumulate', ); if (accumulateSnapshots.length > 0) { @@ -49,13 +45,10 @@ export function createStateSnapshotAsyncProcessor( (a, b) => b.timestamp - a.timestamp, )[0]; - // Consume the old draft so the inbox doesn't fill up with stale drafts - inbox.consume(latest.id); - // And we must persist its consumption back to the live inbox immediately, - // because we are effectively "taking" it from the shelf to modify. - env.inbox.drainConsumed(new Set([latest.id])); + // Consume the old draft so the cache doesn't fill up with stale drafts + snapshotCache.consume(latest.id); - previousConsumedIds = latest.payload.consumedIds; + previousConsumedIds = latest.consumedIds; // Prepend a synthetic node representing the previous rolling state const previousStateNode: ConcreteNode = { @@ -63,7 +56,7 @@ export function createStateSnapshotAsyncProcessor( logicalParentId: '', type: 'SNAPSHOT', timestamp: latest.timestamp, - text: latest.payload.newText, + text: latest.newText, }; nodesToSummarize = [previousStateNode, ...targets]; @@ -80,9 +73,7 @@ export function createStateSnapshotAsyncProcessor( ...targets.map((t) => t.id), ]; - // In V2, async pipelines communicate their work to the inbox, and the processor picks it up. - env.inbox.publish( - 'PROPOSED_SNAPSHOT', + snapshotCache.publish( { newText: snapshotText, consumedIds: newConsumedIds, diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts index d3eb53dc8a..88da3ee308 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts @@ -3,129 +3,96 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect } from 'vitest'; +import { describe, it, expect, vi } from 'vitest'; import { createStateSnapshotProcessor } from './stateSnapshotProcessor.js'; import { createMockEnvironment, createDummyNode, createMockProcessArgs, } from '../testing/contextTestUtils.js'; -import type { InboxSnapshotImpl } from '../pipeline/inbox.js'; describe('StateSnapshotProcessor', () => { - it('should ignore if budget is satisfied', async () => { + it('should return original targets if no nodes to process', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor( - 'StateSnapshotProcessor', - env, - { - target: 'incremental', - }, - ); - const targets = [createDummyNode('ep1', 'USER_PROMPT')]; - const result = await processor.process(createMockProcessArgs(targets)); - expect(result).toBe(targets); // Strict equality + const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { + target: 'max', + }); + + const result = await processor.process(createMockProcessArgs([], [])); + expect(result).toEqual([]); }); - it('should apply a valid snapshot from the Inbox (Fast Path)', async () => { + it('should use pre-computed snapshot from cache if valid', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor( - 'StateSnapshotProcessor', - env, - { - target: 'incremental', - }, - ); + const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { + target: 'max', // implies 'accumulate' type + }); const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); - const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); - const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); + const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 50, {}, 'node-B'); + const nodeC = createDummyNode('ep1', 'TOOL_EXECUTION', 50, {}, 'node-C'); const targets = [nodeA, nodeB, nodeC]; - // The async background pipeline created a snapshot of A and B - const messages = [ + const proposals = [ { id: 'msg-1', - topic: 'PROPOSED_SNAPSHOT', timestamp: Date.now(), - payload: { - consumedIds: ['node-A', 'node-B'], - newText: '', - type: 'point-in-time', - }, + newText: 'Pre-computed summary of A and B', + consumedIds: ['node-A', 'node-B'], + type: 'accumulate', }, ]; - const processArgs = createMockProcessArgs(targets, [], messages); + const processArgs = createMockProcessArgs(targets, targets, proposals); + const consumeSpy = vi.spyOn(processArgs.snapshotCache, 'consume'); + const result = await processor.process(processArgs); - // Should remove A and B, insert Snapshot, keep C + // It should have replaced A and B with a SNAPSHOT, and kept C expect(result.length).toBe(2); expect(result[0].type).toBe('SNAPSHOT'); - expect(result[1].id).toBe('node-C'); + expect((result[0] as any).text).toBe('Pre-computed summary of A and B'); + expect(result[1]).toEqual(nodeC); - // Should consume the message - expect( - (processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1'), - ).toBe(true); + // The message should be consumed + expect(consumeSpy).toHaveBeenCalledWith('msg-1'); }); - it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => { + it('should fall back to synchronous generation if no valid snapshot in cache', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor( - 'StateSnapshotProcessor', - env, - { - target: 'incremental', - }, - ); - // Make deficit 0 so we don't fall through to the sync backstop and fail the test that way + const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { + target: 'max', + }); - // node-A is MISSING (user deleted it) - const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); - const targets = [nodeB]; + const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); + const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 50, {}, 'node-B'); - const messages = [ + const targets = [nodeA, nodeB]; + + // Invalid snapshot (consumes a node that isn't in targets) + const proposals = [ { id: 'msg-1', - topic: 'PROPOSED_SNAPSHOT', timestamp: Date.now(), - payload: { - consumedIds: ['node-A', 'node-B'], - newText: '', - }, + newText: 'Invalid summary', + consumedIds: ['node-X'], + type: 'accumulate', }, ]; - const processArgs = createMockProcessArgs(targets, [], messages); + const processArgs = createMockProcessArgs(targets, targets, proposals); + const consumeSpy = vi.spyOn(processArgs.snapshotCache, 'consume'); + const result = await processor.process(processArgs); - // Because deficit is 0, and Inbox was rejected, nothing should change - expect(result.length).toBe(1); - expect(result[0].id).toBe('node-B'); - expect( - (processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1'), - ).toBe(false); - }); - - it('should fall back to sync backstop if inbox is empty', async () => { - const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor( - 'StateSnapshotProcessor', - env, - { target: 'max' }, - ); // Summarize all - - const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); - const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); - const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); - const targets = [nodeA, nodeB, nodeC]; - const result = await processor.process(createMockProcessArgs(targets)); - - // Should synthesize a new snapshot synchronously + // Should have generated synchronously expect(env.llmClient.generateContent).toHaveBeenCalled(); - expect(result.length).toBe(2); // nodeA is skipped as "system prompt", snapshot + nodeA - expect(result[1].type).toBe('SNAPSHOT'); + expect(result.length).toBe(1); + expect(result[0].type).toBe('SNAPSHOT'); + expect((result[0] as any).text).toBe('Mock LLM summary response'); + + // Should not have consumed the invalid message + expect(consumeSpy).not.toHaveBeenCalledWith('msg-1'); }); }); diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 9fa3389843..7b4b473537 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -28,7 +28,7 @@ export function createStateSnapshotProcessor( return { id, name: 'StateSnapshotProcessor', - process: async ({ targets, inbox }: ProcessArgs) => { + process: async ({ targets, snapshotCache }: ProcessArgs) => { if (targets.length === 0) { return targets; } @@ -38,17 +38,13 @@ export function createStateSnapshotProcessor( const expectedType = strategy === 'incremental' ? 'point-in-time' : 'accumulate'; - // 1. Check Inbox for a completed Snapshot (The Fast Path) - const proposedSnapshots = inbox.getMessages<{ - newText: string; - consumedIds: string[]; - type: string; - }>('PROPOSED_SNAPSHOT'); + // 1. Check cache for a completed Snapshot (The Fast Path) + const proposedSnapshots = snapshotCache.getProposals(); if (proposedSnapshots.length > 0) { // Filter for the snapshot type that matches our processor mode const matchingSnapshots = proposedSnapshots.filter( - (s) => s.payload.type === expectedType, + (s) => s.type === expectedType, ); // Sort by newest timestamp first (we want the most accumulated snapshot) @@ -57,7 +53,7 @@ export function createStateSnapshotProcessor( ); for (const proposed of sorted) { - const { consumedIds, newText } = proposed.payload; + const { consumedIds, newText } = proposed; // Verify all consumed IDs still exist sequentially in targets const targetIds = new Set(targets.map((t) => t.id)); @@ -91,7 +87,7 @@ export function createStateSnapshotProcessor( returnedNodes.unshift(snapshotNode); } - inbox.consume(proposed.id); + snapshotCache.consume(proposed.id); return returnedNodes; } } diff --git a/packages/core/src/context/system-tests/simulationHarness.ts b/packages/core/src/context/system-tests/simulationHarness.ts index 26eec5bfa4..af5a0aa5f7 100644 --- a/packages/core/src/context/system-tests/simulationHarness.ts +++ b/packages/core/src/context/system-tests/simulationHarness.ts @@ -10,8 +10,6 @@ import type { Content } from '@google/genai'; import type { ContextProfile } from '../config/profiles.js'; import { ContextEnvironmentImpl } from '../pipeline/environmentImpl.js'; import { ContextTracer } from '../tracer.js'; -import { ContextEventBus } from '../eventBus.js'; -import { PipelineOrchestrator } from '../pipeline/orchestrator.js'; import { debugLogger } from '../../utils/debugLogger.js'; import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js'; import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js'; @@ -27,8 +25,6 @@ export class SimulationHarness { readonly chatHistory: AgentChatHistory; contextManager!: ContextManager; env!: ContextEnvironmentImpl; - orchestrator!: PipelineOrchestrator; - readonly eventBus: ContextEventBus; config!: ContextProfile; private tracer!: ContextTracer; private currentTurnIndex = 0; @@ -46,7 +42,6 @@ export class SimulationHarness { private constructor() { this.chatHistory = new AgentChatHistory(); - this.eventBus = new ContextEventBus(); } private async init( @@ -68,37 +63,22 @@ export class SimulationHarness { mockTempDir, this.tracer, 1, // 1 char per token average - this.eventBus, new InMemoryFileSystem(), new DeterministicIdGenerator(), ); - this.orchestrator = new PipelineOrchestrator( - config.buildPipelines(this.env), - config.buildAsyncPipelines(this.env), - this.env, - this.eventBus, - this.tracer, - ); this.contextManager = new ContextManager( config, this.env, this.tracer, - this.orchestrator, this.chatHistory, ); } - /** - * Simulates a single "Turn" (User input + Model/Tool outputs) - * A turn might consist of multiple Content messages (e.g. user prompt -> model call -> user response -> model answer) - */ async simulateTurn(messages: Content[]) { - // 1. Append the new messages const currentHistory = this.chatHistory.get(); this.chatHistory.set([...currentHistory, ...messages]); - // 2. Measure tokens immediately after append (Before background processing) const tokensBefore = this.env.tokenCalculator.calculateConcreteListTokens( this.contextManager.getNodes(), ); @@ -106,10 +86,9 @@ export class SimulationHarness { `[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`, ); - // 3. Yield to event loop to allow internal async subscribers and orchestrator to finish + // Yield to let internal event loops settle await new Promise((resolve) => setTimeout(resolve, 50)); - // 3.1 Simulate what projectCompressedHistory does with the sync handlers let currentView = this.contextManager.getNodes(); const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens(currentView); @@ -120,23 +99,17 @@ export class SimulationHarness { debugLogger.log( `[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.config.budget.maxTokens}`, ); - const orchestrator = this.orchestrator; - // In the V2 simulation, we trigger the 'gc_backstop' to simulate emergency pressure. - // Since contextManager owns its buffer natively, the simulation now properly matches reality - // where the manager runs the orchestrator and keeps the resulting modified view. - const modifiedView = await orchestrator.executeTriggerSync( + + const modifiedView = await this.contextManager.executeTriggerSync( 'gc_backstop', currentView, new Set(currentView.map((e) => e.id)), new Set(), ); - // In the real system, ContextManager triggers this and retains it. - // We will emulate that behavior internally in the test loop for token counting. currentView = modifiedView; } - // 4. Measure tokens after background processors have processed inboxes const tokensAfter = this.env.tokenCalculator.calculateConcreteListTokens( this.contextManager.getNodes(), ); diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index 979712389e..626b5e04db 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -14,22 +14,17 @@ import { ContextTracer } from '../tracer.js'; import { ContextEnvironmentImpl } from '../pipeline/environmentImpl.js'; import { SidecarLoader } from '../config/configLoader.js'; import { SidecarRegistry } from '../config/registry.js'; -import { ContextEventBus } from '../eventBus.js'; -import { PipelineOrchestrator } from '../pipeline/orchestrator.js'; import type { ConcreteNode, ToolExecution } from '../ir/types.js'; import type { ContextEnvironment } from '../pipeline/environment.js'; import type { Config } from '../../config/config.js'; import type { BaseLlmClient } from '../../core/baseLlmClient.js'; import type { Content, GenerateContentResponse } from '@google/genai'; -import { InboxSnapshotImpl } from '../pipeline/inbox.js'; -import type { InboxMessage, ProcessArgs } from '../pipeline.js'; +import type { SnapshotProposal, ProcessArgs } from '../pipeline.js'; import type { ContextProfile } from '../config/profiles.js'; import type { Mock } from 'vitest'; +import { LiveSnapshotCache } from '../pipeline/snapshotCache.js'; +import { ContextWorkingBufferImpl } from '../pipeline/contextWorkingBuffer.js'; -/** - * Creates a valid mock GenerateContentResponse with the provided text. - * Used to avoid having to manually construct the deeply nested candidate/content/part structure. - */ export const createMockGenerateContentResponse = ( text: string, ): GenerateContentResponse => @@ -114,7 +109,6 @@ export function createMockLlmClient( generateContentMock.mockResolvedValueOnce(response); } } - // Fallback to the last response for any subsequent calls const lastResponse = responses[responses.length - 1]; if (typeof lastResponse === 'string') { generateContentMock.mockResolvedValue( @@ -124,7 +118,6 @@ export function createMockLlmClient( generateContentMock.mockResolvedValue(lastResponse); } } else { - // Default fallback generateContentMock.mockResolvedValue( createMockGenerateContentResponse('Mock LLM response'), ); @@ -145,7 +138,6 @@ export function createMockEnvironment( targetDir: '/tmp', sessionId: 'mock-session', }); - const eventBus = new ContextEventBus(); const env = new ContextEnvironmentImpl( llmClient, @@ -155,7 +147,6 @@ export function createMockEnvironment( '/tmp/.gemini/tool-outputs', tracer, 1, - eventBus, new InMemoryFileSystem(), new DeterministicIdGenerator('mock-uuid-'), ); @@ -166,23 +157,23 @@ export function createMockEnvironment( return env; } -/** - * Creates a block of synthetic conversation history designed to consume a specific number of tokens. - * Assumes roughly 4 characters per token for standard English text. - */ -import { ContextWorkingBufferImpl } from '../pipeline/contextWorkingBuffer.js'; - export function createMockProcessArgs( targets: ConcreteNode[], bufferNodes: ConcreteNode[] = [], - inboxMessages: InboxMessage[] = [], + proposals: SnapshotProposal[] = [], ): ProcessArgs { + const cache = new LiveSnapshotCache(); + // We can just manually add the proposals for the mock + for (const p of proposals) { + (cache as any).proposals.push(p); + } + return { targets, buffer: ContextWorkingBufferImpl.initialize( bufferNodes.length ? bufferNodes : targets, ), - inbox: new InboxSnapshotImpl(inboxMessages), + snapshotCache: cache, }; } @@ -207,9 +198,6 @@ export function createSyntheticHistory( return history; } -/** - * Creates a fully mocked Config object tailored for Context Component testing. - */ export function createMockContextConfig( overrides?: Record, llmClientOverride?: unknown, @@ -236,22 +224,17 @@ export function createMockContextConfig( return { ...defaultConfig, ...overrides } as unknown as Config; } -/** - * Wires up a full ContextManager component with an AgentChatHistory and active background async pipelines. - */ - export function setupContextComponentTest( config: Config, sidecarOverride?: ContextProfile, ): { chatHistory: AgentChatHistory; contextManager: ContextManager } { const chatHistory = new AgentChatHistory(); - const registry = new SidecarRegistry(); // Provide an empty registry for tests, or one pre-filled by the caller if needed later + const registry = new SidecarRegistry(); const sidecar = sidecarOverride || SidecarLoader.fromConfig(config, registry); const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'test-session', }); - const eventBus = new ContextEventBus(); const env = new ContextEnvironmentImpl( config.getBaseLlmClient(), 'test prompt-id', @@ -260,25 +243,14 @@ export function setupContextComponentTest( '/tmp/gemini-test', tracer, 1, - eventBus, - ); - - const orchestrator = new PipelineOrchestrator( - sidecar.buildPipelines(env), - sidecar.buildAsyncPipelines(env), - env, - eventBus, - tracer, ); const contextManager = new ContextManager( sidecar, env, tracer, - orchestrator, chatHistory, ); - // The async async pipeline is now internally managed by ContextManager return { chatHistory, contextManager }; }