feat(scheduler): support multi-scheduler tool aggregation and nested call IDs (#17429)

This commit is contained in:
Abhi
2026-01-26 13:38:11 -05:00
committed by GitHub
parent 3e1a377d78
commit d745d86af1
9 changed files with 241 additions and 23 deletions
@@ -26,6 +26,7 @@ export enum MessageBusType {
export interface ToolCallsUpdateMessage {
type: MessageBusType.TOOL_CALLS_UPDATE;
toolCalls: ToolCall[];
schedulerId: string;
}
export interface ToolConfirmationRequest {
@@ -29,6 +29,7 @@ import {
import type { SchedulerStateManager } from './state-manager.js';
import type { ToolModificationHandler } from './tool-modifier.js';
import type { ValidatingToolCall, WaitingToolCall } from './types.js';
import { ROOT_SCHEDULER_ID } from './types.js';
import type { Config } from '../config/config.js';
import type { EditorType } from '../utils/editor.js';
import { randomUUID } from 'node:crypto';
@@ -52,7 +53,7 @@ describe('confirmation.ts', () => {
});
afterEach(() => {
vi.clearAllMocks();
vi.restoreAllMocks();
});
const emitResponse = (response: ToolConfirmationResponse) => {
@@ -188,6 +189,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
});
expect(result.outcome).toBe(ToolConfirmationOutcome.ProceedOnce);
@@ -217,6 +219,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
});
await listenerPromise;
@@ -252,6 +255,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
});
await waitForListener(MessageBusType.TOOL_CONFIRMATION_RESPONSE);
@@ -293,6 +297,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
});
await listenerPromise1;
@@ -351,6 +356,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
});
await listenerPromise;
@@ -397,6 +403,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
});
const result = await promise;
@@ -420,6 +427,7 @@ describe('confirmation.ts', () => {
state: mockState,
modifier: mockModifier,
getPreferredEditor,
schedulerId: ROOT_SCHEDULER_ID,
}),
).rejects.toThrow(/lost during confirmation loop/);
});
@@ -103,6 +103,7 @@ export async function resolveConfirmation(
state: SchedulerStateManager;
modifier: ToolModificationHandler;
getPreferredEditor: () => EditorType | undefined;
schedulerId: string;
},
): Promise<ResolutionResult> {
const { state } = deps;
@@ -66,6 +66,7 @@ import type {
CancelledToolCall,
ToolCallResponseInfo,
} from './types.js';
import { ROOT_SCHEDULER_ID } from './types.js';
import { ToolErrorType } from '../tools/tool-error.js';
import * as ToolUtils from '../utils/tool-utils.js';
import type { EditorType } from '../utils/editor.js';
@@ -94,6 +95,8 @@ describe('Scheduler (Orchestrator)', () => {
args: { foo: 'bar' },
isClientInitiated: false,
prompt_id: 'prompt-1',
schedulerId: ROOT_SCHEDULER_ID,
parentCallId: undefined,
};
const req2: ToolCallRequestInfo = {
@@ -102,6 +105,8 @@ describe('Scheduler (Orchestrator)', () => {
args: { foo: 'baz' },
isClientInitiated: false,
prompt_id: 'prompt-1',
schedulerId: ROOT_SCHEDULER_ID,
parentCallId: undefined,
};
const mockTool = {
@@ -208,6 +213,7 @@ describe('Scheduler (Orchestrator)', () => {
config: mockConfig,
messageBus: mockMessageBus,
getPreferredEditor,
schedulerId: 'root',
});
// Reset Tool build behavior
@@ -271,6 +277,8 @@ describe('Scheduler (Orchestrator)', () => {
request: req1,
tool: mockTool,
invocation: mockInvocation,
schedulerId: ROOT_SCHEDULER_ID,
startTime: expect.any(Number),
}),
]),
);
@@ -769,6 +777,7 @@ describe('Scheduler (Orchestrator)', () => {
config: mockConfig,
messageBus: mockMessageBus,
state: mockStateManager,
schedulerId: ROOT_SCHEDULER_ID,
}),
);
+18 -3
View File
@@ -48,6 +48,8 @@ export interface SchedulerOptions {
config: Config;
messageBus: MessageBus;
getPreferredEditor: () => EditorType | undefined;
schedulerId: string;
parentCallId?: string;
}
const createErrorResponse = (
@@ -85,6 +87,8 @@ export class Scheduler {
private readonly config: Config;
private readonly messageBus: MessageBus;
private readonly getPreferredEditor: () => EditorType | undefined;
private readonly schedulerId: string;
private readonly parentCallId?: string;
private isProcessing = false;
private isCancelling = false;
@@ -94,7 +98,9 @@ export class Scheduler {
this.config = options.config;
this.messageBus = options.messageBus;
this.getPreferredEditor = options.getPreferredEditor;
this.state = new SchedulerStateManager(this.messageBus);
this.schedulerId = options.schedulerId;
this.parentCallId = options.parentCallId;
this.state = new SchedulerStateManager(this.messageBus, this.schedulerId);
this.executor = new ToolExecutor(this.config);
this.modifier = new ToolModificationHandler();
@@ -228,16 +234,21 @@ export class Scheduler {
try {
const toolRegistry = this.config.getToolRegistry();
const newCalls: ToolCall[] = requests.map((request) => {
const enrichedRequest: ToolCallRequestInfo = {
...request,
schedulerId: this.schedulerId,
parentCallId: this.parentCallId,
};
const tool = toolRegistry.getTool(request.name);
if (!tool) {
return this._createToolNotFoundErroredToolCall(
request,
enrichedRequest,
toolRegistry.getAllToolNames(),
);
}
return this._validateAndCreateToolCall(request, tool);
return this._validateAndCreateToolCall(enrichedRequest, tool);
});
this.state.enqueue(newCalls);
@@ -263,6 +274,7 @@ export class Scheduler {
ToolErrorType.TOOL_NOT_REGISTERED,
),
durationMs: 0,
schedulerId: this.schedulerId,
};
}
@@ -278,6 +290,7 @@ export class Scheduler {
tool,
invocation,
startTime: Date.now(),
schedulerId: this.schedulerId,
};
} catch (e) {
return {
@@ -290,6 +303,7 @@ export class Scheduler {
ToolErrorType.INVALID_TOOL_PARAMS,
),
durationMs: 0,
schedulerId: this.schedulerId,
};
}
}
@@ -411,6 +425,7 @@ export class Scheduler {
state: this.state,
modifier: this.modifier,
getPreferredEditor: this.getPreferredEditor,
schedulerId: this.schedulerId,
});
outcome = result.outcome;
lastDetails = result.lastDetails;
+13 -1
View File
@@ -17,6 +17,7 @@ import type {
ExecutingToolCall,
ToolCallResponseInfo,
} from './types.js';
import { ROOT_SCHEDULER_ID } from './types.js';
import type {
ToolConfirmationOutcome,
ToolResultDisplay,
@@ -39,7 +40,10 @@ export class SchedulerStateManager {
private readonly queue: ToolCall[] = [];
private _completedBatch: CompletedToolCall[] = [];
constructor(private readonly messageBus: MessageBus) {}
constructor(
private readonly messageBus: MessageBus,
private readonly schedulerId: string = ROOT_SCHEDULER_ID,
) {}
addToolCalls(calls: ToolCall[]): void {
this.enqueue(calls);
@@ -201,6 +205,7 @@ export class SchedulerStateManager {
void this.messageBus.publish({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: snapshot,
schedulerId: this.schedulerId,
});
}
@@ -321,6 +326,7 @@ export class SchedulerStateManager {
response,
durationMs: startTime ? Date.now() - startTime : undefined,
outcome: call.outcome,
schedulerId: call.schedulerId,
};
}
@@ -336,6 +342,7 @@ export class SchedulerStateManager {
response,
durationMs: startTime ? Date.now() - startTime : undefined,
outcome: call.outcome,
schedulerId: call.schedulerId,
};
}
@@ -364,6 +371,7 @@ export class SchedulerStateManager {
startTime: 'startTime' in call ? call.startTime : undefined,
outcome: call.outcome,
invocation: call.invocation,
schedulerId: call.schedulerId,
};
}
@@ -388,6 +396,7 @@ export class SchedulerStateManager {
startTime: 'startTime' in call ? call.startTime : undefined,
outcome: call.outcome,
invocation: call.invocation,
schedulerId: call.schedulerId,
};
}
@@ -442,6 +451,7 @@ export class SchedulerStateManager {
},
durationMs: startTime ? Date.now() - startTime : undefined,
outcome: call.outcome,
schedulerId: call.schedulerId,
};
}
@@ -462,6 +472,7 @@ export class SchedulerStateManager {
startTime: 'startTime' in call ? call.startTime : undefined,
outcome: call.outcome,
invocation: call.invocation,
schedulerId: call.schedulerId,
};
}
@@ -482,6 +493,7 @@ export class SchedulerStateManager {
invocation: call.invocation,
liveOutput,
pid,
schedulerId: call.schedulerId,
};
}
}
+11
View File
@@ -16,6 +16,8 @@ import type { AnsiOutput } from '../utils/terminalSerializer.js';
import type { ToolErrorType } from '../tools/tool-error.js';
import type { SerializableConfirmationDetails } from '../confirmation-bus/types.js';
export const ROOT_SCHEDULER_ID = 'root';
export interface ToolCallRequestInfo {
callId: string;
name: string;
@@ -24,6 +26,8 @@ export interface ToolCallRequestInfo {
prompt_id: string;
checkpoint?: string;
traceId?: string;
parentCallId?: string;
schedulerId?: string;
}
export interface ToolCallResponseInfo {
@@ -43,6 +47,7 @@ export type ValidatingToolCall = {
invocation: AnyToolInvocation;
startTime?: number;
outcome?: ToolConfirmationOutcome;
schedulerId?: string;
};
export type ScheduledToolCall = {
@@ -52,6 +57,7 @@ export type ScheduledToolCall = {
invocation: AnyToolInvocation;
startTime?: number;
outcome?: ToolConfirmationOutcome;
schedulerId?: string;
};
export type ErroredToolCall = {
@@ -61,6 +67,7 @@ export type ErroredToolCall = {
tool?: AnyDeclarativeTool;
durationMs?: number;
outcome?: ToolConfirmationOutcome;
schedulerId?: string;
};
export type SuccessfulToolCall = {
@@ -71,6 +78,7 @@ export type SuccessfulToolCall = {
invocation: AnyToolInvocation;
durationMs?: number;
outcome?: ToolConfirmationOutcome;
schedulerId?: string;
};
export type ExecutingToolCall = {
@@ -82,6 +90,7 @@ export type ExecutingToolCall = {
startTime?: number;
outcome?: ToolConfirmationOutcome;
pid?: number;
schedulerId?: string;
};
export type CancelledToolCall = {
@@ -92,6 +101,7 @@ export type CancelledToolCall = {
invocation: AnyToolInvocation;
durationMs?: number;
outcome?: ToolConfirmationOutcome;
schedulerId?: string;
};
export type WaitingToolCall = {
@@ -113,6 +123,7 @@ export type WaitingToolCall = {
correlationId?: string;
startTime?: number;
outcome?: ToolConfirmationOutcome;
schedulerId?: string;
};
export type Status = ToolCall['status'];