diff --git a/packages/core/src/context/contextManager.async.test.ts b/packages/core/src/context/contextManager.async.test.ts index 3fb824a7c0..b07512b914 100644 --- a/packages/core/src/context/contextManager.async.test.ts +++ b/packages/core/src/context/contextManager.async.test.ts @@ -1,4 +1,3 @@ - /** * @license * Copyright 2026 Google LLC @@ -17,7 +16,6 @@ describe('ContextManager Barrier Tests', () => { const { chatHistory, contextManager } = setupContextComponentTest(config); // 1. Shrink limits: 1 char = 1 token. RetainedTokens = 10. MaxTokens = 100. - contextManager['sidecar'].budget.retainedTokens = 5; contextManager['sidecar'].budget.maxTokens = 100; @@ -89,7 +87,7 @@ describe('ContextManager Barrier Tests', () => { const { chatHistory, contextManager } = setupContextComponentTest(config); // 1. Shrink limits: maxTokens = 15. - + contextManager['sidecar'].budget.maxTokens = 15; // 2. Build history: 2 turns. Total = 24 tokens. diff --git a/packages/core/src/context/contextManager.barrier.test.ts b/packages/core/src/context/contextManager.barrier.test.ts index 5d8579fe1d..41cc54015e 100644 --- a/packages/core/src/context/contextManager.barrier.test.ts +++ b/packages/core/src/context/contextManager.barrier.test.ts @@ -46,7 +46,6 @@ describe('ContextManager Sync Pressure Barrier Tests', () => { ]); const rawHistoryLength = chatHistory.get().length; - // 5. Project History (Triggers Sync Barrier) const projection = await contextManager.projectCompressedHistory(); diff --git a/packages/core/src/context/contextManager.golden.test.ts b/packages/core/src/context/contextManager.golden.test.ts index 3694b6ab69..b7791d4f9b 100644 --- a/packages/core/src/context/contextManager.golden.test.ts +++ b/packages/core/src/context/contextManager.golden.test.ts @@ -23,9 +23,9 @@ import type { Content } from '@google/genai'; import type { BaseLlmClient } from '../core/baseLlmClient.js'; import type { Episode } from './ir/types.js'; import type { SidecarConfig } from './sidecar/types.js'; -import { ProcessorRegistry } from "./sidecar/registry.js"; -import { registerBuiltInProcessors } from "./sidecar/builtins.js"; - +import { ProcessorRegistry } from './sidecar/registry.js'; +import { registerBuiltInProcessors } from './sidecar/builtins.js'; +import { IrMapper } from './ir/mapper.js'; expect.addSnapshotSerializer({ test: (val) => @@ -79,19 +79,31 @@ describe('ContextManager Golden Tests', () => { registerBuiltInProcessors(registry); const sidecar = SidecarLoader.fromConfig(mockConfig, registry); - const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'test-session' }); + const tracer = new ContextTracer({ + targetDir: '/tmp', + sessionId: 'test-session', + }); const eventBus = new ContextEventBus(); const env = new ContextEnvironmentImpl( - { generateContent: async () => ({}), generateJson: async () => ({}) } as unknown as BaseLlmClient, + { + generateContent: async () => ({}), + generateJson: async () => ({}), + } as unknown as BaseLlmClient, 'test-prompt-id', 'test', '/tmp', '/tmp', tracer, 4, - eventBus + eventBus, + ); + contextManager = ContextManager.create( + sidecar, + env, + tracer, + undefined, + registry, ); - contextManager = ContextManager.create(sidecar, env, tracer, undefined, registry); }); const createLargeHistory = (): Content[] => [ @@ -126,31 +138,37 @@ describe('ContextManager Golden Tests', () => { it('should process history and match golden snapshot', async () => { const history = createLargeHistory(); - (contextManager as unknown as { pristineEpisodes: Episode[] 
}).pristineEpisodes = ( - await import('./ir/mapper.js') - ).IrMapper.toIr(history, new ContextTokenCalculator(4)); + ( + contextManager as unknown as { pristineEpisodes: Episode[] } + ).pristineEpisodes = IrMapper.toIr(history, new ContextTokenCalculator(4)); const result = await contextManager.projectCompressedHistory(); expect(result).toMatchSnapshot(); }); it('should not modify history when under budget', async () => { const history = createLargeHistory(); - (contextManager as unknown as { pristineEpisodes: Episode[] }).pristineEpisodes = ( - await import('./ir/mapper.js') - ).IrMapper.toIr(history, new ContextTokenCalculator(4)); + ( + contextManager as unknown as { pristineEpisodes: Episode[] } + ).pristineEpisodes = IrMapper.toIr(history, new ContextTokenCalculator(4)); // In Golden Tests, we just want to ensure the logic doesn't throw or alter unprotected history in weird ways. // Since we're skipping processors due to being under budget, it should equal history. - const tracer2 = new ContextTracer({ targetDir: '/tmp', sessionId: 'test2' }); + const tracer2 = new ContextTracer({ + targetDir: '/tmp', + sessionId: 'test2', + }); const eventBus2 = new ContextEventBus(); const env2 = new ContextEnvironmentImpl( - { generateContent: async () => ({}), generateJson: async () => ({}) } as unknown as BaseLlmClient, + { + generateContent: async () => ({}), + generateJson: async () => ({}), + } as unknown as BaseLlmClient, 'test-prompt-id', 'test', '/tmp', '/tmp', tracer2, 4, - eventBus2 + eventBus2, ); contextManager = ContextManager.create( { @@ -161,9 +179,9 @@ describe('ContextManager Golden Tests', () => { tracer2, ); - (contextManager as unknown as { pristineEpisodes: Episode[] }).pristineEpisodes = ( - await import('./ir/mapper.js') - ).IrMapper.toIr(history, new ContextTokenCalculator(4)); + ( + contextManager as unknown as { pristineEpisodes: Episode[] } + ).pristineEpisodes = IrMapper.toIr(history, new ContextTokenCalculator(4)); const result = await contextManager.projectCompressedHistory(); expect(result.length).toEqual(history.length); diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 6d1e28ff2f..7a1078a15b 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -3,6 +3,7 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + import type { Content } from '@google/genai'; import type { AgentChatHistory } from '../core/agentChatHistory.js'; import { debugLogger } from '../utils/debugLogger.js'; @@ -19,33 +20,39 @@ import { registerBuiltInProcessors } from './sidecar/builtins.js'; import { ProcessorRegistry } from './sidecar/registry.js'; export class ContextManager { - - // The stateful, pristine Episodic Intermediate Representation graph. // This allows the agent to remember and summarize continuously without losing data across turns. 
private pristineEpisodes: Episode[] = []; private readonly eventBus: ContextEventBus; - + // Internal sub-components // Synchronous processors are instantiated but effectively used as singletons within this class private orchestrator: PipelineOrchestrator; private historyObserver?: HistoryObserver; - static create(sidecar: SidecarConfig, env: ContextEnvironment, tracer: ContextTracer, orchestrator?: PipelineOrchestrator, registry?: ProcessorRegistry): ContextManager { - if (!registry) { - registry = new ProcessorRegistry(); - registerBuiltInProcessors(registry); - } - const orch = orchestrator || new PipelineOrchestrator(sidecar, env, env.eventBus, tracer, registry); - return new ContextManager(sidecar, env, tracer, orch); + static create( + sidecar: SidecarConfig, + env: ContextEnvironment, + tracer: ContextTracer, + orchestrator?: PipelineOrchestrator, + registry?: ProcessorRegistry, + ): ContextManager { + if (!registry) { + registry = new ProcessorRegistry(); + registerBuiltInProcessors(registry); + } + const orch = + orchestrator || + new PipelineOrchestrator(sidecar, env, env.eventBus, tracer, registry); + return new ContextManager(sidecar, env, tracer, orch); } // Use ContextManager.create() instead private constructor( - private sidecar: SidecarConfig, - private env: ContextEnvironment, - private readonly tracer: ContextTracer, - orchestrator: PipelineOrchestrator + private sidecar: SidecarConfig, + private env: ContextEnvironment, + private readonly tracer: ContextTracer, + orchestrator: PipelineOrchestrator, ) { this.eventBus = env.eventBus; this.orchestrator = orchestrator; @@ -56,7 +63,6 @@ export class ContextManager { }); this.eventBus.onVariantReady((event) => { - // Find the target episode in the pristine graph const targetEp = this.pristineEpisodes.find( (ep) => ep.id === event.targetId, @@ -66,7 +72,10 @@ export class ContextManager { targetEp.variants = {}; } targetEp.variants[event.variantId] = event.variant; - this.tracer.logEvent('ContextManager', `Received async variant [${event.variantId}] for Episode ${event.targetId}`); + this.tracer.logEvent( + 'ContextManager', + `Received async variant [${event.variantId}] for Episode ${event.targetId}`, + ); debugLogger.log( `ContextManager: Received async variant [${event.variantId}] for Episode ${event.targetId}.`, ); @@ -92,9 +101,13 @@ export class ContextManager { if (!this.sidecar.budget) return; const workingBuffer = this.getWorkingBufferView(); - const currentTokens = this.env.tokenCalculator.calculateEpisodeListTokens(workingBuffer); - - this.tracer.logEvent('ContextManager', 'Evaluated triggers', { currentTokens, retainedTokens: this.sidecar.budget.retainedTokens }); + const currentTokens = + this.env.tokenCalculator.calculateEpisodeListTokens(workingBuffer); + + this.tracer.logEvent('ContextManager', 'Evaluated triggers', { + currentTokens, + retainedTokens: this.sidecar.budget.retainedTokens, + }); // 1. Eager Compute Trigger this.eventBus.emitChunkReceived({ episodes: this.pristineEpisodes }); @@ -102,9 +115,13 @@ export class ContextManager { // 2. Budget Crossed Trigger if (currentTokens > this.sidecar.budget.retainedTokens) { const deficit = currentTokens - this.sidecar.budget.retainedTokens; - this.tracer.logEvent('ContextManager', 'Budget crossed. Emitting ConsolidationNeeded', { deficit }); + this.tracer.logEvent( + 'ContextManager', + 'Budget crossed. 
Emitting ConsolidationNeeded', + { deficit }, + ); this.eventBus.emitConsolidationNeeded({ - episodes: workingBuffer, + episodes: workingBuffer, targetDeficit: deficit, }); } @@ -131,7 +148,7 @@ export class ContextManager { /** * Generates a computed view of the pristine log. * Sweeps backwards (newest to oldest), tracking rolling tokens. - * When rollingTokens > retainedTokens, it injects the "best" available ready variant + * When rollingTokens > retainedTokens, it injects the "best" available ready variant * (snapshot > summary > masked) instead of the raw text. * Handles N-to-1 variant skipping automatically. */ @@ -140,7 +157,7 @@ export class ContextManager { this.pristineEpisodes, this.sidecar.budget.retainedTokens, this.tracer, - this.env + this.env, ); } @@ -154,14 +171,14 @@ export class ContextManager { if (this.pristineEpisodes.length > 0) { protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant } - + return IrProjector.project( this.getWorkingBufferView(), this.orchestrator, this.sidecar, this.tracer, this.env, - protectedIds + protectedIds, ); } } diff --git a/packages/core/src/context/eventBus.ts b/packages/core/src/context/eventBus.ts index 2fed823c4c..d7e8a9e0c5 100644 --- a/packages/core/src/context/eventBus.ts +++ b/packages/core/src/context/eventBus.ts @@ -31,7 +31,9 @@ export class ContextEventBus extends EventEmitter { this.emit('PRISTINE_HISTORY_UPDATED', event); } - onPristineHistoryUpdated(listener: (event: PristineHistoryUpdatedEvent) => void) { + onPristineHistoryUpdated( + listener: (event: PristineHistoryUpdatedEvent) => void, + ) { this.on('PRISTINE_HISTORY_UPDATED', listener); } diff --git a/packages/core/src/context/historyObserver.ts b/packages/core/src/context/historyObserver.ts index e6132d3873..012a1f2e27 100644 --- a/packages/core/src/context/historyObserver.ts +++ b/packages/core/src/context/historyObserver.ts @@ -4,7 +4,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { AgentChatHistory, HistoryEvent } from '../core/agentChatHistory.js'; +import type { + AgentChatHistory, + HistoryEvent, +} from '../core/agentChatHistory.js'; import { IrMapper } from './ir/mapper.js'; import type { ContextTokenCalculator } from './utils/contextTokenCalculator.js'; import type { ContextEventBus } from './eventBus.js'; @@ -30,13 +33,24 @@ export class HistoryObserver { this.unsubscribeHistory(); } - this.unsubscribeHistory = this.chatHistory.subscribe((_event: HistoryEvent) => { - // Rebuild the pristine IR graph from the full source history on every change. - const pristineEpisodes = IrMapper.toIr(this.chatHistory.get(), this.tokenCalculator); - this.tracer.logEvent('HistoryObserver', 'Rebuilt pristine graph from chat history update', { episodeCount: pristineEpisodes.length }); - - this.eventBus.emitPristineHistoryUpdated({ episodes: pristineEpisodes }); - }); + this.unsubscribeHistory = this.chatHistory.subscribe( + (_event: HistoryEvent) => { + // Rebuild the pristine IR graph from the full source history on every change. 
+ const pristineEpisodes = IrMapper.toIr( + this.chatHistory.get(), + this.tokenCalculator, + ); + this.tracer.logEvent( + 'HistoryObserver', + 'Rebuilt pristine graph from chat history update', + { episodeCount: pristineEpisodes.length }, + ); + + this.eventBus.emitPristineHistoryUpdated({ + episodes: pristineEpisodes, + }); + }, + ); } stop() { diff --git a/packages/core/src/context/ir/episodeEditor.ts b/packages/core/src/context/ir/episodeEditor.ts index 6597367212..8e71d55df4 100644 --- a/packages/core/src/context/ir/episodeEditor.ts +++ b/packages/core/src/context/ir/episodeEditor.ts @@ -19,21 +19,21 @@ export class EpisodeEditor { private workingOrder: string[]; private workingMap: Map; private mutations: MutationRecord[] = []; - + constructor(episodes: Episode[]) { - this.originalMap = new Map(episodes.map(e => [e.id, e])); - this.workingOrder = episodes.map(e => e.id); - this.workingMap = new Map(episodes.map(e => [e.id, e])); + this.originalMap = new Map(episodes.map((e) => [e.id, e])); + this.workingOrder = episodes.map((e) => e.id); + this.workingMap = new Map(episodes.map((e) => [e.id, e])); } - + /** * Provides a readonly view of the current working state of the episodes. * Processors should iterate over this to decide what to mutate. */ get episodes(): readonly Episode[] { - return this.workingOrder.map(id => this.workingMap.get(id)!); + return this.workingOrder.map((id) => this.workingMap.get(id)!); } - + /** * Safely edits an existing episode. * The framework will handle deeply cloning the episode before passing it to the mutator, @@ -42,19 +42,24 @@ export class EpisodeEditor { editEpisode(id: string, action: string, mutator: (draft: Episode) => void) { const ep = this.workingMap.get(id); if (!ep) return; - + // Lazy deep clone only if it's the original reference if (ep === this.originalMap.get(id)) { - const clone = structuredClone(ep); - this.workingMap.set(id, clone); + const clone = structuredClone(ep); + this.workingMap.set(id, clone); } - + const draft = this.workingMap.get(id)!; mutator(draft); - + // Log mutation if not already tracked as modified/inserted/replaced - if (!this.mutations.find(m => m.episodeId === id)) { - this.mutations.push({ episodeId: id, type: 'modified', action, episode: draft }); + if (!this.mutations.find((m) => m.episodeId === id)) { + this.mutations.push({ + episodeId: id, + type: 'modified', + action, + episode: draft, + }); } } @@ -62,49 +67,56 @@ export class EpisodeEditor { * Inserts a brand new episode into the graph at the specified index. */ insertEpisode(index: number, newEpisode: Episode, action: string) { - this.workingMap.set(newEpisode.id, newEpisode); - this.workingOrder.splice(index, 0, newEpisode.id); - this.mutations.push({ episodeId: newEpisode.id, type: 'inserted', action, episode: newEpisode }); + this.workingMap.set(newEpisode.id, newEpisode); + this.workingOrder.splice(index, 0, newEpisode.id); + this.mutations.push({ + episodeId: newEpisode.id, + type: 'inserted', + action, + episode: newEpisode, + }); } - + /** * Replaces a set of older episodes with a single new episode (e.g., a Summary or Snapshot). * It inserts the new episode at the lowest index of the removed episodes. 
*/ replaceEpisodes(oldIds: string[], newEpisode: Episode, action: string) { - const indices = oldIds.map(id => this.workingOrder.indexOf(id)).filter(i => i !== -1); - if (indices.length === 0) return; - - const insertIndex = Math.min(...indices); - - // Remove old - this.workingOrder = this.workingOrder.filter(id => !oldIds.includes(id)); - for (const id of oldIds) { - this.workingMap.delete(id); - } - - // Insert new - this.workingOrder.splice(insertIndex, 0, newEpisode.id); - this.workingMap.set(newEpisode.id, newEpisode); - - this.mutations.push({ - episodeId: newEpisode.id, - type: 'replaced', - action, - originalIds: oldIds, - episode: newEpisode - }); + const indices = oldIds + .map((id) => this.workingOrder.indexOf(id)) + .filter((i) => i !== -1); + if (indices.length === 0) return; + + const insertIndex = Math.min(...indices); + + // Remove old + this.workingOrder = this.workingOrder.filter((id) => !oldIds.includes(id)); + for (const id of oldIds) { + this.workingMap.delete(id); + } + + // Insert new + this.workingOrder.splice(insertIndex, 0, newEpisode.id); + this.workingMap.set(newEpisode.id, newEpisode); + + this.mutations.push({ + episodeId: newEpisode.id, + type: 'replaced', + action, + originalIds: oldIds, + episode: newEpisode, + }); } /** * Removes episodes from the graph completely (e.g., emergency truncation). */ removeEpisodes(oldIds: string[], action: string) { - this.workingOrder = this.workingOrder.filter(id => !oldIds.includes(id)); - for (const id of oldIds) { - this.workingMap.delete(id); - this.mutations.push({ episodeId: id, type: 'deleted', action }); - } + this.workingOrder = this.workingOrder.filter((id) => !oldIds.includes(id)); + for (const id of oldIds) { + this.workingMap.delete(id); + this.mutations.push({ episodeId: id, type: 'deleted', action }); + } } /** @@ -112,9 +124,9 @@ export class EpisodeEditor { * Called by the Orchestrator. */ getFinalEpisodes(): Episode[] { - return this.workingOrder.map(id => this.workingMap.get(id)!); + return this.workingOrder.map((id) => this.workingMap.get(id)!); } - + /** * Retrieves a log of all structural and property mutations performed by this editor. * Called by the Orchestrator to emit VariantReady events. diff --git a/packages/core/src/context/ir/fromIr.ts b/packages/core/src/context/ir/fromIr.ts index f745b00cfa..b1d2be18b5 100644 --- a/packages/core/src/context/ir/fromIr.ts +++ b/packages/core/src/context/ir/fromIr.ts @@ -43,8 +43,7 @@ function serializeTrigger(trigger: UserPrompt): Content | null { fileData: { mimeType: sp.mimeType, fileUri: sp.fileUri }, }); } else if (sp.type === 'raw_part') { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion, @typescript-eslint/no-unsafe-type-assertion - parts.push(sp.part as unknown as Part); + parts.push(sp.part); } } return parts.length > 0 ? { role: 'user', parts } : null; @@ -76,7 +75,7 @@ function serializeSteps(steps: EpisodeStep[]): Content[] { pendingModelParts.push({ functionCall: { name: step.toolName, - args: step.intent as unknown as Record, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + args: step.intent, id: step.id, }, }); @@ -86,7 +85,10 @@ function serializeSteps(steps: EpisodeStep[]): Content[] { pendingUserParts.push({ functionResponse: { name: step.toolName, - response: observation as unknown as Record, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + response: + typeof observation === 'string' + ? 
{ message: observation } + : observation, id: step.id, }, }); diff --git a/packages/core/src/context/ir/graphUtils.test.ts b/packages/core/src/context/ir/graphUtils.test.ts index eca87a0d69..9ba63db80f 100644 --- a/packages/core/src/context/ir/graphUtils.test.ts +++ b/packages/core/src/context/ir/graphUtils.test.ts @@ -6,7 +6,10 @@ import { describe, it, expect, beforeEach, vi } from 'vitest'; import { generateWorkingBufferView } from './graphUtils.js'; -import { createMockEnvironment, createDummyEpisode } from '../testing/contextTestUtils.js'; +import { + createMockEnvironment, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import type { AgentThought, UserPrompt } from './types.js'; @@ -17,7 +20,15 @@ describe('graphUtils (View Generator)', () => { vi.resetAllMocks(); env = createMockEnvironment(); // Our token mock is 1 char = 1 token for simplicity - vi.spyOn(env.tokenCalculator, 'calculateEpisodeListTokens').mockImplementation((eps) => eps.reduce((acc, ep) => acc + (ep.trigger.metadata.originalTokens || 100), 0)); + vi.spyOn( + env.tokenCalculator, + 'calculateEpisodeListTokens', + ).mockImplementation((eps) => + eps.reduce( + (acc, ep) => acc + (ep.trigger.metadata.originalTokens || 100), + 0, + ), + ); }); it('returns pristine episodes untouched if under budget', () => { @@ -25,10 +36,10 @@ describe('graphUtils (View Generator)', () => { createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: '1' }]), createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: '2' }]), ]; - + // We retain 5000 tokens. Total mock tokens = 200. const view = generateWorkingBufferView(episodes, 5000, env.tracer, env); - + expect(view).toHaveLength(2); // Must be a deep copy! The view generator clones episodes. expect(view).not.toBe(episodes); @@ -37,38 +48,58 @@ describe('graphUtils (View Generator)', () => { }); it('swaps to Masked variant when over budget (rolling backwards)', () => { - const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [{ text: '1', type: 'text' }]); - const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [{ text: '2', type: 'text' }]); - + const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [ + { text: '1', type: 'text' }, + ]); + const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [ + { text: '2', type: 'text' }, + ]); + ep1.variants = { - 'masked': { type: 'masked', status: 'ready', text: '', recoveredTokens: 10 } + masked: { + type: 'masked', + status: 'ready', + text: '', + recoveredTokens: 10, + }, }; - + // We only retain 100 tokens. // ep-2 (newest) takes 100 tokens. // Now rolling = 100. Over budget! // ep-1 is evaluated, and swapped for Masked. 
const view = generateWorkingBufferView([ep1, ep2], 10, env.tracer, env); - + expect(view).toHaveLength(2); expect(view[1].id).toBe('ep-2'); // Unchanged (newest) - + expect(view[0].id).toBe('ep-1'); - expect((view[0].trigger as UserPrompt).semanticParts[0].presentation?.text).toBe(''); + expect( + (view[0].trigger as UserPrompt).semanticParts[0].presentation?.text, + ).toBe(''); }); it('swaps to Summary variant when over budget', () => { - const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: '1' }]); - const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: '2' }]); - + const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: '1' }, + ]); + const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [ + { type: 'text', text: '2' }, + ]); + ep1.variants = { - 'summary': { type: 'summary', status: 'ready', text: '', recoveredTokens: 50 } + summary: { + type: 'summary', + status: 'ready', + text: '', + recoveredTokens: 50, + }, }; - + const view = generateWorkingBufferView([ep1, ep2], 10, env.tracer, env); - + expect(view).toHaveLength(2); - + // The summary completely replaces the internal steps and clears the yield. expect(view[0].steps).toHaveLength(1); expect(view[0].steps[0].type).toBe('AGENT_THOUGHT'); @@ -77,26 +108,39 @@ describe('graphUtils (View Generator)', () => { }); it('handles complex N-to-1 Snapshot skipping gracefully', () => { - const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: '1' }]); - const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: '2' }]); - const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', [{ type: 'text', text: '3' }]); - const ep4 = createDummyEpisode('ep-4', 'USER_PROMPT', [{ type: 'text', text: '4' }]); - + const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: '1' }, + ]); + const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [ + { type: 'text', text: '2' }, + ]); + const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', [ + { type: 'text', text: '3' }, + ]); + const ep4 = createDummyEpisode('ep-4', 'USER_PROMPT', [ + { type: 'text', text: '4' }, + ]); + // ep-3 has a snapshot that replaces [ep-1, ep-2, ep-3] const snapshotEp = createDummyEpisode('snap-1', 'SYSTEM_EVENT', []); - + ep3.variants = { - 'snapshot': { - type: 'snapshot', - status: 'ready', - episode: snapshotEp, - replacedEpisodeIds: ['ep-1', 'ep-2', 'ep-3'] - } + snapshot: { + type: 'snapshot', + status: 'ready', + episode: snapshotEp, + replacedEpisodeIds: ['ep-1', 'ep-2', 'ep-3'], + }, }; - + // We only retain 5 tokens, forcing the sweep to use variants for EVERYTHING except ep4. 
- const view = generateWorkingBufferView([ep1, ep2, ep3, ep4], 5, env.tracer, env); - + const view = generateWorkingBufferView( + [ep1, ep2, ep3, ep4], + 5, + env.tracer, + env, + ); + // Result should be exactly: [snapshot, ep-4] expect(view).toHaveLength(2); expect(view[0].id).toBe('snap-1'); @@ -104,17 +148,28 @@ describe('graphUtils (View Generator)', () => { }); it('ignores variants that are not yet "ready"', () => { - const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: '1' }]); - const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: '2' }]); - + const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: '1' }, + ]); + const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [ + { type: 'text', text: '2' }, + ]); + ep1.variants = { - 'masked': { type: 'masked', status: 'computing', text: '', recoveredTokens: 10 } + masked: { + type: 'masked', + status: 'computing', + text: '', + recoveredTokens: 10, + }, }; - + const view = generateWorkingBufferView([ep1, ep2], 10, env.tracer, env); - + // Because the variant was computing, it must fall back to the raw pristine text. expect(view).toHaveLength(2); - expect((view[0].trigger as UserPrompt).semanticParts[0].presentation).toBeUndefined(); + expect( + (view[0].trigger as UserPrompt).semanticParts[0].presentation, + ).toBeUndefined(); }); }); diff --git a/packages/core/src/context/ir/graphUtils.ts b/packages/core/src/context/ir/graphUtils.ts index f68aca1063..62e9d0a4c2 100644 --- a/packages/core/src/context/ir/graphUtils.ts +++ b/packages/core/src/context/ir/graphUtils.ts @@ -7,7 +7,7 @@ import type { Episode } from './types.js'; import type { ContextTracer } from '../tracer.js'; import { debugLogger } from '../../utils/debugLogger.js'; - +import type { ContextEnvironment } from '../sidecar/environment.js'; /** * Generates a computed view of the pristine log. @@ -16,7 +16,6 @@ import { debugLogger } from '../../utils/debugLogger.js'; * (snapshot > summary > masked) instead of the raw text. * Handles N-to-1 variant skipping automatically. */ -import type { ContextEnvironment } from "../sidecar/environment.js"; export function generateWorkingBufferView( pristineEpisodes: Episode[], @@ -42,7 +41,7 @@ export function generateWorkingBufferView( } let projectedTrigger: typeof ep.trigger; - + if (ep.trigger.type === 'USER_PROMPT') { projectedTrigger = { ...ep.trigger, @@ -50,7 +49,7 @@ export function generateWorkingBufferView( ...ep.trigger.metadata, transformations: [...(ep.trigger.metadata?.transformations || [])], }, - semanticParts: ep.trigger.semanticParts.map(sp => ({...sp})) + semanticParts: ep.trigger.semanticParts.map((sp) => ({ ...sp })), }; } else { projectedTrigger = { @@ -58,23 +57,20 @@ export function generateWorkingBufferView( metadata: { ...ep.trigger.metadata, transformations: [...(ep.trigger.metadata?.transformations || [])], - } + }, }; } let projectedEp: Episode = { ...ep, trigger: projectedTrigger, - steps: ep.steps.map( - (step) => - ({ - ...step, - metadata: { - ...step.metadata, - transformations: [...(step.metadata?.transformations || [])], - }, - }) - ), + steps: ep.steps.map((step) => ({ + ...step, + metadata: { + ...step.metadata, + transformations: [...(step.metadata?.transformations || [])], + }, + })), yield: ep.yield ? 
{ ...ep.yield, @@ -86,7 +82,9 @@ export function generateWorkingBufferView( : undefined, }; - const epTokens = env.tokenCalculator.calculateEpisodeListTokens([projectedEp]); + const epTokens = env.tokenCalculator.calculateEpisodeListTokens([ + projectedEp, + ]); if (rollingTokens > retainedTokens && ep.variants) { const snapshot = ep.variants['snapshot']; @@ -167,7 +165,9 @@ export function generateWorkingBufferView( } currentEpisodes.unshift(projectedEp); - rollingTokens += env.tokenCalculator.calculateEpisodeListTokens([projectedEp]); + rollingTokens += env.tokenCalculator.calculateEpisodeListTokens([ + projectedEp, + ]); } return currentEpisodes; diff --git a/packages/core/src/context/ir/mapper.test.ts b/packages/core/src/context/ir/mapper.test.ts index 04680925d2..d74befd62b 100644 --- a/packages/core/src/context/ir/mapper.test.ts +++ b/packages/core/src/context/ir/mapper.test.ts @@ -132,7 +132,10 @@ describe('IrMapper', () => { it('should correctly handle multi-tool-calls grouped within a single turn without dropping observations', () => { const rawHistory: Content[] = [ - { role: 'user', parts: [{ text: 'Examine both of these tools please.' }] }, + { + role: 'user', + parts: [{ text: 'Examine both of these tools please.' }], + }, { role: 'model', parts: [ @@ -192,11 +195,13 @@ describe('IrMapper', () => { // 0: AgentThought ("I will call them concurrently") // 1: ToolExecution(tool_one) // 2: ToolExecution(tool_two) - + expect(ep.steps).toHaveLength(3); - + expect(ep.steps[0].type).toBe('AGENT_THOUGHT'); - expect((ep.steps[0] as AgentThought).text).toBe('I will call them concurrently.'); + expect((ep.steps[0] as AgentThought).text).toBe( + 'I will call them concurrently.', + ); expect(ep.steps[1].type).toBe('TOOL_EXECUTION'); expect((ep.steps[1] as ToolExecution).toolName).toBe('tool_one'); @@ -212,19 +217,19 @@ describe('IrMapper', () => { expect(ep.yield).toBeDefined(); expect(ep.yield?.type).toBe('AGENT_YIELD'); expect(ep.yield?.text).toBe('Both complete.'); - + // Now verify we can reconstitute it without dropping the multiple calls const reconstituted = IrMapper.fromIr(episodes); - + // The reconstituted history should have exactly 4 turns, same as original expect(reconstituted).toHaveLength(4); - + // Check that the Model turn has both function calls expect(reconstituted[1].role).toBe('model'); expect(reconstituted[1].parts).toHaveLength(3); // text + call1 + call2 expect(reconstituted[1].parts![1].functionCall?.name).toBe('tool_one'); expect(reconstituted[1].parts![2].functionCall?.name).toBe('tool_two'); - + // Check that the User turn has both function responses expect(reconstituted[2].role).toBe('user'); expect(reconstituted[2].parts).toHaveLength(2); // response1 + response2 diff --git a/packages/core/src/context/ir/mapper.ts b/packages/core/src/context/ir/mapper.ts index 4b0a34f222..bf2c09100b 100644 --- a/packages/core/src/context/ir/mapper.ts +++ b/packages/core/src/context/ir/mapper.ts @@ -15,7 +15,10 @@ export class IrMapper { * Translates a flat Gemini Content[] array into our rich Episodic Intermediate Representation. * Groups adjacent function calls and responses into unified ToolExecution nodes. 
*/ - static toIr(history: readonly Content[], tokenCalculator: ContextTokenCalculator): Episode[] { + static toIr( + history: readonly Content[], + tokenCalculator: ContextTokenCalculator, + ): Episode[] { return toIr(history, tokenCalculator); } diff --git a/packages/core/src/context/ir/projector.ts b/packages/core/src/context/ir/projector.ts index 19deec9008..b98d494a90 100644 --- a/packages/core/src/context/ir/projector.ts +++ b/packages/core/src/context/ir/projector.ts @@ -8,14 +8,16 @@ import type { Content } from '@google/genai'; import { IrMapper } from './mapper.js'; import type { Episode } from './types.js'; import { debugLogger } from '../../utils/debugLogger.js'; -import type { ContextEnvironment, ContextTracer } from '../sidecar/environment.js'; +import type { + ContextEnvironment, + ContextTracer, +} from '../sidecar/environment.js'; import type { PipelineOrchestrator } from '../sidecar/orchestrator.js'; import type { SidecarConfig } from '../sidecar/types.js'; - export class IrProjector { /** - * Orchestrates the final projection: takes a working buffer view, + * Orchestrates the final projection: takes a working buffer view, * applies the Immediate Sanitization pipeline, and enforces token boundaries. */ static async project( @@ -24,42 +26,67 @@ export class IrProjector { sidecar: SidecarConfig, tracer: ContextTracer, env: ContextEnvironment, - protectedIds: Set + protectedIds: Set, ): Promise { if (!sidecar.budget) { const contents = IrMapper.fromIr(workingBuffer); - tracer.logEvent('IrProjector', 'Projected Context to LLM (No Budget)', { projectedContext: contents }); + tracer.logEvent('IrProjector', 'Projected Context to LLM (No Budget)', { + projectedContext: contents, + }); return contents; } const maxTokens = sidecar.budget.maxTokens; - const currentTokens = env.tokenCalculator.calculateEpisodeListTokens(workingBuffer); + const currentTokens = + env.tokenCalculator.calculateEpisodeListTokens(workingBuffer); if (currentTokens <= maxTokens) { - tracer.logEvent('IrProjector', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`); + tracer.logEvent( + 'IrProjector', + `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`, + ); const contents = IrMapper.fromIr(workingBuffer); - tracer.logEvent('IrProjector', 'Projected Context to LLM', { projectedContext: contents }); + tracer.logEvent('IrProjector', 'Projected Context to LLM', { + projectedContext: contents, + }); return contents; } - tracer.logEvent('IrProjector', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier.`); - debugLogger.log(`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}).`); + tracer.logEvent( + 'IrProjector', + `View exceeds maxTokens (${currentTokens} > ${maxTokens}). 
Hitting Synchronous Pressure Barrier.`, + ); + debugLogger.log( + `Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}).`, + ); - const processedEpisodes = await orchestrator.executePipeline('Immediate Sanitization', workingBuffer, { - currentTokens, - maxTokens: sidecar.budget.maxTokens, - retainedTokens: sidecar.budget.retainedTokens, - deficitTokens: Math.max(0, currentTokens - sidecar.budget.maxTokens), - protectedEpisodeIds: protectedIds, - isBudgetSatisfied: currentTokens <= sidecar.budget.maxTokens, - }); + const processedEpisodes = await orchestrator.executePipeline( + 'Immediate Sanitization', + workingBuffer, + { + currentTokens, + maxTokens: sidecar.budget.maxTokens, + retainedTokens: sidecar.budget.retainedTokens, + deficitTokens: Math.max(0, currentTokens - sidecar.budget.maxTokens), + protectedEpisodeIds: protectedIds, + isBudgetSatisfied: currentTokens <= sidecar.budget.maxTokens, + }, + ); - const finalTokens = env.tokenCalculator.calculateEpisodeListTokens(processedEpisodes); - tracer.logEvent('IrProjector', `Finished projection. Final token count: ${finalTokens}.`); - debugLogger.log(`Context Manager finished. Final actual token count: ${finalTokens}.`); + const finalTokens = + env.tokenCalculator.calculateEpisodeListTokens(processedEpisodes); + tracer.logEvent( + 'IrProjector', + `Finished projection. Final token count: ${finalTokens}.`, + ); + debugLogger.log( + `Context Manager finished. Final actual token count: ${finalTokens}.`, + ); const contents = IrMapper.fromIr(processedEpisodes); - tracer.logEvent('IrProjector', 'Projected Sanitized Context to LLM', { projectedContextSanitized: contents }); + tracer.logEvent('IrProjector', 'Projected Sanitized Context to LLM', { + projectedContextSanitized: contents, + }); return contents; } } diff --git a/packages/core/src/context/ir/toIr.ts b/packages/core/src/context/ir/toIr.ts index e4c956f060..7081d0817a 100644 --- a/packages/core/src/context/ir/toIr.ts +++ b/packages/core/src/context/ir/toIr.ts @@ -30,7 +30,23 @@ export function getStableId(obj: object): string { return id; } -export function toIr(history: readonly Content[], tokenCalculator: ContextTokenCalculator): Episode[] { +function isRecord(v: unknown): v is Record { + return typeof v === 'object' && v !== null && !Array.isArray(v); +} + +function isCompleteEpisode(ep: Partial): ep is Episode { + return ( + typeof ep.id === 'string' && + typeof ep.timestamp === 'number' && + !!ep.trigger && + Array.isArray(ep.steps) + ); +} + +export function toIr( + history: readonly Content[], + tokenCalculator: ContextTokenCalculator, +): Episode[] { const episodes: Episode[] = []; let currentEpisode: Partial | null = null; const pendingCallParts: Map = new Map(); @@ -45,8 +61,8 @@ export function toIr(history: readonly Content[], tokenCalculator: ContextTokenC }; const finalizeEpisode = () => { - if (currentEpisode && currentEpisode.trigger) { - episodes.push(currentEpisode as unknown as Episode); // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion + if (currentEpisode && isCompleteEpisode(currentEpisode)) { + episodes.push(currentEpisode); } currentEpisode = null; }; @@ -61,7 +77,13 @@ export function toIr(history: readonly Content[], tokenCalculator: ContextTokenC ); if (hasToolResponses) { - currentEpisode = parseToolResponses(msg, currentEpisode, pendingCallParts, tokenCalculator, createMetadata); + currentEpisode = parseToolResponses( + msg, + currentEpisode, + pendingCallParts, + tokenCalculator, + 
createMetadata, + ); } if (hasUserParts) { @@ -69,7 +91,12 @@ export function toIr(history: readonly Content[], tokenCalculator: ContextTokenC currentEpisode = parseUserParts(msg, createMetadata); } } else if (msg.role === 'model') { - currentEpisode = parseModelParts(msg, currentEpisode, pendingCallParts, createMetadata); + currentEpisode = parseModelParts( + msg, + currentEpisode, + pendingCallParts, + createMetadata, + ); } } @@ -86,7 +113,7 @@ function parseToolResponses( currentEpisode: Partial | null, pendingCallParts: Map, tokenCalculator: ContextTokenCalculator, - createMetadata: (parts: Part[]) => IrMetadata + createMetadata: (parts: Part[]) => IrMetadata, ): Partial { if (!currentEpisode) { currentEpisode = { @@ -117,18 +144,12 @@ function parseToolResponses( id: getStableId(part), type: 'TOOL_EXECUTION', toolName: part.functionResponse.name || 'unknown', - intent: - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (matchingCall?.functionCall?.args as unknown as Record< - string, - unknown - >) || {}, - observation: - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (part.functionResponse.response as unknown as Record< - string, - unknown - >) || {}, + intent: isRecord(matchingCall?.functionCall?.args) + ? matchingCall.functionCall.args + : {}, + observation: isRecord(part.functionResponse.response) + ? part.functionResponse.response + : {}, tokens: { intent: intentTokens, observation: obsTokens, @@ -146,7 +167,10 @@ function parseToolResponses( return currentEpisode; } -function parseUserParts(msg: Content, createMetadata: (parts: Part[]) => IrMetadata): Partial { +function parseUserParts( + msg: Content, + createMetadata: (parts: Part[]) => IrMetadata, +): Partial { const semanticParts: SemanticPart[] = []; for (const p of msg.parts!) { if (p.text !== undefined) @@ -171,9 +195,7 @@ function parseUserParts(msg: Content, createMetadata: (parts: Part[]) => IrMetad id: getStableId(msg.parts![0] || msg), type: 'USER_PROMPT', semanticParts, - metadata: createMetadata( - msg.parts!.filter((p) => !p.functionResponse), - ), + metadata: createMetadata(msg.parts!.filter((p) => !p.functionResponse)), }; return { @@ -188,7 +210,7 @@ function parseModelParts( msg: Content, currentEpisode: Partial | null, pendingCallParts: Map, - createMetadata: (parts: Part[]) => IrMetadata + createMetadata: (parts: Part[]) => IrMetadata, ): Partial { if (!currentEpisode) { currentEpisode = { diff --git a/packages/core/src/context/ir/types.ts b/packages/core/src/context/ir/types.ts index e60964304f..fddf55197b 100644 --- a/packages/core/src/context/ir/types.ts +++ b/packages/core/src/context/ir/types.ts @@ -4,6 +4,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +import type { Part } from '@google/genai'; + /** * Universal Audit Metadata * Tracks the lifecycle and transformations of a node or part within the IR. 
@@ -96,7 +98,7 @@ export type SemanticPart = } | { type: 'raw_part'; - part: unknown; + part: Part; presentation?: { text: string; tokens: number }; }; diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index 1a2b3981a2..b114098a74 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -3,6 +3,7 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + import type { EpisodeEditor } from './ir/episodeEditor.js'; /** @@ -40,8 +41,5 @@ export interface ContextProcessor { * Processes the episodic history payload via the provided EpisodeEditor, based on the current accounting state. * Processors should safely mutate or replace episodes using the editor's API. */ - process( - editor: EpisodeEditor, - state: ContextAccountingState, - ): Promise; + process(editor: EpisodeEditor, state: ContextAccountingState): Promise; } diff --git a/packages/core/src/context/processors/blobDegradationProcessor.test.ts b/packages/core/src/context/processors/blobDegradationProcessor.test.ts index cd1368c55f..82cc182d16 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.test.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.test.ts @@ -3,7 +3,12 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { createMockEnvironment, createDummyState, createDummyEpisode } from '../testing/contextTestUtils.js'; + +import { + createMockEnvironment, + createDummyState, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import { describe, it, expect, beforeEach, vi } from 'vitest'; import { BlobDegradationProcessor } from './blobDegradationProcessor.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; @@ -47,14 +52,20 @@ describe('BlobDegradationProcessor', () => { // Inline data should be degraded expect(parts[1].presentation).toBeDefined(); - expect(parts[1].presentation!.text).toContain('[Multi-Modal Blob (image/png'); - expect(parts[1].presentation!.text).toContain('degraded to text to preserve context window'); + expect(parts[1].presentation!.text).toContain( + '[Multi-Modal Blob (image/png', + ); + expect(parts[1].presentation!.text).toContain( + 'degraded to text to preserve context window', + ); // Verify it was written to fake FS expect(fileSystem.getFiles().size).toBeGreaterThan(0); const files = Array.from(fileSystem.getFiles().keys()); - expect(files[0]).toContain('.gemini/tool-outputs/degraded-blobs/session-mock-session/blob_'); - + expect(files[0]).toContain( + '.gemini/tool-outputs/degraded-blobs/session-mock-session/blob_', + ); + expect(result[0].trigger.metadata.transformations.length).toBe(1); }); @@ -74,8 +85,12 @@ describe('BlobDegradationProcessor', () => { const parts = (result[0].trigger as UserPrompt).semanticParts; expect(parts[0].presentation).toBeDefined(); - expect(parts[0].presentation!.text).toContain('[File Reference (application/pdf)'); - expect(parts[0].presentation!.text).toContain('Original URI: gs://fake-bucket/doc.pdf'); + expect(parts[0].presentation!.text).toContain( + '[File Reference (application/pdf)', + ); + expect(parts[0].presentation!.text).toContain( + 'Original URI: gs://fake-bucket/doc.pdf', + ); expect(fileSystem.getFiles().size).toBe(0); }); diff --git a/packages/core/src/context/processors/blobDegradationProcessor.ts b/packages/core/src/context/processors/blobDegradationProcessor.ts index b46a081b4a..2981a5d7e8 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.ts +++ 
b/packages/core/src/context/processors/blobDegradationProcessor.ts @@ -3,18 +3,19 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; - import { sanitizeFilenamePart } from '../../utils/fileUtils.js'; -import type { Part } from '@google/genai'; - import type { EpisodeEditor } from '../ir/episodeEditor.js'; export type BlobDegradationProcessorOptions = Record; export class BlobDegradationProcessor implements ContextProcessor { - static create(env: ContextEnvironment, _options: BlobDegradationProcessorOptions): BlobDegradationProcessor { + static create( + env: ContextEnvironment, + _options: BlobDegradationProcessorOptions, + ): BlobDegradationProcessor { return new BlobDegradationProcessor(env); } @@ -89,34 +90,46 @@ export class BlobDegradationProcessor implements ContextProcessor { const oldTokens = this.env.tokenCalculator.estimateTokensForParts([ { inlineData: { mimeType: part.mimeType, data: part.data } }, ]); - const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: newText }]); + const newTokens = this.env.tokenCalculator.estimateTokensForParts([ + { text: newText }, + ]); tokensSaved = oldTokens - newTokens; } else if (part.type === 'file_data') { newText = `[File Reference (${part.mimeType}) degraded to text to preserve context window. Original URI: ${part.fileUri}]`; const oldTokens = this.env.tokenCalculator.estimateTokensForParts([ { fileData: { mimeType: part.mimeType, fileUri: part.fileUri } }, ]); - const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: newText }]); + const newTokens = this.env.tokenCalculator.estimateTokensForParts([ + { text: newText }, + ]); tokensSaved = oldTokens - newTokens; } else if (part.type === 'raw_part') { newText = `[Unknown Part degraded to text to preserve context window.]`; - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const oldTokens = this.env.tokenCalculator.estimateTokensForParts([part.part as Part]); - const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: newText }]); + const oldTokens = this.env.tokenCalculator.estimateTokensForParts([ + part.part, + ]); + const newTokens = this.env.tokenCalculator.estimateTokensForParts([ + { text: newText }, + ]); tokensSaved = oldTokens - newTokens; } if (newText && tokensSaved > 0) { - const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: newText }]); - + const newTokens = this.env.tokenCalculator.estimateTokensForParts([ + { text: newText }, + ]); + editor.editEpisode(ep.id, 'DEGRADE_BLOB', (draft) => { if (draft.trigger.type === 'USER_PROMPT') { - draft.trigger.semanticParts[j].presentation = { text: newText, tokens: newTokens }; - draft.trigger.metadata.transformations.push({ - processorName: this.name, - action: 'DEGRADED', - timestamp: Date.now(), - }); + draft.trigger.semanticParts[j].presentation = { + text: newText, + tokens: newTokens, + }; + draft.trigger.metadata.transformations.push({ + processorName: this.name, + action: 'DEGRADED', + timestamp: Date.now(), + }); } }); diff --git a/packages/core/src/context/processors/emergencyTruncationProcessor.test.ts b/packages/core/src/context/processors/emergencyTruncationProcessor.test.ts index ce0bade3ad..892e0d53ce 100644 --- a/packages/core/src/context/processors/emergencyTruncationProcessor.test.ts +++ 
b/packages/core/src/context/processors/emergencyTruncationProcessor.test.ts @@ -4,7 +4,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { createMockEnvironment, createDummyState, createDummyEpisode } from '../testing/contextTestUtils.js'; +import { + createMockEnvironment, + createDummyState, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import { describe, it, expect, beforeEach, vi } from 'vitest'; import { EmergencyTruncationProcessor } from './emergencyTruncationProcessor.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; @@ -18,9 +22,15 @@ describe('EmergencyTruncationProcessor', () => { vi.resetAllMocks(); env = createMockEnvironment(); // Force token calculator to return exactly what we tell it for deterministic testing - vi.spyOn(env.tokenCalculator, 'calculateEpisodeListTokens').mockImplementation((episodes) => - // Just sum up the metadata originalTokens for our dummy episodes - episodes.reduce((acc, ep) => acc + (ep.trigger.metadata.originalTokens || 100), 0) + vi.spyOn( + env.tokenCalculator, + 'calculateEpisodeListTokens', + ).mockImplementation((episodes) => + // Just sum up the metadata originalTokens for our dummy episodes + episodes.reduce( + (acc, ep) => acc + (ep.trigger.metadata.originalTokens || 100), + 0, + ), ); processor = new EmergencyTruncationProcessor(env, {}); @@ -28,10 +38,12 @@ describe('EmergencyTruncationProcessor', () => { it('bypasses processing if currentTokens <= maxTokens', async () => { const episodes = [ - createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: 'short' }]) + createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: 'short' }, + ]), ]; // State says we are under budget (5000 < 10000) - const state = createDummyState(true, 0, new Set(), 5000, 10000); + const state = createDummyState(true, 0, new Set(), 5000, 10000); const editor = new EpisodeEditor(episodes); await processor.process(editor, state); @@ -41,20 +53,26 @@ describe('EmergencyTruncationProcessor', () => { }); it('truncates episodes from the front (oldest) until targetTokens is met', async () => { - const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: 'oldest' }]); - const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: 'middle' }]); - const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', [{ type: 'text', text: 'newest' }]); - + const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: 'oldest' }, + ]); + const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [ + { type: 'text', text: 'middle' }, + ]); + const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', [ + { type: 'text', text: 'newest' }, + ]); + // Each is worth 100 tokens according to our mock const episodes = [ep1, ep2, ep3]; - + // We have 300 tokens, but max is 200. We need to drop 100 tokens. const state = createDummyState(false, 100, new Set(), 300, 200); const editor = new EpisodeEditor(episodes); await processor.process(editor, state); const result = editor.getFinalEpisodes(); - + // It should drop the FIRST episode (ep-1) and keep the rest. expect(result.length).toBe(2); expect(result[0].id).toBe('ep-2'); @@ -62,12 +80,18 @@ describe('EmergencyTruncationProcessor', () => { }); it('never drops protected episodes (e.g. 
system instructions)', async () => { - const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: 'protected system prompt' }]); - const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: 'middle' }]); - const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', [{ type: 'text', text: 'newest' }]); - + const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: 'protected system prompt' }, + ]); + const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', [ + { type: 'text', text: 'middle' }, + ]); + const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', [ + { type: 'text', text: 'newest' }, + ]); + const episodes = [ep1, ep2, ep3]; - + // We have 300 tokens, max is 200. We need to drop 100 tokens. // However, ep-1 is protected! const state = createDummyState(false, 100, new Set(['ep-1']), 300, 200); @@ -75,7 +99,7 @@ describe('EmergencyTruncationProcessor', () => { const editor = new EpisodeEditor(episodes); await processor.process(editor, state); const result = editor.getFinalEpisodes(); - + // It should SKIP dropping ep-1 (protected) and drop ep-2 instead. expect(result.length).toBe(2); expect(result[0].id).toBe('ep-1'); // Protected, survived @@ -86,19 +110,19 @@ describe('EmergencyTruncationProcessor', () => { const ep1 = createDummyEpisode('ep-1', 'USER_PROMPT', []); const ep2 = createDummyEpisode('ep-2', 'USER_PROMPT', []); const ep3 = createDummyEpisode('ep-3', 'USER_PROMPT', []); - + const episodes = [ep1, ep2, ep3]; - + // We have 300 tokens, max is 50. We need to drop 250 tokens! const state = createDummyState(false, 250, new Set(), 300, 50); const editor = new EpisodeEditor(episodes); await processor.process(editor, state); const result = editor.getFinalEpisodes(); - - // It must drop ep1 (100t) and ep2 (100t). - // Remaining is ep3 (100t). - // Wait, if it drops ep1 (remaining=200) and ep2 (remaining=100), + + // It must drop ep1 (100t) and ep2 (100t). + // Remaining is ep3 (100t). + // Wait, if it drops ep1 (remaining=200) and ep2 (remaining=100), // when it looks at ep3, remaining (100) > max (50), so it will drop ep3 too! 
expect(result.length).toBe(0); }); diff --git a/packages/core/src/context/processors/emergencyTruncationProcessor.ts b/packages/core/src/context/processors/emergencyTruncationProcessor.ts index c9c866970d..ed1e120dd3 100644 --- a/packages/core/src/context/processors/emergencyTruncationProcessor.ts +++ b/packages/core/src/context/processors/emergencyTruncationProcessor.ts @@ -6,41 +6,53 @@ import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; - - import type { EpisodeEditor } from '../ir/episodeEditor.js'; export type EmergencyTruncationProcessorOptions = Record; export class EmergencyTruncationProcessor implements ContextProcessor { - static create(env: ContextEnvironment, options: EmergencyTruncationProcessorOptions): EmergencyTruncationProcessor { + static create( + env: ContextEnvironment, + options: EmergencyTruncationProcessorOptions, + ): EmergencyTruncationProcessor { return new EmergencyTruncationProcessor(env, options); } readonly id = 'EmergencyTruncationProcessor'; readonly name = 'EmergencyTruncationProcessor'; readonly options: EmergencyTruncationProcessorOptions; - constructor(private readonly _env: ContextEnvironment, options: EmergencyTruncationProcessorOptions) { + constructor( + private readonly _env: ContextEnvironment, + options: EmergencyTruncationProcessorOptions, + ) { this.options = options; } - async process(editor: EpisodeEditor, state: ContextAccountingState): Promise { + async process( + editor: EpisodeEditor, + state: ContextAccountingState, + ): Promise { if (state.currentTokens <= state.maxTokens) return; let remainingTokens = state.currentTokens; const targetTokens = state.maxTokens; const toRemove: string[] = []; - + // We respect the global protected Episode IDs (like the system prompt at index 0) for (const ep of editor.episodes) { - const epTokens = this._env.tokenCalculator.calculateEpisodeListTokens([ep]); - - if (remainingTokens > targetTokens && !state.protectedEpisodeIds.has(ep.id)) { + const epTokens = this._env.tokenCalculator.calculateEpisodeListTokens([ + ep, + ]); + + if ( + remainingTokens > targetTokens && + !state.protectedEpisodeIds.has(ep.id) + ) { remainingTokens -= epTokens; toRemove.push(ep.id); } } - + if (toRemove.length > 0) { editor.removeEpisodes(toRemove, 'TRUNCATED'); } diff --git a/packages/core/src/context/processors/historySquashingProcessor.test.ts b/packages/core/src/context/processors/historySquashingProcessor.test.ts index 228ecec564..bdbae5a159 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.test.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.test.ts @@ -3,15 +3,16 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { createMockEnvironment, createDummyState, createDummyEpisode } from '../testing/contextTestUtils.js'; + +import { + createMockEnvironment, + createDummyState, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import { describe, it, expect, beforeEach } from 'vitest'; import { HistorySquashingProcessor } from './historySquashingProcessor.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; -import type { - UserPrompt, - AgentThought, - AgentYield, -} from '../ir/types.js'; +import type { UserPrompt, AgentThought, AgentYield } from '../ir/types.js'; import { randomUUID } from 'node:crypto'; describe('HistorySquashingProcessor', () => { @@ -23,8 +24,14 @@ describe('HistorySquashingProcessor', () => { }); }); - const 
createThoughtEpisode = (id: string, userText: string, modelThought: string) => { - const ep = createDummyEpisode(id, 'USER_PROMPT', [{ type: 'text', text: userText }]); + const createThoughtEpisode = ( + id: string, + userText: string, + modelThought: string, + ) => { + const ep = createDummyEpisode(id, 'USER_PROMPT', [ + { type: 'text', text: userText }, + ]); // Replace the tool steps with a thought step for this test ep.steps = [ { diff --git a/packages/core/src/context/processors/historySquashingProcessor.ts b/packages/core/src/context/processors/historySquashingProcessor.ts index 97d8c2f7de..add0c813fb 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.ts @@ -14,7 +14,10 @@ export interface HistorySquashingProcessorOptions { } export class HistorySquashingProcessor implements ContextProcessor { - static create(env: ContextEnvironment, options: HistorySquashingProcessorOptions): HistorySquashingProcessor { + static create( + env: ContextEnvironment, + options: HistorySquashingProcessorOptions, + ): HistorySquashingProcessor { return new HistorySquashingProcessor(env, options); } @@ -23,7 +26,8 @@ export class HistorySquashingProcessor implements ContextProcessor { properties: { maxTokensPerNode: { type: 'number', - description: 'The maximum tokens a node can have before being truncated.', + description: + 'The maximum tokens a node can have before being truncated.', }, }, required: ['maxTokensPerNode'], @@ -33,7 +37,10 @@ export class HistorySquashingProcessor implements ContextProcessor { readonly name = 'HistorySquashingProcessor'; readonly options: HistorySquashingProcessorOptions; - constructor(env: ContextEnvironment, options: HistorySquashingProcessorOptions) { + constructor( + env: ContextEnvironment, + options: HistorySquashingProcessorOptions, + ) { this.options = options; } @@ -95,21 +102,21 @@ export class HistorySquashingProcessor implements ContextProcessor { limitChars, currentDeficit, (p) => { - editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => { - if (draft.trigger.type === 'USER_PROMPT') { - draft.trigger.semanticParts[j].presentation = p; - } - }); + editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => { + if (draft.trigger.type === 'USER_PROMPT') { + draft.trigger.semanticParts[j].presentation = p; + } + }); }, () => { - editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => { - draft.trigger.metadata.transformations.push({ - processorName: this.name, - action: 'TRUNCATED', - timestamp: Date.now(), - }); - }); - } + editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => { + draft.trigger.metadata.transformations.push({ + processorName: this.name, + action: 'TRUNCATED', + timestamp: Date.now(), + }); + }); + }, ); currentDeficit -= saved; } @@ -127,25 +134,25 @@ export class HistorySquashingProcessor implements ContextProcessor { limitChars, currentDeficit, (p) => { - editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => { - const draftStep = draft.steps[j]; - if (draftStep.type === 'AGENT_THOUGHT') { - draftStep.presentation = p; - } - }); + editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => { + const draftStep = draft.steps[j]; + if (draftStep.type === 'AGENT_THOUGHT') { + draftStep.presentation = p; + } + }); }, () => { - editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => { - const draftStep = draft.steps[j]; - if (draftStep.type === 'AGENT_THOUGHT') { - draftStep.metadata.transformations.push({ - processorName: this.name, - action: 'TRUNCATED', - timestamp: 
Date.now(), - }); - } - }); - } + editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => { + const draftStep = draft.steps[j]; + if (draftStep.type === 'AGENT_THOUGHT') { + draftStep.metadata.transformations.push({ + processorName: this.name, + action: 'TRUNCATED', + timestamp: Date.now(), + }); + } + }); + }, ); currentDeficit -= saved; } @@ -159,21 +166,21 @@ export class HistorySquashingProcessor implements ContextProcessor { limitChars, currentDeficit, (p) => { - editor.editEpisode(ep.id, 'SQUASH_YIELD', (draft) => { - if (draft.yield) draft.yield.presentation = p; - }); + editor.editEpisode(ep.id, 'SQUASH_YIELD', (draft) => { + if (draft.yield) draft.yield.presentation = p; + }); }, () => { - editor.editEpisode(ep.id, 'SQUASH_YIELD', (draft) => { - if (draft.yield) { - draft.yield.metadata.transformations.push({ - processorName: this.name, - action: 'TRUNCATED', - timestamp: Date.now(), - }); - } - }); - } + editor.editEpisode(ep.id, 'SQUASH_YIELD', (draft) => { + if (draft.yield) { + draft.yield.metadata.transformations.push({ + processorName: this.name, + action: 'TRUNCATED', + timestamp: Date.now(), + }); + } + }); + }, ); currentDeficit -= saved; } diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.test.ts b/packages/core/src/context/processors/semanticCompressionProcessor.test.ts index 2f04ed9572..a3f3c96143 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.test.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.test.ts @@ -4,15 +4,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { createMockEnvironment, createDummyState, createDummyEpisode } from '../testing/contextTestUtils.js'; +import { + createMockEnvironment, + createDummyState, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import { describe, it, expect, beforeEach, vi } from 'vitest'; import { SemanticCompressionProcessor } from './semanticCompressionProcessor.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; -import type { - UserPrompt, - ToolExecution, - AgentThought, -} from '../ir/types.js'; +import type { UserPrompt, ToolExecution, AgentThought } from '../ir/types.js'; import { randomUUID } from 'node:crypto'; import type { BaseLlmClient } from 'src/core/baseLlmClient.js'; @@ -27,8 +27,10 @@ describe('SemanticCompressionProcessor', () => { const env = createMockEnvironment(); // Re-mock llmClient properly - vi.spyOn(env, 'llmClient', 'get').mockReturnValue({ generateContent: generateContentMock } as unknown as BaseLlmClient); - + vi.spyOn(env, 'llmClient', 'get').mockReturnValue({ + generateContent: generateContentMock, + } as unknown as BaseLlmClient); + processor = new SemanticCompressionProcessor(env, { nodeThresholdTokens: 2000, }); @@ -40,10 +42,12 @@ describe('SemanticCompressionProcessor', () => { thoughtText: string, toolObs: string, ) => { - const ep = createDummyEpisode(id, 'USER_PROMPT', [{ type: 'text', text: userText }]); + const ep = createDummyEpisode(id, 'USER_PROMPT', [ + { type: 'text', text: userText }, + ]); // We override metadata for threshold triggering ep.trigger.metadata.currentTokens = 3800; - + ep.steps = [ { id: randomUUID(), @@ -73,38 +77,50 @@ describe('SemanticCompressionProcessor', () => { }; it('bypasses processing if budget is satisfied', async () => { - const episodes = [createEpisodeWithThoughtsAndTools('1', 'short', 'short', 'short')]; + const episodes = [ + createEpisodeWithThoughtsAndTools('1', 'short', 'short', 'short'), + ]; const state = createDummyState(true); 
const editor = new EpisodeEditor(episodes); await processor.process(editor, state); - + expect(generateContentMock).not.toHaveBeenCalled(); }); it('skips protected episodes even if over budget', async () => { - const massiveStr = 'M'.repeat(15000); + const massiveStr = 'M'.repeat(15000); const episodes = [ - createEpisodeWithThoughtsAndTools('ep-1', massiveStr, massiveStr, massiveStr), + createEpisodeWithThoughtsAndTools( + 'ep-1', + massiveStr, + massiveStr, + massiveStr, + ), ]; const state = createDummyState(false, 1000, new Set(['ep-1'])); const editor = new EpisodeEditor(episodes); await processor.process(editor, state); - + expect(generateContentMock).not.toHaveBeenCalled(); }); it('summarizes unprotected UserPrompts, Thoughts, and Tool observations until deficit is met', async () => { const massiveStr = 'M'.repeat(15000); const episodes = [ - createEpisodeWithThoughtsAndTools('ep-1', massiveStr, massiveStr, massiveStr), + createEpisodeWithThoughtsAndTools( + 'ep-1', + massiveStr, + massiveStr, + massiveStr, + ), ]; const state = createDummyState(false, 50000); // Massive deficit, forces all 3 to summarize const editor = new EpisodeEditor(episodes); await processor.process(editor, state); - + expect(generateContentMock).toHaveBeenCalledTimes(3); // Verify presentation layers were injected @@ -128,7 +144,12 @@ describe('SemanticCompressionProcessor', () => { it('stops calling LLM when deficit hits zero', async () => { const massiveStr = 'M'.repeat(15000); const episodes = [ - createEpisodeWithThoughtsAndTools('ep-1', massiveStr, massiveStr, massiveStr), + createEpisodeWithThoughtsAndTools( + 'ep-1', + massiveStr, + massiveStr, + massiveStr, + ), ]; // Set deficit low enough that ONE summary solves the problem @@ -136,7 +157,7 @@ describe('SemanticCompressionProcessor', () => { const editor = new EpisodeEditor(episodes); await processor.process(editor, state); - + // It should only compress the UserPrompt and then stop expect(generateContentMock).toHaveBeenCalledTimes(1); }); diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.ts b/packages/core/src/context/processors/semanticCompressionProcessor.ts index 685ea9a34d..9ba737124f 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.ts @@ -9,8 +9,6 @@ import type { ContextEnvironment } from '../sidecar/environment.js'; import { debugLogger } from '../../utils/debugLogger.js'; import { LlmRole } from '../../telemetry/types.js'; import { getResponseText } from '../../utils/partUtils.js'; - - import type { EpisodeEditor } from '../ir/episodeEditor.js'; export interface SemanticCompressionProcessorOptions { @@ -18,7 +16,10 @@ export interface SemanticCompressionProcessorOptions { } export class SemanticCompressionProcessor implements ContextProcessor { - static create(env: ContextEnvironment, options: SemanticCompressionProcessorOptions): SemanticCompressionProcessor { + static create( + env: ContextEnvironment, + options: SemanticCompressionProcessorOptions, + ): SemanticCompressionProcessor { return new SemanticCompressionProcessor(env, options); } @@ -82,19 +83,26 @@ export class SemanticCompressionProcessor implements ContextProcessor { part.text, 'User Prompt', ); - const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: summary }]); - const oldTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: part.text }]); + const newTokens = this.env.tokenCalculator.estimateTokensForParts([ 
+ { text: summary }, + ]); + const oldTokens = this.env.tokenCalculator.estimateTokensForParts([ + { text: part.text }, + ]); if (newTokens < oldTokens) { editor.editEpisode(ep.id, 'SUMMARIZE_PROMPT', (draft) => { - if (draft.trigger.type === 'USER_PROMPT') { - draft.trigger.semanticParts[j].presentation = { text: summary, tokens: newTokens }; - draft.trigger.metadata.transformations.push({ - processorName: this.name, - action: 'SUMMARIZED', - timestamp: Date.now(), - }); - } + if (draft.trigger.type === 'USER_PROMPT') { + draft.trigger.semanticParts[j].presentation = { + text: summary, + tokens: newTokens, + }; + draft.trigger.metadata.transformations.push({ + processorName: this.name, + action: 'SUMMARIZED', + timestamp: Date.now(), + }); + } }); currentDeficit -= oldTokens - newTokens; } @@ -114,20 +122,27 @@ export class SemanticCompressionProcessor implements ContextProcessor { step.text, 'Agent Thought', ); - const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: summary }]); - const oldTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: step.text }]); + const newTokens = this.env.tokenCalculator.estimateTokensForParts( + [{ text: summary }], + ); + const oldTokens = this.env.tokenCalculator.estimateTokensForParts( + [{ text: step.text }], + ); if (newTokens < oldTokens) { editor.editEpisode(ep.id, 'SUMMARIZE_THOUGHT', (draft) => { - const draftStep = draft.steps[j]; - if (draftStep.type === 'AGENT_THOUGHT') { - draftStep.presentation = { text: summary, tokens: newTokens }; - draftStep.metadata.transformations.push({ - processorName: this.name, - action: 'SUMMARIZED', - timestamp: Date.now(), - }); - } + const draftStep = draft.steps[j]; + if (draftStep.type === 'AGENT_THOUGHT') { + draftStep.presentation = { + text: summary, + tokens: newTokens, + }; + draftStep.metadata.transformations.push({ + processorName: this.name, + action: 'SUMMARIZED', + timestamp: Date.now(), + }); + } }); currentDeficit -= oldTokens - newTokens; } @@ -161,38 +176,53 @@ export class SemanticCompressionProcessor implements ContextProcessor { // Wrap the summary in an object so the Gemini API accepts it as a valid functionResponse.response const newObsObject = { summary }; - const newObsTokens = this.env.tokenCalculator.estimateTokensForParts([ - { - functionResponse: { - name: step.toolName, - response: newObsObject as unknown as Record, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion - id: step.id, + const newObsTokens = + this.env.tokenCalculator.estimateTokensForParts([ + { + functionResponse: { + name: step.toolName, + response: newObsObject, + id: step.id, + }, }, - }, - ]); + ]); const oldObsTokens = - step.presentation?.tokens?.observation ?? step.tokens?.observation ?? step.tokens; + step.presentation?.tokens?.observation ?? + step.tokens?.observation ?? + step.tokens; const intentTokens = step.presentation?.tokens?.intent ?? step.tokens?.intent ?? 0; if (newObsTokens < oldObsTokens) { editor.editEpisode(ep.id, 'SUMMARIZE_TOOL', (draft) => { - const draftStep = draft.steps[j]; - if (draftStep.type === 'TOOL_EXECUTION') { - draftStep.presentation = { - intent: draftStep.presentation?.intent ?? draftStep.intent, - observation: newObsObject, - tokens: { intent: intentTokens, observation: newObsTokens }, + const draftStep = draft.steps[j]; + if (draftStep.type === 'TOOL_EXECUTION') { + draftStep.presentation = { + intent: + draftStep.presentation?.intent ?? 
draftStep.intent, + observation: newObsObject, + tokens: { + intent: intentTokens, + observation: newObsTokens, + }, + }; + if (!draftStep.metadata) { + draftStep.metadata = { + transformations: [], + currentTokens: 0, + originalTokens: 0, }; - if (!draftStep.metadata) { draftStep.metadata = { transformations: [], currentTokens: 0, originalTokens: 0 } }; - if (!draftStep.metadata.transformations) { draftStep.metadata.transformations = [] }; - draftStep.metadata.transformations.push({ - processorName: this.name, - action: 'SUMMARIZED', - timestamp: Date.now(), - }); - } + } + if (!draftStep.metadata.transformations) { + draftStep.metadata.transformations = []; + } + draftStep.metadata.transformations.push({ + processorName: this.name, + action: 'SUMMARIZED', + timestamp: Date.now(), + }); + } }); currentDeficit -= oldObsTokens - newObsTokens; } diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts index 77d6264e59..477cae8e68 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts @@ -4,7 +4,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { createMockEnvironment, createDummyState, createDummyEpisode } from '../testing/contextTestUtils.js'; +import { + createMockEnvironment, + createDummyState, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import { describe, it, expect, beforeEach, vi } from 'vitest'; import { StateSnapshotProcessor } from './stateSnapshotProcessor.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; @@ -23,17 +27,23 @@ describe('StateSnapshotProcessor', () => { generateContentMock = vi.fn().mockResolvedValue({ text: 'Mocked Compressed State Snapshot!', }); - vi.spyOn(env, 'llmClient', 'get').mockReturnValue({ generateContent: generateContentMock } as unknown as BaseLlmClient); + vi.spyOn(env, 'llmClient', 'get').mockReturnValue({ + generateContent: generateContentMock, + } as unknown as BaseLlmClient); // Override token calc for testing - vi.spyOn(env.tokenCalculator, 'estimateTokensForParts').mockReturnValue(100); + vi.spyOn(env.tokenCalculator, 'estimateTokensForParts').mockReturnValue( + 100, + ); processor = new StateSnapshotProcessor(env, {}, env.eventBus); }); it('bypasses processing if deficit is <= 0', async () => { const episodes = [ - createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: 'hello' }]) + createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: 'hello' }, + ]), ]; // current: 100, max: 1000, retained: 200 (deficit 0) const state = createDummyState(false, 0, new Set(), 100, 1000, 200); @@ -48,9 +58,11 @@ describe('StateSnapshotProcessor', () => { it('bypasses processing if not enough episodes to summarize (needs at least 2 inner episodes)', async () => { const episodes = [ createDummyEpisode('ep-sys', 'SYSTEM_EVENT', []), - createDummyEpisode('ep-active', 'USER_PROMPT', [{ type: 'text', text: 'help' }]), + createDummyEpisode('ep-active', 'USER_PROMPT', [ + { type: 'text', text: 'help' }, + ]), ]; - + // current: 1000, max: 10000, retained: 500. 
Target deficit = 500 const state = createDummyState(false, 500, new Set(), 1000, 10000, 500); @@ -64,33 +76,41 @@ describe('StateSnapshotProcessor', () => { it('summarizes intermediate episodes into a single snapshot episode', async () => { const episodes = [ createDummyEpisode('ep-0', 'SYSTEM_EVENT', []), - createDummyEpisode('ep-1', 'USER_PROMPT', [{ type: 'text', text: 'old 1' }]), - createDummyEpisode('ep-2', 'USER_PROMPT', [{ type: 'text', text: 'old 2' }]), - createDummyEpisode('ep-3', 'USER_PROMPT', [{ type: 'text', text: 'current' }]), + createDummyEpisode('ep-1', 'USER_PROMPT', [ + { type: 'text', text: 'old 1' }, + ]), + createDummyEpisode('ep-2', 'USER_PROMPT', [ + { type: 'text', text: 'old 2' }, + ]), + createDummyEpisode('ep-3', 'USER_PROMPT', [ + { type: 'text', text: 'current' }, + ]), ]; - + // Target deficit = 200 const state = createDummyState(false, 200, new Set(), 1000, 10000, 800); const editor = new EpisodeEditor(episodes); await processor.process(editor, state); const result = editor.getFinalEpisodes(); - + // We started with 4 episodes. // Episodes [1, 2] were synthesized into a single new Snapshot episode. // Final array should be: [0, SNAPSHOT, 3] = length 3. expect(result.length).toBe(3); expect(result[0].id).toBe('ep-0'); - + const snapshotEp = result[1]; expect(snapshotEp.yield).toBeDefined(); expect(snapshotEp.yield!.text).toContain(''); - expect(snapshotEp.yield!.text).toContain('Mocked Compressed State Snapshot!'); - + expect(snapshotEp.yield!.text).toContain( + 'Mocked Compressed State Snapshot!', + ); + expect(result[2].id).toBe('ep-3'); - + expect(generateContentMock).toHaveBeenCalledTimes(1); - + const llmArgs = generateContentMock.mock.calls[0][0]; expect(llmArgs.contents[0].parts[0].text).toContain('old 1'); expect(llmArgs.contents[0].parts[0].text).toContain('old 2'); diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 40c253f927..6872288130 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -6,12 +6,13 @@ import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; import type { Episode } from '../ir/types.js'; -import type { ContextEnvironment, ContextEventBus } from '../sidecar/environment.js'; - +import type { + ContextEnvironment, + ContextEventBus, +} from '../sidecar/environment.js'; import { v4 as uuidv4 } from 'uuid'; import { LlmRole } from '../../telemetry/llmRole.js'; import { debugLogger } from 'src/utils/debugLogger.js'; - import type { EpisodeEditor } from '../ir/episodeEditor.js'; export interface StateSnapshotProcessorOptions { @@ -21,7 +22,10 @@ export interface StateSnapshotProcessorOptions { } export class StateSnapshotProcessor implements ContextProcessor { - static create(env: ContextEnvironment, options: StateSnapshotProcessorOptions): StateSnapshotProcessor { + static create( + env: ContextEnvironment, + options: StateSnapshotProcessorOptions, + ): StateSnapshotProcessor { return new StateSnapshotProcessor(env, options, env.eventBus); } readonly id = 'StateSnapshotProcessor'; @@ -39,8 +43,14 @@ export class StateSnapshotProcessor implements ContextProcessor { this.options = options; } - async process(editor: EpisodeEditor, state: ContextAccountingState): Promise { - const targetDeficit = Math.max(0, state.currentTokens - state.retainedTokens); + async process( + editor: EpisodeEditor, + state: ContextAccountingState, + 
): Promise { + const targetDeficit = Math.max( + 0, + state.currentTokens - state.retainedTokens, + ); if (this.isSynthesizing || targetDeficit <= 0) return; this.isSynthesizing = true; @@ -53,10 +63,13 @@ export class StateSnapshotProcessor implements ContextProcessor { selectedEpisodes.push(ep); let triggerText = ''; if (ep.trigger?.type === 'USER_PROMPT') { - const firstPart = ep.trigger.semanticParts?.[0]; - if (firstPart) { - triggerText = firstPart.type === 'text' ? firstPart.text : (firstPart.presentation?.text ?? ''); - } + const firstPart = ep.trigger.semanticParts?.[0]; + if (firstPart) { + triggerText = + firstPart.type === 'text' + ? firstPart.text + : (firstPart.presentation?.text ?? ''); + } } deficitAccumulator += this.env.tokenCalculator.estimateTokensForParts([ { text: triggerText }, @@ -68,11 +81,11 @@ export class StateSnapshotProcessor implements ContextProcessor { if (selectedEpisodes.length < 2) return; // Not enough context to summarize // Optimization: Do NOT emit VariantComputing, let the Orchestrator handle caching the final result. - const snapshotEp: Episode = await this.synthesizeSnapshot(selectedEpisodes); - - const oldIds = selectedEpisodes.map(ep => ep.id); + const snapshotEp: Episode = + await this.synthesizeSnapshot(selectedEpisodes); + + const oldIds = selectedEpisodes.map((ep) => ep.id); editor.replaceEpisodes(oldIds, snapshotEp, 'STATE_SNAPSHOT'); - } finally { this.isSynthesizing = false; } @@ -90,11 +103,13 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; for (const ep of episodes) { if (ep.trigger?.type === 'USER_PROMPT') { - const partsText = ep.trigger.semanticParts.map(p => { - if (p.type === 'text') return p.text; - if (p.presentation) return p.presentation.text; - return ''; - }).join(''); + const partsText = ep.trigger.semanticParts + .map((p) => { + if (p.type === 'text') return p.text; + if (p.presentation) return p.presentation.text; + return ''; + }) + .join(''); userPromptText += `USER: ${partsText}\n`; } else if (ep.trigger?.type === 'SYSTEM_EVENT') { userPromptText += `[SYSTEM EVENT: ${ep.trigger.name}]\n`; @@ -111,22 +126,22 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo } try { - const response = await client.generateContent( - { - modelConfigKey: { model: 'state-snapshot-processor' }, - contents: [{ role: 'user', parts: [{ text: userPromptText }] }], - systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] }, - promptId: this.env.promptId, - role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR, - abortSignal: new AbortController().signal, - }, - ); + const response = await client.generateContent({ + modelConfigKey: { model: 'state-snapshot-processor' }, + contents: [{ role: 'user', parts: [{ text: userPromptText }] }], + systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] }, + promptId: this.env.promptId, + role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR, + abortSignal: new AbortController().signal, + }); const snapshotText = response.text; // Synthesize a new "Episode" representing this compressed block const newId = uuidv4(); - const contentTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: snapshotText }]); + const contentTokens = this.env.tokenCalculator.estimateTokensForParts([ + { text: snapshotText }, + ]); return { id: newId, @@ -149,7 +164,13 @@ Output ONLY the raw factual snapshot, formatted compactly. 
Do not include markdo metadata: { originalTokens: contentTokens, currentTokens: contentTokens, - transformations: [{ processorName: 'StateSnapshotProcessor', action: 'SYNTHESIZED', timestamp: Date.now() }], + transformations: [ + { + processorName: 'StateSnapshotProcessor', + action: 'SYNTHESIZED', + timestamp: Date.now(), + }, + ], }, }, }; diff --git a/packages/core/src/context/processors/toolMaskingProcessor.ts b/packages/core/src/context/processors/toolMaskingProcessor.ts index 5250364b7a..15812d1629 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.ts @@ -6,7 +6,6 @@ import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; - import { sanitizeFilenamePart } from '../../utils/fileUtils.js'; import { ACTIVATE_SKILL_TOOL_NAME, @@ -15,6 +14,7 @@ import { ENTER_PLAN_MODE_TOOL_NAME, EXIT_PLAN_MODE_TOOL_NAME, } from '../../tools/tool-names.js'; +import type { EpisodeEditor } from '../ir/episodeEditor.js'; const UNMASKABLE_TOOLS = new Set([ ACTIVATE_SKILL_TOOL_NAME, @@ -24,14 +24,50 @@ const UNMASKABLE_TOOLS = new Set([ EXIT_PLAN_MODE_TOOL_NAME, ]); -import type { EpisodeEditor } from '../ir/episodeEditor.js'; - export interface ToolMaskingProcessorOptions { stringLengthThresholdTokens: number; } +type MaskableValue = + | string + | number + | boolean + | null + | MaskableValue[] + | { [key: string]: MaskableValue }; + +function isMaskableValue(val: unknown): val is MaskableValue { + if ( + val === null || + typeof val === 'string' || + typeof val === 'number' || + typeof val === 'boolean' + ) { + return true; + } + if (Array.isArray(val)) { + return val.every(isMaskableValue); + } + if (typeof val === 'object') { + return Object.values(val).every(isMaskableValue); + } + return false; +} + +function isMaskableRecord(val: unknown): val is Record { + return ( + typeof val === 'object' && + val !== null && + !Array.isArray(val) && + isMaskableValue(val) + ); +} + export class ToolMaskingProcessor implements ContextProcessor { - static create(env: ContextEnvironment, options: ToolMaskingProcessorOptions): ToolMaskingProcessor { + static create( + env: ContextEnvironment, + options: ToolMaskingProcessorOptions, + ): ToolMaskingProcessor { return new ToolMaskingProcessor(env, options); } @@ -40,7 +76,8 @@ export class ToolMaskingProcessor implements ContextProcessor { properties: { stringLengthThresholdTokens: { type: 'number', - description: 'The token threshold above which tool intents/observations are masked.', + description: + 'The token threshold above which tool intents/observations are masked.', }, }, required: ['stringLengthThresholdTokens'], @@ -51,10 +88,7 @@ export class ToolMaskingProcessor implements ContextProcessor { readonly options: ToolMaskingProcessorOptions; private env: ContextEnvironment; - constructor( - env: ContextEnvironment, - options: ToolMaskingProcessorOptions, - ) { + constructor(env: ContextEnvironment, options: ToolMaskingProcessorOptions) { this.env = env; this.options = options; } @@ -68,7 +102,9 @@ export class ToolMaskingProcessor implements ContextProcessor { if (state.isBudgetSatisfied) return; let currentDeficit = state.deficitTokens; - const limitChars = this.env.tokenCalculator.tokensToChars(maskingConfig.stringLengthThresholdTokens); + const limitChars = this.env.tokenCalculator.tokensToChars( + maskingConfig.stringLengthThresholdTokens, + ); let toolOutputsDir = 
this.env.fileSystem.join( this.env.projectTempDir, @@ -135,12 +171,10 @@ export class ToolMaskingProcessor implements ContextProcessor { const callId = step.id || Date.now().toString(); - /* eslint-disable @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-assignment */ - const maskAsync = async ( - obj: any, + obj: MaskableValue, nodeType: string, - ): Promise<{ masked: any; changed: boolean }> => { + ): Promise<{ masked: MaskableValue; changed: boolean }> => { if (typeof obj === 'string') { if (obj.length > limitChars && !this.isAlreadyMasked(obj)) { const newString = await handleMasking( @@ -155,7 +189,7 @@ export class ToolMaskingProcessor implements ContextProcessor { } if (Array.isArray(obj)) { let changed = false; - const masked = []; + const masked: MaskableValue[] = []; for (const item of obj) { const res = await maskAsync(item, nodeType); if (res.changed) changed = true; @@ -165,7 +199,7 @@ export class ToolMaskingProcessor implements ContextProcessor { } if (typeof obj === 'object' && obj !== null) { let changed = false; - const masked: Record = {}; + const masked: Record = {}; for (const [key, value] of Object.entries(obj)) { const res = await maskAsync(value, nodeType); if (res.changed) changed = true; @@ -176,31 +210,50 @@ export class ToolMaskingProcessor implements ContextProcessor { return { masked: obj, changed: false }; }; - const intentRes = await maskAsync( - step.presentation.intent ?? step.intent, - 'intent', - ); - const obsRes = await maskAsync( - step.presentation.observation ?? step.observation, - 'observation', - ); + const rawIntent = step.presentation?.intent ?? step.intent; + const rawObs = step.presentation?.observation ?? step.observation; + + if (!isMaskableRecord(rawIntent)) { + throw new Error( + `ToolMaskingProcessor: step intent is not a valid JSON record. CallID: ${callId}`, + ); + } + if (!isMaskableValue(rawObs)) { + throw new Error( + `ToolMaskingProcessor: step observation is not a valid JSON value. CallID: ${callId}`, + ); + } + + const intentRes = await maskAsync(rawIntent, 'intent'); + const obsRes = await maskAsync(rawObs, 'observation'); if (intentRes.changed || obsRes.changed) { + const maskedIntent = isMaskableRecord(intentRes.masked) + ? (intentRes.masked as Record) + : undefined; + const maskedObs = isMaskableRecord(obsRes.masked) + ? (obsRes.masked as Record) + : undefined; + // Recalculate tokens perfectly - const newIntentTokens = this.env.tokenCalculator.estimateTokensForParts([ - { - functionCall: { - name: toolName, - args: intentRes.masked, - id: callId, + const newIntentTokens = + this.env.tokenCalculator.estimateTokensForParts([ + { + functionCall: { + name: toolName, + args: maskedIntent, + id: callId, + }, }, - }, - ]); + ]); const newObsTokens = this.env.tokenCalculator.estimateTokensForParts([ { functionResponse: { name: toolName, - response: obsRes.masked, + response: + typeof obsRes.masked === 'string' + ? 
{ message: obsRes.masked } + : maskedObs, id: callId, }, }, @@ -217,20 +270,27 @@ export class ToolMaskingProcessor implements ContextProcessor { if (savings > 0) { currentDeficit -= savings; - this.env.tracer.logEvent('ToolMaskingProcessor', `Masked tool ${toolName}`, { recoveredTokens: savings }); - + this.env.tracer.logEvent( + 'ToolMaskingProcessor', + `Masked tool ${toolName}`, + { recoveredTokens: savings }, + ); + editor.editEpisode(ep.id, 'MASK_TOOL', (draft) => { const draftStep = draft.steps[j]; if (draftStep.type !== 'TOOL_EXECUTION') return; if (!draftStep.presentation) { - draftStep.presentation = { - intent: draftStep.intent, - observation: draftStep.observation, - tokens: draftStep.tokens, - }; + draftStep.presentation = { + intent: draftStep.intent, + observation: draftStep.observation, + tokens: draftStep.tokens, + }; } - draftStep.presentation.intent = intentRes.masked; - draftStep.presentation.observation = obsRes.masked; + draftStep.presentation.intent = maskedIntent ?? {}; + draftStep.presentation.observation = + typeof obsRes.masked === 'string' + ? { message: obsRes.masked } + : (maskedObs ?? {}); draftStep.presentation.tokens = { intent: newIntentTokens, observation: newObsTokens, @@ -243,8 +303,8 @@ export class ToolMaskingProcessor implements ContextProcessor { processorName: 'ToolMasking', action: 'MASKED', timestamp: Date.now(), - } - ] + }, + ], }; }); } diff --git a/packages/core/src/context/sidecar/SidecarLoader.test.ts b/packages/core/src/context/sidecar/SidecarLoader.test.ts index 6da19fb952..88add76c20 100644 --- a/packages/core/src/context/sidecar/SidecarLoader.test.ts +++ b/packages/core/src/context/sidecar/SidecarLoader.test.ts @@ -3,8 +3,9 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import { ProcessorRegistry } from "./registry.js"; -import { registerBuiltInProcessors } from "./builtins.js"; + +import { ProcessorRegistry } from './registry.js'; +import { registerBuiltInProcessors } from './builtins.js'; import { describe, it, expect, beforeEach } from 'vitest'; import { SidecarLoader } from './SidecarLoader.js'; import { defaultSidecarProfile } from './profiles.js'; @@ -22,7 +23,7 @@ describe('SidecarLoader (Fake FS)', () => { }); const mockConfig = { - getExperimentalContextSidecarConfig: () => '/path/to/sidecar.json' + getExperimentalContextSidecarConfig: () => '/path/to/sidecar.json', } as unknown as Config; it('returns default profile if file does not exist', () => { @@ -38,14 +39,16 @@ describe('SidecarLoader (Fake FS)', () => { it('throws an error if file is empty whitespace', () => { fileSystem.setFile('/path/to/sidecar.json', ' \n '); - expect(() => SidecarLoader.fromConfig(mockConfig, registry, fileSystem)).toThrow('is empty'); + expect(() => + SidecarLoader.fromConfig(mockConfig, registry, fileSystem), + ).toThrow('is empty'); }); it('returns parsed config if file is valid', () => { const validConfig = { budget: { retainedTokens: 1000, maxTokens: 2000 }, gcBackstop: { strategy: 'truncate', target: 'max' }, - pipelines: [] + pipelines: [], }; fileSystem.setFile('/path/to/sidecar.json', JSON.stringify(validConfig)); const result = SidecarLoader.fromConfig(mockConfig, registry, fileSystem); @@ -54,9 +57,11 @@ describe('SidecarLoader (Fake FS)', () => { it('throws validation error if file is invalid', () => { const invalidConfig = { - budget: { retainedTokens: 1000 } // missing maxTokens + budget: { retainedTokens: 1000 }, // missing maxTokens }; fileSystem.setFile('/path/to/sidecar.json', 
JSON.stringify(invalidConfig)); - expect(() => SidecarLoader.fromConfig(mockConfig, registry, fileSystem)).toThrow('Validation error:'); + expect(() => + SidecarLoader.fromConfig(mockConfig, registry, fileSystem), + ).toThrow('Validation error:'); }); }); diff --git a/packages/core/src/context/sidecar/SidecarLoader.ts b/packages/core/src/context/sidecar/SidecarLoader.ts index ee8e25b1e7..87f4dce8b6 100644 --- a/packages/core/src/context/sidecar/SidecarLoader.ts +++ b/packages/core/src/context/sidecar/SidecarLoader.ts @@ -19,9 +19,9 @@ export class SidecarLoader { * Throws an error if the file cannot be read, parsed, or fails schema validation. */ static loadFromFile( - sidecarPath: string, + sidecarPath: string, registry: ProcessorRegistry, - fileSystem: IFileSystem = new NodeFileSystem() + fileSystem: IFileSystem = new NodeFileSystem(), ): SidecarConfig { const fileContent = fileSystem.readFileSync(sidecarPath, 'utf8'); @@ -40,7 +40,10 @@ export class SidecarLoader { ); } - const validationError = SchemaValidator.validate(getSidecarConfigSchema(registry), parsed); + const validationError = SchemaValidator.validate( + getSidecarConfigSchema(registry), + parsed, + ); if (validationError) { throw new Error( `Invalid sidecar configuration in ${sidecarPath}. Validation error: ${validationError}`, @@ -48,8 +51,13 @@ export class SidecarLoader { } // Schema has been validated. - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - return parsed as SidecarConfig; + const isSidecarConfig = (val: unknown): val is SidecarConfig => true; + if (isSidecarConfig(parsed)) { + return parsed; + } + throw new Error( + 'Unreachable: schema validation passed but type predicate failed.', + ); } /** @@ -57,9 +65,9 @@ export class SidecarLoader { * If a config file is present but invalid, this will THROW to prevent silent misconfiguration. 
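// Illustrative usage sketch (not code from this change): wiring the loader to the
// registry of built-in processors. Only the Config import path below is an
// assumption; every other name appears elsewhere in this patch.
import { ProcessorRegistry } from './registry.js';
import { registerBuiltInProcessors } from './builtins.js';
import { SidecarLoader } from './SidecarLoader.js';
import type { SidecarConfig } from './types.js';
import type { Config } from '../../config/config.js'; // assumed path

export function resolveSidecarConfig(appConfig: Config): SidecarConfig {
  const registry = new ProcessorRegistry();
  registerBuiltInProcessors(registry);
  // Missing file: the default profile is returned.
  // Present but empty or schema-invalid file: fromConfig throws, so a broken
  // sidecar.json is surfaced instead of silently falling back to defaults.
  return SidecarLoader.fromConfig(appConfig, registry);
}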
*/ static fromConfig( - config: Config, + config: Config, registry: ProcessorRegistry, - fileSystem: IFileSystem = new NodeFileSystem() + fileSystem: IFileSystem = new NodeFileSystem(), ): SidecarConfig { const sidecarPath = config.getExperimentalContextSidecarConfig(); diff --git a/packages/core/src/context/sidecar/builtins.ts b/packages/core/src/context/sidecar/builtins.ts index 8c34face24..7609b46567 100644 --- a/packages/core/src/context/sidecar/builtins.ts +++ b/packages/core/src/context/sidecar/builtins.ts @@ -5,12 +5,27 @@ */ import type { ProcessorRegistry } from './registry.js'; -import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js'; +import { + ToolMaskingProcessor, + type ToolMaskingProcessorOptions, +} from '../processors/toolMaskingProcessor.js'; import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js'; -import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js'; -import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from '../processors/historySquashingProcessor.js'; -import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js'; -import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js'; +import { + SemanticCompressionProcessor, + type SemanticCompressionProcessorOptions, +} from '../processors/semanticCompressionProcessor.js'; +import { + HistorySquashingProcessor, + type HistorySquashingProcessorOptions, +} from '../processors/historySquashingProcessor.js'; +import { + StateSnapshotProcessor, + type StateSnapshotProcessorOptions, +} from '../processors/stateSnapshotProcessor.js'; +import { + EmergencyTruncationProcessor, + type EmergencyTruncationProcessorOptions, +} from '../processors/emergencyTruncationProcessor.js'; export function registerBuiltInProcessors(registry: ProcessorRegistry) { registry.register({ @@ -22,12 +37,12 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { options: { type: 'object', properties: { stringLengthThresholdTokens: { type: 'number' } }, - required: ['stringLengthThresholdTokens'] - } + required: ['stringLengthThresholdTokens'], + }, }, - required: ['processorId', 'options'] + required: ['processorId', 'options'], }, - create: (env, opts) => new ToolMaskingProcessor(env, opts) + create: (env, opts) => new ToolMaskingProcessor(env, opts), }); registry.register>({ @@ -36,11 +51,11 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { type: 'object', properties: { processorId: { const: 'BlobDegradationProcessor' }, - options: { type: 'object' } + options: { type: 'object' }, }, - required: ['processorId'] + required: ['processorId'], }, - create: (env) => new BlobDegradationProcessor(env) + create: (env) => new BlobDegradationProcessor(env), }); registry.register({ @@ -52,12 +67,12 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { options: { type: 'object', properties: { nodeThresholdTokens: { type: 'number' } }, - required: ['nodeThresholdTokens'] - } + required: ['nodeThresholdTokens'], + }, }, - required: ['processorId', 'options'] + required: ['processorId', 'options'], }, - create: (env, opts) => new SemanticCompressionProcessor(env, opts) + create: (env, opts) => new SemanticCompressionProcessor(env, opts), }); registry.register({ @@ -69,12 +84,12 @@ export 
function registerBuiltInProcessors(registry: ProcessorRegistry) { options: { type: 'object', properties: { maxTokensPerNode: { type: 'number' } }, - required: ['maxTokensPerNode'] - } + required: ['maxTokensPerNode'], + }, }, - required: ['processorId', 'options'] + required: ['processorId', 'options'], }, - create: (env, opts) => new HistorySquashingProcessor(env, opts) + create: (env, opts) => new HistorySquashingProcessor(env, opts), }); registry.register({ @@ -88,13 +103,13 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { properties: { model: { type: 'string' }, systemInstruction: { type: 'string' }, - triggerDeficitTokens: { type: 'number' } - } - } + triggerDeficitTokens: { type: 'number' }, + }, + }, }, - required: ['processorId'] + required: ['processorId'], }, - create: (env, opts) => StateSnapshotProcessor.create(env, opts) + create: (env, opts) => StateSnapshotProcessor.create(env, opts), }); registry.register({ @@ -103,10 +118,10 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { type: 'object', properties: { processorId: { const: 'EmergencyTruncationProcessor' }, - options: { type: 'object' } + options: { type: 'object' }, }, - required: ['processorId'] + required: ['processorId'], }, - create: (env, opts) => EmergencyTruncationProcessor.create(env, opts) + create: (env, opts) => EmergencyTruncationProcessor.create(env, opts), }); } diff --git a/packages/core/src/context/sidecar/environment.ts b/packages/core/src/context/sidecar/environment.ts index 369ae93933..ee66ec13d0 100644 --- a/packages/core/src/context/sidecar/environment.ts +++ b/packages/core/src/context/sidecar/environment.ts @@ -3,7 +3,7 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ - import type { BaseLlmClient } from '../../core/baseLlmClient.js'; +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; import type { ContextEventBus } from '../eventBus.js'; import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; import type { ContextTracer } from '../tracer.js'; @@ -13,16 +13,15 @@ import type { IIdGenerator } from '../system/IIdGenerator.js'; export type { ContextTracer, ContextEventBus }; export interface ContextEnvironment { - readonly llmClient: BaseLlmClient; - readonly promptId: string; - readonly sessionId: string; - readonly traceDir: string; - readonly projectTempDir: string; - readonly tracer: ContextTracer; - readonly charsPerToken: number; - readonly tokenCalculator: ContextTokenCalculator; - readonly fileSystem: IFileSystem; - readonly idGenerator: IIdGenerator; - - eventBus: ContextEventBus; + readonly llmClient: BaseLlmClient; + readonly promptId: string; + readonly sessionId: string; + readonly traceDir: string; + readonly projectTempDir: string; + readonly tracer: ContextTracer; + readonly charsPerToken: number; + readonly tokenCalculator: ContextTokenCalculator; + readonly fileSystem: IFileSystem; + readonly idGenerator: IIdGenerator; + readonly eventBus: ContextEventBus; } diff --git a/packages/core/src/context/sidecar/environmentImpl.ts b/packages/core/src/context/sidecar/environmentImpl.ts index 4179c65b9b..0987e317de 100644 --- a/packages/core/src/context/sidecar/environmentImpl.ts +++ b/packages/core/src/context/sidecar/environmentImpl.ts @@ -7,9 +7,7 @@ import type { BaseLlmClient } from '../../core/baseLlmClient.js'; import type { ContextTracer } from '../tracer.js'; import type { ContextEnvironment } from './environment.js'; - import type { ContextEventBus } from 
'../eventBus.js'; - import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; import type { IFileSystem } from '../system/IFileSystem.js'; import { NodeFileSystem } from '../system/NodeFileSystem.js'; diff --git a/packages/core/src/context/sidecar/orchestrator.test.ts b/packages/core/src/context/sidecar/orchestrator.test.ts index b6d65ede9f..3ecd342263 100644 --- a/packages/core/src/context/sidecar/orchestrator.test.ts +++ b/packages/core/src/context/sidecar/orchestrator.test.ts @@ -7,48 +7,68 @@ import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; import { PipelineOrchestrator } from './orchestrator.js'; import { ProcessorRegistry } from './registry.js'; -import { createMockEnvironment, createDummyState, createDummyEpisode } from '../testing/contextTestUtils.js'; +import { + createMockEnvironment, + createDummyState, + createDummyEpisode, +} from '../testing/contextTestUtils.js'; import type { ContextEnvironment } from './environment.js'; import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; import type { PipelineDef, ProcessorConfig, SidecarConfig } from './types.js'; import type { ContextEventBus } from '../eventBus.js'; - import type { EpisodeEditor } from '../ir/episodeEditor.js'; // Create a Dummy Processor for testing Orchestration routing class DummySyncProcessor implements ContextProcessor { - static create() { return new DummySyncProcessor(); } + static create() { + return new DummySyncProcessor(); + } constructor() {} readonly name = 'DummySync'; readonly id = 'DummySync'; readonly options = {}; async process(editor: EpisodeEditor, _state: ContextAccountingState) { - editor.editEpisode(editor.episodes[0].id, 'DUMMY_EDIT', (draft: unknown) => { + editor.editEpisode( + editor.episodes[0].id, + 'DUMMY_EDIT', + (draft: unknown) => { (draft as Record)['dummyModified'] = true; - }); + }, + ); } } class DummyAsyncProcessor implements ContextProcessor { - static create() { return new DummyAsyncProcessor(); } + static create() { + return new DummyAsyncProcessor(); + } constructor() {} readonly name = 'DummyAsync'; readonly id = 'DummyAsync'; readonly options = {}; async process(editor: EpisodeEditor, _state: ContextAccountingState) { - editor.editEpisode(editor.episodes[0].id, 'DUMMY_EDIT', (draft: unknown) => { + editor.editEpisode( + editor.episodes[0].id, + 'DUMMY_EDIT', + (draft: unknown) => { (draft as Record)['dummyAsyncModified'] = true; - }); + }, + ); } } class ThrowingProcessor implements ContextProcessor { - static create() { return new ThrowingProcessor(); } + static create() { + return new ThrowingProcessor(); + } constructor() {} readonly name = 'Throwing'; readonly id = 'Throwing'; readonly options = {}; - async process(_editor: EpisodeEditor, _state: ContextAccountingState): Promise { + async process( + _editor: EpisodeEditor, + _state: ContextAccountingState, + ): Promise { throw new Error('Processor failed intentionally'); } } @@ -63,11 +83,23 @@ describe('PipelineOrchestrator (Component)', () => { env = createMockEnvironment(); eventBus = env.eventBus; registry = new ProcessorRegistry(); - + // Register our test processors - registry.register({ id: 'DummySyncProcessor', create: () => new DummySyncProcessor() }); - registry.register({ id: 'DummyAsyncProcessor', create: () => new DummyAsyncProcessor() }); - registry.register({ id: 'ThrowingProcessor', create: () => new ThrowingProcessor() }); + registry.register({ + id: 'DummySyncProcessor', + schema: {}, + create: () => new DummySyncProcessor(), + }); + 
registry.register({ + id: 'DummyAsyncProcessor', + schema: {}, + create: () => new DummyAsyncProcessor(), + }); + registry.register({ + id: 'ThrowingProcessor', + schema: {}, + create: () => new ThrowingProcessor(), + }); }); afterEach(() => { @@ -78,7 +110,7 @@ describe('PipelineOrchestrator (Component)', () => { const createConfig = (pipelines: PipelineDef[]): SidecarConfig => ({ budget: { maxTokens: 100, retainedTokens: 50 }, gcBackstop: { strategy: 'truncate', target: 'max' }, - pipelines + pipelines, }); it('instantiates processors from the registry on initialization', () => { @@ -87,13 +119,23 @@ describe('PipelineOrchestrator (Component)', () => { name: 'Sync', execution: 'blocking', triggers: [], - processors: [{ processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig] - } + processors: [ + { processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig, + ], + }, ]); - - const orchestrator = new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - expect((orchestrator as any).instantiatedProcessors.has('DummySyncProcessor')).toBe(true); + + const orchestrator = new PipelineOrchestrator( + config, + env, + eventBus, + env.tracer, + registry, + ); + expect( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (orchestrator as any).instantiatedProcessors.has('DummySyncProcessor'), + ).toBe(true); }); it('throws an error if a config requests an unknown processor', () => { @@ -102,12 +144,16 @@ describe('PipelineOrchestrator (Component)', () => { name: 'Bad', execution: 'blocking', triggers: [], - processors: [{ processorId: 'DoesNotExist' } as unknown as ProcessorConfig] - } + processors: [ + { processorId: 'DoesNotExist' } as unknown as ProcessorConfig, + ], + }, ]); - - expect(() => new PipelineOrchestrator(config, env, eventBus, env.tracer, registry)) - .toThrow('Context Processor [DoesNotExist] is not registered.'); + + expect( + () => + new PipelineOrchestrator(config, env, eventBus, env.tracer, registry), + ).toThrow('Context Processor [DoesNotExist] is not registered.'); }); it('executes blocking pipelines synchronously and returns the modified array', async () => { @@ -116,18 +162,32 @@ describe('PipelineOrchestrator (Component)', () => { name: 'SyncPipe', execution: 'blocking', triggers: [], - processors: [{ processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig] - } + processors: [ + { processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig, + ], + }, ]); - const orchestrator = new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); - + const orchestrator = new PipelineOrchestrator( + config, + env, + eventBus, + env.tracer, + registry, + ); + const episodes = [createDummyEpisode('1', 'USER_PROMPT', [])]; const state = createDummyState(false); - - const result = await orchestrator.executePipeline('SyncPipe', episodes, state); - + + const result = await orchestrator.executePipeline( + 'SyncPipe', + episodes, + state, + ); + expect(result).toHaveLength(1); - expect((result[0] as unknown as {dummyModified: boolean}).dummyModified).toBe(true); + expect( + (result[0] as unknown as { dummyModified: boolean }).dummyModified, + ).toBe(true); }); it('executes background pipelines asynchronously without blocking the return', async () => { @@ -136,22 +196,36 @@ describe('PipelineOrchestrator (Component)', () => { name: 'AsyncPipe', execution: 'background', triggers: [], - processors: [{ processorId: 'DummyAsyncProcessor' } as unknown as 
ProcessorConfig] - } + processors: [ + { processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig, + ], + }, ]); - const orchestrator = new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); - + const orchestrator = new PipelineOrchestrator( + config, + env, + eventBus, + env.tracer, + registry, + ); + const episodes = [createDummyEpisode('1', 'USER_PROMPT', [])]; const state = createDummyState(false); - + // This should resolve immediately with the UNMODIFIED array because execution is background - const result = await orchestrator.executePipeline('AsyncPipe', episodes, state); - + const result = await orchestrator.executePipeline( + 'AsyncPipe', + episodes, + state, + ); + expect(result).toHaveLength(1); - expect((result[0] as unknown as {asyncModified: unknown}).asyncModified).toBeUndefined(); // Not modified yet! - + expect( + (result[0] as unknown as { asyncModified: unknown }).asyncModified, + ).toBeUndefined(); // Not modified yet! + // Wait for the background task to complete (50ms delay in DummyAsyncProcessor) - await new Promise(resolve => setTimeout(resolve, 60)); + await new Promise((resolve) => setTimeout(resolve, 60)); }); it('gracefully handles and swallows processor errors in synchronous pipelines', async () => { @@ -160,17 +234,29 @@ describe('PipelineOrchestrator (Component)', () => { name: 'ThrowingPipe', execution: 'blocking', triggers: [], - processors: [{ processorId: 'ThrowingProcessor' } as unknown as ProcessorConfig] - } + processors: [ + { processorId: 'ThrowingProcessor' } as unknown as ProcessorConfig, + ], + }, ]); - const orchestrator = new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); - + const orchestrator = new PipelineOrchestrator( + config, + env, + eventBus, + env.tracer, + registry, + ); + const episodes = [createDummyEpisode('1', 'USER_PROMPT', [])]; const state = createDummyState(false); - + // It should not throw! It should swallow the error and return the unmodified array. 
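// Illustrative sketch (an invented example, not part of this change): a custom
// processor written against the same ContextProcessor / ProcessorRegistry API
// that the dummy processors above exercise.
import type { ContextAccountingState, ContextProcessor } from '../pipeline.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
import { ProcessorRegistry } from './registry.js';

class NoteTaggingProcessor implements ContextProcessor {
  readonly name = 'NoteTagging';
  readonly id = 'NoteTagging';
  readonly options = {};

  async process(editor: EpisodeEditor, state: ContextAccountingState) {
    // Do nothing when the budget is already satisfied, like the built-ins.
    if (state.isBudgetSatisfied) return;
    // Record a mutation through the editor so the orchestrator can observe it.
    editor.editEpisode(editor.episodes[0].id, 'NOTE_TAG', (draft: unknown) => {
      (draft as Record<string, unknown>)['noteTagged'] = true;
    });
  }
}

// Registration mirrors the test setup above; a pipeline definition would then
// reference it by id, e.g. processors: [{ processorId: 'NoteTagging', options: {} }].
const registry = new ProcessorRegistry();
registry.register({
  id: 'NoteTagging',
  schema: {},
  create: () => new NoteTaggingProcessor(),
});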
- const result = await orchestrator.executePipeline('ThrowingPipe', episodes, state); - + const result = await orchestrator.executePipeline( + 'ThrowingPipe', + episodes, + state, + ); + expect(result).toHaveLength(1); expect(result).toStrictEqual(episodes); }); @@ -181,21 +267,26 @@ describe('PipelineOrchestrator (Component)', () => { name: 'PressureRelief', execution: 'background', triggers: ['budget_exceeded'], - processors: [{ processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig] - } + processors: [ + { processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig, + ], + }, ]); - + // Spy on the private method to see if the trigger fires it - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const executeSpy = vi.spyOn(PipelineOrchestrator.prototype as any, 'executePipelineAsync'); - + const executeSpy = vi.spyOn( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + PipelineOrchestrator.prototype as any, + 'executePipelineAsync', + ); + new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); - + const episodes = [createDummyEpisode('1', 'USER_PROMPT', [])]; - + // Emit the trigger eventBus.emitConsolidationNeeded({ episodes, targetDeficit: 100 }); - + expect(executeSpy).toHaveBeenCalled(); }); }); diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 085be7c9fb..10905f0015 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -7,7 +7,11 @@ import type { Episode } from '../ir/types.js'; import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; import type { SidecarConfig, PipelineDef } from './types.js'; -import type { ContextEnvironment, ContextEventBus, ContextTracer } from './environment.js'; +import type { + ContextEnvironment, + ContextEventBus, + ContextTracer, +} from './environment.js'; import type { ProcessorRegistry } from './registry.js'; import { debugLogger } from '../../utils/debugLogger.js'; import { EpisodeEditor } from '../ir/episodeEditor.js'; @@ -21,7 +25,7 @@ export class PipelineOrchestrator { private readonly env: ContextEnvironment, private readonly eventBus: ContextEventBus, private readonly tracer: ContextTracer, - private readonly registry: ProcessorRegistry + private readonly registry: ProcessorRegistry, ) { this.instantiateProcessors(); this.registerTriggers(); @@ -36,11 +40,16 @@ export class PipelineOrchestrator { if (!this.instantiatedProcessors.has(procDef.processorId)) { const processorClass = this.registry.get(procDef.processorId); if (!processorClass) { - throw new Error(`Context Processor [${procDef.processorId}] is not registered.`); + throw new Error( + `Context Processor [${procDef.processorId}] is not registered.`, + ); } // The Orchestrator injects standard dependencies required by processors // If a processor needs the eventBus (like Snapshot), it expects it via constructor. - const instance = processorClass.create(this.env, procDef.options ?? {}); + const instance = processorClass.create( + this.env, + procDef.options ?? {}, + ); this.instantiatedProcessors.set(procDef.processorId, instance); } } @@ -55,22 +64,22 @@ export class PipelineOrchestrator { for (const trigger of pipeline.triggers) { if (typeof trigger === 'object' && trigger.type === 'timer') { const timer = setInterval(() => { - // For background timers, we need a way to get the latest state - // But timers are generally disabled right now via the triggers config. 
- // If needed, we will pass it via event bus. + // For background timers, we need a way to get the latest state + // But timers are generally disabled right now via the triggers config. + // If needed, we will pass it via event bus. }, trigger.intervalMs); this.activeTimers.push(timer); } else if (trigger === 'budget_exceeded') { this.eventBus.onConsolidationNeeded((event) => { - const state: ContextAccountingState = { - currentTokens: 0, - retainedTokens: this.config.budget.retainedTokens, - maxTokens: this.config.budget.maxTokens, - isBudgetSatisfied: false, - deficitTokens: event.targetDeficit, - protectedEpisodeIds: new Set() - }; - void this.executePipelineAsync(pipeline, event.episodes, state); + const state: ContextAccountingState = { + currentTokens: 0, + retainedTokens: this.config.budget.retainedTokens, + maxTokens: this.config.budget.maxTokens, + isBudgetSatisfied: false, + deficitTokens: event.targetDeficit, + protectedEpisodeIds: new Set(), + }; + void this.executePipelineAsync(pipeline, event.episodes, state); }); } } @@ -90,19 +99,26 @@ export class PipelineOrchestrator { /** * Executes a pipeline based on its configured execution strategy ('blocking' or 'background'). */ - async executePipeline(pipelineName: string, episodes: Episode[], state: ContextAccountingState): Promise { - const pipeline = this.config.pipelines.find(p => p.name === pipelineName); + async executePipeline( + pipelineName: string, + episodes: Episode[], + state: ContextAccountingState, + ): Promise { + const pipeline = this.config.pipelines.find((p) => p.name === pipelineName); if (!pipeline) return episodes; if (pipeline.execution === 'background') { - this.executePipelineAsync(pipeline, episodes, state).catch(e => { - debugLogger.error(`Background pipeline ${pipeline.name} failed:`, e); - }); - return episodes; // Return immediately + this.executePipelineAsync(pipeline, episodes, state).catch((e) => { + debugLogger.error(`Background pipeline ${pipeline.name} failed:`, e); + }); + return episodes; // Return immediately } // Blocking execution - this.tracer.logEvent('Orchestrator', `Triggering synchronous pipeline: ${pipeline.name}`); + this.tracer.logEvent( + 'Orchestrator', + `Triggering synchronous pipeline: ${pipeline.name}`, + ); let currentEpisodes = [...episodes]; for (let i = 0; i < pipeline.processors.length; i++) { const procDef = pipeline.processors[i]; @@ -110,12 +126,18 @@ export class PipelineOrchestrator { if (!processor) continue; try { - this.tracer.logEvent('Orchestrator', `Executing processor: ${procDef.processorId}`); + this.tracer.logEvent( + 'Orchestrator', + `Executing processor: ${procDef.processorId}`, + ); const editor = new EpisodeEditor(currentEpisodes); await processor.process(editor, state); currentEpisodes = editor.getFinalEpisodes(); } catch (error) { - debugLogger.error(`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, error); + debugLogger.error( + `Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, + error, + ); return currentEpisodes; // Return what we have so far } } @@ -126,8 +148,15 @@ export class PipelineOrchestrator { /** * Internal method for running a pipeline entirely in the background. 
*/ - private async executePipelineAsync(pipeline: PipelineDef, currentState: Episode[], state: ContextAccountingState) { - this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`); + private async executePipelineAsync( + pipeline: PipelineDef, + currentState: Episode[], + state: ContextAccountingState, + ) { + this.tracer.logEvent( + 'Orchestrator', + `Triggering async pipeline: ${pipeline.name}`, + ); if (!currentState || currentState.length === 0) return; let currentEpisodes = [...currentState]; @@ -137,53 +166,67 @@ export class PipelineOrchestrator { if (!processor) continue; try { - this.tracer.logEvent('Orchestrator', `Executing processor: ${procDef.processorId} (async)`); - + this.tracer.logEvent( + 'Orchestrator', + `Executing processor: ${procDef.processorId} (async)`, + ); + const editor = new EpisodeEditor(currentEpisodes); await processor.process(editor, state); currentEpisodes = editor.getFinalEpisodes(); - + // Synthesize VariantReady events for anything that changed or was newly created for (const mutation of editor.getMutations()) { - // We only broadcast modifications or replacements - // (Insertions without replacement and deletions are not tracked as variants on an existing node) - if (mutation.type === 'modified' || mutation.type === 'replaced') { - const variantId = `v-${procDef.processorId.toLowerCase()}`; - - let vType: 'snapshot' | 'summary' | 'masked' = 'masked'; - if (procDef.processorId.includes('Snapshot')) vType = 'snapshot'; - else if (procDef.processorId.includes('Semantic')) vType = 'summary'; - - const ep = mutation.episode!; - let fallbackText = ''; - if (ep.yield?.text) fallbackText = ep.yield.text; - else if (ep.trigger?.type === 'USER_PROMPT') { - const firstPart = ep.trigger.semanticParts?.[0]; - if (firstPart) { - fallbackText = firstPart.type === 'text' ? (firstPart.presentation?.text || firstPart.text) : ''; - } - } + // We only broadcast modifications or replacements + // (Insertions without replacement and deletions are not tracked as variants on an existing node) + if (mutation.type === 'modified' || mutation.type === 'replaced') { + const variantId = `v-${procDef.processorId.toLowerCase()}`; - this.eventBus.emitVariantReady({ - targetId: mutation.type === 'replaced' ? mutation.originalIds![0] : ep.id, - variantId, - variant: (vType === 'snapshot' ? { + let vType: 'snapshot' | 'summary' | 'masked' = 'masked'; + if (procDef.processorId.includes('Snapshot')) vType = 'snapshot'; + else if (procDef.processorId.includes('Semantic')) + vType = 'summary'; + + const ep = mutation.episode!; + let fallbackText = ''; + if (ep.yield?.text) fallbackText = ep.yield.text; + else if (ep.trigger?.type === 'USER_PROMPT') { + const firstPart = ep.trigger.semanticParts?.[0]; + if (firstPart) { + fallbackText = + firstPart.type === 'text' + ? firstPart.presentation?.text || firstPart.text + : ''; + } + } + + this.eventBus.emitVariantReady({ + targetId: + mutation.type === 'replaced' ? mutation.originalIds![0] : ep.id, + variantId, + variant: + vType === 'snapshot' + ? 
{ status: 'ready', type: 'snapshot', episode: ep, recoveredTokens: ep.yield?.metadata?.currentTokens || 10, replacedEpisodeIds: mutation.originalIds || [], - } : { + } + : { status: 'ready', type: vType, text: fallbackText, recoveredTokens: ep.yield?.metadata?.currentTokens || 10, - }) - }); - } + }, + }); + } } } catch (error) { - debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error); + debugLogger.error( + `Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, + error, + ); return; // Halt pipeline } } diff --git a/packages/core/src/context/sidecar/profiles.ts b/packages/core/src/context/sidecar/profiles.ts index fbfb97daa3..f5012319f7 100644 --- a/packages/core/src/context/sidecar/profiles.ts +++ b/packages/core/src/context/sidecar/profiles.ts @@ -26,20 +26,29 @@ export const defaultSidecarProfile: SidecarConfig = { triggers: ['on_turn'], execution: 'blocking', processors: [ - { processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 8000 } }, + { + processorId: 'ToolMaskingProcessor', + options: { stringLengthThresholdTokens: 8000 }, + }, { processorId: 'BlobDegradationProcessor', options: {} }, - { processorId: 'SemanticCompressionProcessor', options: { nodeThresholdTokens: 5000 } }, - { processorId: 'EmergencyTruncationProcessor', options: {} } - ] + { + processorId: 'SemanticCompressionProcessor', + options: { nodeThresholdTokens: 5000 }, + }, + { processorId: 'EmergencyTruncationProcessor', options: {} }, + ], }, { name: 'Deep Background Compression', triggers: [{ type: 'timer', intervalMs: 5000 }, 'budget_exceeded'], execution: 'background', processors: [ - { processorId: 'HistorySquashingProcessor', options: { maxTokensPerNode: 3000 } }, - { processorId: 'StateSnapshotProcessor', options: {} } - ] - } - ] + { + processorId: 'HistorySquashingProcessor', + options: { maxTokensPerNode: 3000 }, + }, + { processorId: 'StateSnapshotProcessor', options: {} }, + ], + }, + ], }; diff --git a/packages/core/src/context/sidecar/registry.ts b/packages/core/src/context/sidecar/registry.ts index 46232baa23..6010ded765 100644 --- a/packages/core/src/context/sidecar/registry.ts +++ b/packages/core/src/context/sidecar/registry.ts @@ -9,11 +9,8 @@ import type { ContextEnvironment } from './environment.js'; export interface ContextProcessorDef { readonly id: string; - readonly schema?: object; - create( - env: ContextEnvironment, - options: TOptions, - ): ContextProcessor; + readonly schema: object; + create(env: ContextEnvironment, options: TOptions): ContextProcessor; } /** diff --git a/packages/core/src/context/sidecar/schema.ts b/packages/core/src/context/sidecar/schema.ts index 066d07a2b8..6f1efd57e7 100644 --- a/packages/core/src/context/sidecar/schema.ts +++ b/packages/core/src/context/sidecar/schema.ts @@ -3,97 +3,101 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + import type { ProcessorRegistry } from './registry.js'; import './builtins.js'; export function getSidecarConfigSchema(registry: ProcessorRegistry) { return { - $schema: "http://json-schema.org/draft-07/schema#", - title: "SidecarConfig", - description: "The Data-Driven Schema for the Context Manager.", - type: "object", - required: ["budget", "gcBackstop", "pipelines"], + $schema: 'http://json-schema.org/draft-07/schema#', + title: 'SidecarConfig', + description: 'The Data-Driven Schema for the Context Manager.', + type: 'object', + required: ['budget', 'gcBackstop', 'pipelines'], properties: { budget: { - type: "object", - description: 
"Defines the token ceilings and limits for the pipeline.", - required: ["retainedTokens", "maxTokens"], + type: 'object', + description: 'Defines the token ceilings and limits for the pipeline.', + required: ['retainedTokens', 'maxTokens'], properties: { retainedTokens: { - type: "number", - description: "The ideal token count the pipeline tries to shrink down to." + type: 'number', + description: + 'The ideal token count the pipeline tries to shrink down to.', }, maxTokens: { - type: "number", - description: "The absolute maximum token count allowed before synchronous truncation kicks in." - } - } + type: 'number', + description: + 'The absolute maximum token count allowed before synchronous truncation kicks in.', + }, + }, }, gcBackstop: { - type: "object", - description: "Defines what happens when the pipeline fails to compress under 'maxTokens'", - required: ["strategy", "target"], + type: 'object', + description: + "Defines what happens when the pipeline fails to compress under 'maxTokens'", + required: ['strategy', 'target'], properties: { strategy: { - type: "string", - enum: ["truncate", "compress", "rollingSummarizer"] + type: 'string', + enum: ['truncate', 'compress', 'rollingSummarizer'], }, target: { - type: "string", - enum: ["incremental", "freeNTokens", "max"] + type: 'string', + enum: ['incremental', 'freeNTokens', 'max'], }, freeTokensTarget: { - type: "number" - } - } + type: 'number', + }, + }, }, pipelines: { - type: "array", - description: "The execution graphs for context manipulation.", + type: 'array', + description: 'The execution graphs for context manipulation.', items: { - type: "object", - required: ["name", "triggers", "execution", "processors"], + type: 'object', + required: ['name', 'triggers', 'execution', 'processors'], properties: { name: { - type: "string" + type: 'string', }, triggers: { - type: "array", + type: 'array', items: { anyOf: [ { - type: "string", - enum: ["on_turn", "post_turn", "budget_exceeded"] + type: 'string', + enum: ['on_turn', 'post_turn', 'budget_exceeded'], }, { - type: "object", - required: ["type", "intervalMs"], + type: 'object', + required: ['type', 'intervalMs'], properties: { type: { - type: "string", - const: "timer" + type: 'string', + const: 'timer', }, intervalMs: { - type: "number" - } - } - } - ] - } + type: 'number', + }, + }, + }, + ], + }, }, execution: { - type: "string", - enum: ["blocking", "background"] + type: 'string', + enum: ['blocking', 'background'], }, processors: { - type: "array", + type: 'array', items: { - oneOf: registry.getSchemas() - } - } - } - } - } - } + oneOf: registry.getSchemas(), + }, + }, + }, + }, + }, + }, }; } diff --git a/packages/core/src/context/sidecar/types.ts b/packages/core/src/context/sidecar/types.ts index 37afc9004d..19e7a4f74a 100644 --- a/packages/core/src/context/sidecar/types.ts +++ b/packages/core/src/context/sidecar/types.ts @@ -10,12 +10,27 @@ import type { StateSnapshotProcessorOptions } from '../processors/stateSnapshotP * Definition of a processor or worker to be instantiated in the graph. 
*/ export type ProcessorConfig = - | { processorId: 'ToolMaskingProcessor'; options: { stringLengthThresholdTokens: number } } + | { + processorId: 'ToolMaskingProcessor'; + options: { stringLengthThresholdTokens: number }; + } | { processorId: 'BlobDegradationProcessor'; options?: object } - | { processorId: 'SemanticCompressionProcessor'; options: { nodeThresholdTokens: number } } - | { processorId: 'HistorySquashingProcessor'; options: { maxTokensPerNode: number } } - | { processorId: 'StateSnapshotProcessor'; options: StateSnapshotProcessorOptions } - | { processorId: 'EmergencyTruncationProcessor'; options?: Record }; + | { + processorId: 'SemanticCompressionProcessor'; + options: { nodeThresholdTokens: number }; + } + | { + processorId: 'HistorySquashingProcessor'; + options: { maxTokensPerNode: number }; + } + | { + processorId: 'StateSnapshotProcessor'; + options: StateSnapshotProcessorOptions; + } + | { + processorId: 'EmergencyTruncationProcessor'; + options?: Record; + }; export type PipelineTrigger = | 'on_turn' diff --git a/packages/core/src/context/system-tests/SimulationHarness.ts b/packages/core/src/context/system-tests/SimulationHarness.ts index dcb12abded..65d5feb896 100644 --- a/packages/core/src/context/system-tests/SimulationHarness.ts +++ b/packages/core/src/context/system-tests/SimulationHarness.ts @@ -1,9 +1,3 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ -import type { BaseLlmClient } from "../../core/baseLlmClient.js"; /** * @license * Copyright 2026 Google LLC @@ -19,8 +13,11 @@ import { ContextTracer } from '../tracer.js'; import { ContextEventBus } from '../eventBus.js'; import { PipelineOrchestrator } from '../sidecar/orchestrator.js'; import { registerBuiltInProcessors } from '../sidecar/builtins.js'; -import { debugLogger } from "../../utils/debugLogger.js"; -import { ProcessorRegistry } from "../sidecar/registry.js"; +import { debugLogger } from '../../utils/debugLogger.js'; +import { ProcessorRegistry } from '../sidecar/registry.js'; +import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js'; +import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js'; +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; export interface TurnSummary { turnIndex: number; @@ -39,7 +36,11 @@ export class SimulationHarness { private currentTurnIndex = 0; private tokenTrajectory: TurnSummary[] = []; - static async create(config: SidecarConfig, mockLlmClient: BaseLlmClient, mockTempDir = '/tmp/sim'): Promise { + static async create( + config: SidecarConfig, + mockLlmClient: BaseLlmClient, + mockTempDir = '/tmp/sim', + ): Promise { const harness = new SimulationHarness(); await harness.init(config, mockLlmClient, mockTempDir); return harness; @@ -53,19 +54,17 @@ export class SimulationHarness { private async init( config: SidecarConfig, mockLlmClient: BaseLlmClient, - mockTempDir: string + mockTempDir: string, ) { this.config = config; const registry = new ProcessorRegistry(); // Register all standard processors registerBuiltInProcessors(registry); - this.tracer = new ContextTracer({ targetDir: mockTempDir, sessionId: 'sim-session' }); - - // Using real token calculator instead of mock, so we test actual string sizes - const InMemoryFS = (await import('../system/InMemoryFileSystem.js')).InMemoryFileSystem; - const DetIdGen = (await import('../system/DeterministicIdGenerator.js')).DeterministicIdGenerator; - + this.tracer = new ContextTracer({ + targetDir: mockTempDir, + sessionId: 
'sim-session', + }); this.env = new ContextEnvironmentImpl( mockLlmClient, 'sim-prompt', @@ -75,12 +74,24 @@ export class SimulationHarness { this.tracer, 4, // 4 chars per token average this.eventBus, - new InMemoryFS(), - new DetIdGen() + new InMemoryFileSystem(), + new DeterministicIdGenerator(), ); - this.orchestrator = new PipelineOrchestrator(config, this.env, this.eventBus, this.tracer, registry); - this.contextManager = ContextManager.create(config, this.env, this.tracer, this.orchestrator, registry); + this.orchestrator = new PipelineOrchestrator( + config, + this.env, + this.eventBus, + this.tracer, + registry, + ); + this.contextManager = ContextManager.create( + config, + this.env, + this.tracer, + this.orchestrator, + registry, + ); this.contextManager.subscribeToHistory(this.chatHistory); } @@ -92,61 +103,74 @@ export class SimulationHarness { // 1. Append the new messages const currentHistory = this.chatHistory.get(); this.chatHistory.set([...currentHistory, ...messages]); - + // 2. Measure tokens immediately after append (Before background processing) const tokensBefore = this.env.tokenCalculator.calculateEpisodeListTokens( - this.contextManager.getWorkingBufferView() + this.contextManager.getWorkingBufferView(), ); - debugLogger.log(`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`); - + debugLogger.log( + `[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`, + ); + // 3. Yield to event loop to allow internal async subscribers and orchestrator to finish - await new Promise(resolve => setTimeout(resolve, 50)); - + await new Promise((resolve) => setTimeout(resolve, 50)); + // 3.1 Simulate what projectCompressedHistory does with the sync handlers let currentView = this.contextManager.getWorkingBufferView(); - const currentTokens = this.env.tokenCalculator.calculateEpisodeListTokens(currentView); + const currentTokens = + this.env.tokenCalculator.calculateEpisodeListTokens(currentView); if (this.config.budget && currentTokens > this.config.budget.maxTokens) { - debugLogger.log(`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.budget.maxTokens}`); - const syncPipelines = this.config.pipelines.filter(p => p.execution === 'blocking'); + debugLogger.log( + `[Turn ${this.currentTurnIndex}] Sync panic triggered! 
${currentTokens} > ${this.config.budget.maxTokens}`, + ); + const syncPipelines = this.config.pipelines.filter( + (p) => p.execution === 'blocking', + ); const orchestrator = this.orchestrator; for (const pipe of syncPipelines) { - await orchestrator.executePipeline(pipe.name, currentView, { - currentTokens, - maxTokens: this.config.budget.maxTokens, - retainedTokens: this.config.budget.retainedTokens, - isBudgetSatisfied: false, - deficitTokens: currentTokens - this.config.budget.maxTokens, - protectedEpisodeIds: new Set() - }); - currentView = this.contextManager.getWorkingBufferView(); + await orchestrator.executePipeline(pipe.name, currentView, { + currentTokens, + maxTokens: this.config.budget.maxTokens, + retainedTokens: this.config.budget.retainedTokens, + isBudgetSatisfied: false, + deficitTokens: currentTokens - this.config.budget.maxTokens, + protectedEpisodeIds: new Set(), + }); + currentView = this.contextManager.getWorkingBufferView(); } - + // Inject the truncated view back into the graph for (let i = 0; i < currentView.length; i++) { - const ep = currentView[i]; - if (!this.contextManager.getWorkingBufferView().find(c => c.id === ep.id)) { - this.eventBus.emitVariantReady({ - targetId: ep.id, - variantId: 'v-emergency', - variant: { - status: 'ready', - type: 'masked', // Truncation is technically a mask - text: ep.yield?.text || '', - recoveredTokens: 0, - } - }); - } + const ep = currentView[i]; + if ( + !this.contextManager + .getWorkingBufferView() + .find((c) => c.id === ep.id) + ) { + this.eventBus.emitVariantReady({ + targetId: ep.id, + variantId: 'v-emergency', + variant: { + status: 'ready', + type: 'masked', // Truncation is technically a mask + text: ep.yield?.text || '', + recoveredTokens: 0, + }, + }); + } } // Wait for variant propagation - await new Promise(resolve => setTimeout(resolve, 50)); + await new Promise((resolve) => setTimeout(resolve, 50)); } - + // 4. Measure tokens after background processors have (hopefully) emitted variants const tokensAfter = this.env.tokenCalculator.calculateEpisodeListTokens( - this.contextManager.getWorkingBufferView() + this.contextManager.getWorkingBufferView(), ); - debugLogger.log(`[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`); - + debugLogger.log( + `[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`, + ); + this.tokenTrajectory.push({ turnIndex: this.currentTurnIndex++, tokensBeforeBackground: tokensBefore, @@ -155,10 +179,11 @@ export class SimulationHarness { } async getGoldenState() { - const finalProjection = await this.contextManager.projectCompressedHistory(); + const finalProjection = + await this.contextManager.projectCompressedHistory(); return { tokenTrajectory: this.tokenTrajectory, - finalProjection + finalProjection, }; } } diff --git a/packages/core/src/context/system-tests/lifecycle.golden.test.ts b/packages/core/src/context/system-tests/lifecycle.golden.test.ts index c78f2331ee..fb8dc5b28a 100644 --- a/packages/core/src/context/system-tests/lifecycle.golden.test.ts +++ b/packages/core/src/context/system-tests/lifecycle.golden.test.ts @@ -12,9 +12,14 @@ import type { BaseLlmClient } from '../../core/baseLlmClient.js'; expect.addSnapshotSerializer({ test: (val) => typeof val === 'string' && - (/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(val) || - /^\/tmp\/sim/.test(val)), // Mask temp directories and UUIDs - print: (val) => (typeof val === 'string' && /^\/tmp\/sim/.test(val) ? 
'""' : '""'), + (/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test( + val, + ) || + /^\/tmp\/sim/.test(val)), // Mask temp directories and UUIDs + print: (val) => + typeof val === 'string' && /^\/tmp\/sim/.test(val) + ? '""' + : '""', }); describe('System Lifecycle Golden Tests', () => { @@ -36,69 +41,106 @@ describe('System Lifecycle Golden Tests', () => { triggers: ['budget_exceeded'], processors: [ { processorId: 'BlobDegradationProcessor' }, - { processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 50 } }, // Mask any tool string > 200 chars - { processorId: 'StateSnapshotProcessor', options: {} } // Squash old history - ] + { + processorId: 'ToolMaskingProcessor', + options: { stringLengthThresholdTokens: 50 }, + }, // Mask any tool string > 200 chars + { processorId: 'StateSnapshotProcessor', options: {} }, // Squash old history + ], }, { name: 'Immediate Sanitization', // The magic string the projector is hardcoded to use execution: 'blocking', triggers: ['budget_exceeded'], processors: [ - { processorId: 'EmergencyTruncationProcessor', options: {} } - ] - } - ] + { processorId: 'EmergencyTruncationProcessor', options: {} }, + ], + }, + ], }); const mockLlmClient = { generateContent: vi.fn().mockResolvedValue({ text: '', - }) + }), } as unknown as BaseLlmClient; it('Scenario 1: Organic Growth with Huge Tool Output & Images', async () => { - const harness = await SimulationHarness.create(getAggressiveConfig(), mockLlmClient); + const harness = await SimulationHarness.create( + getAggressiveConfig(), + mockLlmClient, + ); // Turn 0: System Prompt await harness.simulateTurn([ { role: 'user', parts: [{ text: 'System Instructions' }] }, - { role: 'model', parts: [{ text: 'Ack.' }] } + { role: 'model', parts: [{ text: 'Ack.' }] }, ]); // Turn 1: Normal conversation await harness.simulateTurn([ { role: 'user', parts: [{ text: 'Hello!' }] }, - { role: 'model', parts: [{ text: 'Hi, how can I help?' }] } + { role: 'model', parts: [{ text: 'Hi, how can I help?' }] }, ]); // Turn 2: Massive Tool Output (Should trigger ToolMaskingProcessor in background) await harness.simulateTurn([ { role: 'user', parts: [{ text: 'Read the logs.' }] }, - { role: 'model', parts: [{ functionCall: { name: 'run_shell_command', args: { cmd: 'cat server.log' } } }] }, - { role: 'user', parts: [{ functionResponse: { name: 'run_shell_command', response: { output: 'LOG '.repeat(5000) } } }] }, - { role: 'model', parts: [{ text: 'The logs are very long.' }] } + { + role: 'model', + parts: [ + { + functionCall: { + name: 'run_shell_command', + args: { cmd: 'cat server.log' }, + }, + }, + ], + }, + { + role: 'user', + parts: [ + { + functionResponse: { + name: 'run_shell_command', + response: { output: 'LOG '.repeat(5000) }, + }, + }, + ], + }, + { role: 'model', parts: [{ text: 'The logs are very long.' }] }, ]); // Turn 3: Multi-modal blob (Should trigger BlobDegradationProcessor) await harness.simulateTurn([ - { role: 'user', parts: [{ text: 'Look at this architecture diagram:' }, { inlineData: { mimeType: 'image/png', data: 'fake_base64_data_'.repeat(1000) } }] }, - { role: 'model', parts: [{ text: 'Nice diagram.' }] } + { + role: 'user', + parts: [ + { text: 'Look at this architecture diagram:' }, + { + inlineData: { + mimeType: 'image/png', + data: 'fake_base64_data_'.repeat(1000), + }, + }, + ], + }, + { role: 'model', parts: [{ text: 'Nice diagram.' 
}] }, ]); // Turn 4: More conversation to trigger StateSnapshot await harness.simulateTurn([ { role: 'user', parts: [{ text: 'Can we refactor?' }] }, - { role: 'model', parts: [{ text: 'Yes we can.' }] } + { role: 'model', parts: [{ text: 'Yes we can.' }] }, ]); // Get final state const goldenState = await harness.getGoldenState(); - - // In a perfectly functioning opportunistic system, the token trajectory should show + + // In a perfectly functioning opportunistic system, the token trajectory should show // the massive spikes in Turn 2 and 3 being immediately resolved by the background tasks. // The final projection should fit neatly under the Max Tokens limit. - + expect(goldenState).toMatchSnapshot(); }); }); diff --git a/packages/core/src/context/system/IFileSystem.ts b/packages/core/src/context/system/IFileSystem.ts index bb5ede4054..cfeca1f3ff 100644 --- a/packages/core/src/context/system/IFileSystem.ts +++ b/packages/core/src/context/system/IFileSystem.ts @@ -11,7 +11,7 @@ export interface IFileSystem { writeFileSync(path: string, data: string | Buffer, encoding?: 'utf-8'): void; appendFileSync(path: string, data: string, encoding: 'utf-8'): void; mkdirSync(path: string, options?: { recursive?: boolean }): void; - + writeFile(path: string, data: string | Buffer): Promise; mkdir(path: string, options?: { recursive?: boolean }): Promise; diff --git a/packages/core/src/context/system/InMemoryFileSystem.ts b/packages/core/src/context/system/InMemoryFileSystem.ts index 40ea3f4830..b407ae31f5 100644 --- a/packages/core/src/context/system/InMemoryFileSystem.ts +++ b/packages/core/src/context/system/InMemoryFileSystem.ts @@ -18,43 +18,47 @@ export class InMemoryFileSystem implements IFileSystem { } private normalize(p: string): string { - return p.replace(/\/+/g, '/'); + return p.replace(/\/+/g, '/'); } existsSync(p: string): boolean { return this.files.has(this.normalize(p)); } - + statSyncSize(p: string): number { const content = this.files.get(this.normalize(p)); if (content === undefined) { - throw new Error(`ENOENT: no such file or directory, stat '${p}'`); + throw new Error(`ENOENT: no such file or directory, stat '${p}'`); } - return Buffer.isBuffer(content) ? content.byteLength : Buffer.byteLength(content, 'utf8'); + return Buffer.isBuffer(content) + ? content.byteLength + : Buffer.byteLength(content, 'utf8'); } - + readFileSync(p: string, encoding: 'utf8'): string { const content = this.files.get(this.normalize(p)); if (content === undefined) { - throw new Error(`ENOENT: no such file or directory, open '${p}'`); + throw new Error(`ENOENT: no such file or directory, open '${p}'`); } if (Buffer.isBuffer(content)) { - return content.toString(encoding); + return content.toString(encoding); } return content; } - + writeFileSync(p: string, data: string | Buffer, _encoding?: 'utf-8'): void { this.files.set(this.normalize(p), data); } - + appendFileSync(p: string, data: string, _encoding: 'utf-8'): void { const norm = this.normalize(p); const existing = this.files.get(norm) || ''; - const existingStr = Buffer.isBuffer(existing) ? existing.toString('utf8') : existing; + const existingStr = Buffer.isBuffer(existing) + ? 
existing.toString('utf8') + : existing; this.files.set(norm, existingStr + data); } - + mkdirSync(_p: string, _options?: { recursive?: boolean }): void {} async writeFile(p: string, data: string | Buffer): Promise { diff --git a/packages/core/src/context/system/NodeFileSystem.ts b/packages/core/src/context/system/NodeFileSystem.ts index bd455b94f5..a2d71c468c 100644 --- a/packages/core/src/context/system/NodeFileSystem.ts +++ b/packages/core/src/context/system/NodeFileSystem.ts @@ -13,27 +13,27 @@ export class NodeFileSystem implements IFileSystem { existsSync(p: string): boolean { return fs.existsSync(p); } - + statSyncSize(p: string): number { return fs.statSync(p).size; } - + readFileSync(p: string, encoding: 'utf8'): string { return fs.readFileSync(p, encoding); } - + writeFileSync(p: string, data: string | Buffer, encoding?: 'utf-8'): void { if (Buffer.isBuffer(data)) { - fs.writeFileSync(p, data); + fs.writeFileSync(p, data); } else { - fs.writeFileSync(p, data, encoding); + fs.writeFileSync(p, data, encoding); } } - + appendFileSync(p: string, data: string, encoding: 'utf-8'): void { fs.appendFileSync(p, data, encoding); } - + mkdirSync(p: string, options?: { recursive?: boolean }): void { fs.mkdirSync(p, options); } diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index fef8259117..fa15f61a05 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -10,10 +10,14 @@ import type { ContextEnvironment } from '../sidecar/environment.js'; import type { Content } from '@google/genai'; import { AgentChatHistory } from '../../core/agentChatHistory.js'; import { ContextManager } from '../contextManager.js'; - import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js'; import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js'; -import type { Episode, UserPrompt, SystemEvent, SemanticPart } from '../ir/types.js'; +import type { + Episode, + UserPrompt, + SystemEvent, + SemanticPart, +} from '../ir/types.js'; import type { ContextAccountingState } from '../pipeline.js'; import { randomUUID } from 'node:crypto'; @@ -39,39 +43,56 @@ export function createDummyEpisode( id: string, type: 'USER_PROMPT' | 'SYSTEM_EVENT', parts: SemanticPart[] = [], - toolSteps: Array<{ intent: Record; observation: Record; toolName?: string; tokens?: { intent: number; observation: number } }> = [] + toolSteps: Array<{ + intent: Record; + observation: Record; + toolName?: string; + tokens?: { intent: number; observation: number }; + }> = [], ): Episode { let trigger: UserPrompt | SystemEvent; if (type === 'USER_PROMPT') { - trigger = { - id: randomUUID(), - type: 'USER_PROMPT', - semanticParts: parts, - metadata: { originalTokens: 100, currentTokens: 100, transformations: [] }, - }; + trigger = { + id: randomUUID(), + type: 'USER_PROMPT', + semanticParts: parts, + metadata: { + originalTokens: 100, + currentTokens: 100, + transformations: [], + }, + }; } else { - trigger = { - id: randomUUID(), - type: 'SYSTEM_EVENT', - name: 'dummy_event', - payload: {}, - metadata: { originalTokens: 100, currentTokens: 100, transformations: [] }, - }; + trigger = { + id: randomUUID(), + type: 'SYSTEM_EVENT', + name: 'dummy_event', + payload: {}, + metadata: { + originalTokens: 100, + currentTokens: 100, + transformations: [], + }, + }; } return { id, timestamp: Date.now(), trigger, - steps: toolSteps.map(step => ({ + steps: toolSteps.map((step) => ({ id: 
randomUUID(), type: 'TOOL_EXECUTION', toolName: step.toolName || 'test_tool', intent: step.intent, observation: step.observation, tokens: step.tokens || { intent: 50, observation: 50 }, - metadata: { originalTokens: 100, currentTokens: 100, transformations: [] }, + metadata: { + originalTokens: 100, + currentTokens: 100, + transformations: [], + }, })), }; } @@ -168,7 +189,10 @@ export function setupContextComponentTest(config: Config) { const registry = new ProcessorRegistry(); registerBuiltInProcessors(registry); const sidecar = SidecarLoader.fromConfig(config, registry); - const tracer = new ContextTracer({ targetDir: '/tmp', sessionId: 'test-session' }); + const tracer = new ContextTracer({ + targetDir: '/tmp', + sessionId: 'test-session', + }); const eventBus = new ContextEventBus(); const env = new ContextEnvironmentImpl( config.getBaseLlmClient(), @@ -178,9 +202,15 @@ export function setupContextComponentTest(config: Config) { '/tmp/gemini-test', tracer, 1, - eventBus + eventBus, + ); + const contextManager = ContextManager.create( + sidecar, + env, + tracer, + undefined, + registry, ); - const contextManager = ContextManager.create(sidecar, env, tracer, undefined, registry); // The async worker is now internally managed by ContextManager diff --git a/packages/core/src/context/tracer.test.ts b/packages/core/src/context/tracer.test.ts index b012893e53..11d602a963 100644 --- a/packages/core/src/context/tracer.test.ts +++ b/packages/core/src/context/tracer.test.ts @@ -3,6 +3,7 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + import { describe, it, expect, beforeEach, vi } from 'vitest'; import { ContextTracer } from './tracer.js'; import { InMemoryFileSystem } from './system/InMemoryFileSystem.js'; @@ -15,7 +16,7 @@ describe('ContextTracer (Fake FS & ID Gen)', () => { beforeEach(() => { fileSystem = new InMemoryFileSystem(); idGenerator = new DeterministicIdGenerator('mock-uuid-'); - + // We must mock Date.now() to ensure asset file names are perfectly deterministic vi.useFakeTimers(); vi.setSystemTime(new Date('2026-01-01T12:00:00Z')); @@ -25,47 +26,59 @@ describe('ContextTracer (Fake FS & ID Gen)', () => { const tracer = new ContextTracer( { enabled: true, targetDir: '/fake/target', sessionId: 'test-session' }, fileSystem, - idGenerator + idGenerator, ); // Verify Initialization - const initTraceLog = fileSystem.readFileSync('/fake/target/.gemini/context_trace/test-session/trace.log', 'utf8'); + const initTraceLog = fileSystem.readFileSync( + '/fake/target/.gemini/context_trace/test-session/trace.log', + 'utf8', + ); expect(initTraceLog).toContain('[SYSTEM] Context Tracer Initialized'); // Small logging: shouldn't trigger saveAsset tracer.logEvent('TestComponent', 'TestAction', { key: 'value' }); - - const smallTraceLog = fileSystem.readFileSync('/fake/target/.gemini/context_trace/test-session/trace.log', 'utf8'); + + const smallTraceLog = fileSystem.readFileSync( + '/fake/target/.gemini/context_trace/test-session/trace.log', + 'utf8', + ); expect(smallTraceLog).toContain('[TestComponent] TestAction'); expect(smallTraceLog).toContain('{"key":"value"}'); // Large logging: should trigger auto-asset save const hugeString = 'a'.repeat(2000); tracer.logEvent('TestComponent', 'LargeAction', { largeKey: hugeString }); - + // 1767268800000 is 2026-01-01T12:00:00Z - const expectedAssetPath = '/fake/target/.gemini/context_trace/test-session/assets/1767268800000-mock-uuid-1-largeKey.json'; - + const expectedAssetPath = + 
'/fake/target/.gemini/context_trace/test-session/assets/1767268800000-mock-uuid-1-largeKey.json'; + // Assert asset was written to FS expect(fileSystem.existsSync(expectedAssetPath)).toBe(true); - - const largeTraceLog = fileSystem.readFileSync('/fake/target/.gemini/context_trace/test-session/trace.log', 'utf8'); + + const largeTraceLog = fileSystem.readFileSync( + '/fake/target/.gemini/context_trace/test-session/trace.log', + 'utf8', + ); expect(largeTraceLog).toContain('[TestComponent] LargeAction'); - expect(largeTraceLog).toContain(`{"largeKey":{"$asset":"1767268800000-mock-uuid-1-largeKey.json"}}`); + expect(largeTraceLog).toContain( + `{"largeKey":{"$asset":"1767268800000-mock-uuid-1-largeKey.json"}}`, + ); }); it('silently ignores logging when disabled', () => { const tracer = new ContextTracer( { enabled: false, targetDir: '/fake/target', sessionId: 'test-session' }, fileSystem, - idGenerator + idGenerator, ); tracer.logEvent('TestComponent', 'TestAction'); - + const hugeString = 'a'.repeat(2000); tracer.logEvent('TestComponent', 'LargeAction', { largeKey: hugeString }); - + // FS should be completely empty expect(fileSystem.getFiles().size).toBe(0); }); diff --git a/packages/core/src/context/tracer.ts b/packages/core/src/context/tracer.ts index 38c0d27cc0..cf7299f0fa 100644 --- a/packages/core/src/context/tracer.ts +++ b/packages/core/src/context/tracer.ts @@ -28,19 +28,26 @@ export class ContextTracer { constructor( options: ContextTracerOptions, fileSystem: IFileSystem = new NodeFileSystem(), - idGenerator: IIdGenerator = new NodeIdGenerator() + idGenerator: IIdGenerator = new NodeIdGenerator(), ) { this.enabled = options.enabled ?? false; this.fileSystem = fileSystem; this.idGenerator = idGenerator; - this.traceDir = this.fileSystem.join(options.targetDir, '.gemini', 'context_trace', options.sessionId); + this.traceDir = this.fileSystem.join( + options.targetDir, + '.gemini', + 'context_trace', + options.sessionId, + ); this.assetsDir = this.fileSystem.join(this.traceDir, 'assets'); if (this.enabled) { try { this.fileSystem.mkdirSync(this.assetsDir, { recursive: true }); - this.logEvent('SYSTEM', 'Context Tracer Initialized', { sessionId: options.sessionId }); + this.logEvent('SYSTEM', 'Context Tracer Initialized', { + sessionId: options.sessionId, + }); } catch (e) { debugLogger.error('Failed to initialize ContextTracer', e); this.enabled = false; @@ -60,12 +67,13 @@ export class ContextTracer { if (details) { processedDetails = {}; for (const [key, value] of Object.entries(details)) { - const strValue = typeof value === 'string' ? value : JSON.stringify(value); + const strValue = + typeof value === 'string' ? 
value : JSON.stringify(value); if (strValue && strValue.length > this.MAX_INLINE_SIZE) { - const assetId = this.saveAsset(component, key, value); - processedDetails[key] = { $asset: assetId }; + const assetId = this.saveAsset(component, key, value); + processedDetails[key] = { $asset: assetId }; } else { - processedDetails[key] = value; + processedDetails[key] = value; } } } @@ -85,13 +93,21 @@ export class ContextTracer { } } - private saveAsset(component: string, assetName: string, data: unknown): string { + private saveAsset( + component: string, + assetName: string, + data: unknown, + ): string { if (!this.enabled) return 'asset-recording-disabled'; try { const assetId = `${Date.now()}-${this.idGenerator.generateId()}-${assetName}.json`; const assetPath = this.fileSystem.join(this.assetsDir, assetId); - this.fileSystem.writeFileSync(assetPath, JSON.stringify(data, null, 2), 'utf-8'); + this.fileSystem.writeFileSync( + assetPath, + JSON.stringify(data, null, 2), + 'utf-8', + ); this.logEvent(component, `Saved asset: ${assetName}`, { assetId }); return assetId; } catch (e) {