From 6867a96be0aac3bcbd10ceb0361abc158fe5eb0d Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 6 Apr 2026 18:01:32 +0000 Subject: [PATCH] building now --- packages/core/src/context/ASYNC_GC_DESIGN.md | 94 ------------ .../context/ASYNC_GC_IMPLEMENTATION_PLAN.md | 144 ------------------ .../src/context/ASYNC_GC_STATUS_REPORT.md | 52 ------- .../src/context/SIDECAR_PIPELINE_DESIGN.md | 86 ----------- .../src/context/contextManager.golden.test.ts | 7 +- packages/core/src/context/contextManager.ts | 36 ++--- .../processors/stateSnapshotProcessor.ts | 4 +- packages/core/src/context/sidecar/registry.ts | 3 +- 8 files changed, 14 insertions(+), 412 deletions(-) delete mode 100644 packages/core/src/context/ASYNC_GC_DESIGN.md delete mode 100644 packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md delete mode 100644 packages/core/src/context/ASYNC_GC_STATUS_REPORT.md delete mode 100644 packages/core/src/context/SIDECAR_PIPELINE_DESIGN.md diff --git a/packages/core/src/context/ASYNC_GC_DESIGN.md b/packages/core/src/context/ASYNC_GC_DESIGN.md deleted file mode 100644 index c6e4cbec8a..0000000000 --- a/packages/core/src/context/ASYNC_GC_DESIGN.md +++ /dev/null @@ -1,94 +0,0 @@ -# Asynchronous Context Management (Dataflow Architecture) - -## The Problem - -Context management today is an emergency response. When a chat session hits the -maximum token limit (`maxTokens`), the system halts the user's request, -synchronously runs expensive compression pipelines (masking tools, summarizing -text with LLMs), and only proceeds when the token count falls below the limit. -This introduces unacceptable latency and forces trade-offs between speed and -data fidelity. - -## The Vision: Eager Subconscious Compute - -Instead of a reactive, synchronous pipeline, Context Management should be an -**asynchronous dataflow graph**. - -Because we know old memory will _eventually_ need to be degraded or garbage -collected, we should utilize the agent's idle time (while the user is reading or -typing) to proactively compute "degraded variants" of episodes before there is -any context pressure. - -### The Three Phases of Memory Lifecycle - -#### 1. The Eager Compute Phase (Background / Continuous Streaming) - -Context pressure doesn't wait for an episode to finish. If a user pastes a -100k-token file, the budget explodes instantly. Therefore, the dataflow graph is -fed continuously. - -- Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped - into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a - `TOOL_EXECUTION` step) and broadcast immediately. -- **Processors (e.g., SemanticCompressor, StateSnapshot) listen as background - workers.** -- They eagerly compute degraded variants on partial episodes. For instance, - `SemanticCompressionProcessor` can summarize a massive 100k `USER_PROMPT` the - millisecond it arrives, without waiting for the model to reply. -- It attaches the result to the IR graph as - `Episode#1.trigger.variants.summary`. -- **Result:** This costs the user zero latency. The agent is - "dreaming/consolidating" granular memory chunks in the background, even during - long-running "mono-episodes." - -#### 2. Opportunistic Replacement (`retainedTokens` Threshold) - -When the active context window crosses the "ideal" size (e.g., 65k tokens): - -- The system identifies the oldest episodes that have fallen outside the - `retained` window. -- It checks if they have pre-computed variants (e.g., a `summary` or `masked` - variant). -- If yes, it instantly and silently swaps the raw episode for the degraded - variant. -- **Result:** The context gently decays over time, completely avoiding hard - limits for as long as possible, with zero latency cost. - -#### 3. The Pressure Barrier (`maxTokens` Hard Limit) - -When the active context window crosses the absolute hard limit (e.g., 150k -tokens)—perhaps because the user pasted a massive file and the background -workers couldn't keep up—the system hits a **Synchronous Barrier**. - -At this barrier, the `ContextManager` checks the user's configured -`ContextPressureStrategy` to decide how to unblock the request: - -- **Strategy A: `truncate` (The Baseline)** - - _Behavior:_ Instantly drop the oldest episodes until under `maxTokens`. - - _Tradeoff:_ Maximum speed, maximum data loss. -- **Strategy B: `incrementalGc` (Progressive)** - - _Behavior:_ Look for any pre-computed summaries/masks. If none exist, - synchronously block to compute _just enough_ summaries to survive the - current turn. - - _Tradeoff:_ Medium speed, medium data retention. -- **Strategy C: `compress` (State Snapshot)** - - _Behavior:_ Identify the oldest N episodes causing the overflow. If their - N-to-1 World State Snapshot isn't ready yet, **block the user's request** - and force the `StateSnapshotProcessor` to generate it synchronously. Once - generated, replace the N episodes with the 1 snapshot and proceed. - - _Tradeoff:_ Maximum latency, maximum data retention/fidelity. - -## Architectural Changes Required - -1. **Episode Variants:** Update the `Episode` IR type to support a `variants` - dictionary. -2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to - dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of - `AgentChatHistory`). -3. **Processor Interface:** Change `ContextProcessor` from a synchronous - `process(episodes[])` function to an asynchronous worker that listens to the - event bus, updates the `variants` dictionary, and emits `VARIANT_READY` - events. -4. **Projection Logic:** Update `projectCompressedHistory()` to act as the - Pressure Barrier, reading the user's strategy and either applying ready - variants, waiting for variants, or truncating. diff --git a/packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md b/packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md deleted file mode 100644 index aa7197eff3..0000000000 --- a/packages/core/src/context/ASYNC_GC_IMPLEMENTATION_PLAN.md +++ /dev/null @@ -1,144 +0,0 @@ -# Asynchronous Context Management Implementation Plan - -This document outlines the step-by-step implementation plan for refactoring -`ContextManager` into a fully asynchronous, event-driven dataflow graph (Eager -Subconscious Compute). - ---- - -## Phase 1: Stable Identity & Incremental IR Mapping - -**The Problem:** Currently, `IrMapper.toIr()` is stateless. It generates random -UUIDs for `Episode` and `Step` nodes every time it parses the `Content[]` array. -If the array is rebuilt while an asynchronous processor is computing a summary, -the target ID will be lost, and the variant will be orphaned. **The Goal:** -Episodes must maintain a stable identity across turns so background workers can -confidently attach variants to them. - -**Tasks:** - -1. **Deterministic Hashing or Stateful Mapping:** Update `IrMapper` to either - generate deterministic UUIDs (e.g., hashing the part text/timestamp) OR make - `ContextManager`'s pristine graph mutable, where new `PUSH` events are - mapped _incrementally_ onto the tail of `this.pristineEpisodes` rather than - rebuilding the whole array. -2. **Test Update:** Ensure `IrMapper` tests verify stable IDs across successive - parse events. - ---- - -## Phase 2: Data Structures & Event Bus - -**The Problem:** The system lacks the internal types and communication channels -to support asynchronous variant generation. **The Goal:** Define the `Variant` -schemas and the internal `EventEmitter` that will broadcast graph updates to the -async workers. - -**Tasks:** - -1. **Variant Types:** Update `packages/core/src/context/ir/types.ts`. - - Add a `variants?: Record` property to `Episode` and - `Step` (where `Variant` is a discriminated union of `SummaryVariant`, - `MaskedVariant`, `SnapshotVariant`, etc.). - - Include metadata on the variant: - `status: 'computing' | 'ready' | 'failed'`, `promise?: Promise`, - `recoveredTokens: number`. -2. **Event Bus (`ContextEventBus`):** - - Create an internal event emitter in `ContextManager` (using - `events.EventEmitter` or a lightweight alternative). - - Define Events: - - `IR_NODE_CREATED`: Fired when a new Episode/Step is mapped. (Triggers - eager compute). - - `VARIANT_READY`: Fired by a worker when it finishes computing a - summary/snapshot. - - `BUDGET_RETAINED_CROSSED`: Fired when `currentTokens > retainedTokens`. - - `BUDGET_MAX_CROSSED`: Fired when `currentTokens > maxTokens`. - ---- - -## Phase 3: Refactoring Processors into Async Workers - -**The Problem:** Processors currently implement a synchronous -`process(episodes, state) -> Promise` interface and block the main -loop. **The Goal:** Convert them into background workers that listen to the -`ContextEventBus`, perform LLM tasks asynchronously, and emit `VARIANT_READY`. - -**Tasks:** - -1. **Define `AsyncContextWorker` Interface:** - - `start(bus: ContextEventBus): void` - - `stop(): void` -2. **Implement `SemanticCompressionWorker`:** - - Listens to `IR_NODE_CREATED` (or `BUDGET_RETAINED_CROSSED` for lazier - eager compute). - - Batches old `USER_PROMPT` nodes. - - Calls LLM in background. - - Emits `VARIANT_READY` with the summary string and target Node IDs. -3. **Implement `StateSnapshotWorker`:** - - Listens to `BUDGET_RETAINED_CROSSED`. - - Identifies the N oldest raw episodes. - - Synthesizes them into a single `world_state_snapshot`. - - Emits `VARIANT_READY` containing the new Snapshot Episode and the IDs of - the N episodes it replaces. -4. **Wire Event Listeners:** `ContextManager` listens to `VARIANT_READY` and - updates the pristine graph's `variants` dictionary. - ---- - -## Phase 4: The Projection Engine & Pressure Barrier - -**The Problem:** `projectCompressedHistory()` currently runs the synchronous -pipeline. It needs to become the non-blocking opportunistic swapper and the -blocking pressure barrier. **The Goal:** Serve the LLM request instantly using -pre-computed variants, or block strictly according to the user's -`maxPressureStrategy`. - -**Tasks:** - -1. **Opportunistic Swap (`retainedTokens`):** - - When traversing `this.pristineEpisodes` to build the projected array, if - `currentTokens > retainedTokens`, check the oldest episodes. - - If an episode has a `variant.status === 'ready'`, use the variant's tokens - and text _instead_ of the raw episode. -2. **Pressure Barrier (`maxTokens`):** - - If the projected array is _still_ `> maxTokens` after all ready variants - are applied, hit the Barrier. - - Read `config.getContextManagementConfig().budget.maxPressureStrategy`. - - **If `truncate`:** Instantly drop the oldest episodes from the projection - until under budget. (Fastest). - - **If `incrementalGc`:** Await any variants that are - `status === 'computing'` for the oldest nodes until the deficit is - cleared. If none are computing, force a synchronous masking/truncation. - - **If `compress`:** Await the `StateSnapshotWorker`'s active `Promise`. If - it hasn't started, synchronously invoke it and block until the N-to-1 - snapshot is ready. - ---- - -## Phase 5: Configuration & Telemetry - -**The Goal:** Expose the new strategies to the user and ensure we can observe -the background workers. - -**Tasks:** - -1. **Config Schema:** Update `settingsSchema.ts` to include - `maxPressureStrategy: 'truncate' | 'incrementalGc' | 'compress'`. -2. **Telemetry:** Log events when background workers start/finish, including - the tokens saved and the latency of the background task. -3. **Testing:** Write concurrency tests simulating a user typing rapidly while - background summaries are still resolving, ensuring no data corruption or - dropped variants. - ---- - -## Open Questions & Risks - -- **API Cost:** Eager compute means we might summarize an episode that the user - _never_ actually hits the context limit for. Should Eager Compute only begin - when `current > retained`, or truly immediately? (Recommendation: Start at - `retained` to save money, but `max` must be high enough above `retained` to - give the async workers time to finish). -- **Race Conditions:** If the user deletes a message via the UI (triggering - `AgentChatHistory.map/flatMap`), we must cleanly abort any pending Promises in - the background workers for those deleted IDs. diff --git a/packages/core/src/context/ASYNC_GC_STATUS_REPORT.md b/packages/core/src/context/ASYNC_GC_STATUS_REPORT.md deleted file mode 100644 index c6f5fef429..0000000000 --- a/packages/core/src/context/ASYNC_GC_STATUS_REPORT.md +++ /dev/null @@ -1,52 +0,0 @@ -# Asynchronous Context Management: Status Report & Bug Sweep - -_Date: End of Day 2 (Subconscious Memory Refactoring Complete)_ - -## 1. Inventory against Implementation Plan - -### ✅ Phase 1: Stable Identity & Incremental IR Mapping (100% Complete) - -- **Accomplished:** Implemented an `IdentityMap` (`WeakMap`) in `IrMapper`. -- **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based on the underlying `Content` object references. Re-parsing the history array no longer orphans background variants. -- **Testing:** Implemented an explicit `IrMapper.test.ts` unit test proving `WeakMap` identity stability across conversation growth. - -### ✅ Phase 2: Data Structures & Event Bus (100% Complete) - -- **Accomplished:** Added `variants?: Record` to `Episode` IR types. -- **Accomplished:** Created `ContextEventBus` class and instantiated it on `ContextManager`. -- **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation) on every `PUSH`. - -### ✅ Phase 3: Refactoring Processors into Async Workers (100% Complete) - -- **Accomplished:** Defined `AsyncContextWorker` interface. -- **Accomplished:** Refactored `StateSnapshotProcessor` into `StateSnapshotWorker`. It successfully listens to the bus, batches unprotected dying episodes, and emits a `VARIANT_READY` event. -- **Accomplished:** Replaced dummy execution with the actual `config.getBaseLlmClient().generateContent()` API call using `gemini-2.5-flash` and the `LlmRole.UTILITY_COMPRESSOR` telemetry role. -- **Accomplished:** Added robust `try/catch` and extensive `debugLogger.error` / `debugLogger.warn` logging to catch anomalous LLM failures without crashing the main loop. - -### ✅ Phase 4.1: Opportunistic Replacement Engine (100% Complete) - -- **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse from newest to oldest. When `rollingTokens > retainedTokens`, it successfully swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist. -- **Accomplished:** Implemented the `getWorkingBufferView()` sweep method. It perfectly resolves the N-to-1 Variant Targeting bug by injecting the snapshot and adding all `replacedEpisodeIds` to a `skippedIds` Set, cleanly dropping the older raw nodes from the final projection array. - -### ✅ Phase 4.2: The Synchronous Pressure Barrier (100% Complete) - -- **Accomplished:** Implemented the hard block at the end of `projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens` after all opportunistic swaps are applied. -- **Accomplished:** Reads the `mngConfig.budget.maxPressureStrategy` flag. Supports `truncate` (instantly dropping oldest unprotected episodes) and safely falls back if `compress` isn't fully wired synchronously yet. -- **Testing:** Wrote `contextManager.barrier.test.ts` to blast the system with ~200k tokens and verify the instant truncation successfully protects the System Prompt (Episode 0) and the current working context. - -### ✅ Phase 5: Configuration & Testing (100% Complete) - -- **Accomplished:** Exposed `maxPressureStrategy` in `settingsSchema.ts` and replaced the deprecated `incrementalGc` flag across the entire monorepo. -- **Accomplished:** Wrote extensive concurrency component tests in `contextManager.async.test.ts` to prove the async LLM Promise resolution does not block the main user thread, and handles the critical race condition of "User typing while background snapshotting" flawlessly. - ---- - -## 2. Bug Sweep & Architectural Review (Critical Findings Resolved) - -Both critical flaws discovered on Day 1 have been completely resolved: - -### ✅ Resolved Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting) -**The Fix:** The `getWorkingBufferView()` method tracks a `skippedIds` Set during its sweep. If it chooses a SnapshotVariant, it pushes all `replacedEpisodeIds` into the Set, cleanly skipping the raw text nodes on subsequent iterations. - -### ✅ Resolved Bug 2: Infinite RAM Growth (Pristine Graph Accumulation) -**The Fix:** The `checkTriggers()` method now calculates its token budget against the computed `WorkingBufferView` rather than the `pristineEpisodes` array. As soon as an async worker injects a snapshot, the calculated token count plummets natively, breaking the infinite GC loop while leaving the pristine log untouched. diff --git a/packages/core/src/context/SIDECAR_PIPELINE_DESIGN.md b/packages/core/src/context/SIDECAR_PIPELINE_DESIGN.md deleted file mode 100644 index f1ee65db15..0000000000 --- a/packages/core/src/context/SIDECAR_PIPELINE_DESIGN.md +++ /dev/null @@ -1,86 +0,0 @@ -# Data-Driven Context Pipeline (Sidecar Config) - -## 1. Motivation - -The Context Management subsystem has grown sophisticated, but its configuration -is currently entangled with the global CLI `Config` god-object and the static -`settingsSchema.ts`. This entanglement causes several problems: - -1. **Rigidity:** The order of processors (`ToolMasking` -> `Degradation` -> - `Semantic` -> `Squashing`) is hardcoded in TypeScript. -2. **Hyperparameter Bloat:** Every new tuning knob requires modifying the global - schema, UI dialogs, and types. -3. **Pipeline Isolation:** Background tasks like the `StateSnapshotWorker` were - isolated silos. They managed their own triggers and could not participate in a - sequential data pipeline (e.g. receiving degraded messages as input). - -## 2. Vision: The Orthogonal "Forking" Pipeline - -We will transition the Context Manager to be entirely configured by an independent, -strictly internal "Sidecar JSON" that represents a Directed Acyclic Graph (DAG) of -**Triggers** and **Processors**. - -By completely separating the "Execution Strategy" (when something runs) from the -"Data Transformation Logic" (what it does), we can arbitrarily compose tools. -Crucially, the architecture supports a **"Forking Pipeline" mechanic**: - -- **Synchronous Execution:** If all processors in a pipeline return `Episode[]`, - the orchestrator runs them inline and immediately returns the result (e.g. for - instant LLM prompting). -- **Asynchronous Forking (Eventual Consistency):** If a processor returns a - `Promise` (like a heavy LLM summarizer), the orchestrator immediately - halts the synchronous chain, returns the previously processed state to the caller - so the CLI doesn't freeze, and lets the rest of the pipeline continue resolving - in the background. When it finishes, it caches the result for the *next* turn. - -## 3. High-Level Architecture - -### A. The Sidecar Schema - -The sidecar JSON defines the **Budget** and an array of **Pipelines**. - -```json -{ - "budget": { - "retainedTokens": 65000, - "maxTokens": 150000 - }, - "pipelines": [ - { - "name": "Immediate Sanitization", - "triggers": ["on_turn"], - "processors": [ - { "processorId": "ToolMaskingProcessor", "options": { "stringLengthThresholdTokens": 8000 } }, - { "processorId": "BlobDegradationProcessor", "options": {} }, - { "processorId": "SemanticCompressionProcessor", "options": { "nodeThresholdTokens": 5000 } } - ] - }, - { - "name": "Deep Background Compression", - "triggers": [{ "type": "timer", "intervalMs": 5000 }, "budget_exceeded"], - "processors": [ - { "processorId": "HistorySquashingProcessor", "options": { "maxTokensPerNode": 3000 } }, - { "processorId": "StateSnapshotProcessor", "options": {} } - ] - } - ] -} -``` - -### B. Processor Registry & Reification - -To convert the JSON into a running graph, we use a dynamic registry. Every -processor implements the `ContextProcessor` interface and defines its own explicit Options. - -```typescript -export interface ContextProcessor { - process(episodes: Episode[]): Episode[] | Promise; -} -``` - -## 4. Implementation Phases - -- **Phase 1: Interfaces & Registry:** Define `PipelineDef`, `Trigger`, and a `ProcessorRegistry`. -- **Phase 2: Normalize Workers:** Demote `StateSnapshotWorker` into a standard `StateSnapshotProcessor` so it can be composed in any pipeline array. -- **Phase 3: The Pipeline Orchestrator:** Build the central orchestration engine that listens to triggers, pumps `pristineEpisodes` through the arrays, and handles the Sync/Async forking logic to ensure zero-blocking eventual consistency. -- **Phase 4: ContextManager Integration:** Wire the `ContextManager` to delegate execution and caching to the Orchestrator. diff --git a/packages/core/src/context/contextManager.golden.test.ts b/packages/core/src/context/contextManager.golden.test.ts index a46bec209d..4b4d45b058 100644 --- a/packages/core/src/context/contextManager.golden.test.ts +++ b/packages/core/src/context/contextManager.golden.test.ts @@ -130,11 +130,8 @@ describe('ContextManager Golden Tests', () => { const tracer2 = new ContextTracer('/tmp', 'test2'); contextManager = new ContextManager( { - pipelines: { - eagerBackground: [], - normalProcessingGraph: [], - retainedProcessingGraph: [], - }, + budget: { retainedTokens: 100000, maxTokens: 150000 }, + pipelines: [], } as any, {} as any, tracer2, diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts index 5682011973..5a005469c7 100644 --- a/packages/core/src/context/contextManager.ts +++ b/packages/core/src/context/contextManager.ts @@ -21,12 +21,13 @@ import type { ContextEnvironment } from './sidecar/environment.js'; import type { SidecarConfig } from './sidecar/types.js'; import { ProcessorRegistry } from './sidecar/registry.js'; import { PipelineOrchestrator } from './sidecar/orchestrator.js'; -import type { ContextProcessor } from './pipeline.js'; + import { ToolMaskingProcessor } from './processors/toolMaskingProcessor.js'; import { BlobDegradationProcessor } from './processors/blobDegradationProcessor.js'; import { SemanticCompressionProcessor } from './processors/semanticCompressionProcessor.js'; import { HistorySquashingProcessor } from './processors/historySquashingProcessor.js'; +import { StateSnapshotProcessor } from './processors/stateSnapshotProcessor.js'; export class ContextManager { @@ -52,13 +53,14 @@ export class ContextManager { (this.env as any).setEventBus(this.eventBus); } - this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer); - - // Register built-ins + // Register built-ins BEFORE creating Orchestrator ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts as any) }); ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) }); ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts as any) }); ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts as any) }); + ProcessorRegistry.register({ id: 'StateSnapshotProcessor', create: (env, opts) => StateSnapshotProcessor.create(env, opts as any) }); + + this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer); this.eventBus.onVariantReady((event) => { @@ -153,28 +155,6 @@ export class ContextManager { */ private async applyProcessorGraphs(episodes: Episode[]): Promise { const mngConfig = this.sidecar; - const retainedLimit = mngConfig.budget.retainedTokens; - - - // If we're incredibly small, maybe we just run the retained graph on everything? - // Let's divide the episodes exactly at the retained boundary. - const retainedWindow: Episode[] = []; - const normalWindow: Episode[] = []; - let rollingTokens = 0; - - // Scan backwards to fill the retained window - for (let i = episodes.length - 1; i >= 0; i--) { - const ep = episodes[i]; - const epTokens = this.calculateIrTokens([ep]); - if ((rollingTokens + epTokens <= retainedLimit && normalWindow.length === 0) || retainedWindow.length === 0) { - // We always put at least the latest episode in the retained window. - // We only add to retainedWindow if we haven't already started the normalWindow (contiguous block). - retainedWindow.unshift(ep); - rollingTokens += epTokens; - } else { - normalWindow.unshift(ep); - } - } const protectedIds = new Set(); // We must protect the System Episode, which is always index 0 of pristineEpisodes. @@ -182,7 +162,9 @@ export class ContextManager { protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant } - return this.orchestrator.executePipelineForking('Immediate Sanitization', this.getWorkingBufferView(), { + let currentTokens = this.calculateIrTokens(episodes); + + return this.orchestrator.executePipelineForking('Immediate Sanitization', episodes, { currentTokens: currentTokens, maxTokens: mngConfig.budget.maxTokens, retainedTokens: mngConfig.budget.retainedTokens, diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts index 471f565f25..929138bf51 100644 --- a/packages/core/src/context/processors/stateSnapshotProcessor.ts +++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts @@ -49,7 +49,7 @@ export class StateSnapshotProcessor implements ContextProcessor { const ep = episodes[i]; selectedEpisodes.push(ep); deficitAccumulator += estimateTokenCountSync([ - { text: ep.trigger?.parts?.[0]?.text ?? '' }, + { text: (ep.trigger as any)?.semanticParts?.[0]?.text ?? '' }, { text: ep.yield?.text ?? '' }, ]); if (deficitAccumulator >= targetDeficit) break; @@ -87,7 +87,7 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n'; for (const ep of episodes) { if (ep.trigger) { - userPromptText += `USER: ${ep.trigger.parts?.map((p: any) => p.text).join('')}\n`; + userPromptText += `USER: ${(ep.trigger as any).semanticParts?.map((p: any) => p.text).join('')}\n`; } for (const step of ep.steps) { if (step.type === 'TOOL_EXECUTION') { diff --git a/packages/core/src/context/sidecar/registry.ts b/packages/core/src/context/sidecar/registry.ts index 1ad9115b42..867612e108 100644 --- a/packages/core/src/context/sidecar/registry.ts +++ b/packages/core/src/context/sidecar/registry.ts @@ -5,7 +5,6 @@ */ import type { ContextProcessor } from '../pipeline.js'; -import type { AsyncContextWorker } from '../workers/asyncContextWorker.js'; import type { ContextEnvironment } from './environment.js'; export interface ContextProcessorDef< @@ -15,7 +14,7 @@ export interface ContextProcessorDef< create( env: ContextEnvironment, options: TOptions, - ): ContextProcessor | AsyncContextWorker; + ): ContextProcessor; } /**