mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-15 06:12:50 -07:00
396 lines
17 KiB
Python
396 lines
17 KiB
Python
"""
|
|
Purpose: A unified pipeline that performs end-to-end effort analysis on a dataset of GitHub issues.
|
|
It combines agentic deep analysis, single-turn fallbacks, heuristic validation, and CSV export in a single, efficient execution, avoiding redundant file operations.
|
|
"""
|
|
import argparse
|
|
import json
|
|
import urllib.request
|
|
import os
|
|
import subprocess
|
|
import re
|
|
import concurrent.futures
|
|
import threading
|
|
import csv
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
MODEL = "gemini-3-flash-preview"
|
|
file_lock = threading.Lock()
|
|
|
|
# --- VALIDATION HEURISTICS ---
|
|
LARGE_KEYWORDS = [
|
|
'windows', 'win32', 'wsl', 'wsl2', 'pty', 'pseudo-terminal', 'child_process', 'spawn', 'sigint', 'sigterm',
|
|
'memory leak', 'performance', 'boot time', 'infinite loop', 'hangs', 'freezes', 'crashes', 'race condition',
|
|
'intermittent', 'sometimes', 'flickering', 'a2a', 'mcp protocol', 'scheduler', 'event loop', 'websocket',
|
|
'stream', 'throughput', 'concurrency', 'deadlock', 'file descriptor', 'architecture', 'refactor'
|
|
]
|
|
|
|
MEDIUM_KEYWORDS = [
|
|
'react', 'hook', 'useeffect', 'usestate', 'usememo', 'ink', 'tui', 'ui state', 'parser', 'markdown',
|
|
'regex', 'regular expression', 'ansi', 'escape sequence', 'toml', 'schema', 'validation', 'zod',
|
|
'promise', 'async', 'await', 'unhandled', 'rejection', 'config', 'settings', 'env', 'environment',
|
|
'path resolution', 'symlink', 'git', 'telemetry', 'logging', 'format', 'display', 'rendering',
|
|
'clipboard', 'copy', 'paste', 'bracketed', 'interactive', 'dialog', 'modal', 'focus'
|
|
]
|
|
|
|
# Gemini function-calling declarations exposed to the agentic loop:
# a recursive text search over the project and a bounded file reader.
# Both are executed locally by execute_tool.
tools_decl = [
    {
        "functionDeclarations": [
            {
                "name": "search_codebase",
                "description": "Search the project directory for a string using grep. Returns matching lines and file paths.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "pattern": {"type": "STRING", "description": "The text pattern to search for"}
                    },
                    "required": ["pattern"]
                }
            },
            {
                "name": "read_file",
                "description": "Read a specific file to understand its context.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "filepath": {"type": "STRING", "description": "The path to the file"}
                    },
                    "required": ["filepath"]
                }
            }
        ]
    }
]
|
|
|
|
def call_gemini(messages, url):
    """POST a generateContent request with tool declarations; return parsed JSON.

    Args:
        messages: Conversation history in Gemini "contents" format.
        url: Fully-qualified generateContent endpoint (including API key).

    Returns:
        The decoded JSON response body as a dict.
    """
    payload = json.dumps({
        "contents": messages,
        "tools": tools_decl,
        "generationConfig": {"temperature": 0.1},
    }).encode('utf-8')
    request = urllib.request.Request(
        url,
        data=payload,
        headers={'Content-Type': 'application/json'},
    )
    with urllib.request.urlopen(request, timeout=120) as resp:
        return json.loads(resp.read().decode('utf-8'))
|
|
|
|
def execute_tool(call, project_path):
    """Execute one tool call requested by the model.

    Args:
        call: Dict with 'name' and optional 'args' (a Gemini functionCall).
        project_path: Root directory of the project being analyzed.

    Returns:
        Tool output as a string, or an error/fallback message.
    """
    name = call['name']
    args = call.get('args', {})

    if name == 'search_codebase':
        pattern = args.get('pattern', '')
        # Use an argument list (shell=False) so the model-supplied pattern
        # cannot inject shell commands; the old f-string shell pipeline also
        # mutated the pattern by escaping quotes. Filtering and the 20-line
        # cap (previously `head -n 20`) are done in Python.
        try:
            proc = subprocess.run(
                ['grep', '-rn', pattern, project_path],
                capture_output=True, text=True
            )
            matches = [
                line for line in proc.stdout.splitlines()
                if not re.search(r'node_modules|dist|build|\.test\.', line)
            ][:20]
            return '\n'.join(matches) if matches else "No matches found."
        except OSError as e:
            return str(e)
    elif name == 'read_file':
        filepath = args.get('filepath', '')
        if not filepath.startswith('/'):
            filepath = os.path.join(project_path, filepath)

        try:
            if not os.path.exists(filepath):
                # Fall back to locating the file by basename anywhere in the
                # tree (replaces the shelled-out `find`).
                basename = os.path.basename(filepath)
                found_path = None
                for root, _dirs, files in os.walk(project_path):
                    if basename in files:
                        found_path = os.path.join(root, basename)
                        break
                if found_path:
                    filepath = found_path
                else:
                    return f"File {filepath} not found."

            # Read at most the first 300 lines (equivalent of `head -n 300`).
            with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
                lines = []
                for i, line in enumerate(f):
                    if i >= 300:
                        break
                    lines.append(line)
            return ''.join(lines)
        except Exception as e:
            # Best-effort tool: surface the error text to the model.
            return str(e)
    return "Unknown tool"
|
|
|
|
def analyze_issue_agentic(issue, url, project_path):
    """Drive a multi-turn, tool-using Gemini conversation to size one issue.

    The model may call search_codebase/read_file (executed locally via
    execute_tool) before emitting a final JSON verdict.

    Args:
        issue: Issue dict with at least 'title'; 'body' may be missing or None.
        url: Fully-qualified generateContent endpoint (including API key).
        project_path: Root directory the tools operate on.

    Returns:
        Parsed verdict dict (analysis/effort_level/reasoning[/recommended_implementation]),
        or a medium-effort fallback dict when the loop fails or hits the turn limit.
    """
    system_instruction = """You are a senior software engineer analyzing bug/feature reports.
You MUST use the provided tools to investigate the codebase and pinpoint exactly which files and logic are responsible.
DO NOT GUESS.

Rating Effort Level:
- small (1 day): Localized fix/change (1-2 files), clear logic.
- medium (2-3 days): Harder to trace, state management, touches multiple components.
- large (>3 days): Architectural issues, core protocol changes, or very complex multi-package bugs.

REPRODUCTION RULE:
If a bug is hard to reproduce (specific OS, complex setup, intermittent/flickering), it MUST NOT be rated as small.

Output format (ONLY valid JSON, NO markdown):
{
"analysis": "technical analysis of root cause and fix",
"effort_level": "small|medium|large",
"reasoning": "justification with specific files/lines/logic you found",
"recommended_implementation": "code snippets or specific logic changes (only if small)"
}
"""
    # GitHub exports can carry an explicit None body; .get('body', '') alone
    # would not protect the slice below.
    body = issue.get('body') or ''
    prompt = f"{system_instruction}\n\nIssue Title: {issue.get('title')}\nIssue Body: {body[:1500]}"
    messages = [{"role": "user", "parts": [{"text": prompt}]}]

    for turn in range(15):  # Limit turns to 15 for efficiency in unified loop
        try:
            res = call_gemini(messages, url)
            candidate = res['candidates'][0]['content']
            parts = candidate.get('parts', [])

            # The API requires a role on history entries; fill it in if absent.
            if 'role' not in candidate:
                candidate['role'] = 'model'
            messages.append(candidate)

            function_calls = [p for p in parts if 'functionCall' in p]

            if function_calls:
                tool_responses = []
                for fcall in function_calls:
                    call_data = fcall['functionCall']
                    result = execute_tool(call_data, project_path)
                    tool_responses.append({
                        "functionResponse": {
                            "name": call_data['name'],
                            # Truncate tool output to keep the context bounded.
                            "response": {"result": result[:5000]}
                        }
                    })
                messages.append({"role": "user", "parts": tool_responses})
            else:
                # Join ALL text parts (the model may split its answer) instead
                # of reading only parts[0], which raised IndexError on an
                # empty parts list and silently aborted the whole loop.
                text = ''.join(p.get('text', '') for p in parts)
                text = text.replace('```json', '').replace('```', '').strip()
                if not text:
                    continue
                return json.loads(text)
        except Exception:
            # Network/parse failure: abandon the agentic path so the caller
            # can fall back to the single-turn analysis.
            break

    return {"analysis": "Failed to analyze autonomously", "effort_level": "medium", "reasoning": "Agent loop exceeded limit or errored."}
|
|
|
|
def extract_keywords(text):
    """Pull likely code identifiers out of issue text.

    Matches CapitalizedWords, *.ts/*.tsx filenames, and *Service/*Command
    identifiers, keeps those longer than 4 characters, and returns up to 8
    unique hits in order of first appearance. (The previous list(set(...))
    made the selection nondeterministic across runs due to string hash
    randomization.)

    Args:
        text: Free-form issue title/body text.

    Returns:
        List of at most 8 unique keyword strings.
    """
    words = re.findall(r'\b[A-Z][a-zA-Z0-9]+\b|\b\w+\.tsx?\b|\b\w+Service\b|\b\w+Command\b', text)
    unique = list(dict.fromkeys(w for w in words if len(w) > 4))
    return unique[:8]
|
|
|
|
def search_codebase_static(keywords, project_path):
    """Grep the project for each keyword and build a context summary string.

    Runs grep with an argument list (shell=False) so keywords cannot inject
    shell commands (the old f-string shell pipeline could); result filtering
    and the 8-match cap (previously `head -n 8`) happen in Python.

    Args:
        keywords: Iterable of search strings (from extract_keywords).
        project_path: Root directory to search.

    Returns:
        Concatenated "--- Matches for <kw> ---" sections; empty string when
        nothing matched or grep is unavailable.
    """
    context = ""
    for kw in keywords:
        try:
            proc = subprocess.run(
                ['grep', '-rn', kw, project_path],
                capture_output=True, text=True
            )
            hits = [
                line for line in proc.stdout.splitlines()
                if not re.search(r'node_modules|dist|build|\.test\.', line)
            ][:8]
            if hits:
                context += f"\n--- Matches for {kw} ---\n" + '\n'.join(hits) + "\n"
        except OSError:
            # grep missing/unrunnable: skip this keyword rather than abort,
            # but no longer swallow every exception with a bare `except:`.
            continue
    return context
|
|
|
|
def analyze_issue_single_turn(issue, url, project_path):
    """Single-request fallback analysis: static keyword search + one LLM call.

    Args:
        issue: Issue dict; 'body' may be missing or explicitly None.
        url: generateContent endpoint URL.
        project_path: Root directory searched for keyword context.

    Returns:
        Parsed verdict dict, or a medium-effort fallback dict on any error.
    """
    title = issue.get('title', '')
    # `or ''` guards against an explicit None body (null in GitHub JSON),
    # which would otherwise crash the slice.
    body = (issue.get('body') or '')[:1500]

    keywords = extract_keywords(title + " " + body)
    code_context = search_codebase_static(keywords, project_path)

    prompt = f"""You are a senior software engineer analyzing issues.
Based on the description and codebase search context, pinpoint exactly which files and logic are responsible.
DO NOT GUESS. If the context isn't enough, provide your best technical hypothesis.

Rating Effort Level:
- small (1 day): Localized fix (1-2 files), clear cause.
- medium (2-3 days): Touches multiple components or hard to trace.
- large (>3 days): Architectural issues, Windows/WSL-specific, core protocols.

Issue Title: {title}
Issue Body: {body}

Codebase Search Context:
{code_context[:8000]}

Output ONLY valid JSON (no markdown block):
{{
"analysis": "technical analysis of root cause and fix",
"effort_level": "small|medium|large",
"reasoning": "justification with specific files/lines found"
}}
"""
    data = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {"temperature": 0.1}
    }

    try:
        req = urllib.request.Request(url, data=json.dumps(data).encode('utf-8'), headers={'Content-Type': 'application/json'})
        with urllib.request.urlopen(req, timeout=60) as response:
            res = json.loads(response.read().decode('utf-8'))
        txt = res['candidates'][0]['content']['parts'][0]['text']
        # Strip any markdown code fence the model added despite instructions.
        txt = txt.replace('```json', '').replace('```', '').strip()
        return json.loads(txt)
    except Exception as e:
        # Deliberate best-effort: report the failure inside the verdict so
        # the pipeline keeps going instead of raising.
        return {"analysis": "Failed to analyze autonomously", "effort_level": "medium", "reasoning": str(e)}
|
|
|
|
# --- VALIDATION ---
|
|
def find_files_in_text(text):
|
|
matches = re.findall(r'([\w\.\/\-]+\.(?:ts|tsx|js|json|md))', text)
|
|
return set([m for m in matches if not m.startswith('http')])
|
|
|
|
def resolve_file(filename, project_path):
    """Resolve a (possibly relative) filename to an existing path in the project.

    Tries the path relative to project_path first, then falls back to a
    basename search over the tree.

    Args:
        filename: Relative path or bare filename mentioned in issue text.
        project_path: Root directory to search.

    Returns:
        Absolute-ish resolved path string, or None when nothing matches.
    """
    direct = os.path.join(project_path, filename)
    if os.path.exists(direct):
        return direct
    basename = os.path.basename(filename)
    for root, dirs, files in os.walk(project_path):
        # Prune VCS/build directories in place so os.walk never descends
        # into them (the old `continue` still walked the whole
        # node_modules subtree, only skipping the per-dir check).
        dirs[:] = [d for d in dirs if d not in ('.git', 'node_modules')]
        if basename in files:
            return os.path.join(root, basename)
    return None
|
|
|
|
def validate_effort(issue, project_path):
    """Cross-check an issue's effort level against codebase heuristics.

    Combines two signals:
      1. Which files mentioned in the issue/analysis actually exist in the
         project, and how many lines they contain.
      2. Keyword heuristics (LARGE_KEYWORDS / MEDIUM_KEYWORDS) over the
         combined issue + analysis text.

    Args:
        issue: Issue dict carrying title/body/analysis/reasoning fields.
        project_path: Root directory used to resolve mentioned files.

    Returns:
        (effort, validation_msg): validated effort level ('small'/'medium'/
        'large') and a human-readable justification string.
    """
    # `or ''` guards against explicit None values (e.g. GitHub issues with a
    # null body), which .get(..., '') alone does not catch before .lower().
    title = (issue.get('title') or '').lower()
    body = (issue.get('body') or '').lower()
    analysis = (issue.get('analysis') or '').lower()
    reasoning = (issue.get('reasoning') or '').lower()

    combined_text = f"{title} {body} {analysis} {reasoning}"

    potential_files = find_files_in_text(combined_text)
    actual_files = []
    seen_paths = set()  # O(1) dedupe instead of rescanning actual_files per file
    total_lines = 0

    for f in potential_files:
        resolved = resolve_file(f, project_path)
        if resolved and resolved not in seen_paths:
            try:
                with open(resolved, 'r', encoding='utf-8') as file_obj:
                    lines = sum(1 for line in file_obj)
                seen_paths.add(resolved)
                actual_files.append((resolved, lines))
                total_lines += lines
            except Exception:
                pass  # unreadable/binary file: ignore for sizing purposes

    num_files = len(actual_files)

    # Keyword heuristic: large beats medium; first hit wins.
    keyword_effort = "small"
    for kw in LARGE_KEYWORDS:
        if re.search(r'\b' + re.escape(kw) + r'\b', combined_text):
            keyword_effort = "large"
            break

    if keyword_effort != "large":
        for kw in MEDIUM_KEYWORDS:
            if re.search(r'\b' + re.escape(kw) + r'\b', combined_text):
                keyword_effort = "medium"
                break

    if num_files == 0:
        # Without concrete files we cannot size the change; never call it small.
        effort = keyword_effort if keyword_effort in ['medium', 'large'] else 'medium'
        validation_msg = f"No specific files identified in codebase. Keyword heuristic: {keyword_effort}."
        return effort, validation_msg

    file_details = ", ".join([f"{os.path.basename(f[0])} ({f[1]} lines)" for f in actual_files])
    if num_files > 3 or total_lines > 1500 or keyword_effort == "large":
        effort = "large"
        validation_msg = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines. Keyword hint: {keyword_effort}."
    elif num_files >= 2 or total_lines > 500 or keyword_effort == "medium":
        effort = "medium"
        validation_msg = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines. Keyword hint: {keyword_effort}."
    else:
        effort = "small"
        validation_msg = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines. Appears highly localized."
    return effort, validation_msg
|
|
|
|
|
|
def process_pipeline_task(args_tuple):
    """Run the full analyze -> validate -> persist pipeline for one issue.

    Designed for ThreadPoolExecutor.map: takes a single tuple of
    (issue, url, project_path, input_file, all_issues). Mutates `issue`
    in place; since `issue` is an element of `all_issues`, dumping
    `all_issues` persists progress for every worker so far.
    """
    issue, url, project_path, input_file, all_issues = args_tuple

    # Re-analyze when analysis is missing, previously failed, or looks like
    # a truncated/placeholder result (< 30 chars).
    needs_analysis = not issue.get('analysis') or issue.get('analysis') == "Failed to analyze autonomously" or len(issue.get('analysis', '')) < 30

    if needs_analysis:
        print(f"[{issue.get('number', 'unknown')}] Starting Agentic Analysis...")
        result = analyze_issue_agentic(issue, url, project_path)

        # Agentic loop gave up -> cheaper single-request fallback.
        if result.get('analysis') == "Failed to analyze autonomously":
            print(f"[{issue.get('number', 'unknown')}] Agentic failed. Falling back to Single-Turn Context Analysis...")
            result = analyze_issue_single_turn(issue, url, project_path)

        issue['analysis'] = result.get('analysis', 'Failed to analyze')
        issue['effort_level'] = result.get('effort_level', 'medium')
        issue['reasoning'] = result.get('reasoning', 'Could not determine')
        if 'recommended_implementation' in result:
            issue['recommended_implementation'] = result['recommended_implementation']

        # Checkpoint immediately after (slow) analysis so a crash or abort
        # does not lose this issue's LLM work. file_lock serializes writers.
        with file_lock:
            with open(input_file, 'w') as f:
                json.dump(all_issues, f, indent=2)

    # Validation: heuristics may override the LLM's effort estimate.
    old_effort = issue.get('effort_level')
    new_effort, validation_reason = validate_effort(issue, project_path)
    issue['effort_level'] = new_effort

    # Strip any validation suffix appended by a previous run so re-running
    # the pipeline does not stack " | Codebase validation..." fragments.
    existing_reasoning = issue.get('reasoning', '')
    existing_reasoning = existing_reasoning.split(' | Codebase validation:')[0]
    existing_reasoning = existing_reasoning.split(' | No specific files identified')[0]
    issue['reasoning'] = f"{existing_reasoning} | {validation_reason}".strip(' |')

    # Second checkpoint only when something material changed.
    if needs_analysis or old_effort != new_effort:
        with file_lock:
            with open(input_file, 'w') as f:
                json.dump(all_issues, f, indent=2)

    print(f"[{issue.get('number', 'unknown')}] Completed -> {issue.get('effort_level')}")
    return issue
|
|
|
|
def export_csv(issues, output_csv):
    """Write the analyzed issues to a tab-separated report at output_csv.

    Args:
        issues: List of issue dicts (analysis fields optional).
        output_csv: Destination file path.
    """
    header = [
        'Issue ID', 'Title', 'Status', 'Assignee', 'Labels',
        'Last Sync', 'Link', 'analysis', 'effort_level',
        'reasoning', 'recommended_implementation'
    ]
    sync_date = datetime.now().strftime("%Y-%m-%d")

    def joined_names(raw, key):
        # Accepts either a plain list or a GraphQL-style {'nodes': [...]} wrapper.
        if isinstance(raw, dict) and 'nodes' in raw:
            raw = raw['nodes']
        return ", ".join(entry.get(key, '') for entry in raw)

    with open(output_csv, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.writer(handle, delimiter='\t')
        writer.writerow(header)

        for item in issues:
            writer.writerow([
                item.get('number'),
                item.get('title', ''),
                item.get('state', 'OPEN'),
                joined_names(item.get('assignees', []), 'login'),
                joined_names(item.get('labels', []), 'name'),
                sync_date,
                item.get('url', ''),
                item.get('analysis', ''),
                item.get('effort_level', ''),
                item.get('reasoning', ''),
                item.get('recommended_implementation', '')
            ])

    print(f"Exported successfully to {output_csv}")
|
|
|
|
def main():
    """CLI entry point: load issues, run the pipeline concurrently, export CSV."""
    parser = argparse.ArgumentParser(description="Unified Effort Analysis Pipeline.")
    parser.add_argument("--api-key", required=True, help="Gemini API Key")
    parser.add_argument("--input", default="data/bugs.json", help="Input JSON file")
    parser.add_argument("--project", default="../../packages", help="Project root to analyze")
    parser.add_argument("--workers", type=int, default=4, help="Number of concurrent workers")
    args = parser.parse_args()

    endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:generateContent?key={args.api_key}"

    with open(args.input, 'r') as f:
        issues = json.load(f)

    print(f"Starting unified analysis pipeline on {len(issues)} issues...")

    # Workers mutate the issue dicts in place; the shared `issues` list is
    # what gets exported below.
    work_items = [(issue, endpoint, args.project, args.input, issues) for issue in issues]
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) as pool:
        list(pool.map(process_pipeline_task, work_items))

    export_csv(issues, args.input.replace('.json', '.csv'))
    print("Pipeline fully complete!")


if __name__ == '__main__':
    main()
|