diff --git a/docs/context-manager-async-mutations.md b/docs/context-manager-async-mutations.md new file mode 100644 index 0000000000..a2fc5d38f9 --- /dev/null +++ b/docs/context-manager-async-mutations.md @@ -0,0 +1,28 @@ +# Async Context Mutations (V1 Architecture) + +## The Problem +In V0, the \`ContextManager\` processes LLM inputs sequentially and synchronously. Processors like \`NodeTruncation\` can safely mutate the graph because they hold an exclusive lock on the context state. + +However, operations like \`StateSnapshotAsyncProcessor\` take a long time to run (distilling thousands of tokens). If they run synchronously, they block the user from interacting with the agent. If they run asynchronously in the background, by the time they finish, the active context graph has likely moved on (new messages, tool calls, or other truncations have occurred). + +## The V1 Solution: Ancestral Replacement (Optimistic Concurrency) + +To allow async background pipelines to mutate the live context graph safely, we use an Optimistic Concurrency Control mechanism called **Ancestral Replacement**. + +### 1. Proof of Claim +When an \`AsyncContextProcessor\` triggers, it is handed a \`ProcessArgs\` containing a snapshot of the graph (the targets it was asked to process). +The processor records the specific IDs of the \`ConcreteNode\`s it is reading. This is its "Proof of Claim". + +### 2. Background Execution +The processor runs in the background, completely detached from the live graph. It synthesizes a new state (e.g., a summarized snapshot node). + +### 3. The Commit Phase +When the processor finishes, it returns its proposed mutations (an array of new \`ConcreteNode\`s that specify which old nodes they replace via the \`replacesId\` property). + +The Orchestrator then attempts to "rebase" this mutation into the live graph: +1. It looks at the live graph. +2. It checks: *Do all the original nodes (the Proof of Claim) still exist unmodified in the live graph?* +3. **If Yes (Clean Fast-Forward):** The orchestrator deletes the old nodes and inserts the new synthesized nodes. +4. **If No (Conflict):** If *any* of the original nodes were deleted or modified by another processor while the async task was running, the orchestrator rejects the async mutation entirely (or handles it via a conflict resolution strategy). + +This guarantees that async pipelines can never corrupt the context state by overwriting newer information with stale data. diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index 870b4b097d..e5043489fd 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -49,20 +49,10 @@ export interface ContextProcessor { process(args: ProcessArgs): Promise; } -export interface ContextWorker { +export interface AsyncContextProcessor { readonly id: string; readonly name: string; - readonly triggers: { - onNodesAdded?: boolean; - onNodesAgedOut?: boolean; - onInboxTopics?: string[]; - }; - start(): void; - stop(): void; - execute(args: { - targets: readonly ConcreteNode[]; - inbox: InboxSnapshot; - }): Promise; + process(args: ProcessArgs): Promise; } export interface BackstopTargetOptions { diff --git a/packages/core/src/context/processors/stateSnapshotWorker.test.ts b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts similarity index 75% rename from packages/core/src/context/processors/stateSnapshotWorker.test.ts rename to packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts index cd262df2f1..cbc0c6eb9f 100644 --- a/packages/core/src/context/processors/stateSnapshotWorker.test.ts +++ b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.test.ts @@ -4,12 +4,14 @@ * SPDX-License-Identifier: Apache-2.0 */ import { describe, it, expect, vi } from 'vitest'; -import { createStateSnapshotWorker } from './stateSnapshotWorker.js'; +import { createStateSnapshotAsyncProcessor } from './stateSnapshotAsyncProcessor.js'; import { createMockEnvironment, createDummyNode, + createMockProcessArgs, } from '../testing/contextTestUtils.js'; -import { InboxSnapshotImpl } from '../sidecar/inbox.js'; +import type { InboxMessage } from '../pipeline.js'; +import type { InboxSnapshotImpl } from '../sidecar/inbox.js'; describe('StateSnapshotWorker', () => { it('should generate a snapshot and publish it to the inbox', async () => { @@ -17,16 +19,13 @@ describe('StateSnapshotWorker', () => { // Spy on the publish method const publishSpy = vi.spyOn(env.inbox, 'publish'); - const worker = createStateSnapshotWorker('StateSnapshotWorker', env, { type: 'point-in-time' }); - worker.start(); - + const worker = createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, { type: 'point-in-time' }); + const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A'); const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); const targets = [nodeA, nodeB]; - const inbox = new InboxSnapshotImpl([]); - - await worker.execute({ targets, inbox }); + await worker.process(createMockProcessArgs(targets, targets, [])); // Ensure generateContent was called expect(env.llmClient.generateContent).toHaveBeenCalled(); @@ -48,14 +47,12 @@ describe('StateSnapshotWorker', () => { const publishSpy = vi.spyOn(env.inbox, 'publish'); const drainSpy = vi.spyOn(env.inbox, 'drainConsumed'); - const worker = createStateSnapshotWorker('StateSnapshotWorker', env, { type: 'accumulate' }); - worker.start(); - + const worker = createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, { type: 'accumulate' }); + const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); const targets = [nodeC]; - // Simulate an existing accumulate draft in the inbox - const inbox = new InboxSnapshotImpl([ + const inboxMessages: InboxMessage[] = [ { id: 'draft-1', topic: 'PROPOSED_SNAPSHOT', @@ -66,12 +63,14 @@ describe('StateSnapshotWorker', () => { type: 'accumulate', }, }, - ]); + ]; - await worker.execute({ targets, inbox }); + const args = createMockProcessArgs(targets, targets, inboxMessages); + + await worker.process(args); // The old draft should be consumed - expect(inbox.getConsumedIds().has('draft-1')).toBe(true); + expect((args.inbox as InboxSnapshotImpl).getConsumedIds().has('draft-1')).toBe(true); expect(drainSpy).toHaveBeenCalledWith(expect.any(Set)); // The new publish should contain ALL consumed IDs (old + new) @@ -104,10 +103,9 @@ describe('StateSnapshotWorker', () => { it('should ignore empty targets', async () => { const env = createMockEnvironment(); const publishSpy = vi.spyOn(env.inbox, 'publish'); - const worker = createStateSnapshotWorker('StateSnapshotWorker', env, { type: 'accumulate' }); - worker.start(); - - await worker.execute({ targets: [], inbox: new InboxSnapshotImpl([]) }); + const worker = createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, { type: 'accumulate' }); + + await worker.process(createMockProcessArgs([], [], [])); expect(env.llmClient.generateContent).not.toHaveBeenCalled(); expect(publishSpy).not.toHaveBeenCalled(); diff --git a/packages/core/src/context/processors/stateSnapshotWorker.ts b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts similarity index 78% rename from packages/core/src/context/processors/stateSnapshotWorker.ts rename to packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts index bf79bd01b4..88b74bffe8 100644 --- a/packages/core/src/context/processors/stateSnapshotWorker.ts +++ b/packages/core/src/context/processors/stateSnapshotAsyncProcessor.ts @@ -3,35 +3,29 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import type { ContextWorker, InboxSnapshot } from '../pipeline.js'; +import type { AsyncContextProcessor, ProcessArgs } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import type { ConcreteNode } from '../ir/types.js'; import { SnapshotGenerator } from '../utils/snapshotGenerator.js'; import { debugLogger } from '../../utils/debugLogger.js'; -export interface StateSnapshotWorkerOptions { +export interface StateSnapshotAsyncProcessorOptions { type?: 'accumulate' | 'point-in-time'; systemInstruction?: string; } -export function createStateSnapshotWorker( +export function createStateSnapshotAsyncProcessor( id: string, env: ContextEnvironment, - options: StateSnapshotWorkerOptions, -): ContextWorker { + options: StateSnapshotAsyncProcessorOptions, +): AsyncContextProcessor { const generator = new SnapshotGenerator(env); - let isRunning = false; - - const execute = async ({ - targets, - inbox, - }: { - targets: readonly ConcreteNode[]; - inbox: InboxSnapshot; - }): Promise => { - if (!isRunning) return; - if (targets.length === 0) return; + return { + id, + name: 'StateSnapshotAsyncProcessor', + process: async ({ targets, inbox }: ProcessArgs): Promise => { + if (targets.length === 0) return; try { let nodesToSummarize = [...targets]; @@ -97,22 +91,8 @@ export function createStateSnapshotWorker( env.idGenerator, ); } catch (e) { - debugLogger.error('StateSnapshotWorker failed to generate snapshot', e); + debugLogger.error('StateSnapshotAsyncProcessor failed to generate snapshot', e); + } } }; - - return { - id, - name: 'StateSnapshotWorker', - triggers: { - onNodesAgedOut: true, - }, - start: () => { - isRunning = true; - }, - stop: () => { - isRunning = false; - }, - execute, - }; } diff --git a/packages/core/src/context/sidecar/orchestrator.test.ts b/packages/core/src/context/sidecar/orchestrator.test.ts index 0c946e0a20..f341ac2965 100644 --- a/packages/core/src/context/sidecar/orchestrator.test.ts +++ b/packages/core/src/context/sidecar/orchestrator.test.ts @@ -14,11 +14,10 @@ import { import type { ContextEnvironment } from './environment.js'; import type { ContextProcessor, - ContextWorker, - InboxSnapshot, + AsyncContextProcessor, ProcessArgs, } from '../pipeline.js'; -import type { PipelineDef } from './types.js'; +import type { PipelineDef, AsyncPipelineDef } from './types.js'; import type { ContextEventBus } from '../eventBus.js'; import type { ConcreteNode, UserPrompt } from '../ir/types.js'; @@ -61,19 +60,12 @@ function createThrowingProcessor(id: string): ContextProcessor { }; } -// A mock worker that signals it ran -function createMockWorker(id: string, executeSpy: ReturnType): ContextWorker { - let isRunning = false; +// A mock async processor that signals it ran +function createMockAsyncProcessor(id: string, executeSpy: ReturnType): AsyncContextProcessor { return { id, - name: 'MockWorker', - triggers: { - onNodesAdded: true, - }, - start: () => { isRunning = true; }, - stop: () => { isRunning = false; }, - execute: async (args: { targets: readonly ConcreteNode[]; inbox: InboxSnapshot }) => { - if (!isRunning) return; + name: 'MockAsyncProcessor', + process: async (args: ProcessArgs) => { executeSpy(args); } }; @@ -94,11 +86,11 @@ describe('PipelineOrchestrator (Component)', () => { const setupOrchestrator = ( pipelines: PipelineDef[], - workers: ContextWorker[] = [], + asyncPipelines: AsyncPipelineDef[] = [], ) => { const orchestrator = new PipelineOrchestrator( pipelines, - workers, + asyncPipelines, env, eventBus, env.tracer, @@ -189,11 +181,15 @@ describe('PipelineOrchestrator (Component)', () => { }); describe('Asynchronous Worker Events', () => { - it('routes emitChunkReceived to workers with onNodesAdded trigger', async () => { + it('routes emitChunkReceived to async pipelines with nodes_added trigger', async () => { const executeSpy = vi.fn(); - const worker = createMockWorker('MyWorker', executeSpy); + const asyncProcessor = createMockAsyncProcessor('MyWorker', executeSpy); - setupOrchestrator([], [worker]); + setupOrchestrator([], [{ + name: 'TestAsync', + triggers: ['nodes_added'], + processors: [asyncProcessor] + }]); const node1 = createDummyNode('ep1', 'USER_PROMPT', 10); const node2 = createDummyNode('ep1', 'AGENT_THOUGHT', 20); diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 030b32e2c7..2db51e5fa7 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -5,8 +5,7 @@ */ import type { ConcreteNode } from '../ir/types.js'; -import type { ContextWorker } from '../pipeline.js'; -import type { PipelineDef, PipelineTrigger } from './types.js'; +import type { AsyncPipelineDef, PipelineDef, PipelineTrigger } from './types.js'; import type { ContextEnvironment, ContextEventBus, @@ -21,13 +20,12 @@ export class PipelineOrchestrator { constructor( private readonly pipelines: PipelineDef[], - private readonly workers: ContextWorker[], + private readonly asyncPipelines: AsyncPipelineDef[], private readonly env: ContextEnvironment, private readonly eventBus: ContextEventBus, private readonly tracer: ContextTracer, ) { this.setupTriggers(); - this.startWorkers(); } private isNodeAllowed( @@ -42,84 +40,42 @@ export class PipelineOrchestrator { ); } - private startWorkers() { - for (const worker of this.workers) { - try { - worker.start(); - } catch (e) { - debugLogger.error(`Worker ${worker.name} failed to start:`, e); - } - } - } - private setupTriggers() { - // 1. Pipeline Triggers - for (const pipeline of this.pipelines) { - for (const trigger of pipeline.triggers) { - if (typeof trigger === 'object' && trigger.type === 'timer') { - const timer = setInterval(() => { - // Background timers not fully implemented in V1 yet - }, trigger.intervalMs); - this.activeTimers.push(timer); - } else if (trigger === 'retained_exceeded') { - this.eventBus.onConsolidationNeeded((event) => { - void this.executePipelineAsync( - pipeline, - event.nodes, - event.targetNodeIds, - new Set(), // protected IDs - ); - }); - } else if (trigger === 'new_message') { - this.eventBus.onChunkReceived((event) => { - void this.executePipelineAsync( - pipeline, - event.nodes, - event.targetNodeIds, - new Set(), // protected IDs - ); - }); + const bindTriggers = ( + pipelines: PipelineDef[] | AsyncPipelineDef[], + executeFn: (pipeline: PipelineDef | AsyncPipelineDef, nodes: readonly ConcreteNode[], targets: ReadonlySet, protectedIds: ReadonlySet) => void + ) => { + for (const pipeline of pipelines) { + for (const trigger of pipeline.triggers) { + if (typeof trigger === 'object' && trigger.type === 'timer') { + const timer = setInterval(() => { + // Background timers not fully implemented in V1 yet + }, trigger.intervalMs); + this.activeTimers.push(timer); + } else if (trigger === 'retained_exceeded' || trigger === 'nodes_aged_out') { + this.eventBus.onConsolidationNeeded((event) => { + executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); + }); + } else if (trigger === 'new_message' || trigger === 'nodes_added') { + this.eventBus.onChunkReceived((event) => { + executeFn(pipeline, event.nodes, event.targetNodeIds, new Set()); + }); + } } } - } + }; - // 2. Worker Triggers (onNodesAdded / onNodesAgedOut) - this.eventBus.onChunkReceived((event) => { - // Fire all workers that care about new nodes - for (const worker of this.workers) { - if (worker.triggers.onNodesAdded) { - const inboxSnapshot = new InboxSnapshotImpl( - this.env.inbox.getMessages() || [], - ); - const targets = event.nodes.filter((n) => - event.targetNodeIds.has(n.id), - ); - // Fire and forget - worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => { - debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e); - }); - } - } + bindTriggers(this.pipelines, (pipeline, nodes, targets, protectedIds) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + void this.executePipelineAsync(pipeline as PipelineDef, nodes, new Set(targets), new Set(protectedIds)); }); - this.eventBus.onConsolidationNeeded((event) => { - // Fire all workers that care about aged out nodes - for (const worker of this.workers) { - if (worker.triggers.onNodesAgedOut) { - const inboxSnapshot = new InboxSnapshotImpl( - this.env.inbox.getMessages() || [], - ); - const targets = event.nodes.filter((n) => - event.targetNodeIds.has(n.id), - ); - // Fire and forget - worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => { - debugLogger.error( - `Worker ${worker.name} failed onNodesAgedOut:`, - e, - ); - }); - } + bindTriggers(this.asyncPipelines, (pipeline, nodes, targetIds) => { + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []); + const targets = nodes.filter((n) => targetIds.has(n.id)); + for (const processor of pipeline.processors) { + processor.process({ targets, inbox: inboxSnapshot, buffer: ContextWorkingBufferImpl.initialize(nodes) }) + .catch((e: unknown) => debugLogger.error(`AsyncProcessor ${processor.name} failed:`, e)); } }); } @@ -128,13 +84,6 @@ export class PipelineOrchestrator { for (const timer of this.activeTimers) { clearInterval(timer); } - for (const worker of this.workers) { - try { - worker.stop(); - } catch (e) { - debugLogger.error(`Worker ${worker.name} failed to stop:`, e); - } - } } async executeTriggerSync( diff --git a/packages/core/src/context/sidecar/profiles.ts b/packages/core/src/context/sidecar/profiles.ts index a86534bd03..b02951272a 100644 --- a/packages/core/src/context/sidecar/profiles.ts +++ b/packages/core/src/context/sidecar/profiles.ts @@ -6,7 +6,7 @@ import type { SidecarConfig, PipelineDef } from './types.js'; import type { ContextEnvironment } from './environment.js'; -import type { ContextWorker } from '../pipeline.js'; +import type { AsyncPipelineDef } from './types.js'; // Import factories import { createToolMaskingProcessor } from '../processors/toolMaskingProcessor.js'; @@ -14,12 +14,12 @@ import { createBlobDegradationProcessor } from '../processors/blobDegradationPro import { createNodeTruncationProcessor } from '../processors/nodeTruncationProcessor.js'; import { createNodeDistillationProcessor } from '../processors/nodeDistillationProcessor.js'; import { createStateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js'; -import { createStateSnapshotWorker } from '../processors/stateSnapshotWorker.js'; +import { createStateSnapshotAsyncProcessor } from '../processors/stateSnapshotAsyncProcessor.js'; export interface ContextProfile { config: SidecarConfig; buildPipelines: (env: ContextEnvironment, config?: SidecarConfig) => PipelineDef[]; - buildWorkers: (env: ContextEnvironment, config?: SidecarConfig) => ContextWorker[]; + buildAsyncPipelines: (env: ContextEnvironment, config?: SidecarConfig) => AsyncPipelineDef[]; } /** @@ -71,7 +71,7 @@ export const defaultSidecarProfile: ContextProfile = { ]; }, - buildWorkers: (env: ContextEnvironment, config?: SidecarConfig): ContextWorker[] => { + buildAsyncPipelines: (env: ContextEnvironment, config?: SidecarConfig): AsyncPipelineDef[] => { const getOptions = (id: string, defaultOptions: T): T => { if (config?.processorOptions && config.processorOptions[id]) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion @@ -81,7 +81,13 @@ export const defaultSidecarProfile: ContextProfile = { }; return [ - createStateSnapshotWorker('StateSnapshotAsync', env, getOptions('StateSnapshotAsync', { type: 'accumulate' })) + { + name: 'Async Background GC', + triggers: ['nodes_aged_out'], + processors: [ + createStateSnapshotAsyncProcessor('StateSnapshotAsync', env, getOptions('StateSnapshotAsync', { type: 'accumulate' })) + ] + } ]; } }; diff --git a/packages/core/src/context/sidecar/types.ts b/packages/core/src/context/sidecar/types.ts index 5d23688495..1a738bf125 100644 --- a/packages/core/src/context/sidecar/types.ts +++ b/packages/core/src/context/sidecar/types.ts @@ -4,12 +4,14 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { ContextProcessor } from '../pipeline.js'; +import type { ContextProcessor, AsyncContextProcessor } from '../pipeline.js'; export type PipelineTrigger = | 'new_message' | 'retained_exceeded' | 'gc_backstop' + | 'nodes_added' + | 'nodes_aged_out' | { type: 'timer'; intervalMs: number }; export interface PipelineDef { @@ -18,6 +20,12 @@ export interface PipelineDef { processors: ContextProcessor[]; } +export interface AsyncPipelineDef { + name: string; + triggers: PipelineTrigger[]; + processors: AsyncContextProcessor[]; +} + /** * The Data-Driven Schema for the Context Manager. */ diff --git a/packages/core/src/context/system-tests/SimulationHarness.ts b/packages/core/src/context/system-tests/SimulationHarness.ts index 9b6840d54b..dc93d30223 100644 --- a/packages/core/src/context/system-tests/SimulationHarness.ts +++ b/packages/core/src/context/system-tests/SimulationHarness.ts @@ -75,7 +75,7 @@ export class SimulationHarness { this.orchestrator = new PipelineOrchestrator( config.buildPipelines(this.env), - config.buildWorkers(this.env), + config.buildAsyncPipelines(this.env), this.env, this.eventBus, this.tracer, diff --git a/packages/core/src/context/system-tests/lifecycle.golden.test.ts b/packages/core/src/context/system-tests/lifecycle.golden.test.ts index b99138b9b3..a1bc8ea195 100644 --- a/packages/core/src/context/system-tests/lifecycle.golden.test.ts +++ b/packages/core/src/context/system-tests/lifecycle.golden.test.ts @@ -12,7 +12,7 @@ import { createToolMaskingProcessor } from '../processors/toolMaskingProcessor.j import { createBlobDegradationProcessor } from '../processors/blobDegradationProcessor.js'; import { createStateSnapshotProcessor } from '../processors/stateSnapshotProcessor.js'; import { createHistoryTruncationProcessor } from '../processors/historyTruncationProcessor.js'; -import { createStateSnapshotWorker } from '../processors/stateSnapshotWorker.js'; +import { createStateSnapshotAsyncProcessor } from '../processors/stateSnapshotAsyncProcessor.js'; expect.addSnapshotSerializer({ test: (val) => @@ -58,7 +58,11 @@ describe('System Lifecycle Golden Tests', () => { ], }, ], - buildWorkers: (env) => [createStateSnapshotWorker('StateSnapshotWorker', env, {})], + buildAsyncPipelines: (env) => [{ + name: 'Async', + triggers: ['nodes_aged_out'], + processors: [createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, {})] + }], }); const mockLlmClient = createMockLlmClient([ @@ -150,7 +154,7 @@ describe('System Lifecycle Golden Tests', () => { budget: { maxTokens: 100000, retainedTokens: 50000 }, }, buildPipelines: () => [], - buildWorkers: () => [], + buildAsyncPipelines: () => [], }; const harness = await SimulationHarness.create( @@ -182,7 +186,11 @@ describe('System Lifecycle Golden Tests', () => { budget: { maxTokens: 200, retainedTokens: 100 }, }, buildPipelines: () => [], - buildWorkers: (env) => [createStateSnapshotWorker('StateSnapshotWorker', env, {})], + buildAsyncPipelines: (env) => [{ + name: 'Async', + triggers: ['nodes_aged_out'], + processors: [createStateSnapshotAsyncProcessor('StateSnapshotWorker', env, {})] + }], }; const harness = await SimulationHarness.create(gcConfig, mockLlmClient); diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index 85695ba56c..8f177c1f8d 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -267,7 +267,7 @@ export function setupContextComponentTest( const orchestrator = new PipelineOrchestrator( sidecar.buildPipelines(env), - sidecar.buildWorkers(env), + sidecar.buildAsyncPipelines(env), env, eventBus, tracer, diff --git a/packages/core/src/context/testing/testProfile.ts b/packages/core/src/context/testing/testProfile.ts index 31e5955441..694f5e534e 100644 --- a/packages/core/src/context/testing/testProfile.ts +++ b/packages/core/src/context/testing/testProfile.ts @@ -24,5 +24,5 @@ export const testTruncateProfile: ContextProfile = { ], }, ], - buildWorkers: () => [], + buildAsyncPipelines: () => [], };