Simplify.

This commit is contained in:
Christian Gunderman
2026-04-10 17:35:15 -07:00
parent 70b7e66fae
commit 28068431d9
19 changed files with 354 additions and 1042 deletions
+187 -77
View File
@@ -5,147 +5,257 @@
*/
import type { Content } from '@google/genai';
import type { AgentChatHistory } from '../core/agentChatHistory.js';
import type { AgentChatHistory, HistoryEvent } from '../core/agentChatHistory.js';
import type { ConcreteNode } from './ir/types.js';
import type { ContextEventBus } from './eventBus.js';
import type { ContextTracer } from './tracer.js';
import type { ContextEnvironment } from './pipeline/environment.js';
import type { ContextProfile } from './config/profiles.js';
import type { PipelineOrchestrator } from './pipeline/orchestrator.js';
import { HistoryObserver } from './historyObserver.js';
import { IrProjector } from './ir/projector.js';
import { ContextWorkingBufferImpl } from './pipeline/contextWorkingBuffer.js';
import type { PipelineDef, AsyncPipelineDef, PipelineTrigger } from './config/types.js';
import { debugLogger } from '../utils/debugLogger.js';
export class ContextManager {
// The master state containing the pristine graph and current active graph.
private buffer: ContextWorkingBufferImpl =
ContextWorkingBufferImpl.initialize([]);
private buffer: ContextWorkingBufferImpl = ContextWorkingBufferImpl.initialize([]);
private pristineNodes: readonly ConcreteNode[] = [];
private readonly eventBus: ContextEventBus;
// Internal sub-components
private readonly orchestrator: PipelineOrchestrator;
private readonly historyObserver: HistoryObserver;
private unsubscribeHistory?: () => void;
private seenNodeIds = new Set<string>();
private activeTimers: NodeJS.Timeout[] = [];
private pipelines: PipelineDef[] = [];
private asyncPipelines: AsyncPipelineDef[] = [];
constructor(
private readonly sidecar: ContextProfile,
private readonly env: ContextEnvironment,
private readonly tracer: ContextTracer,
orchestrator: PipelineOrchestrator,
chatHistory: AgentChatHistory,
private readonly chatHistory: AgentChatHistory,
) {
this.eventBus = env.eventBus;
this.orchestrator = orchestrator;
this.pipelines = sidecar.buildPipelines(env);
this.asyncPipelines = sidecar.buildAsyncPipelines(env);
this.setupTriggers();
this.startHistoryObserver();
}
this.historyObserver = new HistoryObserver(
chatHistory,
this.env.eventBus,
this.tracer,
this.env.tokenCalculator,
this.env.irMapper,
);
this.historyObserver.start();
private startHistoryObserver() {
this.unsubscribeHistory = this.chatHistory.subscribe((_event: HistoryEvent) => {
const pristineEpisodes = this.env.irMapper.toIr(
this.chatHistory.get(),
this.env.tokenCalculator,
);
this.eventBus.onPristineHistoryUpdated((event) => {
this.pristineNodes = event.nodes;
const nodes: ConcreteNode[] = [];
for (const ep of pristineEpisodes) {
if (ep.concreteNodes) {
for (const child of ep.concreteNodes) {
nodes.push(child);
}
}
}
const newNodes = new Set<string>();
for (const node of nodes) {
if (!this.seenNodeIds.has(node.id)) {
newNodes.add(node.id);
this.seenNodeIds.add(node.id);
}
}
this.tracer.logEvent(
'ContextManager',
'Rebuilt pristine graph from chat history update',
{ nodesSize: nodes.length, newNodesCount: newNodes.size },
);
this.pristineNodes = nodes;
const existingIds = new Set(this.buffer.nodes.map((n) => n.id));
const addedNodes = event.nodes.filter((n) => !existingIds.has(n.id));
const addedNodes = nodes.filter((n) => !existingIds.has(n.id));
if (addedNodes.length > 0) {
this.buffer = this.buffer.appendPristineNodes(addedNodes);
}
this.evaluateTriggers(event.newNodes);
this.evaluateTriggers(newNodes);
});
}
/**
* Safely stops background async pipelines and clears event listeners.
*/
shutdown() {
this.orchestrator.shutdown();
this.historyObserver.stop();
private setupTriggers() {
// In V1, background timers were set up here.
for (const pipeline of this.pipelines) {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {}, trigger.intervalMs);
this.activeTimers.push(timer);
}
}
}
}
/**
* Evaluates if the current working buffer exceeds configured budget thresholds,
* firing consolidation events if necessary.
*/
private evaluateTriggers(newNodes: Set<string>) {
if (!this.sidecar.config.budget) return;
if (newNodes.size > 0) {
this.eventBus.emitChunkReceived({
nodes: this.buffer.nodes,
targetNodeIds: newNodes,
});
this.executeTrigger('new_message', newNodes);
this.executeTrigger('nodes_added', newNodes);
}
const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens(
this.buffer.nodes,
);
const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens(this.buffer.nodes);
if (currentTokens > this.sidecar.config.budget.retainedTokens) {
const agedOutNodes = new Set<string>();
let rollingTokens = 0;
// Walk backwards finding nodes that fall out of the retained budget
for (let i = this.buffer.nodes.length - 1; i >= 0; i--) {
const node = this.buffer.nodes[i];
rollingTokens += this.env.tokenCalculator.calculateConcreteListTokens([
node,
]);
rollingTokens += this.env.tokenCalculator.calculateConcreteListTokens([node]);
if (rollingTokens > this.sidecar.config.budget.retainedTokens) {
agedOutNodes.add(node.id);
}
}
if (agedOutNodes.size > 0) {
this.eventBus.emitConsolidationNeeded({
nodes: this.buffer.nodes,
targetDeficit:
currentTokens - this.sidecar.config.budget.retainedTokens,
targetNodeIds: agedOutNodes,
});
this.executeTrigger('retained_exceeded', agedOutNodes);
this.executeTrigger('nodes_aged_out', agedOutNodes);
}
}
}
/**
* Retrieves the raw, uncompressed Episodic IR graph.
* Useful for internal tool rendering (like the trace viewer).
* Note: This is an expensive, deep clone operation.
*/
private executeTrigger(trigger: PipelineTrigger, targetNodeIds: ReadonlySet<string>) {
const triggerPipelines = this.pipelines.filter(p => p.triggers.includes(trigger));
for (const pipeline of triggerPipelines) {
void this.executePipelineAsync(pipeline, this.buffer.nodes, targetNodeIds, new Set());
}
const triggerAsyncPipelines = this.asyncPipelines.filter(p => p.triggers.includes(trigger));
for (const pipeline of triggerAsyncPipelines) {
const targets = this.buffer.nodes.filter(n => targetNodeIds.has(n.id));
for (const processor of pipeline.processors) {
processor.process({
targets,
snapshotCache: this.env.snapshotCache,
buffer: ContextWorkingBufferImpl.initialize(this.buffer.nodes),
}).catch(e => debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e));
}
}
}
async executeTriggerSync(
trigger: PipelineTrigger,
nodes: readonly ConcreteNode[],
triggerTargets: ReadonlySet<string>,
protectedLogicalIds: ReadonlySet<string> = new Set(),
): Promise<readonly ConcreteNode[]> {
let currentBuffer = ContextWorkingBufferImpl.initialize(nodes);
const triggerPipelines = this.pipelines.filter((p) => p.triggers.includes(trigger));
for (const pipeline of triggerPipelines) {
for (const processor of pipeline.processors) {
try {
this.tracer.logEvent('ContextManager', `Executing processor synchronously: ${processor.id}`);
const allowedTargets = currentBuffer.nodes.filter((n) =>
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds)
);
const returnedNodes = await processor.process({
buffer: currentBuffer,
targets: allowedTargets,
snapshotCache: this.env.snapshotCache,
});
currentBuffer = currentBuffer.applyProcessorResult(
processor.id,
allowedTargets,
returnedNodes,
);
} catch (error) {
debugLogger.error(`Synchronous processor ${processor.id} failed:`, error);
}
}
}
return currentBuffer.nodes;
}
private async executePipelineAsync(
pipeline: PipelineDef,
nodes: readonly ConcreteNode[],
triggerTargets: ReadonlySet<string>,
protectedLogicalIds: ReadonlySet<string> = new Set(),
) {
this.tracer.logEvent('ContextManager', `Triggering async pipeline: ${pipeline.name}`);
if (!nodes || nodes.length === 0) return;
let currentBuffer = ContextWorkingBufferImpl.initialize(nodes);
for (const processor of pipeline.processors) {
try {
this.tracer.logEvent('ContextManager', `Executing processor: ${processor.id} (async)`);
const allowedTargets = currentBuffer.nodes.filter((n) =>
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds)
);
const returnedNodes = await processor.process({
buffer: currentBuffer,
targets: allowedTargets,
snapshotCache: this.env.snapshotCache,
});
currentBuffer = currentBuffer.applyProcessorResult(
processor.id,
allowedTargets,
returnedNodes,
);
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed async at ${processor.id}:`, error);
return;
}
}
// Push the state to buffer
this.buffer = currentBuffer;
}
private isNodeAllowed(
node: ConcreteNode,
triggerTargets: ReadonlySet<string>,
protectedLogicalIds: ReadonlySet<string> = new Set(),
): boolean {
return (
triggerTargets.has(node.id) &&
!protectedLogicalIds.has(node.id) &&
(!node.logicalParentId || !protectedLogicalIds.has(node.logicalParentId))
);
}
shutdown() {
for (const timer of this.activeTimers) {
clearInterval(timer);
}
if (this.unsubscribeHistory) {
this.unsubscribeHistory();
this.unsubscribeHistory = undefined;
}
}
getPristineGraph(): readonly ConcreteNode[] {
return [...this.pristineNodes];
}
/**
* Generates a virtual view of the pristine graph, substituting in variants
* up to the configured token budget.
* This is the view that will eventually be projected back to the LLM.
*/
getNodes(): readonly ConcreteNode[] {
return [...this.buffer.nodes];
}
/**
* Executes the final 'gc_backstop' pipeline if necessary, enforcing the token budget,
* and maps the Episodic IR back into a raw Gemini Content[] array for transmission.
* This is the primary method called by the agent framework before sending a request.
*/
async projectCompressedHistory(
activeTaskIds: Set<string> = new Set(),
): Promise<Content[]> {
this.tracer.logEvent(
'ContextManager',
'Starting projection to LLM context',
);
// Apply final GC Backstop pressure barrier synchronously before mapping
this.tracer.logEvent('ContextManager', 'Starting projection to LLM context');
const finalHistory = await IrProjector.project(
this.buffer.nodes,
this.orchestrator,
this,
this.sidecar,
this.tracer,
this.env,
-52
View File
@@ -1,52 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { EventEmitter } from 'node:events';
import type { ConcreteNode } from './ir/types.js';
export interface PristineHistoryUpdatedEvent {
nodes: readonly ConcreteNode[];
newNodes: Set<string>;
}
export interface ContextConsolidationEvent {
nodes: readonly ConcreteNode[];
targetDeficit: number;
targetNodeIds: Set<string>;
}
export interface IrChunkReceivedEvent {
nodes: readonly ConcreteNode[];
targetNodeIds: Set<string>;
}
export class ContextEventBus extends EventEmitter {
emitPristineHistoryUpdated(event: PristineHistoryUpdatedEvent) {
this.emit('PRISTINE_HISTORY_UPDATED', event);
}
onPristineHistoryUpdated(
listener: (event: PristineHistoryUpdatedEvent) => void,
) {
this.on('PRISTINE_HISTORY_UPDATED', listener);
}
emitChunkReceived(event: IrChunkReceivedEvent) {
this.emit('IR_CHUNK_RECEIVED', event);
}
onChunkReceived(listener: (event: IrChunkReceivedEvent) => void) {
this.on('IR_CHUNK_RECEIVED', listener);
}
emitConsolidationNeeded(event: ContextConsolidationEvent) {
this.emit('BUDGET_RETAINED_CROSSED', event);
}
onConsolidationNeeded(listener: (event: ContextConsolidationEvent) => void) {
this.on('BUDGET_RETAINED_CROSSED', listener);
}
}
@@ -1,88 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type {
AgentChatHistory,
HistoryEvent,
} from '../core/agentChatHistory.js';
import type { IrMapper } from './ir/mapper.js';
import type { ContextTokenCalculator } from './utils/contextTokenCalculator.js';
import type { ContextEventBus } from './eventBus.js';
import type { ContextTracer } from './tracer.js';
import type { ConcreteNode } from './ir/types.js';
/**
* Connects the raw AgentChatHistory to the ContextManager.
* It maps raw messages into Episodic Intermediate Representation (IR)
* and evaluates background triggers whenever history changes.
*/
export class HistoryObserver {
private unsubscribeHistory?: () => void;
private seenNodeIds = new Set<string>();
constructor(
private readonly chatHistory: AgentChatHistory,
private readonly eventBus: ContextEventBus,
private readonly tracer: ContextTracer,
private readonly tokenCalculator: ContextTokenCalculator,
private readonly irMapper: IrMapper,
) {}
start() {
if (this.unsubscribeHistory) {
this.unsubscribeHistory();
}
this.unsubscribeHistory = this.chatHistory.subscribe(
(_event: HistoryEvent) => {
// Rebuild the pristine IR graph from the full source history on every change.
// Wait, toIr still returns an Episode[].
// We actually need to map the Episode[] to a flat ConcreteNode[] here to form the 'nodes'.
const pristineEpisodes = this.irMapper.toIr(
this.chatHistory.get(),
this.tokenCalculator,
);
const nodes: ConcreteNode[] = [];
for (const ep of pristineEpisodes) {
if (ep.concreteNodes) {
for (const child of ep.concreteNodes) {
nodes.push(child);
}
}
}
const newNodes = new Set<string>();
for (const node of nodes) {
if (!this.seenNodeIds.has(node.id)) {
newNodes.add(node.id);
this.seenNodeIds.add(node.id);
}
}
this.tracer.logEvent(
'HistoryObserver',
'Rebuilt pristine graph from chat history update',
{ nodesSize: nodes.length, newNodesCount: newNodes.size },
);
this.eventBus.emitPristineHistoryUpdated({
nodes,
newNodes,
});
},
);
}
stop() {
if (this.unsubscribeHistory) {
this.unsubscribeHistory();
this.unsubscribeHistory = undefined;
}
}
}
+3 -3
View File
@@ -11,7 +11,7 @@ import type {
ContextEnvironment,
ContextTracer,
} from '../pipeline/environment.js';
import type { PipelineOrchestrator } from '../pipeline/orchestrator.js';
import type { ContextManager } from '../contextManager.js';
import type { ContextProfile } from '../config/profiles.js';
export class IrProjector {
@@ -21,7 +21,7 @@ export class IrProjector {
*/
static async project(
nodes: readonly ConcreteNode[],
orchestrator: PipelineOrchestrator,
contextManager: ContextManager,
sidecar: ContextProfile,
tracer: ContextTracer,
env: ContextEnvironment,
@@ -84,7 +84,7 @@ export class IrProjector {
}
}
const processedNodes = await orchestrator.executeTriggerSync(
const processedNodes = await contextManager.executeTriggerSync(
'gc_backstop',
nodes,
agedOutNodes,
+9 -7
View File
@@ -6,16 +6,18 @@
import type { ConcreteNode } from './ir/types.js';
export interface InboxMessage<T = unknown> {
export interface SnapshotProposal {
id: string;
topic: string;
payload: T;
newText: string;
consumedIds: string[];
type: string;
timestamp: number;
}
export interface InboxSnapshot {
getMessages<T = unknown>(topic: string): ReadonlyArray<InboxMessage<T>>;
consume(messageId: string): void;
export interface SnapshotCache {
getProposals(): ReadonlyArray<SnapshotProposal>;
consume(id: string): void;
publish(proposal: Omit<SnapshotProposal, 'id' | 'timestamp'>, idGenerator: { generateId(): string }): void;
}
export interface GraphMutation {
@@ -35,7 +37,7 @@ export interface ContextWorkingBuffer {
export interface ProcessArgs {
readonly buffer: ContextWorkingBuffer;
readonly targets: readonly ConcreteNode[];
readonly inbox: InboxSnapshot;
readonly snapshotCache: SnapshotCache;
}
/**
@@ -4,16 +4,15 @@
* SPDX-License-Identifier: Apache-2.0
*/
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { ContextEventBus } from '../eventBus.js';
import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js';
import type { ContextTracer } from '../tracer.js';
import type { IFileSystem } from '../system/IFileSystem.js';
import type { IIdGenerator } from '../system/IIdGenerator.js';
import type { LiveInbox } from './inbox.js';
import type { SnapshotCache } from '../pipeline.js';
import type { IrNodeBehaviorRegistry } from '../ir/behaviorRegistry.js';
import type { IrMapper } from '../ir/mapper.js';
export type { ContextTracer, ContextEventBus };
export type { ContextTracer };
export interface ContextEnvironment {
readonly llmClient: BaseLlmClient;
@@ -26,8 +25,7 @@ export interface ContextEnvironment {
readonly tokenCalculator: ContextTokenCalculator;
readonly fileSystem: IFileSystem;
readonly idGenerator: IIdGenerator;
readonly eventBus: ContextEventBus;
readonly inbox: LiveInbox;
readonly snapshotCache: SnapshotCache;
readonly behaviorRegistry: IrNodeBehaviorRegistry;
readonly irMapper: IrMapper;
}
@@ -5,68 +5,44 @@
*/
import { describe, it, expect } from 'vitest';
import { ContextEnvironmentImpl } from './environmentImpl.js';
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import { ContextTracer } from '../tracer.js';
import { ContextEventBus } from '../eventBus.js';
import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js';
import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js';
import { createMockLlmClient } from '../testing/contextTestUtils.js';
describe('ContextEnvironmentImpl', () => {
it('should initialize with defaults correctly', () => {
const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'mock' });
const eventBus = new ContextEventBus();
const mockLlmClient = createMockLlmClient();
it('should initialize with provided dependencies and default optional ones', () => {
// Mock required dependencies
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const llmClient = {} as BaseLlmClient;
const tracer = new ContextTracer({
targetDir: '/tmp',
sessionId: 'test-session',
});
const env = new ContextEnvironmentImpl(
mockLlmClient,
'mock-session',
'mock-prompt',
llmClient,
'test-session',
'test-prompt-id',
'/tmp/trace',
'/tmp/temp',
tracer,
4,
eventBus,
);
expect(env.llmClient).toBe(mockLlmClient);
expect(env.sessionId).toBe('mock-session');
expect(env.promptId).toBe('mock-prompt');
// Verify injected properties
expect(env.llmClient).toBe(llmClient);
expect(env.sessionId).toBe('test-session');
expect(env.promptId).toBe('test-prompt-id');
expect(env.traceDir).toBe('/tmp/trace');
expect(env.projectTempDir).toBe('/tmp/temp');
expect(env.tracer).toBe(tracer);
expect(env.charsPerToken).toBe(4);
expect(env.eventBus).toBe(eventBus);
// Default internals
expect(env.behaviorRegistry).toBeDefined();
// Verify default initialized properties
expect(env.tokenCalculator).toBeDefined();
expect(env.fileSystem).toBeDefined();
expect(env.idGenerator).toBeDefined();
expect(env.inbox).toBeDefined();
expect(env.snapshotCache).toBeDefined();
expect(env.behaviorRegistry).toBeDefined();
expect(env.irMapper).toBeDefined();
});
it('should initialize with provided overrides', () => {
const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'mock' });
const eventBus = new ContextEventBus();
const mockLlmClient = createMockLlmClient();
const fileSystem = new InMemoryFileSystem();
const idGenerator = new DeterministicIdGenerator('test-');
const env = new ContextEnvironmentImpl(
mockLlmClient,
'mock-session',
'mock-prompt',
'/tmp/trace',
'/tmp/temp',
tracer,
4,
eventBus,
fileSystem,
idGenerator,
);
expect(env.fileSystem).toBe(fileSystem);
expect(env.idGenerator).toBe(idGenerator);
});
});
@@ -7,22 +7,22 @@
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { ContextTracer } from '../tracer.js';
import type { ContextEnvironment } from './environment.js';
import type { ContextEventBus } from '../eventBus.js';
import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js';
import type { IFileSystem } from '../system/IFileSystem.js';
import { NodeFileSystem } from '../system/NodeFileSystem.js';
import type { IIdGenerator } from '../system/IIdGenerator.js';
import { NodeIdGenerator } from '../system/NodeIdGenerator.js';
import { LiveInbox } from './inbox.js';
import { LiveSnapshotCache } from './snapshotCache.js';
import { IrNodeBehaviorRegistry } from '../ir/behaviorRegistry.js';
import { registerBuiltInBehaviors } from '../ir/builtinBehaviors.js';
import { IrMapper } from '../ir/mapper.js';
import type { SnapshotCache } from '../pipeline.js';
export class ContextEnvironmentImpl implements ContextEnvironment {
readonly tokenCalculator: ContextTokenCalculator;
readonly fileSystem: IFileSystem;
readonly idGenerator: IIdGenerator;
readonly inbox: LiveInbox;
readonly snapshotCache: SnapshotCache;
readonly behaviorRegistry: IrNodeBehaviorRegistry;
readonly irMapper: IrMapper;
@@ -34,7 +34,6 @@ export class ContextEnvironmentImpl implements ContextEnvironment {
readonly projectTempDir: string,
readonly tracer: ContextTracer,
readonly charsPerToken: number,
readonly eventBus: ContextEventBus,
fileSystem?: IFileSystem,
idGenerator?: IIdGenerator,
) {
@@ -46,7 +45,7 @@ export class ContextEnvironmentImpl implements ContextEnvironment {
);
this.fileSystem = fileSystem || new NodeFileSystem();
this.idGenerator = idGenerator || new NodeIdGenerator();
this.inbox = new LiveInbox();
this.snapshotCache = new LiveSnapshotCache();
this.irMapper = new IrMapper(this.behaviorRegistry, this.idGenerator);
}
}
@@ -1,48 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect } from 'vitest';
import { LiveInbox, InboxSnapshotImpl } from './inbox.js';
import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js';
describe('Inbox', () => {
it('should publish messages and provide snapshots', () => {
const inbox = new LiveInbox();
const idGenerator = new DeterministicIdGenerator('mock-uuid-');
inbox.publish('test-topic', { data: 'hello' }, idGenerator);
inbox.publish('other-topic', { data: 'world' }, idGenerator);
const messages = inbox.getMessages();
expect(messages.length).toBe(2);
expect(messages[0].topic).toBe('test-topic');
expect(messages[0].payload).toEqual({ data: 'hello' });
});
it('should drain consumed messages from the snapshot', () => {
const inbox = new LiveInbox();
const idGenerator = new DeterministicIdGenerator('mock-uuid-');
inbox.publish('test-topic', { data: 'hello' }, idGenerator);
inbox.publish('other-topic', { data: 'world' }, idGenerator);
const messages = inbox.getMessages();
const snapshot = new InboxSnapshotImpl(messages);
const filtered = snapshot.getMessages<{ data: string }>('test-topic');
expect(filtered.length).toBe(1);
expect(filtered[0].payload.data).toBe('hello');
// Consume the message
snapshot.consume(filtered[0].id);
// Provide the consumed IDs to the real inbox to drain them
inbox.drainConsumed(snapshot.getConsumedIds());
const finalMessages = inbox.getMessages();
expect(finalMessages.length).toBe(1);
expect(finalMessages[0].topic).toBe('other-topic');
});
});
@@ -1,64 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { InboxMessage, InboxSnapshot } from '../pipeline.js';
export class LiveInbox {
private messages: InboxMessage[] = [];
publish<T>(
topic: string,
payload: T,
idGenerator: { generateId(): string },
): void {
this.messages.push({
id: idGenerator.generateId(),
topic,
payload,
timestamp: Date.now(),
});
}
getMessages(): readonly InboxMessage[] {
return [...this.messages];
}
drainConsumed(consumedIds: Set<string>): void {
this.messages = this.messages.filter((m) => !consumedIds.has(m.id));
}
}
export class InboxSnapshotImpl implements InboxSnapshot {
private messages: readonly InboxMessage[];
private consumedIds = new Set<string>();
constructor(messages: readonly InboxMessage[]) {
this.messages = messages;
}
getMessages<T = unknown>(topic: string): ReadonlyArray<InboxMessage<T>> {
const raw = this.messages.filter((m) => m.topic === topic);
/*
* Architectural Justification for Unchecked Cast:
* The Inbox is a heterogeneous event bus designed to support arbitrary, declarative
* routing via configuration files (where topics are just strings). Because TypeScript
* completely erases generic type information (<T>) at runtime, the central array
* can only hold `unknown` payloads. To enforce strict type safety without a central
* registry (which would break decoupling) or heavy runtime validation (Zod schemas),
* we must assert the type boundary here. The contract relies on the async pipeline and Processor
* agreeing on the payload structure associated with the configured topic string.
*/
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
return raw as ReadonlyArray<InboxMessage<T>>;
}
consume(messageId: string): void {
this.consumedIds.add(messageId);
}
getConsumedIds(): Set<string> {
return this.consumedIds;
}
}
@@ -1,224 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import assert from 'node:assert';
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { PipelineOrchestrator } from './orchestrator.js';
import {
createMockEnvironment,
createDummyNode,
} from '../testing/contextTestUtils.js';
import type { ContextEnvironment } from './environment.js';
import type {
ContextProcessor,
AsyncContextProcessor,
ProcessArgs,
} from '../pipeline.js';
import type { PipelineDef, AsyncPipelineDef } from '../config/types.js';
import type { ContextEventBus } from '../eventBus.js';
import type { ConcreteNode, UserPrompt } from '../ir/types.js';
// A realistic mock processor that modifies the text of the first target node
function createModifyingProcessor(id: string): ContextProcessor {
return {
id,
name: 'ModifyingProcessor',
process: async (args: ProcessArgs) => {
const newTargets = [...args.targets];
if (newTargets.length > 0 && newTargets[0].type === 'USER_PROMPT') {
const prompt = newTargets[0];
const newParts = [...prompt.semanticParts];
if (newParts.length > 0 && newParts[0].type === 'text') {
newParts[0] = {
...newParts[0],
text: newParts[0].text + ' [modified]',
};
}
newTargets[0] = {
...prompt,
id: prompt.id + '-modified',
replacesId: prompt.id,
semanticParts: newParts,
};
}
return newTargets;
},
};
}
// A processor that just throws an error
function createThrowingProcessor(id: string): ContextProcessor {
return {
id,
name: 'Throwing',
process: async (): Promise<readonly ConcreteNode[]> => {
throw new Error('Processor failed intentionally');
},
};
}
// A mock async processor that signals it ran
function createMockAsyncProcessor(
id: string,
executeSpy: ReturnType<typeof vi.fn>,
): AsyncContextProcessor {
return {
id,
name: 'MockAsyncProcessor',
process: async (args: ProcessArgs) => {
executeSpy(args);
},
};
}
describe('PipelineOrchestrator (Component)', () => {
let env: ContextEnvironment;
let eventBus: ContextEventBus;
beforeEach(() => {
env = createMockEnvironment();
eventBus = env.eventBus;
});
afterEach(() => {
vi.restoreAllMocks();
});
const setupOrchestrator = (
pipelines: PipelineDef[],
asyncPipelines: AsyncPipelineDef[] = [],
) => {
const orchestrator = new PipelineOrchestrator(
pipelines,
asyncPipelines,
env,
eventBus,
env.tracer,
);
return orchestrator;
};
describe('Synchronous Pipeline Execution', () => {
it('applies processors in sequence on matching trigger', async () => {
const pipelines: PipelineDef[] = [
{
name: 'TestPipeline',
triggers: ['new_message'],
processors: [createModifyingProcessor('Mod')],
},
];
const orchestrator = setupOrchestrator(pipelines);
const originalNode = createDummyNode('ep1', 'USER_PROMPT', 50, {
semanticParts: [{ type: 'text', text: 'Original' }],
});
const processed = await orchestrator.executeTriggerSync(
'new_message',
[originalNode],
new Set([originalNode.id]),
new Set(),
);
expect(processed.length).toBe(1);
const resultingNode = processed[0] as UserPrompt;
assert(resultingNode.semanticParts[0].type === 'text');
expect(resultingNode.semanticParts[0].text).toBe('Original [modified]');
expect(resultingNode.replacesId).toBe(originalNode.id);
});
it('bypasses pipelines that do not match the trigger', async () => {
const pipelines: PipelineDef[] = [
{
name: 'TestPipeline',
triggers: ['gc_backstop'], // Different trigger
processors: [createModifyingProcessor('Mod')],
},
];
const orchestrator = setupOrchestrator(pipelines);
const originalNode = createDummyNode('ep1', 'USER_PROMPT', 50, {
semanticParts: [{ type: 'text', text: 'Original' }],
});
const processed = await orchestrator.executeTriggerSync(
'new_message',
[originalNode],
new Set([originalNode.id]),
new Set(),
);
expect(processed).toEqual([originalNode]); // Untouched
});
it('gracefully handles a failing processor without crashing the pipeline', async () => {
const pipelines: PipelineDef[] = [
{
name: 'FailingPipeline',
triggers: ['new_message'],
processors: [
createThrowingProcessor('Thrower'),
createModifyingProcessor('Mod'),
],
},
];
const orchestrator = setupOrchestrator(pipelines);
const originalNode = createDummyNode('ep1', 'USER_PROMPT', 50, {
semanticParts: [{ type: 'text', text: 'Original' }],
});
// The throwing processor should be caught and logged, allowing Mod to still run.
const processed = await orchestrator.executeTriggerSync(
'new_message',
[originalNode],
new Set([originalNode.id]),
new Set(),
);
expect(processed.length).toBe(1);
const resultingNode = processed[0] as UserPrompt;
assert(resultingNode.semanticParts[0].type === 'text');
expect(resultingNode.semanticParts[0].text).toBe('Original [modified]');
});
});
describe('Asynchronous async pipeline Events', () => {
it('routes emitChunkReceived to async pipelines with nodes_added trigger', async () => {
const executeSpy = vi.fn();
const asyncProcessor = createMockAsyncProcessor(
'MyAsyncProcessor',
executeSpy,
);
setupOrchestrator(
[],
[
{
name: 'TestAsync',
triggers: ['nodes_added'],
processors: [asyncProcessor],
},
],
);
const node1 = createDummyNode('ep1', 'USER_PROMPT', 10);
const node2 = createDummyNode('ep1', 'AGENT_THOUGHT', 20);
eventBus.emitChunkReceived({
nodes: [node1, node2],
targetNodeIds: new Set([node2.id]),
});
// Yield event loop
await new Promise((resolve) => setTimeout(resolve, 0));
expect(executeSpy).toHaveBeenCalledTimes(1);
const callArgs = executeSpy.mock.calls[0][0];
expect(callArgs.targets).toEqual([node2]); // AsyncProcessors only get the target nodes
});
});
});
@@ -1,218 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ConcreteNode } from '../ir/types.js';
import type {
AsyncPipelineDef,
PipelineDef,
PipelineTrigger,
} from '../config/types.js';
import type {
ContextEnvironment,
ContextEventBus,
ContextTracer,
} from './environment.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { InboxSnapshotImpl } from './inbox.js';
import { ContextWorkingBufferImpl } from './contextWorkingBuffer.js';
export class PipelineOrchestrator {
private activeTimers: NodeJS.Timeout[] = [];
constructor(
private readonly pipelines: PipelineDef[],
private readonly asyncPipelines: AsyncPipelineDef[],
private readonly env: ContextEnvironment,
private readonly eventBus: ContextEventBus,
private readonly tracer: ContextTracer,
) {
this.setupTriggers();
}
private isNodeAllowed(
node: ConcreteNode,
triggerTargets: ReadonlySet<string>,
protectedLogicalIds: ReadonlySet<string> = new Set(),
): boolean {
return (
triggerTargets.has(node.id) &&
!protectedLogicalIds.has(node.id) &&
(!node.logicalParentId || !protectedLogicalIds.has(node.logicalParentId))
);
}
private setupTriggers() {
const bindTriggers = <P extends PipelineDef | AsyncPipelineDef>(
pipelines: P[],
executeFn: (
pipeline: P,
nodes: readonly ConcreteNode[],
targets: ReadonlySet<string>,
protectedIds: ReadonlySet<string>,
) => void,
) => {
for (const pipeline of pipelines) {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {
// Background timers not fully implemented in V1 yet
}, trigger.intervalMs);
this.activeTimers.push(timer);
} else if (
trigger === 'retained_exceeded' ||
trigger === 'nodes_aged_out'
) {
this.eventBus.onConsolidationNeeded((event) => {
executeFn(pipeline, event.nodes, event.targetNodeIds, new Set());
});
} else if (trigger === 'new_message' || trigger === 'nodes_added') {
this.eventBus.onChunkReceived((event) => {
executeFn(pipeline, event.nodes, event.targetNodeIds, new Set());
});
}
}
}
};
bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => {
void this.executePipelineAsync(
pipeline,
nodes,
new Set(targets),
new Set(protectedIds),
);
});
bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => {
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
const targets = nodes.filter((n) => targetIds.has(n.id));
for (const processor of pipeline.processors) {
processor
.process({
targets,
inbox: inboxSnapshot,
buffer: ContextWorkingBufferImpl.initialize(nodes),
})
.catch((e: unknown) =>
debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e),
);
}
});
}
shutdown() {
for (const timer of this.activeTimers) {
clearInterval(timer);
}
}
async executeTriggerSync(
trigger: PipelineTrigger,
nodes: readonly ConcreteNode[],
triggerTargets: ReadonlySet<string>,
protectedLogicalIds: ReadonlySet<string> = new Set(),
): Promise<readonly ConcreteNode[]> {
let currentBuffer = ContextWorkingBufferImpl.initialize(nodes);
const triggerPipelines = this.pipelines.filter((p) =>
p.triggers.includes(trigger),
);
// Freeze the inbox for this pipeline run
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
for (const pipeline of triggerPipelines) {
for (const processor of pipeline.processors) {
try {
this.tracer.logEvent(
'Orchestrator',
`Executing processor synchronously: ${processor.id}`,
);
const allowedTargets = currentBuffer.nodes.filter((n) =>
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds),
);
const returnedNodes = await processor.process({
buffer: currentBuffer,
targets: allowedTargets,
inbox: inboxSnapshot,
});
currentBuffer = currentBuffer.applyProcessorResult(
processor.id,
allowedTargets,
returnedNodes,
);
} catch (error) {
debugLogger.error(
`Synchronous processor ${processor.id} failed:`,
error,
);
}
}
}
// Success! Drain consumed messages
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
return currentBuffer.nodes;
}
private async executePipelineAsync(
pipeline: PipelineDef,
nodes: readonly ConcreteNode[],
triggerTargets: Set<string>,
protectedLogicalIds: ReadonlySet<string> = new Set(),
) {
this.tracer.logEvent(
'Orchestrator',
`Triggering async pipeline: ${pipeline.name}`,
);
if (!nodes || nodes.length === 0) return;
let currentBuffer = ContextWorkingBufferImpl.initialize(nodes);
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
for (const processor of pipeline.processors) {
try {
this.tracer.logEvent(
'Orchestrator',
`Executing processor: ${processor.id} (async)`,
);
const allowedTargets = currentBuffer.nodes.filter((n) =>
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds),
);
const returnedNodes = await processor.process({
buffer: currentBuffer,
targets: allowedTargets,
inbox: inboxSnapshot,
});
currentBuffer = currentBuffer.applyProcessorResult(
processor.id,
allowedTargets,
returnedNodes,
);
} catch (error) {
debugLogger.error(
`Pipeline ${pipeline.name} failed async at ${processor.id}:`,
error,
);
return;
}
}
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
}
}
@@ -0,0 +1,30 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { SnapshotProposal, SnapshotCache } from '../pipeline.js';
export class LiveSnapshotCache implements SnapshotCache {
private proposals: SnapshotProposal[] = [];
private consumedIds = new Set<string>();
publish(
proposal: Omit<SnapshotProposal, 'id' | 'timestamp'>,
idGenerator: { generateId(): string },
): void {
this.proposals.push({
...proposal,
id: idGenerator.generateId(),
timestamp: Date.now(),
});
}
getProposals(): readonly SnapshotProposal[] {
return this.proposals.filter((p) => !this.consumedIds.has(p.id));
}
consume(proposalId: string): void {
this.consumedIds.add(proposalId);
}
}
@@ -10,15 +10,11 @@ import {
createDummyNode,
createMockProcessArgs,
} from '../testing/contextTestUtils.js';
import type { InboxMessage } from '../pipeline.js';
import type { InboxSnapshotImpl } from '../pipeline/inbox.js';
describe('StateSnapshotAsyncProcessor', () => {
it('should generate a snapshot and publish it to the inbox', async () => {
it('should generate a snapshot and publish it to the cache', async () => {
const env = createMockEnvironment();
// Spy on the publish method
const publishSpy = vi.spyOn(env.inbox, 'publish');
const worker = createStateSnapshotAsyncProcessor(
'StateSnapshotAsyncProcessor',
env,
@@ -29,14 +25,16 @@ describe('StateSnapshotAsyncProcessor', () => {
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const targets = [nodeA, nodeB];
await worker.process(createMockProcessArgs(targets, targets, []));
const args = createMockProcessArgs(targets, targets, []);
const publishSpy = vi.spyOn(args.snapshotCache, 'publish');
await worker.process(args);
// Ensure generateContent was called
expect(env.llmClient.generateContent).toHaveBeenCalled();
// Verify it published to the inbox
// Verify it published to the cache
expect(publishSpy).toHaveBeenCalledWith(
'PROPOSED_SNAPSHOT',
expect.objectContaining({
newText: 'Mock LLM summary response',
consumedIds: ['node-A', 'node-B'],
@@ -46,11 +44,8 @@ describe('StateSnapshotAsyncProcessor', () => {
);
});
it('should pull previous accumulate snapshot from inbox and append new targets', async () => {
it('should pull previous accumulate snapshot from cache and append new targets', async () => {
const env = createMockEnvironment();
const publishSpy = vi.spyOn(env.inbox, 'publish');
const drainSpy = vi.spyOn(env.inbox, 'drainConsumed');
const worker = createStateSnapshotAsyncProcessor(
'StateSnapshotAsyncProcessor',
env,
@@ -60,32 +55,27 @@ describe('StateSnapshotAsyncProcessor', () => {
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const targets = [nodeC];
const inboxMessages: InboxMessage[] = [
const proposals = [
{
id: 'draft-1',
topic: 'PROPOSED_SNAPSHOT',
timestamp: Date.now() - 1000,
payload: {
consumedIds: ['node-A', 'node-B'],
newText: '<old snapshot>',
type: 'accumulate',
},
consumedIds: ['node-A', 'node-B'],
newText: '<old snapshot>',
type: 'accumulate',
},
];
const args = createMockProcessArgs(targets, targets, inboxMessages);
const args = createMockProcessArgs(targets, targets, proposals);
const publishSpy = vi.spyOn(args.snapshotCache, 'publish');
const consumeSpy = vi.spyOn(args.snapshotCache, 'consume');
await worker.process(args);
// The old draft should be consumed
expect(
(args.inbox as InboxSnapshotImpl).getConsumedIds().has('draft-1'),
).toBe(true);
expect(drainSpy).toHaveBeenCalledWith(expect.any(Set));
expect(consumeSpy).toHaveBeenCalledWith('draft-1');
// The new publish should contain ALL consumed IDs (old + new)
expect(publishSpy).toHaveBeenCalledWith(
'PROPOSED_SNAPSHOT',
expect.objectContaining({
newText: 'Mock LLM summary response',
consumedIds: ['node-A', 'node-B', 'node-C'], // Aggregated!
@@ -112,14 +102,16 @@ describe('StateSnapshotAsyncProcessor', () => {
it('should ignore empty targets', async () => {
const env = createMockEnvironment();
const publishSpy = vi.spyOn(env.inbox, 'publish');
const worker = createStateSnapshotAsyncProcessor(
'StateSnapshotAsyncProcessor',
env,
{ type: 'accumulate' },
);
await worker.process(createMockProcessArgs([], [], []));
const args = createMockProcessArgs([], [], []);
const publishSpy = vi.spyOn(args.snapshotCache, 'publish');
await worker.process(args);
expect(env.llmClient.generateContent).not.toHaveBeenCalled();
expect(publishSpy).not.toHaveBeenCalled();
@@ -24,7 +24,7 @@ export function createStateSnapshotAsyncProcessor(
return {
id,
name: 'StateSnapshotAsyncProcessor',
process: async ({ targets, inbox }: ProcessArgs): Promise<void> => {
process: async ({ targets, snapshotCache }: ProcessArgs): Promise<void> => {
if (targets.length === 0) return;
try {
@@ -33,14 +33,10 @@ export function createStateSnapshotAsyncProcessor(
const processorType = options.type ?? 'point-in-time';
if (processorType === 'accumulate') {
// Look for the most recent unconsumed accumulate snapshot in the inbox
const proposedSnapshots = inbox.getMessages<{
newText: string;
consumedIds: string[];
type: string;
}>('PROPOSED_SNAPSHOT');
// Look for the most recent unconsumed accumulate snapshot in the cache
const proposedSnapshots = snapshotCache.getProposals();
const accumulateSnapshots = proposedSnapshots.filter(
(s) => s.payload.type === 'accumulate',
(s) => s.type === 'accumulate',
);
if (accumulateSnapshots.length > 0) {
@@ -49,13 +45,10 @@ export function createStateSnapshotAsyncProcessor(
(a, b) => b.timestamp - a.timestamp,
)[0];
// Consume the old draft so the inbox doesn't fill up with stale drafts
inbox.consume(latest.id);
// And we must persist its consumption back to the live inbox immediately,
// because we are effectively "taking" it from the shelf to modify.
env.inbox.drainConsumed(new Set([latest.id]));
// Consume the old draft so the cache doesn't fill up with stale drafts
snapshotCache.consume(latest.id);
previousConsumedIds = latest.payload.consumedIds;
previousConsumedIds = latest.consumedIds;
// Prepend a synthetic node representing the previous rolling state
const previousStateNode: ConcreteNode = {
@@ -63,7 +56,7 @@ export function createStateSnapshotAsyncProcessor(
logicalParentId: '',
type: 'SNAPSHOT',
timestamp: latest.timestamp,
text: latest.payload.newText,
text: latest.newText,
};
nodesToSummarize = [previousStateNode, ...targets];
@@ -80,9 +73,7 @@ export function createStateSnapshotAsyncProcessor(
...targets.map((t) => t.id),
];
// In V2, async pipelines communicate their work to the inbox, and the processor picks it up.
env.inbox.publish(
'PROPOSED_SNAPSHOT',
snapshotCache.publish(
{
newText: snapshotText,
consumedIds: newConsumedIds,
@@ -3,129 +3,96 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect } from 'vitest';
import { describe, it, expect, vi } from 'vitest';
import { createStateSnapshotProcessor } from './stateSnapshotProcessor.js';
import {
createMockEnvironment,
createDummyNode,
createMockProcessArgs,
} from '../testing/contextTestUtils.js';
import type { InboxSnapshotImpl } from '../pipeline/inbox.js';
describe('StateSnapshotProcessor', () => {
it('should ignore if budget is satisfied', async () => {
it('should return original targets if no nodes to process', async () => {
const env = createMockEnvironment();
const processor = createStateSnapshotProcessor(
'StateSnapshotProcessor',
env,
{
target: 'incremental',
},
);
const targets = [createDummyNode('ep1', 'USER_PROMPT')];
const result = await processor.process(createMockProcessArgs(targets));
expect(result).toBe(targets); // Strict equality
const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, {
target: 'max',
});
const result = await processor.process(createMockProcessArgs([], []));
expect(result).toEqual([]);
});
it('should apply a valid snapshot from the Inbox (Fast Path)', async () => {
it('should use pre-computed snapshot from cache if valid', async () => {
const env = createMockEnvironment();
const processor = createStateSnapshotProcessor(
'StateSnapshotProcessor',
env,
{
target: 'incremental',
},
);
const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, {
target: 'max', // implies 'accumulate' type
});
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 50, {}, 'node-B');
const nodeC = createDummyNode('ep1', 'TOOL_EXECUTION', 50, {}, 'node-C');
const targets = [nodeA, nodeB, nodeC];
// The async background pipeline created a snapshot of A and B
const messages = [
const proposals = [
{
id: 'msg-1',
topic: 'PROPOSED_SNAPSHOT',
timestamp: Date.now(),
payload: {
consumedIds: ['node-A', 'node-B'],
newText: '<compressed A and B>',
type: 'point-in-time',
},
newText: 'Pre-computed summary of A and B',
consumedIds: ['node-A', 'node-B'],
type: 'accumulate',
},
];
const processArgs = createMockProcessArgs(targets, [], messages);
const processArgs = createMockProcessArgs(targets, targets, proposals);
const consumeSpy = vi.spyOn(processArgs.snapshotCache, 'consume');
const result = await processor.process(processArgs);
// Should remove A and B, insert Snapshot, keep C
// It should have replaced A and B with a SNAPSHOT, and kept C
expect(result.length).toBe(2);
expect(result[0].type).toBe('SNAPSHOT');
expect(result[1].id).toBe('node-C');
expect((result[0] as any).text).toBe('Pre-computed summary of A and B');
expect(result[1]).toEqual(nodeC);
// Should consume the message
expect(
(processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1'),
).toBe(true);
// The message should be consumed
expect(consumeSpy).toHaveBeenCalledWith('msg-1');
});
it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => {
it('should fall back to synchronous generation if no valid snapshot in cache', async () => {
const env = createMockEnvironment();
const processor = createStateSnapshotProcessor(
'StateSnapshotProcessor',
env,
{
target: 'incremental',
},
);
// Make deficit 0 so we don't fall through to the sync backstop and fail the test that way
const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, {
target: 'max',
});
// node-A is MISSING (user deleted it)
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const targets = [nodeB];
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 50, {}, 'node-B');
const messages = [
const targets = [nodeA, nodeB];
// Invalid snapshot (consumes a node that isn't in targets)
const proposals = [
{
id: 'msg-1',
topic: 'PROPOSED_SNAPSHOT',
timestamp: Date.now(),
payload: {
consumedIds: ['node-A', 'node-B'],
newText: '<compressed A and B>',
},
newText: 'Invalid summary',
consumedIds: ['node-X'],
type: 'accumulate',
},
];
const processArgs = createMockProcessArgs(targets, [], messages);
const processArgs = createMockProcessArgs(targets, targets, proposals);
const consumeSpy = vi.spyOn(processArgs.snapshotCache, 'consume');
const result = await processor.process(processArgs);
// Because deficit is 0, and Inbox was rejected, nothing should change
expect(result.length).toBe(1);
expect(result[0].id).toBe('node-B');
expect(
(processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1'),
).toBe(false);
});
it('should fall back to sync backstop if inbox is empty', async () => {
const env = createMockEnvironment();
const processor = createStateSnapshotProcessor(
'StateSnapshotProcessor',
env,
{ target: 'max' },
); // Summarize all
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const targets = [nodeA, nodeB, nodeC];
const result = await processor.process(createMockProcessArgs(targets));
// Should synthesize a new snapshot synchronously
// Should have generated synchronously
expect(env.llmClient.generateContent).toHaveBeenCalled();
expect(result.length).toBe(2); // nodeA is skipped as "system prompt", snapshot + nodeA
expect(result[1].type).toBe('SNAPSHOT');
expect(result.length).toBe(1);
expect(result[0].type).toBe('SNAPSHOT');
expect((result[0] as any).text).toBe('Mock LLM summary response');
// Should not have consumed the invalid message
expect(consumeSpy).not.toHaveBeenCalledWith('msg-1');
});
});
@@ -28,7 +28,7 @@ export function createStateSnapshotProcessor(
return {
id,
name: 'StateSnapshotProcessor',
process: async ({ targets, inbox }: ProcessArgs) => {
process: async ({ targets, snapshotCache }: ProcessArgs) => {
if (targets.length === 0) {
return targets;
}
@@ -38,17 +38,13 @@ export function createStateSnapshotProcessor(
const expectedType =
strategy === 'incremental' ? 'point-in-time' : 'accumulate';
// 1. Check Inbox for a completed Snapshot (The Fast Path)
const proposedSnapshots = inbox.getMessages<{
newText: string;
consumedIds: string[];
type: string;
}>('PROPOSED_SNAPSHOT');
// 1. Check cache for a completed Snapshot (The Fast Path)
const proposedSnapshots = snapshotCache.getProposals();
if (proposedSnapshots.length > 0) {
// Filter for the snapshot type that matches our processor mode
const matchingSnapshots = proposedSnapshots.filter(
(s) => s.payload.type === expectedType,
(s) => s.type === expectedType,
);
// Sort by newest timestamp first (we want the most accumulated snapshot)
@@ -57,7 +53,7 @@ export function createStateSnapshotProcessor(
);
for (const proposed of sorted) {
const { consumedIds, newText } = proposed.payload;
const { consumedIds, newText } = proposed;
// Verify all consumed IDs still exist sequentially in targets
const targetIds = new Set(targets.map((t) => t.id));
@@ -91,7 +87,7 @@ export function createStateSnapshotProcessor(
returnedNodes.unshift(snapshotNode);
}
inbox.consume(proposed.id);
snapshotCache.consume(proposed.id);
return returnedNodes;
}
}
@@ -10,8 +10,6 @@ import type { Content } from '@google/genai';
import type { ContextProfile } from '../config/profiles.js';
import { ContextEnvironmentImpl } from '../pipeline/environmentImpl.js';
import { ContextTracer } from '../tracer.js';
import { ContextEventBus } from '../eventBus.js';
import { PipelineOrchestrator } from '../pipeline/orchestrator.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js';
import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js';
@@ -27,8 +25,6 @@ export class SimulationHarness {
readonly chatHistory: AgentChatHistory;
contextManager!: ContextManager;
env!: ContextEnvironmentImpl;
orchestrator!: PipelineOrchestrator;
readonly eventBus: ContextEventBus;
config!: ContextProfile;
private tracer!: ContextTracer;
private currentTurnIndex = 0;
@@ -46,7 +42,6 @@ export class SimulationHarness {
private constructor() {
this.chatHistory = new AgentChatHistory();
this.eventBus = new ContextEventBus();
}
private async init(
@@ -68,37 +63,22 @@ export class SimulationHarness {
mockTempDir,
this.tracer,
1, // 1 char per token average
this.eventBus,
new InMemoryFileSystem(),
new DeterministicIdGenerator(),
);
this.orchestrator = new PipelineOrchestrator(
config.buildPipelines(this.env),
config.buildAsyncPipelines(this.env),
this.env,
this.eventBus,
this.tracer,
);
this.contextManager = new ContextManager(
config,
this.env,
this.tracer,
this.orchestrator,
this.chatHistory,
);
}
/**
* Simulates a single "Turn" (User input + Model/Tool outputs)
* A turn might consist of multiple Content messages (e.g. user prompt -> model call -> user response -> model answer)
*/
async simulateTurn(messages: Content[]) {
// 1. Append the new messages
const currentHistory = this.chatHistory.get();
this.chatHistory.set([...currentHistory, ...messages]);
// 2. Measure tokens immediately after append (Before background processing)
const tokensBefore = this.env.tokenCalculator.calculateConcreteListTokens(
this.contextManager.getNodes(),
);
@@ -106,10 +86,9 @@ export class SimulationHarness {
`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`,
);
// 3. Yield to event loop to allow internal async subscribers and orchestrator to finish
// Yield to let internal event loops settle
await new Promise((resolve) => setTimeout(resolve, 50));
// 3.1 Simulate what projectCompressedHistory does with the sync handlers
let currentView = this.contextManager.getNodes();
const currentTokens =
this.env.tokenCalculator.calculateConcreteListTokens(currentView);
@@ -120,23 +99,17 @@ export class SimulationHarness {
debugLogger.log(
`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.config.budget.maxTokens}`,
);
const orchestrator = this.orchestrator;
// In the V2 simulation, we trigger the 'gc_backstop' to simulate emergency pressure.
// Since contextManager owns its buffer natively, the simulation now properly matches reality
// where the manager runs the orchestrator and keeps the resulting modified view.
const modifiedView = await orchestrator.executeTriggerSync(
const modifiedView = await this.contextManager.executeTriggerSync(
'gc_backstop',
currentView,
new Set(currentView.map((e) => e.id)),
new Set<string>(),
);
// In the real system, ContextManager triggers this and retains it.
// We will emulate that behavior internally in the test loop for token counting.
currentView = modifiedView;
}
// 4. Measure tokens after background processors have processed inboxes
const tokensAfter = this.env.tokenCalculator.calculateConcreteListTokens(
this.contextManager.getNodes(),
);
@@ -14,22 +14,17 @@ import { ContextTracer } from '../tracer.js';
import { ContextEnvironmentImpl } from '../pipeline/environmentImpl.js';
import { SidecarLoader } from '../config/configLoader.js';
import { SidecarRegistry } from '../config/registry.js';
import { ContextEventBus } from '../eventBus.js';
import { PipelineOrchestrator } from '../pipeline/orchestrator.js';
import type { ConcreteNode, ToolExecution } from '../ir/types.js';
import type { ContextEnvironment } from '../pipeline/environment.js';
import type { Config } from '../../config/config.js';
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { Content, GenerateContentResponse } from '@google/genai';
import { InboxSnapshotImpl } from '../pipeline/inbox.js';
import type { InboxMessage, ProcessArgs } from '../pipeline.js';
import type { SnapshotProposal, ProcessArgs } from '../pipeline.js';
import type { ContextProfile } from '../config/profiles.js';
import type { Mock } from 'vitest';
import { LiveSnapshotCache } from '../pipeline/snapshotCache.js';
import { ContextWorkingBufferImpl } from '../pipeline/contextWorkingBuffer.js';
/**
* Creates a valid mock GenerateContentResponse with the provided text.
* Used to avoid having to manually construct the deeply nested candidate/content/part structure.
*/
export const createMockGenerateContentResponse = (
text: string,
): GenerateContentResponse =>
@@ -114,7 +109,6 @@ export function createMockLlmClient(
generateContentMock.mockResolvedValueOnce(response);
}
}
// Fallback to the last response for any subsequent calls
const lastResponse = responses[responses.length - 1];
if (typeof lastResponse === 'string') {
generateContentMock.mockResolvedValue(
@@ -124,7 +118,6 @@ export function createMockLlmClient(
generateContentMock.mockResolvedValue(lastResponse);
}
} else {
// Default fallback
generateContentMock.mockResolvedValue(
createMockGenerateContentResponse('Mock LLM response'),
);
@@ -145,7 +138,6 @@ export function createMockEnvironment(
targetDir: '/tmp',
sessionId: 'mock-session',
});
const eventBus = new ContextEventBus();
const env = new ContextEnvironmentImpl(
llmClient,
@@ -155,7 +147,6 @@ export function createMockEnvironment(
'/tmp/.gemini/tool-outputs',
tracer,
1,
eventBus,
new InMemoryFileSystem(),
new DeterministicIdGenerator('mock-uuid-'),
);
@@ -166,23 +157,23 @@ export function createMockEnvironment(
return env;
}
/**
* Creates a block of synthetic conversation history designed to consume a specific number of tokens.
* Assumes roughly 4 characters per token for standard English text.
*/
import { ContextWorkingBufferImpl } from '../pipeline/contextWorkingBuffer.js';
export function createMockProcessArgs(
targets: ConcreteNode[],
bufferNodes: ConcreteNode[] = [],
inboxMessages: InboxMessage[] = [],
proposals: SnapshotProposal[] = [],
): ProcessArgs {
const cache = new LiveSnapshotCache();
// We can just manually add the proposals for the mock
for (const p of proposals) {
(cache as any).proposals.push(p);
}
return {
targets,
buffer: ContextWorkingBufferImpl.initialize(
bufferNodes.length ? bufferNodes : targets,
),
inbox: new InboxSnapshotImpl(inboxMessages),
snapshotCache: cache,
};
}
@@ -207,9 +198,6 @@ export function createSyntheticHistory(
return history;
}
/**
* Creates a fully mocked Config object tailored for Context Component testing.
*/
export function createMockContextConfig(
overrides?: Record<string, unknown>,
llmClientOverride?: unknown,
@@ -236,22 +224,17 @@ export function createMockContextConfig(
return { ...defaultConfig, ...overrides } as unknown as Config;
}
/**
* Wires up a full ContextManager component with an AgentChatHistory and active background async pipelines.
*/
export function setupContextComponentTest(
config: Config,
sidecarOverride?: ContextProfile,
): { chatHistory: AgentChatHistory; contextManager: ContextManager } {
const chatHistory = new AgentChatHistory();
const registry = new SidecarRegistry(); // Provide an empty registry for tests, or one pre-filled by the caller if needed later
const registry = new SidecarRegistry();
const sidecar = sidecarOverride || SidecarLoader.fromConfig(config, registry);
const tracer = new ContextTracer({
targetDir: '/tmp',
sessionId: 'test-session',
});
const eventBus = new ContextEventBus();
const env = new ContextEnvironmentImpl(
config.getBaseLlmClient(),
'test prompt-id',
@@ -260,25 +243,14 @@ export function setupContextComponentTest(
'/tmp/gemini-test',
tracer,
1,
eventBus,
);
const orchestrator = new PipelineOrchestrator(
sidecar.buildPipelines(env),
sidecar.buildAsyncPipelines(env),
env,
eventBus,
tracer,
);
const contextManager = new ContextManager(
sidecar,
env,
tracer,
orchestrator,
chatHistory,
);
// The async async pipeline is now internally managed by ContextManager
return { chatHistory, contextManager };
}