2026-05-06 16:11:45 -04:00
"""
Purpose: A highly configurable, generic agentic processor for any backlog task.
Unlike the specific bug analyzers, this script accepts custom system prompts, input datasets, and output locations via command-line arguments, making it reusable for features, label updates, or custom queries.
"""
2026-05-06 16:02:30 -04:00
import json
import urllib . request
import os
import subprocess
import concurrent . futures
import threading
import argparse
# --- Defaults and Configuration ---
DEFAULT_MODEL = " gemini-3-flash-preview "
DEFAULT_URL_TEMPLATE = " https://generativelanguage.googleapis.com/v1beta/models/ {MODEL} :generateContent?key= {API_KEY} "
class GenericIssueProcessor :
def __init__ ( self , api_key , input_file , output_file , project_path , system_prompt , model = DEFAULT_MODEL , max_workers = 4 , turn_limit = 10 ) :
self . api_key = api_key
self . model = model
self . url = DEFAULT_URL_TEMPLATE . format ( MODEL = model , API_KEY = api_key )
self . input_file = input_file
self . output_file = output_file
self . project_path = os . path . abspath ( project_path )
self . system_prompt = system_prompt
self . max_workers = max_workers
self . turn_limit = turn_limit
self . file_lock = threading . Lock ( )
with open ( input_file , ' r ' ) as f :
self . data = json . load ( f )
def _execute_tool ( self , call ) :
name = call [ ' name ' ]
args = call . get ( ' args ' , { } )
if name == ' search_code ' :
pattern = args . get ( ' pattern ' , ' ' ) . replace ( ' " ' , ' \\ " ' )
try :
cmd = f ' grep -rn " { pattern } " { self . project_path } | grep -vE " node_modules|dist|build| \\ .test \\ . " | head -n 20 '
res = subprocess . check_output ( cmd , shell = True , text = True , stderr = subprocess . STDOUT )
return res if res else " No matches found. "
except subprocess . CalledProcessError as e :
return e . output if e . output else " No matches found. "
elif name == ' read_file ' :
filepath = args . get ( ' filepath ' , ' ' )
if not filepath . startswith ( ' / ' ) :
filepath = os . path . join ( self . project_path , filepath )
try :
if not os . path . exists ( filepath ) :
basename = os . path . basename ( filepath )
find_cmd = f ' find { self . project_path } -name " { basename } " | head -n 1 '
found_path = subprocess . check_output ( find_cmd , shell = True , text = True ) . strip ( )
if found_path : filepath = found_path
else : return f " File { filepath } not found. "
cmd = f ' head -n 300 " { filepath } " '
res = subprocess . check_output ( cmd , shell = True , text = True , stderr = subprocess . STDOUT )
return res
except Exception as e :
return str ( e )
return " Unknown tool "
def _call_gemini ( self , messages ) :
tools = [ {
" functionDeclarations " : [
{
" name " : " search_code " ,
" description " : " Search the project directory for a string using grep. " ,
" parameters " : {
" type " : " OBJECT " ,
" properties " : { " pattern " : { " type " : " STRING " } } ,
" required " : [ " pattern " ]
}
} ,
{
" name " : " read_file " ,
" description " : " Read a specific file context. " ,
" parameters " : {
" type " : " OBJECT " ,
" properties " : { " filepath " : { " type " : " STRING " } } ,
" required " : [ " filepath " ]
}
}
]
} ]
data = {
" contents " : messages ,
" tools " : tools ,
" generationConfig " : { " temperature " : 0.1 }
}
req = urllib . request . Request ( self . url , data = json . dumps ( data ) . encode ( ' utf-8 ' ) , headers = { ' Content-Type ' : ' application/json ' } )
with urllib . request . urlopen ( req ) as response :
return json . loads ( response . read ( ) . decode ( ' utf-8 ' ) )
def process_item ( self , item ) :
item_id = item . get ( ' number ' ) or item . get ( ' id ' , ' unknown ' )
print ( f " Processing item { item_id } ... " )
prompt = f " { self . system_prompt } \n \n Item Content: \n { json . dumps ( item , indent = 2 ) [ : 2000 ] } "
messages = [ { " role " : " user " , " parts " : [ { " text " : prompt } ] } ]
result = { " error " : " Turn limit exceeded " }
for turn in range ( self . turn_limit ) :
try :
res = self . _call_gemini ( messages )
candidate = res [ ' candidates ' ] [ 0 ] [ ' content ' ]
parts = candidate . get ( ' parts ' , [ ] )
if ' role ' not in candidate : candidate [ ' role ' ] = ' model '
messages . append ( candidate )
fcalls = [ p for p in parts if ' functionCall ' in p ]
if fcalls :
responses = [ ]
for fc in fcalls :
tool_res = self . _execute_tool ( fc [ ' functionCall ' ] )
responses . append ( {
" functionResponse " : {
" name " : fc [ ' functionCall ' ] [ ' name ' ] ,
" response " : { " result " : tool_res [ : 5000 ] }
}
} )
messages . append ( { " role " : " user " , " parts " : responses } )
else :
text = parts [ 0 ] . get ( ' text ' , ' ' )
if not text : continue
text = text . replace ( ' ```json ' , ' ' ) . replace ( ' ``` ' , ' ' ) . strip ( )
result = json . loads ( text )
break
except Exception as e :
result = { " error " : str ( e ) }
break
item . update ( result )
with self . file_lock :
with open ( self . output_file , ' w ' ) as f :
json . dump ( self . data , f , indent = 2 )
print ( f " Finished item { item_id } " )
def run ( self ) :
print ( f " Starting processing with { self . max_workers } workers... " )
with concurrent . futures . ThreadPoolExecutor ( max_workers = self . max_workers ) as executor :
list ( executor . map ( self . process_item , self . data ) )
print ( f " Processing complete. Saved to { self . output_file } " )
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = " Generic AI Issue Processor " )
parser . add_argument ( " --api-key " , required = True , help = " Gemini API Key " )
parser . add_argument ( " --input " , required = True , help = " Input JSON file " )
parser . add_argument ( " --output " , required = True , help = " Output JSON file " )
parser . add_argument ( " --project " , default = " . " , help = " Project root for tools " )
parser . add_argument ( " --prompt " , required = True , help = " System prompt / Instructions " )
parser . add_argument ( " --limit " , type = int , default = 10 , help = " Turn limit per item " )
parser . add_argument ( " --workers " , type = int , default = 4 , help = " Concurrent workers " )
args = parser . parse_args ( )
processor = GenericIssueProcessor (
api_key = args . api_key ,
input_file = args . input ,
output_file = args . output ,
project_path = args . project ,
system_prompt = args . prompt ,
max_workers = args . workers ,
turn_limit = args . limit
)
processor . run ( )