speculative changes

This commit is contained in:
Your Name
2026-04-05 01:46:19 +00:00
parent 5873d7f6d2
commit f4e3132af9
21 changed files with 524 additions and 380 deletions
+3 -3
View File
@@ -887,15 +887,15 @@ export async function loadCliConfig(
const useGeneralistProfile =
settings.experimental?.generalistProfile ?? false;
const usePowerUserProfile =
settings.experimental?.powerUserProfile ?? false;
const usePowerUserProfile = settings.experimental?.powerUserProfile ?? false;
const useContextManagement =
settings.experimental?.contextManagement ?? false;
const contextManagement = {
...(useGeneralistProfile ? GENERALIST_PROFILE : {}),
...(usePowerUserProfile ? POWER_USER_PROFILE : {}),
...(useContextManagement ? settings?.contextManagement : {}),
enabled: useContextManagement || useGeneralistProfile || usePowerUserProfile,
enabled:
useContextManagement || useGeneralistProfile || usePowerUserProfile,
};
return new Config({
+2 -1
View File
@@ -2155,7 +2155,8 @@ const SETTINGS_SCHEMA = {
category: 'Experimental',
requiresRestart: true,
default: false,
description: 'Enables continuous minimal GC near the max tokens limit instead of a blocked backbuffer.',
description:
'Enables continuous minimal GC near the max tokens limit instead of a blocked backbuffer.',
showInDialog: true,
},
generalistProfile: {
+1 -1
View File
@@ -1137,7 +1137,7 @@ export class Config implements McpContext, AgentLoopContext {
params.contextManagement?.budget?.protectedEpisodes ?? 1,
protectSystemEpisode:
params.contextManagement?.budget?.protectSystemEpisode ?? true,
incrementalGc: params.contextManagement?.budget?.incrementalGc ?? false,
maxPressureStrategy: params.contextManagement?.budget?.maxPressureStrategy ?? 'truncate',
},
strategies: {
historySquashing: {
+75 -28
View File
@@ -1,47 +1,94 @@
# Asynchronous Context Management (Dataflow Architecture)
## The Problem
Context management today is an emergency response. When a chat session hits the maximum token limit (`maxTokens`), the system halts the user's request, synchronously runs expensive compression pipelines (masking tools, summarizing text with LLMs), and only proceeds when the token count falls below the limit. This introduces unacceptable latency and forces trade-offs between speed and data fidelity.
Context management today is an emergency response. When a chat session hits the
maximum token limit (`maxTokens`), the system halts the user's request,
synchronously runs expensive compression pipelines (masking tools, summarizing
text with LLMs), and only proceeds when the token count falls below the limit.
This introduces unacceptable latency and forces trade-offs between speed and
data fidelity.
## The Vision: Eager Subconscious Compute
Instead of a reactive, synchronous pipeline, Context Management should be an **asynchronous dataflow graph**.
Because we know old memory will *eventually* need to be degraded or garbage collected, we should utilize the agent's idle time (while the user is reading or typing) to proactively compute "degraded variants" of episodes before there is any context pressure.
Instead of a reactive, synchronous pipeline, Context Management should be an
**asynchronous dataflow graph**.
Because we know old memory will _eventually_ need to be degraded or garbage
collected, we should utilize the agent's idle time (while the user is reading or
typing) to proactively compute "degraded variants" of episodes before there is
any context pressure.
### The Three Phases of Memory Lifecycle
#### 1. The Eager Compute Phase (Background / Continuous Streaming)
Context pressure doesn't wait for an episode to finish. If a user pastes a 100k-token file, the budget explodes instantly. Therefore, the dataflow graph is fed continuously.
* Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a `TOOL_EXECUTION` step) and broadcast immediately.
* **Processors (e.g., SemanticCompressor, StateSnapshot) listen as background workers.**
* They eagerly compute degraded variants on partial episodes. For instance, `SemanticCompressionProcessor` can summarize a massive 100k `USER_PROMPT` the millisecond it arrives, without waiting for the model to reply.
* It attaches the result to the IR graph as `Episode#1.trigger.variants.summary`.
* **Result:** This costs the user zero latency. The agent is "dreaming/consolidating" granular memory chunks in the background, even during long-running "mono-episodes."
Context pressure doesn't wait for an episode to finish. If a user pastes a
100k-token file, the budget explodes instantly. Therefore, the dataflow graph is
fed continuously.
- Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped
into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a
`TOOL_EXECUTION` step) and broadcast immediately.
- **Processors (e.g., SemanticCompressor, StateSnapshot) listen as background
workers.**
- They eagerly compute degraded variants on partial episodes. For instance,
`SemanticCompressionProcessor` can summarize a massive 100k `USER_PROMPT` the
millisecond it arrives, without waiting for the model to reply.
- It attaches the result to the IR graph as
`Episode#1.trigger.variants.summary`.
- **Result:** This costs the user zero latency. The agent is
"dreaming/consolidating" granular memory chunks in the background, even during
long-running "mono-episodes."
#### 2. Opportunistic Replacement (`retainedTokens` Threshold)
When the active context window crosses the "ideal" size (e.g., 65k tokens):
* The system identifies the oldest episodes that have fallen outside the `retained` window.
* It checks if they have pre-computed variants (e.g., a `summary` or `masked` variant).
* If yes, it instantly and silently swaps the raw episode for the degraded variant.
* **Result:** The context gently decays over time, completely avoiding hard limits for as long as possible, with zero latency cost.
- The system identifies the oldest episodes that have fallen outside the
`retained` window.
- It checks if they have pre-computed variants (e.g., a `summary` or `masked`
variant).
- If yes, it instantly and silently swaps the raw episode for the degraded
variant.
- **Result:** The context gently decays over time, completely avoiding hard
limits for as long as possible, with zero latency cost.
#### 3. The Pressure Barrier (`maxTokens` Hard Limit)
When the active context window crosses the absolute hard limit (e.g., 150k tokens)—perhaps because the user pasted a massive file and the background workers couldn't keep up—the system hits a **Synchronous Barrier**.
At this barrier, the `ContextManager` checks the user's configured `ContextPressureStrategy` to decide how to unblock the request:
When the active context window crosses the absolute hard limit (e.g., 150k
tokens)—perhaps because the user pasted a massive file and the background
workers couldn't keep up—the system hits a **Synchronous Barrier**.
* **Strategy A: `truncate` (The Baseline)**
* *Behavior:* Instantly drop the oldest episodes until under `maxTokens`.
* *Tradeoff:* Maximum speed, maximum data loss.
* **Strategy B: `incrementalGc` (Progressive)**
* *Behavior:* Look for any pre-computed summaries/masks. If none exist, synchronously block to compute *just enough* summaries to survive the current turn.
* *Tradeoff:* Medium speed, medium data retention.
* **Strategy C: `compress` (State Snapshot)**
* *Behavior:* Identify the oldest N episodes causing the overflow. If their N-to-1 World State Snapshot isn't ready yet, **block the user's request** and force the `StateSnapshotProcessor` to generate it synchronously. Once generated, replace the N episodes with the 1 snapshot and proceed.
* *Tradeoff:* Maximum latency, maximum data retention/fidelity.
At this barrier, the `ContextManager` checks the user's configured
`ContextPressureStrategy` to decide how to unblock the request:
- **Strategy A: `truncate` (The Baseline)**
- _Behavior:_ Instantly drop the oldest episodes until under `maxTokens`.
- _Tradeoff:_ Maximum speed, maximum data loss.
- **Strategy B: `incrementalGc` (Progressive)**
- _Behavior:_ Look for any pre-computed summaries/masks. If none exist,
synchronously block to compute _just enough_ summaries to survive the
current turn.
- _Tradeoff:_ Medium speed, medium data retention.
- **Strategy C: `compress` (State Snapshot)**
- _Behavior:_ Identify the oldest N episodes causing the overflow. If their
N-to-1 World State Snapshot isn't ready yet, **block the user's request**
and force the `StateSnapshotProcessor` to generate it synchronously. Once
generated, replace the N episodes with the 1 snapshot and proceed.
- _Tradeoff:_ Maximum latency, maximum data retention/fidelity.
## Architectural Changes Required
1. **Episode Variants:** Update the `Episode` IR type to support a `variants` dictionary.
2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of `AgentChatHistory`).
3. **Processor Interface:** Change `ContextProcessor` from a synchronous `process(episodes[])` function to an asynchronous worker that listens to the event bus, updates the `variants` dictionary, and emits `VARIANT_READY` events.
4. **Projection Logic:** Update `projectCompressedHistory()` to act as the Pressure Barrier, reading the user's strategy and either applying ready variants, waiting for variants, or truncating.
1. **Episode Variants:** Update the `Episode` IR type to support a `variants`
dictionary.
2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to
dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of
`AgentChatHistory`).
3. **Processor Interface:** Change `ContextProcessor` from a synchronous
`process(episodes[])` function to an asynchronous worker that listens to the
event bus, updates the `variants` dictionary, and emits `VARIANT_READY`
events.
4. **Projection Logic:** Update `projectCompressedHistory()` to act as the
Pressure Barrier, reading the user's strategy and either applying ready
variants, waiting for variants, or truncating.
@@ -1,86 +1,144 @@
# Asynchronous Context Management Implementation Plan
This document outlines the step-by-step implementation plan for refactoring `ContextManager` into a fully asynchronous, event-driven dataflow graph (Eager Subconscious Compute).
This document outlines the step-by-step implementation plan for refactoring
`ContextManager` into a fully asynchronous, event-driven dataflow graph (Eager
Subconscious Compute).
---
## Phase 1: Stable Identity & Incremental IR Mapping
**The Problem:** Currently, `IrMapper.toIr()` is stateless. It generates random UUIDs for `Episode` and `Step` nodes every time it parses the `Content[]` array. If the array is rebuilt while an asynchronous processor is computing a summary, the target ID will be lost, and the variant will be orphaned.
**The Goal:** Episodes must maintain a stable identity across turns so background workers can confidently attach variants to them.
**The Problem:** Currently, `IrMapper.toIr()` is stateless. It generates random
UUIDs for `Episode` and `Step` nodes every time it parses the `Content[]` array.
If the array is rebuilt while an asynchronous processor is computing a summary,
the target ID will be lost, and the variant will be orphaned. **The Goal:**
Episodes must maintain a stable identity across turns so background workers can
confidently attach variants to them.
**Tasks:**
1. **Deterministic Hashing or Stateful Mapping:** Update `IrMapper` to either generate deterministic UUIDs (e.g., hashing the part text/timestamp) OR make `ContextManager`'s pristine graph mutable, where new `PUSH` events are mapped *incrementally* onto the tail of `this.pristineEpisodes` rather than rebuilding the whole array.
2. **Test Update:** Ensure `IrMapper` tests verify stable IDs across successive parse events.
1. **Deterministic Hashing or Stateful Mapping:** Update `IrMapper` to either
generate deterministic UUIDs (e.g., hashing the part text/timestamp) OR make
`ContextManager`'s pristine graph mutable, where new `PUSH` events are
mapped _incrementally_ onto the tail of `this.pristineEpisodes` rather than
rebuilding the whole array.
2. **Test Update:** Ensure `IrMapper` tests verify stable IDs across successive
parse events.
---
## Phase 2: Data Structures & Event Bus
**The Problem:** The system lacks the internal types and communication channels to support asynchronous variant generation.
**The Goal:** Define the `Variant` schemas and the internal `EventEmitter` that will broadcast graph updates to the async workers.
**The Problem:** The system lacks the internal types and communication channels
to support asynchronous variant generation. **The Goal:** Define the `Variant`
schemas and the internal `EventEmitter` that will broadcast graph updates to the
async workers.
**Tasks:**
1. **Variant Types:** Update `packages/core/src/context/ir/types.ts`.
* Add a `variants?: Record<string, Variant>` property to `Episode` and `Step` (where `Variant` is a discriminated union of `SummaryVariant`, `MaskedVariant`, `SnapshotVariant`, etc.).
* Include metadata on the variant: `status: 'computing' | 'ready' | 'failed'`, `promise?: Promise<void>`, `recoveredTokens: number`.
- Add a `variants?: Record<string, Variant>` property to `Episode` and
`Step` (where `Variant` is a discriminated union of `SummaryVariant`,
`MaskedVariant`, `SnapshotVariant`, etc.).
- Include metadata on the variant:
`status: 'computing' | 'ready' | 'failed'`, `promise?: Promise<void>`,
`recoveredTokens: number`.
2. **Event Bus (`ContextEventBus`):**
* Create an internal event emitter in `ContextManager` (using `events.EventEmitter` or a lightweight alternative).
* Define Events:
* `IR_NODE_CREATED`: Fired when a new Episode/Step is mapped. (Triggers eager compute).
* `VARIANT_READY`: Fired by a worker when it finishes computing a summary/snapshot.
* `BUDGET_RETAINED_CROSSED`: Fired when `currentTokens > retainedTokens`.
* `BUDGET_MAX_CROSSED`: Fired when `currentTokens > maxTokens`.
- Create an internal event emitter in `ContextManager` (using
`events.EventEmitter` or a lightweight alternative).
- Define Events:
- `IR_NODE_CREATED`: Fired when a new Episode/Step is mapped. (Triggers
eager compute).
- `VARIANT_READY`: Fired by a worker when it finishes computing a
summary/snapshot.
- `BUDGET_RETAINED_CROSSED`: Fired when `currentTokens > retainedTokens`.
- `BUDGET_MAX_CROSSED`: Fired when `currentTokens > maxTokens`.
---
## Phase 3: Refactoring Processors into Async Workers
**The Problem:** Processors currently implement a synchronous `process(episodes, state) -> Promise<Episode[]>` interface and block the main loop.
**The Goal:** Convert them into background workers that listen to the `ContextEventBus`, perform LLM tasks asynchronously, and emit `VARIANT_READY`.
**The Problem:** Processors currently implement a synchronous
`process(episodes, state) -> Promise<Episode[]>` interface and block the main
loop. **The Goal:** Convert them into background workers that listen to the
`ContextEventBus`, perform LLM tasks asynchronously, and emit `VARIANT_READY`.
**Tasks:**
1. **Define `AsyncContextWorker` Interface:**
* `start(bus: ContextEventBus): void`
* `stop(): void`
- `start(bus: ContextEventBus): void`
- `stop(): void`
2. **Implement `SemanticCompressionWorker`:**
* Listens to `IR_NODE_CREATED` (or `BUDGET_RETAINED_CROSSED` for lazier eager compute).
* Batches old `USER_PROMPT` nodes.
* Calls LLM in background.
* Emits `VARIANT_READY` with the summary string and target Node IDs.
- Listens to `IR_NODE_CREATED` (or `BUDGET_RETAINED_CROSSED` for lazier
eager compute).
- Batches old `USER_PROMPT` nodes.
- Calls LLM in background.
- Emits `VARIANT_READY` with the summary string and target Node IDs.
3. **Implement `StateSnapshotWorker`:**
* Listens to `BUDGET_RETAINED_CROSSED`.
* Identifies the N oldest raw episodes.
* Synthesizes them into a single `world_state_snapshot`.
* Emits `VARIANT_READY` containing the new Snapshot Episode and the IDs of the N episodes it replaces.
4. **Wire Event Listeners:** `ContextManager` listens to `VARIANT_READY` and updates the pristine graph's `variants` dictionary.
- Listens to `BUDGET_RETAINED_CROSSED`.
- Identifies the N oldest raw episodes.
- Synthesizes them into a single `world_state_snapshot`.
- Emits `VARIANT_READY` containing the new Snapshot Episode and the IDs of
the N episodes it replaces.
4. **Wire Event Listeners:** `ContextManager` listens to `VARIANT_READY` and
updates the pristine graph's `variants` dictionary.
---
## Phase 4: The Projection Engine & Pressure Barrier
**The Problem:** `projectCompressedHistory()` currently runs the synchronous pipeline. It needs to become the non-blocking opportunistic swapper and the blocking pressure barrier.
**The Goal:** Serve the LLM request instantly using pre-computed variants, or block strictly according to the user's `maxPressureStrategy`.
**The Problem:** `projectCompressedHistory()` currently runs the synchronous
pipeline. It needs to become the non-blocking opportunistic swapper and the
blocking pressure barrier. **The Goal:** Serve the LLM request instantly using
pre-computed variants, or block strictly according to the user's
`maxPressureStrategy`.
**Tasks:**
1. **Opportunistic Swap (`retainedTokens`):**
* When traversing `this.pristineEpisodes` to build the projected array, if `currentTokens > retainedTokens`, check the oldest episodes.
* If an episode has a `variant.status === 'ready'`, use the variant's tokens and text *instead* of the raw episode.
- When traversing `this.pristineEpisodes` to build the projected array, if
`currentTokens > retainedTokens`, check the oldest episodes.
- If an episode has a `variant.status === 'ready'`, use the variant's tokens
and text _instead_ of the raw episode.
2. **Pressure Barrier (`maxTokens`):**
* If the projected array is *still* `> maxTokens` after all ready variants are applied, hit the Barrier.
* Read `config.getContextManagementConfig().budget.maxPressureStrategy`.
* **If `truncate`:** Instantly drop the oldest episodes from the projection until under budget. (Fastest).
* **If `incrementalGc`:** Await any variants that are `status === 'computing'` for the oldest nodes until the deficit is cleared. If none are computing, force a synchronous masking/truncation.
* **If `compress`:** Await the `StateSnapshotWorker`'s active `Promise`. If it hasn't started, synchronously invoke it and block until the N-to-1 snapshot is ready.
- If the projected array is _still_ `> maxTokens` after all ready variants
are applied, hit the Barrier.
- Read `config.getContextManagementConfig().budget.maxPressureStrategy`.
- **If `truncate`:** Instantly drop the oldest episodes from the projection
until under budget. (Fastest).
- **If `incrementalGc`:** Await any variants that are
`status === 'computing'` for the oldest nodes until the deficit is
cleared. If none are computing, force a synchronous masking/truncation.
- **If `compress`:** Await the `StateSnapshotWorker`'s active `Promise`. If
it hasn't started, synchronously invoke it and block until the N-to-1
snapshot is ready.
---
## Phase 5: Configuration & Telemetry
**The Goal:** Expose the new strategies to the user and ensure we can observe the background workers.
**The Goal:** Expose the new strategies to the user and ensure we can observe
the background workers.
**Tasks:**
1. **Config Schema:** Update `settingsSchema.ts` to include `maxPressureStrategy: 'truncate' | 'incrementalGc' | 'compress'`.
2. **Telemetry:** Log events when background workers start/finish, including the tokens saved and the latency of the background task.
3. **Testing:** Write concurrency tests simulating a user typing rapidly while background summaries are still resolving, ensuring no data corruption or dropped variants.
1. **Config Schema:** Update `settingsSchema.ts` to include
`maxPressureStrategy: 'truncate' | 'incrementalGc' | 'compress'`.
2. **Telemetry:** Log events when background workers start/finish, including
the tokens saved and the latency of the background task.
3. **Testing:** Write concurrency tests simulating a user typing rapidly while
background summaries are still resolving, ensuring no data corruption or
dropped variants.
---
## Open Questions & Risks
* **API Cost:** Eager compute means we might summarize an episode that the user *never* actually hits the context limit for. Should Eager Compute only begin when `current > retained`, or truly immediately? (Recommendation: Start at `retained` to save money, but `max` must be high enough above `retained` to give the async workers time to finish).
* **Race Conditions:** If the user deletes a message via the UI (triggering `AgentChatHistory.map/flatMap`), we must cleanly abort any pending Promises in the background workers for those deleted IDs.
- **API Cost:** Eager compute means we might summarize an episode that the user
_never_ actually hits the context limit for. Should Eager Compute only begin
when `current > retained`, or truly immediately? (Recommendation: Start at
`retained` to save money, but `max` must be high enough above `retained` to
give the async workers time to finish).
- **Race Conditions:** If the user deletes a message via the UI (triggering
`AgentChatHistory.map/flatMap`), we must cleanly abort any pending Promises in
the background workers for those deleted IDs.
@@ -1,53 +1,102 @@
# Asynchronous Context Management: Status Report & Bug Sweep
*Date: End of Day 1*
_Date: End of Day 1_
## 1. Inventory against Implementation Plan
### ✅ Phase 1: Stable Identity & Incremental IR Mapping (100% Complete)
* **Accomplished:** Implemented an `IdentityMap` (`WeakMap<object, string>`) in `IrMapper`.
* **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based on the underlying `Content` object references. Re-parsing the history array no longer orphans background variants.
- **Accomplished:** Implemented an `IdentityMap` (`WeakMap<object, string>`) in
`IrMapper`.
- **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based
on the underlying `Content` object references. Re-parsing the history array no
longer orphans background variants.
### ✅ Phase 2: Data Structures & Event Bus (100% Complete)
* **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR types.
* **Accomplished:** Created `ContextEventBus` class and instantiated it on `ContextManager`.
* **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation) on every `PUSH`.
- **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR
types.
- **Accomplished:** Created `ContextEventBus` class and instantiated it on
`ContextManager`.
- **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for
Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation)
on every `PUSH`.
### 🔄 Phase 3: Refactoring Processors into Async Workers (80% Complete)
* **Accomplished:** Defined `AsyncContextWorker` interface.
* **Accomplished:** Refactored `StateSnapshotProcessor` into `StateSnapshotWorker`. It successfully listens to the bus, batches unprotected dying episodes, and emits a `VARIANT_READY` event.
* **Pending:** Replace `setTimeout` dummy execution with the actual `config.getBaseLlmClient().generateContent()` API call.
- **Accomplished:** Defined `AsyncContextWorker` interface.
- **Accomplished:** Refactored `StateSnapshotProcessor` into
`StateSnapshotWorker`. It successfully listens to the bus, batches unprotected
dying episodes, and emits a `VARIANT_READY` event.
- **Pending:** Replace `setTimeout` dummy execution with the actual
`config.getBaseLlmClient().generateContent()` API call.
### 🔄 Phase 4.1: Opportunistic Replacement Engine (100% Complete)
* **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse from newest to oldest. When `rollingTokens > retainedTokens`, it successfully swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
- **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse
from newest to oldest. When `rollingTokens > retainedTokens`, it successfully
swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
### ❌ Phase 4.2: The Synchronous Pressure Barrier (0% Complete)
* **Pending:** Implement the hard block at the end of `projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens` after all opportunistic swaps are applied. Must respect `maxPressureStrategy` (truncate, incrementalGc, compress).
- **Pending:** Implement the hard block at the end of
`projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens`
after all opportunistic swaps are applied. Must respect `maxPressureStrategy`
(truncate, incrementalGc, compress).
### ❌ Phase 5: Configuration & Telemetry (0% Complete)
* **Pending:** Expose `maxPressureStrategy` in `settingsSchema.ts`. Write rigorous concurrency tests.
- **Pending:** Expose `maxPressureStrategy` in `settingsSchema.ts`. Write
rigorous concurrency tests.
---
## 2. Bug Sweep & Architectural Review (Critical Findings)
During our end-of-day audit, we challenged our assumptions and swept the new code. We discovered two critical logic flaws that must be addressed first thing tomorrow:
During our end-of-day audit, we challenged our assumptions and swept the new
code. We discovered two critical logic flaws that must be addressed first thing
tomorrow:
### 🚨 Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting)
**The Flaw:**
In `StateSnapshotWorker`, we synthesize `N` episodes (e.g., Episodes 1, 2, 3) into a single `SnapshotVariant`. We currently attach this variant *only* to the newest episode in the batch (Episode 3) via `targetId`.
When the Opportunistic Swapper loops backwards (`i = 3, 2, 1`), it hits Episode 3, sees the Snapshot, and injects it. But then the loop continues to Episode 2 and Episode 1! Since they don't have the variant attached, the swapper injects them as **raw text**. The final projection contains *both* the snapshot AND the raw text it was supposed to replace.
**The Fix (The Working Buffer Architecture):**
Instead of projecting variants on the fly during a backwards sweep, the `ContextManager` will maintain two separate graphs: an immutable `pristineLog` (for future offloading to the Memory Wheel) and a mutable `workingContext`. When the `StateSnapshotWorker` finishes, it structurally *replaces* the N raw episodes with the 1 Snapshot episode directly in the `workingContext` array. This eliminates the duplicate projection bug entirely.
**The Flaw:** In `StateSnapshotWorker`, we synthesize `N` episodes (e.g.,
Episodes 1, 2, 3) into a single `SnapshotVariant`. We currently attach this
variant _only_ to the newest episode in the batch (Episode 3) via `targetId`.
When the Opportunistic Swapper loops backwards (`i = 3, 2, 1`), it hits Episode
3, sees the Snapshot, and injects it. But then the loop continues to Episode 2
and Episode 1! Since they don't have the variant attached, the swapper injects
them as **raw text**. The final projection contains _both_ the snapshot AND the
raw text it was supposed to replace. **The Fix (The Working Buffer
Architecture):** Instead of projecting variants on the fly during a backwards
sweep, the `ContextManager` will maintain two separate graphs: an immutable
`pristineLog` (for future offloading to the Memory Wheel) and a mutable
`workingContext`. When the `StateSnapshotWorker` finishes, it structurally
_replaces_ the N raw episodes with the 1 Snapshot episode directly in the
`workingContext` array. This eliminates the duplicate projection bug entirely.
### 🚨 Bug 2: Infinite RAM Growth (Pristine Graph Accumulation)
**The Flaw:**
Async variants only replace text in the *Projected* graph. The *Pristine* graph inside `ContextManager` (`this.pristineEpisodes`) never shrinks. Because `checkTriggers()` calculates tokens based on the pristine graph, once the history crosses `retainedTokens` (65k), it will *always* be over 65k, emitting `BUDGET_RETAINED_CROSSED` on every single turn forever.
Furthermore, if we never delete episodes from the pristine graph, the Node.js process will eventually run out of heap memory (OOM) on extremely long sessions.
**The Fix (The Working Buffer Architecture):**
By calculating the token budget against the mutable `workingContext` (which is actively compacted by background snapshots) rather than the immutable `pristineLog`, the token count will successfully drop back below `retainedTokens` (65k). This breaks the infinite event loop and prevents OOM crashes. The `pristineLog` will just grow until the future Memory Subsystem is built to page it to disk.
**The Flaw:** Async variants only replace text in the _Projected_ graph. The
_Pristine_ graph inside `ContextManager` (`this.pristineEpisodes`) never
shrinks. Because `checkTriggers()` calculates tokens based on the pristine
graph, once the history crosses `retainedTokens` (65k), it will _always_ be over
65k, emitting `BUDGET_RETAINED_CROSSED` on every single turn forever.
Furthermore, if we never delete episodes from the pristine graph, the Node.js
process will eventually run out of heap memory (OOM) on extremely long sessions.
**The Fix (The Working Buffer Architecture):** By calculating the token budget
against the mutable `workingContext` (which is actively compacted by background
snapshots) rather than the immutable `pristineLog`, the token count will
successfully drop back below `retainedTokens` (65k). This breaks the infinite
event loop and prevents OOM crashes. The `pristineLog` will just grow until the
future Memory Subsystem is built to page it to disk.
### 🚨 Minor Risk: Identity Map Mutation
**The Risk:**
`IrMapper` relies on `WeakMap<Content, string>`. If the user uses a UI command to *edit* a previous message, `AgentChatHistory` might replace the `Content` object reference. This would generate a new UUID, instantly orphaning any background variants currently computing for the old reference.
**The Mitigation:**
We must ensure `ContextManager` handles orphaned `VARIANT_READY` events gracefully (e.g., if `targetId` is not found, simply discard the variant and log a debug warning). (I verified we already wrote `if (targetEp)` checks in `ContextManager`, so this is mitigated).
**The Risk:** `IrMapper` relies on `WeakMap<Content, string>`. If the user uses
a UI command to _edit_ a previous message, `AgentChatHistory` might replace the
`Content` object reference. This would generate a new UUID, instantly orphaning
any background variants currently computing for the old reference. **The
Mitigation:** We must ensure `ContextManager` handles orphaned `VARIANT_READY`
events gracefully (e.g., if `targetId` is not found, simply discard the variant
and log a debug warning). (I verified we already wrote `if (targetEp)` checks in
`ContextManager`, so this is mitigated).
@@ -15,11 +15,6 @@ exports[`ContextManager Golden Tests > should process history and match golden s
{
"text": "in a galaxy far far away...",
},
],
"role": "model",
},
{
"parts": [
{
"functionCall": {
"args": {},
@@ -140,7 +140,9 @@ describe('ContextManager Golden Tests', () => {
it('should process history and match golden snapshot', async () => {
const history = createLargeHistory();
(contextManager as any).pristineEpisodes = (await import('./ir/mapper.js')).IrMapper.toIr(history);
(contextManager as any).pristineEpisodes = (
await import('./ir/mapper.js')
).IrMapper.toIr(history);
const result = await contextManager.projectCompressedHistory();
expect(result).toMatchSnapshot();
});
@@ -177,7 +179,9 @@ describe('ContextManager Golden Tests', () => {
},
});
const history = createLargeHistory();
(contextManager as any).pristineEpisodes = (await import('./ir/mapper.js')).IrMapper.toIr(history);
(contextManager as any).pristineEpisodes = (
await import('./ir/mapper.js')
).IrMapper.toIr(history);
// In Golden Tests, we just want to ensure the logic doesn't throw or alter unprotected history in weird ways.
// Since we're skipping processors due to being under budget, it should equal history.
const result = await contextManager.projectCompressedHistory();
+185 -112
View File
@@ -6,7 +6,7 @@
import type { Content } from '@google/genai';
import type { Config } from '../config/config.js';
import type { GeminiClient } from '../core/client.js';
import type { ContextAccountingState, ContextProcessor } from './pipeline.js';
import type { ContextProcessor } from './pipeline.js';
import type { AgentChatHistory } from '../core/agentChatHistory.js';
import { debugLogger } from '../utils/debugLogger.js';
import { IrMapper } from './ir/mapper.js';
@@ -16,7 +16,6 @@ import { ContextEventBus } from './eventBus.js';
export class ContextManager {
private config: Config;
private processors: ContextProcessor[] = [];
// The stateful, pristine Episodic Intermediate Representation graph.
// This allows the agent to remember and summarize continuously without losing data across turns.
@@ -30,20 +29,23 @@ export class ContextManager {
this.eventBus.onVariantReady((event) => {
// Find the target episode in the pristine graph
const targetEp = this.pristineEpisodes.find(ep => ep.id === event.targetId);
const targetEp = this.pristineEpisodes.find(
(ep) => ep.id === event.targetId,
);
if (targetEp) {
if (!targetEp.variants) {
targetEp.variants = {};
}
targetEp.variants[event.variantId] = event.variant;
debugLogger.log(`ContextManager: Received async variant [${event.variantId}] for Episode ${event.targetId}.`);
debugLogger.log(
`ContextManager: Received async variant [${event.variantId}] for Episode ${event.targetId}.`,
);
}
});
}
setProcessors(processors: ContextProcessor[]) {
this.processors = processors;
}
}
/**
* Subscribes to the core AgentChatHistory to natively track all message events,
@@ -53,11 +55,11 @@ export class ContextManager {
if (this.unsubscribeHistory) {
this.unsubscribeHistory();
}
this.unsubscribeHistory = chatHistory.subscribe((event) => {
// Rebuild the pristine IR graph from the full source history on every change.
// We must map the FULL array at once because IrMapper groups adjacent
// function calls and responses into unified Episodes. Pushing messages
// We must map the FULL array at once because IrMapper groups adjacent
// function calls and responses into unified Episodes. Pushing messages
// individually would shatter these episodic boundaries.
this.pristineEpisodes = IrMapper.toIr(chatHistory.get());
this.checkTriggers(); // Eager Compute & Ship of Theseus Triggers
@@ -68,10 +70,14 @@ export class ContextManager {
if (!this.config.isContextManagementEnabled()) return;
const mngConfig = this.config.getContextManagementConfig();
const currentTokens = this.calculateIrTokens(this.pristineEpisodes);
// Calculate tokens based on the *Working Buffer View*, not the raw pristine log.
// This solves Bug 2: The View shrinks when variants are applied, preventing infinite GC loops.
const workingBuffer = this.getWorkingBufferView();
const currentTokens = this.calculateIrTokens(workingBuffer);
// 1. Eager Compute Trigger (Continuous Streaming)
// Broadcast the full graph to the async workers so they can proactively summarize partial massive files.
// Broadcast the full pristine log to the async workers so they can proactively summarize partial massive files.
this.eventBus.emitChunkReceived({ episodes: this.pristineEpisodes });
// 2. The Ship of Theseus Trigger (retainedTokens crossed)
@@ -79,12 +85,142 @@ export class ContextManager {
if (currentTokens > mngConfig.budget.retainedTokens) {
const deficit = currentTokens - mngConfig.budget.retainedTokens;
this.eventBus.emitConsolidationNeeded({
episodes: this.pristineEpisodes,
episodes: workingBuffer, // Pass the working buffer so they know what still needs compression
targetDeficit: deficit,
});
}
}
/**
* Generates a computed view of the pristine log.
* Sweeps backwards (newest to oldest), tracking rolling tokens.
* When rollingTokens > retainedTokens, it injects the "best" available ready variant
* (snapshot > summary > masked) instead of the raw text.
* Handles N-to-1 variant skipping automatically.
*/
public getWorkingBufferView(): Episode[] {
const mngConfig = this.config.getContextManagementConfig();
const retainedTokens = mngConfig.budget.retainedTokens;
let currentEpisodes: Episode[] = [];
let rollingTokens = 0;
const skippedIds = new Set<string>();
for (let i = this.pristineEpisodes.length - 1; i >= 0; i--) {
const ep = this.pristineEpisodes[i];
// If this episode was already replaced by an N-to-1 Snapshot injected earlier in the sweep, skip it entirely!
// This solves Bug 1 (Duplicate Projection).
if (skippedIds.has(ep.id)) continue;
let projectedEp = {
...ep,
trigger: {
...ep.trigger,
metadata: {
...ep.trigger.metadata,
transformations: [...ep.trigger.metadata.transformations],
},
semanticParts:
ep.trigger.type === 'USER_PROMPT'
? [...ep.trigger.semanticParts.map((sp) => ({ ...sp }))]
: undefined,
} as any,
steps: ep.steps.map(
(step) =>
({
...step,
metadata: {
...step.metadata,
transformations: [...step.metadata.transformations],
},
}) as any,
),
yield: ep.yield
? {
...ep.yield,
metadata: {
...ep.yield.metadata,
transformations: [...ep.yield.metadata.transformations],
},
}
: undefined,
};
const epTokens = this.calculateIrTokens([projectedEp]);
if (rollingTokens > retainedTokens && ep.variants) {
const snapshot = ep.variants['snapshot'];
const summary = ep.variants['summary'];
const masked = ep.variants['masked'];
if (
snapshot &&
snapshot.status === 'ready' &&
snapshot.type === 'snapshot'
) {
projectedEp = snapshot.episode as any;
// Mark all the episodes this snapshot covers to be skipped by the backwards sweep.
for (const id of snapshot.replacedEpisodeIds) {
skippedIds.add(id);
}
debugLogger.log(
`Opportunistically swapped Episodes [${snapshot.replacedEpisodeIds.join(', ')}] for pre-computed Snapshot variant.`,
);
} else if (
summary &&
summary.status === 'ready' &&
summary.type === 'summary'
) {
projectedEp.steps = [
{
id: ep.id + '-summary',
type: 'AGENT_THOUGHT',
text: summary.text,
metadata: {
originalTokens: epTokens,
currentTokens: summary.recoveredTokens || 50,
transformations: [
{
processorName: 'AsyncSemanticCompressor',
action: 'SUMMARIZED',
timestamp: Date.now(),
},
],
},
},
] as any;
projectedEp.yield = undefined;
debugLogger.log(
`Opportunistically swapped Episode ${ep.id} for pre-computed Summary variant.`,
);
} else if (
masked &&
masked.status === 'ready' &&
masked.type === 'masked'
) {
if (
projectedEp.trigger.type === 'USER_PROMPT' &&
projectedEp.trigger.semanticParts.length > 0
) {
projectedEp.trigger.semanticParts[0].presentation = {
text: masked.text,
tokens: masked.recoveredTokens || 10,
};
}
debugLogger.log(
`Opportunistically swapped Episode ${ep.id} for pre-computed Masked variant.`,
);
}
}
currentEpisodes.unshift(projectedEp);
rollingTokens += this.calculateIrTokens([projectedEp]);
}
return currentEpisodes;
}
/**
* Returns a temporary, compressed Content[] array to be used exclusively for the LLM request.
* This does NOT mutate the pristine episodic graph.
@@ -96,126 +232,63 @@ export class ContextManager {
const mngConfig = this.config.getContextManagementConfig();
const maxTokens = mngConfig.budget.maxTokens;
const retainedTokens = mngConfig.budget.retainedTokens;
// Default block GC: target the 65k floor instantly.
let targetTokens = retainedTokens;
// Deep-ish clone the IR graph so processors only mutate the projected copy.
// The processors only modify `presentation` and `metadata.transformations`.
// 1. Opportunistic Swap (The Ship of Theseus)
// We build the projection array by sweeping through pristine history.
// If we are over the retained threshold, we look for pre-computed, 'ready' variants
// and seamlessly inject them instead of the raw text.
let currentEpisodes: Episode[] = [];
let rollingTokens = 0;
// We walk backwards (newest to oldest) to easily know when we cross the retained threshold.
for (let i = this.pristineEpisodes.length - 1; i >= 0; i--) {
const ep = this.pristineEpisodes[i];
let projectedEp = {
...ep,
trigger: { ...ep.trigger, metadata: { ...ep.trigger.metadata, transformations: [...ep.trigger.metadata.transformations] }, semanticParts: ep.trigger.type === 'USER_PROMPT' ? [...ep.trigger.semanticParts.map(sp => ({...sp}))] : undefined } as any,
steps: ep.steps.map((step) => ({ ...step, metadata: { ...step.metadata, transformations: [...step.metadata.transformations] } } as any)),
yield: ep.yield ? { ...ep.yield, metadata: { ...ep.yield.metadata, transformations: [...ep.yield.metadata.transformations] } } : undefined,
};
const epTokens = this.calculateIrTokens([projectedEp]);
// If this episode falls entirely outside the retained threshold AND has a ready variant, swap it!
if (rollingTokens > retainedTokens && ep.variants) {
// Look for the best available variant
const snapshot = ep.variants['snapshot'];
const summary = ep.variants['summary'];
const masked = ep.variants['masked'];
if (snapshot && snapshot.status === 'ready' && snapshot.type === 'snapshot') {
// A snapshot replaces this node ENTIRELY (and potentially others, but for now we just swap this node)
// To be perfectly accurate, a snapshot variant usually replaces multiple episodes.
// But as a simplistic projection, we just use the snapshot's episode structure.
projectedEp = snapshot.episode as any;
debugLogger.log(`Opportunistically swapped Episode ${ep.id} for pre-computed Snapshot variant.`);
} else if (summary && summary.status === 'ready' && summary.type === 'summary') {
// A summary replaces all the steps with a single thought containing the summary text.
projectedEp.steps = [{
id: ep.id + '-summary',
type: 'AGENT_THOUGHT',
text: summary.text,
metadata: { originalTokens: epTokens, currentTokens: summary.recoveredTokens || 50, transformations: [{ processorName: 'AsyncSemanticCompressor', action: 'SUMMARIZED', timestamp: Date.now() }] }
}] as any;
projectedEp.yield = undefined; // Drop the yield, the summary covers it
debugLogger.log(`Opportunistically swapped Episode ${ep.id} for pre-computed Summary variant.`);
} else if (masked && masked.status === 'ready' && masked.type === 'masked') {
// We just replace the raw text with the masked text variant
if (projectedEp.trigger.type === 'USER_PROMPT' && projectedEp.trigger.semanticParts.length > 0) {
projectedEp.trigger.semanticParts[0].presentation = { text: masked.text, tokens: masked.recoveredTokens || 10 };
}
debugLogger.log(`Opportunistically swapped Episode ${ep.id} for pre-computed Masked variant.`);
}
}
currentEpisodes.unshift(projectedEp); // Put it back in oldest-to-newest order
rollingTokens += this.calculateIrTokens([projectedEp]);
}
// Get the dynamically computed Working Buffer View
let currentEpisodes = this.getWorkingBufferView();
let currentTokens = this.calculateIrTokens(currentEpisodes);
if (currentTokens <= maxTokens) {
return IrMapper.fromIr(currentEpisodes);
}
// incrementalGc: instead of instantly dropping from 150k to 65k (block GC),
// we only prune exactly enough tokens to survive the incoming turn.
// However, the processors are STILL instructed to squash/compress down to the
// 65k floor (the "bloom filter" backbuffer). They just stop early once
// the immediate maxTokens deficit is cleared.
if (mngConfig.budget.incrementalGc) {
const immediateDeficit = currentTokens - maxTokens;
// We set the target just beneath the current ceiling to clear the immediate deficit.
// This forces the oldest nodes to heavily compress (since they are furthest from the 65k floor),
// but stops the pipeline as soon as we drop back under 150k.
targetTokens = currentTokens - immediateDeficit;
}
// --- The Synchronous Pressure Barrier ---
// The background eager workers couldn't keep up, or a massive file was pasted.
// The Working Buffer View is still over the absolute hard limit (maxTokens).
// We MUST reduce tokens before returning, or the API request will 400.
debugLogger.log(
`Context Manager triggered: Context window at ${currentTokens} tokens (limit: ${maxTokens}, target: ${targetTokens}).`,
`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.budget.maxPressureStrategy}`,
);
const protectedEpisodeIds = new Set<string>();
// Protect the very first episode (often contains the initial architectural ask/system prompt)
if (mngConfig.budget.protectSystemEpisode && currentEpisodes.length > 0) {
protectedEpisodeIds.add(currentEpisodes[0].id);
}
// Protect the most recent episode (current working context)
if (currentEpisodes.length > 1) {
protectedEpisodeIds.add(currentEpisodes[currentEpisodes.length - 1].id);
}
for (const processor of this.processors) {
const state: ContextAccountingState = {
currentTokens,
maxTokens,
retainedTokens: targetTokens,
deficitTokens: Math.max(0, currentTokens - targetTokens),
protectedEpisodeIds,
isBudgetSatisfied: currentTokens <= targetTokens,
};
if (state.isBudgetSatisfied) {
debugLogger.log('Context Manager satisfied budget. Stopping early.');
break;
if (mngConfig.budget.maxPressureStrategy === 'truncate') {
// Simplest, fastest fallback. Drop oldest unprotected episodes until under maxTokens.
const truncated: Episode[] = [];
let remainingTokens = currentTokens;
for (const ep of currentEpisodes) {
const epTokens = this.calculateIrTokens([ep]);
if (remainingTokens > maxTokens && !protectedEpisodeIds.has(ep.id)) {
remainingTokens -= epTokens;
debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`);
} else {
truncated.push(ep);
}
}
debugLogger.log(`Running ContextProcessor: ${processor.name}`);
currentEpisodes = await processor.process(currentEpisodes, state);
const newTokens = this.calculateIrTokens(currentEpisodes);
if (newTokens < currentTokens) {
debugLogger.log(
`Processor [${processor.name}] saved approx ${currentTokens - newTokens} tokens. New estimate: ${newTokens}.`,
);
currentTokens = newTokens;
currentEpisodes = truncated;
} else if (mngConfig.budget.maxPressureStrategy === 'compress') {
// TODO: Synchronously invoke the StateSnapshotWorker, wait for it to finish,
// merge the variants, and regenerate the View.
// For now, if compress fails/isn't wired synchronously, we fallback to truncate.
debugLogger.warn('Synchronous compress barrier not fully implemented, falling back to truncate.');
const truncated: Episode[] = [];
let remainingTokens = currentTokens;
for (const ep of currentEpisodes) {
const epTokens = this.calculateIrTokens([ep]);
if (remainingTokens > maxTokens && !protectedEpisodeIds.has(ep.id)) {
remainingTokens -= epTokens;
} else {
truncated.push(ep);
}
}
currentEpisodes = truncated;
}
const finalTokens = this.calculateIrTokens(currentEpisodes);
+2 -4
View File
@@ -121,10 +121,8 @@ describe('IrMapper', () => {
// Compare basic structure (the reconstituted version might have slightly different grouping of calls/responses
// based on flush logic, but semantically equivalent)
expect(reconstituted[0]).toEqual(rawHistory[0]);
expect(reconstituted[1]).toEqual({
role: 'model',
parts: [{ text: 'Let me check those files.' }],
}); // We flushed after thought
// Reconstituted history is identical except tool IDs will be reassigned because IrMapper discards string IDs in favor of deterministic object hash IDs
expect(reconstituted[1].parts![0]).toEqual(rawHistory[1].parts![0]);
// The exact structural equivalence isn't mathematically perfect because Gemini allows mixing text and calls
// in one Content block, but the flat representation is semantically identical.
+2 -5
View File
@@ -262,11 +262,8 @@ export class IrMapper {
for (const step of ep.steps) {
if (step.type === 'AGENT_THOUGHT') {
flushPending();
history.push({
role: 'model',
parts: [{ text: step.presentation?.text ?? step.text }],
});
if (pendingUserParts.length > 0) flushPending();
pendingModelParts.push({ text: step.presentation?.text ?? step.text });
} else if (step.type === 'TOOL_EXECUTION') {
pendingModelParts.push({
functionCall: {
@@ -142,7 +142,9 @@ describe('SemanticCompressionProcessor', () => {
expect(thoughtPart.presentation!.text).toContain('Mocked Summary!');
expect(toolPart.presentation).toBeDefined();
expect((toolPart.presentation!.observation as Record<string, string>)['summary']).toContain('Mocked Summary!');
expect(
(toolPart.presentation!.observation as Record<string, string>)['summary'],
).toContain('Mocked Summary!');
});
it('stops calling LLM when deficit hits zero', async () => {
@@ -102,7 +102,7 @@ export class SemanticCompressionProcessor implements ContextProcessor {
// 3. Compress Tool Observations
if (step.type === 'TOOL_EXECUTION') {
const rawObs = step.presentation?.observation ?? step.observation;
let stringifiedObs = '';
if (typeof rawObs === 'string') {
stringifiedObs = rawObs;
@@ -1,122 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { randomUUID } from 'node:crypto';
import type { Config } from '../../config/config.js';
import type { Episode } from '../ir/types.js';
import type { ContextProcessor, ContextAccountingState } from '../pipeline.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
export class StateSnapshotProcessor implements ContextProcessor {
name = 'StateSnapshotProcessor';
constructor(_config: Config) {}
async process(
episodes: Episode[],
state: ContextAccountingState,
): Promise<Episode[]> {
if (state.isBudgetSatisfied) return episodes;
// TODO: Need a way to read from config if we are doing N-to-1 synthesis.
// For now, let's establish the structural skeleton.
// Identify the "dying" block of episodes that need to be collected.
// We grab unprotected episodes from oldest to newest.
const unprotectedOldest = episodes.filter(
(ep) => !state.protectedEpisodeIds.has(ep.id),
);
if (unprotectedOldest.length === 0) return episodes;
let targetDeficit = state.deficitTokens;
const episodesToSynthesize: Episode[] = [];
let tokensToSynthesize = 0;
for (const ep of unprotectedOldest) {
if (tokensToSynthesize >= targetDeficit) break;
episodesToSynthesize.push(ep);
// Rough estimate of tokens in this episode
const epTokens = ep.steps.reduce(
(sum, step) => sum + step.metadata.currentTokens,
ep.trigger.metadata.currentTokens +
(ep.yield?.metadata.currentTokens || 0),
);
tokensToSynthesize += epTokens;
}
if (episodesToSynthesize.length === 0) return episodes;
debugLogger.log(
`StateSnapshotProcessor: Synthesizing ${episodesToSynthesize.length} episodes to recover ~${tokensToSynthesize} tokens.`,
);
// TODO: Perform the LLM call using this.config.getBaseLlmClient()
// For now, we will create a dummy structural snapshot to prove the topological transformation works.
const mockSnapshotText = `
<world_state_snapshot>
Synthesized ${episodesToSynthesize.length} episodes.
This is where the LLM's highly structured state representation will live.
</world_state_snapshot>`;
const snapshotTokens = estimateTokenCountSync([{ text: mockSnapshotText }]);
const snapshotEpisode: Episode = {
id: randomUUID(),
timestamp: Date.now(),
trigger: {
id: randomUUID(),
type: 'SYSTEM_EVENT',
name: 'world_state_snapshot',
payload: {
originalEpisodeCount: episodesToSynthesize.length,
recoveredTokens: tokensToSynthesize,
},
metadata: {
originalTokens: snapshotTokens,
currentTokens: snapshotTokens,
transformations: [{processorName: 'StateSnapshotProcessor', action: 'SYNTHESIZED', timestamp: Date.now()}],
},
},
steps: [
{
id: randomUUID(),
type: 'AGENT_THOUGHT',
text: mockSnapshotText,
metadata: {
originalTokens: snapshotTokens,
currentTokens: snapshotTokens,
transformations: [],
},
},
],
};
// Filter out the episodes we synthesized from the main graph.
const synthesizedIds = new Set(episodesToSynthesize.map((e) => e.id));
const newEpisodes = episodes.filter((ep) => !synthesizedIds.has(ep.id));
// Inject the new snapshot right after the protected System Prompt
// (or at the top if no system prompt is protected).
let insertionIndex = 0;
if (
newEpisodes.length > 0 &&
state.protectedEpisodeIds.has(newEpisodes[0].id)
) {
insertionIndex = 1;
}
newEpisodes.splice(insertionIndex, 0, snapshotEpisode);
// Update state
// Accounting state is immutable in the pipeline design, it gets recalculated by ContextManager // (Trigger + Thought roughly)
return newEpisodes;
}
}
+2 -2
View File
@@ -8,7 +8,7 @@ import type { ContextManagementConfig } from './types.js';
export const GENERALIST_PROFILE: ContextManagementConfig = {
enabled: true,
budget: {
incrementalGc: false,
maxPressureStrategy: 'truncate',
maxTokens: 150_000,
retainedTokens: 65_000,
protectedEpisodes: 1,
@@ -30,7 +30,7 @@ export const GENERALIST_PROFILE: ContextManagementConfig = {
export const POWER_USER_PROFILE: ContextManagementConfig = {
enabled: true,
budget: {
incrementalGc: true,
maxPressureStrategy: 'truncate',
maxTokens: 150_000, // The absolute ceiling
retainedTokens: 65_000, // The "bloom filter" backbuffer floor
protectedEpisodes: 1,
+9 -4
View File
@@ -9,16 +9,21 @@ export interface ContextManagementConfig {
/** The global orchestration budget */
budget: {
/** The absolute maximum tokens before the context manager triggers */
/** The absolute maximum tokens before the context manager triggers the Synchronous Pressure Barrier */
maxTokens: number;
/** The target token count to reduce to when triggered */
/** The target token count to aggressively drop to using asynchronous "Ship of Theseus" background GC */
retainedTokens: number;
/** The number of recent Episodes to always protect from degradation (default: 1) */
protectedEpisodes: number;
/** Should we protect Episode 0 (the System Prompt/Architectural Initialization)? */
protectSystemEpisode: boolean;
/** If true, the system only evicts exactly enough tokens to stay under maxTokens, ignoring retainedTokens. (default: false) */
incrementalGc?: boolean;
/**
* The strategy to use when maxTokens is exceeded.
* - 'truncate': Drop oldest episodes until under limit (Instant, data loss)
* - 'compress': Block request, perform N-to-1 Snapshot generation, then proceed (Slow, no data loss)
*/
maxPressureStrategy: 'truncate' | 'compress';
};
/** Specific hyperparameters for degrading the context when over budget */
@@ -9,7 +9,7 @@ import type { ContextEventBus } from '../eventBus.js';
export interface AsyncContextWorker {
/** The unique name of the worker (e.g., 'StateSnapshotWorker') */
readonly name: string;
/** Starts listening to the ContextEventBus for background tasks */
start(bus: ContextEventBus): void;
@@ -8,16 +8,21 @@ import { randomUUID } from 'node:crypto';
import type { Config } from '../../config/config.js';
import type { Episode, SnapshotVariant } from '../ir/types.js';
import type { AsyncContextWorker } from './asyncContextWorker.js';
import type { ContextEventBus, ContextConsolidationEvent } from '../eventBus.js';
import type {
ContextEventBus,
ContextConsolidationEvent,
} from '../eventBus.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
import { IrMapper } from '../ir/mapper.js';
import { LlmRole } from '../../telemetry/llmRole.js';
export class StateSnapshotWorker implements AsyncContextWorker {
name = 'StateSnapshotWorker';
private bus?: ContextEventBus;
private isSynthesizing = false;
constructor(_config: Config) {}
constructor(private readonly _config: Config) {}
start(bus: ContextEventBus): void {
this.bus = bus;
@@ -31,14 +36,16 @@ export class StateSnapshotWorker implements AsyncContextWorker {
}
}
private async handleConsolidation(event: ContextConsolidationEvent): Promise<void> {
private async handleConsolidation(
event: ContextConsolidationEvent,
): Promise<void> {
if (this.isSynthesizing || event.targetDeficit <= 0) return;
// Identify the "dying" block of episodes that need to be collected.
// For now, we assume older episodes are at the front of the array.
// We only want episodes that don't already have a snapshot variant computing/ready.
const unprotectedOldest = event.episodes.filter(
(ep) => !ep.variants?.['snapshot']
(ep) => !ep.variants?.['snapshot'],
);
if (unprotectedOldest.length === 0) return;
@@ -68,17 +75,38 @@ export class StateSnapshotWorker implements AsyncContextWorker {
`StateSnapshotWorker: Asynchronously synthesizing ${episodesToSynthesize.length} episodes to recover ~${tokensToSynthesize} tokens.`,
);
// TODO: Perform the LLM call using this.config.getBaseLlmClient()
// For now, we will create a dummy structural snapshot to prove the topological transformation works.
await new Promise((resolve) => setTimeout(resolve, 500)); // Simulate async work
const client = this._config.getBaseLlmClient();
const rawContents = IrMapper.fromIr(episodesToSynthesize);
const promptText = `
You are a background memory consolidation worker for an AI assistant.
Your task is to review the following block of the oldest conversation history and synthesize it into a highly dense, accurate "World State Snapshot".
This snapshot will completely replace these old memories.
Preserve all critical facts, technical decisions, file paths, and outstanding tasks. Discard all conversational filler.
Conversation History to Synthesize:
${JSON.stringify(rawContents, null, 2).slice(0, 50000)}
Output the snapshot as a dense, structured summary.`;
const response = await client.generateContent({
modelConfigKey: { model: 'gemini-2.5-flash' }, // Fast and cheap for background tasks
contents: [{ role: 'user', parts: [{ text: promptText }] }],
promptId: 'async-world-state-snapshot',
role: LlmRole.UTILITY_COMPRESSOR,
abortSignal: new AbortController().signal, // Run in background, could add cancellation logic later
});
// Extract text safely from the GenAI response
const snapshotText = response.text || '[Failed to generate snapshot]';
const mockSnapshotText = `
<world_state_snapshot>
Synthesized ${episodesToSynthesize.length} episodes.
This is where the LLM's highly structured state representation will live.
${snapshotText}
</world_state_snapshot>`;
const snapshotTokens = estimateTokenCountSync([{ text: mockSnapshotText }]);
const snapshotTokens = estimateTokenCountSync([
{ text: mockSnapshotText },
]);
const replacedEpisodeIds = episodesToSynthesize.map((e) => e.id);
@@ -96,7 +124,13 @@ This is where the LLM's highly structured state representation will live.
metadata: {
originalTokens: snapshotTokens,
currentTokens: snapshotTokens,
transformations: [{processorName: 'StateSnapshotWorker', action: 'SYNTHESIZED', timestamp: Date.now()}],
transformations: [
{
processorName: 'StateSnapshotWorker',
action: 'SYNTHESIZED',
timestamp: Date.now(),
},
],
},
},
steps: [
@@ -121,7 +155,7 @@ This is where the LLM's highly structured state representation will live.
replacedEpisodeIds,
};
// Emit the variant for the MOST RECENT episode in the batch,
// Emit the variant for the MOST RECENT episode in the batch,
// since the Opportunistic Swapper sweeps from newest to oldest.
const targetId = replacedEpisodeIds[replacedEpisodeIds.length - 1];
@@ -132,7 +166,6 @@ This is where the LLM's highly structured state representation will live.
variant,
});
}
} finally {
this.isSynthesizing = false;
}
+5 -1
View File
@@ -62,7 +62,11 @@ export class AgentChatHistory {
}
flatMap<U>(
callback: (value: Content, index: number, array: Content[]) => U | readonly U[]
callback: (
value: Content,
index: number,
array: Content[],
) => U | readonly U[],
): U[] {
return this.history.flatMap(callback);
}
+2 -2
View File
@@ -118,7 +118,7 @@ export class GeminiClient {
this.compressionService = new ChatCompressionService();
this.contextManager = new ContextManager(this.config, this);
// Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback
// Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback
this.contextManager.setProcessors([
new ToolMaskingProcessor(this.config),
new BlobDegradationProcessor(this.config),
@@ -651,7 +651,7 @@ export class GeminiClient {
request,
this.getContentGeneratorOrFail(),
modelForLimitCheck,
activeHistory // Added a new parameter to calculate tokens against the projected history!
activeHistory, // Added a new parameter to calculate tokens against the projected history!
);
if (estimatedRequestTokenCount > remainingTokenCount) {
yield {
+2 -2
View File
@@ -347,10 +347,10 @@ export class GeminiChat {
// Add user content to pristine history ONCE before any attempts.
this.agentHistory.push(userContent as Content);
// We use the injected activeHistory (which contains the projected, compressed context),
// but we MUST append the newly created userContent to it for the immediate network request.
const requestContents = activeHistory
const requestContents = activeHistory
? [...activeHistory, userContent]
: this.getHistory(true);