diff --git a/packages/core/src/context/contextManager.golden.test.ts b/packages/core/src/context/contextManager.golden.test.ts index f0de3635b8..a46bec209d 100644 --- a/packages/core/src/context/contextManager.golden.test.ts +++ b/packages/core/src/context/contextManager.golden.test.ts @@ -71,6 +71,7 @@ describe('ContextManager Golden Tests', () => { const tracer = new ContextTracer('/tmp', 'test-session'); const env = new ContextEnvironmentImpl( {} as any, + 'test-prompt-id', 'test', '/tmp', '/tmp', diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index bd236c7523..5682011973 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -20,8 +20,8 @@ import type { ContextEnvironment } from './sidecar/environment.js'; import type { SidecarConfig } from './sidecar/types.js'; import { ProcessorRegistry } from './sidecar/registry.js'; +import { PipelineOrchestrator } from './sidecar/orchestrator.js'; import type { ContextProcessor } from './pipeline.js'; -import type { AsyncContextWorker } from './workers/asyncContextWorker.js'; import { ToolMaskingProcessor } from './processors/toolMaskingProcessor.js'; import { BlobDegradationProcessor } from './processors/blobDegradationProcessor.js'; @@ -40,7 +40,7 @@ export class ContextManager { // Internal sub-components // Synchronous processors are instantiated but effectively used as singletons within this class - private workers: AsyncContextWorker[] = []; + private orchestrator: PipelineOrchestrator; @@ -48,16 +48,17 @@ export class ContextManager { this.eventBus = new ContextEventBus(); + if ('setEventBus' in this.env) { + (this.env as any).setEventBus(this.eventBus); + } - - + this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer); // Register built-ins ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts as any) }); ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) }); ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts as any) }); ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts as any) }); - ProcessorRegistry.register({ id: 'StateSnapshotWorker', create: (env, opts) => new StateSnapshotWorker(env) }); this.eventBus.onVariantReady((event) => { @@ -76,25 +77,13 @@ export class ContextManager { ); } }); - - // Initialize synchronous fallback processors - // Order matters: Fast, lossless masking -> Intelligent degradation -> Brutal truncation fallback - - // Initialize and start background subconscious workers - for (const bgDef of this.sidecar.pipelines.eagerBackground) { - const worker = ProcessorRegistry.get(bgDef.processorId).create(this.env, bgDef.options) as AsyncContextWorker; - worker.start(this.eventBus); - this.workers.push(worker); - } } /** * Safely stops background workers and clears event listeners. */ shutdown() { - for (const worker of this.workers) { - worker.stop(); - } + this.orchestrator.shutdown(); if (this.unsubscribeHistory) { this.unsubscribeHistory(); } @@ -193,34 +182,14 @@ export class ContextManager { protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant } - const createAccountingState = (currentTotal: number) => ({ - currentTokens: currentTotal, + return this.orchestrator.executePipelineForking('Immediate Sanitization', this.getWorkingBufferView(), { + currentTokens: currentTokens, maxTokens: mngConfig.budget.maxTokens, retainedTokens: mngConfig.budget.retainedTokens, - deficitTokens: Math.max(0, currentTotal - mngConfig.budget.maxTokens), + deficitTokens: Math.max(0, currentTokens - mngConfig.budget.maxTokens), protectedEpisodeIds: protectedIds, - isBudgetSatisfied: currentTotal <= mngConfig.budget.maxTokens, // We use maxTokens here so processors don't prematurely short-circuit if they are trying to prevent a barrier hit + isBudgetSatisfied: currentTokens <= mngConfig.budget.maxTokens, }); - - // Run Retained Graph - let processedRetained = [...retainedWindow]; - for (const def of mngConfig.pipelines.retainedProcessingGraph) { - const processor = ProcessorRegistry.get(def.processorId).create(this.env, def.options) as ContextProcessor; - this.tracer.logEvent('ContextManager', `Running ${processor.name} on retained window.`); - const state = createAccountingState(this.calculateIrTokens([...normalWindow, ...processedRetained])); - processedRetained = await processor.process(processedRetained, state); - } - - // Run Normal Graph - let processedNormal = [...normalWindow]; - for (const def of mngConfig.pipelines.normalProcessingGraph) { - const processor = ProcessorRegistry.get(def.processorId).create(this.env, def.options) as ContextProcessor; - this.tracer.logEvent('ContextManager', `Running ${processor.name} on normal window.`); - const state = createAccountingState(this.calculateIrTokens([...processedNormal, ...processedRetained])); - processedNormal = await processor.process(processedNormal, state); - } - - return [...processedNormal, ...processedRetained]; } public getWorkingBufferView(): Episode[] { diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 7dd821daa7..471f565f25 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -6,7 +6,7 @@ import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; import type { Episode, ToolExecution } from '../ir/types.js'; -import type { ContextEnvironment, ContextEventBus, ContextTracer } from '../sidecar/environment.js'; +import type { ContextEnvironment, ContextEventBus } from '../sidecar/environment.js'; import { estimateContextTokenCountSync as estimateTokenCountSync } from '../utils/contextTokenCalculator.js'; import { v4 as uuidv4 } from 'uuid'; import { LlmRole } from '../../telemetry/llmRole.js'; @@ -25,8 +25,6 @@ export class StateSnapshotProcessor implements ContextProcessor { readonly name = 'StateSnapshotProcessor'; readonly options: StateSnapshotProcessorOptions; private readonly env: ContextEnvironment; - private readonly eventBus: ContextEventBus; - private tracer?: ContextTracer; private isSynthesizing = false; constructor( @@ -36,7 +34,6 @@ export class StateSnapshotProcessor implements ContextProcessor { ) { this.env = env; this.options = options; - this.eventBus = eventBus; } async process(episodes: Episode[], state: ContextAccountingState): Promise { @@ -52,7 +49,7 @@ export class StateSnapshotProcessor implements ContextProcessor { const ep = episodes[i]; selectedEpisodes.push(ep); deficitAccumulator += estimateTokenCountSync([ - { text: ep.trigger?.semanticParts?.[0]?.text ?? '' }, + { text: ep.trigger?.parts?.[0]?.text ?? '' }, { text: ep.yield?.text ?? '' }, ]); if (deficitAccumulator >= targetDeficit) break; @@ -90,7 +87,7 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; for (const ep of episodes) { if (ep.trigger) { - userPromptText += `USER: ${ep.trigger.semanticParts?.map((p: any) => p.text).join('')}\n`; + userPromptText += `USER: ${ep.trigger.parts?.map((p: any) => p.text).join('')}\n`; } for (const step of ep.steps) { if (step.type === 'TOOL_EXECUTION') { @@ -147,11 +144,6 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo }, }; } catch (error) { - if (this.tracer) { - this.tracer.logEvent('WorkerError', 'Snapshot synthesis failed', { - error: error instanceof Error ? error.message : String(error), - }); - } console.error('Failed to synthesize snapshot:', error); throw error; } diff --git a/packages/core/src/context/sidecar/environmentImpl.ts b/packages/core/src/context/sidecar/environmentImpl.ts index fff58ea3fa..0cbfc12bba 100644 --- a/packages/core/src/context/sidecar/environmentImpl.ts +++ b/packages/core/src/context/sidecar/environmentImpl.ts @@ -23,7 +23,6 @@ export class ContextEnvironmentImpl implements ContextEnvironment { private charsPerToken: number, ) {} - // TODO(joshualitt): Idiomatic getters and setters setEventBus(bus: ContextEventBus) { this.eventBus = bus; } diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 8ab672db92..43ddb62737 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -74,7 +74,7 @@ export class PipelineOrchestrator { this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`); // Retrieve the most recent pristine state from the bus. // The EventBus must hold the current graph state for orchestrated async execution. - const currentState = []; + const currentState: Episode[] = []; if (!currentState || currentState.length === 0) return; // We assume the eventBus or ContextManager keeps accounting state updated. diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index d3cc18040f..918c1917c8 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -6,15 +6,20 @@ import { vi } from 'vitest'; import type { Config } from '../../config/config.js'; - import type { ContextEnvironment } from '../sidecar/environment.js'; +import type { Content } from '@google/genai'; +import { AgentChatHistory } from '../../core/agentChatHistory.js'; +import { ContextManager } from '../contextManager.js'; + export function createMockEnvironment(): ContextEnvironment { return { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion getLlmClient: vi.fn().mockReturnValue({ generateContent: vi.fn().mockResolvedValue({ text: 'Mock LLM summary response', }), - }) as any, + } as unknown as BaseLlmClient), + getPromptId: vi.fn().mockReturnValue('mock-prompt-id'), getSessionId: vi.fn().mockReturnValue('mock-session'), getTraceDir: vi.fn().mockReturnValue('/tmp/.gemini/trace'), getProjectTempDir: vi.fn().mockReturnValue('/tmp/.gemini/tool-outputs'), @@ -23,9 +28,6 @@ export function createMockEnvironment(): ContextEnvironment { getCharsPerToken: vi.fn().mockReturnValue(1), }; } -import type { Content } from '@google/genai'; -import { AgentChatHistory } from '../../core/agentChatHistory.js'; -import { ContextManager } from '../contextManager.js'; /** * Creates a block of synthetic conversation history designed to consume a specific number of tokens. @@ -76,6 +78,7 @@ export function createMockContextConfig( getSessionId: vi.fn().mockReturnValue('test-session'), }; + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion return { ...defaultConfig, ...overrides } as unknown as Config; } @@ -85,13 +88,15 @@ export function createMockContextConfig( import { ContextTracer } from '../tracer.js'; import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js'; import { SidecarLoader } from '../sidecar/SidecarLoader.js'; +import type { BaseLlmClient } from 'src/core/baseLlmClient.js'; export function setupContextComponentTest(config: Config) { const chatHistory = new AgentChatHistory(); const sidecar = SidecarLoader.fromLegacyConfig(config); const tracer = new ContextTracer('/tmp', 'test-session'); const env = new ContextEnvironmentImpl( - config.getBaseLlmClient() as any, + config.getBaseLlmClient(), + 'test prompt-id', 'test-session', '/tmp', '/tmp/gemini-test',