Fixing race condition in task updates

This commit is contained in:
Keith Schaab
2026-05-04 21:23:21 +00:00
parent 4e175527a2
commit dc82d97b09
4 changed files with 181 additions and 16 deletions
@@ -66,6 +66,7 @@ describe('Task Event-Driven Scheduler', () => {
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
expect(mockEventBus.publish).toHaveBeenCalledWith(
@@ -106,6 +107,7 @@ describe('Task Event-Driven Scheduler', () => {
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
// Simulate A2A client confirmation
@@ -148,7 +150,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
// Simulate Rejection (Cancel)
const handled = await (
@@ -174,7 +180,11 @@ describe('Task Event-Driven Scheduler', () => {
correlationId: 'corr-2',
confirmationDetails: { type: 'info', title: 'test', prompt: 'test' },
};
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall2] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall2],
schedulerId: 'task-id',
});
// Simulate ModifyWithEditor
const handled2 = await (
@@ -215,7 +225,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
// Simulate ProceedOnce for MCP
const handled = await (
@@ -255,7 +269,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
const handled = await (
task as unknown as {
@@ -294,7 +312,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
const handled = await (
task as unknown as {
@@ -333,7 +355,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
const handled = await (
task as unknown as {
@@ -376,7 +402,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (yoloMessageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
// Should NOT auto-publish ProceedOnce anymore, because PolicyEngine handles it directly
expect(yoloMessageBus.publish).not.toHaveBeenCalledWith(
@@ -419,6 +449,7 @@ describe('Task Event-Driven Scheduler', () => {
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
// Should publish artifact update for output
@@ -453,7 +484,11 @@ describe('Task Event-Driven Scheduler', () => {
const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];
handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] });
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall],
schedulerId: 'task-id',
});
// The tool should be complete and registered appropriately, eventually
// triggering the toolCompletionPromise resolution when all clear.
@@ -533,6 +568,7 @@ describe('Task Event-Driven Scheduler', () => {
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall1, toolCall2],
schedulerId: 'task-id',
});
// Confirm first tool call
@@ -600,6 +636,7 @@ describe('Task Event-Driven Scheduler', () => {
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall1, toolCall2],
schedulerId: 'task-id',
});
// Should NOT transition to input-required yet
@@ -621,6 +658,7 @@ describe('Task Event-Driven Scheduler', () => {
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [toolCall1Complete, toolCall2],
schedulerId: 'task-id',
});
// Now it should transition
+106
View File
@@ -460,4 +460,110 @@ describe('Task', () => {
expect(task.currentPromptId).toBe(expectedPromptId2);
});
});
describe('Race Condition Fix', () => {
const mockConfig = createMockConfig();
const mockEventBus: ExecutionEventBus = {
publish: vi.fn(),
on: vi.fn(),
off: vi.fn(),
once: vi.fn(),
removeAllListeners: vi.fn(),
finished: vi.fn(),
};
beforeEach(() => {
vi.clearAllMocks();
});
it('should NOT transition to input-required if a tool is still validating', async () => {
// @ts-expect-error - Calling private constructor
const task = new Task(
'task-id',
'context-id',
mockConfig as Config,
mockEventBus,
);
// Manually register two tool calls
task['_registerToolCall']('tool-1', 'awaiting_approval');
task['_registerToolCall']('tool-2', 'validating');
// Call checkInputRequiredState (private)
task['checkInputRequiredState']();
// Verify task state did NOT change to input-required
expect(task.taskState).not.toBe('input-required');
expect(mockEventBus.publish).not.toHaveBeenCalledWith(
expect.objectContaining({
status: expect.objectContaining({ state: 'input-required' }),
}),
);
});
it('should transition to input-required if all active tools are awaiting approval', async () => {
// @ts-expect-error - Calling private constructor
const task = new Task(
'task-id',
'context-id',
mockConfig as Config,
mockEventBus,
);
// Transition from submitted to working first to simulate normal flow
task.taskState = 'working';
// Manually register tool calls
task['_registerToolCall']('tool-1', 'awaiting_approval');
// Call checkInputRequiredState
task['checkInputRequiredState']();
// Verify task state changed to input-required
expect(task.taskState).toBe('input-required');
expect(mockEventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
status: expect.objectContaining({ state: 'input-required' }),
}),
);
});
it('handleEventDrivenToolCallsUpdate should ignore events for other schedulers', async () => {
// @ts-expect-error - Calling private constructor
const task = new Task(
'task-id',
'context-id',
mockConfig as Config,
mockEventBus,
);
const handleEventDrivenToolCallSpy = vi.spyOn(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
task as any,
'handleEventDrivenToolCall',
);
const otherEvent = {
type: 'tool-calls-update',
toolCalls: [{ request: { callId: '1' }, status: 'executing' }],
schedulerId: 'other-task-id',
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
task['handleEventDrivenToolCallsUpdate'](otherEvent as any);
expect(handleEventDrivenToolCallSpy).not.toHaveBeenCalled();
const ownEvent = {
type: 'tool-calls-update',
toolCalls: [{ request: { callId: '1' }, status: 'executing' }],
schedulerId: 'task-id',
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
task['handleEventDrivenToolCallsUpdate'](ownEvent as any);
expect(handleEventDrivenToolCallSpy).toHaveBeenCalled();
});
});
});
+9 -2
View File
@@ -413,7 +413,10 @@ export class Task {
private handleEventDrivenToolCallsUpdate(
event: ToolCallsUpdateMessage,
): void {
if (event.type !== MessageBusType.TOOL_CALLS_UPDATE) {
if (
event.type !== MessageBusType.TOOL_CALLS_UPDATE ||
event.schedulerId !== this.id
) {
return;
}
@@ -508,7 +511,11 @@ export class Task {
let isExecuting = false;
for (const [callId, status] of this.pendingToolCalls.entries()) {
if (status === 'executing' || status === 'scheduled') {
if (
status === 'executing' ||
status === 'scheduled' ||
status === 'validating'
) {
isExecuting = true;
} else if (
status === 'awaiting_approval' &&
+20 -6
View File
@@ -381,11 +381,11 @@ describe('E2E Tests', () => {
]);
// 6. Tool 1 is awaiting approval.
const toolCallAwaitEvent = events[5].result as TaskStatusUpdateEvent;
expect(toolCallAwaitEvent.metadata?.['coderAgent']).toMatchObject({
const toolCallAwaitEvent1 = events[5].result as TaskStatusUpdateEvent;
expect(toolCallAwaitEvent1.metadata?.['coderAgent']).toMatchObject({
kind: 'tool-call-confirmation',
});
expect(toolCallAwaitEvent.status.message?.parts).toMatchObject([
expect(toolCallAwaitEvent1.status.message?.parts).toMatchObject([
{
data: {
request: { callId: 'test-call-id-1' },
@@ -394,14 +394,28 @@ describe('E2E Tests', () => {
},
]);
// 7. The final event is "input-required".
const finalEvent = events[6].result as TaskStatusUpdateEvent;
// 7. Tool 2 is awaiting approval.
const toolCallAwaitEvent2 = events[6].result as TaskStatusUpdateEvent;
expect(toolCallAwaitEvent2.metadata?.['coderAgent']).toMatchObject({
kind: 'tool-call-confirmation',
});
expect(toolCallAwaitEvent2.status.message?.parts).toMatchObject([
{
data: {
request: { callId: 'test-call-id-2' },
status: 'awaiting_approval',
},
},
]);
// 8. The final event is "input-required".
const finalEvent = events[7].result as TaskStatusUpdateEvent;
expect(finalEvent.final).toBe(true);
expect(finalEvent.status.state).toBe('input-required');
// The scheduler now waits for approval, so no more events are sent.
assertUniqueFinalEventIsLast(events);
expect(events.length).toBe(7);
expect(events.length).toBe(8);
});
it('should handle multiple tool calls sequentially in YOLO mode', async () => {