fix(core): bound memory usage in CoreEventEmitter backlog to prevent OOM

This commit is contained in:
Spencer
2026-03-19 05:02:03 +00:00
parent 1725ec346b
commit ae6d39d9f8
2 changed files with 291 additions and 5 deletions

View File

@@ -425,4 +425,159 @@ describe('CoreEventEmitter', () => {
expect(listener).toHaveBeenCalledExactlyOnceWith(payload);
});
});
describe('Memory-bounded Backlog', () => {
it('should strip older events when MAX_BACKLOG_PAYLOAD_BYTES is exceeded', () => {
// Set small budget: 64KB
(
events as unknown as {
_setBudgetsForTesting: (
maxBacklog: number,
maxSingle: number,
) => void;
_currentBacklogPayloadBytes: number;
}
)._setBudgetsForTesting(64 * 1024, 1024 * 1024);
const payloadSize = 20 * 1024; // 20KB each
const chunk = 'a'.repeat(payloadSize);
// Emit 5 events (total 100KB > 64KB)
for (let i = 0; i < 5; i++) {
events.emitOutput(false, chunk);
}
const listener = vi.fn();
events.on(CoreEvent.Output, listener);
events.drainBacklogs();
expect(listener).toHaveBeenCalledTimes(5);
// The first 2 events should be stripped to stay within the 64KB budget
expect(listener.mock.calls[0][0]).toMatchObject({
chunk: '',
omitted: true,
originalByteLength: payloadSize,
});
expect(listener.mock.calls[1][0]).toMatchObject({
chunk: '',
omitted: true,
originalByteLength: payloadSize,
});
// The last 3 events should be intact (3 * 20KB = 60KB <= 64KB)
expect(listener.mock.calls[2][0]).toMatchObject({ chunk });
expect(listener.mock.calls[3][0]).toMatchObject({ chunk });
expect(listener.mock.calls[4][0]).toMatchObject({ chunk });
// Verify internal byte counter is reset
expect(
(
events as unknown as {
_setBudgetsForTesting: (
maxBacklog: number,
maxSingle: number,
) => void;
_currentBacklogPayloadBytes: number;
}
)._currentBacklogPayloadBytes,
).toBe(0);
});
it('should truncate a single huge event that exceeds MAX_SINGLE_EVENT_PAYLOAD_BYTES', () => {
(
events as unknown as {
_setBudgetsForTesting: (
maxBacklog: number,
maxSingle: number,
) => void;
_currentBacklogPayloadBytes: number;
}
)._setBudgetsForTesting(10 * 1024 * 1024, 1 * 1024 * 1024); // 1MB single event budget
const payloadSize = 5 * 1024 * 1024; // 5MB
const chunk = 'b'.repeat(payloadSize);
events.emitOutput(false, chunk);
const listener = vi.fn();
events.on(CoreEvent.Output, listener);
events.drainBacklogs();
expect(listener).toHaveBeenCalledTimes(1);
const output = listener.mock.calls[0][0];
// Should be truncated to 1MB
expect(output.chunk.length).toBe(1 * 1024 * 1024);
expect(output.truncated).toBe(true);
expect(output.originalByteLength).toBe(payloadSize);
expect(
(
events as unknown as {
_setBudgetsForTesting: (
maxBacklog: number,
maxSingle: number,
) => void;
_currentBacklogPayloadBytes: number;
}
)._currentBacklogPayloadBytes,
).toBe(0);
});
it('should preserve MAX_BACKLOG_SIZE events while staying within memory budget', () => {
(
events as unknown as {
_setBudgetsForTesting: (
maxBacklog: number,
maxSingle: number,
) => void;
_currentBacklogPayloadBytes: number;
}
)._setBudgetsForTesting(64 * 1024, 1024 * 1024);
const MAX_BACKLOG_SIZE = 10000;
const smallChunk = 'c'.repeat(10); // 10 bytes
// Emit enough to fill the backlog entirely
for (let i = 0; i < MAX_BACKLOG_SIZE; i++) {
events.emitOutput(false, smallChunk);
}
// Total bytes = 100,000 > 65,536 (64KB budget)
// Older items will be stripped to respect the memory budget.
const listener = vi.fn();
events.on(CoreEvent.Output, listener);
events.drainBacklogs();
expect(listener).toHaveBeenCalledTimes(MAX_BACKLOG_SIZE);
// Verify the first item was stripped
expect(listener.mock.calls[0][0]).toMatchObject({
chunk: '',
omitted: true,
originalByteLength: 10,
});
// Verify the latest items are preserved
expect(listener.mock.calls[MAX_BACKLOG_SIZE - 1][0]).toMatchObject({
chunk: smallChunk,
});
expect(
(
events as unknown as {
_setBudgetsForTesting: (
maxBacklog: number,
maxSingle: number,
) => void;
_currentBacklogPayloadBytes: number;
}
)._currentBacklogPayloadBytes,
).toBe(0);
});
});
});

View File

@@ -25,6 +25,9 @@ export type FeedbackSeverity = 'info' | 'warning' | 'error';
* Payload for the 'user-feedback' event.
*/
export interface UserFeedbackPayload {
truncated?: boolean;
originalByteLength?: number;
omitted?: boolean;
/**
* The severity level determines how the message is rendered in the UI
* (e.g. colored text, specific icon).
@@ -56,6 +59,9 @@ export interface ModelChangedPayload {
* Payload for the 'console-log' event.
*/
export interface ConsoleLogPayload {
truncated?: boolean;
originalByteLength?: number;
omitted?: boolean;
type: 'log' | 'warn' | 'error' | 'debug' | 'info';
content: string;
}
@@ -64,6 +70,9 @@ export interface ConsoleLogPayload {
* Payload for the 'output' event.
*/
export interface OutputPayload {
truncated?: boolean;
originalByteLength?: number;
omitted?: boolean;
isStderr: boolean;
chunk: Uint8Array | string;
encoding?: BufferEncoding;
@@ -239,19 +248,140 @@ export class CoreEventEmitter extends EventEmitter<CoreEvents> {
private _backlogHead = 0;
private static readonly MAX_BACKLOG_SIZE = 10000;
// Modifiable for tests, defaults based on memory requirements
protected MAX_BACKLOG_PAYLOAD_BYTES = 50 * 1024 * 1024;
protected MAX_SINGLE_EVENT_PAYLOAD_BYTES = 1 * 1024 * 1024;
private _currentBacklogPayloadBytes = 0;
constructor() {
super();
}
// Exposed purely for testing to override budget defaults
_setBudgetsForTesting(maxBacklog: number, maxSingle: number): void {
this.MAX_BACKLOG_PAYLOAD_BYTES = maxBacklog;
this.MAX_SINGLE_EVENT_PAYLOAD_BYTES = maxSingle;
}
private _getPayloadBytes(item: EventBacklogItem): number {
if (item.event === CoreEvent.Output) {
const payload = item.args[0];
if (typeof payload.chunk === 'string') {
return Buffer.byteLength(payload.chunk, 'utf8');
} else {
return payload.chunk.byteLength;
}
} else if (item.event === CoreEvent.ConsoleLog) {
const payload = item.args[0];
return Buffer.byteLength(payload.content, 'utf8');
} else if (item.event === CoreEvent.UserFeedback) {
const payload = item.args[0];
return Buffer.byteLength(payload.message, 'utf8');
}
return 0;
}
private _truncatePayloadIfNeeded(item: EventBacklogItem): void {
const bytes = this._getPayloadBytes(item);
if (bytes > this.MAX_SINGLE_EVENT_PAYLOAD_BYTES) {
if (item.event === CoreEvent.Output) {
const payload = item.args[0];
payload.originalByteLength = bytes;
payload.truncated = true;
if (typeof payload.chunk === 'string') {
payload.chunk = Buffer.from(payload.chunk, 'utf8')
.subarray(0, this.MAX_SINGLE_EVENT_PAYLOAD_BYTES)
.toString('utf8');
} else {
payload.chunk = payload.chunk.slice(
0,
this.MAX_SINGLE_EVENT_PAYLOAD_BYTES,
);
}
} else if (item.event === CoreEvent.ConsoleLog) {
const payload = item.args[0];
payload.originalByteLength = bytes;
payload.truncated = true;
payload.content = Buffer.from(payload.content, 'utf8')
.subarray(0, this.MAX_SINGLE_EVENT_PAYLOAD_BYTES)
.toString('utf8');
} else if (item.event === CoreEvent.UserFeedback) {
const payload = item.args[0];
payload.originalByteLength = bytes;
payload.truncated = true;
payload.message = Buffer.from(payload.message, 'utf8')
.subarray(0, this.MAX_SINGLE_EVENT_PAYLOAD_BYTES)
.toString('utf8');
}
}
}
private _stripPayload(item: EventBacklogItem): void {
const bytes = this._getPayloadBytes(item);
if (bytes === 0) return;
if (item.event === CoreEvent.Output) {
const payload = item.args[0];
payload.omitted = true;
if (payload.originalByteLength === undefined) {
payload.originalByteLength = bytes;
}
payload.chunk =
typeof payload.chunk === 'string' ? '' : new Uint8Array(0);
} else if (item.event === CoreEvent.ConsoleLog) {
const payload = item.args[0];
payload.omitted = true;
if (payload.originalByteLength === undefined) {
payload.originalByteLength = bytes;
}
payload.content = '';
} else if (item.event === CoreEvent.UserFeedback) {
const payload = item.args[0];
payload.omitted = true;
if (payload.originalByteLength === undefined) {
payload.originalByteLength = bytes;
}
payload.message = '';
}
}
private _emitOrQueue<K extends keyof CoreEvents>(
event: K,
...args: CoreEvents[K]
): void {
if (this.listenerCount(event) === 0) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const item = { event, args } as EventBacklogItem;
this._truncatePayloadIfNeeded(item);
const itemBytes = this._getPayloadBytes(item);
// Enforce MAX_BACKLOG_PAYLOAD_BYTES by stripping oldest items
let stripIdx = this._backlogHead;
while (
this._currentBacklogPayloadBytes + itemBytes >
this.MAX_BACKLOG_PAYLOAD_BYTES &&
stripIdx < this._eventBacklog.length
) {
const oldItem = this._eventBacklog[stripIdx];
if (oldItem) {
const oldBytes = this._getPayloadBytes(oldItem);
if (oldBytes > 0) {
this._stripPayload(oldItem);
this._currentBacklogPayloadBytes -= oldBytes;
} else {
// Item is already stripped, or has no payload. Skip it to preserve event metadata and count.
}
}
stripIdx++;
}
// Enforce MAX_BACKLOG_SIZE
const backlogSize = this._eventBacklog.length - this._backlogHead;
if (backlogSize >= CoreEventEmitter.MAX_BACKLOG_SIZE) {
// Evict oldest entry. Use a head pointer instead of shift() to avoid
// O(n) array reindexing on every eviction at capacity.
const oldItem = this._eventBacklog[this._backlogHead];
if (oldItem) {
this._currentBacklogPayloadBytes -= this._getPayloadBytes(oldItem);
}
(this._eventBacklog as unknown[])[this._backlogHead] = undefined;
this._backlogHead++;
// Compact once dead entries exceed half capacity to bound memory
@@ -260,8 +390,9 @@ export class CoreEventEmitter extends EventEmitter<CoreEvents> {
this._backlogHead = 0;
}
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
this._eventBacklog.push({ event, args } as EventBacklogItem);
this._eventBacklog.push(item);
this._currentBacklogPayloadBytes += itemBytes;
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
(this.emit as (event: K, ...args: CoreEvents[K]) => boolean)(
@@ -270,7 +401,6 @@ export class CoreEventEmitter extends EventEmitter<CoreEvents> {
);
}
}
/**
* Sends actionable feedback to the user.
* Buffers automatically if the UI hasn't subscribed yet.
@@ -409,6 +539,7 @@ export class CoreEventEmitter extends EventEmitter<CoreEvents> {
const head = this._backlogHead;
this._eventBacklog = [];
this._backlogHead = 0;
this._currentBacklogPayloadBytes = 0;
for (let i = head; i < backlog.length; i++) {
const item = backlog[i];
if (item === undefined) continue;