diff --git a/packages/a2a-server/Dockerfile b/packages/a2a-server/Dockerfile index 0d4fd8f7b1..bb24fb24e7 100644 --- a/packages/a2a-server/Dockerfile +++ b/packages/a2a-server/Dockerfile @@ -3,7 +3,16 @@ FROM docker.io/library/node:20-slim RUN apt-get update && apt-get install -y --no-install-recommends \ - python3 curl git jq ripgrep ca-certificates \ + python3 curl git jq ripgrep ca-certificates gpg apt-transport-https \ + && curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg \ + | gpg --dearmor -o /usr/share/keyrings/githubcli-archive-keyring.gpg \ + && echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" \ + > /etc/apt/sources.list.d/github-cli.list \ + && curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg \ + | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg \ + && echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" \ + > /etc/apt/sources.list.d/google-cloud-sdk.list \ + && apt-get update && apt-get install -y --no-install-recommends gh google-cloud-cli \ && apt-get clean && rm -rf /var/lib/apt/lists/* WORKDIR /app diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index 80af71657f..0c07e8dc47 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -28,9 +28,21 @@ import { JsonRpcTransportFactory, } from '@a2a-js/sdk/client'; import { GoogleAuth } from 'google-auth-library'; +import { Agent } from 'undici'; import { v4 as uuidv4 } from 'uuid'; import { logger } from '../utils/logger.js'; +/** + * Undici agent with long timeouts for SSE streaming. + * Default body/headers timeouts are ~30s which kills idle SSE connections + * when the agent runs long tools (npm install, tsc builds, etc.). + */ +const sseDispatcher = new Agent({ + bodyTimeout: 10 * 60 * 1000, // 10 minutes + headersTimeout: 10 * 60 * 1000, + keepAliveTimeout: 10 * 60 * 1000, +}); + // Inline A2UI constants so the chat bridge has no dependency on ../a2ui/ const A2UI_EXTENSION_URI = 'https://a2ui.org/a2a-extension/a2ui/v0.10'; const A2UI_MIME_TYPE = 'application/json+a2ui'; @@ -169,8 +181,13 @@ export class A2ABridgeClient { async initialize(): Promise { if (this.client) return; - // On Cloud Run, create an authenticated fetch that adds identity tokens - let fetchImpl: typeof fetch = fetch; + // Create fetch wrapper with long SSE timeouts. + // On Cloud Run, also add identity tokens for service-to-service auth. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const baseFetch = (input: any, init?: any) => + fetch(input, { ...init, dispatcher: sseDispatcher }); + + let fetchImpl: typeof fetch = baseFetch; if (process.env['K_SERVICE']) { const auth = new GoogleAuth(); const idTokenClient = await auth.getIdTokenClient(this.agentUrl); @@ -180,7 +197,7 @@ export class A2ABridgeClient { for (const [key, value] of Object.entries(authHeaders)) { merged.set(key, value); } - return fetch(input, { ...init, headers: merged }); + return baseFetch(input, { ...init, headers: merged }); }; logger.info( '[ChatBridge] Using Cloud Run identity token for A2A server auth', diff --git a/packages/a2a-server/src/http/app.ts b/packages/a2a-server/src/http/app.ts index b20d3051db..5ed173e217 100644 --- a/packages/a2a-server/src/http/app.ts +++ b/packages/a2a-server/src/http/app.ts @@ -206,6 +206,29 @@ export async function createApp() { requestStorage.run({ req }, next); }); + // SSE keepalive — sends periodic comment lines to prevent Cloud Run's + // load balancer from closing idle SSE connections during long tool + // executions (npm install, tsc builds, etc.). SSE comments (`: ...`) + // are ignored by conformant parsers per the spec. + expressApp.use((req, res, next) => { + const origFlush = res.flushHeaders; + res.flushHeaders = function (this: express.Response) { + origFlush.call(this); + const ct = this.getHeader('content-type'); + if (ct && String(ct).includes('text/event-stream')) { + const timer = setInterval(() => { + if (!res.writableEnded) { + res.write(': keepalive\n\n'); + } else { + clearInterval(timer); + } + }, 15_000); + res.on('close', () => clearInterval(timer)); + } + }; + next(); + }); + // Google Chat bridge runs as a separate service (src/chat-bridge/server.ts). // It connects to this A2A server over HTTP. const appBuilder = new A2AExpressApp(requestHandler); diff --git a/packages/a2a-server/src/persistence/gcs.ts b/packages/a2a-server/src/persistence/gcs.ts index 2747bc054b..f97995195f 100644 --- a/packages/a2a-server/src/persistence/gcs.ts +++ b/packages/a2a-server/src/persistence/gcs.ts @@ -345,6 +345,8 @@ export class GCSTaskStore implements TaskStore { } export class NoOpTaskStore implements TaskStore { + private cache = new Map(); + constructor(private realStore: TaskStore) {} async save(task: SDKTask): Promise { @@ -353,9 +355,20 @@ export class NoOpTaskStore implements TaskStore { } async load(taskId: string): Promise { + const cached = this.cache.get(taskId); + if (cached) { + logger.info( + `[NoOpTaskStore] load called for task ${taskId}, returning cached.`, + ); + return cached; + } logger.info( `[NoOpTaskStore] load called for task ${taskId}, delegating to real store.`, ); - return this.realStore.load(taskId); + const result = await this.realStore.load(taskId); + if (result) { + this.cache.set(taskId, result); + } + return result; } }