schema changes

This commit is contained in:
Your Name
2026-04-05 16:09:36 +00:00
parent f4e3132af9
commit cab8bb49f5
19 changed files with 608 additions and 112 deletions
+1 -1
View File
@@ -2221,7 +2221,7 @@ describe('loadCliConfig context management', () => {
const argv = await parseArguments(createTestMergedSettings());
const contextManagementConfig: Partial<ContextManagementConfig> = {
budget: {
incrementalGc: false,
maxPressureStrategy: 'truncate',
maxTokens: 100_000,
retainedTokens: 50_000,
protectedEpisodes: 1,
+4 -1
View File
@@ -48,6 +48,7 @@ import {
detectIdeFromEnv,
GENERALIST_PROFILE,
POWER_USER_PROFILE,
STRESS_TEST_PROFILE,
} from '@google/gemini-cli-core';
import {
type Settings,
@@ -887,15 +888,17 @@ export async function loadCliConfig(
const useGeneralistProfile =
settings.experimental?.generalistProfile ?? false;
const useStressTestProfile = settings.experimental?.stressTestProfile ?? false;
const usePowerUserProfile = settings.experimental?.powerUserProfile ?? false;
const useContextManagement =
settings.experimental?.contextManagement ?? false;
const contextManagement = {
...(useGeneralistProfile ? GENERALIST_PROFILE : {}),
...(usePowerUserProfile ? POWER_USER_PROFILE : {}),
...(useStressTestProfile ? STRESS_TEST_PROFILE : {}),
...(useContextManagement ? settings?.contextManagement : {}),
enabled:
useContextManagement || useGeneralistProfile || usePowerUserProfile,
useContextManagement || useGeneralistProfile || usePowerUserProfile || useStressTestProfile,
};
return new Config({
+10
View File
@@ -2159,6 +2159,16 @@ const SETTINGS_SCHEMA = {
'Enables continuous minimal GC near the max tokens limit instead of a blocked backbuffer.',
showInDialog: true,
},
stressTestProfile: {
type: 'boolean',
label: 'Stress Test Profile (Context GC)',
category: 'Experimental',
requiresRestart: true,
default: false,
description:
'Aggressively limits the token budget (6k retained, 12k max) to force rapid background snapshotting and foreground truncations for local E2E testing of the context system.',
showInDialog: true,
},
generalistProfile: {
type: 'boolean',
label: 'Use the generalist profile to manage agent contexts.',
View File
View File
@@ -1,102 +1,52 @@
# Asynchronous Context Management: Status Report & Bug Sweep
_Date: End of Day 1_
_Date: End of Day 2 (Subconscious Memory Refactoring Complete)_
## 1. Inventory against Implementation Plan
### ✅ Phase 1: Stable Identity & Incremental IR Mapping (100% Complete)
- **Accomplished:** Implemented an `IdentityMap` (`WeakMap<object, string>`) in
`IrMapper`.
- **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based
on the underlying `Content` object references. Re-parsing the history array no
longer orphans background variants.
- **Accomplished:** Implemented an `IdentityMap` (`WeakMap<object, string>`) in `IrMapper`.
- **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based on the underlying `Content` object references. Re-parsing the history array no longer orphans background variants.
- **Testing:** Implemented an explicit `IrMapper.test.ts` unit test proving `WeakMap` identity stability across conversation growth.
### ✅ Phase 2: Data Structures & Event Bus (100% Complete)
- **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR
types.
- **Accomplished:** Created `ContextEventBus` class and instantiated it on
`ContextManager`.
- **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for
Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation)
on every `PUSH`.
- **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR types.
- **Accomplished:** Created `ContextEventBus` class and instantiated it on `ContextManager`.
- **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation) on every `PUSH`.
### 🔄 Phase 3: Refactoring Processors into Async Workers (80% Complete)
### Phase 3: Refactoring Processors into Async Workers (100% Complete)
- **Accomplished:** Defined `AsyncContextWorker` interface.
- **Accomplished:** Refactored `StateSnapshotProcessor` into
`StateSnapshotWorker`. It successfully listens to the bus, batches unprotected
dying episodes, and emits a `VARIANT_READY` event.
- **Pending:** Replace `setTimeout` dummy execution with the actual
`config.getBaseLlmClient().generateContent()` API call.
- **Accomplished:** Refactored `StateSnapshotProcessor` into `StateSnapshotWorker`. It successfully listens to the bus, batches unprotected dying episodes, and emits a `VARIANT_READY` event.
- **Accomplished:** Replaced dummy execution with the actual `config.getBaseLlmClient().generateContent()` API call using `gemini-2.5-flash` and the `LlmRole.UTILITY_COMPRESSOR` telemetry role.
- **Accomplished:** Added robust `try/catch` and extensive `debugLogger.error` / `debugLogger.warn` logging to catch anomalous LLM failures without crashing the main loop.
### 🔄 Phase 4.1: Opportunistic Replacement Engine (100% Complete)
### Phase 4.1: Opportunistic Replacement Engine (100% Complete)
- **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse
from newest to oldest. When `rollingTokens > retainedTokens`, it successfully
swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
- **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse from newest to oldest. When `rollingTokens > retainedTokens`, it successfully swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
- **Accomplished:** Implemented the `getWorkingBufferView()` sweep method. It perfectly resolves the N-to-1 Variant Targeting bug by injecting the snapshot and adding all `replacedEpisodeIds` to a `skippedIds` Set, cleanly dropping the older raw nodes from the final projection array.
### Phase 4.2: The Synchronous Pressure Barrier (0% Complete)
### Phase 4.2: The Synchronous Pressure Barrier (100% Complete)
- **Pending:** Implement the hard block at the end of
`projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens`
after all opportunistic swaps are applied. Must respect `maxPressureStrategy`
(truncate, incrementalGc, compress).
- **Accomplished:** Implemented the hard block at the end of `projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens` after all opportunistic swaps are applied.
- **Accomplished:** Reads the `mngConfig.budget.maxPressureStrategy` flag. Supports `truncate` (instantly dropping oldest unprotected episodes) and safely falls back if `compress` isn't fully wired synchronously yet.
- **Testing:** Wrote `contextManager.barrier.test.ts` to blast the system with ~200k tokens and verify the instant truncation successfully protects the System Prompt (Episode 0) and the current working context.
### Phase 5: Configuration & Telemetry (0% Complete)
### Phase 5: Configuration & Testing (100% Complete)
- **Pending:** Expose `maxPressureStrategy` in `settingsSchema.ts`. Write
rigorous concurrency tests.
- **Accomplished:** Exposed `maxPressureStrategy` in `settingsSchema.ts` and replaced the deprecated `incrementalGc` flag across the entire monorepo.
- **Accomplished:** Wrote extensive concurrency component tests in `contextManager.async.test.ts` to prove the async LLM Promise resolution does not block the main user thread, and handles the critical race condition of "User typing while background snapshotting" flawlessly.
---
## 2. Bug Sweep & Architectural Review (Critical Findings)
## 2. Bug Sweep & Architectural Review (Critical Findings Resolved)
During our end-of-day audit, we challenged our assumptions and swept the new
code. We discovered two critical logic flaws that must be addressed first thing
tomorrow:
Both critical flaws discovered on Day 1 have been completely resolved:
### 🚨 Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting)
### ✅ Resolved Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting)
**The Fix:** The `getWorkingBufferView()` method tracks a `skippedIds` Set during its sweep. If it chooses a SnapshotVariant, it pushes all `replacedEpisodeIds` into the Set, cleanly skipping the raw text nodes on subsequent iterations.
**The Flaw:** In `StateSnapshotWorker`, we synthesize `N` episodes (e.g.,
Episodes 1, 2, 3) into a single `SnapshotVariant`. We currently attach this
variant _only_ to the newest episode in the batch (Episode 3) via `targetId`.
When the Opportunistic Swapper loops backwards (`i = 3, 2, 1`), it hits Episode
3, sees the Snapshot, and injects it. But then the loop continues to Episode 2
and Episode 1! Since they don't have the variant attached, the swapper injects
them as **raw text**. The final projection contains _both_ the snapshot AND the
raw text it was supposed to replace. **The Fix (The Working Buffer
Architecture):** Instead of projecting variants on the fly during a backwards
sweep, the `ContextManager` will maintain two separate graphs: an immutable
`pristineLog` (for future offloading to the Memory Wheel) and a mutable
`workingContext`. When the `StateSnapshotWorker` finishes, it structurally
_replaces_ the N raw episodes with the 1 Snapshot episode directly in the
`workingContext` array. This eliminates the duplicate projection bug entirely.
### 🚨 Bug 2: Infinite RAM Growth (Pristine Graph Accumulation)
**The Flaw:** Async variants only replace text in the _Projected_ graph. The
_Pristine_ graph inside `ContextManager` (`this.pristineEpisodes`) never
shrinks. Because `checkTriggers()` calculates tokens based on the pristine
graph, once the history crosses `retainedTokens` (65k), it will _always_ be over
65k, emitting `BUDGET_RETAINED_CROSSED` on every single turn forever.
Furthermore, if we never delete episodes from the pristine graph, the Node.js
process will eventually run out of heap memory (OOM) on extremely long sessions.
**The Fix (The Working Buffer Architecture):** By calculating the token budget
against the mutable `workingContext` (which is actively compacted by background
snapshots) rather than the immutable `pristineLog`, the token count will
successfully drop back below `retainedTokens` (65k). This breaks the infinite
event loop and prevents OOM crashes. The `pristineLog` will just grow until the
future Memory Subsystem is built to page it to disk.
### 🚨 Minor Risk: Identity Map Mutation
**The Risk:** `IrMapper` relies on `WeakMap<Content, string>`. If the user uses
a UI command to _edit_ a previous message, `AgentChatHistory` might replace the
`Content` object reference. This would generate a new UUID, instantly orphaning
any background variants currently computing for the old reference. **The
Mitigation:** We must ensure `ContextManager` handles orphaned `VARIANT_READY`
events gracefully (e.g., if `targetId` is not found, simply discard the variant
and log a debug warning). (I verified we already wrote `if (targetEp)` checks in
`ContextManager`, so this is mitigated).
### ✅ Resolved Bug 2: Infinite RAM Growth (Pristine Graph Accumulation)
**The Fix:** The `checkTriggers()` method now calculates its token budget against the computed `WorkingBufferView` rather than the `pristineEpisodes` array. As soon as an async worker injects a snapshot, the calculated token count plummets natively, breaking the infinite GC loop while leaving the pristine log untouched.
@@ -0,0 +1,131 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import {
createSyntheticHistory,
createMockContextConfig,
setupContextComponentTest,
} from './testing/contextTestUtils.js';
describe('ContextManager Concurrency Component Tests', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it('should asynchronously compress history when retainedTokens is crossed, without blocking projection', async () => {
// 1. Setup with a delayed LLM client to simulate async work
let resolveLlm: (val: any) => void;
const llmPromise = new Promise((res) => {
resolveLlm = res;
});
const llmClientOverride = {
generateContent: vi.fn().mockImplementation(() => llmPromise),
};
const config = createMockContextConfig({}, llmClientOverride);
const { chatHistory, contextManager } = setupContextComponentTest(config);
// 2. Add System Prompt (Episode 0 - Protected)
chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] });
// 3. Add heavy history that crosses the 65k retained floor but stays under 150k max.
// 10 turns * 8000 tokens/turn = 80,000 tokens (approx)
const heavyHistory = createSyntheticHistory(10, 4000);
for (const msg of heavyHistory) {
chatHistory.push(msg);
}
// 4. Verify Immediate Projection (The async worker is stuck waiting for the LLM)
// The projection should NOT block. It should return the full history because we are under maxTokens.
const earlyProjection = await contextManager.projectCompressedHistory();
expect(earlyProjection.length).toBe(chatHistory.get().length);
// 5. Unblock the LLM and allow async events to flush
resolveLlm!({
text: '<mocked_snapshot>Synthesized old episodes</mocked_snapshot>',
});
// We need to flush the microtask queue so the Promise resolves and the EventBus ticks
await vi.runAllTimersAsync();
// 6. Verify Post-Compression Projection
// The WorkingBufferView should now automatically inject the SnapshotVariant, shrinking the array.
const lateProjection = await contextManager.projectCompressedHistory();
expect(lateProjection.length).toBeLessThan(earlyProjection.length);
// Verify the snapshot text actually made it into the stream
const hasSnapshotText = lateProjection.some(
(msg) =>
msg.role === 'model' &&
msg.parts!.some(
(p) =>
p.text && p.text.includes('<mocked_snapshot>Synthesized old episodes</mocked_snapshot>'),
),
);
expect(hasSnapshotText).toBe(true);
});
it('should handle the Race Condition: User pushing messages while a background snapshot is computing', async () => {
let resolveLlm: (val: any) => void;
const llmPromise = new Promise((res) => {
resolveLlm = res;
});
const llmClientOverride = {
generateContent: vi.fn().mockImplementation(() => llmPromise),
};
const config = createMockContextConfig({}, llmClientOverride);
const { chatHistory, contextManager } = setupContextComponentTest(config);
chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] });
// Push 80k tokens to trigger compression of older nodes
const heavyHistory = createSyntheticHistory(10, 4000);
for (const msg of heavyHistory) {
chatHistory.push(msg);
}
// At this exact moment, the StateSnapshotWorker has grabbed the oldest episodes
// and is waiting for `llmPromise`.
// THE RACE: The user types two more messages very quickly BEFORE the LLM returns.
chatHistory.push({ role: 'user', parts: [{ text: 'Oh, one more thing!' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'I am listening.' }] });
// Unblock the LLM
resolveLlm!({ text: 'Dense Snapshot Data' });
await vi.runAllTimersAsync();
// Verify
const projection = await contextManager.projectCompressedHistory();
// The snapshot should be present (replacing old history)
const hasSnapshot = projection.some((msg) =>
msg.parts!.some((p) => p.text?.includes('Dense Snapshot Data'))
);
expect(hasSnapshot).toBe(true);
// CRITICAL: The new messages typed during the race must ALSO be present and unmodified at the end of the array.
const lastUserMsg = projection[projection.length - 2];
const lastModelMsg = projection[projection.length - 1];
expect(lastUserMsg.role).toBe('user');
expect(lastUserMsg.parts![0].text).toBe('Oh, one more thing!');
expect(lastModelMsg.role).toBe('model');
expect(lastModelMsg.parts![0].text).toBe('I am listening.');
});
});
@@ -0,0 +1,67 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import {
createSyntheticHistory,
createMockContextConfig,
setupContextComponentTest,
} from './testing/contextTestUtils.js';
describe('ContextManager Sync Pressure Barrier Tests', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it('should instantly truncate history when maxTokens is exceeded using truncate strategy', async () => {
// 1. Setup
const config = createMockContextConfig();
const { chatHistory, contextManager } = setupContextComponentTest(config);
// 2. Add System Prompt (Episode 0 - Protected)
chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] });
// 3. Add massive history that blows past the 150k maxTokens limit
// 20 turns * 10,000 tokens/turn = ~200,000 tokens
const massiveHistory = createSyntheticHistory(20, 10000);
for (const msg of massiveHistory) {
chatHistory.push(msg);
}
// 4. Add the Latest Turn (Protected)
chatHistory.push({ role: 'user', parts: [{ text: 'Final question.' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Final answer.' }] });
const rawHistoryLength = chatHistory.get().length;
// 5. Project History (Triggers Sync Barrier)
const projection = await contextManager.projectCompressedHistory();
// 6. Assertions
// The barrier should have dropped several older episodes to get under 150k.
expect(projection.length).toBeLessThan(rawHistoryLength);
// Verify Episode 0 (System) is perfectly preserved at the front
expect(projection[0].role).toBe('user');
expect(projection[0].parts![0].text).toBe('System prompt');
// Verify the latest turn is perfectly preserved at the back
const lastUser = projection[projection.length - 2];
const lastModel = projection[projection.length - 1];
expect(lastUser.role).toBe('user');
expect(lastUser.parts![0].text).toBe('Final question.');
expect(lastModel.role).toBe('model');
expect(lastModel.parts![0].text).toBe('Final answer.');
});
});
@@ -17,9 +17,6 @@ import { ContextManager } from './contextManager.js';
import type { Config } from '../config/config.js';
import type { GeminiClient } from '../core/client.js';
import type { Content } from '@google/genai';
import { ToolMaskingProcessor } from './processors/toolMaskingProcessor.js';
import { HistorySquashingProcessor } from './processors/historySquashingProcessor.js';
import { SemanticCompressionProcessor } from './processors/semanticCompressionProcessor.js';
expect.addSnapshotSerializer({
test: (val) =>
@@ -46,6 +43,8 @@ describe('ContextManager Golden Tests', () => {
beforeEach(() => {
mockConfig = {
isContextManagementEnabled: vi.fn().mockReturnValue(true),
getTargetDir: vi.fn().mockReturnValue('/tmp'),
getSessionId: vi.fn().mockReturnValue('test-session'),
getToolOutputMaskingConfig: vi.fn().mockResolvedValue({
enabled: true,
minPrunableThresholdTokens: 50,
@@ -83,7 +82,6 @@ describe('ContextManager Golden Tests', () => {
},
}),
storage: { getProjectTempDir: vi.fn().mockReturnValue('/tmp') },
getSessionId: vi.fn().mockReturnValue('mock-session'),
getUsageStatisticsEnabled: vi.fn().mockReturnValue(false),
getBaseLlmClient: vi.fn().mockReturnValue({
generateJson: vi.fn().mockResolvedValue({
@@ -101,11 +99,7 @@ describe('ContextManager Golden Tests', () => {
mockConfig as Config,
{} as unknown as GeminiClient,
);
contextManager.setProcessors([
new ToolMaskingProcessor(mockConfig as unknown as Config),
new HistorySquashingProcessor(mockConfig as unknown as Config),
new SemanticCompressionProcessor(mockConfig as unknown as Config),
]);
});
const createLargeHistory = (): Content[] => [
+67 -9
View File
@@ -6,13 +6,15 @@
import type { Content } from '@google/genai';
import type { Config } from '../config/config.js';
import type { GeminiClient } from '../core/client.js';
import type { ContextProcessor } from './pipeline.js';
import type { AgentChatHistory } from '../core/agentChatHistory.js';
import { debugLogger } from '../utils/debugLogger.js';
import { IrMapper } from './ir/mapper.js';
import type { Episode } from './ir/types.js';
import { ContextEventBus } from './eventBus.js';
import { ContextTracer } from './tracer.js';
import { StateSnapshotWorker } from './workers/stateSnapshotWorker.js';
export class ContextManager {
private config: Config;
@@ -21,11 +23,17 @@ export class ContextManager {
// This allows the agent to remember and summarize continuously without losing data across turns.
private pristineEpisodes: Episode[] = [];
private unsubscribeHistory?: () => void;
public readonly eventBus: ContextEventBus;
private readonly eventBus: ContextEventBus;
private readonly tracer: ContextTracer;
// Internal sub-components
// Synchronous processors are instantiated but effectively used as singletons within this class
private workers: StateSnapshotWorker[] = [];
constructor(config: Config, _client: GeminiClient) {
this.config = config;
this.eventBus = new ContextEventBus();
this.tracer = new ContextTracer(config.getTargetDir(), config.getSessionId());
this.eventBus.onVariantReady((event) => {
// Find the target episode in the pristine graph
@@ -37,15 +45,33 @@ export class ContextManager {
targetEp.variants = {};
}
targetEp.variants[event.variantId] = event.variant;
this.tracer.logEvent('ContextManager', `Received async variant [${event.variantId}] for Episode ${event.targetId}`);
debugLogger.log(
`ContextManager: Received async variant [${event.variantId}] for Episode ${event.targetId}.`,
);
}
});
// Initialize synchronous fallback processors
// Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback
// Initialize and start background subconscious workers
const snapshotWorker = new StateSnapshotWorker(this.config);
snapshotWorker.start(this.eventBus, this.tracer);
this.workers.push(snapshotWorker);
}
setProcessors(processors: ContextProcessor[]) {
}
/**
* Safely stops background workers and clears event listeners.
*/
shutdown() {
for (const worker of this.workers) {
worker.stop();
}
if (this.unsubscribeHistory) {
this.unsubscribeHistory();
}
}
/**
* Subscribes to the core AgentChatHistory to natively track all message events,
@@ -62,7 +88,8 @@ export class ContextManager {
// function calls and responses into unified Episodes. Pushing messages
// individually would shatter these episodic boundaries.
this.pristineEpisodes = IrMapper.toIr(chatHistory.get());
this.checkTriggers(); // Eager Compute & Ship of Theseus Triggers
this.tracer.logEvent('ContextManager', 'Rebuilt pristine graph from chat history update', { episodeCount: this.pristineEpisodes.length });
this.checkTriggers();
});
}
@@ -75,6 +102,7 @@ export class ContextManager {
// This solves Bug 2: The View shrinks when variants are applied, preventing infinite GC loops.
const workingBuffer = this.getWorkingBufferView();
const currentTokens = this.calculateIrTokens(workingBuffer);
this.tracer.logEvent('ContextManager', 'Evaluated triggers', { currentTokens, retainedTokens: mngConfig.budget.retainedTokens });
// 1. Eager Compute Trigger (Continuous Streaming)
// Broadcast the full pristine log to the async workers so they can proactively summarize partial massive files.
@@ -84,6 +112,7 @@ export class ContextManager {
// If we exceed 65k, tell the background processors to opportunistically synthesize the oldest nodes.
if (currentTokens > mngConfig.budget.retainedTokens) {
const deficit = currentTokens - mngConfig.budget.retainedTokens;
this.tracer.logEvent('ContextManager', 'Budget crossed. Emitting ConsolidationNeeded', { deficit });
this.eventBus.emitConsolidationNeeded({
episodes: workingBuffer, // Pass the working buffer so they know what still needs compression
targetDeficit: deficit,
@@ -105,13 +134,17 @@ export class ContextManager {
let currentEpisodes: Episode[] = [];
let rollingTokens = 0;
const skippedIds = new Set<string>();
this.tracer.logEvent('ViewGenerator', 'Generating Working Buffer View');
for (let i = this.pristineEpisodes.length - 1; i >= 0; i--) {
const ep = this.pristineEpisodes[i];
// If this episode was already replaced by an N-to-1 Snapshot injected earlier in the sweep, skip it entirely!
// This solves Bug 1 (Duplicate Projection).
if (skippedIds.has(ep.id)) continue;
if (skippedIds.has(ep.id)) {
this.tracer.logEvent('ViewGenerator', `Skipping episode [${ep.id}] due to N-to-1 replacement.`);
continue;
}
let projectedEp = {
...ep,
@@ -164,6 +197,7 @@ export class ContextManager {
for (const id of snapshot.replacedEpisodeIds) {
skippedIds.add(id);
}
this.tracer.logEvent('ViewGenerator', `Episode [${ep.id}] has SnapshotVariant. Selecting variant over raw text. Added [${snapshot.replacedEpisodeIds.join(',')}] to skippedIds.`);
debugLogger.log(
`Opportunistically swapped Episodes [${snapshot.replacedEpisodeIds.join(', ')}] for pre-computed Snapshot variant.`,
);
@@ -191,6 +225,7 @@ export class ContextManager {
},
] as any;
projectedEp.yield = undefined;
this.tracer.logEvent('ViewGenerator', `Episode [${ep.id}] has SummaryVariant. Selecting variant over raw text.`);
debugLogger.log(
`Opportunistically swapped Episode ${ep.id} for pre-computed Summary variant.`,
);
@@ -208,6 +243,7 @@ export class ContextManager {
tokens: masked.recoveredTokens || 10,
};
}
this.tracer.logEvent('ViewGenerator', `Episode [${ep.id}] has MaskedVariant. Selecting variant over raw text.`);
debugLogger.log(
`Opportunistically swapped Episode ${ep.id} for pre-computed Masked variant.`,
);
@@ -227,20 +263,23 @@ export class ContextManager {
*/
async projectCompressedHistory(): Promise<Content[]> {
if (!this.config.isContextManagementEnabled()) {
return IrMapper.fromIr(this.pristineEpisodes);
return this._projectAndDump(IrMapper.fromIr(this.pristineEpisodes));
}
const mngConfig = this.config.getContextManagementConfig();
const maxTokens = mngConfig.budget.maxTokens;
this.tracer.logEvent('ContextManager', 'Projection requested.');
// Get the dynamically computed Working Buffer View
let currentEpisodes = this.getWorkingBufferView();
let currentTokens = this.calculateIrTokens(currentEpisodes);
if (currentTokens <= maxTokens) {
return IrMapper.fromIr(currentEpisodes);
this.tracer.logEvent('ContextManager', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`);
return this._projectAndDump(IrMapper.fromIr(currentEpisodes));
}
this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.budget.maxPressureStrategy}`);
// --- The Synchronous Pressure Barrier ---
// The background eager workers couldn't keep up, or a massive file was pasted.
// The Working Buffer View is still over the absolute hard limit (maxTokens).
@@ -266,6 +305,7 @@ export class ContextManager {
const epTokens = this.calculateIrTokens([ep]);
if (remainingTokens > maxTokens && !protectedEpisodeIds.has(ep.id)) {
remainingTokens -= epTokens;
this.tracer.logEvent('Barrier', `Truncating episode [${ep.id}].`);
debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`);
} else {
truncated.push(ep);
@@ -277,6 +317,7 @@ export class ContextManager {
// merge the variants, and regenerate the View.
// For now, if compress fails/isn't wired synchronously, we fallback to truncate.
debugLogger.warn('Synchronous compress barrier not fully implemented, falling back to truncate.');
this.tracer.logEvent('Barrier', `Falling back to truncate.`);
const truncated: Episode[] = [];
let remainingTokens = currentTokens;
@@ -292,11 +333,28 @@ export class ContextManager {
}
const finalTokens = this.calculateIrTokens(currentEpisodes);
this.tracer.logEvent('ContextManager', `Finished projection. Final token count: ${finalTokens}.`);
debugLogger.log(
`Context Manager finished. Final actual token count: ${finalTokens}.`,
);
return IrMapper.fromIr(currentEpisodes);
return this._projectAndDump(IrMapper.fromIr(currentEpisodes));
}
private async _projectAndDump(contents: Content[]): Promise<Content[]> {
if (process.env['GEMINI_DUMP_CONTEXT'] === 'true') {
try {
const fs = await import('node:fs/promises');
const path = await import('node:path');
const dumpPath = path.join(this.config.getTargetDir(), '.gemini', 'projected_context.json');
await fs.mkdir(path.dirname(dumpPath), { recursive: true });
await fs.writeFile(dumpPath, JSON.stringify(contents, null, 2), 'utf-8');
debugLogger.log(`[Observability] Context successfully dumped to ${dumpPath}`);
} catch (e) {
debugLogger.error(`Failed to dump context: ${e}`);
}
}
return contents;
}
private calculateIrTokens(episodes: Episode[]): number {
@@ -127,4 +127,35 @@ describe('IrMapper', () => {
// The exact structural equivalence isn't mathematically perfect because Gemini allows mixing text and calls
// in one Content block, but the flat representation is semantically identical.
});
it('should guarantee WeakMap ID stability across continuous mapping', () => {
// 1. Initial history
const history: Content[] = [
{ role: 'user', parts: [{ text: 'Hello' }] },
{ role: 'model', parts: [{ text: 'Hi there' }] }
];
const initialIr = IrMapper.toIr(history);
expect(initialIr).toHaveLength(1);
// Save the uniquely generated deterministic ID for the first episode
const episodeId = initialIr[0].id;
const triggerId = initialIr[0].trigger.id;
// 2. Push new history (simulating a continuing conversation)
history.push({ role: 'user', parts: [{ text: 'How are you?' }] });
history.push({ role: 'model', parts: [{ text: 'I am an AI.' }] });
const updatedIr = IrMapper.toIr(history);
expect(updatedIr).toHaveLength(2);
// 3. Verify ID Stability
// The exact same ID must be generated for the first episode because the underlying Content object reference hasn't changed.
// This proves the WeakMap successfully pinned the reference!
expect(updatedIr[0].id).toBe(episodeId);
expect(updatedIr[0].trigger.id).toBe(triggerId);
// Ensure the new episode has a different ID
expect(updatedIr[1].id).not.toBe(episodeId);
});
});
+20
View File
@@ -45,3 +45,23 @@ export const POWER_USER_PROFILE: ContextManagementConfig = {
},
},
};
export const STRESS_TEST_PROFILE: ContextManagementConfig = {
enabled: true,
budget: {
maxPressureStrategy: 'truncate',
maxTokens: 12_000,
retainedTokens: 6_000,
protectedEpisodes: 1,
protectSystemEpisode: true,
},
strategies: {
historySquashing: { maxTokensPerNode: 2000 },
toolMasking: { stringLengthThresholdTokens: 2000 },
semanticCompression: {
nodeThresholdTokens: 1000,
compressionModel: 'gemini-2.5-flash',
},
},
};
@@ -0,0 +1,65 @@
# Context Management Testing Plan
This document outlines the multi-layered testing strategy for the asynchronous context management architecture. Our goal is to ensure high coverage, prevent race conditions, and verify that the LLM is always presented with accurate, well-formatted state.
## Testing Strategy Heuristics
1. **Golden Tests:** Used to verify *formatting* and *structural stability*. Whenever we want to answer the question, "What exact JSON/Content array will the Gemini API receive?", we use a golden test.
2. **Component Tests:** Used to verify *logic, system invariants, and concurrency*. These tests instantiate multiple classes (e.g., `ContextManager` + `AgentChatHistory` + `Workers`) and use mocks for the network (LLM) and timers. This is our primary defense against race conditions.
3. **Unit Tests:** Used to verify *isolated, complex algorithms* and *corner cases*. If a method involves math, string manipulation, or complex isolated logic (like iterating and skipping nodes), it gets a unit test.
---
## 1. ContextManager (The Orchestrator)
The `ContextManager` sits at the center, maintaining the View and enforcing the Synchronous Pressure Barrier.
### Component Tests
* **[COMPLETED] Race Condition: User Typing:** The user pushes new history while a background snapshot is computing. Ensure the snapshot is applied to older nodes, but the new nodes are preserved at the tail.
* **[COMPLETED] Async GC Trigger:** Pushing history past `retainedTokens` triggers the `StateSnapshotWorker` without blocking the main thread.
* **Sync Barrier - Truncate Strategy:** If history is pushed past `maxTokens` (e.g., pasting a massive file) and `maxPressureStrategy = 'truncate'`, the `projectCompressedHistory` method must instantly drop the oldest unprotected episodes until the budget is satisfied.
* **Sync Barrier - Compress Strategy:** If history is pushed past `maxTokens` and strategy is `compress`, it blocks and falls back (or synchronously calls the worker).
* **Protection Boundaries:** Ensure the System Prompt (Episode 0) and the Latest Turn (working context) are never dropped or heavily compressed, even under severe max token pressure.
### Unit Tests
* **The View Generator Sweep (`getWorkingBufferView`):**
* Test N-to-1 Replacement: If a snapshot covers IDs `[A, B, C]`, ensure all three are completely omitted from the resulting array and replaced by the single snapshot node.
* Test Priority: If an episode has both a `snapshot` and a `summary` variant ready, ensure `snapshot` wins.
---
## 2. IrMapper (The Translation Layer)
The `IrMapper` translates flat `Content[]` arrays into the pristine `Episode[]` graph, and vice-versa.
### Golden Tests
* **Multi-turn Flattening:** Ensure that `prompt -> thought -> toolCall -> toolResponse -> thought -> yield` translates back into a perfectly ordered `Content[]` array that the Gemini API accepts.
### Unit Tests
* **WeakMap Node Pinning (ID Stability):** Call `toIr(history)` -> get Episode IDs. Push one more message to history. Call `toIr(history)` again. Assert that the IDs of the older episodes are identical (proving the `WeakMap` successfully pinned the reference). This is critical; if this fails, async variants will orphan.
* **Token Estimation Integration:** Verify that `metadata.currentTokens` and `metadata.originalTokens` are populated accurately during mapping.
---
## 3. Async Workers (The Subconscious)
Workers listen to the Event Bus, do heavy LLM lifting, and emit ready variants.
### Component Tests
* **StateSnapshotWorker Batching:** When triggered, it should gather the *oldest unprotected* episodes. It must stop gathering once `tokensToSynthesize >= targetDeficit`.
* **StateSnapshotWorker Telemetry & Role:** Ensure the API call is dispatched using the `gemini-2.5-flash` model and the `LlmRole.UTILITY_COMPRESSOR` role.
* **(Future) AsyncSemanticCompressor:** Testing the eager-compute summarization of large files.
---
## 4. Sync Processors (The Fallback / Bloom Filter)
Processors execute synchronously during `projectCompressedHistory` if the background workers haven't caught up, or to squash tokens down to the `retainedTokens` floor.
### Unit Tests
* **HistorySquashingProcessor:** Test proportional truncation math. If an episode is 10k tokens and the budget demands saving 5k tokens, ensure the text is sliced cleanly without breaking formatting.
* **ToolMaskingProcessor:** Verify the leaf-node deep JSON truncation logic. Ensure deeply nested massive arrays are masked (`"[1000 items hidden]"`), but the outer schema remains valid JSON.
* **SemanticCompressionProcessor:** Verify it skips episodes that already have `summary` variants in the View.
### Golden Tests
* **Masked Output Shapes:** Verify the visual presentation of a squashed or masked node (e.g., ensuring `[System: Truncated...]` headers are formatted nicely).
@@ -0,0 +1,97 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { vi } from 'vitest';
import type { Config } from '../../config/config.js';
import type { GeminiClient } from '../../core/client.js';
import type { Content } from '@google/genai';
import { AgentChatHistory } from '../../core/agentChatHistory.js';
import { ContextManager } from '../contextManager.js';
/**
* Creates a block of synthetic conversation history designed to consume a specific number of tokens.
* Assumes roughly 4 characters per token for standard English text.
*/
export function createSyntheticHistory(
numTurns: number,
tokensPerTurn: number,
): Content[] {
const history: Content[] = [];
const charsPerTurn = tokensPerTurn * 4;
for (let i = 0; i < numTurns; i++) {
history.push({
role: 'user',
parts: [{ text: `User turn ${i}. ` + 'A'.repeat(charsPerTurn) }],
});
history.push({
role: 'model',
parts: [{ text: `Model response ${i}. ` + 'B'.repeat(charsPerTurn) }],
});
}
return history;
}
/**
* Creates a fully mocked Config object tailored for Context Component testing.
*/
export function createMockContextConfig(
overrides?: Record<string, unknown>,
llmClientOverride?: unknown,
): Config {
const defaultConfig = {
isContextManagementEnabled: vi.fn().mockReturnValue(true),
getContextManagementConfig: vi.fn().mockReturnValue({
enabled: true,
strategies: {
historySquashing: { maxTokensPerNode: 3000 },
toolMasking: { stringLengthThresholdTokens: 10000 },
semanticCompression: {
nodeThresholdTokens: 5000,
compressionModel: 'gemini-2.5-flash',
},
},
budget: {
maxTokens: 150000,
retainedTokens: 65000,
protectedEpisodes: 1,
protectSystemEpisode: true,
maxPressureStrategy: 'truncate',
},
}),
getBaseLlmClient: vi.fn().mockReturnValue(
llmClientOverride || {
generateContent: vi.fn().mockResolvedValue({
text: '<mocked_snapshot>Synthesized state</mocked_snapshot>',
}),
},
),
getUsageStatisticsEnabled: vi.fn().mockReturnValue(false),
getTargetDir: vi.fn().mockReturnValue('/tmp'),
getSessionId: vi.fn().mockReturnValue('test-session'),
};
return { ...defaultConfig, ...overrides } as unknown as Config;
}
/**
* Wires up a full ContextManager component with an AgentChatHistory and active background workers.
*/
export function setupContextComponentTest(config: Config) {
const chatHistory = new AgentChatHistory();
const contextManager = new ContextManager(
config,
config.getBaseLlmClient() as unknown as GeminiClient,
);
// The async worker is now internally managed by ContextManager
// Subscribe to history to enable the Eager/Opportunistic triggers
contextManager.subscribeToHistory(chatHistory);
return { chatHistory, contextManager };
}
+58
View File
@@ -0,0 +1,58 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import * as fs from 'node:fs';
import * as path from 'node:path';
import { randomUUID } from 'node:crypto';
export class ContextTracer {
private traceDir: string;
private assetsDir: string;
private enabled: boolean;
constructor(targetDir: string, sessionId: string) {
this.enabled = process.env['GEMINI_CONTEXT_TRACE'] === 'true';
this.traceDir = path.join(targetDir, '.gemini', 'context_trace', sessionId);
this.assetsDir = path.join(this.traceDir, 'assets');
if (this.enabled) {
try {
fs.mkdirSync(this.assetsDir, { recursive: true });
this.logEvent('SYSTEM', 'Context Tracer Initialized', { sessionId });
} catch (e) {
console.error('Failed to initialize ContextTracer', e);
this.enabled = false;
}
}
}
logEvent(component: string, action: string, details?: Record<string, unknown>) {
if (!this.enabled) return;
try {
const timestamp = new Date().toISOString();
const detailsStr = details ? ` | Details: ${JSON.stringify(details)}` : '';
const logLine = `[${timestamp}] [${component}] ${action}${detailsStr}\n`;
fs.appendFileSync(path.join(this.traceDir, 'trace.log'), logLine, 'utf-8');
} catch (e) {
// fail silently in trace
}
}
saveAsset(component: string, assetName: string, data: unknown): string {
if (!this.enabled) return 'asset-recording-disabled';
try {
const assetId = `${Date.now()}-${randomUUID().slice(0, 6)}-${assetName}.json`;
const assetPath = path.join(this.assetsDir, assetId);
fs.writeFileSync(assetPath, JSON.stringify(data, null, 2), 'utf-8');
this.logEvent(component, `Saved asset: ${assetName}`, { assetId });
return assetId;
} catch (e) {
this.logEvent(component, `Failed to save asset: ${assetName}`, { error: String(e) });
return 'asset-save-failed';
}
}
}
@@ -16,16 +16,19 @@ import { debugLogger } from '../../utils/debugLogger.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
import { IrMapper } from '../ir/mapper.js';
import { LlmRole } from '../../telemetry/llmRole.js';
import type { ContextTracer } from '../tracer.js';
export class StateSnapshotWorker implements AsyncContextWorker {
name = 'StateSnapshotWorker';
private bus?: ContextEventBus;
private tracer?: ContextTracer;
private isSynthesizing = false;
constructor(private readonly _config: Config) {}
start(bus: ContextEventBus): void {
start(bus: ContextEventBus, tracer?: ContextTracer): void {
this.bus = bus;
this.tracer = tracer;
this.bus.onConsolidationNeeded(this.handleConsolidation.bind(this));
}
@@ -74,9 +77,12 @@ export class StateSnapshotWorker implements AsyncContextWorker {
debugLogger.log(
`StateSnapshotWorker: Asynchronously synthesizing ${episodesToSynthesize.length} episodes to recover ~${tokensToSynthesize} tokens.`,
);
this.tracer?.logEvent('StateSnapshotWorker', `Consolidation requested. Synthesizing ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`);
const client = this._config.getBaseLlmClient();
const rawContents = IrMapper.fromIr(episodesToSynthesize);
const rawAssetId = this.tracer?.saveAsset('StateSnapshotWorker', 'episodes_to_synthesize', rawContents);
this.tracer?.logEvent('StateSnapshotWorker', 'Dispatching LLM request for snapshot generation', { rawAssetId });
const promptText = `
You are a background memory consolidation worker for an AI assistant.
@@ -98,10 +104,16 @@ Output the snapshot as a dense, structured summary.`;
});
// Extract text safely from the GenAI response
const snapshotText = response.text || '[Failed to generate snapshot]';
const snapshotText = response.text;
const responseAssetId = this.tracer?.saveAsset('StateSnapshotWorker', 'snapshot_response', snapshotText || '');
this.tracer?.logEvent('StateSnapshotWorker', 'Received LLM response', { responseAssetId });
if (!snapshotText) {
debugLogger.warn('StateSnapshotWorker: LLM returned empty response for snapshot generation.');
}
const mockSnapshotText = `
<world_state_snapshot>
${snapshotText}
${snapshotText || '[Failed to generate snapshot]'}
</world_state_snapshot>`;
const snapshotTokens = estimateTokenCountSync([
@@ -160,12 +172,17 @@ ${snapshotText}
const targetId = replacedEpisodeIds[replacedEpisodeIds.length - 1];
if (this.bus) {
this.tracer?.logEvent('StateSnapshotWorker', `Emitting VARIANT_READY for targetId [${targetId}]`);
this.bus.emitVariantReady({
targetId,
variantId: 'snapshot',
variant,
});
} else {
debugLogger.warn('StateSnapshotWorker: Event bus disconnected before variant could be emitted.');
}
} catch (error) {
debugLogger.error(`StateSnapshotWorker: Critical failure during snapshot synthesis: ${error instanceof Error ? error.message : String(error)}`);
} finally {
this.isSynthesizing = false;
}
+5 -1
View File
@@ -284,7 +284,7 @@ describe('Gemini Client (client.ts)', () => {
setActiveModel: vi.fn(),
resetTurn: vi.fn(),
isAutoDistillationEnabled: vi.fn().mockReturnValue(false),
isContextManagementEnabled: vi.fn().mockReturnValue(false),
getContextManagementConfig: vi.fn().mockReturnValue({ enabled: false }),
getModelAvailabilityService: vi
.fn()
@@ -2130,6 +2130,8 @@ ${JSON.stringify(
initialRequest,
expect.any(AbortSignal),
undefined,
'main',
[]
);
// Second call with "Please continue."
@@ -2139,6 +2141,8 @@ ${JSON.stringify(
[{ text: 'System: Please continue.' }],
expect.any(AbortSignal),
undefined,
'main',
[]
);
});
+1 -11
View File
@@ -45,10 +45,6 @@ import type { ContentGenerator } from './contentGenerator.js';
import { LoopDetectionService } from '../services/loopDetectionService.js';
import { ChatCompressionService } from '../context/chatCompressionService.js';
import { ContextManager } from '../context/contextManager.js';
import { ToolMaskingProcessor } from '../context/processors/toolMaskingProcessor.js';
import { HistorySquashingProcessor } from '../context/processors/historySquashingProcessor.js';
import { BlobDegradationProcessor } from '../context/processors/blobDegradationProcessor.js';
import { SemanticCompressionProcessor } from '../context/processors/semanticCompressionProcessor.js';
import { ideContextStore } from '../ide/ideContext.js';
import {
logContentRetryFailure,
@@ -118,13 +114,6 @@ export class GeminiClient {
this.compressionService = new ChatCompressionService();
this.contextManager = new ContextManager(this.config, this);
// Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback
this.contextManager.setProcessors([
new ToolMaskingProcessor(this.config),
new BlobDegradationProcessor(this.config),
new SemanticCompressionProcessor(this.config),
new HistorySquashingProcessor(this.config),
]);
this.toolOutputMaskingService = new ToolOutputMaskingService();
this.lastPromptId = this.config.getSessionId();
@@ -329,6 +318,7 @@ export class GeminiClient {
dispose() {
coreEvents.off(CoreEvent.ModelChanged, this.handleModelChanged);
coreEvents.off(CoreEvent.MemoryChanged, this.handleMemoryChanged);
this.contextManager.shutdown();
}
async resumeChat(
+1
View File
@@ -109,6 +109,7 @@ describe('Turn', () => {
expect.any(AbortSignal),
LlmRole.MAIN,
undefined,
undefined
);
expect(events).toEqual([