next batch

This commit is contained in:
Your Name
2026-04-09 16:49:30 +00:00
parent 10ef9a6876
commit 9bd9c0f72d
5 changed files with 128 additions and 112 deletions
@@ -3,6 +3,8 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import assert from 'node:assert';
import { describe, it, expect } from 'vitest';
import { BlobDegradationProcessor } from './blobDegradationProcessor.js';
import {
@@ -10,7 +12,7 @@ import {
createMockEnvironment,
createDummyNode,
} from '../testing/contextTestUtils.js';
import type { UserPrompt, SemanticPart } from '../ir/types.js';
import type { UserPrompt, SemanticPart, ConcreteNode } from '../ir/types.js';
describe('BlobDegradationProcessor', () => {
it('should ignore text parts and only target inline_data and file_data', async () => {
@@ -48,8 +50,9 @@ describe('BlobDegradationProcessor', () => {
expect(modifiedPrompt.semanticParts[2]).toEqual(parts[2]);
// The inline_data part should be replaced with text
const degradedPart = modifiedPrompt.semanticParts[1] as unknown as { type: string, text: string };
const degradedPart = modifiedPrompt.semanticParts[1];
expect(degradedPart.type).toBe('text');
assert(degradedPart.type === 'text');
expect(degradedPart.text).toContain('[Multi-Modal Blob (image/png, 0.00MB) degraded to text');
});
@@ -87,7 +90,7 @@ describe('BlobDegradationProcessor', () => {
const env = createMockEnvironment();
const processor = BlobDegradationProcessor.create(env, {});
const targets: Array<import('../ir/types.js').ConcreteNode> = [];
const targets: ConcreteNode[] = [];
const result = await processor.process(createMockProcessArgs(targets));
@@ -3,6 +3,8 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import assert from 'node:assert';
import { describe, it, expect, vi } from 'vitest';
import { NodeDistillationProcessor } from './nodeDistillationProcessor.js';
import {
@@ -56,7 +58,8 @@ describe('NodeDistillationProcessor', () => {
const compressedPrompt = result[0] as UserPrompt;
expect(compressedPrompt.id).not.toBe(prompt.id);
expect(compressedPrompt.semanticParts[0].type).toBe('text');
expect((compressedPrompt.semanticParts[0] as unknown as {text: string}).text).toBe('Mocked Summary!');
assert(compressedPrompt.semanticParts[0].type === 'text');
expect(compressedPrompt.semanticParts[0].text).toBe('Mocked Summary!');
// 2. Agent Thought
const compressedThought = result[1] as AgentThought;
@@ -3,6 +3,8 @@
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import assert from 'node:assert';
import { describe, it, expect } from 'vitest';
import { NodeTruncationProcessor } from './nodeTruncationProcessor.js';
import {
@@ -45,7 +47,8 @@ describe('NodeTruncationProcessor', () => {
const squashedPrompt = result[0] as UserPrompt;
expect(squashedPrompt.id).not.toBe(prompt.id);
expect(squashedPrompt.semanticParts[0].type).toBe('text');
expect((squashedPrompt.semanticParts[0] as unknown as { text: string }).text).toContain('[... OMITTED');
assert(squashedPrompt.semanticParts[0].type === 'text');
expect(squashedPrompt.semanticParts[0].text).toContain('[... OMITTED');
// 2. Agent Thought
const squashedThought = result[1] as AgentThought;
@@ -84,7 +87,8 @@ describe('NodeTruncationProcessor', () => {
// 1. User Prompt (untouched)
const squashedPrompt = result[0] as UserPrompt;
expect(squashedPrompt.id).toBe(prompt.id);
expect((squashedPrompt.semanticParts[0] as unknown as { text: string }).text).not.toContain('[... OMITTED');
assert(squashedPrompt.semanticParts[0].type === 'text');
expect(squashedPrompt.semanticParts[0].text).not.toContain('[... OMITTED');
// 2. Agent Thought (untouched)
const untouchedThought = result[1] as AgentThought;
@@ -8,17 +8,16 @@ import { StateSnapshotProcessor } from './stateSnapshotProcessor.js';
import {
createMockEnvironment,
createDummyNode,
createMockProcessArgs,
} from '../testing/contextTestUtils.js';
import { InboxSnapshotImpl } from '../sidecar/inbox.js';
import type { InboxSnapshotImpl } from '../sidecar/inbox.js';
describe('StateSnapshotProcessor', () => {
it('should ignore if budget is satisfied', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
const targets = [createDummyNode('ep1', 'USER_PROMPT')];
const inbox = new InboxSnapshotImpl([]);
const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
const result = await processor.process(createMockProcessArgs(targets));
expect(result).toBe(targets); // Strict equality
});
@@ -33,7 +32,7 @@ describe('StateSnapshotProcessor', () => {
const targets = [nodeA, nodeB, nodeC];
// The background worker created a snapshot of A and B
const inbox = new InboxSnapshotImpl([
const messages = [
{
id: 'msg-1',
topic: 'PROPOSED_SNAPSHOT',
@@ -44,9 +43,10 @@ describe('StateSnapshotProcessor', () => {
type: 'point-in-time',
}
}
]);
];
const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
const processArgs = createMockProcessArgs(targets, [], messages);
const result = await processor.process(processArgs);
// Should remove A and B, insert Snapshot, keep C
expect(result.length).toBe(2);
@@ -54,7 +54,7 @@ describe('StateSnapshotProcessor', () => {
expect(result[1].id).toBe('node-C');
// Should consume the message
expect(inbox.getConsumedIds().has('msg-1')).toBe(true);
expect((processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1')).toBe(true);
});
it('should reject a snapshot if the nodes were modified/deleted (Cache Invalidated)', async () => {
@@ -66,7 +66,7 @@ describe('StateSnapshotProcessor', () => {
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const targets = [nodeB];
const inbox = new InboxSnapshotImpl([
const messages = [
{
id: 'msg-1',
topic: 'PROPOSED_SNAPSHOT',
@@ -76,14 +76,15 @@ describe('StateSnapshotProcessor', () => {
newText: '<compressed A and B>',
}
}
]);
];
const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
const processArgs = createMockProcessArgs(targets, [], messages);
const result = await processor.process(processArgs);
// Because deficit is 0, and Inbox was rejected, nothing should change
expect(result.length).toBe(1);
expect(result[0].id).toBe('node-B');
expect(inbox.getConsumedIds().has('msg-1')).toBe(false);
expect((processArgs.inbox as InboxSnapshotImpl).getConsumedIds().has('msg-1')).toBe(false);
});
it('should fall back to sync backstop if inbox is empty', async () => {
@@ -94,9 +95,7 @@ describe('StateSnapshotProcessor', () => {
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
const nodeC = createDummyNode('ep2', 'USER_PROMPT', 50, {}, 'node-C');
const targets = [nodeA, nodeB, nodeC];
const inbox = new InboxSnapshotImpl([]);
const result = await processor.process({ buffer: undefined as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
const result = await processor.process(createMockProcessArgs(targets));
// Should synthesize a new snapshot synchronously
expect(env.llmClient.generateContent).toHaveBeenCalled();
@@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
import assert from 'node:assert';
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { PipelineOrchestrator } from './orchestrator.js';
import { SidecarRegistry } from './registry.js';
@@ -12,45 +13,35 @@ import {
createDummyNode,
} from '../testing/contextTestUtils.js';
import type { ContextEnvironment } from './environment.js';
import type { ContextProcessor } from '../pipeline.js';
import type { ContextProcessor, ContextWorker } from '../pipeline.js';
import type { PipelineDef, ProcessorConfig, SidecarConfig } from './types.js';
import type { ContextEventBus } from '../eventBus.js';
import type { UserPrompt } from '../ir/types.js';
// Create a Dummy Processor for testing Orchestration routing
class DummySyncProcessor implements ContextProcessor {
// A realistic mock processor that modifies the text of the first target node
class ModifyingProcessor implements ContextProcessor {
static create() {
return new DummySyncProcessor();
return new ModifyingProcessor();
}
constructor() {}
readonly name = 'DummySync';
readonly id = 'DummySync';
readonly name = 'ModifyingProcessor';
readonly id = 'ModifyingProcessor';
readonly options = {};
async process(args: import('../pipeline.js').ProcessArgs) {
const newTargets = [...args.targets];
if (newTargets.length > 0) {
newTargets[0] = { ...newTargets[0], dummyModified: true } as unknown as import('../ir/types.js').ConcreteNode;
}
return newTargets;
}
}
class DummyAsyncProcessor implements ContextProcessor {
static create() {
return new DummyAsyncProcessor();
}
constructor() {}
readonly name = 'DummyAsync';
readonly id = 'DummyAsync';
readonly options = {};
async process(args: import('../pipeline.js').ProcessArgs) {
const newTargets = [...args.targets];
if (newTargets.length > 0) {
newTargets[0] = { ...newTargets[0], dummyModified: true } as unknown as import('../ir/types.js').ConcreteNode;
if (newTargets.length > 0 && newTargets[0].type === 'USER_PROMPT') {
const prompt = newTargets[0];
const newParts = [...prompt.semanticParts];
if (newParts.length > 0 && newParts[0].type === 'text') {
newParts[0] = { ...newParts[0], text: newParts[0].text + ' [modified]' };
}
newTargets[0] = { ...prompt, semanticParts: newParts };
}
return newTargets;
}
}
// A processor that just throws an error
class ThrowingProcessor implements ContextProcessor {
static create() {
return new ThrowingProcessor();
@@ -64,6 +55,33 @@ class ThrowingProcessor implements ContextProcessor {
}
}
// A mock worker that signals it ran
class MockWorker implements ContextWorker {
static create() {
return new MockWorker();
}
constructor() {}
readonly name = 'MockWorker';
readonly id = 'MockWorker';
readonly triggers = {
onNodesAdded: true,
};
wasExecuted = false;
async execute(args: {
targets: ReadonlyArray<import('../ir/types.js').ConcreteNode>;
inbox: import('../pipeline.js').InboxSnapshot;
}) {
this.wasExecuted = true;
if (args.targets.length > 0 && args.targets[0].type === 'USER_PROMPT') {
const prompt = args.targets[0];
if (prompt.semanticParts[0].type === 'text') {
args.inbox.consume('test');
}
}
}
}
describe('PipelineOrchestrator (Component)', () => {
let env: ContextEnvironment;
let eventBus: ContextEventBus;
@@ -75,45 +93,44 @@ describe('PipelineOrchestrator (Component)', () => {
eventBus = env.eventBus;
registry = new SidecarRegistry();
// Register our test processors
registry.registerProcessor({
id: 'DummySyncProcessor',
id: 'ModifyingProcessor',
schema: {},
create: () => new DummySyncProcessor(),
});
registry.registerProcessor({
id: 'DummyAsyncProcessor',
schema: {},
create: () => new DummyAsyncProcessor(),
create: () => new ModifyingProcessor(),
});
registry.registerProcessor({
id: 'ThrowingProcessor',
schema: {},
create: () => new ThrowingProcessor(),
});
registry.registerWorker({
id: 'MockWorker',
schema: {},
create: () => new MockWorker(),
});
});
afterEach(() => {
// Cleanup registry to not pollute other tests
registry.clear();
});
const createConfig = (pipelines: PipelineDef[]): SidecarConfig => ({
const createConfig = (pipelines: PipelineDef[], workers?: Array<{ workerId: string }>): SidecarConfig => ({
budget: { maxTokens: 100, retainedTokens: 50 },
pipelines,
workers,
});
it('instantiates processors from the registry on initialization', () => {
it('instantiates processors and workers from the registry on initialization', () => {
const config = createConfig([
{
name: 'ThrowPipe',
name: 'SyncPipe',
execution: 'blocking',
triggers: ['new_message'],
processors: [
{ processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig,
{ processorId: 'ModifyingProcessor' } as unknown as ProcessorConfig,
],
},
]);
], [{ workerId: 'MockWorker' }]);
const orchestrator = new PipelineOrchestrator(
config,
@@ -122,10 +139,11 @@ describe('PipelineOrchestrator (Component)', () => {
env.tracer,
registry,
);
expect(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(orchestrator as any).instantiatedProcessors.has('DummySyncProcessor'),
).toBe(true);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((orchestrator as any).instantiatedProcessors.has('ModifyingProcessor')).toBe(true);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((orchestrator as any).instantiatedWorkers.has('MockWorker')).toBe(true);
});
it('throws an error if a config requests an unknown processor', () => {
@@ -146,14 +164,14 @@ describe('PipelineOrchestrator (Component)', () => {
).toThrow('Context Processor [DoesNotExist] is not registered.');
});
it('executes blocking pipelines synchronously and returns the modified array', async () => {
it('executes synchronous routes (executeTriggerSync) and returns modified array', async () => {
const config = createConfig([
{
name: 'SyncPipe',
execution: 'blocking',
triggers: ['new_message'],
processors: [
{ processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig,
{ processorId: 'ModifyingProcessor' } as unknown as ProcessorConfig,
],
},
]);
@@ -165,7 +183,9 @@ describe('PipelineOrchestrator (Component)', () => {
registry,
);
const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, {
semanticParts: [{ type: 'text', text: 'original text' }]
}, 'not-protected-id')];
const result = await orchestrator.executeTriggerSync(
'new_message',
@@ -175,50 +195,12 @@ describe('PipelineOrchestrator (Component)', () => {
);
expect(result).toHaveLength(1);
expect(
(result[0] as unknown as { dummyModified?: boolean }).dummyModified,
).toBe(true);
const modifiedPrompt = result[0] as UserPrompt;
assert(modifiedPrompt.semanticParts[0].type === 'text', 'Expected a text part');
expect(modifiedPrompt.semanticParts[0].text).toBe('original text [modified]');
});
it('executes background pipelines asynchronously without blocking the return', async () => {
const config = createConfig([
{
name: 'AsyncPipe',
execution: 'background',
triggers: ['new_message'],
processors: [
{ processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig,
],
},
]);
const orchestrator = new PipelineOrchestrator(
config,
env,
eventBus,
env.tracer,
registry,
);
const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
// This should resolve immediately with the UNMODIFIED array because execution is background
const result = await orchestrator.executeTriggerSync(
'new_message',
nodes,
new Set(nodes.map((s) => s.id)),
new Set(),
);
expect(result).toHaveLength(1);
expect(
(result[0] as unknown as { asyncModified: unknown }).asyncModified,
).toBeUndefined(); // Not modified yet!
// Wait for the background task to complete (50ms delay in DummyAsyncProcessor)
await new Promise((resolve) => setTimeout(resolve, 60));
});
it('gracefully handles and swallows processor errors in synchronous pipelines', async () => {
it('gracefully handles and swallows processor errors in synchronous routes', async () => {
const config = createConfig([
{
name: 'ThrowPipe',
@@ -251,14 +233,14 @@ describe('PipelineOrchestrator (Component)', () => {
expect(result).toStrictEqual(nodes);
});
it('automatically binds to retained_exceeded trigger via EventBus', () => {
it('automatically triggers background pipelines via EventBus', () => {
const config = createConfig([
{
name: 'PressureRelief',
execution: 'background',
triggers: ['retained_exceeded'],
processors: [
{ processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig,
{ processorId: 'ModifyingProcessor' } as unknown as ProcessorConfig,
],
},
]);
@@ -283,4 +265,29 @@ describe('PipelineOrchestrator (Component)', () => {
expect(executeSpy).toHaveBeenCalled();
});
});
it('automatically dispatches workers when matching EventBus events occur', async () => {
const config = createConfig([], [{ workerId: 'MockWorker' }]);
const orchestrator = new PipelineOrchestrator(config, env, eventBus, env.tracer, registry);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const workerInstance = (orchestrator as any).instantiatedWorkers.get('MockWorker') as MockWorker;
expect(workerInstance.wasExecuted).toBe(false);
const nodes = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, {
semanticParts: [{ type: 'text', text: 'worker trigger text' }]
}, 'not-protected-id')];
// Emit the new_message chunk which maps to onNodesAdded for workers
eventBus.emitChunkReceived({
nodes,
targetNodeIds: new Set(nodes.map(n => n.id)),
});
// Worker execute is fire and forget, so we yield to the event loop
await new Promise((resolve) => setTimeout(resolve, 10));
expect(workerInstance.wasExecuted).toBe(true);
});
});