refactor(context): unify processors and workers into sync/async pipelines

This commit formally unifies the architecture of synchronous and asynchronous context modification.

Key Changes:
- **Unified Triggers:** Triggers are no longer embedded inside individual processors. Instead, they are defined on `PipelineDef` and `AsyncPipelineDef` wrappers.
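- **AsyncContextProcessor:** `ContextWorker` has been renamed to `AsyncContextProcessor`. It follows the same functional closure pattern as its synchronous sibling, exposing `process(args: ProcessArgs): Promise<void>`. Unlike `ContextProcessor.process`, which resolves to `readonly ConcreteNode[]`, the async variant resolves to nothing.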
- **Shared Inbox State:** Both sync and async pipelines now share the exact same `ProcessArgs` interface, which provides unified access to the graph targets, the Working Buffer, and the Inbox.
- **Architecture Doc:** Drafted `docs/context-manager-async-mutations.md` documenting the future V1 Optimistic Concurrency mechanism for async graph mutations.
This commit is contained in:
Your Name
2026-04-10 03:06:43 +00:00
parent be4bee5f1a
commit a52ded7357
12 changed files with 142 additions and 179 deletions
+28
View File
@@ -0,0 +1,28 @@
# Async Context Mutations (V1 Architecture)
## The Problem
In V0, the `ContextManager` processes LLM inputs sequentially and synchronously. Processors like `NodeTruncation` can safely mutate the graph because they hold an exclusive lock on the context state.
However, operations like `StateSnapshotAsyncProcessor` take a long time to run (distilling thousands of tokens). If they run synchronously, they block the user from interacting with the agent. If they run asynchronously in the background, by the time they finish, the active context graph has likely moved on (new messages, tool calls, or other truncations have occurred).
## The V1 Solution: Ancestral Replacement (Optimistic Concurrency)
To allow async background pipelines to mutate the live context graph safely, we use an Optimistic Concurrency Control mechanism called **Ancestral Replacement**.
### 1. Proof of Claim
When an `AsyncContextProcessor` is triggered, it is handed a `ProcessArgs` containing a snapshot of the graph (the targets it was asked to process).
The processor records the specific IDs of the `ConcreteNode`s it is reading. This is its "Proof of Claim".
### 2. Background Execution
The processor runs in the background, completely detached from the live graph. It synthesizes a new state (e.g., a summarized snapshot node).
### 3. The Commit Phase
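When the processor finishes, it returns its proposed mutations (an array of new `ConcreteNode`s that specify which old nodes they replace via the `replacesId` property). Note that this is the planned V1 behavior: the current `AsyncContextProcessor.process` signature resolves to `void`, so returning mutations requires an interface change.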
The Orchestrator then attempts to "rebase" this mutation into the live graph:
1. It looks at the live graph.
2. It checks: *Do all the original nodes (the Proof of Claim) still exist unmodified in the live graph?*
3. **If Yes (Clean Fast-Forward):** The orchestrator deletes the old nodes and inserts the new synthesized nodes.
4. **If No (Conflict):** If *any* of the original nodes were deleted or modified by another processor while the async task was running, the orchestrator rejects the async mutation entirely (or handles it via a conflict resolution strategy).
This guarantees that async pipelines can never corrupt the context state by overwriting newer information with stale data.
+2 -12
View File
@@ -49,20 +49,10 @@ export interface ContextProcessor {
process(args: ProcessArgs): Promise<readonly ConcreteNode[]>;
}
export interface ContextWorker {
export interface AsyncContextProcessor {
readonly id: string;
readonly name: string;
readonly triggers: {
onNodesAdded?: boolean;
onNodesAgedOut?: boolean;
onInboxTopics?: string[];
};
start(): void;
stop(): void;
execute(args: {
targets: readonly ConcreteNode[];
inbox: InboxSnapshot;
}): Promise<void>;
process(args: ProcessArgs): Promise<void>;
}
export interface BackstopTargetOptions {
@@ -4,12 +4,14 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi } from 'vitest';
import { createStateSnapshotWorker } from './stateSnapshotWorker.js';
import { createStateSnapshotAsyncProcessor } from './stateSnapshotAsyncProcessor.js';
import {
createMockEnvironment,
createDummyNode,
createMockProcessArgs,
} from '../testing/contextTestUtils.js';
import { InboxSnapshotImpl } from '../sidecar/inbox.js';
import type { InboxMessage } from '../pipeline.js';
import type { InboxSnapshotImpl } from '../sidecar/inbox.js';
describe('StateSnapshotWorker', () => {
it('should generate a snapshot and publish it to the inbox', async () => {
@@ -17,16 +19,13 @@ describe('StateSnapshotWorker', () => {
// Spy on the publish method
const publishSpy = vi.spyOn(env.inbox, 'publish');
const worker = createStateSnapshotWorker('StateSnapshotWorker', env, { type: 'point-in-time' });
worker.start();
const worker = createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, { type: 'point-in-time' });
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const targets = [nodeA, nodeB];
const inbox = new InboxSnapshotImpl([]);
await worker.execute({ targets, inbox });
await worker.process(createMockProcessArgs(targets, targets, []));
// Ensure generateContent was called
expect(env.llmClient.generateContent).toHaveBeenCalled();
@@ -48,14 +47,12 @@ describe('StateSnapshotWorker', () => {
const publishSpy = vi.spyOn(env.inbox, 'publish');
const drainSpy = vi.spyOn(env.inbox, 'drainConsumed');
const worker = createStateSnapshotWorker('StateSnapshotWorker', env, { type: 'accumulate' });
worker.start();
const worker = createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, { type: 'accumulate' });
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const targets = [nodeC];
// Simulate an existing accumulate draft in the inbox
const inbox = new InboxSnapshotImpl([
const inboxMessages: InboxMessage[] = [
{
id: 'draft-1',
topic: 'PROPOSED_SNAPSHOT',
@@ -66,12 +63,14 @@ describe('StateSnapshotWorker', () => {
type: 'accumulate',
},
},
]);
];
await worker.execute({ targets, inbox });
const args = createMockProcessArgs(targets, targets, inboxMessages);
await worker.process(args);
// The old draft should be consumed
expect(inbox.getConsumedIds().has('draft-1')).toBe(true);
expect((args.inbox as InboxSnapshotImpl).getConsumedIds().has('draft-1')).toBe(true);
expect(drainSpy).toHaveBeenCalledWith(expect.any(Set));
// The new publish should contain ALL consumed IDs (old + new)
@@ -104,10 +103,9 @@ describe('StateSnapshotWorker', () => {
it('should ignore empty targets', async () => {
const env = createMockEnvironment();
const publishSpy = vi.spyOn(env.inbox, 'publish');
const worker = createStateSnapshotWorker('StateSnapshotWorker', env, { type: 'accumulate' });
worker.start();
await worker.execute({ targets: [], inbox: new InboxSnapshotImpl([]) });
const worker = createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, { type: 'accumulate' });
await worker.process(createMockProcessArgs([], [], []));
expect(env.llmClient.generateContent).not.toHaveBeenCalled();
expect(publishSpy).not.toHaveBeenCalled();
@@ -3,35 +3,29 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextWorker, InboxSnapshot } from '../pipeline.js';
import type { AsyncContextProcessor, ProcessArgs } from '../pipeline.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { ConcreteNode } from '../ir/types.js';
import { SnapshotGenerator } from '../utils/snapshotGenerator.js';
import { debugLogger } from '../../utils/debugLogger.js';
export interface StateSnapshotWorkerOptions {
export interface StateSnapshotAsyncProcessorOptions {
type?: 'accumulate' | 'point-in-time';
systemInstruction?: string;
}
export function createStateSnapshotWorker(
export function createStateSnapshotAsyncProcessor(
id: string,
env: ContextEnvironment,
options: StateSnapshotWorkerOptions,
): ContextWorker {
options: StateSnapshotAsyncProcessorOptions,
): AsyncContextProcessor {
const generator = new SnapshotGenerator(env);
let isRunning = false;
const execute = async ({
targets,
inbox,
}: {
targets: readonly ConcreteNode[];
inbox: InboxSnapshot;
}): Promise<void> => {
if (!isRunning) return;
if (targets.length === 0) return;
return {
id,
name: 'StateSnapshotAsyncProcessor',
process: async ({ targets, inbox }: ProcessArgs): Promise<void> => {
if (targets.length === 0) return;
try {
let nodesToSummarize = [...targets];
@@ -97,22 +91,8 @@ export function createStateSnapshotWorker(
env.idGenerator,
);
} catch (e) {
debugLogger.error('StateSnapshotWorker failed to generate snapshot', e);
debugLogger.error('StateSnapshotAsyncProcessor failed to generate snapshot', e);
}
}
};
return {
id,
name: 'StateSnapshotWorker',
triggers: {
onNodesAgedOut: true,
},
start: () => {
isRunning = true;
},
stop: () => {
isRunning = false;
},
execute,
};
}
@@ -14,11 +14,10 @@ import {
import type { ContextEnvironment } from './environment.js';
import type {
ContextProcessor,
ContextWorker,
InboxSnapshot,
AsyncContextProcessor,
ProcessArgs,
} from '../pipeline.js';
import type { PipelineDef } from './types.js';
import type { PipelineDef, AsyncPipelineDef } from './types.js';
import type { ContextEventBus } from '../eventBus.js';
import type { ConcreteNode, UserPrompt } from '../ir/types.js';
@@ -61,19 +60,12 @@ function createThrowingProcessor(id: string): ContextProcessor {
};
}
// A mock worker that signals it ran
function createMockWorker(id: string, executeSpy: ReturnType<typeof vi.fn>): ContextWorker {
let isRunning = false;
// A mock async processor that signals it ran
function createMockAsyncProcessor(id: string, executeSpy: ReturnType<typeof vi.fn>): AsyncContextProcessor {
return {
id,
name: 'MockWorker',
triggers: {
onNodesAdded: true,
},
start: () => { isRunning = true; },
stop: () => { isRunning = false; },
execute: async (args: { targets: readonly ConcreteNode[]; inbox: InboxSnapshot }) => {
if (!isRunning) return;
name: 'MockAsyncProcessor',
process: async (args: ProcessArgs) => {
executeSpy(args);
}
};
@@ -94,11 +86,11 @@ describe('PipelineOrchestrator (Component)', () => {
const setupOrchestrator = (
pipelines: PipelineDef[],
workers: ContextWorker[] = [],
asyncPipelines: AsyncPipelineDef[] = [],
) => {
const orchestrator = new PipelineOrchestrator(
pipelines,
workers,
asyncPipelines,
env,
eventBus,
env.tracer,
@@ -189,11 +181,15 @@ describe('PipelineOrchestrator (Component)', () => {
});
describe('Asynchronous Worker Events', () => {
it('routes emitChunkReceived to workers with onNodesAdded trigger', async () => {
it('routes emitChunkReceived to async pipelines with nodes_added trigger', async () => {
const executeSpy = vi.fn();
const worker = createMockWorker('MyWorker', executeSpy);
const asyncProcessor = createMockAsyncProcessor('MyWorker', executeSpy);
setupOrchestrator([], [worker]);
setupOrchestrator([], [{
name: 'TestAsync',
triggers: ['nodes_added'],
processors: [asyncProcessor]
}]);
const node1 = createDummyNode('ep1', 'USER_PROMPT', 10);
const node2 = createDummyNode('ep1', 'AGENT_THOUGHT', 20);
@@ -5,8 +5,7 @@
*/
import type { ConcreteNode } from '../ir/types.js';
import type { ContextWorker } from '../pipeline.js';
import type { PipelineDef, PipelineTrigger } from './types.js';
import type { AsyncPipelineDef, PipelineDef, PipelineTrigger } from './types.js';
import type {
ContextEnvironment,
ContextEventBus,
@@ -21,13 +20,12 @@ export class PipelineOrchestrator {
constructor(
private readonly pipelines: PipelineDef[],
private readonly workers: ContextWorker[],
private readonly asyncPipelines: AsyncPipelineDef[],
private readonly env: ContextEnvironment,
private readonly eventBus: ContextEventBus,
private readonly tracer: ContextTracer,
) {
this.setupTriggers();
this.startWorkers();
}
private isNodeAllowed(
@@ -42,84 +40,42 @@ export class PipelineOrchestrator {
);
}
private startWorkers() {
for (const worker of this.workers) {
try {
worker.start();
} catch (e) {
debugLogger.error(`Worker ${worker.name} failed to start:`, e);
}
}
}
private setupTriggers() {
// 1. Pipeline Triggers
for (const pipeline of this.pipelines) {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {
// Background timers not fully implemented in V1 yet
}, trigger.intervalMs);
this.activeTimers.push(timer);
} else if (trigger === 'retained_exceeded') {
this.eventBus.onConsolidationNeeded((event) => {
void this.executePipelineAsync(
pipeline,
event.nodes,
event.targetNodeIds,
new Set(), // protected IDs
);
});
} else if (trigger === 'new_message') {
this.eventBus.onChunkReceived((event) => {
void this.executePipelineAsync(
pipeline,
event.nodes,
event.targetNodeIds,
new Set(), // protected IDs
);
});
const bindTriggers = (
pipelines: PipelineDef[] | AsyncPipelineDef[],
executeFn: (pipeline: PipelineDef | AsyncPipelineDef, nodes: readonly ConcreteNode[], targets: ReadonlySet<string>, protectedIds: ReadonlySet<string>) => void
) => {
for (const pipeline of pipelines) {
for (const trigger of pipeline.triggers) {
if (typeof trigger === 'object' && trigger.type === 'timer') {
const timer = setInterval(() => {
// Background timers not fully implemented in V1 yet
}, trigger.intervalMs);
this.activeTimers.push(timer);
} else if (trigger === 'retained_exceeded' || trigger === 'nodes_aged_out') {
this.eventBus.onConsolidationNeeded((event) => {
executeFn(pipeline, event.nodes, event.targetNodeIds, new Set());
});
} else if (trigger === 'new_message' || trigger === 'nodes_added') {
this.eventBus.onChunkReceived((event) => {
executeFn(pipeline, event.nodes, event.targetNodeIds, new Set());
});
}
}
}
}
};
// 2. Worker Triggers (onNodesAdded / onNodesAgedOut)
this.eventBus.onChunkReceived((event) => {
// Fire all workers that care about new nodes
for (const worker of this.workers) {
if (worker.triggers.onNodesAdded) {
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
const targets = event.nodes.filter((n) =>
event.targetNodeIds.has(n.id),
);
// Fire and forget
worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => {
debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e);
});
}
}
bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
void this.executePipelineAsync(pipeline as PipelineDef, nodes, new Set(targets), new Set(protectedIds));
});
this.eventBus.onConsolidationNeeded((event) => {
// Fire all workers that care about aged out nodes
for (const worker of this.workers) {
if (worker.triggers.onNodesAgedOut) {
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox.getMessages() || [],
);
const targets = event.nodes.filter((n) =>
event.targetNodeIds.has(n.id),
);
// Fire and forget
worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => {
debugLogger.error(
`Worker ${worker.name} failed onNodesAgedOut:`,
e,
);
});
}
bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => {
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
const targets = nodes.filter((n) => targetIds.has(n.id));
for (const processor of pipeline.processors) {
processor.process({ targets, inbox: inboxSnapshot, buffer: ContextWorkingBufferImpl.initialize(nodes) })
.catch((e: unknown) => debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e));
}
});
}
@@ -128,13 +84,6 @@ export class PipelineOrchestrator {
for (const timer of this.activeTimers) {
clearInterval(timer);
}
for (const worker of this.workers) {
try {
worker.stop();
} catch (e) {
debugLogger.error(`Worker ${worker.name} failed to stop:`, e);
}
}
}
async executeTriggerSync(
+11 -5
View File
@@ -6,7 +6,7 @@
import type { SidecarConfig, PipelineDef } from './types.js';
import type { ContextEnvironment } from './environment.js';
import type { ContextWorker } from '../pipeline.js';
import type { AsyncPipelineDef } from './types.js';
// Import factories
import { createToolMaskingProcessor } from '../processors/toolMaskingProcessor.js';
@@ -14,12 +14,12 @@ import { createBlobDegradationProcessor } from '../processors/blobDegradationPro
import { createNodeTruncationProcessor } from '../processors/nodeTruncationProcessor.js';
import { createNodeDistillationProcessor } from '../processors/nodeDistillationProcessor.js';
import { createStateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js';
import { createStateSnapshotWorker } from '../processors/stateSnapshotWorker.js';
import { createStateSnapshotAsyncProcessor } from '../processors/stateSnapshotAsyncProcessor.js';
export interface ContextProfile {
config: SidecarConfig;
buildPipelines: (env: ContextEnvironment, config?: SidecarConfig) => PipelineDef[];
buildWorkers: (env: ContextEnvironment, config?: SidecarConfig) => ContextWorker[];
buildAsyncPipelines: (env: ContextEnvironment, config?: SidecarConfig) => AsyncPipelineDef[];
}
/**
@@ -71,7 +71,7 @@ export const defaultSidecarProfile: ContextProfile = {
];
},
buildWorkers: (env: ContextEnvironment, config?: SidecarConfig): ContextWorker[] => {
buildAsyncPipelines: (env: ContextEnvironment, config?: SidecarConfig): AsyncPipelineDef[] => {
const getOptions = <T>(id: string, defaultOptions: T): T => {
if (config?.processorOptions && config.processorOptions[id]) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
@@ -81,7 +81,13 @@ export const defaultSidecarProfile: ContextProfile = {
};
return [
createStateSnapshotWorker('StateSnapshotAsync', env, getOptions('StateSnapshotAsync', { type: 'accumulate' }))
{
name: 'Async Background GC',
triggers: ['nodes_aged_out'],
processors: [
createStateSnapshotAsyncProcessor('StateSnapshotAsync', env, getOptions('StateSnapshotAsync', { type: 'accumulate' }))
]
}
];
}
};
+9 -1
View File
@@ -4,12 +4,14 @@
* SPDX-License-Identifier: Apache-2.0
*/
import type { ContextProcessor } from '../pipeline.js';
import type { ContextProcessor, AsyncContextProcessor } from '../pipeline.js';
/**
 * Events that can start a pipeline.
 *
 * As bound by `PipelineOrchestrator.setupTriggers`:
 * - `'new_message'` and `'nodes_added'` both subscribe to the event bus's
 *   chunk-received event.
 * - `'retained_exceeded'` and `'nodes_aged_out'` both subscribe to the event
 *   bus's consolidation-needed event.
 * - `{ type: 'timer' }` registers a `setInterval` with `intervalMs`; its
 *   callback is currently empty.
 * - `'gc_backstop'` has no event-bus binding there.
 *   NOTE(review): presumably dispatched through an explicit call instead — confirm.
 */
export type PipelineTrigger =
| 'new_message'
| 'retained_exceeded'
| 'gc_backstop'
| 'nodes_added'
| 'nodes_aged_out'
| { type: 'timer'; intervalMs: number };
export interface PipelineDef {
@@ -18,6 +20,12 @@ export interface PipelineDef {
processors: ContextProcessor[];
}
/**
 * A named group of async processors together with the triggers that start them.
 *
 * When one of the triggers fires, the orchestrator invokes every processor's
 * `process()` without awaiting it; a rejected promise is logged rather than
 * propagated. All processors in one dispatch receive the same targets and
 * inbox snapshot, while a working buffer is initialized separately for each.
 */
export interface AsyncPipelineDef {
/** Human-readable pipeline name (for example `'Async Background GC'`). */
name: string;
/** Events that cause this pipeline's processors to be invoked. */
triggers: PipelineTrigger[];
/** Processors invoked, in array order, each time a trigger fires. */
processors: AsyncContextProcessor[];
}
/**
* The Data-Driven Schema for the Context Manager.
*/
@@ -75,7 +75,7 @@ export class SimulationHarness {
this.orchestrator = new PipelineOrchestrator(
config.buildPipelines(this.env),
config.buildWorkers(this.env),
config.buildAsyncPipelines(this.env),
this.env,
this.eventBus,
this.tracer,
@@ -12,7 +12,7 @@ import { createToolMaskingProcessor } from '../processors/toolMaskingProcessor.j
import { createBlobDegradationProcessor } from '../processors/blobDegradationProcessor.js';
import { createStateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js';
import { createHistoryTruncationProcessor } from '../processors/historyTruncationProcessor.js';
import { createStateSnapshotWorker } from '../processors/stateSnapshotWorker.js';
import { createStateSnapshotAsyncProcessor } from '../processors/stateSnapshotAsyncProcessor.js';
expect.addSnapshotSerializer({
test: (val) =>
@@ -58,7 +58,11 @@ describe('System Lifecycle Golden Tests', () => {
],
},
],
buildWorkers: (env) => [createStateSnapshotWorker('StateSnapshotWorker', env, {})],
buildAsyncPipelines: (env) => [{
name: 'Async',
triggers: ['nodes_aged_out'],
processors: [createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, {})]
}],
});
const mockLlmClient = createMockLlmClient([
@@ -150,7 +154,7 @@ describe('System Lifecycle Golden Tests', () => {
budget: { maxTokens: 100000, retainedTokens: 50000 },
},
buildPipelines: () => [],
buildWorkers: () => [],
buildAsyncPipelines: () => [],
};
const harness = await SimulationHarness.create(
@@ -182,7 +186,11 @@ describe('System Lifecycle Golden Tests', () => {
budget: { maxTokens: 200, retainedTokens: 100 },
},
buildPipelines: () => [],
buildWorkers: (env) => [createStateSnapshotWorker('StateSnapshotWorker', env, {})],
buildAsyncPipelines: (env) => [{
name: 'Async',
triggers: ['nodes_aged_out'],
processors: [createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, {})]
}],
};
const harness = await SimulationHarness.create(gcConfig, mockLlmClient);
@@ -267,7 +267,7 @@ export function setupContextComponentTest(
const orchestrator = new PipelineOrchestrator(
sidecar.buildPipelines(env),
sidecar.buildWorkers(env),
sidecar.buildAsyncPipelines(env),
env,
eventBus,
tracer,
@@ -24,5 +24,5 @@ export const testTruncateProfile: ContextProfile = {
],
},
],
buildWorkers: () => [],
buildAsyncPipelines: () => [],
};