mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-06-10 11:12:35 -07:00
initial worker pr, debugging
This commit is contained in:
@@ -14,10 +14,8 @@ import type {
|
||||
AgentThought,
|
||||
AgentYield,
|
||||
UserPrompt,
|
||||
SystemEvent,
|
||||
} from './types.js';
|
||||
import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js';
|
||||
import { isAgentThought } from './graphUtils.js';
|
||||
|
||||
// WeakMap to provide stable, deterministic identity across parses for the exact same Content/Part references
|
||||
const nodeIdentityMap = new WeakMap<object, string>();
|
||||
|
||||
@@ -4,104 +4,62 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import type { ConcreteNode, Snapshot, RollingSummary } from './ir/types.js';
|
||||
import type { ConcreteNode } from './ir/types.js';
|
||||
|
||||
export type InboxMessage =
|
||||
| { type: 'SNAPSHOT_READY'; snapshot: Snapshot; abstractsIds: string[] }
|
||||
| { type: 'BACKGROUND_SUMMARY'; summary: RollingSummary; targetId: string };
|
||||
export interface InboxMessage<T = unknown> {
|
||||
id: string;
|
||||
topic: string;
|
||||
payload: T;
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
export interface ContextInbox {
|
||||
dispatch(message: InboxMessage): void;
|
||||
peek<T extends InboxMessage['type']>(type: T): Extract<InboxMessage, { type: T }> | undefined;
|
||||
export interface InboxSnapshot {
|
||||
getMessages<T = unknown>(topic: string): ReadonlyArray<InboxMessage<T>>;
|
||||
consume(messageId: string): void;
|
||||
}
|
||||
|
||||
export interface ContextWorkingBuffer {
|
||||
/** The current active (projected) flat list of ConcreteNodes. */
|
||||
readonly nodes: readonly ConcreteNode[];
|
||||
|
||||
/** Retrieves the historical, pristine version of a node (before any masks/summaries). */
|
||||
getPristineNode(id: string): ConcreteNode | undefined;
|
||||
|
||||
/** Retrieves the full audit lineage of a specific node ID. */
|
||||
getLineage(id: string): readonly ConcreteNode[];
|
||||
}
|
||||
|
||||
/**
|
||||
* State object passed through the processing pipeline.
|
||||
* Contains global accounting logic and semantic protection rules.
|
||||
*/
|
||||
export interface ContextAccountingState {
|
||||
readonly currentTokens: number;
|
||||
readonly maxTokens: number;
|
||||
readonly retainedTokens: number;
|
||||
|
||||
/** The exact number of tokens that need to be trimmed to reach the retainedTokens goal */
|
||||
readonly deficitTokens: number;
|
||||
|
||||
/**
|
||||
* Set of Logical Node IDs (like Tasks or Episodes) that the orchestrator has deemed highly protected.
|
||||
* Processors should generally skip mutating Concrete Nodes that belong to these parents.
|
||||
*/
|
||||
readonly protectedLogicalIds: ReadonlySet<string>;
|
||||
|
||||
/**
|
||||
* True if currentTokens <= retainedTokens.
|
||||
*/
|
||||
readonly isBudgetSatisfied: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* A declarative instruction from a processor on how to modify the Ship.
|
||||
* Applied sequentially by the Orchestrator (Reducer).
|
||||
*/
|
||||
export interface ProcessArgs {
|
||||
/** The rich buffer containing current nodes and their history. */
|
||||
readonly buffer: ContextWorkingBuffer;
|
||||
|
||||
/**
|
||||
* The unprotected, mutable subset of nodes targeted by this trigger.
|
||||
* The Orchestrator strictly filters out ANY protected nodes (like active tasks) before calling.
|
||||
* Processors can assume all targets passed here are legally theirs to mutate or drop.
|
||||
*/
|
||||
readonly targets: readonly ConcreteNode[];
|
||||
|
||||
/** The token budget and accounting state. */
|
||||
readonly state: ContextAccountingState;
|
||||
|
||||
/** Type-safe messaging system for async/sync coordination. */
|
||||
readonly inbox: ContextInbox;
|
||||
readonly inbox: InboxSnapshot;
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface for all context degradation strategies.
|
||||
* Processors are pure functional map/filter operations over the targets.
|
||||
*/
|
||||
export interface ContextProcessor {
|
||||
/** Unique ID for registry mapping. */
|
||||
readonly id: string;
|
||||
/** Unique name for telemetry and logging. */
|
||||
readonly name: string;
|
||||
|
||||
/**
|
||||
* A pure function. Returns the new state of the `targets`.
|
||||
* If an ID from `targets` is missing in the return array, the Orchestrator deletes it.
|
||||
* If a new synthetic node is in the return array, the Orchestrator inserts it.
|
||||
* The Orchestrator automatically appends audit `IrMetadata` to any changes.
|
||||
*/
|
||||
process(args: ProcessArgs): Promise<readonly ConcreteNode[]>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Standardized configuration options for processors that act as a GC Backstop.
|
||||
* Defines exactly how much of the targeted (degraded/aged-out) history should be cleared.
|
||||
*/
|
||||
export interface ContextWorker {
|
||||
readonly id: string;
|
||||
readonly name: string;
|
||||
readonly triggers: {
|
||||
onNodesAdded?: boolean;
|
||||
onInboxTopics?: string[];
|
||||
};
|
||||
execute(args: {
|
||||
targets: ReadonlyArray<ConcreteNode>;
|
||||
inbox: InboxSnapshot;
|
||||
}): Promise<void>;
|
||||
}
|
||||
|
||||
export interface BackstopTargetOptions {
|
||||
/**
|
||||
* - 'incremental': Remove just enough to get under the threshold (maxTokens or retainedTokens).
|
||||
* - 'freeNTokens': Remove enough to free an explicit number of tokens (defined in freeTokensTarget).
|
||||
* - 'max': Remove/Summarize all explicitly targeted nodes (everything that aged out).
|
||||
*/
|
||||
target?: 'incremental' | 'freeNTokens' | 'max';
|
||||
/** If target is 'freeNTokens', this is the amount of tokens to clear. */
|
||||
freeTokensTarget?: number;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,6 @@ describe('BlobDegradationProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
@@ -86,7 +85,6 @@ describe('BlobDegradationProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
@@ -116,7 +114,6 @@ describe('BlobDegradationProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
|
||||
@@ -47,7 +47,6 @@ describe('HistorySquashingProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
@@ -111,7 +110,6 @@ describe('HistorySquashingProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
|
||||
@@ -56,7 +56,6 @@ describe('SemanticCompressionProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
@@ -133,7 +132,6 @@ describe('SemanticCompressionProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets,
|
||||
state,
|
||||
inbox: {} as any,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import type { ContextProcessor, ProcessArgs } from '../pipeline.js';
|
||||
import type { ContextEnvironment } from '../sidecar/environment.js';
|
||||
import { debugLogger } from '../../utils/debugLogger.js';
|
||||
import { LlmRole } from '../../telemetry/types.js';
|
||||
import { getResponseText } from '../../utils/partUtils.js';
|
||||
import type { ConcreteNode, UserPrompt, AgentThought, ToolExecution } from '../ir/types.js';
|
||||
|
||||
@@ -32,7 +31,6 @@ export class SemanticCompressionProcessor implements ContextProcessor {
|
||||
readonly name = 'SemanticCompressionProcessor';
|
||||
readonly options: SemanticCompressionProcessorOptions;
|
||||
private env: ContextEnvironment;
|
||||
private modelToUse: string = 'gemini-2.5-flash';
|
||||
|
||||
constructor(
|
||||
env: ContextEnvironment,
|
||||
@@ -49,9 +47,13 @@ export class SemanticCompressionProcessor implements ContextProcessor {
|
||||
try {
|
||||
const response = await this.env.llmClient.generateContent(
|
||||
{
|
||||
role: 'user' as any,
|
||||
modelConfigKey: 'default' as any,
|
||||
promptId: this.env.promptId,
|
||||
abortSignal: new AbortController().signal,
|
||||
contents: [
|
||||
{
|
||||
role: 'user',
|
||||
role: 'user' as any,
|
||||
parts: [{ text }],
|
||||
},
|
||||
],
|
||||
@@ -63,8 +65,7 @@ export class SemanticCompressionProcessor implements ContextProcessor {
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
this.modelToUse
|
||||
}
|
||||
);
|
||||
return getResponseText(response) || text;
|
||||
} catch (e) {
|
||||
|
||||
@@ -34,7 +34,6 @@ describe('ToolMaskingProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets: [toolStep],
|
||||
state,
|
||||
inbox: {} as any,
|
||||
@@ -75,7 +74,6 @@ describe('ToolMaskingProcessor', () => {
|
||||
|
||||
const result = await processor.process({
|
||||
buffer: {} as any,
|
||||
ship: [],
|
||||
targets: [toolStep],
|
||||
state,
|
||||
inbox: {} as any,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { ProcessorRegistry } from './registry.js';
|
||||
import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js';
|
||||
import { BlobDegradationProcessor, type BlobDegradationProcessorOptions } from '../processors/blobDegradationProcessor.js';
|
||||
import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js';
|
||||
|
||||
export function registerBuiltInProcessors(registry: ProcessorRegistry) {
|
||||
registry.register<Record<string, never>>({
|
||||
|
||||
@@ -9,6 +9,7 @@ import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'
|
||||
import type { ContextTracer } from '../tracer.js';
|
||||
import type { IFileSystem } from '../system/IFileSystem.js';
|
||||
import type { IIdGenerator } from '../system/IIdGenerator.js';
|
||||
import type { LiveInbox } from './inbox.js';
|
||||
|
||||
export type { ContextTracer, ContextEventBus };
|
||||
|
||||
@@ -24,4 +25,5 @@ export interface ContextEnvironment {
|
||||
readonly fileSystem: IFileSystem;
|
||||
readonly idGenerator: IIdGenerator;
|
||||
readonly eventBus: ContextEventBus;
|
||||
readonly inbox: LiveInbox;
|
||||
}
|
||||
|
||||
@@ -14,10 +14,13 @@ import { NodeFileSystem } from '../system/NodeFileSystem.js';
|
||||
import type { IIdGenerator } from '../system/IIdGenerator.js';
|
||||
import { NodeIdGenerator } from '../system/NodeIdGenerator.js';
|
||||
|
||||
import { LiveInbox } from './inbox.js';
|
||||
|
||||
export class ContextEnvironmentImpl implements ContextEnvironment {
|
||||
readonly tokenCalculator: ContextTokenCalculator;
|
||||
readonly fileSystem: IFileSystem;
|
||||
readonly idGenerator: IIdGenerator;
|
||||
readonly inbox: LiveInbox;
|
||||
|
||||
constructor(
|
||||
readonly llmClient: BaseLlmClient,
|
||||
@@ -34,5 +37,6 @@ export class ContextEnvironmentImpl implements ContextEnvironment {
|
||||
this.tokenCalculator = new ContextTokenCalculator(this.charsPerToken);
|
||||
this.fileSystem = fileSystem || new NodeFileSystem();
|
||||
this.idGenerator = idGenerator || new NodeIdGenerator();
|
||||
this.inbox = new LiveInbox();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
import type { InboxMessage, InboxSnapshot } from '../pipeline.js';
|
||||
|
||||
export class LiveInbox {
|
||||
private messages: InboxMessage[] = [];
|
||||
|
||||
publish<T>(topic: string, payload: T, idGenerator: { generateId(): string }): void {
|
||||
this.messages.push({
|
||||
id: idGenerator.generateId(),
|
||||
topic,
|
||||
payload,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
getMessages(): ReadonlyArray<InboxMessage> {
|
||||
return [...this.messages];
|
||||
}
|
||||
|
||||
drainConsumed(consumedIds: Set<string>): void {
|
||||
this.messages = this.messages.filter((m) => !consumedIds.has(m.id));
|
||||
}
|
||||
}
|
||||
|
||||
export class InboxSnapshotImpl implements InboxSnapshot {
|
||||
private messages: ReadonlyArray<InboxMessage>;
|
||||
private consumedIds = new Set<string>();
|
||||
|
||||
constructor(messages: ReadonlyArray<InboxMessage>) {
|
||||
this.messages = messages;
|
||||
}
|
||||
|
||||
getMessages<T = unknown>(topic: string): ReadonlyArray<InboxMessage<T>> {
|
||||
return this.messages.filter((m) => m.topic === topic) as unknown as ReadonlyArray<InboxMessage<T>>;
|
||||
}
|
||||
|
||||
consume(messageId: string): void {
|
||||
this.consumedIds.add(messageId);
|
||||
}
|
||||
|
||||
getConsumedIds(): Set<string> {
|
||||
return this.consumedIds;
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,7 @@ import {
|
||||
createDummyNode,
|
||||
} from '../testing/contextTestUtils.js';
|
||||
import type { ContextEnvironment } from './environment.js';
|
||||
import type { ContextAccountingState, ContextProcessor } from '../pipeline.js';
|
||||
import type { ContextProcessor } from '../pipeline.js';
|
||||
import type { PipelineDef, ProcessorConfig, SidecarConfig } from './types.js';
|
||||
import type { ContextEventBus } from '../eventBus.js';
|
||||
|
||||
@@ -154,13 +154,13 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
registry,
|
||||
);
|
||||
|
||||
const episodes = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const state = createDummyState(false);
|
||||
|
||||
const result = await orchestrator.executeTriggerSync(
|
||||
'new_message',
|
||||
episodes,
|
||||
new Set(episodes.map(e => e.id)),
|
||||
ship,
|
||||
new Set(ship.map(s => s.id)),
|
||||
state,
|
||||
);
|
||||
|
||||
@@ -189,14 +189,14 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
registry,
|
||||
);
|
||||
|
||||
const episodes = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const state = createDummyState(false);
|
||||
|
||||
// This should resolve immediately with the UNMODIFIED array because execution is background
|
||||
const result = await orchestrator.executeTriggerSync(
|
||||
'new_message',
|
||||
episodes,
|
||||
new Set(episodes.map(e => e.id)),
|
||||
ship,
|
||||
new Set(ship.map(s => s.id)),
|
||||
state,
|
||||
);
|
||||
|
||||
@@ -228,19 +228,19 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
registry,
|
||||
);
|
||||
|
||||
const episodes = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const state = createDummyState(false);
|
||||
|
||||
// It should not throw! It should swallow the error and return the unmodified array.
|
||||
const result = await orchestrator.executeTriggerSync(
|
||||
'new_message',
|
||||
episodes,
|
||||
new Set(episodes.map(e => e.id)),
|
||||
ship,
|
||||
new Set(ship.map(s => s.id)),
|
||||
state,
|
||||
);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result).toStrictEqual(episodes);
|
||||
expect(result).toStrictEqual(ship);
|
||||
});
|
||||
|
||||
it('automatically binds to retained_exceeded trigger via EventBus', () => {
|
||||
@@ -264,10 +264,10 @@ describe('PipelineOrchestrator (Component)', () => {
|
||||
|
||||
new PipelineOrchestrator(config, env, eventBus, env.tracer, registry);
|
||||
|
||||
const episodes = [createDummyNode('1', 'USER_PROMPT')];
|
||||
const ship = [createDummyNode('1', 'USER_PROMPT')];
|
||||
|
||||
// Emit the trigger
|
||||
eventBus.emitConsolidationNeeded({ episodes, targetDeficit: 100, targetNodeIds: new Set() });
|
||||
eventBus.emitConsolidationNeeded({ ship, targetDeficit: 100, targetNodeIds: new Set() });
|
||||
|
||||
expect(executeSpy).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
*/
|
||||
|
||||
import type { ConcreteNode } from '../ir/types.js';
|
||||
import type { ContextProcessor, ContextAccountingState, ContextPatch } from '../pipeline.js';
|
||||
import type { ContextProcessor, ContextWorker, ContextAccountingState } from '../pipeline.js';
|
||||
import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js';
|
||||
import type {
|
||||
ContextEnvironment,
|
||||
@@ -14,10 +14,12 @@ import type {
|
||||
} from './environment.js';
|
||||
import type { ProcessorRegistry } from './registry.js';
|
||||
import { debugLogger } from '../../utils/debugLogger.js';
|
||||
import { InboxSnapshotImpl } from './inbox.js';
|
||||
|
||||
export class PipelineOrchestrator {
|
||||
private activeTimers: NodeJS.Timeout[] = [];
|
||||
private readonly instantiatedProcessors = new Map<string, ContextProcessor>();
|
||||
private readonly instantiatedWorkers = new Map<string, ContextWorker>();
|
||||
|
||||
constructor(
|
||||
private readonly config: SidecarConfig,
|
||||
@@ -27,6 +29,7 @@ export class PipelineOrchestrator {
|
||||
private readonly registry: ProcessorRegistry,
|
||||
) {
|
||||
this.instantiateProcessors();
|
||||
this.instantiateWorkers();
|
||||
this.setupTriggers();
|
||||
}
|
||||
|
||||
@@ -35,14 +38,26 @@ export class PipelineOrchestrator {
|
||||
for (const procDef of pipeline.processors) {
|
||||
if (!this.instantiatedProcessors.has(procDef.processorId)) {
|
||||
const factory = this.registry.get(procDef.processorId);
|
||||
const instance = factory.create(this.env, procDef.options || {});
|
||||
const instance = factory.create(this.env, procDef.options || {}) as ContextProcessor;
|
||||
this.instantiatedProcessors.set(procDef.processorId, instance);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private instantiateWorkers() {
|
||||
if (!this.config.workers) return;
|
||||
for (const workerDef of this.config.workers) {
|
||||
if (!this.instantiatedWorkers.has(workerDef.workerId)) {
|
||||
const factory = this.registry.get(workerDef.workerId);
|
||||
const instance = factory.create(this.env, workerDef.options || {}) as unknown as ContextWorker;
|
||||
this.instantiatedWorkers.set(workerDef.workerId, instance);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private setupTriggers() {
|
||||
// 1. Pipeline Triggers
|
||||
for (const pipeline of this.config.pipelines) {
|
||||
for (const trigger of pipeline.triggers) {
|
||||
if (typeof trigger === 'object' && trigger.type === 'timer') {
|
||||
@@ -60,7 +75,6 @@ export class PipelineOrchestrator {
|
||||
deficitTokens: event.targetDeficit,
|
||||
protectedLogicalIds: new Set(),
|
||||
};
|
||||
// Note: In a real implementation, event.episodes needs to be mapped to the Concrete Ship
|
||||
void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state);
|
||||
});
|
||||
} else if (trigger === 'new_message') {
|
||||
@@ -78,6 +92,23 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Worker Triggers (onNodesAdded is roughly onChunkReceived for now)
|
||||
this.eventBus.onChunkReceived((event) => {
|
||||
// Fire all workers that care about new nodes
|
||||
for (const worker of this.instantiatedWorkers.values()) {
|
||||
if (worker.triggers.onNodesAdded) {
|
||||
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages());
|
||||
// Fire and forget
|
||||
worker.execute({ targets: [], inbox: inboxSnapshot }).catch(e => {
|
||||
debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// We don't have a formal event bus for inbox publish yet, but we will soon.
|
||||
// For now the workers are just registered.
|
||||
}
|
||||
|
||||
shutdown() {
|
||||
@@ -86,10 +117,6 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluates the subset returned by the processor against the original targets,
|
||||
* deducing the removed and inserted nodes, and updating the Ship accordingly.
|
||||
*/
|
||||
applyProcessorDiff(
|
||||
ship: ReadonlyArray<ConcreteNode>,
|
||||
targets: ReadonlyArray<ConcreteNode>,
|
||||
@@ -102,9 +129,6 @@ export class PipelineOrchestrator {
|
||||
const removedIds = new Set<string>();
|
||||
const newNodes: ConcreteNode[] = [];
|
||||
|
||||
// 1. Identify Removals & Modifications
|
||||
// If a target is missing from returnedMap -> Removed
|
||||
// If a target is in returnedMap but !== object ref -> Modified (Remove old, Insert new)
|
||||
for (const t of targets) {
|
||||
const returnedNode = returnedMap.get(t.id);
|
||||
if (!returnedNode) {
|
||||
@@ -115,7 +139,6 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Identify pure Additions (New synthetic nodes)
|
||||
for (const r of returnedNodes) {
|
||||
if (!targetSet.has(r.id)) {
|
||||
newNodes.push(r);
|
||||
@@ -123,10 +146,9 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
|
||||
if (removedIds.size === 0 && newNodes.length === 0) {
|
||||
return ship; // No changes
|
||||
return ship;
|
||||
}
|
||||
|
||||
// Find the earliest index in the ship where a removal occurred so we know where to insert
|
||||
let earliestRemovalIdx = mutableShip.length;
|
||||
let i = 0;
|
||||
while (i < mutableShip.length) {
|
||||
@@ -138,10 +160,7 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
// Insert new nodes exactly where the old nodes were removed
|
||||
if (newNodes.length > 0) {
|
||||
// NOTE: Metadata appending (who, what, when) should ideally happen here
|
||||
// But for V1, processors still construct the new nodes with metadata inside.
|
||||
mutableShip.splice(earliestRemovalIdx, 0, ...newNodes);
|
||||
}
|
||||
|
||||
@@ -157,6 +176,9 @@ export class PipelineOrchestrator {
|
||||
let currentShip = ship;
|
||||
const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger));
|
||||
|
||||
// Freeze the inbox for this pipeline run
|
||||
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages());
|
||||
|
||||
for (const pipeline of pipelines) {
|
||||
for (const procDef of pipeline.processors) {
|
||||
const processor = this.instantiatedProcessors.get(procDef.processorId);
|
||||
@@ -168,18 +190,16 @@ export class PipelineOrchestrator {
|
||||
`Executing processor synchronously: ${procDef.processorId}`,
|
||||
);
|
||||
|
||||
// 1. Filter out protected nodes
|
||||
const allowedTargets = currentShip.filter(n =>
|
||||
triggerTargets.has(n.id) &&
|
||||
(!n.logicalParentId || !state.protectedLogicalIds.has(n.logicalParentId))
|
||||
);
|
||||
|
||||
const returnedNodes = await processor.process({
|
||||
ship: currentShip,
|
||||
buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully
|
||||
targets: allowedTargets,
|
||||
state,
|
||||
buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully
|
||||
inbox: {} as any, // TODO: Implement ContextInbox fully
|
||||
inbox: inboxSnapshot,
|
||||
});
|
||||
|
||||
currentShip = this.applyProcessorDiff(currentShip, allowedTargets, returnedNodes);
|
||||
@@ -193,6 +213,9 @@ export class PipelineOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
// Success! Drain consumed messages
|
||||
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
|
||||
return currentShip;
|
||||
}
|
||||
|
||||
@@ -209,6 +232,7 @@ export class PipelineOrchestrator {
|
||||
if (!ship || ship.length === 0) return;
|
||||
|
||||
let currentShip = ship;
|
||||
const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages());
|
||||
|
||||
for (const procDef of pipeline.processors) {
|
||||
const processor = this.instantiatedProcessors.get(procDef.processorId);
|
||||
@@ -220,18 +244,16 @@ export class PipelineOrchestrator {
|
||||
`Executing processor: ${procDef.processorId} (async)`,
|
||||
);
|
||||
|
||||
// 1. Filter out protected nodes
|
||||
const allowedTargets = currentShip.filter(n =>
|
||||
triggerTargets.has(n.id) &&
|
||||
(!n.logicalParentId || !state.protectedLogicalIds.has(n.logicalParentId))
|
||||
);
|
||||
|
||||
const returnedNodes = await processor.process({
|
||||
ship: currentShip,
|
||||
buffer: {} as any,
|
||||
targets: allowedTargets,
|
||||
state,
|
||||
buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully
|
||||
inbox: {} as any, // TODO: Implement ContextInbox fully
|
||||
inbox: inboxSnapshot,
|
||||
});
|
||||
|
||||
currentShip = this.applyProcessorDiff(currentShip, allowedTargets, returnedNodes);
|
||||
@@ -244,5 +266,7 @@ export class PipelineOrchestrator {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,8 +4,6 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import type { StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js';
|
||||
|
||||
/**
|
||||
* Definition of a processor or worker to be instantiated in the graph.
|
||||
*/
|
||||
@@ -25,13 +23,18 @@ export type ProcessorConfig =
|
||||
}
|
||||
| {
|
||||
processorId: 'StateSnapshotProcessor';
|
||||
options: StateSnapshotProcessorOptions;
|
||||
options?: Record<string, unknown>;
|
||||
}
|
||||
| {
|
||||
processorId: 'EmergencyTruncationProcessor';
|
||||
options?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export interface WorkerConfig {
|
||||
workerId: string;
|
||||
options?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export type PipelineTrigger =
|
||||
| 'new_message'
|
||||
| 'retained_exceeded'
|
||||
@@ -57,4 +60,7 @@ export interface SidecarConfig {
|
||||
|
||||
/** The execution graphs for context manipulation */
|
||||
pipelines: PipelineDef[];
|
||||
|
||||
/** Background actors that generate data for pipelines */
|
||||
workers?: WorkerConfig[];
|
||||
}
|
||||
|
||||
@@ -150,10 +150,11 @@ export class SimulationHarness {
|
||||
targetId: ep.id,
|
||||
variantId: 'v-emergency',
|
||||
variant: {
|
||||
status: 'ready',
|
||||
type: 'masked', // Truncation is technically a mask
|
||||
text: ep.yield?.text || '',
|
||||
recoveredTokens: 0,
|
||||
type: 'MASKED_TOOL',
|
||||
id: 'mock-id',
|
||||
metadata: { currentTokens: 0, originalTokens: 0, transformations: [] },
|
||||
tokens: { intent: 0, observation: 0 },
|
||||
intent: {}, observation: {}, toolName: 'tool',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -5,22 +5,24 @@
|
||||
*/
|
||||
|
||||
import { vi } from 'vitest';
|
||||
import type { Config } from '../../config/config.js';
|
||||
import type { ContextEnvironment } from '../sidecar/environment.js';
|
||||
import type { Content } from '@google/genai';
|
||||
import { AgentChatHistory } from '../../core/agentChatHistory.js';
|
||||
import type { ConcreteNode, ToolExecution } from "../ir/types.js";
|
||||
import { ContextManager } from '../contextManager.js';
|
||||
import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js';
|
||||
import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js';
|
||||
import type {
|
||||
Episode,
|
||||
UserPrompt,
|
||||
SystemEvent,
|
||||
SemanticPart,
|
||||
} from '../ir/types.js';
|
||||
import type { ContextAccountingState } from '../pipeline.js';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { ContextTracer } from '../tracer.js';
|
||||
import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js';
|
||||
import { SidecarLoader } from '../sidecar/SidecarLoader.js';
|
||||
import { ContextEventBus } from '../eventBus.js';
|
||||
import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js';
|
||||
import { ProcessorRegistry } from '../sidecar/registry.js';
|
||||
import { registerBuiltInProcessors } from '../sidecar/builtins.js';
|
||||
import type { ContextAccountingState } from '../pipeline.js';
|
||||
import type { ConcreteNode, ToolExecution } from '../ir/types.js';
|
||||
import type { ContextEnvironment } from '../sidecar/environment.js';
|
||||
import type { Config } from '../../config/config.js';
|
||||
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
|
||||
import type { Content } from '@google/genai';
|
||||
|
||||
export function createDummyState(
|
||||
isSatisfied = false,
|
||||
@@ -47,6 +49,7 @@ export function createDummyNode(
|
||||
overrides?: Partial<ConcreteNode>,
|
||||
id?: string
|
||||
): ConcreteNode {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return {
|
||||
id: id || randomUUID(),
|
||||
episodeId: logicalParentId,
|
||||
@@ -73,6 +76,7 @@ export function createDummyToolNode(
|
||||
overrides?: Partial<ToolExecution>,
|
||||
id?: string
|
||||
): ToolExecution {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return {
|
||||
id: id || randomUUID(),
|
||||
episodeId: logicalParentId,
|
||||
@@ -95,64 +99,7 @@ export function createDummyToolNode(
|
||||
} as unknown as ToolExecution;
|
||||
}
|
||||
|
||||
export function createDummyEpisode(
|
||||
id: string,
|
||||
type: 'USER_PROMPT' | 'SYSTEM_EVENT',
|
||||
parts: SemanticPart[] = [],
|
||||
toolSteps: Array<{
|
||||
intent: Record<string, unknown>;
|
||||
observation: Record<string, unknown>;
|
||||
toolName?: string;
|
||||
tokens?: { intent: number; observation: number };
|
||||
}> = [],
|
||||
): Episode {
|
||||
let trigger: UserPrompt | SystemEvent;
|
||||
|
||||
if (type === 'USER_PROMPT') {
|
||||
trigger = {
|
||||
id: randomUUID(),
|
||||
type: 'USER_PROMPT',
|
||||
semanticParts: parts,
|
||||
metadata: {
|
||||
originalTokens: 100,
|
||||
currentTokens: 100,
|
||||
transformations: [],
|
||||
},
|
||||
};
|
||||
} else {
|
||||
trigger = {
|
||||
id: randomUUID(),
|
||||
type: 'SYSTEM_EVENT',
|
||||
name: 'dummy_event',
|
||||
payload: {},
|
||||
metadata: {
|
||||
originalTokens: 100,
|
||||
currentTokens: 100,
|
||||
transformations: [],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
type: 'EPISODE',
|
||||
id,
|
||||
timestamp: Date.now(),
|
||||
trigger,
|
||||
steps: toolSteps.map((step) => ({
|
||||
id: randomUUID(),
|
||||
type: 'TOOL_EXECUTION',
|
||||
toolName: step.toolName || 'test_tool',
|
||||
intent: step.intent,
|
||||
observation: step.observation,
|
||||
tokens: step.tokens || { intent: 50, observation: 50 },
|
||||
metadata: {
|
||||
originalTokens: 100,
|
||||
currentTokens: 100,
|
||||
transformations: [],
|
||||
},
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
export function createMockEnvironment(overrides?: Partial<ContextEnvironment>): ContextEnvironment {
|
||||
return {
|
||||
@@ -233,14 +180,6 @@ export function createMockContextConfig(
|
||||
/**
|
||||
* Wires up a full ContextManager component with an AgentChatHistory and active background workers.
|
||||
*/
|
||||
import { ContextTracer } from '../tracer.js';
|
||||
import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js';
|
||||
import { SidecarLoader } from '../sidecar/SidecarLoader.js';
|
||||
import { ContextEventBus } from '../eventBus.js';
|
||||
import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js';
|
||||
import type { BaseLlmClient } from 'src/core/baseLlmClient.js';
|
||||
import { ProcessorRegistry } from '../sidecar/registry.js';
|
||||
import { registerBuiltInProcessors } from '../sidecar/builtins.js';
|
||||
|
||||
export function setupContextComponentTest(config: Config) {
|
||||
const chatHistory = new AgentChatHistory();
|
||||
|
||||
@@ -6,10 +6,9 @@
|
||||
|
||||
import type { Part } from '@google/genai';
|
||||
import { estimateTokenCountSync as baseEstimate } from '../../utils/tokenCalculation.js';
|
||||
|
||||
import type { ConcreteNode } from '../ir/types.js';
|
||||
|
||||
/**
|
||||
import type { ConcreteNode } from "../ir/types.js";
|
||||
* The flat token cost assigned to a single multi-modal asset (like an image tile)
|
||||
* by the Gemini API. We use this as a baseline heuristic for inlineData/fileData.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user