diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts new file mode 100644 index 0000000000..bc7453482b --- /dev/null +++ b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts @@ -0,0 +1,107 @@ +import { describe, it, expect } from 'vitest'; +import { StateSnapshotProcessor } from './stateSnapshotProcessor.js'; +import { + createMockEnvironment, + createDummyState, + createDummyNode, +} from '../testing/contextTestUtils.js'; +import { InboxSnapshotImpl } from '../sidecar/inbox.js'; + +describe('StateSnapshotProcessor', () => { + it('should ignore if budget is satisfied', async () => { + const env = createMockEnvironment(); + const processor = StateSnapshotProcessor.create(env, { target: 'incremental' }); + const state = createDummyState(true); // satisfied + const targets = [createDummyNode('ep1', 'USER_PROMPT')]; + const inbox = new InboxSnapshotImpl([]); + + const result = await processor.process({ buffer: {} as any, targets, state, inbox }); + expect(result).toBe(targets); // Strict equality + }); + + it('should apply a valid snapshot from the Inbox (Fast Path)', async () => { + const env = createMockEnvironment(); + const processor = StateSnapshotProcessor.create(env, { target: 'incremental' }); + const state = createDummyState(false, 100); + + const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); + const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); + const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); + + const targets = [nodeA, nodeB, nodeC]; + + // The background worker created a snapshot of A and B + const inbox = new InboxSnapshotImpl([ + { + id: 'msg-1', + topic: 'PROPOSED_SNAPSHOT', + timestamp: Date.now(), + payload: { + consumedIds: ['node-A', 'node-B'], + newText: '', + } + } + ]); + + const result = await processor.process({ buffer: {} as any, targets, state, inbox }); + + // Should remove A and B, insert Snapshot, keep C + expect(result.length).toBe(2); + expect(result[0].type).toBe('SNAPSHOT'); + expect((result[0] as any).text).toBe(''); + expect(result[1].id).toBe('node-C'); + + // Should consume the message + expect(inbox.getConsumedIds().has('msg-1')).toBe(true); + }); + + it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => { + const env = createMockEnvironment(); + const processor = StateSnapshotProcessor.create(env, { target: 'incremental' }); + // Make deficit 0 so we don't fall through to the sync backstop and fail the test that way + const state = createDummyState(false, 0); + + // node-A is MISSING (user deleted it) + const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); + const targets = [nodeB]; + + const inbox = new InboxSnapshotImpl([ + { + id: 'msg-1', + topic: 'PROPOSED_SNAPSHOT', + timestamp: Date.now(), + payload: { + consumedIds: ['node-A', 'node-B'], + newText: '', + } + } + ]); + + const result = await processor.process({ buffer: {} as any, targets, state, inbox }); + + // Because deficit is 0, and Inbox was rejected, nothing should change + expect(result.length).toBe(1); + expect(result[0].id).toBe('node-B'); + expect(inbox.getConsumedIds().has('msg-1')).toBe(false); + }); + + it('should fall back to sync backstop if inbox is empty', async () => { + const env = createMockEnvironment(); + const processor = StateSnapshotProcessor.create(env, { target: 'max' }); // Summarize all + const state = createDummyState(false, 100); + + const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); + const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); + const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); + const targets = [nodeA, nodeB, nodeC]; + const inbox = new InboxSnapshotImpl([]); + + const result = await processor.process({ buffer: {} as any, targets, state, inbox }); + + // Should synthesize a new snapshot synchronously + expect(env.llmClient.generateContent).toHaveBeenCalled(); + expect(result.length).toBe(2); // nodeA is skipped as "system prompt", snapshot + nodeA + expect(result[1].type).toBe('SNAPSHOT'); + expect((result[1] as any).text).toBe('Mock LLM summary response'); + }); +}); diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 4982095afc..57df8e9b6f 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -1,127 +1,178 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -import type { ContextProcessor, ContextAccountingState, BackstopTargetOptions } from '../pipeline.js'; -import type { Episode } from '../ir/types.js'; -import type { - ContextEnvironment, - ContextEventBus, -} from '../sidecar/environment.js'; -import { v4 as uuidv4 } from 'uuid'; -import { LlmRole } from '../../telemetry/llmRole.js'; -import { debugLogger } from 'src/utils/debugLogger.js'; -import type { EpisodeEditor } from '../ir/episodeEditor.js'; -import { isSystemEvent, isToolExecution, isUserPrompt } from '../ir/graphUtils.js'; +import type { ContextProcessor, ProcessArgs, BackstopTargetOptions, ContextWorker } from '../pipeline.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; +import type { ConcreteNode, Snapshot } from '../ir/types.js'; +import { debugLogger } from '../../utils/debugLogger.js'; export interface StateSnapshotProcessorOptions extends BackstopTargetOptions { model?: string; systemInstruction?: string; - triggerDeficitTokens?: number; } -export class StateSnapshotProcessor implements ContextProcessor { +export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { static create( env: ContextEnvironment, options: StateSnapshotProcessorOptions, ): StateSnapshotProcessor { - return new StateSnapshotProcessor(env, options, env.eventBus); + return new StateSnapshotProcessor(env, options); } - - static readonly schema = { - type: 'object', - properties: { - target: { - type: 'string', - enum: ['incremental', 'freeNTokens', 'max'], - description: 'How much of the targeted history to summarize.', - }, - freeTokensTarget: { - type: 'number', - description: 'The number of tokens to free if target is freeNTokens.', - }, - systemInstruction: { - type: 'string', - description: 'Custom instructions for the summarizer model.', - }, - }, - }; readonly id = 'StateSnapshotProcessor'; readonly name = 'StateSnapshotProcessor'; readonly options: StateSnapshotProcessorOptions; private readonly env: ContextEnvironment; - private isSynthesizing = false; - constructor( - env: ContextEnvironment, - options: StateSnapshotProcessorOptions, - _eventBus: ContextEventBus, - ) { + // As a worker, we trigger when nodes are added to proactively accumulate + readonly triggers = { + onNodesAdded: true, + }; + + constructor(env: ContextEnvironment, options: StateSnapshotProcessorOptions) { this.env = env; this.options = options; } - async process( - editor: EpisodeEditor, - state: ContextAccountingState, - ): Promise { - if (this.isSynthesizing) return; + // --- ContextWorker Interface (Proactive Accumulation) --- + async execute({ targets, inbox }: { targets: ReadonlyArray; inbox: import('../pipeline.js').InboxSnapshot }): Promise { + + // We only care about nodes that have aged out past retainedTokens + // To calculate this precisely, we'd need the ContextAccountingState, but for V0 + // the Orchestrator doesn't pass state to workers. We will assume the Orchestrator + // passes ONLY the "aged out" targets to the worker if triggered by onNodesAdded + // OR we just look for un-snapshotted nodes. + + // For V0: Let's simply wait until the Pipeline invokes the Processor synchronously. + // Building the robust progressively accumulating worker requires the Orchestrator + // to pass ContextAccountingState to the `execute` method, which we can add later. + } - // Calculate how many tokens we need to remove based on the configured knob + // --- ContextProcessor Interface (Sync Backstop / Cache Application) --- + async process({ targets, state, inbox }: ProcessArgs): Promise> { + if (state.isBudgetSatisfied) { + return targets; + } + + // 1. Check Inbox for a completed Snapshot (The Fast Path) + const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[] }>('PROPOSED_SNAPSHOT'); + + if (proposedSnapshots.length > 0) { + // Sort by newest timestamp first (we want the most accumulated snapshot) + const sorted = [...proposedSnapshots].sort((a, b) => b.timestamp - a.timestamp); + + for (const proposed of sorted) { + const { consumedIds, newText } = proposed.payload; + + // Verify all consumed IDs still exist sequentially in targets + const targetIds = new Set(targets.map(t => t.id)); + const isValid = consumedIds.every(id => targetIds.has(id)); + + if (isValid) { + // If valid, apply it! + const newId = this.env.idGenerator.generateId(); + const tokens = this.env.tokenCalculator.estimateTokensForString(newText); + + const snapshotNode: Snapshot = { + id: newId, + logicalParentId: newId, + type: 'SNAPSHOT', + timestamp: Date.now(), + text: newText, + metadata: { + currentTokens: tokens, + originalTokens: tokens, + transformations: [ + { processorName: this.name, action: 'SYNTHESIZED', timestamp: Date.now() } + ] + } + }; + + // Remove the consumed nodes and insert the snapshot at the earliest index + const returnedNodes = targets.filter(t => !consumedIds.includes(t.id)); + const firstRemovedIdx = targets.findIndex(t => consumedIds.includes(t.id)); + + if (firstRemovedIdx !== -1) { + const idx = Math.max(0, firstRemovedIdx); + returnedNodes.splice(idx, 0, snapshotNode); + } else { + returnedNodes.unshift(snapshotNode); + } + + inbox.consume(proposed.id); + return returnedNodes; + } + } + } + + // 2. The Synchronous Backstop (The Slow Path) + const strategy = this.options.target ?? "max"; let targetTokensToRemove = 0; - const strategy = this.options.target ?? 'max'; if (strategy === 'incremental') { - if (state.currentTokens <= state.maxTokens) return; - targetTokensToRemove = state.currentTokens - state.maxTokens; + targetTokensToRemove = state.deficitTokens; } else if (strategy === 'freeNTokens') { - targetTokensToRemove = this.options.freeTokensTarget ?? 0; - if (targetTokensToRemove <= 0) return; + targetTokensToRemove = this.options.freeTokensTarget ?? state.deficitTokens; } else if (strategy === 'max') { - // 'max' means we process all targets without stopping early targetTokensToRemove = Infinity; } - this.isSynthesizing = true; - try { - let deficitAccumulator = 0; - const selectedEpisodes: Episode[] = []; + let deficitAccumulator = 0; + const nodesToSummarize: ConcreteNode[] = []; - // We scan through the targets oldest to newest to build the block we want to summarize - for (const target of editor.targets) { - const ep = target.episode; - // We only operate on entire episodes for a snapshot - if (target.node !== ep) continue; - - // Skip the very first episode (usually the system prompt) - if (ep.id === editor.getFullHistory()[0].id) continue; - - selectedEpisodes.push(ep); - - const epTokens = this.env.tokenCalculator.calculateEpisodeListTokens([ep]); - deficitAccumulator += epTokens; - - if (deficitAccumulator >= targetTokensToRemove) break; + // Scan oldest to newest + for (const node of targets) { + if (node.id === targets[0].id && node.type === 'USER_PROMPT') { + // Keep system prompt if it's the very first node + // In a real system, system prompt is protected, but we double check + continue; } + + nodesToSummarize.push(node); + deficitAccumulator += node.metadata.currentTokens; - if (selectedEpisodes.length < 2) return; // Not enough context to summarize + if (deficitAccumulator >= targetTokensToRemove) break; + } - // Optimization: Do NOT emit VariantComputing, let the Orchestrator handle caching the final result. - const snapshotEp: Episode = - await this.synthesizeSnapshot(selectedEpisodes); + if (nodesToSummarize.length < 2) return targets; // Not enough context - const oldIds = selectedEpisodes.map((ep) => ep.id); - editor.replaceEpisodes(oldIds, snapshotEp, 'STATE_SNAPSHOT'); - } finally { - this.isSynthesizing = false; + try { + const snapshotText = await this.synthesizeSnapshot(nodesToSummarize); + const newId = this.env.idGenerator.generateId(); + const tokens = this.env.tokenCalculator.estimateTokensForString(snapshotText); + + const snapshotNode: Snapshot = { + id: newId, + logicalParentId: newId, + type: 'SNAPSHOT', + timestamp: Date.now(), + text: snapshotText, + metadata: { + currentTokens: tokens, + originalTokens: tokens, + transformations: [ + { processorName: this.name, action: 'SYNTHESIZED', timestamp: Date.now() } + ] + } + }; + + const consumedIds = nodesToSummarize.map(n => n.id); + const returnedNodes = targets.filter(t => !consumedIds.includes(t.id)); + const firstRemovedIdx = targets.findIndex(t => consumedIds.includes(t.id)); + + if (firstRemovedIdx !== -1) { + const idx = Math.max(0, firstRemovedIdx); + returnedNodes.splice(idx, 0, snapshotNode); + } else { + returnedNodes.unshift(snapshotNode); + } + + return returnedNodes; + + } catch (e) { + debugLogger.error('StateSnapshotProcessor failed sync backstop', e); + return targets; } } - private async synthesizeSnapshot(episodes: Episode[]): Promise { - const client = this.env.llmClient; + private async synthesizeSnapshot(nodes: ConcreteNode[]): Promise { const systemPrompt = this.options.systemInstruction ?? `You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant. @@ -130,83 +181,19 @@ Your task is to synthesize these turns into a single, dense, factual snapshot th Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`; let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; - for (const ep of episodes) { - if (isUserPrompt(ep.trigger)) { - const partsText = ep.trigger.semanticParts - .map((p) => { - if (p.type === 'text') return p.text; - if (p.presentation) return p.presentation.text; - return ''; - }) - .join(''); - userPromptText += `USER: ${partsText}\n`; - } else if (isSystemEvent(ep.trigger)) { - userPromptText += `[SYSTEM EVENT: ${ep.trigger.name}]\n`; - } - for (const step of ep.steps) { - if (isToolExecution(step)) { - userPromptText += `[Tool Called: ${step.toolName}]\n`; - } - } - if (ep.yield) { - userPromptText += `ASSISTANT: ${ep.yield.text}\n`; - } - userPromptText += '\n'; + for (const node of nodes) { + userPromptText += `[${node.type}]: ${(node as any).text || JSON.stringify((node as any).semanticParts)}\n`; } - try { - const response = await client.generateContent({ - modelConfigKey: { model: 'state-snapshot-processor' }, - contents: [{ role: 'user', parts: [{ text: userPromptText }] }], + const response = await this.env.llmClient.generateContent({ + role: 'user' as any, + modelConfigKey: 'default' as any, + contents: [{ role: 'user' as any, parts: [{ text: userPromptText }] }], systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] }, promptId: this.env.promptId, - role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR, abortSignal: new AbortController().signal, - }); + }); - const snapshotText = response.text; - - // Synthesize a new "Episode" representing this compressed block - const newId = uuidv4(); - const contentTokens = this.env.tokenCalculator.estimateTokensForParts([ - { text: snapshotText }, - ]); - - return { - type: 'EPISODE', - id: newId, - timestamp: Date.now(), - trigger: { - id: `${newId}-t`, - type: 'USER_PROMPT', - semanticParts: [], - metadata: { - originalTokens: 0, - currentTokens: 0, - transformations: [], - }, - }, - steps: [], - yield: { - id: `${newId}-y`, - type: 'AGENT_YIELD', - text: `\n${snapshotText}\n`, - metadata: { - originalTokens: contentTokens, - currentTokens: contentTokens, - transformations: [ - { - processorName: 'StateSnapshotProcessor', - action: 'SYNTHESIZED', - timestamp: Date.now(), - }, - ], - }, - }, - }; - } catch (error) { - debugLogger.error('Failed to synthesize snapshot:', error); - throw error; - } + return response.text || ''; } } diff --git a/packages/core/src/context/sidecar/builtins.ts b/packages/core/src/context/sidecar/builtins.ts index c1ec6511bc..b246006d55 100644 --- a/packages/core/src/context/sidecar/builtins.ts +++ b/packages/core/src/context/sidecar/builtins.ts @@ -1,38 +1,45 @@ import { ProcessorRegistry } from './registry.js'; import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js'; import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js'; +import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from '../processors/historySquashingProcessor.js'; +import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js'; +import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js'; +import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js'; export function registerBuiltInProcessors(registry: ProcessorRegistry) { registry.register>({ id: 'BlobDegradationProcessor', - schema: { - type: 'object', - properties: { - processorId: { const: 'BlobDegradationProcessor' }, - options: { type: 'object' }, - }, - required: ['processorId'], - }, + schema: BlobDegradationProcessor.schema, create: (env) => new BlobDegradationProcessor(env), }); registry.register({ id: 'EmergencyTruncationProcessor', - schema: { - type: 'object', - properties: { - processorId: { const: 'EmergencyTruncationProcessor' }, - options: { - type: 'object', - properties: { - target: { type: 'string', enum: ['incremental', 'freeNTokens', 'max'] }, - freeTokensTarget: { type: 'number' }, - }, - }, - }, - required: ['processorId'], - }, - create: (env, options) => - EmergencyTruncationProcessor.create(env, options), + schema: EmergencyTruncationProcessor.schema, + create: (env, options) => EmergencyTruncationProcessor.create(env, options), + }); + + registry.register({ + id: 'HistorySquashingProcessor', + schema: HistorySquashingProcessor.schema, + create: (env, options) => HistorySquashingProcessor.create(env, options), + }); + + registry.register({ + id: 'SemanticCompressionProcessor', + schema: SemanticCompressionProcessor.schema, + create: (env, options) => SemanticCompressionProcessor.create(env, options), + }); + + registry.register({ + id: 'ToolMaskingProcessor', + schema: ToolMaskingProcessor.schema, + create: (env, options) => ToolMaskingProcessor.create(env, options), + }); + + registry.register({ + id: 'StateSnapshotProcessor', + schema: {}, // Will be added later + create: (env, options) => StateSnapshotProcessor.create(env, options), }); } diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 4cae47d550..1bc9d1d2e6 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -98,7 +98,7 @@ export class PipelineOrchestrator { // Fire all workers that care about new nodes for (const worker of this.instantiatedWorkers.values()) { if (worker.triggers.onNodesAdded) { - const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages()); + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []); // Fire and forget worker.execute({ targets: [], inbox: inboxSnapshot }).catch(e => { debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e); @@ -177,7 +177,7 @@ export class PipelineOrchestrator { const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger)); // Freeze the inbox for this pipeline run - const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages()); + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []); for (const pipeline of pipelines) { for (const procDef of pipeline.processors) { @@ -232,7 +232,7 @@ export class PipelineOrchestrator { if (!ship || ship.length === 0) return; let currentShip = ship; - const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages()); + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []); for (const procDef of pipeline.processors) { const processor = this.instantiatedProcessors.get(procDef.processorId);