!feat(core): harden agent session stream semantics

This commit is contained in:
Adam Weidman
2026-03-20 13:28:37 -04:00
parent 1947629a38
commit db0624c6cd
2 changed files with 60 additions and 18 deletions
@@ -171,6 +171,43 @@ describe('AgentSession', () => {
expect(streamedEvents).toEqual(allEvents.slice(2));
});
it('should complete immediately when resuming from agent_end', async () => {
const protocol = new MockAgentProtocol();
const session = new AgentSession(protocol);
protocol.pushResponse([{ type: 'message' }]);
const { streamId } = await session.send({
message: [{ type: 'text', text: 'request' }],
});
await new Promise((resolve) => setTimeout(resolve, 10));
const endEvent = session.events.findLast(
(event): event is AgentEvent<'agent_end'> =>
event.type === 'agent_end' && event.streamId === streamId,
);
expect(endEvent).toBeDefined();
const iterator = session.stream({ eventId: endEvent!.id })[
Symbol.asyncIterator
]();
await expect(iterator.next()).resolves.toEqual({
value: undefined,
done: true,
});
});
it('should throw for an unknown eventId', async () => {
const protocol = new MockAgentProtocol();
const session = new AgentSession(protocol);
const iterator = session.stream({ eventId: 'missing-event' })[
Symbol.asyncIterator
]();
await expect(iterator.next()).rejects.toThrow(
'Unknown eventId: missing-event',
);
});
it('should resume from an in-stream event within the same stream only', async () => {
const protocol = new MockAgentProtocol();
const session = new AgentSession(protocol);
+23 -18
View File
@@ -129,24 +129,29 @@ export class AgentSession implements AgentProtocol {
if (options.eventId) {
const index = currentEvents.findIndex((e) => e.id === options.eventId);
if (index !== -1) {
const resumeEvent = currentEvents[index];
replayStartIndex = index + 1;
trackedStreamId = resumeEvent?.streamId ?? undefined;
if (trackedStreamId) {
agentActivityStarted =
resumeEvent?.type === 'agent_start' ||
currentEvents
.slice(0, index)
.some(
(event) =>
event.type === 'agent_start' &&
event.streamId === trackedStreamId,
);
} else {
// No correlated stream means "resume globally after this event".
agentActivityStarted = true;
}
if (index === -1) {
throw new Error(`Unknown eventId: ${options.eventId}`);
}
const resumeEvent = currentEvents[index];
replayStartIndex = index + 1;
trackedStreamId = resumeEvent?.streamId ?? undefined;
if (resumeEvent?.type === 'agent_end') {
agentActivityStarted = true;
done = true;
} else if (trackedStreamId) {
agentActivityStarted =
resumeEvent?.type === 'agent_start' ||
currentEvents
.slice(0, index)
.some(
(event) =>
event.type === 'agent_start' &&
event.streamId === trackedStreamId,
);
} else {
// No correlated stream means "resume globally after this event".
agentActivityStarted = true;
}
} else if (options.streamId) {
const index = currentEvents.findIndex(