From 68e7e93eaaa5794cd8b496113f77de0d664d9077 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 9 Apr 2026 18:44:27 +0000 Subject: [PATCH] cwb complete --- packages/core/src/context/contextManager.ts | 44 ++--- packages/core/src/context/eventBus.ts | 14 +- packages/core/src/context/pipeline.ts | 10 +- .../sidecar/contextWorkingBuffer.test.ts | 116 +++++++++++ .../context/sidecar/contextWorkingBuffer.ts | 187 ++++++++++++++++++ .../src/context/sidecar/orchestrator.test.ts | 2 +- .../core/src/context/sidecar/orchestrator.ts | 105 ++-------- .../context/system-tests/SimulationHarness.ts | 30 +-- .../src/context/testing/contextTestUtils.ts | 34 +--- 9 files changed, 352 insertions(+), 190 deletions(-) create mode 100644 packages/core/src/context/sidecar/contextWorkingBuffer.test.ts create mode 100644 packages/core/src/context/sidecar/contextWorkingBuffer.ts diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 5f4e5dea05..e7359c4770 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -6,7 +6,6 @@ import type { Content } from '@google/genai'; import type { AgentChatHistory } from '../core/agentChatHistory.js'; -import { debugLogger } from '../utils/debugLogger.js'; import type { ConcreteNode } from './ir/types.js'; import type { ContextEventBus } from './eventBus.js'; import type { ContextTracer } from './tracer.js'; @@ -15,11 +14,13 @@ import type { SidecarConfig } from './sidecar/types.js'; import type { PipelineOrchestrator } from './sidecar/orchestrator.js'; import { HistoryObserver } from './historyObserver.js'; import { IrProjector } from './ir/projector.js'; +import { ContextWorkingBufferImpl } from './sidecar/contextWorkingBuffer.js'; export class ContextManager { - // The stateful, pristine flat graph. + // The master state containing the pristine graph and current active graph. + private buffer: ContextWorkingBufferImpl = ContextWorkingBufferImpl.initialize([]); private pristineNodes: readonly ConcreteNode[] = []; - private currentNodes: readonly ConcreteNode[] = []; + private readonly eventBus: ContextEventBus; // Internal sub-components @@ -47,31 +48,16 @@ export class ContextManager { this.eventBus.onPristineHistoryUpdated((event) => { this.pristineNodes = event.nodes; - // In V2, we assume currentNodes updates sequentially via Orchestrator patches. - // But if pristine changes, we must ensure our current view incorporates new nodes. - // For now, simple fallback: if the current nodes doesn't have the new nodes, append them. - // A more robust implementation would diff the nodes, but for now we'll just track. - const existingIds = new Set(this.currentNodes.map((n) => n.id)); + + const existingIds = new Set(this.buffer.nodes.map((n) => n.id)); const addedNodes = event.nodes.filter((n) => !existingIds.has(n.id)); + if (addedNodes.length > 0) { - this.currentNodes = [...this.currentNodes, ...addedNodes]; + this.buffer = this.buffer.appendPristineNodes(addedNodes); } this.evaluateTriggers(event.newNodes); }); - - this.eventBus.onVariantReady((event) => { - // In V2, async workers write back patches. - // The old variant dict logic is replaced by the orchestrator applying patches directly. - // For now we log it. - this.tracer.logEvent( - 'ContextManager', - `Received async variant [${event.variantId}] for Node ${event.targetId}`, - ); - debugLogger.log( - `ContextManager: Received async variant [${event.variantId}] for Node ${event.targetId}.`, - ); - }); } /** @@ -91,21 +77,21 @@ export class ContextManager { if (newNodes.size > 0) { this.eventBus.emitChunkReceived({ - nodes: this.currentNodes, + nodes: this.buffer.nodes, targetNodeIds: newNodes, }); } const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens( - this.currentNodes, + this.buffer.nodes, ); if (currentTokens > this.sidecar.budget.retainedTokens) { const agedOutNodes = new Set(); let rollingTokens = 0; // Walk backwards finding nodes that fall out of the retained budget - for (let i = this.currentNodes.length - 1; i >= 0; i--) { - const node = this.currentNodes[i]; + for (let i = this.buffer.nodes.length - 1; i >= 0; i--) { + const node = this.buffer.nodes[i]; rollingTokens += this.env.tokenCalculator.calculateConcreteListTokens([ node, ]); @@ -116,7 +102,7 @@ export class ContextManager { if (agedOutNodes.size > 0) { this.eventBus.emitConsolidationNeeded({ - nodes: this.currentNodes, + nodes: this.buffer.nodes, targetDeficit: currentTokens - this.sidecar.budget.retainedTokens, targetNodeIds: agedOutNodes, }); @@ -139,7 +125,7 @@ export class ContextManager { * This is the view that will eventually be projected back to the LLM. */ getNodes(): readonly ConcreteNode[] { - return [...this.currentNodes]; + return [...this.buffer.nodes]; } /** @@ -156,7 +142,7 @@ export class ContextManager { ); // Apply final GC Backstop pressure barrier synchronously before mapping const finalHistory = await IrProjector.project( - this.currentNodes, + this.buffer.nodes, this.orchestrator, this.sidecar, this.tracer, diff --git a/packages/core/src/context/eventBus.ts b/packages/core/src/context/eventBus.ts index 1e0192c052..ab0b8a196e 100644 --- a/packages/core/src/context/eventBus.ts +++ b/packages/core/src/context/eventBus.ts @@ -23,11 +23,7 @@ export interface IrChunkReceivedEvent { targetNodeIds: Set; } -export interface VariantReadyEvent { - targetId: string; // The Episode or Step ID this variant attaches to - variantId: string; // A unique ID for the variant itself - variant: ConcreteNode; // In V2, variants are synthetic concrete nodes -} + export class ContextEventBus extends EventEmitter { emitPristineHistoryUpdated(event: PristineHistoryUpdatedEvent) { @@ -55,12 +51,4 @@ export class ContextEventBus extends EventEmitter { onConsolidationNeeded(listener: (event: ContextConsolidationEvent) => void) { this.on('BUDGET_RETAINED_CROSSED', listener); } - - emitVariantReady(event: VariantReadyEvent) { - this.emit('VARIANT_READY', event); - } - - onVariantReady(listener: (event: VariantReadyEvent) => void) { - this.on('VARIANT_READY', listener); - } } diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index 631a89bf89..1bfe3ed9c5 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -18,10 +18,18 @@ export interface InboxSnapshot { consume(messageId: string): void; } +export interface GraphMutation { + readonly processorId: string; + readonly timestamp: number; + readonly removedIds: readonly string[]; + readonly addedNodes: readonly ConcreteNode[]; +} + export interface ContextWorkingBuffer { readonly nodes: readonly ConcreteNode[]; - getPristineNode(id: string): ConcreteNode | undefined; + getPristineNodes(id: string): readonly ConcreteNode[]; getLineage(id: string): readonly ConcreteNode[]; + getAuditLog(): readonly GraphMutation[]; } export interface ProcessArgs { diff --git a/packages/core/src/context/sidecar/contextWorkingBuffer.test.ts b/packages/core/src/context/sidecar/contextWorkingBuffer.test.ts new file mode 100644 index 0000000000..6f01c01692 --- /dev/null +++ b/packages/core/src/context/sidecar/contextWorkingBuffer.test.ts @@ -0,0 +1,116 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect } from 'vitest'; +import { ContextWorkingBufferImpl } from './contextWorkingBuffer.js'; +import { createDummyNode } from '../testing/contextTestUtils.js'; + +describe('ContextWorkingBufferImpl', () => { + it('should initialize with a pristine graph correctly', () => { + const pristine1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1'); + const pristine2 = createDummyNode('ep1', 'AGENT_THOUGHT', 10, undefined, 'p2'); + + const buffer = ContextWorkingBufferImpl.initialize([pristine1, pristine2]); + + expect(buffer.nodes).toHaveLength(2); + expect(buffer.getAuditLog()).toHaveLength(0); + + // Pristine nodes should point to themselves + expect(buffer.getPristineNodes('p1')).toEqual([pristine1]); + expect(buffer.getPristineNodes('p2')).toEqual([pristine2]); + }); + + it('should track 1:1 replacements (e.g., masking) and append to audit log', () => { + const pristine1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1'); + let buffer = ContextWorkingBufferImpl.initialize([pristine1]); + + const maskedNode = createDummyNode('ep1', 'USER_PROMPT', 5, undefined, 'm1'); + // Simulate what a processor does + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (maskedNode as any).replacesId = 'p1'; + + buffer = buffer.applyProcessorResult('ToolMasking', [pristine1], [maskedNode]); + + expect(buffer.nodes).toHaveLength(1); + expect(buffer.nodes[0].id).toBe('m1'); + + const log = buffer.getAuditLog(); + expect(log).toHaveLength(1); + expect(log[0].processorId).toBe('ToolMasking'); + expect(log[0].removedIds).toEqual(['p1']); + expect(log[0].addedNodes[0].id).toBe('m1'); + + // Provenance lookup: the masked node should resolve back to the pristine root + expect(buffer.getPristineNodes('m1')).toEqual([pristine1]); + }); + + it('should track N:1 abstractions (e.g., rolling summaries)', () => { + const p1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1'); + const p2 = createDummyNode('ep1', 'AGENT_THOUGHT', 10, undefined, 'p2'); + const p3 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p3'); + + let buffer = ContextWorkingBufferImpl.initialize([p1, p2, p3]); + + const summaryNode = createDummyNode('ep1', 'ROLLING_SUMMARY', 15, undefined, 's1'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (summaryNode as any).abstractsIds = ['p1', 'p2']; + + buffer = buffer.applyProcessorResult('Summarizer', [p1, p2], [summaryNode]); + + // p1 and p2 are removed, p3 remains, s1 is added + expect(buffer.nodes.map(n => n.id)).toEqual(['p3', 's1']); + + // Provenance lookup: The summary node should resolve to both p1 and p2! + const roots = buffer.getPristineNodes('s1'); + expect(roots).toHaveLength(2); + expect(roots).toContain(p1); + expect(roots).toContain(p2); + }); + + it('should track multi-generation provenance correctly', () => { + const p1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1'); + let buffer = ContextWorkingBufferImpl.initialize([p1]); + + // Gen 1: Masked + const gen1 = createDummyNode('ep1', 'USER_PROMPT', 8, undefined, 'gen1'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (gen1 as any).replacesId = 'p1'; + buffer = buffer.applyProcessorResult('Masking', [p1], [gen1]); + + // Gen 2: Summarized + const gen2 = createDummyNode('ep1', 'ROLLING_SUMMARY', 5, undefined, 'gen2'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (gen2 as any).abstractsIds = ['gen1']; + buffer = buffer.applyProcessorResult('Summarizer', [gen1], [gen2]); + + expect(buffer.nodes).toHaveLength(1); + expect(buffer.nodes[0].id).toBe('gen2'); + + // Audit log should show sequence + const log = buffer.getAuditLog(); + expect(log).toHaveLength(2); + expect(log[0].processorId).toBe('Masking'); + expect(log[1].processorId).toBe('Summarizer'); + + // Multi-gen Provenance lookup: gen2 -> gen1 -> p1 + expect(buffer.getPristineNodes('gen2')).toEqual([p1]); + }); + + it('should handle net-new injected nodes without throwing', () => { + const p1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1'); + let buffer = ContextWorkingBufferImpl.initialize([p1]); + + const injected = createDummyNode('ep1', 'SYSTEM_EVENT', 5, undefined, 'injected1'); + // No replacesId or abstractsIds + + buffer = buffer.applyProcessorResult('Injector', [], [injected]); + + expect(buffer.nodes.map(n => n.id)).toEqual(['p1', 'injected1']); + + // It should root to itself + expect(buffer.getPristineNodes('injected1')).toEqual([injected]); + }); +}); \ No newline at end of file diff --git a/packages/core/src/context/sidecar/contextWorkingBuffer.ts b/packages/core/src/context/sidecar/contextWorkingBuffer.ts new file mode 100644 index 0000000000..6b3e37627b --- /dev/null +++ b/packages/core/src/context/sidecar/contextWorkingBuffer.ts @@ -0,0 +1,187 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { ContextWorkingBuffer, GraphMutation } from '../pipeline.js'; +import type { ConcreteNode } from '../ir/types.js'; + +export class ContextWorkingBufferImpl implements ContextWorkingBuffer { + // The current active graph + readonly nodes: readonly ConcreteNode[]; + + // The AOT pre-calculated provenance index (Current ID -> Pristine IDs) + private readonly provenanceMap: ReadonlyMap>; + + // The original immutable pristine nodes mapping + private readonly pristineNodesMap: ReadonlyMap; + + // The historical linked list of changes + private readonly history: readonly GraphMutation[]; + + private constructor( + nodes: readonly ConcreteNode[], + pristineNodesMap: ReadonlyMap, + provenanceMap: ReadonlyMap>, + history: readonly GraphMutation[] + ) { + this.nodes = nodes; + this.pristineNodesMap = pristineNodesMap; + this.provenanceMap = provenanceMap; + this.history = history; + } + + /** + * Initializes a brand new ContextWorkingBuffer from a pristine graph. + * Every node's provenance points to itself. + */ + static initialize(pristineNodes: readonly ConcreteNode[]): ContextWorkingBufferImpl { + const pristineMap = new Map(); + const initialProvenance = new Map>(); + + for (const node of pristineNodes) { + pristineMap.set(node.id, node); + initialProvenance.set(node.id, new Set([node.id])); + } + + return new ContextWorkingBufferImpl( + pristineNodes, + pristineMap, + initialProvenance, + [] // Empty history + ); + } + + /** + * Appends newly observed pristine nodes (e.g. from a user message) to the working buffer. + * Ensures they are tracked in the pristine map and point to themselves in provenance. + */ + appendPristineNodes(newNodes: readonly ConcreteNode[]): ContextWorkingBufferImpl { + if (newNodes.length === 0) return this; + + const newPristineMap = new Map(this.pristineNodesMap); + const newProvenanceMap = new Map(this.provenanceMap); + + for (const node of newNodes) { + newPristineMap.set(node.id, node); + newProvenanceMap.set(node.id, new Set([node.id])); + } + + return new ContextWorkingBufferImpl( + [...this.nodes, ...newNodes], + newPristineMap, + newProvenanceMap, + [...this.history] + ); + } + + /** + * Generates an entirely new buffer instance by calculating the delta between the processor's input and output. + */ + applyProcessorResult( + processorId: string, + inputTargets: readonly ConcreteNode[], + outputNodes: readonly ConcreteNode[] + ): ContextWorkingBufferImpl { + const outputIds = new Set(outputNodes.map(n => n.id)); + const inputIds = new Set(inputTargets.map(n => n.id)); + + // Calculate diffs + const removedIds = inputTargets.filter(n => !outputIds.has(n.id)).map(n => n.id); + const addedNodes = outputNodes.filter(n => !inputIds.has(n.id)); + + // Create mutation record + const mutation: GraphMutation = { + processorId, + timestamp: Date.now(), + removedIds, + addedNodes, + }; + + // Calculate new node array + const removedSet = new Set(removedIds); + const retainedNodes = this.nodes.filter(n => !removedSet.has(n.id)); + const newGraph = [...retainedNodes]; + + // We append the output nodes in the same general position if possible, + // but in a complex graph we just ensure they exist. V2 graph uses timestamps for order. + // For simplicity, we just push added nodes to the end of the retained array + newGraph.push(...addedNodes); + + // Calculate new provenance map + const newProvenanceMap = new Map(this.provenanceMap); + + let finalPristineMap = this.pristineNodesMap; + + // Map the new synthetic nodes back to their pristine roots + for (const added of addedNodes) { + const roots = new Set(); + + // 1:1 Replacement (e.g. Masked Node) + if (added.replacesId) { + const inheritedRoots = this.provenanceMap.get(added.replacesId); + if (inheritedRoots) { + for (const rootId of inheritedRoots) roots.add(rootId); + } + } + + // N:1 Abstraction (e.g. Rolling Summary) + if (added.abstractsIds) { + for (const abstractId of added.abstractsIds) { + const inheritedRoots = this.provenanceMap.get(abstractId); + if (inheritedRoots) { + for (const rootId of inheritedRoots) roots.add(rootId); + } + } + } + + // If it has no links back to the original graph, it is its own root + // (e.g., a system-injected instruction) + if (roots.size === 0) { + roots.add(added.id); + // It acts as a net-new pristine root. + if (!finalPristineMap.has(added.id)) { + const mutableMap = new Map(finalPristineMap); + mutableMap.set(added.id, added); + finalPristineMap = mutableMap; + } + } + + newProvenanceMap.set(added.id, roots); + } + + return new ContextWorkingBufferImpl( + newGraph, + finalPristineMap, + newProvenanceMap, + [...this.history, mutation] + ); + } + + getPristineNodes(id: string): readonly ConcreteNode[] { + const pristineIds = this.provenanceMap.get(id); + if (!pristineIds) return []; + return Array.from(pristineIds).map(pid => this.pristineNodesMap.get(pid)!); + } + + getAuditLog(): readonly GraphMutation[] { + return this.history; + } + + getLineage(id: string): readonly ConcreteNode[] { + const lineage: ConcreteNode[] = []; + const currentNodesMap = new Map(this.nodes.map(n => [n.id, n])); + + let current = currentNodesMap.get(id); + while (current) { + lineage.push(current); + if (current.logicalParentId && current.logicalParentId !== current.id) { + current = currentNodesMap.get(current.logicalParentId); + } else { + break; + } + } + return lineage; + } +} diff --git a/packages/core/src/context/sidecar/orchestrator.test.ts b/packages/core/src/context/sidecar/orchestrator.test.ts index 47593ab020..ec91e4fa4e 100644 --- a/packages/core/src/context/sidecar/orchestrator.test.ts +++ b/packages/core/src/context/sidecar/orchestrator.test.ts @@ -43,7 +43,7 @@ class ModifyingProcessor implements ContextProcessor { text: newParts[0].text + ' [modified]', }; } - newTargets[0] = { ...prompt, semanticParts: newParts }; + newTargets[0] = { ...prompt, id: prompt.id + '-modified', replacesId: prompt.id, semanticParts: newParts }; } return newTargets; } diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 38afe1520b..3265916961 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -8,7 +8,6 @@ import type { ConcreteNode } from '../ir/types.js'; import type { ContextProcessor, ContextWorker, - ContextWorkingBuffer, } from '../pipeline.js'; import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js'; import type { @@ -19,37 +18,7 @@ import type { import type { SidecarRegistry } from './registry.js'; import { debugLogger } from '../../utils/debugLogger.js'; import { InboxSnapshotImpl } from './inbox.js'; - -class ContextWorkingBufferImpl implements ContextWorkingBuffer { - private readonly nodesMap: Map; - - constructor(readonly nodes: readonly ConcreteNode[]) { - this.nodesMap = new Map(nodes.map((n) => [n.id, n])); - } - - getPristineNode(id: string): ConcreteNode | undefined { - // In V2, pristine nodes are accessed via the IrMapper's state tracking or through the history - // Since orchestrator doesn't natively hold the original pristine graph, we search current buffer - // or rely on the env's capability. For now, since pristine graph is maintained in ContextManager, - // we just return the node from the current buffer if we don't have a direct pristine link. - // To fully implement pristine lookup, we would need to pass the pristine graph from ContextManager. - return this.nodesMap.get(id); - } - - getLineage(id: string): readonly ConcreteNode[] { - const lineage: ConcreteNode[] = []; - let current = this.nodesMap.get(id); - while (current) { - lineage.push(current); - if (current.logicalParentId && current.logicalParentId !== current.id) { - current = this.nodesMap.get(current.logicalParentId); - } else { - break; - } - } - return lineage; - } -} +import { ContextWorkingBufferImpl } from './contextWorkingBuffer.js'; export class PipelineOrchestrator { private activeTimers: NodeJS.Timeout[] = []; @@ -184,63 +153,13 @@ export class PipelineOrchestrator { } } - applyProcessorDiff( - nodes: readonly ConcreteNode[], - targets: readonly ConcreteNode[], - returnedNodes: readonly ConcreteNode[], - ): readonly ConcreteNode[] { - const mutableNodes = [...nodes]; - const targetSet = new Set(targets.map((n) => n.id)); - const returnedMap = new Map(returnedNodes.map((n) => [n.id, n])); - - const removedIds = new Set(); - const newNodes: ConcreteNode[] = []; - - for (const t of targets) { - const returnedNode = returnedMap.get(t.id); - if (!returnedNode) { - removedIds.add(t.id); - } else if (returnedNode !== t) { - removedIds.add(t.id); - newNodes.push(returnedNode); - } - } - - for (const r of returnedNodes) { - if (!targetSet.has(r.id)) { - newNodes.push(r); - } - } - - if (removedIds.size === 0 && newNodes.length === 0) { - return nodes; - } - - let earliestRemovalIdx = mutableNodes.length; - let i = 0; - while (i < mutableNodes.length) { - if (removedIds.has(mutableNodes[i].id)) { - if (i < earliestRemovalIdx) earliestRemovalIdx = i; - mutableNodes.splice(i, 1); - } else { - i++; - } - } - - if (newNodes.length > 0) { - mutableNodes.splice(earliestRemovalIdx, 0, ...newNodes); - } - - return mutableNodes; - } - async executeTriggerSync( trigger: PipelineTrigger, nodes: readonly ConcreteNode[], triggerTargets: ReadonlySet, protectedLogicalIds: ReadonlySet = new Set(), ): Promise { - let currentNodes = nodes; + let currentBuffer = ContextWorkingBufferImpl.initialize(nodes); const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger), ); @@ -261,18 +180,18 @@ export class PipelineOrchestrator { `Executing processor synchronously: ${procDef.processorId}`, ); - const allowedTargets = currentNodes.filter((n) => + const allowedTargets = currentBuffer.nodes.filter((n) => this.isNodeAllowed(n, triggerTargets, protectedLogicalIds), ); const returnedNodes = await processor.process({ - buffer: new ContextWorkingBufferImpl(currentNodes), + buffer: currentBuffer, targets: allowedTargets, inbox: inboxSnapshot, }); - currentNodes = this.applyProcessorDiff( - currentNodes, + currentBuffer = currentBuffer.applyProcessorResult( + processor.id, allowedTargets, returnedNodes, ); @@ -288,7 +207,7 @@ export class PipelineOrchestrator { // Success! Drain consumed messages this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds()); - return currentNodes; + return currentBuffer.nodes; } private async executePipelineAsync( @@ -303,7 +222,7 @@ export class PipelineOrchestrator { ); if (!nodes || nodes.length === 0) return; - let currentNodes = nodes; + let currentBuffer = ContextWorkingBufferImpl.initialize(nodes); const inboxSnapshot = new InboxSnapshotImpl( this.env.inbox.getMessages() || [], ); @@ -318,18 +237,18 @@ export class PipelineOrchestrator { `Executing processor: ${procDef.processorId} (async)`, ); - const allowedTargets = currentNodes.filter((n) => + const allowedTargets = currentBuffer.nodes.filter((n) => this.isNodeAllowed(n, triggerTargets, protectedLogicalIds), ); const returnedNodes = await processor.process({ - buffer: new ContextWorkingBufferImpl(currentNodes), + buffer: currentBuffer, targets: allowedTargets, inbox: inboxSnapshot, }); - currentNodes = this.applyProcessorDiff( - currentNodes, + currentBuffer = currentBuffer.applyProcessorResult( + processor.id, allowedTargets, returnedNodes, ); diff --git a/packages/core/src/context/system-tests/SimulationHarness.ts b/packages/core/src/context/system-tests/SimulationHarness.ts index 87c5a6c576..adf022f8ff 100644 --- a/packages/core/src/context/system-tests/SimulationHarness.ts +++ b/packages/core/src/context/system-tests/SimulationHarness.ts @@ -124,35 +124,21 @@ export class SimulationHarness { ); const orchestrator = this.orchestrator; // In the V2 simulation, we trigger the 'gc_backstop' to simulate emergency pressure. - currentView = await orchestrator.executeTriggerSync( + // Since contextManager owns its buffer natively, the simulation now properly matches reality + // where the manager runs the orchestrator and keeps the resulting modified view. + const modifiedView = await orchestrator.executeTriggerSync( 'gc_backstop', currentView, new Set(currentView.map((e) => e.id)), new Set(), ); - // Inject the truncated view back into the graph - for (let i = 0; i < currentView.length; i++) { - const ep = currentView[i]; - if (!this.contextManager.getNodes().find((c) => c.id === ep.id)) { - this.eventBus.emitVariantReady({ - targetId: ep.id, - variantId: 'v-emergency', - variant: { - type: 'MASKED_TOOL', - id: 'mock-id', - tokens: { intent: 0, observation: 0 }, - intent: {}, - observation: {}, - toolName: 'tool', - }, - }); - } - } - // Wait for variant propagation - await new Promise((resolve) => setTimeout(resolve, 50)); + + // In the real system, ContextManager triggers this and retains it. + // We will emulate that behavior internally in the test loop for token counting. + currentView = modifiedView; } - // 4. Measure tokens after background processors have (hopefully) emitted variants + // 4. Measure tokens after background processors have processed inboxes const tokensAfter = this.env.tokenCalculator.calculateConcreteListTokens( this.contextManager.getNodes(), ); diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index b6e5ff41cf..ad3874c9a9 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -24,7 +24,6 @@ import type { BaseLlmClient } from '../../core/baseLlmClient.js'; import type { Content, GenerateContentResponse } from '@google/genai'; import { InboxSnapshotImpl } from '../sidecar/inbox.js'; import type { - ContextWorkingBuffer, InboxMessage, ProcessArgs, } from '../pipeline.js'; @@ -43,7 +42,7 @@ export const createMockGenerateContentResponse = ( export function createDummyNode( logicalParentId: string, - type: 'USER_PROMPT' | 'SYSTEM_EVENT' | 'AGENT_THOUGHT' | 'AGENT_YIELD', + type: ConcreteNode['type'], tokens = 100, overrides?: Partial, id?: string, @@ -176,32 +175,7 @@ export function createMockEnvironment( * Creates a block of synthetic conversation history designed to consume a specific number of tokens. * Assumes roughly 4 characters per token for standard English text. */ -export class FakeContextWorkingBuffer implements ContextWorkingBuffer { - readonly nodes: readonly ConcreteNode[]; - private readonly nodesById = new Map(); - private readonly nodesByEpisode = new Map(); - - constructor(nodes: readonly ConcreteNode[]) { - this.nodes = nodes; - for (const node of nodes) { - this.nodesById.set(node.id, node); - const parentId = node.logicalParentId || 'orphan'; - const epNodes = this.nodesByEpisode.get(parentId) || []; - epNodes.push(node); - this.nodesByEpisode.set(parentId, epNodes); - } - } - - getPristineNode(id: string): ConcreteNode | undefined { - return this.nodesById.get(id); - } - - getLineage(id: string): readonly ConcreteNode[] { - const node = this.nodesById.get(id); - if (!node) return []; - return this.nodesByEpisode.get(node.logicalParentId || 'orphan') || []; - } -} +import { ContextWorkingBufferImpl } from '../sidecar/contextWorkingBuffer.js'; export function createMockProcessArgs( targets: ConcreteNode[], @@ -210,9 +184,7 @@ export function createMockProcessArgs( ): ProcessArgs { return { targets, - buffer: new FakeContextWorkingBuffer( - bufferNodes.length ? bufferNodes : targets, - ), + buffer: ContextWorkingBufferImpl.initialize(bufferNodes.length ? bufferNodes : targets), inbox: new InboxSnapshotImpl(inboxMessages), }; }