fix(context): revert structural snapshot rehydration and rely on baseline gc

This commit is contained in:
Your Name
2026-05-13 00:37:25 +00:00
parent 65d4bdfc24
commit cdd482c2e0
6 changed files with 129 additions and 155 deletions
@@ -4,7 +4,6 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { createStateSnapshotHydrationProcessor } from '../processors/stateSnapshotHydrationProcessor.js';
import type {
AsyncPipelineDef,
ContextManagementConfig,
@@ -89,29 +88,6 @@ export const generalistProfile: ContextProfile = {
): PipelineDef[] =>
// Helper to merge default options with dynamically loaded processorOptions by ID
[
{
name: 'Initialization Hydration',
triggers: ['initialization'],
processors: [
createStateSnapshotHydrationProcessor('StateSnapshotHydration', env, {
target: (() => {
const res = resolveProcessorOptions(config, 'StateSnapshotSync', {
target: 'max',
maxStateTokens: 4000,
maxSummaryTurns: 5,
}).target;
if (
res === 'incremental' ||
res === 'freeNTokens' ||
res === 'max'
) {
return res;
}
return undefined;
})(),
}),
],
},
{
name: 'Immediate Sanitization',
triggers: ['new_message'],
+1 -16
View File
@@ -304,6 +304,7 @@ export class ContextManager {
const hotStartPromise = (async () => {
if (!this.hasPerformedHotStart) {
this.hasPerformedHotStart = true;
if (this.buffer.nodes.length > 0) {
const nodesForHotStart = [...this.buffer.nodes, ...previewNodes];
await this.performHotStartCalibration(nodesForHotStart, abortSignal);
@@ -439,21 +440,5 @@ export class ContextManager {
/**
 * Restores persisted context-engine state and eagerly re-applies the
 * restored snapshot to the node graph.
 *
 * @param state - Previously captured engine state; a falsy value is a no-op.
 * @returns Resolves once the snapshot has been spliced into the buffer.
 */
async restoreState(state: ContextEngineState): Promise<void> {
  if (!state) return;
  // Push the persisted snapshot payload into the inbox so the hydration
  // processor can find it. NOTE(review): assumes SnapshotStateHelper writes
  // a PROPOSED_SNAPSHOT-style message — confirm against the helper.
  SnapshotStateHelper.restoreState(state, this.env.inbox);
  // Explicitly run the initialization trigger to eagerly splice the restored snapshot
  // into the graph *before* the first user message creates cache artifacts.
  const nodes = this.buffer.nodes;
  const hydratedNodes = await this.orchestrator.executeTriggerSync(
    'initialization',
    nodes,
    new Set(), // No trigger targets needed, it just reads the inbox
  );
  // Create a pseudo-processor result to apply the hydration without duplicating logic
  this.buffer = this.buffer.applyProcessorResult(
    'StateSnapshotHydration',
    nodes,
    hydratedNodes,
  );
}
}
@@ -36,5 +36,61 @@ describe('ContextGraphBuilder', () => {
expect((nodes[1] as BaseConcreteNode).payload.text).toBe('Reply 1');
expect((nodes[2] as BaseConcreteNode).payload.text).toBe('Message 2');
});
it('should generate completely deterministic graph structure and UUIDs across JSON serialization cycles', () => {
  // A history exercising text, thought, functionCall and functionResponse parts.
  const complexHistory: Content[] = [
    { role: 'user', parts: [{ text: 'Step 1: complex analysis' }] },
    {
      role: 'model',
      parts: [
        { text: 'Thinking about the tool to use.' },
        {
          functionCall: {
            name: 'fetch_data',
            args: { query: 'test data' },
          },
        },
      ],
    },
    {
      role: 'user',
      parts: [
        {
          functionResponse: {
            name: 'fetch_data',
            response: { status: 'success', data: [1, 2, 3] },
          },
        },
      ],
    },
    { role: 'model', parts: [{ text: 'Analysis complete.' }] },
  ];

  // Build once from the in-memory history…
  const firstPass = new ContextGraphBuilder().processHistory(complexHistory);

  // …then again from a JSON round trip (simulates saving/loading from disk).
  const roundTripped = JSON.parse(
    JSON.stringify(complexHistory),
  ) as Content[];
  const secondPass = new ContextGraphBuilder().processHistory(roundTripped);

  // Deep equality covers structure plus every generated UUID.
  expect(firstPass).toEqual(secondPass);

  // Sanity checks: IDs are populated and agree element-by-element.
  expect(firstPass.length).toBeGreaterThan(0);
  firstPass.forEach((node, i) => {
    expect(node.id).toBeDefined();
    expect(node.id).toBe(secondPass[i].id);
    if ('turnId' in node) {
      expect(node.turnId).toBeDefined();
      expect(node.turnId).toBe((secondPass[i] as BaseConcreteNode).turnId);
    }
  });
});
});
});
+8 -3
View File
@@ -123,7 +123,12 @@ export function getStableId(
}
if (!id) {
id = randomUUID();
if (turnSalt && partIdx === -1) {
// Fallback for Turn objects (msg) since they don't have parts or content to hash directly here
id = `turn_${turnSalt}`;
} else {
id = randomUUID();
}
}
nodeIdentityMap.set(obj, id);
@@ -189,7 +194,7 @@ export class ContextGraphBuilder {
apiId || getStableId(part, this.nodeIdentityMap, turnSalt, partIdx);
const node: ConcreteNode = {
id,
timestamp: Date.now(),
timestamp: 0, // Using 0 for deterministic structural equality. Actual time is applied by orchestrator.
type: isFunctionResponsePart(part)
? NodeType.TOOL_EXECUTION
: NodeType.USER_PROMPT,
@@ -210,7 +215,7 @@ export class ContextGraphBuilder {
apiId || getStableId(part, this.nodeIdentityMap, turnSalt, partIdx);
const node: ConcreteNode = {
id,
timestamp: Date.now(),
timestamp: 0, // Using 0 for deterministic structural equality. Actual time is applied by orchestrator.
type: isFunctionCallPart(part)
? NodeType.TOOL_EXECUTION
: NodeType.AGENT_THOUGHT,
@@ -1,111 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { randomUUID } from 'node:crypto';
import type { JSONSchemaType } from 'ajv';
import type { ContextProcessor, ProcessArgs } from '../pipeline.js';
import type { ContextEnvironment } from '../pipeline/environment.js';
import { type Snapshot, NodeType } from '../graph/types.js';
/**
 * Options for {@link createStateSnapshotHydrationProcessor}.
 */
export interface StateSnapshotHydrationProcessorOptions {
  // Snapshot strategy: 'incremental' matches 'point-in-time' snapshots;
  // 'freeNTokens' and 'max' (the default) match 'accumulate' snapshots.
  target?: 'incremental' | 'freeNTokens' | 'max';
}
// AJV schema used to validate StateSnapshotHydrationProcessorOptions when the
// options are loaded dynamically (e.g. from user/profile configuration).
export const StateSnapshotHydrationProcessorOptionsSchema: JSONSchemaType<StateSnapshotHydrationProcessorOptions> =
  {
    type: 'object',
    properties: {
      target: {
        type: 'string',
        enum: ['incremental', 'freeNTokens', 'max'],
        nullable: true, // 'target' is optional in the options interface
      },
    },
    required: [],
  };
/**
 * Creates a processor that hydrates a previously proposed state snapshot back
 * into the context graph (intended for the 'initialization' trigger).
 *
 * It scans the inbox for 'PROPOSED_SNAPSHOT' messages whose type matches the
 * configured strategy, validates that every node the snapshot abstracts is
 * still present in the target list, and if so replaces those nodes with a
 * single Snapshot node inserted at the position of the first consumed node.
 *
 * @param id - Processor instance ID reported in pipeline results.
 * @param env - Context environment (unused here; kept for signature parity
 *   with the other processor factories).
 * @param options - Strategy selection; defaults to 'max'.
 * @returns A ContextProcessor that applies at most one matching snapshot.
 */
export function createStateSnapshotHydrationProcessor(
  id: string,
  env: ContextEnvironment,
  options: StateSnapshotHydrationProcessorOptions,
): ContextProcessor {
  return {
    id,
    name: 'StateSnapshotHydrationProcessor',
    process: async ({ targets, inbox }: ProcessArgs) => {
      if (targets.length === 0) {
        return targets;
      }
      // Map the strategy onto the snapshot type we accept:
      // 'incremental' -> 'point-in-time'; 'freeNTokens'/'max' -> 'accumulate'.
      const strategy = options.target ?? 'max';
      const expectedType =
        strategy === 'incremental' ? 'point-in-time' : 'accumulate';

      // 1. Check the inbox for a completed snapshot (the fast path).
      const proposedSnapshots = inbox.getMessages<{
        newText: string;
        consumedIds: string[];
        type: string;
        timestamp: number;
      }>('PROPOSED_SNAPSHOT');
      if (proposedSnapshots.length === 0) {
        return targets;
      }

      // Keep only snapshots matching our mode, newest first (we want the
      // most accumulated snapshot).
      // NOTE(review): this sorts on the message-level `timestamp`, while the
      // payload carries its own `timestamp` — confirm the message object has
      // one, otherwise the sort compares `undefined`.
      const candidates = proposedSnapshots
        .filter((s) => s.payload.type === expectedType)
        .sort((a, b) => b.timestamp - a.timestamp);

      // Loop-invariant: targets does not change while we search for a match.
      const targetIds = new Set(targets.map((t) => t.id));

      for (const proposed of candidates) {
        const { consumedIds, newText, timestamp } = proposed.payload;
        // The snapshot only applies if every node it abstracts still exists.
        if (!consumedIds.every((cid) => targetIds.has(cid))) {
          continue;
        }

        // Set gives O(1) membership for the filter/findIndex passes below
        // (the original used Array.includes, which is O(n^2) overall).
        const consumedSet = new Set(consumedIds);
        const newId = randomUUID();
        const snapshotNode: Snapshot = {
          id: newId,
          turnId: newId,
          type: NodeType.SNAPSHOT,
          timestamp: timestamp ?? Date.now(),
          role: 'user',
          payload: { text: newText },
          abstractsIds: consumedIds,
        };

        // Remove the consumed nodes and insert the snapshot at the index of
        // the first node it replaces (or at the front as a fallback).
        const returnedNodes = targets.filter((t) => !consumedSet.has(t.id));
        const firstRemovedIdx = targets.findIndex((t) =>
          consumedSet.has(t.id),
        );
        if (firstRemovedIdx !== -1) {
          // findIndex already returned a non-negative index here, so no
          // extra clamping is needed.
          returnedNodes.splice(firstRemovedIdx, 0, snapshotNode);
        } else {
          returnedNodes.unshift(snapshotNode);
        }
        inbox.consume(proposed.id);
        return returnedNodes;
      }
      return targets;
    },
  };
}
@@ -53,13 +53,76 @@ export function createStateSnapshotProcessor(
return {
id,
name: 'StateSnapshotProcessor',
process: async ({ targets }: ProcessArgs) => {
process: async ({ targets, inbox }: ProcessArgs) => {
if (targets.length === 0) {
return targets;
}
// Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
const strategy = options.target ?? 'max';
const expectedType =
strategy === 'incremental' ? 'point-in-time' : 'accumulate';
// 1. Check Inbox for a completed Snapshot (The Fast Path)
const proposedSnapshots = inbox.getMessages<{
newText: string;
consumedIds: string[];
type: string;
timestamp: number;
}>('PROPOSED_SNAPSHOT');
if (proposedSnapshots.length > 0) {
// Filter for the snapshot type that matches our processor mode
const matchingSnapshots = proposedSnapshots.filter(
(s) => s.payload.type === expectedType,
);
// Sort by newest timestamp first (we want the most accumulated snapshot)
const sorted = [...matchingSnapshots].sort(
(a, b) => b.timestamp - a.timestamp,
);
for (const proposed of sorted) {
const { consumedIds, newText, timestamp } = proposed.payload;
// Verify all consumed IDs still exist sequentially in targets
const targetIds = new Set(targets.map((t) => t.id));
const isValid = consumedIds.every((id) => targetIds.has(id));
if (isValid) {
// If valid, apply it!
const newId = randomUUID();
const snapshotNode: Snapshot = {
id: newId,
turnId: newId,
type: NodeType.SNAPSHOT,
timestamp: timestamp ?? Date.now(),
role: 'user',
payload: { text: newText },
abstractsIds: consumedIds,
};
// Remove the consumed nodes and insert the snapshot at the earliest index
const returnedNodes = targets.filter(
(t) => !consumedIds.includes(t.id),
);
const firstRemovedIdx = targets.findIndex((t) =>
consumedIds.includes(t.id),
);
if (firstRemovedIdx !== -1) {
const idx = Math.max(0, firstRemovedIdx);
returnedNodes.splice(idx, 0, snapshotNode);
} else {
returnedNodes.unshift(snapshotNode);
}
inbox.consume(proposed.id);
return returnedNodes;
}
}
}
// 2. The Synchronous Backstop (The Slow Path)
let targetTokensToRemove = 0;