From a7a091360ef537851a99758048007414d84b34d2 Mon Sep 17 00:00:00 2001 From: Abhi <43648792+abhipatel12@users.noreply.github.com> Date: Thu, 9 Apr 2026 16:22:26 -0400 Subject: [PATCH] fix(core): remediate subagent memory leaks using AbortSignal in MessageBus (#25048) --- .../src/confirmation-bus/message-bus.test.ts | 62 +++++++++++++ .../core/src/confirmation-bus/message-bus.ts | 45 +++++++++ packages/core/src/scheduler/scheduler.test.ts | 93 +++++++++++++++++++ packages/core/src/scheduler/scheduler.ts | 31 +++---- 4 files changed, 215 insertions(+), 16 deletions(-) diff --git a/packages/core/src/confirmation-bus/message-bus.test.ts b/packages/core/src/confirmation-bus/message-bus.test.ts index 8f5c51d7d5..9e2e43455b 100644 --- a/packages/core/src/confirmation-bus/message-bus.test.ts +++ b/packages/core/src/confirmation-bus/message-bus.test.ts @@ -348,4 +348,66 @@ describe('MessageBus', () => { ); }); }); + + describe('subscribe with AbortSignal', () => { + it('should remove listener when signal is aborted', async () => { + const handler = vi.fn(); + const controller = new AbortController(); + + messageBus.subscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler, { + signal: controller.signal, + }); + + const message: ToolExecutionSuccess = { + type: MessageBusType.TOOL_EXECUTION_SUCCESS as const, + toolCall: { name: 'test' }, + result: 'test', + }; + + controller.abort(); + + await messageBus.publish(message); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('should not add listener if signal is already aborted', async () => { + const handler = vi.fn(); + const controller = new AbortController(); + controller.abort(); + + messageBus.subscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler, { + signal: controller.signal, + }); + + const message: ToolExecutionSuccess = { + type: MessageBusType.TOOL_EXECUTION_SUCCESS as const, + toolCall: { name: 'test' }, + result: 'test', + }; + + await messageBus.publish(message); + + expect(handler).not.toHaveBeenCalled(); + }); + + it('should remove abort listener when unsubscribe is called', async () => { + const handler = vi.fn(); + const controller = new AbortController(); + const signal = controller.signal; + + const removeEventListenerSpy = vi.spyOn(signal, 'removeEventListener'); + + messageBus.subscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler, { + signal, + }); + + messageBus.unsubscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler); + + expect(removeEventListenerSpy).toHaveBeenCalledWith( + 'abort', + expect.any(Function), + ); + }); + }); }); diff --git a/packages/core/src/confirmation-bus/message-bus.ts b/packages/core/src/confirmation-bus/message-bus.ts index 72f1c1c15a..a14022ada5 100644 --- a/packages/core/src/confirmation-bus/message-bus.ts +++ b/packages/core/src/confirmation-bus/message-bus.ts @@ -13,6 +13,11 @@ import { safeJsonStringify } from '../utils/safeJsonStringify.js'; import { debugLogger } from '../utils/debugLogger.js'; export class MessageBus extends EventEmitter { + private listenerToAbortCleanup = new WeakMap< + object, + Map void> + >(); + constructor( private readonly policyEngine: PolicyEngine, private readonly debug = false, @@ -145,7 +150,36 @@ export class MessageBus extends EventEmitter { subscribe( type: T['type'], listener: (message: T) => void, + options?: { signal?: AbortSignal }, ): void { + if (options?.signal) { + const signal = options.signal; + if (signal.aborted) return; + + if (this.listenerToAbortCleanup.get(listener)?.has(type)) return; + + const abortHandler = () => { + this.off(type, listener); + const typeToCleanup = this.listenerToAbortCleanup.get(listener); + if (typeToCleanup) { + typeToCleanup.delete(type); + if (typeToCleanup.size === 0) { + this.listenerToAbortCleanup.delete(listener); + } + } + }; + signal.addEventListener('abort', abortHandler, { once: true }); + + let typeToCleanup = this.listenerToAbortCleanup.get(listener); + if (!typeToCleanup) { + typeToCleanup = new Map void>(); + this.listenerToAbortCleanup.set(listener, typeToCleanup); + } + typeToCleanup.set(type, () => { + signal.removeEventListener('abort', abortHandler); + }); + } + this.on(type, listener); } @@ -154,6 +188,17 @@ export class MessageBus extends EventEmitter { listener: (message: T) => void, ): void { this.off(type, listener); + const typeToCleanup = this.listenerToAbortCleanup.get(listener); + if (typeToCleanup) { + const cleanup = typeToCleanup.get(type); + if (cleanup) { + cleanup(); + typeToCleanup.delete(type); + } + if (typeToCleanup.size === 0) { + this.listenerToAbortCleanup.delete(listener); + } + } } /** diff --git a/packages/core/src/scheduler/scheduler.test.ts b/packages/core/src/scheduler/scheduler.test.ts index e0fe7b873c..aaa5d48f5d 100644 --- a/packages/core/src/scheduler/scheduler.test.ts +++ b/packages/core/src/scheduler/scheduler.test.ts @@ -49,6 +49,7 @@ import { resolveConfirmation } from './confirmation.js'; import { checkPolicy, updatePolicy } from './policy.js'; import { ToolExecutor } from './tool-executor.js'; import { ToolModificationHandler } from './tool-modifier.js'; +import { MessageBusType, type Message } from '../confirmation-bus/types.js'; vi.mock('./state-manager.js'); vi.mock('./confirmation.js'); @@ -1299,6 +1300,64 @@ describe('Scheduler (Orchestrator)', () => { }); }); + describe('Fallback Handlers', () => { + it('should respond to TOOL_CONFIRMATION_REQUEST with requiresUserConfirmation: true', async () => { + const listeners: Record< + string, + Array<(message: Message) => void | Promise> + > = {}; + + const mockBus = { + subscribe: vi.fn( + ( + type: string, + handler: (message: Message) => void | Promise, + ) => { + listeners[type] = listeners[type] || []; + listeners[type].push(handler); + }, + ), + publish: vi.fn(async (message: Message) => { + const type = message.type as string; + if (listeners[type]) { + for (const handler of listeners[type]) { + await handler(message); + } + } + }), + } as unknown as MessageBus; + + const scheduler = new Scheduler({ + context: mockConfig, + messageBus: mockBus, + getPreferredEditor, + schedulerId: 'fallback-test', + }); + + const handler = vi.fn(); + mockBus.subscribe(MessageBusType.TOOL_CONFIRMATION_RESPONSE, handler); + + await mockBus.publish({ + type: MessageBusType.TOOL_CONFIRMATION_REQUEST, + correlationId: 'test-correlation-id', + toolCall: { name: 'test-tool' }, + }); + + // Wait for async handler to fire + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + correlationId: 'test-correlation-id', + confirmed: false, + requiresUserConfirmation: true, + }), + ); + + scheduler.dispose(); + }); + }); + describe('Cleanup', () => { it('should unregister McpProgress listener on dispose()', () => { const onSpy = vi.spyOn(coreEvents, 'on'); @@ -1323,6 +1382,40 @@ describe('Scheduler (Orchestrator)', () => { expect.any(Function), ); }); + + it('should abort disposeController signal on dispose()', () => { + const mockSubscribe = + vi.fn< + ( + type: unknown, + listener: unknown, + options?: { signal?: AbortSignal }, + ) => void + >(); + const mockBus = { + subscribe: mockSubscribe, + publish: vi.fn(), + } as unknown as MessageBus; + + let capturedSignal: AbortSignal | undefined; + mockSubscribe.mockImplementation((type, listener, options) => { + capturedSignal = options?.signal; + }); + + const s = new Scheduler({ + context: mockConfig, + messageBus: mockBus, + getPreferredEditor, + schedulerId: 'cleanup-test-2', + }); + + expect(capturedSignal).toBeDefined(); + expect(capturedSignal?.aborted).toBe(false); + + s.dispose(); + + expect(capturedSignal?.aborted).toBe(true); + }); }); }); diff --git a/packages/core/src/scheduler/scheduler.ts b/packages/core/src/scheduler/scheduler.ts index 2f95748597..fef22968e1 100644 --- a/packages/core/src/scheduler/scheduler.ts +++ b/packages/core/src/scheduler/scheduler.ts @@ -93,8 +93,7 @@ const createErrorResponse = ( * Coordinates execution via state updates and event listening. */ export class Scheduler { - // Tracks which MessageBus instances have the legacy listener attached to prevent duplicates. - private static subscribedMessageBuses = new WeakSet(); + private readonly disposeController = new AbortController(); private readonly state: SchedulerStateManager; private readonly executor: ToolExecutor; @@ -136,6 +135,7 @@ export class Scheduler { dispose(): void { coreEvents.off(CoreEvent.McpProgress, this.handleMcpProgress); + this.disposeController.abort(); } private readonly handleMcpProgress = (payload: McpProgressPayload) => { @@ -163,26 +163,25 @@ export class Scheduler { }); }; - private setupMessageBusListener(messageBus: MessageBus): void { - if (Scheduler.subscribedMessageBuses.has(messageBus)) { - return; - } + private readonly handleToolConfirmationRequest = async ( + request: ToolConfirmationRequest, + ) => { + await this.messageBus.publish({ + type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, + correlationId: request.correlationId, + confirmed: false, + requiresUserConfirmation: true, + }); + }; + private setupMessageBusListener(messageBus: MessageBus): void { // TODO: Optimize policy checks. Currently, tools check policy via // MessageBus even though the Scheduler already checked it. messageBus.subscribe( MessageBusType.TOOL_CONFIRMATION_REQUEST, - async (request: ToolConfirmationRequest) => { - await messageBus.publish({ - type: MessageBusType.TOOL_CONFIRMATION_RESPONSE, - correlationId: request.correlationId, - confirmed: false, - requiresUserConfirmation: true, - }); - }, + this.handleToolConfirmationRequest, + { signal: this.disposeController.signal }, ); - - Scheduler.subscribedMessageBuses.add(messageBus); } /**