diff --git a/packages/core/src/agent/agent-session.test.ts b/packages/core/src/agent/agent-session.test.ts index be5416bf83..fd32ce4fb7 100644 --- a/packages/core/src/agent/agent-session.test.ts +++ b/packages/core/src/agent/agent-session.test.ts @@ -117,6 +117,7 @@ describe('AgentSession', () => { expect(events).toHaveLength(0); expect(protocol.events).toHaveLength(1); expect(protocol.events[0].type).toBe('session_update'); + expect(protocol.events[0].streamId).toEqual(expect.any(String)); }); it('should skip events that occur before agent_start', async () => { @@ -208,6 +209,28 @@ describe('AgentSession', () => { ); }); + it('should complete immediately when resuming from an event on a stream with no agent activity', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + const { streamId } = await session.send({ update: { title: 'draft' } }); + expect(streamId).toBeNull(); + + const updateEvent = session.events.find( + (event): event is AgentEvent<'session_update'> => + event.type === 'session_update', + ); + expect(updateEvent).toBeDefined(); + + const iterator = session.stream({ eventId: updateEvent!.id })[ + Symbol.asyncIterator + ](); + await expect(iterator.next()).resolves.toEqual({ + value: undefined, + done: true, + }); + }); + it('should resume from an in-stream event within the same stream only', async () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); @@ -398,5 +421,6 @@ describe('AgentSession', () => { expect(streamedEvents.some((e) => e.type === 'agent_end')).toBe(true); expect(streamedEvents.some((e) => e.streamId === streamId2)).toBe(false); }); + }); }); diff --git a/packages/core/src/agent/agent-session.ts b/packages/core/src/agent/agent-session.ts index e01f4d568f..a58efec4ea 100644 --- a/packages/core/src/agent/agent-session.ts +++ b/packages/core/src/agent/agent-session.ts @@ -34,7 +34,7 @@ export class AgentSession implements AgentProtocol { return this._protocol.abort(); } - get events(): AgentEvent[] { + get events(): readonly AgentEvent[] { return this._protocol.events; } @@ -88,19 +88,16 @@ export class AgentSession implements AgentProtocol { if (event.type !== 'agent_start') { return; } - trackedStreamId = event.streamId ?? trackedStreamId ?? undefined; + trackedStreamId = event.streamId; agentActivityStarted = true; } - if (!trackedStreamId && !options.eventId) { + if (!trackedStreamId) { return; } eventQueue.push(event); - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { + if (event.type === 'agent_end' && event.streamId === trackedStreamId) { done = true; } }; @@ -135,23 +132,32 @@ export class AgentSession implements AgentProtocol { const resumeEvent = currentEvents[index]; replayStartIndex = index + 1; - trackedStreamId = resumeEvent?.streamId ?? undefined; - if (resumeEvent?.type === 'agent_end') { + trackedStreamId = resumeEvent.streamId; + const streamHasStarted = + resumeEvent.type === 'agent_start' || + currentEvents + .slice(0, index) + .some( + (event) => + event.type === 'agent_start' && + event.streamId === trackedStreamId, + ); + + if (resumeEvent.type === 'agent_end') { agentActivityStarted = true; done = true; - } else if (trackedStreamId) { - agentActivityStarted = - resumeEvent?.type === 'agent_start' || - currentEvents - .slice(0, index) - .some( - (event) => - event.type === 'agent_start' && - event.streamId === trackedStreamId, - ); - } else { - // No correlated stream means "resume globally after this event". + } else if (streamHasStarted) { agentActivityStarted = true; + } else if ( + !currentEvents + .slice(index + 1) + .some( + (event) => + event.type === 'agent_start' && + event.streamId === trackedStreamId, + ) + ) { + done = true; } } else if (options.streamId) { const index = currentEvents.findIndex( @@ -171,7 +177,7 @@ export class AgentSession implements AgentProtocol { (e) => e.type === 'agent_end' && e.streamId === start.streamId, ) ) { - trackedStreamId = start.streamId ?? undefined; + trackedStreamId = start.streamId; replayStartIndex = currentEvents.findIndex( (e) => e.id === start.id, ); @@ -187,13 +193,6 @@ export class AgentSession implements AgentProtocol { if (done) break; } } - - // If we replayed to the end and no stream is active, and we were specifically - // replaying from an eventId (or we've already finished the stream we were looking for), we are done. - if (!done && !trackedStreamId && options.eventId) { - done = true; - } - started = true; // Process events that arrived while we were replaying diff --git a/packages/core/src/agent/mock.test.ts b/packages/core/src/agent/mock.test.ts index 4f102d5dbd..f5138e388a 100644 --- a/packages/core/src/agent/mock.test.ts +++ b/packages/core/src/agent/mock.test.ts @@ -235,7 +235,7 @@ describe('MockAgentProtocol', () => { expect(streamId).toBeNull(); expect(session.events).toHaveLength(1); expect(session.events[0].type).toBe('session_update'); - expect(session.events[0].streamId).toBeNull(); + expect(session.events[0].streamId).toEqual(expect.any(String)); }); it('should throw on action', async () => { diff --git a/packages/core/src/agent/mock.ts b/packages/core/src/agent/mock.ts index 683e3e0b2a..dfbe96e5f1 100644 --- a/packages/core/src/agent/mock.ts +++ b/packages/core/src/agent/mock.ts @@ -118,21 +118,24 @@ export class MockAgentProtocol implements AgentProtocol { // If there were queued responses (even if empty array), we trigger a stream. const hasResponseEvents = responseData !== undefined; - const streamId = hasResponseEvents - ? (response[0]?.streamId ?? `mock-stream-${this._nextStreamId++}`) - : null; + let correlationStreamId: string | undefined; + const getCorrelationStreamId = (): string => + (correlationStreamId ??= + response[0]?.streamId ?? `mock-stream-${this._nextStreamId++}`); + const streamId = hasResponseEvents ? getCorrelationStreamId() : null; const now = new Date().toISOString(); const eventsToEmit: AgentEvent[] = []; - // Helper to normalize and prepare for emission + // All emitted events stay correlated to a stream even if this send does not + // start agent activity and therefore returns `streamId: null`. const normalize = (eventData: MockAgentEvent): AgentEvent => // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion ({ ...eventData, id: eventData.id ?? `e-${this._nextEventId++}`, timestamp: eventData.timestamp ?? now, - streamId: eventData.streamId ?? streamId, + streamId: eventData.streamId ?? getCorrelationStreamId(), }) as AgentEvent; // 1. User/Update event (BEFORE agent_start) diff --git a/packages/core/src/agent/types.ts b/packages/core/src/agent/types.ts index 014998d68b..4ec369d066 100644 --- a/packages/core/src/agent/types.ts +++ b/packages/core/src/agent/types.ts @@ -11,9 +11,10 @@ export type Unsubscribe = () => void; export interface AgentProtocol extends Trajectory { /** * Send data to the agent. Promise resolves when action is acknowledged. - * Returns the `streamId` of the stream the message was correlated to -- - * this may be a new stream if idle, an existing stream, or null if no - * stream was triggered. + * Returns the agent-activity `streamId` affected by the send. This may be a + * new stream if idle, an existing stream, or null if the send was + * acknowledged without starting agent activity. Emitted events should still + * remain correlated to a stream via their `streamId`. * * When a new stream is created by a send, the streamId MUST be returned * before the `agent_start` event is emitted for the stream. @@ -36,7 +37,7 @@ export interface AgentProtocol extends Trajectory { /** * AgentProtocol implements the Trajectory interface and can retrieve existing events. */ - readonly events: AgentEvent[]; + readonly events: readonly AgentEvent[]; } type RequireExactlyOne = { @@ -54,7 +55,7 @@ interface AgentSendPayloads { export type AgentSend = RequireExactlyOne & WithMeta; export interface Trajectory { - readonly events: AgentEvent[]; + readonly events: readonly AgentEvent[]; } export interface AgentEventCommon { @@ -62,8 +63,8 @@ export interface AgentEventCommon { id: string; /** Identifies the subagent thread, omitted for "main thread" events. */ threadId?: string; - /** Identifies a particular stream of a particular thread. */ - streamId?: string | null; + /** Identifies the stream this event belongs to. */ + streamId: string; /** ISO Timestamp for the time at which the event occurred. */ timestamp: string; /** The concrete type of the event. */