mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-16 06:43:07 -07:00
hmm speculative
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -4,8 +4,10 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { testTruncateProfile } from './sidecar/testProfile.js';
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||
|
||||
import { testTruncateProfile } from './sidecar/testProfile.js';
|
||||
import {
|
||||
createSyntheticHistory,
|
||||
createMockContextConfig,
|
||||
@@ -25,7 +27,10 @@ describe('ContextManager Sync Pressure Barrier Tests', () => {
|
||||
it('should instantly truncate history when maxTokens is exceeded using truncate strategy', async () => {
|
||||
// 1. Setup
|
||||
const config = createMockContextConfig();
|
||||
const { chatHistory, contextManager } = setupContextComponentTest(config);
|
||||
const { chatHistory, contextManager } = setupContextComponentTest(
|
||||
config,
|
||||
testTruncateProfile,
|
||||
);
|
||||
|
||||
// 2. Add System Prompt (Episode 0 - Protected)
|
||||
chatHistory.set([
|
||||
@@ -60,9 +65,15 @@ describe('ContextManager Sync Pressure Barrier Tests', () => {
|
||||
expect(projection[0].role).toBe('user');
|
||||
expect(projection[0].parts![0].text).toBe('System prompt');
|
||||
|
||||
// Filter out synthetic Yield nodes (they are model responses without actual tool/text bodies)
|
||||
const contentNodes = projection.filter(
|
||||
(p) =>
|
||||
p.parts && p.parts.some((part) => part.text && part.text !== 'Yield'),
|
||||
);
|
||||
|
||||
// Verify the latest turn is perfectly preserved at the back
|
||||
const lastUser = projection[projection.length - 2];
|
||||
const lastModel = projection[projection.length - 1];
|
||||
const lastUser = contentNodes[contentNodes.length - 2];
|
||||
const lastModel = contentNodes[contentNodes.length - 1];
|
||||
|
||||
expect(lastUser.role).toBe('user');
|
||||
expect(lastUser.parts![0].text).toBe('Final question.');
|
||||
|
||||
@@ -26,6 +26,7 @@ import type { SidecarConfig } from './sidecar/types.js';
|
||||
import { ProcessorRegistry } from './sidecar/registry.js';
|
||||
import { registerBuiltInProcessors } from './sidecar/builtins.js';
|
||||
import { IrMapper } from './ir/mapper.js';
|
||||
import { createMockContextConfig, setupContextComponentTest } from './testing/contextTestUtils.js';
|
||||
|
||||
expect.addSnapshotSerializer({
|
||||
test: (val) =>
|
||||
@@ -147,43 +148,18 @@ describe('ContextManager Golden Tests', () => {
|
||||
|
||||
it('should not modify history when under budget', async () => {
|
||||
const history = createLargeHistory();
|
||||
(
|
||||
contextManager as unknown as { pristineEpisodes: Episode[] }
|
||||
).pristineEpisodes = IrMapper.toIr(history, new ContextTokenCalculator(4));
|
||||
// In Golden Tests, we just want to ensure the logic doesn't throw or alter unprotected history in weird ways.
|
||||
// Since we're skipping processors due to being under budget, it should equal history.
|
||||
const tracer2 = new ContextTracer({
|
||||
targetDir: '/tmp',
|
||||
sessionId: 'test2',
|
||||
});
|
||||
const eventBus2 = new ContextEventBus();
|
||||
const env2 = new ContextEnvironmentImpl(
|
||||
{
|
||||
generateContent: async () => ({}),
|
||||
generateJson: async () => ({}),
|
||||
} as unknown as BaseLlmClient,
|
||||
'test-prompt-id',
|
||||
'test',
|
||||
'/tmp',
|
||||
'/tmp',
|
||||
tracer2,
|
||||
4,
|
||||
eventBus2,
|
||||
);
|
||||
contextManager = ContextManager.create(
|
||||
{
|
||||
|
||||
const config = createMockContextConfig();
|
||||
const { chatHistory, contextManager: localManager } = setupContextComponentTest(config, {
|
||||
budget: { retainedTokens: 100000, maxTokens: 150000 },
|
||||
pipelines: [],
|
||||
} as unknown as SidecarConfig,
|
||||
env2,
|
||||
tracer2,
|
||||
);
|
||||
} as unknown as SidecarConfig);
|
||||
|
||||
(
|
||||
contextManager as unknown as { pristineEpisodes: Episode[] }
|
||||
).pristineEpisodes = IrMapper.toIr(history, new ContextTokenCalculator(4));
|
||||
const result = await contextManager.projectCompressedHistory();
|
||||
chatHistory.set(history);
|
||||
|
||||
expect(result.length).toEqual(history.length);
|
||||
const result = await localManager.projectCompressedHistory();
|
||||
|
||||
// V2 adds an AgentYield node to the end of the history array
|
||||
expect(result.length).toEqual(history.length + 1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -38,22 +38,22 @@ export class HistoryObserver {
|
||||
this.unsubscribeHistory = this.chatHistory.subscribe(
|
||||
(_event: HistoryEvent) => {
|
||||
// Rebuild the pristine IR graph from the full source history on every change.
|
||||
// Wait, toIr still returns an Episode[].
|
||||
// Wait, toIr still returns an Episode[].
|
||||
// We actually need to map the Episode[] to a flat ConcreteNode[] here to form the 'ship'.
|
||||
const pristineEpisodes = IrMapper.toIr(
|
||||
this.chatHistory.get(),
|
||||
this.tokenCalculator,
|
||||
);
|
||||
|
||||
|
||||
const ship: import('./ir/types.js').ConcreteNode[] = [];
|
||||
for (const ep of pristineEpisodes) {
|
||||
if (ep.concreteNodeIds) {
|
||||
for (const child of ep.concreteNodeIds) {
|
||||
ship.push(child as unknown as import('./ir/types.js').ConcreteNode);
|
||||
}
|
||||
}
|
||||
if (ep.concreteNodes) {
|
||||
for (const child of ep.concreteNodes) {
|
||||
ship.push(child);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
const newNodes = new Set<string>();
|
||||
for (const node of ship) {
|
||||
if (!this.seenNodeIds.has(node.id)) {
|
||||
|
||||
@@ -37,8 +37,17 @@ export class IrProjector {
|
||||
}
|
||||
|
||||
const maxTokens = sidecar.budget.maxTokens;
|
||||
const currentTokens =
|
||||
env.tokenCalculator.calculateConcreteListTokens(ship);
|
||||
const currentTokens = env.tokenCalculator.calculateConcreteListTokens(ship);
|
||||
|
||||
// V0: Always protect the first node (System Prompt) and the last turn
|
||||
if (ship.length > 0) {
|
||||
protectedIds.add(ship[0].id);
|
||||
if (ship[0].logicalParentId) protectedIds.add(ship[0].logicalParentId);
|
||||
|
||||
const lastNode = ship[ship.length - 1];
|
||||
protectedIds.add(lastNode.id);
|
||||
if (lastNode.logicalParentId) protectedIds.add(lastNode.logicalParentId);
|
||||
}
|
||||
|
||||
if (currentTokens <= maxTokens) {
|
||||
tracer.logEvent(
|
||||
@@ -101,11 +110,11 @@ export class IrProjector {
|
||||
const skipList = new Set<string>();
|
||||
for (const node of processedShip) {
|
||||
if (node.abstractsIds) {
|
||||
for (const id of node.abstractsIds) skipList.add(id);
|
||||
for (const id of node.abstractsIds) skipList.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
const visibleShip = processedShip.filter(n => !skipList.has(n.id));
|
||||
const visibleShip = processedShip.filter((n) => !skipList.has(n.id));
|
||||
|
||||
const contents = IrMapper.fromIr(visibleShip);
|
||||
tracer.logEvent('IrProjector', 'Projected Sanitized Context to LLM', {
|
||||
|
||||
@@ -37,8 +37,8 @@ function isCompleteEpisode(ep: Partial<Episode>): ep is Episode {
|
||||
return (
|
||||
typeof ep.id === 'string' &&
|
||||
typeof ep.timestamp === 'number' &&
|
||||
Array.isArray(ep.concreteNodeIds) &&
|
||||
ep.concreteNodeIds.length > 0
|
||||
Array.isArray(ep.concreteNodes) &&
|
||||
ep.concreteNodes.length > 0
|
||||
);
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ function parseToolResponses(
|
||||
currentEpisode = {
|
||||
id: getStableId(msg),
|
||||
timestamp: Date.now(),
|
||||
concreteNodeIds: [getStableId(msg.parts![0] || msg)],
|
||||
concreteNodes: [],
|
||||
};
|
||||
}
|
||||
|
||||
@@ -152,7 +152,10 @@ function parseToolResponses(
|
||||
transformations: [],
|
||||
},
|
||||
};
|
||||
currentEpisode.concreteNodeIds = [...(currentEpisode.concreteNodeIds || []), step.id];
|
||||
currentEpisode.concreteNodes = [
|
||||
...(currentEpisode.concreteNodes || []),
|
||||
step,
|
||||
];
|
||||
if (callId) pendingCallParts.delete(callId);
|
||||
}
|
||||
}
|
||||
@@ -193,7 +196,8 @@ function parseUserParts(
|
||||
return {
|
||||
id: getStableId(msg),
|
||||
timestamp: Date.now(),
|
||||
concreteNodeIds: [trigger.id], };
|
||||
concreteNodes: [trigger],
|
||||
};
|
||||
}
|
||||
|
||||
function parseModelParts(
|
||||
@@ -206,7 +210,7 @@ function parseModelParts(
|
||||
currentEpisode = {
|
||||
id: getStableId(msg),
|
||||
timestamp: Date.now(),
|
||||
concreteNodeIds: [getStableId(msg.parts![0] || msg)],
|
||||
concreteNodes: [],
|
||||
};
|
||||
}
|
||||
|
||||
@@ -221,25 +225,29 @@ function parseModelParts(
|
||||
text: part.text,
|
||||
metadata: createMetadata([part]),
|
||||
};
|
||||
currentEpisode.concreteNodeIds = [...(currentEpisode.concreteNodeIds || []), thought.id];
|
||||
currentEpisode.concreteNodes = [
|
||||
...(currentEpisode.concreteNodes || []),
|
||||
thought,
|
||||
];
|
||||
}
|
||||
}
|
||||
return currentEpisode as Partial<Episode>;
|
||||
}
|
||||
|
||||
function finalizeYield(currentEpisode: Partial<Episode>) {
|
||||
if (currentEpisode.concreteNodeIds && currentEpisode.concreteNodeIds.length > 0) {
|
||||
const yieldNode: AgentYield = {
|
||||
id: randomUUID(),
|
||||
type: 'AGENT_YIELD',
|
||||
text: 'Yield', // Synthesized yield since we don't have the original concrete node
|
||||
metadata: {
|
||||
originalTokens: 1,
|
||||
currentTokens: 1,
|
||||
transformations: []
|
||||
},
|
||||
};
|
||||
const existingNodes = currentEpisode.concreteNodeIds as string[];
|
||||
currentEpisode.concreteNodeIds = [...existingNodes.slice(0, -1), yieldNode.id];
|
||||
if (currentEpisode.concreteNodes && currentEpisode.concreteNodes.length > 0) {
|
||||
const yieldNode: AgentYield = {
|
||||
id: randomUUID(),
|
||||
type: 'AGENT_YIELD',
|
||||
text: 'Yield', // Synthesized yield since we don't have the original concrete node
|
||||
metadata: {
|
||||
originalTokens: 1,
|
||||
currentTokens: 1,
|
||||
transformations: [],
|
||||
},
|
||||
};
|
||||
const existingNodes =
|
||||
currentEpisode.concreteNodes as import('./types.js').ConcreteNode[];
|
||||
currentEpisode.concreteNodes = [...existingNodes, yieldNode];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ export type IrNodeType =
|
||||
| 'AGENT_THOUGHT'
|
||||
| 'TOOL_EXECUTION'
|
||||
| 'AGENT_YIELD'
|
||||
|
||||
|
||||
// Synthetic Concrete Nodes
|
||||
| 'SNAPSHOT'
|
||||
| 'ROLLING_SUMMARY'
|
||||
@@ -56,17 +56,17 @@ export interface IrNode {
|
||||
readonly metadata: IrMetadata;
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Concrete Nodes: The atomic, renderable pieces of data.
|
||||
* These are the actual "planks" of the Ship of Theseus.
|
||||
*/
|
||||
export interface BaseConcreteNode extends IrNode {
|
||||
/** The ID of the Logical Node (e.g., Episode) that structurally owns this node */
|
||||
readonly logicalParentId?: string;
|
||||
|
||||
|
||||
/** If this node replaced a single node 1:1 (e.g., masking), this points to the original */
|
||||
readonly replacesId?: string;
|
||||
|
||||
|
||||
/** If this node is a synthetic summary of N nodes, this points to the original IDs */
|
||||
readonly abstractsIds?: readonly string[];
|
||||
}
|
||||
@@ -171,14 +171,14 @@ export interface RollingSummary extends BaseConcreteNode {
|
||||
|
||||
export type SyntheticLeaf = Snapshot | RollingSummary;
|
||||
|
||||
export type ConcreteNode =
|
||||
| UserPrompt
|
||||
| SystemEvent
|
||||
| AgentThought
|
||||
| ToolExecution
|
||||
| MaskedTool
|
||||
| AgentYield
|
||||
| Snapshot
|
||||
export type ConcreteNode =
|
||||
| UserPrompt
|
||||
| SystemEvent
|
||||
| AgentThought
|
||||
| ToolExecution
|
||||
| MaskedTool
|
||||
| AgentYield
|
||||
| Snapshot
|
||||
| RollingSummary;
|
||||
|
||||
/**
|
||||
@@ -189,7 +189,7 @@ export interface Episode extends IrNode {
|
||||
readonly type: 'EPISODE';
|
||||
readonly timestamp: number;
|
||||
/** References to the Concrete Node IDs that conceptually belong to this Episode. */
|
||||
concreteNodeIds: readonly string[];
|
||||
concreteNodes: readonly ConcreteNode[];
|
||||
}
|
||||
|
||||
export interface Task extends IrNode {
|
||||
|
||||
@@ -26,7 +26,13 @@ class DummySyncProcessor implements ContextProcessor {
|
||||
readonly name = 'DummySync';
|
||||
readonly id = 'DummySync';
|
||||
readonly options = {};
|
||||
async process() { return []; }
|
||||
async process(args: any) {
|
||||
const newTargets = [...args.targets];
|
||||
if (newTargets.length > 0) {
|
||||
newTargets[0] = { ...newTargets[0], dummyModified: true };
|
||||
}
|
||||
return newTargets;
|
||||
}
|
||||
}
|
||||
|
||||
class DummyAsyncProcessor implements ContextProcessor {
|
||||
@@ -37,7 +43,13 @@ class DummyAsyncProcessor implements ContextProcessor {
|
||||
readonly name = 'DummyAsync';
|
||||
readonly id = 'DummyAsync';
|
||||
readonly options = {};
|
||||
async process() { return []; }
|
||||
async process(args: any) {
|
||||
const newTargets = [...args.targets];
|
||||
if (newTargets.length > 0) {
|
||||
newTargets[0] = { ...newTargets[0], dummyModified: true };
|
||||
}
|
||||
return newTargets;
|
||||
}
|
||||
}
|
||||
|
||||
class ThrowingProcessor implements ContextProcessor {
|
||||
@@ -95,9 +107,9 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
it('instantiates processors from the registry on initialization', () => {
|
||||
const config = createConfig([
|
||||
{
|
||||
name: 'Sync',
|
||||
name: 'ThrowPipe',
|
||||
execution: 'blocking',
|
||||
triggers: [],
|
||||
triggers: ['new_message'],
|
||||
processors: [
|
||||
{ processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig,
|
||||
],
|
||||
@@ -120,9 +132,9 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
it('throws an error if a config requests an unknown processor', () => {
|
||||
const config = createConfig([
|
||||
{
|
||||
name: 'Bad',
|
||||
name: 'ThrowPipe',
|
||||
execution: 'blocking',
|
||||
triggers: [],
|
||||
triggers: ['new_message'],
|
||||
processors: [
|
||||
{ processorId: 'DoesNotExist' } as unknown as ProcessorConfig,
|
||||
],
|
||||
@@ -140,7 +152,7 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
{
|
||||
name: 'SyncPipe',
|
||||
execution: 'blocking',
|
||||
triggers: [],
|
||||
triggers: ['new_message'],
|
||||
processors: [
|
||||
{ processorId: 'DummySyncProcessor' } as unknown as ProcessorConfig,
|
||||
],
|
||||
@@ -154,19 +166,19 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
registry,
|
||||
);
|
||||
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const state = createDummyState(false);
|
||||
const ship = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
|
||||
const state = createDummyState(false, 0, new Set());
|
||||
|
||||
const result = await orchestrator.executeTriggerSync(
|
||||
'new_message',
|
||||
ship,
|
||||
new Set(ship.map(s => s.id)),
|
||||
new Set(ship.map((s) => s.id)),
|
||||
state,
|
||||
);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(
|
||||
(result[0] as unknown as { dummyModified: boolean }).dummyModified,
|
||||
(result[0] as unknown as { dummyModified?: boolean }).dummyModified,
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
@@ -175,7 +187,7 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
{
|
||||
name: 'AsyncPipe',
|
||||
execution: 'background',
|
||||
triggers: [],
|
||||
triggers: ['new_message'],
|
||||
processors: [
|
||||
{ processorId: 'DummyAsyncProcessor' } as unknown as ProcessorConfig,
|
||||
],
|
||||
@@ -189,14 +201,14 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
registry,
|
||||
);
|
||||
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const state = createDummyState(false);
|
||||
const ship = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
|
||||
const state = createDummyState(false, 0, new Set());
|
||||
|
||||
// This should resolve immediately with the UNMODIFIED array because execution is background
|
||||
const result = await orchestrator.executeTriggerSync(
|
||||
'new_message',
|
||||
ship,
|
||||
new Set(ship.map(s => s.id)),
|
||||
new Set(ship.map((s) => s.id)),
|
||||
state,
|
||||
);
|
||||
|
||||
@@ -212,9 +224,9 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
it('gracefully handles and swallows processor errors in synchronous pipelines', async () => {
|
||||
const config = createConfig([
|
||||
{
|
||||
name: 'ThrowingPipe',
|
||||
name: 'ThrowPipe',
|
||||
execution: 'blocking',
|
||||
triggers: [],
|
||||
triggers: ['new_message'],
|
||||
processors: [
|
||||
{ processorId: 'ThrowingProcessor' } as unknown as ProcessorConfig,
|
||||
],
|
||||
@@ -228,14 +240,14 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
registry,
|
||||
);
|
||||
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const state = createDummyState(false);
|
||||
const ship = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
|
||||
const state = createDummyState(false, 0, new Set());
|
||||
|
||||
// It should not throw! It should swallow the error and return the unmodified array.
|
||||
const result = await orchestrator.executeTriggerSync(
|
||||
'new_message',
|
||||
ship,
|
||||
new Set(ship.map(s => s.id)),
|
||||
new Set(ship.map((s) => s.id)),
|
||||
state,
|
||||
);
|
||||
|
||||
@@ -267,7 +279,11 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
|
||||
// Emit the trigger
|
||||
eventBus.emitConsolidationNeeded({ ship, targetDeficit: 100, targetNodeIds: new Set() });
|
||||
eventBus.emitConsolidationNeeded({
|
||||
ship,
|
||||
targetDeficit: 100,
|
||||
targetNodeIds: new Set(),
|
||||
});
|
||||
|
||||
expect(executeSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -5,7 +5,11 @@
|
||||
*/
|
||||
|
||||
import type { ConcreteNode } from '../ir/types.js';
|
||||
import type { ContextProcessor, ContextWorker, ContextAccountingState } from '../pipeline.js';
|
||||
import type {
|
||||
ContextProcessor,
|
||||
ContextWorker,
|
||||
ContextAccountingState,
|
||||
} from '../pipeline.js';
|
||||
import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js';
|
||||
import type {
|
||||
ContextEnvironment,
|
||||
@@ -33,12 +37,28 @@ export class PipelineOrchestrator {
|
||||
this.setupTriggers();
|
||||
}
|
||||
|
||||
private isNodeAllowed(
|
||||
node: import('../ir/types.js').ConcreteNode,
|
||||
triggerTargets: ReadonlySet<string>,
|
||||
state: ContextAccountingState,
|
||||
): boolean {
|
||||
return (
|
||||
triggerTargets.has(node.id) &&
|
||||
!state.protectedLogicalIds.has(node.id) &&
|
||||
(!node.logicalParentId ||
|
||||
!state.protectedLogicalIds.has(node.logicalParentId))
|
||||
);
|
||||
}
|
||||
|
||||
private instantiateProcessors() {
|
||||
for (const pipeline of this.config.pipelines) {
|
||||
for (const procDef of pipeline.processors) {
|
||||
if (!this.instantiatedProcessors.has(procDef.processorId)) {
|
||||
const factory = this.registry.get(procDef.processorId);
|
||||
const instance = factory.create(this.env, procDef.options || {}) as ContextProcessor;
|
||||
const instance = factory.create(
|
||||
this.env,
|
||||
procDef.options || {},
|
||||
) as ContextProcessor;
|
||||
this.instantiatedProcessors.set(procDef.processorId, instance);
|
||||
}
|
||||
}
|
||||
@@ -50,7 +70,10 @@ export class PipelineOrchestrator {
|
||||
for (const workerDef of this.config.workers) {
|
||||
if (!this.instantiatedWorkers.has(workerDef.workerId)) {
|
||||
const factory = this.registry.get(workerDef.workerId);
|
||||
const instance = factory.create(this.env, workerDef.options || {}) as unknown as ContextWorker;
|
||||
const instance = factory.create(
|
||||
this.env,
|
||||
workerDef.options || {},
|
||||
) as unknown as ContextWorker;
|
||||
this.instantiatedWorkers.set(workerDef.workerId, instance);
|
||||
}
|
||||
}
|
||||
@@ -75,7 +98,12 @@ export class PipelineOrchestrator {
|
||||
deficitTokens: event.targetDeficit,
|
||||
protectedLogicalIds: new Set(),
|
||||
};
|
||||
void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state);
|
||||
void this.executePipelineAsync(
|
||||
pipeline,
|
||||
[],
|
||||
event.targetNodeIds,
|
||||
state,
|
||||
);
|
||||
});
|
||||
} else if (trigger === 'new_message') {
|
||||
this.eventBus.onChunkReceived((event) => {
|
||||
@@ -87,7 +115,12 @@ export class PipelineOrchestrator {
|
||||
deficitTokens: 0,
|
||||
protectedLogicalIds: new Set(),
|
||||
};
|
||||
void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state);
|
||||
void this.executePipelineAsync(
|
||||
pipeline,
|
||||
[],
|
||||
event.targetNodeIds,
|
||||
state,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -95,16 +128,18 @@ export class PipelineOrchestrator {
|
||||
|
||||
// 2. Worker Triggers (onNodesAdded is roughly onChunkReceived for now)
|
||||
this.eventBus.onChunkReceived((event) => {
|
||||
// Fire all workers that care about new nodes
|
||||
for (const worker of this.instantiatedWorkers.values()) {
|
||||
if (worker.triggers.onNodesAdded) {
|
||||
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
|
||||
// Fire and forget
|
||||
worker.execute({ targets: [], inbox: inboxSnapshot }).catch(e => {
|
||||
debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e);
|
||||
});
|
||||
}
|
||||
}
|
||||
// Fire all workers that care about new nodes
|
||||
for (const worker of this.instantiatedWorkers.values()) {
|
||||
if (worker.triggers.onNodesAdded) {
|
||||
const inboxSnapshot = new InboxSnapshotImpl(
|
||||
this.env.inbox?.getMessages() || [],
|
||||
);
|
||||
// Fire and forget
|
||||
worker.execute({ targets: [], inbox: inboxSnapshot }).catch((e) => {
|
||||
debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// We don't have a formal event bus for inbox publish yet, but we will soon.
|
||||
@@ -123,8 +158,8 @@ export class PipelineOrchestrator {
|
||||
returnedNodes: ReadonlyArray<ConcreteNode>,
|
||||
): ReadonlyArray<ConcreteNode> {
|
||||
const mutableShip = [...ship];
|
||||
const targetSet = new Set(targets.map(n => n.id));
|
||||
const returnedMap = new Map(returnedNodes.map(n => [n.id, n]));
|
||||
const targetSet = new Set(targets.map((n) => n.id));
|
||||
const returnedMap = new Map(returnedNodes.map((n) => [n.id, n]));
|
||||
|
||||
const removedIds = new Set<string>();
|
||||
const newNodes: ConcreteNode[] = [];
|
||||
@@ -146,7 +181,7 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
|
||||
if (removedIds.size === 0 && newNodes.length === 0) {
|
||||
return ship;
|
||||
return ship;
|
||||
}
|
||||
|
||||
let earliestRemovalIdx = mutableShip.length;
|
||||
@@ -174,10 +209,14 @@ export class PipelineOrchestrator {
|
||||
state: ContextAccountingState,
|
||||
): Promise<ReadonlyArray<ConcreteNode>> {
|
||||
let currentShip = ship;
|
||||
const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger));
|
||||
|
||||
const pipelines = this.config.pipelines.filter((p) =>
|
||||
p.triggers.includes(trigger),
|
||||
);
|
||||
|
||||
// Freeze the inbox for this pipeline run
|
||||
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
|
||||
const inboxSnapshot = new InboxSnapshotImpl(
|
||||
this.env.inbox?.getMessages() || [],
|
||||
);
|
||||
|
||||
for (const pipeline of pipelines) {
|
||||
for (const procDef of pipeline.processors) {
|
||||
@@ -190,20 +229,22 @@ export class PipelineOrchestrator {
|
||||
`Executing processor synchronously: ${procDef.processorId}`,
|
||||
);
|
||||
|
||||
const allowedTargets = currentShip.filter(n =>
|
||||
triggerTargets.has(n.id) &&
|
||||
(!n.logicalParentId || !state.protectedLogicalIds.has(n.logicalParentId))
|
||||
const allowedTargets = currentShip.filter((n) =>
|
||||
this.isNodeAllowed(n, triggerTargets, state),
|
||||
);
|
||||
|
||||
|
||||
const returnedNodes = await processor.process({
|
||||
buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully
|
||||
targets: allowedTargets,
|
||||
state,
|
||||
inbox: inboxSnapshot,
|
||||
});
|
||||
|
||||
currentShip = this.applyProcessorDiff(currentShip, allowedTargets, returnedNodes);
|
||||
|
||||
|
||||
currentShip = this.applyProcessorDiff(
|
||||
currentShip,
|
||||
allowedTargets,
|
||||
returnedNodes,
|
||||
);
|
||||
} catch (error) {
|
||||
debugLogger.error(
|
||||
`Synchronous processor ${procDef.processorId} failed:`,
|
||||
@@ -214,7 +255,7 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
|
||||
// Success! Drain consumed messages
|
||||
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
this.env.inbox?.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
|
||||
return currentShip;
|
||||
}
|
||||
@@ -232,7 +273,9 @@ export class PipelineOrchestrator {
|
||||
if (!ship || ship.length === 0) return;
|
||||
|
||||
let currentShip = ship;
|
||||
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages() || []);
|
||||
const inboxSnapshot = new InboxSnapshotImpl(
|
||||
this.env.inbox?.getMessages() || [],
|
||||
);
|
||||
|
||||
for (const procDef of pipeline.processors) {
|
||||
const processor = this.instantiatedProcessors.get(procDef.processorId);
|
||||
@@ -244,29 +287,31 @@ export class PipelineOrchestrator {
|
||||
`Executing processor: ${procDef.processorId} (async)`,
|
||||
);
|
||||
|
||||
const allowedTargets = currentShip.filter(n =>
|
||||
triggerTargets.has(n.id) &&
|
||||
(!n.logicalParentId || !state.protectedLogicalIds.has(n.logicalParentId))
|
||||
const allowedTargets = currentShip.filter((n) =>
|
||||
this.isNodeAllowed(n, triggerTargets, state),
|
||||
);
|
||||
|
||||
const returnedNodes = await processor.process({
|
||||
buffer: {} as any,
|
||||
targets: allowedTargets,
|
||||
state,
|
||||
inbox: inboxSnapshot,
|
||||
buffer: {} as any,
|
||||
targets: allowedTargets,
|
||||
state,
|
||||
inbox: inboxSnapshot,
|
||||
});
|
||||
|
||||
currentShip = this.applyProcessorDiff(currentShip, allowedTargets, returnedNodes);
|
||||
|
||||
currentShip = this.applyProcessorDiff(
|
||||
currentShip,
|
||||
allowedTargets,
|
||||
returnedNodes,
|
||||
);
|
||||
} catch (error) {
|
||||
debugLogger.error(
|
||||
`Pipeline ${pipeline.name} failed async at ${procDef.processorId}:`,
|
||||
error,
|
||||
);
|
||||
return;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
this.env.inbox?.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
import type { SidecarConfig } from './types.js';
|
||||
|
||||
export const testTruncateProfile: SidecarConfig = {
|
||||
budget: {
|
||||
retainedTokens: 65000,
|
||||
maxTokens: 150000,
|
||||
},
|
||||
pipelines: [
|
||||
{
|
||||
name: 'Emergency Backstop (Truncate Only)',
|
||||
triggers: ['gc_backstop', 'retained_exceeded'],
|
||||
execution: 'blocking',
|
||||
processors: [
|
||||
{ processorId: 'EmergencyTruncationProcessor', options: {} },
|
||||
],
|
||||
},
|
||||
],
|
||||
};
|
||||
+83
-10
File diff suppressed because one or more lines are too long
@@ -47,7 +47,7 @@ export function createDummyNode(
|
||||
type: 'USER_PROMPT' | 'SYSTEM_EVENT' | 'AGENT_THOUGHT' | 'AGENT_YIELD',
|
||||
tokens = 100,
|
||||
overrides?: Partial<ConcreteNode>,
|
||||
id?: string
|
||||
id?: string,
|
||||
): ConcreteNode {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return {
|
||||
@@ -74,7 +74,7 @@ export function createDummyToolNode(
|
||||
intentTokens = 100,
|
||||
obsTokens = 200,
|
||||
overrides?: Partial<ToolExecution>,
|
||||
id?: string
|
||||
id?: string,
|
||||
): ToolExecution {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return {
|
||||
@@ -99,9 +99,9 @@ export function createDummyToolNode(
|
||||
} as unknown as ToolExecution;
|
||||
}
|
||||
|
||||
|
||||
|
||||
export function createMockEnvironment(overrides?: Partial<ContextEnvironment>): ContextEnvironment {
|
||||
export function createMockEnvironment(
|
||||
overrides?: Partial<ContextEnvironment>,
|
||||
): ContextEnvironment {
|
||||
return {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
llmClient: vi.fn().mockReturnValue({
|
||||
@@ -181,11 +181,14 @@ export function createMockContextConfig(
|
||||
* Wires up a full ContextManager component with an AgentChatHistory and active background workers.
|
||||
*/
|
||||
|
||||
export function setupContextComponentTest(config: Config) {
|
||||
export function setupContextComponentTest(
|
||||
config: Config,
|
||||
sidecarOverride?: import('../sidecar/types.js').SidecarConfig,
|
||||
) {
|
||||
const chatHistory = new AgentChatHistory();
|
||||
const registry = new ProcessorRegistry();
|
||||
registerBuiltInProcessors(registry);
|
||||
const sidecar = SidecarLoader.fromConfig(config, registry);
|
||||
const sidecar = sidecarOverride || SidecarLoader.fromConfig(config, registry);
|
||||
const tracer = new ContextTracer({
|
||||
targetDir: '/tmp',
|
||||
sessionId: 'test-session',
|
||||
|
||||
Reference in New Issue
Block a user