From cdd482c2e0e824b1c15209225c0959ba2a1be42f Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 13 May 2026 00:37:25 +0000 Subject: [PATCH] fix(context): revert structural snapshot rehydration and rely on baseline gc --- packages/core/src/context/config/profiles.ts | 24 ---- packages/core/src/context/contextManager.ts | 17 +-- .../core/src/context/graph/toGraph.test.ts | 56 +++++++++ packages/core/src/context/graph/toGraph.ts | 11 +- .../stateSnapshotHydrationProcessor.ts | 111 ------------------ .../processors/stateSnapshotProcessor.ts | 65 +++++++++- 6 files changed, 129 insertions(+), 155 deletions(-) delete mode 100644 packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts diff --git a/packages/core/src/context/config/profiles.ts b/packages/core/src/context/config/profiles.ts index 356b13baf7..b721c01ad0 100644 --- a/packages/core/src/context/config/profiles.ts +++ b/packages/core/src/context/config/profiles.ts @@ -4,7 +4,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { createStateSnapshotHydrationProcessor } from '../processors/stateSnapshotHydrationProcessor.js'; import type { AsyncPipelineDef, ContextManagementConfig, @@ -89,29 +88,6 @@ export const generalistProfile: ContextProfile = { ): PipelineDef[] => // Helper to merge default options with dynamically loaded processorOptions by ID [ - { - name: 'Initialization Hydration', - triggers: ['initialization'], - processors: [ - createStateSnapshotHydrationProcessor('StateSnapshotHydration', env, { - target: (() => { - const res = resolveProcessorOptions(config, 'StateSnapshotSync', { - target: 'max', - maxStateTokens: 4000, - maxSummaryTurns: 5, - }).target; - if ( - res === 'incremental' || - res === 'freeNTokens' || - res === 'max' - ) { - return res; - } - return undefined; - })(), - }), - ], - }, { name: 'Immediate Sanitization', triggers: ['new_message'], diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 9ccccb2d3a..338cad63eb 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -304,6 +304,7 @@ export class ContextManager { const hotStartPromise = (async () => { if (!this.hasPerformedHotStart) { this.hasPerformedHotStart = true; + if (this.buffer.nodes.length > 0) { const nodesForHotStart = [...this.buffer.nodes, ...previewNodes]; await this.performHotStartCalibration(nodesForHotStart, abortSignal); @@ -439,21 +440,5 @@ export class ContextManager { async restoreState(state: ContextEngineState): Promise { if (!state) return; SnapshotStateHelper.restoreState(state, this.env.inbox); - - // Explicitly run the initialization trigger to eagerly splice the restored snapshot - // into the graph *before* the first user message creates cache artifacts. - const nodes = this.buffer.nodes; - const hydratedNodes = await this.orchestrator.executeTriggerSync( - 'initialization', - nodes, - new Set(), // No trigger targets needed, it just reads the inbox - ); - - // Create a pseudo-processor result to apply the hydration without duplicating logic - this.buffer = this.buffer.applyProcessorResult( - 'StateSnapshotHydration', - nodes, - hydratedNodes, - ); } } diff --git a/packages/core/src/context/graph/toGraph.test.ts b/packages/core/src/context/graph/toGraph.test.ts index 4a99202ffc..5f16e0cdf4 100644 --- a/packages/core/src/context/graph/toGraph.test.ts +++ b/packages/core/src/context/graph/toGraph.test.ts @@ -36,5 +36,61 @@ describe('ContextGraphBuilder', () => { expect((nodes[1] as BaseConcreteNode).payload.text).toBe('Reply 1'); expect((nodes[2] as BaseConcreteNode).payload.text).toBe('Message 2'); }); + + it('should generate completely deterministic graph structure and UUIDs across JSON serialization cycles', () => { + const complexHistory: Content[] = [ + { role: 'user', parts: [{ text: 'Step 1: complex analysis' }] }, + { + role: 'model', + parts: [ + { text: 'Thinking about the tool to use.' }, + { + functionCall: { + name: 'fetch_data', + args: { query: 'test data' }, + }, + }, + ], + }, + { + role: 'user', + parts: [ + { + functionResponse: { + name: 'fetch_data', + response: { status: 'success', data: [1, 2, 3] }, + }, + }, + ], + }, + { role: 'model', parts: [{ text: 'Analysis complete.' }] }, + ]; + + // 1. Initial Graph Generation + const builder1 = new ContextGraphBuilder(); + const nodes1 = builder1.processHistory(complexHistory); + + // 2. Serialize and Deserialize (Simulating saving and loading from disk) + const serializedHistory = JSON.stringify(complexHistory); + const parsedHistory = JSON.parse(serializedHistory) as Content[]; + + // 3. Second Graph Generation from parsed JSON + const builder2 = new ContextGraphBuilder(); + const nodes2 = builder2.processHistory(parsedHistory); + + // Assertion: The arrays must be completely identical, including all generated UUIDs + expect(nodes1).toEqual(nodes2); + + // Sanity check to ensure IDs are actually populated and consistent + expect(nodes1.length).toBeGreaterThan(0); + nodes1.forEach((node, index) => { + expect(node.id).toBeDefined(); + expect(node.id).toBe(nodes2[index].id); + if ('turnId' in node) { + expect(node.turnId).toBeDefined(); + expect(node.turnId).toBe((nodes2[index] as BaseConcreteNode).turnId); + } + }); + }); }); }); diff --git a/packages/core/src/context/graph/toGraph.ts b/packages/core/src/context/graph/toGraph.ts index f901f76659..f47aa8c7ee 100644 --- a/packages/core/src/context/graph/toGraph.ts +++ b/packages/core/src/context/graph/toGraph.ts @@ -123,7 +123,12 @@ export function getStableId( } if (!id) { - id = randomUUID(); + if (turnSalt && partIdx === -1) { + // Fallback for Turn objects (msg) since they don't have parts or content to hash directly here + id = `turn_${turnSalt}`; + } else { + id = randomUUID(); + } } nodeIdentityMap.set(obj, id); @@ -189,7 +194,7 @@ export class ContextGraphBuilder { apiId || getStableId(part, this.nodeIdentityMap, turnSalt, partIdx); const node: ConcreteNode = { id, - timestamp: Date.now(), + timestamp: 0, // Using 0 for deterministic structural equality. Actual time is applied by orchestrator. type: isFunctionResponsePart(part) ? NodeType.TOOL_EXECUTION : NodeType.USER_PROMPT, @@ -210,7 +215,7 @@ export class ContextGraphBuilder { apiId || getStableId(part, this.nodeIdentityMap, turnSalt, partIdx); const node: ConcreteNode = { id, - timestamp: Date.now(), + timestamp: 0, // Using 0 for deterministic structural equality. Actual time is applied by orchestrator. type: isFunctionCallPart(part) ? NodeType.TOOL_EXECUTION : NodeType.AGENT_THOUGHT, diff --git a/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts b/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts deleted file mode 100644 index cd3a690b25..0000000000 --- a/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts +++ /dev/null @@ -1,111 +0,0 @@ -/** - * @license - * Copyright 2026 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ -import { randomUUID } from 'node:crypto'; -import type { JSONSchemaType } from 'ajv'; -import type { ContextProcessor, ProcessArgs } from '../pipeline.js'; -import type { ContextEnvironment } from '../pipeline/environment.js'; -import { type Snapshot, NodeType } from '../graph/types.js'; - -export interface StateSnapshotHydrationProcessorOptions { - target?: 'incremental' | 'freeNTokens' | 'max'; -} - -export const StateSnapshotHydrationProcessorOptionsSchema: JSONSchemaType = - { - type: 'object', - properties: { - target: { - type: 'string', - enum: ['incremental', 'freeNTokens', 'max'], - nullable: true, - }, - }, - required: [], - }; - -export function createStateSnapshotHydrationProcessor( - id: string, - env: ContextEnvironment, - options: StateSnapshotHydrationProcessorOptions, -): ContextProcessor { - return { - id, - name: 'StateSnapshotHydrationProcessor', - process: async ({ targets, inbox }: ProcessArgs) => { - if (targets.length === 0) { - return targets; - } - - // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate' - const strategy = options.target ?? 'max'; - const expectedType = - strategy === 'incremental' ? 'point-in-time' : 'accumulate'; - - // 1. Check Inbox for a completed Snapshot (The Fast Path) - const proposedSnapshots = inbox.getMessages<{ - newText: string; - consumedIds: string[]; - type: string; - timestamp: number; - }>('PROPOSED_SNAPSHOT'); - - if (proposedSnapshots.length > 0) { - // Filter for the snapshot type that matches our processor mode - const matchingSnapshots = proposedSnapshots.filter( - (s) => s.payload.type === expectedType, - ); - - // Sort by newest timestamp first (we want the most accumulated snapshot) - const sorted = [...matchingSnapshots].sort( - (a, b) => b.timestamp - a.timestamp, - ); - - for (const proposed of sorted) { - const { consumedIds, newText, timestamp } = proposed.payload; - - // Verify all consumed IDs still exist sequentially in targets - const targetIds = new Set(targets.map((t) => t.id)); - const isValid = consumedIds.every((id) => targetIds.has(id)); - - if (isValid) { - // If valid, apply it! - const newId = randomUUID(); - - const snapshotNode: Snapshot = { - id: newId, - turnId: newId, - type: NodeType.SNAPSHOT, - timestamp: timestamp ?? Date.now(), - role: 'user', - payload: { text: newText }, - abstractsIds: consumedIds, - }; - - // Remove the consumed nodes and insert the snapshot at the earliest index - const returnedNodes = targets.filter( - (t) => !consumedIds.includes(t.id), - ); - const firstRemovedIdx = targets.findIndex((t) => - consumedIds.includes(t.id), - ); - - if (firstRemovedIdx !== -1) { - const idx = Math.max(0, firstRemovedIdx); - returnedNodes.splice(idx, 0, snapshotNode); - } else { - returnedNodes.unshift(snapshotNode); - } - - inbox.consume(proposed.id); - return returnedNodes; - } - } - } - - return targets; - }, - }; -} diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index df6b7f0a52..3ad3001a16 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -53,13 +53,76 @@ export function createStateSnapshotProcessor( return { id, name: 'StateSnapshotProcessor', - process: async ({ targets }: ProcessArgs) => { + process: async ({ targets, inbox }: ProcessArgs) => { if (targets.length === 0) { return targets; } // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate' const strategy = options.target ?? 'max'; + const expectedType = + strategy === 'incremental' ? 'point-in-time' : 'accumulate'; + + // 1. Check Inbox for a completed Snapshot (The Fast Path) + const proposedSnapshots = inbox.getMessages<{ + newText: string; + consumedIds: string[]; + type: string; + timestamp: number; + }>('PROPOSED_SNAPSHOT'); + + if (proposedSnapshots.length > 0) { + // Filter for the snapshot type that matches our processor mode + const matchingSnapshots = proposedSnapshots.filter( + (s) => s.payload.type === expectedType, + ); + + // Sort by newest timestamp first (we want the most accumulated snapshot) + const sorted = [...matchingSnapshots].sort( + (a, b) => b.timestamp - a.timestamp, + ); + + for (const proposed of sorted) { + const { consumedIds, newText, timestamp } = proposed.payload; + + // Verify all consumed IDs still exist sequentially in targets + const targetIds = new Set(targets.map((t) => t.id)); + const isValid = consumedIds.every((id) => targetIds.has(id)); + + if (isValid) { + // If valid, apply it! + const newId = randomUUID(); + + const snapshotNode: Snapshot = { + id: newId, + turnId: newId, + type: NodeType.SNAPSHOT, + timestamp: timestamp ?? Date.now(), + role: 'user', + payload: { text: newText }, + abstractsIds: consumedIds, + }; + + // Remove the consumed nodes and insert the snapshot at the earliest index + const returnedNodes = targets.filter( + (t) => !consumedIds.includes(t.id), + ); + const firstRemovedIdx = targets.findIndex((t) => + consumedIds.includes(t.id), + ); + + if (firstRemovedIdx !== -1) { + const idx = Math.max(0, firstRemovedIdx); + returnedNodes.splice(idx, 0, snapshotNode); + } else { + returnedNodes.unshift(snapshotNode); + } + + inbox.consume(proposed.id); + return returnedNodes; + } + } + } // 2. The Synchronous Backstop (The Slow Path) let targetTokensToRemove = 0;