fix(core): fail fast in MessageBus.request() on publish failure

Link: https://github.com/google-gemini/gemini-cli/issues/22588

Modified MessageBus.publish to re-throw errors and MessageBus.request to catch them, preventing 60s silent hangs. Also updated floating publish calls to prevent unhandled promise rejections.
This commit is contained in:
A.K.M. Adib
2026-04-29 16:21:37 -04:00
parent 25f422d0e4
commit ab4c6461db
6 changed files with 69 additions and 47 deletions
+7 -5
View File
@@ -90,11 +90,13 @@ export class LocalSubagentInvocation extends BaseToolInvocation<
}
private publishActivity(activity: SubagentActivityItem): void {
void this.messageBus.publish({
type: MessageBusType.SUBAGENT_ACTIVITY,
subagentName: this.definition.displayName ?? this.definition.name,
activity,
});
void this.messageBus
.publish({
type: MessageBusType.SUBAGENT_ACTIVITY,
subagentName: this.definition.displayName ?? this.definition.name,
activity,
})
.catch(() => {});
}
/**
@@ -10,6 +10,7 @@ import { PolicyEngine } from '../policy/policy-engine.js';
import { PolicyDecision } from '../policy/types.js';
import {
MessageBusType,
type Message,
type ToolConfirmationRequest,
type ToolConfirmationResponse,
type ToolPolicyRejection,
@@ -30,8 +31,9 @@ describe('MessageBus', () => {
const errorHandler = vi.fn();
messageBus.on('error', errorHandler);
// @ts-expect-error - Testing invalid message
await messageBus.publish({ invalid: 'message' });
await expect(
messageBus.publish({ invalid: 'message' } as unknown as Message),
).rejects.toThrow('Invalid message structure');
expect(errorHandler).toHaveBeenCalledWith(
expect.objectContaining({
@@ -44,11 +46,12 @@ describe('MessageBus', () => {
const errorHandler = vi.fn();
messageBus.on('error', errorHandler);
// @ts-expect-error - Testing missing correlationId
await messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_REQUEST,
toolCall: { name: 'test' },
});
await expect(
messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_REQUEST,
toolCall: { name: 'test' },
} as unknown as Message),
).rejects.toThrow('Invalid message structure');
expect(errorHandler).toHaveBeenCalled();
});
@@ -251,8 +254,10 @@ describe('MessageBus', () => {
correlationId: '123',
};
// Should not throw
await expect(messageBus.publish(request)).resolves.not.toThrow();
// Should throw
await expect(messageBus.publish(request)).rejects.toThrow(
'Policy check failed',
);
// Should emit error
expect(errorHandler).toHaveBeenCalledWith(
@@ -288,11 +293,13 @@ describe('MessageBus', () => {
MessageBusType.TOOL_CONFIRMATION_REQUEST,
(msg) => {
if (msg.subagent === subagentName) {
void messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
});
void messageBus
.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
})
.catch(() => {});
resolve();
}
},
@@ -330,11 +337,13 @@ describe('MessageBus', () => {
MessageBusType.TOOL_CONFIRMATION_REQUEST,
(msg) => {
if (msg.subagent === 'agent1/agent2') {
void messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
});
void messageBus
.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
})
.catch(() => {});
resolve();
}
},
@@ -144,6 +144,7 @@ export class MessageBus extends EventEmitter {
}
} catch (error) {
this.emit('error', error);
throw error;
}
}
@@ -239,8 +240,11 @@ export class MessageBus extends EventEmitter {
this.subscribe<TResponse>(responseType, responseHandler);
// Publish the request with correlation ID
// eslint-disable-next-line @typescript-eslint/no-floating-promises, @typescript-eslint/no-unsafe-type-assertion
this.publish({ ...request, correlationId } as TRequest);
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
this.publish({ ...request, correlationId } as TRequest).catch((error) => {
cleanup();
reject(error);
});
});
}
}
+11 -6
View File
@@ -13,6 +13,7 @@ import { checkPolicy, updatePolicy, getPolicyDenialError } from './policy.js';
import { evaluateBeforeToolHook } from './hook-utils.js';
import { ToolExecutor } from './tool-executor.js';
import { ToolModificationHandler } from './tool-modifier.js';
import { debugLogger } from '../utils/debugLogger.js';
import {
type ToolCallRequestInfo,
type ToolCall,
@@ -166,12 +167,16 @@ export class Scheduler {
private readonly handleToolConfirmationRequest = async (
request: ToolConfirmationRequest,
) => {
await this.messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: request.correlationId,
confirmed: false,
requiresUserConfirmation: true,
});
try {
await this.messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: request.correlationId,
confirmed: false,
requiresUserConfirmation: true,
});
} catch (error) {
debugLogger.error('Failed to publish confirmation response', error);
}
};
private setupMessageBusListener(messageBus: MessageBus): void {
+7 -5
View File
@@ -244,11 +244,13 @@ export class SchedulerStateManager {
const snapshot = this.getSnapshot();
// Fire and forget - The message bus handles the publish and error handling.
void this.messageBus.publish({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: snapshot,
schedulerId: this.schedulerId,
});
void this.messageBus
.publish({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: snapshot,
schedulerId: this.schedulerId,
})
.catch(() => {});
}
private isTerminalCall(call: ToolCall): call is CompletedToolCall {
+10 -10
View File
@@ -242,12 +242,14 @@ export abstract class BaseToolInvocation<
) {
if (this._toolName) {
const options = this.getPolicyUpdateOptions(outcome);
void this.messageBus.publish({
type: MessageBusType.UPDATE_POLICY,
toolName: this._toolName,
persist: outcome === ToolConfirmationOutcome.ProceedAlwaysAndSave,
...options,
});
void this.messageBus
.publish({
type: MessageBusType.UPDATE_POLICY,
toolName: this._toolName,
persist: outcome === ToolConfirmationOutcome.ProceedAlwaysAndSave,
...options,
})
.catch(() => {});
}
}
}
@@ -361,12 +363,10 @@ export abstract class BaseToolInvocation<
);
};
try {
void this.messageBus.publish(request);
} catch {
this.messageBus.publish(request).catch(() => {
cleanup();
resolve('allow');
}
});
});
}