snapshotter

This commit is contained in:
Your Name
2026-04-09 00:55:10 +00:00
parent 6e7987696f
commit 4a34f64efa
7 changed files with 292 additions and 59 deletions
@@ -41,6 +41,7 @@ describe('StateSnapshotProcessor', () => {
payload: {
consumedIds: ['node-A', 'node-B'],
newText: '<compressed A and B>',
type: 'point-in-time',
}
}
]);
@@ -3,9 +3,10 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextProcessor, ProcessArgs, BackstopTargetOptions, ContextWorker } from '../pipeline.js';
import type { ContextProcessor, ProcessArgs, BackstopTargetOptions } from '../pipeline.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { ConcreteNode, Snapshot } from '../ir/types.js';
import { SnapshotGenerator } from '../utils/snapshotGenerator.js';
import { debugLogger } from '../../utils/debugLogger.js';
export interface StateSnapshotProcessorOptions extends BackstopTargetOptions {
@@ -13,7 +14,7 @@ export interface StateSnapshotProcessorOptions extends BackstopTargetOptions {
systemInstruction?: string;
}
export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
export class StateSnapshotProcessor implements ContextProcessor {
static create(
env: ContextEnvironment,
options: StateSnapshotProcessorOptions,
@@ -25,29 +26,12 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
readonly name = 'StateSnapshotProcessor';
readonly options: StateSnapshotProcessorOptions;
private readonly env: ContextEnvironment;
// As a worker, we trigger when nodes are added to proactively accumulate
readonly triggers = {
onNodesAdded: true,
};
private readonly generator: SnapshotGenerator;
constructor(env: ContextEnvironment, options: StateSnapshotProcessorOptions) {
this.env = env;
this.options = options;
}
// --- ContextWorker Interface (Proactive Accumulation) ---
async execute({ targets: _targets, inbox: _inbox }: { targets: readonly ConcreteNode[]; inbox: import('../pipeline.js').InboxSnapshot }): Promise<void> {
// We only care about nodes that have aged out past retainedTokens
// To calculate this precisely, we'd need the ContextAccountingState, but for V0
// the Orchestrator doesn't pass state to workers. We will assume the Orchestrator
// passes ONLY the "aged out" targets to the worker if triggered by onNodesAdded
// OR we just look for un-snapshotted nodes.
// For V0: Let's simply wait until the Pipeline invokes the Processor synchronously.
// Building the robust progressively accumulating worker requires the Orchestrator
// to pass ContextAccountingState to the `execute` method, which we can add later.
this.generator = new SnapshotGenerator(env);
}
// --- ContextProcessor Interface (Sync Backstop / Cache Application) ---
@@ -56,12 +40,19 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
return targets;
}
// Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
const strategy = this.options.target ?? 'max';
const expectedType = strategy === 'incremental' ? 'point-in-time' : 'accumulate';
// 1. Check Inbox for a completed Snapshot (The Fast Path)
const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[] }>('PROPOSED_SNAPSHOT');
const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[]; type: string }>('PROPOSED_SNAPSHOT');
if (proposedSnapshots.length > 0) {
// Filter for the snapshot type that matches our processor mode
const matchingSnapshots = proposedSnapshots.filter(s => s.payload.type === expectedType);
// Sort by newest timestamp first (we want the most accumulated snapshot)
const sorted = [...proposedSnapshots].sort((a, b) => b.timestamp - a.timestamp);
const sorted = [...matchingSnapshots].sort((a, b) => b.timestamp - a.timestamp);
for (const proposed of sorted) {
const { consumedIds, newText } = proposed.payload;
@@ -100,7 +91,6 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
}
// 2. The Synchronous Backstop (The Slow Path)
const strategy = this.options.target ?? "max";
let targetTokensToRemove = 0;
if (strategy === 'incremental') {
@@ -131,7 +121,7 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
if (nodesToSummarize.length < 2) return targets; // Not enough context
try {
const snapshotText = await this.synthesizeSnapshot(nodesToSummarize);
const snapshotText = await this.generator.synthesizeSnapshot(nodesToSummarize, this.options.systemInstruction);
const newId = this.env.idGenerator.generateId();
const snapshotNode: Snapshot = {
id: newId,
@@ -159,38 +149,4 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
return targets;
}
}
private async synthesizeSnapshot(nodes: ConcreteNode[]): Promise<string> {
const systemPrompt =
this.options.systemInstruction ??
`You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant.
Your task is to synthesize these turns into a single, dense, factual snapshot that preserves all critical context, preferences, active tasks, and factual knowledge, but discards conversational filler, pleasantries, and redundant back-and-forth iterations.
Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`;
let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n';
for (const node of nodes) {
let nodeContent = '';
if ('text' in node && typeof node.text === 'string') {
nodeContent = node.text;
} else if ('semanticParts' in node) {
nodeContent = JSON.stringify(node.semanticParts);
} else if ('observation' in node) {
nodeContent = typeof node.observation === 'string' ? node.observation : JSON.stringify(node.observation);
}
userPromptText += `[${node.type}]: ${nodeContent}\n`;
}
const response = await this.env.llmClient.generateContent({
role: 'utility_state_snapshot_processr' as import('../../telemetry/llmRole.js').LlmRole,
modelConfigKey: { model: 'default' },
contents: [{ role: 'user', parts: [{ text: userPromptText }] }],
systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] },
promptId: this.env.promptId,
abortSignal: new AbortController().signal,
});
return response.text || '';
}
}
@@ -0,0 +1,112 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi } from 'vitest';
import { StateSnapshotWorker } from './stateSnapshotWorker.js';
import {
createMockEnvironment,
createDummyNode,
} from '../testing/contextTestUtils.js';
import { InboxSnapshotImpl } from '../sidecar/inbox.js';
describe('StateSnapshotWorker', () => {
  // Point-in-time mode: each run snapshots only the freshly-passed targets
  // and publishes the result as a PROPOSED_SNAPSHOT message.
  it('should generate a snapshot and publish it to the inbox', async () => {
    const env = createMockEnvironment();
    // Spy on the publish method
    const publishSpy = vi.spyOn(env.inbox, 'publish');
    const worker = StateSnapshotWorker.create(env, { type: 'point-in-time' });
    const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
    const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
    const targets = [nodeA, nodeB];
    const inbox = new InboxSnapshotImpl([]);
    await worker.execute({ targets, inbox });
    // Ensure generateContent was called
    expect(env.llmClient.generateContent).toHaveBeenCalled();
    // Verify it published to the inbox
    expect(publishSpy).toHaveBeenCalledWith(
      'PROPOSED_SNAPSHOT',
      expect.objectContaining({
        newText: 'Mock LLM summary response',
        consumedIds: ['node-A', 'node-B'],
        type: 'point-in-time',
      }),
      env.idGenerator
    );
  });

  // Accumulate mode: a prior rolling draft in the inbox is consumed and folded
  // into the next snapshot, so consumedIds keeps growing monotonically.
  it('should pull previous accumulate snapshot from inbox and append new targets', async () => {
    const env = createMockEnvironment();
    const publishSpy = vi.spyOn(env.inbox, 'publish');
    const drainSpy = vi.spyOn(env.inbox, 'drainConsumed');
    const worker = StateSnapshotWorker.create(env, { type: 'accumulate' });
    const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
    const targets = [nodeC];
    // Simulate an existing accumulate draft in the inbox
    const inbox = new InboxSnapshotImpl([
      {
        id: 'draft-1',
        topic: 'PROPOSED_SNAPSHOT',
        timestamp: Date.now() - 1000,
        payload: {
          consumedIds: ['node-A', 'node-B'],
          newText: '<old snapshot>',
          type: 'accumulate',
        }
      }
    ]);
    await worker.execute({ targets, inbox });
    // The old draft should be consumed
    expect(inbox.getConsumedIds().has('draft-1')).toBe(true);
    // Consumption must also be persisted to the live inbox, not just the local
    // snapshot view, so stale drafts cannot be re-applied later.
    expect(drainSpy).toHaveBeenCalledWith(expect.any(Set));
    // The new publish should contain ALL consumed IDs (old + new)
    expect(publishSpy).toHaveBeenCalledWith(
      'PROPOSED_SNAPSHOT',
      expect.objectContaining({
        newText: 'Mock LLM summary response',
        consumedIds: ['node-A', 'node-B', 'node-C'], // Aggregated!
        type: 'accumulate',
      }),
      env.idGenerator
    );
    // Verify the LLM was called with the old snapshot prepended
    expect(env.llmClient.generateContent).toHaveBeenCalledWith(
      expect.objectContaining({
        contents: expect.arrayContaining([
          expect.objectContaining({
            parts: expect.arrayContaining([
              expect.objectContaining({
                text: expect.stringContaining('<old snapshot>'),
              })
            ])
          })
        ])
      })
    );
  });

  // Guard clause: no targets means no LLM call and no inbox traffic at all.
  it('should ignore empty targets', async () => {
    const env = createMockEnvironment();
    const publishSpy = vi.spyOn(env.inbox, 'publish');
    const worker = StateSnapshotWorker.create(env, { type: 'accumulate' });
    await worker.execute({ targets: [], inbox: new InboxSnapshotImpl([]) });
    expect(env.llmClient.generateContent).not.toHaveBeenCalled();
    expect(publishSpy).not.toHaveBeenCalled();
  });
});
@@ -0,0 +1,98 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextWorker, InboxSnapshot } from '../pipeline.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { ConcreteNode, Snapshot } from '../ir/types.js';
import { SnapshotGenerator } from '../utils/snapshotGenerator.js';
import { debugLogger } from '../../utils/debugLogger.js';
export interface StateSnapshotWorkerOptions {
type?: 'accumulate' | 'point-in-time';
systemInstruction?: string;
}
export class StateSnapshotWorker implements ContextWorker {
static create(
env: ContextEnvironment,
options: StateSnapshotWorkerOptions,
): StateSnapshotWorker {
return new StateSnapshotWorker(env, options);
}
readonly id = 'StateSnapshotWorker';
readonly name = 'StateSnapshotWorker';
readonly options: StateSnapshotWorkerOptions;
private readonly env: ContextEnvironment;
private readonly generator: SnapshotGenerator;
// Triggers when nodes exceed retained threshold (via retained_exceeded in Orchestrator)
readonly triggers = {
onNodesAdded: true,
};
constructor(env: ContextEnvironment, options: StateSnapshotWorkerOptions) {
this.env = env;
this.options = options;
this.generator = new SnapshotGenerator(env);
}
async execute({ targets, inbox }: { targets: readonly ConcreteNode[]; inbox: InboxSnapshot }): Promise<void> {
if (targets.length === 0) return;
try {
let nodesToSummarize = [...targets];
let previousConsumedIds: string[] = [];
const workerType = this.options.type ?? 'point-in-time';
if (workerType === 'accumulate') {
// Look for the most recent unconsumed accumulate snapshot in the inbox
const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[]; type: string }>('PROPOSED_SNAPSHOT');
const accumulateSnapshots = proposedSnapshots.filter(s => s.payload.type === 'accumulate');
if (accumulateSnapshots.length > 0) {
// Sort to find the most recent
const latest = [...accumulateSnapshots].sort((a, b) => b.timestamp - a.timestamp)[0];
// Consume the old draft so the inbox doesn't fill up with stale drafts
inbox.consume(latest.id);
// And we must persist its consumption back to the live inbox immediately,
// because we are effectively "taking" it from the shelf to modify.
this.env.inbox.drainConsumed(new Set([latest.id]));
previousConsumedIds = latest.payload.consumedIds;
// Prepend a synthetic node representing the previous rolling state
const previousStateNode: ConcreteNode = {
id: this.env.idGenerator.generateId(),
logicalParentId: '',
type: 'SNAPSHOT',
timestamp: latest.timestamp,
text: latest.payload.newText,
} as import('../ir/types.js').Snapshot;
nodesToSummarize = [previousStateNode, ...targets];
}
}
const snapshotText = await this.generator.synthesizeSnapshot(
nodesToSummarize,
this.options.systemInstruction,
);
const newConsumedIds = [...previousConsumedIds, ...targets.map((t) => t.id)];
// In V2, workers communicate their work to the inbox, and the processor picks it up.
this.env.inbox.publish('PROPOSED_SNAPSHOT', {
newText: snapshotText,
consumedIds: newConsumedIds,
type: workerType,
}, this.env.idGenerator);
} catch (e) {
debugLogger.error('StateSnapshotWorker failed to generate snapshot', e);
}
}
}
@@ -10,6 +10,7 @@ import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from
import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js';
import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js';
import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js';
import { StateSnapshotWorker, type StateSnapshotWorkerOptions } from '../processors/stateSnapshotWorker.js';
export function registerBuiltInProcessors(registry: ProcessorRegistry) {
registry.register<Record<string, never>>({
@@ -47,4 +48,10 @@ export function registerBuiltInProcessors(registry: ProcessorRegistry) {
schema: {}, // Will be added later
create: (env, options) => StateSnapshotProcessor.create(env, options),
});
registry.register<StateSnapshotWorkerOptions>({
id: 'StateSnapshotWorker',
schema: {}, // Will be added later
create: (env, options) => StateSnapshotWorker.create(env, options) as any, // ContextWorker instead of ContextProcessor
});
}
@@ -118,6 +118,11 @@ export function createMockEnvironment(
fileSystem: new InMemoryFileSystem(),
idGenerator: new DeterministicIdGenerator('mock-uuid-'),
behaviorRegistry: registry,
inbox: {
publish: vi.fn(),
getMessages: vi.fn().mockReturnValue([]),
drainConsumed: vi.fn(),
} as any,
irMapper,
...overrides,
} as ContextEnvironment;
@@ -0,0 +1,54 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ConcreteNode } from '../ir/types.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import { LlmRole } from '../../telemetry/llmRole.js';
export class SnapshotGenerator {
  constructor(private readonly env: ContextEnvironment) {}

  /**
   * Compresses a sequence of IR nodes into a single dense, factual snapshot
   * by flattening them into a plain-text transcript and asking the utility
   * LLM to summarize it.
   *
   * @param nodes Nodes to fold into the snapshot, in transcript order.
   * @param systemInstruction Optional override for the default compression prompt.
   * @returns The raw snapshot text, or '' when the model produced no text.
   */
  async synthesizeSnapshot(
    nodes: readonly ConcreteNode[],
    systemInstruction?: string,
  ): Promise<string> {
    const systemPrompt =
      systemInstruction ??
      `You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant.
Your task is to synthesize these turns into a single, dense, factual snapshot that preserves all critical context, preferences, active tasks, and factual knowledge, but discards conversational filler, pleasantries, and redundant back-and-forth iterations.
Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`;

    let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n';
    for (const node of nodes) {
      userPromptText += `[${node.type}]: ${this.extractNodeContent(node)}\n`;
    }

    const response = await this.env.llmClient.generateContent({
      role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR,
      modelConfigKey: { model: 'default' },
      contents: [{ role: 'user', parts: [{ text: userPromptText }] }],
      systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] },
      promptId: this.env.promptId,
      // NOTE(review): a throwaway controller means this call is never
      // cancellable — consider threading a real abort signal from env.
      abortSignal: new AbortController().signal,
    });
    // Join every text part of the first candidate; reading only parts[0]
    // would silently truncate multi-part responses.
    const parts = response.candidates?.[0]?.content?.parts ?? [];
    return parts.map((p) => p.text ?? '').join('');
  }

  // Flattens a node's primary content into plain text for the transcript.
  // Nodes carry either free text, structured semanticParts, or a tool
  // observation (string or JSON-serializable); unknown shapes yield ''.
  private extractNodeContent(node: ConcreteNode): string {
    if ('text' in node && typeof node.text === 'string') {
      return node.text;
    }
    if ('semanticParts' in node) {
      return JSON.stringify(node.semanticParts);
    }
    if ('observation' in node) {
      return typeof node.observation === 'string'
        ? node.observation
        : JSON.stringify(node.observation);
    }
    return '';
  }
}