diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts index 59511d48b6..43c80c62dc 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts @@ -41,6 +41,7 @@ describe('StateSnapshotProcessor', () => { payload: { consumedIds: ['node-A', 'node-B'], newText: '', + type: 'point-in-time', } } ]); diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 9e71dfd07d..a7e5170488 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -3,9 +3,10 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import type { ContextProcessor, ProcessArgs, BackstopTargetOptions, ContextWorker } from '../pipeline.js'; +import type { ContextProcessor, ProcessArgs, BackstopTargetOptions } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import type { ConcreteNode, Snapshot } from '../ir/types.js'; +import { SnapshotGenerator } from '../utils/snapshotGenerator.js'; import { debugLogger } from '../../utils/debugLogger.js'; export interface StateSnapshotProcessorOptions extends BackstopTargetOptions { @@ -13,7 +14,7 @@ export interface StateSnapshotProcessorOptions extends BackstopTargetOptions { systemInstruction?: string; } -export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { +export class StateSnapshotProcessor implements ContextProcessor { static create( env: ContextEnvironment, options: StateSnapshotProcessorOptions, @@ -25,29 +26,12 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { readonly name = 'StateSnapshotProcessor'; readonly options: StateSnapshotProcessorOptions; private readonly env: ContextEnvironment; - - // As a worker, we trigger when nodes are added to proactively accumulate - readonly triggers = { - onNodesAdded: true, - }; + private readonly generator: SnapshotGenerator; constructor(env: ContextEnvironment, options: StateSnapshotProcessorOptions) { this.env = env; this.options = options; - } - - // --- ContextWorker Interface (Proactive Accumulation) --- - async execute({ targets: _targets, inbox: _inbox }: { targets: readonly ConcreteNode[]; inbox: import('../pipeline.js').InboxSnapshot }): Promise { - - // We only care about nodes that have aged out past retainedTokens - // To calculate this precisely, we'd need the ContextAccountingState, but for V0 - // the Orchestrator doesn't pass state to workers. We will assume the Orchestrator - // passes ONLY the "aged out" targets to the worker if triggered by onNodesAdded - // OR we just look for un-snapshotted nodes. - - // For V0: Let's simply wait until the Pipeline invokes the Processor synchronously. - // Building the robust progressively accumulating worker requires the Orchestrator - // to pass ContextAccountingState to the `execute` method, which we can add later. + this.generator = new SnapshotGenerator(env); } // --- ContextProcessor Interface (Sync Backstop / Cache Application) --- @@ -56,12 +40,19 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { return targets; } + // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate' + const strategy = this.options.target ?? 'max'; + const expectedType = strategy === 'incremental' ? 'point-in-time' : 'accumulate'; + // 1. Check Inbox for a completed Snapshot (The Fast Path) - const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[] }>('PROPOSED_SNAPSHOT'); + const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[]; type: string }>('PROPOSED_SNAPSHOT'); if (proposedSnapshots.length > 0) { + // Filter for the snapshot type that matches our processor mode + const matchingSnapshots = proposedSnapshots.filter(s => s.payload.type === expectedType); + // Sort by newest timestamp first (we want the most accumulated snapshot) - const sorted = [...proposedSnapshots].sort((a, b) => b.timestamp - a.timestamp); + const sorted = [...matchingSnapshots].sort((a, b) => b.timestamp - a.timestamp); for (const proposed of sorted) { const { consumedIds, newText } = proposed.payload; @@ -100,7 +91,6 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { } // 2. The Synchronous Backstop (The Slow Path) - const strategy = this.options.target ?? "max"; let targetTokensToRemove = 0; if (strategy === 'incremental') { @@ -131,7 +121,7 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { if (nodesToSummarize.length < 2) return targets; // Not enough context try { - const snapshotText = await this.synthesizeSnapshot(nodesToSummarize); + const snapshotText = await this.generator.synthesizeSnapshot(nodesToSummarize, this.options.systemInstruction); const newId = this.env.idGenerator.generateId(); const snapshotNode: Snapshot = { id: newId, @@ -159,38 +149,4 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker { return targets; } } - - private async synthesizeSnapshot(nodes: ConcreteNode[]): Promise { - const systemPrompt = - this.options.systemInstruction ?? - `You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant. -Your task is to synthesize these turns into a single, dense, factual snapshot that preserves all critical context, preferences, active tasks, and factual knowledge, but discards conversational filler, pleasantries, and redundant back-and-forth iterations. - -Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`; - - let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; - for (const node of nodes) { - let nodeContent = ''; - if ('text' in node && typeof node.text === 'string') { - nodeContent = node.text; - } else if ('semanticParts' in node) { - nodeContent = JSON.stringify(node.semanticParts); - } else if ('observation' in node) { - nodeContent = typeof node.observation === 'string' ? node.observation : JSON.stringify(node.observation); - } - - userPromptText += `[${node.type}]: ${nodeContent}\n`; - } - - const response = await this.env.llmClient.generateContent({ - role: 'utility_state_snapshot_processr' as import('../../telemetry/llmRole.js').LlmRole, - modelConfigKey: { model: 'default' }, - contents: [{ role: 'user', parts: [{ text: userPromptText }] }], - systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] }, - promptId: this.env.promptId, - abortSignal: new AbortController().signal, - }); - - return response.text || ''; - } } diff --git a/packages/core/src/context/processors/stateSnapshotWorker.test.ts b/packages/core/src/context/processors/stateSnapshotWorker.test.ts new file mode 100644 index 0000000000..6abe4c42a1 --- /dev/null +++ b/packages/core/src/context/processors/stateSnapshotWorker.test.ts @@ -0,0 +1,112 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ +import { describe, it, expect, vi } from 'vitest'; +import { StateSnapshotWorker } from './stateSnapshotWorker.js'; +import { + createMockEnvironment, + createDummyNode, +} from '../testing/contextTestUtils.js'; +import { InboxSnapshotImpl } from '../sidecar/inbox.js'; + +describe('StateSnapshotWorker', () => { + it('should generate a snapshot and publish it to the inbox', async () => { + const env = createMockEnvironment(); + // Spy on the publish method + const publishSpy = vi.spyOn(env.inbox, 'publish'); + + const worker = StateSnapshotWorker.create(env, { type: 'point-in-time' }); + + const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); + const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); + + const targets = [nodeA, nodeB]; + const inbox = new InboxSnapshotImpl([]); + + await worker.execute({ targets, inbox }); + + // Ensure generateContent was called + expect(env.llmClient.generateContent).toHaveBeenCalled(); + + // Verify it published to the inbox + expect(publishSpy).toHaveBeenCalledWith( + 'PROPOSED_SNAPSHOT', + expect.objectContaining({ + newText: 'Mock LLM summary response', + consumedIds: ['node-A', 'node-B'], + type: 'point-in-time', + }), + env.idGenerator + ); + }); + + it('should pull previous accumulate snapshot from inbox and append new targets', async () => { + const env = createMockEnvironment(); + const publishSpy = vi.spyOn(env.inbox, 'publish'); + const drainSpy = vi.spyOn(env.inbox, 'drainConsumed'); + + const worker = StateSnapshotWorker.create(env, { type: 'accumulate' }); + + const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); + const targets = [nodeC]; + + // Simulate an existing accumulate draft in the inbox + const inbox = new InboxSnapshotImpl([ + { + id: 'draft-1', + topic: 'PROPOSED_SNAPSHOT', + timestamp: Date.now() - 1000, + payload: { + consumedIds: ['node-A', 'node-B'], + newText: '', + type: 'accumulate', + } + } + ]); + + await worker.execute({ targets, inbox }); + + // The old draft should be consumed + expect(inbox.getConsumedIds().has('draft-1')).toBe(true); + expect(drainSpy).toHaveBeenCalledWith(expect.any(Set)); + + // The new publish should contain ALL consumed IDs (old + new) + expect(publishSpy).toHaveBeenCalledWith( + 'PROPOSED_SNAPSHOT', + expect.objectContaining({ + newText: 'Mock LLM summary response', + consumedIds: ['node-A', 'node-B', 'node-C'], // Aggregated! + type: 'accumulate', + }), + env.idGenerator + ); + + // Verify the LLM was called with the old snapshot prepended + expect(env.llmClient.generateContent).toHaveBeenCalledWith( + expect.objectContaining({ + contents: expect.arrayContaining([ + expect.objectContaining({ + parts: expect.arrayContaining([ + expect.objectContaining({ + text: expect.stringContaining(''), + }) + ]) + }) + ]) + }) + ); + }); + + it('should ignore empty targets', async () => { + const env = createMockEnvironment(); + const publishSpy = vi.spyOn(env.inbox, 'publish'); + const worker = StateSnapshotWorker.create(env, { type: 'accumulate' }); + + await worker.execute({ targets: [], inbox: new InboxSnapshotImpl([]) }); + + expect(env.llmClient.generateContent).not.toHaveBeenCalled(); + expect(publishSpy).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/core/src/context/processors/stateSnapshotWorker.ts b/packages/core/src/context/processors/stateSnapshotWorker.ts new file mode 100644 index 0000000000..ac4c4952ac --- /dev/null +++ b/packages/core/src/context/processors/stateSnapshotWorker.ts @@ -0,0 +1,98 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ +import type { ContextWorker, InboxSnapshot } from '../pipeline.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; +import type { ConcreteNode } from '../ir/types.js'; +import { SnapshotGenerator } from '../utils/snapshotGenerator.js'; +import { debugLogger } from '../../utils/debugLogger.js'; + +export interface StateSnapshotWorkerOptions { + type?: 'accumulate' | 'point-in-time'; + systemInstruction?: string; +} + +export class StateSnapshotWorker implements ContextWorker { + static create( + env: ContextEnvironment, + options: StateSnapshotWorkerOptions, + ): StateSnapshotWorker { + return new StateSnapshotWorker(env, options); + } + + readonly id = 'StateSnapshotWorker'; + readonly name = 'StateSnapshotWorker'; + readonly options: StateSnapshotWorkerOptions; + private readonly env: ContextEnvironment; + private readonly generator: SnapshotGenerator; + + // Triggers when nodes exceed retained threshold (via retained_exceeded in Orchestrator) + readonly triggers = { + onNodesAdded: true, + }; + + constructor(env: ContextEnvironment, options: StateSnapshotWorkerOptions) { + this.env = env; + this.options = options; + this.generator = new SnapshotGenerator(env); + } + + async execute({ targets, inbox }: { targets: readonly ConcreteNode[]; inbox: InboxSnapshot }): Promise { + if (targets.length === 0) return; + + try { + let nodesToSummarize = [...targets]; + let previousConsumedIds: string[] = []; + const workerType = this.options.type ?? 'point-in-time'; + + if (workerType === 'accumulate') { + // Look for the most recent unconsumed accumulate snapshot in the inbox + const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[]; type: string }>('PROPOSED_SNAPSHOT'); + const accumulateSnapshots = proposedSnapshots.filter(s => s.payload.type === 'accumulate'); + + if (accumulateSnapshots.length > 0) { + // Sort to find the most recent + const latest = [...accumulateSnapshots].sort((a, b) => b.timestamp - a.timestamp)[0]; + + // Consume the old draft so the inbox doesn't fill up with stale drafts + inbox.consume(latest.id); + // And we must persist its consumption back to the live inbox immediately, + // because we are effectively "taking" it from the shelf to modify. + this.env.inbox.drainConsumed(new Set([latest.id])); + + previousConsumedIds = latest.payload.consumedIds; + + // Prepend a synthetic node representing the previous rolling state + const previousStateNode: ConcreteNode = { + id: this.env.idGenerator.generateId(), + logicalParentId: '', + type: 'SNAPSHOT', + timestamp: latest.timestamp, + text: latest.payload.newText, + } as import('../ir/types.js').Snapshot; + + nodesToSummarize = [previousStateNode, ...targets]; + } + } + + const snapshotText = await this.generator.synthesizeSnapshot( + nodesToSummarize, + this.options.systemInstruction, + ); + + const newConsumedIds = [...previousConsumedIds, ...targets.map((t) => t.id)]; + + // In V2, workers communicate their work to the inbox, and the processor picks it up. + this.env.inbox.publish('PROPOSED_SNAPSHOT', { + newText: snapshotText, + consumedIds: newConsumedIds, + type: workerType, + }, this.env.idGenerator); + + } catch (e) { + debugLogger.error('StateSnapshotWorker failed to generate snapshot', e); + } + } +} diff --git a/packages/core/src/context/sidecar/builtins.ts b/packages/core/src/context/sidecar/builtins.ts index 3dd2f651f6..02892c064e 100644 --- a/packages/core/src/context/sidecar/builtins.ts +++ b/packages/core/src/context/sidecar/builtins.ts @@ -10,6 +10,7 @@ import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js'; import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js'; import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js'; +import { StateSnapshotWorker, type StateSnapshotWorkerOptions } from '../processors/stateSnapshotWorker.js'; export function registerBuiltInProcessors(registry: ProcessorRegistry) { registry.register>({ @@ -47,4 +48,10 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) { schema: {}, // Will be added later create: (env, options) => StateSnapshotProcessor.create(env, options), }); + + registry.register({ + id: 'StateSnapshotWorker', + schema: {}, // Will be added later + create: (env, options) => StateSnapshotWorker.create(env, options) as any, // ContextWorker instead of ContextProcessor + }); } diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index b01ff7c089..3e495c6cb2 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -118,6 +118,11 @@ export function createMockEnvironment( fileSystem: new InMemoryFileSystem(), idGenerator: new DeterministicIdGenerator('mock-uuid-'), behaviorRegistry: registry, + inbox: { + publish: vi.fn(), + getMessages: vi.fn().mockReturnValue([]), + drainConsumed: vi.fn(), + } as any, irMapper, ...overrides, } as ContextEnvironment; diff --git a/packages/core/src/context/utils/snapshotGenerator.ts b/packages/core/src/context/utils/snapshotGenerator.ts new file mode 100644 index 0000000000..9d0a57a175 --- /dev/null +++ b/packages/core/src/context/utils/snapshotGenerator.ts @@ -0,0 +1,54 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ +import type { ConcreteNode } from '../ir/types.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; +import { LlmRole } from '../../telemetry/llmRole.js'; + +export class SnapshotGenerator { + constructor(private readonly env: ContextEnvironment) {} + + async synthesizeSnapshot( + nodes: readonly ConcreteNode[], + systemInstruction?: string, + ): Promise { + const systemPrompt = + systemInstruction ?? + `You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant. +Your task is to synthesize these turns into a single, dense, factual snapshot that preserves all critical context, preferences, active tasks, and factual knowledge, but discards conversational filler, pleasantries, and redundant back-and-forth iterations. + +Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`; + + let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; + for (const node of nodes) { + let nodeContent = ''; + if ('text' in node && typeof node.text === 'string') { + nodeContent = node.text; + } else if ('semanticParts' in node) { + nodeContent = JSON.stringify(node.semanticParts); + } else if ('observation' in node) { + nodeContent = + typeof node.observation === 'string' + ? node.observation + : JSON.stringify(node.observation); + } + + userPromptText += `[${node.type}]: ${nodeContent}\n`; + } + + const response = await this.env.llmClient.generateContent({ + role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR, + modelConfigKey: { model: 'default' }, + contents: [{ role: 'user', parts: [{ text: userPromptText }] }], + systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] }, + promptId: this.env.promptId, + abortSignal: new AbortController().signal, + }); + + const candidate = response.candidates?.[0]; + const textPart = candidate?.content?.parts?.[0]; + return textPart?.text || ''; + } +}