From 63e8b825a7ebb3f088f781d473dd813453e22645 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 7 Apr 2026 03:13:14 +0000 Subject: [PATCH] burndown --- .../src/context/contextManager.golden.test.ts | 4 +- packages/core/src/context/contextManager.ts | 19 ++++-- .../processors/blobDegradationProcessor.ts | 12 +++- .../processors/historySquashingProcessor.ts | 28 +++++++-- .../semanticCompressionProcessor.ts | 26 +++++++- .../processors/toolMaskingProcessor.ts | 26 +++++++- packages/core/src/context/sidecar/builtins.ts | 22 +++---- .../core/src/context/sidecar/orchestrator.ts | 4 +- packages/core/src/context/sidecar/registry.ts | 9 +-- packages/core/src/context/sidecar/types.ts | 2 +- .../context/system-tests/SimulationHarness.ts | 61 +++++++++---------- .../system-tests/lifecycle.golden.test.ts | 3 +- .../src/context/testing/contextTestUtils.ts | 2 +- 13 files changed, 145 insertions(+), 73 deletions(-) diff --git a/packages/core/src/context/contextManager.golden.test.ts b/packages/core/src/context/contextManager.golden.test.ts index 4b053b587f..d3d3d59354 100644 --- a/packages/core/src/context/contextManager.golden.test.ts +++ b/packages/core/src/context/contextManager.golden.test.ts @@ -83,7 +83,7 @@ describe('ContextManager Golden Tests', () => { 4, eventBus ); - contextManager = new ContextManager(sidecar, env, tracer); + contextManager = ContextManager.create(sidecar, env, tracer); }); const createLargeHistory = (): Content[] => [ @@ -144,7 +144,7 @@ describe('ContextManager Golden Tests', () => { 4, eventBus2 ); - contextManager = new ContextManager( + contextManager = ContextManager.create( { budget: { retainedTokens: 100000, maxTokens: 150000 }, pipelines: [], diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index ea0f1ea4c5..ec276d7dfc 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -44,18 +44,25 @@ export class ContextManager { private pristineEpisodes: Episode[] = []; private readonly eventBus: ContextEventBus; - // Internal sub-components // Synchronous processors are instantiated but effectively used as singletons within this class private orchestrator: PipelineOrchestrator; private historyObserver?: HistoryObserver; - - - constructor(private sidecar: SidecarConfig, private env: ContextEnvironment, private readonly tracer: ContextTracer) { + static create(sidecar: SidecarConfig, env: ContextEnvironment, tracer: ContextTracer, orchestrator?: PipelineOrchestrator): ContextManager { + const orch = orchestrator || new PipelineOrchestrator(sidecar, env, env.eventBus, tracer); + return new ContextManager(sidecar, env, tracer, orch); + } + + // Use ContextManager.create() instead + private constructor( + private sidecar: SidecarConfig, + private env: ContextEnvironment, + private readonly tracer: ContextTracer, + orchestrator: PipelineOrchestrator + ) { this.eventBus = env.eventBus; - - this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer); + this.orchestrator = orchestrator; this.eventBus.onPristineHistoryUpdated((event) => { this.pristineEpisodes = event.episodes; diff --git a/packages/core/src/context/processors/blobDegradationProcessor.ts b/packages/core/src/context/processors/blobDegradationProcessor.ts index 452a6a854f..b46a081b4a 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.ts @@ -11,11 +11,19 @@ import type { Part } from '@google/genai'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; +export type BlobDegradationProcessorOptions = Record; + export class BlobDegradationProcessor implements ContextProcessor { - readonly name = 'BlobDegradation'; + static create(env: ContextEnvironment, _options: BlobDegradationProcessorOptions): BlobDegradationProcessor { + return new BlobDegradationProcessor(env); + } + + readonly id = 'BlobDegradationProcessor'; + readonly name = 'BlobDegradationProcessor'; + readonly options = {}; private env: ContextEnvironment; - constructor(env: ContextEnvironment, _options: Record = {}) { + constructor(env: ContextEnvironment) { this.env = env; } diff --git a/packages/core/src/context/processors/historySquashingProcessor.ts b/packages/core/src/context/processors/historySquashingProcessor.ts index 8e187f57ae..97d8c2f7de 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.ts @@ -9,11 +9,31 @@ import type { ContextEnvironment } from '../sidecar/environment.js'; import { truncateProportionally } from '../truncation.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; -export class HistorySquashingProcessor implements ContextProcessor { - readonly name = 'HistorySquashing'; - private options: { maxTokensPerNode: number }; +export interface HistorySquashingProcessorOptions { + maxTokensPerNode: number; +} - constructor(env: ContextEnvironment, options: { maxTokensPerNode: number }) { +export class HistorySquashingProcessor implements ContextProcessor { + static create(env: ContextEnvironment, options: HistorySquashingProcessorOptions): HistorySquashingProcessor { + return new HistorySquashingProcessor(env, options); + } + + static readonly schema = { + type: 'object', + properties: { + maxTokensPerNode: { + type: 'number', + description: 'The maximum tokens a node can have before being truncated.', + }, + }, + required: ['maxTokensPerNode'], + }; + + readonly id = 'HistorySquashingProcessor'; + readonly name = 'HistorySquashingProcessor'; + readonly options: HistorySquashingProcessorOptions; + + constructor(env: ContextEnvironment, options: HistorySquashingProcessorOptions) { this.options = options; } diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.ts b/packages/core/src/context/processors/semanticCompressionProcessor.ts index 5e05c6e625..685ea9a34d 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.ts @@ -13,15 +13,35 @@ import { getResponseText } from '../../utils/partUtils.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; +export interface SemanticCompressionProcessorOptions { + nodeThresholdTokens: number; +} + export class SemanticCompressionProcessor implements ContextProcessor { - readonly name = 'SemanticCompression'; + static create(env: ContextEnvironment, options: SemanticCompressionProcessorOptions): SemanticCompressionProcessor { + return new SemanticCompressionProcessor(env, options); + } + + static readonly schema = { + type: 'object', + properties: { + nodeThresholdTokens: { + type: 'number', + description: 'The token threshold above which nodes are summarized.', + }, + }, + required: ['nodeThresholdTokens'], + }; + + readonly id = 'SemanticCompressionProcessor'; + readonly name = 'SemanticCompressionProcessor'; + readonly options: SemanticCompressionProcessorOptions; private env: ContextEnvironment; - private options: { nodeThresholdTokens: number }; private modelToUse: string = 'chat-compression-2.5-flash-lite'; constructor( env: ContextEnvironment, - options: { nodeThresholdTokens: number }, + options: SemanticCompressionProcessorOptions, ) { this.env = env; this.options = options; diff --git a/packages/core/src/context/processors/toolMaskingProcessor.ts b/packages/core/src/context/processors/toolMaskingProcessor.ts index e59d005bc4..5250364b7a 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.ts @@ -26,14 +26,34 @@ const UNMASKABLE_TOOLS = new Set([ import type { EpisodeEditor } from '../ir/episodeEditor.js'; +export interface ToolMaskingProcessorOptions { + stringLengthThresholdTokens: number; +} + export class ToolMaskingProcessor implements ContextProcessor { - readonly name = 'ToolMasking'; - private options: { stringLengthThresholdTokens: number }; + static create(env: ContextEnvironment, options: ToolMaskingProcessorOptions): ToolMaskingProcessor { + return new ToolMaskingProcessor(env, options); + } + + static readonly schema = { + type: 'object', + properties: { + stringLengthThresholdTokens: { + type: 'number', + description: 'The token threshold above which tool intents/observations are masked.', + }, + }, + required: ['stringLengthThresholdTokens'], + }; + + readonly id = 'ToolMaskingProcessor'; + readonly name = 'ToolMaskingProcessor'; + readonly options: ToolMaskingProcessorOptions; private env: ContextEnvironment; constructor( env: ContextEnvironment, - options: { stringLengthThresholdTokens: number }, + options: ToolMaskingProcessorOptions, ) { this.env = env; this.options = options; diff --git a/packages/core/src/context/sidecar/builtins.ts b/packages/core/src/context/sidecar/builtins.ts index dd6a78c84c..7b8bc80fad 100644 --- a/packages/core/src/context/sidecar/builtins.ts +++ b/packages/core/src/context/sidecar/builtins.ts @@ -5,15 +5,15 @@ */ import { ProcessorRegistry } from './registry.js'; -import { ToolMaskingProcessor } from '../processors/toolMaskingProcessor.js'; +import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js'; import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js'; -import { SemanticCompressionProcessor } from '../processors/semanticCompressionProcessor.js'; -import { HistorySquashingProcessor } from '../processors/historySquashingProcessor.js'; -import { StateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js'; -import { EmergencyTruncationProcessor } from '../processors/emergencyTruncationProcessor.js'; +import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js'; +import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from '../processors/historySquashingProcessor.js'; +import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js'; +import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js'; export function registerBuiltInProcessors() { - ProcessorRegistry.register({ + ProcessorRegistry.register({ id: 'ToolMaskingProcessor', schema: { type: 'object', @@ -30,7 +30,7 @@ export function registerBuiltInProcessors() { create: (env, opts) => new ToolMaskingProcessor(env, opts) }); - ProcessorRegistry.register({ + ProcessorRegistry.register>({ id: 'BlobDegradationProcessor', schema: { type: 'object', @@ -43,7 +43,7 @@ export function registerBuiltInProcessors() { create: (env) => new BlobDegradationProcessor(env) }); - ProcessorRegistry.register({ + ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', schema: { type: 'object', @@ -60,7 +60,7 @@ export function registerBuiltInProcessors() { create: (env, opts) => new SemanticCompressionProcessor(env, opts) }); - ProcessorRegistry.register({ + ProcessorRegistry.register({ id: 'HistorySquashingProcessor', schema: { type: 'object', @@ -77,7 +77,7 @@ export function registerBuiltInProcessors() { create: (env, opts) => new HistorySquashingProcessor(env, opts) }); - ProcessorRegistry.register({ + ProcessorRegistry.register({ id: 'StateSnapshotProcessor', schema: { type: 'object', @@ -97,7 +97,7 @@ export function registerBuiltInProcessors() { create: (env, opts) => StateSnapshotProcessor.create(env, opts) }); - ProcessorRegistry.register({ + ProcessorRegistry.register({ id: 'EmergencyTruncationProcessor', schema: { type: 'object', diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index e2d1714112..add63fd603 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -39,7 +39,7 @@ export class PipelineOrchestrator { } // The Orchestrator injects standard dependencies required by processors // If a processor needs the eventBus (like Snapshot), it expects it via constructor. - const instance = processorClass.create(this.env, procDef.options); + const instance = processorClass.create(this.env, procDef.options ?? {}); this.instantiatedProcessors.set(procDef.processorId, instance); } } @@ -171,7 +171,7 @@ export class PipelineOrchestrator { type: 'snapshot', episode: ep, recoveredTokens: ep.yield?.metadata?.currentTokens || 10, - replacedEpisodeIds: mutation.originalIds, + replacedEpisodeIds: mutation.originalIds || [], } : { status: 'ready', type: vType, diff --git a/packages/core/src/context/sidecar/registry.ts b/packages/core/src/context/sidecar/registry.ts index 5c82aa7c8a..0b5abad7dd 100644 --- a/packages/core/src/context/sidecar/registry.ts +++ b/packages/core/src/context/sidecar/registry.ts @@ -7,10 +7,7 @@ import type { ContextProcessor } from '../pipeline.js'; import type { ContextEnvironment } from './environment.js'; - -export interface ContextProcessorDef< - TOptions extends Record = Record, -> { +export interface ContextProcessorDef { readonly id: string; readonly schema?: object; create( @@ -23,9 +20,9 @@ export interface ContextProcessorDef< * Registry for mapping declarative sidecar configs to running Processor instances. */ export class ProcessorRegistry { - private static processors = new Map(); + private static processors = new Map>(); - static register(def: ContextProcessorDef) { + static register(def: ContextProcessorDef) { this.processors.set(def.id, def); } diff --git a/packages/core/src/context/sidecar/types.ts b/packages/core/src/context/sidecar/types.ts index 0a2fd9a2dc..37afc9004d 100644 --- a/packages/core/src/context/sidecar/types.ts +++ b/packages/core/src/context/sidecar/types.ts @@ -11,7 +11,7 @@ import type { StateSnapshotProcessorOptions } from '../processors/stateSnapshotP */ export type ProcessorConfig = | { processorId: 'ToolMaskingProcessor'; options: { stringLengthThresholdTokens: number } } - | { processorId: 'BlobDegradationProcessor'; options?: Record } + | { processorId: 'BlobDegradationProcessor'; options?: object } | { processorId: 'SemanticCompressionProcessor'; options: { nodeThresholdTokens: number } } | { processorId: 'HistorySquashingProcessor'; options: { maxTokensPerNode: number } } | { processorId: 'StateSnapshotProcessor'; options: StateSnapshotProcessorOptions } diff --git a/packages/core/src/context/system-tests/SimulationHarness.ts b/packages/core/src/context/system-tests/SimulationHarness.ts index ed9e464727..e968f62e61 100644 --- a/packages/core/src/context/system-tests/SimulationHarness.ts +++ b/packages/core/src/context/system-tests/SimulationHarness.ts @@ -1,3 +1,9 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ +import type { BaseLlmClient } from "../../core/baseLlmClient.js"; /** * @license * Copyright 2026 Google LLC @@ -11,14 +17,9 @@ import type { SidecarConfig } from '../sidecar/types.js'; import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js'; import { ContextTracer } from '../tracer.js'; import { ContextEventBus } from '../eventBus.js'; - -import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js'; -import { ToolMaskingProcessor } from '../processors/toolMaskingProcessor.js'; -import { HistorySquashingProcessor } from '../processors/historySquashingProcessor.js'; -import { SemanticCompressionProcessor } from '../processors/semanticCompressionProcessor.js'; -import { StateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js'; -import { EmergencyTruncationProcessor } from '../processors/emergencyTruncationProcessor.js'; -import { ProcessorRegistry } from '../sidecar/registry.js'; +import { PipelineOrchestrator } from '../sidecar/orchestrator.js'; +import { registerBuiltInProcessors } from '../sidecar/builtins.js'; +import { debugLogger } from "../../utils/debugLogger.js"; export interface TurnSummary { turnIndex: number; @@ -29,13 +30,15 @@ export interface TurnSummary { export class SimulationHarness { readonly chatHistory: AgentChatHistory; contextManager!: ContextManager; + env!: ContextEnvironmentImpl; + orchestrator!: PipelineOrchestrator; readonly eventBus: ContextEventBus; config!: SidecarConfig; private tracer!: ContextTracer; private currentTurnIndex = 0; private tokenTrajectory: TurnSummary[] = []; - static async create(config: SidecarConfig, mockLlmClient: any, mockTempDir = '/tmp/sim'): Promise { + static async create(config: SidecarConfig, mockLlmClient: BaseLlmClient, mockTempDir = '/tmp/sim'): Promise { const harness = new SimulationHarness(); await harness.init(config, mockLlmClient, mockTempDir); return harness; @@ -48,26 +51,20 @@ export class SimulationHarness { private async init( config: SidecarConfig, - mockLlmClient: any, + mockLlmClient: BaseLlmClient, mockTempDir: string ) { this.config = config; // Register all standard processors - ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) }); - ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts) }); - ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts) }); - ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts) }); - ProcessorRegistry.register({ id: 'StateSnapshotProcessor', create: (env, opts) => new StateSnapshotProcessor(env, opts, env.eventBus) }); - ProcessorRegistry.register({ id: 'EmergencyTruncationProcessor', create: (env, opts) => new EmergencyTruncationProcessor(env, opts) }); + registerBuiltInProcessors(); - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (this as any).tracer = new ContextTracer({ targetDir: mockTempDir, sessionId: 'sim-session' }); + this.tracer = new ContextTracer({ targetDir: mockTempDir, sessionId: 'sim-session' }); // Using real token calculator instead of mock, so we test actual string sizes const InMemoryFS = (await import('../system/InMemoryFileSystem.js')).InMemoryFileSystem; const DetIdGen = (await import('../system/DeterministicIdGenerator.js')).DeterministicIdGenerator; - const env = new ContextEnvironmentImpl( + this.env = new ContextEnvironmentImpl( mockLlmClient, 'sim-prompt', 'sim-session', @@ -80,7 +77,8 @@ export class SimulationHarness { new DetIdGen() ); - this.contextManager = new ContextManager(config, env, this.tracer); + this.orchestrator = new PipelineOrchestrator(config, this.env, this.eventBus, this.tracer); + this.contextManager = ContextManager.create(config, this.env, this.tracer, this.orchestrator); this.contextManager.subscribeToHistory(this.chatHistory); } @@ -91,32 +89,34 @@ export class SimulationHarness { async simulateTurn(messages: Content[]) { // 1. Append the new messages const currentHistory = this.chatHistory.get(); - await this.chatHistory.set([...currentHistory, ...messages]); + this.chatHistory.set([...currentHistory, ...messages]); // 2. Measure tokens immediately after append (Before background processing) - const tokensBefore = (this.contextManager as any).env.tokenCalculator.calculateEpisodeListTokens( + const tokensBefore = this.env.tokenCalculator.calculateEpisodeListTokens( this.contextManager.getWorkingBufferView() ); - console.log(`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`); + debugLogger.log(`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`); // 3. Yield to event loop to allow internal async subscribers and orchestrator to finish await new Promise(resolve => setTimeout(resolve, 50)); // 3.1 Simulate what projectCompressedHistory does with the sync handlers let currentView = this.contextManager.getWorkingBufferView(); - const currentTokens = (this.contextManager as any).env.tokenCalculator.calculateEpisodeListTokens(currentView); + const currentTokens = this.env.tokenCalculator.calculateEpisodeListTokens(currentView); if (this.config.budget && currentTokens > this.config.budget.maxTokens) { - console.log(`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.budget.maxTokens}`); + debugLogger.log(`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.budget.maxTokens}`); const syncPipelines = this.config.pipelines.filter(p => p.execution === 'blocking'); - const orchestrator = (this.contextManager as any).orchestrator; + const orchestrator = this.orchestrator; for (const pipe of syncPipelines) { - currentView = await orchestrator.executePipeline(pipe.name, currentView, { + await orchestrator.executePipeline(pipe.name, currentView, { currentTokens, maxTokens: this.config.budget.maxTokens, retainedTokens: this.config.budget.retainedTokens, + isBudgetSatisfied: false, deficitTokens: currentTokens - this.config.budget.maxTokens, protectedEpisodeIds: new Set() }); + currentView = this.contextManager.getWorkingBufferView(); } // Inject the truncated view back into the graph @@ -126,13 +126,12 @@ export class SimulationHarness { this.eventBus.emitVariantReady({ targetId: ep.id, variantId: 'v-emergency', - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion variant: { status: 'ready', type: 'masked', // Truncation is technically a mask text: ep.yield?.text || '', recoveredTokens: 0, - } as any + } }); } } @@ -141,10 +140,10 @@ export class SimulationHarness { } // 4. Measure tokens after background processors have (hopefully) emitted variants - const tokensAfter = (this.contextManager as any).env.tokenCalculator.calculateEpisodeListTokens( + const tokensAfter = this.env.tokenCalculator.calculateEpisodeListTokens( this.contextManager.getWorkingBufferView() ); - console.log(`[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`); + debugLogger.log(`[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`); this.tokenTrajectory.push({ turnIndex: this.currentTurnIndex++, diff --git a/packages/core/src/context/system-tests/lifecycle.golden.test.ts b/packages/core/src/context/system-tests/lifecycle.golden.test.ts index e916fe7641..c78f2331ee 100644 --- a/packages/core/src/context/system-tests/lifecycle.golden.test.ts +++ b/packages/core/src/context/system-tests/lifecycle.golden.test.ts @@ -7,6 +7,7 @@ import { describe, it, expect, vi, beforeAll, afterAll } from 'vitest'; import { SimulationHarness } from './SimulationHarness.js'; import type { SidecarConfig } from '../sidecar/types.js'; +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; expect.addSnapshotSerializer({ test: (val) => @@ -54,7 +55,7 @@ describe('System Lifecycle Golden Tests', () => { generateContent: vi.fn().mockResolvedValue({ text: '', }) - }; + } as unknown as BaseLlmClient; it('Scenario 1: Organic Growth with Huge Tool Output & Images', async () => { const harness = await SimulationHarness.create(getAggressiveConfig(), mockLlmClient); diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index a1c8a32d41..bf3830f682 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -165,7 +165,7 @@ export function setupContextComponentTest(config: Config) { 1, eventBus ); - const contextManager = new ContextManager(sidecar, env, tracer); + const contextManager = ContextManager.create(sidecar, env, tracer); // The async worker is now internally managed by ContextManager