From 897a4d7f83a2528f395b11ea025eb07524b3fa9e Mon Sep 17 00:00:00 2001 From: joshualitt Date: Wed, 6 May 2026 09:37:08 -0700 Subject: [PATCH] fix(core): Fix hysteresis in async context management pipelines. (#26452) --- packages/core/src/context/contextManager.ts | 22 ++- .../core/src/context/pipeline/orchestrator.ts | 139 +++++++++++++---- .../context/system-tests/hysteresis.test.ts | 142 ++++++++++++++++++ .../context/system-tests/simulationHarness.ts | 49 +----- 4 files changed, 278 insertions(+), 74 deletions(-) create mode 100644 packages/core/src/context/system-tests/hysteresis.test.ts diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 88c90f9c9f..48e8dcd88b 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -30,6 +30,9 @@ export class ContextManager { private readonly orchestrator: PipelineOrchestrator; private readonly historyObserver: HistoryObserver; + // Hysteresis tracking to prevent utility call churn + private lastTriggeredDeficit = 0; + // Cache for Anomaly 3 (Redundant Renders) private lastRenderCache?: { nodesHash: string; @@ -69,6 +72,7 @@ export class ContextManager { event.targets, event.returnedNodes, ); + this.evaluateTriggers(new Set()); }); this.historyObserver.start(); @@ -137,11 +141,24 @@ export class ContextManager { const targetDeficit = currentTokens - this.sidecar.config.budget.retainedTokens; + // If the deficit has shrunk (e.g. after a consolidation), update the baseline + // so we can track growth from this new, smaller deficit. + if (targetDeficit < this.lastTriggeredDeficit) { + this.lastTriggeredDeficit = targetDeficit; + } + // Respect coalescing threshold for background work const threshold = this.sidecar.config.budget.coalescingThresholdTokens || 0; - if (targetDeficit >= threshold) { + // Only trigger if deficit has grown significantly since last time + const growthSinceLast = targetDeficit - this.lastTriggeredDeficit; + + if ( + targetDeficit >= threshold && + (growthSinceLast >= threshold || this.lastTriggeredDeficit === 0) + ) { + this.lastTriggeredDeficit = targetDeficit; this.env.tokenCalculator.garbageCollectCache( new Set(this.buffer.nodes.map((n) => n.id)), ); @@ -151,6 +168,9 @@ export class ContextManager { targetNodeIds: agedOutNodes, }); } + } else { + // Budget is healthy, reset hysteresis + this.lastTriggeredDeficit = 0; } } } diff --git a/packages/core/src/context/pipeline/orchestrator.ts b/packages/core/src/context/pipeline/orchestrator.ts index a111f05af2..8b8dbe706f 100644 --- a/packages/core/src/context/pipeline/orchestrator.ts +++ b/packages/core/src/context/pipeline/orchestrator.ts @@ -23,6 +23,7 @@ export class PipelineOrchestrator { private activeTimers: NodeJS.Timeout[] = []; private readonly pendingPipelines = new Map>(); private readonly pipelineMutex = new Map>(); + private readonly pipelineScheduled = new Set(); private nodeProvider: (() => readonly ConcreteNode[]) | undefined; constructor( @@ -77,7 +78,7 @@ export class PipelineOrchestrator { nodes: readonly ConcreteNode[], targets: ReadonlySet, protectedIds: ReadonlySet, - ) => void, + ) => Promise, ) => { for (const pipeline of pipelines) { for (const trigger of pipeline.triggers) { @@ -91,30 +92,62 @@ export class PipelineOrchestrator { trigger === 'nodes_aged_out' ) { this.eventBus.onConsolidationNeeded((event) => { - executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); + void executeFn( + pipeline, + event.nodes, + event.targetNodeIds, + new Set(), + ); }); } else if (trigger === 'new_message' || trigger === 'nodes_added') { this.eventBus.onChunkReceived((event) => { - executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); + void executeFn( + pipeline, + event.nodes, + event.targetNodeIds, + new Set(), + ); }); } } } }; - bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => { - // Fetch the tail of the current chain for this pipeline, or start a new one + const handleSyncExecution = async ( + pipeline: PipelineDef, + nodes: readonly ConcreteNode[], + targets: ReadonlySet, + protectedIds: ReadonlySet, + ) => { + if (this.pipelineScheduled.has(pipeline.name)) { + debugLogger.log( + `[Orchestrator] Pipeline ${pipeline.name} already scheduled (sync), dropping.`, + ); + return; + } + this.pipelineScheduled.add(pipeline.name); + const existing = this.pipelineMutex.get(pipeline.name) || Promise.resolve(); const nextPromise = (async () => { try { - // Wait for the previous run of THIS pipeline to complete await existing; + this.pipelineScheduled.delete(pipeline.name); - // We re-fetch the LATEST nodes from the environment's live buffer - // to ensure this sequential run isn't operating on stale data from the trigger event. - const latestNodes = this.nodeProvider!(); + const latestNodes = this.nodeProvider ? this.nodeProvider() : nodes; + const latestTargets = latestNodes.filter((n) => targets.has(n.id)); + + debugLogger.log( + `[Orchestrator] Executing sync pipeline ${pipeline.name} with ${latestTargets.length} latest targets.`, + ); + + if (latestTargets.length === 0) { + debugLogger.log( + `[Orchestrator] No latest targets for sync pipeline ${pipeline.name}, returning.`, + ); + return; + } await this.executePipelineAsync( pipeline, @@ -123,41 +156,87 @@ export class PipelineOrchestrator { new Set(protectedIds), ); } catch (e) { - debugLogger.error(`Pipeline chain ${pipeline.name} failed:`, e); + debugLogger.error(`Sync pipeline chain ${pipeline.name} failed:`, e); } })(); - // Update the chain tail this.pipelineMutex.set(pipeline.name, nextPromise); - const pipelineId = `${pipeline.name}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; this.pendingPipelines.set(pipelineId, nextPromise); void nextPromise.finally(() => { this.pendingPipelines.delete(pipelineId); - // Only clear the mutex if we are still the tail of the chain if (this.pipelineMutex.get(pipeline.name) === nextPromise) { this.pipelineMutex.delete(pipeline.name); } }); - }); + }; - bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => { - const inboxSnapshot = new InboxSnapshotImpl( - this.env.inbox.getMessages() || [], - ); - const targets = nodes.filter((n) => targetIds.has(n.id)); - for (const processor of pipeline.processors) { - processor - .process({ - targets, - inbox: inboxSnapshot, - buffer: ContextWorkingBufferImpl.initialize(nodes), - }) - .catch((e: unknown) => - debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e), - ); + const handleAsyncExecution = async ( + pipeline: AsyncPipelineDef, + nodes: readonly ConcreteNode[], + targets: ReadonlySet, + ) => { + if (this.pipelineScheduled.has(pipeline.name)) { + debugLogger.log( + `[Orchestrator] Pipeline ${pipeline.name} already scheduled (async), dropping.`, + ); + return; } - }); + this.pipelineScheduled.add(pipeline.name); + + const existing = + this.pipelineMutex.get(pipeline.name) || Promise.resolve(); + + const nextPromise = (async () => { + try { + await existing; + this.pipelineScheduled.delete(pipeline.name); + + const latestNodes = this.nodeProvider ? this.nodeProvider() : nodes; + const latestTargets = latestNodes.filter((n) => targets.has(n.id)); + + debugLogger.log( + `[Orchestrator] Executing async pipeline ${pipeline.name} with ${latestTargets.length} latest targets.`, + ); + + const inboxSnapshot = new InboxSnapshotImpl( + this.env.inbox.getMessages() || [], + ); + + for (const processor of pipeline.processors) { + debugLogger.log( + `[Orchestrator] Running async processor ${processor.id}`, + ); + await processor.process({ + targets: latestTargets, + inbox: inboxSnapshot, + buffer: ContextWorkingBufferImpl.initialize(latestNodes), + }); + } + this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds()); + } catch (e) { + debugLogger.error(`Async pipeline chain ${pipeline.name} failed:`, e); + } + })(); + + this.pipelineMutex.set(pipeline.name, nextPromise); + const pipelineId = `${pipeline.name}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + this.pendingPipelines.set(pipelineId, nextPromise); + void nextPromise.finally(() => { + this.pendingPipelines.delete(pipelineId); + if (this.pipelineMutex.get(pipeline.name) === nextPromise) { + this.pipelineMutex.delete(pipeline.name); + } + }); + }; + + bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => + handleSyncExecution(pipeline, nodes, targets, protectedIds), + ); + + bindTriggers(this.asyncPipelines, (pipeline, nodes, targets) => + handleAsyncExecution(pipeline, nodes, targets), + ); } shutdown() { diff --git a/packages/core/src/context/system-tests/hysteresis.test.ts b/packages/core/src/context/system-tests/hysteresis.test.ts new file mode 100644 index 0000000000..ca335a43fa --- /dev/null +++ b/packages/core/src/context/system-tests/hysteresis.test.ts @@ -0,0 +1,142 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect } from 'vitest'; +import { SimulationHarness } from './simulationHarness.js'; +import { createMockLlmClient } from '../testing/contextTestUtils.js'; +import type { ContextProfile } from '../config/profiles.js'; +import { generalistProfile } from '../config/profiles.js'; + +describe('Context Manager Hysteresis Tests', () => { + const mockLlmClient = createMockLlmClient(['']); + + const getHysteresisConfig = (threshold: number): ContextProfile => ({ + ...generalistProfile, + name: 'Hysteresis Stress Test', + config: { + budget: { + maxTokens: 5000, + retainedTokens: 1000, + coalescingThresholdTokens: threshold, + }, + }, + }); + + it('should block consolidation when deficit is below coalescing threshold', async () => { + const threshold = 1500; + const harness = await SimulationHarness.create( + getHysteresisConfig(threshold), + mockLlmClient, + ); + + // Turn 0: INIT + await harness.simulateTurn([{ role: 'user', parts: [{ text: 'INIT' }] }]); + + // Turn 1: Add 1500 chars (~500 tokens). Total ~500. Under retained (1000). + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'A'.repeat(1500) }] }, + ]); + + // Turn 2: Add 3000 chars (~1000 tokens). Total ~1500. Deficit ~500 < 1500. + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'B'.repeat(3000) }] }, + ]); + + await new Promise((resolve) => setTimeout(resolve, 100)); + let state = await harness.getGoldenState(); + // No snapshot because maxTokens (5000) not exceeded, and deficit < threshold. + expect( + state.finalProjection.some((c) => + c.parts?.some((p) => p.text?.includes('')), + ), + ).toBe(false); + + // Turn 3: Add 9000 chars (~3000 tokens). Total ~4500. + // Deficit ~3500 > 1500. TRIGGER! + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'C'.repeat(9000) }] }, + ]); + + // Give it a moment for the async task to finish + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Exceed maxTokens to force a render that shows the snapshot + // Add 3000 more tokens (9000 chars). Total ~7500 > 5000. + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'D'.repeat(9000) }] }, + ]); + + state = await harness.getGoldenState(); + expect( + state.finalProjection.some((c) => + c.parts?.some((p) => p.text?.includes('')), + ), + ).toBe(true); + }); + + it('should track growth from the new baseline after consolidation', async () => { + const threshold = 1000; + const harness = await SimulationHarness.create( + getHysteresisConfig(threshold), + mockLlmClient, + ); + + // 1. Trigger first consolidation + // Add ~9000 chars (~3000 tokens). Total ~3000. Deficit ~2000 > 1000. + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'A'.repeat(9000) }] }, + ]); + await harness.simulateTurn([{ role: 'user', parts: [{ text: 'B' }] }]); // Make eligible + + await new Promise((resolve) => setTimeout(resolve, 500)); + // Exceed maxTokens (5000) to see it + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'X'.repeat(9000) }] }, + ]); + + const state = await harness.getGoldenState(); + expect( + state.finalProjection.some((c) => + c.parts?.some((p) => p.text?.includes('')), + ), + ).toBe(true); + + // Get baseline tokens + const baselineTokens = + harness.env.tokenCalculator.calculateConcreteListTokens( + harness.contextManager.getNodes(), + ); + + // 2. Add nodes again, staying below threshold growth + // Add 1500 chars (~500 tokens). Growth ~500 < 1000. + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'C'.repeat(1500) }] }, + ]); + await harness.simulateTurn([{ role: 'user', parts: [{ text: 'D' }] }]); // Make eligible + + await new Promise((resolve) => setTimeout(resolve, 200)); + const currentTokens = + harness.env.tokenCalculator.calculateConcreteListTokens( + harness.contextManager.getNodes(), + ); + // Should not have shrunk further (except for D's small addition) + expect(currentTokens).toBeGreaterThanOrEqual(baselineTokens); + + // 3. Exceed threshold growth + // Add 6000 chars (~2000 tokens). Growth = ~500 + ~2000 = ~2500 > 1000. + await harness.simulateTurn([ + { role: 'user', parts: [{ text: 'E'.repeat(6000) }] }, + ]); + await harness.simulateTurn([{ role: 'user', parts: [{ text: 'F' }] }]); // Make eligible + + await new Promise((resolve) => setTimeout(resolve, 500)); + const finalTokens = harness.env.tokenCalculator.calculateConcreteListTokens( + harness.contextManager.getNodes(), + ); + // Now it should have consolidated again (E should be replaced by a snapshot eventually) + expect(finalTokens).toBeLessThan(currentTokens + 2000); + }); +}); diff --git a/packages/core/src/context/system-tests/simulationHarness.ts b/packages/core/src/context/system-tests/simulationHarness.ts index 567aa95013..303b715273 100644 --- a/packages/core/src/context/system-tests/simulationHarness.ts +++ b/packages/core/src/context/system-tests/simulationHarness.ts @@ -12,7 +12,6 @@ import { ContextEnvironmentImpl } from '../pipeline/environmentImpl.js'; import { ContextTracer } from '../tracer.js'; import { ContextEventBus } from '../eventBus.js'; import { PipelineOrchestrator } from '../pipeline/orchestrator.js'; -import { debugLogger } from '../../utils/debugLogger.js'; import type { BaseLlmClient } from '../../core/baseLlmClient.js'; export interface TurnSummary { @@ -65,7 +64,7 @@ export class SimulationHarness { mockTempDir, mockTempDir, this.tracer, - 1, // 1 char per token average + 1, // 1 char per token average for estimation (but estimator uses 0.33) this.eventBus, ); @@ -85,60 +84,24 @@ export class SimulationHarness { ); } - /** - * Simulates a single "Turn" (User input + Model/Tool outputs) - * A turn might consist of multiple Content messages (e.g. user prompt -> model call -> user response -> model answer) - */ async simulateTurn(messages: Content[]) { // 1. Append the new messages const currentHistory = this.chatHistory.get(); this.chatHistory.set([...currentHistory, ...messages]); - // 2. Measure tokens immediately after append (Before background processing) + // 2. Measure tokens immediately after append const tokensBefore = this.env.tokenCalculator.calculateConcreteListTokens( this.contextManager.getNodes(), ); - debugLogger.log( - `[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`, - ); - // 3. Yield to event loop to allow internal async subscribers and orchestrator to finish - await new Promise((resolve) => setTimeout(resolve, 50)); + // 3. Yield to event loop and wait for async pipelines to finish + await this.contextManager.waitForPipelines(); + await new Promise((resolve) => setTimeout(resolve, 100)); // Extra beat for event bus propagation - // 3.1 Simulate what projectCompressedHistory does with the sync handlers - let currentView = this.contextManager.getNodes(); - const currentTokens = - this.env.tokenCalculator.calculateConcreteListTokens(currentView); - if ( - this.config.config.budget && - currentTokens > this.config.config.budget.maxTokens - ) { - debugLogger.log( - `[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.config.budget.maxTokens}`, - ); - const orchestrator = this.orchestrator; - // In the V2 simulation, we trigger the 'gc_backstop' to simulate emergency pressure. - // Since contextManager owns its buffer natively, the simulation now properly matches reality - // where the manager runs the orchestrator and keeps the resulting modified view. - const modifiedView = await orchestrator.executeTriggerSync( - 'gc_backstop', - currentView, - new Set(currentView.map((e) => e.id)), - new Set(), - ); - - // In the real system, ContextManager triggers this and retains it. - // We will emulate that behavior internally in the test loop for token counting. - currentView = modifiedView; - } - - // 4. Measure tokens after background processors have processed inboxes + // 4. Measure tokens after background processors const tokensAfter = this.env.tokenCalculator.calculateConcreteListTokens( this.contextManager.getNodes(), ); - debugLogger.log( - `[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`, - ); this.tokenTrajectory.push({ turnIndex: this.currentTurnIndex++,