fix(core): remediate subagent memory leaks using AbortSignal in MessageBus (#25048)

This commit is contained in:
Abhi
2026-04-09 16:22:26 -04:00
committed by GitHub
parent 20113ee595
commit a7a091360e
4 changed files with 215 additions and 16 deletions

View File

@@ -348,4 +348,66 @@ describe('MessageBus', () => {
);
});
});
describe('subscribe with AbortSignal', () => {
it('should remove listener when signal is aborted', async () => {
const handler = vi.fn();
const controller = new AbortController();
messageBus.subscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler, {
signal: controller.signal,
});
const message: ToolExecutionSuccess<string> = {
type: MessageBusType.TOOL_EXECUTION_SUCCESS as const,
toolCall: { name: 'test' },
result: 'test',
};
controller.abort();
await messageBus.publish(message);
expect(handler).not.toHaveBeenCalled();
});
it('should not add listener if signal is already aborted', async () => {
const handler = vi.fn();
const controller = new AbortController();
controller.abort();
messageBus.subscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler, {
signal: controller.signal,
});
const message: ToolExecutionSuccess<string> = {
type: MessageBusType.TOOL_EXECUTION_SUCCESS as const,
toolCall: { name: 'test' },
result: 'test',
};
await messageBus.publish(message);
expect(handler).not.toHaveBeenCalled();
});
it('should remove abort listener when unsubscribe is called', async () => {
const handler = vi.fn();
const controller = new AbortController();
const signal = controller.signal;
const removeEventListenerSpy = vi.spyOn(signal, 'removeEventListener');
messageBus.subscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler, {
signal,
});
messageBus.unsubscribe(MessageBusType.TOOL_EXECUTION_SUCCESS, handler);
expect(removeEventListenerSpy).toHaveBeenCalledWith(
'abort',
expect.any(Function),
);
});
});
});

View File

@@ -13,6 +13,11 @@ import { safeJsonStringify } from '../utils/safeJsonStringify.js';
import { debugLogger } from '../utils/debugLogger.js';
export class MessageBus extends EventEmitter {
private listenerToAbortCleanup = new WeakMap<
object,
Map<string, () => void>
>();
constructor(
private readonly policyEngine: PolicyEngine,
private readonly debug = false,
@@ -145,7 +150,36 @@ export class MessageBus extends EventEmitter {
subscribe<T extends Message>(
type: T['type'],
listener: (message: T) => void,
options?: { signal?: AbortSignal },
): void {
if (options?.signal) {
const signal = options.signal;
if (signal.aborted) return;
if (this.listenerToAbortCleanup.get(listener)?.has(type)) return;
const abortHandler = () => {
this.off(type, listener);
const typeToCleanup = this.listenerToAbortCleanup.get(listener);
if (typeToCleanup) {
typeToCleanup.delete(type);
if (typeToCleanup.size === 0) {
this.listenerToAbortCleanup.delete(listener);
}
}
};
signal.addEventListener('abort', abortHandler, { once: true });
let typeToCleanup = this.listenerToAbortCleanup.get(listener);
if (!typeToCleanup) {
typeToCleanup = new Map<string, () => void>();
this.listenerToAbortCleanup.set(listener, typeToCleanup);
}
typeToCleanup.set(type, () => {
signal.removeEventListener('abort', abortHandler);
});
}
this.on(type, listener);
}
@@ -154,6 +188,17 @@ export class MessageBus extends EventEmitter {
listener: (message: T) => void,
): void {
this.off(type, listener);
const typeToCleanup = this.listenerToAbortCleanup.get(listener);
if (typeToCleanup) {
const cleanup = typeToCleanup.get(type);
if (cleanup) {
cleanup();
typeToCleanup.delete(type);
}
if (typeToCleanup.size === 0) {
this.listenerToAbortCleanup.delete(listener);
}
}
}
/**

View File

@@ -49,6 +49,7 @@ import { resolveConfirmation } from './confirmation.js';
import { checkPolicy, updatePolicy } from './policy.js';
import { ToolExecutor } from './tool-executor.js';
import { ToolModificationHandler } from './tool-modifier.js';
import { MessageBusType, type Message } from '../confirmation-bus/types.js';
vi.mock('./state-manager.js');
vi.mock('./confirmation.js');
@@ -1299,6 +1300,64 @@ describe('Scheduler (Orchestrator)', () => {
});
});
describe('Fallback Handlers', () => {
it('should respond to TOOL_CONFIRMATION_REQUEST with requiresUserConfirmation: true', async () => {
const listeners: Record<
string,
Array<(message: Message) => void | Promise<void>>
> = {};
const mockBus = {
subscribe: vi.fn(
(
type: string,
handler: (message: Message) => void | Promise<void>,
) => {
listeners[type] = listeners[type] || [];
listeners[type].push(handler);
},
),
publish: vi.fn(async (message: Message) => {
const type = message.type as string;
if (listeners[type]) {
for (const handler of listeners[type]) {
await handler(message);
}
}
}),
} as unknown as MessageBus;
const scheduler = new Scheduler({
context: mockConfig,
messageBus: mockBus,
getPreferredEditor,
schedulerId: 'fallback-test',
});
const handler = vi.fn();
mockBus.subscribe(MessageBusType.TOOL_CONFIRMATION_RESPONSE, handler);
await mockBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_REQUEST,
correlationId: 'test-correlation-id',
toolCall: { name: 'test-tool' },
});
// Wait for async handler to fire
await new Promise((resolve) => setTimeout(resolve, 10));
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
correlationId: 'test-correlation-id',
confirmed: false,
requiresUserConfirmation: true,
}),
);
scheduler.dispose();
});
});
describe('Cleanup', () => {
it('should unregister McpProgress listener on dispose()', () => {
const onSpy = vi.spyOn(coreEvents, 'on');
@@ -1323,6 +1382,40 @@ describe('Scheduler (Orchestrator)', () => {
expect.any(Function),
);
});
it('should abort disposeController signal on dispose()', () => {
const mockSubscribe =
vi.fn<
(
type: unknown,
listener: unknown,
options?: { signal?: AbortSignal },
) => void
>();
const mockBus = {
subscribe: mockSubscribe,
publish: vi.fn(),
} as unknown as MessageBus;
let capturedSignal: AbortSignal | undefined;
mockSubscribe.mockImplementation((type, listener, options) => {
capturedSignal = options?.signal;
});
const s = new Scheduler({
context: mockConfig,
messageBus: mockBus,
getPreferredEditor,
schedulerId: 'cleanup-test-2',
});
expect(capturedSignal).toBeDefined();
expect(capturedSignal?.aborted).toBe(false);
s.dispose();
expect(capturedSignal?.aborted).toBe(true);
});
});
});

View File

@@ -93,8 +93,7 @@ const createErrorResponse = (
* Coordinates execution via state updates and event listening.
*/
export class Scheduler {
// Tracks which MessageBus instances have the legacy listener attached to prevent duplicates.
private static subscribedMessageBuses = new WeakSet<MessageBus>();
private readonly disposeController = new AbortController();
private readonly state: SchedulerStateManager;
private readonly executor: ToolExecutor;
@@ -136,6 +135,7 @@ export class Scheduler {
dispose(): void {
coreEvents.off(CoreEvent.McpProgress, this.handleMcpProgress);
this.disposeController.abort();
}
private readonly handleMcpProgress = (payload: McpProgressPayload) => {
@@ -163,26 +163,25 @@ export class Scheduler {
});
};
private setupMessageBusListener(messageBus: MessageBus): void {
if (Scheduler.subscribedMessageBuses.has(messageBus)) {
return;
}
private readonly handleToolConfirmationRequest = async (
request: ToolConfirmationRequest,
) => {
await this.messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: request.correlationId,
confirmed: false,
requiresUserConfirmation: true,
});
};
private setupMessageBusListener(messageBus: MessageBus): void {
// TODO: Optimize policy checks. Currently, tools check policy via
// MessageBus even though the Scheduler already checked it.
messageBus.subscribe(
MessageBusType.TOOL_CONFIRMATION_REQUEST,
async (request: ToolConfirmationRequest) => {
await messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: request.correlationId,
confirmed: false,
requiresUserConfirmation: true,
});
},
this.handleToolConfirmationRequest,
{ signal: this.disposeController.signal },
);
Scheduler.subscribedMessageBuses.add(messageBus);
}
/**