diff --git a/packages/core/src/utils/events.test.ts b/packages/core/src/utils/events.test.ts index 82be02f12a..06eedbc92a 100644 --- a/packages/core/src/utils/events.test.ts +++ b/packages/core/src/utils/events.test.ts @@ -425,4 +425,159 @@ describe('CoreEventEmitter', () => { expect(listener).toHaveBeenCalledExactlyOnceWith(payload); }); }); + + describe('Memory-bounded Backlog', () => { + it('should strip older events when MAX_BACKLOG_PAYLOAD_BYTES is exceeded', () => { + // Set small budget: 64KB + + ( + events as unknown as { + _setBudgetsForTesting: ( + maxBacklog: number, + maxSingle: number, + ) => void; + _currentBacklogPayloadBytes: number; + } + )._setBudgetsForTesting(64 * 1024, 1024 * 1024); + + const payloadSize = 20 * 1024; // 20KB each + const chunk = 'a'.repeat(payloadSize); + + // Emit 5 events (total 100KB > 64KB) + for (let i = 0; i < 5; i++) { + events.emitOutput(false, chunk); + } + + const listener = vi.fn(); + events.on(CoreEvent.Output, listener); + events.drainBacklogs(); + + expect(listener).toHaveBeenCalledTimes(5); + + // The first 2 events should be stripped to stay within the 64KB budget + expect(listener.mock.calls[0][0]).toMatchObject({ + chunk: '', + omitted: true, + originalByteLength: payloadSize, + }); + expect(listener.mock.calls[1][0]).toMatchObject({ + chunk: '', + omitted: true, + originalByteLength: payloadSize, + }); + + // The last 3 events should be intact (3 * 20KB = 60KB <= 64KB) + expect(listener.mock.calls[2][0]).toMatchObject({ chunk }); + expect(listener.mock.calls[3][0]).toMatchObject({ chunk }); + expect(listener.mock.calls[4][0]).toMatchObject({ chunk }); + + // Verify internal byte counter is reset + + expect( + ( + events as unknown as { + _setBudgetsForTesting: ( + maxBacklog: number, + maxSingle: number, + ) => void; + _currentBacklogPayloadBytes: number; + } + )._currentBacklogPayloadBytes, + ).toBe(0); + }); + + it('should truncate a single huge event that exceeds MAX_SINGLE_EVENT_PAYLOAD_BYTES', () => { + ( + events as unknown as { + _setBudgetsForTesting: ( + maxBacklog: number, + maxSingle: number, + ) => void; + _currentBacklogPayloadBytes: number; + } + )._setBudgetsForTesting(10 * 1024 * 1024, 1 * 1024 * 1024); // 1MB single event budget + + const payloadSize = 5 * 1024 * 1024; // 5MB + const chunk = 'b'.repeat(payloadSize); + + events.emitOutput(false, chunk); + + const listener = vi.fn(); + events.on(CoreEvent.Output, listener); + events.drainBacklogs(); + + expect(listener).toHaveBeenCalledTimes(1); + const output = listener.mock.calls[0][0]; + + // Should be truncated to 1MB + expect(output.chunk.length).toBe(1 * 1024 * 1024); + expect(output.truncated).toBe(true); + expect(output.originalByteLength).toBe(payloadSize); + + expect( + ( + events as unknown as { + _setBudgetsForTesting: ( + maxBacklog: number, + maxSingle: number, + ) => void; + _currentBacklogPayloadBytes: number; + } + )._currentBacklogPayloadBytes, + ).toBe(0); + }); + + it('should preserve MAX_BACKLOG_SIZE events while staying within memory budget', () => { + ( + events as unknown as { + _setBudgetsForTesting: ( + maxBacklog: number, + maxSingle: number, + ) => void; + _currentBacklogPayloadBytes: number; + } + )._setBudgetsForTesting(64 * 1024, 1024 * 1024); + + const MAX_BACKLOG_SIZE = 10000; + const smallChunk = 'c'.repeat(10); // 10 bytes + + // Emit enough to fill the backlog entirely + for (let i = 0; i < MAX_BACKLOG_SIZE; i++) { + events.emitOutput(false, smallChunk); + } + + // Total bytes = 100,000 > 65,536 (64KB budget) + // Older items will be stripped to respect the memory budget. + + const listener = vi.fn(); + events.on(CoreEvent.Output, listener); + events.drainBacklogs(); + + expect(listener).toHaveBeenCalledTimes(MAX_BACKLOG_SIZE); + + // Verify the first item was stripped + expect(listener.mock.calls[0][0]).toMatchObject({ + chunk: '', + omitted: true, + originalByteLength: 10, + }); + + // Verify the latest items are preserved + expect(listener.mock.calls[MAX_BACKLOG_SIZE - 1][0]).toMatchObject({ + chunk: smallChunk, + }); + + expect( + ( + events as unknown as { + _setBudgetsForTesting: ( + maxBacklog: number, + maxSingle: number, + ) => void; + _currentBacklogPayloadBytes: number; + } + )._currentBacklogPayloadBytes, + ).toBe(0); + }); + }); }); diff --git a/packages/core/src/utils/events.ts b/packages/core/src/utils/events.ts index 47c42c93ba..2e8c8eb7f6 100644 --- a/packages/core/src/utils/events.ts +++ b/packages/core/src/utils/events.ts @@ -25,6 +25,9 @@ export type FeedbackSeverity = 'info' | 'warning' | 'error'; * Payload for the 'user-feedback' event. */ export interface UserFeedbackPayload { + truncated?: boolean; + originalByteLength?: number; + omitted?: boolean; /** * The severity level determines how the message is rendered in the UI * (e.g. colored text, specific icon). @@ -56,6 +59,9 @@ export interface ModelChangedPayload { * Payload for the 'console-log' event. */ export interface ConsoleLogPayload { + truncated?: boolean; + originalByteLength?: number; + omitted?: boolean; type: 'log' | 'warn' | 'error' | 'debug' | 'info'; content: string; } @@ -64,6 +70,9 @@ export interface ConsoleLogPayload { * Payload for the 'output' event. */ export interface OutputPayload { + truncated?: boolean; + originalByteLength?: number; + omitted?: boolean; isStderr: boolean; chunk: Uint8Array | string; encoding?: BufferEncoding; @@ -239,19 +248,140 @@ export class CoreEventEmitter extends EventEmitter { private _backlogHead = 0; private static readonly MAX_BACKLOG_SIZE = 10000; + // Modifiable for tests, defaults based on memory requirements + protected MAX_BACKLOG_PAYLOAD_BYTES = 50 * 1024 * 1024; + protected MAX_SINGLE_EVENT_PAYLOAD_BYTES = 1 * 1024 * 1024; + private _currentBacklogPayloadBytes = 0; + constructor() { super(); } + // Exposed purely for testing to override budget defaults + _setBudgetsForTesting(maxBacklog: number, maxSingle: number): void { + this.MAX_BACKLOG_PAYLOAD_BYTES = maxBacklog; + this.MAX_SINGLE_EVENT_PAYLOAD_BYTES = maxSingle; + } + + private _getPayloadBytes(item: EventBacklogItem): number { + if (item.event === CoreEvent.Output) { + const payload = item.args[0]; + if (typeof payload.chunk === 'string') { + return Buffer.byteLength(payload.chunk, 'utf8'); + } else { + return payload.chunk.byteLength; + } + } else if (item.event === CoreEvent.ConsoleLog) { + const payload = item.args[0]; + return Buffer.byteLength(payload.content, 'utf8'); + } else if (item.event === CoreEvent.UserFeedback) { + const payload = item.args[0]; + return Buffer.byteLength(payload.message, 'utf8'); + } + return 0; + } + + private _truncatePayloadIfNeeded(item: EventBacklogItem): void { + const bytes = this._getPayloadBytes(item); + if (bytes > this.MAX_SINGLE_EVENT_PAYLOAD_BYTES) { + if (item.event === CoreEvent.Output) { + const payload = item.args[0]; + payload.originalByteLength = bytes; + payload.truncated = true; + if (typeof payload.chunk === 'string') { + payload.chunk = Buffer.from(payload.chunk, 'utf8') + .subarray(0, this.MAX_SINGLE_EVENT_PAYLOAD_BYTES) + .toString('utf8'); + } else { + payload.chunk = payload.chunk.slice( + 0, + this.MAX_SINGLE_EVENT_PAYLOAD_BYTES, + ); + } + } else if (item.event === CoreEvent.ConsoleLog) { + const payload = item.args[0]; + payload.originalByteLength = bytes; + payload.truncated = true; + payload.content = Buffer.from(payload.content, 'utf8') + .subarray(0, this.MAX_SINGLE_EVENT_PAYLOAD_BYTES) + .toString('utf8'); + } else if (item.event === CoreEvent.UserFeedback) { + const payload = item.args[0]; + payload.originalByteLength = bytes; + payload.truncated = true; + payload.message = Buffer.from(payload.message, 'utf8') + .subarray(0, this.MAX_SINGLE_EVENT_PAYLOAD_BYTES) + .toString('utf8'); + } + } + } + + private _stripPayload(item: EventBacklogItem): void { + const bytes = this._getPayloadBytes(item); + if (bytes === 0) return; + + if (item.event === CoreEvent.Output) { + const payload = item.args[0]; + payload.omitted = true; + if (payload.originalByteLength === undefined) { + payload.originalByteLength = bytes; + } + payload.chunk = + typeof payload.chunk === 'string' ? '' : new Uint8Array(0); + } else if (item.event === CoreEvent.ConsoleLog) { + const payload = item.args[0]; + payload.omitted = true; + if (payload.originalByteLength === undefined) { + payload.originalByteLength = bytes; + } + payload.content = ''; + } else if (item.event === CoreEvent.UserFeedback) { + const payload = item.args[0]; + payload.omitted = true; + if (payload.originalByteLength === undefined) { + payload.originalByteLength = bytes; + } + payload.message = ''; + } + } + private _emitOrQueue( event: K, ...args: CoreEvents[K] ): void { if (this.listenerCount(event) === 0) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const item = { event, args } as EventBacklogItem; + this._truncatePayloadIfNeeded(item); + const itemBytes = this._getPayloadBytes(item); + + // Enforce MAX_BACKLOG_PAYLOAD_BYTES by stripping oldest items + let stripIdx = this._backlogHead; + while ( + this._currentBacklogPayloadBytes + itemBytes > + this.MAX_BACKLOG_PAYLOAD_BYTES && + stripIdx < this._eventBacklog.length + ) { + const oldItem = this._eventBacklog[stripIdx]; + if (oldItem) { + const oldBytes = this._getPayloadBytes(oldItem); + if (oldBytes > 0) { + this._stripPayload(oldItem); + this._currentBacklogPayloadBytes -= oldBytes; + } else { + // Item is already stripped, or has no payload. Skip it to preserve event metadata and count. + } + } + stripIdx++; + } + + // Enforce MAX_BACKLOG_SIZE const backlogSize = this._eventBacklog.length - this._backlogHead; if (backlogSize >= CoreEventEmitter.MAX_BACKLOG_SIZE) { - // Evict oldest entry. Use a head pointer instead of shift() to avoid - // O(n) array reindexing on every eviction at capacity. + const oldItem = this._eventBacklog[this._backlogHead]; + if (oldItem) { + this._currentBacklogPayloadBytes -= this._getPayloadBytes(oldItem); + } (this._eventBacklog as unknown[])[this._backlogHead] = undefined; this._backlogHead++; // Compact once dead entries exceed half capacity to bound memory @@ -260,8 +390,9 @@ export class CoreEventEmitter extends EventEmitter { this._backlogHead = 0; } } - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - this._eventBacklog.push({ event, args } as EventBacklogItem); + + this._eventBacklog.push(item); + this._currentBacklogPayloadBytes += itemBytes; } else { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion (this.emit as (event: K, ...args: CoreEvents[K]) => boolean)( @@ -270,7 +401,6 @@ export class CoreEventEmitter extends EventEmitter { ); } } - /** * Sends actionable feedback to the user. * Buffers automatically if the UI hasn't subscribed yet. @@ -409,6 +539,7 @@ export class CoreEventEmitter extends EventEmitter { const head = this._backlogHead; this._eventBacklog = []; this._backlogHead = 0; + this._currentBacklogPayloadBytes = 0; for (let i = head; i < backlog.length; i++) { const item = backlog[i]; if (item === undefined) continue;