diff --git a/packages/core/src/agent/agent-session.test.ts b/packages/core/src/agent/agent-session.test.ts index 235b4eb013..e3ff1c5dc0 100644 --- a/packages/core/src/agent/agent-session.test.ts +++ b/packages/core/src/agent/agent-session.test.ts @@ -117,6 +117,7 @@ describe('AgentSession', () => { expect(events).toHaveLength(0); expect(protocol.events).toHaveLength(1); expect(protocol.events[0].type).toBe('session_update'); + expect(protocol.events[0].streamId).toEqual(expect.any(String)); }); it('should skip events that occur before agent_start', async () => { @@ -171,6 +172,181 @@ describe('AgentSession', () => { expect(streamedEvents).toEqual(allEvents.slice(2)); }); + it('should complete immediately when resuming from agent_end', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([{ type: 'message' }]); + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const endEvent = session.events.findLast( + (event): event is AgentEvent<'agent_end'> => + event.type === 'agent_end' && event.streamId === streamId, + ); + expect(endEvent).toBeDefined(); + + const iterator = session + .stream({ eventId: endEvent!.id }) + [Symbol.asyncIterator](); + await expect(iterator.next()).resolves.toEqual({ + value: undefined, + done: true, + }); + }); + + it('should throw for an unknown eventId', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + const iterator = session + .stream({ eventId: 'missing-event' }) + [Symbol.asyncIterator](); + await expect(iterator.next()).rejects.toThrow( + 'Unknown eventId: missing-event', + ); + }); + + it('should throw when resuming from an event before agent_start on a stream with no agent activity', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + const { streamId } = await session.send({ update: { title: 'draft' } }); + expect(streamId).toBeNull(); + + const updateEvent = session.events.find( + (event): event is AgentEvent<'session_update'> => + event.type === 'session_update', + ); + expect(updateEvent).toBeDefined(); + + const iterator = session + .stream({ eventId: updateEvent!.id }) + [Symbol.asyncIterator](); + await expect(iterator.next()).rejects.toThrow( + `Cannot resume from eventId ${updateEvent!.id} before agent_start for stream ${updateEvent!.streamId}`, + ); + }); + + it('should replay from agent_start when resuming from a pre-agent_start event after activity is in history', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([ + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'hello' }], + }, + ]); + await session.send({ + message: [{ type: 'text', text: 'request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const userMessage = session.events.find( + (event): event is AgentEvent<'message'> => + event.type === 'message' && event.role === 'user', + ); + expect(userMessage).toBeDefined(); + + const streamedEvents: AgentEvent[] = []; + for await (const event of session.stream({ eventId: userMessage!.id })) { + streamedEvents.push(event); + } + + expect(streamedEvents.map((event) => event.type)).toEqual([ + 'agent_start', + 'message', + 'agent_end', + ]); + expect(streamedEvents[0]?.streamId).toBe(userMessage!.streamId); + }); + + it('should throw when resuming from a pre-agent_start event before activity is in history', async () => { + const protocol = new MockAgentProtocol([ + { + id: 'e-1', + timestamp: '2026-01-01T00:00:00.000Z', + streamId: 'stream-1', + type: 'message', + role: 'user', + content: [{ type: 'text', text: 'request' }], + }, + ]); + const session = new AgentSession(protocol); + + const iterator = session + .stream({ eventId: 'e-1' }) + [Symbol.asyncIterator](); + await expect(iterator.next()).rejects.toThrow( + 'Cannot resume from eventId e-1 before agent_start for stream stream-1', + ); + }); + + it('should resume from an in-stream event within the same stream only', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([ + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'first answer 1' }], + }, + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'first answer 2' }], + }, + ]); + const { streamId: streamId1 } = await session.send({ + message: [{ type: 'text', text: 'first request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + protocol.pushResponse([ + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'second answer' }], + }, + ]); + await session.send({ + message: [{ type: 'text', text: 'second request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const resumeEvent = session.events.find( + (event): event is AgentEvent<'message'> => + event.type === 'message' && + event.streamId === streamId1 && + event.role === 'agent' && + event.content[0]?.type === 'text' && + event.content[0].text === 'first answer 1', + ); + expect(resumeEvent).toBeDefined(); + + const streamedEvents: AgentEvent[] = []; + for await (const event of session.stream({ eventId: resumeEvent!.id })) { + streamedEvents.push(event); + } + + expect( + streamedEvents.every((event) => event.streamId === streamId1), + ).toBe(true); + expect(streamedEvents.map((event) => event.type)).toEqual([ + 'message', + 'agent_end', + ]); + const resumedMessage = streamedEvents[0] as AgentEvent<'message'>; + expect(resumedMessage.content).toEqual([ + { type: 'text', text: 'first answer 2' }, + ]); + }); + it('should replay events for streamId starting with agent_start', async () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); @@ -223,6 +399,33 @@ describe('AgentSession', () => { expect(streamedEvents.at(-1)?.type).toBe('agent_end'); }); + it('should not drop agent_end that arrives while replay events are being yielded', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([{ type: 'message' }], { keepOpen: true }); + const { streamId } = await session.send({ update: { title: 't1' } }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const iterator = session + .stream({ streamId: streamId! }) + [Symbol.asyncIterator](); + + const first = await iterator.next(); + expect(first.value?.type).toBe('agent_start'); + + protocol.pushToStream(streamId!, [], { close: true }); + + const second = await iterator.next(); + expect(second.value?.type).toBe('message'); + + const third = await iterator.next(); + expect(third.value?.type).toBe('agent_end'); + + const fourth = await iterator.next(); + expect(fourth.done).toBe(true); + }); + it('should follow an active stream if no options provided', async () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); diff --git a/packages/core/src/agent/agent-session.ts b/packages/core/src/agent/agent-session.ts index 0d9fc86bb0..6a4c295fc8 100644 --- a/packages/core/src/agent/agent-session.ts +++ b/packages/core/src/agent/agent-session.ts @@ -34,7 +34,7 @@ export class AgentSession implements AgentProtocol { return this._protocol.abort(); } - get events(): AgentEvent[] { + get events(): readonly AgentEvent[] { return this._protocol.events; } @@ -77,6 +77,30 @@ export class AgentSession implements AgentProtocol { let done = false; let trackedStreamId = options.streamId; let started = false; + let agentActivityStarted = false; + + const queueVisibleEvent = (event: AgentEvent): void => { + if (trackedStreamId && event.streamId !== trackedStreamId) { + return; + } + + if (!agentActivityStarted) { + if (event.type !== 'agent_start') { + return; + } + trackedStreamId = event.streamId; + agentActivityStarted = true; + } + + if (!trackedStreamId) { + return; + } + + eventQueue.push(event); + if (event.type === 'agent_end' && event.streamId === trackedStreamId) { + done = true; + } + }; // 1. Subscribe early to avoid missing any events that occur during replay setup const unsubscribe = this._protocol.subscribe((event) => { @@ -87,23 +111,7 @@ export class AgentSession implements AgentProtocol { return; } - if (trackedStreamId && event.streamId !== trackedStreamId) return; - - // If we don't have a tracked stream yet, the first agent_start we see becomes it. - if (!trackedStreamId && event.type === 'agent_start') { - trackedStreamId = event.streamId ?? undefined; - } - - // If we still don't have a tracked stream and we aren't replaying everything (eventId), ignore. - if (!trackedStreamId && !options.eventId) return; - - eventQueue.push(event); - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { - done = true; - } + queueVisibleEvent(event); const currentResolve = resolve; next = new Promise((r) => { @@ -118,8 +126,42 @@ export class AgentSession implements AgentProtocol { if (options.eventId) { const index = currentEvents.findIndex((e) => e.id === options.eventId); - if (index !== -1) { + if (index === -1) { + throw new Error(`Unknown eventId: ${options.eventId}`); + } + + const resumeEvent = currentEvents[index]; + trackedStreamId = resumeEvent.streamId; + const firstAgentStartIndex = currentEvents.findIndex( + (event) => + event.type === 'agent_start' && event.streamId === trackedStreamId, + ); + + if (resumeEvent.type === 'agent_end') { replayStartIndex = index + 1; + agentActivityStarted = true; + done = true; + } else if ( + firstAgentStartIndex !== -1 && + firstAgentStartIndex <= index + ) { + replayStartIndex = index + 1; + agentActivityStarted = true; + } else if (firstAgentStartIndex !== -1) { + // A pre-agent_start cursor can be resumed once the corresponding + // agent activity is already present in history. Because stream() + // yields only agent_start -> agent_end, replay begins at agent_start + // rather than at the original pre-start event. + replayStartIndex = firstAgentStartIndex; + } else { + // Consumers can only resume by eventId once the corresponding stream + // has entered the agent_start -> agent_end lifecycle in history. + // Without a recorded agent_start, this wrapper cannot distinguish + // "agent activity may start later" from "this send was acknowledged + // without agent activity" without risking an infinite wait. + throw new Error( + `Cannot resume from eventId ${options.eventId} before agent_start for stream ${trackedStreamId}`, + ); } } else if (options.streamId) { const index = currentEvents.findIndex( @@ -128,29 +170,7 @@ export class AgentSession implements AgentProtocol { if (index !== -1) { replayStartIndex = index; } - } - - if (replayStartIndex !== -1) { - for (let i = replayStartIndex; i < currentEvents.length; i++) { - const event = currentEvents[i]; - if (options.streamId && event.streamId !== options.streamId) continue; - - eventQueue.push(event); - if (event.type === 'agent_start' && !trackedStreamId) { - trackedStreamId = event.streamId ?? undefined; - } - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { - done = true; - break; - } - } - } - - if (!done && !trackedStreamId) { - // Find active stream in history + } else { const activeStarts = currentEvents.filter( (e) => e.type === 'agent_start', ); @@ -161,36 +181,28 @@ export class AgentSession implements AgentProtocol { (e) => e.type === 'agent_end' && e.streamId === start.streamId, ) ) { - trackedStreamId = start.streamId ?? undefined; + trackedStreamId = start.streamId; + replayStartIndex = currentEvents.findIndex( + (e) => e.id === start.id, + ); break; } } } - // If we replayed to the end and no stream is active, and we were specifically - // replaying from an eventId (or we've already finished the stream we were looking for), we are done. - if (!done && !trackedStreamId && options.eventId) { - done = true; + if (replayStartIndex !== -1) { + for (let i = replayStartIndex; i < currentEvents.length; i++) { + const event = currentEvents[i]; + queueVisibleEvent(event); + if (done) break; + } } - started = true; // Process events that arrived while we were replaying for (const event of earlyEvents) { if (done) break; - if (trackedStreamId && event.streamId !== trackedStreamId) continue; - if (!trackedStreamId && event.type === 'agent_start') { - trackedStreamId = event.streamId ?? undefined; - } - if (!trackedStreamId && !options.eventId) continue; - - eventQueue.push(event); - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { - done = true; - } + queueVisibleEvent(event); } while (true) { @@ -200,6 +212,7 @@ export class AgentSession implements AgentProtocol { for (const event of eventsToYield) { yield event; } + continue; } if (done) break; diff --git a/packages/core/src/agent/mock.test.ts b/packages/core/src/agent/mock.test.ts index 4f102d5dbd..f5138e388a 100644 --- a/packages/core/src/agent/mock.test.ts +++ b/packages/core/src/agent/mock.test.ts @@ -235,7 +235,7 @@ describe('MockAgentProtocol', () => { expect(streamId).toBeNull(); expect(session.events).toHaveLength(1); expect(session.events[0].type).toBe('session_update'); - expect(session.events[0].streamId).toBeNull(); + expect(session.events[0].streamId).toEqual(expect.any(String)); }); it('should throw on action', async () => { diff --git a/packages/core/src/agent/mock.ts b/packages/core/src/agent/mock.ts index 683e3e0b2a..80d8ebae2f 100644 --- a/packages/core/src/agent/mock.ts +++ b/packages/core/src/agent/mock.ts @@ -8,8 +8,8 @@ import type { AgentEvent, AgentEventCommon, AgentEventData, - AgentSend, AgentProtocol, + AgentSend, Unsubscribe, } from './types.js'; @@ -86,13 +86,7 @@ export class MockAgentProtocol implements AgentProtocol { ) { const now = new Date().toISOString(); for (const eventData of events) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const event: AgentEvent = { - ...eventData, - id: eventData.id ?? `e-${this._nextEventId++}`, - timestamp: eventData.timestamp ?? now, - streamId: eventData.streamId ?? streamId, - } as AgentEvent; + const event = this._normalizeEvent(eventData, now, streamId); this._emit(event); } @@ -100,13 +94,13 @@ export class MockAgentProtocol implements AgentProtocol { options?.close && !events.some((eventData) => eventData.type === 'agent_end') ) { - this._emit({ - id: `e-${this._nextEventId++}`, - timestamp: now, - streamId, - type: 'agent_end', - reason: 'completed', - } as AgentEvent); + this._emit( + this._normalizeEvent( + { type: 'agent_end', reason: 'completed' }, + now, + streamId, + ), + ); } } @@ -124,16 +118,18 @@ export class MockAgentProtocol implements AgentProtocol { const now = new Date().toISOString(); const eventsToEmit: AgentEvent[] = []; + let fallbackStreamId: string | undefined; - // Helper to normalize and prepare for emission + // All emitted events stay correlated to a stream even if this send does not + // start agent activity and therefore returns `streamId: null`. const normalize = (eventData: MockAgentEvent): AgentEvent => - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - ({ - ...eventData, - id: eventData.id ?? `e-${this._nextEventId++}`, - timestamp: eventData.timestamp ?? now, - streamId: eventData.streamId ?? streamId, - }) as AgentEvent; + this._normalizeEvent( + eventData, + now, + eventData.streamId ?? + streamId ?? + (fallbackStreamId ??= `mock-stream-${this._nextStreamId++}`), + ); // 1. User/Update event (BEFORE agent_start) if ('message' in payload && payload.message) { @@ -225,16 +221,32 @@ export class MockAgentProtocol implements AgentProtocol { return { streamId }; } + private _normalizeEvent( + eventData: MockAgentEvent, + timestamp: string, + streamId: string, + ): AgentEvent { + // TypeScript loses the specific union member when we add common event + // fields here, so keep the narrowing local to this mock-only helper. + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + return { + ...eventData, + id: eventData.id ?? `e-${this._nextEventId++}`, + timestamp: eventData.timestamp ?? timestamp, + streamId: eventData.streamId ?? streamId, + } as AgentEvent; + } + async abort(): Promise { if (this._lastStreamId && this._activeStreamIds.has(this._lastStreamId)) { const streamId = this._lastStreamId; - this._emit({ - id: `e-${this._nextEventId++}`, - timestamp: new Date().toISOString(), - streamId, - type: 'agent_end', - reason: 'aborted', - } as AgentEvent); + this._emit( + this._normalizeEvent( + { type: 'agent_end', reason: 'aborted' }, + new Date().toISOString(), + streamId, + ), + ); } } } diff --git a/packages/core/src/agent/types.ts b/packages/core/src/agent/types.ts index 014998d68b..4ec369d066 100644 --- a/packages/core/src/agent/types.ts +++ b/packages/core/src/agent/types.ts @@ -11,9 +11,10 @@ export type Unsubscribe = () => void; export interface AgentProtocol extends Trajectory { /** * Send data to the agent. Promise resolves when action is acknowledged. - * Returns the `streamId` of the stream the message was correlated to -- - * this may be a new stream if idle, an existing stream, or null if no - * stream was triggered. + * Returns the agent-activity `streamId` affected by the send. This may be a + * new stream if idle, an existing stream, or null if the send was + * acknowledged without starting agent activity. Emitted events should still + * remain correlated to a stream via their `streamId`. * * When a new stream is created by a send, the streamId MUST be returned * before the `agent_start` event is emitted for the stream. @@ -36,7 +37,7 @@ export interface AgentProtocol extends Trajectory { /** * AgentProtocol implements the Trajectory interface and can retrieve existing events. */ - readonly events: AgentEvent[]; + readonly events: readonly AgentEvent[]; } type RequireExactlyOne = { @@ -54,7 +55,7 @@ interface AgentSendPayloads { export type AgentSend = RequireExactlyOne & WithMeta; export interface Trajectory { - readonly events: AgentEvent[]; + readonly events: readonly AgentEvent[]; } export interface AgentEventCommon { @@ -62,8 +63,8 @@ export interface AgentEventCommon { id: string; /** Identifies the subagent thread, omitted for "main thread" events. */ threadId?: string; - /** Identifies a particular stream of a particular thread. */ - streamId?: string | null; + /** Identifies the stream this event belongs to. */ + streamId: string; /** ISO Timestamp for the time at which the event occurred. */ timestamp: string; /** The concrete type of the event. */