From 4398261feb88daf99720971d1b74f055102e2c4c Mon Sep 17 00:00:00 2001 From: Coco Sheng Date: Wed, 6 May 2026 16:35:38 -0400 Subject: [PATCH] feat: consolidate multi-stage analysis pipeline into a single, efficient python script --- scripts/backlog-analysis/README.md | 19 +- scripts/backlog-analysis/analyze_bugs.py | 119 ------ scripts/backlog-analysis/analyze_pipeline.py | 395 ++++++++++++++++++ .../backlog-analysis/bug_analyzer_final.py | 192 --------- scripts/backlog-analysis/generate_bugs_csv.py | 55 --- scripts/backlog-analysis/run_pipeline.sh | 53 --- .../single_turn_bug_analyzer.py | 119 ------ .../backlog-analysis/utils/validate_effort.py | 135 ------ 8 files changed, 405 insertions(+), 682 deletions(-) delete mode 100644 scripts/backlog-analysis/analyze_bugs.py create mode 100644 scripts/backlog-analysis/analyze_pipeline.py delete mode 100644 scripts/backlog-analysis/bug_analyzer_final.py delete mode 100644 scripts/backlog-analysis/generate_bugs_csv.py delete mode 100755 scripts/backlog-analysis/run_pipeline.sh delete mode 100644 scripts/backlog-analysis/single_turn_bug_analyzer.py delete mode 100644 scripts/backlog-analysis/utils/validate_effort.py diff --git a/scripts/backlog-analysis/README.md b/scripts/backlog-analysis/README.md index 7578dcc5b9..170029e555 100644 --- a/scripts/backlog-analysis/README.md +++ b/scripts/backlog-analysis/README.md @@ -8,11 +8,11 @@ and determining implementation effort levels for the Gemini CLI project. - `data/`: Contains the issue data in JSON and CSV formats. - `bugs.json`: The primary source of truth for bug analysis. - `utils/`: Auxiliary scripts for manual overrides, debugging, and post-analysis - validation (e.g., `validate_effort.py`, `inject_manual_fixes.py`). -- `*.py`: Core analysis and export scripts (e.g., `bug_analyzer_final.py`, - `generate_bugs_csv.py`). -- `run_pipeline.sh`: A shell script that orchestrates the entire effort analysis - pipeline end-to-end. + validation (e.g., `inject_manual_fixes.py`). 
+- `analyze_pipeline.py`: A unified Python script that orchestrates the entire + effort analysis pipeline end-to-end, combining agentic analysis, single-turn + fallbacks, heuristic validation, and CSV export. +- `generic_processor.py`: A highly configurable agent for custom backlog tasks. ## 🚀 The Ideal Workflow @@ -45,12 +45,13 @@ python3 fetch_from_url.py "https://github.com/google-gemini/gemini-cli/issues/?q ### Step 2: Analyze Effort Level -Run the full effort analysis pipeline. This will run a fast static pass, a deep -agentic codebase search, iterative recovery for complex cases, and heuristic -validation. +Run the unified effort analysis pipeline. This single Python script efficiently +runs a deep agentic codebase search, gracefully falls back to context-based +single-turn analysis for complex cases, runs heuristic validation to prevent +underestimations, and immediately exports the results to a CSV. ```bash -GEMINI_API_KEY="YOUR_KEY" ./run_pipeline.sh data/bugs.json ../../packages +python3 analyze_pipeline.py --api-key "YOUR_KEY" --input data/bugs.json --project ../../packages ``` ### Step 3: Review and Update JSON diff --git a/scripts/backlog-analysis/analyze_bugs.py b/scripts/backlog-analysis/analyze_bugs.py deleted file mode 100644 index 523efc2bfb..0000000000 --- a/scripts/backlog-analysis/analyze_bugs.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -Purpose: Performs fast, static initial triage of issues. -It sends the issue text along with a static representation of the directory tree to Gemini in a single turn. Useful for quick first-pass estimations without the overhead of deep codebase search. 
-""" -import json -import urllib.request -import urllib.error -import os -import argparse -import concurrent.futures -from pathlib import Path - -MODEL = "gemini-3-flash-preview" - -# Collect basic directory structure to provide as context -def get_tree(path, max_depth=3): - tree = [] - base_path = Path(path) - if not base_path.exists(): return "" - for root, dirs, files in os.walk(base_path): - dirs[:] = [d for d in dirs if d not in ('.git', 'node_modules', 'dist', 'build', 'coverage')] - depth = Path(root).relative_to(base_path).parts - if len(depth) >= max_depth: - dirs.clear() - continue - indent = ' ' * len(depth) - tree.append(f"{indent}{Path(root).name}/") - for f in files: - if f.endswith(('.ts', '.tsx', '.js', '.json', '.toml', '.md', '.py', '.sh')): - tree.append(f"{indent} {f}") - return "\n".join(tree) - -def analyze_bug(bug, url, tree_context): - prompt = f""" -You are analyzing bugs for the current codebase. -Here is the directory structure of the project: -{tree_context[:4000]} - -Analyze the following GitHub bug report to determine the implementation effort. -Rate the effort level with reasoning (small as in 1 day, medium as in 2-3 day, else large). -Look at the directory structure above to pinpoint which packages and files need modification. 
- -Issue Title: {bug.get('title')} -Issue Body: {bug.get('body', '')[:1000]} - -Reply with ONLY a valid JSON object matching exactly this schema, without Markdown formatting: -{{"analysis": "short technical analysis of the root cause and required fix", "effort_level": "small|medium|large", "reasoning": "brief justification mapping the effort to the files/components involved", "recommended_implementation": "concise code change instructions (only if small effort)"}} -""" - data = { - "contents": [{"parts": [{"text": prompt}]}], - "generationConfig": { - "temperature": 0.1, - } - } - - req = urllib.request.Request(url, data=json.dumps(data).encode('utf-8'), headers={'Content-Type': 'application/json'}) - try: - with urllib.request.urlopen(req) as response: - result = json.loads(response.read().decode('utf-8')) - text = result['candidates'][0]['content']['parts'][0]['text'] - - # Clean markdown block if present - if text.startswith('```json'): - text = text[7:] - if text.startswith('```'): - text = text[3:] - if text.endswith('```'): - text = text[:-3] - - parsed = json.loads(text.strip()) - return parsed - except Exception as e: - print(f"Error processing bug {bug.get('number', 'unknown')}: {e}") - return {"analysis": "Failed to analyze", "effort_level": "medium", "reasoning": "Error calling Gemini API"} - -def process_bug_task(args): - bug, url, tree_context = args - print(f"Analyzing Bug #{bug.get('number', 'unknown')}...") - result = analyze_bug(bug, url, tree_context) - bug['analysis'] = result.get('analysis', '') - bug['effort_level'] = result.get('effort_level', 'medium') - bug['reasoning'] = result.get('reasoning', '') - if 'recommended_implementation' in result: - bug['recommended_implementation'] = result['recommended_implementation'] - return bug - -def main(): - parser = argparse.ArgumentParser(description="Static initial triage analyzer for bugs.") - parser.add_argument("--api-key", required=True, help="Gemini API Key") - parser.add_argument("--input", 
default="data/bugs.json", help="Input JSON file containing bugs") - parser.add_argument("--project", default="../../packages", help="Project root to analyze") - args = parser.parse_args() - - url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:generateContent?key={args.api_key}" - - with open(args.input, 'r') as f: - bugs = json.load(f) - - tree_context = get_tree(args.project) - - print(f"Starting static analysis of {len(bugs)} bugs...") - - # Process in batches to save incrementally - batch_size = 10 - for i in range(0, len(bugs), batch_size): - batch = bugs[i:i+batch_size] - tasks = [(bug, url, tree_context) for bug in batch] - - with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: - list(executor.map(process_bug_task, tasks)) - - with open(args.input, 'w') as f: - json.dump(bugs, f, indent=2) - print(f"Saved batch {i//batch_size + 1}") - - print("Done analyzing all bugs!") - -if __name__ == '__main__': - main() diff --git a/scripts/backlog-analysis/analyze_pipeline.py b/scripts/backlog-analysis/analyze_pipeline.py new file mode 100644 index 0000000000..e2700e802d --- /dev/null +++ b/scripts/backlog-analysis/analyze_pipeline.py @@ -0,0 +1,395 @@ +""" +Purpose: A unified pipeline that performs end-to-end effort analysis on a dataset of GitHub issues. +It combines agentic deep analysis, single-turn fallbacks, heuristic validation, and CSV export in a single, efficient execution, avoiding redundant file operations. 
+""" +import argparse +import json +import urllib.request +import os +import subprocess +import re +import concurrent.futures +import threading +import csv +from datetime import datetime +from pathlib import Path + +MODEL = "gemini-3-flash-preview" +file_lock = threading.Lock() + +# --- VALIDATION HEURISTICS --- +LARGE_KEYWORDS = [ + 'windows', 'win32', 'wsl', 'wsl2', 'pty', 'pseudo-terminal', 'child_process', 'spawn', 'sigint', 'sigterm', + 'memory leak', 'performance', 'boot time', 'infinite loop', 'hangs', 'freezes', 'crashes', 'race condition', + 'intermittent', 'sometimes', 'flickering', 'a2a', 'mcp protocol', 'scheduler', 'event loop', 'websocket', + 'stream', 'throughput', 'concurrency', 'deadlock', 'file descriptor', 'architecture', 'refactor' +] + +MEDIUM_KEYWORDS = [ + 'react', 'hook', 'useeffect', 'usestate', 'usememo', 'ink', 'tui', 'ui state', 'parser', 'markdown', + 'regex', 'regular expression', 'ansi', 'escape sequence', 'toml', 'schema', 'validation', 'zod', + 'promise', 'async', 'await', 'unhandled', 'rejection', 'config', 'settings', 'env', 'environment', + 'path resolution', 'symlink', 'git', 'telemetry', 'logging', 'format', 'display', 'rendering', + 'clipboard', 'copy', 'paste', 'bracketed', 'interactive', 'dialog', 'modal', 'focus' +] + +tools_decl = [ + { + "functionDeclarations": [ + { + "name": "search_codebase", + "description": "Search the project directory for a string using grep. 
def call_gemini(messages, url):
    """POST the running conversation to the Gemini ``generateContent`` endpoint.

    Args:
        messages: The ``contents`` list sent verbatim to the API.
        url: Fully-formed endpoint URL (model and key already included).

    Returns:
        The decoded JSON response body.

    Raises:
        urllib.error.URLError: On network/HTTP failure or after the 120 s timeout.
        ValueError: If the response body is not valid JSON.
    """
    data = {
        "contents": messages,
        "tools": tools_decl,
        "generationConfig": {"temperature": 0.1}
    }
    req = urllib.request.Request(
        url,
        data=json.dumps(data).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
    )
    with urllib.request.urlopen(req, timeout=120) as response:
        return json.loads(response.read().decode('utf-8'))


def _grep_project(pattern, project_path, max_lines):
    """Recursively grep ``project_path`` for ``pattern`` and return the first hits.

    The command is passed as an argument list (no shell), so characters such as
    ``$(...)``, backticks or quotes in ``pattern`` are never interpreted.
    Dependency/build directories and ``*.test.*`` files are excluded by path,
    not by filtering the matched line's content.

    Args:
        pattern: grep (basic regex) pattern.
        project_path: Directory to search.
        max_lines: Maximum number of output lines to keep.

    Returns:
        The kept output lines joined into one string ("" when nothing matched).

    Raises:
        OSError: If ``grep`` cannot be started.
        ValueError: If an argument cannot be passed to the OS (e.g. NUL byte).
    """
    cmd = [
        'grep', '-rn',
        '--exclude-dir=node_modules', '--exclude-dir=dist', '--exclude-dir=build',
        '--exclude=*.test.*',
        # -e/-- keep a pattern or path starting with "-" from being read as an option.
        '-e', pattern, '--', project_path,
    ]
    kept = []
    # stderr is merged into stdout so grep diagnostics (e.g. a bad regex) reach the caller.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                          text=True, errors='replace') as proc:
        for line in proc.stdout:
            kept.append(line)
            if len(kept) >= max_lines:
                # Enough output; stop grep instead of buffering the full result.
                proc.kill()
                break
    return ''.join(kept)


def _find_by_basename(basename, project_path):
    """Return the first file named ``basename`` under ``project_path``, or None."""
    if not basename:
        return None
    for root, _dirs, files in os.walk(project_path):
        if basename in files:
            return os.path.join(root, basename)
    return None


def execute_tool(call, project_path):
    """Execute one model-requested tool call against the project tree.

    Args:
        call: The ``functionCall`` dict from the model; ``name`` selects the
            tool and ``args`` carries its parameters.
        project_path: Root directory the tools operate on.

    Returns:
        A string for the model: up to 20 grep hits for ``search_codebase``,
        up to the first 300 lines of the file for ``read_file``, an error or
        "not found" message, or "Unknown tool".
    """
    name = call['name']
    args = call.get('args') or {}

    if name == 'search_codebase':
        pattern = args.get('pattern', '')
        if not pattern:
            # An empty pattern would match every line of every file.
            return "No matches found."
        try:
            res = _grep_project(pattern, project_path, 20)
        except (OSError, ValueError) as e:
            return str(e)
        return res if res else "No matches found."

    if name == 'read_file':
        filepath = args.get('filepath', '')
        if not filepath.startswith('/'):
            filepath = os.path.join(project_path, filepath)
        # NOTE(review): absolute paths and "../" segments are still honoured, so
        # the model can read files outside project_path; consider restricting.
        try:
            if not os.path.exists(filepath):
                # Fall back to the first file in the project with the same basename.
                found_path = _find_by_basename(os.path.basename(filepath), project_path)
                if not found_path:
                    return f"File {filepath} not found."
                filepath = found_path

            head = []
            with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
                for index, line in enumerate(fh):
                    if index >= 300:
                        break
                    head.append(line)
            return ''.join(head)
        except (OSError, ValueError) as e:
            return str(e)

    return "Unknown tool"


def analyze_issue_agentic(issue, url, project_path):
    """Let the model investigate an issue with tool calls and return its verdict.

    Runs up to 15 request/response turns. Tool calls in a reply are executed
    with ``execute_tool`` and fed back; the first text reply is parsed as JSON.

    Args:
        issue: Issue dict; ``title``, ``body`` and ``number`` are read.
        url: Gemini endpoint URL passed to ``call_gemini``.
        project_path: Root directory the tools operate on.

    Returns:
        The parsed JSON object from the model, or a fallback dict whose
        ``analysis`` is "Failed to analyze autonomously" when the loop errors
        or runs out of turns.
    """
    system_instruction = """You are a senior software engineer analyzing bug/feature reports.
You MUST use the provided tools to investigate the codebase and pinpoint exactly which files and logic are responsible.
DO NOT GUESS.

Rating Effort Level:
- small (1 day): Localized fix/change (1-2 files), clear logic.
- medium (2-3 days): Harder to trace, state management, touches multiple components.
- large (>3 days): Architectural issues, core protocol changes, or very complex multi-package bugs.

REPRODUCTION RULE:
If a bug is hard to reproduce (specific OS, complex setup, intermittent/flickering), it MUST NOT be rated as small.

Output format (ONLY valid JSON, NO markdown):
{
  "analysis": "technical analysis of root cause and fix",
  "effort_level": "small|medium|large",
  "reasoning": "justification with specific files/lines/logic you found",
  "recommended_implementation": "code snippets or specific logic changes (only if small)"
}
"""
    # The body may be present but null; slicing None would raise outside the
    # try block and abort the whole pipeline.
    body = issue.get('body') or ''
    prompt = f"{system_instruction}\n\nIssue Title: {issue.get('title')}\nIssue Body: {body[:1500]}"
    messages = [{"role": "user", "parts": [{"text": prompt}]}]

    for _turn in range(15):  # Limit turns to 15 for efficiency in unified loop
        try:
            res = call_gemini(messages, url)
            candidate = res['candidates'][0]['content']
            parts = candidate.get('parts', [])

            candidate.setdefault('role', 'model')
            messages.append(candidate)

            function_calls = [p['functionCall'] for p in parts if 'functionCall' in p]
            if function_calls:
                tool_responses = []
                for call_data in function_calls:
                    result = execute_tool(call_data, project_path)
                    tool_responses.append({
                        "functionResponse": {
                            "name": call_data['name'],
                            # Cap tool output so one result cannot dominate the context.
                            "response": {"result": result[:5000]}
                        }
                    })
                messages.append({"role": "user", "parts": tool_responses})
                continue

            if not parts:
                break
            text = parts[0].get('text', '')
            if not text:
                continue
            text = text.replace('```json', '').replace('```', '').strip()
            parsed = json.loads(text)
            if not isinstance(parsed, dict):
                raise ValueError("model reply was not a JSON object")
            return parsed
        except Exception as e:
            # Best effort: report why the loop stopped, then use the fallback result.
            print(f"[{issue.get('number', 'unknown')}] Agentic analysis error: {e}")
            break

    return {"analysis": "Failed to analyze autonomously", "effort_level": "medium", "reasoning": "Agent loop exceeded limit or errored."}


def extract_keywords(text):
    """Pick up to 8 code-like tokens from ``text`` to use as grep keywords.

    Tokens are capitalised identifiers, ``*.ts``/``*.tsx`` names, and names
    ending in ``Service`` or ``Command``; only tokens longer than 4 characters
    are kept. Duplicates are dropped and first-occurrence order is preserved,
    so the same text always yields the same keywords.
    """
    words = re.findall(r'\b[A-Z][a-zA-Z0-9]+\b|\b\w+\.tsx?\b|\b\w+Service\b|\b\w+Command\b', text)
    unique = dict.fromkeys(w for w in words if len(w) > 4)
    return list(unique)[:8]


def search_codebase_static(keywords, project_path):
    """Grep the project for each keyword and concatenate the labelled hits.

    Args:
        keywords: Iterable of grep patterns.
        project_path: Directory to search.

    Returns:
        One "--- Matches for <kw> ---" section (at most 8 lines each) per
        keyword that produced output; "" when nothing matched.
    """
    sections = []
    for kw in keywords:
        try:
            out = _grep_project(kw, project_path, 8)
        except (OSError, ValueError):
            # Context gathering is best effort; skip a keyword grep cannot run for.
            continue
        if out:
            sections.append(f"\n--- Matches for {kw} ---\n{out}\n")
    return ''.join(sections)
# Matches a validation note previously appended to an issue's reasoning,
# whether it follows " | " or sits at the very start of the string.
_VALIDATION_NOTE_RE = re.compile(
    r'(?:^|\s*\|\s*)(?:Codebase validation:|No specific files identified).*\Z',
    re.DOTALL,
)

# Longer extensions come first and a trailing \b is required, so "foo.tsx" is
# not truncated to "foo.ts" and "package.json" is not read as "package.js".
_FILE_REF_RE = re.compile(r'[\w./-]+\.(?:tsx|ts|json|js|md)\b')

_SKIPPED_DIRS = frozenset({'.git', 'node_modules'})


def find_files_in_text(text):
    """Return the set of source-file-looking tokens mentioned in ``text``.

    A token is a run of word characters, dots, slashes and dashes ending in
    ``.ts``, ``.tsx``, ``.js``, ``.json`` or ``.md``. Tokens starting with
    "http" are dropped.
    """
    # NOTE(review): ':' is not part of the token, so a URL is matched from
    # "//host/..." onward and is not removed by the "http" check.
    return {m for m in _FILE_REF_RE.findall(text) if not m.startswith('http')}


def resolve_file(filename, project_path):
    """Map a file reference to a real file under ``project_path``.

    Tries ``project_path/filename`` first, then walks the tree for a file with
    the same basename, never descending into ``.git`` or ``node_modules``.

    Returns:
        The path of the file that was found, or None.
    """
    direct = os.path.join(project_path, filename)
    if os.path.isfile(direct):
        return direct

    basename = os.path.basename(filename)
    if not basename:
        return None
    for root, dirs, files in os.walk(project_path):
        # Prune by exact directory name; a substring test on the full path
        # would also skip ".github" or a project that lives under "*.git".
        dirs[:] = [d for d in dirs if d not in _SKIPPED_DIRS]
        if basename in files:
            return os.path.join(root, basename)
    return None


def _contains_keyword(keywords, text):
    """Return True if any keyword occurs in ``text`` as a whole word/phrase."""
    return any(re.search(r'\b' + re.escape(kw) + r'\b', text) for kw in keywords)


def validate_effort(issue, project_path):
    """Derive an effort level from files and keywords mentioned in an issue.

    Reads ``title``, ``body``, ``analysis`` and ``reasoning``. Files are
    extracted from the original-case text; keywords are matched against the
    lower-cased text.

    Args:
        issue: Issue dict (missing or null fields are treated as empty).
        project_path: Project root used to resolve mentioned files.

    Returns:
        ``(effort, message)``: effort is "small", "medium" or "large";
        message describes the evidence.
    """
    raw_text = " ".join(str(issue.get(key) or '') for key in ('title', 'body', 'analysis', 'reasoning'))
    combined_text = raw_text.lower()

    actual_files = []
    seen_paths = set()
    total_lines = 0
    # Sorted so the file list in the message is stable between runs.
    for candidate in sorted(find_files_in_text(raw_text)):
        resolved = resolve_file(candidate, project_path)
        if not resolved or resolved in seen_paths:
            continue
        try:
            with open(resolved, 'r', encoding='utf-8', errors='replace') as file_obj:
                lines = sum(1 for _ in file_obj)
        except OSError:
            continue
        seen_paths.add(resolved)
        actual_files.append((resolved, lines))
        total_lines += lines

    num_files = len(actual_files)

    if _contains_keyword(LARGE_KEYWORDS, combined_text):
        keyword_effort = "large"
    elif _contains_keyword(MEDIUM_KEYWORDS, combined_text):
        keyword_effort = "medium"
    else:
        keyword_effort = "small"

    if num_files == 0:
        # Without concrete files the estimate is never lower than medium.
        effort = keyword_effort if keyword_effort in ('medium', 'large') else 'medium'
        return effort, f"No specific files identified in codebase. Keyword heuristic: {keyword_effort}."

    file_details = ", ".join(f"{os.path.basename(path)} ({count} lines)" for path, count in actual_files)
    summary = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines."
    if num_files > 3 or total_lines > 1500 or keyword_effort == "large":
        return "large", f"{summary} Keyword hint: {keyword_effort}."
    if num_files >= 2 or total_lines > 500 or keyword_effort == "medium":
        return "medium", f"{summary} Keyword hint: {keyword_effort}."
    return "small", f"{summary} Appears highly localized."


def _save_issues(all_issues, input_file):
    """Atomically rewrite ``input_file`` with ``all_issues``.

    The caller must hold ``file_lock``.
    """
    tmp_path = f"{input_file}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(all_issues, f, indent=2)
    # Replace in one step so an interrupted run cannot leave a truncated file.
    os.replace(tmp_path, input_file)


def process_pipeline_task(args_tuple):
    """Analyze (if needed) and validate one issue, then persist the result.

    Args:
        args_tuple: ``(issue, url, project_path, input_file, all_issues)``.
            ``issue`` is one element of ``all_issues`` and is updated in place.

    Returns:
        The updated issue dict.
    """
    issue, url, project_path, input_file, all_issues = args_tuple
    number = issue.get('number', 'unknown')

    current_analysis = issue.get('analysis') or ''
    needs_analysis = current_analysis == "Failed to analyze autonomously" or len(current_analysis) < 30

    if needs_analysis:
        print(f"[{number}] Starting Agentic Analysis...")
        result = analyze_issue_agentic(issue, url, project_path)

        if result.get('analysis') == "Failed to analyze autonomously":
            print(f"[{number}] Agentic failed. Falling back to Single-Turn Context Analysis...")
            result = analyze_issue_single_turn(issue, url, project_path)

        # Other workers serialise all_issues while holding file_lock, so the
        # issue dict is only mutated under the same lock.
        with file_lock:
            issue['analysis'] = result.get('analysis', 'Failed to analyze')
            issue['effort_level'] = result.get('effort_level', 'medium')
            issue['reasoning'] = result.get('reasoning', 'Could not determine')
            if 'recommended_implementation' in result:
                issue['recommended_implementation'] = result['recommended_implementation']
            else:
                # Drop a recommendation left over from an earlier analysis.
                issue.pop('recommended_implementation', None)

    # Validation
    old_effort = issue.get('effort_level')
    old_reasoning = issue.get('reasoning')
    base_reasoning = _VALIDATION_NOTE_RE.sub('', str(old_reasoning or '')).strip(' |')
    with file_lock:
        # Validate against the reasoning without a note from a previous run.
        issue['reasoning'] = base_reasoning
    new_effort, validation_reason = validate_effort(issue, project_path)
    merged_reasoning = " | ".join(part for part in (base_reasoning, validation_reason) if part)

    with file_lock:
        issue['effort_level'] = new_effort
        issue['reasoning'] = merged_reasoning
        if needs_analysis or old_effort != new_effort or old_reasoning != merged_reasoning:
            _save_issues(all_issues, input_file)

    print(f"[{number}] Completed -> {issue.get('effort_level')}")
    return issue
def main():
    """Command-line entry point: analyze every issue, then export a CSV.

    Loads the issue list from ``--input``, runs ``process_pipeline_task`` for
    each issue on a thread pool of ``--workers`` threads, and writes the CSV
    next to the input file with a ``.csv`` suffix.
    """
    parser = argparse.ArgumentParser(description="Unified Effort Analysis Pipeline.")
    # The environment fallback keeps the key out of shell history and process
    # listings; the flag still works as before.
    parser.add_argument("--api-key", default=os.environ.get("GEMINI_API_KEY"),
                        help="Gemini API Key (defaults to the GEMINI_API_KEY environment variable)")
    parser.add_argument("--input", default="data/bugs.json", help="Input JSON file")
    parser.add_argument("--project", default="../../packages", help="Project root to analyze")
    parser.add_argument("--workers", type=int, default=4, help="Number of concurrent workers")
    args = parser.parse_args()

    if not args.api_key:
        parser.error("an API key is required: pass --api-key or set GEMINI_API_KEY")

    url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:generateContent?key={args.api_key}"

    with open(args.input, 'r') as f:
        issues = json.load(f)

    print(f"Starting unified analysis pipeline on {len(issues)} issues...")

    tasks = [(issue, url, args.project, args.input, issues) for issue in issues]
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) as executor:
        # list() drains the iterator so worker exceptions surface here.
        list(executor.map(process_pipeline_task, tasks))

    # Swap only the final suffix; a plain str.replace would also rewrite
    # ".json" inside directory names and leave other names unchanged.
    input_path = Path(args.input)
    output_csv = str(input_path.with_suffix('.csv'))
    if output_csv == str(input_path):
        # Never let the CSV overwrite the JSON source of truth.
        output_csv = f"{args.input}.csv"
    export_csv(issues, output_csv)
    print("Pipeline fully complete!")
Returns matching lines and file paths.", - "parameters": { - "type": "OBJECT", - "properties": { - "pattern": {"type": "STRING", "description": "The text pattern to search for"} - }, - "required": ["pattern"] - } - }, - { - "name": "read_file", - "description": "Read a specific file to understand its context.", - "parameters": { - "type": "OBJECT", - "properties": { - "filepath": {"type": "STRING", "description": "The path to the file"} - }, - "required": ["filepath"] - } - } - ] - } -] - -def call_gemini(messages, url): - data = { - "contents": messages, - "tools": tools_decl, - "generationConfig": {"temperature": 0.1} - } - req = urllib.request.Request(url, data=json.dumps(data).encode('utf-8'), headers={'Content-Type': 'application/json'}) - with urllib.request.urlopen(req) as response: - return json.loads(response.read().decode('utf-8')) - -def execute_tool(call, project_path): - name = call['name'] - args = call.get('args', {}) - - if name == 'search_codebase': - pattern = args.get('pattern', '') - pattern = pattern.replace('"', '\\"') - try: - cmd = f'grep -rn "{pattern}" "{project_path}" | grep -vE "node_modules|dist|build|\\.test\\." | head -n 20' - res = subprocess.check_output(cmd, shell=True, text=True, stderr=subprocess.STDOUT) - return res if res else "No matches found." - except subprocess.CalledProcessError as e: - return e.output if e.output else "No matches found." - elif name == 'read_file': - filepath = args.get('filepath', '') - if not filepath.startswith('/'): - filepath = os.path.join(project_path, filepath) - - try: - if not os.path.exists(filepath): - basename = os.path.basename(filepath) - find_cmd = f'find "{project_path}" -name "{basename}" | head -n 1' - found_path = subprocess.check_output(find_cmd, shell=True, text=True).strip() - if found_path: filepath = found_path - else: return f"File {filepath} not found." 
- - cmd = f'head -n 300 "{filepath}"' - res = subprocess.check_output(cmd, shell=True, text=True, stderr=subprocess.STDOUT) - return res - except Exception as e: - return str(e) - return "Unknown tool" - -def analyze_issue(issue, url, project_path): - system_instruction = """You are a senior software engineer analyzing bug reports. -You MUST use the provided tools to investigate the codebase and pinpoint exactly which files and logic are responsible for the bug. -DO NOT GUESS. - -Rating Effort Level: -- small (1 day): Bug is easy to reproduce, clear cause, localized fix (1-2 files). -- medium (2-3 days): Harder to reproduce (needs specific platform/setup), requires tracing, or touches multiple components. -- large (>3 days): Architectural issues, core protocol changes, or very complex multi-package bugs. - -REPRODUCTION RULE: -If a bug is hard to reproduce (specific OS, complex setup, intermittent/flickering), it MUST NOT be rated as small. - -Output format (ONLY valid JSON, NO markdown): -{ - "analysis": "technical analysis of root cause and fix", - "effort_level": "small|medium|large", - "reasoning": "justification with specific files/lines/logic you found", - "recommended_implementation": "code snippets or specific logic changes (only if small)" -} -""" - - prompt = f"{system_instruction}\n\nBug Title: {issue.get('title')}\nBug Body: {issue.get('body', '')[:1200]}" - messages = [{"role": "user", "parts": [{"text": prompt}]}] - - for turn in range(30): - try: - res = call_gemini(messages, url) - candidate = res['candidates'][0]['content'] - parts = candidate.get('parts', []) - - if 'role' not in candidate: candidate['role'] = 'model' - messages.append(candidate) - - function_calls = [p for p in parts if 'functionCall' in p] - - if function_calls: - tool_responses = [] - for fcall in function_calls: - call_data = fcall['functionCall'] - result = execute_tool(call_data, project_path) - tool_responses.append({ - "functionResponse": { - "name": call_data['name'], - 
"response": {"result": result[:5000]} - } - }) - messages.append({"role": "user", "parts": tool_responses}) - else: - text = parts[0].get('text', '') - if not text: continue - text = text.replace('```json', '').replace('```', '').strip() - return json.loads(text) - except Exception as e: break - - return {"analysis": "Failed to analyze autonomously", "effort_level": "medium", "reasoning": "Agent loop exceeded 30 turns or errored."} - -def process_issue_task(args_tuple): - issue, url, project_path, input_file, bugs = args_tuple - current_analysis = issue.get('analysis', '') - if current_analysis and current_analysis != "Failed to analyze autonomously" and len(current_analysis) > 50: - return issue - - print(f"Analyzing Bug #{issue.get('number', 'unknown')}...", flush=True) - result = analyze_issue(issue, url, project_path) - - issue['analysis'] = result.get('analysis', 'Failed to analyze') - issue['effort_level'] = result.get('effort_level', 'medium') - issue['reasoning'] = result.get('reasoning', 'Could not determine') - if 'recommended_implementation' in result: - issue['recommended_implementation'] = result['recommended_implementation'] - else: - issue.pop('recommended_implementation', None) - - print(f"Completed Bug #{issue.get('number', 'unknown')} -> {issue.get('effort_level', 'unknown')}", flush=True) - - with file_lock: - with open(input_file, 'w') as f: - json.dump(bugs, f, indent=2) - return issue - -def main(): - parser = argparse.ArgumentParser(description="Deep agentic bug analyzer.") - parser.add_argument("--api-key", required=True, help="Gemini API Key") - parser.add_argument("--input", default="data/bugs.json", help="Input JSON file containing bugs") - parser.add_argument("--project", default="../../packages", help="Project root to analyze") - args = parser.parse_args() - - url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:generateContent?key={args.api_key}" - - with open(args.input, 'r') as f: - bugs = json.load(f) - - 
print(f"Starting FINAL RE-ANALYSIS for {len(bugs)} bugs (Turn Limit: 30)...", flush=True) - - tasks = [(b, url, args.project, args.input, bugs) for b in bugs] - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - list(executor.map(process_issue_task, tasks)) - - print("Agentic analysis complete. JSON is updated.", flush=True) - -if __name__ == '__main__': - main() diff --git a/scripts/backlog-analysis/generate_bugs_csv.py b/scripts/backlog-analysis/generate_bugs_csv.py deleted file mode 100644 index f745bfa951..0000000000 --- a/scripts/backlog-analysis/generate_bugs_csv.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Purpose: Exports analyzed JSON issue data into a human-readable CSV format. -This is typically the final step in the workflow, making the output suitable for sharing, spreadsheet import, or manual review. -""" -import argparse -import json -import csv -from datetime import datetime - -parser = argparse.ArgumentParser(description="Export JSON issues to CSV.") -parser.add_argument("--input", default="data/bugs.json", help="Input JSON file") -parser.add_argument("--output", default="data/bugs.csv", help="Output CSV file") -args = parser.parse_args() - -with open(args.input, 'r') as f: - issues = json.load(f) - -today = datetime.now().strftime("%Y-%m-%d") - -with open(args.output, 'w', newline='', encoding='utf-8') as f: - writer = csv.writer(f, delimiter='\t') - writer.writerow([ - 'Issue ID', 'Title', 'Status', 'Assignee', 'Labels', - 'Last Sync', 'Link', 'analysis', 'effort_level', - 'reasoning', 'recommended_implementation' - ]) - - for issue in issues: - num = issue.get('number') - - assignee_list = issue.get('assignees', []) - if isinstance(assignee_list, dict) and 'nodes' in assignee_list: - assignee_list = assignee_list['nodes'] - assignee = ", ".join([a.get('login', '') for a in assignee_list]) - - labels_list = issue.get('labels', []) - if isinstance(labels_list, dict) and 'nodes' in labels_list: - labels_list = labels_list['nodes'] 
- labels = ", ".join([l.get('name', '') for l in labels_list]) - - writer.writerow([ - num, - issue.get('title', ''), - issue.get('state', 'OPEN'), - assignee, - labels, - today, - issue.get('url', ''), - issue.get('analysis', ''), - issue.get('effort_level', ''), - issue.get('reasoning', ''), - issue.get('recommended_implementation', '') - ]) - -print(f"Successfully generated {args.output}") diff --git a/scripts/backlog-analysis/run_pipeline.sh b/scripts/backlog-analysis/run_pipeline.sh deleted file mode 100755 index f5c1960970..0000000000 --- a/scripts/backlog-analysis/run_pipeline.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# run_pipeline.sh -# Purpose: Orchestrates the full effort analysis pipeline end-to-end. - -if [ -z "$GEMINI_API_KEY" ]; then - echo "Error: GEMINI_API_KEY environment variable is required." - echo "Usage: GEMINI_API_KEY=your_key ./run_pipeline.sh [INPUT_FILE] [PROJECT_DIR]" - exit 1 -fi - -INPUT_FILE=${1:-"data/bugs.json"} -PROJECT_DIR=${2:-"../../packages"} -OUTPUT_CSV="${INPUT_FILE%.json}.csv" - -echo "==========================================" -echo "Step 1: Initial Triage (Static Pass)" -echo "==========================================" -python3 analyze_bugs.py --api-key "$GEMINI_API_KEY" --input "$INPUT_FILE" --project "$PROJECT_DIR" - -echo "" -echo "==========================================" -echo "Step 2: Deep Agentic Analysis" -echo "==========================================" -python3 bug_analyzer_final.py --api-key "$GEMINI_API_KEY" --input "$INPUT_FILE" --project "$PROJECT_DIR" - -echo "" -echo "==========================================" -echo "Step 3: Iterative Recovery Analysis" -echo "==========================================" -while true; do - count=$(jq '[.[] | select(.analysis == "Failed to analyze autonomously" or .analysis == null or .analysis == "" or (.analysis | length) < 30)] | length' "$INPUT_FILE") - if [ -z "$count" ] || [ "$count" -eq 0 ]; then - echo "All issues successfully processed!" 
"""
Purpose: Performs a single-turn analysis on backlog issues.
It pre-fetches context by grepping the codebase for keywords found in the issue description, then sends a single prompt to Gemini to determine the root cause and effort level. Faster than agentic analysis but more grounded than static analysis.
"""
import argparse
import concurrent.futures
import json
import os
import re
import subprocess
import threading
import urllib.request

MODEL = "gemini-3-flash-preview"
# Guards every mutation of the shared issue list and every rewrite of the JSON file.
file_lock = threading.Lock()

_FAILED_ANALYSIS = "Failed to analyze autonomously"
_VALID_EFFORTS = ('small', 'medium', 'large')
_KEYWORD_RE = re.compile(
    r'\b[A-Z][a-zA-Z0-9]+\b|\b\w+\.tsx?\b|\b\w+Service\b|\b\w+Command\b'
)
# Same filter the old `grep -vE` stage applied to each "path:line:text" hit.
_EXCLUDED_MATCH_RE = re.compile(r'node_modules|dist|build|\.test\.')


def extract_keywords(text):
    """Return up to 8 distinct search keywords found in *text*.

    Keywords are capitalised identifiers, ``*.ts``/``*.tsx`` file names and
    ``*Service``/``*Command`` names longer than 4 characters. They are
    returned in order of first appearance, so the same issue text always
    yields the same keywords (a ``set`` would make the 8-item cut-off depend
    on string hash randomisation).
    """
    candidates = (word for word in _KEYWORD_RE.findall(text) if len(word) > 4)
    return list(dict.fromkeys(candidates))[:8]


def _grep_keyword(keyword, project_path, limit=8):
    """Return up to *limit* ``grep -rn`` hit lines for *keyword*.

    grep is invoked with an argument list (no shell), so neither the keyword
    nor the path can be interpreted as shell syntax. Hits mentioning
    node_modules/dist/build/.test. are dropped. Returns an empty list when
    grep is unavailable.
    """
    try:
        proc = subprocess.Popen(
            ["grep", "-rn", "-e", keyword, "--", project_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            errors="replace",  # stray binary bytes must not abort the search
        )
    except OSError as exc:
        print(f"Code search unavailable for {keyword}: {exc}", flush=True)
        return []

    kept = []
    with proc:
        for line in proc.stdout:
            if _EXCLUDED_MATCH_RE.search(line):
                continue
            kept.append(line)
            if len(kept) >= limit:
                break
        # Stop grep early once enough hits are collected (was `head -n 8`).
        proc.kill()
    return kept


def search_codebase(keywords, project_path):
    """Build the prompt's code-search context for *keywords*.

    Returns one ``--- Matches for <kw> ---`` section per keyword that has at
    least one hit under *project_path*, concatenated in keyword order; an
    empty string when nothing matches.
    """
    sections = []
    for kw in keywords:
        hits = _grep_keyword(kw, project_path)
        if hits:
            sections.append(f"\n--- Matches for {kw} ---\n{''.join(hits)}\n")
    return "".join(sections)


def _needs_analysis(issue):
    """Return True when *issue* has no usable analysis yet."""
    analysis = issue.get('analysis')
    return not analysis or analysis == _FAILED_ANALYSIS or len(analysis) < 30


def _write_json_atomically(path, payload):
    """Write *payload* to *path* via a temp file so a crash never truncates it."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w') as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_path, path)


def process_issue_task(args_tuple):
    """Analyse one issue with a single Gemini request and persist the result.

    Args:
        args_tuple: ``(issue, url, project_path, input_file, bugs)`` where
            *issue* is one of the dicts in *bugs*, *url* is the
            generateContent endpoint, and *input_file* is the JSON file that
            *bugs* is written back to.

    Returns:
        The same *issue* dict, updated in place on success. Request and parse
        failures are logged and leave the issue untouched so a later batch
        can retry it.
    """
    issue, url, project_path, input_file, bugs = args_tuple

    if not _needs_analysis(issue):
        return issue

    # GitHub returns null for an empty title/body; treat that as empty text.
    title = issue.get('title') or ''
    body = (issue.get('body') or '')[:1500]

    keywords = extract_keywords(f"{title} {body}")
    code_context = search_codebase(keywords, project_path)

    prompt = f"""You are a senior software engineer analyzing bug reports.
Based on the bug description and the provided codebase search context, pinpoint exactly which files and logic are responsible for the bug.
DO NOT GUESS. If the context isn't enough, provide your best technical hypothesis.

Rating Effort Level:
- small (1 day): Localized fix (1-2 files), clear cause.
- medium (2-3 days): Touches multiple components or hard to trace.
- large (>3 days): Architectural issues, Windows/WSL-specific, core protocols.

Bug Title: {title}
Bug Body: {body}

Codebase Search Context:
{code_context[:8000]}

Output ONLY valid JSON (no markdown block):
{{
  "analysis": "technical analysis of root cause and fix",
  "effort_level": "small|medium|large",
  "reasoning": "justification with specific files/lines found"
}}
"""
    data = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {"temperature": 0.1}
    }

    parsed = None
    try:
        req = urllib.request.Request(
            url,
            data=json.dumps(data).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
        )
        with urllib.request.urlopen(req, timeout=60) as response:
            res = json.loads(response.read().decode('utf-8'))
        txt = res['candidates'][0]['content']['parts'][0]['text']
        txt = txt.replace('```json', '').replace('```', '').strip()
        candidate = json.loads(txt)
        if not isinstance(candidate, dict):
            raise ValueError("model response is not a JSON object")
        parsed = candidate
    except Exception as e:
        # Worker boundary: one failed issue must not abort the whole batch.
        print(f"Failed {issue.get('number', 'unknown')}: {e}", flush=True)

    with file_lock:
        # Mutate under the lock: another worker may be serialising `bugs`
        # right now, and changing a dict mid-dump raises RuntimeError.
        if parsed is not None:
            effort = str(parsed.get('effort_level', 'medium')).strip().lower()
            issue['analysis'] = parsed.get('analysis', 'Failed to analyze')
            issue['effort_level'] = effort if effort in _VALID_EFFORTS else 'medium'
            issue['reasoning'] = parsed.get('reasoning', 'Could not determine')
        _write_json_atomically(input_file, bugs)

    if parsed is not None:
        print(f"Completed {issue.get('number', 'unknown')} -> {issue['effort_level']}", flush=True)

    return issue


def main(argv=None):
    """Analyse the next batch (at most 5) of unanalysed issues in the input file.

    Args:
        argv: Optional argument list (defaults to ``sys.argv[1:]``), accepting
            ``--api-key``, ``--input`` and ``--project``.
    """
    parser = argparse.ArgumentParser(description="Single turn code search bug analyzer.")
    parser.add_argument("--api-key", required=True, help="Gemini API Key")
    parser.add_argument("--input", default="data/bugs.json", help="Input JSON file containing bugs")
    parser.add_argument("--project", default="../../packages", help="Project root to analyze")
    args = parser.parse_args(argv)

    url = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:generateContent?key={args.api_key}"

    with open(args.input, 'r') as f:
        bugs = json.load(f)

    to_analyze = [b for b in bugs if _needs_analysis(b)][:5]

    print(f"Starting single-turn analysis for {len(to_analyze)} bugs...", flush=True)

    tasks = [(b, url, args.project, args.input, bugs) for b in to_analyze]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        list(executor.map(process_issue_task, tasks))

    print("Done processing batch.", flush=True)


if __name__ == '__main__':
    main()
"""
Purpose: Runs heuristic post-analysis validation on the AI's effort estimations.
Checks for keywords (like 'Windows', 'WSL', 'PTY') in the issue body to ensure the AI didn't underestimate platform-specific or architecturally complex bugs as 'small'.
"""
import argparse
import functools
import json
import os
import re

# Defaults used when no explicit path is supplied (also the CLI defaults).
ISSUES_FILE = "data/bugs.json"
REPO_ROOT = "../../packages"

# Stricter criteria keywords
LARGE_KEYWORDS = [
    'windows', 'win32', 'wsl', 'wsl2', 'pty', 'pseudo-terminal', 'child_process', 'spawn', 'sigint', 'sigterm',
    'memory leak', 'performance', 'boot time', 'infinite loop', 'hangs', 'freezes', 'crashes', 'race condition',
    'intermittent', 'sometimes', 'flickering', 'a2a', 'mcp protocol', 'scheduler', 'event loop', 'websocket',
    'stream', 'throughput', 'concurrency', 'deadlock', 'file descriptor', 'architecture', 'refactor'
]

MEDIUM_KEYWORDS = [
    'react', 'hook', 'useeffect', 'usestate', 'usememo', 'ink', 'tui', 'ui state', 'parser', 'markdown',
    'regex', 'regular expression', 'ansi', 'escape sequence', 'toml', 'schema', 'validation', 'zod',
    'promise', 'async', 'await', 'unhandled', 'rejection', 'config', 'settings', 'env', 'environment',
    'path resolution', 'symlink', 'git', 'telemetry', 'logging', 'format', 'display', 'rendering',
    'clipboard', 'copy', 'paste', 'bracketed', 'interactive', 'dialog', 'modal', 'focus'
]

# NOTE: not consulted by analyze_issue(); "small" is simply the fallback level.
SMALL_KEYWORDS = [
    'typo', 'spelling', 'rename', 'string', 'constant', 'css', 'color', 'theme.status', 'padding', 'margin',
    'error message', 'econnreset', 'enotdir', 'etimedout', 'documentation', 'jsdoc', 'readme', 'help text',
    'flag', 'version string', 'static value'
]

_SKIPPED_DIRS = frozenset({'.git', 'node_modules'})

# Longer extensions come first and a trailing \b is required; otherwise
# "App.tsx" is captured as "App.ts" and "package.json" as "package.js".
_FILE_REF_RE = re.compile(r'[\w./-]+\.(?:tsx|ts|json|js|md)\b')

# Matches the note this validator appends to `reasoning` on a previous run.
_VALIDATION_NOTE_RE = re.compile(
    r'(?:^| \| )(?:Codebase validation: \d+ files \('
    r'|No specific files identified in codebase\. Keyword heuristic:)'
)


def _keyword_pattern(keywords):
    """Compile one whole-word regex matching any of *keywords*."""
    return re.compile(r'\b(?:' + '|'.join(re.escape(kw) for kw in keywords) + r')\b')


_LARGE_RE = _keyword_pattern(LARGE_KEYWORDS)
_MEDIUM_RE = _keyword_pattern(MEDIUM_KEYWORDS)


def _strip_validation_note(reasoning):
    """Return *reasoning* without a validation note left by an earlier run."""
    match = _VALIDATION_NOTE_RE.search(reasoning)
    return reasoning[:match.start()] if match else reasoning


def find_files_in_text(text):
    """Return the set of source-file references (ts/tsx/js/json/md) in *text*."""
    return {m for m in _FILE_REF_RE.findall(text) if not m.startswith('http')}


@functools.lru_cache(maxsize=8)
def _basename_index(repo_root):
    """Map file basenames under *repo_root* to the first path found for each.

    Returns ``(exact, folded)``: lookups by exact basename and by lower-cased
    basename. Directories named ``.git`` or ``node_modules`` are pruned, so
    they are never descended into. Built once per root instead of re-walking
    the tree for every referenced file.
    """
    exact = {}
    folded = {}
    for root, dirs, files in os.walk(repo_root):
        dirs[:] = sorted(d for d in dirs if d not in _SKIPPED_DIRS)
        for name in files:
            path = os.path.join(root, name)
            exact.setdefault(name, path)
            folded.setdefault(name.lower(), path)
    return exact, folded


def resolve_file(filename, repo_root=None):
    """Resolve a file reference from issue text to a path inside the repo.

    Tries *filename* as a path relative to the repo root first, then falls
    back to the first file with the same basename (exact case, then
    case-insensitive).

    Args:
        filename: Reference as it appeared in the text.
        repo_root: Root to search; defaults to the module-level REPO_ROOT.

    Returns:
        The resolved path, or None when nothing matches.
    """
    root = REPO_ROOT if repo_root is None else repo_root

    # Strip leading separators and refuse ".." so text from an issue cannot
    # make us probe files outside the repository.
    relative = os.path.normpath(filename.lstrip('/\\'))
    if not relative.startswith(os.pardir):
        candidate = os.path.join(root, relative)
        if os.path.isfile(candidate):
            return candidate

    exact, folded = _basename_index(root)
    basename = os.path.basename(filename)
    return exact.get(basename) or folded.get(basename.lower())


def analyze_issue(issue, repo_root=None):
    """Re-estimate the effort level of *issue* from files and keywords.

    Args:
        issue: Issue dict; reads ``title``, ``body``, ``analysis`` and
            ``reasoning`` (missing or null values count as empty).
        repo_root: Root used to resolve referenced files; defaults to
            REPO_ROOT.

    Returns:
        ``(effort, validation_msg)`` where effort is 'small', 'medium' or
        'large' and validation_msg explains the evidence used.
    """
    title = issue.get('title') or ''
    body = issue.get('body') or ''
    analysis = issue.get('analysis') or ''
    # Drop our own earlier note: it contains the word "validation", which is
    # itself a MEDIUM keyword and would inflate the estimate on every re-run.
    reasoning = _strip_validation_note(issue.get('reasoning') or '')

    raw_text = f"{title} {body} {analysis} {reasoning}"
    combined_text = raw_text.lower()

    # File references are taken from the original-case text so that names
    # like "AppContainer.tsx" still match on case-sensitive filesystems.
    actual_files = []
    seen = set()
    total_lines = 0
    for reference in sorted(find_files_in_text(raw_text)):
        resolved = resolve_file(reference, repo_root)
        if not resolved or resolved in seen:
            continue
        try:
            with open(resolved, 'r', encoding='utf-8') as file_obj:
                lines = sum(1 for _ in file_obj)
        except (OSError, UnicodeDecodeError):
            continue  # unreadable or binary file: not usable as evidence
        seen.add(resolved)
        actual_files.append((resolved, lines))
        total_lines += lines

    num_files = len(actual_files)

    if _LARGE_RE.search(combined_text):
        keyword_effort = "large"
    elif _MEDIUM_RE.search(combined_text):
        keyword_effort = "medium"
    else:
        keyword_effort = "small"

    if num_files == 0:
        # Without concrete files never go below medium.
        effort = keyword_effort if keyword_effort in ['medium', 'large'] else 'medium'
        validation_msg = f"No specific files identified in codebase. Keyword heuristic: {keyword_effort}."
        return effort, validation_msg

    file_details = ", ".join(f"{os.path.basename(path)} ({count} lines)" for path, count in actual_files)
    if num_files > 3 or total_lines > 1500 or keyword_effort == "large":
        effort = "large"
        validation_msg = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines. Keyword hint: {keyword_effort}."
    elif num_files >= 2 or total_lines > 500 or keyword_effort == "medium":
        effort = "medium"
        validation_msg = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines. Keyword hint: {keyword_effort}."
    else:
        effort = "small"
        validation_msg = f"Codebase validation: {num_files} files ({file_details}), {total_lines} total lines. Appears highly localized."

    return effort, validation_msg


def validate_issues(issues, repo_root=None):
    """Re-evaluate every issue in place and return how many levels changed.

    Each issue gets its ``effort_level`` replaced and the validation note
    appended to ``reasoning`` (replacing any note from an earlier run).
    """
    updated_count = 0
    for issue in issues:
        old_effort = issue.get('effort_level')
        new_effort, validation_reason = analyze_issue(issue, repo_root)

        issue['effort_level'] = new_effort

        existing_reasoning = _strip_validation_note(issue.get('reasoning') or '')
        issue['reasoning'] = f"{existing_reasoning} | {validation_reason}".strip(' |')

        if old_effort != new_effort:
            updated_count += 1
    return updated_count


def main(argv=None):
    """Validate the effort levels stored in the issue JSON file, in place.

    Args:
        argv: Optional argument list (defaults to ``sys.argv[1:]``), accepting
            ``--input`` and ``--project``.
    """
    parser = argparse.ArgumentParser(description="Validate effort levels using heuristics.")
    parser.add_argument("--input", default=ISSUES_FILE, help="Input JSON file containing analyzed issues")
    parser.add_argument("--project", default=REPO_ROOT, help="Project root for codebase validation")
    args = parser.parse_args(argv)

    with open(args.input, 'r', encoding='utf-8') as f:
        issues = json.load(f)

    updated_count = validate_issues(issues, args.project)

    with open(args.input, 'w', encoding='utf-8') as f:
        json.dump(issues, f, indent=2)

    print(f"Successfully re-evaluated and updated {updated_count} issues. Codebase validated.")


if __name__ == '__main__':
    main()