home stretch!

This commit is contained in:
Your Name
2026-04-08 19:50:43 +00:00
parent 95a175deca
commit 57f13a196e
4 changed files with 286 additions and 185 deletions
@@ -0,0 +1,107 @@
import { describe, it, expect } from 'vitest';
import { StateSnapshotProcessor } from './stateSnapshotProcessor.js';
import {
createMockEnvironment,
createDummyState,
createDummyNode,
} from '../testing/contextTestUtils.js';
import { InboxSnapshotImpl } from '../sidecar/inbox.js';
describe('StateSnapshotProcessor', () => {
it('should ignore if budget is satisfied', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
const state = createDummyState(true); // satisfied
const targets = [createDummyNode('ep1', 'USER_PROMPT')];
const inbox = new InboxSnapshotImpl([]);
const result = await processor.process({ buffer: {} as any, targets, state, inbox });
expect(result).toBe(targets); // Strict equality
});
it('should apply a valid snapshot from the Inbox (Fast Path)', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
const state = createDummyState(false, 100);
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const targets = [nodeA, nodeB, nodeC];
// The background worker created a snapshot of A and B
const inbox = new InboxSnapshotImpl([
{
id: 'msg-1',
topic: 'PROPOSED_SNAPSHOT',
timestamp: Date.now(),
payload: {
consumedIds: ['node-A', 'node-B'],
newText: '<compressed A and B>',
}
}
]);
const result = await processor.process({ buffer: {} as any, targets, state, inbox });
// Should remove A and B, insert Snapshot, keep C
expect(result.length).toBe(2);
expect(result[0].type).toBe('SNAPSHOT');
expect((result[0] as any).text).toBe('<compressed A and B>');
expect(result[1].id).toBe('node-C');
// Should consume the message
expect(inbox.getConsumedIds().has('msg-1')).toBe(true);
});
it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
// Make deficit 0 so we don't fall through to the sync backstop and fail the test that way
const state = createDummyState(false, 0);
// node-A is MISSING (user deleted it)
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const targets = [nodeB];
const inbox = new InboxSnapshotImpl([
{
id: 'msg-1',
topic: 'PROPOSED_SNAPSHOT',
timestamp: Date.now(),
payload: {
consumedIds: ['node-A', 'node-B'],
newText: '<compressed A and B>',
}
}
]);
const result = await processor.process({ buffer: {} as any, targets, state, inbox });
// Because deficit is 0, and Inbox was rejected, nothing should change
expect(result.length).toBe(1);
expect(result[0].id).toBe('node-B');
expect(inbox.getConsumedIds().has('msg-1')).toBe(false);
});
it('should fall back to sync backstop if inbox is empty', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'max' }); // Summarize all
const state = createDummyState(false, 100);
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const targets = [nodeA, nodeB, nodeC];
const inbox = new InboxSnapshotImpl([]);
const result = await processor.process({ buffer: {} as any, targets, state, inbox });
// Should synthesize a new snapshot synchronously
expect(env.llmClient.generateContent).toHaveBeenCalled();
expect(result.length).toBe(2); // nodeA is skipped as "system prompt", snapshot + nodeA
expect(result[1].type).toBe('SNAPSHOT');
expect((result[1] as any).text).toBe('Mock LLM summary response');
});
});
@@ -1,127 +1,178 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextProcessor, ContextAccountingState, BackstopTargetOptions } from '../pipeline.js';
import type { Episode } from '../ir/types.js';
import type {
ContextEnvironment,
ContextEventBus,
} from '../sidecar/environment.js';
import { v4 as uuidv4 } from 'uuid';
import { LlmRole } from '../../telemetry/llmRole.js';
import { debugLogger } from 'src/utils/debugLogger.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
import { isSystemEvent, isToolExecution, isUserPrompt } from '../ir/graphUtils.js';
import type { ContextProcessor, ProcessArgs, BackstopTargetOptions, ContextWorker } from '../pipeline.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { ConcreteNode, Snapshot } from '../ir/types.js';
import { debugLogger } from '../../utils/debugLogger.js';
export interface StateSnapshotProcessorOptions extends BackstopTargetOptions {
model?: string;
systemInstruction?: string;
triggerDeficitTokens?: number;
}
export class StateSnapshotProcessor implements ContextProcessor {
export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
static create(
env: ContextEnvironment,
options: StateSnapshotProcessorOptions,
): StateSnapshotProcessor {
return new StateSnapshotProcessor(env, options, env.eventBus);
return new StateSnapshotProcessor(env, options);
}
static readonly schema = {
type: 'object',
properties: {
target: {
type: 'string',
enum: ['incremental', 'freeNTokens', 'max'],
description: 'How much of the targeted history to summarize.',
},
freeTokensTarget: {
type: 'number',
description: 'The number of tokens to free if target is freeNTokens.',
},
systemInstruction: {
type: 'string',
description: 'Custom instructions for the summarizer model.',
},
},
};
readonly id = 'StateSnapshotProcessor';
readonly name = 'StateSnapshotProcessor';
readonly options: StateSnapshotProcessorOptions;
private readonly env: ContextEnvironment;
private isSynthesizing = false;
constructor(
env: ContextEnvironment,
options: StateSnapshotProcessorOptions,
_eventBus: ContextEventBus,
) {
// As a worker, we trigger when nodes are added to proactively accumulate
readonly triggers = {
onNodesAdded: true,
};
constructor(env: ContextEnvironment, options: StateSnapshotProcessorOptions) {
this.env = env;
this.options = options;
}
async process(
editor: EpisodeEditor,
state: ContextAccountingState,
): Promise<void> {
if (this.isSynthesizing) return;
// --- ContextWorker Interface (Proactive Accumulation) ---
async execute({ targets, inbox }: { targets: ReadonlyArray<ConcreteNode>; inbox: import('../pipeline.js').InboxSnapshot }): Promise<void> {
// We only care about nodes that have aged out past retainedTokens
// To calculate this precisely, we'd need the ContextAccountingState, but for V0
// the Orchestrator doesn't pass state to workers. We will assume the Orchestrator
// passes ONLY the "aged out" targets to the worker if triggered by onNodesAdded
// OR we just look for un-snapshotted nodes.
// For V0: Let's simply wait until the Pipeline invokes the Processor synchronously.
// Building the robust progressively accumulating worker requires the Orchestrator
// to pass ContextAccountingState to the `execute` method, which we can add later.
}
// Calculate how many tokens we need to remove based on the configured knob
// --- ContextProcessor Interface (Sync Backstop / Cache Application) ---
async process({ targets, state, inbox }: ProcessArgs): Promise<ReadonlyArray<ConcreteNode>> {
if (state.isBudgetSatisfied) {
return targets;
}
// 1. Check Inbox for a completed Snapshot (The Fast Path)
const proposedSnapshots = inbox.getMessages<{ newText: string; consumedIds: string[] }>('PROPOSED_SNAPSHOT');
if (proposedSnapshots.length > 0) {
// Sort by newest timestamp first (we want the most accumulated snapshot)
const sorted = [...proposedSnapshots].sort((a, b) => b.timestamp - a.timestamp);
for (const proposed of sorted) {
const { consumedIds, newText } = proposed.payload;
// Verify all consumed IDs still exist sequentially in targets
const targetIds = new Set(targets.map(t => t.id));
const isValid = consumedIds.every(id => targetIds.has(id));
if (isValid) {
// If valid, apply it!
const newId = this.env.idGenerator.generateId();
const tokens = this.env.tokenCalculator.estimateTokensForString(newText);
const snapshotNode: Snapshot = {
id: newId,
logicalParentId: newId,
type: 'SNAPSHOT',
timestamp: Date.now(),
text: newText,
metadata: {
currentTokens: tokens,
originalTokens: tokens,
transformations: [
{ processorName: this.name, action: 'SYNTHESIZED', timestamp: Date.now() }
]
}
};
// Remove the consumed nodes and insert the snapshot at the earliest index
const returnedNodes = targets.filter(t => !consumedIds.includes(t.id));
const firstRemovedIdx = targets.findIndex(t => consumedIds.includes(t.id));
if (firstRemovedIdx !== -1) {
const idx = Math.max(0, firstRemovedIdx);
returnedNodes.splice(idx, 0, snapshotNode);
} else {
returnedNodes.unshift(snapshotNode);
}
inbox.consume(proposed.id);
return returnedNodes;
}
}
}
// 2. The Synchronous Backstop (The Slow Path)
const strategy = this.options.target ?? "max";
let targetTokensToRemove = 0;
const strategy = this.options.target ?? 'max';
if (strategy === 'incremental') {
if (state.currentTokens <= state.maxTokens) return;
targetTokensToRemove = state.currentTokens - state.maxTokens;
targetTokensToRemove = state.deficitTokens;
} else if (strategy === 'freeNTokens') {
targetTokensToRemove = this.options.freeTokensTarget ?? 0;
if (targetTokensToRemove <= 0) return;
targetTokensToRemove = this.options.freeTokensTarget ?? state.deficitTokens;
} else if (strategy === 'max') {
// 'max' means we process all targets without stopping early
targetTokensToRemove = Infinity;
}
this.isSynthesizing = true;
try {
let deficitAccumulator = 0;
const selectedEpisodes: Episode[] = [];
let deficitAccumulator = 0;
const nodesToSummarize: ConcreteNode[] = [];
// We scan through the targets oldest to newest to build the block we want to summarize
for (const target of editor.targets) {
const ep = target.episode;
// We only operate on entire episodes for a snapshot
if (target.node !== ep) continue;
// Skip the very first episode (usually the system prompt)
if (ep.id === editor.getFullHistory()[0].id) continue;
selectedEpisodes.push(ep);
const epTokens = this.env.tokenCalculator.calculateEpisodeListTokens([ep]);
deficitAccumulator += epTokens;
if (deficitAccumulator >= targetTokensToRemove) break;
// Scan oldest to newest
for (const node of targets) {
if (node.id === targets[0].id && node.type === 'USER_PROMPT') {
// Keep system prompt if it's the very first node
// In a real system, system prompt is protected, but we double check
continue;
}
nodesToSummarize.push(node);
deficitAccumulator += node.metadata.currentTokens;
if (selectedEpisodes.length < 2) return; // Not enough context to summarize
if (deficitAccumulator >= targetTokensToRemove) break;
}
// Optimization: Do NOT emit VariantComputing, let the Orchestrator handle caching the final result.
const snapshotEp: Episode =
await this.synthesizeSnapshot(selectedEpisodes);
if (nodesToSummarize.length < 2) return targets; // Not enough context
const oldIds = selectedEpisodes.map((ep) => ep.id);
editor.replaceEpisodes(oldIds, snapshotEp, 'STATE_SNAPSHOT');
} finally {
this.isSynthesizing = false;
try {
const snapshotText = await this.synthesizeSnapshot(nodesToSummarize);
const newId = this.env.idGenerator.generateId();
const tokens = this.env.tokenCalculator.estimateTokensForString(snapshotText);
const snapshotNode: Snapshot = {
id: newId,
logicalParentId: newId,
type: 'SNAPSHOT',
timestamp: Date.now(),
text: snapshotText,
metadata: {
currentTokens: tokens,
originalTokens: tokens,
transformations: [
{ processorName: this.name, action: 'SYNTHESIZED', timestamp: Date.now() }
]
}
};
const consumedIds = nodesToSummarize.map(n => n.id);
const returnedNodes = targets.filter(t => !consumedIds.includes(t.id));
const firstRemovedIdx = targets.findIndex(t => consumedIds.includes(t.id));
if (firstRemovedIdx !== -1) {
const idx = Math.max(0, firstRemovedIdx);
returnedNodes.splice(idx, 0, snapshotNode);
} else {
returnedNodes.unshift(snapshotNode);
}
return returnedNodes;
} catch (e) {
debugLogger.error('StateSnapshotProcessor failed sync backstop', e);
return targets;
}
}
private async synthesizeSnapshot(episodes: Episode[]): Promise<Episode> {
const client = this.env.llmClient;
private async synthesizeSnapshot(nodes: ConcreteNode[]): Promise<string> {
const systemPrompt =
this.options.systemInstruction ??
`You are an expert Context Memory Manager. You will be provided with a raw transcript of older conversation turns between a user and an AI assistant.
@@ -130,83 +181,19 @@ Your task is to synthesize these turns into a single, dense, factual snapshot th
Output ONLY the raw factual snapshot, formatted compactly. Do not include markdown wrappers, prefixes like "Here is the snapshot", or conversational elements.`;
let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n';
for (const ep of episodes) {
if (isUserPrompt(ep.trigger)) {
const partsText = ep.trigger.semanticParts
.map((p) => {
if (p.type === 'text') return p.text;
if (p.presentation) return p.presentation.text;
return '';
})
.join('');
userPromptText += `USER: ${partsText}\n`;
} else if (isSystemEvent(ep.trigger)) {
userPromptText += `[SYSTEM EVENT: ${ep.trigger.name}]\n`;
}
for (const step of ep.steps) {
if (isToolExecution(step)) {
userPromptText += `[Tool Called: ${step.toolName}]\n`;
}
}
if (ep.yield) {
userPromptText += `ASSISTANT: ${ep.yield.text}\n`;
}
userPromptText += '\n';
for (const node of nodes) {
userPromptText += `[${node.type}]: ${(node as any).text || JSON.stringify((node as any).semanticParts)}\n`;
}
try {
const response = await client.generateContent({
modelConfigKey: { model: 'state-snapshot-processor' },
contents: [{ role: 'user', parts: [{ text: userPromptText }] }],
const response = await this.env.llmClient.generateContent({
role: 'user' as any,
modelConfigKey: 'default' as any,
contents: [{ role: 'user' as any, parts: [{ text: userPromptText }] }],
systemInstruction: { role: 'system', parts: [{ text: systemPrompt }] },
promptId: this.env.promptId,
role: LlmRole.UTILITY_STATE_SNAPSHOT_PROCESSOR,
abortSignal: new AbortController().signal,
});
});
const snapshotText = response.text;
// Synthesize a new "Episode" representing this compressed block
const newId = uuidv4();
const contentTokens = this.env.tokenCalculator.estimateTokensForParts([
{ text: snapshotText },
]);
return {
type: 'EPISODE',
id: newId,
timestamp: Date.now(),
trigger: {
id: `${newId}-t`,
type: 'USER_PROMPT',
semanticParts: [],
metadata: {
originalTokens: 0,
currentTokens: 0,
transformations: [],
},
},
steps: [],
yield: {
id: `${newId}-y`,
type: 'AGENT_YIELD',
text: `<CONTEXT_SNAPSHOT>\n${snapshotText}\n</CONTEXT_SNAPSHOT>`,
metadata: {
originalTokens: contentTokens,
currentTokens: contentTokens,
transformations: [
{
processorName: 'StateSnapshotProcessor',
action: 'SYNTHESIZED',
timestamp: Date.now(),
},
],
},
},
};
} catch (error) {
debugLogger.error('Failed to synthesize snapshot:', error);
throw error;
}
return response.text || '';
}
}
+31 -24
View File
@@ -1,38 +1,45 @@
import { ProcessorRegistry } from './registry.js';
import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js';
import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js';
import { HistorySquashingProcessor, type HistorySquashingProcessorOptions } from '../processors/historySquashingProcessor.js';
import { SemanticCompressionProcessor, type SemanticCompressionProcessorOptions } from '../processors/semanticCompressionProcessor.js';
import { ToolMaskingProcessor, type ToolMaskingProcessorOptions } from '../processors/toolMaskingProcessor.js';
import { StateSnapshotProcessor, type StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js';
export function registerBuiltInProcessors(registry: ProcessorRegistry) {
registry.register<Record<string, never>>({
id: 'BlobDegradationProcessor',
schema: {
type: 'object',
properties: {
processorId: { const: 'BlobDegradationProcessor' },
options: { type: 'object' },
},
required: ['processorId'],
},
schema: BlobDegradationProcessor.schema,
create: (env) => new BlobDegradationProcessor(env),
});
registry.register<EmergencyTruncationProcessorOptions>({
id: 'EmergencyTruncationProcessor',
schema: {
type: 'object',
properties: {
processorId: { const: 'EmergencyTruncationProcessor' },
options: {
type: 'object',
properties: {
target: { type: 'string', enum: ['incremental', 'freeNTokens', 'max'] },
freeTokensTarget: { type: 'number' },
},
},
},
required: ['processorId'],
},
create: (env, options) =>
EmergencyTruncationProcessor.create(env, options),
schema: EmergencyTruncationProcessor.schema,
create: (env, options) => EmergencyTruncationProcessor.create(env, options),
});
registry.register<HistorySquashingProcessorOptions>({
id: 'HistorySquashingProcessor',
schema: HistorySquashingProcessor.schema,
create: (env, options) => HistorySquashingProcessor.create(env, options),
});
registry.register<SemanticCompressionProcessorOptions>({
id: 'SemanticCompressionProcessor',
schema: SemanticCompressionProcessor.schema,
create: (env, options) => SemanticCompressionProcessor.create(env, options),
});
registry.register<ToolMaskingProcessorOptions>({
id: 'ToolMaskingProcessor',
schema: ToolMaskingProcessor.schema,
create: (env, options) => ToolMaskingProcessor.create(env, options),
});
registry.register<StateSnapshotProcessorOptions>({
id: 'StateSnapshotProcessor',
schema: {}, // Will be added later
create: (env, options) => StateSnapshotProcessor.create(env, options),
});
}
@@ -98,7 +98,7 @@ export class PipelineOrchestrator {
// Fire all workers that care about new nodes
for (const worker of this.instantiatedWorkers.values()) {
if (worker.triggers.onNodesAdded) {
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages());
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
// Fire and forget
worker.execute({ targets: [], inbox: inboxSnapshot }).catch(e => {
debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e);
@@ -177,7 +177,7 @@ export class PipelineOrchestrator {
const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger));
// Freeze the inbox for this pipeline run
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages());
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
for (const pipeline of pipelines) {
for (const procDef of pipeline.processors) {
@@ -232,7 +232,7 @@ export class PipelineOrchestrator {
if (!ship || ship.length === 0) return;
let currentShip = ship;
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages());
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
for (const procDef of pipeline.processors) {
const processor = this.instantiatedProcessors.get(procDef.processorId);