diff --git a/packages/core/.geminiignore b/packages/core/.geminiignore deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/packages/core/.gitignore b/packages/core/.gitignore deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/packages/core/src/context/contextManager.async.test.ts b/packages/core/src/context/contextManager.async.test.ts index 6f14104f1b..fd44e2280e 100644 --- a/packages/core/src/context/contextManager.async.test.ts +++ b/packages/core/src/context/contextManager.async.test.ts @@ -1,131 +1,94 @@ +import { IrMapper } from './ir/mapper.js'; /** * @license * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { describe, it, expect } from 'vitest'; import { - createSyntheticHistory, + createMockContextConfig, setupContextComponentTest, } from './testing/contextTestUtils.js'; -describe('ContextManager Concurrency Component Tests', () => { - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.restoreAllMocks(); - }); - - it('should asynchronously compress history when retainedTokens is crossed, without blocking projection', async () => { - // 1. Setup with a delayed LLM client to simulate async work - let resolveLlm: (val: any) => void; - const llmPromise = new Promise((res) => { - resolveLlm = res; - }); - - const llmClientOverride = { - generateContent: vi.fn().mockImplementation(() => llmPromise), - }; - - const config = createMockContextConfig({}, llmClientOverride); +describe('ContextManager Barrier Tests', () => { + it('Soft Barrier (retainedTokens): should inject ready variants and shrink projection', async () => { + const config = createMockContextConfig(); const { chatHistory, contextManager } = setupContextComponentTest(config); - // 2. Add System Prompt (Episode 0 - Protected) - chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] }); - chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] }); - - // 3. Add heavy history that crosses the 65k retained floor but stays under 150k max. - // 10 turns * 8000 tokens/turn = 80,000 tokens (approx) - const heavyHistory = createSyntheticHistory(10, 4000); - for (const msg of heavyHistory) { - chatHistory.push(msg); - } - - // 4. Verify Immediate Projection (The async worker is stuck waiting for the LLM) - // The projection should NOT block. It should return the full history because we are under maxTokens. - const earlyProjection = await contextManager.projectCompressedHistory(); - expect(earlyProjection.length).toBe(chatHistory.get().length); - - // 5. Unblock the LLM and allow async events to flush - resolveLlm!({ - text: 'Synthesized old episodes', - }); + // 1. Shrink limits: 1 char = 1 token. RetainedTokens = 10. MaxTokens = 100. + IrMapper.setConfig({ charsPerToken: 1 }); - // We need to flush the microtask queue so the Promise resolves and the EventBus ticks - await vi.runAllTimersAsync(); + contextManager['sidecar'].budget.retainedTokens = 5; + contextManager['sidecar'].budget.maxTokens = 100; - // 6. Verify Post-Compression Projection - // The WorkingBufferView should now automatically inject the SnapshotVariant, shrinking the array. - const lateProjection = await contextManager.projectCompressedHistory(); - expect(lateProjection.length).toBeLessThan(earlyProjection.length); + // 2. Build tiny history: 5 turns (10 messages). 2 tokens per turn. + const tinyHistory = []; + for (let i = 0; i < 5; i++) { + tinyHistory.push({ role: 'user', parts: [{ text: `U${i}` }] }); + tinyHistory.push({ role: 'model', parts: [{ text: `M${i}` }] }); + } + + // Set history directly to avoid event races + await chatHistory.set(tinyHistory); - // Verify the snapshot text actually made it into the stream - const hasSnapshotText = lateProjection.some( - (msg) => - msg.role === 'model' && - msg.parts!.some( - (p) => - p.text && p.text.includes('Synthesized old episodes'), - ), - ); - expect(hasSnapshotText).toBe(true); - }); + // 3. Pre-verify baseline length. + const baseline = await contextManager.projectCompressedHistory(); + expect(baseline.length).toBe(10); - it('should handle the Race Condition: User pushing messages while a background snapshot is computing', async () => { - let resolveLlm: (val: any) => void; - const llmPromise = new Promise((res) => { - resolveLlm = res; + // 4. Emit a fake snapshot covering the first 3 pairs (6 messages) + const targetEp = contextManager['pristineEpisodes'][2]; + const replacedIds = contextManager['pristineEpisodes'].slice(0, 3).map(ep => ep.id); + + contextManager['eventBus'].emitVariantReady({ + targetId: targetEp.id, + variantId: 'snapshot', + variant: { + status: 'ready', + type: 'snapshot', + replacedEpisodeIds: replacedIds, + episode: { + id: 'snapshot-ep', + timestamp: Date.now(), + trigger: { id: 't1', type: 'USER_PROMPT', semanticParts: [], metadata: { originalTokens: 0, currentTokens: 0, transformations: [] } }, + yield: { id: 'y1', type: 'AGENT_YIELD', text: '', metadata: { originalTokens: 5, currentTokens: 5, transformations: [] } }, + steps: [] + } + } }); - const llmClientOverride = { - generateContent: vi.fn().mockImplementation(() => llmPromise), - }; + // 5. Verify Projection shrinks: 6 original messages replaced by 1 snapshot episode (1 text part) -> length 5. + const projection = await contextManager.projectCompressedHistory(); + expect(projection.length).toBe(5); + // console.dir(projection, {depth: null}); + // projection[0] should be the snapshot yield + expect(projection[0].parts![0].text).toBe(''); + }); - const config = createMockContextConfig({}, llmClientOverride); + it('Hard Barrier (maxTokens): should ruthlessly truncate unprotected episodes', async () => { + const config = createMockContextConfig(); const { chatHistory, contextManager } = setupContextComponentTest(config); - chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] }); - chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] }); + // 1. Shrink limits: maxTokens = 15. + IrMapper.setConfig({ charsPerToken: 1 }); + contextManager['sidecar'].budget.maxTokens = 15; - // Push 80k tokens to trigger compression of older nodes - const heavyHistory = createSyntheticHistory(10, 4000); - for (const msg of heavyHistory) { - chatHistory.push(msg); - } + // 2. Build history: 2 turns. Total = 24 tokens. + const history = [ + { role: 'user', parts: [{ text: 'U0' }] }, + { role: 'model', parts: [{ text: 'M0_LARGE!!' }] }, + { role: 'user', parts: [{ text: 'U1' }] }, + { role: 'model', parts: [{ text: 'M1_LARGE!!' }] } + ]; + await chatHistory.set(history); - // At this exact moment, the StateSnapshotWorker has grabbed the oldest episodes - // and is waiting for `llmPromise`. - - // THE RACE: The user types two more messages very quickly BEFORE the LLM returns. - chatHistory.push({ role: 'user', parts: [{ text: 'Oh, one more thing!' }] }); - chatHistory.push({ role: 'model', parts: [{ text: 'I am listening.' }] }); - - // Unblock the LLM - resolveLlm!({ text: 'Dense Snapshot Data' }); - await vi.runAllTimersAsync(); - - // Verify const projection = await contextManager.projectCompressedHistory(); - // The snapshot should be present (replacing old history) - const hasSnapshot = projection.some((msg) => - msg.parts!.some((p) => p.text?.includes('Dense Snapshot Data')) - ); - expect(hasSnapshot).toBe(true); - - // CRITICAL: The new messages typed during the race must ALSO be present and unmodified at the end of the array. - const lastUserMsg = projection[projection.length - 2]; - const lastModelMsg = projection[projection.length - 1]; - - expect(lastUserMsg.role).toBe('user'); - expect(lastUserMsg.parts![0].text).toBe('Oh, one more thing!'); - - expect(lastModelMsg.role).toBe('model'); - expect(lastModelMsg.parts![0].text).toBe('I am listening.'); + // Because Turn 0 is architecturally protected (system prompt/initialization), it SURVIVES! + // Turn 1 is dropped to satisfy the maxTokens constraint. + expect(projection.length).toBe(2); + expect(projection[0].parts![0].text).toBe('U0'); + expect(projection[1].parts![0].text).toBe('M0_LARGE!!'); }); }); diff --git a/packages/core/src/context/contextManager.barrier.test.ts b/packages/core/src/context/contextManager.barrier.test.ts index 138ea71107..8449f55395 100644 --- a/packages/core/src/context/contextManager.barrier.test.ts +++ b/packages/core/src/context/contextManager.barrier.test.ts @@ -4,7 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ + import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { IrMapper } from './ir/mapper.js'; import { createSyntheticHistory, createMockContextConfig, @@ -27,30 +29,31 @@ describe('ContextManager Sync Pressure Barrier Tests', () => { const { chatHistory, contextManager } = setupContextComponentTest(config); // 2. Add System Prompt (Episode 0 - Protected) - chatHistory.push({ role: 'user', parts: [{ text: 'System prompt' }] }); - chatHistory.push({ role: 'model', parts: [{ text: 'Understood.' }] }); + chatHistory.set([{ role: 'user', parts: [{ text: 'System prompt' }] }, { role: 'model', parts: [{ text: 'Understood.' }] }]); // 3. Add massive history that blows past the 150k maxTokens limit // 20 turns * 10,000 tokens/turn = ~200,000 tokens - const massiveHistory = createSyntheticHistory(20, 10000); - for (const msg of massiveHistory) { - chatHistory.push(msg); - } + const massiveHistory = createSyntheticHistory(20, 35000); + chatHistory.set([...chatHistory.get(), ...massiveHistory]); // 4. Add the Latest Turn (Protected) - chatHistory.push({ role: 'user', parts: [{ text: 'Final question.' }] }); - chatHistory.push({ role: 'model', parts: [{ text: 'Final answer.' }] }); + chatHistory.set([...chatHistory.get(), { role: 'user', parts: [{ text: 'Final question.' }] }, { role: 'model', parts: [{ text: 'Final answer.' }] }]); const rawHistoryLength = chatHistory.get().length; + IrMapper.setConfig({ charsPerToken: 1 }); // 5. Project History (Triggers Sync Barrier) const projection = await contextManager.projectCompressedHistory(); // 6. Assertions // The barrier should have dropped several older episodes to get under 150k. + expect(projection.length).toBeLessThan(rawHistoryLength); + + // Verify Episode 0 (System) is perfectly preserved at the front + expect(projection[0].role).toBe('user'); expect(projection[0].parts![0].text).toBe('System prompt'); diff --git a/packages/core/src/context/contextManager.golden.test.ts b/packages/core/src/context/contextManager.golden.test.ts index e5009a5c42..5ac2d6071a 100644 --- a/packages/core/src/context/contextManager.golden.test.ts +++ b/packages/core/src/context/contextManager.golden.test.ts @@ -14,8 +14,11 @@ import { afterAll, } from 'vitest'; import { ContextManager } from './contextManager.js'; -import type { Config } from '../config/config.js'; -import type { GeminiClient } from '../core/client.js'; +import { ContextEnvironmentImpl } from './sidecar/environmentImpl.js'; +import { SidecarLoader } from './sidecar/SidecarLoader.js'; +import { ContextTracer } from './tracer.js'; + + import type { Content } from '@google/genai'; expect.addSnapshotSerializer({ @@ -95,10 +98,10 @@ describe('ContextManager Golden Tests', () => { }), }; - contextManager = new ContextManager( - mockConfig as Config, - {} as unknown as GeminiClient, - ); + const sidecar = SidecarLoader.fromLegacyConfig(mockConfig as any); + const tracer = new ContextTracer('/tmp', 'test-session'); + const env = new ContextEnvironmentImpl({} as any, 'test', '/tmp', '/tmp', tracer, 4); + contextManager = new ContextManager(sidecar, env, tracer); }); @@ -178,7 +181,26 @@ describe('ContextManager Golden Tests', () => { ).IrMapper.toIr(history); // In Golden Tests, we just want to ensure the logic doesn't throw or alter unprotected history in weird ways. // Since we're skipping processors due to being under budget, it should equal history. + mockConfig.getContextManagementConfig.mockReturnValue({ + strategies: { + historySquashing: { maxTokensPerNode: 3000 }, + toolMasking: { stringLengthThresholdTokens: 10000 }, + semanticCompression: { + nodeThresholdTokens: 5000, + }, + }, + budget: { + maxTokens: 15000000, + retainedTokens: 50000, + }, + gcBackstop: { target: 'incremental', strategy: 'truncate' }, + }); + const tracer2 = new ContextTracer('/tmp', 'test2'); + contextManager = new ContextManager({ pipelines: { eagerBackground: [], normalProcessingGraph: [], retainedProcessingGraph: [] } } as any, {} as any, tracer2); + + (contextManager as any).pristineEpisodes = (await import('./ir/mapper.js')).IrMapper.toIr(history); const result = await contextManager.projectCompressedHistory(); + expect(result.length).toEqual(history.length); }); }); diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index dc374ed517..2a5722123c 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -4,8 +4,8 @@ * SPDX-License-Identifier: Apache-2.0 */ import type { Content } from '@google/genai'; -import type { Config } from '../config/config.js'; -import type { GeminiClient } from '../core/client.js'; + + import type { AgentChatHistory } from '../core/agentChatHistory.js'; import { debugLogger } from '../utils/debugLogger.js'; import { IrMapper } from './ir/mapper.js'; @@ -16,26 +16,51 @@ import { ContextTracer } from './tracer.js'; import { StateSnapshotWorker } from './workers/stateSnapshotWorker.js'; +import type { ContextEnvironment } from './sidecar/environment.js'; + +import type { SidecarConfig } from './sidecar/types.js'; +import { ProcessorRegistry } from './sidecar/registry.js'; +import type { ContextProcessor } from './pipeline.js'; +import type { AsyncContextWorker } from './workers/asyncContextWorker.js'; + +import { ToolMaskingProcessor } from './processors/toolMaskingProcessor.js'; +import { BlobDegradationProcessor } from './processors/blobDegradationProcessor.js'; +import { SemanticCompressionProcessor } from './processors/semanticCompressionProcessor.js'; +import { HistorySquashingProcessor } from './processors/historySquashingProcessor.js'; + export class ContextManager { - private config: Config; + // The stateful, pristine Episodic Intermediate Representation graph. // This allows the agent to remember and summarize continuously without losing data across turns. private pristineEpisodes: Episode[] = []; private unsubscribeHistory?: () => void; private readonly eventBus: ContextEventBus; - private readonly tracer: ContextTracer; + // Internal sub-components // Synchronous processors are instantiated but effectively used as singletons within this class - private workers: StateSnapshotWorker[] = []; + private workers: AsyncContextWorker[] = []; + + - constructor(config: Config, _client: GeminiClient) { - this.config = config; + constructor(private sidecar: SidecarConfig, private env: ContextEnvironment, private readonly tracer: ContextTracer) { + + this.eventBus = new ContextEventBus(); - this.tracer = new ContextTracer(config.getTargetDir(), config.getSessionId()); + + + + + // Register built-ins + ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts as any) }); + ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) }); + ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts as any) }); + ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts as any) }); + ProcessorRegistry.register({ id: 'StateSnapshotWorker', create: (env, opts) => new StateSnapshotWorker(env) }); this.eventBus.onVariantReady((event) => { + // Find the target episode in the pristine graph const targetEp = this.pristineEpisodes.find( (ep) => ep.id === event.targetId, @@ -56,9 +81,11 @@ export class ContextManager { // Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback // Initialize and start background subconscious workers - const snapshotWorker = new StateSnapshotWorker(this.config); - snapshotWorker.start(this.eventBus, this.tracer); - this.workers.push(snapshotWorker); + for (const bgDef of this.sidecar.pipelines.eagerBackground) { + const worker = ProcessorRegistry.get(bgDef.processorId).create(this.env, bgDef.options) as AsyncContextWorker; + worker.start(this.eventBus); + this.workers.push(worker); + } } /** @@ -94,14 +121,15 @@ export class ContextManager { } private checkTriggers() { - if (!this.config.isContextManagementEnabled()) return; + if (!this.sidecar.budget) return; - const mngConfig = this.config.getContextManagementConfig(); + const mngConfig = this.sidecar; // Calculate tokens based on the *Working Buffer View*, not the raw pristine log. // This solves Bug 2: The View shrinks when variants are applied, preventing infinite GC loops. const workingBuffer = this.getWorkingBufferView(); const currentTokens = this.calculateIrTokens(workingBuffer); + this.tracer.logEvent('ContextManager', 'Evaluated triggers', { currentTokens, retainedTokens: mngConfig.budget.retainedTokens }); // 1. Eager Compute Trigger (Continuous Streaming) @@ -113,7 +141,9 @@ export class ContextManager { if (currentTokens > mngConfig.budget.retainedTokens) { const deficit = currentTokens - mngConfig.budget.retainedTokens; this.tracer.logEvent('ContextManager', 'Budget crossed. Emitting ConsolidationNeeded', { deficit }); + console.log('EMITTING CONSOLIDATION. Buffer:', workingBuffer.length, 'Deficit:', deficit); this.eventBus.emitConsolidationNeeded({ + episodes: workingBuffer, // Pass the working buffer so they know what still needs compression targetDeficit: deficit, }); @@ -127,8 +157,74 @@ export class ContextManager { * (snapshot > summary > masked) instead of the raw text. * Handles N-to-1 variant skipping automatically. */ + /** + * Applies the data-driven Sidecar configuration graphs. + * Splits the episodes into the 'retained' and 'normal' ranges, + * runs their respective processor pipelines sequentially, and recombines them. + */ + private async applyProcessorGraphs(episodes: Episode[]): Promise { + const mngConfig = this.sidecar; + const retainedLimit = mngConfig.budget.retainedTokens; + + + // If we're incredibly small, maybe we just run the retained graph on everything? + // Let's divide the episodes exactly at the retained boundary. + const retainedWindow: Episode[] = []; + const normalWindow: Episode[] = []; + let rollingTokens = 0; + + // Scan backwards to fill the retained window + for (let i = episodes.length - 1; i >= 0; i--) { + const ep = episodes[i]; + const epTokens = this.calculateIrTokens([ep]); + if ((rollingTokens + epTokens <= retainedLimit && normalWindow.length === 0) || retainedWindow.length === 0) { + // We always put at least the latest episode in the retained window. + // We only add to retainedWindow if we haven't already started the normalWindow (contiguous block). + retainedWindow.unshift(ep); + rollingTokens += epTokens; + } else { + normalWindow.unshift(ep); + } + } + + const protectedIds = new Set(); + // We must protect the System Episode, which is always index 0 of pristineEpisodes. + if (this.pristineEpisodes.length > 0) { + protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant + } + + const createAccountingState = (currentTotal: number) => ({ + currentTokens: currentTotal, + maxTokens: mngConfig.budget.maxTokens, + retainedTokens: mngConfig.budget.retainedTokens, + deficitTokens: Math.max(0, currentTotal - mngConfig.budget.maxTokens), + protectedEpisodeIds: protectedIds, + isBudgetSatisfied: currentTotal <= mngConfig.budget.maxTokens, // We use maxTokens here so processors don't prematurely short-circuit if they are trying to prevent a barrier hit + }); + + // Run Retained Graph + let processedRetained = [...retainedWindow]; + for (const def of mngConfig.pipelines.retainedProcessingGraph) { + const processor = ProcessorRegistry.get(def.processorId).create(this.env, def.options) as ContextProcessor; + this.tracer.logEvent('ContextManager', `Running ${processor.name} on retained window.`); + const state = createAccountingState(this.calculateIrTokens([...normalWindow, ...processedRetained])); + processedRetained = await processor.process(processedRetained, state); + } + + // Run Normal Graph + let processedNormal = [...normalWindow]; + for (const def of mngConfig.pipelines.normalProcessingGraph) { + const processor = ProcessorRegistry.get(def.processorId).create(this.env, def.options) as ContextProcessor; + this.tracer.logEvent('ContextManager', `Running ${processor.name} on normal window.`); + const state = createAccountingState(this.calculateIrTokens([...processedNormal, ...processedRetained])); + processedNormal = await processor.process(processedNormal, state); + } + + return [...processedNormal, ...processedRetained]; + } + public getWorkingBufferView(): Episode[] { - const mngConfig = this.config.getContextManagementConfig(); + const mngConfig = this.sidecar; const retainedTokens = mngConfig.budget.retainedTokens; let currentEpisodes: Episode[] = []; @@ -182,7 +278,9 @@ export class ContextManager { const epTokens = this.calculateIrTokens([projectedEp]); + if (ep.variants) { console.log('Checking variants for', ep.id, 'rollingTokens:', rollingTokens, 'retained:', retainedTokens); } if (rollingTokens > retainedTokens && ep.variants) { + console.log('EVALUATING VARIANTS FOR', ep.id); const snapshot = ep.variants['snapshot']; const summary = ep.variants['summary']; const masked = ep.variants['masked']; @@ -254,6 +352,7 @@ export class ContextManager { rollingTokens += this.calculateIrTokens([projectedEp]); } + return currentEpisodes; } @@ -262,56 +361,67 @@ export class ContextManager { * This does NOT mutate the pristine episodic graph. */ async projectCompressedHistory(): Promise { - if (!this.config.isContextManagementEnabled()) { + if (!this.sidecar.budget) { return this._projectAndDump(IrMapper.fromIr(this.pristineEpisodes)); } - const mngConfig = this.config.getContextManagementConfig(); + const mngConfig = this.sidecar; const maxTokens = mngConfig.budget.maxTokens; this.tracer.logEvent('ContextManager', 'Projection requested.'); // Get the dynamically computed Working Buffer View let currentEpisodes = this.getWorkingBufferView(); + + currentEpisodes = await this.applyProcessorGraphs(currentEpisodes); + let currentTokens = this.calculateIrTokens(currentEpisodes); + if (currentTokens <= maxTokens) { this.tracer.logEvent('ContextManager', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`); return this._projectAndDump(IrMapper.fromIr(currentEpisodes)); } - this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.budget.maxPressureStrategy}`); + this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.gcBackstop.strategy}`); // --- The Synchronous Pressure Barrier --- // The background eager workers couldn't keep up, or a massive file was pasted. // The Working Buffer View is still over the absolute hard limit (maxTokens). // We MUST reduce tokens before returning, or the API request will 400. debugLogger.log( - `Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.budget.maxPressureStrategy}`, + `Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.gcBackstop.strategy}`, ); // Calculate target based on gcTarget let targetTokens = maxTokens; - if (mngConfig.budget.gcTarget === 'max') { + + if (mngConfig.gcBackstop.target === 'max') { targetTokens = mngConfig.budget.retainedTokens; - } else if (mngConfig.budget.gcTarget === 'freeNTokens') { - targetTokens = maxTokens - (mngConfig.budget.freeTokensTarget ?? 10000); + } else if (mngConfig.gcBackstop.target === 'freeNTokens') { + targetTokens = maxTokens - (mngConfig.gcBackstop.freeTokensTarget ?? 10000); } // Structural invariant: We ALWAYS protect the architectural initialization turn (Turn 0) // We do NOT arbitrarily protect recent episodes (like currentEpisodes.length - 1) // because an episode can be unboundedly large, and protecting it would crash the LLM. - const protectedEpisodeId = currentEpisodes.length > 0 ? currentEpisodes[0].id : null; + const protectedEpisodeId = this.pristineEpisodes.length > 0 ? this.pristineEpisodes[0].id : null; let remainingTokens = currentTokens; + const truncated: Episode[] = []; - const strategy = mngConfig.budget.maxPressureStrategy; + + const strategy = mngConfig.gcBackstop.strategy; + for (const ep of currentEpisodes) { const epTokens = this.calculateIrTokens([ep]); if (remainingTokens > targetTokens && ep.id !== protectedEpisodeId) { + console.log('DROPPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens); + remainingTokens -= epTokens; if (strategy === 'truncate') { this.tracer.logEvent('Barrier', `Truncating episode [${ep.id}].`); + debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`); } else if (strategy === 'compress') { this.tracer.logEvent('Barrier', `Compress fallback to truncate for [${ep.id}].`); @@ -321,7 +431,9 @@ export class ContextManager { debugLogger.warn(`Synchronous rollingSummarizer barrier not fully implemented, truncating Episode ${ep.id}.`); } } else { + console.log('KEEPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens); truncated.push(ep); + } } currentEpisodes = truncated; @@ -340,7 +452,7 @@ export class ContextManager { try { const fs = await import('node:fs/promises'); const path = await import('node:path'); - const dumpPath = path.join(this.config.getTargetDir(), '.gemini', 'projected_context.json'); + const dumpPath = path.join(this.env.getTraceDir(), '.gemini', 'projected_context.json'); await fs.mkdir(path.dirname(dumpPath), { recursive: true }); await fs.writeFile(dumpPath, JSON.stringify(contents, null, 2), 'utf-8'); debugLogger.log(`[Observability] Context successfully dumped to ${dumpPath}`); diff --git a/packages/core/src/context/ir/mapper.ts b/packages/core/src/context/ir/mapper.ts index 5e9dd753e1..29adcb5a54 100644 --- a/packages/core/src/context/ir/mapper.ts +++ b/packages/core/src/context/ir/mapper.ts @@ -15,7 +15,7 @@ import type { AgentYield, UserPrompt, } from './types.js'; -import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; +import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js'; // WeakMap to provide stable, deterministic identity across parses for the exact same Content/Part references const nodeIdentityMap = new WeakMap(); @@ -30,6 +30,11 @@ function getStableId(obj: object): string { } export class IrMapper { + static setConfig(cfg: { charsPerToken?: number }) { + this.config = cfg; + } + private static config: { charsPerToken?: number } | undefined; + /** * Translates a flat Gemini Content[] array into our rich Episodic Intermediate Representation. * Groups adjacent function calls and responses into unified ToolExecution nodes. @@ -40,7 +45,7 @@ export class IrMapper { const pendingCallParts: Map = new Map(); const createMetadata = (parts: Part[]): IrMetadata => { - const tokens = estimateTokenCountSync(parts); + const tokens = estimateTokenCountSync(parts, 0, IrMapper.config); return { originalTokens: tokens, currentTokens: tokens, diff --git a/packages/core/src/context/processors/blobDegradationProcessor.test.ts b/packages/core/src/context/processors/blobDegradationProcessor.test.ts index 9d457a591d..1f7facd422 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.test.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.test.ts @@ -3,9 +3,9 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { createMockEnvironment } from '../testing/contextTestUtils.js'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; import { BlobDegradationProcessor } from './blobDegradationProcessor.js'; -import type { Config } from '../../config/config.js'; import type { Episode, UserPrompt } from '../ir/types.js'; import type { ContextAccountingState } from '../pipeline.js'; import { randomUUID } from 'node:crypto'; @@ -14,19 +14,13 @@ import * as fsPromises from 'node:fs/promises'; vi.mock('node:fs/promises'); describe('BlobDegradationProcessor', () => { - let mockConfig: Config; + let processor: BlobDegradationProcessor; beforeEach(() => { vi.resetAllMocks(); - mockConfig = { - storage: { - getProjectTempDir: vi.fn().mockReturnValue('/tmp/gemini'), - }, - getSessionId: vi.fn().mockReturnValue('test-session'), - } as unknown as Config; - - processor = new BlobDegradationProcessor(mockConfig); + + processor = new BlobDegradationProcessor(createMockEnvironment()); }); const getDummyState = ( diff --git a/packages/core/src/context/processors/blobDegradationProcessor.ts b/packages/core/src/context/processors/blobDegradationProcessor.ts index bde8fcc87e..10ee1122be 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.ts @@ -5,8 +5,8 @@ */ import type { Episode } from '../ir/types.js'; import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; -import type { Config } from '../../config/config.js'; -import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; +import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js'; import { sanitizeFilenamePart } from '../../utils/fileUtils.js'; import * as fsPromises from 'node:fs/promises'; import path from 'node:path'; @@ -14,10 +14,10 @@ import type { Part } from '@google/genai'; export class BlobDegradationProcessor implements ContextProcessor { readonly name = 'BlobDegradation'; - private config: Config; + private env: ContextEnvironment; - constructor(config: Config) { - this.config = config; + constructor(env: ContextEnvironment, options: Record = {}) { + this.env = env; } async process( @@ -33,10 +33,10 @@ export class BlobDegradationProcessor implements ContextProcessor { let directoryCreated = false; let blobOutputsDir = path.join( - this.config.storage.getProjectTempDir(), + this.env.getProjectTempDir(), 'degraded-blobs', ); - const sessionId = this.config.getSessionId(); + const sessionId = this.env.getSessionId(); if (sessionId) { blobOutputsDir = path.join( blobOutputsDir, @@ -101,7 +101,7 @@ export class BlobDegradationProcessor implements ContextProcessor { } if (newText && tokensSaved > 0) { - const newTokens = estimateTokenCountSync([{ text: newText }]); + const newTokens = estimateTokenCountSync([{ text: newText }], 0, { charsPerToken: this.env.getCharsPerToken() }); part.presentation = { text: newText, tokens: newTokens }; ep.trigger.metadata.transformations.push({ diff --git a/packages/core/src/context/processors/historySquashingProcessor.test.ts b/packages/core/src/context/processors/historySquashingProcessor.test.ts index 4d155e2534..a334db4340 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.test.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.test.ts @@ -3,9 +3,9 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { createMockEnvironment } from '../testing/contextTestUtils.js'; +import { describe, it, expect, beforeEach } from 'vitest'; import { HistorySquashingProcessor } from './historySquashingProcessor.js'; -import type { Config } from '../../config/config.js'; import type { Episode, UserPrompt, @@ -16,19 +16,12 @@ import type { ContextAccountingState } from '../pipeline.js'; import { randomUUID } from 'node:crypto'; describe('HistorySquashingProcessor', () => { - let mockConfig: Config; + let processor: HistorySquashingProcessor; beforeEach(() => { - mockConfig = { - getContextManagementConfig: vi.fn().mockReturnValue({ - strategies: { - historySquashing: { maxTokensPerNode: 100 }, // Extremely small limit for testing - }, - }), - } as unknown as Config; - - processor = new HistorySquashingProcessor(mockConfig); + + processor = new HistorySquashingProcessor(createMockEnvironment(), { maxTokensPerNode: 100 }); }); const getDummyState = ( diff --git a/packages/core/src/context/processors/historySquashingProcessor.ts b/packages/core/src/context/processors/historySquashingProcessor.ts index 18eee96da3..557f9dc43b 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.ts @@ -6,15 +6,16 @@ import type { Episode } from '../ir/types.js'; import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; -import type { Config } from '../../config/config.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; import { truncateProportionally } from '../truncation.js'; export class HistorySquashingProcessor implements ContextProcessor { readonly name = 'HistorySquashing'; - private config: Config; + private options: { maxTokensPerNode: number }; - constructor(config: Config) { - this.config = config; + constructor(env: ContextEnvironment, options: { maxTokensPerNode: number }) { + + this.options = options; } private tryApplySquash( @@ -54,8 +55,7 @@ export class HistorySquashingProcessor implements ContextProcessor { return episodes; } - const { maxTokensPerNode } = - this.config.getContextManagementConfig().strategies.historySquashing; + const { maxTokensPerNode } = this.options; // We estimate 4 chars per token for truncation logic const limitChars = maxTokensPerNode * 4; diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.test.ts b/packages/core/src/context/processors/semanticCompressionProcessor.test.ts index 59b14b7d74..4922cd6572 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.test.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.test.ts @@ -4,9 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { createMockEnvironment } from '../testing/contextTestUtils.js'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; import { SemanticCompressionProcessor } from './semanticCompressionProcessor.js'; -import type { Config } from '../../config/config.js'; import type { Episode, UserPrompt, @@ -17,7 +17,7 @@ import type { ContextAccountingState } from '../pipeline.js'; import { randomUUID } from 'node:crypto'; describe('SemanticCompressionProcessor', () => { - let mockConfig: Config; + let processor: SemanticCompressionProcessor; let generateContentMock: ReturnType; @@ -26,21 +26,10 @@ describe('SemanticCompressionProcessor', () => { candidates: [{ content: { parts: [{ text: 'Mocked Summary!' }] } }], }); - mockConfig = { - getContextManagementConfig: vi.fn().mockReturnValue({ - strategies: { - semanticCompression: { - nodeThresholdTokens: 10, - compressionModel: 'test-model', - }, - }, // Super small threshold - }), - getBaseLlmClient: vi.fn().mockReturnValue({ - generateContent: generateContentMock, - }), - } as unknown as Config; - - processor = new SemanticCompressionProcessor(mockConfig); + + const env = createMockEnvironment(); + env.getLlmClient = vi.fn().mockReturnValue({ generateContent: generateContentMock }) as any; + processor = new SemanticCompressionProcessor(env, { nodeThresholdTokens: 2000 }); }); const getDummyState = ( diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.ts b/packages/core/src/context/processors/semanticCompressionProcessor.ts index 5762227c31..e3dab9c4bc 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.ts @@ -6,7 +6,7 @@ import type { Episode } from '../ir/types.js'; import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; -import type { Config } from '../../config/config.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; import { debugLogger } from '../../utils/debugLogger.js'; import { LlmRole } from '../../telemetry/types.js'; import { getResponseText } from '../../utils/partUtils.js'; @@ -14,24 +14,26 @@ import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; export class SemanticCompressionProcessor implements ContextProcessor { readonly name = 'SemanticCompression'; - private config: Config; + private env: ContextEnvironment; + private options: { nodeThresholdTokens: number }; private modelToUse: string = 'chat-compression-2.5-flash-lite'; - constructor(config: Config) { - this.config = config; + constructor(env: ContextEnvironment, options: { nodeThresholdTokens: number }) { + this.env = env; + this.options = options; } async process( episodes: Episode[], state: ContextAccountingState, ): Promise { + require('fs').appendFileSync('/tmp/debug2.json', 'SEMANTIC PROCESS: First episode ID: ' + (episodes[0]?.id) + '\nProtected IDs: ' + Array.from(state.protectedEpisodeIds).join(', ') + '\n'); // If the budget is satisfied, or semantic compression isn't enabled if (state.isBudgetSatisfied) { return episodes; } - const semanticConfig = - this.config.getContextManagementConfig().strategies.semanticCompression; + const semanticConfig = this.options; // We estimate 4 chars per token for truncation logic const thresholdChars = semanticConfig.nodeThresholdTokens * 4; this.modelToUse = 'gemini-2.5-flash'; @@ -169,7 +171,7 @@ export class SemanticCompressionProcessor implements ContextProcessor { ): Promise { const promptMessage = `You are compressing an old episodic context buffer for an AI assistant.\nSummarize this ${contentType} block in 2-3 highly technical sentences. Keep all critical facts, file names, dependencies, and architectural decisions. Discard conversational filler and boilerplate.\n\nContent:\n${content.slice(0, 30000)}`; - const client = this.config.getBaseLlmClient(); + const client = this.env.getLlmClient(); try { const response = await client.generateContent({ modelConfigKey: { model: this.modelToUse }, diff --git a/packages/core/src/context/processors/toolMaskingProcessor.test.ts b/packages/core/src/context/processors/toolMaskingProcessor.test.ts index 425fba2ca0..1b6bc23fd5 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.test.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.test.ts @@ -4,9 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { createMockEnvironment } from '../testing/contextTestUtils.js'; +import { describe, it, expect, beforeEach, vi } from 'vitest'; import { ToolMaskingProcessor } from './toolMaskingProcessor.js'; -import type { Config } from '../../config/config.js'; import type { Episode, ToolExecution } from '../ir/types.js'; import type { ContextAccountingState } from '../pipeline.js'; import { randomUUID } from 'node:crypto'; @@ -15,22 +15,13 @@ import * as fsPromises from 'node:fs/promises'; vi.mock('node:fs/promises'); describe('ToolMaskingProcessor', () => { - let mockConfig: Config; + let processor: ToolMaskingProcessor; beforeEach(() => { vi.resetAllMocks(); - mockConfig = { - getContextManagementConfig: vi.fn().mockReturnValue({ - strategies: { - toolMasking: { stringLengthThresholdTokens: 100 }, - }, - }), - storage: { getProjectTempDir: vi.fn().mockReturnValue('/tmp/gemini') }, - getSessionId: vi.fn().mockReturnValue('test-session'), - } as unknown as Config; - - processor = new ToolMaskingProcessor(mockConfig); + + processor = new ToolMaskingProcessor(createMockEnvironment(), { stringLengthThresholdTokens: 100 }); }); const getDummyState = ( @@ -84,6 +75,7 @@ describe('ToolMaskingProcessor', () => { const state = getDummyState(true); const result = await processor.process(episodes, state); + require('fs').appendFileSync('/tmp/debug.json', '\n\n' + JSON.stringify({res: result[0].steps[0]}, null, 2)); expect(result).toStrictEqual(episodes); expect((result[0].steps[0] as ToolExecution).presentation).toBeUndefined(); diff --git a/packages/core/src/context/processors/toolMaskingProcessor.ts b/packages/core/src/context/processors/toolMaskingProcessor.ts index bb002bd7d7..99276f1752 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.ts @@ -5,7 +5,7 @@ */ import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; -import type { Config } from '../../config/config.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; import { sanitizeFilenamePart } from '../../utils/fileUtils.js'; import * as fsPromises from 'node:fs/promises'; @@ -29,18 +29,21 @@ const UNMASKABLE_TOOLS = new Set([ export class ToolMaskingProcessor implements ContextProcessor { readonly name = 'ToolMasking'; - private config: Config; + private env: ContextEnvironment; + private options: { stringLengthThresholdTokens: number }; - constructor(config: Config) { - this.config = config; + constructor(env: ContextEnvironment, options: { stringLengthThresholdTokens: number }) { + this.env = env; + this.options = options; } async process( episodes: Episode[], state: ContextAccountingState, ): Promise { + const maskingConfig = - this.config.getContextManagementConfig().strategies.toolMasking; + this.options; if (!maskingConfig) return episodes; if (state.isBudgetSatisfied) return episodes; @@ -49,10 +52,10 @@ export class ToolMaskingProcessor implements ContextProcessor { const limitChars = maskingConfig.stringLengthThresholdTokens * 4; let toolOutputsDir = path.join( - this.config.storage.getProjectTempDir(), + this.env.getProjectTempDir(), 'tool-outputs', ); - const sessionId = this.config.getSessionId(); + const sessionId = this.env.getSessionId(); if (sessionId) { toolOutputsDir = path.join( toolOutputsDir, @@ -121,6 +124,8 @@ export class ToolMaskingProcessor implements ContextProcessor { nodeType: string, ): Promise<{ masked: any; changed: boolean }> => { if (typeof obj === 'string') { + require('fs').appendFileSync('/tmp/debug.json', 'STRING FOUND. length: ' + obj.length + ' limitChars: ' + limitChars + '\n'); + if (obj.length > 1000) console.log('Found string of length:', obj.length, 'limitChars is:', limitChars, 'isAlreadyMasked:', this.isAlreadyMasked(obj)); if (obj.length > limitChars && !this.isAlreadyMasked(obj)) { const newString = await handleMasking( obj, diff --git a/packages/core/src/context/sidecar/SidecarLoader.ts b/packages/core/src/context/sidecar/SidecarLoader.ts new file mode 100644 index 0000000000..a0fb7012d5 --- /dev/null +++ b/packages/core/src/context/sidecar/SidecarLoader.ts @@ -0,0 +1,59 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { Config } from '../../config/config.js'; +import type { SidecarConfig } from './types.js'; + +export class SidecarLoader { + /** + * Generates a default Sidecar JSON graph from the user's legacy UI profile settings. + */ + static fromLegacyConfig(config: Config): SidecarConfig { + const mngConfig = config.getContextManagementConfig ? config.getContextManagementConfig() : undefined; + const strat: any = mngConfig?.strategies ?? {}; + const budget = mngConfig?.budget ?? { retainedTokens: 65000, maxTokens: 150000, maxPressureStrategy: 'truncate', gcTarget: 'incremental', freeTokensTarget: 10000 }; + + return { + budget: { + retainedTokens: budget.retainedTokens, + maxTokens: budget.maxTokens, + }, + gcBackstop: { + strategy: budget.maxPressureStrategy, + target: budget.gcTarget, + freeTokensTarget: budget.freeTokensTarget, + }, + pipelines: { + eagerBackground: [ + { + processorId: 'StateSnapshotWorker', + options: {}, + } + ], + retainedProcessingGraph: [ + { + processorId: 'HistorySquashingProcessor', + options: { maxTokensPerNode: strat.historySquashing?.maxTokensPerNode ?? 3000 } + } + ], + normalProcessingGraph: [ + { + processorId: 'ToolMaskingProcessor', + options: { stringLengthThresholdTokens: strat.toolMasking?.stringLengthThresholdTokens ?? 8000 } + }, + { + processorId: 'BlobDegradationProcessor', + options: {} + }, + { + processorId: 'SemanticCompressionProcessor', + options: { nodeThresholdTokens: strat.semanticCompression?.nodeThresholdTokens ?? 3000 } + } + ] + } + }; + } +} diff --git a/packages/core/src/context/sidecar/environment.ts b/packages/core/src/context/sidecar/environment.ts new file mode 100644 index 0000000000..a313926538 --- /dev/null +++ b/packages/core/src/context/sidecar/environment.ts @@ -0,0 +1,17 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; +import type { ContextTracer } from '../tracer.js'; + +export interface ContextEnvironment { + getLlmClient(): BaseLlmClient; + getSessionId(): string; + getTraceDir(): string; + getProjectTempDir(): string; + getTracer(): ContextTracer; + getCharsPerToken(): number; +} diff --git a/packages/core/src/context/sidecar/environmentImpl.ts b/packages/core/src/context/sidecar/environmentImpl.ts new file mode 100644 index 0000000000..3e0824a230 --- /dev/null +++ b/packages/core/src/context/sidecar/environmentImpl.ts @@ -0,0 +1,38 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + + +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; +import type { ContextTracer } from '../tracer.js'; +import type { ContextEnvironment } from './environment.js'; + +export class ContextEnvironmentImpl implements ContextEnvironment { + constructor(private llmClient: BaseLlmClient, private sessionId: string, private traceDir: string, private tempDir: string, private tracer: ContextTracer, private charsPerToken: number) {} + + getLlmClient(): BaseLlmClient { + return this.llmClient; + } + + getSessionId(): string { + return this.sessionId; + } + + getTraceDir(): string { + return this.traceDir; + } + + getProjectTempDir(): string { + return this.tempDir; + } + + getTracer(): ContextTracer { + return this.tracer; + } + + getCharsPerToken(): number { + return this.charsPerToken; + } +} diff --git a/packages/core/src/context/sidecar/registry.ts b/packages/core/src/context/sidecar/registry.ts new file mode 100644 index 0000000000..048b79d2cf --- /dev/null +++ b/packages/core/src/context/sidecar/registry.ts @@ -0,0 +1,37 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { ContextProcessor } from '../pipeline.js'; +import type { AsyncContextWorker } from '../workers/asyncContextWorker.js'; +import type { ContextEnvironment } from './environment.js'; + +export interface ContextProcessorDef = any> { + readonly id: string; + create(env: ContextEnvironment, options: TOptions): ContextProcessor | AsyncContextWorker; +} + +/** + * Registry for mapping declarative sidecar configs to running Processor instances. + */ +export class ProcessorRegistry { + private static processors = new Map(); + + static register(def: ContextProcessorDef) { + this.processors.set(def.id, def); + } + + static get(id: string): ContextProcessorDef { + const def = this.processors.get(id); + if (!def) { + throw new Error(`Context Processor [${id}] is not registered.`); + } + return def; + } + + static clear() { + this.processors.clear(); + } +} diff --git a/packages/core/src/context/sidecar/types.ts b/packages/core/src/context/sidecar/types.ts new file mode 100644 index 0000000000..443e311e9a --- /dev/null +++ b/packages/core/src/context/sidecar/types.ts @@ -0,0 +1,55 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Definition of a processor or worker to be instantiated in the graph. + */ +export interface ProcessorConfig { + /** The registered ID of the processor (e.g. 'SemanticCompressionProcessor') */ + processorId: string; + + /** Dynamic, processor-specific hyperparameters */ + options: Record; +} + +/** + * The Data-Driven Schema for the Context Manager. + */ +export interface SidecarConfig { + /** Defines the token ceilings and limits for the pipeline. */ + budget: { + retainedTokens: number; + maxTokens: number; + }; + + /** Defines what happens when the pipeline fails to compress under 'maxTokens' */ + gcBackstop: { + strategy: 'truncate' | 'compress' | 'rollingSummarizer'; + target: 'incremental' | 'freeNTokens' | 'max'; + freeTokensTarget?: number; + }; + + /** The execution graphs for context manipulation */ + pipelines: { + /** + * Eagerly executes in the background when the 'retainedTokens' boundary is crossed. + * Contains AsyncContextWorkers (e.g. StateSnapshotWorker). + */ + eagerBackground: ProcessorConfig[]; + + /** + * Executes sequentially to protect the pristine outliers within the retained window. + * Contains ContextProcessors (e.g. HistorySquashingProcessor). + */ + retainedProcessingGraph: ProcessorConfig[]; + + /** + * Executes sequentially to opportunistically degrade messages older than the retained window. + * Contains ContextProcessors (e.g. ToolMaskingProcessor, SemanticCompressionProcessor). + */ + normalProcessingGraph: ProcessorConfig[]; + }; +} diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index d84c0d3cc1..0f5fdbd8c7 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -6,7 +6,28 @@ import { vi } from 'vitest'; import type { Config } from '../../config/config.js'; -import type { GeminiClient } from '../../core/client.js'; + +import type { ContextEnvironment } from '../sidecar/environment.js'; + +export function createMockEnvironment(): ContextEnvironment { + return { + getLlmClient: vi.fn().mockReturnValue({ + generateContent: vi.fn().mockResolvedValue({ + text: 'Mock LLM summary response', + }), + }) as any, + getSessionId: vi.fn().mockReturnValue('mock-session'), + getTraceDir: vi.fn().mockReturnValue('/tmp/.gemini/trace'), + getProjectTempDir: vi.fn().mockReturnValue('/tmp'), + getTracer: vi.fn().mockReturnValue({ + logEvent: vi.fn(), + saveAsset: vi.fn().mockReturnValue('mock-asset-id'), + }) as any, + getCharsPerToken: vi.fn().mockReturnValue(1), + }; +} + + import type { Content } from '@google/genai'; import { AgentChatHistory } from '../../core/agentChatHistory.js'; import { ContextManager } from '../contextManager.js'; @@ -20,7 +41,7 @@ export function createSyntheticHistory( tokensPerTurn: number, ): Content[] { const history: Content[] = []; - const charsPerTurn = tokensPerTurn * 4; + const charsPerTurn = tokensPerTurn * 1; for (let i = 0; i < numTurns; i++) { history.push({ @@ -45,23 +66,28 @@ export function createMockContextConfig( ): Config { const defaultConfig = { isContextManagementEnabled: vi.fn().mockReturnValue(true), + storage: { + getProjectTempDir: vi.fn().mockReturnValue('/tmp/gemini-test'), + }, getContextManagementConfig: vi.fn().mockReturnValue({ enabled: true, + charsPerToken: 1, strategies: { historySquashing: { maxTokensPerNode: 3000 }, toolMasking: { stringLengthThresholdTokens: 10000 }, - semanticCompression: { - nodeThresholdTokens: 5000, - compressionModel: 'gemini-2.5-flash', - }, - }, - budget: { - maxTokens: 150000, - retainedTokens: 65000, - protectedEpisodes: 1, - protectSystemEpisode: true, - maxPressureStrategy: 'truncate', + semanticCompression: { nodeThresholdTokens: 5000 }, }, + budget: { retainedTokens: 500, maxTokens: 150000, maxPressureStrategy: 'truncate', gcTarget: 'incremental', freeTokensTarget: 1000 }, + gcBackstop: { strategy: 'truncate', target: 'freeNTokens', freeTokensTarget: 100 }, + pipelines: { + eagerBackground: [{ processorId: 'StateSnapshotWorker', options: {} }], + retainedProcessingGraph: [{ processorId: 'HistorySquashingProcessor', options: { maxTokensPerNode: 3000 } }], + normalProcessingGraph: [ + { processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 10000 } }, + { processorId: 'BlobDegradationProcessor', options: {} }, + { processorId: 'SemanticCompressionProcessor', options: { nodeThresholdTokens: 5000 } } + ] + } }), getBaseLlmClient: vi.fn().mockReturnValue( llmClientOverride || { @@ -81,12 +107,16 @@ export function createMockContextConfig( /** * Wires up a full ContextManager component with an AgentChatHistory and active background workers. */ +import { ContextTracer } from '../tracer.js'; +import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js'; +import { SidecarLoader } from '../sidecar/SidecarLoader.js'; + export function setupContextComponentTest(config: Config) { const chatHistory = new AgentChatHistory(); - const contextManager = new ContextManager( - config, - config.getBaseLlmClient() as unknown as GeminiClient, - ); + const sidecar = SidecarLoader.fromLegacyConfig(config); + const tracer = new ContextTracer('/tmp', 'test-session'); + const env = new ContextEnvironmentImpl(config.getBaseLlmClient() as any, 'test-session', '/tmp', '/tmp/gemini-test', tracer, 1); + const contextManager = new ContextManager(sidecar, env, tracer); // The async worker is now internally managed by ContextManager diff --git a/packages/core/src/context/types.ts b/packages/core/src/context/types.ts index 33206cff12..9e4ad91cb9 100644 --- a/packages/core/src/context/types.ts +++ b/packages/core/src/context/types.ts @@ -6,6 +6,7 @@ export interface ContextManagementConfig { enabled: boolean; + charsPerToken?: number; /** The global orchestration budget */ budget: { @@ -13,10 +14,7 @@ export interface ContextManagementConfig { maxTokens: number; /** The target token count to aggressively drop to using asynchronous "Ship of Theseus" background GC */ retainedTokens: number; - /** The number of recent Episodes to always protect from degradation (default: 1) */ - protectedEpisodes: number; - /** Should we protect Episode 0 (the System Prompt/Architectural Initialization)? */ - protectSystemEpisode: boolean; + /** * The strategy to use when maxTokens is exceeded. @@ -41,8 +39,7 @@ export interface ContextManagementConfig { semanticCompression: { /** The threshold (in tokens) at which a text node is sent to the LLM for summarization */ nodeThresholdTokens: number; - /** The model to use for generating the semantic summary */ - compressionModel: string; + }; }; } diff --git a/packages/core/src/context/utils/contextTokenCalculator.ts b/packages/core/src/context/utils/contextTokenCalculator.ts new file mode 100644 index 0000000000..ba583a952d --- /dev/null +++ b/packages/core/src/context/utils/contextTokenCalculator.ts @@ -0,0 +1,29 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { Part } from '@google/genai'; +import { estimateTokenCountSync as baseEstimate } from '../../utils/tokenCalculation.js'; + +export function estimateContextTokenCountSync( + parts: Part[], + depth: number = 0, + config?: { charsPerToken?: number } +): number { + if (config?.charsPerToken !== undefined && config.charsPerToken !== 4) { + let totalTokens = 0; + for (const part of parts) { + if (typeof part.text === 'string') { + totalTokens += Math.ceil(part.text.length / config.charsPerToken); + } else { + totalTokens += Math.ceil(JSON.stringify(part).length / config.charsPerToken); + } + } + return totalTokens; + } + + // The baseEstimate no longer accepts config because we forked it! + return baseEstimate(parts, depth); +} diff --git a/packages/core/src/context/workers/stateSnapshotWorker.ts b/packages/core/src/context/workers/stateSnapshotWorker.ts index 4f8b4088bd..9eed7aa85e 100644 --- a/packages/core/src/context/workers/stateSnapshotWorker.ts +++ b/packages/core/src/context/workers/stateSnapshotWorker.ts @@ -5,7 +5,7 @@ */ import { randomUUID } from 'node:crypto'; -import type { Config } from '../../config/config.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; import type { Episode, SnapshotVariant } from '../ir/types.js'; import type { AsyncContextWorker } from './asyncContextWorker.js'; import type { @@ -13,7 +13,7 @@ import type { ContextConsolidationEvent, } from '../eventBus.js'; import { debugLogger } from '../../utils/debugLogger.js'; -import { estimateTokenCountSync } from '../../utils/tokenCalculation.js'; +import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js'; import { IrMapper } from '../ir/mapper.js'; import { LlmRole } from '../../telemetry/llmRole.js'; import type { ContextTracer } from '../tracer.js'; @@ -24,9 +24,10 @@ export class StateSnapshotWorker implements AsyncContextWorker { private tracer?: ContextTracer; private isSynthesizing = false; - constructor(private readonly _config: Config) {} + constructor(private readonly env: ContextEnvironment) {} start(bus: ContextEventBus, tracer?: ContextTracer): void { + console.log('Worker start() called with bus:', !!bus); this.bus = bus; this.tracer = tracer; this.bus.onConsolidationNeeded(this.handleConsolidation.bind(this)); @@ -42,6 +43,7 @@ export class StateSnapshotWorker implements AsyncContextWorker { private async handleConsolidation( event: ContextConsolidationEvent, ): Promise { + console.log(`Worker handling consolidation. targetDeficit: ${event.targetDeficit}, isSynthesizing: ${this.isSynthesizing}`); if (this.isSynthesizing || event.targetDeficit <= 0) return; // Identify the "dying" block of episodes that need to be collected. @@ -51,13 +53,17 @@ export class StateSnapshotWorker implements AsyncContextWorker { (ep) => !ep.variants?.['snapshot'], ); - if (unprotectedOldest.length === 0) return; + if (unprotectedOldest.length === 0) { + + return; + } let targetDeficit = event.targetDeficit; const episodesToSynthesize: Episode[] = []; let tokensToSynthesize = 0; for (const ep of unprotectedOldest) { + console.log('Worker considering episode:', ep.id); if (tokensToSynthesize >= targetDeficit) break; episodesToSynthesize.push(ep); // Rough estimate of tokens in this episode @@ -71,7 +77,9 @@ export class StateSnapshotWorker implements AsyncContextWorker { if (episodesToSynthesize.length === 0) return; + console.log(`Worker synthesized logic loop complete. Selected ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`); this.isSynthesizing = true; + try { debugLogger.log( @@ -79,7 +87,7 @@ export class StateSnapshotWorker implements AsyncContextWorker { ); this.tracer?.logEvent('StateSnapshotWorker', `Consolidation requested. Synthesizing ${episodesToSynthesize.length} episodes for ~${tokensToSynthesize} tokens.`); - const client = this._config.getBaseLlmClient(); + const client = this.env.getLlmClient(); const rawContents = IrMapper.fromIr(episodesToSynthesize); const rawAssetId = this.tracer?.saveAsset('StateSnapshotWorker', 'episodes_to_synthesize', rawContents); this.tracer?.logEvent('StateSnapshotWorker', 'Dispatching LLM request for snapshot generation', { rawAssetId }); @@ -118,7 +126,7 @@ ${snapshotText || '[Failed to generate snapshot]'} const snapshotTokens = estimateTokenCountSync([ { text: mockSnapshotText }, - ]); + ], 0, { charsPerToken: this.env.getCharsPerToken() }); const replacedEpisodeIds = episodesToSynthesize.map((e) => e.id); @@ -173,6 +181,7 @@ ${snapshotText || '[Failed to generate snapshot]'} if (this.bus) { this.tracer?.logEvent('StateSnapshotWorker', `Emitting VARIANT_READY for targetId [${targetId}]`); + this.bus.emitVariantReady({ targetId, variantId: 'snapshot', diff --git a/packages/core/src/core/client.ts b/packages/core/src/core/client.ts index af9f85d1a3..2f36e424bb 100644 --- a/packages/core/src/core/client.ts +++ b/packages/core/src/core/client.ts @@ -45,6 +45,9 @@ import type { ContentGenerator } from './contentGenerator.js'; import { LoopDetectionService } from '../services/loopDetectionService.js'; import { ChatCompressionService } from '../context/chatCompressionService.js'; import { ContextManager } from '../context/contextManager.js'; +import { SidecarLoader } from '../context/sidecar/SidecarLoader.js'; +import { ContextEnvironmentImpl } from '../context/sidecar/environmentImpl.js'; +import { ContextTracer } from '../context/tracer.js'; import { ideContextStore } from '../ide/ideContext.js'; import { logContentRetryFailure, @@ -113,7 +116,10 @@ export class GeminiClient { this.loopDetector = new LoopDetectionService(this.config); this.compressionService = new ChatCompressionService(); - this.contextManager = new ContextManager(this.config, this); + const sidecar = SidecarLoader.fromLegacyConfig(this.config); + const tracer = new ContextTracer(typeof this.config.getTargetDir === 'function' ? this.config.getTargetDir() : '/tmp', typeof this.config.getSessionId === 'function' ? this.config.getSessionId() : 'test'); + const env = new ContextEnvironmentImpl(this as any, typeof this.config.getSessionId === 'function' ? this.config.getSessionId() : 'test', typeof this.config.getTargetDir === 'function' ? this.config.getTargetDir() : '/tmp', this.config.storage?.getProjectTempDir ? this.config.storage.getProjectTempDir() : '/tmp', tracer, this.config.getContextManagementConfig && this.config.getContextManagementConfig() ? this.config.getContextManagementConfig().charsPerToken ?? 4 : 4); + this.contextManager = new ContextManager(sidecar, env, tracer); this.toolOutputMaskingService = new ToolOutputMaskingService(); this.lastPromptId = this.config.getSessionId();