feat(context): persist context engine snapshots across sessions

This commit implements an opaque state export/import pattern for the
ContextManager to ensure expensive LLM-derived snapshots are properly
rehydrated upon session resume.

The ContextManager now exposes `exportState` and `restoreState` methods,
delegating structural validation to the `SnapshotStateHelper`.
During active chat, the GeminiClient passes the finalized context state
down to the ChatRecordingService after each turn, which embeds it into
the existing JSONL metadata payload. Upon resume, the
saved snapshot is re-published as a draft to the LiveInbox, allowing
the synchronous pipeline to automatically and deterministically splice
it back into the raw graph without an additional LLM call.
This commit is contained in:
Your Name
2026-05-12 19:09:58 +00:00
parent 488d71b8c9
commit 65d4bdfc24
9 changed files with 233 additions and 65 deletions
@@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { createStateSnapshotHydrationProcessor } from '../processors/stateSnapshotHydrationProcessor.js';
import type {
AsyncPipelineDef,
ContextManagementConfig,
@@ -88,6 +89,29 @@ export const generalistProfile: ContextProfile = {
): PipelineDef[] =>
// Helper to merge default options with dynamically loaded processorOptions by ID
[
{
name: 'Initialization Hydration',
triggers: ['initialization'],
processors: [
createStateSnapshotHydrationProcessor('StateSnapshotHydration', env, {
target: (() => {
const res = resolveProcessorOptions(config, 'StateSnapshotSync', {
target: 'max',
maxStateTokens: 4000,
maxSummaryTurns: 5,
}).target;
if (
res === 'incremental' ||
res === 'freeNTokens' ||
res === 'max'
) {
return res;
}
return undefined;
})(),
}),
],
},
{
name: 'Immediate Sanitization',
triggers: ['new_message'],
@@ -7,6 +7,7 @@
import type { ContextProcessor, AsyncContextProcessor } from '../pipeline.js';
export type PipelineTrigger =
| 'initialization'
| 'new_message'
| 'retained_exceeded'
| 'gc_backstop'
@@ -16,6 +16,8 @@ import { HistoryObserver } from './historyObserver.js';
import { render } from './graph/render.js';
import { ContextWorkingBufferImpl } from './pipeline/contextWorkingBuffer.js';
import { debugLogger } from '../utils/debugLogger.js';
import { SnapshotStateHelper } from './utils/snapshotGenerator.js';
import type { ContextEngineState } from '../services/chatRecordingTypes.js';
import { hardenHistory } from '../utils/historyHardening.js';
import { checkContextInvariants } from './utils/invariantChecker.js';
import type { AdvancedTokenCalculator } from './utils/contextTokenCalculator.js';
@@ -429,4 +431,29 @@ export class ContextManager {
);
}
}
/**
 * Serializes the current snapshot baseline into an opaque state object
 * suitable for persisting alongside the conversation record.
 */
exportState(): ContextEngineState {
  const currentNodes = this.buffer.nodes;
  return SnapshotStateHelper.exportState(currentNodes);
}
/**
 * Rehydrates previously exported Context Engine state and immediately runs
 * the 'initialization' pipeline trigger so the restored snapshot is spliced
 * into the graph before the first user message creates cache artifacts.
 */
async restoreState(state: ContextEngineState): Promise<void> {
  if (!state) {
    return;
  }
  // Re-publish the persisted snapshot to the inbox so the hydration
  // processor can pick it up on the initialization trigger.
  SnapshotStateHelper.restoreState(state, this.env.inbox);
  const currentNodes = this.buffer.nodes;
  const processedNodes = await this.orchestrator.executeTriggerSync(
    'initialization',
    currentNodes,
    new Set(), // The hydration processor reads the inbox; no targets needed.
  );
  // Apply the hydration as a pseudo-processor result so the standard
  // buffer-update path is reused instead of duplicating splice logic.
  this.buffer = this.buffer.applyProcessorResult(
    'StateSnapshotHydration',
    currentNodes,
    processedNodes,
  );
}
}
@@ -0,0 +1,111 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { randomUUID } from 'node:crypto';
import type { JSONSchemaType } from 'ajv';
import type { ContextProcessor, ProcessArgs } from '../pipeline.js';
import type { ContextEnvironment } from '../pipeline/environment.js';
import { type Snapshot, NodeType } from '../graph/types.js';
/**
 * Options for the state-snapshot hydration processor.
 *
 * `target` selects which persisted snapshot type is accepted:
 * 'incremental' maps to 'point-in-time' snapshots; 'freeNTokens' and
 * 'max' map to 'accumulate' snapshots (see the processor's `expectedType`
 * mapping). When omitted, the processor defaults to 'max'.
 */
export interface StateSnapshotHydrationProcessorOptions {
  target?: 'incremental' | 'freeNTokens' | 'max';
}
// AJV schema used to validate dynamically loaded processor options.
// `nullable: true` plus empty `required` makes `target` fully optional.
export const StateSnapshotHydrationProcessorOptionsSchema: JSONSchemaType<StateSnapshotHydrationProcessorOptions> =
  {
    type: 'object',
    properties: {
      target: {
        type: 'string',
        enum: ['incremental', 'freeNTokens', 'max'],
        nullable: true,
      },
    },
    required: [],
  };
/**
 * Creates a processor that hydrates a previously persisted snapshot back
 * into the context graph.
 *
 * On each run it scans the inbox for PROPOSED_SNAPSHOT messages
 * (re-published from saved session state), considers only those matching
 * the configured snapshot type, newest first, and applies the first one
 * whose consumed node IDs all still exist in `targets`: the consumed nodes
 * are removed, a single Snapshot node is inserted at the position of the
 * first removed node, and the inbox message is consumed. If no proposal
 * applies, `targets` is returned unchanged.
 *
 * @param id Processor instance ID, used when applying results to the buffer.
 * @param env Context environment (unused here; kept for signature parity
 *   with the sibling snapshot processors).
 * @param options See {@link StateSnapshotHydrationProcessorOptions}.
 */
export function createStateSnapshotHydrationProcessor(
  id: string,
  env: ContextEnvironment,
  options: StateSnapshotHydrationProcessorOptions,
): ContextProcessor {
  return {
    id,
    name: 'StateSnapshotHydrationProcessor',
    process: async ({ targets, inbox }: ProcessArgs) => {
      if (targets.length === 0) {
        return targets;
      }
      // Map processor mode to the snapshot type it accepts:
      // 'incremental' -> 'point-in-time'; 'freeNTokens' / 'max' -> 'accumulate'.
      const strategy = options.target ?? 'max';
      const expectedType =
        strategy === 'incremental' ? 'point-in-time' : 'accumulate';
      // Check the inbox for a completed snapshot (the fast path).
      const proposedSnapshots = inbox.getMessages<{
        newText: string;
        consumedIds: string[];
        type: string;
        timestamp: number;
      }>('PROPOSED_SNAPSHOT');
      if (proposedSnapshots.length === 0) {
        return targets;
      }
      // Newest first: the most recent proposal has accumulated the most.
      const sorted = proposedSnapshots
        .filter((s) => s.payload.type === expectedType)
        .sort((a, b) => b.timestamp - a.timestamp);
      // The set of live node IDs is loop-invariant; build it once.
      const targetIds = new Set(targets.map((t) => t.id));
      for (const proposed of sorted) {
        const { consumedIds, newText, timestamp } = proposed.payload;
        // A proposal applies only if every node it abstracts still exists
        // in the current targets (existence check — ordering is not verified).
        const isValid = consumedIds.every((nodeId) => targetIds.has(nodeId));
        if (!isValid) {
          continue;
        }
        const newId = randomUUID();
        const snapshotNode: Snapshot = {
          id: newId,
          turnId: newId,
          type: NodeType.SNAPSHOT,
          timestamp: timestamp ?? Date.now(),
          role: 'user',
          payload: { text: newText },
          abstractsIds: consumedIds,
        };
        // Remove the consumed nodes and insert the snapshot where the first
        // removed node was; if consumedIds was empty, prepend it.
        const consumedSet = new Set(consumedIds);
        const returnedNodes = targets.filter((t) => !consumedSet.has(t.id));
        const firstRemovedIdx = targets.findIndex((t) =>
          consumedSet.has(t.id),
        );
        if (firstRemovedIdx !== -1) {
          returnedNodes.splice(firstRemovedIdx, 0, snapshotNode);
        } else {
          returnedNodes.unshift(snapshotNode);
        }
        inbox.consume(proposed.id);
        return returnedNodes;
      }
      return targets;
    },
  };
}
@@ -53,76 +53,13 @@ export function createStateSnapshotProcessor(
return {
id,
name: 'StateSnapshotProcessor',
process: async ({ targets, inbox }: ProcessArgs) => {
process: async ({ targets }: ProcessArgs) => {
if (targets.length === 0) {
return targets;
}
// Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
const strategy = options.target ?? 'max';
const expectedType =
strategy === 'incremental' ? 'point-in-time' : 'accumulate';
// 1. Check Inbox for a completed Snapshot (The Fast Path)
const proposedSnapshots = inbox.getMessages<{
newText: string;
consumedIds: string[];
type: string;
timestamp: number;
}>('PROPOSED_SNAPSHOT');
if (proposedSnapshots.length > 0) {
// Filter for the snapshot type that matches our processor mode
const matchingSnapshots = proposedSnapshots.filter(
(s) => s.payload.type === expectedType,
);
// Sort by newest timestamp first (we want the most accumulated snapshot)
const sorted = [...matchingSnapshots].sort(
(a, b) => b.timestamp - a.timestamp,
);
for (const proposed of sorted) {
const { consumedIds, newText, timestamp } = proposed.payload;
// Verify all consumed IDs still exist sequentially in targets
const targetIds = new Set(targets.map((t) => t.id));
const isValid = consumedIds.every((id) => targetIds.has(id));
if (isValid) {
// If valid, apply it!
const newId = randomUUID();
const snapshotNode: Snapshot = {
id: newId,
turnId: newId,
type: NodeType.SNAPSHOT,
timestamp: timestamp ?? Date.now(),
role: 'user',
payload: { text: newText },
abstractsIds: consumedIds,
};
// Remove the consumed nodes and insert the snapshot at the earliest index
const returnedNodes = targets.filter(
(t) => !consumedIds.includes(t.id),
);
const firstRemovedIdx = targets.findIndex((t) =>
consumedIds.includes(t.id),
);
if (firstRemovedIdx !== -1) {
const idx = Math.max(0, firstRemovedIdx);
returnedNodes.splice(idx, 0, snapshotNode);
} else {
returnedNodes.unshift(snapshotNode);
}
inbox.consume(proposed.id);
return returnedNodes;
}
}
}
// 2. The Synchronous Backstop (The Slow Path)
let targetTokensToRemove = 0;
@@ -52,6 +52,7 @@ export interface BaselineSnapshotInfo {
text: string;
abstractsIds: string[];
id: string;
timestamp: number;
}
/**
@@ -72,11 +73,47 @@ export function findLatestSnapshotBaseline(
? [...lastSnapshotNode.abstractsIds]
: [],
id: lastSnapshotNode.id,
timestamp: lastSnapshotNode.timestamp,
};
}
return undefined;
}
import type { LiveInbox } from '../pipeline/inbox.js';
import type { ContextEngineState } from '../../services/chatRecordingTypes.js';
/**
 * Converts the graph's latest snapshot baseline to and from the opaque
 * ContextEngineState persisted in the conversation record.
 */
export const SnapshotStateHelper = {
  exportState(nodes: readonly ConcreteNode[]): ContextEngineState {
    const baseline = findLatestSnapshotBaseline(nodes);
    if (baseline === undefined) {
      // No snapshot yet — export an empty (but valid) state object.
      return {};
    }
    const { text, abstractsIds, timestamp } = baseline;
    return {
      snapshot: { text, consumedIds: abstractsIds, timestamp },
    };
  },
  restoreState(state: ContextEngineState, inbox: LiveInbox): void {
    const snapshot = state.snapshot;
    if (!snapshot) {
      return;
    }
    // Structural validation: silently ignore malformed persisted state
    // rather than publishing a bad proposal.
    const hasValidShape =
      typeof snapshot.text === 'string' &&
      Array.isArray(snapshot.consumedIds);
    if (!hasValidShape) {
      return;
    }
    inbox.publish('PROPOSED_SNAPSHOT', {
      newText: snapshot.text,
      consumedIds: snapshot.consumedIds,
      type: 'accumulate',
      timestamp: snapshot.timestamp ?? Date.now(),
    });
  },
};
export class SnapshotGenerator {
constructor(private readonly env: ContextEnvironment) {}
+11
View File
@@ -414,6 +414,14 @@ export class GeminiClient {
chat,
this.lastPromptId,
);
if (
this.contextManager &&
resumedSessionData?.conversation.contextState
) {
this.contextManager.restoreState(
resumedSessionData.conversation.contextState,
);
}
return chat;
} catch (error) {
await reportError(
@@ -819,6 +827,9 @@ export class GeminiClient {
promptBaseUnits: currentBaseUnits,
});
}
// Persist the latest Context Engine snapshot with the recording metadata.
// Guarded: this.contextManager is null-checked on the resume path, which
// suggests it can be unset — avoid a TypeError here. TODO confirm whether
// contextManager is always assigned by the time this runs.
if (this.contextManager) {
  this.chat
    ?.getChatRecordingService()
    ?.saveContextState(this.contextManager.exportState());
}
}
this.updateTelemetryTokenCount();
if (event.type === GeminiEventType.Error) {
@@ -36,6 +36,7 @@ import {
type RewindRecord,
type MetadataUpdateRecord,
type PartialMetadataRecord,
type ContextEngineState,
} from './chatRecordingTypes.js';
export * from './chatRecordingTypes.js';
@@ -646,6 +647,15 @@ export class ChatRecordingService {
}
}
saveContextState(contextState: ContextEngineState): void {
if (!this.conversationFile) return;
try {
this.updateMetadata({ contextState } as Partial<ConversationRecord>);
} catch (e: unknown) {
debugLogger.error('Error saving context state to chat history.', e);
}
}
saveSummary(summary: string): void {
if (!this.conversationFile) return;
try {
@@ -89,6 +89,14 @@ export type MessageRecord = BaseMessageRecord & ConversationRecordExtra;
/**
* Complete conversation record stored in session files.
*/
/**
 * Opaque Context Engine state persisted with the conversation record so
 * expensive LLM-derived snapshots can be rehydrated on session resume.
 */
export interface ContextEngineState {
  // Latest snapshot baseline, if one existed when the state was exported.
  snapshot?: {
    // Snapshot text that stands in for the consumed history nodes.
    text: string;
    // IDs of the history nodes the snapshot abstracts.
    consumedIds: string[];
    // Timestamp of the exported snapshot node, when available.
    timestamp?: number;
  };
}
export interface ConversationRecord {
sessionId: string;
projectHash: string;
@@ -101,8 +109,9 @@ export interface ConversationRecord {
directories?: string[];
/** The kind of conversation (main agent or subagent) */
kind?: 'main' | 'subagent';
/** Opaque state object representing Context Engine state (e.g. snapshots) */
contextState?: ContextEngineState;
}
/**
* Data structure for resuming an existing session.
*/
@@ -137,4 +146,5 @@ export interface PartialMetadataRecord {
memoryScratchpad?: MemoryScratchpad;
directories?: string[];
kind?: 'main' | 'subagent';
contextState?: ContextEngineState;
}