diff --git a/packages/core/src/context/ASYNC_GC_DESIGN.md b/packages/core/src/context/ASYNC_GC_DESIGN.md new file mode 100644 index 0000000000..7d96ca50e1 --- /dev/null +++ b/packages/core/src/context/ASYNC_GC_DESIGN.md @@ -0,0 +1,47 @@ +# Asynchronous Context Management (Dataflow Architecture) + +## The Problem +Context management today is an emergency response. When a chat session hits the maximum token limit (`maxTokens`), the system halts the user's request, synchronously runs expensive compression pipelines (masking tools, summarizing text with LLMs), and only proceeds when the token count falls below the limit. This introduces unacceptable latency and forces trade-offs between speed and data fidelity. + +## The Vision: Eager Subconscious Compute +Instead of a reactive, synchronous pipeline, Context Management should be an **asynchronous dataflow graph**. + +Because we know old memory will *eventually* need to be degraded or garbage collected, we should utilize the agent's idle time (while the user is reading or typing) to proactively compute "degraded variants" of episodes before there is any context pressure. + +### The Three Phases of Memory Lifecycle + +#### 1. The Eager Compute Phase (Background / Continuous Streaming) +Context pressure doesn't wait for an episode to finish. If a user pastes a 100k-token file, the budget explodes instantly. Therefore, the dataflow graph is fed continuously. +* Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a `TOOL_EXECUTION` step) and broadcast immediately. +* **Processors (e.g., SemanticCompressor, StateSnapshot) listen as background workers.** +* They eagerly compute degraded variants on partial episodes. For instance, `SemanticCompressionProcessor` can summarize a massive 100k `USER_PROMPT` the millisecond it arrives, without waiting for the model to reply. 
+* It attaches the result to the IR graph as `Episode#1.trigger.variants.summary`.
+* **Result:** This costs the user zero latency. The agent is "dreaming/consolidating" granular memory chunks in the background, even during long-running "mono-episodes."
+
+#### 2. Opportunistic Replacement (`retainedTokens` Threshold)
+When the active context window crosses the "ideal" size (e.g., 65k tokens):
+* The system identifies the oldest episodes that have fallen outside the `retained` window.
+* It checks if they have pre-computed variants (e.g., a `summary` or `masked` variant).
+* If yes, it instantly and silently swaps the raw episode for the degraded variant.
+* **Result:** The context gently decays over time, completely avoiding hard limits for as long as possible, with zero latency cost.
+
+#### 3. The Pressure Barrier (`maxTokens` Hard Limit)
+When the active context window crosses the absolute hard limit (e.g., 150k tokens), perhaps because the user pasted a massive file and the background workers couldn't keep up, the system hits a **Synchronous Barrier**.
+
+At this barrier, the `ContextManager` checks the user's configured `ContextPressureStrategy` to decide how to unblock the request:
+
+* **Strategy A: `truncate` (The Baseline)**
+    * *Behavior:* Instantly drop the oldest episodes until under `maxTokens`.
+    * *Tradeoff:* Maximum speed, maximum data loss.
+* **Strategy B: `incrementalGc` (Progressive)**
+    * *Behavior:* Look for any pre-computed summaries/masks. If none exist, synchronously block to compute *just enough* summaries to survive the current turn.
+    * *Tradeoff:* Medium speed, medium data retention.
+* **Strategy C: `compress` (State Snapshot)**
+    * *Behavior:* Identify the oldest N episodes causing the overflow. If their N-to-1 World State Snapshot isn't ready yet, **block the user's request** and force the `StateSnapshotProcessor` to generate it synchronously. Once generated, replace the N episodes with the 1 snapshot and proceed.
+ * *Tradeoff:* Maximum latency, maximum data retention/fidelity. + +## Architectural Changes Required +1. **Episode Variants:** Update the `Episode` IR type to support a `variants` dictionary. +2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of `AgentChatHistory`). +3. **Processor Interface:** Change `ContextProcessor` from a synchronous `process(episodes[])` function to an asynchronous worker that listens to the event bus, updates the `variants` dictionary, and emits `VARIANT_READY` events. +4. **Projection Logic:** Update `projectCompressedHistory()` to act as the Pressure Barrier, reading the user's strategy and either applying ready variants, waiting for variants, or truncating. diff --git a/packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md b/packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md new file mode 100644 index 0000000000..b08cef2a9b --- /dev/null +++ b/packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md @@ -0,0 +1,86 @@ +# Asynchronous Context Management Implementation Plan + +This document outlines the step-by-step implementation plan for refactoring `ContextManager` into a fully asynchronous, event-driven dataflow graph (Eager Subconscious Compute). + +--- + +## Phase 1: Stable Identity & Incremental IR Mapping +**The Problem:** Currently, `IrMapper.toIr()` is stateless. It generates random UUIDs for `Episode` and `Step` nodes every time it parses the `Content[]` array. If the array is rebuilt while an asynchronous processor is computing a summary, the target ID will be lost, and the variant will be orphaned. +**The Goal:** Episodes must maintain a stable identity across turns so background workers can confidently attach variants to them. + +**Tasks:** +1. 
**Deterministic Hashing or Stateful Mapping:** Update `IrMapper` to either generate deterministic UUIDs (e.g., hashing the part text/timestamp) OR make `ContextManager`'s pristine graph mutable, where new `PUSH` events are mapped *incrementally* onto the tail of `this.pristineEpisodes` rather than rebuilding the whole array. +2. **Test Update:** Ensure `IrMapper` tests verify stable IDs across successive parse events. + +--- + +## Phase 2: Data Structures & Event Bus +**The Problem:** The system lacks the internal types and communication channels to support asynchronous variant generation. +**The Goal:** Define the `Variant` schemas and the internal `EventEmitter` that will broadcast graph updates to the async workers. + +**Tasks:** +1. **Variant Types:** Update `packages/core/src/context/ir/types.ts`. + * Add a `variants?: Record` property to `Episode` and `Step` (where `Variant` is a discriminated union of `SummaryVariant`, `MaskedVariant`, `SnapshotVariant`, etc.). + * Include metadata on the variant: `status: 'computing' | 'ready' | 'failed'`, `promise?: Promise`, `recoveredTokens: number`. +2. **Event Bus (`ContextEventBus`):** + * Create an internal event emitter in `ContextManager` (using `events.EventEmitter` or a lightweight alternative). + * Define Events: + * `IR_NODE_CREATED`: Fired when a new Episode/Step is mapped. (Triggers eager compute). + * `VARIANT_READY`: Fired by a worker when it finishes computing a summary/snapshot. + * `BUDGET_RETAINED_CROSSED`: Fired when `currentTokens > retainedTokens`. + * `BUDGET_MAX_CROSSED`: Fired when `currentTokens > maxTokens`. + +--- + +## Phase 3: Refactoring Processors into Async Workers +**The Problem:** Processors currently implement a synchronous `process(episodes, state) -> Promise` interface and block the main loop. +**The Goal:** Convert them into background workers that listen to the `ContextEventBus`, perform LLM tasks asynchronously, and emit `VARIANT_READY`. + +**Tasks:** +1. 
**Define `AsyncContextWorker` Interface:** + * `start(bus: ContextEventBus): void` + * `stop(): void` +2. **Implement `SemanticCompressionWorker`:** + * Listens to `IR_NODE_CREATED` (or `BUDGET_RETAINED_CROSSED` for lazier eager compute). + * Batches old `USER_PROMPT` nodes. + * Calls LLM in background. + * Emits `VARIANT_READY` with the summary string and target Node IDs. +3. **Implement `StateSnapshotWorker`:** + * Listens to `BUDGET_RETAINED_CROSSED`. + * Identifies the N oldest raw episodes. + * Synthesizes them into a single `world_state_snapshot`. + * Emits `VARIANT_READY` containing the new Snapshot Episode and the IDs of the N episodes it replaces. +4. **Wire Event Listeners:** `ContextManager` listens to `VARIANT_READY` and updates the pristine graph's `variants` dictionary. + +--- + +## Phase 4: The Projection Engine & Pressure Barrier +**The Problem:** `projectCompressedHistory()` currently runs the synchronous pipeline. It needs to become the non-blocking opportunistic swapper and the blocking pressure barrier. +**The Goal:** Serve the LLM request instantly using pre-computed variants, or block strictly according to the user's `maxPressureStrategy`. + +**Tasks:** +1. **Opportunistic Swap (`retainedTokens`):** + * When traversing `this.pristineEpisodes` to build the projected array, if `currentTokens > retainedTokens`, check the oldest episodes. + * If an episode has a `variant.status === 'ready'`, use the variant's tokens and text *instead* of the raw episode. +2. **Pressure Barrier (`maxTokens`):** + * If the projected array is *still* `> maxTokens` after all ready variants are applied, hit the Barrier. + * Read `config.getContextManagementConfig().budget.maxPressureStrategy`. + * **If `truncate`:** Instantly drop the oldest episodes from the projection until under budget. (Fastest). + * **If `incrementalGc`:** Await any variants that are `status === 'computing'` for the oldest nodes until the deficit is cleared. 
If none are computing, force a synchronous masking/truncation.
+    * **If `compress`:** Await the `StateSnapshotWorker`'s active `Promise`. If it hasn't started, synchronously invoke it and block until the N-to-1 snapshot is ready.
+
+---
+
+## Phase 5: Configuration & Telemetry
+**The Goal:** Expose the new strategies to the user and ensure we can observe the background workers.
+
+**Tasks:**
+1. **Config Schema:** Update `settingsSchema.ts` to include `maxPressureStrategy: 'truncate' | 'incrementalGc' | 'compress'`.
+2. **Telemetry:** Log events when background workers start/finish, including the tokens saved and the latency of the background task.
+3. **Testing:** Write concurrency tests simulating a user typing rapidly while background summaries are still resolving, ensuring no data corruption or dropped variants.
+
+---
+
+## Open Questions & Risks
+* **API Cost:** Eager compute may summarize episodes in sessions that *never* actually hit the context limit. Should Eager Compute only begin when `current > retained`, or truly immediately? (Recommendation: Start at `retained` to save money, but `max` must be high enough above `retained` to give the async workers time to finish).
+* **Race Conditions:** If the user deletes a message via the UI (triggering `AgentChatHistory.map/flatMap`), we must cleanly abort any pending Promises in the background workers for those deleted IDs.
\ No newline at end of file
diff --git a/packages/core/src/context/ASYNC_GC_STATUS_REPORT.md b/packages/core/src/context/ASYNC_GC_STATUS_REPORT.md
new file mode 100644
index 0000000000..d5e3e8ab1e
--- /dev/null
+++ b/packages/core/src/context/ASYNC_GC_STATUS_REPORT.md
@@ -0,0 +1,53 @@
+# Asynchronous Context Management: Status Report & Bug Sweep
+*Date: End of Day 1*
+
+## 1. Inventory against Implementation Plan
+
+### ✅ Phase 1: Stable Identity & Incremental IR Mapping (100% Complete)
+* **Accomplished:** Implemented an `IdentityMap` (`WeakMap`) in `IrMapper`.
+* **Result:** `Episode` and `Step` nodes now receive stable UUIDs, memoized per underlying `Content` object reference. Re-parsing the history array no longer orphans background variants.
+
+### ✅ Phase 2: Data Structures & Event Bus (100% Complete)
+* **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR types.
+* **Accomplished:** Created `ContextEventBus` class and instantiated it on `ContextManager`.
+* **Accomplished:** Added `checkTriggers()`, which runs on every `PUSH`. It always emits `IR_CHUNK_RECEIVED` (for Eager Compute) and emits `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation) whenever the token count exceeds `retainedTokens`.
+
+### 🔄 Phase 3: Refactoring Processors into Async Workers (80% Complete)
+* **Accomplished:** Defined `AsyncContextWorker` interface.
+* **Accomplished:** Refactored `StateSnapshotProcessor` into `StateSnapshotWorker`. It successfully listens to the bus, batches unprotected dying episodes, and emits a `VARIANT_READY` event.
+* **Pending:** Replace `setTimeout` dummy execution with the actual `config.getBaseLlmClient().generateContent()` API call.
+
+### 🔄 Phase 4.1: Opportunistic Replacement Engine (100% Complete)
+* **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse from newest to oldest. When `rollingTokens > retainedTokens`, it successfully swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
+
+### ❌ Phase 4.2: The Synchronous Pressure Barrier (0% Complete)
+* **Pending:** Implement the hard block at the end of `projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens` after all opportunistic swaps are applied. Must respect `maxPressureStrategy` (truncate, incrementalGc, compress).
+
+### ❌ Phase 5: Configuration & Telemetry (0% Complete)
+* **Pending:** Expose `maxPressureStrategy` in `settingsSchema.ts`. Write rigorous concurrency tests.
+
+---
+
+## 2. Bug Sweep & Architectural Review (Critical Findings)
+
+During our end-of-day audit, we challenged our assumptions and swept the new code.
We discovered two critical logic flaws that must be addressed first thing tomorrow:
+
+### 🚨 Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting)
+**The Flaw:**
+In `StateSnapshotWorker`, we synthesize `N` episodes (e.g., Episodes 1, 2, 3) into a single `SnapshotVariant`. We currently attach this variant *only* to the newest episode in the batch (Episode 3) via `targetId`.
+When the Opportunistic Swapper loops backwards (`i = 3, 2, 1`), it hits Episode 3, sees the Snapshot, and injects it. But then the loop continues to Episode 2 and Episode 1! Since they don't have the variant attached, the swapper injects them as **raw text**. The final projection contains *both* the snapshot AND the raw text it was supposed to replace.
+**The Fix (The Working Buffer Architecture):**
+Instead of projecting variants on the fly during a backwards sweep, the `ContextManager` will maintain two separate graphs: an immutable `pristineLog` (for future offloading to the Memory Wheel) and a mutable `workingContext`. When the `StateSnapshotWorker` finishes, it structurally *replaces* the N raw episodes with the 1 Snapshot episode directly in the `workingContext` array. This eliminates the duplicate projection bug entirely.
+
+### 🚨 Bug 2: Infinite RAM Growth (Pristine Graph Accumulation)
+**The Flaw:**
+Async variants only replace text in the *Projected* graph. The *Pristine* graph inside `ContextManager` (`this.pristineEpisodes`) never shrinks. Because `checkTriggers()` calculates tokens based on the pristine graph, once the history crosses `retainedTokens` (65k), it will *always* be over 65k, emitting `BUDGET_RETAINED_CROSSED` on every single turn forever.
+Furthermore, if we never delete episodes from the pristine graph, the Node.js process will eventually run out of heap memory (OOM) on extremely long sessions.
+**The Fix (The Working Buffer Architecture):**
+By calculating the token budget against the mutable `workingContext` (which is actively compacted by background snapshots) rather than the immutable `pristineLog`, the token count will successfully drop back below `retainedTokens` (65k). This breaks the infinite event loop and prevents OOM crashes. The `pristineLog` will just grow until the future Memory Subsystem is built to page it to disk.
+
+### 🚨 Minor Risk: Identity Map Mutation
+**The Risk:**
+`IrMapper` relies on `WeakMap`. If the user uses a UI command to *edit* a previous message, `AgentChatHistory` might replace the `Content` object reference. This would generate a new UUID, instantly orphaning any background variants currently computing for the old reference.
+**The Mitigation:**
+We must ensure `ContextManager` handles orphaned `VARIANT_READY` events gracefully (e.g., if `targetId` is not found, simply discard the variant and log a debug warning). (I verified we already wrote `if (targetEp)` checks in `ContextManager`, so this is mitigated).
\ No newline at end of file
diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts
index 4e7aaba8dd..494f5ac75d 100644
--- a/packages/core/src/context/contextManager.ts
+++ b/packages/core/src/context/contextManager.ts
@@ -12,6 +12,8 @@ import { debugLogger } from '../utils/debugLogger.js'; import { IrMapper } from './ir/mapper.js'; import type { Episode } from './ir/types.js'; +import { ContextEventBus } from './eventBus.js'; + export class ContextManager { private config: Config; private processors: ContextProcessor[] = [];
@@ -20,9 +22,23 @@ export class ContextManager { // This allows the agent to remember and summarize continuously without losing data across turns.
private pristineEpisodes: Episode[] = []; private unsubscribeHistory?: () => void; + public readonly eventBus: ContextEventBus; constructor(config: Config, _client: GeminiClient) { this.config = config; + this.eventBus = new ContextEventBus(); + + this.eventBus.onVariantReady((event) => { + // Find the target episode in the pristine graph + const targetEp = this.pristineEpisodes.find(ep => ep.id === event.targetId); + if (targetEp) { + if (!targetEp.variants) { + targetEp.variants = {}; + } + targetEp.variants[event.variantId] = event.variant; + debugLogger.log(`ContextManager: Received async variant [${event.variantId}] for Episode ${event.targetId}.`); + } + }); } setProcessors(processors: ContextProcessor[]) { @@ -44,9 +60,31 @@ export class ContextManager { // function calls and responses into unified Episodes. Pushing messages // individually would shatter these episodic boundaries. this.pristineEpisodes = IrMapper.toIr(chatHistory.get()); + this.checkTriggers(); // Eager Compute & Ship of Theseus Triggers }); } + private checkTriggers() { + if (!this.config.isContextManagementEnabled()) return; + + const mngConfig = this.config.getContextManagementConfig(); + const currentTokens = this.calculateIrTokens(this.pristineEpisodes); + + // 1. Eager Compute Trigger (Continuous Streaming) + // Broadcast the full graph to the async workers so they can proactively summarize partial massive files. + this.eventBus.emitChunkReceived({ episodes: this.pristineEpisodes }); + + // 2. The Ship of Theseus Trigger (retainedTokens crossed) + // If we exceed 65k, tell the background processors to opportunistically synthesize the oldest nodes. + if (currentTokens > mngConfig.budget.retainedTokens) { + const deficit = currentTokens - mngConfig.budget.retainedTokens; + this.eventBus.emitConsolidationNeeded({ + episodes: this.pristineEpisodes, + targetDeficit: deficit, + }); + } + } + /** * Returns a temporary, compressed Content[] array to be used exclusively for the LLM request. 
* This does NOT mutate the pristine episodic graph. @@ -65,12 +103,60 @@ export class ContextManager { // Deep-ish clone the IR graph so processors only mutate the projected copy. // The processors only modify `presentation` and `metadata.transformations`. - let currentEpisodes: Episode[] = this.pristineEpisodes.map((ep) => ({ - ...ep, - trigger: { ...ep.trigger, metadata: { ...ep.trigger.metadata, transformations: [...ep.trigger.metadata.transformations] }, semanticParts: ep.trigger.type === 'USER_PROMPT' ? [...ep.trigger.semanticParts.map(sp => ({...sp}))] : undefined } as any, - steps: ep.steps.map((step) => ({ ...step, metadata: { ...step.metadata, transformations: [...step.metadata.transformations] } } as any)), - yield: ep.yield ? { ...ep.yield, metadata: { ...ep.yield.metadata, transformations: [...ep.yield.metadata.transformations] } } : undefined, - })); + // 1. Opportunistic Swap (The Ship of Theseus) + // We build the projection array by sweeping through pristine history. + // If we are over the retained threshold, we look for pre-computed, 'ready' variants + // and seamlessly inject them instead of the raw text. + let currentEpisodes: Episode[] = []; + let rollingTokens = 0; + + // We walk backwards (newest to oldest) to easily know when we cross the retained threshold. + for (let i = this.pristineEpisodes.length - 1; i >= 0; i--) { + const ep = this.pristineEpisodes[i]; + let projectedEp = { + ...ep, + trigger: { ...ep.trigger, metadata: { ...ep.trigger.metadata, transformations: [...ep.trigger.metadata.transformations] }, semanticParts: ep.trigger.type === 'USER_PROMPT' ? [...ep.trigger.semanticParts.map(sp => ({...sp}))] : undefined } as any, + steps: ep.steps.map((step) => ({ ...step, metadata: { ...step.metadata, transformations: [...step.metadata.transformations] } } as any)), + yield: ep.yield ? 
{ ...ep.yield, metadata: { ...ep.yield.metadata, transformations: [...ep.yield.metadata.transformations] } } : undefined, + }; + + const epTokens = this.calculateIrTokens([projectedEp]); + + // If this episode falls entirely outside the retained threshold AND has a ready variant, swap it! + if (rollingTokens > retainedTokens && ep.variants) { + // Look for the best available variant + const snapshot = ep.variants['snapshot']; + const summary = ep.variants['summary']; + const masked = ep.variants['masked']; + + if (snapshot && snapshot.status === 'ready' && snapshot.type === 'snapshot') { + // A snapshot replaces this node ENTIRELY (and potentially others, but for now we just swap this node) + // To be perfectly accurate, a snapshot variant usually replaces multiple episodes. + // But as a simplistic projection, we just use the snapshot's episode structure. + projectedEp = snapshot.episode as any; + debugLogger.log(`Opportunistically swapped Episode ${ep.id} for pre-computed Snapshot variant.`); + } else if (summary && summary.status === 'ready' && summary.type === 'summary') { + // A summary replaces all the steps with a single thought containing the summary text. 
+ projectedEp.steps = [{ + id: ep.id + '-summary', + type: 'AGENT_THOUGHT', + text: summary.text, + metadata: { originalTokens: epTokens, currentTokens: summary.recoveredTokens || 50, transformations: [{ processorName: 'AsyncSemanticCompressor', action: 'SUMMARIZED', timestamp: Date.now() }] } + }] as any; + projectedEp.yield = undefined; // Drop the yield, the summary covers it + debugLogger.log(`Opportunistically swapped Episode ${ep.id} for pre-computed Summary variant.`); + } else if (masked && masked.status === 'ready' && masked.type === 'masked') { + // We just replace the raw text with the masked text variant + if (projectedEp.trigger.type === 'USER_PROMPT' && projectedEp.trigger.semanticParts.length > 0) { + projectedEp.trigger.semanticParts[0].presentation = { text: masked.text, tokens: masked.recoveredTokens || 10 }; + } + debugLogger.log(`Opportunistically swapped Episode ${ep.id} for pre-computed Masked variant.`); + } + } + + currentEpisodes.unshift(projectedEp); // Put it back in oldest-to-newest order + rollingTokens += this.calculateIrTokens([projectedEp]); + } let currentTokens = this.calculateIrTokens(currentEpisodes); diff --git a/packages/core/src/context/eventBus.ts b/packages/core/src/context/eventBus.ts new file mode 100644 index 0000000000..dc9e8b5b74 --- /dev/null +++ b/packages/core/src/context/eventBus.ts @@ -0,0 +1,49 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { EventEmitter } from 'node:events'; +import type { Episode, Variant } from './ir/types.js'; + +export interface ContextConsolidationEvent { + episodes: Episode[]; + targetDeficit: number; +} + +export interface IrChunkReceivedEvent { + episodes: Episode[]; +} + +export interface VariantReadyEvent { + targetId: string; // The Episode or Step ID this variant attaches to + variantId: string; // A unique ID for the variant itself + variant: Variant; +} + +export class ContextEventBus extends EventEmitter { + 
emitChunkReceived(event: IrChunkReceivedEvent) { + this.emit('IR_CHUNK_RECEIVED', event); + } + + onChunkReceived(listener: (event: IrChunkReceivedEvent) => void) { + this.on('IR_CHUNK_RECEIVED', listener); + } + + emitConsolidationNeeded(event: ContextConsolidationEvent) { + this.emit('BUDGET_RETAINED_CROSSED', event); + } + + onConsolidationNeeded(listener: (event: ContextConsolidationEvent) => void) { + this.on('BUDGET_RETAINED_CROSSED', listener); + } + + emitVariantReady(event: VariantReadyEvent) { + this.emit('VARIANT_READY', event); + } + + onVariantReady(listener: (event: VariantReadyEvent) => void) { + this.on('VARIANT_READY', listener); + } +} diff --git a/packages/core/src/context/ir/mapper.ts b/packages/core/src/context/ir/mapper.ts index 00b802a0bd..32c9c0187d 100644 --- a/packages/core/src/context/ir/mapper.ts +++ b/packages/core/src/context/ir/mapper.ts @@ -17,6 +17,18 @@ import type { } from './types.js'; import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; +// WeakMap to provide stable, deterministic identity across parses for the exact same Content/Part references +const nodeIdentityMap = new WeakMap(); + +function getStableId(obj: object): string { + let id = nodeIdentityMap.get(obj); + if (!id) { + id = randomUUID(); + nodeIdentityMap.set(obj, id); + } + return id; +} + export class IrMapper { /** * Translates a flat Gemini Content[] array into our rich Episodic Intermediate Representation. 
@@ -55,10 +67,10 @@ export class IrMapper { if (hasToolResponses) { if (!currentEpisode) { currentEpisode = { - id: randomUUID(), + id: getStableId(msg), timestamp: Date.now(), trigger: { - id: randomUUID(), + id: getStableId(msg.parts[0] || msg), type: 'SYSTEM_EVENT', name: 'history_resume', payload: {}, @@ -79,7 +91,7 @@ export class IrMapper { const obsTokens = estimateTokenCountSync([part]); const step: ToolExecution = { - id: randomUUID(), + id: getStableId(part), type: 'TOOL_EXECUTION', toolName: part.functionResponse.name || 'unknown', intent: @@ -134,7 +146,7 @@ export class IrMapper { } const trigger: UserPrompt = { - id: randomUUID(), + id: getStableId(msg.parts[0] || msg), type: 'USER_PROMPT', semanticParts, metadata: createMetadata( @@ -143,7 +155,7 @@ export class IrMapper { }; currentEpisode = { - id: randomUUID(), + id: getStableId(msg), timestamp: Date.now(), trigger, steps: [], @@ -152,10 +164,10 @@ export class IrMapper { } else if (msg.role === 'model') { if (!currentEpisode) { currentEpisode = { - id: randomUUID(), + id: getStableId(msg), timestamp: Date.now(), trigger: { - id: randomUUID(), + id: getStableId(msg.parts[0] || msg), type: 'SYSTEM_EVENT', name: 'model_init', payload: {}, @@ -171,7 +183,7 @@ export class IrMapper { if (callId) pendingCallParts.set(callId, part); } else if (part.text) { const thought: AgentThought = { - id: randomUUID(), + id: getStableId(part), type: 'AGENT_THOUGHT', text: part.text, metadata: createMetadata([part]), diff --git a/packages/core/src/context/ir/types.ts b/packages/core/src/context/ir/types.ts index 3625829734..e60964304f 100644 --- a/packages/core/src/context/ir/types.ts +++ b/packages/core/src/context/ir/types.ts @@ -37,11 +37,39 @@ export type IrNodeType = | 'TOOL_EXECUTION' | 'AGENT_YIELD'; +/** Lifecycle status of an asynchronously computed variant */ +export type VariantStatus = 'computing' | 'ready' | 'failed'; + +export interface BaseVariant { + status: VariantStatus; + recoveredTokens?: number; + 
error?: string; +} + +export interface SummaryVariant extends BaseVariant { + type: 'summary'; + text: string; +} + +export interface MaskedVariant extends BaseVariant { + type: 'masked'; + text: string; +} + +export interface SnapshotVariant extends BaseVariant { + type: 'snapshot'; + episode: Episode; + replacedEpisodeIds: string[]; +} + +export type Variant = SummaryVariant | MaskedVariant | SnapshotVariant; + /** Base interface for all nodes in the Episodic IR */ export interface IrNode { readonly id: string; readonly type: IrNodeType; metadata: IrMetadata; + variants?: Record<string, Variant>; } /** @@ -161,6 +189,7 @@ export interface Episode { readonly id: string; /** When the episode began */ readonly timestamp: number; + variants?: Record<string, Variant>; /** The event that initiated this run */ trigger: EpisodeTrigger; diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts new file mode 100644 index 0000000000..5788a29019 --- /dev/null +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -0,0 +1,122 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { randomUUID } from 'node:crypto'; +import type { Config } from '../../config/config.js'; +import type { Episode } from '../ir/types.js'; +import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; +import { debugLogger } from '../../utils/debugLogger.js'; +import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; + +export class StateSnapshotProcessor implements ContextProcessor { + name = 'StateSnapshotProcessor'; + + constructor(_config: Config) {} + + async process( + episodes: Episode[], + state: ContextAccountingState, + ): Promise<Episode[]> { + if (state.isBudgetSatisfied) return episodes; + + // TODO: Need a way to read from config if we are doing N-to-1 synthesis. + // For now, let's establish the structural skeleton. 
+ + + // Identify the "dying" block of episodes that need to be collected. + // We grab unprotected episodes from oldest to newest. + const unprotectedOldest = episodes.filter( + (ep) => !state.protectedEpisodeIds.has(ep.id), + ); + + if (unprotectedOldest.length === 0) return episodes; + + let targetDeficit = state.deficitTokens; + const episodesToSynthesize: Episode[] = []; + let tokensToSynthesize = 0; + + for (const ep of unprotectedOldest) { + if (tokensToSynthesize >= targetDeficit) break; + episodesToSynthesize.push(ep); + // Rough estimate of tokens in this episode + const epTokens = ep.steps.reduce( + (sum, step) => sum + step.metadata.currentTokens, + ep.trigger.metadata.currentTokens + + (ep.yield?.metadata.currentTokens || 0), + ); + tokensToSynthesize += epTokens; + } + + if (episodesToSynthesize.length === 0) return episodes; + + debugLogger.log( + `StateSnapshotProcessor: Synthesizing ${episodesToSynthesize.length} episodes to recover ~${tokensToSynthesize} tokens.`, + ); + + // TODO: Perform the LLM call using this.config.getBaseLlmClient() + // For now, we will create a dummy structural snapshot to prove the topological transformation works. + + const mockSnapshotText = ` + +Synthesized ${episodesToSynthesize.length} episodes. +This is where the LLM's highly structured state representation will live. 
+`; + + const snapshotTokens = estimateTokenCountSync([{ text: mockSnapshotText }]); + + const snapshotEpisode: Episode = { + id: randomUUID(), + timestamp: Date.now(), + trigger: { + id: randomUUID(), + type: 'SYSTEM_EVENT', + name: 'world_state_snapshot', + payload: { + originalEpisodeCount: episodesToSynthesize.length, + recoveredTokens: tokensToSynthesize, + }, + metadata: { + originalTokens: snapshotTokens, + currentTokens: snapshotTokens, + transformations: [{processorName: 'StateSnapshotProcessor', action: 'SYNTHESIZED', timestamp: Date.now()}], + }, + }, + steps: [ + { + id: randomUUID(), + type: 'AGENT_THOUGHT', + text: mockSnapshotText, + metadata: { + originalTokens: snapshotTokens, + currentTokens: snapshotTokens, + transformations: [], + }, + }, + ], + }; + + // Filter out the episodes we synthesized from the main graph. + const synthesizedIds = new Set(episodesToSynthesize.map((e) => e.id)); + const newEpisodes = episodes.filter((ep) => !synthesizedIds.has(ep.id)); + + // Inject the new snapshot right after the protected System Prompt + // (or at the top if no system prompt is protected). 
+    let insertionIndex = 0;
+    if (
+      newEpisodes.length > 0 &&
+      state.protectedEpisodeIds.has(newEpisodes[0].id)
+    ) {
+      insertionIndex = 1;
+    }
+
+    newEpisodes.splice(insertionIndex, 0, snapshotEpisode);
+
+    // No state update here: accounting state is immutable in the pipeline design
+    // and is recalculated by ContextManager (roughly trigger + thought tokens).
+
+    return newEpisodes;
+  }
+}
diff --git a/packages/core/src/context/workers/asyncContextWorker.ts b/packages/core/src/context/workers/asyncContextWorker.ts
new file mode 100644
index 0000000000..12e6c323c6
--- /dev/null
+++ b/packages/core/src/context/workers/asyncContextWorker.ts
@@ -0,0 +1,18 @@
+/**
+ * @license
+ * Copyright 2026 Google LLC
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import type { ContextEventBus } from '../eventBus.js';
+
+export interface AsyncContextWorker {
+  /** The unique name of the worker (e.g., 'StateSnapshotWorker') */
+  readonly name: string;
+
+  /** Starts listening to the ContextEventBus for background tasks */
+  start(bus: ContextEventBus): void;
+
+  /** Stops listening and aborts any pending background tasks */
+  stop(): void;
+}
diff --git a/packages/core/src/context/workers/stateSnapshotWorker.ts b/packages/core/src/context/workers/stateSnapshotWorker.ts
new file mode 100644
index 0000000000..6a490bf40f
--- /dev/null
+++ b/packages/core/src/context/workers/stateSnapshotWorker.ts
@@ -0,0 +1,140 @@
+/**
+ * @license
+ * Copyright 2026 Google LLC
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { randomUUID } from 'node:crypto';
+import type { Config } from '../../config/config.js';
+import type { Episode, SnapshotVariant } from '../ir/types.js';
+import type { AsyncContextWorker } from './asyncContextWorker.js';
+import type { ContextEventBus, ContextConsolidationEvent } from '../eventBus.js';
+import { debugLogger } from '../../utils/debugLogger.js';
+import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
+
+export class StateSnapshotWorker implements
AsyncContextWorker {
+  name = 'StateSnapshotWorker';
+  private bus?: ContextEventBus;
+  private isSynthesizing = false;
+
+  constructor(_config: Config) {}
+
+  start(bus: ContextEventBus): void {
+    this.bus = bus;
+    this.bus.onConsolidationNeeded(this.handleConsolidation.bind(this));
+  }
+
+  stop(): void {
+    if (this.bus) {
+      // In a real implementation we would `removeListener` here
+      this.bus = undefined;
+    }
+  }
+
+  private async handleConsolidation(event: ContextConsolidationEvent): Promise<void> {
+    if (this.isSynthesizing || event.targetDeficit <= 0) return;
+
+    // Identify the "dying" block of episodes that need to be collected.
+    // For now, we assume older episodes are at the front of the array.
+    // We only want episodes that don't already have a snapshot variant computing/ready.
+    const unprotectedOldest = event.episodes.filter(
+      (ep) => !ep.variants?.['snapshot']
+    );
+
+    if (unprotectedOldest.length === 0) return;
+
+    const targetDeficit = event.targetDeficit;
+    const episodesToSynthesize: Episode[] = [];
+    let tokensToSynthesize = 0;
+
+    for (const ep of unprotectedOldest) {
+      if (tokensToSynthesize >= targetDeficit) break;
+      episodesToSynthesize.push(ep);
+      // Rough estimate of tokens in this episode
+      const epTokens = ep.steps.reduce(
+        (sum, step) => sum + step.metadata.currentTokens,
+        ep.trigger.metadata.currentTokens +
+          (ep.yield?.metadata.currentTokens || 0),
+      );
+      tokensToSynthesize += epTokens;
+    }
+
+    if (episodesToSynthesize.length === 0) return;
+
+    this.isSynthesizing = true;
+
+    try {
+      debugLogger.log(
+        `StateSnapshotWorker: Asynchronously synthesizing ${episodesToSynthesize.length} episodes to recover ~${tokensToSynthesize} tokens.`,
+      );
+
+      // TODO: Perform the LLM call using this.config.getBaseLlmClient()
+      // For now, we will create a dummy structural snapshot to prove the topological transformation works.
+      await new Promise((resolve) => setTimeout(resolve, 500)); // Simulate async work
+
+      const mockSnapshotText = `
+
+Synthesized ${episodesToSynthesize.length} episodes.
+This is where the LLM's highly structured state representation will live.
+`;
+
+      const snapshotTokens = estimateTokenCountSync([{ text: mockSnapshotText }]);
+
+      const replacedEpisodeIds = episodesToSynthesize.map((e) => e.id);
+
+      const snapshotEpisode: Episode = {
+        id: randomUUID(),
+        timestamp: Date.now(),
+        trigger: {
+          id: randomUUID(),
+          type: 'SYSTEM_EVENT',
+          name: 'world_state_snapshot',
+          payload: {
+            originalEpisodeCount: episodesToSynthesize.length,
+            recoveredTokens: tokensToSynthesize,
+          },
+          metadata: {
+            originalTokens: snapshotTokens,
+            currentTokens: snapshotTokens,
+            transformations: [{processorName: 'StateSnapshotWorker', action: 'SYNTHESIZED', timestamp: Date.now()}],
+          },
+        },
+        steps: [
+          {
+            id: randomUUID(),
+            type: 'AGENT_THOUGHT',
+            text: mockSnapshotText,
+            metadata: {
+              originalTokens: snapshotTokens,
+              currentTokens: snapshotTokens,
+              transformations: [],
+            },
+          },
+        ],
+      };
+
+      const variant: SnapshotVariant = {
+        type: 'snapshot',
+        status: 'ready',
+        recoveredTokens: tokensToSynthesize,
+        episode: snapshotEpisode,
+        replacedEpisodeIds,
+      };
+
+      // Emit the variant for the MOST RECENT episode in the batch,
+      // since the Opportunistic Swapper sweeps from newest to oldest.
+      const targetId = replacedEpisodeIds[replacedEpisodeIds.length - 1];
+
+      if (this.bus) {
+        this.bus.emitVariantReady({
+          targetId,
+          variantId: 'snapshot',
+          variant,
+        });
+      }
+
+    } finally {
+      this.isSynthesizing = false;
+    }
+  }
+}