From abd46a84314ef023a78de36dede2c6f5fb5a0e29 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 10 Apr 2026 16:20:41 +0000 Subject: [PATCH] formatting --- docs/context-manager-async-mutations.md | 48 ++- packages/core/src/config/config.ts | 12 +- .../src/context/config/SidecarLoader.test.ts | 19 +- .../core/src/context/config/SidecarLoader.ts | 8 +- packages/core/src/context/config/profiles.ts | 68 +++- packages/core/src/context/config/registry.ts | 4 +- packages/core/src/context/config/schema.ts | 49 +-- packages/core/src/context/config/types.ts | 4 +- .../src/context/context-manager-v0-design.md | 14 +- packages/core/src/context/contextManager.ts | 3 +- packages/core/src/context/pipeline.ts | 2 +- .../src/context/pipeline/orchestrator.test.ts | 40 ++- .../core/src/context/pipeline/orchestrator.ts | 40 ++- .../blobDegradationProcessor.test.ts | 15 +- .../processors/blobDegradationProcessor.ts | 184 ++++++----- .../processors/historyTruncationProcessor.ts | 50 +-- .../nodeDistillationProcessor.test.ts | 20 +- .../processors/nodeDistillationProcessor.ts | 203 ++++++------ .../nodeTruncationProcessor.test.ts | 20 +- .../processors/nodeTruncationProcessor.ts | 133 ++++---- .../rollingSummaryProcessor.test.ts | 34 +- .../processors/rollingSummaryProcessor.ts | 126 ++++---- .../stateSnapshotAsyncProcessor.test.ts | 28 +- .../processors/stateSnapshotAsyncProcessor.ts | 127 ++++---- .../processors/stateSnapshotProcessor.test.ts | 36 ++- .../processors/stateSnapshotProcessor.ts | 227 +++++++------ .../processors/toolMaskingProcessor.ts | 300 +++++++++--------- .../context/system-tests/SimulationHarness.ts | 5 +- .../system-tests/lifecycle.golden.test.ts | 46 ++- .../src/context/testing/contextTestUtils.ts | 3 +- 30 files changed, 1035 insertions(+), 833 deletions(-) diff --git a/docs/context-manager-async-mutations.md b/docs/context-manager-async-mutations.md index a2fc5d38f9..db4545b4b5 100644 --- a/docs/context-manager-async-mutations.md +++ b/docs/context-manager-async-mutations.md @@ -1,28 +1,52 @@ # Async Context Mutations (V1 Architecture) ## The Problem -In V0, the \`ContextManager\` processes LLM inputs sequentially and synchronously. Processors like \`NodeTruncation\` can safely mutate the graph because they hold an exclusive lock on the context state. -However, operations like \`StateSnapshotAsyncProcessor\` take a long time to run (distilling thousands of tokens). If they run synchronously, they block the user from interacting with the agent. If they run asynchronously in the background, by the time they finish, the active context graph has likely moved on (new messages, tool calls, or other truncations have occurred). +In V0, the \`ContextManager\` processes LLM inputs sequentially and +synchronously. Processors like \`NodeTruncation\` can safely mutate the graph +because they hold an exclusive lock on the context state. + +However, operations like \`StateSnapshotAsyncProcessor\` take a long time to run +(distilling thousands of tokens). If they run synchronously, they block the user +from interacting with the agent. If they run asynchronously in the background, +by the time they finish, the active context graph has likely moved on (new +messages, tool calls, or other truncations have occurred). ## The V1 Solution: Ancestral Replacement (Optimistic Concurrency) -To allow async background pipelines to mutate the live context graph safely, we use an Optimistic Concurrency Control mechanism called **Ancestral Replacement**. +To allow async background pipelines to mutate the live context graph safely, we +use an Optimistic Concurrency Control mechanism called **Ancestral +Replacement**. ### 1. Proof of Claim -When an \`AsyncContextProcessor\` triggers, it is handed a \`ProcessArgs\` containing a snapshot of the graph (the targets it was asked to process). -The processor records the specific IDs of the \`ConcreteNode\`s it is reading. This is its "Proof of Claim". + +When an \`AsyncContextProcessor\` triggers, it is handed a \`ProcessArgs\` +containing a snapshot of the graph (the targets it was asked to process). The +processor records the specific IDs of the \`ConcreteNode\`s it is reading. This +is its "Proof of Claim". ### 2. Background Execution -The processor runs in the background, completely detached from the live graph. It synthesizes a new state (e.g., a summarized snapshot node). + +The processor runs in the background, completely detached from the live graph. +It synthesizes a new state (e.g., a summarized snapshot node). ### 3. The Commit Phase -When the processor finishes, it returns its proposed mutations (an array of new \`ConcreteNode\`s that specify which old nodes they replace via the \`replacesId\` property). + +When the processor finishes, it returns its proposed mutations (an array of new +\`ConcreteNode\`s that specify which old nodes they replace via the +\`replacesId\` property). The Orchestrator then attempts to "rebase" this mutation into the live graph: -1. It looks at the live graph. -2. It checks: *Do all the original nodes (the Proof of Claim) still exist unmodified in the live graph?* -3. **If Yes (Clean Fast-Forward):** The orchestrator deletes the old nodes and inserts the new synthesized nodes. -4. **If No (Conflict):** If *any* of the original nodes were deleted or modified by another processor while the async task was running, the orchestrator rejects the async mutation entirely (or handles it via a conflict resolution strategy). -This guarantees that async pipelines can never corrupt the context state by overwriting newer information with stale data. +1. It looks at the live graph. +2. It checks: _Do all the original nodes (the Proof of Claim) still exist + unmodified in the live graph?_ +3. **If Yes (Clean Fast-Forward):** The orchestrator deletes the old nodes and + inserts the new synthesized nodes. +4. **If No (Conflict):** If _any_ of the original nodes were deleted or modified + by another processor while the async task was running, the orchestrator + rejects the async mutation entirely (or handles it via a conflict resolution + strategy). + +This guarantees that async pipelines can never corrupt the context state by +overwriting newer information with stale data. diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index b145281754..bd9bb378fd 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -699,7 +699,7 @@ export interface ConfigParameters { experimentalJitContext?: boolean; autoDistillation?: boolean; experimentalMemoryManager?: boolean; - experimentalContextSidecarConfig?: string; + experimentalContextManagementConfig?: string; experimentalAgentHistoryTruncation?: boolean; experimentalAgentHistoryTruncationThreshold?: number; experimentalAgentHistoryRetainedMessages?: number; @@ -941,7 +941,7 @@ export class Config implements McpContext, AgentLoopContext { private readonly adminSkillsEnabled: boolean; private readonly experimentalJitContext: boolean; private readonly experimentalMemoryManager: boolean; - private readonly experimentalContextSidecarConfig?: string; + private readonly experimentalContextManagementConfig?: string; private readonly memoryBoundaryMarkers: readonly string[]; private readonly topicUpdateNarration: boolean; private readonly disableLLMCorrection: boolean; @@ -1153,8 +1153,8 @@ export class Config implements McpContext, AgentLoopContext { this.experimentalJitContext = params.experimentalJitContext ?? false; this.experimentalMemoryManager = params.experimentalMemoryManager ?? false; - this.experimentalContextSidecarConfig = - params.experimentalContextSidecarConfig; + this.experimentalContextManagementConfig = + params.experimentalContextManagementConfig; this.memoryBoundaryMarkers = params.memoryBoundaryMarkers ?? ['.git']; this.contextManagement = { enabled: params.contextManagement?.enabled ?? false, @@ -2417,8 +2417,8 @@ export class Config implements McpContext, AgentLoopContext { return this.experimentalMemoryManager; } - getExperimentalContextSidecarConfig(): string | undefined { - return this.experimentalContextSidecarConfig; + getExperimentalContextManagementConfig(): string | undefined { + return this.experimentalContextManagementConfig; } getContextManagementConfig(): ContextManagementConfig { diff --git a/packages/core/src/context/config/SidecarLoader.test.ts b/packages/core/src/context/config/SidecarLoader.test.ts index 1779dcc557..e0be7b6a37 100644 --- a/packages/core/src/context/config/SidecarLoader.test.ts +++ b/packages/core/src/context/config/SidecarLoader.test.ts @@ -18,11 +18,14 @@ describe('SidecarLoader (Fake FS)', () => { beforeEach(() => { fileSystem = new InMemoryFileSystem(); registry = new SidecarRegistry(); - registry.registerProcessor({ id: 'NodeTruncation', schema: { type: 'object', properties: { maxTokens: { type: 'number' } } }}); + registry.registerProcessor({ + id: 'NodeTruncation', + schema: { type: 'object', properties: { maxTokens: { type: 'number' } } }, + }); }); const mockConfig = { - getExperimentalContextSidecarConfig: () => '/path/to/sidecar.json', + getExperimentalContextManagementConfig: () => '/path/to/sidecar.json', } as unknown as Config; it('returns default profile if file does not exist', () => { @@ -49,9 +52,9 @@ describe('SidecarLoader (Fake FS)', () => { processorOptions: { myTruncation: { type: 'NodeTruncation', - options: { maxTokens: 500 } - } - } + options: { maxTokens: 500 }, + }, + }, }; fileSystem.setFile('/path/to/sidecar.json', JSON.stringify(validConfig)); const result = SidecarLoader.fromConfig(mockConfig, registry, fileSystem); @@ -65,9 +68,9 @@ describe('SidecarLoader (Fake FS)', () => { processorOptions: { myTruncation: { type: 'NodeTruncation', - options: { maxTokens: "this should be a number" } - } - } + options: { maxTokens: 'this should be a number' }, + }, + }, }; fileSystem.setFile('/path/to/sidecar.json', JSON.stringify(invalidConfig)); expect(() => diff --git a/packages/core/src/context/config/SidecarLoader.ts b/packages/core/src/context/config/SidecarLoader.ts index 8a3ac06833..8b34e847bc 100644 --- a/packages/core/src/context/config/SidecarLoader.ts +++ b/packages/core/src/context/config/SidecarLoader.ts @@ -45,7 +45,7 @@ export class SidecarLoader { getSidecarConfigSchema(registry), parsed, ); - + if (validationError) { throw new Error( `Invalid sidecar configuration in ${sidecarPath}. Validation error: ${validationError}`, @@ -64,7 +64,9 @@ export class SidecarLoader { config: { ...defaultSidecarProfile.config, ...(validConfig.budget ? { budget: validConfig.budget } : {}), - ...(validConfig.processorOptions ? { processorOptions: validConfig.processorOptions } : {}) + ...(validConfig.processorOptions + ? { processorOptions: validConfig.processorOptions } + : {}), }, }; } @@ -78,7 +80,7 @@ export class SidecarLoader { registry: SidecarRegistry, fileSystem: IFileSystem = new NodeFileSystem(), ): ContextProfile { - const sidecarPath = config.getExperimentalContextSidecarConfig(); + const sidecarPath = config.getExperimentalContextManagementConfig(); if (sidecarPath && fileSystem.existsSync(sidecarPath)) { const size = fileSystem.statSyncSize(sidecarPath); diff --git a/packages/core/src/context/config/profiles.ts b/packages/core/src/context/config/profiles.ts index cea39819ab..ec2ae032e9 100644 --- a/packages/core/src/context/config/profiles.ts +++ b/packages/core/src/context/config/profiles.ts @@ -17,8 +17,14 @@ import { createStateSnapshotAsyncProcessor } from '../processors/stateSnapshotAs export interface ContextProfile { config: SidecarConfig; - buildPipelines: (env: ContextEnvironment, config?: SidecarConfig) => PipelineDef[]; - buildAsyncPipelines: (env: ContextEnvironment, config?: SidecarConfig) => AsyncPipelineDef[]; + buildPipelines: ( + env: ContextEnvironment, + config?: SidecarConfig, + ) => PipelineDef[]; + buildAsyncPipelines: ( + env: ContextEnvironment, + config?: SidecarConfig, + ) => AsyncPipelineDef[]; } /** @@ -32,13 +38,19 @@ export const defaultSidecarProfile: ContextProfile = { maxTokens: 150000, }, }, - - buildPipelines: (env: ContextEnvironment, config?: SidecarConfig): PipelineDef[] => { + + buildPipelines: ( + env: ContextEnvironment, + config?: SidecarConfig, + ): PipelineDef[] => { // Helper to merge default options with dynamically loaded processorOptions by ID const getOptions = (id: string, defaultOptions: T): T => { if (config?.processorOptions && config.processorOptions[id]) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - return { ...defaultOptions, ...(config.processorOptions[id].options as T) }; + return { + ...defaultOptions, + ...(config.processorOptions[id].options as T), + }; } return defaultOptions; }; @@ -48,7 +60,11 @@ export const defaultSidecarProfile: ContextProfile = { name: 'Immediate Sanitization', triggers: ['new_message'], processors: [ - createToolMaskingProcessor('ToolMasking', env, getOptions('ToolMasking', { stringLengthThresholdTokens: 8000 })), + createToolMaskingProcessor( + 'ToolMasking', + env, + getOptions('ToolMasking', { stringLengthThresholdTokens: 8000 }), + ), createBlobDegradationProcessor('BlobDegradation', env), // No options ], }, @@ -56,25 +72,43 @@ export const defaultSidecarProfile: ContextProfile = { name: 'Normalization', triggers: ['retained_exceeded'], processors: [ - createNodeTruncationProcessor('NodeTruncation', env, getOptions('NodeTruncation', { maxTokensPerNode: 3000 })), - createNodeDistillationProcessor('NodeDistillation', env, getOptions('NodeDistillation', { nodeThresholdTokens: 5000 })), + createNodeTruncationProcessor( + 'NodeTruncation', + env, + getOptions('NodeTruncation', { maxTokensPerNode: 3000 }), + ), + createNodeDistillationProcessor( + 'NodeDistillation', + env, + getOptions('NodeDistillation', { nodeThresholdTokens: 5000 }), + ), ], }, { name: 'Emergency Backstop', triggers: ['gc_backstop'], processors: [ - createStateSnapshotProcessor('StateSnapshotSync', env, getOptions('StateSnapshotSync', { target: 'max' })), + createStateSnapshotProcessor( + 'StateSnapshotSync', + env, + getOptions('StateSnapshotSync', { target: 'max' }), + ), ], }, ]; }, - buildAsyncPipelines: (env: ContextEnvironment, config?: SidecarConfig): AsyncPipelineDef[] => { + buildAsyncPipelines: ( + env: ContextEnvironment, + config?: SidecarConfig, + ): AsyncPipelineDef[] => { const getOptions = (id: string, defaultOptions: T): T => { if (config?.processorOptions && config.processorOptions[id]) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - return { ...defaultOptions, ...(config.processorOptions[id].options as T) }; + return { + ...defaultOptions, + ...(config.processorOptions[id].options as T), + }; } return defaultOptions; }; @@ -84,9 +118,13 @@ export const defaultSidecarProfile: ContextProfile = { name: 'Async Background GC', triggers: ['nodes_aged_out'], processors: [ - createStateSnapshotAsyncProcessor('StateSnapshotAsync', env, getOptions('StateSnapshotAsync', { type: 'accumulate' })) - ] - } + createStateSnapshotAsyncProcessor( + 'StateSnapshotAsync', + env, + getOptions('StateSnapshotAsync', { type: 'accumulate' }), + ), + ], + }, ]; - } + }, }; diff --git a/packages/core/src/context/config/registry.ts b/packages/core/src/context/config/registry.ts index 65b0b60dac..1f051c4f34 100644 --- a/packages/core/src/context/config/registry.ts +++ b/packages/core/src/context/config/registry.ts @@ -24,8 +24,8 @@ export class SidecarRegistry { return this.processors.get(id)?.schema; } - getSchemaDefs(): Array<{ id: string; schema: object }> { - const defs: Array<{ id: string; schema: object }> = []; + getSchemaDefs(): ContextProcessorDef[] { + const defs = []; for (const def of this.processors.values()) { if (def.schema) defs.push({ id: def.id, schema: def.schema }); } diff --git a/packages/core/src/context/config/schema.ts b/packages/core/src/context/config/schema.ts index 6ca0df6123..f73bf6e907 100644 --- a/packages/core/src/context/config/schema.ts +++ b/packages/core/src/context/config/schema.ts @@ -10,25 +10,35 @@ export function getSidecarConfigSchema(registry?: SidecarRegistry) { // If a registry is provided, we can deeply validate processor overrides. // We do this by generating a `oneOf` list that matches the `type` discriminator // to the specific processor `options` schema. - const processorOptionSchemas = registry ? registry.getSchemaDefs().map(def => ({ - type: 'object', - required: ['type', 'options'], - properties: { - type: { const: def.id }, - options: def.schema - } - })) : []; - - const processorOptionsMapping = processorOptionSchemas.length > 0 - ? { oneOf: processorOptionSchemas } - : { + const processorOptionSchemas = registry + ? registry.getSchemaDefs().map((def) => ({ type: 'object', required: ['type', 'options'], properties: { - type: { type: 'string', description: 'The registry type of the processor (e.g. NodeTruncation)' }, - options: { type: 'object', description: 'The hyperparameter overrides' } - } - }; + type: { const: def.id }, + options: def.schema, + }, + })) + : []; + + const processorOptionsMapping = + processorOptionSchemas.length > 0 + ? { oneOf: processorOptionSchemas } + : { + type: 'object', + required: ['type', 'options'], + properties: { + type: { + type: 'string', + description: + 'The registry type of the processor (e.g. NodeTruncation)', + }, + options: { + type: 'object', + description: 'The hyperparameter overrides', + }, + }, + }; return { $schema: 'http://json-schema.org/draft-07/schema#', @@ -55,9 +65,10 @@ export function getSidecarConfigSchema(registry?: SidecarRegistry) { }, processorOptions: { type: 'object', - description: 'Named hyperparameter configurations for ContextProcessors and AsyncProcessors.', - additionalProperties: processorOptionsMapping - } + description: + 'Named hyperparameter configurations for ContextProcessors and AsyncProcessors.', + additionalProperties: processorOptionsMapping, + }, }, }; } diff --git a/packages/core/src/context/config/types.ts b/packages/core/src/context/config/types.ts index c04cfc08a2..b9ee476705 100644 --- a/packages/core/src/context/config/types.ts +++ b/packages/core/src/context/config/types.ts @@ -35,8 +35,8 @@ export interface SidecarConfig { retainedTokens: number; maxTokens: number; }; - /** - * Dynamic hyperparameter overrides for individual ContextProcessors and AsyncProcessors. + /** + * Dynamic hyperparameter overrides for individual ContextProcessors and AsyncProcessors. * Keys are named identifiers (e.g. "gentleTruncation"). */ processorOptions?: Record; diff --git a/packages/core/src/context/context-manager-v0-design.md b/packages/core/src/context/context-manager-v0-design.md index 061303f206..9490d3b248 100644 --- a/packages/core/src/context/context-manager-v0-design.md +++ b/packages/core/src/context/context-manager-v0-design.md @@ -79,11 +79,11 @@ To extend the system, developers author two types of plugins: 2. **Context AsyncProcessors:** Inspired by the **Actor Model**, these are event-triggered background jobs designed for isolated, long-running async computations (e.g., asking an LLM to distill 50 turns of history). -3. **Inboxes:** Because the graph can only be mutated synchronously, AsyncProcessors - cannot touch the graph directly (preventing race conditions). Instead, they - drop their results via message-passing into point-in-time snapshots called - _Inboxes_. Processors later read from these Inboxes during a synchronous - pipeline run to safely apply the async processor's findings. +3. **Inboxes:** Because the graph can only be mutated synchronously, + AsyncProcessors cannot touch the graph directly (preventing race conditions). + Instead, they drop their results via message-passing into point-in-time + snapshots called _Inboxes_. Processors later read from these Inboxes during a + synchronous pipeline run to safely apply the async processor's findings. ## 4. Proofs of Construction @@ -227,8 +227,8 @@ class MemoryExtractionProcessor implements ContextProcessor { By treating the Context Graph as an immutable ledger updated only via functional Pipelines, we have eliminated race conditions and untraceable graph corruption. -By utilizing AsyncProcessors and Inboxes, we have safely bridged the gap between slow -LLM analysis and fast, synchronous terminal UI updates. +By utilizing AsyncProcessors and Inboxes, we have safely bridged the gap between +slow LLM analysis and fast, synchronous terminal UI updates. We recognize this is not the final form—future iterations may require strict simple priority to updates, or more advanced generational garbage collection. diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 30918cafff..818b0c6bc0 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -104,7 +104,8 @@ export class ContextManager { if (agedOutNodes.size > 0) { this.eventBus.emitConsolidationNeeded({ nodes: this.buffer.nodes, - targetDeficit: currentTokens - this.sidecar.config.budget.retainedTokens, + targetDeficit: + currentTokens - this.sidecar.config.budget.retainedTokens, targetNodeIds: agedOutNodes, }); } diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index e5043489fd..7cfcbcd4fc 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -40,7 +40,7 @@ export interface ProcessArgs { /** * A ContextProcessor is a pure, closure-based object that returns a modified subset of nodes - * (or the original targets if no changes are needed). + * (or the original targets if no changes are needed). * The Orchestrator will use this to generate a new graph delta. */ export interface ContextProcessor { diff --git a/packages/core/src/context/pipeline/orchestrator.test.ts b/packages/core/src/context/pipeline/orchestrator.test.ts index b64553e614..f3647000a1 100644 --- a/packages/core/src/context/pipeline/orchestrator.test.ts +++ b/packages/core/src/context/pipeline/orchestrator.test.ts @@ -45,7 +45,7 @@ function createModifyingProcessor(id: string): ContextProcessor { }; } return newTargets; - } + }, }; } @@ -56,18 +56,21 @@ function createThrowingProcessor(id: string): ContextProcessor { name: 'Throwing', process: async (): Promise => { throw new Error('Processor failed intentionally'); - } + }, }; } // A mock async processor that signals it ran -function createMockAsyncProcessor(id: string, executeSpy: ReturnType): AsyncContextProcessor { +function createMockAsyncProcessor( + id: string, + executeSpy: ReturnType, +): AsyncContextProcessor { return { id, name: 'MockAsyncProcessor', process: async (args: ProcessArgs) => { executeSpy(args); - } + }, }; } @@ -156,7 +159,10 @@ describe('PipelineOrchestrator (Component)', () => { { name: 'FailingPipeline', triggers: ['new_message'], - processors: [createThrowingProcessor('Thrower'), createModifyingProcessor('Mod')], + processors: [ + createThrowingProcessor('Thrower'), + createModifyingProcessor('Mod'), + ], }, ]; @@ -183,13 +189,21 @@ describe('PipelineOrchestrator (Component)', () => { describe('Asynchronous async pipeline Events', () => { it('routes emitChunkReceived to async pipelines with nodes_added trigger', async () => { const executeSpy = vi.fn(); - const asyncProcessor = createMockAsyncProcessor('MyAsyncProcessor', executeSpy); - - setupOrchestrator([], [{ - name: 'TestAsync', - triggers: ['nodes_added'], - processors: [asyncProcessor] - }]); + const asyncProcessor = createMockAsyncProcessor( + 'MyAsyncProcessor', + executeSpy, + ); + + setupOrchestrator( + [], + [ + { + name: 'TestAsync', + triggers: ['nodes_added'], + processors: [asyncProcessor], + }, + ], + ); const node1 = createDummyNode('ep1', 'USER_PROMPT', 10); const node2 = createDummyNode('ep1', 'AGENT_THOUGHT', 20); @@ -200,7 +214,7 @@ describe('PipelineOrchestrator (Component)', () => { }); // Yield event loop - await new Promise(resolve => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 0)); expect(executeSpy).toHaveBeenCalledTimes(1); const callArgs = executeSpy.mock.calls[0][0]; diff --git a/packages/core/src/context/pipeline/orchestrator.ts b/packages/core/src/context/pipeline/orchestrator.ts index d92e6e3df4..032ab31800 100644 --- a/packages/core/src/context/pipeline/orchestrator.ts +++ b/packages/core/src/context/pipeline/orchestrator.ts @@ -5,7 +5,11 @@ */ import type { ConcreteNode } from '../ir/types.js'; -import type { AsyncPipelineDef, PipelineDef, PipelineTrigger } from '../config/types.js'; +import type { + AsyncPipelineDef, + PipelineDef, + PipelineTrigger, +} from '../config/types.js'; import type { ContextEnvironment, ContextEventBus, @@ -43,7 +47,12 @@ export class PipelineOrchestrator { private setupTriggers() { const bindTriggers = ( pipelines: PipelineDef[] | AsyncPipelineDef[], - executeFn: (pipeline: PipelineDef | AsyncPipelineDef, nodes: readonly ConcreteNode[], targets: ReadonlySet, protectedIds: ReadonlySet) => void + executeFn: ( + pipeline: PipelineDef | AsyncPipelineDef, + nodes: readonly ConcreteNode[], + targets: ReadonlySet, + protectedIds: ReadonlySet, + ) => void, ) => { for (const pipeline of pipelines) { for (const trigger of pipeline.triggers) { @@ -52,7 +61,10 @@ export class PipelineOrchestrator { // Background timers not fully implemented in V1 yet }, trigger.intervalMs); this.activeTimers.push(timer); - } else if (trigger === 'retained_exceeded' || trigger === 'nodes_aged_out') { + } else if ( + trigger === 'retained_exceeded' || + trigger === 'nodes_aged_out' + ) { this.eventBus.onConsolidationNeeded((event) => { executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); }); @@ -67,15 +79,29 @@ export class PipelineOrchestrator { bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - void this.executePipelineAsync(pipeline as PipelineDef, nodes, new Set(targets), new Set(protectedIds)); + void this.executePipelineAsync( + pipeline as PipelineDef, + nodes, + new Set(targets), + new Set(protectedIds), + ); }); bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => { - const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []); + const inboxSnapshot = new InboxSnapshotImpl( + this.env.inbox.getMessages() || [], + ); const targets = nodes.filter((n) => targetIds.has(n.id)); for (const processor of pipeline.processors) { - processor.process({ targets, inbox: inboxSnapshot, buffer: ContextWorkingBufferImpl.initialize(nodes) }) - .catch((e: unknown) => debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e)); + processor + .process({ + targets, + inbox: inboxSnapshot, + buffer: ContextWorkingBufferImpl.initialize(nodes), + }) + .catch((e: unknown) => + debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e), + ); } }); } diff --git a/packages/core/src/context/processors/blobDegradationProcessor.test.ts b/packages/core/src/context/processors/blobDegradationProcessor.test.ts index 5bd5d38dcb..01c4cb9f70 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.test.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.test.ts @@ -23,7 +23,10 @@ describe('BlobDegradationProcessor', () => { // So we make the blob data 200 chars. const fakeData = 'A'.repeat(200); - const processor = createBlobDegradationProcessor('BlobDegradationProcessor', env); + const processor = createBlobDegradationProcessor( + 'BlobDegradationProcessor', + env, + ); const parts: SemanticPart[] = [ { type: 'text', text: 'Hello' }, @@ -61,7 +64,10 @@ describe('BlobDegradationProcessor', () => { it('should degrade all blobs unconditionally', async () => { const env = createMockEnvironment(); - const processor = createBlobDegradationProcessor('BlobDegradationProcessor', env); + const processor = createBlobDegradationProcessor( + 'BlobDegradationProcessor', + env, + ); // Tokens for fileData = 258. // Degraded text = "[File Reference (video/mp4) degraded to text to preserve context window. Original URI: gs://test1]" @@ -91,7 +97,10 @@ describe('BlobDegradationProcessor', () => { it('should return exactly the targets array if targets are empty', async () => { const env = createMockEnvironment(); - const processor = createBlobDegradationProcessor('BlobDegradationProcessor', env); + const processor = createBlobDegradationProcessor( + 'BlobDegradationProcessor', + env, + ); const targets: ConcreteNode[] = []; const result = await processor.process(createMockProcessArgs(targets)); diff --git a/packages/core/src/context/processors/blobDegradationProcessor.ts b/packages/core/src/context/processors/blobDegradationProcessor.ts index f0d9a06d98..f9ff7d6940 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.ts @@ -16,80 +16,77 @@ export function createBlobDegradationProcessor( id, name: 'BlobDegradationProcessor', process: async ({ targets }: ProcessArgs) => { - if (targets.length === 0) { - return targets; - } - - let directoryCreated = false; - - let blobOutputsDir = env.fileSystem.join( - env.projectTempDir, - 'degraded-blobs', - ); - const sessionId = env.sessionId; - if (sessionId) { - blobOutputsDir = env.fileSystem.join( - blobOutputsDir, - `session-${sanitizeFilenamePart(sessionId)}`, - ); - } - - const ensureDir = async () => { - if (!directoryCreated) { - await env.fileSystem.mkdir(blobOutputsDir, { recursive: true }); - directoryCreated = true; + if (targets.length === 0) { + return targets; } - }; - const returnedNodes: ConcreteNode[] = []; + let directoryCreated = false; - // Forward scan, looking for bloated non-text parts to degrade - for (const node of targets) { - switch (node.type) { - case 'USER_PROMPT': { - let modified = false; - const newParts = [...node.semanticParts]; + let blobOutputsDir = env.fileSystem.join( + env.projectTempDir, + 'degraded-blobs', + ); + const sessionId = env.sessionId; + if (sessionId) { + blobOutputsDir = env.fileSystem.join( + blobOutputsDir, + `session-${sanitizeFilenamePart(sessionId)}`, + ); + } - for (let j = 0; j < node.semanticParts.length; j++) { - const part = node.semanticParts[j]; - if (part.type === 'text') continue; + const ensureDir = async () => { + if (!directoryCreated) { + await env.fileSystem.mkdir(blobOutputsDir, { recursive: true }); + directoryCreated = true; + } + }; - let newText = ''; - let tokensSaved = 0; + const returnedNodes: ConcreteNode[] = []; - switch (part.type) { - case 'inline_data': { - await ensureDir(); - const ext = part.mimeType.split('/')[1] || 'bin'; - const fileName = `blob_${Date.now()}_${env.idGenerator.generateId()}.${ext}`; - const filePath = env.fileSystem.join( - blobOutputsDir, - fileName, - ); + // Forward scan, looking for bloated non-text parts to degrade + for (const node of targets) { + switch (node.type) { + case 'USER_PROMPT': { + let modified = false; + const newParts = [...node.semanticParts]; - const buffer = Buffer.from(part.data, 'base64'); - await env.fileSystem.writeFile(filePath, buffer); + for (let j = 0; j < node.semanticParts.length; j++) { + const part = node.semanticParts[j]; + if (part.type === 'text') continue; - const mb = (buffer.byteLength / 1024 / 1024).toFixed(2); - newText = `[Multi-Modal Blob (${part.mimeType}, ${mb}MB) degraded to text to preserve context window. Saved to: ${filePath}]`; + let newText = ''; + let tokensSaved = 0; - const oldTokens = - env.tokenCalculator.estimateTokensForParts([ + switch (part.type) { + case 'inline_data': { + await ensureDir(); + const ext = part.mimeType.split('/')[1] || 'bin'; + const fileName = `blob_${Date.now()}_${env.idGenerator.generateId()}.${ext}`; + const filePath = env.fileSystem.join( + blobOutputsDir, + fileName, + ); + + const buffer = Buffer.from(part.data, 'base64'); + await env.fileSystem.writeFile(filePath, buffer); + + const mb = (buffer.byteLength / 1024 / 1024).toFixed(2); + newText = `[Multi-Modal Blob (${part.mimeType}, ${mb}MB) degraded to text to preserve context window. Saved to: ${filePath}]`; + + const oldTokens = env.tokenCalculator.estimateTokensForParts([ { inlineData: { mimeType: part.mimeType, data: part.data }, }, ]); - const newTokens = - env.tokenCalculator.estimateTokensForParts([ + const newTokens = env.tokenCalculator.estimateTokensForParts([ { text: newText }, ]); - tokensSaved = oldTokens - newTokens; - break; - } - case 'file_data': { - newText = `[File Reference (${part.mimeType}) degraded to text to preserve context window. Original URI: ${part.fileUri}]`; - const oldTokens = - env.tokenCalculator.estimateTokensForParts([ + tokensSaved = oldTokens - newTokens; + break; + } + case 'file_data': { + newText = `[File Reference (${part.mimeType}) degraded to text to preserve context window. Original URI: ${part.fileUri}]`; + const oldTokens = env.tokenCalculator.estimateTokensForParts([ { fileData: { mimeType: part.mimeType, @@ -97,54 +94,53 @@ export function createBlobDegradationProcessor( }, }, ]); - const newTokens = - env.tokenCalculator.estimateTokensForParts([ + const newTokens = env.tokenCalculator.estimateTokensForParts([ { text: newText }, ]); - tokensSaved = oldTokens - newTokens; - break; - } - case 'raw_part': { - newText = `[Unknown Part degraded to text to preserve context window.]`; - const oldTokens = - env.tokenCalculator.estimateTokensForParts([part.part]); - const newTokens = - env.tokenCalculator.estimateTokensForParts([ + tokensSaved = oldTokens - newTokens; + break; + } + case 'raw_part': { + newText = `[Unknown Part degraded to text to preserve context window.]`; + const oldTokens = env.tokenCalculator.estimateTokensForParts([ + part.part, + ]); + const newTokens = env.tokenCalculator.estimateTokensForParts([ { text: newText }, ]); - tokensSaved = oldTokens - newTokens; - break; + tokensSaved = oldTokens - newTokens; + break; + } + default: + break; + } + + if (newText && tokensSaved > 0) { + newParts[j] = { type: 'text', text: newText }; + modified = true; } - default: - break; } - if (newText && tokensSaved > 0) { - newParts[j] = { type: 'text', text: newText }; - modified = true; + if (modified) { + const degradedNode: UserPrompt = { + ...node, + id: env.idGenerator.generateId(), + semanticParts: newParts, + replacesId: node.id, + }; + returnedNodes.push(degradedNode); + } else { + returnedNodes.push(node); } + break; } - - if (modified) { - const degradedNode: UserPrompt = { - ...node, - id: env.idGenerator.generateId(), - semanticParts: newParts, - replacesId: node.id, - }; - returnedNodes.push(degradedNode); - } else { + default: returnedNodes.push(node); - } - break; + break; } - default: - returnedNodes.push(node); - break; } - } - return returnedNodes; + return returnedNodes; }, }; } diff --git a/packages/core/src/context/processors/historyTruncationProcessor.ts b/packages/core/src/context/processors/historyTruncationProcessor.ts index 528f64e099..9c830dca6e 100644 --- a/packages/core/src/context/processors/historyTruncationProcessor.ts +++ b/packages/core/src/context/processors/historyTruncationProcessor.ts @@ -23,35 +23,35 @@ export function createHistoryTruncationProcessor( id, name: 'HistoryTruncationProcessor', process: async ({ targets }: ProcessArgs) => { - // Calculate how many tokens we need to remove based on the configured knob - let targetTokensToRemove = 0; - const strategy = options.target ?? 'max'; + // Calculate how many tokens we need to remove based on the configured knob + let targetTokensToRemove = 0; + const strategy = options.target ?? 'max'; - if (strategy === 'incremental') { - targetTokensToRemove = Infinity; - } else if (strategy === 'freeNTokens') { - targetTokensToRemove = options.freeTokensTarget ?? 0; - if (targetTokensToRemove <= 0) return targets; - } else if (strategy === 'max') { - // 'max' means we remove all targets without stopping early - targetTokensToRemove = Infinity; - } - - let removedTokens = 0; - const keptNodes: ConcreteNode[] = []; - - // The targets are sequentially ordered from oldest to newest. - // We want to delete the oldest targets first. - for (const node of targets) { - if (removedTokens >= targetTokensToRemove) { - keptNodes.push(node); - continue; + if (strategy === 'incremental') { + targetTokensToRemove = Infinity; + } else if (strategy === 'freeNTokens') { + targetTokensToRemove = options.freeTokensTarget ?? 0; + if (targetTokensToRemove <= 0) return targets; + } else if (strategy === 'max') { + // 'max' means we remove all targets without stopping early + targetTokensToRemove = Infinity; } - removedTokens += env.tokenCalculator.getTokenCost(node); - } + let removedTokens = 0; + const keptNodes: ConcreteNode[] = []; - return keptNodes; + // The targets are sequentially ordered from oldest to newest. + // We want to delete the oldest targets first. + for (const node of targets) { + if (removedTokens >= targetTokensToRemove) { + keptNodes.push(node); + continue; + } + + removedTokens += env.tokenCalculator.getTokenCost(node); + } + + return keptNodes; }, }; } diff --git a/packages/core/src/context/processors/nodeDistillationProcessor.test.ts b/packages/core/src/context/processors/nodeDistillationProcessor.test.ts index 74b7cc47a0..9bcc5e070a 100644 --- a/packages/core/src/context/processors/nodeDistillationProcessor.test.ts +++ b/packages/core/src/context/processors/nodeDistillationProcessor.test.ts @@ -25,9 +25,13 @@ describe('NodeDistillationProcessor', () => { llmClient: mockLlmClient, }); - const processor = createNodeDistillationProcessor('NodeDistillationProcessor', env, { - nodeThresholdTokens: 10, - }); + const processor = createNodeDistillationProcessor( + 'NodeDistillationProcessor', + env, + { + nodeThresholdTokens: 10, + }, + ); const longText = 'A'.repeat(50); // 50 chars @@ -94,9 +98,13 @@ describe('NodeDistillationProcessor', () => { llmClient: mockLlmClient, }); - const processor = createNodeDistillationProcessor('NodeDistillationProcessor', env, { - nodeThresholdTokens: 100, // Very high threshold - }); + const processor = createNodeDistillationProcessor( + 'NodeDistillationProcessor', + env, + { + nodeThresholdTokens: 100, // Very high threshold + }, + ); const shortText = 'Short text'; // 10 chars diff --git a/packages/core/src/context/processors/nodeDistillationProcessor.ts b/packages/core/src/context/processors/nodeDistillationProcessor.ts index 93a9ed3a29..fb94ce06b4 100644 --- a/packages/core/src/context/processors/nodeDistillationProcessor.ts +++ b/packages/core/src/context/processors/nodeDistillationProcessor.ts @@ -59,103 +59,96 @@ export function createNodeDistillationProcessor( id, name: 'NodeDistillationProcessor', process: async ({ targets }: ProcessArgs) => { - const semanticConfig = options; - const limitTokens = semanticConfig.nodeThresholdTokens; - const thresholdChars = env.tokenCalculator.tokensToChars(limitTokens); + const semanticConfig = options; + const limitTokens = semanticConfig.nodeThresholdTokens; + const thresholdChars = env.tokenCalculator.tokensToChars(limitTokens); - const returnedNodes: ConcreteNode[] = []; + const returnedNodes: ConcreteNode[] = []; - // Scan the target working buffer and unconditionally apply the configured hyperparameter threshold - for (const node of targets) { - switch (node.type) { - case 'USER_PROMPT': { - let modified = false; - const newParts = [...node.semanticParts]; + // Scan the target working buffer and unconditionally apply the configured hyperparameter threshold + for (const node of targets) { + switch (node.type) { + case 'USER_PROMPT': { + let modified = false; + const newParts = [...node.semanticParts]; - for (let j = 0; j < node.semanticParts.length; j++) { - const part = node.semanticParts[j]; - if (part.type !== 'text') continue; + for (let j = 0; j < node.semanticParts.length; j++) { + const part = node.semanticParts[j]; + if (part.type !== 'text') continue; - if (part.text.length > thresholdChars) { - const summary = await generateSummary( - part.text, - 'User Prompt', - ); - const newTokens = env.tokenCalculator.estimateTokensForParts( - [{ text: summary }], - ); - const oldTokens = env.tokenCalculator.estimateTokensForParts( - [{ text: part.text }], - ); + if (part.text.length > thresholdChars) { + const summary = await generateSummary(part.text, 'User Prompt'); + const newTokens = env.tokenCalculator.estimateTokensForParts([ + { text: summary }, + ]); + const oldTokens = env.tokenCalculator.estimateTokensForParts([ + { text: part.text }, + ]); - if (newTokens < oldTokens) { - newParts[j] = { type: 'text', text: summary }; - modified = true; + if (newTokens < oldTokens) { + newParts[j] = { type: 'text', text: summary }; + modified = true; + } } } - } - if (modified) { - returnedNodes.push({ - ...node, - id: env.idGenerator.generateId(), - semanticParts: newParts, - replacesId: node.id, - }); - } else { - returnedNodes.push(node); - } - break; - } - - case 'AGENT_THOUGHT': { - if (node.text.length > thresholdChars) { - const summary = await generateSummary( - node.text, - 'Agent Thought', - ); - const newTokens = env.tokenCalculator.estimateTokensForParts([ - { text: summary }, - ]); - const oldTokens = env.tokenCalculator.getTokenCost(node); - - if (newTokens < oldTokens) { + if (modified) { returnedNodes.push({ ...node, id: env.idGenerator.generateId(), - text: summary, + semanticParts: newParts, replacesId: node.id, }); - break; - } - } - returnedNodes.push(node); - break; - } - - case 'TOOL_EXECUTION': { - const rawObs = node.observation; - - let stringifiedObs = ''; - if (typeof rawObs === 'string') { - stringifiedObs = rawObs; - } else { - try { - stringifiedObs = JSON.stringify(rawObs); - } catch { - stringifiedObs = String(rawObs); + } else { + returnedNodes.push(node); } + break; } - if (stringifiedObs.length > thresholdChars) { - const summary = await generateSummary( - stringifiedObs, - node.toolName || 'unknown', - ); - const newObsObject = { summary }; + case 'AGENT_THOUGHT': { + if (node.text.length > thresholdChars) { + const summary = await generateSummary(node.text, 'Agent Thought'); + const newTokens = env.tokenCalculator.estimateTokensForParts([ + { text: summary }, + ]); + const oldTokens = env.tokenCalculator.getTokenCost(node); - const newObsTokens = - env.tokenCalculator.estimateTokensForParts([ + if (newTokens < oldTokens) { + returnedNodes.push({ + ...node, + id: env.idGenerator.generateId(), + text: summary, + replacesId: node.id, + }); + break; + } + } + returnedNodes.push(node); + break; + } + + case 'TOOL_EXECUTION': { + const rawObs = node.observation; + + let stringifiedObs = ''; + if (typeof rawObs === 'string') { + stringifiedObs = rawObs; + } else { + try { + stringifiedObs = JSON.stringify(rawObs); + } catch { + stringifiedObs = String(rawObs); + } + } + + if (stringifiedObs.length > thresholdChars) { + const summary = await generateSummary( + stringifiedObs, + node.toolName || 'unknown', + ); + const newObsObject = { summary }; + + const newObsTokens = env.tokenCalculator.estimateTokensForParts([ { functionResponse: { name: node.toolName || 'unknown', @@ -165,36 +158,36 @@ export function createNodeDistillationProcessor( }, ]); - const oldObsTokens = - node.tokens?.observation ?? - env.tokenCalculator.getTokenCost(node); - const intentTokens = node.tokens?.intent ?? 0; + const oldObsTokens = + node.tokens?.observation ?? + env.tokenCalculator.getTokenCost(node); + const intentTokens = node.tokens?.intent ?? 0; - if (newObsTokens < oldObsTokens) { - returnedNodes.push({ - ...node, - id: env.idGenerator.generateId(), - observation: newObsObject as Record, - tokens: { - intent: intentTokens, - observation: newObsTokens, - }, - replacesId: node.id, - }); - break; + if (newObsTokens < oldObsTokens) { + returnedNodes.push({ + ...node, + id: env.idGenerator.generateId(), + observation: newObsObject as Record, + tokens: { + intent: intentTokens, + observation: newObsTokens, + }, + replacesId: node.id, + }); + break; + } } + returnedNodes.push(node); + break; } - returnedNodes.push(node); - break; + + default: + returnedNodes.push(node); + break; } - - default: - returnedNodes.push(node); - break; } - } - return returnedNodes; - } + return returnedNodes; + }, }; } diff --git a/packages/core/src/context/processors/nodeTruncationProcessor.test.ts b/packages/core/src/context/processors/nodeTruncationProcessor.test.ts index 2724fc3445..c3734c4999 100644 --- a/packages/core/src/context/processors/nodeTruncationProcessor.test.ts +++ b/packages/core/src/context/processors/nodeTruncationProcessor.test.ts @@ -19,9 +19,13 @@ describe('NodeTruncationProcessor', () => { // env.tokenCalculator uses charsPerToken=1 natively. const env = createMockEnvironment(); - const processor = createNodeTruncationProcessor('NodeTruncationProcessor', env, { - maxTokensPerNode: 10, // 10 chars limit - }); + const processor = createNodeTruncationProcessor( + 'NodeTruncationProcessor', + env, + { + maxTokensPerNode: 10, // 10 chars limit + }, + ); const longText = 'A'.repeat(50); // 50 tokens @@ -82,9 +86,13 @@ describe('NodeTruncationProcessor', () => { it('should ignore nodes that are below maxTokensPerNode', async () => { const env = createMockEnvironment(); - const processor = createNodeTruncationProcessor('NodeTruncationProcessor', env, { - maxTokensPerNode: 100, // 100 chars limit - }); + const processor = createNodeTruncationProcessor( + 'NodeTruncationProcessor', + env, + { + maxTokensPerNode: 100, // 100 chars limit + }, + ); const shortText = 'Short text'; // 10 chars diff --git a/packages/core/src/context/processors/nodeTruncationProcessor.ts b/packages/core/src/context/processors/nodeTruncationProcessor.ts index 2b3e760cd1..b57a4f73b0 100644 --- a/packages/core/src/context/processors/nodeTruncationProcessor.ts +++ b/packages/core/src/context/processors/nodeTruncationProcessor.ts @@ -37,8 +37,7 @@ export function createNodeTruncationProcessor( if (newText !== text) { // Using accurate TokenCalculator instead of simple math - const newTokens = - env.tokenCalculator.estimateTokensForString(newText); + const newTokens = env.tokenCalculator.estimateTokensForString(newText); const oldTokens = env.tokenCalculator.estimateTokensForString(text); const tokensSaved = oldTokens - newTokens; @@ -53,82 +52,82 @@ export function createNodeTruncationProcessor( id, name: 'NodeTruncationProcessor', process: async ({ targets }: ProcessArgs) => { - if (targets.length === 0) { - return targets; - } + if (targets.length === 0) { + return targets; + } - const { maxTokensPerNode } = options; - const limitChars = env.tokenCalculator.tokensToChars(maxTokensPerNode); + const { maxTokensPerNode } = options; + const limitChars = env.tokenCalculator.tokensToChars(maxTokensPerNode); - const returnedNodes: ConcreteNode[] = []; + const returnedNodes: ConcreteNode[] = []; - for (const node of targets) { - switch (node.type) { - case 'USER_PROMPT': { - let modified = false; - const newParts = [...node.semanticParts]; + for (const node of targets) { + switch (node.type) { + case 'USER_PROMPT': { + let modified = false; + const newParts = [...node.semanticParts]; - for (let j = 0; j < node.semanticParts.length; j++) { - const part = node.semanticParts[j]; - if (part.type === 'text') { - const squashResult = tryApplySquash(part.text, limitChars); - if (squashResult) { - newParts[j] = { type: 'text', text: squashResult.text }; - modified = true; + for (let j = 0; j < node.semanticParts.length; j++) { + const part = node.semanticParts[j]; + if (part.type === 'text') { + const squashResult = tryApplySquash(part.text, limitChars); + if (squashResult) { + newParts[j] = { type: 'text', text: squashResult.text }; + modified = true; + } } } + + if (modified) { + returnedNodes.push({ + ...node, + id: env.idGenerator.generateId(), + semanticParts: newParts, + replacesId: node.id, + }); + } else { + returnedNodes.push(node); + } + break; } - if (modified) { - returnedNodes.push({ - ...node, - id: env.idGenerator.generateId(), - semanticParts: newParts, - replacesId: node.id, - }); - } else { + case 'AGENT_THOUGHT': { + const squashResult = tryApplySquash(node.text, limitChars); + if (squashResult) { + returnedNodes.push({ + ...node, + id: env.idGenerator.generateId(), + text: squashResult.text, + replacesId: node.id, + }); + } else { + returnedNodes.push(node); + } + break; + } + + case 'AGENT_YIELD': { + const squashResult = tryApplySquash(node.text, limitChars); + if (squashResult) { + returnedNodes.push({ + ...node, + id: env.idGenerator.generateId(), + text: squashResult.text, + replacesId: node.id, + }); + } else { + returnedNodes.push(node); + } + break; + } + + default: returnedNodes.push(node); - } - break; + break; } - - case 'AGENT_THOUGHT': { - const squashResult = tryApplySquash(node.text, limitChars); - if (squashResult) { - returnedNodes.push({ - ...node, - id: env.idGenerator.generateId(), - text: squashResult.text, - replacesId: node.id, - }); - } else { - returnedNodes.push(node); - } - break; - } - - case 'AGENT_YIELD': { - const squashResult = tryApplySquash(node.text, limitChars); - if (squashResult) { - returnedNodes.push({ - ...node, - id: env.idGenerator.generateId(), - text: squashResult.text, - replacesId: node.id, - }); - } else { - returnedNodes.push(node); - } - break; - } - - default: - returnedNodes.push(node); - break; } - } - return returnedNodes; - } + return returnedNodes; + }, }; } diff --git a/packages/core/src/context/processors/rollingSummaryProcessor.test.ts b/packages/core/src/context/processors/rollingSummaryProcessor.test.ts index 5e3ef3b4f3..233e35c804 100644 --- a/packages/core/src/context/processors/rollingSummaryProcessor.test.ts +++ b/packages/core/src/context/processors/rollingSummaryProcessor.test.ts @@ -14,9 +14,13 @@ import { describe('RollingSummaryProcessor', () => { it('should initialize with correct default options', () => { const env = createMockEnvironment(); - const processor = createRollingSummaryProcessor('RollingSummaryProcessor', env, { - target: 'incremental', - }); + const processor = createRollingSummaryProcessor( + 'RollingSummaryProcessor', + env, + { + target: 'incremental', + }, + ); expect(processor.id).toBe('RollingSummaryProcessor'); }); @@ -26,10 +30,14 @@ describe('RollingSummaryProcessor', () => { // We want to free exactly 100 tokens. // We will supply nodes that cost 50 tokens each. - const processor = createRollingSummaryProcessor('RollingSummaryProcessor', env, { - target: 'freeNTokens', - freeTokensTarget: 100, - }); + const processor = createRollingSummaryProcessor( + 'RollingSummaryProcessor', + env, + { + target: 'freeNTokens', + freeTokensTarget: 100, + }, + ); const text50 = 'A'.repeat(50); const targets = [ @@ -59,10 +67,14 @@ describe('RollingSummaryProcessor', () => { const env = createMockEnvironment(); // We want to free 100 tokens, but our nodes will only cost 10 tokens each. - const processor = createRollingSummaryProcessor('RollingSummaryProcessor', env, { - target: 'freeNTokens', - freeTokensTarget: 100, - }); + const processor = createRollingSummaryProcessor( + 'RollingSummaryProcessor', + env, + { + target: 'freeNTokens', + freeTokensTarget: 100, + }, + ); const text10 = 'A'.repeat(10); const targets = [ diff --git a/packages/core/src/context/processors/rollingSummaryProcessor.ts b/packages/core/src/context/processors/rollingSummaryProcessor.ts index c08960e09b..8ef10af38c 100644 --- a/packages/core/src/context/processors/rollingSummaryProcessor.ts +++ b/packages/core/src/context/processors/rollingSummaryProcessor.ts @@ -28,77 +28,79 @@ export function createRollingSummaryProcessor( id, name: 'RollingSummaryProcessor', process: async ({ targets }: ProcessArgs) => { - if (targets.length === 0) return targets; + if (targets.length === 0) return targets; - const strategy = options.target ?? 'max'; - let targetTokensToRemove = 0; + const strategy = options.target ?? 'max'; + let targetTokensToRemove = 0; - if (strategy === 'incremental') { - // A rolling summary should target a small chunk. For now, since state isn't passed, - // we'll default to a fixed threshold, like 10000 tokens, to avoid eating the whole history. - // Ideally, the orchestrator should pass `tokensToRemove` explicitly. - targetTokensToRemove = 10000; - } else if (strategy === 'freeNTokens') { - targetTokensToRemove = options.freeTokensTarget ?? Infinity; - } else if (strategy === 'max') { - targetTokensToRemove = Infinity; - } - - if (targetTokensToRemove <= 0) return targets; - - let deficitAccumulator = 0; - const nodesToSummarize: ConcreteNode[] = []; - - // Scan oldest to newest to find the oldest block that exceeds the token requirement - for (const node of targets) { - if (node.id === targets[0].id && node.type === 'USER_PROMPT') { - // Keep system prompt if it's the very first node - continue; + if (strategy === 'incremental') { + // A rolling summary should target a small chunk. For now, since state isn't passed, + // we'll default to a fixed threshold, like 10000 tokens, to avoid eating the whole history. + // Ideally, the orchestrator should pass `tokensToRemove` explicitly. + targetTokensToRemove = 10000; + } else if (strategy === 'freeNTokens') { + targetTokensToRemove = options.freeTokensTarget ?? Infinity; + } else if (strategy === 'max') { + targetTokensToRemove = Infinity; } - nodesToSummarize.push(node); - deficitAccumulator += env.tokenCalculator.getTokenCost(node); + if (targetTokensToRemove <= 0) return targets; - if (deficitAccumulator >= targetTokensToRemove) break; - } + let deficitAccumulator = 0; + const nodesToSummarize: ConcreteNode[] = []; - if (nodesToSummarize.length < 2) return targets; // Not enough context to summarize + // Scan oldest to newest to find the oldest block that exceeds the token requirement + for (const node of targets) { + if (node.id === targets[0].id && node.type === 'USER_PROMPT') { + // Keep system prompt if it's the very first node + continue; + } - try { - // Synthesize the rolling summary synchronously - const snapshotText = await generator.synthesizeSnapshot( - nodesToSummarize, - options.systemInstruction, - ); - const newId = env.idGenerator.generateId(); + nodesToSummarize.push(node); + deficitAccumulator += env.tokenCalculator.getTokenCost(node); - const summaryNode: RollingSummary = { - id: newId, - logicalParentId: newId, - type: 'ROLLING_SUMMARY', - timestamp: Date.now(), - text: snapshotText, - abstractsIds: nodesToSummarize.map((n) => n.id), - }; - - const consumedIds = nodesToSummarize.map((n) => n.id); - const returnedNodes = targets.filter((t) => !consumedIds.includes(t.id)); - const firstRemovedIdx = targets.findIndex((t) => - consumedIds.includes(t.id), - ); - - if (firstRemovedIdx !== -1) { - const idx = Math.max(0, firstRemovedIdx); - returnedNodes.splice(idx, 0, summaryNode); - } else { - returnedNodes.unshift(summaryNode); + if (deficitAccumulator >= targetTokensToRemove) break; } - return returnedNodes; - } catch (e) { - debugLogger.error('RollingSummaryProcessor failed sync backstop', e); - return targets; - } - } + if (nodesToSummarize.length < 2) return targets; // Not enough context to summarize + + try { + // Synthesize the rolling summary synchronously + const snapshotText = await generator.synthesizeSnapshot( + nodesToSummarize, + options.systemInstruction, + ); + const newId = env.idGenerator.generateId(); + + const summaryNode: RollingSummary = { + id: newId, + logicalParentId: newId, + type: 'ROLLING_SUMMARY', + timestamp: Date.now(), + text: snapshotText, + abstractsIds: nodesToSummarize.map((n) => n.id), + }; + + const consumedIds = nodesToSummarize.map((n) => n.id); + const returnedNodes = targets.filter( + (t) => !consumedIds.includes(t.id), + ); + const firstRemovedIdx = targets.findIndex((t) => + consumedIds.includes(t.id), + ); + + if (firstRemovedIdx !== -1) { + const idx = Math.max(0, firstRemovedIdx); + returnedNodes.splice(idx, 0, summaryNode); + } else { + returnedNodes.unshift(summaryNode); + } + + return returnedNodes; + } catch (e) { + debugLogger.error('RollingSummaryProcessor failed sync backstop', e); + return targets; + } + }, }; } diff --git a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts index ade626f93d..b1217f0166 100644 --- a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts @@ -19,8 +19,12 @@ describe('StateSnapshotAsyncProcessor', () => { // Spy on the publish method const publishSpy = vi.spyOn(env.inbox, 'publish'); - const worker = createStateSnapshotAsyncProcessor('StateSnapshotAsyncProcessor', env, { type: 'point-in-time' }); - + const worker = createStateSnapshotAsyncProcessor( + 'StateSnapshotAsyncProcessor', + env, + { type: 'point-in-time' }, + ); + const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); @@ -47,8 +51,12 @@ describe('StateSnapshotAsyncProcessor', () => { const publishSpy = vi.spyOn(env.inbox, 'publish'); const drainSpy = vi.spyOn(env.inbox, 'drainConsumed'); - const worker = createStateSnapshotAsyncProcessor('StateSnapshotAsyncProcessor', env, { type: 'accumulate' }); - + const worker = createStateSnapshotAsyncProcessor( + 'StateSnapshotAsyncProcessor', + env, + { type: 'accumulate' }, + ); + const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); const targets = [nodeC]; @@ -70,7 +78,9 @@ describe('StateSnapshotAsyncProcessor', () => { await worker.process(args); // The old draft should be consumed - expect((args.inbox as InboxSnapshotImpl).getConsumedIds().has('draft-1')).toBe(true); + expect( + (args.inbox as InboxSnapshotImpl).getConsumedIds().has('draft-1'), + ).toBe(true); expect(drainSpy).toHaveBeenCalledWith(expect.any(Set)); // The new publish should contain ALL consumed IDs (old + new) @@ -103,8 +113,12 @@ describe('StateSnapshotAsyncProcessor', () => { it('should ignore empty targets', async () => { const env = createMockEnvironment(); const publishSpy = vi.spyOn(env.inbox, 'publish'); - const worker = createStateSnapshotAsyncProcessor('StateSnapshotAsyncProcessor', env, { type: 'accumulate' }); - + const worker = createStateSnapshotAsyncProcessor( + 'StateSnapshotAsyncProcessor', + env, + { type: 'accumulate' }, + ); + await worker.process(createMockProcessArgs([], [], [])); expect(env.llmClient.generateContent).not.toHaveBeenCalled(); diff --git a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts index 9c4db578ec..c37e4ae059 100644 --- a/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts @@ -27,72 +27,75 @@ export function createStateSnapshotAsyncProcessor( process: async ({ targets, inbox }: ProcessArgs): Promise => { if (targets.length === 0) return; - try { - let nodesToSummarize = [...targets]; - let previousConsumedIds: string[] = []; - const processorType = options.type ?? 'point-in-time'; + try { + let nodesToSummarize = [...targets]; + let previousConsumedIds: string[] = []; + const processorType = options.type ?? 'point-in-time'; - if (processorType === 'accumulate') { - // Look for the most recent unconsumed accumulate snapshot in the inbox - const proposedSnapshots = inbox.getMessages<{ - newText: string; - consumedIds: string[]; - type: string; - }>('PROPOSED_SNAPSHOT'); - const accumulateSnapshots = proposedSnapshots.filter( - (s) => s.payload.type === 'accumulate', + if (processorType === 'accumulate') { + // Look for the most recent unconsumed accumulate snapshot in the inbox + const proposedSnapshots = inbox.getMessages<{ + newText: string; + consumedIds: string[]; + type: string; + }>('PROPOSED_SNAPSHOT'); + const accumulateSnapshots = proposedSnapshots.filter( + (s) => s.payload.type === 'accumulate', + ); + + if (accumulateSnapshots.length > 0) { + // Sort to find the most recent + const latest = [...accumulateSnapshots].sort( + (a, b) => b.timestamp - a.timestamp, + )[0]; + + // Consume the old draft so the inbox doesn't fill up with stale drafts + inbox.consume(latest.id); + // And we must persist its consumption back to the live inbox immediately, + // because we are effectively "taking" it from the shelf to modify. + env.inbox.drainConsumed(new Set([latest.id])); + + previousConsumedIds = latest.payload.consumedIds; + + // Prepend a synthetic node representing the previous rolling state + const previousStateNode: ConcreteNode = { + id: env.idGenerator.generateId(), + logicalParentId: '', + type: 'SNAPSHOT', + timestamp: latest.timestamp, + text: latest.payload.newText, + }; + + nodesToSummarize = [previousStateNode, ...targets]; + } + } + + const snapshotText = await generator.synthesizeSnapshot( + nodesToSummarize, + options.systemInstruction, ); - if (accumulateSnapshots.length > 0) { - // Sort to find the most recent - const latest = [...accumulateSnapshots].sort( - (a, b) => b.timestamp - a.timestamp, - )[0]; + const newConsumedIds = [ + ...previousConsumedIds, + ...targets.map((t) => t.id), + ]; - // Consume the old draft so the inbox doesn't fill up with stale drafts - inbox.consume(latest.id); - // And we must persist its consumption back to the live inbox immediately, - // because we are effectively "taking" it from the shelf to modify. - env.inbox.drainConsumed(new Set([latest.id])); - - previousConsumedIds = latest.payload.consumedIds; - - // Prepend a synthetic node representing the previous rolling state - const previousStateNode: ConcreteNode = { - id: env.idGenerator.generateId(), - logicalParentId: '', - type: 'SNAPSHOT', - timestamp: latest.timestamp, - text: latest.payload.newText, - }; - - nodesToSummarize = [previousStateNode, ...targets]; - } + // In V2, async pipelines communicate their work to the inbox, and the processor picks it up. + env.inbox.publish( + 'PROPOSED_SNAPSHOT', + { + newText: snapshotText, + consumedIds: newConsumedIds, + type: processorType, + }, + env.idGenerator, + ); + } catch (e) { + debugLogger.error( + 'StateSnapshotAsyncProcessor failed to generate snapshot', + e, + ); } - - const snapshotText = await generator.synthesizeSnapshot( - nodesToSummarize, - options.systemInstruction, - ); - - const newConsumedIds = [ - ...previousConsumedIds, - ...targets.map((t) => t.id), - ]; - - // In V2, async pipelines communicate their work to the inbox, and the processor picks it up. - env.inbox.publish( - 'PROPOSED_SNAPSHOT', - { - newText: snapshotText, - consumedIds: newConsumedIds, - type: processorType, - }, - env.idGenerator, - ); - } catch (e) { - debugLogger.error('StateSnapshotAsyncProcessor failed to generate snapshot', e); - } - } + }, }; } diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts index 5fb508a68a..d3eb53dc8a 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts @@ -15,9 +15,13 @@ import type { InboxSnapshotImpl } from '../pipeline/inbox.js'; describe('StateSnapshotProcessor', () => { it('should ignore if budget is satisfied', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { - target: 'incremental', - }); + const processor = createStateSnapshotProcessor( + 'StateSnapshotProcessor', + env, + { + target: 'incremental', + }, + ); const targets = [createDummyNode('ep1', 'USER_PROMPT')]; const result = await processor.process(createMockProcessArgs(targets)); expect(result).toBe(targets); // Strict equality @@ -25,9 +29,13 @@ describe('StateSnapshotProcessor', () => { it('should apply a valid snapshot from the Inbox (Fast Path)', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { - target: 'incremental', - }); + const processor = createStateSnapshotProcessor( + 'StateSnapshotProcessor', + env, + { + target: 'incremental', + }, + ); const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); @@ -65,9 +73,13 @@ describe('StateSnapshotProcessor', () => { it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { - target: 'incremental', - }); + const processor = createStateSnapshotProcessor( + 'StateSnapshotProcessor', + env, + { + target: 'incremental', + }, + ); // Make deficit 0 so we don't fall through to the sync backstop and fail the test that way // node-A is MISSING (user deleted it) @@ -99,7 +111,11 @@ describe('StateSnapshotProcessor', () => { it('should fall back to sync backstop if inbox is empty', async () => { const env = createMockEnvironment(); - const processor = createStateSnapshotProcessor('StateSnapshotProcessor', env, { target: 'max' }); // Summarize all + const processor = createStateSnapshotProcessor( + 'StateSnapshotProcessor', + env, + { target: 'max' }, + ); // Summarize all const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 8029dd9cd5..9fa3389843 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -28,141 +28,140 @@ export function createStateSnapshotProcessor( return { id, name: 'StateSnapshotProcessor', - process: async ({ - targets, - inbox, - }: ProcessArgs) => { - if (targets.length === 0) { - return targets; - } + process: async ({ targets, inbox }: ProcessArgs) => { + if (targets.length === 0) { + return targets; + } - // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate' - const strategy = options.target ?? 'max'; - const expectedType = - strategy === 'incremental' ? 'point-in-time' : 'accumulate'; + // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate' + const strategy = options.target ?? 'max'; + const expectedType = + strategy === 'incremental' ? 'point-in-time' : 'accumulate'; - // 1. Check Inbox for a completed Snapshot (The Fast Path) - const proposedSnapshots = inbox.getMessages<{ - newText: string; - consumedIds: string[]; - type: string; - }>('PROPOSED_SNAPSHOT'); + // 1. Check Inbox for a completed Snapshot (The Fast Path) + const proposedSnapshots = inbox.getMessages<{ + newText: string; + consumedIds: string[]; + type: string; + }>('PROPOSED_SNAPSHOT'); - if (proposedSnapshots.length > 0) { - // Filter for the snapshot type that matches our processor mode - const matchingSnapshots = proposedSnapshots.filter( - (s) => s.payload.type === expectedType, - ); + if (proposedSnapshots.length > 0) { + // Filter for the snapshot type that matches our processor mode + const matchingSnapshots = proposedSnapshots.filter( + (s) => s.payload.type === expectedType, + ); - // Sort by newest timestamp first (we want the most accumulated snapshot) - const sorted = [...matchingSnapshots].sort( - (a, b) => b.timestamp - a.timestamp, - ); + // Sort by newest timestamp first (we want the most accumulated snapshot) + const sorted = [...matchingSnapshots].sort( + (a, b) => b.timestamp - a.timestamp, + ); - for (const proposed of sorted) { - const { consumedIds, newText } = proposed.payload; + for (const proposed of sorted) { + const { consumedIds, newText } = proposed.payload; - // Verify all consumed IDs still exist sequentially in targets - const targetIds = new Set(targets.map((t) => t.id)); - const isValid = consumedIds.every((id) => targetIds.has(id)); + // Verify all consumed IDs still exist sequentially in targets + const targetIds = new Set(targets.map((t) => t.id)); + const isValid = consumedIds.every((id) => targetIds.has(id)); - if (isValid) { - // If valid, apply it! - const newId = env.idGenerator.generateId(); + if (isValid) { + // If valid, apply it! + const newId = env.idGenerator.generateId(); - const snapshotNode: Snapshot = { - id: newId, - logicalParentId: newId, - type: 'SNAPSHOT', - timestamp: Date.now(), - text: newText, - abstractsIds: consumedIds, - }; + const snapshotNode: Snapshot = { + id: newId, + logicalParentId: newId, + type: 'SNAPSHOT', + timestamp: Date.now(), + text: newText, + abstractsIds: consumedIds, + }; - // Remove the consumed nodes and insert the snapshot at the earliest index - const returnedNodes = targets.filter( - (t) => !consumedIds.includes(t.id), - ); - const firstRemovedIdx = targets.findIndex((t) => - consumedIds.includes(t.id), - ); + // Remove the consumed nodes and insert the snapshot at the earliest index + const returnedNodes = targets.filter( + (t) => !consumedIds.includes(t.id), + ); + const firstRemovedIdx = targets.findIndex((t) => + consumedIds.includes(t.id), + ); - if (firstRemovedIdx !== -1) { - const idx = Math.max(0, firstRemovedIdx); - returnedNodes.splice(idx, 0, snapshotNode); - } else { - returnedNodes.unshift(snapshotNode); + if (firstRemovedIdx !== -1) { + const idx = Math.max(0, firstRemovedIdx); + returnedNodes.splice(idx, 0, snapshotNode); + } else { + returnedNodes.unshift(snapshotNode); + } + + inbox.consume(proposed.id); + return returnedNodes; } - - inbox.consume(proposed.id); - return returnedNodes; } } - } - // 2. The Synchronous Backstop (The Slow Path) - let targetTokensToRemove = 0; + // 2. The Synchronous Backstop (The Slow Path) + let targetTokensToRemove = 0; - if (strategy === 'incremental') { - targetTokensToRemove = Infinity; // incremental implies removing as much as possible if no state is passed - } else if (strategy === 'freeNTokens') { - targetTokensToRemove = options.freeTokensTarget ?? Infinity; - } else if (strategy === 'max') { - targetTokensToRemove = Infinity; - } - - let deficitAccumulator = 0; - const nodesToSummarize: ConcreteNode[] = []; - - // Scan oldest to newest - for (const node of targets) { - if (node.id === targets[0].id && node.type === 'USER_PROMPT') { - // Keep system prompt if it's the very first node - // In a real system, system prompt is protected, but we double check - continue; + if (strategy === 'incremental') { + targetTokensToRemove = Infinity; // incremental implies removing as much as possible if no state is passed + } else if (strategy === 'freeNTokens') { + targetTokensToRemove = options.freeTokensTarget ?? Infinity; + } else if (strategy === 'max') { + targetTokensToRemove = Infinity; } - nodesToSummarize.push(node); - deficitAccumulator += env.tokenCalculator.getTokenCost(node); + let deficitAccumulator = 0; + const nodesToSummarize: ConcreteNode[] = []; - if (deficitAccumulator >= targetTokensToRemove) break; - } + // Scan oldest to newest + for (const node of targets) { + if (node.id === targets[0].id && node.type === 'USER_PROMPT') { + // Keep system prompt if it's the very first node + // In a real system, system prompt is protected, but we double check + continue; + } - if (nodesToSummarize.length < 2) return targets; // Not enough context + nodesToSummarize.push(node); + deficitAccumulator += env.tokenCalculator.getTokenCost(node); - try { - const snapshotText = await generator.synthesizeSnapshot( - nodesToSummarize, - options.systemInstruction, - ); - const newId = env.idGenerator.generateId(); - const snapshotNode: Snapshot = { - id: newId, - logicalParentId: newId, - type: 'SNAPSHOT', - timestamp: Date.now(), - text: snapshotText, - abstractsIds: nodesToSummarize.map((n) => n.id), - }; - - const consumedIds = nodesToSummarize.map((n) => n.id); - const returnedNodes = targets.filter((t) => !consumedIds.includes(t.id)); - const firstRemovedIdx = targets.findIndex((t) => - consumedIds.includes(t.id), - ); - - if (firstRemovedIdx !== -1) { - const idx = Math.max(0, firstRemovedIdx); - returnedNodes.splice(idx, 0, snapshotNode); - } else { - returnedNodes.unshift(snapshotNode); + if (deficitAccumulator >= targetTokensToRemove) break; } - return returnedNodes; - } catch (e) { - debugLogger.error('StateSnapshotProcessor failed sync backstop', e); - return targets; - } - } + if (nodesToSummarize.length < 2) return targets; // Not enough context + + try { + const snapshotText = await generator.synthesizeSnapshot( + nodesToSummarize, + options.systemInstruction, + ); + const newId = env.idGenerator.generateId(); + const snapshotNode: Snapshot = { + id: newId, + logicalParentId: newId, + type: 'SNAPSHOT', + timestamp: Date.now(), + text: snapshotText, + abstractsIds: nodesToSummarize.map((n) => n.id), + }; + + const consumedIds = nodesToSummarize.map((n) => n.id); + const returnedNodes = targets.filter( + (t) => !consumedIds.includes(t.id), + ); + const firstRemovedIdx = targets.findIndex((t) => + consumedIds.includes(t.id), + ); + + if (firstRemovedIdx !== -1) { + const idx = Math.max(0, firstRemovedIdx); + returnedNodes.splice(idx, 0, snapshotNode); + } else { + returnedNodes.unshift(snapshotNode); + } + + return returnedNodes; + } catch (e) { + debugLogger.error('StateSnapshotProcessor failed sync backstop', e); + return targets; + } + }, }; } diff --git a/packages/core/src/context/processors/toolMaskingProcessor.ts b/packages/core/src/context/processors/toolMaskingProcessor.ts index 51c96ebba6..29e7a56310 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.ts @@ -68,194 +68,194 @@ export function createToolMaskingProcessor( env: ContextEnvironment, options: ToolMaskingProcessorOptions, ): ContextProcessor { - const isAlreadyMasked = (text: string): boolean => text.includes(''); + const isAlreadyMasked = (text: string): boolean => + text.includes(''); return { id, name: 'ToolMaskingProcessor', process: async ({ targets }: ProcessArgs) => { - const maskingConfig = options; - if (!maskingConfig) return targets; - if (targets.length === 0) return targets; + const maskingConfig = options; + if (!maskingConfig) return targets; + if (targets.length === 0) return targets; - const limitChars = env.tokenCalculator.tokensToChars( - maskingConfig.stringLengthThresholdTokens, - ); - - let toolOutputsDir = env.fileSystem.join( - env.projectTempDir, - 'tool-outputs', - ); - const sessionId = env.sessionId; - if (sessionId) { - toolOutputsDir = env.fileSystem.join( - toolOutputsDir, - `session-${sanitizeFilenamePart(sessionId)}`, + const limitChars = env.tokenCalculator.tokensToChars( + maskingConfig.stringLengthThresholdTokens, ); - } - let directoryCreated = false; - - const handleMasking = async ( - content: string, - toolName: string, - callId: string, - nodeType: string, - ): Promise => { - if (!directoryCreated) { - await env.fileSystem.mkdir(toolOutputsDir, { recursive: true }); - directoryCreated = true; + let toolOutputsDir = env.fileSystem.join( + env.projectTempDir, + 'tool-outputs', + ); + const sessionId = env.sessionId; + if (sessionId) { + toolOutputsDir = env.fileSystem.join( + toolOutputsDir, + `session-${sanitizeFilenamePart(sessionId)}`, + ); } - const fileName = `${sanitizeFilenamePart(toolName).toLowerCase()}_${sanitizeFilenamePart(callId).toLowerCase()}_${nodeType}_${env.idGenerator.generateId()}.txt`; - const filePath = env.fileSystem.join(toolOutputsDir, fileName); + let directoryCreated = false; - await env.fileSystem.writeFile(filePath, content); + const handleMasking = async ( + content: string, + toolName: string, + callId: string, + nodeType: string, + ): Promise => { + if (!directoryCreated) { + await env.fileSystem.mkdir(toolOutputsDir, { recursive: true }); + directoryCreated = true; + } - const fileSizeMB = ( - Buffer.byteLength(content, 'utf8') / - 1024 / - 1024 - ).toFixed(2); - const totalLines = content.split('\n').length; - return `\n[Tool ${nodeType} string (${fileSizeMB}MB, ${totalLines} lines) masked to preserve context window. Full string saved to: ${filePath}]\n`; - }; + const fileName = `${sanitizeFilenamePart(toolName).toLowerCase()}_${sanitizeFilenamePart(callId).toLowerCase()}_${nodeType}_${env.idGenerator.generateId()}.txt`; + const filePath = env.fileSystem.join(toolOutputsDir, fileName); - const returnedNodes: ConcreteNode[] = []; + await env.fileSystem.writeFile(filePath, content); - for (const node of targets) { - switch (node.type) { - case 'TOOL_EXECUTION': { - const toolName = node.toolName; - if (toolName && UNMASKABLE_TOOLS.has(toolName)) { - returnedNodes.push(node); - break; - } + const fileSizeMB = ( + Buffer.byteLength(content, 'utf8') / + 1024 / + 1024 + ).toFixed(2); + const totalLines = content.split('\n').length; + return `\n[Tool ${nodeType} string (${fileSizeMB}MB, ${totalLines} lines) masked to preserve context window. Full string saved to: ${filePath}]\n`; + }; - const callId = node.id || Date.now().toString(); + const returnedNodes: ConcreteNode[] = []; - const maskAsync = async ( - obj: MaskableValue, - nodeType: string, - ): Promise<{ masked: MaskableValue; changed: boolean }> => { - if (typeof obj === 'string') { - if (obj.length > limitChars && !isAlreadyMasked(obj)) { - const newString = await handleMasking( - obj, - toolName || 'unknown', - callId, - nodeType, - ); - return { masked: newString, changed: true }; + for (const node of targets) { + switch (node.type) { + case 'TOOL_EXECUTION': { + const toolName = node.toolName; + if (toolName && UNMASKABLE_TOOLS.has(toolName)) { + returnedNodes.push(node); + break; + } + + const callId = node.id || Date.now().toString(); + + const maskAsync = async ( + obj: MaskableValue, + nodeType: string, + ): Promise<{ masked: MaskableValue; changed: boolean }> => { + if (typeof obj === 'string') { + if (obj.length > limitChars && !isAlreadyMasked(obj)) { + const newString = await handleMasking( + obj, + toolName || 'unknown', + callId, + nodeType, + ); + return { masked: newString, changed: true }; + } + return { masked: obj, changed: false }; + } + if (Array.isArray(obj)) { + let changed = false; + const masked: MaskableValue[] = []; + for (const item of obj) { + const res = await maskAsync(item, nodeType); + if (res.changed) changed = true; + masked.push(res.masked); + } + return { masked, changed }; + } + if (typeof obj === 'object' && obj !== null) { + let changed = false; + const masked: Record = {}; + for (const [key, value] of Object.entries(obj)) { + const res = await maskAsync(value, nodeType); + if (res.changed) changed = true; + masked[key] = res.masked; + } + return { masked, changed }; } return { masked: obj, changed: false }; + }; + + const rawIntent = node.intent; + const rawObs = node.observation; + + if (!isMaskableRecord(rawIntent) || !isMaskableValue(rawObs)) { + returnedNodes.push(node); + break; } - if (Array.isArray(obj)) { - let changed = false; - const masked: MaskableValue[] = []; - for (const item of obj) { - const res = await maskAsync(item, nodeType); - if (res.changed) changed = true; - masked.push(res.masked); - } - return { masked, changed }; - } - if (typeof obj === 'object' && obj !== null) { - let changed = false; - const masked: Record = {}; - for (const [key, value] of Object.entries(obj)) { - const res = await maskAsync(value, nodeType); - if (res.changed) changed = true; - masked[key] = res.masked; - } - return { masked, changed }; - } - return { masked: obj, changed: false }; - }; - const rawIntent = node.intent; - const rawObs = node.observation; + const intentRes = await maskAsync(rawIntent, 'intent'); + const obsRes = await maskAsync(rawObs, 'observation'); - if (!isMaskableRecord(rawIntent) || !isMaskableValue(rawObs)) { - returnedNodes.push(node); - break; - } + if (intentRes.changed || obsRes.changed) { + const maskedIntent = isMaskableRecord(intentRes.masked) + ? (intentRes.masked as Record) + : undefined; + // Handle observation explicitly as string vs object + const maskedObs = + typeof obsRes.masked === 'string' + ? ({ message: obsRes.masked } as Record) + : isMaskableRecord(obsRes.masked) + ? (obsRes.masked as Record) + : undefined; - const intentRes = await maskAsync(rawIntent, 'intent'); - const obsRes = await maskAsync(rawObs, 'observation'); + const newIntentTokens = + env.tokenCalculator.estimateTokensForParts([ + { + functionCall: { + name: toolName || 'unknown', + args: maskedIntent, + id: callId, + }, + }, + ]); - if (intentRes.changed || obsRes.changed) { - const maskedIntent = isMaskableRecord(intentRes.masked) - ? (intentRes.masked as Record) - : undefined; - // Handle observation explicitly as string vs object - const maskedObs = - typeof obsRes.masked === 'string' - ? ({ message: obsRes.masked } as Record) - : isMaskableRecord(obsRes.masked) - ? (obsRes.masked as Record) - : undefined; - - const newIntentTokens = - env.tokenCalculator.estimateTokensForParts([ - { - functionCall: { + let obsPart: Record = {}; + if (maskedObs) { + obsPart = { + functionResponse: { name: toolName || 'unknown', - args: maskedIntent, + response: maskedObs, id: callId, }, - }, - ]); + }; + } - let obsPart: Record = {}; - if (maskedObs) { - obsPart = { - functionResponse: { - name: toolName || 'unknown', - response: maskedObs, - id: callId, - }, - }; - } - - const newObsTokens = - env.tokenCalculator.estimateTokensForParts([ + const newObsTokens = env.tokenCalculator.estimateTokensForParts([ obsPart as Part, ]); - const tokensSaved = - env.tokenCalculator.getTokenCost(node) - - (newIntentTokens + newObsTokens); + const tokensSaved = + env.tokenCalculator.getTokenCost(node) - + (newIntentTokens + newObsTokens); - if (tokensSaved > 0) { - const maskedNode: ToolExecution = { - ...node, - id: env.idGenerator.generateId(), // Modified, so generate new ID - intent: maskedIntent ?? node.intent, - observation: maskedObs ?? node.observation, - tokens: { - intent: newIntentTokens, - observation: newObsTokens, - }, - replacesId: node.id, - }; + if (tokensSaved > 0) { + const maskedNode: ToolExecution = { + ...node, + id: env.idGenerator.generateId(), // Modified, so generate new ID + intent: maskedIntent ?? node.intent, + observation: maskedObs ?? node.observation, + tokens: { + intent: newIntentTokens, + observation: newObsTokens, + }, + replacesId: node.id, + }; - returnedNodes.push(maskedNode); + returnedNodes.push(maskedNode); + } else { + returnedNodes.push(node); + } } else { returnedNodes.push(node); } - } else { - returnedNodes.push(node); + break; } - break; + default: + returnedNodes.push(node); + break; } - default: - returnedNodes.push(node); - break; } - } - return returnedNodes; - } + return returnedNodes; + }, }; } diff --git a/packages/core/src/context/system-tests/SimulationHarness.ts b/packages/core/src/context/system-tests/SimulationHarness.ts index 6273828853..26eec5bfa4 100644 --- a/packages/core/src/context/system-tests/SimulationHarness.ts +++ b/packages/core/src/context/system-tests/SimulationHarness.ts @@ -113,7 +113,10 @@ export class SimulationHarness { let currentView = this.contextManager.getNodes(); const currentTokens = this.env.tokenCalculator.calculateConcreteListTokens(currentView); - if (this.config.config.budget && currentTokens > this.config.config.budget.maxTokens) { + if ( + this.config.config.budget && + currentTokens > this.config.config.budget.maxTokens + ) { debugLogger.log( `[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.config.budget.maxTokens}`, ); diff --git a/packages/core/src/context/system-tests/lifecycle.golden.test.ts b/packages/core/src/context/system-tests/lifecycle.golden.test.ts index 12dde5b790..4ff1b6e566 100644 --- a/packages/core/src/context/system-tests/lifecycle.golden.test.ts +++ b/packages/core/src/context/system-tests/lifecycle.golden.test.ts @@ -46,7 +46,9 @@ describe('System Lifecycle Golden Tests', () => { triggers: ['retained_exceeded'], processors: [ createBlobDegradationProcessor('BlobDegradationProcessor', env), - createToolMaskingProcessor('ToolMaskingProcessor', env, { stringLengthThresholdTokens: 50 }), + createToolMaskingProcessor('ToolMaskingProcessor', env, { + stringLengthThresholdTokens: 50, + }), createStateSnapshotProcessor('StateSnapshotProcessor', env, {}), ], }, @@ -54,15 +56,27 @@ describe('System Lifecycle Golden Tests', () => { name: 'Immediate Sanitization', // The magic string the projector is hardcoded to use triggers: ['retained_exceeded'], processors: [ - createHistoryTruncationProcessor('HistoryTruncationProcessor', env, {}), + createHistoryTruncationProcessor( + 'HistoryTruncationProcessor', + env, + {}, + ), + ], + }, + ], + buildAsyncPipelines: (env) => [ + { + name: 'Async', + triggers: ['nodes_aged_out'], + processors: [ + createStateSnapshotAsyncProcessor( + 'StateSnapshotAsyncProcessor', + env, + {}, + ), ], }, ], - buildAsyncPipelines: (env) => [{ - name: 'Async', - triggers: ['nodes_aged_out'], - processors: [createStateSnapshotAsyncProcessor('StateSnapshotAsyncProcessor', env, {})] - }], }); const mockLlmClient = createMockLlmClient([ @@ -186,11 +200,19 @@ describe('System Lifecycle Golden Tests', () => { budget: { maxTokens: 200, retainedTokens: 100 }, }, buildPipelines: () => [], - buildAsyncPipelines: (env) => [{ - name: 'Async', - triggers: ['nodes_aged_out'], - processors: [createStateSnapshotAsyncProcessor('StateSnapshotAsyncProcessor', env, {})] - }], + buildAsyncPipelines: (env) => [ + { + name: 'Async', + triggers: ['nodes_aged_out'], + processors: [ + createStateSnapshotAsyncProcessor( + 'StateSnapshotAsyncProcessor', + env, + {}, + ), + ], + }, + ], }; const harness = await SimulationHarness.create(gcConfig, mockLlmClient); diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index 5a4832d155..23e6f65f41 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -96,7 +96,6 @@ export function createDummyToolNode( import type { Mock } from 'vitest'; - export interface MockLlmClient extends BaseLlmClient { generateContent: Mock; } @@ -231,7 +230,7 @@ export function createMockContextConfig( getUsageStatisticsEnabled: vi.fn().mockReturnValue(false), getTargetDir: vi.fn().mockReturnValue('/tmp'), getSessionId: vi.fn().mockReturnValue('test-session'), - getExperimentalContextSidecarConfig: vi.fn().mockReturnValue(undefined), + getExperimentalContextManagementConfig: vi.fn().mockReturnValue(undefined), }; // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion