diff --git a/packages/core/src/context/ir/toIr.ts b/packages/core/src/context/ir/toIr.ts index 1298fdbdd8..c68efe15f1 100644 --- a/packages/core/src/context/ir/toIr.ts +++ b/packages/core/src/context/ir/toIr.ts @@ -14,10 +14,8 @@ import type { AgentThought, AgentYield, UserPrompt, - SystemEvent, } from './types.js'; import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; -import { isAgentThought } from './graphUtils.js'; // WeakMap to provide stable, deterministic identity across parses for the exact same Content/Part references const nodeIdentityMap = new WeakMap(); diff --git a/packages/core/src/context/pipeline.ts b/packages/core/src/context/pipeline.ts index b8130f9249..12fc59808c 100644 --- a/packages/core/src/context/pipeline.ts +++ b/packages/core/src/context/pipeline.ts @@ -4,104 +4,62 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { ConcreteNode, Snapshot, RollingSummary } from './ir/types.js'; +import type { ConcreteNode } from './ir/types.js'; -export type InboxMessage = - | { type: 'SNAPSHOT_READY'; snapshot: Snapshot; abstractsIds: string[] } - | { type: 'BACKGROUND_SUMMARY'; summary: RollingSummary; targetId: string }; +export interface InboxMessage { + id: string; + topic: string; + payload: T; + timestamp: number; +} -export interface ContextInbox { - dispatch(message: InboxMessage): void; - peek(type: T): Extract | undefined; +export interface InboxSnapshot { + getMessages(topic: string): ReadonlyArray>; + consume(messageId: string): void; } export interface ContextWorkingBuffer { - /** The current active (projected) flat list of ConcreteNodes. */ readonly nodes: readonly ConcreteNode[]; - - /** Retrieves the historical, pristine version of a node (before any masks/summaries). */ getPristineNode(id: string): ConcreteNode | undefined; - - /** Retrieves the full audit lineage of a specific node ID. */ getLineage(id: string): readonly ConcreteNode[]; } -/** - * State object passed through the processing pipeline. - * Contains global accounting logic and semantic protection rules. - */ export interface ContextAccountingState { readonly currentTokens: number; readonly maxTokens: number; readonly retainedTokens: number; - - /** The exact number of tokens that need to be trimmed to reach the retainedTokens goal */ readonly deficitTokens: number; - - /** - * Set of Logical Node IDs (like Tasks or Episodes) that the orchestrator has deemed highly protected. - * Processors should generally skip mutating Concrete Nodes that belong to these parents. - */ readonly protectedLogicalIds: ReadonlySet; - - /** - * True if currentTokens <= retainedTokens. - */ readonly isBudgetSatisfied: boolean; } -/** - * A declarative instruction from a processor on how to modify the Ship. - * Applied sequentially by the Orchestrator (Reducer). - */ export interface ProcessArgs { - /** The rich buffer containing current nodes and their history. */ readonly buffer: ContextWorkingBuffer; - - /** - * The unprotected, mutable subset of nodes targeted by this trigger. - * The Orchestrator strictly filters out ANY protected nodes (like active tasks) before calling. - * Processors can assume all targets passed here are legally theirs to mutate or drop. - */ readonly targets: readonly ConcreteNode[]; - - /** The token budget and accounting state. */ readonly state: ContextAccountingState; - - /** Type-safe messaging system for async/sync coordination. */ - readonly inbox: ContextInbox; + readonly inbox: InboxSnapshot; } -/** - * Interface for all context degradation strategies. - * Processors are pure functional map/filter operations over the targets. - */ export interface ContextProcessor { - /** Unique ID for registry mapping. */ readonly id: string; - /** Unique name for telemetry and logging. */ readonly name: string; - - /** - * A pure function. Returns the new state of the `targets`. - * If an ID from `targets` is missing in the return array, the Orchestrator deletes it. - * If a new synthetic node is in the return array, the Orchestrator inserts it. - * The Orchestrator automatically appends audit `IrMetadata` to any changes. - */ process(args: ProcessArgs): Promise; } -/** - * Standardized configuration options for processors that act as a GC Backstop. - * Defines exactly how much of the targeted (degraded/aged-out) history should be cleared. - */ +export interface ContextWorker { + readonly id: string; + readonly name: string; + readonly triggers: { + onNodesAdded?: boolean; + onInboxTopics?: string[]; + }; + execute(args: { + targets: ReadonlyArray; + inbox: InboxSnapshot; + }): Promise; +} + export interface BackstopTargetOptions { - /** - * - 'incremental': Remove just enough to get under the threshold (maxTokens or retainedTokens). - * - 'freeNTokens': Remove enough to free an explicit number of tokens (defined in freeTokensTarget). - * - 'max': Remove/Summarize all explicitly targeted nodes (everything that aged out). - */ target?: 'incremental' | 'freeNTokens' | 'max'; - /** If target is 'freeNTokens', this is the amount of tokens to clear. */ freeTokensTarget?: number; -} \ No newline at end of file +} diff --git a/packages/core/src/context/processors/blobDegradationProcessor.test.ts b/packages/core/src/context/processors/blobDegradationProcessor.test.ts index 4ca45c931b..7bd0cb11a6 100644 --- a/packages/core/src/context/processors/blobDegradationProcessor.test.ts +++ b/packages/core/src/context/processors/blobDegradationProcessor.test.ts @@ -35,7 +35,6 @@ describe('BlobDegradationProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, @@ -86,7 +85,6 @@ describe('BlobDegradationProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, @@ -116,7 +114,6 @@ describe('BlobDegradationProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, diff --git a/packages/core/src/context/processors/historySquashingProcessor.test.ts b/packages/core/src/context/processors/historySquashingProcessor.test.ts index 85ceac7e88..5b195c1aca 100644 --- a/packages/core/src/context/processors/historySquashingProcessor.test.ts +++ b/packages/core/src/context/processors/historySquashingProcessor.test.ts @@ -47,7 +47,6 @@ describe('HistorySquashingProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, @@ -111,7 +110,6 @@ describe('HistorySquashingProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.test.ts b/packages/core/src/context/processors/semanticCompressionProcessor.test.ts index 45c52d0a04..cf7209956d 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.test.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.test.ts @@ -56,7 +56,6 @@ describe('SemanticCompressionProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, @@ -133,7 +132,6 @@ describe('SemanticCompressionProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets, state, inbox: {} as any, diff --git a/packages/core/src/context/processors/semanticCompressionProcessor.ts b/packages/core/src/context/processors/semanticCompressionProcessor.ts index fa38c26bf2..71c97f0d48 100644 --- a/packages/core/src/context/processors/semanticCompressionProcessor.ts +++ b/packages/core/src/context/processors/semanticCompressionProcessor.ts @@ -1,7 +1,6 @@ import type { ContextProcessor, ProcessArgs } from '../pipeline.js'; import type { ContextEnvironment } from '../sidecar/environment.js'; import { debugLogger } from '../../utils/debugLogger.js'; -import { LlmRole } from '../../telemetry/types.js'; import { getResponseText } from '../../utils/partUtils.js'; import type { ConcreteNode, UserPrompt, AgentThought, ToolExecution } from '../ir/types.js'; @@ -32,7 +31,6 @@ export class SemanticCompressionProcessor implements ContextProcessor { readonly name = 'SemanticCompressionProcessor'; readonly options: SemanticCompressionProcessorOptions; private env: ContextEnvironment; - private modelToUse: string = 'gemini-2.5-flash'; constructor( env: ContextEnvironment, @@ -49,9 +47,13 @@ export class SemanticCompressionProcessor implements ContextProcessor { try { const response = await this.env.llmClient.generateContent( { + role: 'user' as any, + modelConfigKey: 'default' as any, + promptId: this.env.promptId, + abortSignal: new AbortController().signal, contents: [ { - role: 'user', + role: 'user' as any, parts: [{ text }], }, ], @@ -63,8 +65,7 @@ export class SemanticCompressionProcessor implements ContextProcessor { }, ], }, - }, - this.modelToUse + } ); return getResponseText(response) || text; } catch (e) { diff --git a/packages/core/src/context/processors/toolMaskingProcessor.test.ts b/packages/core/src/context/processors/toolMaskingProcessor.test.ts index c019fae74c..690eb0fe29 100644 --- a/packages/core/src/context/processors/toolMaskingProcessor.test.ts +++ b/packages/core/src/context/processors/toolMaskingProcessor.test.ts @@ -34,7 +34,6 @@ describe('ToolMaskingProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets: [toolStep], state, inbox: {} as any, @@ -75,7 +74,6 @@ describe('ToolMaskingProcessor', () => { const result = await processor.process({ buffer: {} as any, - ship: [], targets: [toolStep], state, inbox: {} as any, diff --git a/packages/core/src/context/sidecar/builtins.ts b/packages/core/src/context/sidecar/builtins.ts index 7a58412d6c..c1ec6511bc 100644 --- a/packages/core/src/context/sidecar/builtins.ts +++ b/packages/core/src/context/sidecar/builtins.ts @@ -1,6 +1,6 @@ import { ProcessorRegistry } from './registry.js'; import { EmergencyTruncationProcessor, type EmergencyTruncationProcessorOptions } from '../processors/emergencyTruncationProcessor.js'; -import { BlobDegradationProcessor, type BlobDegradationProcessorOptions } from '../processors/blobDegradationProcessor.js'; +import { BlobDegradationProcessor } from '../processors/blobDegradationProcessor.js'; export function registerBuiltInProcessors(registry: ProcessorRegistry) { registry.register>({ diff --git a/packages/core/src/context/sidecar/environment.ts b/packages/core/src/context/sidecar/environment.ts index ee66ec13d0..11bf6f64fc 100644 --- a/packages/core/src/context/sidecar/environment.ts +++ b/packages/core/src/context/sidecar/environment.ts @@ -9,6 +9,7 @@ import type { ContextTokenCalculator } from '../utils/contextTokenCalculator.js' import type { ContextTracer } from '../tracer.js'; import type { IFileSystem } from '../system/IFileSystem.js'; import type { IIdGenerator } from '../system/IIdGenerator.js'; +import type { LiveInbox } from './inbox.js'; export type { ContextTracer, ContextEventBus }; @@ -24,4 +25,5 @@ export interface ContextEnvironment { readonly fileSystem: IFileSystem; readonly idGenerator: IIdGenerator; readonly eventBus: ContextEventBus; + readonly inbox: LiveInbox; } diff --git a/packages/core/src/context/sidecar/environmentImpl.ts b/packages/core/src/context/sidecar/environmentImpl.ts index 0987e317de..d14aeb1ca0 100644 --- a/packages/core/src/context/sidecar/environmentImpl.ts +++ b/packages/core/src/context/sidecar/environmentImpl.ts @@ -14,10 +14,13 @@ import { NodeFileSystem } from '../system/NodeFileSystem.js'; import type { IIdGenerator } from '../system/IIdGenerator.js'; import { NodeIdGenerator } from '../system/NodeIdGenerator.js'; +import { LiveInbox } from './inbox.js'; + export class ContextEnvironmentImpl implements ContextEnvironment { readonly tokenCalculator: ContextTokenCalculator; readonly fileSystem: IFileSystem; readonly idGenerator: IIdGenerator; + readonly inbox: LiveInbox; constructor( readonly llmClient: BaseLlmClient, @@ -34,5 +37,6 @@ export class ContextEnvironmentImpl implements ContextEnvironment { this.tokenCalculator = new ContextTokenCalculator(this.charsPerToken); this.fileSystem = fileSystem || new NodeFileSystem(); this.idGenerator = idGenerator || new NodeIdGenerator(); + this.inbox = new LiveInbox(); } } diff --git a/packages/core/src/context/sidecar/inbox.ts b/packages/core/src/context/sidecar/inbox.ts new file mode 100644 index 0000000000..faf18c10ce --- /dev/null +++ b/packages/core/src/context/sidecar/inbox.ts @@ -0,0 +1,43 @@ +import type { InboxMessage, InboxSnapshot } from '../pipeline.js'; + +export class LiveInbox { + private messages: InboxMessage[] = []; + + publish(topic: string, payload: T, idGenerator: { generateId(): string }): void { + this.messages.push({ + id: idGenerator.generateId(), + topic, + payload, + timestamp: Date.now(), + }); + } + + getMessages(): ReadonlyArray { + return [...this.messages]; + } + + drainConsumed(consumedIds: Set): void { + this.messages = this.messages.filter((m) => !consumedIds.has(m.id)); + } +} + +export class InboxSnapshotImpl implements InboxSnapshot { + private messages: ReadonlyArray; + private consumedIds = new Set(); + + constructor(messages: ReadonlyArray) { + this.messages = messages; + } + + getMessages(topic: string): ReadonlyArray> { + return this.messages.filter((m) => m.topic === topic) as unknown as ReadonlyArray>; + } + + consume(messageId: string): void { + this.consumedIds.add(messageId); + } + + getConsumedIds(): Set { + return this.consumedIds; + } +} diff --git a/packages/core/src/context/sidecar/orchestrator.test.ts b/packages/core/src/context/sidecar/orchestrator.test.ts index 53222d841d..05284f68d6 100644 --- a/packages/core/src/context/sidecar/orchestrator.test.ts +++ b/packages/core/src/context/sidecar/orchestrator.test.ts @@ -13,7 +13,7 @@ import { createDummyNode, } from '../testing/contextTestUtils.js'; import type { ContextEnvironment } from './environment.js'; -import type { ContextAccountingState, ContextProcessor } from '../pipeline.js'; +import type { ContextProcessor } from '../pipeline.js'; import type { PipelineDef, ProcessorConfig, SidecarConfig } from './types.js'; import type { ContextEventBus } from '../eventBus.js'; @@ -154,13 +154,13 @@ describe('PipelineOrchestrator (Component)', () => { registry, ); - const episodes = [createDummyNode('1', 'USER_PROMPT')]; + const ship = [createDummyNode('1', 'USER_PROMPT')]; const state = createDummyState(false); const result = await orchestrator.executeTriggerSync( 'new_message', - episodes, - new Set(episodes.map(e => e.id)), + ship, + new Set(ship.map(s => s.id)), state, ); @@ -189,14 +189,14 @@ describe('PipelineOrchestrator (Component)', () => { registry, ); - const episodes = [createDummyNode('1', 'USER_PROMPT')]; + const ship = [createDummyNode('1', 'USER_PROMPT')]; const state = createDummyState(false); // This should resolve immediately with the UNMODIFIED array because execution is background const result = await orchestrator.executeTriggerSync( 'new_message', - episodes, - new Set(episodes.map(e => e.id)), + ship, + new Set(ship.map(s => s.id)), state, ); @@ -228,19 +228,19 @@ describe('PipelineOrchestrator (Component)', () => { registry, ); - const episodes = [createDummyNode('1', 'USER_PROMPT')]; + const ship = [createDummyNode('1', 'USER_PROMPT')]; const state = createDummyState(false); // It should not throw! It should swallow the error and return the unmodified array. const result = await orchestrator.executeTriggerSync( 'new_message', - episodes, - new Set(episodes.map(e => e.id)), + ship, + new Set(ship.map(s => s.id)), state, ); expect(result).toHaveLength(1); - expect(result).toStrictEqual(episodes); + expect(result).toStrictEqual(ship); }); it('automatically binds to retained_exceeded trigger via EventBus', () => { @@ -264,10 +264,10 @@ describe('PipelineOrchestrator (Component)', () => { new PipelineOrchestrator(config, env, eventBus, env.tracer, registry); - const episodes = [createDummyNode('1', 'USER_PROMPT')]; + const ship = [createDummyNode('1', 'USER_PROMPT')]; // Emit the trigger - eventBus.emitConsolidationNeeded({ episodes, targetDeficit: 100, targetNodeIds: new Set() }); + eventBus.emitConsolidationNeeded({ ship, targetDeficit: 100, targetNodeIds: new Set() }); expect(executeSpy).toHaveBeenCalled(); }); diff --git a/packages/core/src/context/sidecar/orchestrator.ts b/packages/core/src/context/sidecar/orchestrator.ts index 61f1ef2d22..4cae47d550 100644 --- a/packages/core/src/context/sidecar/orchestrator.ts +++ b/packages/core/src/context/sidecar/orchestrator.ts @@ -5,7 +5,7 @@ */ import type { ConcreteNode } from '../ir/types.js'; -import type { ContextProcessor, ContextAccountingState, ContextPatch } from '../pipeline.js'; +import type { ContextProcessor, ContextWorker, ContextAccountingState } from '../pipeline.js'; import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js'; import type { ContextEnvironment, @@ -14,10 +14,12 @@ import type { } from './environment.js'; import type { ProcessorRegistry } from './registry.js'; import { debugLogger } from '../../utils/debugLogger.js'; +import { InboxSnapshotImpl } from './inbox.js'; export class PipelineOrchestrator { private activeTimers: NodeJS.Timeout[] = []; private readonly instantiatedProcessors = new Map(); + private readonly instantiatedWorkers = new Map(); constructor( private readonly config: SidecarConfig, @@ -27,6 +29,7 @@ export class PipelineOrchestrator { private readonly registry: ProcessorRegistry, ) { this.instantiateProcessors(); + this.instantiateWorkers(); this.setupTriggers(); } @@ -35,14 +38,26 @@ export class PipelineOrchestrator { for (const procDef of pipeline.processors) { if (!this.instantiatedProcessors.has(procDef.processorId)) { const factory = this.registry.get(procDef.processorId); - const instance = factory.create(this.env, procDef.options || {}); + const instance = factory.create(this.env, procDef.options || {}) as ContextProcessor; this.instantiatedProcessors.set(procDef.processorId, instance); } } } } + private instantiateWorkers() { + if (!this.config.workers) return; + for (const workerDef of this.config.workers) { + if (!this.instantiatedWorkers.has(workerDef.workerId)) { + const factory = this.registry.get(workerDef.workerId); + const instance = factory.create(this.env, workerDef.options || {}) as unknown as ContextWorker; + this.instantiatedWorkers.set(workerDef.workerId, instance); + } + } + } + private setupTriggers() { + // 1. Pipeline Triggers for (const pipeline of this.config.pipelines) { for (const trigger of pipeline.triggers) { if (typeof trigger === 'object' && trigger.type === 'timer') { @@ -60,7 +75,6 @@ export class PipelineOrchestrator { deficitTokens: event.targetDeficit, protectedLogicalIds: new Set(), }; - // Note: In a real implementation, event.episodes needs to be mapped to the Concrete Ship void this.executePipelineAsync(pipeline, [], event.targetNodeIds, state); }); } else if (trigger === 'new_message') { @@ -78,6 +92,23 @@ export class PipelineOrchestrator { } } } + + // 2. Worker Triggers (onNodesAdded is roughly onChunkReceived for now) + this.eventBus.onChunkReceived((event) => { + // Fire all workers that care about new nodes + for (const worker of this.instantiatedWorkers.values()) { + if (worker.triggers.onNodesAdded) { + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages()); + // Fire and forget + worker.execute({ targets: [], inbox: inboxSnapshot }).catch(e => { + debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e); + }); + } + } + }); + + // We don't have a formal event bus for inbox publish yet, but we will soon. + // For now the workers are just registered. } shutdown() { @@ -86,10 +117,6 @@ export class PipelineOrchestrator { } } - /** - * Evaluates the subset returned by the processor against the original targets, - * deducing the removed and inserted nodes, and updating the Ship accordingly. - */ applyProcessorDiff( ship: ReadonlyArray, targets: ReadonlyArray, @@ -102,9 +129,6 @@ export class PipelineOrchestrator { const removedIds = new Set(); const newNodes: ConcreteNode[] = []; - // 1. Identify Removals & Modifications - // If a target is missing from returnedMap -> Removed - // If a target is in returnedMap but !== object ref -> Modified (Remove old, Insert new) for (const t of targets) { const returnedNode = returnedMap.get(t.id); if (!returnedNode) { @@ -115,7 +139,6 @@ export class PipelineOrchestrator { } } - // 2. Identify pure Additions (New synthetic nodes) for (const r of returnedNodes) { if (!targetSet.has(r.id)) { newNodes.push(r); @@ -123,10 +146,9 @@ export class PipelineOrchestrator { } if (removedIds.size === 0 && newNodes.length === 0) { - return ship; // No changes + return ship; } - // Find the earliest index in the ship where a removal occurred so we know where to insert let earliestRemovalIdx = mutableShip.length; let i = 0; while (i < mutableShip.length) { @@ -138,10 +160,7 @@ export class PipelineOrchestrator { } } - // Insert new nodes exactly where the old nodes were removed if (newNodes.length > 0) { - // NOTE: Metadata appending (who, what, when) should ideally happen here - // But for V1, processors still construct the new nodes with metadata inside. mutableShip.splice(earliestRemovalIdx, 0, ...newNodes); } @@ -157,6 +176,9 @@ export class PipelineOrchestrator { let currentShip = ship; const pipelines = this.config.pipelines.filter((p) => p.triggers.includes(trigger)); + // Freeze the inbox for this pipeline run + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages()); + for (const pipeline of pipelines) { for (const procDef of pipeline.processors) { const processor = this.instantiatedProcessors.get(procDef.processorId); @@ -168,18 +190,16 @@ export class PipelineOrchestrator { `Executing processor synchronously: ${procDef.processorId}`, ); - // 1. Filter out protected nodes const allowedTargets = currentShip.filter(n => triggerTargets.has(n.id) && (!n.logicalParentId || !state.protectedLogicalIds.has(n.logicalParentId)) ); const returnedNodes = await processor.process({ - ship: currentShip, + buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully targets: allowedTargets, state, - buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully - inbox: {} as any, // TODO: Implement ContextInbox fully + inbox: inboxSnapshot, }); currentShip = this.applyProcessorDiff(currentShip, allowedTargets, returnedNodes); @@ -193,6 +213,9 @@ export class PipelineOrchestrator { } } + // Success! Drain consumed messages + this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds()); + return currentShip; } @@ -209,6 +232,7 @@ export class PipelineOrchestrator { if (!ship || ship.length === 0) return; let currentShip = ship; + const inboxSnapshot = new InboxSnapshotImpl(this.env.inbox.getMessages()); for (const procDef of pipeline.processors) { const processor = this.instantiatedProcessors.get(procDef.processorId); @@ -220,18 +244,16 @@ export class PipelineOrchestrator { `Executing processor: ${procDef.processorId} (async)`, ); - // 1. Filter out protected nodes const allowedTargets = currentShip.filter(n => triggerTargets.has(n.id) && (!n.logicalParentId || !state.protectedLogicalIds.has(n.logicalParentId)) ); const returnedNodes = await processor.process({ - ship: currentShip, + buffer: {} as any, targets: allowedTargets, state, - buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully - inbox: {} as any, // TODO: Implement ContextInbox fully + inbox: inboxSnapshot, }); currentShip = this.applyProcessorDiff(currentShip, allowedTargets, returnedNodes); @@ -244,5 +266,7 @@ export class PipelineOrchestrator { return; } } + + this.env.inbox.drainConsumed(inboxSnapshot.getConsumedIds()); } -} \ No newline at end of file +} diff --git a/packages/core/src/context/sidecar/types.ts b/packages/core/src/context/sidecar/types.ts index eb75c24f9b..81db3f45e0 100644 --- a/packages/core/src/context/sidecar/types.ts +++ b/packages/core/src/context/sidecar/types.ts @@ -4,8 +4,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { StateSnapshotProcessorOptions } from '../processors/stateSnapshotProcessor.js'; - /** * Definition of a processor or worker to be instantiated in the graph. */ @@ -25,13 +23,18 @@ export type ProcessorConfig = } | { processorId: 'StateSnapshotProcessor'; - options: StateSnapshotProcessorOptions; + options?: Record; } | { processorId: 'EmergencyTruncationProcessor'; options?: Record; }; +export interface WorkerConfig { + workerId: string; + options?: Record; +} + export type PipelineTrigger = | 'new_message' | 'retained_exceeded' @@ -57,4 +60,7 @@ export interface SidecarConfig { /** The execution graphs for context manipulation */ pipelines: PipelineDef[]; + + /** Background actors that generate data for pipelines */ + workers?: WorkerConfig[]; } diff --git a/packages/core/src/context/system-tests/SimulationHarness.ts b/packages/core/src/context/system-tests/SimulationHarness.ts index aa90326e00..c2820af6a6 100644 --- a/packages/core/src/context/system-tests/SimulationHarness.ts +++ b/packages/core/src/context/system-tests/SimulationHarness.ts @@ -150,10 +150,11 @@ export class SimulationHarness { targetId: ep.id, variantId: 'v-emergency', variant: { - status: 'ready', - type: 'masked', // Truncation is technically a mask - text: ep.yield?.text || '', - recoveredTokens: 0, + type: 'MASKED_TOOL', + id: 'mock-id', + metadata: { currentTokens: 0, originalTokens: 0, transformations: [] }, + tokens: { intent: 0, observation: 0 }, + intent: {}, observation: {}, toolName: 'tool', }, }); } diff --git a/packages/core/src/context/testing/contextTestUtils.ts b/packages/core/src/context/testing/contextTestUtils.ts index 7ffbfd41c2..d32ef27315 100644 --- a/packages/core/src/context/testing/contextTestUtils.ts +++ b/packages/core/src/context/testing/contextTestUtils.ts @@ -5,22 +5,24 @@ */ import { vi } from 'vitest'; -import type { Config } from '../../config/config.js'; -import type { ContextEnvironment } from '../sidecar/environment.js'; -import type { Content } from '@google/genai'; import { AgentChatHistory } from '../../core/agentChatHistory.js'; -import type { ConcreteNode, ToolExecution } from "../ir/types.js"; import { ContextManager } from '../contextManager.js'; import { InMemoryFileSystem } from '../system/InMemoryFileSystem.js'; import { DeterministicIdGenerator } from '../system/DeterministicIdGenerator.js'; -import type { - Episode, - UserPrompt, - SystemEvent, - SemanticPart, -} from '../ir/types.js'; -import type { ContextAccountingState } from '../pipeline.js'; import { randomUUID } from 'node:crypto'; +import { ContextTracer } from '../tracer.js'; +import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js'; +import { SidecarLoader } from '../sidecar/SidecarLoader.js'; +import { ContextEventBus } from '../eventBus.js'; +import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; +import { ProcessorRegistry } from '../sidecar/registry.js'; +import { registerBuiltInProcessors } from '../sidecar/builtins.js'; +import type { ContextAccountingState } from '../pipeline.js'; +import type { ConcreteNode, ToolExecution } from '../ir/types.js'; +import type { ContextEnvironment } from '../sidecar/environment.js'; +import type { Config } from '../../config/config.js'; +import type { BaseLlmClient } from '../../core/baseLlmClient.js'; +import type { Content } from '@google/genai'; export function createDummyState( isSatisfied = false, @@ -47,6 +49,7 @@ export function createDummyNode( overrides?: Partial, id?: string ): ConcreteNode { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion return { id: id || randomUUID(), episodeId: logicalParentId, @@ -73,6 +76,7 @@ export function createDummyToolNode( overrides?: Partial, id?: string ): ToolExecution { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion return { id: id || randomUUID(), episodeId: logicalParentId, @@ -95,64 +99,7 @@ export function createDummyToolNode( } as unknown as ToolExecution; } -export function createDummyEpisode( - id: string, - type: 'USER_PROMPT' | 'SYSTEM_EVENT', - parts: SemanticPart[] = [], - toolSteps: Array<{ - intent: Record; - observation: Record; - toolName?: string; - tokens?: { intent: number; observation: number }; - }> = [], -): Episode { - let trigger: UserPrompt | SystemEvent; - if (type === 'USER_PROMPT') { - trigger = { - id: randomUUID(), - type: 'USER_PROMPT', - semanticParts: parts, - metadata: { - originalTokens: 100, - currentTokens: 100, - transformations: [], - }, - }; - } else { - trigger = { - id: randomUUID(), - type: 'SYSTEM_EVENT', - name: 'dummy_event', - payload: {}, - metadata: { - originalTokens: 100, - currentTokens: 100, - transformations: [], - }, - }; - } - - return { - type: 'EPISODE', - id, - timestamp: Date.now(), - trigger, - steps: toolSteps.map((step) => ({ - id: randomUUID(), - type: 'TOOL_EXECUTION', - toolName: step.toolName || 'test_tool', - intent: step.intent, - observation: step.observation, - tokens: step.tokens || { intent: 50, observation: 50 }, - metadata: { - originalTokens: 100, - currentTokens: 100, - transformations: [], - }, - })), - }; -} export function createMockEnvironment(overrides?: Partial): ContextEnvironment { return { @@ -233,14 +180,6 @@ export function createMockContextConfig( /** * Wires up a full ContextManager component with an AgentChatHistory and active background workers. */ -import { ContextTracer } from '../tracer.js'; -import { ContextEnvironmentImpl } from '../sidecar/environmentImpl.js'; -import { SidecarLoader } from '../sidecar/SidecarLoader.js'; -import { ContextEventBus } from '../eventBus.js'; -import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js'; -import type { BaseLlmClient } from 'src/core/baseLlmClient.js'; -import { ProcessorRegistry } from '../sidecar/registry.js'; -import { registerBuiltInProcessors } from '../sidecar/builtins.js'; export function setupContextComponentTest(config: Config) { const chatHistory = new AgentChatHistory(); diff --git a/packages/core/src/context/utils/contextTokenCalculator.ts b/packages/core/src/context/utils/contextTokenCalculator.ts index 65c16fbd93..fa25980cc4 100644 --- a/packages/core/src/context/utils/contextTokenCalculator.ts +++ b/packages/core/src/context/utils/contextTokenCalculator.ts @@ -6,10 +6,9 @@ import type { Part } from '@google/genai'; import { estimateTokenCountSync as baseEstimate } from '../../utils/tokenCalculation.js'; - +import type { ConcreteNode } from '../ir/types.js'; /** -import type { ConcreteNode } from "../ir/types.js"; * The flat token cost assigned to a single multi-modal asset (like an image tile) * by the Gemini API. We use this as a baseline heuristic for inlineData/fileData. */