diff --git a/packages/core/src/agents/local-invocation.ts b/packages/core/src/agents/local-invocation.ts index 186f015979..2c522c04e4 100644 --- a/packages/core/src/agents/local-invocation.ts +++ b/packages/core/src/agents/local-invocation.ts @@ -90,11 +90,13 @@ export class LocalSubagentInvocation extends BaseToolInvocation< } private publishActivity(activity: SubagentActivityItem): void { - void this.messageBus.publish({ - type: MessageBusType.SUBAGENT_ACTIVITY, - subagentName: this.definition.displayName ?? this.definition.name, - activity, - }); + void this.messageBus + .publish({ + type: MessageBusType.SUBAGENT_ACTIVITY, + subagentName: this.definition.displayName ?? this.definition.name, + activity, + }) + .catch(() => {}); } /** diff --git a/packages/core/src/confirmation-bus/message-bus.test.ts b/packages/core/src/confirmation-bus/message-bus.test.ts index 9e2e43455b..4a05faa24a 100644 --- a/packages/core/src/confirmation-bus/message-bus.test.ts +++ b/packages/core/src/confirmation-bus/message-bus.test.ts @@ -10,6 +10,7 @@ import { PolicyEngine } from '../policy/policy-engine.js'; import { PolicyDecision } from '../policy/types.js'; import { MessageBusType, + type Message, type ToolConfirmationRequest, type ToolConfirmationResponse, type ToolPolicyRejection, @@ -30,8 +31,9 @@ describe('MessageBus', () => { const errorHandler = vi.fn(); messageBus.on('error', errorHandler); - // @ts-expect-error - Testing invalid message - await messageBus.publish({ invalid: 'message' }); + await expect( + messageBus.publish({ invalid: 'message' } as unknown as Message), + ).rejects.toThrow('Invalid message structure'); expect(errorHandler).toHaveBeenCalledWith( expect.objectContaining({ @@ -44,11 +46,12 @@ describe('MessageBus', () => { const errorHandler = vi.fn(); messageBus.on('error', errorHandler); - // @ts-expect-error - Testing missing correlationId - await messageBus.publish({ - type: MessageBusType.TOOL_CONFIRMATION_REQUEST, - toolCall: { name: 'test' }, - }); + await expect( + messageBus.publish({ + type: MessageBusType.TOOL_CONFIRMATION_REQUEST, + toolCall: { name: 'test' }, + } as unknown as Message), + ).rejects.toThrow('Invalid message structure'); expect(errorHandler).toHaveBeenCalled(); }); @@ -251,8 +254,10 @@ describe('MessageBus', () => { correlationId: '123', }; - // Should not throw - await expect(messageBus.publish(request)).resolves.not.toThrow(); + // Should throw + await expect(messageBus.publish(request)).rejects.toThrow( + 'Policy check failed', + ); // Should emit error expect(errorHandler).toHaveBeenCalledWith( @@ -288,11 +293,13 @@ describe('MessageBus', () => { MessageBusType.TOOL_CONFIRMATION_REQUEST, (msg) => { if (msg.subagent === subagentName) { - void messageBus.publish({ - type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, - correlationId: msg.correlationId, - confirmed: true, - }); + void messageBus + .publish({ + type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, + correlationId: msg.correlationId, + confirmed: true, + }) + .catch(() => {}); resolve(); } }, @@ -330,11 +337,13 @@ describe('MessageBus', () => { MessageBusType.TOOL_CONFIRMATION_REQUEST, (msg) => { if (msg.subagent === 'agent1/agent2') { - void messageBus.publish({ - type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, - correlationId: msg.correlationId, - confirmed: true, - }); + void messageBus + .publish({ + type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, + correlationId: msg.correlationId, + confirmed: true, + }) + .catch(() => {}); resolve(); } }, diff --git a/packages/core/src/confirmation-bus/message-bus.ts b/packages/core/src/confirmation-bus/message-bus.ts index a14022ada5..b57ee851f2 100644 --- a/packages/core/src/confirmation-bus/message-bus.ts +++ b/packages/core/src/confirmation-bus/message-bus.ts @@ -144,6 +144,7 @@ export class MessageBus extends EventEmitter { } } catch (error) { this.emit('error', error); + throw error; } } @@ -239,8 +240,11 @@ export class MessageBus extends EventEmitter { this.subscribe(responseType, responseHandler); // Publish the request with correlation ID - // eslint-disable-next-line @typescript-eslint/no-floating-promises, @typescript-eslint/no-unsafe-type-assertion - this.publish({ ...request, correlationId } as TRequest); + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + this.publish({ ...request, correlationId } as TRequest).catch((error) => { + cleanup(); + reject(error); + }); }); } } diff --git a/packages/core/src/scheduler/scheduler.ts b/packages/core/src/scheduler/scheduler.ts index 709bdc2bf5..18e6db5066 100644 --- a/packages/core/src/scheduler/scheduler.ts +++ b/packages/core/src/scheduler/scheduler.ts @@ -13,6 +13,7 @@ import { checkPolicy, updatePolicy, getPolicyDenialError } from './policy.js'; import { evaluateBeforeToolHook } from './hook-utils.js'; import { ToolExecutor } from './tool-executor.js'; import { ToolModificationHandler } from './tool-modifier.js'; +import { debugLogger } from '../utils/debugLogger.js'; import { type ToolCallRequestInfo, type ToolCall, @@ -166,12 +167,16 @@ export class Scheduler { private readonly handleToolConfirmationRequest = async ( request: ToolConfirmationRequest, ) => { - await this.messageBus.publish({ - type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, - correlationId: request.correlationId, - confirmed: false, - requiresUserConfirmation: true, - }); + try { + await this.messageBus.publish({ + type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, + correlationId: request.correlationId, + confirmed: false, + requiresUserConfirmation: true, + }); + } catch (error) { + debugLogger.error('Failed to publish confirmation response', error); + } }; private setupMessageBusListener(messageBus: MessageBus): void { diff --git a/packages/core/src/scheduler/state-manager.ts b/packages/core/src/scheduler/state-manager.ts index c524a139bd..3a852fe4d8 100644 --- a/packages/core/src/scheduler/state-manager.ts +++ b/packages/core/src/scheduler/state-manager.ts @@ -244,11 +244,13 @@ export class SchedulerStateManager { const snapshot = this.getSnapshot(); // Fire and forget - The message bus handles the publish and error handling. - void this.messageBus.publish({ - type: MessageBusType.TOOL_CALLS_UPDATE, - toolCalls: snapshot, - schedulerId: this.schedulerId, - }); + void this.messageBus + .publish({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: snapshot, + schedulerId: this.schedulerId, + }) + .catch(() => {}); } private isTerminalCall(call: ToolCall): call is CompletedToolCall { diff --git a/packages/core/src/tools/tools.ts b/packages/core/src/tools/tools.ts index cd6209079c..7533d80e7e 100644 --- a/packages/core/src/tools/tools.ts +++ b/packages/core/src/tools/tools.ts @@ -242,12 +242,14 @@ export abstract class BaseToolInvocation< ) { if (this._toolName) { const options = this.getPolicyUpdateOptions(outcome); - void this.messageBus.publish({ - type: MessageBusType.UPDATE_POLICY, - toolName: this._toolName, - persist: outcome === ToolConfirmationOutcome.ProceedAlwaysAndSave, - ...options, - }); + void this.messageBus + .publish({ + type: MessageBusType.UPDATE_POLICY, + toolName: this._toolName, + persist: outcome === ToolConfirmationOutcome.ProceedAlwaysAndSave, + ...options, + }) + .catch(() => {}); } } } @@ -361,12 +363,10 @@ export abstract class BaseToolInvocation< ); }; - try { - void this.messageBus.publish(request); - } catch { + this.messageBus.publish(request).catch(() => { cleanup(); resolve('allow'); - } + }); }); }