next steps

This commit is contained in:
Your Name
2026-04-07 01:57:36 +00:00
parent f423affe6d
commit 81c8dac01c
11 changed files with 395 additions and 294 deletions
@@ -0,0 +1,125 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Episode } from './types.js';
export interface MutationRecord {
episodeId: string;
type: 'modified' | 'inserted' | 'replaced' | 'deleted';
action: string;
originalIds?: string[]; // If replaced
episode?: Episode; // For new or modified
}
export class EpisodeEditor {
private originalMap: Map<string, Episode>;
private workingOrder: string[];
private workingMap: Map<string, Episode>;
private mutations: MutationRecord[] = [];
constructor(episodes: Episode[]) {
this.originalMap = new Map(episodes.map(e => [e.id, e]));
this.workingOrder = episodes.map(e => e.id);
this.workingMap = new Map(episodes.map(e => [e.id, e]));
}
/**
* Provides a readonly view of the current working state of the episodes.
* Processors should iterate over this to decide what to mutate.
*/
get episodes(): ReadonlyArray<Episode> {
return this.workingOrder.map(id => this.workingMap.get(id)!);
}
/**
* Safely edits an existing episode.
* The framework will handle deeply cloning the episode before passing it to the mutator,
* guaranteeing that original references are never modified.
*/
editEpisode(id: string, action: string, mutator: (draft: Episode) => void) {
const ep = this.workingMap.get(id);
if (!ep) return;
// Lazy deep clone only if it's the original reference
if (ep === this.originalMap.get(id)) {
const clone = structuredClone(ep);
this.workingMap.set(id, clone);
}
const draft = this.workingMap.get(id)!;
mutator(draft);
// Log mutation if not already tracked as modified/inserted/replaced
if (!this.mutations.find(m => m.episodeId === id)) {
this.mutations.push({ episodeId: id, type: 'modified', action, episode: draft });
}
}
/**
* Inserts a brand new episode into the graph at the specified index.
*/
insertEpisode(index: number, newEpisode: Episode, action: string) {
this.workingMap.set(newEpisode.id, newEpisode);
this.workingOrder.splice(index, 0, newEpisode.id);
this.mutations.push({ episodeId: newEpisode.id, type: 'inserted', action, episode: newEpisode });
}
/**
* Replaces a set of older episodes with a single new episode (e.g., a Summary or Snapshot).
* It inserts the new episode at the lowest index of the removed episodes.
*/
replaceEpisodes(oldIds: string[], newEpisode: Episode, action: string) {
const indices = oldIds.map(id => this.workingOrder.indexOf(id)).filter(i => i !== -1);
if (indices.length === 0) return;
const insertIndex = Math.min(...indices);
// Remove old
this.workingOrder = this.workingOrder.filter(id => !oldIds.includes(id));
for (const id of oldIds) {
this.workingMap.delete(id);
}
// Insert new
this.workingOrder.splice(insertIndex, 0, newEpisode.id);
this.workingMap.set(newEpisode.id, newEpisode);
this.mutations.push({
episodeId: newEpisode.id,
type: 'replaced',
action,
originalIds: oldIds,
episode: newEpisode
});
}
/**
* Removes episodes from the graph completely (e.g., emergency truncation).
*/
removeEpisodes(oldIds: string[], action: string) {
this.workingOrder = this.workingOrder.filter(id => !oldIds.includes(id));
for (const id of oldIds) {
this.workingMap.delete(id);
this.mutations.push({ episodeId: id, type: 'deleted', action });
}
}
/**
* Retrieves the final, finalized array of episodes.
* Called by the Orchestrator.
*/
getFinalEpisodes(): Episode[] {
return this.workingOrder.map(id => this.workingMap.get(id)!);
}
/**
* Retrieves a log of all structural and property mutations performed by this editor.
* Called by the Orchestrator to emit VariantReady events.
*/
getMutations(): MutationRecord[] {
return this.mutations;
}
}
+6 -4
View File
@@ -6,6 +6,8 @@
import type { Episode } from './ir/types.js';
import type { EpisodeEditor } from './ir/episodeEditor.js';
/**
* State object passed through the processing pipeline.
* Contains global accounting logic and semantic protection rules.
@@ -38,11 +40,11 @@ export interface ContextProcessor {
readonly name: string;
/**
* Processes the episodic history payload based on the current accounting state.
* Processors should return a new or mutated array of episodes.
* Processes the episodic history payload via the provided EpisodeEditor, based on the current accounting state.
* Processors should safely mutate or replace episodes using the editor's API.
*/
process(
episodes: Episode[],
editor: EpisodeEditor,
state: ContextAccountingState,
): Promise<Episode[]>;
): Promise<void>;
}
@@ -10,6 +10,8 @@ import type { ContextEnvironment } from '../sidecar/environment.js';
import { sanitizeFilenamePart } from '../../utils/fileUtils.js';
import type { Part } from '@google/genai';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export class BlobDegradationProcessor implements ContextProcessor {
readonly name = 'BlobDegradation';
private env: ContextEnvironment;
@@ -19,15 +21,14 @@ export class BlobDegradationProcessor implements ContextProcessor {
}
async process(
episodes: Episode[],
editor: EpisodeEditor,
state: ContextAccountingState,
): Promise<Episode[]> {
): Promise<void> {
if (state.isBudgetSatisfied) {
return episodes;
return;
}
let currentDeficit = state.deficitTokens;
const newEpisodes = [...episodes];
let directoryCreated = false;
let blobOutputsDir = this.env.fileSystem.join(
@@ -50,13 +51,13 @@ export class BlobDegradationProcessor implements ContextProcessor {
};
// Forward scan, looking for bloated non-text parts to degrade
for (let i = 0; i < newEpisodes.length; i++) {
for (const ep of editor.episodes) {
if (currentDeficit <= 0) break;
const ep = newEpisodes[i];
if (state.protectedEpisodeIds.has(ep.id)) continue;
if (ep.trigger.type === 'USER_PROMPT') {
for (const part of ep.trigger.semanticParts) {
for (let j = 0; j < ep.trigger.semanticParts.length; j++) {
const part = ep.trigger.semanticParts[j];
if (currentDeficit <= 0) break;
// We only target non-text parts that haven't already been masked
if (part.type === 'text' || part.presentation) continue;
@@ -100,12 +101,16 @@ export class BlobDegradationProcessor implements ContextProcessor {
if (newText && tokensSaved > 0) {
const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: newText }]);
part.presentation = { text: newText, tokens: newTokens };
ep.trigger.metadata.transformations.push({
processorName: this.name,
action: 'DEGRADED',
timestamp: Date.now(),
editor.editEpisode(ep.id, 'DEGRADE_BLOB', (draft) => {
if (draft.trigger.type === 'USER_PROMPT') {
draft.trigger.semanticParts[j].presentation = { text: newText, tokens: newTokens };
draft.trigger.metadata.transformations.push({
processorName: this.name,
action: 'DEGRADED',
timestamp: Date.now(),
});
}
});
currentDeficit -= tokensSaved;
@@ -113,7 +118,5 @@ export class BlobDegradationProcessor implements ContextProcessor {
}
}
}
return newEpisodes;
}
}
@@ -9,6 +9,8 @@ import type { Episode } from '../ir/types.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export interface EmergencyTruncationProcessorOptions {}
export class EmergencyTruncationProcessor implements ContextProcessor {
@@ -23,25 +25,25 @@ export class EmergencyTruncationProcessor implements ContextProcessor {
this.options = options;
}
async process(episodes: Episode[], state: ContextAccountingState): Promise<Episode[]> {
if (state.currentTokens <= state.maxTokens) return episodes;
async process(editor: EpisodeEditor, state: ContextAccountingState): Promise<void> {
if (state.currentTokens <= state.maxTokens) return;
let remainingTokens = state.currentTokens;
const targetTokens = state.maxTokens;
const truncated: Episode[] = [];
const toRemove: string[] = [];
// We respect the global protected Episode IDs (like the system prompt at index 0)
for (const ep of episodes) {
for (const ep of editor.episodes) {
const epTokens = this._env.tokenCalculator.calculateEpisodeListTokens([ep]);
if (remainingTokens > targetTokens && !state.protectedEpisodeIds.has(ep.id)) {
remainingTokens -= epTokens;
// Dropped! We do not add it to the truncated array.
} else {
truncated.push(ep);
toRemove.push(ep.id);
}
}
return truncated;
if (toRemove.length > 0) {
editor.removeEpisodes(toRemove, 'TRUNCATED');
}
}
}
@@ -47,11 +47,11 @@ export class HistorySquashingProcessor implements ContextProcessor {
}
async process(
episodes: Episode[],
editor: EpisodeEditor,
state: ContextAccountingState,
): Promise<Episode[]> {
): Promise<void> {
if (state.isBudgetSatisfied) {
return episodes;
return;
}
const { maxTokensPerNode } = this.options;
@@ -60,29 +60,36 @@ export class HistorySquashingProcessor implements ContextProcessor {
// We track how many tokens we still need to cut. If we hit 0, we can stop early!
let currentDeficit = state.deficitTokens;
const newEpisodes = [...episodes];
for (let i = 0; i < newEpisodes.length; i++) {
for (const ep of editor.episodes) {
if (currentDeficit <= 0) break;
if (state.protectedEpisodeIds.has(newEpisodes[i].id)) continue;
const ep = newEpisodes[i];
if (state.protectedEpisodeIds.has(ep.id)) continue;
// 1. Squash User Prompts
if (ep.trigger.type === 'USER_PROMPT') {
for (const part of ep.trigger.semanticParts) {
for (let j = 0; j < ep.trigger.semanticParts.length; j++) {
const part = ep.trigger.semanticParts[j];
if (part.type === 'text') {
const saved = this.tryApplySquash(
part.text,
limitChars,
currentDeficit,
(p) => (part.presentation = p),
() =>
ep.trigger.metadata.transformations.push({
processorName: this.name,
action: 'TRUNCATED',
timestamp: Date.now(),
}),
(p) => {
editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => {
if (draft.trigger.type === 'USER_PROMPT') {
draft.trigger.semanticParts[j].presentation = p;
}
});
},
() => {
editor.editEpisode(ep.id, 'SQUASH_PROMPT', (draft) => {
draft.trigger.metadata.transformations.push({
processorName: this.name,
action: 'TRUNCATED',
timestamp: Date.now(),
});
});
}
);
currentDeficit -= saved;
}
@@ -90,22 +97,38 @@ export class HistorySquashingProcessor implements ContextProcessor {
}
// 2. Squash Model Thoughts
for (const step of ep.steps) {
if (currentDeficit <= 0) break;
if (step.type === 'AGENT_THOUGHT') {
const saved = this.tryApplySquash(
step.text,
limitChars,
currentDeficit,
(p) => (step.presentation = p),
() =>
step.metadata.transformations.push({
processorName: this.name,
action: 'TRUNCATED',
timestamp: Date.now(),
}),
);
currentDeficit -= saved;
if (ep.steps) {
for (let j = 0; j < ep.steps.length; j++) {
const step = ep.steps[j];
if (currentDeficit <= 0) break;
if (step.type === 'AGENT_THOUGHT') {
const saved = this.tryApplySquash(
step.text,
limitChars,
currentDeficit,
(p) => {
editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => {
const draftStep = draft.steps![j];
if (draftStep.type === 'AGENT_THOUGHT') {
draftStep.presentation = p;
}
});
},
() => {
editor.editEpisode(ep.id, 'SQUASH_THOUGHT', (draft) => {
const draftStep = draft.steps![j];
if (draftStep.type === 'AGENT_THOUGHT') {
draftStep.metadata.transformations.push({
processorName: this.name,
action: 'TRUNCATED',
timestamp: Date.now(),
});
}
});
}
);
currentDeficit -= saved;
}
}
}
@@ -115,18 +138,25 @@ export class HistorySquashingProcessor implements ContextProcessor {
ep.yield.text,
limitChars,
currentDeficit,
(p) => (ep.yield!.presentation = p),
() =>
ep.yield!.metadata.transformations.push({
processorName: this.name,
action: 'TRUNCATED',
timestamp: Date.now(),
}),
(p) => {
editor.editEpisode(ep.id, 'SQUASH_YIELD', (draft) => {
if (draft.yield) draft.yield.presentation = p;
});
},
() => {
editor.editEpisode(ep.id, 'SQUASH_YIELD', (draft) => {
if (draft.yield) {
draft.yield.metadata.transformations.push({
processorName: this.name,
action: 'TRUNCATED',
timestamp: Date.now(),
});
}
});
}
);
currentDeficit -= saved;
}
}
return newEpisodes;
}
}
@@ -12,6 +12,8 @@ import { LlmRole } from '../../telemetry/types.js';
import { getResponseText } from '../../utils/partUtils.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export class SemanticCompressionProcessor implements ContextProcessor {
readonly name = 'SemanticCompression';
private env: ContextEnvironment;
@@ -27,12 +29,12 @@ export class SemanticCompressionProcessor implements ContextProcessor {
}
async process(
episodes: Episode[],
editor: EpisodeEditor,
state: ContextAccountingState,
): Promise<Episode[]> {
): Promise<void> {
// If the budget is satisfied, or semantic compression isn't enabled
if (state.isBudgetSatisfied) {
return episodes;
return;
}
const semanticConfig = this.options;
@@ -41,17 +43,16 @@ export class SemanticCompressionProcessor implements ContextProcessor {
this.modelToUse = 'gemini-2.5-flash';
let currentDeficit = state.deficitTokens;
const newEpisodes = [...episodes];
// We scan backwards (oldest to newest would also work, but older is safer to degrade first)
for (let i = 0; i < newEpisodes.length; i++) {
for (const ep of editor.episodes) {
if (currentDeficit <= 0) break;
const ep = newEpisodes[i];
if (state.protectedEpisodeIds.has(ep.id)) continue;
// 1. Compress User Prompts
if (ep.trigger.type === 'USER_PROMPT') {
for (const part of ep.trigger.semanticParts) {
for (let j = 0; j < ep.trigger.semanticParts.length; j++) {
const part = ep.trigger.semanticParts[j];
if (currentDeficit <= 0) break;
if (part.type !== 'text') continue;
// If it's already got a presentation, we don't want to re-summarize a summary
@@ -66,11 +67,15 @@ export class SemanticCompressionProcessor implements ContextProcessor {
const oldTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: part.text }]);
if (newTokens < oldTokens) {
part.presentation = { text: summary, tokens: newTokens };
ep.trigger.metadata.transformations.push({
processorName: this.name,
action: 'SUMMARIZED',
timestamp: Date.now(),
editor.editEpisode(ep.id, 'SUMMARIZE_PROMPT', (draft) => {
if (draft.trigger.type === 'USER_PROMPT') {
draft.trigger.semanticParts[j].presentation = { text: summary, tokens: newTokens };
draft.trigger.metadata.transformations.push({
processorName: this.name,
action: 'SUMMARIZED',
timestamp: Date.now(),
});
}
});
currentDeficit -= oldTokens - newTokens;
}
@@ -79,91 +84,104 @@ export class SemanticCompressionProcessor implements ContextProcessor {
}
// 2. Compress Model Thoughts
for (const step of ep.steps) {
if (currentDeficit <= 0) break;
if (step.type === 'AGENT_THOUGHT') {
if (step.presentation) continue;
if (step.text.length > thresholdChars) {
const summary = await this.generateSummary(
step.text,
'Agent Thought',
);
const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: summary }]);
const oldTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: step.text }]);
if (ep.steps) {
for (let j = 0; j < ep.steps.length; j++) {
const step = ep.steps[j];
if (currentDeficit <= 0) break;
if (step.type === 'AGENT_THOUGHT') {
if (step.presentation) continue;
if (step.text.length > thresholdChars) {
const summary = await this.generateSummary(
step.text,
'Agent Thought',
);
const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: summary }]);
const oldTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: step.text }]);
if (newTokens < oldTokens) {
step.presentation = { text: summary, tokens: newTokens };
step.metadata.transformations.push({
processorName: this.name,
action: 'SUMMARIZED',
timestamp: Date.now(),
});
currentDeficit -= oldTokens - newTokens;
}
}
}
// 3. Compress Tool Observations
if (step.type === 'TOOL_EXECUTION') {
const rawObs = step.presentation?.observation ?? step.observation;
let stringifiedObs = '';
if (typeof rawObs === 'string') {
stringifiedObs = rawObs;
} else {
try {
stringifiedObs = JSON.stringify(rawObs);
} catch (_e) {
stringifiedObs = String(rawObs);
if (newTokens < oldTokens) {
editor.editEpisode(ep.id, 'SUMMARIZE_THOUGHT', (draft) => {
const draftStep = draft.steps![j];
if (draftStep.type === 'AGENT_THOUGHT') {
draftStep.presentation = { text: summary, tokens: newTokens };
draftStep.metadata.transformations.push({
processorName: this.name,
action: 'SUMMARIZED',
timestamp: Date.now(),
});
}
});
currentDeficit -= oldTokens - newTokens;
}
}
}
if (
stringifiedObs.length > thresholdChars &&
!stringifiedObs.includes('<tool_output_masked>')
) {
const summary = await this.generateSummary(
stringifiedObs,
`Tool Output (${step.toolName})`,
);
// 3. Compress Tool Observations
if (step.type === 'TOOL_EXECUTION') {
const rawObs = step.presentation?.observation ?? step.observation;
// Wrap the summary in an object so the Gemini API accepts it as a valid functionResponse.response
const newObsObject = { summary };
let stringifiedObs = '';
if (typeof rawObs === 'string') {
stringifiedObs = rawObs;
} else {
try {
stringifiedObs = JSON.stringify(rawObs);
} catch (_e) {
stringifiedObs = String(rawObs);
}
}
const newObsTokens = this.env.tokenCalculator.estimateTokensForParts([
{
functionResponse: {
name: step.toolName,
response: newObsObject as unknown as Record<string, unknown>, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
id: step.id,
if (
stringifiedObs.length > thresholdChars &&
!stringifiedObs.includes('<tool_output_masked>')
) {
const summary = await this.generateSummary(
stringifiedObs,
`Tool Output (${step.toolName})`,
);
// Wrap the summary in an object so the Gemini API accepts it as a valid functionResponse.response
const newObsObject = { summary };
const newObsTokens = this.env.tokenCalculator.estimateTokensForParts([
{
functionResponse: {
name: step.toolName,
response: newObsObject as unknown as Record<string, unknown>, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
id: step.id,
},
},
},
]);
]);
const oldObsTokens =
step.presentation?.tokens.observation ?? step.tokens.observation;
const intentTokens =
step.presentation?.tokens.intent ?? step.tokens.intent;
const oldObsTokens =
step.presentation?.tokens?.observation ?? step.tokens?.observation ?? step.tokens;
const intentTokens =
step.presentation?.tokens?.intent ?? step.tokens?.intent ?? 0;
if (newObsTokens < oldObsTokens) {
step.presentation = {
intent: step.presentation?.intent ?? step.intent,
observation: newObsObject,
tokens: { intent: intentTokens, observation: newObsTokens },
};
step.metadata.transformations.push({
processorName: this.name,
action: 'SUMMARIZED',
timestamp: Date.now(),
});
currentDeficit -= oldObsTokens - newObsTokens;
if (newObsTokens < oldObsTokens) {
editor.editEpisode(ep.id, 'SUMMARIZE_TOOL', (draft) => {
const draftStep = draft.steps![j];
if (draftStep.type === 'TOOL_EXECUTION') {
draftStep.presentation = {
intent: draftStep.presentation?.intent ?? draftStep.intent,
observation: newObsObject,
tokens: { intent: intentTokens as number, observation: newObsTokens },
};
if (!draftStep.metadata) { draftStep.metadata = { transformations: [] } };
if (!draftStep.metadata.transformations) { draftStep.metadata.transformations = [] };
draftStep.metadata.transformations.push({
processorName: this.name,
action: 'SUMMARIZED',
timestamp: Date.now(),
});
}
});
currentDeficit -= oldObsTokens - newObsTokens;
}
}
}
}
}
}
return newEpisodes;
}
private async generateSummary(
@@ -12,6 +12,8 @@ import { v4 as uuidv4 } from 'uuid';
import { LlmRole } from '../../telemetry/llmRole.js';
import { debugLogger } from 'src/utils/debugLogger.js';
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export interface StateSnapshotProcessorOptions {
model?: string;
systemInstruction?: string;
@@ -37,17 +39,17 @@ export class StateSnapshotProcessor implements ContextProcessor {
this.options = options;
}
async process(episodes: Episode[], state: ContextAccountingState): Promise<Episode[]> {
async process(editor: EpisodeEditor, state: ContextAccountingState): Promise<void> {
const targetDeficit = Math.max(0, state.currentTokens - state.retainedTokens);
if (this.isSynthesizing || targetDeficit <= 0) return episodes;
if (this.isSynthesizing || targetDeficit <= 0) return;
this.isSynthesizing = true;
try {
let deficitAccumulator = 0;
const selectedEpisodes: Episode[] = [];
for (let i = 1; i < episodes.length - 1; i++) {
const ep = episodes[i];
for (let i = 1; i < editor.episodes.length - 1; i++) {
const ep = editor.episodes[i];
selectedEpisodes.push(ep);
deficitAccumulator += this.env.tokenCalculator.estimateTokensForParts([
{ text: (ep.trigger as any)?.semanticParts?.[0]?.text ?? '' },
@@ -56,21 +58,14 @@ export class StateSnapshotProcessor implements ContextProcessor {
if (deficitAccumulator >= targetDeficit) break;
}
if (selectedEpisodes.length < 2) return episodes; // Not enough context to summarize
if (selectedEpisodes.length < 2) return; // Not enough context to summarize
// Optimization: Do NOT emit VariantComputing, let the Orchestrator handle caching the final result.
const snapshotEp: Episode = await this.synthesizeSnapshot(selectedEpisodes);
const newEpisodes = [...episodes];
const oldIds = selectedEpisodes.map(ep => ep.id);
editor.replaceEpisodes(oldIds, snapshotEp, 'STATE_SNAPSHOT');
// Calculate indices to splice
const firstIndex = newEpisodes.findIndex(e => e.id === selectedEpisodes[0].id);
if (firstIndex !== -1) {
newEpisodes.splice(firstIndex, selectedEpisodes.length, snapshotEp);
}
return newEpisodes;
} finally {
this.isSynthesizing = false;
}
@@ -25,10 +25,12 @@ const UNMASKABLE_TOOLS = new Set([
EXIT_PLAN_MODE_TOOL_NAME,
]);
import type { EpisodeEditor } from '../ir/episodeEditor.js';
export class ToolMaskingProcessor implements ContextProcessor {
readonly name = 'ToolMasking';
private env: ContextEnvironment;
private options: { stringLengthThresholdTokens: number };
private env: ContextEnvironment;
constructor(
env: ContextEnvironment,
@@ -39,14 +41,13 @@ export class ToolMaskingProcessor implements ContextProcessor {
}
async process(
episodes: Episode[],
editor: EpisodeEditor,
state: ContextAccountingState,
): Promise<Episode[]> {
): Promise<void> {
const maskingConfig = this.options;
if (!maskingConfig) return episodes;
if (state.isBudgetSatisfied) return episodes;
if (!maskingConfig) return;
if (state.isBudgetSatisfied) return;
const newEpisodes = [...episodes];
let currentDeficit = state.deficitTokens;
const limitChars = this.env.tokenCalculator.tokensToChars(maskingConfig.stringLengthThresholdTokens);
@@ -92,9 +93,8 @@ export class ToolMaskingProcessor implements ContextProcessor {
};
// Forward scan, looking for massive intents or observations to mask
for (let i = 0; i < newEpisodes.length; i++) {
for (const ep of editor.episodes) {
if (currentDeficit <= 0) break;
const ep = newEpisodes[i];
if (!ep || !ep.steps || state.protectedEpisodeIds.has(ep.id)) continue;
for (let j = 0; j < ep.steps.length; j++) {
@@ -167,9 +167,6 @@ export class ToolMaskingProcessor implements ContextProcessor {
);
if (intentRes.changed || obsRes.changed) {
step.presentation.intent = intentRes.masked;
step.presentation.observation = obsRes.masked;
// Recalculate tokens perfectly
const newIntentTokens = this.env.tokenCalculator.estimateTokensForParts([
{
@@ -200,22 +197,41 @@ export class ToolMaskingProcessor implements ContextProcessor {
const savings = oldTotal - newTotal;
if (savings > 0) {
step.presentation.tokens = {
intent: newIntentTokens,
observation: newObsTokens,
};
step.metadata.transformations.push({
processorName: 'ToolMasking',
action: 'MASKED',
timestamp: Date.now(),
});
currentDeficit -= savings;
this.env.tracer.logEvent('ToolMaskingProcessor', `Masked tool ${toolName}`, { recoveredTokens: savings });
editor.editEpisode(ep.id, 'MASK_TOOL', (draft) => {
const draftStep = draft.steps![j];
if (draftStep.type !== 'TOOL_EXECUTION') return;
if (!draftStep.presentation) {
draftStep.presentation = {
intent: draftStep.intent,
observation: draftStep.observation,
tokens: draftStep.tokens,
};
}
draftStep.presentation.intent = intentRes.masked;
draftStep.presentation.observation = obsRes.masked;
draftStep.presentation.tokens = {
intent: newIntentTokens,
observation: newObsTokens,
};
draftStep.metadata = {
...draftStep.metadata,
transformations: [
...(draftStep.metadata?.transformations || []),
{
processorName: 'ToolMasking',
action: 'MASKED',
timestamp: Date.now(),
}
]
};
});
}
}
}
}
return newEpisodes;
}
private isAlreadyMasked(content: string): boolean {
@@ -10,6 +10,7 @@ import type { SidecarConfig, PipelineDef } from './types.js';
import type { ContextEnvironment, ContextEventBus, ContextTracer } from './environment.js';
import { ProcessorRegistry } from './registry.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { EpisodeEditor } from '../ir/episodeEditor.js';
export class PipelineOrchestrator {
private activeTimers: NodeJS.Timeout[] = [];
@@ -109,7 +110,9 @@ export class PipelineOrchestrator {
try {
this.tracer.logEvent('Orchestrator', `Executing processor: ${procDef.processorId}`);
currentEpisodes = await processor.process(currentEpisodes, state);
const editor = new EpisodeEditor(currentEpisodes);
await processor.process(editor, state);
currentEpisodes = editor.getFinalEpisodes();
} catch (error) {
debugLogger.error(`Pipeline ${pipeline.name} failed synchronously at ${procDef.processorId}:`, error);
return currentEpisodes; // Return what we have so far
@@ -135,28 +138,24 @@ export class PipelineOrchestrator {
try {
this.tracer.logEvent('Orchestrator', `Executing processor: ${procDef.processorId} (async)`);
// Before running, capture the state so we know what changed
const beforeMap = new Map(currentEpisodes.map(ep => [ep.id, ep]));
currentEpisodes = await processor.process(currentEpisodes, state);
const editor = new EpisodeEditor(currentEpisodes);
await processor.process(editor, state);
currentEpisodes = editor.getFinalEpisodes();
// Synthesize VariantReady events for anything that changed or was newly created
for (const ep of currentEpisodes) {
const original = beforeMap.get(ep.id);
// If an episode was transformed, or if it's a completely new synthetic episode (like a Snapshot)
// we need to broadcast it so the ContextManager can cache it as a variant.
if (!original || original !== ep) {
for (const mutation of editor.getMutations()) {
// We only broadcast modifications or replacements
// (Insertions without replacement and deletions are not tracked as variants on an existing node)
if (mutation.type === 'modified' || mutation.type === 'replaced') {
const variantId = `v-${procDef.processorId.toLowerCase()}`;
// Determine variant type. StateSnapshot generates full 'snapshot' replacement nodes.
// Masking/Squashing generate 'masked' or 'summary' in-place variants.
let vType: 'snapshot' | 'summary' | 'masked' = 'masked';
if (procDef.processorId.includes('Snapshot')) vType = 'snapshot';
else if (procDef.processorId.includes('Semantic')) vType = 'summary';
const ep = mutation.episode!;
this.eventBus.emitVariantReady({
targetId: ep.id, // The ID of the modified or new episode
targetId: mutation.type === 'replaced' ? mutation.originalIds![0] : ep.id,
variantId,
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
variant: {
@@ -165,8 +164,7 @@ export class PipelineOrchestrator {
episode: vType === 'snapshot' ? ep : undefined,
text: vType !== 'snapshot' ? (ep.yield?.text || (ep.trigger as any)?.semanticParts?.[0]?.presentation?.text || '') : undefined,
recoveredTokens: ep.yield?.metadata?.currentTokens || 10,
// For snapshots, we look at the transformations metadata to see what it replaced
replacedEpisodeIds: vType === 'snapshot' ? currentState.map(c => c.id).filter(id => id !== ep.id && !currentEpisodes.find(ce => ce.id === id)) : undefined,
replacedEpisodeIds: mutation.originalIds,
} as any
});
}
@@ -120,8 +120,9 @@ export class SimulationHarness {
}
// Inject the truncated view back into the graph
for (const ep of currentView) {
if (!currentHistory.find(c => c === ep)) {
for (let i = 0; i < currentView.length; i++) {
const ep = currentView[i];
if (!this.contextManager.getWorkingBufferView().find(c => c.id === ep.id)) {
this.eventBus.emitVariantReady({
targetId: ep.id,
variantId: 'v-emergency',
File diff suppressed because one or more lines are too long