fix(core): sanitize MessageBus logs and make publish calls resilient

- Added sanitization to MessageBus.publish() logs and error messages using sanitizeToolArgs to prevent secret leakage.

- Refactored floating publish() calls to use a type-safe resiliency pattern (instanceof Promise) to handle test mocks and sync throws.

Fixes CI failures and addresses security review feedback.
This commit is contained in:
A.K.M. Adib
2026-04-30 10:16:29 -04:00
parent ab4c6461db
commit a24eccbf57
6 changed files with 64 additions and 32 deletions
+4 -2
View File
@@ -609,13 +609,15 @@ export class AppRig {
this.removeToolPolicy(pending.toolName);
}
// eslint-disable-next-line @typescript-eslint/no-floating-promises
messageBus.publish({
const p = messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: pending.correlationId,
confirmed: outcome !== ToolConfirmationOutcome.Cancel,
outcome,
});
if (p instanceof Promise) {
p.catch(() => {});
}
});
await act(async () => {
+9 -4
View File
@@ -90,13 +90,18 @@ export class LocalSubagentInvocation extends BaseToolInvocation<
}
private publishActivity(activity: SubagentActivityItem): void {
void this.messageBus
.publish({
try {
const p = this.messageBus.publish({
type: MessageBusType.SUBAGENT_ACTIVITY,
subagentName: this.definition.displayName ?? this.definition.name,
activity,
})
.catch(() => {});
});
if (p instanceof Promise) {
p.catch(() => {});
}
} catch {
// Ignore errors in fire-and-forget activity update
}
}
/**
@@ -293,13 +293,14 @@ describe('MessageBus', () => {
MessageBusType.TOOL_CONFIRMATION_REQUEST,
(msg) => {
if (msg.subagent === subagentName) {
void messageBus
.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
})
.catch(() => {});
const p = messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
});
if (p instanceof Promise) {
p.catch(() => {});
}
resolve();
}
},
@@ -337,13 +338,14 @@ describe('MessageBus', () => {
MessageBusType.TOOL_CONFIRMATION_REQUEST,
(msg) => {
if (msg.subagent === 'agent1/agent2') {
void messageBus
.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
})
.catch(() => {});
const p = messageBus.publish({
type: MessageBusType.TOOL_CONFIRMATION_RESPONSE,
correlationId: msg.correlationId,
confirmed: true,
});
if (p instanceof Promise) {
p.catch(() => {});
}
resolve();
}
},
@@ -11,6 +11,7 @@ import { PolicyDecision } from '../policy/types.js';
import { MessageBusType, type Message } from './types.js';
import { safeJsonStringify } from '../utils/safeJsonStringify.js';
import { debugLogger } from '../utils/debugLogger.js';
import { sanitizeToolArgs } from '../utils/agent-sanitization-utils.js';
export class MessageBus extends EventEmitter {
private listenerToAbortCleanup = new WeakMap<
@@ -78,12 +79,16 @@ export class MessageBus extends EventEmitter {
async publish(message: Message): Promise<void> {
if (this.debug) {
debugLogger.debug(`[MESSAGE_BUS] publish: ${safeJsonStringify(message)}`);
debugLogger.debug(
`[MESSAGE_BUS] publish: ${safeJsonStringify(sanitizeToolArgs(message))}`,
);
}
try {
if (!this.isValidMessage(message)) {
throw new Error(
`Invalid message structure: ${safeJsonStringify(message)}`,
`Invalid message structure: ${safeJsonStringify(
sanitizeToolArgs(message),
)}`,
);
}
+9 -4
View File
@@ -244,13 +244,18 @@ export class SchedulerStateManager {
const snapshot = this.getSnapshot();
// Fire and forget - The message bus handles the publish and error handling.
void this.messageBus
.publish({
try {
const p = this.messageBus.publish({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: snapshot,
schedulerId: this.schedulerId,
})
.catch(() => {});
});
if (p instanceof Promise) {
p.catch(() => {});
}
} catch {
// Ignore errors in fire-and-forget update
}
}
private isTerminalCall(call: ToolCall): call is CompletedToolCall {
+19 -6
View File
@@ -242,14 +242,19 @@ export abstract class BaseToolInvocation<
) {
if (this._toolName) {
const options = this.getPolicyUpdateOptions(outcome);
void this.messageBus
.publish({
try {
const p = this.messageBus.publish({
type: MessageBusType.UPDATE_POLICY,
toolName: this._toolName,
persist: outcome === ToolConfirmationOutcome.ProceedAlwaysAndSave,
...options,
})
.catch(() => {});
});
if (p instanceof Promise) {
p.catch(() => {});
}
} catch {
// Ignore errors in fire-and-forget update
}
}
}
}
@@ -363,10 +368,18 @@ export abstract class BaseToolInvocation<
);
};
this.messageBus.publish(request).catch(() => {
try {
const p = this.messageBus.publish(request);
if (p instanceof Promise) {
p.catch(() => {
cleanup();
resolve('allow');
});
}
} catch {
cleanup();
resolve('allow');
});
}
});
}