diff --git a/packages/core/src/context/processors/blobDegradationProcessor.test.ts b/packages/core/src/context/processors/blobDegradationProcessor.test.ts index cfb84e6336..9ebb0bfbc0 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.test.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.test.ts @@ -3,6 +3,8 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + +import assert from 'node:assert'; import { describe, it, expect } from 'vitest'; import { BlobDegradationProcessor } from './blobDegradationProcessor.js'; import { @@ -10,7 +12,7 @@ import { createMockEnvironment, createDummyNode, } from '../testing/contextTestUtils.js'; -import type { UserPrompt, SemanticPart } from '../ir/types.js'; +import type { UserPrompt, SemanticPart, ConcreteNode } from '../ir/types.js'; describe('BlobDegradationProcessor', () => { it('should ignore text parts and only target inline_data and file_data', async () => { @@ -48,8 +50,9 @@ describe('BlobDegradationProcessor', () => { expect(modifiedPrompt.semanticParts[2]).toEqual(parts[2]); // The inline_data part should be replaced with text - const degradedPart = modifiedPrompt.semanticParts[1] as unknown as { type: string, text: string }; + const degradedPart = modifiedPrompt.semanticParts[1]; expect(degradedPart.type).toBe('text'); + assert(degradedPart.type === 'text'); expect(degradedPart.text).toContain('[Multi-Modal Blob (image/png, 0.00MB) degraded to text'); }); @@ -87,7 +90,7 @@ describe('BlobDegradationProcessor', () => { const env = createMockEnvironment(); const processor = BlobDegradationProcessor.create(env, {}); - const targets: Array = []; + const targets: ConcreteNode[] = []; const result = await processor.process(createMockProcessArgs(targets)); diff --git a/packages/core/src/context/processors/nodeDistillationProcessor.test.ts b/packages/core/src/context/processors/nodeDistillationProcessor.test.ts index bc095697f3..aab43b2d61 100644 --- a/packages/core/src/context/processors/nodeDistillationProcessor.test.ts +++ b/packages/core/src/context/processors/nodeDistillationProcessor.test.ts @@ -3,6 +3,8 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + +import assert from 'node:assert'; import { describe, it, expect, vi } from 'vitest'; import { NodeDistillationProcessor } from './nodeDistillationProcessor.js'; import { @@ -56,7 +58,8 @@ describe('NodeDistillationProcessor', () => { const compressedPrompt = result[0] as UserPrompt; expect(compressedPrompt.id).not.toBe(prompt.id); expect(compressedPrompt.semanticParts[0].type).toBe('text'); - expect((compressedPrompt.semanticParts[0] as unknown as {text: string}).text).toBe('Mocked Summary!'); + assert(compressedPrompt.semanticParts[0].type === 'text'); + expect(compressedPrompt.semanticParts[0].text).toBe('Mocked Summary!'); // 2. Agent Thought const compressedThought = result[1] as AgentThought; diff --git a/packages/core/src/context/processors/nodeTruncationProcessor.test.ts b/packages/core/src/context/processors/nodeTruncationProcessor.test.ts index 325163b54d..1573e60cf6 100644 --- a/packages/core/src/context/processors/nodeTruncationProcessor.test.ts +++ b/packages/core/src/context/processors/nodeTruncationProcessor.test.ts @@ -3,6 +3,8 @@ * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ + +import assert from 'node:assert'; import { describe, it, expect } from 'vitest'; import { NodeTruncationProcessor } from './nodeTruncationProcessor.js'; import { @@ -45,7 +47,8 @@ describe('NodeTruncationProcessor', () => { const squashedPrompt = result[0] as UserPrompt; expect(squashedPrompt.id).not.toBe(prompt.id); expect(squashedPrompt.semanticParts[0].type).toBe('text'); - expect((squashedPrompt.semanticParts[0] as unknown as { text: string }).text).toContain('[... OMITTED'); + assert(squashedPrompt.semanticParts[0].type === 'text'); + expect(squashedPrompt.semanticParts[0].text).toContain('[... OMITTED'); // 2. Agent Thought const squashedThought = result[1] as AgentThought; @@ -84,7 +87,8 @@ describe('NodeTruncationProcessor', () => { // 1. User Prompt (untouched) const squashedPrompt = result[0] as UserPrompt; expect(squashedPrompt.id).toBe(prompt.id); - expect((squashedPrompt.semanticParts[0] as unknown as { text: string }).text).not.toContain('[... OMITTED'); + assert(squashedPrompt.semanticParts[0].type === 'text'); + expect(squashedPrompt.semanticParts[0].text).not.toContain('[... OMITTED'); // 2. Agent Thought (untouched) const untouchedThought = result[1] as AgentThought; diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts index d20d6e3e38..414e0f035d 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.test.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.test.ts @@ -8,17 +8,16 @@ import { StateSnapshotProcessor } from './stateSnapshotProcessor.js'; import { createMockEnvironment, createDummyNode, + createMockProcessArgs, } from '../testing/contextTestUtils.js'; -import { InboxSnapshotImpl } from '../sidecar/inbox.js'; +import type { InboxSnapshotImpl } from '../sidecar/inbox.js'; describe('StateSnapshotProcessor', () => { it('should ignore if budget is satisfied', async () => { const env = createMockEnvironment(); const processor = StateSnapshotProcessor.create(env, { target: 'incremental' }); const targets = [createDummyNode('ep1', 'USER_PROMPT')]; - const inbox = new InboxSnapshotImpl([]); - - const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox }); + const result = await processor.process(createMockProcessArgs(targets)); expect(result).toBe(targets); // Strict equality }); @@ -33,7 +32,7 @@ describe('StateSnapshotProcessor', () => { const targets = [nodeA, nodeB, nodeC]; // The background worker created a snapshot of A and B - const inbox = new InboxSnapshotImpl([ + const messages = [ { id: 'msg-1', topic: 'PROPOSED_SNAPSHOT', @@ -44,9 +43,10 @@ describe('StateSnapshotProcessor', () => { type: 'point-in-time', } } - ]); + ]; - const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox }); + const processArgs = createMockProcessArgs(targets, [], messages); + const result = await processor.process(processArgs); // Should remove A and B, insert Snapshot, keep C expect(result.length).toBe(2); @@ -54,7 +54,7 @@ describe('StateSnapshotProcessor', () => { expect(result[1].id).toBe('node-C'); // Should consume the message - expect(inbox.getConsumedIds().has('msg-1')).toBe(true); + expect((processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1')).toBe(true); }); it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => { @@ -66,7 +66,7 @@ describe('StateSnapshotProcessor', () => { const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); const targets = [nodeB]; - const inbox = new InboxSnapshotImpl([ + const messages = [ { id: 'msg-1', topic: 'PROPOSED_SNAPSHOT', @@ -76,14 +76,15 @@ describe('StateSnapshotProcessor', () => { newText: '', } } - ]); + ]; - const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox }); + const processArgs = createMockProcessArgs(targets, [], messages); + const result = await processor.process(processArgs); // Because deficit is 0, and Inbox was rejected, nothing should change expect(result.length).toBe(1); expect(result[0].id).toBe('node-B'); - expect(inbox.getConsumedIds().has('msg-1')).toBe(false); + expect((processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1')).toBe(false); }); it('should fall back to sync backstop if inbox is empty', async () => { @@ -94,9 +95,7 @@ describe('StateSnapshotProcessor', () => { const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B'); const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C'); const targets = [nodeA, nodeB, nodeC]; - const inbox = new InboxSnapshotImpl([]); - - const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox }); + const result = await processor.process(createMockProcessArgs(targets)); // Should synthesize a new snapshot synchronously expect(env.llmClient.generateContent).toHaveBeenCalled(); diff --git a/packages/core/src/context/sidecar/orchestrator.test.ts b/packages/core/src/context/sidecar/orchestrator.test.ts index 8e24446242..041f257954 100644 --- a/packages/core/src/context/sidecar/orchestrator.test.ts +++ b/packages/core/src/context/sidecar/orchestrator.test.ts @@ -4,6 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import assert from 'node:assert'; import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; import { PipelineOrchestrator } from './orchestrator.js'; import { SidecarRegistry } from './registry.js'; @@ -12,45 +13,35 @@ import { createDummyNode, } from '../testing/contextTestUtils.js'; import type { ContextEnvironment } from './environment.js'; -import type { ContextProcessor } from '../pipeline.js'; +import type { ContextProcessor, ContextWorker } from '../pipeline.js'; import type { PipelineDef, ProcessorConfig, SidecarConfig } from './types.js'; import type { ContextEventBus } from '../eventBus.js'; +import type { UserPrompt } from '../ir/types.js'; -// Create a Dummy Processor for testing Orchestration routing -class DummySyncProcessor implements ContextProcessor { +// A realistic mock processor that modifies the text of the first target node +class ModifyingProcessor implements ContextProcessor { static create() { - return new DummySyncProcessor(); + return new ModifyingProcessor(); } constructor() {} - readonly name = 'DummySync'; - readonly id = 'DummySync'; + readonly name = 'ModifyingProcessor'; + readonly id = 'ModifyingProcessor'; readonly options = {}; async process(args: import('../pipeline.js').ProcessArgs) { const newTargets = [...args.targets]; - if (newTargets.length > 0) { - newTargets[0] = { ...newTargets[0], dummyModified: true } as unknown as import('../ir/types.js').ConcreteNode; - } - return newTargets; - } -} - -class DummyAsyncProcessor implements ContextProcessor { - static create() { - return new DummyAsyncProcessor(); - } - constructor() {} - readonly name = 'DummyAsync'; - readonly id = 'DummyAsync'; - readonly options = {}; - async process(args: import('../pipeline.js').ProcessArgs) { - const newTargets = [...args.targets]; - if (newTargets.length > 0) { - newTargets[0] = { ...newTargets[0], dummyModified: true } as unknown as import('../ir/types.js').ConcreteNode; + if (newTargets.length > 0 && newTargets[0].type === 'USER_PROMPT') { + const prompt = newTargets[0]; + const newParts = [...prompt.semanticParts]; + if (newParts.length > 0 && newParts[0].type === 'text') { + newParts[0] = { ...newParts[0], text: newParts[0].text + ' [modified]' }; + } + newTargets[0] = { ...prompt, semanticParts: newParts }; } return newTargets; } } +// A processor that just throws an error class ThrowingProcessor implements ContextProcessor { static create() { return new ThrowingProcessor(); @@ -64,6 +55,33 @@ class ThrowingProcessor implements ContextProcessor { } } +// A mock worker that signals it ran +class MockWorker implements ContextWorker { + static create() { + return new MockWorker(); + } + constructor() {} + readonly name = 'MockWorker'; + readonly id = 'MockWorker'; + readonly triggers = { + onNodesAdded: true, + }; + wasExecuted = false; + + async execute(args: { + targets: ReadonlyArray; + inbox: import('../pipeline.js').InboxSnapshot; + }) { + this.wasExecuted = true; + if (args.targets.length > 0 && args.targets[0].type === 'USER_PROMPT') { + const prompt = args.targets[0]; + if (prompt.semanticParts[0].type === 'text') { + args.inbox.consume('test'); + } + } + } +} + describe('PipelineOrchestrator (Component)', () => { let env: ContextEnvironment; let eventBus: ContextEventBus; @@ -75,45 +93,44 @@ describe('PipelineOrchestrator (Component)', () => { eventBus = env.eventBus; registry = new SidecarRegistry(); - // Register our test processors registry.registerProcessor({ - id: 'DummySyncProcessor', + id: 'ModifyingProcessor', schema: {}, - create: () => new DummySyncProcessor(), - }); - registry.registerProcessor({ - id: 'DummyAsyncProcessor', - schema: {}, - create: () => new DummyAsyncProcessor(), + create: () => new ModifyingProcessor(), }); registry.registerProcessor({ id: 'ThrowingProcessor', schema: {}, create: () => new ThrowingProcessor(), }); + registry.registerWorker({ + id: 'MockWorker', + schema: {}, + create: () => new MockWorker(), + }); }); afterEach(() => { - // Cleanup registry to not pollute other tests registry.clear(); }); - const createConfig = (pipelines: PipelineDef[]): SidecarConfig => ({ + const createConfig = (pipelines: PipelineDef[], workers?: Array<{ workerId: string }>): SidecarConfig => ({ budget: { maxTokens: 100, retainedTokens: 50 }, pipelines, + workers, }); - it('instantiates processors from the registry on initialization', () => { + it('instantiates processors and workers from the registry on initialization', () => { const config = createConfig([ { - name: 'ThrowPipe', + name: 'SyncPipe', execution: 'blocking', triggers: ['new_message'], processors: [ - { processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig, + { processorId: 'ModifyingProcessor' } as unknown as ProcessorConfig, ], }, - ]); + ], [{ workerId: 'MockWorker' }]); const orchestrator = new PipelineOrchestrator( config, @@ -122,10 +139,11 @@ describe('PipelineOrchestrator (Component)', () => { env.tracer, registry, ); - expect( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (orchestrator as any).instantiatedProcessors.has('DummySyncProcessor'), - ).toBe(true); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((orchestrator as any).instantiatedProcessors.has('ModifyingProcessor')).toBe(true); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((orchestrator as any).instantiatedWorkers.has('MockWorker')).toBe(true); }); it('throws an error if a config requests an unknown processor', () => { @@ -146,14 +164,14 @@ describe('PipelineOrchestrator (Component)', () => { ).toThrow('Context Processor [DoesNotExist] is not registered.'); }); - it('executes blocking pipelines synchronously and returns the modified array', async () => { + it('executes synchronous routes (executeTriggerSync) and returns modified array', async () => { const config = createConfig([ { name: 'SyncPipe', execution: 'blocking', triggers: ['new_message'], processors: [ - { processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig, + { processorId: 'ModifyingProcessor' } as unknown as ProcessorConfig, ], }, ]); @@ -165,7 +183,9 @@ describe('PipelineOrchestrator (Component)', () => { registry, ); - const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')]; + const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, { + semanticParts: [{ type: 'text', text: 'original text' }] + }, 'not-protected-id')]; const result = await orchestrator.executeTriggerSync( 'new_message', @@ -175,50 +195,12 @@ describe('PipelineOrchestrator (Component)', () => { ); expect(result).toHaveLength(1); - expect( - (result[0] as unknown as { dummyModified?: boolean }).dummyModified, - ).toBe(true); + const modifiedPrompt = result[0] as UserPrompt; + assert(modifiedPrompt.semanticParts[0].type === 'text', 'Expected a text part'); + expect(modifiedPrompt.semanticParts[0].text).toBe('original text [modified]'); }); - it('executes background pipelines asynchronously without blocking the return', async () => { - const config = createConfig([ - { - name: 'AsyncPipe', - execution: 'background', - triggers: ['new_message'], - processors: [ - { processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig, - ], - }, - ]); - const orchestrator = new PipelineOrchestrator( - config, - env, - eventBus, - env.tracer, - registry, - ); - - const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')]; - - // This should resolve immediately with the UNMODIFIED array because execution is background - const result = await orchestrator.executeTriggerSync( - 'new_message', - nodes, - new Set(nodes.map((s) => s.id)), - new Set(), - ); - - expect(result).toHaveLength(1); - expect( - (result[0] as unknown as { asyncModified: unknown }).asyncModified, - ).toBeUndefined(); // Not modified yet! - - // Wait for the background task to complete (50ms delay in DummyAsyncProcessor) - await new Promise((resolve) => setTimeout(resolve, 60)); - }); - - it('gracefully handles and swallows processor errors in synchronous pipelines', async () => { + it('gracefully handles and swallows processor errors in synchronous routes', async () => { const config = createConfig([ { name: 'ThrowPipe', @@ -251,14 +233,14 @@ describe('PipelineOrchestrator (Component)', () => { expect(result).toStrictEqual(nodes); }); - it('automatically binds to retained_exceeded trigger via EventBus', () => { + it('automatically triggers background pipelines via EventBus', () => { const config = createConfig([ { name: 'PressureRelief', execution: 'background', triggers: ['retained_exceeded'], processors: [ - { processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig, + { processorId: 'ModifyingProcessor' } as unknown as ProcessorConfig, ], }, ]); @@ -283,4 +265,29 @@ describe('PipelineOrchestrator (Component)', () => { expect(executeSpy).toHaveBeenCalled(); }); -}); + + it('automatically dispatches workers when matching EventBus events occur', async () => { + const config = createConfig([], [{ workerId: 'MockWorker' }]); + + const orchestrator = new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const workerInstance = (orchestrator as any).instantiatedWorkers.get('MockWorker') as MockWorker; + expect(workerInstance.wasExecuted).toBe(false); + + const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, { + semanticParts: [{ type: 'text', text: 'worker trigger text' }] + }, 'not-protected-id')]; + + // Emit the new_message chunk which maps to onNodesAdded for workers + eventBus.emitChunkReceived({ + nodes, + targetNodeIds: new Set(nodes.map(n => n.id)), + }); + + // Worker execute is fire and forget, so we yield to the event loop + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(workerInstance.wasExecuted).toBe(true); + }); +}); \ No newline at end of file