From 1947629a38f503023d56e3fe59ab4ea7526890ad Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Fri, 20 Mar 2026 12:55:26 -0400 Subject: [PATCH] !feat(core): harden agent session stream semantics --- packages/core/src/agent/agent-session.test.ts | 88 ++++++++++++++ packages/core/src/agent/agent-session.ts | 111 +++++++++--------- 2 files changed, 146 insertions(+), 53 deletions(-) diff --git a/packages/core/src/agent/agent-session.test.ts b/packages/core/src/agent/agent-session.test.ts index 235b4eb013..959e8d1a4e 100644 --- a/packages/core/src/agent/agent-session.test.ts +++ b/packages/core/src/agent/agent-session.test.ts @@ -171,6 +171,67 @@ describe('AgentSession', () => { expect(streamedEvents).toEqual(allEvents.slice(2)); }); + it('should resume from an in-stream event within the same stream only', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([ + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'first answer 1' }], + }, + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'first answer 2' }], + }, + ]); + const { streamId: streamId1 } = await session.send({ + message: [{ type: 'text', text: 'first request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + protocol.pushResponse([ + { + type: 'message', + role: 'agent', + content: [{ type: 'text', text: 'second answer' }], + }, + ]); + await session.send({ + message: [{ type: 'text', text: 'second request' }], + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const resumeEvent = session.events.find( + (event): event is AgentEvent<'message'> => + event.type === 'message' && + event.streamId === streamId1 && + event.role === 'agent' && + event.content[0]?.type === 'text' && + event.content[0].text === 'first answer 1', + ); + expect(resumeEvent).toBeDefined(); + + const streamedEvents: AgentEvent[] = []; + for await (const event of session.stream({ eventId: resumeEvent!.id })) { + streamedEvents.push(event); + } + + expect( + streamedEvents.every((event) => event.streamId === streamId1), + ).toBe(true); + expect(streamedEvents.map((event) => event.type)).toEqual([ + 'message', + 'agent_end', + ]); + const resumedMessage = streamedEvents[0] as AgentEvent<'message'>; + expect(resumedMessage.content).toEqual([ + { type: 'text', text: 'first answer 2' }, + ]); + }); + it('should replay events for streamId starting with agent_start', async () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); @@ -223,6 +284,33 @@ describe('AgentSession', () => { expect(streamedEvents.at(-1)?.type).toBe('agent_end'); }); + it('should not drop agent_end that arrives while replay events are being yielded', async () => { + const protocol = new MockAgentProtocol(); + const session = new AgentSession(protocol); + + protocol.pushResponse([{ type: 'message' }], { keepOpen: true }); + const { streamId } = await session.send({ update: { title: 't1' } }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const iterator = session + .stream({ streamId: streamId! }) + [Symbol.asyncIterator](); + + const first = await iterator.next(); + expect(first.value?.type).toBe('agent_start'); + + protocol.pushToStream(streamId!, [], { close: true }); + + const second = await iterator.next(); + expect(second.value?.type).toBe('message'); + + const third = await iterator.next(); + expect(third.value?.type).toBe('agent_end'); + + const fourth = await iterator.next(); + expect(fourth.done).toBe(true); + }); + it('should follow an active stream if no options provided', async () => { const protocol = new MockAgentProtocol(); const session = new AgentSession(protocol); diff --git a/packages/core/src/agent/agent-session.ts b/packages/core/src/agent/agent-session.ts index 0d9fc86bb0..4dacdceca8 100644 --- a/packages/core/src/agent/agent-session.ts +++ b/packages/core/src/agent/agent-session.ts @@ -77,6 +77,33 @@ export class AgentSession implements AgentProtocol { let done = false; let trackedStreamId = options.streamId; let started = false; + let agentActivityStarted = false; + + const queueVisibleEvent = (event: AgentEvent): void => { + if (trackedStreamId && event.streamId !== trackedStreamId) { + return; + } + + if (!agentActivityStarted) { + if (event.type !== 'agent_start') { + return; + } + trackedStreamId = event.streamId ?? trackedStreamId ?? undefined; + agentActivityStarted = true; + } + + if (!trackedStreamId && !options.eventId) { + return; + } + + eventQueue.push(event); + if ( + event.type === 'agent_end' && + event.streamId === (trackedStreamId ?? null) + ) { + done = true; + } + }; // 1. Subscribe early to avoid missing any events that occur during replay setup const unsubscribe = this._protocol.subscribe((event) => { @@ -87,23 +114,7 @@ export class AgentSession implements AgentProtocol { return; } - if (trackedStreamId && event.streamId !== trackedStreamId) return; - - // If we don't have a tracked stream yet, the first agent_start we see becomes it. - if (!trackedStreamId && event.type === 'agent_start') { - trackedStreamId = event.streamId ?? undefined; - } - - // If we still don't have a tracked stream and we aren't replaying everything (eventId), ignore. - if (!trackedStreamId && !options.eventId) return; - - eventQueue.push(event); - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { - done = true; - } + queueVisibleEvent(event); const currentResolve = resolve; next = new Promise((r) => { @@ -119,7 +130,23 @@ export class AgentSession implements AgentProtocol { if (options.eventId) { const index = currentEvents.findIndex((e) => e.id === options.eventId); if (index !== -1) { + const resumeEvent = currentEvents[index]; replayStartIndex = index + 1; + trackedStreamId = resumeEvent?.streamId ?? undefined; + if (trackedStreamId) { + agentActivityStarted = + resumeEvent?.type === 'agent_start' || + currentEvents + .slice(0, index) + .some( + (event) => + event.type === 'agent_start' && + event.streamId === trackedStreamId, + ); + } else { + // No correlated stream means "resume globally after this event". + agentActivityStarted = true; + } } } else if (options.streamId) { const index = currentEvents.findIndex( @@ -128,29 +155,7 @@ export class AgentSession implements AgentProtocol { if (index !== -1) { replayStartIndex = index; } - } - - if (replayStartIndex !== -1) { - for (let i = replayStartIndex; i < currentEvents.length; i++) { - const event = currentEvents[i]; - if (options.streamId && event.streamId !== options.streamId) continue; - - eventQueue.push(event); - if (event.type === 'agent_start' && !trackedStreamId) { - trackedStreamId = event.streamId ?? undefined; - } - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { - done = true; - break; - } - } - } - - if (!done && !trackedStreamId) { - // Find active stream in history + } else { const activeStarts = currentEvents.filter( (e) => e.type === 'agent_start', ); @@ -162,11 +167,22 @@ export class AgentSession implements AgentProtocol { ) ) { trackedStreamId = start.streamId ?? undefined; + replayStartIndex = currentEvents.findIndex( + (e) => e.id === start.id, + ); break; } } } + if (replayStartIndex !== -1) { + for (let i = replayStartIndex; i < currentEvents.length; i++) { + const event = currentEvents[i]; + queueVisibleEvent(event); + if (done) break; + } + } + // If we replayed to the end and no stream is active, and we were specifically // replaying from an eventId (or we've already finished the stream we were looking for), we are done. if (!done && !trackedStreamId && options.eventId) { @@ -178,19 +194,7 @@ export class AgentSession implements AgentProtocol { // Process events that arrived while we were replaying for (const event of earlyEvents) { if (done) break; - if (trackedStreamId && event.streamId !== trackedStreamId) continue; - if (!trackedStreamId && event.type === 'agent_start') { - trackedStreamId = event.streamId ?? undefined; - } - if (!trackedStreamId && !options.eventId) continue; - - eventQueue.push(event); - if ( - event.type === 'agent_end' && - event.streamId === (trackedStreamId ?? null) - ) { - done = true; - } + queueVisibleEvent(event); } while (true) { @@ -200,6 +204,7 @@ export class AgentSession implements AgentProtocol { for (const event of eventsToYield) { yield event; } + continue; } if (done) break;