This commit is contained in:
Your Name
2026-04-07 03:13:14 +00:00
parent 61dacecacf
commit 63e8b825a7
13 changed files with 145 additions and 73 deletions
@@ -83,7 +83,7 @@ describe('ContextManager Golden Tests', () => {
4,
eventBus
);
contextManager = new ContextManager(sidecar, env, tracer);
contextManager = ContextManager.create(sidecar, env, tracer);
});
const createLargeHistory = (): Content[] => [
@@ -144,7 +144,7 @@ describe('ContextManager Golden Tests', () => {
4,
eventBus2
);
contextManager = new ContextManager(
contextManager = ContextManager.create(
{
budget: { retainedTokens: 100000, maxTokens: 150000 },
pipelines: [],
+13 -6
View File
@@ -44,18 +44,25 @@ export class ContextManager {
private pristineEpisodes: Episode[] = [];
private readonly eventBus: ContextEventBus;
// Internal sub-components
// Synchronous processors are instantiated but effectively used as singletons within this class
private orchestrator: PipelineOrchestrator;
private historyObserver?: HistoryObserver;
constructor(private sidecar: SidecarConfig, private env: ContextEnvironment, private readonly tracer: ContextTracer) {
static create(sidecar: SidecarConfig, env: ContextEnvironment, tracer: ContextTracer, orchestrator?: PipelineOrchestrator): ContextManager {
const orch = orchestrator || new PipelineOrchestrator(sidecar, env, env.eventBus, tracer);
return new ContextManager(sidecar, env, tracer, orch);
}
// Use ContextManager.create() instead
private constructor(
private sidecar: SidecarConfig,
private env: ContextEnvironment,
private readonly tracer: ContextTracer,
orchestrator: PipelineOrchestrator
) {
this.eventBus = env.eventBus;
this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer);
this.orchestrator = orchestrator;
this.eventBus.onPristineHistoryUpdated((event) => {
this.pristineEpisodes = event.episodes;
@@ -11,11 +11,19 @@ import type { Part } from '@google/genai';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export type BlobDegradationProcessorOptions = Record<string, never>;
export class BlobDegradationProcessor implements ContextProcessor {
readonly name = 'BlobDegradation';
static create(env: ContextEnvironment, _options: BlobDegradationProcessorOptions): BlobDegradationProcessor {
return new BlobDegradationProcessor(env);
}
readonly id = 'BlobDegradationProcessor';
readonly name = 'BlobDegradationProcessor';
readonly options = {};
private env: ContextEnvironment;
constructor(env: ContextEnvironment, _options: Record<string, unknown> = {}) {
constructor(env: ContextEnvironment) {
this.env = env;
}
@@ -9,11 +9,31 @@ import type { ContextEnvironment } from '../sidecar/environment.js';
import { truncateProportionally } from '../truncation.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export class HistorySquashingProcessor implements ContextProcessor {
readonly name = 'HistorySquashing';
private options: { maxTokensPerNode: number };
export interface HistorySquashingProcessorOptions {
maxTokensPerNode: number;
}
constructor(env: ContextEnvironment, options: { maxTokensPerNode: number }) {
export class HistorySquashingProcessor implements ContextProcessor {
static create(env: ContextEnvironment, options: HistorySquashingProcessorOptions): HistorySquashingProcessor {
return new HistorySquashingProcessor(env, options);
}
static readonly schema = {
type: 'object',
properties: {
maxTokensPerNode: {
type: 'number',
description: 'The maximum tokens a node can have before being truncated.',
},
},
required: ['maxTokensPerNode'],
};
readonly id = 'HistorySquashingProcessor';
readonly name = 'HistorySquashingProcessor';
readonly options: HistorySquashingProcessorOptions;
constructor(env: ContextEnvironment, options: HistorySquashingProcessorOptions) {
this.options = options;
}
@@ -13,15 +13,35 @@ import { getResponseText } from '../../utils/partUtils.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export interface SemanticCompressionProcessorOptions {
nodeThresholdTokens: number;
}
export class SemanticCompressionProcessor implements ContextProcessor {
readonly name = 'SemanticCompression';
static create(env: ContextEnvironment, options: SemanticCompressionProcessorOptions): SemanticCompressionProcessor {
return new SemanticCompressionProcessor(env, options);
}
static readonly schema = {
type: 'object',
properties: {
nodeThresholdTokens: {
type: 'number',
description: 'The token threshold above which nodes are summarized.',
},
},
required: ['nodeThresholdTokens'],
};
readonly id = 'SemanticCompressionProcessor';
readonly name = 'SemanticCompressionProcessor';
readonly options: SemanticCompressionProcessorOptions;
private env: ContextEnvironment;
private options: { nodeThresholdTokens: number };
private modelToUse: string = 'chat-compression-2.5-flash-lite';
constructor(
env: ContextEnvironment,
options: { nodeThresholdTokens: number },
options: SemanticCompressionProcessorOptions,
) {
this.env = env;
this.options = options;
@@ -26,14 +26,34 @@ const UNMASKABLE_TOOLS = new Set([
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export interface ToolMaskingProcessorOptions {
stringLengthThresholdTokens: number;
}
export class ToolMaskingProcessor implements ContextProcessor {
readonly name = 'ToolMasking';
private options: { stringLengthThresholdTokens: number };
static create(env: ContextEnvironment, options: ToolMaskingProcessorOptions): ToolMaskingProcessor {
return new ToolMaskingProcessor(env, options);
}
static readonly schema = {
type: 'object',
properties: {
stringLengthThresholdTokens: {
type: 'number',
description: 'The token threshold above which tool intents/observations are masked.',
},
},
required: ['stringLengthThresholdTokens'],
};
readonly id = 'ToolMaskingProcessor';
readonly name = 'ToolMaskingProcessor';
readonly options: ToolMaskingProcessorOptions;
private env: ContextEnvironment;
constructor(
env: ContextEnvironment,
options: { stringLengthThresholdTokens: number },
options: ToolMaskingProcessorOptions,
) {
this.env = env;
this.options = options;
+11 -11
View File
@@ -5,15 +5,15 @@
*/
import { ProcessorRegistry } from './registry.js';
import { ToolMaskingProcessor } from '../processors/toolMaskingProcessor.js';
import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js';
import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js';
import { SemanticCompressionProcessor } from '../processors/semanticCompressionProcessor.js';
import { HistorySquashingProcessor } from '../processors/historySquashingProcessor.js';
import { StateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js';
import { EmergencyTruncationProcessor } from '../processors/emergencyTruncationProcessor.js';
import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js';
import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from '../processors/historySquashingProcessor.js';
import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js';
import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js';
export function registerBuiltInProcessors() {
ProcessorRegistry.register({
ProcessorRegistry.register<ToolMaskingProcessorOptions>({
id: 'ToolMaskingProcessor',
schema: {
type: 'object',
@@ -30,7 +30,7 @@ export function registerBuiltInProcessors() {
create: (env, opts) => new ToolMaskingProcessor(env, opts)
});
ProcessorRegistry.register({
ProcessorRegistry.register<Record<string, never>>({
id: 'BlobDegradationProcessor',
schema: {
type: 'object',
@@ -43,7 +43,7 @@ export function registerBuiltInProcessors() {
create: (env) => new BlobDegradationProcessor(env)
});
ProcessorRegistry.register({
ProcessorRegistry.register<SemanticCompressionProcessorOptions>({
id: 'SemanticCompressionProcessor',
schema: {
type: 'object',
@@ -60,7 +60,7 @@ export function registerBuiltInProcessors() {
create: (env, opts) => new SemanticCompressionProcessor(env, opts)
});
ProcessorRegistry.register({
ProcessorRegistry.register<HistorySquashingProcessorOptions>({
id: 'HistorySquashingProcessor',
schema: {
type: 'object',
@@ -77,7 +77,7 @@ export function registerBuiltInProcessors() {
create: (env, opts) => new HistorySquashingProcessor(env, opts)
});
ProcessorRegistry.register({
ProcessorRegistry.register<StateSnapshotProcessorOptions>({
id: 'StateSnapshotProcessor',
schema: {
type: 'object',
@@ -97,7 +97,7 @@ export function registerBuiltInProcessors() {
create: (env, opts) => StateSnapshotProcessor.create(env, opts)
});
ProcessorRegistry.register({
ProcessorRegistry.register<EmergencyTruncationProcessorOptions>({
id: 'EmergencyTruncationProcessor',
schema: {
type: 'object',
@@ -39,7 +39,7 @@ export class PipelineOrchestrator {
}
// The Orchestrator injects standard dependencies required by processors
// If a processor needs the eventBus (like Snapshot), it expects it via constructor.
const instance = processorClass.create(this.env, procDef.options);
const instance = processorClass.create(this.env, procDef.options ?? {});
this.instantiatedProcessors.set(procDef.processorId, instance);
}
}
@@ -171,7 +171,7 @@ export class PipelineOrchestrator {
type: 'snapshot',
episode: ep,
recoveredTokens: ep.yield?.metadata?.currentTokens || 10,
replacedEpisodeIds: mutation.originalIds,
replacedEpisodeIds: mutation.originalIds || [],
} : {
status: 'ready',
type: vType,
@@ -7,10 +7,7 @@
import type { ContextProcessor } from '../pipeline.js';
import type { ContextEnvironment } from './environment.js';
export interface ContextProcessorDef<
TOptions extends Record<string, unknown> = Record<string, unknown>,
> {
export interface ContextProcessorDef<TOptions = object> {
readonly id: string;
readonly schema?: object;
create(
@@ -23,9 +20,9 @@ export interface ContextProcessorDef<
* Registry for mapping declarative sidecar configs to running Processor instances.
*/
export class ProcessorRegistry {
private static processors = new Map<string, ContextProcessorDef>();
private static processors = new Map<string, ContextProcessorDef<unknown>>();
static register(def: ContextProcessorDef) {
static register<TOptions>(def: ContextProcessorDef<TOptions>) {
this.processors.set(def.id, def);
}
+1 -1
View File
@@ -11,7 +11,7 @@ import type { StateSnapshotProcessorOptions } from '../processors/stateSnapshotP
*/
export type ProcessorConfig =
| { processorId: 'ToolMaskingProcessor'; options: { stringLengthThresholdTokens: number } }
| { processorId: 'BlobDegradationProcessor'; options?: Record<string, unknown> }
| { processorId: 'BlobDegradationProcessor'; options?: object }
| { processorId: 'SemanticCompressionProcessor'; options: { nodeThresholdTokens: number } }
| { processorId: 'HistorySquashingProcessor'; options: { maxTokensPerNode: number } }
| { processorId: 'StateSnapshotProcessor'; options: StateSnapshotProcessorOptions }
@@ -1,3 +1,9 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { BaseLlmClient } from "../../core/baseLlmClient.js";
/**
* @license
* Copyright 2026 Google LLC
@@ -11,14 +17,9 @@ import type { SidecarConfig } from '../sidecar/types.js';
import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js';
import { ContextTracer } from '../tracer.js';
import { ContextEventBus } from '../eventBus.js';
import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js';
import { ToolMaskingProcessor } from '../processors/toolMaskingProcessor.js';
import { HistorySquashingProcessor } from '../processors/historySquashingProcessor.js';
import { SemanticCompressionProcessor } from '../processors/semanticCompressionProcessor.js';
import { StateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js';
import { EmergencyTruncationProcessor } from '../processors/emergencyTruncationProcessor.js';
import { ProcessorRegistry } from '../sidecar/registry.js';
import { PipelineOrchestrator } from '../sidecar/orchestrator.js';
import { registerBuiltInProcessors } from '../sidecar/builtins.js';
import { debugLogger } from "../../utils/debugLogger.js";
export interface TurnSummary {
turnIndex: number;
@@ -29,13 +30,15 @@ export interface TurnSummary {
export class SimulationHarness {
readonly chatHistory: AgentChatHistory;
contextManager!: ContextManager;
env!: ContextEnvironmentImpl;
orchestrator!: PipelineOrchestrator;
readonly eventBus: ContextEventBus;
config!: SidecarConfig;
private tracer!: ContextTracer;
private currentTurnIndex = 0;
private tokenTrajectory: TurnSummary[] = [];
static async create(config: SidecarConfig, mockLlmClient: any, mockTempDir = '/tmp/sim'): Promise<SimulationHarness> {
static async create(config: SidecarConfig, mockLlmClient: BaseLlmClient, mockTempDir = '/tmp/sim'): Promise<SimulationHarness> {
const harness = new SimulationHarness();
await harness.init(config, mockLlmClient, mockTempDir);
return harness;
@@ -48,26 +51,20 @@ export class SimulationHarness {
private async init(
config: SidecarConfig,
mockLlmClient: any,
mockLlmClient: BaseLlmClient,
mockTempDir: string
) {
this.config = config;
// Register all standard processors
ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) });
ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts) });
ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts) });
ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts) });
ProcessorRegistry.register({ id: 'StateSnapshotProcessor', create: (env, opts) => new StateSnapshotProcessor(env, opts, env.eventBus) });
ProcessorRegistry.register({ id: 'EmergencyTruncationProcessor', create: (env, opts) => new EmergencyTruncationProcessor(env, opts) });
registerBuiltInProcessors();
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
(this as any).tracer = new ContextTracer({ targetDir: mockTempDir, sessionId: 'sim-session' });
this.tracer = new ContextTracer({ targetDir: mockTempDir, sessionId: 'sim-session' });
// Using real token calculator instead of mock, so we test actual string sizes
const InMemoryFS = (await import('../system/InMemoryFileSystem.js')).InMemoryFileSystem;
const DetIdGen = (await import('../system/DeterministicIdGenerator.js')).DeterministicIdGenerator;
const env = new ContextEnvironmentImpl(
this.env = new ContextEnvironmentImpl(
mockLlmClient,
'sim-prompt',
'sim-session',
@@ -80,7 +77,8 @@ export class SimulationHarness {
new DetIdGen()
);
this.contextManager = new ContextManager(config, env, this.tracer);
this.orchestrator = new PipelineOrchestrator(config, this.env, this.eventBus, this.tracer);
this.contextManager = ContextManager.create(config, this.env, this.tracer, this.orchestrator);
this.contextManager.subscribeToHistory(this.chatHistory);
}
@@ -91,32 +89,34 @@ export class SimulationHarness {
async simulateTurn(messages: Content[]) {
// 1. Append the new messages
const currentHistory = this.chatHistory.get();
await this.chatHistory.set([...currentHistory, ...messages]);
this.chatHistory.set([...currentHistory, ...messages]);
// 2. Measure tokens immediately after append (Before background processing)
const tokensBefore = (this.contextManager as any).env.tokenCalculator.calculateEpisodeListTokens(
const tokensBefore = this.env.tokenCalculator.calculateEpisodeListTokens(
this.contextManager.getWorkingBufferView()
);
console.log(`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`);
debugLogger.log(`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`);
// 3. Yield to event loop to allow internal async subscribers and orchestrator to finish
await new Promise(resolve => setTimeout(resolve, 50));
// 3.1 Simulate what projectCompressedHistory does with the sync handlers
let currentView = this.contextManager.getWorkingBufferView();
const currentTokens = (this.contextManager as any).env.tokenCalculator.calculateEpisodeListTokens(currentView);
const currentTokens = this.env.tokenCalculator.calculateEpisodeListTokens(currentView);
if (this.config.budget && currentTokens > this.config.budget.maxTokens) {
console.log(`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.budget.maxTokens}`);
debugLogger.log(`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.budget.maxTokens}`);
const syncPipelines = this.config.pipelines.filter(p => p.execution === 'blocking');
const orchestrator = (this.contextManager as any).orchestrator;
const orchestrator = this.orchestrator;
for (const pipe of syncPipelines) {
currentView = await orchestrator.executePipeline(pipe.name, currentView, {
await orchestrator.executePipeline(pipe.name, currentView, {
currentTokens,
maxTokens: this.config.budget.maxTokens,
retainedTokens: this.config.budget.retainedTokens,
isBudgetSatisfied: false,
deficitTokens: currentTokens - this.config.budget.maxTokens,
protectedEpisodeIds: new Set()
});
currentView = this.contextManager.getWorkingBufferView();
}
// Inject the truncated view back into the graph
@@ -126,13 +126,12 @@ export class SimulationHarness {
this.eventBus.emitVariantReady({
targetId: ep.id,
variantId: 'v-emergency',
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
variant: {
status: 'ready',
type: 'masked', // Truncation is technically a mask
text: ep.yield?.text || '',
recoveredTokens: 0,
} as any
}
});
}
}
@@ -141,10 +140,10 @@ export class SimulationHarness {
}
// 4. Measure tokens after background processors have (hopefully) emitted variants
const tokensAfter = (this.contextManager as any).env.tokenCalculator.calculateEpisodeListTokens(
const tokensAfter = this.env.tokenCalculator.calculateEpisodeListTokens(
this.contextManager.getWorkingBufferView()
);
console.log(`[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`);
debugLogger.log(`[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`);
this.tokenTrajectory.push({
turnIndex: this.currentTurnIndex++,
@@ -7,6 +7,7 @@
import { describe, it, expect, vi, beforeAll, afterAll } from 'vitest';
import { SimulationHarness } from './SimulationHarness.js';
import type { SidecarConfig } from '../sidecar/types.js';
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
expect.addSnapshotSerializer({
test: (val) =>
@@ -54,7 +55,7 @@ describe('System Lifecycle Golden Tests', () => {
generateContent: vi.fn().mockResolvedValue({
text: '<MOCKED_STATE_SNAPSHOT_SUMMARY>',
})
};
} as unknown as BaseLlmClient;
it('Scenario 1: Organic Growth with Huge Tool Output & Images', async () => {
const harness = await SimulationHarness.create(getAggressiveConfig(), mockLlmClient);
@@ -165,7 +165,7 @@ export function setupContextComponentTest(config: Config) {
1,
eventBus
);
const contextManager = new ContextManager(sidecar, env, tracer);
const contextManager = ContextManager.create(sidecar, env, tracer);
// The async worker is now internally managed by ContextManager