From db0624c6cd1dbc4ee88db6abf472259c8cd8265f Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Fri, 20 Mar 2026 13:28:37 -0400 Subject: [PATCH] !feat(core): harden agent session stream semantics --- packages/core/src/agent/agent-session.test.ts | 37 +++++++++++++++++ packages/core/src/agent/agent-session.ts | 41 +++++++++++-------- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/packages/core/src/agent/agent-session.test.ts b/packages/core/src/agent/agent-session.test.ts index 959e8d1a4e..be5416bf83 100644 --- a/packages/core/src/agent/agent-session.test.ts +++ b/packages/core/src/agent/agent-session.test.ts @@ -171,6 +171,43 @@ describe('AgentSession', () => { expect(streamedEvents).toEqual(allEvents.slice(2)); }); + it('should complete immediately when resuming from agent_end', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([{ type: 'message' }]); + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const endEvent = session.events.findLast( + (event): event is AgentEvent<'agent_end'> => + event.type === 'agent_end' && event.streamId === streamId, + ); + expect(endEvent).toBeDefined(); + + const iterator = session.stream({ eventId: endEvent!.id })[ + Symbol.asyncIterator + ](); + await expect(iterator.next()).resolves.toEqual({ + value: undefined, + done: true, + }); + }); + + it('should throw for an unknown eventId', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + const iterator = session.stream({ eventId: 'missing-event' })[ + Symbol.asyncIterator + ](); + await expect(iterator.next()).rejects.toThrow( + 'Unknown eventId: missing-event', + ); + }); + it('should resume from an in-stream event within the same stream only', async () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); diff --git a/packages/core/src/agent/agent-session.ts b/packages/core/src/agent/agent-session.ts index 4dacdceca8..e01f4d568f 100644 --- a/packages/core/src/agent/agent-session.ts +++ b/packages/core/src/agent/agent-session.ts @@ -129,24 +129,29 @@ export class AgentSession implements AgentProtocol { if (options.eventId) { const index = currentEvents.findIndex((e) => e.id === options.eventId); - if (index !== -1) { - const resumeEvent = currentEvents[index]; - replayStartIndex = index + 1; - trackedStreamId = resumeEvent?.streamId ?? undefined; - if (trackedStreamId) { - agentActivityStarted = - resumeEvent?.type === 'agent_start' || - currentEvents - .slice(0, index) - .some( - (event) => - event.type === 'agent_start' && - event.streamId === trackedStreamId, - ); - } else { - // No correlated stream means "resume globally after this event". - agentActivityStarted = true; - } + if (index === -1) { + throw new Error(`Unknown eventId: ${options.eventId}`); + } + + const resumeEvent = currentEvents[index]; + replayStartIndex = index + 1; + trackedStreamId = resumeEvent?.streamId ?? undefined; + if (resumeEvent?.type === 'agent_end') { + agentActivityStarted = true; + done = true; + } else if (trackedStreamId) { + agentActivityStarted = + resumeEvent?.type === 'agent_start' || + currentEvents + .slice(0, index) + .some( + (event) => + event.type === 'agent_start' && + event.streamId === trackedStreamId, + ); + } else { + // No correlated stream means "resume globally after this event". + agentActivityStarted = true; } } else if (options.streamId) { const index = currentEvents.findIndex(