mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-04-20 18:14:29 -07:00
fix(core): harden AgentSession replay semantics (#23548)
This commit is contained in:
@@ -117,6 +117,7 @@ describe('AgentSession', () => {
|
||||
expect(events).toHaveLength(0);
|
||||
expect(protocol.events).toHaveLength(1);
|
||||
expect(protocol.events[0].type).toBe('session_update');
|
||||
expect(protocol.events[0].streamId).toEqual(expect.any(String));
|
||||
});
|
||||
|
||||
it('should skip events that occur before agent_start', async () => {
|
||||
@@ -171,6 +172,181 @@ describe('AgentSession', () => {
|
||||
expect(streamedEvents).toEqual(allEvents.slice(2));
|
||||
});
|
||||
|
||||
it('should complete immediately when resuming from agent_end', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([{ type: 'message' }]);
|
||||
const { streamId } = await session.send({
|
||||
message: [{ type: 'text', text: 'request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const endEvent = session.events.findLast(
|
||||
(event): event is AgentEvent<'agent_end'> =>
|
||||
event.type === 'agent_end' && event.streamId === streamId,
|
||||
);
|
||||
expect(endEvent).toBeDefined();
|
||||
|
||||
const iterator = session
|
||||
.stream({ eventId: endEvent!.id })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).resolves.toEqual({
|
||||
value: undefined,
|
||||
done: true,
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw for an unknown eventId', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
const iterator = session
|
||||
.stream({ eventId: 'missing-event' })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).rejects.toThrow(
|
||||
'Unknown eventId: missing-event',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw when resuming from an event before agent_start on a stream with no agent activity', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
const { streamId } = await session.send({ update: { title: 'draft' } });
|
||||
expect(streamId).toBeNull();
|
||||
|
||||
const updateEvent = session.events.find(
|
||||
(event): event is AgentEvent<'session_update'> =>
|
||||
event.type === 'session_update',
|
||||
);
|
||||
expect(updateEvent).toBeDefined();
|
||||
|
||||
const iterator = session
|
||||
.stream({ eventId: updateEvent!.id })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).rejects.toThrow(
|
||||
`Cannot resume from eventId ${updateEvent!.id} before agent_start for stream ${updateEvent!.streamId}`,
|
||||
);
|
||||
});
|
||||
|
||||
it('should replay from agent_start when resuming from a pre-agent_start event after activity is in history', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'hello' }],
|
||||
},
|
||||
]);
|
||||
await session.send({
|
||||
message: [{ type: 'text', text: 'request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const userMessage = session.events.find(
|
||||
(event): event is AgentEvent<'message'> =>
|
||||
event.type === 'message' && event.role === 'user',
|
||||
);
|
||||
expect(userMessage).toBeDefined();
|
||||
|
||||
const streamedEvents: AgentEvent[] = [];
|
||||
for await (const event of session.stream({ eventId: userMessage!.id })) {
|
||||
streamedEvents.push(event);
|
||||
}
|
||||
|
||||
expect(streamedEvents.map((event) => event.type)).toEqual([
|
||||
'agent_start',
|
||||
'message',
|
||||
'agent_end',
|
||||
]);
|
||||
expect(streamedEvents[0]?.streamId).toBe(userMessage!.streamId);
|
||||
});
|
||||
|
||||
it('should throw when resuming from a pre-agent_start event before activity is in history', async () => {
|
||||
const protocol = new MockAgentProtocol([
|
||||
{
|
||||
id: 'e-1',
|
||||
timestamp: '2026-01-01T00:00:00.000Z',
|
||||
streamId: 'stream-1',
|
||||
type: 'message',
|
||||
role: 'user',
|
||||
content: [{ type: 'text', text: 'request' }],
|
||||
},
|
||||
]);
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
const iterator = session
|
||||
.stream({ eventId: 'e-1' })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).rejects.toThrow(
|
||||
'Cannot resume from eventId e-1 before agent_start for stream stream-1',
|
||||
);
|
||||
});
|
||||
|
||||
it('should resume from an in-stream event within the same stream only', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'first answer 1' }],
|
||||
},
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'first answer 2' }],
|
||||
},
|
||||
]);
|
||||
const { streamId: streamId1 } = await session.send({
|
||||
message: [{ type: 'text', text: 'first request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
protocol.pushResponse([
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'second answer' }],
|
||||
},
|
||||
]);
|
||||
await session.send({
|
||||
message: [{ type: 'text', text: 'second request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const resumeEvent = session.events.find(
|
||||
(event): event is AgentEvent<'message'> =>
|
||||
event.type === 'message' &&
|
||||
event.streamId === streamId1 &&
|
||||
event.role === 'agent' &&
|
||||
event.content[0]?.type === 'text' &&
|
||||
event.content[0].text === 'first answer 1',
|
||||
);
|
||||
expect(resumeEvent).toBeDefined();
|
||||
|
||||
const streamedEvents: AgentEvent[] = [];
|
||||
for await (const event of session.stream({ eventId: resumeEvent!.id })) {
|
||||
streamedEvents.push(event);
|
||||
}
|
||||
|
||||
expect(
|
||||
streamedEvents.every((event) => event.streamId === streamId1),
|
||||
).toBe(true);
|
||||
expect(streamedEvents.map((event) => event.type)).toEqual([
|
||||
'message',
|
||||
'agent_end',
|
||||
]);
|
||||
const resumedMessage = streamedEvents[0] as AgentEvent<'message'>;
|
||||
expect(resumedMessage.content).toEqual([
|
||||
{ type: 'text', text: 'first answer 2' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should replay events for streamId starting with agent_start', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
@@ -223,6 +399,33 @@ describe('AgentSession', () => {
|
||||
expect(streamedEvents.at(-1)?.type).toBe('agent_end');
|
||||
});
|
||||
|
||||
it('should not drop agent_end that arrives while replay events are being yielded', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([{ type: 'message' }], { keepOpen: true });
|
||||
const { streamId } = await session.send({ update: { title: 't1' } });
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const iterator = session
|
||||
.stream({ streamId: streamId! })
|
||||
[Symbol.asyncIterator]();
|
||||
|
||||
const first = await iterator.next();
|
||||
expect(first.value?.type).toBe('agent_start');
|
||||
|
||||
protocol.pushToStream(streamId!, [], { close: true });
|
||||
|
||||
const second = await iterator.next();
|
||||
expect(second.value?.type).toBe('message');
|
||||
|
||||
const third = await iterator.next();
|
||||
expect(third.value?.type).toBe('agent_end');
|
||||
|
||||
const fourth = await iterator.next();
|
||||
expect(fourth.done).toBe(true);
|
||||
});
|
||||
|
||||
it('should follow an active stream if no options provided', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
@@ -34,7 +34,7 @@ export class AgentSession implements AgentProtocol {
|
||||
return this._protocol.abort();
|
||||
}
|
||||
|
||||
get events(): AgentEvent[] {
|
||||
get events(): readonly AgentEvent[] {
|
||||
return this._protocol.events;
|
||||
}
|
||||
|
||||
@@ -77,6 +77,30 @@ export class AgentSession implements AgentProtocol {
|
||||
let done = false;
|
||||
let trackedStreamId = options.streamId;
|
||||
let started = false;
|
||||
let agentActivityStarted = false;
|
||||
|
||||
const queueVisibleEvent = (event: AgentEvent): void => {
|
||||
if (trackedStreamId && event.streamId !== trackedStreamId) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!agentActivityStarted) {
|
||||
if (event.type !== 'agent_start') {
|
||||
return;
|
||||
}
|
||||
trackedStreamId = event.streamId;
|
||||
agentActivityStarted = true;
|
||||
}
|
||||
|
||||
if (!trackedStreamId) {
|
||||
return;
|
||||
}
|
||||
|
||||
eventQueue.push(event);
|
||||
if (event.type === 'agent_end' && event.streamId === trackedStreamId) {
|
||||
done = true;
|
||||
}
|
||||
};
|
||||
|
||||
// 1. Subscribe early to avoid missing any events that occur during replay setup
|
||||
const unsubscribe = this._protocol.subscribe((event) => {
|
||||
@@ -87,23 +111,7 @@ export class AgentSession implements AgentProtocol {
|
||||
return;
|
||||
}
|
||||
|
||||
if (trackedStreamId && event.streamId !== trackedStreamId) return;
|
||||
|
||||
// If we don't have a tracked stream yet, the first agent_start we see becomes it.
|
||||
if (!trackedStreamId && event.type === 'agent_start') {
|
||||
trackedStreamId = event.streamId ?? undefined;
|
||||
}
|
||||
|
||||
// If we still don't have a tracked stream and we aren't replaying everything (eventId), ignore.
|
||||
if (!trackedStreamId && !options.eventId) return;
|
||||
|
||||
eventQueue.push(event);
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
}
|
||||
queueVisibleEvent(event);
|
||||
|
||||
const currentResolve = resolve;
|
||||
next = new Promise<void>((r) => {
|
||||
@@ -118,8 +126,42 @@ export class AgentSession implements AgentProtocol {
|
||||
|
||||
if (options.eventId) {
|
||||
const index = currentEvents.findIndex((e) => e.id === options.eventId);
|
||||
if (index !== -1) {
|
||||
if (index === -1) {
|
||||
throw new Error(`Unknown eventId: ${options.eventId}`);
|
||||
}
|
||||
|
||||
const resumeEvent = currentEvents[index];
|
||||
trackedStreamId = resumeEvent.streamId;
|
||||
const firstAgentStartIndex = currentEvents.findIndex(
|
||||
(event) =>
|
||||
event.type === 'agent_start' && event.streamId === trackedStreamId,
|
||||
);
|
||||
|
||||
if (resumeEvent.type === 'agent_end') {
|
||||
replayStartIndex = index + 1;
|
||||
agentActivityStarted = true;
|
||||
done = true;
|
||||
} else if (
|
||||
firstAgentStartIndex !== -1 &&
|
||||
firstAgentStartIndex <= index
|
||||
) {
|
||||
replayStartIndex = index + 1;
|
||||
agentActivityStarted = true;
|
||||
} else if (firstAgentStartIndex !== -1) {
|
||||
// A pre-agent_start cursor can be resumed once the corresponding
|
||||
// agent activity is already present in history. Because stream()
|
||||
// yields only agent_start -> agent_end, replay begins at agent_start
|
||||
// rather than at the original pre-start event.
|
||||
replayStartIndex = firstAgentStartIndex;
|
||||
} else {
|
||||
// Consumers can only resume by eventId once the corresponding stream
|
||||
// has entered the agent_start -> agent_end lifecycle in history.
|
||||
// Without a recorded agent_start, this wrapper cannot distinguish
|
||||
// "agent activity may start later" from "this send was acknowledged
|
||||
// without agent activity" without risking an infinite wait.
|
||||
throw new Error(
|
||||
`Cannot resume from eventId ${options.eventId} before agent_start for stream ${trackedStreamId}`,
|
||||
);
|
||||
}
|
||||
} else if (options.streamId) {
|
||||
const index = currentEvents.findIndex(
|
||||
@@ -128,29 +170,7 @@ export class AgentSession implements AgentProtocol {
|
||||
if (index !== -1) {
|
||||
replayStartIndex = index;
|
||||
}
|
||||
}
|
||||
|
||||
if (replayStartIndex !== -1) {
|
||||
for (let i = replayStartIndex; i < currentEvents.length; i++) {
|
||||
const event = currentEvents[i];
|
||||
if (options.streamId && event.streamId !== options.streamId) continue;
|
||||
|
||||
eventQueue.push(event);
|
||||
if (event.type === 'agent_start' && !trackedStreamId) {
|
||||
trackedStreamId = event.streamId ?? undefined;
|
||||
}
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!done && !trackedStreamId) {
|
||||
// Find active stream in history
|
||||
} else {
|
||||
const activeStarts = currentEvents.filter(
|
||||
(e) => e.type === 'agent_start',
|
||||
);
|
||||
@@ -161,36 +181,28 @@ export class AgentSession implements AgentProtocol {
|
||||
(e) => e.type === 'agent_end' && e.streamId === start.streamId,
|
||||
)
|
||||
) {
|
||||
trackedStreamId = start.streamId ?? undefined;
|
||||
trackedStreamId = start.streamId;
|
||||
replayStartIndex = currentEvents.findIndex(
|
||||
(e) => e.id === start.id,
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we replayed to the end and no stream is active, and we were specifically
|
||||
// replaying from an eventId (or we've already finished the stream we were looking for), we are done.
|
||||
if (!done && !trackedStreamId && options.eventId) {
|
||||
done = true;
|
||||
if (replayStartIndex !== -1) {
|
||||
for (let i = replayStartIndex; i < currentEvents.length; i++) {
|
||||
const event = currentEvents[i];
|
||||
queueVisibleEvent(event);
|
||||
if (done) break;
|
||||
}
|
||||
}
|
||||
|
||||
started = true;
|
||||
|
||||
// Process events that arrived while we were replaying
|
||||
for (const event of earlyEvents) {
|
||||
if (done) break;
|
||||
if (trackedStreamId && event.streamId !== trackedStreamId) continue;
|
||||
if (!trackedStreamId && event.type === 'agent_start') {
|
||||
trackedStreamId = event.streamId ?? undefined;
|
||||
}
|
||||
if (!trackedStreamId && !options.eventId) continue;
|
||||
|
||||
eventQueue.push(event);
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
}
|
||||
queueVisibleEvent(event);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
@@ -200,6 +212,7 @@ export class AgentSession implements AgentProtocol {
|
||||
for (const event of eventsToYield) {
|
||||
yield event;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (done) break;
|
||||
|
||||
@@ -235,7 +235,7 @@ describe('MockAgentProtocol', () => {
|
||||
expect(streamId).toBeNull();
|
||||
expect(session.events).toHaveLength(1);
|
||||
expect(session.events[0].type).toBe('session_update');
|
||||
expect(session.events[0].streamId).toBeNull();
|
||||
expect(session.events[0].streamId).toEqual(expect.any(String));
|
||||
});
|
||||
|
||||
it('should throw on action', async () => {
|
||||
|
||||
@@ -8,8 +8,8 @@ import type {
|
||||
AgentEvent,
|
||||
AgentEventCommon,
|
||||
AgentEventData,
|
||||
AgentSend,
|
||||
AgentProtocol,
|
||||
AgentSend,
|
||||
Unsubscribe,
|
||||
} from './types.js';
|
||||
|
||||
@@ -86,13 +86,7 @@ export class MockAgentProtocol implements AgentProtocol {
|
||||
) {
|
||||
const now = new Date().toISOString();
|
||||
for (const eventData of events) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const event: AgentEvent = {
|
||||
...eventData,
|
||||
id: eventData.id ?? `e-${this._nextEventId++}`,
|
||||
timestamp: eventData.timestamp ?? now,
|
||||
streamId: eventData.streamId ?? streamId,
|
||||
} as AgentEvent;
|
||||
const event = this._normalizeEvent(eventData, now, streamId);
|
||||
this._emit(event);
|
||||
}
|
||||
|
||||
@@ -100,13 +94,13 @@ export class MockAgentProtocol implements AgentProtocol {
|
||||
options?.close &&
|
||||
!events.some((eventData) => eventData.type === 'agent_end')
|
||||
) {
|
||||
this._emit({
|
||||
id: `e-${this._nextEventId++}`,
|
||||
timestamp: now,
|
||||
streamId,
|
||||
type: 'agent_end',
|
||||
reason: 'completed',
|
||||
} as AgentEvent);
|
||||
this._emit(
|
||||
this._normalizeEvent(
|
||||
{ type: 'agent_end', reason: 'completed' },
|
||||
now,
|
||||
streamId,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,16 +118,18 @@ export class MockAgentProtocol implements AgentProtocol {
|
||||
|
||||
const now = new Date().toISOString();
|
||||
const eventsToEmit: AgentEvent[] = [];
|
||||
let fallbackStreamId: string | undefined;
|
||||
|
||||
// Helper to normalize and prepare for emission
|
||||
// All emitted events stay correlated to a stream even if this send does not
|
||||
// start agent activity and therefore returns `streamId: null`.
|
||||
const normalize = (eventData: MockAgentEvent): AgentEvent =>
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
({
|
||||
...eventData,
|
||||
id: eventData.id ?? `e-${this._nextEventId++}`,
|
||||
timestamp: eventData.timestamp ?? now,
|
||||
streamId: eventData.streamId ?? streamId,
|
||||
}) as AgentEvent;
|
||||
this._normalizeEvent(
|
||||
eventData,
|
||||
now,
|
||||
eventData.streamId ??
|
||||
streamId ??
|
||||
(fallbackStreamId ??= `mock-stream-${this._nextStreamId++}`),
|
||||
);
|
||||
|
||||
// 1. User/Update event (BEFORE agent_start)
|
||||
if ('message' in payload && payload.message) {
|
||||
@@ -225,16 +221,32 @@ export class MockAgentProtocol implements AgentProtocol {
|
||||
return { streamId };
|
||||
}
|
||||
|
||||
private _normalizeEvent(
|
||||
eventData: MockAgentEvent,
|
||||
timestamp: string,
|
||||
streamId: string,
|
||||
): AgentEvent {
|
||||
// TypeScript loses the specific union member when we add common event
|
||||
// fields here, so keep the narrowing local to this mock-only helper.
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return {
|
||||
...eventData,
|
||||
id: eventData.id ?? `e-${this._nextEventId++}`,
|
||||
timestamp: eventData.timestamp ?? timestamp,
|
||||
streamId: eventData.streamId ?? streamId,
|
||||
} as AgentEvent;
|
||||
}
|
||||
|
||||
async abort(): Promise<void> {
|
||||
if (this._lastStreamId && this._activeStreamIds.has(this._lastStreamId)) {
|
||||
const streamId = this._lastStreamId;
|
||||
this._emit({
|
||||
id: `e-${this._nextEventId++}`,
|
||||
timestamp: new Date().toISOString(),
|
||||
streamId,
|
||||
type: 'agent_end',
|
||||
reason: 'aborted',
|
||||
} as AgentEvent);
|
||||
this._emit(
|
||||
this._normalizeEvent(
|
||||
{ type: 'agent_end', reason: 'aborted' },
|
||||
new Date().toISOString(),
|
||||
streamId,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,9 +11,10 @@ export type Unsubscribe = () => void;
|
||||
export interface AgentProtocol extends Trajectory {
|
||||
/**
|
||||
* Send data to the agent. Promise resolves when action is acknowledged.
|
||||
* Returns the `streamId` of the stream the message was correlated to --
|
||||
* this may be a new stream if idle, an existing stream, or null if no
|
||||
* stream was triggered.
|
||||
* Returns the agent-activity `streamId` affected by the send. This may be a
|
||||
* new stream if idle, an existing stream, or null if the send was
|
||||
* acknowledged without starting agent activity. Emitted events should still
|
||||
* remain correlated to a stream via their `streamId`.
|
||||
*
|
||||
* When a new stream is created by a send, the streamId MUST be returned
|
||||
* before the `agent_start` event is emitted for the stream.
|
||||
@@ -36,7 +37,7 @@ export interface AgentProtocol extends Trajectory {
|
||||
/**
|
||||
* AgentProtocol implements the Trajectory interface and can retrieve existing events.
|
||||
*/
|
||||
readonly events: AgentEvent[];
|
||||
readonly events: readonly AgentEvent[];
|
||||
}
|
||||
|
||||
type RequireExactlyOne<T> = {
|
||||
@@ -54,7 +55,7 @@ interface AgentSendPayloads {
|
||||
export type AgentSend = RequireExactlyOne<AgentSendPayloads> & WithMeta;
|
||||
|
||||
export interface Trajectory {
|
||||
readonly events: AgentEvent[];
|
||||
readonly events: readonly AgentEvent[];
|
||||
}
|
||||
|
||||
export interface AgentEventCommon {
|
||||
@@ -62,8 +63,8 @@ export interface AgentEventCommon {
|
||||
id: string;
|
||||
/** Identifies the subagent thread, omitted for "main thread" events. */
|
||||
threadId?: string;
|
||||
/** Identifies a particular stream of a particular thread. */
|
||||
streamId?: string | null;
|
||||
/** Identifies the stream this event belongs to. */
|
||||
streamId: string;
|
||||
/** ISO Timestamp for the time at which the event occurred. */
|
||||
timestamp: string;
|
||||
/** The concrete type of the event. */
|
||||
|
||||
Reference in New Issue
Block a user