This commit is contained in:
Your Name
2026-04-06 18:58:49 +00:00
parent e601563652
commit 7c2135574c
6 changed files with 107 additions and 167 deletions
+10 -78
View File
@@ -139,87 +139,19 @@ export class ContextManager {
* This does NOT mutate the pristine episodic graph.
*/
async projectCompressedHistory(): Promise<Content[]> {
if (!this.sidecar.budget) {
return IrProjector.projectAndDump(this.pristineEpisodes, this.env);
}
const mngConfig = this.sidecar;
const maxTokens = mngConfig.budget.maxTokens;
this.tracer.logEvent('ContextManager', 'Projection requested.');
// Get the dynamically computed Working Buffer View
let currentEpisodes = this.getWorkingBufferView();
let currentTokens = calculateEpisodeListTokens(currentEpisodes);
if (currentTokens <= maxTokens) {
this.tracer.logEvent('ContextManager', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`);
return IrProjector.projectAndDump(currentEpisodes, this.env);
const protectedIds = new Set<string>();
if (this.pristineEpisodes.length > 0) {
protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant
}
this.tracer.logEvent('ContextManager', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier. Strategy: ${mngConfig.gcBackstop.strategy}`);
// --- The Synchronous Pressure Barrier ---
// The background eager workers couldn't keep up, or a massive file was pasted.
// The Working Buffer View is still over the absolute hard limit (maxTokens).
// We MUST reduce tokens before returning, or the API request will 400.
debugLogger.log(
`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}). Strategy: ${mngConfig.gcBackstop.strategy}`,
return IrProjector.project(
this.getWorkingBufferView(),
this.orchestrator,
this.sidecar,
this.tracer,
this.env,
protectedIds
);
// Calculate target based on gcTarget
let targetTokens = maxTokens;
if (mngConfig.gcBackstop.target === 'max') {
targetTokens = mngConfig.budget.retainedTokens;
} else if (mngConfig.gcBackstop.target === 'freeNTokens') {
targetTokens = maxTokens - (mngConfig.gcBackstop.freeTokensTarget ?? 10000);
}
// Structural invariant: We ALWAYS protect the architectural initialization turn (Turn 0)
// We do NOT arbitrarily protect recent episodes (like currentEpisodes.length - 1)
// because an episode can be unboundedly large, and protecting it would crash the LLM.
const protectedEpisodeId = this.pristineEpisodes.length > 0 ? this.pristineEpisodes[0].id : null;
let remainingTokens = currentTokens;
const truncated: Episode[] = [];
const strategy = mngConfig.gcBackstop.strategy;
for (const ep of currentEpisodes) {
const epTokens = calculateEpisodeListTokens([ep]);
if (remainingTokens > targetTokens && ep.id !== protectedEpisodeId) {
console.log('DROPPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens);
remainingTokens -= epTokens;
if (strategy === 'truncate') {
this.tracer.logEvent('Barrier', `Truncating episode [${ep.id}].`);
debugLogger.log(`Barrier (truncate): Dropped Episode ${ep.id}`);
} else if (strategy === 'compress') {
this.tracer.logEvent('Barrier', `Compress fallback to truncate for [${ep.id}].`);
debugLogger.warn(`Synchronous compress barrier not fully implemented, truncating Episode ${ep.id}.`);
} else if (strategy === 'rollingSummarizer') {
this.tracer.logEvent('Barrier', `RollingSummarizer fallback to truncate for [${ep.id}].`);
debugLogger.warn(`Synchronous rollingSummarizer barrier not fully implemented, truncating Episode ${ep.id}.`);
}
} else {
console.log('KEEPING EPISODE:', ep.id, 'rem:', remainingTokens, 'tgt:', targetTokens);
truncated.push(ep);
}
}
currentEpisodes = truncated;
const finalTokens = calculateEpisodeListTokens(currentEpisodes);
this.tracer.logEvent('ContextManager', `Finished projection. Final token count: ${finalTokens}.`);
debugLogger.log(
`Context Manager finished. Final actual token count: ${finalTokens}.`,
);
return IrProjector.projectAndDump(currentEpisodes, this.env);
}
}
+48 -2
View File
@@ -8,14 +8,60 @@ import type { Content } from '@google/genai';
import { IrMapper } from './mapper.js';
import type { Episode } from './types.js';
import { debugLogger } from '../../utils/debugLogger.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { ContextEnvironment, ContextTracer } from '../sidecar/environment.js';
import type { PipelineOrchestrator } from '../sidecar/orchestrator.js';
import type { SidecarConfig } from '../sidecar/types.js';
import { calculateEpisodeListTokens } from '../utils/contextTokenCalculator.js';
export class IrProjector {
/**
* Orchestrates the final projection: takes a working buffer view,
* applies the Immediate Sanitization pipeline, and enforces token boundaries.
*/
static async project(
workingBuffer: Episode[],
orchestrator: PipelineOrchestrator,
sidecar: SidecarConfig,
tracer: ContextTracer,
env: ContextEnvironment,
protectedIds: Set<string>
): Promise<Content[]> {
if (!sidecar.budget) {
return this.projectAndDump(workingBuffer, env);
}
const maxTokens = sidecar.budget.maxTokens;
let currentTokens = calculateEpisodeListTokens(workingBuffer);
if (currentTokens <= maxTokens) {
tracer.logEvent('IrProjector', `View is within maxTokens (${currentTokens} <= ${maxTokens}). Returning view.`);
return this.projectAndDump(workingBuffer, env);
}
tracer.logEvent('IrProjector', `View exceeds maxTokens (${currentTokens} > ${maxTokens}). Hitting Synchronous Pressure Barrier.`);
debugLogger.log(`Context Manager Synchronous Barrier triggered: View at ${currentTokens} tokens (limit: ${maxTokens}).`);
const processedEpisodes = await orchestrator.executePipeline('Immediate Sanitization', workingBuffer, {
currentTokens: currentTokens,
maxTokens: sidecar.budget.maxTokens,
retainedTokens: sidecar.budget.retainedTokens,
deficitTokens: Math.max(0, currentTokens - sidecar.budget.maxTokens),
protectedEpisodeIds: protectedIds,
isBudgetSatisfied: currentTokens <= sidecar.budget.maxTokens,
});
const finalTokens = calculateEpisodeListTokens(processedEpisodes);
tracer.logEvent('IrProjector', `Finished projection. Final token count: ${finalTokens}.`);
debugLogger.log(`Context Manager finished. Final actual token count: ${finalTokens}.`);
return this.projectAndDump(processedEpisodes, env);
}
/**
* Converts the internal IR graph into a flat Content[] array for the LLM.
* If tracing is enabled via environment variables, dumps the payload to disk.
*/
static async projectAndDump(episodes: Episode[], env: ContextEnvironment): Promise<Content[]> {
private static async projectAndDump(episodes: Episode[], env: ContextEnvironment): Promise<Content[]> {
const contents = IrMapper.fromIr(episodes);
if (process.env['GEMINI_DUMP_CONTEXT'] === 'true') {
@@ -7,7 +7,7 @@
import type { ContextProcessor, ContextAccountingState } from '../pipeline.js';
import type { Episode } from '../ir/types.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import { estimateContextTokenCountSync } from '../utils/contextTokenCalculator.js';
import { calculateEpisodeListTokens } from '../utils/contextTokenCalculator.js';
export interface EmergencyTruncationProcessorOptions {}
@@ -32,8 +32,7 @@ export class EmergencyTruncationProcessor implements ContextProcessor {
// We respect the global protected Episode IDs (like the system prompt at index 0)
for (const ep of episodes) {
// Calculate individual episode tokens efficiently (assume metadata is accurate if present)
const epTokens = ep.yield?.metadata?.currentTokens ?? estimateContextTokenCountSync([{ text: ep.yield?.text ?? '' }]);
const epTokens = calculateEpisodeListTokens([ep]);
if (remainingTokens > targetTokens && !state.protectedEpisodeIds.has(ep.id)) {
remainingTokens -= epTokens;
@@ -53,100 +53,61 @@ export class PipelineOrchestrator {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {
this.executePipelineAsync(pipeline);
// For background timers, we need a way to get the latest state
// But timers are generally disabled right now via the triggers config.
// If needed, we will pass it via event bus.
}, trigger.intervalMs);
this.activeTimers.push(timer);
} else if (trigger === 'budget_exceeded') {
this.eventBus.onConsolidationNeeded(() => {
this.executePipelineAsync(pipeline);
this.eventBus.onConsolidationNeeded((event) => {
const state: ContextAccountingState = {
currentTokens: 0,
retainedTokens: this.config.budget.retainedTokens,
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: event.targetDeficit,
protectedEpisodeIds: new Set()
};
this.executePipelineAsync(pipeline, event.episodes, state);
});
}
// 'on_turn' and 'post_turn' are handled synchronously via direct calls from the ContextManager.
}
}
}
shutdown() {
for (const timer of this.activeTimers) {
clearInterval(timer);
}
}
/**
* Executes a pipeline asynchronously in the background. This is the "Eventual Consistency" path.
* When the pipeline resolves, it emits a VariantReady event to cache the new graph.
*/
private async executePipelineAsync(pipeline: PipelineDef) {
this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`);
// Retrieve the most recent pristine state from the bus.
// The EventBus must hold the current graph state for orchestrated async execution.
const currentState: Episode[] = [];
if (!currentState || currentState.length === 0) return;
// We assume the eventBus or ContextManager keeps accounting state updated.
const state: ContextAccountingState = {
currentTokens: 0,
// This needs to be calculated or passed down. For now, processors re-calculate.
retainedTokens: this.config.budget.retainedTokens,
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: 0,
protectedEpisodeIds: new Set()
};
let currentEpisodes = [...currentState];
for (const procDef of pipeline.processors) {
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
try {
const result = processor.process(currentEpisodes, state);
if (result instanceof Promise) {
currentEpisodes = await result;
} else {
currentEpisodes = result;
}
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error);
return; // Halt pipeline
}
}
// Success! The background pipeline finished.
// Instead of forcing the Orchestrator to emit complex variant geometries,
// we can just emit a "GraphUpdated" or standard "VariantReady" event containing the entire new subset.
// For simplicity right now, if a pipeline runs asynchronously, we emit a "GraphVariant" event.
// this.eventBus.emitGraphVariantReady(currentEpisodes);
}
/**
* Executes a pipeline synchronously. If any processor returns a Promise, this method
* automatically forks that Promise to the background (falling back to async/eventual consistency)
* and immediately returns the synchronous results computed up to that point.
* Executes a pipeline based on its configured execution strategy ('blocking' or 'background').
*/
executePipelineForking(pipelineName: string, episodes: Episode[], state: ContextAccountingState): Episode[] {
async executePipeline(pipelineName: string, episodes: Episode[], state: ContextAccountingState): Promise<Episode[]> {
const pipeline = this.config.pipelines.find(p => p.name === pipelineName);
if (!pipeline) return episodes;
let currentEpisodes = [...episodes];
if (pipeline.execution === 'background') {
this.executePipelineAsync(pipeline, episodes, state).catch(e => {
debugLogger.error(`Background pipeline ${pipeline.name} failed:`, e);
});
return episodes; // Return immediately
}
// Blocking execution
let currentEpisodes = [...episodes];
for (let i = 0; i < pipeline.processors.length; i++) {
const procDef = pipeline.processors[i];
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
try {
const result = processor.process(currentEpisodes, state);
if (result instanceof Promise) {
// *** THE FORK ***
// A processor went Async. We halt the synchronous chain here and return the state as-is.
this.tracer.logEvent('Orchestrator', `Pipeline ${pipeline.name} forked to background at ${procDef.processorId}`);
// Continue resolving the rest of the pipeline in the background.
this.continuePipelineAsync(pipeline, result, i + 1, state).catch(e => {
debugLogger.error(`Background fork of ${pipeline.name} failed:`, e);
});
// Return the strictly synchronous output back to the LLM immediately!
return currentEpisodes;
} else {
currentEpisodes = result;
}
currentEpisodes = await processor.process(currentEpisodes, state);
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, error);
return currentEpisodes; // Return what we have so far
@@ -156,26 +117,25 @@ export class PipelineOrchestrator {
return currentEpisodes;
}
private async continuePipelineAsync(pipeline: PipelineDef, asyncResult: Promise<Episode[]>, startIndex: number, state: ContextAccountingState) {
let currentEpisodes = await asyncResult;
/**
* Internal method for running a pipeline entirely in the background.
*/
private async executePipelineAsync(pipeline: PipelineDef, currentState: Episode[], state: ContextAccountingState) {
this.tracer.logEvent('Orchestrator', `Triggering async pipeline: ${pipeline.name}`);
if (!currentState || currentState.length === 0) return;
for (let i = startIndex; i < pipeline.processors.length; i++) {
const procDef = pipeline.processors[i];
let currentEpisodes = [...currentState];
for (const procDef of pipeline.processors) {
const processor = this.instantiatedProcessors.get(procDef.processorId);
if (!processor) continue;
const result = processor.process(currentEpisodes, state);
if (result instanceof Promise) {
currentEpisodes = await result;
} else {
currentEpisodes = result;
try {
currentEpisodes = await processor.process(currentEpisodes, state);
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed at ${procDef.processorId}:`, error);
return; // Halt pipeline
}
}
// this.eventBus.emitGraphVariantReady(currentEpisodes);
}
shutdown() {
this.activeTimers.forEach(clearInterval);
}
}
@@ -24,6 +24,7 @@ export const defaultSidecarProfile: SidecarConfig = {
{
name: 'Immediate Sanitization',
triggers: ['on_turn'],
execution: 'blocking',
processors: [
{ processorId: 'ToolMaskingProcessor', options: { stringLengthThresholdTokens: 8000 } },
{ processorId: 'BlobDegradationProcessor', options: {} },
@@ -34,6 +35,7 @@ export const defaultSidecarProfile: SidecarConfig = {
{
name: 'Deep Background Compression',
triggers: [{ type: 'timer', intervalMs: 5000 }, 'budget_exceeded'],
execution: 'background',
processors: [
{ processorId: 'HistorySquashingProcessor', options: { maxTokensPerNode: 3000 } },
{ processorId: 'StateSnapshotProcessor', options: {} }
@@ -24,6 +24,7 @@ export type PipelineTrigger =
export interface PipelineDef {
name: string;
triggers: PipelineTrigger[];
execution: 'blocking' | 'background';
processors: ProcessorConfig[];
}