/**
 * @license
 * Copyright 2025 Google LLC
 * SPDX-License-Identifier: Apache-2.0
 */

import { randomUUID } from 'node:crypto';
import { EventEmitter } from 'node:events';
import type { PolicyEngine } from '../policy/policy-engine.js';
import { PolicyDecision } from '../policy/types.js';
import { MessageBusType, type Message } from './types.js';
import { safeJsonStringify } from '../utils/safeJsonStringify.js';
import { debugLogger } from '../utils/debugLogger.js';

/**
 * Typed publish/subscribe bus built on Node's `EventEmitter`.
 *
 * Messages are emitted under their `type`. Tool confirmation requests are
 * first checked against the policy engine, and the resulting decision
 * determines which message(s) are actually emitted (see {@link publish}).
 */
export class MessageBus extends EventEmitter {
  /**
   * @param policyEngine Engine consulted for every tool confirmation request.
   * @param debug When true, every published message is written to the debug
   *   logger.
   */
  constructor(
    private readonly policyEngine: PolicyEngine,
    private readonly debug = false,
  ) {
    // The parameter properties above already assign both fields.
    super();
  }

  /**
   * Minimal structural validation: a message must be truthy, carry a truthy
   * `type`, and tool confirmation requests must have a `correlationId` key.
   */
  private isValidMessage(message: Message): boolean {
    if (!message || !message.type) {
      return false;
    }

    if (
      message.type === MessageBusType.TOOL_CONFIRMATION_REQUEST &&
      !('correlationId' in message)
    ) {
      return false;
    }

    return true;
  }

  /** Emits the message to listeners registered for `message.type`. */
  private emitMessage(message: Message): void {
    this.emit(message.type, message);
  }

  /**
   * Derives a child message bus scoped to a specific subagent.
   *
   * The child forwards everything it publishes to this bus. For tool
   * confirmation requests it first sets `subagent` to `subagentName`, or to
   * `subagentName/<existing subagent>` when the message already names one.
   * All listener-related methods of the child are bound to this bus, so
   * subscribers registered through either bus share one listener set.
   *
   * @param subagentName Name added to the `subagent` field of forwarded tool
   *   confirmation requests.
   * @returns The child bus.
   */
  derive(subagentName: string): MessageBus {
    const bus = new MessageBus(this.policyEngine, this.debug);

    bus.publish = async (message: Message) => {
      if (message.type === MessageBusType.TOOL_CONFIRMATION_REQUEST) {
        return this.publish({
          ...message,
          subagent: message.subagent
            ? `${subagentName}/${message.subagent}`
            : subagentName,
        });
      }
      return this.publish(message);
    };

    // Delegate subscription methods to the parent bus
    bus.subscribe = this.subscribe.bind(this);
    bus.unsubscribe = this.unsubscribe.bind(this);
    bus.on = this.on.bind(this);
    bus.off = this.off.bind(this);
    bus.emit = this.emit.bind(this);
    bus.once = this.once.bind(this);
    bus.removeListener = this.removeListener.bind(this);
    bus.listenerCount = this.listenerCount.bind(this);

    return bus;
  }

  /**
   * Publishes a message on the bus.
   *
   * Tool confirmation requests are checked with the policy engine; a
   * `forcedDecision` on the message, when present, takes precedence over the
   * engine's decision:
   * - `ALLOW`: emits a confirmation response with `confirmed: true`.
   * - `DENY`: emits a policy rejection, then a confirmation response with
   *   `confirmed: false`.
   * - `ASK_USER`: re-emits the request if anyone listens for it; otherwise
   *   emits a response with `confirmed: false` and
   *   `requiresUserConfirmation: true`.
   *
   * Every other message type is emitted unchanged. Failures (invalid message,
   * unknown decision, policy check or listener errors) are caught and
   * re-emitted as an `'error'` event instead of being thrown from the
   * `try` block.
   *
   * @param message The message to publish.
   */
  async publish(message: Message): Promise<void> {
    if (this.debug) {
      debugLogger.debug(`[MESSAGE_BUS] publish: ${safeJsonStringify(message)}`);
    }
    try {
      if (!this.isValidMessage(message)) {
        throw new Error(
          `Invalid message structure: ${safeJsonStringify(message)}`,
        );
      }

      if (message.type === MessageBusType.TOOL_CONFIRMATION_REQUEST) {
        const { decision: policyDecision } = await this.policyEngine.check(
          message.toolCall,
          message.serverName,
          message.toolAnnotations,
          message.subagent,
        );

        const decision = message.forcedDecision ?? policyDecision;

        switch (decision) {
          case PolicyDecision.ALLOW:
            // Directly emit the response instead of recursive publish
            this.emitMessage({
              type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
              correlationId: message.correlationId,
              confirmed: true,
            });
            break;
          case PolicyDecision.DENY:
            // Emit both rejection and response messages
            this.emitMessage({
              type: MessageBusType.TOOL_POLICY_REJECTION,
              toolCall: message.toolCall,
            });
            this.emitMessage({
              type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
              correlationId: message.correlationId,
              confirmed: false,
            });
            break;
          case PolicyDecision.ASK_USER:
            // Pass through to UI for user confirmation if any listeners exist.
            // If no listeners are registered (e.g., headless/ACP flows),
            // immediately request user confirmation to avoid long timeouts.
            if (
              this.listenerCount(MessageBusType.TOOL_CONFIRMATION_REQUEST) > 0
            ) {
              this.emitMessage(message);
            } else {
              this.emitMessage({
                type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
                correlationId: message.correlationId,
                confirmed: false,
                requiresUserConfirmation: true,
              });
            }
            break;
          default:
            throw new Error(`Unknown policy decision: ${decision}`);
        }
      } else {
        // For all other message types, just emit them
        this.emitMessage(message);
      }
    } catch (error) {
      this.emit('error', error);
    }
  }

  /**
   * Registers a listener for messages of the given type.
   *
   * @param type Message type to listen for.
   * @param listener Callback invoked with each emitted message of that type.
   */
  subscribe<T extends Message>(
    type: T['type'],
    listener: (message: T) => void,
  ): void {
    this.on(type, listener);
  }

  /**
   * Removes a listener previously registered for the given type.
   *
   * @param type Message type the listener was registered under.
   * @param listener The same function reference passed to `subscribe`.
   */
  unsubscribe<T extends Message>(
    type: T['type'],
    listener: (message: T) => void,
  ): void {
    this.off(type, listener);
  }

  /**
   * Request-response pattern: publishes a message and waits for a correlated
   * response. This enables synchronous-style communication over the async
   * MessageBus. The correlation ID is generated internally and added to the
   * request.
   *
   * @param request The request message without its `correlationId`.
   * @param responseType Message type of the expected response.
   * @param timeoutMs Milliseconds to wait before rejecting (default 60000).
   * @returns The first response of `responseType` whose `correlationId`
   *   matches the generated one.
   * @throws Rejects with a timeout error when no matching response arrives in
   *   time, or with the publish failure if publishing itself rejects.
   */
  async request<TRequest extends Message, TResponse extends Message>(
    request: Omit<TRequest, 'correlationId'>,
    responseType: TResponse['type'],
    timeoutMs: number = 60000,
  ): Promise<TResponse> {
    const correlationId = randomUUID();

    return new Promise<TResponse>((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        cleanup();
        reject(new Error(`Request timed out waiting for ${responseType}`));
      }, timeoutMs);

      // Safe to call more than once: both operations are idempotent.
      const cleanup = () => {
        clearTimeout(timeoutId);
        this.unsubscribe(responseType, responseHandler);
      };

      const responseHandler = (response: TResponse) => {
        // Check if this response matches our request
        if (
          'correlationId' in response &&
          response.correlationId === correlationId
        ) {
          cleanup();
          resolve(response);
        }
      };

      // Subscribe before publishing so a synchronously emitted response is
      // not missed.
      this.subscribe<TResponse>(responseType, responseHandler);

      // Publish the request with correlation ID
      // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
      const outgoing = { ...request, correlationId } as TRequest;

      // publish() re-emits failures as 'error', and emit('error') throws when
      // no 'error' listener is registered. Surface that rejection to the
      // caller instead of leaving an unhandled rejection and a pending timer.
      this.publish(outgoing).catch((error: unknown) => {
        cleanup();
        reject(error instanceof Error ? error : new Error(String(error)));
      });
    });
  }
}