diff --git a/packages/core/src/agent/agent-session.test.ts b/packages/core/src/agent/agent-session.test.ts index f69a9758bb..e3ff1c5dc0 100644 --- a/packages/core/src/agent/agent-session.test.ts +++ b/packages/core/src/agent/agent-session.test.ts @@ -188,9 +188,9 @@ describe('AgentSession', () => { ); expect(endEvent).toBeDefined(); - const iterator = session.stream({ eventId: endEvent!.id })[ - Symbol.asyncIterator - ](); + const iterator = session + .stream({ eventId: endEvent!.id }) + [Symbol.asyncIterator](); await expect(iterator.next()).resolves.toEqual({ value: undefined, done: true, @@ -201,9 +201,9 @@ describe('AgentSession', () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); - const iterator = session.stream({ eventId: 'missing-event' })[ - Symbol.asyncIterator - ](); + const iterator = session + .stream({ eventId: 'missing-event' }) + [Symbol.asyncIterator](); await expect(iterator.next()).rejects.toThrow( 'Unknown eventId: missing-event', ); @@ -222,15 +222,50 @@ describe('AgentSession', () => { ); expect(updateEvent).toBeDefined(); - const iterator = session.stream({ eventId: updateEvent!.id })[ - Symbol.asyncIterator - ](); + const iterator = session + .stream({ eventId: updateEvent!.id }) + [Symbol.asyncIterator](); await expect(iterator.next()).rejects.toThrow( - `Cannot resume from eventId ${updateEvent!.id} before agent_start; use stream({ streamId }) instead`, + `Cannot resume from eventId ${updateEvent!.id} before agent_start for stream ${updateEvent!.streamId}`, ); }); - it('should throw when resuming from a pre-agent_start event even if agent activity may start later', async () => { + it('should replay from agent_start when resuming from a pre-agent_start event after activity is in history', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([ + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'hello' }], + }, + ]); + await session.send({ + message: [{ type: 'text', text: 'request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const userMessage = session.events.find( + (event): event is AgentEvent<'message'> => + event.type === 'message' && event.role === 'user', + ); + expect(userMessage).toBeDefined(); + + const streamedEvents: AgentEvent[] = []; + for await (const event of session.stream({ eventId: userMessage!.id })) { + streamedEvents.push(event); + } + + expect(streamedEvents.map((event) => event.type)).toEqual([ + 'agent_start', + 'message', + 'agent_end', + ]); + expect(streamedEvents[0]?.streamId).toBe(userMessage!.streamId); + }); + + it('should throw when resuming from a pre-agent_start event before activity is in history', async () => { const protocol = new MockAgentProtocol([ { id: 'e-1', @@ -243,11 +278,11 @@ describe('AgentSession', () => { ]); const session = new AgentSession(protocol); - const iterator = session.stream({ eventId: 'e-1' })[ - Symbol.asyncIterator - ](); + const iterator = session + .stream({ eventId: 'e-1' }) + [Symbol.asyncIterator](); await expect(iterator.next()).rejects.toThrow( - 'Cannot resume from eventId e-1 before agent_start; use stream({ streamId }) instead', + 'Cannot resume from eventId e-1 before agent_start for stream stream-1', ); }); @@ -441,6 +476,5 @@ describe('AgentSession', () => { expect(streamedEvents.some((e) => e.type === 'agent_end')).toBe(true); expect(streamedEvents.some((e) => e.streamId === streamId2)).toBe(false); }); - }); }); diff --git a/packages/core/src/agent/agent-session.ts b/packages/core/src/agent/agent-session.ts index c48addcc4a..6a4c295fc8 100644 --- a/packages/core/src/agent/agent-session.ts +++ b/packages/core/src/agent/agent-session.ts @@ -131,32 +131,36 @@ export class AgentSession implements AgentProtocol { } const resumeEvent = currentEvents[index]; - replayStartIndex = index + 1; trackedStreamId = resumeEvent.streamId; - const streamHasStarted = - resumeEvent.type === 'agent_start' || - currentEvents - .slice(0, index) - .some( - (event) => - event.type === 'agent_start' && - event.streamId === trackedStreamId, - ); + const firstAgentStartIndex = currentEvents.findIndex( + (event) => + event.type === 'agent_start' && event.streamId === trackedStreamId, + ); if (resumeEvent.type === 'agent_end') { + replayStartIndex = index + 1; agentActivityStarted = true; done = true; - } else if (streamHasStarted) { + } else if ( + firstAgentStartIndex !== -1 && + firstAgentStartIndex <= index + ) { + replayStartIndex = index + 1; agentActivityStarted = true; + } else if (firstAgentStartIndex !== -1) { + // A pre-agent_start cursor can be resumed once the corresponding + // agent activity is already present in history. Because stream() + // yields only agent_start -> agent_end, replay begins at agent_start + // rather than at the original pre-start event. + replayStartIndex = firstAgentStartIndex; } else { - // Consumers can only resume by eventId once the stream has entered the - // agent_start -> agent_end lifecycle. For pre-start events, use - // stream({ streamId }) instead because this wrapper cannot - // distinguish "agent activity will start later" from "this send was - // acknowledged without agent activity" without risking an infinite - // wait. + // Consumers can only resume by eventId once the corresponding stream + // has entered the agent_start -> agent_end lifecycle in history. + // Without a recorded agent_start, this wrapper cannot distinguish + // "agent activity may start later" from "this send was acknowledged + // without agent activity" without risking an infinite wait. throw new Error( - `Cannot resume from eventId ${options.eventId} before agent_start; use stream({ streamId }) instead`, + `Cannot resume from eventId ${options.eventId} before agent_start for stream ${trackedStreamId}`, ); } } else if (options.streamId) {