fix(core): Fix hysteresis in async context management pipelines. (#26452)

This commit is contained in:
joshualitt
2026-05-06 09:37:08 -07:00
committed by GitHub
parent 5155221bbe
commit 897a4d7f83
4 changed files with 278 additions and 74 deletions
+21 -1
View File
@@ -30,6 +30,9 @@ export class ContextManager {
private readonly orchestrator: PipelineOrchestrator;
private readonly historyObserver: HistoryObserver;
// Hysteresis tracking to prevent utility call churn
private lastTriggeredDeficit = 0;
// Cache for Anomaly 3 (Redundant Renders)
private lastRenderCache?: {
nodesHash: string;
@@ -69,6 +72,7 @@ export class ContextManager {
event.targets,
event.returnedNodes,
);
this.evaluateTriggers(new Set());
});
this.historyObserver.start();
@@ -137,11 +141,24 @@ export class ContextManager {
const targetDeficit =
currentTokens - this.sidecar.config.budget.retainedTokens;
// If the deficit has shrunk (e.g. after a consolidation), update the baseline
// so we can track growth from this new, smaller deficit.
if (targetDeficit < this.lastTriggeredDeficit) {
this.lastTriggeredDeficit = targetDeficit;
}
// Respect coalescing threshold for background work
const threshold =
this.sidecar.config.budget.coalescingThresholdTokens || 0;
if (targetDeficit >= threshold) {
// Only trigger if deficit has grown significantly since last time
const growthSinceLast = targetDeficit - this.lastTriggeredDeficit;
if (
targetDeficit >= threshold &&
(growthSinceLast >= threshold || this.lastTriggeredDeficit === 0)
) {
this.lastTriggeredDeficit = targetDeficit;
this.env.tokenCalculator.garbageCollectCache(
new Set(this.buffer.nodes.map((n) => n.id)),
);
@@ -151,6 +168,9 @@ export class ContextManager {
targetNodeIds: agedOutNodes,
});
}
} else {
// Budget is healthy, reset hysteresis
this.lastTriggeredDeficit = 0;
}
}
}
@@ -23,6 +23,7 @@ export class PipelineOrchestrator {
private activeTimers: NodeJS.Timeout[] = [];
private readonly pendingPipelines = new Map<string, Promise<void>>();
private readonly pipelineMutex = new Map<string, Promise<void>>();
private readonly pipelineScheduled = new Set<string>();
private nodeProvider: (() => readonly ConcreteNode[]) | undefined;
constructor(
@@ -77,7 +78,7 @@ export class PipelineOrchestrator {
nodes: readonly ConcreteNode[],
targets: ReadonlySet<string>,
protectedIds: ReadonlySet<string>,
) => void,
) => Promise<void>,
) => {
for (const pipeline of pipelines) {
for (const trigger of pipeline.triggers) {
@@ -91,30 +92,62 @@ export class PipelineOrchestrator {
trigger === 'nodes_aged_out'
) {
this.eventBus.onConsolidationNeeded((event) => {
executeFn(pipeline, event.nodes, event.targetNodeIds, new Set());
void executeFn(
pipeline,
event.nodes,
event.targetNodeIds,
new Set(),
);
});
} else if (trigger === 'new_message' || trigger === 'nodes_added') {
this.eventBus.onChunkReceived((event) => {
executeFn(pipeline, event.nodes, event.targetNodeIds, new Set());
void executeFn(
pipeline,
event.nodes,
event.targetNodeIds,
new Set(),
);
});
}
}
}
};
bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => {
// Fetch the tail of the current chain for this pipeline, or start a new one
const handleSyncExecution = async (
pipeline: PipelineDef,
nodes: readonly ConcreteNode[],
targets: ReadonlySet<string>,
protectedIds: ReadonlySet<string>,
) => {
if (this.pipelineScheduled.has(pipeline.name)) {
debugLogger.log(
`[Orchestrator] Pipeline ${pipeline.name} already scheduled (sync), dropping.`,
);
return;
}
this.pipelineScheduled.add(pipeline.name);
const existing =
this.pipelineMutex.get(pipeline.name) || Promise.resolve();
const nextPromise = (async () => {
try {
// Wait for the previous run of THIS pipeline to complete
await existing;
this.pipelineScheduled.delete(pipeline.name);
// We re-fetch the LATEST nodes from the environment's live buffer
// to ensure this sequential run isn't operating on stale data from the trigger event.
const latestNodes = this.nodeProvider!();
const latestNodes = this.nodeProvider ? this.nodeProvider() : nodes;
const latestTargets = latestNodes.filter((n) => targets.has(n.id));
debugLogger.log(
`[Orchestrator] Executing sync pipeline ${pipeline.name} with ${latestTargets.length} latest targets.`,
);
if (latestTargets.length === 0) {
debugLogger.log(
`[Orchestrator] No latest targets for sync pipeline ${pipeline.name}, returning.`,
);
return;
}
await this.executePipelineAsync(
pipeline,
@@ -123,41 +156,87 @@ export class PipelineOrchestrator {
new Set(protectedIds),
);
} catch (e) {
debugLogger.error(`Pipeline chain ${pipeline.name} failed:`, e);
debugLogger.error(`Sync pipeline chain ${pipeline.name} failed:`, e);
}
})();
// Update the chain tail
this.pipelineMutex.set(pipeline.name, nextPromise);
const pipelineId = `${pipeline.name}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
this.pendingPipelines.set(pipelineId, nextPromise);
void nextPromise.finally(() => {
this.pendingPipelines.delete(pipelineId);
// Only clear the mutex if we are still the tail of the chain
if (this.pipelineMutex.get(pipeline.name) === nextPromise) {
this.pipelineMutex.delete(pipeline.name);
}
});
});
};
bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => {
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
const targets = nodes.filter((n) => targetIds.has(n.id));
for (const processor of pipeline.processors) {
processor
.process({
targets,
inbox: inboxSnapshot,
buffer: ContextWorkingBufferImpl.initialize(nodes),
})
.catch((e: unknown) =>
debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e),
);
const handleAsyncExecution = async (
pipeline: AsyncPipelineDef,
nodes: readonly ConcreteNode[],
targets: ReadonlySet<string>,
) => {
if (this.pipelineScheduled.has(pipeline.name)) {
debugLogger.log(
`[Orchestrator] Pipeline ${pipeline.name} already scheduled (async), dropping.`,
);
return;
}
});
this.pipelineScheduled.add(pipeline.name);
const existing =
this.pipelineMutex.get(pipeline.name) || Promise.resolve();
const nextPromise = (async () => {
try {
await existing;
this.pipelineScheduled.delete(pipeline.name);
const latestNodes = this.nodeProvider ? this.nodeProvider() : nodes;
const latestTargets = latestNodes.filter((n) => targets.has(n.id));
debugLogger.log(
`[Orchestrator] Executing async pipeline ${pipeline.name} with ${latestTargets.length} latest targets.`,
);
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
for (const processor of pipeline.processors) {
debugLogger.log(
`[Orchestrator] Running async processor ${processor.id}`,
);
await processor.process({
targets: latestTargets,
inbox: inboxSnapshot,
buffer: ContextWorkingBufferImpl.initialize(latestNodes),
});
}
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
} catch (e) {
debugLogger.error(`Async pipeline chain ${pipeline.name} failed:`, e);
}
})();
this.pipelineMutex.set(pipeline.name, nextPromise);
const pipelineId = `${pipeline.name}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
this.pendingPipelines.set(pipelineId, nextPromise);
void nextPromise.finally(() => {
this.pendingPipelines.delete(pipelineId);
if (this.pipelineMutex.get(pipeline.name) === nextPromise) {
this.pipelineMutex.delete(pipeline.name);
}
});
};
bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) =>
handleSyncExecution(pipeline, nodes, targets, protectedIds),
);
bindTriggers(this.asyncPipelines, (pipeline, nodes, targets) =>
handleAsyncExecution(pipeline, nodes, targets),
);
}
shutdown() {
@@ -0,0 +1,142 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect } from 'vitest';
import { SimulationHarness } from './simulationHarness.js';
import { createMockLlmClient } from '../testing/contextTestUtils.js';
import type { ContextProfile } from '../config/profiles.js';
import { generalistProfile } from '../config/profiles.js';
// Verifies the hysteresis logic added to ContextManager: a consolidation should
// only be (re)triggered when the token deficit has GROWN by at least the
// coalescing threshold since the last trigger, preventing utility-call churn.
// NOTE(review): the "~N tokens" figures in the comments below assume roughly
// 3 chars per token — confirm against the estimator wired into SimulationHarness.
describe('Context Manager Hysteresis Tests', () => {
// The mock LLM always answers '<SNAPSHOT>', so the presence of that marker in
// the projected history is the observable signal that a consolidation ran.
const mockLlmClient = createMockLlmClient(['<SNAPSHOT>']);
// Small budget so a few turns of repeated characters can cross the
// retained/max token limits quickly; only the coalescing threshold varies.
const getHysteresisConfig = (threshold: number): ContextProfile => ({
...generalistProfile,
name: 'Hysteresis Stress Test',
config: {
budget: {
maxTokens: 5000,
retainedTokens: 1000,
coalescingThresholdTokens: threshold,
},
},
});
it('should block consolidation when deficit is below coalescing threshold', async () => {
const threshold = 1500;
const harness = await SimulationHarness.create(
getHysteresisConfig(threshold),
mockLlmClient,
);
// Turn 0: INIT
await harness.simulateTurn([{ role: 'user', parts: [{ text: 'INIT' }] }]);
// Turn 1: Add 1500 chars (~500 tokens). Total ~500. Under retained (1000).
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'A'.repeat(1500) }] },
]);
// Turn 2: Add 3000 chars (~1000 tokens). Total ~1500. Deficit ~500 < 1500.
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'B'.repeat(3000) }] },
]);
// Fixed sleep lets background async pipelines settle before asserting.
// NOTE(review): timing-based — potentially flaky under load; confirm the
// harness offers a deterministic waitForPipelines() alternative.
await new Promise((resolve) => setTimeout(resolve, 100));
let state = await harness.getGoldenState();
// No snapshot because maxTokens (5000) not exceeded, and deficit < threshold.
expect(
state.finalProjection.some((c) =>
c.parts?.some((p) => p.text?.includes('<SNAPSHOT>')),
),
).toBe(false);
// Turn 3: Add 9000 chars (~3000 tokens). Total ~4500.
// Deficit ~3500 > 1500. TRIGGER!
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'C'.repeat(9000) }] },
]);
// Give it a moment for the async task to finish
await new Promise((resolve) => setTimeout(resolve, 500));
// Exceed maxTokens to force a render that shows the snapshot
// Add 3000 more tokens (9000 chars). Total ~7500 > 5000.
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'D'.repeat(9000) }] },
]);
state = await harness.getGoldenState();
expect(
state.finalProjection.some((c) =>
c.parts?.some((p) => p.text?.includes('<SNAPSHOT>')),
),
).toBe(true);
});
// After a consolidation shrinks the deficit, the baseline (lastTriggeredDeficit)
// should move down with it, so the NEXT trigger requires fresh growth of at
// least the threshold rather than re-firing on the residual deficit.
it('should track growth from the new baseline after consolidation', async () => {
const threshold = 1000;
const harness = await SimulationHarness.create(
getHysteresisConfig(threshold),
mockLlmClient,
);
// 1. Trigger first consolidation
// Add ~9000 chars (~3000 tokens). Total ~3000. Deficit ~2000 > 1000.
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'A'.repeat(9000) }] },
]);
await harness.simulateTurn([{ role: 'user', parts: [{ text: 'B' }] }]); // Make eligible
await new Promise((resolve) => setTimeout(resolve, 500));
// Exceed maxTokens (5000) to see it
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'X'.repeat(9000) }] },
]);
const state = await harness.getGoldenState();
expect(
state.finalProjection.some((c) =>
c.parts?.some((p) => p.text?.includes('<SNAPSHOT>')),
),
).toBe(true);
// Get baseline tokens
const baselineTokens =
harness.env.tokenCalculator.calculateConcreteListTokens(
harness.contextManager.getNodes(),
);
// 2. Add nodes again, staying below threshold growth
// Add 1500 chars (~500 tokens). Growth ~500 < 1000.
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'C'.repeat(1500) }] },
]);
await harness.simulateTurn([{ role: 'user', parts: [{ text: 'D' }] }]); // Make eligible
await new Promise((resolve) => setTimeout(resolve, 200));
const currentTokens =
harness.env.tokenCalculator.calculateConcreteListTokens(
harness.contextManager.getNodes(),
);
// Should not have shrunk further (except for D's small addition)
expect(currentTokens).toBeGreaterThanOrEqual(baselineTokens);
// 3. Exceed threshold growth
// Add 6000 chars (~2000 tokens). Growth = ~500 + ~2000 = ~2500 > 1000.
await harness.simulateTurn([
{ role: 'user', parts: [{ text: 'E'.repeat(6000) }] },
]);
await harness.simulateTurn([{ role: 'user', parts: [{ text: 'F' }] }]); // Make eligible
await new Promise((resolve) => setTimeout(resolve, 500));
const finalTokens = harness.env.tokenCalculator.calculateConcreteListTokens(
harness.contextManager.getNodes(),
);
// Now it should have consolidated again (E should be replaced by a snapshot eventually)
// NOTE(review): upper-bound assertion only — it tolerates some growth from F
// but fails if E's ~2000 tokens were retained verbatim without consolidation.
expect(finalTokens).toBeLessThan(currentTokens + 2000);
});
});
@@ -12,7 +12,6 @@ import { ContextEnvironmentImpl } from '../pipeline/environmentImpl.js';
import { ContextTracer } from '../tracer.js';
import { ContextEventBus } from '../eventBus.js';
import { PipelineOrchestrator } from '../pipeline/orchestrator.js';
import { debugLogger } from '../../utils/debugLogger.js';
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
export interface TurnSummary {
@@ -65,7 +64,7 @@ export class SimulationHarness {
mockTempDir,
mockTempDir,
this.tracer,
1, // 1 char per token average
1, // 1 char per token average for estimation (but estimator uses 0.33)
this.eventBus,
);
@@ -85,60 +84,24 @@ export class SimulationHarness {
);
}
/**
* Simulates a single "Turn" (User input + Model/Tool outputs)
* A turn might consist of multiple Content messages (e.g. user prompt -> model call -> user response -> model answer)
*/
async simulateTurn(messages: Content[]) {
// 1. Append the new messages
const currentHistory = this.chatHistory.get();
this.chatHistory.set([...currentHistory, ...messages]);
// 2. Measure tokens immediately after append (Before background processing)
// 2. Measure tokens immediately after append
const tokensBefore = this.env.tokenCalculator.calculateConcreteListTokens(
this.contextManager.getNodes(),
);
debugLogger.log(
`[Turn ${this.currentTurnIndex}] Tokens BEFORE: ${tokensBefore}`,
);
// 3. Yield to event loop to allow internal async subscribers and orchestrator to finish
await new Promise((resolve) => setTimeout(resolve, 50));
// 3. Yield to event loop and wait for async pipelines to finish
await this.contextManager.waitForPipelines();
await new Promise((resolve) => setTimeout(resolve, 100)); // Extra beat for event bus propagation
// 3.1 Simulate what projectCompressedHistory does with the sync handlers
let currentView = this.contextManager.getNodes();
const currentTokens =
this.env.tokenCalculator.calculateConcreteListTokens(currentView);
if (
this.config.config.budget &&
currentTokens > this.config.config.budget.maxTokens
) {
debugLogger.log(
`[Turn ${this.currentTurnIndex}] Sync panic triggered! ${currentTokens} > ${this.config.config.budget.maxTokens}`,
);
const orchestrator = this.orchestrator;
// In the V2 simulation, we trigger the 'gc_backstop' to simulate emergency pressure.
// Since contextManager owns its buffer natively, the simulation now properly matches reality
// where the manager runs the orchestrator and keeps the resulting modified view.
const modifiedView = await orchestrator.executeTriggerSync(
'gc_backstop',
currentView,
new Set(currentView.map((e) => e.id)),
new Set<string>(),
);
// In the real system, ContextManager triggers this and retains it.
// We will emulate that behavior internally in the test loop for token counting.
currentView = modifiedView;
}
// 4. Measure tokens after background processors have processed inboxes
// 4. Measure tokens after background processors
const tokensAfter = this.env.tokenCalculator.calculateConcreteListTokens(
this.contextManager.getNodes(),
);
debugLogger.log(
`[Turn ${this.currentTurnIndex}] Tokens AFTER: ${tokensAfter}`,
);
this.tokenTrajectory.push({
turnIndex: this.currentTurnIndex++,