mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-20 16:53:12 -07:00
fix(core): align wrapper replay semantics
This commit is contained in:
@@ -188,9 +188,9 @@ describe('AgentSession', () => {
|
||||
);
|
||||
expect(endEvent).toBeDefined();
|
||||
|
||||
const iterator = session.stream({ eventId: endEvent!.id })[
|
||||
Symbol.asyncIterator
|
||||
]();
|
||||
const iterator = session
|
||||
.stream({ eventId: endEvent!.id })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).resolves.toEqual({
|
||||
value: undefined,
|
||||
done: true,
|
||||
@@ -201,9 +201,9 @@ describe('AgentSession', () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
const iterator = session.stream({ eventId: 'missing-event' })[
|
||||
Symbol.asyncIterator
|
||||
]();
|
||||
const iterator = session
|
||||
.stream({ eventId: 'missing-event' })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).rejects.toThrow(
|
||||
'Unknown eventId: missing-event',
|
||||
);
|
||||
@@ -222,15 +222,50 @@ describe('AgentSession', () => {
|
||||
);
|
||||
expect(updateEvent).toBeDefined();
|
||||
|
||||
const iterator = session.stream({ eventId: updateEvent!.id })[
|
||||
Symbol.asyncIterator
|
||||
]();
|
||||
const iterator = session
|
||||
.stream({ eventId: updateEvent!.id })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).rejects.toThrow(
|
||||
`Cannot resume from eventId ${updateEvent!.id} before agent_start; use stream({ streamId }) instead`,
|
||||
`Cannot resume from eventId ${updateEvent!.id} before agent_start for stream ${updateEvent!.streamId}`,
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw when resuming from a pre-agent_start event even if agent activity may start later', async () => {
|
||||
it('should replay from agent_start when resuming from a pre-agent_start event after activity is in history', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'hello' }],
|
||||
},
|
||||
]);
|
||||
await session.send({
|
||||
message: [{ type: 'text', text: 'request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const userMessage = session.events.find(
|
||||
(event): event is AgentEvent<'message'> =>
|
||||
event.type === 'message' && event.role === 'user',
|
||||
);
|
||||
expect(userMessage).toBeDefined();
|
||||
|
||||
const streamedEvents: AgentEvent[] = [];
|
||||
for await (const event of session.stream({ eventId: userMessage!.id })) {
|
||||
streamedEvents.push(event);
|
||||
}
|
||||
|
||||
expect(streamedEvents.map((event) => event.type)).toEqual([
|
||||
'agent_start',
|
||||
'message',
|
||||
'agent_end',
|
||||
]);
|
||||
expect(streamedEvents[0]?.streamId).toBe(userMessage!.streamId);
|
||||
});
|
||||
|
||||
it('should throw when resuming from a pre-agent_start event before activity is in history', async () => {
|
||||
const protocol = new MockAgentProtocol([
|
||||
{
|
||||
id: 'e-1',
|
||||
@@ -243,11 +278,11 @@ describe('AgentSession', () => {
|
||||
]);
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
const iterator = session.stream({ eventId: 'e-1' })[
|
||||
Symbol.asyncIterator
|
||||
]();
|
||||
const iterator = session
|
||||
.stream({ eventId: 'e-1' })
|
||||
[Symbol.asyncIterator]();
|
||||
await expect(iterator.next()).rejects.toThrow(
|
||||
'Cannot resume from eventId e-1 before agent_start; use stream({ streamId }) instead',
|
||||
'Cannot resume from eventId e-1 before agent_start for stream stream-1',
|
||||
);
|
||||
});
|
||||
|
||||
@@ -441,6 +476,5 @@ describe('AgentSession', () => {
|
||||
expect(streamedEvents.some((e) => e.type === 'agent_end')).toBe(true);
|
||||
expect(streamedEvents.some((e) => e.streamId === streamId2)).toBe(false);
|
||||
});
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
@@ -131,32 +131,36 @@ export class AgentSession implements AgentProtocol {
|
||||
}
|
||||
|
||||
const resumeEvent = currentEvents[index];
|
||||
replayStartIndex = index + 1;
|
||||
trackedStreamId = resumeEvent.streamId;
|
||||
const streamHasStarted =
|
||||
resumeEvent.type === 'agent_start' ||
|
||||
currentEvents
|
||||
.slice(0, index)
|
||||
.some(
|
||||
(event) =>
|
||||
event.type === 'agent_start' &&
|
||||
event.streamId === trackedStreamId,
|
||||
);
|
||||
const firstAgentStartIndex = currentEvents.findIndex(
|
||||
(event) =>
|
||||
event.type === 'agent_start' && event.streamId === trackedStreamId,
|
||||
);
|
||||
|
||||
if (resumeEvent.type === 'agent_end') {
|
||||
replayStartIndex = index + 1;
|
||||
agentActivityStarted = true;
|
||||
done = true;
|
||||
} else if (streamHasStarted) {
|
||||
} else if (
|
||||
firstAgentStartIndex !== -1 &&
|
||||
firstAgentStartIndex <= index
|
||||
) {
|
||||
replayStartIndex = index + 1;
|
||||
agentActivityStarted = true;
|
||||
} else if (firstAgentStartIndex !== -1) {
|
||||
// A pre-agent_start cursor can be resumed once the corresponding
|
||||
// agent activity is already present in history. Because stream()
|
||||
// yields only agent_start -> agent_end, replay begins at agent_start
|
||||
// rather than at the original pre-start event.
|
||||
replayStartIndex = firstAgentStartIndex;
|
||||
} else {
|
||||
// Consumers can only resume by eventId once the stream has entered the
|
||||
// agent_start -> agent_end lifecycle. For pre-start events, use
|
||||
// stream({ streamId }) instead because this wrapper cannot
|
||||
// distinguish "agent activity will start later" from "this send was
|
||||
// acknowledged without agent activity" without risking an infinite
|
||||
// wait.
|
||||
// Consumers can only resume by eventId once the corresponding stream
|
||||
// has entered the agent_start -> agent_end lifecycle in history.
|
||||
// Without a recorded agent_start, this wrapper cannot distinguish
|
||||
// "agent activity may start later" from "this send was acknowledged
|
||||
// without agent activity" without risking an infinite wait.
|
||||
throw new Error(
|
||||
`Cannot resume from eventId ${options.eventId} before agent_start; use stream({ streamId }) instead`,
|
||||
`Cannot resume from eventId ${options.eventId} before agent_start for stream ${trackedStreamId}`,
|
||||
);
|
||||
}
|
||||
} else if (options.streamId) {
|
||||
|
||||
Reference in New Issue
Block a user