diff --git a/packages/core/src/confirmation-bus/message-bus.test.ts b/packages/core/src/confirmation-bus/message-bus.test.ts index 4708c41d4c..24cd4392fa 100644 --- a/packages/core/src/confirmation-bus/message-bus.test.ts +++ b/packages/core/src/confirmation-bus/message-bus.test.ts @@ -8,6 +8,7 @@ import { describe, it, expect, beforeEach, vi } from 'vitest'; import { MessageBus } from './message-bus.js'; import { PolicyEngine } from '../policy/policy-engine.js'; import { PolicyDecision } from '../policy/types.js'; +import { debugLogger } from '../utils/debugLogger.js'; import { MessageBusType, type Message, @@ -167,6 +168,62 @@ describe('MessageBus', () => { ); }); + it('should sanitize sensitive data in error messages', async () => { + const errorHandler = vi.fn(); + messageBus.on('error', errorHandler); + + const invalidMessage = { + type: MessageBusType.TOOL_CONFIRMATION_REQUEST, + toolCall: { + name: 'test-tool', + args: { password: 'secret-password' }, + }, + // missing correlationId makes it invalid + } as unknown as Message; + + await expect(messageBus.publish(invalidMessage)).rejects.toThrow( + /\[REDACTED\]/, + ); + await expect(messageBus.publish(invalidMessage)).rejects.not.toThrow( + /secret-password/, + ); + + expect(errorHandler).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringContaining('[REDACTED]'), + }), + ); + expect(errorHandler).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.not.stringContaining('secret-password'), + }), + ); + }); + + it('should sanitize sensitive data in debug logs', async () => { + const debugSpy = vi.spyOn(debugLogger, 'debug'); + // @ts-expect-error - access private member for test + messageBus.debug = true; + + const message: ToolExecutionSuccess = { + type: MessageBusType.TOOL_EXECUTION_SUCCESS as const, + toolCall: { + name: 'test-tool', + args: { api_key: 'my-api-key' }, + }, + result: 'success', + }; + + await messageBus.publish(message); + + expect(debugSpy).toHaveBeenCalledWith( + expect.stringContaining('[REDACTED]'), + ); + expect(debugSpy).toHaveBeenCalledWith( + expect.not.stringContaining('my-api-key'), + ); + }); + it('should emit other message types directly', async () => { const successHandler = vi.fn(); messageBus.subscribe( @@ -268,6 +325,101 @@ describe('MessageBus', () => { }); }); + describe('request', () => { + it('should fail fast if publish fails', async () => { + // Mock publish to throw + vi.spyOn(messageBus, 'publish').mockRejectedValue( + new Error('Publish failed'), + ); + + const request: Omit = { + type: MessageBusType.TOOL_CONFIRMATION_REQUEST, + toolCall: { name: 'test-tool', args: {} }, + }; + + const start = Date.now(); + const requestPromise = messageBus.request( + request, + MessageBusType.TOOL_CONFIRMATION_RESPONSE, + 60000, + ); + + await expect(requestPromise).rejects.toThrow('Publish failed'); + const duration = Date.now() - start; + + // Should have failed way before the 60s timeout + expect(duration).toBeLessThan(1000); + }); + + it('should handle timeout if no response is received', async () => { + const request: Message = { + type: MessageBusType.SUBAGENT_ACTIVITY, + subagentName: 'test', + activity: { + id: '1', + type: 'thought', + status: 'running', + content: 'thinking', + }, + }; + + const requestPromise = messageBus.request( + request, + MessageBusType.ASK_USER_RESPONSE, + 10, // 10ms timeout + ); + + await expect(requestPromise).rejects.toThrow( + /Request timed out waiting for ask-user-response/, + ); + }); + }); + + describe('pattern resiliency', () => { + it('should handle publish returning non-promise (mock behavior)', () => { + // @ts-expect-error - mock returning undefined + vi.spyOn(messageBus, 'publish').mockReturnValue(undefined); + + const publishAndCatch = () => { + const p = messageBus.publish({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [], + schedulerId: 'test', + } as Message); + + if (p instanceof Promise) { + p.catch(() => {}); + } + }; + + expect(publishAndCatch).not.toThrow(); + }); + + it('should handle publish throwing synchronously (mock behavior)', () => { + vi.spyOn(messageBus, 'publish').mockImplementation(() => { + throw new Error('Sync throw'); + }); + + const publishAndCatch = () => { + try { + const p = messageBus.publish({ + type: MessageBusType.TOOL_CALLS_UPDATE, + toolCalls: [], + schedulerId: 'test', + } as Message); + + if (p instanceof Promise) { + p.catch(() => {}); + } + } catch { + // handled + } + }; + + expect(publishAndCatch).not.toThrow(); + }); + }); + describe('derive', () => { it('should receive responses from parent bus on derived bus', async () => { vi.spyOn(policyEngine, 'check').mockResolvedValue({