building now

This commit is contained in:
Your Name
2026-04-06 18:01:32 +00:00
parent fcaa6c5584
commit 6867a96be0
8 changed files with 14 additions and 412 deletions
@@ -1,94 +0,0 @@
# Asynchronous Context Management (Dataflow Architecture)
## The Problem
Context management today is an emergency response. When a chat session hits the
maximum token limit (`maxTokens`), the system halts the user's request,
synchronously runs expensive compression pipelines (masking tools, summarizing
text with LLMs), and only proceeds when the token count falls below the limit.
This introduces unacceptable latency and forces trade-offs between speed and
data fidelity.
## The Vision: Eager Subconscious Compute
Instead of a reactive, synchronous pipeline, Context Management should be an
**asynchronous dataflow graph**.
Because we know old memory will _eventually_ need to be degraded or garbage
collected, we should utilize the agent's idle time (while the user is reading or
typing) to proactively compute "degraded variants" of episodes before there is
any context pressure.
### The Three Phases of Memory Lifecycle
#### 1. The Eager Compute Phase (Background / Continuous Streaming)
Context pressure doesn't wait for an episode to finish. If a user pastes a
100k-token file, the budget explodes instantly. Therefore, the dataflow graph is
fed continuously.
- Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped
into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a
`TOOL_EXECUTION` step) and broadcast immediately.
- **Processors (e.g., SemanticCompressor, StateSnapshot) listen as background
workers.**
- They eagerly compute degraded variants on partial episodes. For instance,
`SemanticCompressionProcessor` can summarize a massive 100k `USER_PROMPT` the
millisecond it arrives, without waiting for the model to reply.
- It attaches the result to the IR graph as
`Episode#1.trigger.variants.summary`.
- **Result:** This costs the user zero latency. The agent is
"dreaming/consolidating" granular memory chunks in the background, even during
long-running "mono-episodes."
#### 2. Opportunistic Replacement (`retainedTokens` Threshold)
When the active context window crosses the "ideal" size (e.g., 65k tokens):
- The system identifies the oldest episodes that have fallen outside the
`retained` window.
- It checks if they have pre-computed variants (e.g., a `summary` or `masked`
variant).
- If yes, it instantly and silently swaps the raw episode for the degraded
variant.
- **Result:** The context gently decays over time, completely avoiding hard
limits for as long as possible, with zero latency cost.
#### 3. The Pressure Barrier (`maxTokens` Hard Limit)
When the active context window crosses the absolute hard limit (e.g., 150k
tokens)—perhaps because the user pasted a massive file and the background
workers couldn't keep up—the system hits a **Synchronous Barrier**.
At this barrier, the `ContextManager` checks the user's configured
`ContextPressureStrategy` to decide how to unblock the request:
- **Strategy A: `truncate` (The Baseline)**
- _Behavior:_ Instantly drop the oldest episodes until under `maxTokens`.
- _Tradeoff:_ Maximum speed, maximum data loss.
- **Strategy B: `incrementalGc` (Progressive)**
- _Behavior:_ Look for any pre-computed summaries/masks. If none exist,
synchronously block to compute _just enough_ summaries to survive the
current turn.
- _Tradeoff:_ Medium speed, medium data retention.
- **Strategy C: `compress` (State Snapshot)**
- _Behavior:_ Identify the oldest N episodes causing the overflow. If their
N-to-1 World State Snapshot isn't ready yet, **block the user's request**
and force the `StateSnapshotProcessor` to generate it synchronously. Once
generated, replace the N episodes with the 1 snapshot and proceed.
- _Tradeoff:_ Maximum latency, maximum data retention/fidelity.
## Architectural Changes Required
1. **Episode Variants:** Update the `Episode` IR type to support a `variants`
dictionary.
2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to
dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of
`AgentChatHistory`).
3. **Processor Interface:** Change `ContextProcessor` from a synchronous
`process(episodes[])` function to an asynchronous worker that listens to the
event bus, updates the `variants` dictionary, and emits `VARIANT_READY`
events.
4. **Projection Logic:** Update `projectCompressedHistory()` to act as the
Pressure Barrier, reading the user's strategy and either applying ready
variants, waiting for variants, or truncating.
@@ -1,144 +0,0 @@
# Asynchronous Context Management Implementation Plan
This document outlines the step-by-step implementation plan for refactoring
`ContextManager` into a fully asynchronous, event-driven dataflow graph (Eager
Subconscious Compute).
---
## Phase 1: Stable Identity & Incremental IR Mapping
**The Problem:** Currently, `IrMapper.toIr()` is stateless. It generates random
UUIDs for `Episode` and `Step` nodes every time it parses the `Content[]` array.
If the array is rebuilt while an asynchronous processor is computing a summary,
the target ID will be lost, and the variant will be orphaned. **The Goal:**
Episodes must maintain a stable identity across turns so background workers can
confidently attach variants to them.
**Tasks:**
1. **Deterministic Hashing or Stateful Mapping:** Update `IrMapper` to either
generate deterministic UUIDs (e.g., hashing the part text/timestamp) OR make
`ContextManager`'s pristine graph mutable, where new `PUSH` events are
mapped _incrementally_ onto the tail of `this.pristineEpisodes` rather than
rebuilding the whole array.
2. **Test Update:** Ensure `IrMapper` tests verify stable IDs across successive
parse events.
---
## Phase 2: Data Structures & Event Bus
**The Problem:** The system lacks the internal types and communication channels
to support asynchronous variant generation. **The Goal:** Define the `Variant`
schemas and the internal `EventEmitter` that will broadcast graph updates to the
async workers.
**Tasks:**
1. **Variant Types:** Update `packages/core/src/context/ir/types.ts`.
- Add a `variants?: Record<string, Variant>` property to `Episode` and
`Step` (where `Variant` is a discriminated union of `SummaryVariant`,
`MaskedVariant`, `SnapshotVariant`, etc.).
- Include metadata on the variant:
`status: 'computing' | 'ready' | 'failed'`, `promise?: Promise<void>`,
`recoveredTokens: number`.
2. **Event Bus (`ContextEventBus`):**
- Create an internal event emitter in `ContextManager` (using
`events.EventEmitter` or a lightweight alternative).
- Define Events:
- `IR_NODE_CREATED`: Fired when a new Episode/Step is mapped. (Triggers
eager compute).
- `VARIANT_READY`: Fired by a worker when it finishes computing a
summary/snapshot.
- `BUDGET_RETAINED_CROSSED`: Fired when `currentTokens > retainedTokens`.
- `BUDGET_MAX_CROSSED`: Fired when `currentTokens > maxTokens`.
---
## Phase 3: Refactoring Processors into Async Workers
**The Problem:** Processors currently implement a synchronous
`process(episodes, state) -> Promise<Episode[]>` interface and block the main
loop. **The Goal:** Convert them into background workers that listen to the
`ContextEventBus`, perform LLM tasks asynchronously, and emit `VARIANT_READY`.
**Tasks:**
1. **Define `AsyncContextWorker` Interface:**
- `start(bus: ContextEventBus): void`
- `stop(): void`
2. **Implement `SemanticCompressionWorker`:**
- Listens to `IR_NODE_CREATED` (or `BUDGET_RETAINED_CROSSED` for lazier
eager compute).
- Batches old `USER_PROMPT` nodes.
- Calls LLM in background.
- Emits `VARIANT_READY` with the summary string and target Node IDs.
3. **Implement `StateSnapshotWorker`:**
- Listens to `BUDGET_RETAINED_CROSSED`.
- Identifies the N oldest raw episodes.
- Synthesizes them into a single `world_state_snapshot`.
- Emits `VARIANT_READY` containing the new Snapshot Episode and the IDs of
the N episodes it replaces.
4. **Wire Event Listeners:** `ContextManager` listens to `VARIANT_READY` and
updates the pristine graph's `variants` dictionary.
---
## Phase 4: The Projection Engine & Pressure Barrier
**The Problem:** `projectCompressedHistory()` currently runs the synchronous
pipeline. It needs to become the non-blocking opportunistic swapper and the
blocking pressure barrier. **The Goal:** Serve the LLM request instantly using
pre-computed variants, or block strictly according to the user's
`maxPressureStrategy`.
**Tasks:**
1. **Opportunistic Swap (`retainedTokens`):**
- When traversing `this.pristineEpisodes` to build the projected array, if
`currentTokens > retainedTokens`, check the oldest episodes.
- If an episode has a `variant.status === 'ready'`, use the variant's tokens
and text _instead_ of the raw episode.
2. **Pressure Barrier (`maxTokens`):**
- If the projected array is _still_ `> maxTokens` after all ready variants
are applied, hit the Barrier.
- Read `config.getContextManagementConfig().budget.maxPressureStrategy`.
- **If `truncate`:** Instantly drop the oldest episodes from the projection
until under budget. (Fastest).
- **If `incrementalGc`:** Await any variants that are
`status === 'computing'` for the oldest nodes until the deficit is
cleared. If none are computing, force a synchronous masking/truncation.
- **If `compress`:** Await the `StateSnapshotWorker`'s active `Promise`. If
it hasn't started, synchronously invoke it and block until the N-to-1
snapshot is ready.
---
## Phase 5: Configuration & Telemetry
**The Goal:** Expose the new strategies to the user and ensure we can observe
the background workers.
**Tasks:**
1. **Config Schema:** Update `settingsSchema.ts` to include
`maxPressureStrategy: 'truncate' | 'incrementalGc' | 'compress'`.
2. **Telemetry:** Log events when background workers start/finish, including
the tokens saved and the latency of the background task.
3. **Testing:** Write concurrency tests simulating a user typing rapidly while
background summaries are still resolving, ensuring no data corruption or
dropped variants.
---
## Open Questions & Risks
- **API Cost:** Eager compute means we might summarize an episode that the user
_never_ actually hits the context limit for. Should Eager Compute only begin
when `current > retained`, or truly immediately? (Recommendation: Start at
`retained` to save money, but `max` must be high enough above `retained` to
give the async workers time to finish).
- **Race Conditions:** If the user deletes a message via the UI (triggering
`AgentChatHistory.map/flatMap`), we must cleanly abort any pending Promises in
the background workers for those deleted IDs.
@@ -1,52 +0,0 @@
# Asynchronous Context Management: Status Report & Bug Sweep
_Date: End of Day 2 (Subconscious Memory Refactoring Complete)_
## 1. Inventory against Implementation Plan
### ✅ Phase 1: Stable Identity & Incremental IR Mapping (100% Complete)
- **Accomplished:** Implemented an `IdentityMap` (`WeakMap<object, string>`) in `IrMapper`.
- **Result:** `Episode` and `Step` nodes now receive deterministic UUIDs based on the underlying `Content` object references. Re-parsing the history array no longer orphans background variants.
- **Testing:** Implemented an explicit `IrMapper.test.ts` unit test proving `WeakMap` identity stability across conversation growth.
### ✅ Phase 2: Data Structures & Event Bus (100% Complete)
- **Accomplished:** Added `variants?: Record<string, Variant>` to `Episode` IR types.
- **Accomplished:** Created `ContextEventBus` class and instantiated it on `ContextManager`.
- **Accomplished:** Added `checkTriggers()` to emit `IR_CHUNK_RECEIVED` (for Eager Compute) and `BUDGET_RETAINED_CROSSED` (for Opportunistic Consolidation) on every `PUSH`.
### ✅ Phase 3: Refactoring Processors into Async Workers (100% Complete)
- **Accomplished:** Defined `AsyncContextWorker` interface.
- **Accomplished:** Refactored `StateSnapshotProcessor` into `StateSnapshotWorker`. It successfully listens to the bus, batches unprotected dying episodes, and emits a `VARIANT_READY` event.
- **Accomplished:** Replaced dummy execution with the actual `config.getBaseLlmClient().generateContent()` API call using `gemini-2.5-flash` and the `LlmRole.UTILITY_COMPRESSOR` telemetry role.
- **Accomplished:** Added robust `try/catch` and extensive `debugLogger.error` / `debugLogger.warn` logging to catch anomalous LLM failures without crashing the main loop.
### ✅ Phase 4.1: Opportunistic Replacement Engine (100% Complete)
- **Accomplished:** Rewrote the `projectCompressedHistory` sweep to traverse from newest to oldest. When `rollingTokens > retainedTokens`, it successfully swaps raw episodes for `variants` (Summary, Masked, Snapshot) if they exist.
- **Accomplished:** Implemented the `getWorkingBufferView()` sweep method. It perfectly resolves the N-to-1 Variant Targeting bug by injecting the snapshot and adding all `replacedEpisodeIds` to a `skippedIds` Set, cleanly dropping the older raw nodes from the final projection array.
### ✅ Phase 4.2: The Synchronous Pressure Barrier (100% Complete)
- **Accomplished:** Implemented the hard block at the end of `projectCompressedHistory()` if `currentTokens` still exceeds `maxTokens` after all opportunistic swaps are applied.
- **Accomplished:** Reads the `mngConfig.budget.maxPressureStrategy` flag. Supports `truncate` (instantly dropping oldest unprotected episodes) and safely falls back if `compress` isn't fully wired synchronously yet.
- **Testing:** Wrote `contextManager.barrier.test.ts` to blast the system with ~200k tokens and verify the instant truncation successfully protects the System Prompt (Episode 0) and the current working context.
### ✅ Phase 5: Configuration & Testing (100% Complete)
- **Accomplished:** Exposed `maxPressureStrategy` in `settingsSchema.ts` and replaced the deprecated `incrementalGc` flag across the entire monorepo.
- **Accomplished:** Wrote extensive concurrency component tests in `contextManager.async.test.ts` to prove the async LLM Promise resolution does not block the main user thread, and handles the critical race condition of "User typing while background snapshotting" flawlessly.
---
## 2. Bug Sweep & Architectural Review (Critical Findings Resolved)
Both critical flaws discovered on Day 1 have been completely resolved:
### ✅ Resolved Bug 1: The "Duplicate Projection" Flaw (N-to-1 Variant Targeting)
**The Fix:** The `getWorkingBufferView()` method tracks a `skippedIds` Set during its sweep. If it chooses a SnapshotVariant, it pushes all `replacedEpisodeIds` into the Set, cleanly skipping the raw text nodes on subsequent iterations.
### ✅ Resolved Bug 2: Infinite RAM Growth (Pristine Graph Accumulation)
**The Fix:** The `checkTriggers()` method now calculates its token budget against the computed `WorkingBufferView` rather than the `pristineEpisodes` array. As soon as an async worker injects a snapshot, the calculated token count plummets natively, breaking the infinite GC loop while leaving the pristine log untouched.
@@ -1,86 +0,0 @@
# Data-Driven Context Pipeline (Sidecar Config)
## 1. Motivation
The Context Management subsystem has grown sophisticated, but its configuration
is currently entangled with the global CLI `Config` god-object and the static
`settingsSchema.ts`. This entanglement causes several problems:
1. **Rigidity:** The order of processors (`ToolMasking` -> `Degradation` ->
`Semantic` -> `Squashing`) is hardcoded in TypeScript.
2. **Hyperparameter Bloat:** Every new tuning knob requires modifying the global
schema, UI dialogs, and types.
3. **Pipeline Isolation:** Background tasks like the `StateSnapshotWorker` were
isolated silos. They managed their own triggers and could not participate in a
sequential data pipeline (e.g. receiving degraded messages as input).
## 2. Vision: The Orthogonal "Forking" Pipeline
We will transition the Context Manager to be entirely configured by an independent,
strictly internal "Sidecar JSON" that represents a Directed Acyclic Graph (DAG) of
**Triggers** and **Processors**.
By completely separating the "Execution Strategy" (when something runs) from the
"Data Transformation Logic" (what it does), we can arbitrarily compose tools.
Crucially, the architecture supports a **"Forking Pipeline" mechanic**:
- **Synchronous Execution:** If all processors in a pipeline return `Episode[]`,
the orchestrator runs them inline and immediately returns the result (e.g. for
instant LLM prompting).
- **Asynchronous Forking (Eventual Consistency):** If a processor returns a
`Promise<Episode[]>` (like a heavy LLM summarizer), the orchestrator immediately
halts the synchronous chain, returns the previously processed state to the caller
so the CLI doesn't freeze, and lets the rest of the pipeline continue resolving
in the background. When it finishes, it caches the result for the *next* turn.
## 3. High-Level Architecture
### A. The Sidecar Schema
The sidecar JSON defines the **Budget** and an array of **Pipelines**.
```json
{
"budget": {
"retainedTokens": 65000,
"maxTokens": 150000
},
"pipelines": [
{
"name": "Immediate Sanitization",
"triggers": ["on_turn"],
"processors": [
{ "processorId": "ToolMaskingProcessor", "options": { "stringLengthThresholdTokens": 8000 } },
{ "processorId": "BlobDegradationProcessor", "options": {} },
{ "processorId": "SemanticCompressionProcessor", "options": { "nodeThresholdTokens": 5000 } }
]
},
{
"name": "Deep Background Compression",
"triggers": [{ "type": "timer", "intervalMs": 5000 }, "budget_exceeded"],
"processors": [
{ "processorId": "HistorySquashingProcessor", "options": { "maxTokensPerNode": 3000 } },
{ "processorId": "StateSnapshotProcessor", "options": {} }
]
}
]
}
```
### B. Processor Registry & Reification
To convert the JSON into a running graph, we use a dynamic registry. Every
processor implements the `ContextProcessor` interface and defines its own explicit Options.
```typescript
export interface ContextProcessor {
process(episodes: Episode[]): Episode[] | Promise<Episode[]>;
}
```
## 4. Implementation Phases
- **Phase 1: Interfaces & Registry:** Define `PipelineDef`, `Trigger`, and a `ProcessorRegistry`.
- **Phase 2: Normalize Workers:** Demote `StateSnapshotWorker` into a standard `StateSnapshotProcessor` so it can be composed in any pipeline array.
- **Phase 3: The Pipeline Orchestrator:** Build the central orchestration engine that listens to triggers, pumps `pristineEpisodes` through the arrays, and handles the Sync/Async forking logic to ensure zero-blocking eventual consistency.
- **Phase 4: ContextManager Integration:** Wire the `ContextManager` to delegate execution and caching to the Orchestrator.
@@ -130,11 +130,8 @@ describe('ContextManager Golden Tests', () => {
const tracer2 = new ContextTracer('/tmp', 'test2');
contextManager = new ContextManager(
{
pipelines: {
eagerBackground: [],
normalProcessingGraph: [],
retainedProcessingGraph: [],
},
budget: { retainedTokens: 100000, maxTokens: 150000 },
pipelines: [],
} as any,
{} as any,
tracer2,
+9 -27
View File
@@ -21,12 +21,13 @@ import type { ContextEnvironment } from './sidecar/environment.js';
import type { SidecarConfig } from './sidecar/types.js';
import { ProcessorRegistry } from './sidecar/registry.js';
import { PipelineOrchestrator } from './sidecar/orchestrator.js';
import type { ContextProcessor } from './pipeline.js';
import { ToolMaskingProcessor } from './processors/toolMaskingProcessor.js';
import { BlobDegradationProcessor } from './processors/blobDegradationProcessor.js';
import { SemanticCompressionProcessor } from './processors/semanticCompressionProcessor.js';
import { HistorySquashingProcessor } from './processors/historySquashingProcessor.js';
import { StateSnapshotProcessor } from './processors/stateSnapshotProcessor.js';
export class ContextManager {
@@ -52,13 +53,14 @@ export class ContextManager {
(this.env as any).setEventBus(this.eventBus);
}
this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer);
// Register built-ins
// Register built-ins BEFORE creating Orchestrator
ProcessorRegistry.register({ id: 'ToolMaskingProcessor', create: (env, opts) => new ToolMaskingProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'BlobDegradationProcessor', create: (env, opts) => new BlobDegradationProcessor(env) });
ProcessorRegistry.register({ id: 'SemanticCompressionProcessor', create: (env, opts) => new SemanticCompressionProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'HistorySquashingProcessor', create: (env, opts) => new HistorySquashingProcessor(env, opts as any) });
ProcessorRegistry.register({ id: 'StateSnapshotProcessor', create: (env, opts) => StateSnapshotProcessor.create(env, opts as any) });
this.orchestrator = new PipelineOrchestrator(this.sidecar, this.env, this.eventBus, this.tracer);
this.eventBus.onVariantReady((event) => {
@@ -153,28 +155,6 @@ export class ContextManager {
*/
private async applyProcessorGraphs(episodes: Episode[]): Promise<Episode[]> {
const mngConfig = this.sidecar;
const retainedLimit = mngConfig.budget.retainedTokens;
// If we're incredibly small, maybe we just run the retained graph on everything?
// Let's divide the episodes exactly at the retained boundary.
const retainedWindow: Episode[] = [];
const normalWindow: Episode[] = [];
let rollingTokens = 0;
// Scan backwards to fill the retained window
for (let i = episodes.length - 1; i >= 0; i--) {
const ep = episodes[i];
const epTokens = this.calculateIrTokens([ep]);
if ((rollingTokens + epTokens <= retainedLimit && normalWindow.length === 0) || retainedWindow.length === 0) {
// We always put at least the latest episode in the retained window.
// We only add to retainedWindow if we haven't already started the normalWindow (contiguous block).
retainedWindow.unshift(ep);
rollingTokens += epTokens;
} else {
normalWindow.unshift(ep);
}
}
const protectedIds = new Set<string>();
// We must protect the System Episode, which is always index 0 of pristineEpisodes.
@@ -182,7 +162,9 @@ export class ContextManager {
protectedIds.add(this.pristineEpisodes[0].id); // Structural invariant
}
return this.orchestrator.executePipelineForking('Immediate Sanitization', this.getWorkingBufferView(), {
let currentTokens = this.calculateIrTokens(episodes);
return this.orchestrator.executePipelineForking('Immediate Sanitization', episodes, {
currentTokens: currentTokens,
maxTokens: mngConfig.budget.maxTokens,
retainedTokens: mngConfig.budget.retainedTokens,
@@ -49,7 +49,7 @@ export class StateSnapshotProcessor implements ContextProcessor {
const ep = episodes[i];
selectedEpisodes.push(ep);
deficitAccumulator += estimateTokenCountSync([
{ text: ep.trigger?.parts?.[0]?.text ?? '' },
{ text: (ep.trigger as any)?.semanticParts?.[0]?.text ?? '' },
{ text: ep.yield?.text ?? '' },
]);
if (deficitAccumulator >= targetDeficit) break;
@@ -87,7 +87,7 @@ Output ONLY the raw factual snapshot, formatted compactly. Do not include markdo
let userPromptText = 'TRANSCRIPT TO SNAPSHOT:\n\n';
for (const ep of episodes) {
if (ep.trigger) {
userPromptText += `USER: ${ep.trigger.parts?.map((p: any) => p.text).join('')}\n`;
userPromptText += `USER: ${(ep.trigger as any).semanticParts?.map((p: any) => p.text).join('')}\n`;
}
for (const step of ep.steps) {
if (step.type === 'TOOL_EXECUTION') {
@@ -5,7 +5,6 @@
*/
import type { ContextProcessor } from '../pipeline.js';
import type { AsyncContextWorker } from '../workers/asyncContextWorker.js';
import type { ContextEnvironment } from './environment.js';
export interface ContextProcessorDef<
@@ -15,7 +14,7 @@ export interface ContextProcessorDef<
create(
env: ContextEnvironment,
options: TOptions,
): ContextProcessor | AsyncContextWorker;
): ContextProcessor;
}
/**