diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 2627e63174..01652d0ac0 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -139,87 +139,19 @@ export class ContextManager { * This does NOT mutate the pristine episodic graph. */ async projectCompressedHistory(): Promise { - if (!this.sidecar.budget) { - return IrProjector.projectAndDump(this.pristineEpisodes, this.env); - } - - const mngConfig = this.sidecar; - const maxTokens = mngConfig.budget.maxTokens; this.tracer.logEvent('ContextManager', 'Projection requested.'); - - // Get the dynamically computed Working Buffer View - let currentEpisodes = this.getWorkingBufferView(); - - let currentTokens = calculateEpisodeListTokens(currentEpisodes); - - - if (currentTokens <= maxTokens) { - this.tracer.logEvent('ContextManager', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`); - return IrProjector.projectAndDump(currentEpisodes, this.env); + const protectedIds = new Set(); + if (this.pristineEpisodes.length > 0) { + protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant } - - this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.gcBackstop.strategy}`); - // --- The Synchronous Pressure Barrier --- - // The background eager workers couldn't keep up, or a massive file was pasted. - // The Working Buffer View is still over the absolute hard limit (maxTokens). - // We MUST reduce tokens before returning, or the API request will 400. - debugLogger.log( - `Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.gcBackstop.strategy}`, + return IrProjector.project( + this.getWorkingBufferView(), + this.orchestrator, + this.sidecar, + this.tracer, + this.env, + protectedIds ); - - // Calculate target based on gcTarget - let targetTokens = maxTokens; - - if (mngConfig.gcBackstop.target === 'max') { - targetTokens = mngConfig.budget.retainedTokens; - } else if (mngConfig.gcBackstop.target === 'freeNTokens') { - targetTokens = maxTokens - (mngConfig.gcBackstop.freeTokensTarget ?? 10000); - } - - // Structural invariant: We ALWAYS protect the architectural initialization turn (Turn 0) - // We do NOT arbitrarily protect recent episodes (like currentEpisodes.length - 1) - // because an episode can be unboundedly large, and protecting it would crash the LLM. - const protectedEpisodeId = this.pristineEpisodes.length > 0 ? this.pristineEpisodes[0].id : null; - - let remainingTokens = currentTokens; - - const truncated: Episode[] = []; - - const strategy = mngConfig.gcBackstop.strategy; - - - for (const ep of currentEpisodes) { - const epTokens = calculateEpisodeListTokens([ep]); - if (remainingTokens > targetTokens && ep.id !== protectedEpisodeId) { - console.log('DROPPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens); - - remainingTokens -= epTokens; - if (strategy === 'truncate') { - this.tracer.logEvent('Barrier', `Truncating episode [${ep.id}].`); - - debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`); - } else if (strategy === 'compress') { - this.tracer.logEvent('Barrier', `Compress fallback to truncate for [${ep.id}].`); - debugLogger.warn(`Synchronous compress barrier not fully implemented, truncating Episode ${ep.id}.`); - } else if (strategy === 'rollingSummarizer') { - this.tracer.logEvent('Barrier', `RollingSummarizer fallback to truncate for [${ep.id}].`); - debugLogger.warn(`Synchronous rollingSummarizer barrier not fully implemented, truncating Episode ${ep.id}.`); - } - } else { - console.log('KEEPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens); - truncated.push(ep); - - } - } - currentEpisodes = truncated; - - const finalTokens = calculateEpisodeListTokens(currentEpisodes); - this.tracer.logEvent('ContextManager', `Finished projection. Final token count: ${finalTokens}.`); - debugLogger.log( - `Context Manager finished. Final actual token count: ${finalTokens}.`, - ); - - return IrProjector.projectAndDump(currentEpisodes, this.env); } } diff --git a/packages/core/src/context/ir/projector.ts b/packages/core/src/context/ir/projector.ts index 1f777a95a3..22fc1a3221 100644 --- a/packages/core/src/context/ir/projector.ts +++ b/packages/core/src/context/ir/projector.ts @@ -8,14 +8,60 @@ import type { Content } from '@google/genai'; import { IrMapper } from './mapper.js'; import type { Episode } from './types.js'; import { debugLogger } from '../../utils/debugLogger.js'; -import type { ContextEnvironment } from '../sidecar/environment.js'; +import type { ContextEnvironment, ContextTracer } from '../sidecar/environment.js'; +import type { PipelineOrchestrator } from '../sidecar/orchestrator.js'; +import type { SidecarConfig } from '../sidecar/types.js'; +import { calculateEpisodeListTokens } from '../utils/contextTokenCalculator.js'; export class IrProjector { + /** + * Orchestrates the final projection: takes a working buffer view, + * applies the Immediate Sanitization pipeline, and enforces token boundaries. + */ + static async project( + workingBuffer: Episode[], + orchestrator: PipelineOrchestrator, + sidecar: SidecarConfig, + tracer: ContextTracer, + env: ContextEnvironment, + protectedIds: Set + ): Promise { + if (!sidecar.budget) { + return this.projectAndDump(workingBuffer, env); + } + + const maxTokens = sidecar.budget.maxTokens; + let currentTokens = calculateEpisodeListTokens(workingBuffer); + + if (currentTokens <= maxTokens) { + tracer.logEvent('IrProjector', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`); + return this.projectAndDump(workingBuffer, env); + } + + tracer.logEvent('IrProjector', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier.`); + debugLogger.log(`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}).`); + + const processedEpisodes = await orchestrator.executePipeline('Immediate Sanitization', workingBuffer, { + currentTokens: currentTokens, + maxTokens: sidecar.budget.maxTokens, + retainedTokens: sidecar.budget.retainedTokens, + deficitTokens: Math.max(0, currentTokens - sidecar.budget.maxTokens), + protectedEpisodeIds: protectedIds, + isBudgetSatisfied: currentTokens <= sidecar.budget.maxTokens, + }); + + const finalTokens = calculateEpisodeListTokens(processedEpisodes); + tracer.logEvent('IrProjector', `Finished projection. Final token count: ${finalTokens}.`); + debugLogger.log(`Context Manager finished. Final actual token count: ${finalTokens}.`); + + return this.projectAndDump(processedEpisodes, env); + } + /** * Converts the internal IR graph into a flat Content[] array for the LLM. * If tracing is enabled via environment variables, dumps the payload to disk. */ - static async projectAndDump(episodes: Episode[], env: ContextEnvironment): Promise { + private static async projectAndDump(episodes: Episode[], env: ContextEnvironment): Promise { const contents = IrMapper.fromIr(episodes); if (process.env['GEMINI_DUMP_CONTEXT'] === 'true') { diff --git a/packages/core/src/context/processors/emergencyTruncationProcessor.ts b/packages/core/src/context/processors/emergencyTruncationProcessor.ts index 69e077fdeb..3611933555 100644 --- a/packages/core/src/context/processors/emergencyTruncationProcessor.ts +++ b/packages/core/src/context/processors/emergencyTruncationProcessor.ts @@ -7,7 +7,7 @@ import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; import type { Episode } from '../ir/types.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; -import { estimateContextTokenCountSync } from '../utils/contextTokenCalculator.js'; +import { calculateEpisodeListTokens } from '../utils/contextTokenCalculator.js'; export interface EmergencyTruncationProcessorOptions {} @@ -32,8 +32,7 @@ export class EmergencyTruncationProcessor implements ContextProcessor { // We respect the global protected Episode IDs (like the system prompt at index 0) for (const ep of episodes) { - // Calculate individual episode tokens efficiently (assume metadata is accurate if present) - const epTokens = ep.yield?.metadata?.currentTokens ?? estimateContextTokenCountSync([{ text: ep.yield?.text ?? '' }]); + const epTokens = calculateEpisodeListTokens([ep]); if (remainingTokens > targetTokens && !state.protectedEpisodeIds.has(ep.id)) { remainingTokens -= epTokens; diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 43ddb62737..e6a57b3504 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -53,100 +53,61 @@ export class PipelineOrchestrator { for (const trigger of pipeline.triggers) { if (typeof trigger === 'object' && trigger.type === 'timer') { const timer = setInterval(() => { - this.executePipelineAsync(pipeline); + // For background timers, we need a way to get the latest state + // But timers are generally disabled right now via the triggers config. + // If needed, we will pass it via event bus. }, trigger.intervalMs); this.activeTimers.push(timer); } else if (trigger === 'budget_exceeded') { - this.eventBus.onConsolidationNeeded(() => { - this.executePipelineAsync(pipeline); + this.eventBus.onConsolidationNeeded((event) => { + const state: ContextAccountingState = { + currentTokens: 0, + retainedTokens: this.config.budget.retainedTokens, + maxTokens: this.config.budget.maxTokens, + isBudgetSatisfied: false, + deficitTokens: event.targetDeficit, + protectedEpisodeIds: new Set() + }; + this.executePipelineAsync(pipeline, event.episodes, state); }); } - // 'on_turn' and 'post_turn' are handled synchronously via direct calls from the ContextManager. } } } + shutdown() { + for (const timer of this.activeTimers) { + clearInterval(timer); + } + } + /** * Executes a pipeline asynchronously in the background. This is the "Eventual Consistency" path. * When the pipeline resolves, it emits a VariantReady event to cache the new graph. */ - private async executePipelineAsync(pipeline: PipelineDef) { - this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`); - // Retrieve the most recent pristine state from the bus. - // The EventBus must hold the current graph state for orchestrated async execution. - const currentState: Episode[] = []; - if (!currentState || currentState.length === 0) return; - - // We assume the eventBus or ContextManager keeps accounting state updated. - const state: ContextAccountingState = { - currentTokens: 0, - // This needs to be calculated or passed down. For now, processors re-calculate. - retainedTokens: this.config.budget.retainedTokens, - maxTokens: this.config.budget.maxTokens, - isBudgetSatisfied: false, - deficitTokens: 0, - protectedEpisodeIds: new Set() - }; - - let currentEpisodes = [...currentState]; - - for (const procDef of pipeline.processors) { - const processor = this.instantiatedProcessors.get(procDef.processorId); - if (!processor) continue; - - try { - const result = processor.process(currentEpisodes, state); - if (result instanceof Promise) { - currentEpisodes = await result; - } else { - currentEpisodes = result; - } - } catch (error) { - debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error); - return; // Halt pipeline - } - } - - // Success! The background pipeline finished. - // Instead of forcing the Orchestrator to emit complex variant geometries, - // we can just emit a "GraphUpdated" or standard "VariantReady" event containing the entire new subset. - // For simplicity right now, if a pipeline runs asynchronously, we emit a "GraphVariant" event. - // this.eventBus.emitGraphVariantReady(currentEpisodes); - } - /** - * Executes a pipeline synchronously. If any processor returns a Promise, this method - * automatically forks that Promise to the background (falling back to async/eventual consistency) - * and immediately returns the synchronous results computed up to that point. + * Executes a pipeline based on its configured execution strategy ('blocking' or 'background'). */ - executePipelineForking(pipelineName: string, episodes: Episode[], state: ContextAccountingState): Episode[] { + async executePipeline(pipelineName: string, episodes: Episode[], state: ContextAccountingState): Promise { const pipeline = this.config.pipelines.find(p => p.name === pipelineName); if (!pipeline) return episodes; - let currentEpisodes = [...episodes]; + if (pipeline.execution === 'background') { + this.executePipelineAsync(pipeline, episodes, state).catch(e => { + debugLogger.error(`Background pipeline ${pipeline.name} failed:`, e); + }); + return episodes; // Return immediately + } + // Blocking execution + let currentEpisodes = [...episodes]; for (let i = 0; i < pipeline.processors.length; i++) { const procDef = pipeline.processors[i]; const processor = this.instantiatedProcessors.get(procDef.processorId); if (!processor) continue; try { - const result = processor.process(currentEpisodes, state); - if (result instanceof Promise) { - // *** THE FORK *** - // A processor went Async. We halt the synchronous chain here and return the state as-is. - this.tracer.logEvent('Orchestrator', `Pipeline ${pipeline.name} forked to background at ${procDef.processorId}`); - - // Continue resolving the rest of the pipeline in the background. - this.continuePipelineAsync(pipeline, result, i + 1, state).catch(e => { - debugLogger.error(`Background fork of ${pipeline.name} failed:`, e); - }); - - // Return the strictly synchronous output back to the LLM immediately! - return currentEpisodes; - } else { - currentEpisodes = result; - } + currentEpisodes = await processor.process(currentEpisodes, state); } catch (error) { debugLogger.error(`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, error); return currentEpisodes; // Return what we have so far @@ -156,26 +117,25 @@ export class PipelineOrchestrator { return currentEpisodes; } - private async continuePipelineAsync(pipeline: PipelineDef, asyncResult: Promise, startIndex: number, state: ContextAccountingState) { - let currentEpisodes = await asyncResult; + /** + * Internal method for running a pipeline entirely in the background. + */ + private async executePipelineAsync(pipeline: PipelineDef, currentState: Episode[], state: ContextAccountingState) { + this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`); + if (!currentState || currentState.length === 0) return; - for (let i = startIndex; i < pipeline.processors.length; i++) { - const procDef = pipeline.processors[i]; + let currentEpisodes = [...currentState]; + + for (const procDef of pipeline.processors) { const processor = this.instantiatedProcessors.get(procDef.processorId); if (!processor) continue; - const result = processor.process(currentEpisodes, state); - if (result instanceof Promise) { - currentEpisodes = await result; - } else { - currentEpisodes = result; + try { + currentEpisodes = await processor.process(currentEpisodes, state); + } catch (error) { + debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error); + return; // Halt pipeline } } - - // this.eventBus.emitGraphVariantReady(currentEpisodes); - } - - shutdown() { - this.activeTimers.forEach(clearInterval); } } diff --git a/packages/core/src/context/sidecar/profiles.ts b/packages/core/src/context/sidecar/profiles.ts index 8d4fea60c5..505b14d0e9 100644 --- a/packages/core/src/context/sidecar/profiles.ts +++ b/packages/core/src/context/sidecar/profiles.ts @@ -24,6 +24,7 @@ export const defaultSidecarProfile: SidecarConfig = { { name: 'Immediate Sanitization', triggers: ['on_turn'], + execution: 'blocking', processors: [ { processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 8000 } }, { processorId: 'BlobDegradationProcessor', options: {} }, @@ -34,6 +35,7 @@ export const defaultSidecarProfile: SidecarConfig = { { name: 'Deep Background Compression', triggers: [{ type: 'timer', intervalMs: 5000 }, 'budget_exceeded'], + execution: 'background', processors: [ { processorId: 'HistorySquashingProcessor', options: { maxTokensPerNode: 3000 } }, { processorId: 'StateSnapshotProcessor', options: {} } diff --git a/packages/core/src/context/sidecar/types.ts b/packages/core/src/context/sidecar/types.ts index 31fa153394..2253a8b320 100644 --- a/packages/core/src/context/sidecar/types.ts +++ b/packages/core/src/context/sidecar/types.ts @@ -24,6 +24,7 @@ export type PipelineTrigger = export interface PipelineDef { name: string; triggers: PipelineTrigger[]; + execution: 'blocking' | 'background'; processors: ProcessorConfig[]; }