more config changes

This commit is contained in:
Your Name
2026-04-06 02:22:31 +00:00
parent e3efb5e5ef
commit 2ee2399953
25 changed files with 613 additions and 256 deletions
View File
View File
@@ -1,131 +1,94 @@
import { IrMapper } from './ir/mapper.js';
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { describe, it, expect } from 'vitest';
import {
createSyntheticHistory,
createMockContextConfig,
setupContextComponentTest,
} from './testing/contextTestUtils.js';
describe('ContextManager Concurrency Component Tests', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it('should asynchronously compress history when retainedTokens is crossed, without blocking projection', async () => {
// 1. Setup with a delayed LLM client to simulate async work
let resolveLlm: (val: any) => void;
const llmPromise = new Promise((res) => {
resolveLlm = res;
});
const llmClientOverride = {
generateContent: vi.fn().mockImplementation(() => llmPromise),
};
const config = createMockContextConfig({}, llmClientOverride);
describe('ContextManager Barrier Tests', () => {
it('Soft Barrier (retainedTokens): should inject ready variants and shrink projection', async () => {
const config = createMockContextConfig();
const { chatHistory, contextManager } = setupContextComponentTest(config);
// 2. Add System Prompt (Episode 0 - Protected)
chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] });
// 3. Add heavy history that crosses the 65k retained floor but stays under 150k max.
// 10 turns * 8000 tokens/turn = 80,000 tokens (approx)
const heavyHistory = createSyntheticHistory(10, 4000);
for (const msg of heavyHistory) {
chatHistory.push(msg);
}
// 4. Verify Immediate Projection (The async worker is stuck waiting for the LLM)
// The projection should NOT block. It should return the full history because we are under maxTokens.
const earlyProjection = await contextManager.projectCompressedHistory();
expect(earlyProjection.length).toBe(chatHistory.get().length);
// 5. Unblock the LLM and allow async events to flush
resolveLlm!({
text: '<mocked_snapshot>Synthesized old episodes</mocked_snapshot>',
});
// 1. Shrink limits: 1 char = 1 token. RetainedTokens = 5. MaxTokens = 100.
IrMapper.setConfig({ charsPerToken: 1 });
// We need to flush the microtask queue so the Promise resolves and the EventBus ticks
await vi.runAllTimersAsync();
contextManager['sidecar'].budget.retainedTokens = 5;
contextManager['sidecar'].budget.maxTokens = 100;
// 6. Verify Post-Compression Projection
// The WorkingBufferView should now automatically inject the SnapshotVariant, shrinking the array.
const lateProjection = await contextManager.projectCompressedHistory();
expect(lateProjection.length).toBeLessThan(earlyProjection.length);
// 2. Build tiny history: 5 turns (10 messages). 2 tokens per message (4 per turn).
const tinyHistory = [];
for (let i = 0; i < 5; i++) {
tinyHistory.push({ role: 'user', parts: [{ text: `U${i}` }] });
tinyHistory.push({ role: 'model', parts: [{ text: `M${i}` }] });
}
// Set history directly to avoid event races
await chatHistory.set(tinyHistory);
// Verify the snapshot text actually made it into the stream
const hasSnapshotText = lateProjection.some(
(msg) =>
msg.role === 'model' &&
msg.parts!.some(
(p) =>
p.text && p.text.includes('<mocked_snapshot>Synthesized old episodes</mocked_snapshot>'),
),
);
expect(hasSnapshotText).toBe(true);
});
// 3. Pre-verify baseline length.
const baseline = await contextManager.projectCompressedHistory();
expect(baseline.length).toBe(10);
it('should handle the Race Condition: User pushing messages while a background snapshot is computing', async () => {
let resolveLlm: (val: any) => void;
const llmPromise = new Promise((res) => {
resolveLlm = res;
// 4. Emit a fake snapshot covering the first 3 pairs (6 messages)
const targetEp = contextManager['pristineEpisodes'][2];
const replacedIds = contextManager['pristineEpisodes'].slice(0, 3).map(ep => ep.id);
contextManager['eventBus'].emitVariantReady({
targetId: targetEp.id,
variantId: 'snapshot',
variant: {
status: 'ready',
type: 'snapshot',
replacedEpisodeIds: replacedIds,
episode: {
id: 'snapshot-ep',
timestamp: Date.now(),
trigger: { id: 't1', type: 'USER_PROMPT', semanticParts: [], metadata: { originalTokens: 0, currentTokens: 0, transformations: [] } },
yield: { id: 'y1', type: 'AGENT_YIELD', text: '<SNAP>', metadata: { originalTokens: 5, currentTokens: 5, transformations: [] } },
steps: []
}
}
});
const llmClientOverride = {
generateContent: vi.fn().mockImplementation(() => llmPromise),
};
// 5. Verify Projection shrinks: 6 original messages replaced by 1 snapshot episode (1 text part) -> length 5.
const projection = await contextManager.projectCompressedHistory();
expect(projection.length).toBe(5);
// console.dir(projection, {depth: null});
// projection[0] should be the snapshot yield
expect(projection[0].parts![0].text).toBe('<SNAP>');
});
const config = createMockContextConfig({}, llmClientOverride);
it('Hard Barrier (maxTokens): should ruthlessly truncate unprotected episodes', async () => {
const config = createMockContextConfig();
const { chatHistory, contextManager } = setupContextComponentTest(config);
chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] });
// 1. Shrink limits: maxTokens = 15.
IrMapper.setConfig({ charsPerToken: 1 });
contextManager['sidecar'].budget.maxTokens = 15;
// Push 80k tokens to trigger compression of older nodes
const heavyHistory = createSyntheticHistory(10, 4000);
for (const msg of heavyHistory) {
chatHistory.push(msg);
}
// 2. Build history: 2 turns. Total = 24 tokens.
const history = [
{ role: 'user', parts: [{ text: 'U0' }] },
{ role: 'model', parts: [{ text: 'M0_LARGE!!' }] },
{ role: 'user', parts: [{ text: 'U1' }] },
{ role: 'model', parts: [{ text: 'M1_LARGE!!' }] }
];
await chatHistory.set(history);
// At this exact moment, the StateSnapshotWorker has grabbed the oldest episodes
// and is waiting for `llmPromise`.
// THE RACE: The user types two more messages very quickly BEFORE the LLM returns.
chatHistory.push({ role: 'user', parts: [{ text: 'Oh, one more thing!' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'I am listening.' }] });
// Unblock the LLM
resolveLlm!({ text: 'Dense Snapshot Data' });
await vi.runAllTimersAsync();
// Verify
const projection = await contextManager.projectCompressedHistory();
// The snapshot should be present (replacing old history)
const hasSnapshot = projection.some((msg) =>
msg.parts!.some((p) => p.text?.includes('Dense Snapshot Data'))
);
expect(hasSnapshot).toBe(true);
// CRITICAL: The new messages typed during the race must ALSO be present and unmodified at the end of the array.
const lastUserMsg = projection[projection.length - 2];
const lastModelMsg = projection[projection.length - 1];
expect(lastUserMsg.role).toBe('user');
expect(lastUserMsg.parts![0].text).toBe('Oh, one more thing!');
expect(lastModelMsg.role).toBe('model');
expect(lastModelMsg.parts![0].text).toBe('I am listening.');
// Because Turn 0 is architecturally protected (system prompt/initialization), it SURVIVES!
// Turn 1 is dropped to satisfy the maxTokens constraint.
expect(projection.length).toBe(2);
expect(projection[0].parts![0].text).toBe('U0');
expect(projection[1].parts![0].text).toBe('M0_LARGE!!');
});
});
@@ -4,7 +4,9 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { IrMapper } from './ir/mapper.js';
import {
createSyntheticHistory,
createMockContextConfig,
@@ -27,30 +29,31 @@ describe('ContextManager Sync Pressure Barrier Tests', () => {
const { chatHistory, contextManager } = setupContextComponentTest(config);
// 2. Add System Prompt (Episode 0 - Protected)
chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] });
chatHistory.set([{ role: 'user', parts: [{ text: 'System prompt' }] }, { role: 'model', parts: [{ text: 'Understood.' }] }]);
// 3. Add massive history that blows past the 150k maxTokens limit
// 20 turns * 10,000 tokens/turn = ~200,000 tokens
const massiveHistory = createSyntheticHistory(20, 10000);
for (const msg of massiveHistory) {
chatHistory.push(msg);
}
const massiveHistory = createSyntheticHistory(20, 35000);
chatHistory.set([...chatHistory.get(), ...massiveHistory]);
// 4. Add the Latest Turn (Protected)
chatHistory.push({ role: 'user', parts: [{ text: 'Final question.' }] });
chatHistory.push({ role: 'model', parts: [{ text: 'Final answer.' }] });
chatHistory.set([...chatHistory.get(), { role: 'user', parts: [{ text: 'Final question.' }] }, { role: 'model', parts: [{ text: 'Final answer.' }] }]);
const rawHistoryLength = chatHistory.get().length;
IrMapper.setConfig({ charsPerToken: 1 });
// 5. Project History (Triggers Sync Barrier)
const projection = await contextManager.projectCompressedHistory();
// 6. Assertions
// The barrier should have dropped several older episodes to get under 150k.
expect(projection.length).toBeLessThan(rawHistoryLength);
// Verify Episode 0 (System) is perfectly preserved at the front
expect(projection[0].role).toBe('user');
expect(projection[0].parts![0].text).toBe('System prompt');
@@ -14,8 +14,11 @@ import {
afterAll,
} from 'vitest';
import { ContextManager } from './contextManager.js';
import type { Config } from '../config/config.js';
import type { GeminiClient } from '../core/client.js';
import { ContextEnvironmentImpl } from './sidecar/environmentImpl.js';
import { SidecarLoader } from './sidecar/SidecarLoader.js';
import { ContextTracer } from './tracer.js';
import type { Content } from '@google/genai';
expect.addSnapshotSerializer({
@@ -95,10 +98,10 @@ describe('ContextManager Golden Tests', () => {
}),
};
contextManager = new ContextManager(
mockConfig as Config,
{} as unknown as GeminiClient,
);
const sidecar = SidecarLoader.fromLegacyConfig(mockConfig as any);
const tracer = new ContextTracer('/tmp', 'test-session');
const env = new ContextEnvironmentImpl({} as any, 'test', '/tmp', '/tmp', tracer, 4);
contextManager = new ContextManager(sidecar, env, tracer);
});
@@ -178,7 +181,26 @@ describe('ContextManager Golden Tests', () => {
).IrMapper.toIr(history);
// In Golden Tests, we just want to ensure the logic doesn't throw or alter unprotected history in weird ways.
// Since we're skipping processors due to being under budget, it should equal history.
mockConfig.getContextManagementConfig.mockReturnValue({
strategies: {
historySquashing: { maxTokensPerNode: 3000 },
toolMasking: { stringLengthThresholdTokens: 10000 },
semanticCompression: {
nodeThresholdTokens: 5000,
},
},
budget: {
maxTokens: 15000000,
retainedTokens: 50000,
},
gcBackstop: { target: 'incremental', strategy: 'truncate' },
});
const tracer2 = new ContextTracer('/tmp', 'test2');
contextManager = new ContextManager({ pipelines: { eagerBackground: [], normalProcessingGraph: [], retainedProcessingGraph: [] } } as any, {} as any, tracer2);
(contextManager as any).pristineEpisodes = (await import('./ir/mapper.js')).IrMapper.toIr(history);
const result = await contextManager.projectCompressedHistory();
expect(result.length).toEqual(history.length);
});
});
+136 -24
View File
@@ -4,8 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import type { Content } from '@google/genai';
import type { Config } from '../config/config.js';
import type { GeminiClient } from '../core/client.js';
import type { AgentChatHistory } from '../core/agentChatHistory.js';
import { debugLogger } from '../utils/debugLogger.js';
import { IrMapper } from './ir/mapper.js';
@@ -16,26 +16,51 @@ import { ContextTracer } from './tracer.js';
import { StateSnapshotWorker } from './workers/stateSnapshotWorker.js';
import type { ContextEnvironment } from './sidecar/environment.js';
import type { SidecarConfig } from './sidecar/types.js';
import { ProcessorRegistry } from './sidecar/registry.js';
import type { ContextProcessor } from './pipeline.js';
import type { AsyncContextWorker } from './workers/asyncContextWorker.js';
import { ToolMaskingProcessor } from './processors/toolMaskingProcessor.js';
import { BlobDegradationProcessor } from './processors/blobDegradationProcessor.js';
import { SemanticCompressionProcessor } from './processors/semanticCompressionProcessor.js';
import { HistorySquashingProcessor } from './processors/historySquashingProcessor.js';
export class ContextManager {
private config: Config;
// The stateful, pristine Episodic Intermediate Representation graph.
// This allows the agent to remember and summarize continuously without losing data across turns.
private pristineEpisodes: Episode[] = [];
private unsubscribeHistory?: () => void;
private readonly eventBus: ContextEventBus;
private readonly tracer: ContextTracer;
// Internal sub-components
// Synchronous processors are instantiated but effectively used as singletons within this class
private workers: StateSnapshotWorker[] = [];
private workers: AsyncContextWorker[] = [];
constructor(config: Config, _client: GeminiClient) {
this.config = config;
constructor(private sidecar: SidecarConfig, private env: ContextEnvironment, private readonly tracer: ContextTracer) {
this.eventBus = new ContextEventBus();
this.tracer = new ContextTracer(config.getTargetDir(), config.getSessionId());
// Register built-ins
ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) });
ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'StateSnapshotWorker', create: (env, opts) => new StateSnapshotWorker(env) });
this.eventBus.onVariantReady((event) => {
// Find the target episode in the pristine graph
const targetEp = this.pristineEpisodes.find(
(ep) => ep.id === event.targetId,
@@ -56,9 +81,11 @@ export class ContextManager {
// Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback
// Initialize and start background subconscious workers
const snapshotWorker = new StateSnapshotWorker(this.config);
snapshotWorker.start(this.eventBus, this.tracer);
this.workers.push(snapshotWorker);
for (const bgDef of this.sidecar.pipelines.eagerBackground) {
const worker = ProcessorRegistry.get(bgDef.processorId).create(this.env, bgDef.options) as AsyncContextWorker;
worker.start(this.eventBus);
this.workers.push(worker);
}
}
/**
@@ -94,14 +121,15 @@ export class ContextManager {
}
private checkTriggers() {
if (!this.config.isContextManagementEnabled()) return;
if (!this.sidecar.budget) return;
const mngConfig = this.config.getContextManagementConfig();
const mngConfig = this.sidecar;
// Calculate tokens based on the *Working Buffer View*, not the raw pristine log.
// This solves Bug 2: The View shrinks when variants are applied, preventing infinite GC loops.
const workingBuffer = this.getWorkingBufferView();
const currentTokens = this.calculateIrTokens(workingBuffer);
this.tracer.logEvent('ContextManager', 'Evaluated triggers', { currentTokens, retainedTokens: mngConfig.budget.retainedTokens });
// 1. Eager Compute Trigger (Continuous Streaming)
@@ -113,7 +141,9 @@ export class ContextManager {
if (currentTokens > mngConfig.budget.retainedTokens) {
const deficit = currentTokens - mngConfig.budget.retainedTokens;
this.tracer.logEvent('ContextManager', 'Budget crossed. Emitting ConsolidationNeeded', { deficit });
console.log('EMITTING CONSOLIDATION. Buffer:', workingBuffer.length, 'Deficit:', deficit);
this.eventBus.emitConsolidationNeeded({
episodes: workingBuffer, // Pass the working buffer so they know what still needs compression
targetDeficit: deficit,
});
@@ -127,8 +157,74 @@ export class ContextManager {
* (snapshot > summary > masked) instead of the raw text.
* Handles N-to-1 variant skipping automatically.
*/
/**
 * Runs the Sidecar-configured processor pipelines over the given episodes.
 *
 * The list is cut into two contiguous ranges: a "retained" window holding the
 * newest episodes that fit within `budget.retainedTokens` (the latest episode is
 * always included, even if it alone exceeds the limit), and a "normal" window
 * holding everything older. Each window is fed sequentially through its own
 * processor graph, and the two results are joined back together.
 *
 * @param episodes Episodes to process, ordered oldest to newest.
 * @returns The processed normal window followed by the processed retained window.
 */
private async applyProcessorGraphs(episodes: Episode[]): Promise<Episode[]> {
  const sidecar = this.sidecar;
  const retainedCeiling = sidecar.budget.retainedTokens;

  // --- Partition: walk from newest to oldest, filling the retained window first ---
  const retainedWindow: Episode[] = [];
  const normalWindow: Episode[] = [];
  let retainedTokensSoFar = 0;
  // Flips once the first episode spills into the normal window; from then on every
  // older episode is also "normal" so that both windows stay contiguous.
  let boundaryReached = false;

  for (let idx = episodes.length - 1; idx >= 0; idx--) {
    const episode = episodes[idx];
    const cost = this.calculateIrTokens([episode]);

    const fitsInRetained = !boundaryReached && retainedTokensSoFar + cost <= retainedCeiling;
    // The latest episode is retained unconditionally.
    const mustRetain = retainedWindow.length === 0;

    if (fitsInRetained || mustRetain) {
      retainedWindow.unshift(episode);
      retainedTokensSoFar += cost;
      continue;
    }

    boundaryReached = true;
    normalWindow.unshift(episode);
  }

  // Structural invariant: the System Episode (index 0 of the pristine graph) is protected.
  const protectedEpisodeIds = new Set<string>(
    this.pristineEpisodes.length > 0 ? [this.pristineEpisodes[0].id] : [],
  );

  // Builds the accounting snapshot handed to each processor for a given total token count.
  const buildState = (totalTokens: number) => {
    const ceiling = sidecar.budget.maxTokens;
    return {
      currentTokens: totalTokens,
      maxTokens: ceiling,
      retainedTokens: sidecar.budget.retainedTokens,
      deficitTokens: Math.max(0, totalTokens - ceiling),
      protectedEpisodeIds,
      // Measured against maxTokens (not retainedTokens) so processors don't short-circuit
      // prematurely while they are trying to prevent a barrier hit.
      isBudgetSatisfied: totalTokens <= ceiling,
    };
  };

  // --- Retained graph: the normal window is still unprocessed at this point ---
  let retainedOut = retainedWindow.slice();
  for (const step of sidecar.pipelines.retainedProcessingGraph) {
    const factory = ProcessorRegistry.get(step.processorId);
    const processor = factory.create(this.env, step.options) as ContextProcessor;
    this.tracer.logEvent('ContextManager', `Running ${processor.name} on retained window.`);
    const total = this.calculateIrTokens(normalWindow.concat(retainedOut));
    retainedOut = await processor.process(retainedOut, buildState(total));
  }

  // --- Normal graph: totals now include the already-processed retained window ---
  let normalOut = normalWindow.slice();
  for (const step of sidecar.pipelines.normalProcessingGraph) {
    const factory = ProcessorRegistry.get(step.processorId);
    const processor = factory.create(this.env, step.options) as ContextProcessor;
    this.tracer.logEvent('ContextManager', `Running ${processor.name} on normal window.`);
    const total = this.calculateIrTokens(normalOut.concat(retainedOut));
    normalOut = await processor.process(normalOut, buildState(total));
  }

  return normalOut.concat(retainedOut);
}
public getWorkingBufferView(): Episode[] {
const mngConfig = this.config.getContextManagementConfig();
const mngConfig = this.sidecar;
const retainedTokens = mngConfig.budget.retainedTokens;
let currentEpisodes: Episode[] = [];
@@ -182,7 +278,9 @@ export class ContextManager {
const epTokens = this.calculateIrTokens([projectedEp]);
if (ep.variants) { console.log('Checking variants for', ep.id, 'rollingTokens:', rollingTokens, 'retained:', retainedTokens); }
if (rollingTokens > retainedTokens && ep.variants) {
console.log('EVALUATING VARIANTS FOR', ep.id);
const snapshot = ep.variants['snapshot'];
const summary = ep.variants['summary'];
const masked = ep.variants['masked'];
@@ -254,6 +352,7 @@ export class ContextManager {
rollingTokens += this.calculateIrTokens([projectedEp]);
}
return currentEpisodes;
}
@@ -262,56 +361,67 @@ export class ContextManager {
* This does NOT mutate the pristine episodic graph.
*/
async projectCompressedHistory(): Promise<Content[]> {
if (!this.config.isContextManagementEnabled()) {
if (!this.sidecar.budget) {
return this._projectAndDump(IrMapper.fromIr(this.pristineEpisodes));
}
const mngConfig = this.config.getContextManagementConfig();
const mngConfig = this.sidecar;
const maxTokens = mngConfig.budget.maxTokens;
this.tracer.logEvent('ContextManager', 'Projection requested.');
// Get the dynamically computed Working Buffer View
let currentEpisodes = this.getWorkingBufferView();
currentEpisodes = await this.applyProcessorGraphs(currentEpisodes);
let currentTokens = this.calculateIrTokens(currentEpisodes);
if (currentTokens <= maxTokens) {
this.tracer.logEvent('ContextManager', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`);
return this._projectAndDump(IrMapper.fromIr(currentEpisodes));
}
this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.budget.maxPressureStrategy}`);
this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.gcBackstop.strategy}`);
// --- The Synchronous Pressure Barrier ---
// The background eager workers couldn't keep up, or a massive file was pasted.
// The Working Buffer View is still over the absolute hard limit (maxTokens).
// We MUST reduce tokens before returning, or the API request will 400.
debugLogger.log(
`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.budget.maxPressureStrategy}`,
`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.gcBackstop.strategy}`,
);
// Calculate target based on gcTarget
let targetTokens = maxTokens;
if (mngConfig.budget.gcTarget === 'max') {
if (mngConfig.gcBackstop.target === 'max') {
targetTokens = mngConfig.budget.retainedTokens;
} else if (mngConfig.budget.gcTarget === 'freeNTokens') {
targetTokens = maxTokens - (mngConfig.budget.freeTokensTarget ?? 10000);
} else if (mngConfig.gcBackstop.target === 'freeNTokens') {
targetTokens = maxTokens - (mngConfig.gcBackstop.freeTokensTarget ?? 10000);
}
// Structural invariant: We ALWAYS protect the architectural initialization turn (Turn 0)
// We do NOT arbitrarily protect recent episodes (like currentEpisodes.length - 1)
// because an episode can be unboundedly large, and protecting it would crash the LLM.
const protectedEpisodeId = currentEpisodes.length > 0 ? currentEpisodes[0].id : null;
const protectedEpisodeId = this.pristineEpisodes.length > 0 ? this.pristineEpisodes[0].id : null;
let remainingTokens = currentTokens;
const truncated: Episode[] = [];
const strategy = mngConfig.budget.maxPressureStrategy;
const strategy = mngConfig.gcBackstop.strategy;
for (const ep of currentEpisodes) {
const epTokens = this.calculateIrTokens([ep]);
if (remainingTokens > targetTokens && ep.id !== protectedEpisodeId) {
console.log('DROPPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens);
remainingTokens -= epTokens;
if (strategy === 'truncate') {
this.tracer.logEvent('Barrier', `Truncating episode [${ep.id}].`);
debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`);
} else if (strategy === 'compress') {
this.tracer.logEvent('Barrier', `Compress fallback to truncate for [${ep.id}].`);
@@ -321,7 +431,9 @@ export class ContextManager {
debugLogger.warn(`Synchronous rollingSummarizer barrier not fully implemented, truncating Episode ${ep.id}.`);
}
} else {
console.log('KEEPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens);
truncated.push(ep);
}
}
currentEpisodes = truncated;
@@ -340,7 +452,7 @@ export class ContextManager {
try {
const fs = await import('node:fs/promises');
const path = await import('node:path');
const dumpPath = path.join(this.config.getTargetDir(), '.gemini', 'projected_context.json');
const dumpPath = path.join(this.env.getTraceDir(), '.gemini', 'projected_context.json');
await fs.mkdir(path.dirname(dumpPath), { recursive: true });
await fs.writeFile(dumpPath, JSON.stringify(contents, null, 2), 'utf-8');
debugLogger.log(`[Observability] Context successfully dumped to ${dumpPath}`);
+7 -2
View File
@@ -15,7 +15,7 @@ import type {
AgentYield,
UserPrompt,
} from './types.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js';
// WeakMap to provide stable, deterministic identity across parses for the exact same Content/Part references
const nodeIdentityMap = new WeakMap<object, string>();
@@ -30,6 +30,11 @@ function getStableId(obj: object): string {
}
export class IrMapper {
// Class-wide token-estimation overrides. Forwarded as the third argument to the token
// estimator whenever node metadata is built; stays undefined until setConfig is called.
private static config: { charsPerToken?: number } | undefined;

/**
 * Sets, or clears, the class-wide token-estimation overrides.
 *
 * The value is static and therefore shared by every user of IrMapper in the
 * process. Callers that override it temporarily (e.g. tests) should call
 * `setConfig()` with no argument afterwards to restore the unset state.
 *
 * @param cfg Overrides to apply. Omit (or pass `undefined`) to clear a previous override.
 */
static setConfig(cfg?: { charsPerToken?: number }): void {
  // Assign via IrMapper rather than `this` so the value always lands on the same static
  // that metadata creation reads, even if invoked through a subclass. Store a shallow copy
  // so later mutation of the caller's object cannot silently alter token accounting.
  IrMapper.config = cfg === undefined ? undefined : { ...cfg };
}
/**
* Translates a flat Gemini Content[] array into our rich Episodic Intermediate Representation.
* Groups adjacent function calls and responses into unified ToolExecution nodes.
@@ -40,7 +45,7 @@ export class IrMapper {
const pendingCallParts: Map<string, Part> = new Map();
const createMetadata = (parts: Part[]): IrMetadata => {
const tokens = estimateTokenCountSync(parts);
const tokens = estimateTokenCountSync(parts, 0, IrMapper.config);
return {
originalTokens: tokens,
currentTokens: tokens,
@@ -3,9 +3,9 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createMockEnvironment } from '../testing/contextTestUtils.js';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { BlobDegradationProcessor } from './blobDegradationProcessor.js';
import type { Config } from '../../config/config.js';
import type { Episode, UserPrompt } from '../ir/types.js';
import type { ContextAccountingState } from '../pipeline.js';
import { randomUUID } from 'node:crypto';
@@ -14,19 +14,13 @@ import * as fsPromises from 'node:fs/promises';
vi.mock('node:fs/promises');
describe('BlobDegradationProcessor', () => {
let mockConfig: Config;
let processor: BlobDegradationProcessor;
beforeEach(() => {
vi.resetAllMocks();
mockConfig = {
storage: {
getProjectTempDir: vi.fn().mockReturnValue('/tmp/gemini'),
},
getSessionId: vi.fn().mockReturnValue('test-session'),
} as unknown as Config;
processor = new BlobDegradationProcessor(mockConfig);
processor = new BlobDegradationProcessor(createMockEnvironment());
});
const getDummyState = (
@@ -5,8 +5,8 @@
*/
import type { Episode } from '../ir/types.js';
import type { ContextAccountingState, ContextProcessor } from '../pipeline.js';
import type { Config } from '../../config/config.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js';
import { sanitizeFilenamePart } from '../../utils/fileUtils.js';
import * as fsPromises from 'node:fs/promises';
import path from 'node:path';
@@ -14,10 +14,10 @@ import type { Part } from '@google/genai';
export class BlobDegradationProcessor implements ContextProcessor {
readonly name = 'BlobDegradation';
private config: Config;
private env: ContextEnvironment;
constructor(config: Config) {
this.config = config;
constructor(env: ContextEnvironment, options: Record<string, unknown> = {}) {
this.env = env;
}
async process(
@@ -33,10 +33,10 @@ export class BlobDegradationProcessor implements ContextProcessor {
let directoryCreated = false;
let blobOutputsDir = path.join(
this.config.storage.getProjectTempDir(),
this.env.getProjectTempDir(),
'degraded-blobs',
);
const sessionId = this.config.getSessionId();
const sessionId = this.env.getSessionId();
if (sessionId) {
blobOutputsDir = path.join(
blobOutputsDir,
@@ -101,7 +101,7 @@ export class BlobDegradationProcessor implements ContextProcessor {
}
if (newText && tokensSaved > 0) {
const newTokens = estimateTokenCountSync([{ text: newText }]);
const newTokens = estimateTokenCountSync([{ text: newText }], 0, { charsPerToken: this.env.getCharsPerToken() });
part.presentation = { text: newText, tokens: newTokens };
ep.trigger.metadata.transformations.push({
@@ -3,9 +3,9 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createMockEnvironment } from '../testing/contextTestUtils.js';
import { describe, it, expect, beforeEach } from 'vitest';
import { HistorySquashingProcessor } from './historySquashingProcessor.js';
import type { Config } from '../../config/config.js';
import type {
Episode,
UserPrompt,
@@ -16,19 +16,12 @@ import type { ContextAccountingState } from '../pipeline.js';
import { randomUUID } from 'node:crypto';
describe('HistorySquashingProcessor', () => {
let mockConfig: Config;
let processor: HistorySquashingProcessor;
beforeEach(() => {
mockConfig = {
getContextManagementConfig: vi.fn().mockReturnValue({
strategies: {
historySquashing: { maxTokensPerNode: 100 }, // Extremely small limit for testing
},
}),
} as unknown as Config;
processor = new HistorySquashingProcessor(mockConfig);
processor = new HistorySquashingProcessor(createMockEnvironment(), { maxTokensPerNode: 100 });
});
const getDummyState = (
@@ -6,15 +6,16 @@
import type { Episode } from '../ir/types.js';
import type { ContextAccountingState, ContextProcessor } from '../pipeline.js';
import type { Config } from '../../config/config.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import { truncateProportionally } from '../truncation.js';
export class HistorySquashingProcessor implements ContextProcessor {
readonly name = 'HistorySquashing';
private config: Config;
private options: { maxTokensPerNode: number };
constructor(config: Config) {
this.config = config;
constructor(env: ContextEnvironment, options: { maxTokensPerNode: number }) {
this.options = options;
}
private tryApplySquash(
@@ -54,8 +55,7 @@ export class HistorySquashingProcessor implements ContextProcessor {
return episodes;
}
const { maxTokensPerNode } =
this.config.getContextManagementConfig().strategies.historySquashing;
const { maxTokensPerNode } = this.options;
// We estimate 4 chars per token for truncation logic
const limitChars = maxTokensPerNode * 4;
@@ -4,9 +4,9 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createMockEnvironment } from '../testing/contextTestUtils.js';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { SemanticCompressionProcessor } from './semanticCompressionProcessor.js';
import type { Config } from '../../config/config.js';
import type {
Episode,
UserPrompt,
@@ -17,7 +17,7 @@ import type { ContextAccountingState } from '../pipeline.js';
import { randomUUID } from 'node:crypto';
describe('SemanticCompressionProcessor', () => {
let mockConfig: Config;
let processor: SemanticCompressionProcessor;
let generateContentMock: ReturnType<typeof vi.fn>;
@@ -26,21 +26,10 @@ describe('SemanticCompressionProcessor', () => {
candidates: [{ content: { parts: [{ text: 'Mocked Summary!' }] } }],
});
mockConfig = {
getContextManagementConfig: vi.fn().mockReturnValue({
strategies: {
semanticCompression: {
nodeThresholdTokens: 10,
compressionModel: 'test-model',
},
}, // Super small threshold
}),
getBaseLlmClient: vi.fn().mockReturnValue({
generateContent: generateContentMock,
}),
} as unknown as Config;
processor = new SemanticCompressionProcessor(mockConfig);
const env = createMockEnvironment();
env.getLlmClient = vi.fn().mockReturnValue({ generateContent: generateContentMock }) as any;
processor = new SemanticCompressionProcessor(env, { nodeThresholdTokens: 2000 });
});
const getDummyState = (
@@ -6,7 +6,7 @@
import type { Episode } from '../ir/types.js';
import type { ContextAccountingState, ContextProcessor } from '../pipeline.js';
import type { Config } from '../../config/config.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { LlmRole } from '../../telemetry/types.js';
import { getResponseText } from '../../utils/partUtils.js';
@@ -14,24 +14,26 @@ import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
export class SemanticCompressionProcessor implements ContextProcessor {
readonly name = 'SemanticCompression';
private config: Config;
private env: ContextEnvironment;
private options: { nodeThresholdTokens: number };
private modelToUse: string = 'chat-compression-2.5-flash-lite';
constructor(config: Config) {
this.config = config;
constructor(env: ContextEnvironment, options: { nodeThresholdTokens: number }) {
this.env = env;
this.options = options;
}
async process(
episodes: Episode[],
state: ContextAccountingState,
): Promise<Episode[]> {
require('fs').appendFileSync('/tmp/debug2.json', 'SEMANTIC PROCESS: First episode ID: ' + (episodes[0]?.id) + '\nProtected IDs: ' + Array.from(state.protectedEpisodeIds).join(', ') + '\n');
// If the budget is satisfied, or semantic compression isn't enabled
if (state.isBudgetSatisfied) {
return episodes;
}
const semanticConfig =
this.config.getContextManagementConfig().strategies.semanticCompression;
const semanticConfig = this.options;
// We estimate 4 chars per token for truncation logic
const thresholdChars = semanticConfig.nodeThresholdTokens * 4;
this.modelToUse = 'gemini-2.5-flash';
@@ -169,7 +171,7 @@ export class SemanticCompressionProcessor implements ContextProcessor {
): Promise<string> {
const promptMessage = `You are compressing an old episodic context buffer for an AI assistant.\nSummarize this ${contentType} block in 2-3 highly technical sentences. Keep all critical facts, file names, dependencies, and architectural decisions. Discard conversational filler and boilerplate.\n\nContent:\n${content.slice(0, 30000)}`;
const client = this.config.getBaseLlmClient();
const client = this.env.getLlmClient();
try {
const response = await client.generateContent({
modelConfigKey: { model: this.modelToUse },
@@ -4,9 +4,9 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createMockEnvironment } from '../testing/contextTestUtils.js';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { ToolMaskingProcessor } from './toolMaskingProcessor.js';
import type { Config } from '../../config/config.js';
import type { Episode, ToolExecution } from '../ir/types.js';
import type { ContextAccountingState } from '../pipeline.js';
import { randomUUID } from 'node:crypto';
@@ -15,22 +15,13 @@ import * as fsPromises from 'node:fs/promises';
vi.mock('node:fs/promises');
describe('ToolMaskingProcessor', () => {
let mockConfig: Config;
let processor: ToolMaskingProcessor;
beforeEach(() => {
vi.resetAllMocks();
mockConfig = {
getContextManagementConfig: vi.fn().mockReturnValue({
strategies: {
toolMasking: { stringLengthThresholdTokens: 100 },
},
}),
storage: { getProjectTempDir: vi.fn().mockReturnValue('/tmp/gemini') },
getSessionId: vi.fn().mockReturnValue('test-session'),
} as unknown as Config;
processor = new ToolMaskingProcessor(mockConfig);
processor = new ToolMaskingProcessor(createMockEnvironment(), { stringLengthThresholdTokens: 100 });
});
const getDummyState = (
@@ -84,6 +75,7 @@ describe('ToolMaskingProcessor', () => {
const state = getDummyState(true);
const result = await processor.process(episodes, state);
require('fs').appendFileSync('/tmp/debug.json', '\n\n' + JSON.stringify({res: result[0].steps[0]}, null, 2));
expect(result).toStrictEqual(episodes);
expect((result[0].steps[0] as ToolExecution).presentation).toBeUndefined();
@@ -5,7 +5,7 @@
*/
import type { ContextAccountingState, ContextProcessor } from '../pipeline.js';
import type { Config } from '../../config/config.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
import { sanitizeFilenamePart } from '../../utils/fileUtils.js';
import * as fsPromises from 'node:fs/promises';
@@ -29,18 +29,21 @@ const UNMASKABLE_TOOLS = new Set([
export class ToolMaskingProcessor implements ContextProcessor {
readonly name = 'ToolMasking';
private config: Config;
private env: ContextEnvironment;
private options: { stringLengthThresholdTokens: number };
constructor(config: Config) {
this.config = config;
constructor(env: ContextEnvironment, options: { stringLengthThresholdTokens: number }) {
this.env = env;
this.options = options;
}
async process(
episodes: Episode[],
state: ContextAccountingState,
): Promise<Episode[]> {
const maskingConfig =
this.config.getContextManagementConfig().strategies.toolMasking;
this.options;
if (!maskingConfig) return episodes;
if (state.isBudgetSatisfied) return episodes;
@@ -49,10 +52,10 @@ export class ToolMaskingProcessor implements ContextProcessor {
const limitChars = maskingConfig.stringLengthThresholdTokens * 4;
let toolOutputsDir = path.join(
this.config.storage.getProjectTempDir(),
this.env.getProjectTempDir(),
'tool-outputs',
);
const sessionId = this.config.getSessionId();
const sessionId = this.env.getSessionId();
if (sessionId) {
toolOutputsDir = path.join(
toolOutputsDir,
@@ -121,6 +124,8 @@ export class ToolMaskingProcessor implements ContextProcessor {
nodeType: string,
): Promise<{ masked: any; changed: boolean }> => {
if (typeof obj === 'string') {
require('fs').appendFileSync('/tmp/debug.json', 'STRING FOUND. length: ' + obj.length + ' limitChars: ' + limitChars + '\n');
if (obj.length > 1000) console.log('Found string of length:', obj.length, 'limitChars is:', limitChars, 'isAlreadyMasked:', this.isAlreadyMasked(obj));
if (obj.length > limitChars && !this.isAlreadyMasked(obj)) {
const newString = await handleMasking(
obj,
@@ -0,0 +1,59 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Config } from '../../config/config.js';
import type { SidecarConfig } from './types.js';
export class SidecarLoader {
  /** Budget values used when the legacy config does not supply them. */
  private static readonly DEFAULT_RETAINED_TOKENS = 65000;
  private static readonly DEFAULT_MAX_TOKENS = 150000;
  private static readonly DEFAULT_FREE_TOKENS_TARGET = 10000;

  /** Per-processor thresholds used when the legacy strategy entry is missing. */
  private static readonly DEFAULT_HISTORY_SQUASHING_MAX_TOKENS = 3000;
  private static readonly DEFAULT_TOOL_MASKING_THRESHOLD_TOKENS = 8000;
  private static readonly DEFAULT_SEMANTIC_COMPRESSION_THRESHOLD_TOKENS = 3000;

  /**
   * Generates a default Sidecar JSON graph from the user's legacy UI profile settings.
   *
   * The pipeline layout is fixed; only the budget, the GC backstop and the three
   * per-processor thresholds are read from the legacy config.
   *
   * @param config Legacy config. `getContextManagementConfig` may be absent (e.g. on
   *   partial test doubles), in which case every value falls back to its default.
   * @returns A fully populated SidecarConfig.
   */
  static fromLegacyConfig(config: Config): SidecarConfig {
    // Called as a method so the legacy config keeps its own `this`.
    const mngConfig = config.getContextManagementConfig
      ? config.getContextManagementConfig()
      : undefined;

    // Only these three knobs are consumed; everything is optional so a partially
    // populated strategies object is accepted without resorting to `any`.
    const strategies: {
      historySquashing?: { maxTokensPerNode?: number };
      toolMasking?: { stringLengthThresholdTokens?: number };
      semanticCompression?: { nodeThresholdTokens?: number };
    } = mngConfig?.strategies ?? {};

    const legacyBudget = mngConfig?.budget;

    return {
      budget: {
        // Required fields are defaulted one by one, so a budget object that only
        // sets some of them never leaks `undefined` into the sidecar config.
        retainedTokens:
          legacyBudget?.retainedTokens ?? SidecarLoader.DEFAULT_RETAINED_TOKENS,
        maxTokens: legacyBudget?.maxTokens ?? SidecarLoader.DEFAULT_MAX_TOKENS,
      },
      gcBackstop: {
        strategy: legacyBudget?.maxPressureStrategy ?? 'truncate',
        target: legacyBudget?.gcTarget ?? 'incremental',
        // Optional in SidecarConfig: the default applies only when no budget object
        // exists at all; an existing budget's value (even undefined) passes through.
        freeTokensTarget:
          legacyBudget == null
            ? SidecarLoader.DEFAULT_FREE_TOKENS_TARGET
            : legacyBudget.freeTokensTarget,
      },
      pipelines: {
        eagerBackground: [
          {
            processorId: 'StateSnapshotWorker',
            options: {},
          },
        ],
        retainedProcessingGraph: [
          {
            processorId: 'HistorySquashingProcessor',
            options: {
              maxTokensPerNode:
                strategies.historySquashing?.maxTokensPerNode ??
                SidecarLoader.DEFAULT_HISTORY_SQUASHING_MAX_TOKENS,
            },
          },
        ],
        normalProcessingGraph: [
          {
            processorId: 'ToolMaskingProcessor',
            options: {
              stringLengthThresholdTokens:
                strategies.toolMasking?.stringLengthThresholdTokens ??
                SidecarLoader.DEFAULT_TOOL_MASKING_THRESHOLD_TOKENS,
            },
          },
          {
            processorId: 'BlobDegradationProcessor',
            options: {},
          },
          {
            processorId: 'SemanticCompressionProcessor',
            options: {
              nodeThresholdTokens:
                strategies.semanticCompression?.nodeThresholdTokens ??
                SidecarLoader.DEFAULT_SEMANTIC_COMPRESSION_THRESHOLD_TOKENS,
            },
          },
        ],
      },
    };
  }
}
@@ -0,0 +1,17 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { ContextTracer } from '../tracer.js';
/**
* Runtime services handed to context processors and workers through their constructors.
*/
export interface ContextEnvironment {
/** Returns the LLM client; callers in this module invoke `generateContent` on it to produce summaries. */
getLlmClient(): BaseLlmClient;
/** Returns the current session identifier (ToolMaskingProcessor uses it when building its output directory). */
getSessionId(): string;
/** Returns the trace directory. NOTE(review): presumably where ContextTracer output lives; confirm against the tracer. */
getTraceDir(): string;
/** Returns the project temp directory (ToolMaskingProcessor places its 'tool-outputs' folder under it). */
getProjectTempDir(): string;
/** Returns the tracer associated with this environment. */
getTracer(): ContextTracer;
/** Returns the characters-per-token ratio passed to the context token estimator. */
getCharsPerToken(): number;
}
@@ -0,0 +1,38 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { ContextTracer } from '../tracer.js';
import type { ContextEnvironment } from './environment.js';
/**
 * Value-holder implementation of ContextEnvironment: each getter hands back the
 * matching constructor argument unchanged.
 */
export class ContextEnvironmentImpl implements ContextEnvironment {
  private readonly llmClient: BaseLlmClient;
  private readonly sessionId: string;
  private readonly traceDir: string;
  private readonly tempDir: string;
  private readonly tracer: ContextTracer;
  private readonly charsPerToken: number;

  /**
   * @param llmClient Client returned by getLlmClient().
   * @param sessionId Identifier returned by getSessionId().
   * @param traceDir Directory returned by getTraceDir().
   * @param tempDir Directory returned by getProjectTempDir().
   * @param tracer Tracer returned by getTracer().
   * @param charsPerToken Ratio returned by getCharsPerToken().
   */
  constructor(
    llmClient: BaseLlmClient,
    sessionId: string,
    traceDir: string,
    tempDir: string,
    tracer: ContextTracer,
    charsPerToken: number,
  ) {
    this.llmClient = llmClient;
    this.sessionId = sessionId;
    this.traceDir = traceDir;
    this.tempDir = tempDir;
    this.tracer = tracer;
    this.charsPerToken = charsPerToken;
  }

  /** LLM client supplied at construction. */
  getLlmClient(): BaseLlmClient {
    const { llmClient } = this;
    return llmClient;
  }

  /** Session identifier supplied at construction. */
  getSessionId(): string {
    const { sessionId } = this;
    return sessionId;
  }

  /** Trace directory supplied at construction. */
  getTraceDir(): string {
    const { traceDir } = this;
    return traceDir;
  }

  /** Project temp directory supplied at construction. */
  getProjectTempDir(): string {
    const { tempDir } = this;
    return tempDir;
  }

  /** Tracer supplied at construction. */
  getTracer(): ContextTracer {
    const { tracer } = this;
    return tracer;
  }

  /** Characters-per-token ratio supplied at construction. */
  getCharsPerToken(): number {
    const { charsPerToken } = this;
    return charsPerToken;
  }
}
@@ -0,0 +1,37 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextProcessor } from '../pipeline.js';
import type { AsyncContextWorker } from '../workers/asyncContextWorker.js';
import type { ContextEnvironment } from './environment.js';
/**
* Factory descriptor for a context processor or async worker, identified by a string ID.
* NOTE(review): the `any` default for TOptions turns off option type-checking wherever
* the type parameter is omitted.
*/
export interface ContextProcessorDef<TOptions extends Record<string, unknown> = any> {
/** Identifier for this definition; ProcessorRegistry uses it as the storage and lookup key. */
readonly id: string;
/** Builds an instance from the given environment and processor-specific options. */
create(env: ContextEnvironment, options: TOptions): ContextProcessor | AsyncContextWorker;
}
/**
 * Static lookup table that resolves the processor IDs used in declarative sidecar
 * configs to the definitions able to build the running instances.
 */
export class ProcessorRegistry {
  /** Registered definitions, keyed by their `id`. */
  private static processors = new Map<string, ContextProcessorDef>();

  /** Stores `def` under `def.id`; a later registration with the same ID replaces it. */
  static register(def: ContextProcessorDef) {
    const { id } = def;
    this.processors.set(id, def);
  }

  /**
   * Looks up the definition registered under `id`.
   *
   * @throws Error when nothing is registered for `id`.
   */
  static get(id: string): ContextProcessorDef {
    const registered = this.processors.get(id);
    if (registered) {
      return registered;
    }
    throw new Error(`Context Processor [${id}] is not registered.`);
  }

  /** Removes every registered definition. */
  static clear() {
    this.processors.clear();
  }
}
@@ -0,0 +1,55 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Definition of a processor or worker to be instantiated in the graph.
* Each entry in a SidecarConfig pipeline is one of these.
*/
export interface ProcessorConfig {
/** The registered ID of the processor (e.g. 'SemanticCompressionProcessor') */
processorId: string;
/**
* Dynamic, processor-specific hyperparameters.
* Use an empty object when the processor takes none.
*/
options: Record<string, unknown>;
}
/**
* The Data-Driven Schema for the Context Manager.
* Groups the token budget, the fallback applied when the budget cannot be met,
* and the three processor pipelines.
*/
export interface SidecarConfig {
/** Defines the token ceilings and limits for the pipeline. */
budget: {
/** Boundary whose crossing triggers the 'eagerBackground' pipeline. */
retainedTokens: number;
/** Ceiling the pipeline tries to compress under; 'gcBackstop' applies when it fails to. */
maxTokens: number;
};
/** Defines what happens when the pipeline fails to compress under 'maxTokens' */
gcBackstop: {
/** Which fallback mechanism to apply. */
strategy: 'truncate' | 'compress' | 'rollingSummarizer';
/** How much the fallback should reclaim. */
target: 'incremental' | 'freeNTokens' | 'max';
/** NOTE(review): presumably only consulted when target is 'freeNTokens'; confirm against the consumer. */
freeTokensTarget?: number;
};
/** The execution graphs for context manipulation */
pipelines: {
/**
* Eagerly executes in the background when the 'retainedTokens' boundary is crossed.
* Contains AsyncContextWorkers (e.g. StateSnapshotWorker).
*/
eagerBackground: ProcessorConfig[];
/**
* Executes sequentially to protect the pristine outliers within the retained window.
* Contains ContextProcessors (e.g. HistorySquashingProcessor).
*/
retainedProcessingGraph: ProcessorConfig[];
/**
* Executes sequentially to opportunistically degrade messages older than the retained window.
* Contains ContextProcessors (e.g. ToolMaskingProcessor, SemanticCompressionProcessor).
*/
normalProcessingGraph: ProcessorConfig[];
};
}
@@ -6,7 +6,28 @@
import { vi } from 'vitest';
import type { Config } from '../../config/config.js';
import type { GeminiClient } from '../../core/client.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
/**
 * Builds a ContextEnvironment test double in which every accessor is a vitest mock
 * returning a fixed value. Tests may overwrite individual accessors afterwards.
 *
 * @returns A fresh mock environment; nothing is shared between calls.
 */
export function createMockEnvironment(): ContextEnvironment {
  // Stub client: generateContent always resolves to the same canned payload.
  const stubLlmClient = {
    generateContent: vi.fn().mockResolvedValue({
      text: 'Mock LLM summary response',
    }),
  };

  // Stub tracer: logEvent is a no-op, saveAsset returns a fixed asset id.
  const stubTracer = {
    logEvent: vi.fn(),
    saveAsset: vi.fn().mockReturnValue('mock-asset-id'),
  };

  const sessionIdMock = vi.fn().mockReturnValue('mock-session');
  const traceDirMock = vi.fn().mockReturnValue('/tmp/.gemini/trace');
  const tempDirMock = vi.fn().mockReturnValue('/tmp');
  // One character per token keeps token arithmetic in tests trivial.
  const charsPerTokenMock = vi.fn().mockReturnValue(1);

  return {
    getLlmClient: vi.fn().mockReturnValue(stubLlmClient) as any,
    getSessionId: sessionIdMock,
    getTraceDir: traceDirMock,
    getProjectTempDir: tempDirMock,
    getTracer: vi.fn().mockReturnValue(stubTracer) as any,
    getCharsPerToken: charsPerTokenMock,
  };
}
import type { Content } from '@google/genai';
import { AgentChatHistory } from '../../core/agentChatHistory.js';
import { ContextManager } from '../contextManager.js';
@@ -20,7 +41,7 @@ export function createSyntheticHistory(
tokensPerTurn: number,
): Content[] {
const history: Content[] = [];
const charsPerTurn = tokensPerTurn * 4;
const charsPerTurn = tokensPerTurn * 1;
for (let i = 0; i < numTurns; i++) {
history.push({
@@ -45,23 +66,28 @@ export function createMockContextConfig(
): Config {
const defaultConfig = {
isContextManagementEnabled: vi.fn().mockReturnValue(true),
storage: {
getProjectTempDir: vi.fn().mockReturnValue('/tmp/gemini-test'),
},
getContextManagementConfig: vi.fn().mockReturnValue({
enabled: true,
charsPerToken: 1,
strategies: {
historySquashing: { maxTokensPerNode: 3000 },
toolMasking: { stringLengthThresholdTokens: 10000 },
semanticCompression: {
nodeThresholdTokens: 5000,
compressionModel: 'gemini-2.5-flash',
},
},
budget: {
maxTokens: 150000,
retainedTokens: 65000,
protectedEpisodes: 1,
protectSystemEpisode: true,
maxPressureStrategy: 'truncate',
semanticCompression: { nodeThresholdTokens: 5000 },
},
budget: { retainedTokens: 500, maxTokens: 150000, maxPressureStrategy: 'truncate', gcTarget: 'incremental', freeTokensTarget: 1000 },
gcBackstop: { strategy: 'truncate', target: 'freeNTokens', freeTokensTarget: 100 },
pipelines: {
eagerBackground: [{ processorId: 'StateSnapshotWorker', options: {} }],
retainedProcessingGraph: [{ processorId: 'HistorySquashingProcessor', options: { maxTokensPerNode: 3000 } }],
normalProcessingGraph: [
{ processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 10000 } },
{ processorId: 'BlobDegradationProcessor', options: {} },
{ processorId: 'SemanticCompressionProcessor', options: { nodeThresholdTokens: 5000 } }
]
}
}),
getBaseLlmClient: vi.fn().mockReturnValue(
llmClientOverride || {
@@ -81,12 +107,16 @@ export function createMockContextConfig(
/**
* Wires up a full ContextManager component with an AgentChatHistory and active background workers.
*/
import { ContextTracer } from '../tracer.js';
import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js';
import { SidecarLoader } from '../sidecar/SidecarLoader.js';
export function setupContextComponentTest(config: Config) {
const chatHistory = new AgentChatHistory();
const contextManager = new ContextManager(
config,
config.getBaseLlmClient() as unknown as GeminiClient,
);
const sidecar = SidecarLoader.fromLegacyConfig(config);
const tracer = new ContextTracer('/tmp', 'test-session');
const env = new ContextEnvironmentImpl(config.getBaseLlmClient() as any, 'test-session', '/tmp', '/tmp/gemini-test', tracer, 1);
const contextManager = new ContextManager(sidecar, env, tracer);
// The async worker is now internally managed by ContextManager
+3 -6
View File
@@ -6,6 +6,7 @@
export interface ContextManagementConfig {
enabled: boolean;
charsPerToken?: number;
/** The global orchestration budget */
budget: {
@@ -13,10 +14,7 @@ export interface ContextManagementConfig {
maxTokens: number;
/** The target token count to aggressively drop to using asynchronous "Ship of Theseus" background GC */
retainedTokens: number;
/** The number of recent Episodes to always protect from degradation (default: 1) */
protectedEpisodes: number;
/** Should we protect Episode 0 (the System Prompt/Architectural Initialization)? */
protectSystemEpisode: boolean;
/**
* The strategy to use when maxTokens is exceeded.
@@ -41,8 +39,7 @@ export interface ContextManagementConfig {
semanticCompression: {
/** The threshold (in tokens) at which a text node is sent to the LLM for summarization */
nodeThresholdTokens: number;
/** The model to use for generating the semantic summary */
compressionModel: string;
};
};
}
@@ -0,0 +1,29 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Part } from '@google/genai';
import { estimateTokenCountSync as baseEstimate } from '../../utils/tokenCalculation.js';
/**
 * Estimates the token count of `parts`, optionally using a caller-supplied
 * characters-per-token ratio instead of the shared base estimator.
 *
 * @param parts Content parts to measure.
 * @param depth Forwarded unchanged to the base estimator; unused on the custom-ratio path.
 * @param config Optional overrides. `charsPerToken` selects the custom-ratio path only
 *   when it is a finite, positive number other than 4.
 * @returns The estimated token count.
 */
export function estimateContextTokenCountSync(
  parts: Part[],
  depth: number = 0,
  config?: { charsPerToken?: number },
): number {
  const charsPerToken = config?.charsPerToken;

  // A ratio of 4 defers to the base estimator. A zero, negative or non-finite ratio
  // would make the division below yield Infinity/NaN, so it is treated as "not
  // configured" and defers to the base estimator as well.
  if (
    charsPerToken === undefined ||
    charsPerToken === 4 ||
    !Number.isFinite(charsPerToken) ||
    charsPerToken <= 0
  ) {
    // The baseEstimate no longer accepts config because we forked it!
    return baseEstimate(parts, depth);
  }

  let totalTokens = 0;
  for (const part of parts) {
    // Text parts are measured by their text; any other part by its JSON form.
    const charCount =
      typeof part.text === 'string'
        ? part.text.length
        : JSON.stringify(part).length;
    // Each part is rounded up on its own, so every non-empty part costs at least one token.
    totalTokens += Math.ceil(charCount / charsPerToken);
  }
  return totalTokens;
}
@@ -5,7 +5,7 @@
*/
import { randomUUID } from 'node:crypto';
import type { Config } from '../../config/config.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { Episode, SnapshotVariant } from '../ir/types.js';
import type { AsyncContextWorker } from './asyncContextWorker.js';
import type {
@@ -13,7 +13,7 @@ import type {
ContextConsolidationEvent,
} from '../eventBus.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { estimateTokenCountSync } from '../../utils/tokenCalculation.js';
import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js';
import { IrMapper } from '../ir/mapper.js';
import { LlmRole } from '../../telemetry/llmRole.js';
import type { ContextTracer } from '../tracer.js';
@@ -24,9 +24,10 @@ export class StateSnapshotWorker implements AsyncContextWorker {
private tracer?: ContextTracer;
private isSynthesizing = false;
constructor(private readonly _config: Config) {}
constructor(private readonly env: ContextEnvironment) {}
start(bus: ContextEventBus, tracer?: ContextTracer): void {
console.log('Worker start() called with bus:', !!bus);
this.bus = bus;
this.tracer = tracer;
this.bus.onConsolidationNeeded(this.handleConsolidation.bind(this));
@@ -42,6 +43,7 @@ export class StateSnapshotWorker implements AsyncContextWorker {
private async handleConsolidation(
event: ContextConsolidationEvent,
): Promise<void> {
console.log(`Worker handling consolidation. targetDeficit: ${event.targetDeficit}, isSynthesizing: ${this.isSynthesizing}`);
if (this.isSynthesizing || event.targetDeficit <= 0) return;
// Identify the "dying" block of episodes that need to be collected.
@@ -51,13 +53,17 @@ export class StateSnapshotWorker implements AsyncContextWorker {
(ep) => !ep.variants?.['snapshot'],
);
if (unprotectedOldest.length === 0) return;
if (unprotectedOldest.length === 0) {
return;
}
let targetDeficit = event.targetDeficit;
const episodesToSynthesize: Episode[] = [];
let tokensToSynthesize = 0;
for (const ep of unprotectedOldest) {
console.log('Worker considering episode:', ep.id);
if (tokensToSynthesize >= targetDeficit) break;
episodesToSynthesize.push(ep);
// Rough estimate of tokens in this episode
@@ -71,7 +77,9 @@ export class StateSnapshotWorker implements AsyncContextWorker {
if (episodesToSynthesize.length === 0) return;
console.log(`Worker synthesized logic loop complete. Selected ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`);
this.isSynthesizing = true;
try {
debugLogger.log(
@@ -79,7 +87,7 @@ export class StateSnapshotWorker implements AsyncContextWorker {
);
this.tracer?.logEvent('StateSnapshotWorker', `Consolidation requested. Synthesizing ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`);
const client = this._config.getBaseLlmClient();
const client = this.env.getLlmClient();
const rawContents = IrMapper.fromIr(episodesToSynthesize);
const rawAssetId = this.tracer?.saveAsset('StateSnapshotWorker', 'episodes_to_synthesize', rawContents);
this.tracer?.logEvent('StateSnapshotWorker', 'Dispatching LLM request for snapshot generation', { rawAssetId });
@@ -118,7 +126,7 @@ ${snapshotText || '[Failed to generate snapshot]'}
const snapshotTokens = estimateTokenCountSync([
{ text: mockSnapshotText },
]);
], 0, { charsPerToken: this.env.getCharsPerToken() });
const replacedEpisodeIds = episodesToSynthesize.map((e) => e.id);
@@ -173,6 +181,7 @@ ${snapshotText || '[Failed to generate snapshot]'}
if (this.bus) {
this.tracer?.logEvent('StateSnapshotWorker', `Emitting VARIANT_READY for targetId [${targetId}]`);
this.bus.emitVariantReady({
targetId,
variantId: 'snapshot',
+7 -1
View File
@@ -45,6 +45,9 @@ import type { ContentGenerator } from './contentGenerator.js';
import { LoopDetectionService } from '../services/loopDetectionService.js';
import { ChatCompressionService } from '../context/chatCompressionService.js';
import { ContextManager } from '../context/contextManager.js';
import { SidecarLoader } from '../context/sidecar/SidecarLoader.js';
import { ContextEnvironmentImpl } from '../context/sidecar/environmentImpl.js';
import { ContextTracer } from '../context/tracer.js';
import { ideContextStore } from '../ide/ideContext.js';
import {
logContentRetryFailure,
@@ -113,7 +116,10 @@ export class GeminiClient {
this.loopDetector = new LoopDetectionService(this.config);
this.compressionService = new ChatCompressionService();
this.contextManager = new ContextManager(this.config, this);
const sidecar = SidecarLoader.fromLegacyConfig(this.config);
const tracer = new ContextTracer(typeof this.config.getTargetDir === 'function' ? this.config.getTargetDir() : '/tmp', typeof this.config.getSessionId === 'function' ? this.config.getSessionId() : 'test');
const env = new ContextEnvironmentImpl(this as any, typeof this.config.getSessionId === 'function' ? this.config.getSessionId() : 'test', typeof this.config.getTargetDir === 'function' ? this.config.getTargetDir() : '/tmp', this.config.storage?.getProjectTempDir ? this.config.storage.getProjectTempDir() : '/tmp', tracer, this.config.getContextManagementConfig && this.config.getContextManagementConfig() ? this.config.getContextManagementConfig().charsPerToken ?? 4 : 4);
this.contextManager = new ContextManager(sidecar, env, tracer);
this.toolOutputMaskingService = new ToolOutputMaskingService();
this.lastPromptId = this.config.getSessionId();