This commit is contained in:
Your Name
2026-04-08 23:37:46 +00:00
parent fd5a703684
commit 6e7987696f
17 changed files with 125 additions and 243 deletions
+1 -8
View File
@@ -85,14 +85,7 @@ export class IrProjector {
'gc_backstop',
ship,
agedOutNodes,
{
currentTokens,
maxTokens: sidecar.budget.maxTokens,
retainedTokens: sidecar.budget.retainedTokens,
deficitTokens: Math.max(0, currentTokens - sidecar.budget.maxTokens),
protectedLogicalIds: protectedIds,
isBudgetSatisfied: currentTokens <= sidecar.budget.maxTokens,
},
protectedIds,
);
const finalTokens =
-10
View File
@@ -24,19 +24,9 @@ export interface ContextWorkingBuffer {
getLineage(id: string): readonly ConcreteNode[];
}
/**
 * Token-budget accounting snapshot handed to context processors alongside
 * the nodes they are asked to transform.
 *
 * All fields are read-only; the snapshot is built by the caller that triggers
 * a pipeline run.
 */
export interface ContextAccountingState {
/** Token count of the context when the snapshot was built. */
readonly currentTokens: number;
/** Upper token limit; callers populate it from the configured budget's `maxTokens`. */
readonly maxTokens: number;
/** Callers populate it from the configured budget's `retainedTokens`. */
readonly retainedTokens: number;
/**
 * Number of tokens processors are asked to free.
 * NOTE(review): callers are inconsistent about clamping — some pass
 * `Math.max(0, currentTokens - maxTokens)`, others the raw difference, so a
 * negative value appears possible; confirm before relying on `>= 0`.
 */
readonly deficitTokens: number;
/**
 * Ids that must not be modified. The orchestrator excludes a node when either
 * its own `id` or its `logicalParentId` is in this set.
 */
readonly protectedLogicalIds: ReadonlySet<string>;
/**
 * When true, processors return their targets unchanged.
 * NOTE(review): presumably equivalent to `currentTokens <= maxTokens`, but
 * some callers hard-code `false` — verify against the caller.
 */
readonly isBudgetSatisfied: boolean;
}
/**
 * Argument bundle passed to a context processor's `process` call.
 */
export interface ProcessArgs {
/** Lookup view over the working set of nodes (pristine-node and lineage queries). */
readonly buffer: ContextWorkingBuffer;
/**
 * Nodes the processor is asked to transform. The orchestrator passes only
 * nodes that were targeted by the trigger and are not protected.
 */
readonly targets: readonly ConcreteNode[];
/** Token-budget accounting snapshot for this run. */
readonly state: ContextAccountingState;
/** Inbox snapshot supplied by the orchestrator for this run. */
readonly inbox: InboxSnapshot;
}
@@ -7,7 +7,6 @@ import { describe, it, expect, vi } from 'vitest';
import { BlobDegradationProcessor } from './blobDegradationProcessor.js';
import {
createMockEnvironment,
createDummyState,
createDummyNode,
} from '../testing/contextTestUtils.js';
import type { UserPrompt, SemanticPart } from '../ir/types.js';
@@ -24,7 +23,6 @@ describe('BlobDegradationProcessor', () => {
const processor = BlobDegradationProcessor.create(env, {});
// Deficit of 50 means budget is NOT satisfied.
const state = createDummyState(false, 50);
const parts: SemanticPart[] = [
{ type: 'text', text: 'Hello' },
@@ -41,7 +39,6 @@ describe('BlobDegradationProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as unknown as import('../pipeline.js').InboxSnapshot,
});
@@ -65,10 +62,8 @@ describe('BlobDegradationProcessor', () => {
// The transformation should be logged
});
it('should stop degrading once the deficit is cleared', async () => {
it('should degrade all blobs unconditionally', async () => {
const env = createMockEnvironment();
// Huge deficit requires one degradation
const state = createDummyState(false, 90);
env.tokenCalculator.estimateTokensForParts = vi.fn((parts: import('@google/genai').Part[]) => {
if (parts[0].text) return 10;
@@ -89,36 +84,26 @@ describe('BlobDegradationProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as unknown as import('../pipeline.js').InboxSnapshot,
});
const modifiedPrompt = result[0] as UserPrompt;
expect(modifiedPrompt.semanticParts.length).toBe(2);
// First part was degraded
// Both parts should be degraded
expect(modifiedPrompt.semanticParts[0].type).toBe('text');
// Second part was untouched because deficit hit 0
expect(modifiedPrompt.semanticParts[1].type).toBe('file_data');
expect(modifiedPrompt.semanticParts[1].type).toBe('text');
});
it('should return exactly the targets array if budget is already satisfied', async () => {
it('should return exactly the targets array if targets are empty', async () => {
const env = createMockEnvironment();
const state = createDummyState(true, 0); // Budget satisfied!
const processor = BlobDegradationProcessor.create(env, {});
const prompt = createDummyNode('ep1', 'USER_PROMPT', 100, {
semanticParts: [
{ type: 'inline_data', mimeType: 'image/png', data: 'abc' },
]
}) as UserPrompt;
const targets = [prompt];
const targets: import("../ir/types.js").ConcreteNode[] = [];
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as unknown as import('../pipeline.js').InboxSnapshot,
});
@@ -27,12 +27,11 @@ export class BlobDegradationProcessor implements ContextProcessor {
this.env = env;
}
async process({ targets, state }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (state.isBudgetSatisfied) {
async process({ targets }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (targets.length === 0) {
return targets;
}
let currentDeficit = state.deficitTokens;
let directoryCreated = false;
let blobOutputsDir = this.env.fileSystem.join(
@@ -58,7 +57,7 @@ export class BlobDegradationProcessor implements ContextProcessor {
// Forward scan, looking for bloated non-text parts to degrade
for (const node of targets) {
if (currentDeficit <= 0 || node.type !== 'USER_PROMPT') {
if (node.type !== 'USER_PROMPT') {
returnedNodes.push(node);
continue;
}
@@ -69,7 +68,6 @@ export class BlobDegradationProcessor implements ContextProcessor {
for (let j = 0; j < prompt.semanticParts.length; j++) {
const part = prompt.semanticParts[j];
if (currentDeficit <= 0) break;
// We only target non-text parts that haven't already been masked
if (part.type === 'text') continue;
@@ -119,7 +117,6 @@ export class BlobDegradationProcessor implements ContextProcessor {
if (newText && tokensSaved > 0) {
// Replace the part with a synthetic text part
newParts[j] = { type: 'text', text: newText };
currentDeficit -= tokensSaved;
modified = true;
}
}
@@ -51,15 +51,13 @@ export class EmergencyTruncationProcessor implements ContextProcessor {
async process({
targets,
state,
}: ProcessArgs): Promise<readonly ConcreteNode[]> {
// Calculate how many tokens we need to remove based on the configured knob
let targetTokensToRemove = 0;
const strategy = this.options.target ?? 'max';
if (strategy === 'incremental') {
if (state.currentTokens <= state.maxTokens) return targets;
targetTokensToRemove = state.currentTokens - state.maxTokens;
targetTokensToRemove = Infinity;
} else if (strategy === 'freeNTokens') {
targetTokensToRemove = this.options.freeTokensTarget ?? 0;
if (targetTokensToRemove <= 0) return targets;
@@ -7,7 +7,6 @@ import { describe, it, expect, vi } from 'vitest';
import { HistorySquashingProcessor } from './historySquashingProcessor.js';
import {
createMockEnvironment,
createDummyState,
createDummyNode,
} from '../testing/contextTestUtils.js';
import type { UserPrompt, AgentThought, AgentYield } from '../ir/types.js';
@@ -31,7 +30,6 @@ describe('HistorySquashingProcessor', () => {
maxTokensPerNode: 1, // Will equal 10 chars limit
});
const state = createDummyState(false, 500); // 500 token deficit
const prompt = createDummyNode('ep1', 'USER_PROMPT', 100, {
semanticParts: [
@@ -52,7 +50,6 @@ describe('HistorySquashingProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as any,
});
@@ -78,33 +75,32 @@ describe('HistorySquashingProcessor', () => {
expect(squashedYield.text).toContain('[... OMITTED');
});
it('should stop truncating once the deficit is cleared', async () => {
it('should ignore nodes that are below maxTokensPerNode', async () => {
const env = createMockEnvironment();
const mockTokenCalculator = new ContextTokenCalculator(1, env.behaviorRegistry) as any;
mockTokenCalculator.tokensToChars = vi.fn().mockReturnValue(10);
mockTokenCalculator.tokensToChars = vi.fn().mockReturnValue(100);
mockTokenCalculator.estimateTokensForString = vi.fn((text: string) => {
if (text.includes('OMITTED')) return 0; // Huge savings
return 500;
return text.length;
});
mockTokenCalculator.estimateTokensForParts = vi.fn(() => 0);
mockTokenCalculator.estimateTokensForParts = vi.fn(() => 5);
mockTokenCalculator.getTokenCost = vi.fn(() => 5);
(env as any).tokenCalculator = mockTokenCalculator;
const processor = HistorySquashingProcessor.create(env, {
maxTokensPerNode: 1,
maxTokensPerNode: 100,
});
// Deficit is only 10 tokens. First truncation saves 500.
const state = createDummyState(false, 10);
const prompt = createDummyNode('ep1', 'USER_PROMPT', 500, {
const prompt = createDummyNode('ep1', 'USER_PROMPT', 5, {
semanticParts: [
{ type: 'text', text: 'This text is way longer than 10 characters and needs truncation' }
{ type: 'text', text: 'Short text' } // 10 chars
],
}, 'prompt-id') as UserPrompt;
const thought = createDummyNode('ep1', 'AGENT_THOUGHT', 500, {
text: 'The model is thinking something incredibly long and verbose that exceeds 10 chars',
const thought = createDummyNode('ep1', 'AGENT_THOUGHT', 5, {
text: 'Short thought', // 13 chars
}, 'thought-id') as AgentThought;
const targets = [prompt, thought];
@@ -112,19 +108,17 @@ describe('HistorySquashingProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as any,
});
expect(result.length).toBe(2);
// 1. User Prompt (truncated because deficit > 0)
// 1. User Prompt (untouched)
const squashedPrompt = result[0] as UserPrompt;
expect(squashedPrompt.id).toBe('mock-uuid-1');
expect(squashedPrompt.id).not.toBe(prompt.id);
expect((squashedPrompt.semanticParts[0] as any).text).toContain('[... OMITTED');
expect(squashedPrompt.id).toBe(prompt.id);
expect((squashedPrompt.semanticParts[0] as any).text).not.toContain('[... OMITTED');
// 2. Agent Thought (untouched because deficit is now < 0)
// 2. Agent Thought (untouched)
const untouchedThought = result[1] as AgentThought;
expect(untouchedThought.id).toBe(thought.id);
expect(untouchedThought.text).not.toContain('[... OMITTED');
@@ -47,9 +47,7 @@ export class HistorySquashingProcessor implements ContextProcessor {
private tryApplySquash(
text: string,
limitChars: number,
currentDeficit: number,
): { text: string; newTokens: number; oldTokens: number; tokensSaved: number } | null {
if (currentDeficit <= 0) return null;
const originalLength = text.length;
if (originalLength <= limitChars) return null;
@@ -72,23 +70,17 @@ export class HistorySquashingProcessor implements ContextProcessor {
return null;
}
async process({ targets, state }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (state.isBudgetSatisfied) {
async process({ targets }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (targets.length === 0) {
return targets;
}
const { maxTokensPerNode } = this.options;
const limitChars = this.env.tokenCalculator.tokensToChars(maxTokensPerNode);
let currentDeficit = state.deficitTokens;
const returnedNodes: ConcreteNode[] = [];
for (const node of targets) {
if (currentDeficit <= 0) {
returnedNodes.push(node);
continue;
}
// 1. Squash User Prompts
if (node.type === 'USER_PROMPT') {
const prompt = node;
@@ -97,24 +89,16 @@ export class HistorySquashingProcessor implements ContextProcessor {
for (let j = 0; j < prompt.semanticParts.length; j++) {
const part = prompt.semanticParts[j];
if (currentDeficit <= 0) break;
if (part.type === 'text') {
const squashResult = this.tryApplySquash(part.text, limitChars, currentDeficit);
const squashResult = this.tryApplySquash(part.text, limitChars);
if (squashResult) {
newParts[j] = { type: 'text', text: squashResult.text };
currentDeficit -= squashResult.tokensSaved;
modified = true;
}
}
}
if (modified) {
newParts.map(p => {
if (p.type === 'text') return { text: p.text };
if (p.type === 'inline_data') return { inlineData: { mimeType: p.mimeType, data: p.data } };
if (p.type === 'file_data') return { fileData: { mimeType: p.mimeType, fileUri: p.fileUri } };
return (p as Extract<import('../ir/types.js').SemanticPart, { type: 'raw_part' }>).part;
});
returnedNodes.push({
...prompt,
id: this.env.idGenerator.generateId(),
@@ -129,10 +113,9 @@ export class HistorySquashingProcessor implements ContextProcessor {
// 2. Squash Model Thoughts
if (node.type === 'AGENT_THOUGHT') {
const thought = node;
const squashResult = this.tryApplySquash(thought.text, limitChars, currentDeficit);
const squashResult = this.tryApplySquash(thought.text, limitChars);
if (squashResult) {
currentDeficit -= squashResult.tokensSaved;
returnedNodes.push({
...thought,
id: this.env.idGenerator.generateId(),
@@ -147,10 +130,9 @@ export class HistorySquashingProcessor implements ContextProcessor {
// 3. Squash Agent Yields
if (node.type === 'AGENT_YIELD') {
const agentYield = node;
const squashResult = this.tryApplySquash(agentYield.text, limitChars, currentDeficit);
const squashResult = this.tryApplySquash(agentYield.text, limitChars);
if (squashResult) {
currentDeficit -= squashResult.tokensSaved;
returnedNodes.push({
...agentYield,
id: this.env.idGenerator.generateId(),
@@ -7,40 +7,26 @@ import { describe, it, expect, vi } from 'vitest';
import { SemanticCompressionProcessor } from './semanticCompressionProcessor.js';
import {
createMockEnvironment,
createDummyState,
createDummyNode,
createDummyToolNode,
createMockGenerateContentResponse
} from '../testing/contextTestUtils.js';
import type { UserPrompt, AgentThought, ToolExecution } from '../ir/types.js';
import { ContextTokenCalculator } from '../utils/contextTokenCalculator.js';
describe('SemanticCompressionProcessor', () => {
it('should trigger summarization via LLM for long text parts', async () => {
const mockLlmClient = {
generateContent: vi.fn().mockResolvedValue({
text: 'Mocked Summary!',
}),
generateContent: vi.fn().mockResolvedValue(createMockGenerateContentResponse('Mocked Summary!')), // length = 15
};
const env = createMockEnvironment({
llmClient: mockLlmClient as any,
});
const mockTokenCalculator = new ContextTokenCalculator(1, env.behaviorRegistry) as any;
mockTokenCalculator.tokensToChars = vi.fn().mockReturnValue(10);
mockTokenCalculator.estimateTokensForParts = vi.fn((parts: any) => {
if (parts[0]?.text === 'Mocked Summary!') return 5;
if (parts[0]?.functionResponse?.response?.summary === 'Mocked Summary!') return 10;
return 5000;
});
mockTokenCalculator.getTokenCost = vi.fn().mockReturnValue(5000);
(env as any).tokenCalculator = mockTokenCalculator;
const processor = SemanticCompressionProcessor.create(env, {
nodeThresholdTokens: 10,
});
const state = createDummyState(false, 15000); // We need to save tons of tokens
const prompt = createDummyNode('ep1', 'USER_PROMPT', 3800, {
semanticParts: [
@@ -62,7 +48,6 @@ describe('SemanticCompressionProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as any,
});
@@ -87,42 +72,28 @@ describe('SemanticCompressionProcessor', () => {
expect(mockLlmClient.generateContent).toHaveBeenCalledTimes(3);
});
it('should stop summarizing once the deficit is cleared', async () => {
it('should ignore nodes that are below the threshold', async () => {
const mockLlmClient = {
generateContent: vi.fn().mockResolvedValue({
text: 'Mocked Summary!',
}),
generateContent: vi.fn().mockResolvedValue(createMockGenerateContentResponse('S')), // length = 1
};
const env = createMockEnvironment({
llmClient: mockLlmClient as any,
});
const mockTokenCalculator = new ContextTokenCalculator(1, env.behaviorRegistry) as any;
mockTokenCalculator.tokensToChars = vi.fn().mockReturnValue(10);
// Returning 0 tokens for the summary to maximize savings
mockTokenCalculator.estimateTokensForParts = vi.fn((parts: any) => {
if (parts[0]?.text === 'Mocked Summary!') return 0;
return 5000;
});
mockTokenCalculator.getTokenCost = vi.fn().mockReturnValue(5000);
(env as any).tokenCalculator = mockTokenCalculator;
const processor = SemanticCompressionProcessor.create(env, {
nodeThresholdTokens: 10,
nodeThresholdTokens: 100, // Very high threshold
});
// Deficit is only 10 tokens! The first summarization will save 5000 tokens, clearing it instantly.
const state = createDummyState(false, 10);
const prompt = createDummyNode('ep1', 'USER_PROMPT', 3800, {
semanticParts: [
{ type: 'text', text: 'This text is way longer than 10 characters and needs compression' }
{ type: 'text', text: 'Short text' } // Below threshold
],
}, 'prompt-id') as UserPrompt;
const thought = createDummyNode('ep1', 'AGENT_THOUGHT', 1500, {
text: 'The model is thinking something incredibly long and verbose that exceeds 10 chars',
text: 'Short thought', // Below threshold
}, 'thought-id') as AgentThought;
const targets = [prompt, thought];
@@ -130,22 +101,20 @@ describe('SemanticCompressionProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets,
state,
inbox: {} as any,
});
expect(result.length).toBe(2);
// 1. User Prompt (was summarized because deficit was > 0)
const compressedPrompt = result[0] as UserPrompt;
expect(compressedPrompt.id).not.toBe(prompt.id);
// 1. User Prompt (NOT compressed)
const untouchedPrompt = result[0] as UserPrompt;
expect(untouchedPrompt.id).toBe(prompt.id);
// 2. Agent Thought (was NOT summarized because deficit hit 0)
// 2. Agent Thought (NOT compressed)
const untouchedThought = result[1] as AgentThought;
expect(untouchedThought.id).toBe(thought.id); // Reference equality!
expect(untouchedThought.text).toBe(thought.text);
expect(untouchedThought.id).toBe(thought.id);
// LLM should only have been called once
expect(mockLlmClient.generateContent).toHaveBeenCalledTimes(1);
// LLM should not have been called
expect(mockLlmClient.generateContent).toHaveBeenCalledTimes(0);
});
});
@@ -79,25 +79,15 @@ export class SemanticCompressionProcessor implements ContextProcessor {
}
}
async process({ targets, state }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (state.isBudgetSatisfied) {
return targets;
}
async process({ targets }: ProcessArgs): Promise<readonly ConcreteNode[]> {
const semanticConfig = this.options;
const limitTokens = semanticConfig.nodeThresholdTokens;
const thresholdChars = this.env.tokenCalculator.tokensToChars(limitTokens);
let currentDeficit = state.deficitTokens;
const returnedNodes: ConcreteNode[] = [];
// Scan backwards (oldest to newest would also work, but older is safer to degrade first)
// Scan the target working buffer and unconditionally apply the configured hyperparameter threshold
for (const node of targets) {
if (currentDeficit <= 0) {
returnedNodes.push(node);
continue;
}
// 1. Compress User Prompts
if (node.type === 'USER_PROMPT') {
const prompt = node;
@@ -106,18 +96,21 @@ export class SemanticCompressionProcessor implements ContextProcessor {
for (let j = 0; j < prompt.semanticParts.length; j++) {
const part = prompt.semanticParts[j];
if (currentDeficit <= 0) break;
if (part.type !== 'text') continue;
if (part.text.length > thresholdChars) {
const summary = await this.generateSummary(part.text, 'User Prompt');
const newTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: summary }]);
const oldTokens = this.env.tokenCalculator.estimateTokensForParts([{ text: part.text }]);
console.log(`SMOKING GUN (User Prompt): text.length=${part.text.length}, threshold=${thresholdChars}, newTokens=${newTokens}, oldTokens=${oldTokens}, summary='${summary}'`);
if (newTokens < oldTokens) {
newParts[j] = { type: 'text', text: summary };
currentDeficit -= (oldTokens - newTokens);
modified = true;
console.log('SMOKING GUN (User Prompt): modified=true');
} else {
console.log('SMOKING GUN (User Prompt): modified=false');
}
}
}
@@ -143,7 +136,6 @@ export class SemanticCompressionProcessor implements ContextProcessor {
const oldTokens = this.env.tokenCalculator.getTokenCost(thought);
if (newTokens < oldTokens) {
currentDeficit -= (oldTokens - newTokens);
returnedNodes.push({
...thought,
id: this.env.idGenerator.generateId(),
@@ -190,7 +182,6 @@ export class SemanticCompressionProcessor implements ContextProcessor {
const intentTokens = tool.tokens?.intent ?? 0;
if (newObsTokens < oldObsTokens) {
currentDeficit -= (oldObsTokens - newObsTokens);
returnedNodes.push({
...tool,
id: this.env.idGenerator.generateId(),
@@ -7,7 +7,6 @@ import { describe, it, expect } from 'vitest';
import { StateSnapshotProcessor } from './stateSnapshotProcessor.js';
import {
createMockEnvironment,
createDummyState,
createDummyNode,
} from '../testing/contextTestUtils.js';
import { InboxSnapshotImpl } from '../sidecar/inbox.js';
@@ -16,18 +15,16 @@ describe('StateSnapshotProcessor', () => {
it('should ignore if budget is satisfied', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
const state = createDummyState(true); // satisfied
const targets = [createDummyNode('ep1', 'USER_PROMPT')];
const inbox = new InboxSnapshotImpl([]);
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, state, inbox });
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
expect(result).toBe(targets); // Strict equality
});
it('should apply a valid snapshot from the Inbox (Fast Path)', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
const state = createDummyState(false, 100);
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
@@ -48,7 +45,7 @@ describe('StateSnapshotProcessor', () => {
}
]);
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, state, inbox });
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
// Should remove A and B, insert Snapshot, keep C
expect(result.length).toBe(2);
@@ -63,7 +60,6 @@ describe('StateSnapshotProcessor', () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'incremental' });
// Make deficit 0 so we don't fall through to the sync backstop and fail the test that way
const state = createDummyState(false, 0);
// node-A is MISSING (user deleted it)
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
@@ -81,7 +77,7 @@ describe('StateSnapshotProcessor', () => {
}
]);
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, state, inbox });
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
// Because deficit is 0, and Inbox was rejected, nothing should change
expect(result.length).toBe(1);
@@ -92,7 +88,6 @@ describe('StateSnapshotProcessor', () => {
it('should fall back to sync backstop if inbox is empty', async () => {
const env = createMockEnvironment();
const processor = StateSnapshotProcessor.create(env, { target: 'max' }); // Summarize all
const state = createDummyState(false, 100);
const nodeA = createDummyNode('ep1', 'USER_PROMPT', 50, {}, 'node-A');
const nodeB = createDummyNode('ep1', 'AGENT_THOUGHT', 60, {}, 'node-B');
@@ -100,7 +95,7 @@ describe('StateSnapshotProcessor', () => {
const targets = [nodeA, nodeB, nodeC];
const inbox = new InboxSnapshotImpl([]);
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, state, inbox });
const result = await processor.process({ buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer, targets, inbox });
// Should synthesize a new snapshot synchronously
expect(env.llmClient.generateContent).toHaveBeenCalled();
@@ -51,8 +51,8 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
}
// --- ContextProcessor Interface (Sync Backstop / Cache Application) ---
async process({ targets, state, inbox }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (state.isBudgetSatisfied) {
async process({ targets, inbox }: ProcessArgs): Promise<readonly ConcreteNode[]> {
if (targets.length === 0) {
return targets;
}
@@ -104,9 +104,9 @@ export class StateSnapshotProcessor implements ContextProcessor, ContextWorker {
let targetTokensToRemove = 0;
if (strategy === 'incremental') {
targetTokensToRemove = state.deficitTokens;
targetTokensToRemove = Infinity; // incremental implies removing as much as possible if no state is passed
} else if (strategy === 'freeNTokens') {
targetTokensToRemove = this.options.freeTokensTarget ?? state.deficitTokens;
targetTokensToRemove = this.options.freeTokensTarget ?? Infinity;
} else if (strategy === 'max') {
targetTokensToRemove = Infinity;
}
@@ -7,7 +7,6 @@ import { describe, it, expect, vi } from 'vitest';
import { ToolMaskingProcessor } from './toolMaskingProcessor.js';
import {
createMockEnvironment,
createDummyState,
createDummyToolNode,
} from '../testing/contextTestUtils.js';
@@ -23,7 +22,6 @@ describe('ToolMaskingProcessor', () => {
stringLengthThresholdTokens: 10,
});
const state = createDummyState(false, 500);
const toolStep = createDummyToolNode('ep1', 50, 100, {
observation: {
@@ -35,7 +33,6 @@ describe('ToolMaskingProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets: [toolStep],
state,
inbox: {} as any,
});
@@ -60,7 +57,6 @@ describe('ToolMaskingProcessor', () => {
stringLengthThresholdTokens: 10,
});
const state = createDummyState(false, 500);
const toolStep = createDummyToolNode('ep1', 10, 10, {
toolName: 'activate_skill',
@@ -72,7 +68,6 @@ describe('ToolMaskingProcessor', () => {
const result = await processor.process({
buffer: {} as unknown as import('../pipeline.js').ContextWorkingBuffer,
targets: [toolStep],
state,
inbox: {} as any,
});
@@ -96,12 +96,11 @@ export class ToolMaskingProcessor implements ContextProcessor {
return text.includes('<tool_output_masked>');
}
async process({ targets, state }: ProcessArgs): Promise<readonly ConcreteNode[]> {
async process({ targets }: ProcessArgs): Promise<readonly ConcreteNode[]> {
const maskingConfig = this.options;
if (!maskingConfig) return targets;
if (state.isBudgetSatisfied) return targets;
if (targets.length === 0) return targets;
let currentDeficit = state.deficitTokens;
const limitChars = this.env.tokenCalculator.tokensToChars(
maskingConfig.stringLengthThresholdTokens,
);
@@ -148,7 +147,7 @@ export class ToolMaskingProcessor implements ContextProcessor {
const returnedNodes: ConcreteNode[] = [];
for (const node of targets) {
if (currentDeficit <= 0 || node.type !== 'TOOL_EXECUTION') {
if (node.type !== 'TOOL_EXECUTION') {
returnedNodes.push(node);
continue;
}
@@ -267,7 +266,6 @@ export class ToolMaskingProcessor implements ContextProcessor {
};
returnedNodes.push(maskedNode);
currentDeficit -= tokensSaved;
} else {
returnedNodes.push(node);
}
@@ -9,7 +9,6 @@ import { PipelineOrchestrator } from './orchestrator.js';
import { ProcessorRegistry } from './registry.js';
import {
createMockEnvironment,
createDummyState,
createDummyNode,
} from '../testing/contextTestUtils.js';
import type { ContextEnvironment } from './environment.js';
@@ -167,13 +166,12 @@ describe('PipelineOrchestrator (Component)', () => {
);
const ship = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
const state = createDummyState(false, 0, new Set());
const result = await orchestrator.executeTriggerSync(
'new_message',
ship,
new Set(ship.map((s) => s.id)),
state,
new Set(),
);
expect(result).toHaveLength(1);
@@ -202,14 +200,13 @@ describe('PipelineOrchestrator (Component)', () => {
);
const ship = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
const state = createDummyState(false, 0, new Set());
// This should resolve immediately with the UNMODIFIED array because execution is background
const result = await orchestrator.executeTriggerSync(
'new_message',
ship,
new Set(ship.map((s) => s.id)),
state,
new Set(),
);
expect(result).toHaveLength(1);
@@ -241,14 +238,13 @@ describe('PipelineOrchestrator (Component)', () => {
);
const ship = [createDummyNode('not-protected-ep', 'USER_PROMPT', 100, undefined, 'not-protected-id')];
const state = createDummyState(false, 0, new Set());
// It should not throw! It should swallow the error and return the unmodified array.
const result = await orchestrator.executeTriggerSync(
'new_message',
ship,
new Set(ship.map((s) => s.id)),
state,
new Set(),
);
expect(result).toHaveLength(1);
@@ -8,7 +8,6 @@ import type { ConcreteNode } from '../ir/types.js';
import type {
ContextProcessor,
ContextWorker,
ContextAccountingState,
} from '../pipeline.js';
import type { SidecarConfig, PipelineDef, PipelineTrigger } from './types.js';
import type {
@@ -19,6 +18,40 @@ import type {
import type { ProcessorRegistry } from './registry.js';
import { debugLogger } from '../../utils/debugLogger.js';
import { InboxSnapshotImpl } from './inbox.js';
import type { ContextWorkingBuffer } from '../pipeline.js';
/**
 * Lookup view over the node list handed to a processor for one pipeline step.
 *
 * Nodes are indexed by `id` once at construction; the list itself is exposed
 * unchanged through `nodes`.
 */
class ContextWorkingBufferImpl implements ContextWorkingBuffer {
  /** Index of `nodes` by id. If two nodes share an id, the later one wins. */
  private readonly nodesMap: Map<string, ConcreteNode>;

  /**
   * @param nodes Nodes that make up the current working buffer.
   */
  constructor(public readonly nodes: readonly ConcreteNode[]) {
    this.nodesMap = new Map(nodes.map((n) => [n.id, n]));
  }

  /**
   * Looks up a node by id.
   *
   * The orchestrator does not hold the original pristine graph (it is
   * maintained in ContextManager), so this falls back to the node currently in
   * the buffer. A full pristine lookup would require passing that graph in.
   *
   * @param id Node id to look up.
   * @returns The buffered node with that id, or `undefined` if absent.
   */
  getPristineNode(id: string): ConcreteNode | undefined {
    return this.nodesMap.get(id);
  }

  /**
   * Walks `logicalParentId` links upward, starting at the node with `id`.
   *
   * The walk stops when a node has no parent id, when the parent is not in the
   * buffer, or when it would revisit a node. The last case covers both a node
   * that names itself as parent and longer parent cycles (A -> B -> A), which
   * would otherwise loop forever.
   *
   * @param id Id of the node to start from.
   * @returns The starting node followed by its ancestors, nearest first;
   *   empty if `id` is not in the buffer.
   */
  getLineage(id: string): readonly ConcreteNode[] {
    const lineage: ConcreteNode[] = [];
    const visited = new Set<string>();
    let current = this.nodesMap.get(id);
    while (current && !visited.has(current.id)) {
      visited.add(current.id);
      lineage.push(current);
      current = current.logicalParentId
        ? this.nodesMap.get(current.logicalParentId)
        : undefined;
    }
    return lineage;
  }
}
export class PipelineOrchestrator {
private activeTimers: NodeJS.Timeout[] = [];
@@ -40,13 +73,13 @@ export class PipelineOrchestrator {
private isNodeAllowed(
node: import('../ir/types.js').ConcreteNode,
triggerTargets: ReadonlySet<string>,
state: ContextAccountingState,
protectedLogicalIds: ReadonlySet<string> = new Set(),
): boolean {
return (
triggerTargets.has(node.id) &&
!state.protectedLogicalIds.has(node.id) &&
!protectedLogicalIds.has(node.id) &&
(!node.logicalParentId ||
!state.protectedLogicalIds.has(node.logicalParentId))
!protectedLogicalIds.has(node.logicalParentId))
);
}
@@ -90,36 +123,20 @@ export class PipelineOrchestrator {
this.activeTimers.push(timer);
} else if (trigger === 'retained_exceeded') {
this.eventBus.onConsolidationNeeded((event) => {
const state: ContextAccountingState = {
currentTokens: 0,
retainedTokens: this.config.budget.retainedTokens,
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: event.targetDeficit,
protectedLogicalIds: new Set(),
};
void this.executePipelineAsync(
pipeline,
[],
event.targetNodeIds,
state,
new Set(), // protected IDs
);
});
} else if (trigger === 'new_message') {
this.eventBus.onChunkReceived((event) => {
const state: ContextAccountingState = {
currentTokens: 0,
retainedTokens: this.config.budget.retainedTokens,
maxTokens: this.config.budget.maxTokens,
isBudgetSatisfied: false,
deficitTokens: 0,
protectedLogicalIds: new Set(),
};
void this.executePipelineAsync(
pipeline,
[],
event.targetNodeIds,
state,
new Set(), // protected IDs
);
});
}
@@ -206,7 +223,7 @@ export class PipelineOrchestrator {
trigger: PipelineTrigger,
ship: readonly ConcreteNode[],
triggerTargets: ReadonlySet<string>,
state: ContextAccountingState,
protectedLogicalIds: ReadonlySet<string> = new Set(),
): Promise<readonly ConcreteNode[]> {
let currentShip = ship;
const pipelines = this.config.pipelines.filter((p) =>
@@ -230,13 +247,12 @@ export class PipelineOrchestrator {
);
const allowedTargets = currentShip.filter((n) =>
this.isNodeAllowed(n, triggerTargets, state),
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds),
);
const returnedNodes = await processor.process({
buffer: {} as any, // TODO: Implement ContextWorkingBuffer fully
buffer: new ContextWorkingBufferImpl(currentShip),
targets: allowedTargets,
state,
inbox: inboxSnapshot,
});
@@ -264,7 +280,7 @@ export class PipelineOrchestrator {
pipeline: PipelineDef,
ship: readonly ConcreteNode[],
triggerTargets: Set<string>,
state: ContextAccountingState,
protectedLogicalIds: ReadonlySet<string> = new Set(),
) {
this.tracer.logEvent(
'Orchestrator',
@@ -288,13 +304,12 @@ export class PipelineOrchestrator {
);
const allowedTargets = currentShip.filter((n) =>
this.isNodeAllowed(n, triggerTargets, state),
this.isNodeAllowed(n, triggerTargets, protectedLogicalIds),
);
const returnedNodes = await processor.process({
buffer: {} as any,
buffer: new ContextWorkingBufferImpl(currentShip),
targets: allowedTargets,
state,
inbox: inboxSnapshot,
});
@@ -129,15 +129,8 @@ export class SimulationHarness {
'gc_backstop',
currentView,
new Set(currentView.map(e => e.id)),
{
currentTokens,
maxTokens: this.config.budget.maxTokens,
retainedTokens: this.config.budget.retainedTokens,
isBudgetSatisfied: false,
deficitTokens: currentTokens - this.config.budget.maxTokens,
protectedLogicalIds: new Set<string>(),
});
new Set<string>(),
);
// Inject the truncated view back into the graph
for (let i = 0; i < currentView.length; i++) {
const ep = currentView[i];
@@ -20,30 +20,23 @@ import { registerBuiltInBehaviors } from '../ir/builtinBehaviors.js';
import { IrMapper } from '../ir/mapper.js';
import { ProcessorRegistry } from '../sidecar/registry.js';
import { registerBuiltInProcessors } from '../sidecar/builtins.js';
import type { ContextAccountingState } from '../pipeline.js';
import type { ConcreteNode, ToolExecution } from '../ir/types.js';
import type { ContextEnvironment } from '../sidecar/environment.js';
import type { Config } from '../../config/config.js';
import type { BaseLlmClient } from '../../core/baseLlmClient.js';
import type { Content } from '@google/genai';
/**
 * Test helper that assembles a `ContextAccountingState` from positional
 * overrides; every argument is optional.
 *
 * @param isSatisfied Value for `isBudgetSatisfied` (default `false`).
 * @param deficit Value for `deficitTokens` (default `0`).
 * @param protectedIds Value for `protectedLogicalIds` (default: a new empty set).
 * @param currentTokens Value for `currentTokens` (default `5000`).
 * @param maxTokens Value for `maxTokens` (default `10000`).
 * @param retainedTokens Value for `retainedTokens` (default `4000`).
 * @returns A state object carrying exactly the supplied values.
 */
export function createDummyState(
  isSatisfied = false,
  deficit = 0,
  protectedIds = new Set<string>(),
  currentTokens = 5000,
  maxTokens = 10000,
  retainedTokens = 4000,
): ContextAccountingState {
  const dummy: ContextAccountingState = {
    currentTokens,
    maxTokens,
    retainedTokens,
    deficitTokens: deficit,
    protectedLogicalIds: protectedIds,
    isBudgetSatisfied: isSatisfied,
  };
  return dummy;
}
import type { GenerateContentResponse } from '@google/genai';
/**
 * Builds a mock `GenerateContentResponse` holding a single model candidate
 * whose only part carries `text`.
 *
 * Saves tests from spelling out the nested candidate / content / part shape.
 * The result is a plain object cast to the response type, not a real instance.
 *
 * @param text Text placed in the sole part of the sole candidate.
 * @returns The mock response object.
 */
export const createMockGenerateContentResponse = (text: string): GenerateContentResponse => {
  const soleCandidate = {
    content: { role: 'model', parts: [{ text }] },
    index: 0,
  };
  return { candidates: [soleCandidate] } as GenerateContentResponse;
};
export function createDummyNode(
logicalParentId: string,
@@ -112,9 +105,7 @@ export function createMockEnvironment(
return {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
llmClient: vi.fn().mockReturnValue({
generateContent: vi.fn().mockResolvedValue({
text: 'Mock LLM summary response',
}),
generateContent: vi.fn().mockResolvedValue(createMockGenerateContentResponse('Mock LLM summary response')),
})() as unknown as BaseLlmClient,
promptId: 'mock-prompt-id',
sessionId: 'mock-session',