mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-16 23:02:51 -07:00
!feat(core): correlate all protocol events to streams
This commit is contained in:
@@ -117,6 +117,7 @@ describe('AgentSession', () => {
|
||||
expect(events).toHaveLength(0);
|
||||
expect(protocol.events).toHaveLength(1);
|
||||
expect(protocol.events[0].type).toBe('session_update');
|
||||
expect(protocol.events[0].streamId).toEqual(expect.any(String));
|
||||
});
|
||||
|
||||
it('should skip events that occur before agent_start', async () => {
|
||||
@@ -208,6 +209,28 @@ describe('AgentSession', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('should complete immediately when resuming from an event on a stream with no agent activity', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
|
||||
const { streamId } = await session.send({ update: { title: 'draft' } });
|
||||
expect(streamId).toBeNull();
|
||||
|
||||
const updateEvent = session.events.find(
|
||||
(event): event is AgentEvent<'session_update'> =>
|
||||
event.type === 'session_update',
|
||||
);
|
||||
expect(updateEvent).toBeDefined();
|
||||
|
||||
const iterator = session.stream({ eventId: updateEvent!.id })[
|
||||
Symbol.asyncIterator
|
||||
]();
|
||||
await expect(iterator.next()).resolves.toEqual({
|
||||
value: undefined,
|
||||
done: true,
|
||||
});
|
||||
});
|
||||
|
||||
it('should resume from an in-stream event within the same stream only', async () => {
|
||||
const protocol = new MockAgentProtocol();
|
||||
const session = new AgentSession(protocol);
|
||||
@@ -398,5 +421,6 @@ describe('AgentSession', () => {
|
||||
expect(streamedEvents.some((e) => e.type === 'agent_end')).toBe(true);
|
||||
expect(streamedEvents.some((e) => e.streamId === streamId2)).toBe(false);
|
||||
});
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
@@ -34,7 +34,7 @@ export class AgentSession implements AgentProtocol {
|
||||
return this._protocol.abort();
|
||||
}
|
||||
|
||||
get events(): AgentEvent[] {
|
||||
get events(): readonly AgentEvent[] {
|
||||
return this._protocol.events;
|
||||
}
|
||||
|
||||
@@ -88,19 +88,16 @@ export class AgentSession implements AgentProtocol {
|
||||
if (event.type !== 'agent_start') {
|
||||
return;
|
||||
}
|
||||
trackedStreamId = event.streamId ?? trackedStreamId ?? undefined;
|
||||
trackedStreamId = event.streamId;
|
||||
agentActivityStarted = true;
|
||||
}
|
||||
|
||||
if (!trackedStreamId && !options.eventId) {
|
||||
if (!trackedStreamId) {
|
||||
return;
|
||||
}
|
||||
|
||||
eventQueue.push(event);
|
||||
if (
|
||||
event.type === 'agent_end' &&
|
||||
event.streamId === (trackedStreamId ?? null)
|
||||
) {
|
||||
if (event.type === 'agent_end' && event.streamId === trackedStreamId) {
|
||||
done = true;
|
||||
}
|
||||
};
|
||||
@@ -135,23 +132,32 @@ export class AgentSession implements AgentProtocol {
|
||||
|
||||
const resumeEvent = currentEvents[index];
|
||||
replayStartIndex = index + 1;
|
||||
trackedStreamId = resumeEvent?.streamId ?? undefined;
|
||||
if (resumeEvent?.type === 'agent_end') {
|
||||
trackedStreamId = resumeEvent.streamId;
|
||||
const streamHasStarted =
|
||||
resumeEvent.type === 'agent_start' ||
|
||||
currentEvents
|
||||
.slice(0, index)
|
||||
.some(
|
||||
(event) =>
|
||||
event.type === 'agent_start' &&
|
||||
event.streamId === trackedStreamId,
|
||||
);
|
||||
|
||||
if (resumeEvent.type === 'agent_end') {
|
||||
agentActivityStarted = true;
|
||||
done = true;
|
||||
} else if (trackedStreamId) {
|
||||
agentActivityStarted =
|
||||
resumeEvent?.type === 'agent_start' ||
|
||||
currentEvents
|
||||
.slice(0, index)
|
||||
.some(
|
||||
(event) =>
|
||||
event.type === 'agent_start' &&
|
||||
event.streamId === trackedStreamId,
|
||||
);
|
||||
} else {
|
||||
// No correlated stream means "resume globally after this event".
|
||||
} else if (streamHasStarted) {
|
||||
agentActivityStarted = true;
|
||||
} else if (
|
||||
!currentEvents
|
||||
.slice(index + 1)
|
||||
.some(
|
||||
(event) =>
|
||||
event.type === 'agent_start' &&
|
||||
event.streamId === trackedStreamId,
|
||||
)
|
||||
) {
|
||||
done = true;
|
||||
}
|
||||
} else if (options.streamId) {
|
||||
const index = currentEvents.findIndex(
|
||||
@@ -171,7 +177,7 @@ export class AgentSession implements AgentProtocol {
|
||||
(e) => e.type === 'agent_end' && e.streamId === start.streamId,
|
||||
)
|
||||
) {
|
||||
trackedStreamId = start.streamId ?? undefined;
|
||||
trackedStreamId = start.streamId;
|
||||
replayStartIndex = currentEvents.findIndex(
|
||||
(e) => e.id === start.id,
|
||||
);
|
||||
@@ -187,13 +193,6 @@ export class AgentSession implements AgentProtocol {
|
||||
if (done) break;
|
||||
}
|
||||
}
|
||||
|
||||
// If we replayed to the end and no stream is active, and we were specifically
|
||||
// replaying from an eventId (or we've already finished the stream we were looking for), we are done.
|
||||
if (!done && !trackedStreamId && options.eventId) {
|
||||
done = true;
|
||||
}
|
||||
|
||||
started = true;
|
||||
|
||||
// Process events that arrived while we were replaying
|
||||
|
||||
@@ -235,7 +235,7 @@ describe('MockAgentProtocol', () => {
|
||||
expect(streamId).toBeNull();
|
||||
expect(session.events).toHaveLength(1);
|
||||
expect(session.events[0].type).toBe('session_update');
|
||||
expect(session.events[0].streamId).toBeNull();
|
||||
expect(session.events[0].streamId).toEqual(expect.any(String));
|
||||
});
|
||||
|
||||
it('should throw on action', async () => {
|
||||
|
||||
@@ -118,21 +118,24 @@ export class MockAgentProtocol implements AgentProtocol {
|
||||
|
||||
// If there were queued responses (even if empty array), we trigger a stream.
|
||||
const hasResponseEvents = responseData !== undefined;
|
||||
const streamId = hasResponseEvents
|
||||
? (response[0]?.streamId ?? `mock-stream-${this._nextStreamId++}`)
|
||||
: null;
|
||||
let correlationStreamId: string | undefined;
|
||||
const getCorrelationStreamId = (): string =>
|
||||
(correlationStreamId ??=
|
||||
response[0]?.streamId ?? `mock-stream-${this._nextStreamId++}`);
|
||||
const streamId = hasResponseEvents ? getCorrelationStreamId() : null;
|
||||
|
||||
const now = new Date().toISOString();
|
||||
const eventsToEmit: AgentEvent[] = [];
|
||||
|
||||
// Helper to normalize and prepare for emission
|
||||
// All emitted events stay correlated to a stream even if this send does not
|
||||
// start agent activity and therefore returns `streamId: null`.
|
||||
const normalize = (eventData: MockAgentEvent): AgentEvent =>
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
({
|
||||
...eventData,
|
||||
id: eventData.id ?? `e-${this._nextEventId++}`,
|
||||
timestamp: eventData.timestamp ?? now,
|
||||
streamId: eventData.streamId ?? streamId,
|
||||
streamId: eventData.streamId ?? getCorrelationStreamId(),
|
||||
}) as AgentEvent;
|
||||
|
||||
// 1. User/Update event (BEFORE agent_start)
|
||||
|
||||
@@ -11,9 +11,10 @@ export type Unsubscribe = () => void;
|
||||
export interface AgentProtocol extends Trajectory {
|
||||
/**
|
||||
* Send data to the agent. Promise resolves when action is acknowledged.
|
||||
* Returns the `streamId` of the stream the message was correlated to --
|
||||
* this may be a new stream if idle, an existing stream, or null if no
|
||||
* stream was triggered.
|
||||
* Returns the agent-activity `streamId` affected by the send. This may be a
|
||||
* new stream if idle, an existing stream, or null if the send was
|
||||
* acknowledged without starting agent activity. Emitted events should still
|
||||
* remain correlated to a stream via their `streamId`.
|
||||
*
|
||||
* When a new stream is created by a send, the streamId MUST be returned
|
||||
* before the `agent_start` event is emitted for the stream.
|
||||
@@ -36,7 +37,7 @@ export interface AgentProtocol extends Trajectory {
|
||||
/**
|
||||
* AgentProtocol implements the Trajectory interface and can retrieve existing events.
|
||||
*/
|
||||
readonly events: AgentEvent[];
|
||||
readonly events: readonly AgentEvent[];
|
||||
}
|
||||
|
||||
type RequireExactlyOne<T> = {
|
||||
@@ -54,7 +55,7 @@ interface AgentSendPayloads {
|
||||
export type AgentSend = RequireExactlyOne<AgentSendPayloads> & WithMeta;
|
||||
|
||||
export interface Trajectory {
|
||||
readonly events: AgentEvent[];
|
||||
readonly events: readonly AgentEvent[];
|
||||
}
|
||||
|
||||
export interface AgentEventCommon {
|
||||
@@ -62,8 +63,8 @@ export interface AgentEventCommon {
|
||||
id: string;
|
||||
/** Identifies the subagent thread, omitted for "main thread" events. */
|
||||
threadId?: string;
|
||||
/** Identifies a particular stream of a particular thread. */
|
||||
streamId?: string | null;
|
||||
/** Identifies the stream this event belongs to. */
|
||||
streamId: string;
|
||||
/** ISO Timestamp for the time at which the event occurred. */
|
||||
timestamp: string;
|
||||
/** The concrete type of the event. */
|
||||
|
||||
Reference in New Issue
Block a user