not working

This commit is contained in:
Your Name
2026-04-06 17:44:28 +00:00
parent ac6dc1d477
commit aa0deb05a4
14 changed files with 857 additions and 475 deletions
@@ -0,0 +1,94 @@
# Asynchronous Context Management (Dataflow Architecture)
## The Problem
Context management today is an emergency response. When a chat session hits the
maximum token limit (`maxTokens`), the system halts the user's request,
synchronously runs expensive compression pipelines (masking tools, summarizing
text with LLMs), and only proceeds when the token count falls below the limit.
This introduces unacceptable latency and forces trade-offs between speed and
data fidelity.
## The Vision: Eager Subconscious Compute
Instead of a reactive, synchronous pipeline, Context Management should be an
**asynchronous dataflow graph**.
Because we know old memory will _eventually_ need to be degraded or garbage
collected, we should utilize the agent's idle time (while the user is reading or
typing) to proactively compute "degraded variants" of episodes before there is
any context pressure.
### The Three Phases of Memory Lifecycle
#### 1. The Eager Compute Phase (Background / Continuous Streaming)
Context pressure doesn't wait for an episode to finish. If a user pastes a
100k-token file, the budget explodes instantly. Therefore, the dataflow graph is
fed continuously.
- Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped
into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a
`TOOL_EXECUTION` step) and broadcast immediately.
- **Processors (e.g., SemanticCompressor, StateSnapshot) listen as background
workers.**
- They eagerly compute degraded variants on partial episodes. For instance,
`SemanticCompressionProcessor` can summarize a massive 100k `USER_PROMPT` the
millisecond it arrives, without waiting for the model to reply.
- It attaches the result to the IR graph as
`Episode#1.trigger.variants.summary`.
- **Result:** This costs the user zero latency. The agent is
"dreaming/consolidating" granular memory chunks in the background, even during
long-running "mono-episodes."
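The eager path can be sketched as a fire-and-forget background task. The `Episode`/`Variant` shapes and the `eagerSummarize` helper below are illustrative stand-ins for the real IR types, and the string truncation stands in for the LLM summarization call:

```typescript
// Hypothetical, simplified shapes: the real Episode/Variant types live in the IR package.
interface Variant {
  status: 'computing' | 'ready' | 'failed';
  text?: string;
  recoveredTokens?: number;
}

interface Episode {
  id: string;
  rawText: string;
  variants?: Record<string, Variant>;
}

// Eagerly compute a degraded variant the moment content arrives.
// The caller does NOT await this: the user's turn proceeds immediately.
async function eagerSummarize(ep: Episode): Promise<void> {
  ep.variants ??= {};
  ep.variants['summary'] = { status: 'computing' };
  // Stand-in for the real LLM summarization call.
  const text = await Promise.resolve(ep.rawText.slice(0, 32) + '…');
  ep.variants['summary'] = {
    status: 'ready',
    text,
    recoveredTokens: ep.rawText.length - text.length,
  };
}

const episode: Episode = { id: 'ep1', rawText: 'x'.repeat(1000) };
const pending = eagerSummarize(episode); // fire-and-forget "subconscious" work
```

The key property is that `pending` is never awaited on the hot path; the variant simply becomes `ready` some time later.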
#### 2. Opportunistic Replacement (`retainedTokens` Threshold)
When the active context window crosses the "ideal" size (e.g., 65k tokens):
- The system identifies the oldest episodes that have fallen outside the
`retained` window.
- It checks if they have pre-computed variants (e.g., a `summary` or `masked`
variant).
- If yes, it instantly and silently swaps the raw episode for the degraded
variant.
- **Result:** The context gently decays over time, completely avoiding hard
limits for as long as possible, with zero latency cost.
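A minimal sketch of this sweep, with illustrative shapes (the real projection also handles masked and N-to-1 snapshot variants):

```typescript
// Illustrative shapes; the real code tracks multiple variant kinds.
interface Variant { status: 'computing' | 'ready' | 'failed'; text: string; tokens: number; }
interface Episode { id: string; text: string; tokens: number; variants?: Record<string, Variant>; }

// Walk newest → oldest. Once the rolling total crosses retainedTokens, prefer a
// *ready* pre-computed variant over the raw episode. This never blocks: variants
// that are still 'computing' are simply ignored this turn.
function project(episodes: Episode[], retainedTokens: number): Array<{ id: string; text: string }> {
  const out: Array<{ id: string; text: string }> = [];
  let rolling = 0;
  for (let i = episodes.length - 1; i >= 0; i--) {
    const ep = episodes[i];
    const summary = ep.variants?.['summary'];
    const useVariant = rolling > retainedTokens && summary?.status === 'ready';
    out.unshift(useVariant ? { id: ep.id, text: summary!.text } : { id: ep.id, text: ep.text });
    rolling += useVariant ? summary!.tokens : ep.tokens;
  }
  return out;
}

const view = project(
  [
    { id: 'old', text: 'raw-old', tokens: 100, variants: { summary: { status: 'ready', text: 'sum', tokens: 5 } } },
    { id: 'new', text: 'raw-new', tokens: 50 },
  ],
  40,
);
```

Recent episodes stay raw; only episodes beyond the retained window decay to their variants.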
#### 3. The Pressure Barrier (`maxTokens` Hard Limit)
When the active context window crosses the absolute hard limit (e.g., 150k
tokens)—perhaps because the user pasted a massive file and the background
workers couldn't keep up—the system hits a **Synchronous Barrier**.
At this barrier, the `ContextManager` checks the user's configured
`ContextPressureStrategy` to decide how to unblock the request:
- **Strategy A: `truncate` (The Baseline)**
- _Behavior:_ Instantly drop the oldest episodes until under `maxTokens`.
- _Tradeoff:_ Maximum speed, maximum data loss.
- **Strategy B: `incrementalGc` (Progressive)**
- _Behavior:_ Look for any pre-computed summaries/masks. If none exist,
synchronously block to compute _just enough_ summaries to survive the
current turn.
- _Tradeoff:_ Medium speed, medium data retention.
- **Strategy C: `compress` (State Snapshot)**
- _Behavior:_ Identify the oldest N episodes causing the overflow. If their
N-to-1 World State Snapshot isn't ready yet, **block the user's request**
and force the `StateSnapshotProcessor` to generate it synchronously. Once
generated, replace the N episodes with the 1 snapshot and proceed.
- _Tradeoff:_ Maximum latency, maximum data retention/fidelity.
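The three strategies reduce to a dispatch like the following sketch; the strategy names come from this document, but the dependency callbacks are hypothetical stand-ins for the real manager internals:

```typescript
// Strategy names from this doc; everything else is an illustrative stand-in.
type ContextPressureStrategy = 'truncate' | 'incrementalGc' | 'compress';

interface BarrierDeps {
  dropOldestEpisode: () => number;              // sync: returns tokens freed
  awaitPendingSummaries: () => Promise<number>; // block on in-flight eager work
  forceStateSnapshot: () => Promise<number>;    // synchronously generate the N-to-1 snapshot
}

// Unblock the request by freeing at least `deficit` tokens, per the configured strategy.
async function resolvePressure(
  strategy: ContextPressureStrategy,
  deficit: number,
  deps: BarrierDeps,
): Promise<number> {
  let freed = 0;
  while (freed < deficit) {
    switch (strategy) {
      case 'truncate':      freed += deps.dropOldestEpisode(); break;           // max speed, max loss
      case 'incrementalGc': freed += await deps.awaitPendingSummaries(); break; // medium / medium
      case 'compress':      freed += await deps.forceStateSnapshot(); break;    // max latency, max fidelity
    }
  }
  return freed;
}

const freedPromise = resolvePressure('truncate', 50, {
  dropOldestEpisode: () => 30,
  awaitPendingSummaries: async () => 0,
  forceStateSnapshot: async () => 0,
});
```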
## Architectural Changes Required
1. **Episode Variants:** Update the `Episode` IR type to support a `variants`
dictionary.
2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to
dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of
`AgentChatHistory`).
3. **Processor Interface:** Change `ContextProcessor` from a synchronous
`process(episodes[])` function to an asynchronous worker that listens to the
event bus, updates the `variants` dictionary, and emits `VARIANT_READY`
events.
4. **Projection Logic:** Update `projectCompressedHistory()` to act as the
Pressure Barrier, reading the user's strategy and either applying ready
variants, waiting for variants, or truncating.
@@ -0,0 +1,144 @@
# Asynchronous Context Management Implementation Plan
This document outlines the step-by-step implementation plan for refactoring
`ContextManager` into a fully asynchronous, event-driven dataflow graph (Eager
Subconscious Compute).
---
## Phase 1: Stable Identity & Incremental IR Mapping
**The Problem:** Currently, `IrMapper.toIr()` is stateless. It generates random
UUIDs for `Episode` and `Step` nodes every time it parses the `Content[]` array.
If the array is rebuilt while an asynchronous processor is computing a summary,
the target ID will be lost, and the variant will be orphaned.

**The Goal:** Episodes must maintain a stable identity across turns so
background workers can confidently attach variants to them.
**Tasks:**
1. **Deterministic Hashing or Stateful Mapping:** Update `IrMapper` to either
generate deterministic UUIDs (e.g., hashing the part text/timestamp) OR make
`ContextManager`'s pristine graph mutable, where new `PUSH` events are
mapped _incrementally_ onto the tail of `this.pristineEpisodes` rather than
rebuilding the whole array.
2. **Test Update:** Ensure `IrMapper` tests verify stable IDs across successive
parse events.
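One way to satisfy the deterministic-UUID option is a content hash. This sketch uses Node's `crypto`; hashing text plus position is an illustrative choice, and the stateful `WeakMap` keyed on the `Content` object reference is the alternative described above:

```typescript
import { createHash } from 'node:crypto';

// Derive a stable episode ID from content rather than a random UUID, so that
// re-parsing the same Content[] yields the same IDs and async variants stay attached.
function stableEpisodeId(partText: string, index: number): string {
  return createHash('sha256').update(`${index}:${partText}`).digest('hex').slice(0, 16);
}

const a = stableEpisodeId('hello world', 0);
const b = stableEpisodeId('hello world', 0); // re-parse of the same content
const c = stableEpisodeId('hello world', 1); // same text, different position
```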
---
## Phase 2: Data Structures & Event Bus
**The Problem:** The system lacks the internal types and communication channels
to support asynchronous variant generation.

**The Goal:** Define the `Variant` schemas and the internal `EventEmitter` that
will broadcast graph updates to the async workers.
**Tasks:**
1. **Variant Types:** Update `packages/core/src/context/ir/types.ts`.
- Add a `variants?: Record<string, Variant>` property to `Episode` and
`Step` (where `Variant` is a discriminated union of `SummaryVariant`,
`MaskedVariant`, `SnapshotVariant`, etc.).
- Include metadata on the variant:
`status: 'computing' | 'ready' | 'failed'`, `promise?: Promise<void>`,
`recoveredTokens: number`.
2. **Event Bus (`ContextEventBus`):**
- Create an internal event emitter in `ContextManager` (using
`events.EventEmitter` or a lightweight alternative).
- Define Events:
- `IR_NODE_CREATED`: Fired when a new Episode/Step is mapped. (Triggers
eager compute).
- `VARIANT_READY`: Fired by a worker when it finishes computing a
summary/snapshot.
- `BUDGET_RETAINED_CROSSED`: Fired when `currentTokens > retainedTokens`.
- `BUDGET_MAX_CROSSED`: Fired when `currentTokens > maxTokens`.
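A minimal sketch of the variant union and bus from the tasks above; the `kind` discriminant and payload fields are assumptions (the real types belong in `packages/core/src/context/ir/types.ts`):

```typescript
import { EventEmitter } from 'node:events';

// Discriminated union per Task 1 (fields beyond `kind`/`status` are illustrative).
type VariantStatus = 'computing' | 'ready' | 'failed';
type Variant =
  | { kind: 'summary'; status: VariantStatus; text?: string; recoveredTokens: number }
  | { kind: 'masked'; status: VariantStatus; recoveredTokens: number }
  | { kind: 'snapshot'; status: VariantStatus; replacedEpisodeIds: string[]; recoveredTokens: number };

// Thin typed wrapper over EventEmitter per Task 2.
class ContextEventBus {
  private readonly emitter = new EventEmitter();
  emitVariantReady(targetId: string, variant: Variant): void {
    this.emitter.emit('VARIANT_READY', { targetId, variant });
  }
  onVariantReady(fn: (e: { targetId: string; variant: Variant }) => void): void {
    this.emitter.on('VARIANT_READY', fn);
  }
}

const bus = new ContextEventBus();
const received: string[] = [];
bus.onVariantReady((e) => received.push(`${e.targetId}:${e.variant.kind}`));
bus.emitVariantReady('ep42', { kind: 'summary', status: 'ready', text: 's', recoveredTokens: 120 });
```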
---
## Phase 3: Refactoring Processors into Async Workers
**The Problem:** Processors currently implement a batch
`process(episodes, state) -> Promise<Episode[]>` interface that is awaited
inline, blocking the main loop.

**The Goal:** Convert them into background workers that listen to the
`ContextEventBus`, perform LLM tasks asynchronously, and emit `VARIANT_READY`.
**Tasks:**
1. **Define `AsyncContextWorker` Interface:**
- `start(bus: ContextEventBus): void`
- `stop(): void`
2. **Implement `SemanticCompressionWorker`:**
- Listens to `IR_NODE_CREATED` (or `BUDGET_RETAINED_CROSSED` for lazier
eager compute).
- Batches old `USER_PROMPT` nodes.
- Calls LLM in background.
- Emits `VARIANT_READY` with the summary string and target Node IDs.
3. **Implement `StateSnapshotWorker`:**
- Listens to `BUDGET_RETAINED_CROSSED`.
- Identifies the N oldest raw episodes.
- Synthesizes them into a single `world_state_snapshot`.
- Emits `VARIANT_READY` containing the new Snapshot Episode and the IDs of
the N episodes it replaces.
4. **Wire Event Listeners:** `ContextManager` listens to `VARIANT_READY` and
updates the pristine graph's `variants` dictionary.
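A worker skeleton following Tasks 1–2 might look like this; the `MiniBus` and payload shapes are simplified stand-ins, and the string truncation stands in for the background LLM call:

```typescript
// Simplified stand-in for the real ContextEventBus.
type Listener = (payload: { episodeId: string; text: string }) => void;

class MiniBus {
  private listeners: Listener[] = [];
  onNodeCreated(fn: Listener) { this.listeners.push(fn); }
  emitNodeCreated(p: { episodeId: string; text: string }) { this.listeners.forEach((fn) => fn(p)); }
}

interface AsyncContextWorker {
  start(bus: MiniBus): void;
  stop(): void;
}

class SemanticCompressionWorker implements AsyncContextWorker {
  readonly ready: Array<{ targetId: string; summary: string }> = [];
  private active = false;
  start(bus: MiniBus): void {
    this.active = true;
    bus.onNodeCreated(async ({ episodeId, text }) => {
      if (!this.active) return;
      const summary = await Promise.resolve(text.slice(0, 10)); // stand-in for the LLM call
      this.ready.push({ targetId: episodeId, summary });        // would emit VARIANT_READY
    });
  }
  stop(): void { this.active = false; }
}

const bus = new MiniBus();
const worker = new SemanticCompressionWorker();
worker.start(bus);
bus.emitNodeCreated({ episodeId: 'ep1', text: 'a very long user prompt...' });
const settled = new Promise((r) => setTimeout(r, 0)); // let the async listener resolve
```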
---
## Phase 4: The Projection Engine & Pressure Barrier
**The Problem:** `projectCompressedHistory()` currently runs the synchronous
pipeline. It needs to become both the non-blocking opportunistic swapper and the
blocking pressure barrier.

**The Goal:** Serve the LLM request instantly using pre-computed variants, or
block strictly according to the user's `maxPressureStrategy`.
**Tasks:**
1. **Opportunistic Swap (`retainedTokens`):**
- When traversing `this.pristineEpisodes` to build the projected array, if
`currentTokens > retainedTokens`, check the oldest episodes.
- If an episode has a `variant.status === 'ready'`, use the variant's tokens
and text _instead_ of the raw episode.
2. **Pressure Barrier (`maxTokens`):**
- If the projected array is _still_ `> maxTokens` after all ready variants
are applied, hit the Barrier.
- Read `config.getContextManagementConfig().budget.maxPressureStrategy`.
- **If `truncate`:** Instantly drop the oldest episodes from the projection
until under budget. (Fastest).
- **If `incrementalGc`:** Await any variants that are
`status === 'computing'` for the oldest nodes until the deficit is
cleared. If none are computing, force a synchronous masking/truncation.
- **If `compress`:** Await the `StateSnapshotWorker`'s active `Promise`. If
it hasn't started, synchronously invoke it and block until the N-to-1
snapshot is ready.
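The `incrementalGc` branch amounts to awaiting in-flight variant promises, oldest first, only until the deficit clears. A sketch under assumed shapes (each `computing` variant is assumed to carry the worker's pending `Promise`):

```typescript
// Assumed shape: the promise resolves to the number of tokens the variant recovers.
interface PendingVariant { status: 'computing' | 'ready'; promise: Promise<number>; }

// Block only as long as strictly necessary: stop awaiting once the deficit is met.
async function incrementalGc(pending: PendingVariant[], deficit: number): Promise<number> {
  let recovered = 0;
  for (const v of pending) {
    if (recovered >= deficit) break;
    recovered += await v.promise;
  }
  return recovered;
}

const recoveredPromise = incrementalGc(
  [
    { status: 'computing', promise: Promise.resolve(30) },
    { status: 'computing', promise: Promise.resolve(40) },
    { status: 'computing', promise: Promise.resolve(99) }, // never awaited: deficit already met
  ],
  50,
);
```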
---
## Phase 5: Configuration & Telemetry
**The Goal:** Expose the new strategies to the user and ensure we can observe
the background workers.
**Tasks:**
1. **Config Schema:** Update `settingsSchema.ts` to include
`maxPressureStrategy: 'truncate' | 'incrementalGc' | 'compress'`.
2. **Telemetry:** Log events when background workers start/finish, including
the tokens saved and the latency of the background task.
3. **Testing:** Write concurrency tests simulating a user typing rapidly while
background summaries are still resolving, ensuring no data corruption or
dropped variants.
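A plain-TypeScript sketch of the new setting; the real project validates this through `settingsSchema.ts`, so the parser below is only illustrative:

```typescript
// The three strategy names come from this plan.
const STRATEGIES = ['truncate', 'incrementalGc', 'compress'] as const;
type MaxPressureStrategy = (typeof STRATEGIES)[number];

// Narrow an untrusted settings value, falling back to the safe baseline.
function parseMaxPressureStrategy(value: unknown): MaxPressureStrategy {
  return STRATEGIES.includes(value as MaxPressureStrategy)
    ? (value as MaxPressureStrategy)
    : 'truncate';
}

const chosen = parseMaxPressureStrategy('compress');
const fallback = parseMaxPressureStrategy('bogus');
```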
---
## Open Questions & Risks
- **API Cost:** Eager compute means we might summarize episodes in sessions
  that never actually hit the context limit. Should Eager Compute begin only
  when `current > retained`, or truly immediately? (Recommendation: start at
  `retained` to save money, but keep `max` high enough above `retained` to give
  the async workers time to finish.)
- **Race Conditions:** If the user deletes a message via the UI (triggering
`AgentChatHistory.map/flatMap`), we must cleanly abort any pending Promises in
the background workers for those deleted IDs.
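The abort path can lean on `AbortController`. This sketch, with illustrative bookkeeping, shows cancelling pending work for deleted episode IDs:

```typescript
// Illustrative bookkeeping: one AbortController per in-flight variant computation.
const inFlight = new Map<string, AbortController>();

function startBackgroundWork(episodeId: string): AbortSignal {
  const ctrl = new AbortController();
  inFlight.set(episodeId, ctrl);
  return ctrl.signal; // the worker checks signal.aborted before attaching its variant
}

// Called when the UI deletes messages (AgentChatHistory.map/flatMap shrinks the log).
function abortDeleted(deletedIds: string[]): void {
  for (const id of deletedIds) {
    inFlight.get(id)?.abort();
    inFlight.delete(id);
  }
}

const signal = startBackgroundWork('ep7');
abortDeleted(['ep7']);
```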
@@ -0,0 +1,52 @@
# Asynchronous Context Management: Status Report & Bug Sweep
_Date: End of Day 2 (Subconscious Memory Refactoring Complete)_
## 1. Inventory against Implementation Plan
### ✅ Phase 1: Stable Identity & Incremental IR Mapping (100% Complete)
- **Accomplished:** Implemented an `IdentityMap` (`WeakMap<object, string>`) in `IrMapper`.
- **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based on the underlying `Content` object references. Re-parsing the history array no longer orphans background variants.
- **Testing:** Implemented an explicit `IrMapper.test.ts` unit test proving `WeakMap` identity stability across conversation growth.
### ✅ Phase 2: Data Structures & Event Bus (100% Complete)
- **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR types.
- **Accomplished:** Created `ContextEventBus` class and instantiated it on `ContextManager`.
- **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation) on every `PUSH`.
### ✅ Phase 3: Refactoring Processors into Async Workers (100% Complete)
- **Accomplished:** Defined `AsyncContextWorker` interface.
- **Accomplished:** Refactored `StateSnapshotProcessor` into `StateSnapshotWorker`. It successfully listens to the bus, batches unprotected dying episodes, and emits a `VARIANT_READY` event.
- **Accomplished:** Replaced dummy execution with the actual `config.getBaseLlmClient().generateContent()` API call using `gemini-2.5-flash` and the `LlmRole.UTILITY_COMPRESSOR` telemetry role.
- **Accomplished:** Added robust `try/catch` and extensive `debugLogger.error` / `debugLogger.warn` logging to catch anomalous LLM failures without crashing the main loop.
### ✅ Phase 4.1: Opportunistic Replacement Engine (100% Complete)
- **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse from newest to oldest. When `rollingTokens > retainedTokens`, it successfully swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
- **Accomplished:** Implemented the `getWorkingBufferView()` sweep method. It perfectly resolves the N-to-1 Variant Targeting bug by injecting the snapshot and adding all `replacedEpisodeIds` to a `skippedIds` Set, cleanly dropping the older raw nodes from the final projection array.
### ✅ Phase 4.2: The Synchronous Pressure Barrier (100% Complete)
- **Accomplished:** Implemented the hard block at the end of `projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens` after all opportunistic swaps are applied.
- **Accomplished:** Reads the `mngConfig.budget.maxPressureStrategy` flag. Supports `truncate` (instantly dropping oldest unprotected episodes) and safely falls back if `compress` isn't fully wired synchronously yet.
- **Testing:** Wrote `contextManager.barrier.test.ts` to blast the system with ~200k tokens and verify the instant truncation successfully protects the System Prompt (Episode 0) and the current working context.
### ✅ Phase 5: Configuration & Testing (100% Complete)
- **Accomplished:** Exposed `maxPressureStrategy` in `settingsSchema.ts` and replaced the deprecated `incrementalGc` flag across the entire monorepo.
- **Accomplished:** Wrote extensive concurrency component tests in `contextManager.async.test.ts` to prove the async LLM Promise resolution does not block the main user thread and correctly handles the critical race condition of "User typing while background snapshotting".
---
## 2. Bug Sweep & Architectural Review (Critical Findings Resolved)
Both critical flaws discovered on Day 1 have been completely resolved:
### ✅ Resolved Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting)
**The Fix:** The `getWorkingBufferView()` method tracks a `skippedIds` Set during its sweep. If it chooses a SnapshotVariant, it pushes all `replacedEpisodeIds` into the Set, cleanly skipping the raw text nodes on subsequent iterations.
### ✅ Resolved Bug 2: Infinite RAM Growth (Pristine Graph Accumulation)
**The Fix:** The `checkTriggers()` method now calculates its token budget against the computed `WorkingBufferView` rather than the `pristineEpisodes` array. As soon as an async worker injects a snapshot, the calculated token count plummets natively, breaking the infinite GC loop while leaving the pristine log untouched.
@@ -0,0 +1,86 @@
# Data-Driven Context Pipeline (Sidecar Config)
## 1. Motivation
The Context Management subsystem has grown sophisticated, but its configuration
is currently entangled with the global CLI `Config` god-object and the static
`settingsSchema.ts`. This entanglement causes several problems:
1. **Rigidity:** The order of processors (`ToolMasking` -> `Degradation` ->
`Semantic` -> `Squashing`) is hardcoded in TypeScript.
2. **Hyperparameter Bloat:** Every new tuning knob requires modifying the global
schema, UI dialogs, and types.
3. **Pipeline Isolation:** Background tasks like the `StateSnapshotWorker` are
isolated silos. They manage their own triggers and cannot participate in a
sequential data pipeline (e.g. receiving degraded messages as input).
## 2. Vision: The Orthogonal "Forking" Pipeline
We will transition the Context Manager to be entirely configured by an independent,
strictly internal "Sidecar JSON" that represents a Directed Acyclic Graph (DAG) of
**Triggers** and **Processors**.
By completely separating the "Execution Strategy" (when something runs) from the
"Data Transformation Logic" (what it does), we can arbitrarily compose tools.
Crucially, the architecture supports a **"Forking Pipeline" mechanic**:
- **Synchronous Execution:** If all processors in a pipeline return `Episode[]`,
the orchestrator runs them inline and immediately returns the result (e.g. for
instant LLM prompting).
- **Asynchronous Forking (Eventual Consistency):** If a processor returns a
`Promise<Episode[]>` (like a heavy LLM summarizer), the orchestrator immediately
halts the synchronous chain, returns the previously processed state to the caller
so the CLI doesn't freeze, and lets the rest of the pipeline continue resolving
in the background. When it finishes, it caches the result for the *next* turn.
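The forking mechanic reduces to: run processors inline while results are synchronous, and on the first `Promise`, return the last synchronous state and let the remainder settle in the background. A sketch under assumed shapes:

```typescript
type Episode = { id: string; text: string };
type Processor = (eps: Episode[]) => Episode[] | Promise<Episode[]>;

interface ForkResult {
  immediate: Episode[];            // returned to the caller right away
  background?: Promise<Episode[]>; // cached for the *next* turn when it settles
}

function runForkingPipeline(input: Episode[], processors: Processor[]): ForkResult {
  let state = input;
  for (let i = 0; i < processors.length; i++) {
    const result = processors[i](state);
    if (result instanceof Promise) {
      // Fork: hand back the last synchronous state; finish the rest asynchronously.
      const background = (async () => {
        let s = await result;
        for (const p of processors.slice(i + 1)) s = await p(s);
        return s;
      })();
      return { immediate: state, background };
    }
    state = result;
  }
  return { immediate: state };
}

const upper: Processor = (eps) => eps.map((e) => ({ ...e, text: e.text.toUpperCase() }));
const slowSummarize: Processor = async (eps) => eps.map((e) => ({ ...e, text: e.text.slice(0, 3) }));
const fork = runForkingPipeline([{ id: 'e1', text: 'hello' }], [upper, slowSummarize]);
```

The caller sees the state produced by the last synchronous processor (`upper` here) with zero blocking, while the heavy summarizer finishes off-thread-of-control.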
## 3. High-Level Architecture
### A. The Sidecar Schema
The sidecar JSON defines the **Budget** and an array of **Pipelines**.
```json
{
"budget": {
"retainedTokens": 65000,
"maxTokens": 150000
},
"pipelines": [
{
"name": "Immediate Sanitization",
"triggers": ["on_turn"],
"processors": [
{ "processorId": "ToolMaskingProcessor", "options": { "stringLengthThresholdTokens": 8000 } },
{ "processorId": "BlobDegradationProcessor", "options": {} },
{ "processorId": "SemanticCompressionProcessor", "options": { "nodeThresholdTokens": 5000 } }
]
},
{
"name": "Deep Background Compression",
"triggers": [{ "type": "timer", "intervalMs": 5000 }, "budget_exceeded"],
"processors": [
{ "processorId": "HistorySquashingProcessor", "options": { "maxTokensPerNode": 3000 } },
{ "processorId": "StateSnapshotProcessor", "options": {} }
]
}
]
}
```
### B. Processor Registry & Reification
To convert the JSON into a running graph, we use a dynamic registry. Every
processor implements the `ContextProcessor` interface and defines its own explicit Options.
```typescript
export interface ContextProcessor {
process(episodes: Episode[]): Episode[] | Promise<Episode[]>;
}
```
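A registry that reifies `processorId` strings from the sidecar JSON into instances might look like the following sketch; the factory signature and the env/options handling are assumptions:

```typescript
// Assumed minimal shapes, mirroring the ContextProcessor interface above.
interface Episode { id: string; text: string; }
interface ContextProcessor {
  process(episodes: Episode[]): Episode[] | Promise<Episode[]>;
}
type Factory = (options: Record<string, unknown>) => ContextProcessor;

class ProcessorRegistry {
  private static factories = new Map<string, Factory>();
  static register(id: string, create: Factory): void {
    ProcessorRegistry.factories.set(id, create);
  }
  static create(id: string, options: Record<string, unknown> = {}): ContextProcessor {
    const factory = ProcessorRegistry.factories.get(id);
    if (!factory) throw new Error(`Unknown processorId in sidecar config: ${id}`);
    return factory(options);
  }
}

// Reifying one pipeline entry from the sidecar JSON ("maxChars" is illustrative):
ProcessorRegistry.register('ToolMaskingProcessor', (opts) => ({
  process: (eps) => eps.map((e) => ({ ...e, text: e.text.slice(0, Number(opts['maxChars'] ?? 8)) })),
}));
const proc = ProcessorRegistry.create('ToolMaskingProcessor', { maxChars: 4 });
const maskedResult = proc.process([{ id: 'e1', text: 'verbose tool output' }]);
```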
## 4. Implementation Phases
- **Phase 1: Interfaces & Registry:** Define `PipelineDef`, `Trigger`, and a `ProcessorRegistry`.
- **Phase 2: Normalize Workers:** Demote `StateSnapshotWorker` into a standard `StateSnapshotProcessor` so it can be composed in any pipeline array.
- **Phase 3: The Pipeline Orchestrator:** Build the central orchestration engine that listens to triggers, pumps `pristineEpisodes` through the arrays, and handles the Sync/Async forking logic to ensure zero-blocking eventual consistency.
- **Phase 4: ContextManager Integration:** Wire the `ContextManager` to delegate execution and caching to the Orchestrator.
@@ -5,6 +5,7 @@
*/
import type { Content } from '@google/genai';
import type { AgentChatHistory } from '../core/agentChatHistory.js';
import { debugLogger } from '../utils/debugLogger.js';
import { IrMapper } from './ir/mapper.js';
@@ -13,7 +14,7 @@ import type { Episode } from './ir/types.js';
import { ContextEventBus } from './eventBus.js';
import { ContextTracer } from './tracer.js';
import { StateSnapshotWorker } from './workers/stateSnapshotWorker.js';
import type { ContextEnvironment } from './sidecar/environment.js';
@@ -28,46 +29,38 @@ import { SemanticCompressionProcessor } from './processors/semanticCompressionPr
import { HistorySquashingProcessor } from './processors/historySquashingProcessor.js';
export class ContextManager {
// The stateful, pristine Episodic Intermediate Representation graph.
// This allows the agent to remember and summarize continuously without losing data across turns.
private pristineEpisodes: Episode[] = [];
private unsubscribeHistory?: () => void;
private readonly eventBus: ContextEventBus;
// Internal sub-components
// Synchronous processors are instantiated but effectively used as singletons within this class
private workers: AsyncContextWorker[] = [];
constructor(
private sidecar: SidecarConfig,
private env: ContextEnvironment,
private readonly tracer: ContextTracer,
) {
constructor(private sidecar: SidecarConfig, private env: ContextEnvironment, private readonly tracer: ContextTracer) {
this.eventBus = new ContextEventBus();
// Register built-ins
ProcessorRegistry.register({
id: 'ToolMaskingProcessor',
create: (env, opts) => new ToolMaskingProcessor(env, opts as any),
});
ProcessorRegistry.register({
id: 'BlobDegradationProcessor',
create: (env, opts) => new BlobDegradationProcessor(env),
});
ProcessorRegistry.register({
id: 'SemanticCompressionProcessor',
create: (env, opts) => new SemanticCompressionProcessor(env, opts as any),
});
ProcessorRegistry.register({
id: 'HistorySquashingProcessor',
create: (env, opts) => new HistorySquashingProcessor(env, opts as any),
});
ProcessorRegistry.register({
id: 'StateSnapshotWorker',
create: (env, opts) => new StateSnapshotWorker(env),
});
ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) });
ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'StateSnapshotWorker', create: (env, opts) => new StateSnapshotWorker(env) });
this.eventBus.onVariantReady((event) => {
// Find the target episode in the pristine graph
const targetEp = this.pristineEpisodes.find(
(ep) => ep.id === event.targetId,
@@ -77,10 +70,7 @@ export class ContextManager {
targetEp.variants = {};
}
targetEp.variants[event.variantId] = event.variant;
this.tracer.logEvent(
'ContextManager',
`Received async variant [${event.variantId}] for Episode ${event.targetId}`,
);
this.tracer.logEvent('ContextManager', `Received async variant [${event.variantId}] for Episode ${event.targetId}`);
debugLogger.log(
`ContextManager: Received async variant [${event.variantId}] for Episode ${event.targetId}.`,
);
@@ -92,10 +82,7 @@ export class ContextManager {
// Initialize and start background subconscious workers
for (const bgDef of this.sidecar.pipelines.eagerBackground) {
const worker = ProcessorRegistry.get(bgDef.processorId).create(
this.env,
bgDef.options,
) as AsyncContextWorker;
const worker = ProcessorRegistry.get(bgDef.processorId).create(this.env, bgDef.options) as AsyncContextWorker;
worker.start(this.eventBus);
this.workers.push(worker);
}
@@ -128,11 +115,7 @@ export class ContextManager {
// function calls and responses into unified Episodes. Pushing messages
// individually would shatter these episodic boundaries.
this.pristineEpisodes = IrMapper.toIr(chatHistory.get());
this.tracer.logEvent(
'ContextManager',
'Rebuilt pristine graph from chat history update',
{ episodeCount: this.pristineEpisodes.length },
);
this.tracer.logEvent('ContextManager', 'Rebuilt pristine graph from chat history update', { episodeCount: this.pristineEpisodes.length });
this.checkTriggers();
});
}
@@ -141,16 +124,13 @@ export class ContextManager {
if (!this.sidecar.budget) return;
const mngConfig = this.sidecar;
// Calculate tokens based on the *Working Buffer View*, not the raw pristine log.
// This solves Bug 2: The View shrinks when variants are applied, preventing infinite GC loops.
const workingBuffer = this.getWorkingBufferView();
const currentTokens = this.calculateIrTokens(workingBuffer);
this.tracer.logEvent('ContextManager', 'Evaluated triggers', {
currentTokens,
retainedTokens: mngConfig.budget.retainedTokens,
});
this.tracer.logEvent('ContextManager', 'Evaluated triggers', { currentTokens, retainedTokens: mngConfig.budget.retainedTokens });
// 1. Eager Compute Trigger (Continuous Streaming)
// Broadcast the full pristine log to the async workers so they can proactively summarize partial massive files.
@@ -160,18 +140,10 @@ export class ContextManager {
// If we exceed 65k, tell the background processors to opportunistically synthesize the oldest nodes.
if (currentTokens > mngConfig.budget.retainedTokens) {
const deficit = currentTokens - mngConfig.budget.retainedTokens;
this.tracer.logEvent(
'ContextManager',
'Budget crossed. Emitting ConsolidationNeeded',
{ deficit },
);
console.log(
'EMITTING CONSOLIDATION. Buffer:',
workingBuffer.length,
'Deficit:',
deficit,
);
this.tracer.logEvent('ContextManager', 'Budget crossed. Emitting ConsolidationNeeded', { deficit });
console.log('EMITTING CONSOLIDATION. Buffer:', workingBuffer.length, 'Deficit:', deficit);
this.eventBus.emitConsolidationNeeded({
episodes: workingBuffer, // Pass the working buffer so they know what still needs compression
targetDeficit: deficit,
});
@@ -181,7 +153,7 @@ export class ContextManager {
/**
* Generates a computed view of the pristine log.
* Sweeps backwards (newest to oldest), tracking rolling tokens.
* When rollingTokens > retainedTokens, it injects the "best" available ready variant
* When rollingTokens > retainedTokens, it injects the "best" available ready variant
* (snapshot > summary > masked) instead of the raw text.
* Handles N-to-1 variant skipping automatically.
*/
@@ -193,6 +165,7 @@ export class ContextManager {
private async applyProcessorGraphs(episodes: Episode[]): Promise<Episode[]> {
const mngConfig = this.sidecar;
const retainedLimit = mngConfig.budget.retainedTokens;
// If we're incredibly small, maybe we just run the retained graph on everything?
// Let's divide the episodes exactly at the retained boundary.
@@ -204,11 +177,7 @@ export class ContextManager {
for (let i = episodes.length - 1; i >= 0; i--) {
const ep = episodes[i];
const epTokens = this.calculateIrTokens([ep]);
if (
(rollingTokens + epTokens <= retainedLimit &&
normalWindow.length === 0) ||
retainedWindow.length === 0
) {
if ((rollingTokens + epTokens <= retainedLimit && normalWindow.length === 0) || retainedWindow.length === 0) {
// We always put at least the latest episode in the retained window.
// We only add to retainedWindow if we haven't already started the normalWindow (contiguous block).
retainedWindow.unshift(ep);
@@ -236,34 +205,18 @@ export class ContextManager {
// Run Retained Graph
let processedRetained = [...retainedWindow];
for (const def of mngConfig.pipelines.retainedProcessingGraph) {
const processor = ProcessorRegistry.get(def.processorId).create(
this.env,
def.options,
) as ContextProcessor;
this.tracer.logEvent(
'ContextManager',
`Running ${processor.name} on retained window.`,
);
const state = createAccountingState(
this.calculateIrTokens([...normalWindow, ...processedRetained]),
);
const processor = ProcessorRegistry.get(def.processorId).create(this.env, def.options) as ContextProcessor;
this.tracer.logEvent('ContextManager', `Running ${processor.name} on retained window.`);
const state = createAccountingState(this.calculateIrTokens([...normalWindow, ...processedRetained]));
processedRetained = await processor.process(processedRetained, state);
}
// Run Normal Graph
let processedNormal = [...normalWindow];
for (const def of mngConfig.pipelines.normalProcessingGraph) {
const processor = ProcessorRegistry.get(def.processorId).create(
this.env,
def.options,
) as ContextProcessor;
this.tracer.logEvent(
'ContextManager',
`Running ${processor.name} on normal window.`,
);
const state = createAccountingState(
this.calculateIrTokens([...processedNormal, ...processedRetained]),
);
const processor = ProcessorRegistry.get(def.processorId).create(this.env, def.options) as ContextProcessor;
this.tracer.logEvent('ContextManager', `Running ${processor.name} on normal window.`);
const state = createAccountingState(this.calculateIrTokens([...processedNormal, ...processedRetained]));
processedNormal = await processor.process(processedNormal, state);
}
@@ -273,7 +226,7 @@ export class ContextManager {
public getWorkingBufferView(): Episode[] {
const mngConfig = this.sidecar;
const retainedTokens = mngConfig.budget.retainedTokens;
let currentEpisodes: Episode[] = [];
let rollingTokens = 0;
const skippedIds = new Set<string>();
@@ -281,14 +234,11 @@ export class ContextManager {
for (let i = this.pristineEpisodes.length - 1; i >= 0; i--) {
const ep = this.pristineEpisodes[i];
// If this episode was already replaced by an N-to-1 Snapshot injected earlier in the sweep, skip it entirely!
// This solves Bug 1 (Duplicate Projection).
if (skippedIds.has(ep.id)) {
this.tracer.logEvent(
'ViewGenerator',
`Skipping episode [${ep.id}] due to N-to-1 replacement.`,
);
this.tracer.logEvent('ViewGenerator', `Skipping episode [${ep.id}] due to N-to-1 replacement.`);
continue;
}
@@ -328,16 +278,7 @@ export class ContextManager {
const epTokens = this.calculateIrTokens([projectedEp]);
if (ep.variants) {
console.log(
'Checking variants for',
ep.id,
'rollingTokens:',
rollingTokens,
'retained:',
retainedTokens,
);
}
if (ep.variants) { console.log('Checking variants for', ep.id, 'rollingTokens:', rollingTokens, 'retained:', retainedTokens); }
if (rollingTokens > retainedTokens && ep.variants) {
console.log('EVALUATING VARIANTS FOR', ep.id);
const snapshot = ep.variants['snapshot'];
@@ -354,10 +295,7 @@ export class ContextManager {
for (const id of snapshot.replacedEpisodeIds) {
skippedIds.add(id);
}
this.tracer.logEvent(
'ViewGenerator',
`Episode [${ep.id}] has SnapshotVariant. Selecting variant over raw text. Added [${snapshot.replacedEpisodeIds.join(',')}] to skippedIds.`,
);
this.tracer.logEvent('ViewGenerator', `Episode [${ep.id}] has SnapshotVariant. Selecting variant over raw text. Added [${snapshot.replacedEpisodeIds.join(',')}] to skippedIds.`);
debugLogger.log(
`Opportunistically swapped Episodes [${snapshot.replacedEpisodeIds.join(', ')}] for pre-computed Snapshot variant.`,
);
@@ -385,10 +323,7 @@ export class ContextManager {
},
] as any;
projectedEp.yield = undefined;
this.tracer.logEvent(
'ViewGenerator',
`Episode [${ep.id}] has SummaryVariant. Selecting variant over raw text.`,
);
this.tracer.logEvent('ViewGenerator', `Episode [${ep.id}] has SummaryVariant. Selecting variant over raw text.`);
debugLogger.log(
`Opportunistically swapped Episode ${ep.id} for pre-computed Summary variant.`,
);
@@ -406,10 +341,7 @@ export class ContextManager {
tokens: masked.recoveredTokens || 10,
};
}
this.tracer.logEvent(
'ViewGenerator',
`Episode [${ep.id}] has MaskedVariant. Selecting variant over raw text.`,
);
this.tracer.logEvent('ViewGenerator', `Episode [${ep.id}] has MaskedVariant. Selecting variant over raw text.`);
debugLogger.log(
`Opportunistically swapped Episode ${ep.id} for pre-computed Masked variant.`,
);
@@ -420,6 +352,7 @@ export class ContextManager {
rollingTokens += this.calculateIrTokens([projectedEp]);
}
return currentEpisodes;
}
@@ -438,107 +371,75 @@ export class ContextManager {
// Get the dynamically computed Working Buffer View
let currentEpisodes = this.getWorkingBufferView();
currentEpisodes = await this.applyProcessorGraphs(currentEpisodes);
let currentTokens = this.calculateIrTokens(currentEpisodes);
if (currentTokens <= maxTokens) {
this.tracer.logEvent(
'ContextManager',
`View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`,
);
this.tracer.logEvent('ContextManager', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`);
return this._projectAndDump(IrMapper.fromIr(currentEpisodes));
}
this.tracer.logEvent(
'ContextManager',
`View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.gcBackstop.strategy}`,
);
this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.gcBackstop.strategy}`);
// --- The Synchronous Pressure Barrier ---
// The background eager workers couldn't keep up, or a massive file was pasted.
// The Working Buffer View is still over the absolute hard limit (maxTokens).
// We MUST reduce tokens before returning, or the API request will 400.
debugLogger.log(
`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.gcBackstop.strategy}`,
);
// Calculate target based on gcTarget
let targetTokens = maxTokens;
if (mngConfig.gcBackstop.target === 'max') {
targetTokens = mngConfig.budget.retainedTokens;
} else if (mngConfig.gcBackstop.target === 'freeNTokens') {
targetTokens = maxTokens - (mngConfig.gcBackstop.freeTokensTarget ?? 10000);
}
// Structural invariant: We ALWAYS protect the architectural initialization turn (Turn 0)
// We do NOT arbitrarily protect recent episodes (like currentEpisodes.length - 1)
// because an episode can be unboundedly large, and protecting it would crash the LLM.
const protectedEpisodeId = this.pristineEpisodes.length > 0 ? this.pristineEpisodes[0].id : null;
let remainingTokens = currentTokens;
const truncated: Episode[] = [];
const strategy = mngConfig.gcBackstop.strategy;
for (const ep of currentEpisodes) {
const epTokens = this.calculateIrTokens([ep]);
if (remainingTokens > targetTokens && ep.id !== protectedEpisodeId) {
console.log('DROPPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens);
remainingTokens -= epTokens;
if (strategy === 'truncate') {
this.tracer.logEvent('Barrier', `Truncating episode [${ep.id}].`);
debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`);
} else if (strategy === 'compress') {
this.tracer.logEvent('Barrier', `Compress fallback to truncate for [${ep.id}].`);
debugLogger.warn(`Synchronous compress barrier not fully implemented, truncating Episode ${ep.id}.`);
} else if (strategy === 'rollingSummarizer') {
this.tracer.logEvent('Barrier', `RollingSummarizer fallback to truncate for [${ep.id}].`);
debugLogger.warn(`Synchronous rollingSummarizer barrier not fully implemented, truncating Episode ${ep.id}.`);
}
} else {
console.log('KEEPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens);
truncated.push(ep);
}
}
currentEpisodes = truncated;
const finalTokens = this.calculateIrTokens(currentEpisodes);
this.tracer.logEvent('ContextManager', `Finished projection. Final token count: ${finalTokens}.`);
debugLogger.log(
`Context Manager finished. Final actual token count: ${finalTokens}.`,
);
@@ -551,20 +452,10 @@ export class ContextManager {
try {
const fs = await import('node:fs/promises');
const path = await import('node:path');
const dumpPath = path.join(this.env.getTraceDir(), '.gemini', 'projected_context.json');
await fs.mkdir(path.dirname(dumpPath), { recursive: true });
await fs.writeFile(dumpPath, JSON.stringify(contents, null, 2), 'utf-8');
debugLogger.log(`[Observability] Context successfully dumped to ${dumpPath}`);
} catch (e) {
debugLogger.error(`Failed to dump context: ${e}`);
}
@@ -0,0 +1,159 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextProcessor, ContextAccountingState } from '../pipeline.js';
import type { Episode, ToolExecution } from '../ir/types.js';
import type { ContextEnvironment, ContextEventBus, ContextTracer } from '../sidecar/environment.js';
import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js';
import { v4 as uuidv4 } from 'uuid';
import { LlmRole } from '../../telemetry/llmRole.js';
export interface StateSnapshotProcessorOptions {
model?: string;
systemInstruction?: string;
triggerDeficitTokens?: number;
}
export class StateSnapshotProcessor implements ContextProcessor {
static create(env: ContextEnvironment, options: StateSnapshotProcessorOptions): StateSnapshotProcessor {
return new StateSnapshotProcessor(env, options, env.getEventBus());
}
readonly id = 'StateSnapshotProcessor';
readonly name = 'StateSnapshotProcessor';
readonly options: StateSnapshotProcessorOptions;
private readonly env: ContextEnvironment;
private readonly eventBus: ContextEventBus;
private tracer?: ContextTracer;
private isSynthesizing = false;
constructor(
env: ContextEnvironment,
options: StateSnapshotProcessorOptions,
eventBus: ContextEventBus,
) {
this.env = env;
this.options = options;
this.eventBus = eventBus;
this.tracer = env.getTracer();
}
async process(episodes: Episode[], state: ContextAccountingState): Promise<Episode[]> {
const targetDeficit = Math.max(0, state.currentTokens - state.retainedTokens);
if (this.isSynthesizing || targetDeficit <= 0) return episodes;
this.isSynthesizing = true;
try {
let deficitAccumulator = 0;
const selectedEpisodes: Episode[] = [];
for (let i = 1; i < episodes.length - 1; i++) {
const ep = episodes[i];
selectedEpisodes.push(ep);
deficitAccumulator += estimateTokenCountSync([
{ text: ep.trigger?.semanticParts?.[0]?.text ?? '' },
{ text: ep.yield?.text ?? '' },
]);
if (deficitAccumulator >= targetDeficit) break;
}
if (selectedEpisodes.length < 2) return episodes; // Not enough context to summarize
// Optimization: Do NOT emit VariantComputing, let the Orchestrator handle caching the final result.
const snapshotEp: Episode = await this.synthesizeSnapshot(selectedEpisodes);
const newEpisodes = [...episodes];
// Calculate indices to splice
const firstIndex = newEpisodes.findIndex(e => e.id === selectedEpisodes[0].id);
if (firstIndex !== -1) {
newEpisodes.splice(firstIndex, selectedEpisodes.length, snapshotEp);
}
return newEpisodes;
} finally {
this.isSynthesizing = false;
}
}
private async synthesizeSnapshot(episodes: Episode[]): Promise<Episode> {
const client = this.env.getLlmClient();
const systemPrompt =
this.options.systemInstruction ??
`You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant.
Your task is to synthesize these turns into a single, dense, factual snapshot that preserves all critical context, preferences, active tasks, and factual knowledge, but discards conversational filler, pleasantries, and redundant back-and-forth iterations.
Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`;
let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n';
for (const ep of episodes) {
if (ep.trigger) {
userPromptText += `USER: ${ep.trigger.semanticParts?.map((p: any) => p.text).join('')}\n`;
}
for (const step of ep.steps) {
if (step.type === 'TOOL_EXECUTION') {
userPromptText += `[Tool Called: ${(step as ToolExecution).toolName}]\n`;
}
}
if (ep.yield) {
userPromptText += `ASSISTANT: ${ep.yield.text}\n`;
}
userPromptText += '\n';
}
try {
const response = await client.generateContent(
{
modelConfigKey: { model: 'state-snapshot-processor' },
contents: [{ role: 'user', parts: [{ text: userPromptText }] }],
systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] },
promptId: this.env.getPromptId(),
role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR,
abortSignal: new AbortController().signal,
},
);
const snapshotText = response.text ?? '';
// Synthesize a new "Episode" representing this compressed block
const newId = uuidv4();
const contentTokens = estimateTokenCountSync([{ text: snapshotText }]);
return {
id: newId,
timestamp: Date.now(),
trigger: {
id: `${newId}-t`,
type: 'USER_PROMPT',
semanticParts: [],
metadata: {
originalTokens: 0,
currentTokens: 0,
transformations: [],
},
},
steps: [],
yield: {
id: `${newId}-y`,
type: 'AGENT_YIELD',
text: `<CONTEXT_SNAPSHOT>\n${snapshotText}\n</CONTEXT_SNAPSHOT>`,
metadata: {
originalTokens: contentTokens,
currentTokens: contentTokens,
transformations: [{ processorName: 'StateSnapshotProcessor', action: 'SYNTHESIZED', timestamp: Date.now() }],
},
},
};
} catch (error) {
if (this.tracer) {
this.tracer.logEvent('WorkerError', 'Snapshot synthesis failed', {
error: error instanceof Error ? error.message : String(error),
});
}
console.error('Failed to synthesize snapshot:', error);
throw error;
}
}
}
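The deficit-driven selection loop in `StateSnapshotProcessor.process` walks the interior episodes oldest-first, accumulating token estimates until the deficit (`currentTokens - retainedTokens`) is covered. A minimal standalone sketch of that selection logic (the function name and plain `number[]` inputs are illustrative, not the production API):

```typescript
// Toy version of the deficit-driven selection in StateSnapshotProcessor.process:
// accumulate per-episode token estimates until the deficit is covered.
function selectForSnapshot(tokensPerEpisode: number[], deficit: number): number[] {
  const selected: number[] = [];
  let acc = 0;
  // Skip the first and last episode, mirroring `for (let i = 1; i < n - 1; ...)`.
  for (let i = 1; i < tokensPerEpisode.length - 1; i++) {
    selected.push(i);
    acc += tokensPerEpisode[i];
    if (acc >= deficit) break;
  }
  return selected;
}

// With episodes of [500, 300, 400, 200, 100] tokens and a 600-token deficit,
// indices 1 and 2 are selected (300 + 400 = 700 >= 600).
const picked = selectForSnapshot([500, 300, 400, 200, 100], 600);
```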
@@ -3,15 +3,18 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { ContextTracer } from '../tracer.js';
import type { ContextEventBus } from '../eventBus.js';
export type { ContextTracer, ContextEventBus };
export interface ContextEnvironment {
getLlmClient(): BaseLlmClient;
getPromptId(): string;
getSessionId(): string;
getTraceDir(): string;
getProjectTempDir(): string;
getEventBus(): ContextEventBus;
getTracer(): ContextTracer;
getCharsPerToken(): number;
}
@@ -8,16 +8,31 @@ import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { ContextTracer } from '../tracer.js';
import type { ContextEnvironment } from './environment.js';
import type { ContextEventBus } from '../eventBus.js';
export class ContextEnvironmentImpl implements ContextEnvironment {
private eventBus?: ContextEventBus;
constructor(
private llmClient: BaseLlmClient,
private sessionId: string,
private promptId: string,
private traceDir: string,
private tempDir: string,
private tracer: ContextTracer,
private charsPerToken: number,
) {}
// TODO(joshualitt): Idiomatic getters and setters
setEventBus(bus: ContextEventBus) {
this.eventBus = bus;
}
getEventBus(): ContextEventBus {
if (!this.eventBus) throw new Error('EventBus not bound');
return this.eventBus;
}
getLlmClient(): BaseLlmClient {
return this.llmClient;
}
@@ -41,4 +56,8 @@ export class ContextEnvironmentImpl implements ContextEnvironment {
getCharsPerToken(): number {
return this.charsPerToken;
}
getPromptId(): string {
return this.promptId;
}
}
@@ -0,0 +1,181 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Episode } from '../ir/types.js';
import type { ContextProcessor, ContextAccountingState } from '../pipeline.js';
import type { SidecarConfig, PipelineDef } from './types.js';
import type { ContextEnvironment, ContextEventBus, ContextTracer } from './environment.js';
import { ProcessorRegistry } from './registry.js';
import { debugLogger } from '../../utils/debugLogger.js';
export class PipelineOrchestrator {
private activeTimers: NodeJS.Timeout[] = [];
private readonly instantiatedProcessors = new Map<string, ContextProcessor>();
constructor(
private readonly config: SidecarConfig,
private readonly env: ContextEnvironment,
private readonly eventBus: ContextEventBus,
private readonly tracer: ContextTracer
) {
this.instantiateProcessors();
this.registerTriggers();
}
/**
* Pre-loads and configures all processors defined in the sidecar config.
*/
private instantiateProcessors() {
for (const pipeline of this.config.pipelines) {
for (const procDef of pipeline.processors) {
if (!this.instantiatedProcessors.has(procDef.processorId)) {
const processorClass = ProcessorRegistry.get(procDef.processorId);
if (!processorClass) {
throw new Error(`Unknown processor ID: ${procDef.processorId}`);
}
// The Orchestrator injects standard dependencies required by processors
// If a processor needs the eventBus (like Snapshot), it expects it via constructor.
const instance = processorClass.create(this.env, procDef.options) as unknown as ContextProcessor;
this.instantiatedProcessors.set(procDef.processorId, instance);
}
}
}
}
/**
* Sets up listeners for the triggers defined in the SidecarConfig.
*/
private registerTriggers() {
for (const pipeline of this.config.pipelines) {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {
this.executePipelineAsync(pipeline);
}, trigger.intervalMs);
this.activeTimers.push(timer);
} else if (trigger === 'budget_exceeded') {
this.eventBus.onConsolidationNeeded(() => {
this.executePipelineAsync(pipeline);
});
}
// 'on_turn' and 'post_turn' are handled synchronously via direct calls from the ContextManager.
}
}
}
/**
* Executes a pipeline asynchronously in the background. This is the "Eventual Consistency" path.
* When the pipeline resolves, it emits a VariantReady event to cache the new graph.
*/
private async executePipelineAsync(pipeline: PipelineDef) {
this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`);
// TODO: retrieve the most recent pristine graph state from the event bus.
// Until the bus exposes that state, this async path has nothing to operate on
// and exits early.
const currentState: Episode[] = [];
if (currentState.length === 0) return;
// We assume the eventBus or ContextManager keeps accounting state updated.
const state: ContextAccountingState = {
currentTokens: 0,
// This needs to be calculated or passed down. For now, processors re-calculate.
retainedTokens: this.config.budget.retainedTokens,
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: 0,
protectedEpisodeIds: new Set()
};
let currentEpisodes = [...currentState];
for (const procDef of pipeline.processors) {
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
try {
const result = processor.process(currentEpisodes, state);
if (result instanceof Promise) {
currentEpisodes = await result;
} else {
currentEpisodes = result;
}
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error);
return; // Halt pipeline
}
}
// Success! The background pipeline finished.
// Instead of forcing the Orchestrator to emit complex variant geometries,
// we can just emit a "GraphUpdated" or standard "VariantReady" event containing the entire new subset.
// For simplicity right now, if a pipeline runs asynchronously, we emit a "GraphVariant" event.
// this.eventBus.emitGraphVariantReady(currentEpisodes);
}
/**
* Executes a pipeline synchronously. If any processor returns a Promise, this method
* automatically forks that Promise to the background (falling back to async/eventual consistency)
* and immediately returns the synchronous results computed up to that point.
*/
executePipelineForking(pipelineName: string, episodes: Episode[], state: ContextAccountingState): Episode[] {
const pipeline = this.config.pipelines.find(p => p.name === pipelineName);
if (!pipeline) return episodes;
let currentEpisodes = [...episodes];
for (let i = 0; i < pipeline.processors.length; i++) {
const procDef = pipeline.processors[i];
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
try {
const result = processor.process(currentEpisodes, state);
if (result instanceof Promise) {
// *** THE FORK ***
// A processor went Async. We halt the synchronous chain here and return the state as-is.
this.tracer.logEvent('Orchestrator', `Pipeline ${pipeline.name} forked to background at ${procDef.processorId}`);
// Continue resolving the rest of the pipeline in the background.
this.continuePipelineAsync(pipeline, result, i + 1, state).catch(e => {
debugLogger.error(`Background fork of ${pipeline.name} failed:`, e);
});
// Return the strictly synchronous output back to the LLM immediately!
return currentEpisodes;
} else {
currentEpisodes = result;
}
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, error);
return currentEpisodes; // Return what we have so far
}
}
return currentEpisodes;
}
private async continuePipelineAsync(pipeline: PipelineDef, asyncResult: Promise<Episode[]>, startIndex: number, state: ContextAccountingState) {
let currentEpisodes = await asyncResult;
for (let i = startIndex; i < pipeline.processors.length; i++) {
const procDef = pipeline.processors[i];
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
const result = processor.process(currentEpisodes, state);
if (result instanceof Promise) {
currentEpisodes = await result;
} else {
currentEpisodes = result;
}
}
// this.eventBus.emitGraphVariantReady(currentEpisodes);
}
shutdown() {
this.activeTimers.forEach(clearInterval);
}
}
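The fork in `executePipelineForking` can be sketched with plain array transforms in place of the real `ContextProcessor` interface (all names below are illustrative, not the production API): the synchronous chain runs until a step returns a Promise, at which point the remainder is handed to the background and the caller immediately receives the last synchronous state.

```typescript
// Minimal sketch of the forking semantics in executePipelineForking.
type Step = (xs: number[]) => number[] | Promise<number[]>;

function runForking(steps: Step[], input: number[]): number[] {
  let current = [...input];
  for (const step of steps) {
    const result = step(current);
    if (result instanceof Promise) {
      // A step went async: the rest of the chain would continue in the
      // background here; the caller gets the last synchronous state now.
      void result.catch(() => {});
      return current;
    }
    current = result;
  }
  return current;
}

const addOne: Step = (xs) => xs.map((x) => x + 1);
const doubleAsync: Step = async (xs) => xs.map((x) => x * 2);

// doubleAsync forks to the background; the caller only observes the
// synchronous prefix: [1, 2] -> addOne -> [2, 3].
const out = runForking([addOne, doubleAsync, addOne], [1, 2]);
```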
@@ -20,32 +20,23 @@ export const defaultSidecarProfile: SidecarConfig = {
target: 'incremental',
freeTokensTarget: 10000,
},
pipelines: [
{
name: 'Immediate Sanitization',
triggers: ['on_turn'],
processors: [
{ processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 8000 } },
{ processorId: 'BlobDegradationProcessor', options: {} },
{ processorId: 'SemanticCompressionProcessor', options: { nodeThresholdTokens: 5000, contextWindowPercentage: 0.2 } }
]
},
{
name: 'Deep Background Compression',
triggers: [{ type: 'timer', intervalMs: 5000 }, 'budget_exceeded'],
processors: [
{ processorId: 'HistorySquashingProcessor', options: { maxTokensPerNode: 3000 } },
{ processorId: 'StateSnapshotProcessor', options: {} }
]
}
]
};
@@ -15,6 +15,18 @@ export interface ProcessorConfig {
options: Record<string, unknown>;
}
export type PipelineTrigger =
| 'on_turn'
| 'post_turn'
| 'budget_exceeded'
| { type: 'timer'; intervalMs: number };
export interface PipelineDef {
name: string;
triggers: PipelineTrigger[];
processors: ProcessorConfig[];
}
/**
* The Data-Driven Schema for the Context Manager.
*/
@@ -33,23 +45,5 @@ export interface SidecarConfig {
};
/** The execution graphs for context manipulation */
pipelines: PipelineDef[];
}
@@ -8,7 +8,6 @@ import { vi } from 'vitest';
import type { Config } from '../../config/config.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
export function createMockEnvironment(): ContextEnvironment {
return {
getLlmClient: vi.fn().mockReturnValue({
@@ -18,15 +17,12 @@ export function createMockEnvironment(): ContextEnvironment {
}) as any,
getSessionId: vi.fn().mockReturnValue('mock-session'),
getTraceDir: vi.fn().mockReturnValue('/tmp/.gemini/trace'),
getTracer: vi.fn().mockReturnValue({
logEvent: vi.fn(),
saveAsset: vi.fn().mockReturnValue('mock-asset-id'),
}) as any,
getProjectTempDir: vi.fn().mockReturnValue('/tmp/.gemini/tool-outputs'),
getEventBus: vi.fn().mockReturnValue({ onConsolidationNeeded: vi.fn(), emitVariantReady: vi.fn() }) as any,
getCharsPerToken: vi.fn().mockReturnValue(1),
};
}
import type { Content } from '@google/genai';
import { AgentChatHistory } from '../../core/agentChatHistory.js';
import { ContextManager } from '../contextManager.js';
@@ -1,229 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { randomUUID } from 'node:crypto';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { Episode, SnapshotVariant } from '../ir/types.js';
import type { AsyncContextWorker } from './asyncContextWorker.js';
import type {
ContextEventBus,
ContextConsolidationEvent,
} from '../eventBus.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js';
import { IrMapper } from '../ir/mapper.js';
import { LlmRole } from '../../telemetry/llmRole.js';
import type { ContextTracer } from '../tracer.js';
export class StateSnapshotWorker implements AsyncContextWorker {
name = 'StateSnapshotWorker';
private bus?: ContextEventBus;
private tracer?: ContextTracer;
private isSynthesizing = false;
constructor(private readonly env: ContextEnvironment) {}
start(bus: ContextEventBus, tracer?: ContextTracer): void {
console.log('Worker start() called with bus:', !!bus);
this.bus = bus;
this.tracer = tracer;
this.bus.onConsolidationNeeded(this.handleConsolidation.bind(this));
}
stop(): void {
if (this.bus) {
// In a real implementation we would `removeListener` here
this.bus = undefined;
}
}
private async handleConsolidation(
event: ContextConsolidationEvent,
): Promise<void> {
console.log(
`Worker handling consolidation. targetDeficit: ${event.targetDeficit}, isSynthesizing: ${this.isSynthesizing}`,
);
if (this.isSynthesizing || event.targetDeficit <= 0) return;
// Identify the "dying" block of episodes that need to be collected.
// For now, we assume older episodes are at the front of the array.
// We only want episodes that don't already have a snapshot variant computing/ready.
const unprotectedOldest = event.episodes.filter(
(ep) => !ep.variants?.['snapshot'],
);
if (unprotectedOldest.length === 0) {
return;
}
let targetDeficit = event.targetDeficit;
const episodesToSynthesize: Episode[] = [];
let tokensToSynthesize = 0;
for (const ep of unprotectedOldest) {
console.log('Worker considering episode:', ep.id);
if (tokensToSynthesize >= targetDeficit) break;
episodesToSynthesize.push(ep);
// Rough estimate of tokens in this episode
const epTokens = ep.steps.reduce(
(sum, step) => sum + step.metadata.currentTokens,
ep.trigger.metadata.currentTokens +
(ep.yield?.metadata.currentTokens || 0),
);
tokensToSynthesize += epTokens;
}
if (episodesToSynthesize.length === 0) return;
console.log(
`Worker synthesized logic loop complete. Selected ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`,
);
this.isSynthesizing = true;
try {
debugLogger.log(
`StateSnapshotWorker: Asynchronously synthesizing ${episodesToSynthesize.length} episodes to recover ~${tokensToSynthesize} tokens.`,
);
this.tracer?.logEvent(
'StateSnapshotWorker',
`Consolidation requested. Synthesizing ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`,
);
const client = this.env.getLlmClient();
const rawContents = IrMapper.fromIr(episodesToSynthesize);
const rawAssetId = this.tracer?.saveAsset(
'StateSnapshotWorker',
'episodes_to_synthesize',
rawContents,
);
this.tracer?.logEvent(
'StateSnapshotWorker',
'Dispatching LLM request for snapshot generation',
{ rawAssetId },
);
const promptText = `
You are a background memory consolidation worker for an AI assistant.
Your task is to review the following block of the oldest conversation history and synthesize it into a highly dense, accurate "World State Snapshot".
This snapshot will completely replace these old memories.
Preserve all critical facts, technical decisions, file paths, and outstanding tasks. Discard all conversational filler.
Conversation History to Synthesize:
${JSON.stringify(rawContents, null, 2).slice(0, 50000)}
Output the snapshot as a dense, structured summary.`;
const response = await client.generateContent({
modelConfigKey: { model: 'gemini-2.5-flash' }, // Fast and cheap for background tasks
contents: [{ role: 'user', parts: [{ text: promptText }] }],
promptId: 'async-world-state-snapshot',
role: LlmRole.UTILITY_COMPRESSOR,
abortSignal: new AbortController().signal, // Run in background, could add cancellation logic later
});
// Extract text safely from the GenAI response
const snapshotText = response.text;
const responseAssetId = this.tracer?.saveAsset(
'StateSnapshotWorker',
'snapshot_response',
snapshotText || '',
);
this.tracer?.logEvent('StateSnapshotWorker', 'Received LLM response', {
responseAssetId,
});
if (!snapshotText) {
debugLogger.warn(
'StateSnapshotWorker: LLM returned empty response for snapshot generation.',
);
}
const mockSnapshotText = `
<world_state_snapshot>
${snapshotText || '[Failed to generate snapshot]'}
</world_state_snapshot>`;
const snapshotTokens = estimateTokenCountSync(
[{ text: mockSnapshotText }],
0,
{ charsPerToken: this.env.getCharsPerToken() },
);
const replacedEpisodeIds = episodesToSynthesize.map((e) => e.id);
const snapshotEpisode: Episode = {
id: randomUUID(),
timestamp: Date.now(),
trigger: {
id: randomUUID(),
type: 'SYSTEM_EVENT',
name: 'world_state_snapshot',
payload: {
originalEpisodeCount: episodesToSynthesize.length,
recoveredTokens: tokensToSynthesize,
},
metadata: {
originalTokens: snapshotTokens,
currentTokens: snapshotTokens,
transformations: [
{
processorName: 'StateSnapshotWorker',
action: 'SYNTHESIZED',
timestamp: Date.now(),
},
],
},
},
steps: [
{
id: randomUUID(),
type: 'AGENT_THOUGHT',
text: mockSnapshotText,
metadata: {
originalTokens: snapshotTokens,
currentTokens: snapshotTokens,
transformations: [],
},
},
],
};
const variant: SnapshotVariant = {
type: 'snapshot',
status: 'ready',
recoveredTokens: tokensToSynthesize,
episode: snapshotEpisode,
replacedEpisodeIds,
};
// Emit the variant for the MOST RECENT episode in the batch,
// since the Opportunistic Swapper sweeps from newest to oldest.
const targetId = replacedEpisodeIds[replacedEpisodeIds.length - 1];
if (this.bus) {
this.tracer?.logEvent(
'StateSnapshotWorker',
`Emitting VARIANT_READY for targetId [${targetId}]`,
);
this.bus.emitVariantReady({
targetId,
variantId: 'snapshot',
variant,
});
} else {
debugLogger.warn(
'StateSnapshotWorker: Event bus disconnected before variant could be emitted.',
);
}
} catch (error) {
debugLogger.error(
`StateSnapshotWorker: Critical failure during snapshot synthesis: ${error instanceof Error ? error.message : String(error)}`,
);
} finally {
this.isSynthesizing = false;
}
}
}
@@ -16,4 +16,5 @@ export enum LlmRole {
UTILITY_EDIT_CORRECTOR = 'utility_edit_corrector',
UTILITY_AUTOCOMPLETE = 'utility_autocomplete',
UTILITY_FAST_ACK_HELPER = 'utility_fast_ack_helper',
UTILITY_STATE_SNAPSHOT_PROCESSOR = 'utility_state_snapshot_processor',
}