complete snapshotter

This commit is contained in:
Your Name
2026-04-09 01:05:13 +00:00
parent 4a34f64efa
commit f7b67ec3de
5 changed files with 47 additions and 7 deletions
+1
View File
@@ -41,6 +41,7 @@ export interface ContextWorker {
readonly name: string;
readonly triggers: {
onNodesAdded?: boolean;
onNodesAgedOut?: boolean;
onInboxTopics?: string[];
};
execute(args: {
@@ -30,7 +30,7 @@ export class StateSnapshotWorker implements ContextWorker {
// Triggers when nodes exceed retained threshold (via retained_exceeded in Orchestrator)
readonly triggers = {
onNodesAdded: true,
onNodesAgedOut: true,
};
constructor(env: ContextEnvironment, options: StateSnapshotWorkerOptions) {
@@ -125,7 +125,7 @@ export class PipelineOrchestrator {
this.eventBus.onConsolidationNeeded((event) => {
void this.executePipelineAsync(
pipeline,
[],
event.ship,
event.targetNodeIds,
new Set(), // protected IDs
);
@@ -134,7 +134,7 @@ export class PipelineOrchestrator {
this.eventBus.onChunkReceived((event) => {
void this.executePipelineAsync(
pipeline,
[],
event.ship,
event.targetNodeIds,
new Set(), // protected IDs
);
@@ -143,7 +143,7 @@ export class PipelineOrchestrator {
}
}
// 2. Worker Triggers (onNodesAdded is roughly onChunkReceived for now)
// 2. Worker Triggers (onNodesAdded / onNodesAgedOut)
this.eventBus.onChunkReceived((event) => {
// Fire all workers that care about new nodes
for (const worker of this.instantiatedWorkers.values()) {
@@ -151,14 +151,31 @@ export class PipelineOrchestrator {
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox?.getMessages() || [],
);
const targets = event.ship.filter(n => event.targetNodeIds.has(n.id));
// Fire and forget
worker.execute({ targets: [], inbox: inboxSnapshot }).catch((e) => {
worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => {
debugLogger.error(`Worker ${worker.name} failed onNodesAdded:`, e);
});
}
}
});
this.eventBus.onConsolidationNeeded((event) => {
// Fire all workers that care about aged out nodes
for (const worker of this.instantiatedWorkers.values()) {
if (worker.triggers.onNodesAgedOut) {
const inboxSnapshot = new InboxSnapshotImpl(
this.env.inbox?.getMessages() || [],
);
const targets = event.ship.filter(n => event.targetNodeIds.has(n.id));
// Fire and forget
worker.execute({ targets, inbox: inboxSnapshot }).catch((e) => {
debugLogger.error(`Worker ${worker.name} failed onNodesAgedOut:`, e);
});
}
}
});
// We don't have a formal event bus for inbox publish yet, but we will soon.
// For now the workers are just registered.
}
+12 -2
View File
@@ -15,6 +15,14 @@ export const defaultSidecarProfile: SidecarConfig = {
retainedTokens: 65000,
maxTokens: 150000,
},
workers: [
{
workerId: 'StateSnapshotWorker',
options: {
type: 'accumulate',
},
},
],
pipelines: [
{
name: 'Immediate Sanitization',
@@ -41,7 +49,6 @@ export const defaultSidecarProfile: SidecarConfig = {
processorId: 'SemanticCompressionProcessor',
options: { nodeThresholdTokens: 5000 },
},
{ processorId: 'StateSnapshotProcessor', options: {} },
],
},
{
@@ -49,7 +56,10 @@ export const defaultSidecarProfile: SidecarConfig = {
triggers: ['gc_backstop'],
execution: 'blocking',
processors: [
{ processorId: 'StateSnapshotProcessor', options: {} },
{
processorId: 'StateSnapshotProcessor',
options: { target: 'max' }
},
{ processorId: 'EmergencyTruncationProcessor', options: {} },
],
},
@@ -32,6 +32,18 @@ export function getSidecarConfigSchema(registry: ProcessorRegistry) {
},
},
},
workers: {
type: 'array',
description: 'Background workers that proactively accumulate context.',
items: {
type: 'object',
required: ['workerId'],
properties: {
workerId: { type: 'string' },
options: { type: 'object' },
},
},
},
pipelines: {
type: 'array',
description: 'The execution graphs for context manipulation.',