diff --git a/packages/a2a-server/k8s/deployment.yaml b/packages/a2a-server/k8s/deployment.yaml index a3d5573d5b..e2911d25d7 100644 --- a/packages/a2a-server/k8s/deployment.yaml +++ b/packages/a2a-server/k8s/deployment.yaml @@ -34,8 +34,6 @@ spec: key: api-key - name: GEMINI_YOLO_MODE value: "true" - - name: CHAT_BRIDGE_A2A_URL - value: "http://localhost:8080" - name: NODE_ENV value: "production" resources: @@ -64,8 +62,10 @@ metadata: name: gemini-a2a-server labels: app: gemini-a2a-server + annotations: + networking.gke.io/load-balancer-type: "Internal" spec: - type: ClusterIP + type: LoadBalancer selector: app: gemini-a2a-server ports: diff --git a/packages/a2a-server/src/agent/executor.ts b/packages/a2a-server/src/agent/executor.ts index 3dd5de5dd4..a2bc1a44b8 100644 --- a/packages/a2a-server/src/agent/executor.ts +++ b/packages/a2a-server/src/agent/executor.ts @@ -488,7 +488,15 @@ export class CoderAgentExecutor implements AgentExecutor { logger.info(`[CoderAgentExecutor] A2UI enabled for task ${taskId}`); } - if (['canceled', 'failed', 'completed'].includes(currentTask.taskState)) { + if (currentTask.taskState === 'canceled') { + // Allow resuming canceled tasks — user wants to redirect after /esc. + // The workspace and conversation history are preserved, so the LLM + // picks up with full context of prior interactions. + logger.info( + `[CoderAgentExecutor] Resuming canceled task ${taskId}. Resetting state to submitted.`, + ); + currentTask.taskState = 'submitted'; + } else if (['failed', 'completed'].includes(currentTask.taskState)) { logger.warn( `[CoderAgentExecutor] Attempted to execute task ${taskId} which is already in state ${currentTask.taskState}. Ignoring.`, ); @@ -623,7 +631,10 @@ export class CoderAgentExecutor implements AgentExecutor { logger.warn(`[CoderAgentExecutor] Task ${taskId} execution aborted.`); currentTask.cancelPendingTools('Execution aborted'); if ( - currentTask.taskState !== 'canceled' && + // cancelTask() can set state to 'canceled' concurrently, so we + // need to re-check at runtime despite TypeScript's narrowing. + + (currentTask.taskState as string) !== 'canceled' && currentTask.taskState !== 'failed' ) { currentTask.setTaskStateAndPublishUpdate( diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index b52c6a3d5a..2ef27a9299 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -38,9 +38,9 @@ import { logger } from '../utils/logger.js'; * when the agent runs long tools (npm install, tsc builds, etc.). */ const sseDispatcher = new Agent({ - bodyTimeout: 10 * 60 * 1000, // 10 minutes - headersTimeout: 10 * 60 * 1000, - keepAliveTimeout: 10 * 60 * 1000, + bodyTimeout: 60 * 60 * 1000, // 1 hour — matches Cloud Run request timeout + headersTimeout: 60 * 60 * 1000, + keepAliveTimeout: 60 * 60 * 1000, }); // Inline A2UI constants so the chat bridge has no dependency on ../a2ui/ diff --git a/packages/a2a-server/src/chat-bridge/default-agents.ts b/packages/a2a-server/src/chat-bridge/default-agents.ts new file mode 100644 index 0000000000..42fb57cd8d --- /dev/null +++ b/packages/a2a-server/src/chat-bridge/default-agents.ts @@ -0,0 +1,39 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { promises as fs } from 'node:fs'; +import * as path from 'node:path'; +import { homedir } from '@google/gemini-cli-core'; +import { logger } from '../utils/logger.js'; + +/** + * Seeds a remote agent definition for the GKE worker at the user level + * (~/.gemini/agents/) so it bypasses the project-level acknowledgment check. + */ +export async function ensureDefaultAgents(gkeAgentUrl?: string): Promise { + if (!gkeAgentUrl) return; + + // Seed remote agent definition at user level (~/.gemini/agents/) + // User-level agents are registered directly without acknowledgment, + // which is required for headless server mode. + const agentsDir = path.join(homedir(), '.gemini', 'agents'); + await fs.mkdir(agentsDir, { recursive: true }); + + const agentMd = `--- +kind: remote +name: gke-worker +agent_card_url: ${gkeAgentUrl.replace(/\/$/, '')}/.well-known/agent-card.json +--- +Long-running worker agent on GKE. Delegate tasks that will take more than 30 minutes, +such as running evals, large builds, or extensive test suites. This agent runs on a +persistent server with no timeout limit. Tell it to run commands in the background +(nohup/&) so it can respond immediately and you can check back for status. +`; + await fs.writeFile(path.join(agentsDir, 'gke-worker.md'), agentMd); + logger.info( + `[DefaultAgents] Seeded gke-worker agent definition at ${agentsDir}/gke-worker.md`, + ); +} diff --git a/packages/a2a-server/src/chat-bridge/default-gemini-md.ts b/packages/a2a-server/src/chat-bridge/default-gemini-md.ts index 8c333bcf08..ca27bb7767 100644 --- a/packages/a2a-server/src/chat-bridge/default-gemini-md.ts +++ b/packages/a2a-server/src/chat-bridge/default-gemini-md.ts @@ -24,6 +24,11 @@ You are Gemini CLI running as an A2A server in headless mode. Your output is dis - Use markdown formatting (headers, lists, code blocks) for readability. - Keep responses under 3000 characters when possible. - Be concise and direct. + +## Long-Running Tasks +For tasks that will take more than 30 minutes (evals, large builds, extensive test suites), +delegate to the gke-worker agent. Tell it to run the command in the background (using nohup +or &) so it responds quickly. You can then check back on its status with follow-up messages. `; /** diff --git a/packages/a2a-server/src/chat-bridge/handler.ts b/packages/a2a-server/src/chat-bridge/handler.ts index a4647e2445..a5e4fb4adc 100644 --- a/packages/a2a-server/src/chat-bridge/handler.ts +++ b/packages/a2a-server/src/chat-bridge/handler.ts @@ -40,6 +40,15 @@ export class ChatBridgeHandler { private initialized = false; /** Full webhook URL for card button actions (HTTP Add-ons need a URL, not a function name). */ private webhookUrl: string | undefined; + /** + * Tracks active background work per thread. A self-request to /internal/keepalive + * awaits the promise, giving Cloud Run an active incoming request so it won't + * kill the instance while work is in progress. + */ + private workCompletionMap = new Map< + string, + { promise: Promise; resolve: () => void } + >(); constructor( private config: ChatBridgeConfig, @@ -200,7 +209,11 @@ export class ChatBridgeHandler { logger.info( `[ChatBridge] Task cancelled via /esc for thread ${threadName}`, ); - return this.pushAndReturn(spaceName, threadName, 'Task cancelled.'); + return this.pushAndReturn( + spaceName, + threadName, + 'Task stopped. Send a new message to continue with updated directions.', + ); } else { return this.pushAndReturn( spaceName, @@ -319,6 +332,25 @@ export class ChatBridgeHandler { const spaceName = event.space.name; session.asyncProcessing = true; + session.cancelled = false; + + // Fire keepalive self-request (same as processMessageAsync) + let resolveApprovalWork: () => void; + const approvalWorkPromise = new Promise((r) => { + resolveApprovalWork = r; + }); + this.workCompletionMap.set(threadName, { + promise: approvalWorkPromise, + resolve: resolveApprovalWork!, + }); + const approvalPort = process.env['PORT'] || '8080'; + fetch(`http://localhost:${approvalPort}/internal/keepalive`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ threadName }), + }).catch((err) => { + logger.warn(`[ChatBridge] Keepalive self-request failed: ${err}`); + }); try { // Stream the tool confirmation to collect text as it arrives @@ -340,6 +372,7 @@ export class ChatBridgeHandler { let lastContextId: string | undefined; let lastState: string | undefined; let sentResponse = false; + let textSnapshotAtLastTool: string | undefined; for await (const streamEvent of stream) { if (session.cancelled) break; @@ -353,6 +386,11 @@ export class ChatBridgeHandler { lastText = extracted.text; } + // Track tool activity for text snapshot + if (extracted.toolApprovals.length > 0 && lastText) { + textSnapshotAtLastTool = lastText; + } + // Check for new tool approvals const pending = extracted.toolApprovals.filter( (a) => a.status === 'awaiting_approval', @@ -370,7 +408,12 @@ export class ChatBridgeHandler { lastContextId = autoResult.lastContextId; if (autoResult.lastTaskId) lastTaskId = autoResult.lastTaskId; if (autoResult.lastState) lastState = autoResult.lastState; - if (autoResult.text) lastText = autoResult.text; + if (autoResult.text) { + lastText = autoResult.text; + if (autoResult.textSnapshotAtLastTool) { + textSnapshotAtLastTool = autoResult.textSnapshotAtLastTool; + } + } } else { // Non-YOLO: push approval card session.pendingToolApproval = { @@ -430,12 +473,24 @@ export class ChatBridgeHandler { cardsV2: [activityCard], }); } - // The text here is already post-approval, so send it directly + // Strip intermediate "I will..." narration — send only the final answer + let finalText = lastText; + if ( + textSnapshotAtLastTool && + lastText.startsWith(textSnapshotAtLastTool) + ) { + const postToolText = lastText + .substring(textSnapshotAtLastTool.length) + .trim(); + if (postToolText) { + finalText = postToolText; + } + } await this.chatApiClient.sendMessage(spaceName, threadName, { - text: lastText, + text: finalText, }); logger.info( - `[ChatBridge] Pushed post-approval response (${lastText.length} chars, ${tracker.count} activity entries): "${lastText.substring(0, 200)}"`, + `[ChatBridge] Pushed post-approval response (${finalText.length} chars, ${tracker.count} activity entries): "${finalText.substring(0, 200)}"`, ); } } catch (error) { @@ -451,6 +506,11 @@ export class ChatBridgeHandler { } finally { session.asyncProcessing = false; session.cancelled = false; + const approvalEntry = this.workCompletionMap.get(threadName); + if (approvalEntry) { + approvalEntry.resolve(); + this.workCompletionMap.delete(threadName); + } } } @@ -468,6 +528,30 @@ export class ChatBridgeHandler { const spaceName = event.space.name; session.asyncProcessing = true; + // Clear any stale cancellation flag from a previous /esc. + // The old processMessageAsync may still be winding down (blocked on its + // SSE stream), so its finally block hasn't cleared cancelled yet. + session.cancelled = false; + + // Fire a self-request to keep this Cloud Run instance alive. + // Cloud Run only tracks active incoming HTTP requests — without this, + // it may kill the instance during scale-down or deployment rollouts. + let resolveWork: () => void; + const workPromise = new Promise((r) => { + resolveWork = r; + }); + this.workCompletionMap.set(threadName, { + promise: workPromise, + resolve: resolveWork!, + }); + const port = process.env['PORT'] || '8080'; + fetch(`http://localhost:${port}/internal/keepalive`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ threadName }), + }).catch((err) => { + logger.warn(`[ChatBridge] Keepalive self-request failed: ${err}`); + }); try { // Retry streaming if the A2A server returns 500 (no available instance). @@ -560,6 +644,11 @@ export class ChatBridgeHandler { existing.label, tool.status, ); + // Capture text at tool status changes (e.g., completion) so the + // post-tool text stripping at the end works correctly. Without this, + // "I will..." narration between tool detection and completion leaks + // into the final response. + lastTextSnapshotAtTool = lastText; } } else { // New tool — capture any narration text before this tool as an activity entry. @@ -677,7 +766,7 @@ export class ChatBridgeHandler { // Handle pending approvals (only relevant when server sent input-required) // Use the text snapshot from when the last tool appeared — everything after // that is the actual final response the user cares about. - const textBeforeTools = lastTextSnapshotAtTool || lastText; + let textBeforeTools = lastTextSnapshotAtTool || lastText; if (latestPendingApprovals.length > 0 && lastState === 'input-required') { if (session.yoloMode) { @@ -692,7 +781,16 @@ export class ChatBridgeHandler { lastContextId = autoApproved.lastContextId; if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId; if (autoApproved.lastState) lastState = autoApproved.lastState; - if (autoApproved.text) lastText = autoApproved.text; + if (autoApproved.text) { + lastText = autoApproved.text; + // Update textBeforeTools with the snapshot from the last tool + // activity inside autoApproveTools. Without this, textBeforeTools + // is stale (captured before auto-approval ran) and all "I will..." + // narration from auto-approval rounds leaks into the final output. + if (autoApproved.textSnapshotAtLastTool) { + textBeforeTools = autoApproved.textSnapshotAtLastTool; + } + } } else { // Non-YOLO: push approval card and wait for user input const firstApproval = latestPendingApprovals[0]; @@ -813,6 +911,23 @@ export class ChatBridgeHandler { } finally { session.asyncProcessing = false; session.cancelled = false; + // Release the keepalive self-request so Cloud Run can reclaim the instance. + const entry = this.workCompletionMap.get(threadName); + if (entry) { + entry.resolve(); + this.workCompletionMap.delete(threadName); + } + } + } + + /** + * Waits for background work on a thread to complete. Called by the + * /internal/keepalive endpoint to keep the Cloud Run instance alive. + */ + async waitForWork(threadName: string): Promise { + const entry = this.workCompletionMap.get(threadName); + if (entry) { + await entry.promise; } } @@ -885,12 +1000,16 @@ export class ChatBridgeHandler { lastTaskId?: string; lastState?: string; text?: string; + textSnapshotAtLastTool?: string; }> { let approvalsToProcess = initialApprovals; let lastContextId = contextId; let lastTaskId: string | undefined; let lastState: string | undefined; let lastText: string | undefined; + // Track text at the last tool activity — used by the caller to strip + // intermediate "I will..." narration from the final output. + let textSnapshotAtLastTool: string | undefined; const approvedNames: string[] = []; const MAX_ROUNDS = 20; @@ -931,6 +1050,13 @@ export class ChatBridgeHandler { lastText = extracted.text; } + // Track tool activity for text snapshot — any event with tool surfaces + // means tools are still running; capture the text at this point so the + // caller can strip all intermediate narration. + if (extracted.toolApprovals.length > 0 && lastText) { + textSnapshotAtLastTool = lastText; + } + logger.info( `[ChatBridge] YOLO event #${eventCount}: kind=${event.kind}, ` + `state=${extracted.state ?? 'n/a'}, text=${extracted.text.length} chars`, @@ -958,7 +1084,13 @@ export class ChatBridgeHandler { `[ChatBridge] YOLO auto-approved ${approvedNames.length} tools: ${approvedNames.join(', ')}`, ); - return { lastContextId, lastTaskId, lastState, text: lastText }; + return { + lastContextId, + lastTaskId, + lastState, + text: lastText, + textSnapshotAtLastTool, + }; } /** @@ -1040,8 +1172,7 @@ export class ChatBridgeHandler { `I can:\n` + `- Generate code from natural language\n` + `- Edit files and run commands\n` + - `- Answer questions about code\n\n` + - `I'll ask for your approval before executing tools.`, + `- Answer questions about code`, }; } diff --git a/packages/a2a-server/src/chat-bridge/response-renderer.ts b/packages/a2a-server/src/chat-bridge/response-renderer.ts index f76f51f78b..a6a42f3125 100644 --- a/packages/a2a-server/src/chat-bridge/response-renderer.ts +++ b/packages/a2a-server/src/chat-bridge/response-renderer.ts @@ -77,9 +77,28 @@ export function renderResponse( threadName?: string, webhookUrl?: string, ): ChatResponse { - const parts = extractAllParts(response); - const textContent = extractTextFromParts(parts); - const a2uiMessageGroups = extractA2UIParts(parts); + // Extract A2UI surfaces from ALL parts (including history) for tool approvals, + // thoughts, and agent response surfaces. + const allParts = extractAllParts(response); + const a2uiMessageGroups = extractA2UIParts(allParts); + + // Extract plain text ONLY from the final response — not from history. + // History contains intermediate "I will..." status updates that should not + // be concatenated into the final output. + const finalParts: Part[] = []; + if (response.kind === 'task') { + if (response.status?.message?.parts) { + finalParts.push(...response.status.message.parts); + } + if (response.artifacts) { + for (const artifact of response.artifacts) { + finalParts.push(...(artifact.parts ?? [])); + } + } + } else if (response.kind === 'message') { + finalParts.push(...(response.parts ?? [])); + } + const textContent = extractTextFromParts(finalParts); // Parse A2UI surfaces for known types const toolApprovals: ToolApprovalInfo[] = []; diff --git a/packages/a2a-server/src/chat-bridge/routes.ts b/packages/a2a-server/src/chat-bridge/routes.ts index d5609dc8b4..b603971050 100644 --- a/packages/a2a-server/src/chat-bridge/routes.ts +++ b/packages/a2a-server/src/chat-bridge/routes.ts @@ -352,6 +352,28 @@ export function createChatBridgeRoutes(config: ChatBridgeConfig): Router { }, ); + // Internal keepalive endpoint — called by the bridge itself to keep the + // Cloud Run instance alive while background work (processMessageAsync) runs. + // The request stays open until the work completes, giving Cloud Run an active + // incoming request to track. No auth needed since it's localhost-only. + router.post('/internal/keepalive', async (req: Request, res: Response) => { + const threadName = + typeof req.body?.threadName === 'string' ? req.body.threadName : ''; + if (!threadName) { + res.status(400).json({ error: 'Missing threadName' }); + return; + } + logger.info(`[ChatBridge] Keepalive started for thread ${threadName}`); + try { + await handler.waitForWork(threadName); + logger.info(`[ChatBridge] Keepalive released for thread ${threadName}`); + res.json({ ok: true }); + } catch (err) { + logger.warn(`[ChatBridge] Keepalive error: ${err}`); + res.status(500).json({ error: 'Keepalive failed' }); + } + }); + // Health check endpoint for the chat bridge (no auth required) router.get('/chat/health', (_req: Request, res: Response) => { res.json({ diff --git a/packages/a2a-server/src/config/config.ts b/packages/a2a-server/src/config/config.ts index ae0e136b65..27acecf22e 100644 --- a/packages/a2a-server/src/config/config.ts +++ b/packages/a2a-server/src/config/config.ts @@ -7,6 +7,7 @@ import * as fs from 'node:fs'; import * as path from 'node:path'; import * as dotenv from 'dotenv'; +import { execSync } from 'node:child_process'; import type { TelemetryTarget } from '@google/gemini-cli-core'; import { @@ -30,6 +31,7 @@ import { import { logger } from '../utils/logger.js'; import { ensureDefaultGeminiMd } from '../chat-bridge/default-gemini-md.js'; +import { ensureDefaultAgents } from '../chat-bridge/default-agents.js'; import type { Settings } from './settings.js'; import { type AgentSettings, CoderAgentEvent } from '../types.js'; @@ -58,6 +60,20 @@ export async function loadConfig( } } + // Configure git to use GITHUB_TOKEN for HTTPS push/pull if available. + if (process.env['GITHUB_TOKEN']) { + try { + execSync( + `git config --global credential.helper '!f() { echo "password=$GITHUB_TOKEN"; }; f'`, + ); + logger.info( + '[Config] Configured git credential helper with GITHUB_TOKEN', + ); + } catch (e) { + logger.warn(`[Config] Failed to configure git credential helper: ${e}`); + } + } + const configParams: ConfigParameters = { sessionId: taskId, model: process.env['GEMINI_MODEL'] || PREVIEW_GEMINI_MODEL, @@ -106,11 +122,13 @@ export async function loadConfig( interactive: true, enableInteractiveShell: true, ptyInfo: 'auto', + enableAgents: !!process.env['GKE_AGENT_URL'], }; // Ensure a base GEMINI.md exists in the workspace so the agent gets // default behavior instructions. Does not overwrite user-created files. await ensureDefaultGeminiMd(workspaceDir); + await ensureDefaultAgents(process.env['GKE_AGENT_URL']); const fileService = new FileDiscoveryService(workspaceDir, { respectGitIgnore: configParams?.fileFiltering?.respectGitIgnore, diff --git a/packages/core/src/agents/registry.ts b/packages/core/src/agents/registry.ts index 85747c3964..d39dc2855c 100644 --- a/packages/core/src/agents/registry.ts +++ b/packages/core/src/agents/registry.ts @@ -355,20 +355,27 @@ export class AgentRegistry { // Log remote A2A agent registration for visibility. try { const clientManager = A2AClientManager.getInstance(); - // Use ADCHandler to ensure we can load agents hosted on secure platforms (e.g. Vertex AI) - const authHandler = new ADCHandler(); - const agentCard = await clientManager.loadAgent( - definition.name, - definition.agentCardUrl, - authHandler, - ); + // Check if the A2A client is already loaded (singleton persists across + // Config instances in long-lived processes like servers). + let agentCard = clientManager.getAgentCard(definition.name); + if (!agentCard) { + // Use ADCHandler to ensure we can load agents hosted on secure platforms (e.g. Vertex AI) + const authHandler = new ADCHandler(); + agentCard = await clientManager.loadAgent( + definition.name, + definition.agentCardUrl, + authHandler, + ); + } if (agentCard.skills && agentCard.skills.length > 0) { - definition.description = agentCard.skills + const skillsDescription = agentCard.skills .map( (skill: { name: string; description: string }) => `${skill.name}: ${skill.description}`, ) .join('\n'); + definition.description = + definition.description + '\n' + skillsDescription; } if (this.config.getDebugMode()) { debugLogger.log(