From 256a7a83faa37e4bb4dc313ba106908d5b3c8c5f Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 7 Apr 2026 20:59:26 +0000 Subject: [PATCH] broken refactor --- packages/core/src/context/DESIGN.md | 79 +++++++++++++++++ .../src/context/contextManager.async.test.ts | 1 + packages/core/src/context/contextManager.ts | 30 +++++-- packages/core/src/context/eventBus.ts | 3 + packages/core/src/context/historyObserver.ts | 28 +++++- packages/core/src/context/ir/episodeEditor.ts | 88 +++++++++++++++++-- packages/core/src/context/ir/fromIr.ts | 7 +- packages/core/src/context/ir/graphUtils.ts | 31 ++++++- packages/core/src/context/ir/toIr.ts | 4 +- packages/core/src/context/ir/types.ts | 1 + packages/core/src/context/pipeline.ts | 7 ++ .../processors/blobDegradationProcessor.ts | 10 ++- .../emergencyTruncationProcessor.ts | 2 +- .../processors/historySquashingProcessor.ts | 24 ++--- .../semanticCompressionProcessor.ts | 78 ++++++++-------- .../processors/stateSnapshotProcessor.ts | 14 +-- .../processors/toolMaskingProcessor.test.ts | 1 + .../processors/toolMaskingProcessor.ts | 2 +- .../src/context/sidecar/orchestrator.test.ts | 8 +- .../core/src/context/sidecar/orchestrator.ts | 21 ++++- .../src/context/testing/contextTestUtils.ts | 1 + 21 files changed, 355 insertions(+), 85 deletions(-) create mode 100644 packages/core/src/context/DESIGN.md diff --git a/packages/core/src/context/DESIGN.md b/packages/core/src/context/DESIGN.md new file mode 100644 index 0000000000..8baaebc249 --- /dev/null +++ b/packages/core/src/context/DESIGN.md @@ -0,0 +1,79 @@ +# Context Manager V0: High-Level Design + +## 1. Introduction & Motivation + +This document provides a high-level orientation to the Context Management system within `@google/gemini-cli-core`. + +Previously, context management in the CLI was decentralized, synchronous, and relied on fixed-function, destructive mutations of the raw Gemini `Content[]` history. Because all context management was local, this approach made it nearly impossible to reason about the global impact of any specific change. For example, should we distill tool outputs, or mask them? Or maybe it's contextual? What about other processors like the snapshotter, should they see masked results? Distilled results? What about new approaches to context management, how do they fit into the solution we've already built. The old approach to context management made it nearly challenging to even attempt to answer any one of these questions, let alone to try and answer all of them. + +To address these issues, we went back to the drawing board to create an explicit Context Manager. As opposed to our old approach, the new Context Manager V0 is a robust, event-driven, pluggable system. It introduces a non-destructive Episodic Intermediate Representation (IR) and an asynchronous processing pipeline, allowing the CLI to run expensive LLM summarization tasks in the background and opportunistically project an optimized view of the history only when budget constraints require it. + +--- + +## 2. Chief Innovations & Salient Features + +The architecture is built upon seven core principles that distinguish it from the legacy system: + +1. **Centralized Budgeting:** The `ContextManager` is the sole source of truth for the token budget. It makes the final, just-in-time decision about what gets projected to the LLM. +2. **Statelessness via IR:** Raw history is never mutated or deleted. Instead, it is translated into an Intermediate Representation (IR). Context reduction is achieved by attaching compressed `Variant`s to the IR graph. The original text is always recoverable. +3. **Asynchronicity:** Designed around a `ContextEventBus`. Heavy context operations (like LLM-powered summarization) run as detached background tasks without blocking the main agent loop. +4. **Configurability:** Driven by a typed JSON "Sidecar" configuration. Token ceilings, fallback strategies, and processing pipelines are entirely data-driven. +5. **Pluggability:** `ContextProcessor`s are isolated plugins with typed schemas. They are registered via Dependency Injection and can be arranged into arbitrary pipelines. +6. **Debuggability:** A built-in `ContextTracer` tracks every step of the pipeline, providing full audit trails of exactly when, why, and how a message was altered. +7. **Testability:** Global state has been eliminated. The system uses strict Dependency Injection (`ProcessorRegistry`, `ContextEnvironment`, `ContextEventBus`), making every layer easily unit-testable. + +--- + +## 3. The Major Pieces: Roles & Responsibilities + +### The Brain: `ContextManager` +The central coordinator. It owns the "Pristine History" (the ground-truth Episodic IR graph). Its primary responsibility is exposing `projectCompressedHistory()`, which flattens the IR graph into a standard `Content[]` array strictly adhering to the configured token budget. + +### The Data Model: Episodic Intermediate Representation (IR) +Instead of a flat array of messages, interactions are grouped into `Episode`s. An Episode represents a single turn: a User Prompt, followed by the Agent's Thoughts and Tool Executions (Steps), concluding with a Yield. +* **`IrNode`:** The base unit (e.g., `ToolExecution`, `AgentThought`). +* **`Variant`:** Compressed alternatives to the raw node (e.g., `SummaryVariant`, `MaskedVariant`, `SnapshotVariant`). +* **`IrMetadata`:** An audit trail attached to every node, tracking token counts and the chronological list of `transformations` applied by processors. + +### The Engine: `PipelineOrchestrator` & Sidecar +The orchestrator reads the `SidecarConfig`. It manages the lifecycle of the pipelines, registering triggers and executing processors in order. It dictates whether a pipeline blocks the main thread or runs in the background. + +### The Workers: `ContextProcessor`s +Small, highly-focused classes that implement context reduction strategies. They do not mutate the graph directly; instead, they are given an `EpisodeEditor` which provides a safe, scoped API to attach `Variant`s and append metadata. +* *Examples:* `ToolMaskingProcessor`, `SemanticCompressionProcessor`, `BlobDegradationProcessor`. + +### The Glue: `ContextEventBus` +A Pub/Sub bus that decouples the components. It enables the `HistoryObserver` to notify the system of new messages, and allows background processors to notify the `ContextManager` when a new compressed variant is ready to be used. + +--- + +## 4. How They Interact: The Life of a Message + +To understand how these pieces fit together, let's walk through the lifecycle of a single interaction as it moves through the context system. + +### Phase 1: Ingestion & Translation +1. **Action:** The user sends a prompt, and the agent responds with a tool call. These raw messages are appended to the standard `AgentChatHistory`. +2. **Observation:** The `HistoryObserver` detects the new messages. +3. **Translation:** The observer passes the raw `Content[]` to the `IrMapper`. The mapper groups the prompt and the tool execution into a single, structured `Episode`. +4. **Registration:** The new `Episode` is added to the `ContextManager`'s pristine graph. + +### Phase 2: Triggering the Pipelines +1. **Event Emission:** The `ContextManager` fires a `PristineHistoryUpdatedEvent` over the `ContextEventBus`. +2. **Orchestration:** The `PipelineOrchestrator` hears the event and evaluates its configured `PipelineDef`s. It finds a pipeline with the trigger `on_turn`. +3. **Execution:** The Orchestrator begins running the processors in that pipeline sequentially. If the pipeline is marked `execution: 'background'`, this happens asynchronously. + +### Phase 3: Processing & Safe Editing +1. **Processing:** A processor (e.g., `ToolMaskingProcessor`) receives the `EpisodeEditor`. It identifies a massive JSON payload in the tool execution. +2. **Editing:** Instead of deleting the JSON, it calls `editor.editEpisode()`. It creates a `MaskedVariant` containing a string summary of the JSON. +3. **Auditing:** The editor automatically appends a record to the node's `IrMetadata.transformations` indicating that the `ToolMaskingProcessor` applied a `MASKED` action. + +### Phase 4: Async Resolution +1. **Completion:** The background pipeline finishes. The orchestrator fires a `VariantReadyEvent` over the bus. +2. **Integration:** The `ContextManager` receives the event and securely attaches the `MaskedVariant` to the correct `Episode` in the pristine graph. (If the pipeline was synchronous/blocking, this happens immediately). + +### Phase 5: Just-In-Time Projection +1. **Request:** The agent is ready to send the next prompt to Gemini. The core routing logic calls `contextManager.projectCompressedHistory()`. +2. **Budget Evaluation:** The `IrProjector` calculates the current total tokens of the pristine graph and compares it to the `SidecarConfig` budget. +3. **Variant Selection:** If the graph exceeds the budget, the projector looks for available `Variant`s. It sees the newly attached `MaskedVariant` and calculates the token deficit recovered by using it. +4. **Flattening:** The `graphUtils` safely swap the raw node for the `MaskedVariant` in a temporary view, and flatten the Episodic IR back into a raw Gemini `Content[]` array. +5. **Delivery:** The optimized, budget-compliant array is sent to the LLM. The underlying pristine graph remains completely untouched and available for future reference or alternative projections. diff --git a/packages/core/src/context/contextManager.async.test.ts b/packages/core/src/context/contextManager.async.test.ts index b07512b914..4921cb119e 100644 --- a/packages/core/src/context/contextManager.async.test.ts +++ b/packages/core/src/context/contextManager.async.test.ts @@ -48,6 +48,7 @@ describe('ContextManager Barrier Tests', () => { type: 'snapshot', replacedEpisodeIds: replacedIds, episode: { + type: 'EPISODE', id: 'snapshot-ep', timestamp: Date.now(), trigger: { diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 7a1078a15b..9cc0202fd6 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -59,7 +59,7 @@ export class ContextManager { this.eventBus.onPristineHistoryUpdated((event) => { this.pristineEpisodes = event.episodes; - this.evaluateTriggers(); + this.evaluateTriggers(event.newNodes); }); this.eventBus.onVariantReady((event) => { @@ -97,7 +97,7 @@ export class ContextManager { * Evaluates if the current working buffer exceeds configured budget thresholds, * firing consolidation events if necessary. */ - private evaluateTriggers() { + private evaluateTriggers(newNodes: Set) { if (!this.sidecar.budget) return; const workingBuffer = this.getWorkingBufferView(); @@ -109,20 +109,40 @@ export class ContextManager { retainedTokens: this.sidecar.budget.retainedTokens, }); - // 1. Eager Compute Trigger - this.eventBus.emitChunkReceived({ episodes: this.pristineEpisodes }); + // 1. Eager Compute Trigger (on_turn) + if (newNodes.size > 0) { + this.eventBus.emitChunkReceived({ episodes: this.pristineEpisodes, targetNodeIds: newNodes }); + } // 2. Budget Crossed Trigger if (currentTokens > this.sidecar.budget.retainedTokens) { const deficit = currentTokens - this.sidecar.budget.retainedTokens; + + // Calculate exactly which nodes aged out of the retainedTokens budget to form our target delta + const agedOutNodes = new Set(); + let rollingTokens = 0; + // Start from newest and count backwards + for (let i = workingBuffer.length - 1; i >= 0; i--) { + const ep = workingBuffer[i]; + const epTokens = this.env.tokenCalculator.calculateEpisodeListTokens([ep]); + rollingTokens += epTokens; + if (rollingTokens > this.sidecar.budget.retainedTokens) { + agedOutNodes.add(ep.id); + agedOutNodes.add(ep.trigger.id); + for (const step of ep.steps) agedOutNodes.add(step.id); + if (ep.yield) agedOutNodes.add(ep.yield.id); + } + } + this.tracer.logEvent( 'ContextManager', 'Budget crossed. Emitting ConsolidationNeeded', - { deficit }, + { deficit, agedOutCount: agedOutNodes.size }, ); this.eventBus.emitConsolidationNeeded({ episodes: workingBuffer, targetDeficit: deficit, + targetNodeIds: agedOutNodes, }); } } diff --git a/packages/core/src/context/eventBus.ts b/packages/core/src/context/eventBus.ts index d7e8a9e0c5..ee6ddb9bcf 100644 --- a/packages/core/src/context/eventBus.ts +++ b/packages/core/src/context/eventBus.ts @@ -9,15 +9,18 @@ import type { Episode, Variant } from './ir/types.js'; export interface PristineHistoryUpdatedEvent { episodes: Episode[]; + newNodes: Set; } export interface ContextConsolidationEvent { episodes: Episode[]; targetDeficit: number; + targetNodeIds: Set; } export interface IrChunkReceivedEvent { episodes: Episode[]; + targetNodeIds: Set; } export interface VariantReadyEvent { diff --git a/packages/core/src/context/historyObserver.ts b/packages/core/src/context/historyObserver.ts index 012a1f2e27..c2cd1dda43 100644 --- a/packages/core/src/context/historyObserver.ts +++ b/packages/core/src/context/historyObserver.ts @@ -21,6 +21,8 @@ import type { ContextTracer } from './tracer.js'; export class HistoryObserver { private unsubscribeHistory?: () => void; + private seenNodeIds = new Set(); + constructor( private readonly chatHistory: AgentChatHistory, private readonly eventBus: ContextEventBus, @@ -40,14 +42,38 @@ export class HistoryObserver { this.chatHistory.get(), this.tokenCalculator, ); + + const newNodes = new Set(); + for (const ep of pristineEpisodes) { + if (!this.seenNodeIds.has(ep.id)) { + newNodes.add(ep.id); + this.seenNodeIds.add(ep.id); + } + if (!this.seenNodeIds.has(ep.trigger.id)) { + newNodes.add(ep.trigger.id); + this.seenNodeIds.add(ep.trigger.id); + } + for (const step of ep.steps) { + if (!this.seenNodeIds.has(step.id)) { + newNodes.add(step.id); + this.seenNodeIds.add(step.id); + } + } + if (ep.yield && !this.seenNodeIds.has(ep.yield.id)) { + newNodes.add(ep.yield.id); + this.seenNodeIds.add(ep.yield.id); + } + } + this.tracer.logEvent( 'HistoryObserver', 'Rebuilt pristine graph from chat history update', - { episodeCount: pristineEpisodes.length }, + { episodeCount: pristineEpisodes.length, newNodesCount: newNodes.size }, ); this.eventBus.emitPristineHistoryUpdated({ episodes: pristineEpisodes, + newNodes, }); }, ); diff --git a/packages/core/src/context/ir/episodeEditor.ts b/packages/core/src/context/ir/episodeEditor.ts index 8e71d55df4..fc62eeb96a 100644 --- a/packages/core/src/context/ir/episodeEditor.ts +++ b/packages/core/src/context/ir/episodeEditor.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { Episode } from './types.js'; +import type { Episode, IrNode } from './types.js'; export interface MutationRecord { episodeId: string; @@ -19,27 +19,89 @@ export class EpisodeEditor { private workingOrder: string[]; private workingMap: Map; private mutations: MutationRecord[] = []; + private targetNodes?: Set; - constructor(episodes: Episode[]) { + constructor(episodes: Episode[], targetNodes?: Set) { this.originalMap = new Map(episodes.map((e) => [e.id, e])); this.workingOrder = episodes.map((e) => e.id); this.workingMap = new Map(episodes.map((e) => [e.id, e])); + this.targetNodes = targetNodes; } /** - * Provides a readonly view of the current working state of the episodes. - * Processors should iterate over this to decide what to mutate. + * Provides a readonly view of the specific targets this processor is allowed to touch. + * If no targets were specified (e.g. fallback pipeline), it returns the entire history. */ - get episodes(): readonly Episode[] { + get targets(): Array<{ episode: Episode; node: IrNode | Episode }> { + const results: Array<{ episode: Episode; node: IrNode | Episode }> = []; + + for (const epId of this.workingOrder) { + const ep = this.workingMap.get(epId)!; + + // If we don't have restricted targets, everything is a target + if (!this.targetNodes) { + results.push({ episode: ep, node: ep }); + continue; + } + + // Check episode itself + if (this.targetNodes.has(ep.id)) { + results.push({ episode: ep, node: ep }); + } + // Check trigger + if (this.targetNodes.has(ep.trigger.id)) { + results.push({ episode: ep, node: ep.trigger }); + } + // Check steps + for (const step of ep.steps) { + if (this.targetNodes.has(step.id)) { + results.push({ episode: ep, node: step }); + } + } + // Check yield + if (ep.yield && this.targetNodes.has(ep.yield.id)) { + results.push({ episode: ep, node: ep.yield }); + } + } + + return results; + } + + /** + * Returns the full history for READ-ONLY context purposes. + * Processors should not iterate over this array to decide what to mutate. + * They should iterate over `editor.targets`. + */ + getFullHistory(): readonly Episode[] { return this.workingOrder.map((id) => this.workingMap.get(id)!); } + private isTargeted(episodeId: string): boolean { + if (!this.targetNodes) return true; + if (this.targetNodes.has(episodeId)) return true; + + const ep = this.workingMap.get(episodeId); + if (!ep) return false; + + if (this.targetNodes.has(ep.trigger.id)) return true; + if (ep.yield && this.targetNodes.has(ep.yield.id)) return true; + for (const step of ep.steps) { + if (this.targetNodes.has(step.id)) return true; + } + + return false; + } + /** * Safely edits an existing episode. - * The framework will handle deeply cloning the episode before passing it to the mutator, - * guaranteeing that original references are never modified. + * The framework will handle deeply cloning the episode before passing it to the mutator. + * Throws an error if the processor attempts to edit a non-targeted node. */ editEpisode(id: string, action: string, mutator: (draft: Episode) => void) { + if (!this.isTargeted(id)) { + throw new Error(`EpisodeEditor: Processor attempted to edit Episode ${id} which is outside its allowed target scope.`); + } + const ep = this.workingMap.get(id); if (!ep) return; @@ -82,6 +144,12 @@ export class EpisodeEditor { * It inserts the new episode at the lowest index of the removed episodes. */ replaceEpisodes(oldIds: string[], newEpisode: Episode, action: string) { + for (const id of oldIds) { + if (!this.isTargeted(id)) { + throw new Error(`EpisodeEditor: Processor attempted to replace Episode ${id} which is outside its allowed target scope.`); + } + } + const indices = oldIds .map((id) => this.workingOrder.indexOf(id)) .filter((i) => i !== -1); @@ -112,6 +180,12 @@ export class EpisodeEditor { * Removes episodes from the graph completely (e.g., emergency truncation). */ removeEpisodes(oldIds: string[], action: string) { + for (const id of oldIds) { + if (!this.isTargeted(id)) { + throw new Error(`EpisodeEditor: Processor attempted to remove Episode ${id} which is outside its allowed target scope.`); + } + } + this.workingOrder = this.workingOrder.filter((id) => !oldIds.includes(id)); for (const id of oldIds) { this.workingMap.delete(id); diff --git a/packages/core/src/context/ir/fromIr.ts b/packages/core/src/context/ir/fromIr.ts index b1d2be18b5..e9a624284f 100644 --- a/packages/core/src/context/ir/fromIr.ts +++ b/packages/core/src/context/ir/fromIr.ts @@ -6,12 +6,13 @@ import type { Content, Part } from '@google/genai'; import type { Episode, EpisodeStep, UserPrompt, AgentYield } from './types.js'; +import { isAgentThought, isToolExecution, isUserPrompt } from './graphUtils.js'; export function fromIr(episodes: Episode[]): Content[] { const history: Content[] = []; for (const ep of episodes) { - if (ep.trigger.type === 'USER_PROMPT') { + if (isUserPrompt(ep.trigger)) { const triggerContent = serializeTrigger(ep.trigger); if (triggerContent) history.push(triggerContent); } @@ -66,12 +67,12 @@ function serializeSteps(steps: EpisodeStep[]): Content[] { }; for (const step of steps) { - if (step.type === 'AGENT_THOUGHT') { + if (isAgentThought(step)) { if (pendingUserParts.length > 0) flushPending(); pendingModelParts.push({ text: step.presentation?.text ?? step.text, }); - } else if (step.type === 'TOOL_EXECUTION') { + } else if (isToolExecution(step)) { pendingModelParts.push({ functionCall: { name: step.toolName, diff --git a/packages/core/src/context/ir/graphUtils.ts b/packages/core/src/context/ir/graphUtils.ts index 62e9d0a4c2..8914ab177b 100644 --- a/packages/core/src/context/ir/graphUtils.ts +++ b/packages/core/src/context/ir/graphUtils.ts @@ -4,11 +4,35 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { Episode } from './types.js'; +import type { Episode, IrNode, AgentThought, ToolExecution, UserPrompt, AgentYield, SystemEvent } from './types.js'; import type { ContextTracer } from '../tracer.js'; import { debugLogger } from '../../utils/debugLogger.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; +export function isEpisode(node: IrNode | Episode): node is Episode { + return node.type === 'EPISODE'; +} + +export function isAgentThought(node: IrNode | Episode): node is AgentThought { + return node.type === 'AGENT_THOUGHT'; +} + +export function isToolExecution(node: IrNode | Episode): node is ToolExecution { + return node.type === 'TOOL_EXECUTION'; +} + +export function isUserPrompt(node: IrNode | Episode): node is UserPrompt { + return node.type === 'USER_PROMPT'; +} + +export function isAgentYield(node: IrNode | Episode): node is AgentYield { + return node.type === 'AGENT_YIELD'; +} + +export function isSystemEvent(node: IrNode | Episode): node is SystemEvent { + return node.type === 'SYSTEM_EVENT'; +} + /** * Generates a computed view of the pristine log. * Sweeps backwards (newest to oldest), tracking rolling tokens. @@ -42,7 +66,7 @@ export function generateWorkingBufferView( let projectedTrigger: typeof ep.trigger; - if (ep.trigger.type === 'USER_PROMPT') { + if (isUserPrompt(ep.trigger)) { projectedTrigger = { ...ep.trigger, metadata: { @@ -63,6 +87,7 @@ export function generateWorkingBufferView( let projectedEp: Episode = { ...ep, + type: 'EPISODE', trigger: projectedTrigger, steps: ep.steps.map((step) => ({ ...step, @@ -145,7 +170,7 @@ export function generateWorkingBufferView( masked.type === 'masked' ) { if ( - projectedEp.trigger.type === 'USER_PROMPT' && + isUserPrompt(projectedEp.trigger) && projectedEp.trigger.semanticParts && projectedEp.trigger.semanticParts.length > 0 ) { diff --git a/packages/core/src/context/ir/toIr.ts b/packages/core/src/context/ir/toIr.ts index 7081d0817a..ed0adc61c4 100644 --- a/packages/core/src/context/ir/toIr.ts +++ b/packages/core/src/context/ir/toIr.ts @@ -17,6 +17,7 @@ import type { SystemEvent, } from './types.js'; import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; +import { isAgentThought } from './graphUtils.js'; // WeakMap to provide stable, deterministic identity across parses for the exact same Content/Part references const nodeIdentityMap = new WeakMap(); @@ -199,6 +200,7 @@ function parseUserParts( }; return { + type: 'EPISODE', id: getStableId(msg), timestamp: Date.now(), trigger, @@ -247,7 +249,7 @@ function parseModelParts( function finalizeYield(currentEpisode: Partial) { if (currentEpisode.steps && currentEpisode.steps.length > 0) { const lastStep = currentEpisode.steps[currentEpisode.steps.length - 1]; - if (lastStep.type === 'AGENT_THOUGHT') { + if (isAgentThought(lastStep)) { const yieldNode: AgentYield = { id: lastStep.id, type: 'AGENT_YIELD', diff --git a/packages/core/src/context/ir/types.ts b/packages/core/src/context/ir/types.ts index fddf55197b..e46aa20ca6 100644 --- a/packages/core/src/context/ir/types.ts +++ b/packages/core/src/context/ir/types.ts @@ -188,6 +188,7 @@ export interface AgentYield extends IrNode { * internal reasoning and observations (Steps). */ export interface Episode { + readonly type: 'EPISODE'; readonly id: string; /** When the episode began */ readonly timestamp: number; diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index b114098a74..e2cfdee7eb 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -28,6 +28,13 @@ export interface ContextAccountingState { * True if currentTokens <= retainedTokens. */ readonly isBudgetSatisfied: boolean; + + /** + * If this pipeline was triggered by a specific event (e.g., a new turn), + * this contains the specific Node IDs (Episodes, Steps, or Triggers) that should be evaluated. + * If undefined, the processor may evaluate the entire graph. + */ + readonly targetNodeIds?: Set; } /** diff --git a/packages/core/src/context/processors/blobDegradationProcessor.ts b/packages/core/src/context/processors/blobDegradationProcessor.ts index 2981a5d7e8..25a7246468 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.ts @@ -8,6 +8,7 @@ import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import { sanitizeFilenamePart } from '../../utils/fileUtils.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; +import { isUserPrompt } from '../ir/graphUtils.js'; export type BlobDegradationProcessorOptions = Record; @@ -59,11 +60,14 @@ export class BlobDegradationProcessor implements ContextProcessor { }; // Forward scan, looking for bloated non-text parts to degrade - for (const ep of editor.episodes) { + for (const target of editor.targets) { + const ep = target.episode; + if (target.node !== ep.trigger) continue; + if (currentDeficit <= 0) break; if (state.protectedEpisodeIds.has(ep.id)) continue; - if (ep.trigger.type === 'USER_PROMPT') { + if (isUserPrompt(ep.trigger)) { for (let j = 0; j < ep.trigger.semanticParts.length; j++) { const part = ep.trigger.semanticParts[j]; if (currentDeficit <= 0) break; @@ -120,7 +124,7 @@ export class BlobDegradationProcessor implements ContextProcessor { ]); editor.editEpisode(ep.id, 'DEGRADE_BLOB', (draft) => { - if (draft.trigger.type === 'USER_PROMPT') { + if (isUserPrompt(draft.trigger)) { draft.trigger.semanticParts[j].presentation = { text: newText, tokens: newTokens, diff --git a/packages/core/src/context/processors/emergencyTruncationProcessor.ts b/packages/core/src/context/processors/emergencyTruncationProcessor.ts index ed1e120dd3..2b768829aa 100644 --- a/packages/core/src/context/processors/emergencyTruncationProcessor.ts +++ b/packages/core/src/context/processors/emergencyTruncationProcessor.ts @@ -39,7 +39,7 @@ export class EmergencyTruncationProcessor implements ContextProcessor { const toRemove: string[] = []; // We respect the global protected Episode IDs (like the system prompt at index 0) - for (const ep of editor.episodes) { + for (const ep of editor.getFullHistory()) { const epTokens = this._env.tokenCalculator.calculateEpisodeListTokens([ ep, ]); diff --git a/packages/core/src/context/processors/historySquashingProcessor.ts b/packages/core/src/context/processors/historySquashingProcessor.ts index add0c813fb..f7bcc3a572 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.ts @@ -8,6 +8,7 @@ import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import { truncateProportionally } from '../truncation.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; +import { isAgentThought, isUserPrompt } from '../ir/graphUtils.js'; export interface HistorySquashingProcessorOptions { maxTokensPerNode: number; @@ -88,12 +89,13 @@ export class HistorySquashingProcessor implements ContextProcessor { // We track how many tokens we still need to cut. If we hit 0, we can stop early! let currentDeficit = state.deficitTokens; - for (const ep of editor.episodes) { + for (const target of editor.targets) { + const ep = target.episode; if (currentDeficit <= 0) break; if (state.protectedEpisodeIds.has(ep.id)) continue; // 1. Squash User Prompts - if (ep.trigger.type === 'USER_PROMPT') { + if (target.node === ep.trigger && isUserPrompt(ep.trigger)) { for (let j = 0; j < ep.trigger.semanticParts.length; j++) { const part = ep.trigger.semanticParts[j]; if (part.type === 'text') { @@ -103,7 +105,7 @@ export class HistorySquashingProcessor implements ContextProcessor { currentDeficit, (p) => { editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => { - if (draft.trigger.type === 'USER_PROMPT') { + if (isUserPrompt(draft.trigger)) { draft.trigger.semanticParts[j].presentation = p; } }); @@ -124,11 +126,10 @@ export class HistorySquashingProcessor implements ContextProcessor { } // 2. Squash Model Thoughts - if (ep.steps) { - for (let j = 0; j < ep.steps.length; j++) { - const step = ep.steps[j]; - if (currentDeficit <= 0) break; - if (step.type === 'AGENT_THOUGHT') { + if (isAgentThought(target.node)) { + const step = target.node; + const j = ep.steps.findIndex(s => s.id === step.id); + if (j !== -1 && currentDeficit > 0) { const saved = this.tryApplySquash( step.text, limitChars, @@ -136,7 +137,7 @@ export class HistorySquashingProcessor implements ContextProcessor { (p) => { editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => { const draftStep = draft.steps[j]; - if (draftStep.type === 'AGENT_THOUGHT') { + if (isAgentThought(draftStep)) { draftStep.presentation = p; } }); @@ -144,7 +145,7 @@ export class HistorySquashingProcessor implements ContextProcessor { () => { editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => { const draftStep = draft.steps[j]; - if (draftStep.type === 'AGENT_THOUGHT') { + if (isAgentThought(draftStep)) { draftStep.metadata.transformations.push({ processorName: this.name, action: 'TRUNCATED', @@ -155,12 +156,11 @@ export class HistorySquashingProcessor implements ContextProcessor { }, ); currentDeficit -= saved; - } } } // 3. Squash Agent Yields - if (currentDeficit > 0 && ep.yield) { + if (currentDeficit > 0 && target.node === ep.yield && ep.yield) { const saved = this.tryApplySquash( ep.yield.text, limitChars, diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.ts b/packages/core/src/context/processors/semanticCompressionProcessor.ts index 9ba737124f..5e2eb19863 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.ts @@ -10,6 +10,7 @@ import { debugLogger } from '../../utils/debugLogger.js'; import { LlmRole } from '../../telemetry/types.js'; import { getResponseText } from '../../utils/partUtils.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; +import { isAgentThought, isToolExecution, isUserPrompt } from '../ir/graphUtils.js'; export interface SemanticCompressionProcessorOptions { nodeThresholdTokens: number; @@ -65,12 +66,13 @@ export class SemanticCompressionProcessor implements ContextProcessor { let currentDeficit = state.deficitTokens; // We scan backwards (oldest to newest would also work, but older is safer to degrade first) - for (const ep of editor.episodes) { + for (const target of editor.targets) { + const ep = target.episode; if (currentDeficit <= 0) break; if (state.protectedEpisodeIds.has(ep.id)) continue; // 1. Compress User Prompts - if (ep.trigger.type === 'USER_PROMPT') { + if (target.node === ep.trigger && isUserPrompt(ep.trigger)) { for (let j = 0; j < ep.trigger.semanticParts.length; j++) { const part = ep.trigger.semanticParts[j]; if (currentDeficit <= 0) break; @@ -92,7 +94,7 @@ export class SemanticCompressionProcessor implements ContextProcessor { if (newTokens < oldTokens) { editor.editEpisode(ep.id, 'SUMMARIZE_PROMPT', (draft) => { - if (draft.trigger.type === 'USER_PROMPT') { + if (isUserPrompt(draft.trigger)) { draft.trigger.semanticParts[j].presentation = { text: summary, tokens: newTokens, @@ -111,12 +113,10 @@ export class SemanticCompressionProcessor implements ContextProcessor { } // 2. Compress Model Thoughts - if (ep.steps) { - for (let j = 0; j < ep.steps.length; j++) { - const step = ep.steps[j]; - if (currentDeficit <= 0) break; - if (step.type === 'AGENT_THOUGHT') { - if (step.presentation) continue; + if (isAgentThought(target.node)) { + const step = target.node; + const j = ep.steps.findIndex(s => s.id === step.id); + if (j !== -1 && currentDeficit > 0 && !step.presentation) { if (step.text.length > thresholdChars) { const summary = await this.generateSummary( step.text, @@ -132,11 +132,21 @@ export class SemanticCompressionProcessor implements ContextProcessor { if (newTokens < oldTokens) { editor.editEpisode(ep.id, 'SUMMARIZE_THOUGHT', (draft) => { const draftStep = draft.steps[j]; - if (draftStep.type === 'AGENT_THOUGHT') { + if (isAgentThought(draftStep)) { draftStep.presentation = { text: summary, tokens: newTokens, }; + if (!draftStep.metadata) { + draftStep.metadata = { + transformations: [], + currentTokens: 0, + originalTokens: 0, + }; + } + if (!draftStep.metadata.transformations) { + draftStep.metadata.transformations = []; + } draftStep.metadata.transformations.push({ processorName: this.name, action: 'SUMMARIZED', @@ -147,11 +157,15 @@ export class SemanticCompressionProcessor implements ContextProcessor { currentDeficit -= oldTokens - newTokens; } } - } + } + } - // 3. Compress Tool Observations - if (step.type === 'TOOL_EXECUTION') { - const rawObs = step.presentation?.observation ?? step.observation; + // 3. Compress Tool Observations + if (isToolExecution(target.node)) { + const step = target.node; + const j = ep.steps.findIndex(s => s.id === step.id); + if (j !== -1 && currentDeficit > 0 && !step.presentation) { + const rawObs = (step.presentation as any)?.observation ?? step.observation; let stringifiedObs = ''; if (typeof rawObs === 'string') { @@ -164,40 +178,33 @@ export class SemanticCompressionProcessor implements ContextProcessor { } } - if ( - stringifiedObs.length > thresholdChars && - !stringifiedObs.includes('') - ) { + if (stringifiedObs.length > thresholdChars) { const summary = await this.generateSummary( stringifiedObs, - `Tool Output (${step.toolName})`, + step.toolName, ); - - // Wrap the summary in an object so the Gemini API accepts it as a valid functionResponse.response const newObsObject = { summary }; - const newObsTokens = - this.env.tokenCalculator.estimateTokensForParts([ - { - functionResponse: { - name: step.toolName, - response: newObsObject, - id: step.id, - }, + const newObsTokens = this.env.tokenCalculator.estimateTokensForParts([ + { + functionResponse: { + name: step.toolName, + response: newObsObject, + id: step.id, }, - ]); + }, + ]); const oldObsTokens = - step.presentation?.tokens?.observation ?? - step.tokens?.observation ?? - step.tokens; + (step.presentation as any)?.tokens?.observation ?? + step.tokens?.observation ?? step.tokens; const intentTokens = - step.presentation?.tokens?.intent ?? step.tokens?.intent ?? 0; + (step.presentation as any)?.tokens?.intent ?? step.tokens?.intent ?? 0; if (newObsTokens < oldObsTokens) { editor.editEpisode(ep.id, 'SUMMARIZE_TOOL', (draft) => { const draftStep = draft.steps[j]; - if (draftStep.type === 'TOOL_EXECUTION') { + if (isToolExecution(draftStep)) { draftStep.presentation = { intent: draftStep.presentation?.intent ?? draftStep.intent, @@ -227,7 +234,6 @@ export class SemanticCompressionProcessor implements ContextProcessor { currentDeficit -= oldObsTokens - newObsTokens; } } - } } } } diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 6872288130..a914b973b0 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -14,6 +14,7 @@ import { v4 as uuidv4 } from 'uuid'; import { LlmRole } from '../../telemetry/llmRole.js'; import { debugLogger } from 'src/utils/debugLogger.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; +import { isSystemEvent, isToolExecution, isUserPrompt } from '../ir/graphUtils.js'; export interface StateSnapshotProcessorOptions { model?: string; @@ -58,11 +59,11 @@ export class StateSnapshotProcessor implements ContextProcessor { let deficitAccumulator = 0; const selectedEpisodes: Episode[] = []; - for (let i = 1; i < editor.episodes.length - 1; i++) { - const ep = editor.episodes[i]; + for (let i = 1; i < editor.getFullHistory().length - 1; i++) { + const ep = editor.getFullHistory()[i]; selectedEpisodes.push(ep); let triggerText = ''; - if (ep.trigger?.type === 'USER_PROMPT') { + if (isUserPrompt(ep.trigger)) { const firstPart = ep.trigger.semanticParts?.[0]; if (firstPart) { triggerText = @@ -102,7 +103,7 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; for (const ep of episodes) { - if (ep.trigger?.type === 'USER_PROMPT') { + if (isUserPrompt(ep.trigger)) { const partsText = ep.trigger.semanticParts .map((p) => { if (p.type === 'text') return p.text; @@ -111,11 +112,11 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo }) .join(''); userPromptText += `USER: ${partsText}\n`; - } else if (ep.trigger?.type === 'SYSTEM_EVENT') { + } else if (isSystemEvent(ep.trigger)) { userPromptText += `[SYSTEM EVENT: ${ep.trigger.name}]\n`; } for (const step of ep.steps) { - if (step.type === 'TOOL_EXECUTION') { + if (isToolExecution(step)) { userPromptText += `[Tool Called: ${step.toolName}]\n`; } } @@ -144,6 +145,7 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo ]); return { + type: 'EPISODE', id: newId, timestamp: Date.now(), trigger: { diff --git a/packages/core/src/context/processors/toolMaskingProcessor.test.ts b/packages/core/src/context/processors/toolMaskingProcessor.test.ts index f74b02eb56..7201e067e5 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.test.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.test.ts @@ -47,6 +47,7 @@ describe('ToolMaskingProcessor', () => { intent: Record, observation: Record, ): Episode => ({ + type: 'EPISODE', id, timestamp: Date.now(), trigger: { diff --git a/packages/core/src/context/processors/toolMaskingProcessor.ts b/packages/core/src/context/processors/toolMaskingProcessor.ts index 15812d1629..30d5dd6b2a 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.ts @@ -148,7 +148,7 @@ export class ToolMaskingProcessor implements ContextProcessor { }; // Forward scan, looking for massive intents or observations to mask - for (const ep of editor.episodes) { + for (const ep of editor.getFullHistory()) { if (currentDeficit <= 0) break; if (!ep || !ep.steps || state.protectedEpisodeIds.has(ep.id)) continue; diff --git a/packages/core/src/context/sidecar/orchestrator.test.ts b/packages/core/src/context/sidecar/orchestrator.test.ts index 3ecd342263..9a3e2213a1 100644 --- a/packages/core/src/context/sidecar/orchestrator.test.ts +++ b/packages/core/src/context/sidecar/orchestrator.test.ts @@ -28,8 +28,9 @@ class DummySyncProcessor implements ContextProcessor { readonly id = 'DummySync'; readonly options = {}; async process(editor: EpisodeEditor, _state: ContextAccountingState) { + if (editor.targets.length === 0) return; editor.editEpisode( - editor.episodes[0].id, + editor.targets[0].episode.id, 'DUMMY_EDIT', (draft: unknown) => { (draft as Record)['dummyModified'] = true; @@ -47,8 +48,9 @@ class DummyAsyncProcessor implements ContextProcessor { readonly id = 'DummyAsync'; readonly options = {}; async process(editor: EpisodeEditor, _state: ContextAccountingState) { + if (editor.targets.length === 0) return; editor.editEpisode( - editor.episodes[0].id, + editor.targets[0].episode.id, 'DUMMY_EDIT', (draft: unknown) => { (draft as Record)['dummyAsyncModified'] = true; @@ -285,7 +287,7 @@ describe('PipelineOrchestrator (Component)', () => { const episodes = [createDummyEpisode('1', 'USER_PROMPT', [])]; // Emit the trigger - eventBus.emitConsolidationNeeded({ episodes, targetDeficit: 100 }); + eventBus.emitConsolidationNeeded({ episodes, targetDeficit: 100, targetNodeIds: new Set() }); expect(executeSpy).toHaveBeenCalled(); }); diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 10905f0015..89795fc2e2 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -15,6 +15,7 @@ import type { import type { ProcessorRegistry } from './registry.js'; import { debugLogger } from '../../utils/debugLogger.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; +import { isUserPrompt } from '../ir/graphUtils.js'; export class PipelineOrchestrator { private activeTimers: NodeJS.Timeout[] = []; @@ -78,6 +79,20 @@ export class PipelineOrchestrator { isBudgetSatisfied: false, deficitTokens: event.targetDeficit, protectedEpisodeIds: new Set(), + targetNodeIds: event.targetNodeIds, + }; + void this.executePipelineAsync(pipeline, event.episodes, state); + }); + } else if (trigger === 'on_turn') { + this.eventBus.onChunkReceived((event) => { + const state: ContextAccountingState = { + currentTokens: 0, + retainedTokens: this.config.budget.retainedTokens, + maxTokens: this.config.budget.maxTokens, + isBudgetSatisfied: false, + deficitTokens: 0, + protectedEpisodeIds: new Set(), + targetNodeIds: event.targetNodeIds, }; void this.executePipelineAsync(pipeline, event.episodes, state); }); @@ -130,7 +145,7 @@ export class PipelineOrchestrator { 'Orchestrator', `Executing processor: ${procDef.processorId}`, ); - const editor = new EpisodeEditor(currentEpisodes); + const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds); await processor.process(editor, state); currentEpisodes = editor.getFinalEpisodes(); } catch (error) { @@ -171,7 +186,7 @@ export class PipelineOrchestrator { `Executing processor: ${procDef.processorId} (async)`, ); - const editor = new EpisodeEditor(currentEpisodes); + const editor = new EpisodeEditor(currentEpisodes, state.targetNodeIds); await processor.process(editor, state); currentEpisodes = editor.getFinalEpisodes(); @@ -190,7 +205,7 @@ export class PipelineOrchestrator { const ep = mutation.episode!; let fallbackText = ''; if (ep.yield?.text) fallbackText = ep.yield.text; - else if (ep.trigger?.type === 'USER_PROMPT') { + else if (isUserPrompt(ep.trigger)) { const firstPart = ep.trigger.semanticParts?.[0]; if (firstPart) { fallbackText = diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index fa15f61a05..2668419a75 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -78,6 +78,7 @@ export function createDummyEpisode( } return { + type: 'EPISODE', id, timestamp: Date.now(), trigger,