mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-06-10 11:12:35 -07:00
cwb complete
This commit is contained in:
@@ -6,7 +6,6 @@
|
||||
|
||||
import type { Content } from '@google/genai';
|
||||
import type { AgentChatHistory } from '../core/agentChatHistory.js';
|
||||
import { debugLogger } from '../utils/debugLogger.js';
|
||||
import type { ConcreteNode } from './ir/types.js';
|
||||
import type { ContextEventBus } from './eventBus.js';
|
||||
import type { ContextTracer } from './tracer.js';
|
||||
@@ -15,11 +14,13 @@ import type { SidecarConfig } from './sidecar/types.js';
|
||||
import type { PipelineOrchestrator } from './sidecar/orchestrator.js';
|
||||
import { HistoryObserver } from './historyObserver.js';
|
||||
import { IrProjector } from './ir/projector.js';
|
||||
import { ContextWorkingBufferImpl } from './sidecar/contextWorkingBuffer.js';
|
||||
|
||||
export class ContextManager {
|
||||
// The stateful, pristine flat graph.
|
||||
// The master state containing the pristine graph and current active graph.
|
||||
private buffer: ContextWorkingBufferImpl = ContextWorkingBufferImpl.initialize([]);
|
||||
private pristineNodes: readonly ConcreteNode[] = [];
|
||||
private currentNodes: readonly ConcreteNode[] = [];
|
||||
|
||||
private readonly eventBus: ContextEventBus;
|
||||
|
||||
// Internal sub-components
|
||||
@@ -47,31 +48,16 @@ export class ContextManager {
|
||||
|
||||
this.eventBus.onPristineHistoryUpdated((event) => {
|
||||
this.pristineNodes = event.nodes;
|
||||
// In V2, we assume currentNodes updates sequentially via Orchestrator patches.
|
||||
// But if pristine changes, we must ensure our current view incorporates new nodes.
|
||||
// For now, simple fallback: if the current nodes doesn't have the new nodes, append them.
|
||||
// A more robust implementation would diff the nodes, but for now we'll just track.
|
||||
const existingIds = new Set(this.currentNodes.map((n) => n.id));
|
||||
|
||||
const existingIds = new Set(this.buffer.nodes.map((n) => n.id));
|
||||
const addedNodes = event.nodes.filter((n) => !existingIds.has(n.id));
|
||||
|
||||
if (addedNodes.length > 0) {
|
||||
this.currentNodes = [...this.currentNodes, ...addedNodes];
|
||||
this.buffer = this.buffer.appendPristineNodes(addedNodes);
|
||||
}
|
||||
|
||||
this.evaluateTriggers(event.newNodes);
|
||||
});
|
||||
|
||||
this.eventBus.onVariantReady((event) => {
|
||||
// In V2, async workers write back patches.
|
||||
// The old variant dict logic is replaced by the orchestrator applying patches directly.
|
||||
// For now we log it.
|
||||
this.tracer.logEvent(
|
||||
'ContextManager',
|
||||
`Received async variant [${event.variantId}] for Node ${event.targetId}`,
|
||||
);
|
||||
debugLogger.log(
|
||||
`ContextManager: Received async variant [${event.variantId}] for Node ${event.targetId}.`,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -91,21 +77,21 @@ export class ContextManager {
|
||||
|
||||
if (newNodes.size > 0) {
|
||||
this.eventBus.emitChunkReceived({
|
||||
nodes: this.currentNodes,
|
||||
nodes: this.buffer.nodes,
|
||||
targetNodeIds: newNodes,
|
||||
});
|
||||
}
|
||||
|
||||
const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens(
|
||||
this.currentNodes,
|
||||
this.buffer.nodes,
|
||||
);
|
||||
|
||||
if (currentTokens > this.sidecar.budget.retainedTokens) {
|
||||
const agedOutNodes = new Set<string>();
|
||||
let rollingTokens = 0;
|
||||
// Walk backwards finding nodes that fall out of the retained budget
|
||||
for (let i = this.currentNodes.length - 1; i >= 0; i--) {
|
||||
const node = this.currentNodes[i];
|
||||
for (let i = this.buffer.nodes.length - 1; i >= 0; i--) {
|
||||
const node = this.buffer.nodes[i];
|
||||
rollingTokens += this.env.tokenCalculator.calculateConcreteListTokens([
|
||||
node,
|
||||
]);
|
||||
@@ -116,7 +102,7 @@ export class ContextManager {
|
||||
|
||||
if (agedOutNodes.size > 0) {
|
||||
this.eventBus.emitConsolidationNeeded({
|
||||
nodes: this.currentNodes,
|
||||
nodes: this.buffer.nodes,
|
||||
targetDeficit: currentTokens - this.sidecar.budget.retainedTokens,
|
||||
targetNodeIds: agedOutNodes,
|
||||
});
|
||||
@@ -139,7 +125,7 @@ export class ContextManager {
|
||||
* This is the view that will eventually be projected back to the LLM.
|
||||
*/
|
||||
getNodes(): readonly ConcreteNode[] {
|
||||
return [...this.currentNodes];
|
||||
return [...this.buffer.nodes];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -156,7 +142,7 @@ export class ContextManager {
|
||||
);
|
||||
// Apply final GC Backstop pressure barrier synchronously before mapping
|
||||
const finalHistory = await IrProjector.project(
|
||||
this.currentNodes,
|
||||
this.buffer.nodes,
|
||||
this.orchestrator,
|
||||
this.sidecar,
|
||||
this.tracer,
|
||||
|
||||
@@ -23,11 +23,7 @@ export interface IrChunkReceivedEvent {
|
||||
targetNodeIds: Set<string>;
|
||||
}
|
||||
|
||||
export interface VariantReadyEvent {
|
||||
targetId: string; // The Episode or Step ID this variant attaches to
|
||||
variantId: string; // A unique ID for the variant itself
|
||||
variant: ConcreteNode; // In V2, variants are synthetic concrete nodes
|
||||
}
|
||||
|
||||
|
||||
export class ContextEventBus extends EventEmitter {
|
||||
emitPristineHistoryUpdated(event: PristineHistoryUpdatedEvent) {
|
||||
@@ -55,12 +51,4 @@ export class ContextEventBus extends EventEmitter {
|
||||
onConsolidationNeeded(listener: (event: ContextConsolidationEvent) => void) {
|
||||
this.on('BUDGET_RETAINED_CROSSED', listener);
|
||||
}
|
||||
|
||||
emitVariantReady(event: VariantReadyEvent) {
|
||||
this.emit('VARIANT_READY', event);
|
||||
}
|
||||
|
||||
onVariantReady(listener: (event: VariantReadyEvent) => void) {
|
||||
this.on('VARIANT_READY', listener);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,10 +18,18 @@ export interface InboxSnapshot {
|
||||
consume(messageId: string): void;
|
||||
}
|
||||
|
||||
export interface GraphMutation {
|
||||
readonly processorId: string;
|
||||
readonly timestamp: number;
|
||||
readonly removedIds: readonly string[];
|
||||
readonly addedNodes: readonly ConcreteNode[];
|
||||
}
|
||||
|
||||
export interface ContextWorkingBuffer {
|
||||
readonly nodes: readonly ConcreteNode[];
|
||||
getPristineNode(id: string): ConcreteNode | undefined;
|
||||
getPristineNodes(id: string): readonly ConcreteNode[];
|
||||
getLineage(id: string): readonly ConcreteNode[];
|
||||
getAuditLog(): readonly GraphMutation[];
|
||||
}
|
||||
|
||||
export interface ProcessArgs {
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2026 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { ContextWorkingBufferImpl } from './contextWorkingBuffer.js';
|
||||
import { createDummyNode } from '../testing/contextTestUtils.js';
|
||||
|
||||
describe('ContextWorkingBufferImpl', () => {
|
||||
it('should initialize with a pristine graph correctly', () => {
|
||||
const pristine1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1');
|
||||
const pristine2 = createDummyNode('ep1', 'AGENT_THOUGHT', 10, undefined, 'p2');
|
||||
|
||||
const buffer = ContextWorkingBufferImpl.initialize([pristine1, pristine2]);
|
||||
|
||||
expect(buffer.nodes).toHaveLength(2);
|
||||
expect(buffer.getAuditLog()).toHaveLength(0);
|
||||
|
||||
// Pristine nodes should point to themselves
|
||||
expect(buffer.getPristineNodes('p1')).toEqual([pristine1]);
|
||||
expect(buffer.getPristineNodes('p2')).toEqual([pristine2]);
|
||||
});
|
||||
|
||||
it('should track 1:1 replacements (e.g., masking) and append to audit log', () => {
|
||||
const pristine1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1');
|
||||
let buffer = ContextWorkingBufferImpl.initialize([pristine1]);
|
||||
|
||||
const maskedNode = createDummyNode('ep1', 'USER_PROMPT', 5, undefined, 'm1');
|
||||
// Simulate what a processor does
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(maskedNode as any).replacesId = 'p1';
|
||||
|
||||
buffer = buffer.applyProcessorResult('ToolMasking', [pristine1], [maskedNode]);
|
||||
|
||||
expect(buffer.nodes).toHaveLength(1);
|
||||
expect(buffer.nodes[0].id).toBe('m1');
|
||||
|
||||
const log = buffer.getAuditLog();
|
||||
expect(log).toHaveLength(1);
|
||||
expect(log[0].processorId).toBe('ToolMasking');
|
||||
expect(log[0].removedIds).toEqual(['p1']);
|
||||
expect(log[0].addedNodes[0].id).toBe('m1');
|
||||
|
||||
// Provenance lookup: the masked node should resolve back to the pristine root
|
||||
expect(buffer.getPristineNodes('m1')).toEqual([pristine1]);
|
||||
});
|
||||
|
||||
it('should track N:1 abstractions (e.g., rolling summaries)', () => {
|
||||
const p1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1');
|
||||
const p2 = createDummyNode('ep1', 'AGENT_THOUGHT', 10, undefined, 'p2');
|
||||
const p3 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p3');
|
||||
|
||||
let buffer = ContextWorkingBufferImpl.initialize([p1, p2, p3]);
|
||||
|
||||
const summaryNode = createDummyNode('ep1', 'ROLLING_SUMMARY', 15, undefined, 's1');
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(summaryNode as any).abstractsIds = ['p1', 'p2'];
|
||||
|
||||
buffer = buffer.applyProcessorResult('Summarizer', [p1, p2], [summaryNode]);
|
||||
|
||||
// p1 and p2 are removed, p3 remains, s1 is added
|
||||
expect(buffer.nodes.map(n => n.id)).toEqual(['p3', 's1']);
|
||||
|
||||
// Provenance lookup: The summary node should resolve to both p1 and p2!
|
||||
const roots = buffer.getPristineNodes('s1');
|
||||
expect(roots).toHaveLength(2);
|
||||
expect(roots).toContain(p1);
|
||||
expect(roots).toContain(p2);
|
||||
});
|
||||
|
||||
it('should track multi-generation provenance correctly', () => {
|
||||
const p1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1');
|
||||
let buffer = ContextWorkingBufferImpl.initialize([p1]);
|
||||
|
||||
// Gen 1: Masked
|
||||
const gen1 = createDummyNode('ep1', 'USER_PROMPT', 8, undefined, 'gen1');
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(gen1 as any).replacesId = 'p1';
|
||||
buffer = buffer.applyProcessorResult('Masking', [p1], [gen1]);
|
||||
|
||||
// Gen 2: Summarized
|
||||
const gen2 = createDummyNode('ep1', 'ROLLING_SUMMARY', 5, undefined, 'gen2');
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(gen2 as any).abstractsIds = ['gen1'];
|
||||
buffer = buffer.applyProcessorResult('Summarizer', [gen1], [gen2]);
|
||||
|
||||
expect(buffer.nodes).toHaveLength(1);
|
||||
expect(buffer.nodes[0].id).toBe('gen2');
|
||||
|
||||
// Audit log should show sequence
|
||||
const log = buffer.getAuditLog();
|
||||
expect(log).toHaveLength(2);
|
||||
expect(log[0].processorId).toBe('Masking');
|
||||
expect(log[1].processorId).toBe('Summarizer');
|
||||
|
||||
// Multi-gen Provenance lookup: gen2 -> gen1 -> p1
|
||||
expect(buffer.getPristineNodes('gen2')).toEqual([p1]);
|
||||
});
|
||||
|
||||
it('should handle net-new injected nodes without throwing', () => {
|
||||
const p1 = createDummyNode('ep1', 'USER_PROMPT', 10, undefined, 'p1');
|
||||
let buffer = ContextWorkingBufferImpl.initialize([p1]);
|
||||
|
||||
const injected = createDummyNode('ep1', 'SYSTEM_EVENT', 5, undefined, 'injected1');
|
||||
// No replacesId or abstractsIds
|
||||
|
||||
buffer = buffer.applyProcessorResult('Injector', [], [injected]);
|
||||
|
||||
expect(buffer.nodes.map(n => n.id)).toEqual(['p1', 'injected1']);
|
||||
|
||||
// It should root to itself
|
||||
expect(buffer.getPristineNodes('injected1')).toEqual([injected]);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,187 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2026 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import type { ContextWorkingBuffer, GraphMutation } from '../pipeline.js';
|
||||
import type { ConcreteNode } from '../ir/types.js';
|
||||
|
||||
export class ContextWorkingBufferImpl implements ContextWorkingBuffer {
|
||||
// The current active graph
|
||||
readonly nodes: readonly ConcreteNode[];
|
||||
|
||||
// The AOT pre-calculated provenance index (Current ID -> Pristine IDs)
|
||||
private readonly provenanceMap: ReadonlyMap<string, ReadonlySet<string>>;
|
||||
|
||||
// The original immutable pristine nodes mapping
|
||||
private readonly pristineNodesMap: ReadonlyMap<string, ConcreteNode>;
|
||||
|
||||
// The historical linked list of changes
|
||||
private readonly history: readonly GraphMutation[];
|
||||
|
||||
private constructor(
|
||||
nodes: readonly ConcreteNode[],
|
||||
pristineNodesMap: ReadonlyMap<string, ConcreteNode>,
|
||||
provenanceMap: ReadonlyMap<string, ReadonlySet<string>>,
|
||||
history: readonly GraphMutation[]
|
||||
) {
|
||||
this.nodes = nodes;
|
||||
this.pristineNodesMap = pristineNodesMap;
|
||||
this.provenanceMap = provenanceMap;
|
||||
this.history = history;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a brand new ContextWorkingBuffer from a pristine graph.
|
||||
* Every node's provenance points to itself.
|
||||
*/
|
||||
static initialize(pristineNodes: readonly ConcreteNode[]): ContextWorkingBufferImpl {
|
||||
const pristineMap = new Map<string, ConcreteNode>();
|
||||
const initialProvenance = new Map<string, ReadonlySet<string>>();
|
||||
|
||||
for (const node of pristineNodes) {
|
||||
pristineMap.set(node.id, node);
|
||||
initialProvenance.set(node.id, new Set([node.id]));
|
||||
}
|
||||
|
||||
return new ContextWorkingBufferImpl(
|
||||
pristineNodes,
|
||||
pristineMap,
|
||||
initialProvenance,
|
||||
[] // Empty history
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends newly observed pristine nodes (e.g. from a user message) to the working buffer.
|
||||
* Ensures they are tracked in the pristine map and point to themselves in provenance.
|
||||
*/
|
||||
appendPristineNodes(newNodes: readonly ConcreteNode[]): ContextWorkingBufferImpl {
|
||||
if (newNodes.length === 0) return this;
|
||||
|
||||
const newPristineMap = new Map<string, ConcreteNode>(this.pristineNodesMap);
|
||||
const newProvenanceMap = new Map(this.provenanceMap);
|
||||
|
||||
for (const node of newNodes) {
|
||||
newPristineMap.set(node.id, node);
|
||||
newProvenanceMap.set(node.id, new Set([node.id]));
|
||||
}
|
||||
|
||||
return new ContextWorkingBufferImpl(
|
||||
[...this.nodes, ...newNodes],
|
||||
newPristineMap,
|
||||
newProvenanceMap,
|
||||
[...this.history]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates an entirely new buffer instance by calculating the delta between the processor's input and output.
|
||||
*/
|
||||
applyProcessorResult(
|
||||
processorId: string,
|
||||
inputTargets: readonly ConcreteNode[],
|
||||
outputNodes: readonly ConcreteNode[]
|
||||
): ContextWorkingBufferImpl {
|
||||
const outputIds = new Set(outputNodes.map(n => n.id));
|
||||
const inputIds = new Set(inputTargets.map(n => n.id));
|
||||
|
||||
// Calculate diffs
|
||||
const removedIds = inputTargets.filter(n => !outputIds.has(n.id)).map(n => n.id);
|
||||
const addedNodes = outputNodes.filter(n => !inputIds.has(n.id));
|
||||
|
||||
// Create mutation record
|
||||
const mutation: GraphMutation = {
|
||||
processorId,
|
||||
timestamp: Date.now(),
|
||||
removedIds,
|
||||
addedNodes,
|
||||
};
|
||||
|
||||
// Calculate new node array
|
||||
const removedSet = new Set(removedIds);
|
||||
const retainedNodes = this.nodes.filter(n => !removedSet.has(n.id));
|
||||
const newGraph = [...retainedNodes];
|
||||
|
||||
// We append the output nodes in the same general position if possible,
|
||||
// but in a complex graph we just ensure they exist. V2 graph uses timestamps for order.
|
||||
// For simplicity, we just push added nodes to the end of the retained array
|
||||
newGraph.push(...addedNodes);
|
||||
|
||||
// Calculate new provenance map
|
||||
const newProvenanceMap = new Map(this.provenanceMap);
|
||||
|
||||
let finalPristineMap = this.pristineNodesMap;
|
||||
|
||||
// Map the new synthetic nodes back to their pristine roots
|
||||
for (const added of addedNodes) {
|
||||
const roots = new Set<string>();
|
||||
|
||||
// 1:1 Replacement (e.g. Masked Node)
|
||||
if (added.replacesId) {
|
||||
const inheritedRoots = this.provenanceMap.get(added.replacesId);
|
||||
if (inheritedRoots) {
|
||||
for (const rootId of inheritedRoots) roots.add(rootId);
|
||||
}
|
||||
}
|
||||
|
||||
// N:1 Abstraction (e.g. Rolling Summary)
|
||||
if (added.abstractsIds) {
|
||||
for (const abstractId of added.abstractsIds) {
|
||||
const inheritedRoots = this.provenanceMap.get(abstractId);
|
||||
if (inheritedRoots) {
|
||||
for (const rootId of inheritedRoots) roots.add(rootId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If it has no links back to the original graph, it is its own root
|
||||
// (e.g., a system-injected instruction)
|
||||
if (roots.size === 0) {
|
||||
roots.add(added.id);
|
||||
// It acts as a net-new pristine root.
|
||||
if (!finalPristineMap.has(added.id)) {
|
||||
const mutableMap = new Map<string, ConcreteNode>(finalPristineMap);
|
||||
mutableMap.set(added.id, added);
|
||||
finalPristineMap = mutableMap;
|
||||
}
|
||||
}
|
||||
|
||||
newProvenanceMap.set(added.id, roots);
|
||||
}
|
||||
|
||||
return new ContextWorkingBufferImpl(
|
||||
newGraph,
|
||||
finalPristineMap,
|
||||
newProvenanceMap,
|
||||
[...this.history, mutation]
|
||||
);
|
||||
}
|
||||
|
||||
getPristineNodes(id: string): readonly ConcreteNode[] {
|
||||
const pristineIds = this.provenanceMap.get(id);
|
||||
if (!pristineIds) return [];
|
||||
return Array.from(pristineIds).map(pid => this.pristineNodesMap.get(pid)!);
|
||||
}
|
||||
|
||||
getAuditLog(): readonly GraphMutation[] {
|
||||
return this.history;
|
||||
}
|
||||
|
||||
getLineage(id: string): readonly ConcreteNode[] {
|
||||
const lineage: ConcreteNode[] = [];
|
||||
const currentNodesMap = new Map(this.nodes.map(n => [n.id, n]));
|
||||
|
||||
let current = currentNodesMap.get(id);
|
||||
while (current) {
|
||||
lineage.push(current);
|
||||
if (current.logicalParentId && current.logicalParentId !== current.id) {
|
||||
current = currentNodesMap.get(current.logicalParentId);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return lineage;
|
||||
}
|
||||
}
|
||||
@@ -43,7 +43,7 @@ class ModifyingProcessor implements ContextProcessor {
|
||||
text: newParts[0].text + ' [modified]',
|
||||
};
|
||||
}
|
||||
newTargets[0] = { ...prompt, semanticParts: newParts };
|
||||
newTargets[0] = { ...prompt, id: prompt.id + '-modified', replacesId: prompt.id, semanticParts: newParts };
|
||||
}
|
||||
return newTargets;
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import type { ConcreteNode } from '../ir/types.js';
|
||||
import type {
|
||||
ContextProcessor,
|
||||
ContextWorker,
|
||||
ContextWorkingBuffer,
|
||||
} from '../pipeline.js';
|
||||
import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js';
|
||||
import type {
|
||||
@@ -19,37 +18,7 @@ import type {
|
||||
import type { SidecarRegistry } from './registry.js';
|
||||
import { debugLogger } from '../../utils/debugLogger.js';
|
||||
import { InboxSnapshotImpl } from './inbox.js';
|
||||
|
||||
class ContextWorkingBufferImpl implements ContextWorkingBuffer {
|
||||
private readonly nodesMap: Map<string, ConcreteNode>;
|
||||
|
||||
constructor(readonly nodes: readonly ConcreteNode[]) {
|
||||
this.nodesMap = new Map(nodes.map((n) => [n.id, n]));
|
||||
}
|
||||
|
||||
getPristineNode(id: string): ConcreteNode | undefined {
|
||||
// In V2, pristine nodes are accessed via the IrMapper's state tracking or through the history
|
||||
// Since orchestrator doesn't natively hold the original pristine graph, we search current buffer
|
||||
// or rely on the env's capability. For now, since pristine graph is maintained in ContextManager,
|
||||
// we just return the node from the current buffer if we don't have a direct pristine link.
|
||||
// To fully implement pristine lookup, we would need to pass the pristine graph from ContextManager.
|
||||
return this.nodesMap.get(id);
|
||||
}
|
||||
|
||||
getLineage(id: string): readonly ConcreteNode[] {
|
||||
const lineage: ConcreteNode[] = [];
|
||||
let current = this.nodesMap.get(id);
|
||||
while (current) {
|
||||
lineage.push(current);
|
||||
if (current.logicalParentId && current.logicalParentId !== current.id) {
|
||||
current = this.nodesMap.get(current.logicalParentId);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return lineage;
|
||||
}
|
||||
}
|
||||
import { ContextWorkingBufferImpl } from './contextWorkingBuffer.js';
|
||||
|
||||
export class PipelineOrchestrator {
|
||||
private activeTimers: NodeJS.Timeout[] = [];
|
||||
@@ -184,63 +153,13 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
applyProcessorDiff(
|
||||
nodes: readonly ConcreteNode[],
|
||||
targets: readonly ConcreteNode[],
|
||||
returnedNodes: readonly ConcreteNode[],
|
||||
): readonly ConcreteNode[] {
|
||||
const mutableNodes = [...nodes];
|
||||
const targetSet = new Set(targets.map((n) => n.id));
|
||||
const returnedMap = new Map(returnedNodes.map((n) => [n.id, n]));
|
||||
|
||||
const removedIds = new Set<string>();
|
||||
const newNodes: ConcreteNode[] = [];
|
||||
|
||||
for (const t of targets) {
|
||||
const returnedNode = returnedMap.get(t.id);
|
||||
if (!returnedNode) {
|
||||
removedIds.add(t.id);
|
||||
} else if (returnedNode !== t) {
|
||||
removedIds.add(t.id);
|
||||
newNodes.push(returnedNode);
|
||||
}
|
||||
}
|
||||
|
||||
for (const r of returnedNodes) {
|
||||
if (!targetSet.has(r.id)) {
|
||||
newNodes.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
if (removedIds.size === 0 && newNodes.length === 0) {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
let earliestRemovalIdx = mutableNodes.length;
|
||||
let i = 0;
|
||||
while (i < mutableNodes.length) {
|
||||
if (removedIds.has(mutableNodes[i].id)) {
|
||||
if (i < earliestRemovalIdx) earliestRemovalIdx = i;
|
||||
mutableNodes.splice(i, 1);
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
if (newNodes.length > 0) {
|
||||
mutableNodes.splice(earliestRemovalIdx, 0, ...newNodes);
|
||||
}
|
||||
|
||||
return mutableNodes;
|
||||
}
|
||||
|
||||
async executeTriggerSync(
|
||||
trigger: PipelineTrigger,
|
||||
nodes: readonly ConcreteNode[],
|
||||
triggerTargets: ReadonlySet<string>,
|
||||
protectedLogicalIds: ReadonlySet<string> = new Set(),
|
||||
): Promise<readonly ConcreteNode[]> {
|
||||
let currentNodes = nodes;
|
||||
let currentBuffer = ContextWorkingBufferImpl.initialize(nodes);
|
||||
const pipelines = this.config.pipelines.filter((p) =>
|
||||
p.triggers.includes(trigger),
|
||||
);
|
||||
@@ -261,18 +180,18 @@ export class PipelineOrchestrator {
|
||||
`Executing processor synchronously: ${procDef.processorId}`,
|
||||
);
|
||||
|
||||
const allowedTargets = currentNodes.filter((n) =>
|
||||
const allowedTargets = currentBuffer.nodes.filter((n) =>
|
||||
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds),
|
||||
);
|
||||
|
||||
const returnedNodes = await processor.process({
|
||||
buffer: new ContextWorkingBufferImpl(currentNodes),
|
||||
buffer: currentBuffer,
|
||||
targets: allowedTargets,
|
||||
inbox: inboxSnapshot,
|
||||
});
|
||||
|
||||
currentNodes = this.applyProcessorDiff(
|
||||
currentNodes,
|
||||
currentBuffer = currentBuffer.applyProcessorResult(
|
||||
processor.id,
|
||||
allowedTargets,
|
||||
returnedNodes,
|
||||
);
|
||||
@@ -288,7 +207,7 @@ export class PipelineOrchestrator {
|
||||
// Success! Drain consumed messages
|
||||
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
|
||||
return currentNodes;
|
||||
return currentBuffer.nodes;
|
||||
}
|
||||
|
||||
private async executePipelineAsync(
|
||||
@@ -303,7 +222,7 @@ export class PipelineOrchestrator {
|
||||
);
|
||||
if (!nodes || nodes.length === 0) return;
|
||||
|
||||
let currentNodes = nodes;
|
||||
let currentBuffer = ContextWorkingBufferImpl.initialize(nodes);
|
||||
const inboxSnapshot = new InboxSnapshotImpl(
|
||||
this.env.inbox.getMessages() || [],
|
||||
);
|
||||
@@ -318,18 +237,18 @@ export class PipelineOrchestrator {
|
||||
`Executing processor: ${procDef.processorId} (async)`,
|
||||
);
|
||||
|
||||
const allowedTargets = currentNodes.filter((n) =>
|
||||
const allowedTargets = currentBuffer.nodes.filter((n) =>
|
||||
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds),
|
||||
);
|
||||
|
||||
const returnedNodes = await processor.process({
|
||||
buffer: new ContextWorkingBufferImpl(currentNodes),
|
||||
buffer: currentBuffer,
|
||||
targets: allowedTargets,
|
||||
inbox: inboxSnapshot,
|
||||
});
|
||||
|
||||
currentNodes = this.applyProcessorDiff(
|
||||
currentNodes,
|
||||
currentBuffer = currentBuffer.applyProcessorResult(
|
||||
processor.id,
|
||||
allowedTargets,
|
||||
returnedNodes,
|
||||
);
|
||||
|
||||
@@ -124,35 +124,21 @@ export class SimulationHarness {
|
||||
);
|
||||
const orchestrator = this.orchestrator;
|
||||
// In the V2 simulation, we trigger the 'gc_backstop' to simulate emergency pressure.
|
||||
currentView = await orchestrator.executeTriggerSync(
|
||||
// Since contextManager owns its buffer natively, the simulation now properly matches reality
|
||||
// where the manager runs the orchestrator and keeps the resulting modified view.
|
||||
const modifiedView = await orchestrator.executeTriggerSync(
|
||||
'gc_backstop',
|
||||
currentView,
|
||||
new Set(currentView.map((e) => e.id)),
|
||||
new Set<string>(),
|
||||
);
|
||||
// Inject the truncated view back into the graph
|
||||
for (let i = 0; i < currentView.length; i++) {
|
||||
const ep = currentView[i];
|
||||
if (!this.contextManager.getNodes().find((c) => c.id === ep.id)) {
|
||||
this.eventBus.emitVariantReady({
|
||||
targetId: ep.id,
|
||||
variantId: 'v-emergency',
|
||||
variant: {
|
||||
type: 'MASKED_TOOL',
|
||||
id: 'mock-id',
|
||||
tokens: { intent: 0, observation: 0 },
|
||||
intent: {},
|
||||
observation: {},
|
||||
toolName: 'tool',
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
// Wait for variant propagation
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// In the real system, ContextManager triggers this and retains it.
|
||||
// We will emulate that behavior internally in the test loop for token counting.
|
||||
currentView = modifiedView;
|
||||
}
|
||||
|
||||
// 4. Measure tokens after background processors have (hopefully) emitted variants
|
||||
// 4. Measure tokens after background processors have processed inboxes
|
||||
const tokensAfter = this.env.tokenCalculator.calculateConcreteListTokens(
|
||||
this.contextManager.getNodes(),
|
||||
);
|
||||
|
||||
@@ -24,7 +24,6 @@ import type { BaseLlmClient } from '../../core/baseLlmClient.js';
|
||||
import type { Content, GenerateContentResponse } from '@google/genai';
|
||||
import { InboxSnapshotImpl } from '../sidecar/inbox.js';
|
||||
import type {
|
||||
ContextWorkingBuffer,
|
||||
InboxMessage,
|
||||
ProcessArgs,
|
||||
} from '../pipeline.js';
|
||||
@@ -43,7 +42,7 @@ export const createMockGenerateContentResponse = (
|
||||
|
||||
export function createDummyNode(
|
||||
logicalParentId: string,
|
||||
type: 'USER_PROMPT' | 'SYSTEM_EVENT' | 'AGENT_THOUGHT' | 'AGENT_YIELD',
|
||||
type: ConcreteNode['type'],
|
||||
tokens = 100,
|
||||
overrides?: Partial<ConcreteNode>,
|
||||
id?: string,
|
||||
@@ -176,32 +175,7 @@ export function createMockEnvironment(
|
||||
* Creates a block of synthetic conversation history designed to consume a specific number of tokens.
|
||||
* Assumes roughly 4 characters per token for standard English text.
|
||||
*/
|
||||
export class FakeContextWorkingBuffer implements ContextWorkingBuffer {
|
||||
readonly nodes: readonly ConcreteNode[];
|
||||
private readonly nodesById = new Map<string, ConcreteNode>();
|
||||
private readonly nodesByEpisode = new Map<string, ConcreteNode[]>();
|
||||
|
||||
constructor(nodes: readonly ConcreteNode[]) {
|
||||
this.nodes = nodes;
|
||||
for (const node of nodes) {
|
||||
this.nodesById.set(node.id, node);
|
||||
const parentId = node.logicalParentId || 'orphan';
|
||||
const epNodes = this.nodesByEpisode.get(parentId) || [];
|
||||
epNodes.push(node);
|
||||
this.nodesByEpisode.set(parentId, epNodes);
|
||||
}
|
||||
}
|
||||
|
||||
getPristineNode(id: string): ConcreteNode | undefined {
|
||||
return this.nodesById.get(id);
|
||||
}
|
||||
|
||||
getLineage(id: string): readonly ConcreteNode[] {
|
||||
const node = this.nodesById.get(id);
|
||||
if (!node) return [];
|
||||
return this.nodesByEpisode.get(node.logicalParentId || 'orphan') || [];
|
||||
}
|
||||
}
|
||||
import { ContextWorkingBufferImpl } from '../sidecar/contextWorkingBuffer.js';
|
||||
|
||||
export function createMockProcessArgs(
|
||||
targets: ConcreteNode[],
|
||||
@@ -210,9 +184,7 @@ export function createMockProcessArgs(
|
||||
): ProcessArgs {
|
||||
return {
|
||||
targets,
|
||||
buffer: new FakeContextWorkingBuffer(
|
||||
bufferNodes.length ? bufferNodes : targets,
|
||||
),
|
||||
buffer: ContextWorkingBufferImpl.initialize(bufferNodes.length ? bufferNodes : targets),
|
||||
inbox: new InboxSnapshotImpl(inboxMessages),
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user