fix: SSE keepalive, request timeout, NoOp task store caching, and Docker tooling

- Add SSE keepalive middleware (15s comment heartbeat) to prevent Cloud Run
  LB from closing idle SSE connections during long tool executions
- Increase A2A server request timeout from 300s to 3600s (1 hour)
- Cache loaded tasks in NoOpTaskStore to prevent redundant GCS workspace
  restores on every SDK event cycle (was restoring 5+ times per request)
- Add undici dispatcher with 10-min timeouts for bridge SSE connections
- Install gh CLI and gcloud CLI in Docker image for agent GitHub/GCP access
This commit is contained in:
Adam Weidman
2026-02-20 10:02:13 -05:00
parent 305a47e5b5
commit bb379102d6
4 changed files with 67 additions and 5 deletions
+10 -1
View File
@@ -3,7 +3,16 @@
FROM docker.io/library/node:20-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 curl git jq ripgrep ca-certificates \
python3 curl git jq ripgrep ca-certificates gpg apt-transport-https \
&& curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg \
| gpg --dearmor -o /usr/share/keyrings/githubcli-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" \
> /etc/apt/sources.list.d/github-cli.list \
&& curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg \
| gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg \
&& echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" \
> /etc/apt/sources.list.d/google-cloud-sdk.list \
&& apt-get update && apt-get install -y --no-install-recommends gh google-cloud-cli \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
WORKDIR /app
@@ -28,9 +28,21 @@ import {
JsonRpcTransportFactory,
} from '@a2a-js/sdk/client';
import { GoogleAuth } from 'google-auth-library';
import { Agent } from 'undici';
import { v4 as uuidv4 } from 'uuid';
import { logger } from '../utils/logger.js';
/**
* Undici agent with long timeouts for SSE streaming.
* Default body/headers timeouts are ~30s which kills idle SSE connections
* when the agent runs long tools (npm install, tsc builds, etc.).
*/
const sseDispatcher = new Agent({
bodyTimeout: 10 * 60 * 1000, // 10 minutes
headersTimeout: 10 * 60 * 1000,
keepAliveTimeout: 10 * 60 * 1000,
});
// Inline A2UI constants so the chat bridge has no dependency on ../a2ui/
const A2UI_EXTENSION_URI = 'https://a2ui.org/a2a-extension/a2ui/v0.10';
const A2UI_MIME_TYPE = 'application/json+a2ui';
@@ -169,8 +181,13 @@ export class A2ABridgeClient {
async initialize(): Promise<void> {
if (this.client) return;
// On Cloud Run, create an authenticated fetch that adds identity tokens
let fetchImpl: typeof fetch = fetch;
// Create fetch wrapper with long SSE timeouts.
// On Cloud Run, also add identity tokens for service-to-service auth.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const baseFetch = (input: any, init?: any) =>
fetch(input, { ...init, dispatcher: sseDispatcher });
let fetchImpl: typeof fetch = baseFetch;
if (process.env['K_SERVICE']) {
const auth = new GoogleAuth();
const idTokenClient = await auth.getIdTokenClient(this.agentUrl);
@@ -180,7 +197,7 @@ export class A2ABridgeClient {
for (const [key, value] of Object.entries(authHeaders)) {
merged.set(key, value);
}
return fetch(input, { ...init, headers: merged });
return baseFetch(input, { ...init, headers: merged });
};
logger.info(
'[ChatBridge] Using Cloud Run identity token for A2A server auth',
+23
View File
@@ -206,6 +206,29 @@ export async function createApp() {
requestStorage.run({ req }, next);
});
// SSE keepalive — sends periodic comment lines to prevent Cloud Run's
// load balancer from closing idle SSE connections during long tool
// executions (npm install, tsc builds, etc.). SSE comments (`: ...`)
// are ignored by conformant parsers per the spec.
expressApp.use((req, res, next) => {
const origFlush = res.flushHeaders;
res.flushHeaders = function (this: express.Response) {
origFlush.call(this);
const ct = this.getHeader('content-type');
if (ct && String(ct).includes('text/event-stream')) {
const timer = setInterval(() => {
if (!res.writableEnded) {
res.write(': keepalive\n\n');
} else {
clearInterval(timer);
}
}, 15_000);
res.on('close', () => clearInterval(timer));
}
};
next();
});
// Google Chat bridge runs as a separate service (src/chat-bridge/server.ts).
// It connects to this A2A server over HTTP.
const appBuilder = new A2AExpressApp(requestHandler);
+14 -1
View File
@@ -345,6 +345,8 @@ export class GCSTaskStore implements TaskStore {
}
export class NoOpTaskStore implements TaskStore {
private cache = new Map<string, SDKTask>();
constructor(private realStore: TaskStore) {}
async save(task: SDKTask): Promise<void> {
@@ -353,9 +355,20 @@ export class NoOpTaskStore implements TaskStore {
}
async load(taskId: string): Promise<SDKTask | undefined> {
const cached = this.cache.get(taskId);
if (cached) {
logger.info(
`[NoOpTaskStore] load called for task ${taskId}, returning cached.`,
);
return cached;
}
logger.info(
`[NoOpTaskStore] load called for task ${taskId}, delegating to real store.`,
);
return this.realStore.load(taskId);
const result = await this.realStore.load(taskId);
if (result) {
this.cache.set(taskId, result);
}
return result;
}
}