mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-20 00:32:31 -07:00
!feat(core): harden agent session stream semantics
This commit is contained in:
@@ -171,6 +171,67 @@ describe('AgentSession', () => {
|
||||
expect(streamedEvents).toEqual(allEvents.slice(2));
|
||||
});
|
||||
|
||||
it('should resume from an in-stream event within the same stream only', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'first answer 1' }],
|
||||
},
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'first answer 2' }],
|
||||
},
|
||||
]);
|
||||
const { streamId: streamId1 } = await session.send({
|
||||
message: [{ type: 'text', text: 'first request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
protocol.pushResponse([
|
||||
{
|
||||
type: 'message',
|
||||
role: 'agent',
|
||||
content: [{ type: 'text', text: 'second answer' }],
|
||||
},
|
||||
]);
|
||||
await session.send({
|
||||
message: [{ type: 'text', text: 'second request' }],
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const resumeEvent = session.events.find(
|
||||
(event): event is AgentEvent<'message'> =>
|
||||
event.type === 'message' &&
|
||||
event.streamId === streamId1 &&
|
||||
event.role === 'agent' &&
|
||||
event.content[0]?.type === 'text' &&
|
||||
event.content[0].text === 'first answer 1',
|
||||
);
|
||||
expect(resumeEvent).toBeDefined();
|
||||
|
||||
const streamedEvents: AgentEvent[] = [];
|
||||
for await (const event of session.stream({ eventId: resumeEvent!.id })) {
|
||||
streamedEvents.push(event);
|
||||
}
|
||||
|
||||
expect(
|
||||
streamedEvents.every((event) => event.streamId === streamId1),
|
||||
).toBe(true);
|
||||
expect(streamedEvents.map((event) => event.type)).toEqual([
|
||||
'message',
|
||||
'agent_end',
|
||||
]);
|
||||
const resumedMessage = streamedEvents[0] as AgentEvent<'message'>;
|
||||
expect(resumedMessage.content).toEqual([
|
||||
{ type: 'text', text: 'first answer 2' },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should replay events for streamId starting with agent_start', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
@@ -223,6 +284,33 @@ describe('AgentSession', () => {
|
||||
expect(streamedEvents.at(-1)?.type).toBe('agent_end');
|
||||
});
|
||||
|
||||
it('should not drop agent_end that arrives while replay events are being yielded', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
protocol.pushResponse([{ type: 'message' }], { keepOpen: true });
|
||||
const { streamId } = await session.send({ update: { title: 't1' } });
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const iterator = session
|
||||
.stream({ streamId: streamId! })
|
||||
[Symbol.asyncIterator]();
|
||||
|
||||
const first = await iterator.next();
|
||||
expect(first.value?.type).toBe('agent_start');
|
||||
|
||||
protocol.pushToStream(streamId!, [], { close: true });
|
||||
|
||||
const second = await iterator.next();
|
||||
expect(second.value?.type).toBe('message');
|
||||
|
||||
const third = await iterator.next();
|
||||
expect(third.value?.type).toBe('agent_end');
|
||||
|
||||
const fourth = await iterator.next();
|
||||
expect(fourth.done).toBe(true);
|
||||
});
|
||||
|
||||
it('should follow an active stream if no options provided', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
@@ -77,6 +77,33 @@ export class AgentSession implements AgentProtocol {
|
||||
let done = false;
|
||||
let trackedStreamId = options.streamId;
|
||||
let started = false;
|
||||
let agentActivityStarted = false;
|
||||
|
||||
const queueVisibleEvent = (event: AgentEvent): void => {
|
||||
if (trackedStreamId && event.streamId !== trackedStreamId) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!agentActivityStarted) {
|
||||
if (event.type !== 'agent_start') {
|
||||
return;
|
||||
}
|
||||
trackedStreamId = event.streamId ?? trackedStreamId ?? undefined;
|
||||
agentActivityStarted = true;
|
||||
}
|
||||
|
||||
if (!trackedStreamId && !options.eventId) {
|
||||
return;
|
||||
}
|
||||
|
||||
eventQueue.push(event);
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
}
|
||||
};
|
||||
|
||||
// 1. Subscribe early to avoid missing any events that occur during replay setup
|
||||
const unsubscribe = this._protocol.subscribe((event) => {
|
||||
@@ -87,23 +114,7 @@ export class AgentSession implements AgentProtocol {
|
||||
return;
|
||||
}
|
||||
|
||||
if (trackedStreamId && event.streamId !== trackedStreamId) return;
|
||||
|
||||
// If we don't have a tracked stream yet, the first agent_start we see becomes it.
|
||||
if (!trackedStreamId && event.type === 'agent_start') {
|
||||
trackedStreamId = event.streamId ?? undefined;
|
||||
}
|
||||
|
||||
// If we still don't have a tracked stream and we aren't replaying everything (eventId), ignore.
|
||||
if (!trackedStreamId && !options.eventId) return;
|
||||
|
||||
eventQueue.push(event);
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
}
|
||||
queueVisibleEvent(event);
|
||||
|
||||
const currentResolve = resolve;
|
||||
next = new Promise<void>((r) => {
|
||||
@@ -119,7 +130,23 @@ export class AgentSession implements AgentProtocol {
|
||||
if (options.eventId) {
|
||||
const index = currentEvents.findIndex((e) => e.id === options.eventId);
|
||||
if (index !== -1) {
|
||||
const resumeEvent = currentEvents[index];
|
||||
replayStartIndex = index + 1;
|
||||
trackedStreamId = resumeEvent?.streamId ?? undefined;
|
||||
if (trackedStreamId) {
|
||||
agentActivityStarted =
|
||||
resumeEvent?.type === 'agent_start' ||
|
||||
currentEvents
|
||||
.slice(0, index)
|
||||
.some(
|
||||
(event) =>
|
||||
event.type === 'agent_start' &&
|
||||
event.streamId === trackedStreamId,
|
||||
);
|
||||
} else {
|
||||
// No correlated stream means "resume globally after this event".
|
||||
agentActivityStarted = true;
|
||||
}
|
||||
}
|
||||
} else if (options.streamId) {
|
||||
const index = currentEvents.findIndex(
|
||||
@@ -128,29 +155,7 @@ export class AgentSession implements AgentProtocol {
|
||||
if (index !== -1) {
|
||||
replayStartIndex = index;
|
||||
}
|
||||
}
|
||||
|
||||
if (replayStartIndex !== -1) {
|
||||
for (let i = replayStartIndex; i < currentEvents.length; i++) {
|
||||
const event = currentEvents[i];
|
||||
if (options.streamId && event.streamId !== options.streamId) continue;
|
||||
|
||||
eventQueue.push(event);
|
||||
if (event.type === 'agent_start' && !trackedStreamId) {
|
||||
trackedStreamId = event.streamId ?? undefined;
|
||||
}
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!done && !trackedStreamId) {
|
||||
// Find active stream in history
|
||||
} else {
|
||||
const activeStarts = currentEvents.filter(
|
||||
(e) => e.type === 'agent_start',
|
||||
);
|
||||
@@ -162,11 +167,22 @@ export class AgentSession implements AgentProtocol {
|
||||
)
|
||||
) {
|
||||
trackedStreamId = start.streamId ?? undefined;
|
||||
replayStartIndex = currentEvents.findIndex(
|
||||
(e) => e.id === start.id,
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (replayStartIndex !== -1) {
|
||||
for (let i = replayStartIndex; i < currentEvents.length; i++) {
|
||||
const event = currentEvents[i];
|
||||
queueVisibleEvent(event);
|
||||
if (done) break;
|
||||
}
|
||||
}
|
||||
|
||||
// If we replayed to the end and no stream is active, and we were specifically
|
||||
// replaying from an eventId (or we've already finished the stream we were looking for), we are done.
|
||||
if (!done && !trackedStreamId && options.eventId) {
|
||||
@@ -178,19 +194,7 @@ export class AgentSession implements AgentProtocol {
|
||||
// Process events that arrived while we were replaying
|
||||
for (const event of earlyEvents) {
|
||||
if (done) break;
|
||||
if (trackedStreamId && event.streamId !== trackedStreamId) continue;
|
||||
if (!trackedStreamId && event.type === 'agent_start') {
|
||||
trackedStreamId = event.streamId ?? undefined;
|
||||
}
|
||||
if (!trackedStreamId && !options.eventId) continue;
|
||||
|
||||
eventQueue.push(event);
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
done = true;
|
||||
}
|
||||
queueVisibleEvent(event);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
@@ -200,6 +204,7 @@ export class AgentSession implements AgentProtocol {
|
||||
for (const event of eventsToYield) {
|
||||
yield event;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (done) break;
|
||||
|
||||
Reference in New Issue
Block a user