diff --git a/packages/a2a-server/src/agent/task-event-driven.test.ts b/packages/a2a-server/src/agent/task-event-driven.test.ts index 86436fa811..5fc548a8f4 100644 --- a/packages/a2a-server/src/agent/task-event-driven.test.ts +++ b/packages/a2a-server/src/agent/task-event-driven.test.ts @@ -66,6 +66,7 @@ describe('Task Event-Driven Scheduler', () => { handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall], + schedulerId: 'task-id', }); expect(mockEventBus.publish).toHaveBeenCalledWith( @@ -106,6 +107,7 @@ describe('Task Event-Driven Scheduler', () => { handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall], + schedulerId: 'task-id', }); // Simulate A2A client confirmation @@ -148,7 +150,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (messageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); // Simulate Rejection (Cancel) const handled = await ( @@ -174,7 +180,11 @@ describe('Task Event-Driven Scheduler', () => { correlationId: 'corr-2', confirmationDetails: { type: 'info', title: 'test', prompt: 'test' }, }; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall2] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall2], + schedulerId: 'task-id', + }); // Simulate ModifyWithEditor const handled2 = await ( @@ -215,7 +225,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (messageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); // Simulate ProceedOnce for MCP const handled = await ( @@ -255,7 +269,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (messageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); const handled = await ( task as unknown as { @@ -294,7 +312,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (messageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); const handled = await ( task as unknown as { @@ -333,7 +355,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (messageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); const handled = await ( task as unknown as { @@ -376,7 +402,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (yoloMessageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); // Should NOT auto-publish ProceedOnce anymore, because PolicyEngine handles it directly expect(yoloMessageBus.publish).not.toHaveBeenCalledWith( @@ -419,6 +449,7 @@ describe('Task Event-Driven Scheduler', () => { handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall], + schedulerId: 'task-id', }); // Should publish artifact update for output @@ -453,7 +484,11 @@ describe('Task Event-Driven Scheduler', () => { const handler = (messageBus.subscribe as Mock).mock.calls.find( (call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE, )?.[1]; - handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall] }); + handler({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [toolCall], + schedulerId: 'task-id', + }); // The tool should be complete and registered appropriately, eventually // triggering the toolCompletionPromise resolution when all clear. @@ -533,6 +568,7 @@ describe('Task Event-Driven Scheduler', () => { handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall1, toolCall2], + schedulerId: 'task-id', }); // Confirm first tool call @@ -600,6 +636,7 @@ describe('Task Event-Driven Scheduler', () => { handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall1, toolCall2], + schedulerId: 'task-id', }); // Should NOT transition to input-required yet @@ -621,6 +658,7 @@ describe('Task Event-Driven Scheduler', () => { handler({ type: MessageBusType.TOOL_CALLS_UPDATE, toolCalls: [toolCall1Complete, toolCall2], + schedulerId: 'task-id', }); // Now it should transition diff --git a/packages/a2a-server/src/agent/task.test.ts b/packages/a2a-server/src/agent/task.test.ts index 26039ae3aa..27e6c28fed 100644 --- a/packages/a2a-server/src/agent/task.test.ts +++ b/packages/a2a-server/src/agent/task.test.ts @@ -460,4 +460,110 @@ describe('Task', () => { expect(task.currentPromptId).toBe(expectedPromptId2); }); }); + + describe('Race Condition Fix', () => { + const mockConfig = createMockConfig(); + const mockEventBus: ExecutionEventBus = { + publish: vi.fn(), + on: vi.fn(), + off: vi.fn(), + once: vi.fn(), + removeAllListeners: vi.fn(), + finished: vi.fn(), + }; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('should NOT transition to input-required if a tool is still validating', async () => { + // @ts-expect-error - Calling private constructor + const task = new Task( + 'task-id', + 'context-id', + mockConfig as Config, + mockEventBus, + ); + + // Manually register two tool calls + task['_registerToolCall']('tool-1', 'awaiting_approval'); + task['_registerToolCall']('tool-2', 'validating'); + + // Call checkInputRequiredState (private) + task['checkInputRequiredState'](); + + // Verify task state did NOT change to input-required + expect(task.taskState).not.toBe('input-required'); + expect(mockEventBus.publish).not.toHaveBeenCalledWith( + expect.objectContaining({ + status: expect.objectContaining({ state: 'input-required' }), + }), + ); + }); + + it('should transition to input-required if all active tools are awaiting approval', async () => { + // @ts-expect-error - Calling private constructor + const task = new Task( + 'task-id', + 'context-id', + mockConfig as Config, + mockEventBus, + ); + + // Transition from submitted to working first to simulate normal flow + task.taskState = 'working'; + + // Manually register tool calls + task['_registerToolCall']('tool-1', 'awaiting_approval'); + + // Call checkInputRequiredState + task['checkInputRequiredState'](); + + // Verify task state changed to input-required + expect(task.taskState).toBe('input-required'); + expect(mockEventBus.publish).toHaveBeenCalledWith( + expect.objectContaining({ + status: expect.objectContaining({ state: 'input-required' }), + }), + ); + }); + + it('handleEventDrivenToolCallsUpdate should ignore events for other schedulers', async () => { + // @ts-expect-error - Calling private constructor + const task = new Task( + 'task-id', + 'context-id', + mockConfig as Config, + mockEventBus, + ); + + const handleEventDrivenToolCallSpy = vi.spyOn( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + task as any, + 'handleEventDrivenToolCall', + ); + + const otherEvent = { + type: 'tool-calls-update', + toolCalls: [{ request: { callId: '1' }, status: 'executing' }], + schedulerId: 'other-task-id', + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + task['handleEventDrivenToolCallsUpdate'](otherEvent as any); + + expect(handleEventDrivenToolCallSpy).not.toHaveBeenCalled(); + + const ownEvent = { + type: 'tool-calls-update', + toolCalls: [{ request: { callId: '1' }, status: 'executing' }], + schedulerId: 'task-id', + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + task['handleEventDrivenToolCallsUpdate'](ownEvent as any); + + expect(handleEventDrivenToolCallSpy).toHaveBeenCalled(); + }); + }); }); diff --git a/packages/a2a-server/src/agent/task.ts b/packages/a2a-server/src/agent/task.ts index a76054263f..6e87a033dd 100644 --- a/packages/a2a-server/src/agent/task.ts +++ b/packages/a2a-server/src/agent/task.ts @@ -413,7 +413,10 @@ export class Task { private handleEventDrivenToolCallsUpdate( event: ToolCallsUpdateMessage, ): void { - if (event.type !== MessageBusType.TOOL_CALLS_UPDATE) { + if ( + event.type !== MessageBusType.TOOL_CALLS_UPDATE || + event.schedulerId !== this.id + ) { return; } @@ -508,7 +511,11 @@ export class Task { let isExecuting = false; for (const [callId, status] of this.pendingToolCalls.entries()) { - if (status === 'executing' || status === 'scheduled') { + if ( + status === 'executing' || + status === 'scheduled' || + status === 'validating' + ) { isExecuting = true; } else if ( status === 'awaiting_approval' && diff --git a/packages/a2a-server/src/http/app.test.ts b/packages/a2a-server/src/http/app.test.ts index 4a883992b5..ac4568ba3d 100644 --- a/packages/a2a-server/src/http/app.test.ts +++ b/packages/a2a-server/src/http/app.test.ts @@ -381,11 +381,11 @@ describe('E2E Tests', () => { ]); // 6. Tool 1 is awaiting approval. - const toolCallAwaitEvent = events[5].result as TaskStatusUpdateEvent; - expect(toolCallAwaitEvent.metadata?.['coderAgent']).toMatchObject({ + const toolCallAwaitEvent1 = events[5].result as TaskStatusUpdateEvent; + expect(toolCallAwaitEvent1.metadata?.['coderAgent']).toMatchObject({ kind: 'tool-call-confirmation', }); - expect(toolCallAwaitEvent.status.message?.parts).toMatchObject([ + expect(toolCallAwaitEvent1.status.message?.parts).toMatchObject([ { data: { request: { callId: 'test-call-id-1' }, @@ -394,14 +394,28 @@ describe('E2E Tests', () => { }, ]); - // 7. The final event is "input-required". - const finalEvent = events[6].result as TaskStatusUpdateEvent; + // 7. Tool 2 is awaiting approval. + const toolCallAwaitEvent2 = events[6].result as TaskStatusUpdateEvent; + expect(toolCallAwaitEvent2.metadata?.['coderAgent']).toMatchObject({ + kind: 'tool-call-confirmation', + }); + expect(toolCallAwaitEvent2.status.message?.parts).toMatchObject([ + { + data: { + request: { callId: 'test-call-id-2' }, + status: 'awaiting_approval', + }, + }, + ]); + + // 8. The final event is "input-required". + const finalEvent = events[7].result as TaskStatusUpdateEvent; expect(finalEvent.final).toBe(true); expect(finalEvent.status.state).toBe('input-required'); // The scheduler now waits for approval, so no more events are sent. assertUniqueFinalEventIsLast(events); - expect(events.length).toBe(7); + expect(events.length).toBe(8); }); it('should handle multiple tool calls sequentially in YOLO mode', async () => {