diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index e2cfdee7eb..9cfb6dfd38 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -50,3 +50,18 @@ export interface ContextProcessor { */ process(editor: EpisodeEditor, state: ContextAccountingState): Promise; } + +/** + * Standardized configuration options for processors that act as a GC Backstop. + * Defines exactly how much of the targeted (degraded/aged-out) history should be cleared. + */ +export interface BackstopTargetOptions { + /** + * - 'incremental': Remove just enough to get under the threshold (maxTokens or retainedTokens). + * - 'freeNTokens': Remove enough to free an explicit number of tokens (defined in freeTokensTarget). + * - 'max': Remove/Summarize all explicitly targeted nodes (everything that aged out). The processors in this change fall back to 'max' when `target` is omitted. + */ + target?: 'incremental' | 'freeNTokens' | 'max'; + /** If target is 'freeNTokens', this is the number of tokens to free; the processors in this change do nothing when it is missing or <= 0. */ + freeTokensTarget?: number; +} diff --git a/packages/core/src/context/processors/emergencyTruncationProcessor.ts b/packages/core/src/context/processors/emergencyTruncationProcessor.ts index 2b768829aa..df154ee2a0 100644 --- a/packages/core/src/context/processors/emergencyTruncationProcessor.ts +++ b/packages/core/src/context/processors/emergencyTruncationProcessor.ts @@ -4,11 +4,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; +import type { + ContextProcessor, + ContextAccountingState, + BackstopTargetOptions, +} from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; -export type EmergencyTruncationProcessorOptions = Record; +export type EmergencyTruncationProcessorOptions = BackstopTargetOptions; export class EmergencyTruncationProcessor implements ContextProcessor { static create( @@ -18,6 +22,21 @@ export 
class EmergencyTruncationProcessor implements ContextProcessor { return new EmergencyTruncationProcessor(env, options); } + static readonly schema = { + type: 'object', + properties: { + target: { + type: 'string', + enum: ['incremental', 'freeNTokens', 'max'], + description: 'How much of the targeted history to truncate.', + }, + freeTokensTarget: { + type: 'number', + description: 'The number of tokens to free if target is freeNTokens.', + }, + }, + }; + readonly id = 'EmergencyTruncationProcessor'; readonly name = 'EmergencyTruncationProcessor'; readonly options: EmergencyTruncationProcessorOptions; @@ -32,23 +51,41 @@ export class EmergencyTruncationProcessor implements ContextProcessor { editor: EpisodeEditor, state: ContextAccountingState, ): Promise { - if (state.currentTokens <= state.maxTokens) return; - - let remainingTokens = state.currentTokens; - const targetTokens = state.maxTokens; const toRemove: string[] = []; - // We respect the global protected Episode IDs (like the system prompt at index 0) - for (const ep of editor.getFullHistory()) { + // Calculate how many tokens we need to remove based on the configured knob ('max' when no target is set). NOTE(review): the removed `currentTokens <= maxTokens` early return is now applied only for 'incremental', so 'max' and 'freeNTokens' truncate even when under budget - confirm this is intended. + let targetTokensToRemove = 0; + const strategy = this.options.target ?? 'max'; + + if (strategy === 'incremental') { + if (state.currentTokens <= state.maxTokens) return; + targetTokensToRemove = state.currentTokens - state.maxTokens; + } else if (strategy === 'freeNTokens') { + targetTokensToRemove = this.options.freeTokensTarget ?? 0; + if (targetTokensToRemove <= 0) return; + } else if (strategy === 'max') { + // 'max' means we remove all targets without stopping early + targetTokensToRemove = Infinity; + } + + let removedTokens = 0; + + // Iterate specifically over targets (which represent the aged-out delta). + // The editor returns targets from oldest to newest based on the working order. + // For truncation, we want to cut the oldest first. 
+ for (const target of editor.targets) { + const ep = target.episode; + // We only truncate entire episodes here for safety and structural integrity (targets whose node is not the episode itself are skipped) + if (target.node !== ep) continue; + + if (removedTokens >= targetTokensToRemove) break; + const epTokens = this._env.tokenCalculator.calculateEpisodeListTokens([ ep, ]); - if ( - remainingTokens > targetTokens && - !state.protectedEpisodeIds.has(ep.id) - ) { - remainingTokens -= epTokens; + if (!state.protectedEpisodeIds.has(ep.id) && !toRemove.includes(ep.id)) { + removedTokens += epTokens; toRemove.push(ep.id); } } diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index a914b973b0..4982095afc 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { ContextProcessor, ContextAccountingState } from '../pipeline.js'; +import type { ContextProcessor, ContextAccountingState, BackstopTargetOptions } from '../pipeline.js'; import type { Episode } from '../ir/types.js'; import type { ContextEnvironment, @@ -16,7 +16,7 @@ import { debugLogger } from 'src/utils/debugLogger.js'; import type { EpisodeEditor } from '../ir/episodeEditor.js'; import { isSystemEvent, isToolExecution, isUserPrompt } from '../ir/graphUtils.js'; -export interface StateSnapshotProcessorOptions { +export interface StateSnapshotProcessorOptions extends BackstopTargetOptions { model?: string; systemInstruction?: string; triggerDeficitTokens?: number; @@ -29,6 +29,26 @@ export class StateSnapshotProcessor implements ContextProcessor { ): StateSnapshotProcessor { return new StateSnapshotProcessor(env, options, env.eventBus); } + + static readonly schema = { + type: 'object', + properties: { + target: { + type: 'string', + enum: ['incremental', 'freeNTokens', 'max'], + description: 'How much of the targeted 
history to summarize.', + }, + freeTokensTarget: { + type: 'number', + description: 'The number of tokens to free if target is freeNTokens.', + }, + systemInstruction: { + type: 'string', + description: 'Custom instructions for the summarizer model.', + }, + }, + }; + readonly id = 'StateSnapshotProcessor'; readonly name = 'StateSnapshotProcessor'; readonly options: StateSnapshotProcessorOptions; @@ -48,35 +68,43 @@ export class StateSnapshotProcessor implements ContextProcessor { editor: EpisodeEditor, state: ContextAccountingState, ): Promise { - const targetDeficit = Math.max( - 0, - state.currentTokens - state.retainedTokens, - ); - if (this.isSynthesizing || targetDeficit <= 0) return; + if (this.isSynthesizing) return; + + // Calculate how many tokens we need to remove based on the configured knob ('max' when no target is set). NOTE(review): 'incremental' measures the deficit against state.maxTokens, whereas the code this replaces used state.retainedTokens - confirm which threshold is intended. + let targetTokensToRemove = 0; + const strategy = this.options.target ?? 'max'; + + if (strategy === 'incremental') { + if (state.currentTokens <= state.maxTokens) return; + targetTokensToRemove = state.currentTokens - state.maxTokens; + } else if (strategy === 'freeNTokens') { + targetTokensToRemove = this.options.freeTokensTarget ?? 
0; + if (targetTokensToRemove <= 0) return; + } else if (strategy === 'max') { + // 'max' means we process all targets without stopping early + targetTokensToRemove = Infinity; + } this.isSynthesizing = true; try { let deficitAccumulator = 0; const selectedEpisodes: Episode[] = []; - for (let i = 1; i < editor.getFullHistory().length - 1; i++) { - const ep = editor.getFullHistory()[i]; + // We scan through the targets oldest to newest to build the block we want to summarize + for (const target of editor.targets) { + const ep = target.episode; + // We only operate on entire episodes for a snapshot + if (target.node !== ep) continue; + + // Skip the very first episode (usually the system prompt). NOTE(review): the replaced index loop also excluded the last episode of the full history, which this loop does not - confirm; getFullHistory() is also re-evaluated on every iteration here. + if (ep.id === editor.getFullHistory()[0].id) continue; + selectedEpisodes.push(ep); - let triggerText = ''; - if (isUserPrompt(ep.trigger)) { - const firstPart = ep.trigger.semanticParts?.[0]; - if (firstPart) { - triggerText = - firstPart.type === 'text' - ? firstPart.text - : (firstPart.presentation?.text ?? ''); - } - } - deficitAccumulator += this.env.tokenCalculator.estimateTokensForParts([ - { text: triggerText }, - { text: ep.yield?.text ?? 
'' }, - ]); - if (deficitAccumulator >= targetDeficit) break; + + const epTokens = this.env.tokenCalculator.calculateEpisodeListTokens([ep]); + deficitAccumulator += epTokens; + + if (deficitAccumulator >= targetTokensToRemove) break; } if (selectedEpisodes.length < 2) return; // Not enough context to summarize diff --git a/packages/core/src/context/sidecar/builtins.ts b/packages/core/src/context/sidecar/builtins.ts index 7609b46567..c1c886880a 100644 --- a/packages/core/src/context/sidecar/builtins.ts +++ b/packages/core/src/context/sidecar/builtins.ts @@ -89,39 +89,50 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { }, required: ['processorId', 'options'], }, - create: (env, opts) => new HistorySquashingProcessor(env, opts), + create: (env, options) => + HistorySquashingProcessor.create(env, options), }); registry.register({ - id: 'StateSnapshotProcessor', - schema: { - type: 'object', - properties: { - processorId: { const: 'StateSnapshotProcessor' }, - options: { - type: 'object', - properties: { - model: { type: 'string' }, - systemInstruction: { type: 'string' }, - triggerDeficitTokens: { type: 'number' }, + id: 'StateSnapshotProcessor', + schema: { + type: 'object', + properties: { + processorId: { const: 'StateSnapshotProcessor' }, + options: { + type: 'object', + properties: { + model: { type: 'string' }, + systemInstruction: { type: 'string' }, + triggerDeficitTokens: { type: 'number' }, + target: { type: 'string', enum: ['incremental', 'freeNTokens', 'max'] }, + freeTokensTarget: { type: 'number' }, + }, }, }, + required: ['processorId'], }, - required: ['processorId'], - }, - create: (env, opts) => StateSnapshotProcessor.create(env, opts), - }); + create: (env, options) => + StateSnapshotProcessor.create(env, options), + }); - registry.register({ - id: 'EmergencyTruncationProcessor', - schema: { - type: 'object', - properties: { - processorId: { const: 'EmergencyTruncationProcessor' }, - options: { type: 'object' }, + 
registry.register({ + id: 'EmergencyTruncationProcessor', + schema: { + type: 'object', + properties: { + processorId: { const: 'EmergencyTruncationProcessor' }, + options: { + type: 'object', + properties: { + target: { type: 'string', enum: ['incremental', 'freeNTokens', 'max'] }, + freeTokensTarget: { type: 'number' }, + }, + }, + }, + required: ['processorId'], }, - required: ['processorId'], - }, - create: (env, opts) => EmergencyTruncationProcessor.create(env, opts), - }); + create: (env, options) => + EmergencyTruncationProcessor.create(env, options), + }); }