elismasilva's picture
fixes on handle_upload
d324f3c
raw
history blame
77.3 kB
import base64
import io
import json
import os
import re
import time
import gradio as gr
import markdown
import requests
from ddgs import DDGS
from dotenv import load_dotenv
from PIL import Image
from config.constants import (
AVAILABLE_ISSUE_MODELS_BY_PROVIDER,
AVAILABLE_SKETCH_MODELS_BY_PROVIDER,
PROVIDER_LOGOS,
SYSTEM_PROMPT_COMMENT_SUMMARIZER_EN,
SYSTEM_PROMPT_DUPLICATE_FINDER_EN,
SYSTEM_PROMPT_ISSUE_ANALYZER_EN,
SYSTEM_PROMPT_PRIORITIZER_EN,
SYSTEM_PROMPT_SKETCH_EN,
SYSTEM_PROMPT_THEME_GENERATOR_DESIGNER_EN,
SYSTEM_PROMPT_THEME_GENERATOR_ENGINEER_EN,
)
from config.database import connect, initialize_database
from config.model_factory import LLMFactory
from config.theme_model import FullThemeModel
# Load environment variables
load_dotenv()
# region TOOLS FUNCTIONS
def _initialize_database():
    """Set up the database by delegating table creation to ``initialize_database``."""
    initialize_database()
def get_available_models():
    """
    RETRIEVES the registry of supported LLM providers and the specific model names each one offers on this server.

    MUST BE USED whenever the user asks:
    - "What models do you have?"
    - "Which providers are available?"
    - "List the options."

    Returns:
        dict: A JSON object containing 'text_models' and 'vision_models' grouped by provider,
            plus a 'providers' list holding the provider names of the text-model registry.
    """
    text_models = AVAILABLE_ISSUE_MODELS_BY_PROVIDER
    vision_models = AVAILABLE_SKETCH_MODELS_BY_PROVIDER
    provider_names = list(text_models.keys())
    return {
        "text_models": text_models,
        "vision_models": vision_models,
        "providers": provider_names,
    }
def fetch_all_pages(url: str, headers: dict, progress: gr.Progress | None = None, desc: str = ""):
    """Fetch every page of a paginated GitHub API endpoint and return the combined items.

    Pages are requested starting at 1 until an empty page is returned. A 422 response is
    treated as "no more results" and ends the loop.

    Args:
        url (str): Endpoint URL. Anything from a trailing ``&page=`` onwards is dropped before paging.
        headers (dict): HTTP headers sent with every request (e.g. the Authorization header).
        progress (gr.Progress | None): Accepted for call-site compatibility; not used by this helper.
        desc (str): Accepted for call-site compatibility; not used by this helper.

    Returns:
        list: The items of all pages, in the order they were received.

    Raises:
        requests.exceptions.HTTPError: For error responses other than 422.
        requests.exceptions.Timeout: If a page does not answer within 30 seconds.
    """
    results = []
    base_url = url.split("&page=")[0]
    # A URL without a query string needs '?' for its first parameter, otherwise the page
    # number would be glued onto the path.
    separator = "&" if "?" in base_url else "?"
    page_num = 1
    while True:
        paginated_url = f"{base_url}{separator}page={page_num}"
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(paginated_url, headers=headers, timeout=30)
        if response.status_code != 200:
            if response.status_code == 422:
                print("Warning: ... Assuming end of results.")
                break
            response.raise_for_status()
        json_data = response.json()
        if not json_data:
            break
        results.extend(json_data)
        page_num += 1
    return results
def web_search(query: str, max_results: int = 5):
    """
    Performs a web search using DuckDuckGo to find documentation, error solutions, or external context.
    Useful when the issue involves external libraries or obscure error messages.

    Args:
        query (str): The search query (e.g., "gradio error 422 unprocessable entity").
        max_results (int): Number of results to return.

    Returns:
        str: A Markdown list of search results with titles, snippets, and URLs, or a short
            message when the query is empty, nothing was found, or the search failed.
    """
    if not query:
        return "Please provide a search query."
    try:
        hits = DDGS().text(query, max_results=max_results)
        if not hits:
            return "No results found on the web."
        # One Markdown section per hit: linked title followed by the snippet.
        sections = [f"### [{hit['title']}]({hit['href']})\n{hit['body']}\n\n" for hit in hits]
        return f"## Web Search Results for '{query}'\n\n" + "".join(sections)
    except Exception as exc:
        return f"Error performing web search: {str(exc)}"
def sync_repository(
    repo_url: str,
    github_token: str | None = None,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Performs a lightweight sync of a GitHub repository with the local database.

    Open issues are upserted into ``items``, issues that are still open locally but no longer
    open on GitHub are marked closed, and all releases are inserted into ``releases``.
    This is a generator: it yields human-readable status messages while it works.

    Args:
        repo_url (str): The full URL of the GitHub repository (e.g., 'https://github.com/gradio-app/gradio').
        github_token (str, optional): A GitHub Personal Access Token. Optional. If not provided, the tool will work but may hit public API rate limits.

    Returns:
        str: A status message indicating completion.

    Raises:
        gr.Error: If the URL is missing or any step of the sync fails.
    """
    if not repo_url:
        raise gr.Error("Please provide a repository URL.")
    try:
        # Tolerate trailing slashes, a '.git' suffix and deeper paths such as '.../issues/12'.
        slug_parts = [part for part in repo_url.strip().replace("https://github.com/", "").split("/") if part]
        if len(slug_parts) < 2:
            raise ValueError(f"Could not determine owner/repo from URL: {repo_url}")
        owner = slug_parts[0]
        repo = slug_parts[1].removesuffix(".git")
        repo_slug = f"{owner}/{repo}"
        headers = {"Authorization": f"token {github_token}"} if github_token else {}
        conn = connect()
        try:
            cursor = conn.cursor()
            yield f"🚀 Starting sync for {repo_slug}..."

            # 1. Sync OPEN issues only
            issues_url = f"https://api.github.com/repos/{owner}/{repo}/issues?state=open&per_page=100&sort=updated"
            yield "📥 Fetching issues from GitHub API..."
            open_items = fetch_all_pages(issues_url, headers)
            github_open_ids = set()
            yield f"💾 Saving {len(open_items)} issues to database..."
            for item in progress.tqdm(open_items, desc="Syncing Open Issues"):
                # The issues endpoint also returns pull requests; those are not stored here.
                if "pull_request" in item:
                    continue
                item_labels = json.dumps([label["name"] for label in item["labels"]])
                github_open_ids.add(item["number"])
                cursor.execute(
                    """
                    INSERT INTO items (repo, number, is_pr, title, body, state, user_login, labels, comments, reactions, created_at, updated_at, html_url)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (repo, number) DO UPDATE SET
                    title = EXCLUDED.title,
                    body = EXCLUDED.body,
                    state = EXCLUDED.state,
                    labels = EXCLUDED.labels,
                    comments = EXCLUDED.comments,
                    reactions = EXCLUDED.reactions,
                    updated_at = EXCLUDED.updated_at,
                    html_url = EXCLUDED.html_url;
                    """,
                    (
                        repo_slug,
                        item["number"],
                        False,
                        item["title"],
                        item.get("body"),
                        item["state"],
                        item["user"]["login"],
                        item_labels,
                        item["comments"],
                        item.get("reactions", {}).get("total_count", 0),
                        item["created_at"],
                        item["updated_at"],
                        item["html_url"],
                    ),
                )

            # 2. Find closed issues: open in the DB but absent from GitHub's open list
            cursor.execute("SELECT number FROM items WHERE repo = %s AND state = 'open'", (repo_slug,))
            db_open_ids = {row[0] for row in cursor.fetchall()}
            closed_ids = db_open_ids - github_open_ids
            if closed_ids:
                yield f"⚡ Found {len(closed_ids)} issues closed remotely. Updating DB..."
                if len(closed_ids) == 1:
                    cursor.execute(
                        "UPDATE items SET state = 'closed' WHERE repo = %s AND number = %s",
                        (repo_slug, list(closed_ids)[0]),
                    )
                else:
                    cursor.execute(
                        "UPDATE items SET state = 'closed' WHERE repo = %s AND number IN %s",
                        (repo_slug, tuple(closed_ids)),
                    )

            # 3. Sync ALL releases
            yield "📥 Fetching releases..."
            releases_url = f"https://api.github.com/repos/{owner}/{repo}/releases?per_page=100"
            all_releases = fetch_all_pages(releases_url, headers, progress, desc="Syncing All Releases")
            yield f"💾 Saving {len(all_releases)} releases..."
            for release in progress.tqdm(all_releases, desc="Saving Releases"):
                cursor.execute(
                    """
                    INSERT INTO releases (repo, tag_name, name, body, published_at)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (tag_name) DO NOTHING;
                    """,
                    (
                        repo_slug,
                        release["tag_name"],
                        release["name"],
                        release.get("body"),
                        release["published_at"],
                    ),
                )
            conn.commit()
        finally:
            # Release the connection even when a fetch or a statement fails mid-sync.
            conn.close()
        yield f"✅ Sync complete for {repo_slug}!"
    except Exception as e:
        import traceback

        traceback.print_exc()
        error_msg = str(e)
        if "429" in error_msg or "rate limit" in error_msg.lower():
            # Only GitHub is contacted during a sync, so the limit hit is GitHub's.
            detailed_msg = "Rate Limit Error: GitHub API rate limit reached. Please wait a moment or provide a GitHub Token."
        else:
            detailed_msg = f"System Error: {error_msg}"
        raise gr.Error(detailed_msg)
def _extract_answer(text):
"""
Extracts content inside <think> and <answer> tags.
Returns: (thought_content, answer_content)
"""
import re
thought = ""
answer = text
think_match = re.search(r"<think>(.*?)</think>", text, re.DOTALL)
if think_match:
thought = think_match.group(1).strip()
answer_match = re.search(r"<answer>(.*?)</answer>", text, re.DOTALL)
if answer_match:
answer = answer_match.group(1).strip()
elif "<think>" in text and not answer_match:
parts = text.split("</think>")
if len(parts) > 1:
answer = parts[1].strip()
return thought, answer
def _create_provider_seal(provider, model, warning_msg=""):
    """
    Build the HTML header shown above a report, naming the model and provider used.

    Args:
        provider: Provider name; lower-cased before the logo lookup in PROVIDER_LOGOS.
        model: Model name, displayed capitalized.
        warning_msg: Optional text or HTML shown in a yellow banner above the seal.

    Returns:
        str: The HTML snippet (banner, if any, followed by the seal).
    """
    provider = provider.lower()
    logo_url = PROVIDER_LOGOS.get(provider, "")
    if warning_msg:
        warning_html = f"<div style='background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 5px; margin-bottom: 10px;'>{warning_msg}</div>"
    else:
        warning_html = ""
    model_label = model.capitalize()
    provider_label = provider.capitalize()
    return f"""
{warning_html}
<div style="font-family: sans-serif; font-size: 0.8rem; color: #6B7280; border-bottom: 1px solid #E5E7EB; padding-bottom: 8px; margin-bottom: 8px;">
Analyzed by <strong style="color: #111827;"> {model_label}</strong>
<br><span style="font-size: 0.7rem;">Provider: {provider_label} <img src='{logo_url}' alt='logo' width='220' style='vertical-align: middle;'></span>
</div>
"""
def analyze_github_issue(
    repo_url: str,
    issue_number: int,
    llm_provider: str = "gemini",
    llm_model: str = "gemini-2.0-flash",
    github_token: str | None = None,
    llm_api_key: str | None = None,
    request: gr.Request = None,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Analyzes a single GitHub issue to determine its resolution status.
    Implements fallback to DB cache if GitHub API Rate Limits are exceeded.

    Args:
        repo_url (str): The full URL of the GitHub repository.
        issue_number (int): The issue number to analyze (e.g., 1234).
        llm_provider (str, optional): The LLM provider to use. Defaults to 'gemini'.
        llm_model (str, optional): The specific model name. Defaults to 'gemini-2.0-flash'.
        github_token (str, optional): GitHub Token. Optional. Recommended for higher rate limits.
        llm_api_key (str, optional): API Key for the LLM. OPTIONAL. If not provided, uses server defaults.

    Returns:
        str: An HTML-formatted analysis report. The function actually returns a
            ``(html_report, thought)`` pair, where ``thought`` is the model's reasoning text.

    Raises:
        gr.Error: If fetching, the cache fallback, or the LLM call fails.
    """
    _validate_api_keys(llm_api_key, request)
    if not repo_url:
        error = "## Error\nPlease provide a repository URL."
        gr.Info(f"⚠️ {error}")
        return error, gr.skip()
    if not issue_number:
        error = "Please provide the issue number."
        gr.Info(f"⚠️ {error}")
        return error, gr.skip()
    try:
        repo_slug = "/".join(repo_url.strip().replace("https://github.com/", "").split("/")[:2])
        headers = {"Authorization": f"token {github_token}"} if github_token else {}
        issue_data = None
        comments_data = []
        linked_prs_details = []
        rate_limit_warning = ""
        conn = connect()
        try:
            cursor = conn.cursor()

            # 1. JUST-IN-TIME DATA FETCHING
            progress(0, desc="Fetching issue details...")
            issue_url = f"https://api.github.com/repos/{repo_slug}/issues/{issue_number}"
            try:
                # Try API first
                issue_response = requests.get(issue_url, headers=headers, timeout=30)
                issue_response.raise_for_status()
                issue_data = issue_response.json()

                # If success, fetch auxiliary data
                progress(0.2, desc="Fetching comments...")
                if issue_data.get("comments", 0) > 0:
                    comments_resp = requests.get(issue_data["comments_url"], headers=headers, timeout=30)
                    if comments_resp.status_code == 200:
                        comments_data = comments_resp.json()

                progress(0.4, desc="Fetching timeline...")
                timeline_events = fetch_all_pages(issue_data["timeline_url"] + "?per_page=100", headers)
                pr_urls_to_fetch = [
                    f"https://api.github.com/repos/{repo_slug}/pulls/{event['source']['issue']['number']}"
                    for event in timeline_events
                    if event.get("event") == "cross-referenced" and "pull_request" in event.get("source", {}).get("issue", {})
                ]
                for url in pr_urls_to_fetch:
                    pr_res = requests.get(url, headers=headers, timeout=30)
                    if pr_res.status_code == 200:
                        linked_prs_details.append(pr_res.json())
            except requests.exceptions.HTTPError as e:
                # GitHub signals an exhausted rate limit with 403 or 429; anything else is a real failure.
                if e.response is None or e.response.status_code not in (403, 429):
                    raise
                print(f"⚠️ GitHub Rate Limit exceeded for {issue_number}. Attempting DB Fallback.")
                cursor.execute(
                    "SELECT * FROM items WHERE repo = %s AND number = %s",
                    (repo_slug, issue_number),
                )
                db_item = cursor.fetchone()
                if not db_item:
                    raise gr.Error("GitHub Rate Limit Exceeded and Issue NOT found in Cache. Please provide a GitHub Token.")
                rate_limit_warning = "⚠️ **Warning:** GitHub API Rate Limit exceeded. Analysis based on **cached data**."
                issue_data = {
                    "number": db_item["number"],
                    "title": db_item["title"],
                    "state": db_item["state"],
                    "body": db_item["body"],
                    "labels": db_item["labels"],
                }
                # Clear volatile data
                comments_data = []
                linked_prs_details = []

            # 2. GET RELEASE DATA (always from cache)
            progress(0.6, desc="Querying local release cache...")
            cursor.execute(
                "SELECT * FROM releases WHERE repo = %s ORDER BY published_at DESC LIMIT 50",
                (repo_slug,),
            )
            releases = cursor.fetchall()
        finally:
            # Release the connection before the slow LLM calls, and also on any failure above.
            conn.close()

        # 3. SUMMARIZE COMMENTS
        additional_problems_summary = "No comments available (Rate Limit or Empty)."
        if comments_data:
            progress(0.7, desc="Summarizing comments with AI...")
            comments_text = "\n---\n".join([f"User '{c.get('user', {}).get('login')}':\n{c.get('body')}" for c in comments_data])
            if comments_text:
                try:
                    summarizer_prompt = SYSTEM_PROMPT_COMMENT_SUMMARIZER_EN.format(comments_text=comments_text)
                    additional_problems_summary = LLMFactory.call(
                        llm_provider,
                        llm_model,
                        messages=[{"role": "user", "content": summarizer_prompt}],
                        temperature=0.0,
                        api_key=llm_api_key,
                    )
                except Exception:
                    # Best effort: the analysis can still run without the comment summary.
                    additional_problems_summary = "Could not summarize comments due to an error."
        elif rate_limit_warning:
            additional_problems_summary = "Could not fetch comments due to GitHub Rate Limits."

        # 4. FORMAT CONTEXT
        progress(0.8, desc="Preparing final analysis...")
        pull_requests_summary = (
            "\n".join([f"- PR #{pr['number']} ('{pr['title']}') - Status: {pr.get('state')}..." for pr in linked_prs_details])
            if linked_prs_details
            else "No linked PRs found (or API limited)."
        )
        release_notes_summary = (
            "\n\n".join([f"- **Release {r['tag_name']} ({str(r['published_at'])[:10]}):**\n{str(r['body'])[:500]}..." for r in releases])
            if releases
            else "No releases found in local cache."
        )

        # Labels arrive as a list of dicts from the API, but the cache stores them as a
        # JSON-encoded list of names; normalise both shapes to a list of names.
        raw_labels = issue_data.get("labels")
        if isinstance(raw_labels, str):
            try:
                raw_labels = json.loads(raw_labels)
            except json.JSONDecodeError:
                raw_labels = [raw_labels]
        if raw_labels and not isinstance(raw_labels, (list, tuple)):
            raw_labels = [raw_labels]
        label_names = [label["name"] if isinstance(label, dict) else str(label) for label in raw_labels or []]

        prompt_context = {
            "issue_number": issue_data["number"],
            "issue_title": issue_data["title"],
            "issue_state": issue_data["state"],
            "issue_body": issue_data["body"] or "No description provided.",
            "additional_problems_summary": additional_problems_summary.strip(),
            "issue_labels": json.dumps(label_names) if label_names else "None",
            "pull_requests_summary": pull_requests_summary,
            "release_notes_summary": release_notes_summary,
        }
        final_prompt = SYSTEM_PROMPT_ISSUE_ANALYZER_EN.format(**prompt_context)

        # 5. CALL LLM
        progress(0.9, desc="Generating final report with AI...")
        messages = [{"role": "user", "content": final_prompt}]
        raw_output = LLMFactory.call(
            llm_provider,
            llm_model,
            messages=messages,
            temperature=0.1,
            max_tokens=2048,
            api_key=llm_api_key,
        )
        if isinstance(raw_output, str) and raw_output.startswith("Error:"):
            raise Exception(raw_output)
        thought, analysis = _extract_answer(raw_output)
        # The seal builds the warning banner itself, so pass the bare message (not pre-wrapped HTML).
        seal = _create_provider_seal(llm_provider, llm_model, rate_limit_warning)
        html_report = seal + markdown.markdown(analysis.strip(), extensions=["extra"])
        return html_report, thought
    except gr.Error:
        # Already user-facing; do not re-wrap it as a "System Error".
        raise
    except Exception as e:
        import traceback

        traceback.print_exc()
        error_msg = str(e)
        if "Auth" in error_msg or "Key" in error_msg:
            msg = f"Authentication Error: {error_msg}"
        elif "403" in error_msg:
            msg = "GitHub Rate Limit Exceeded. Please provide a Token."
        else:
            msg = f"System Error: {error_msg}"
        raise gr.Error(msg)
def find_duplicate_issues(
    repo_url: str,
    issue_number: int,
    llm_provider: str = "gemini",
    llm_model: str = "gemini-2.0-flash",
    github_token: str | None = None,
    llm_api_key: str | None = None,
    request: gr.Request = None,
    progress=gr.Progress(),
):
    """
    Finds potential duplicate issues for a given issue using mentions and keyword search.

    Args:
        repo_url (str): The full URL of the GitHub repository.
        issue_number (int): The main issue number to check.
        llm_provider (str, optional): The LLM provider. Defaults to 'gemini'.
        llm_model (str, optional): The model name. Defaults to 'gemini-2.0-flash'.
        github_token (str, optional): GitHub Token. Optional.
        llm_api_key (str, optional): API Key for the LLM. **OPTIONAL**. If not provided, the server uses its own keys.

    Returns:
        str: A report of potential duplicates. The function actually returns a
            ``(html_report, thought)`` pair, where ``thought`` is the model's reasoning text.

    Raises:
        gr.Error: If the database, the timeline processing, or the LLM call fails.
    """
    _validate_api_keys(llm_api_key, request)
    if not repo_url:
        error = "Please provide the repository URL and the main issue number."
        gr.Info(f"⚠️ {error}")
        return error, gr.skip()
    if not issue_number:
        error = "Please provide the main issue number."
        gr.Info(f"⚠️ {error}")
        return error, gr.skip()
    try:
        # Tolerate trailing slashes, a '.git' suffix and deeper paths such as '.../issues/12'.
        slug_parts = [part for part in repo_url.strip().replace("https://github.com/", "").split("/") if part]
        if len(slug_parts) < 2:
            raise ValueError(f"Could not determine owner/repo from URL: {repo_url}")
        owner = slug_parts[0]
        repo = slug_parts[1].removesuffix(".git")
        repo_slug = f"{owner}/{repo}"
        headers = {"Authorization": f"token {github_token}"} if github_token else {}
        conn = connect()
        try:
            cursor = conn.cursor()
            progress(0, desc="Fetching main issue from cache...")
            cursor.execute(
                "SELECT * FROM items WHERE repo = %s AND number = %s",
                (repo_slug, issue_number),
            )
            main_issue = cursor.fetchone()
            if not main_issue:
                error = "Main issue not found in cache. Please synchronize the repository first."
                gr.Info(f"⚠️ {error}")
                return error, gr.skip()

            # STEP 1: TIMELINE
            progress(0.2, desc="Fetching timeline for mentions...")
            mentions_summary = ""
            rate_limit_warning = ""
            try:
                timeline_api_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/timeline?per_page=100"
                timeline_events = fetch_all_pages(timeline_api_url, headers)
                for event in timeline_events:
                    if event.get("event") == "cross-referenced":
                        source = event.get("source", {}).get("issue", {})
                        # Only issues count as mentions here; pull requests are skipped.
                        if source and "pull_request" not in source:
                            mentions_summary += f"- Mentioned by Issue #{source.get('number')}: '{source.get('title')}'\n"
                if not mentions_summary:
                    mentions_summary = "No other issues were found mentioning this issue in its timeline."
            except requests.exceptions.HTTPError as e:
                if e.response is not None and e.response.status_code == 403:
                    print("⚠️ GitHub Rate Limit in find_duplicates. Skipping timeline.")
                    mentions_summary = "⚠️ **Could not check mentions due to GitHub API Rate Limits.**"
                    rate_limit_warning = "⚠️ **Warning:** Analysis running in degraded mode (Keyword Search only)."
                else:
                    print(f"Warning: Timeline fetch failed: {e}")
                    mentions_summary = "Could not fetch timeline data."

            # STEP 2: KEYWORD SEARCH
            progress(0.5, desc="Searching for candidates in cache...")
            main_issue_title = main_issue["title"]
            keywords = [word for word in re.findall(r"\b\w+\b", main_issue_title) if len(word) > 3]
            if keywords:
                # Bind the slug and every keyword as parameters: the slug comes straight from
                # user input and must never be interpolated into the SQL text.
                title_filters = " OR ".join(["title LIKE %s"] * len(keywords))
                cursor.execute(
                    f"SELECT * FROM items WHERE repo = %s AND is_pr = FALSE AND state = 'open' AND ({title_filters})",
                    (repo_slug, *[f"%{keyword}%" for keyword in keywords]),
                )
                candidate_issues = cursor.fetchall()
            else:
                candidate_issues = []
        finally:
            # Release the connection before the slow LLM call, and also on any failure above.
            conn.close()

        candidate_issues_summary = ""
        for cand in candidate_issues:
            if cand["number"] == main_issue["number"]:
                continue
            body_text = cand["body"] or ""
            candidate_issues_summary += f'- **Candidate #{cand["number"]}**: "{cand["title"]}"\n - **Description**: {body_text[:200]}...\n'
        if not candidate_issues_summary:
            candidate_issues_summary = "No similar open issues found via keyword search."

        # STEP 3: LLM
        progress(0.8, desc="Analyzing for duplicates with AI...")
        if rate_limit_warning:
            mentions_summary += f"\n\n(System Note: {rate_limit_warning})"
        prompt_context = {
            "main_issue_number": main_issue["number"],
            "main_issue_title": main_issue["title"],
            "main_issue_body": main_issue["body"] or "No description provided.",
            "mentions_summary": mentions_summary,
            "candidate_issues_summary": candidate_issues_summary,
        }
        final_prompt = SYSTEM_PROMPT_DUPLICATE_FINDER_EN.format(**prompt_context)
        messages = [{"role": "user", "content": final_prompt}]
        raw_output = LLMFactory.call(
            llm_provider,
            llm_model,
            messages=messages,
            temperature=0.0,
            api_key=llm_api_key,
        )

        # Strip the reasoning tags first; adding the heading before extraction would lose it
        # whenever the model wraps its reply in <think>/<answer> tags.
        thought, analysis = _extract_answer(raw_output)
        if "## Duplicate Analysis" not in analysis:
            analysis = f"## Duplicate Analysis for Issue #{issue_number}\n\n{analysis}"
        # Degraded mode goes through the same rendering path; the seal shows the warning banner.
        seal = _create_provider_seal(llm_provider, llm_model, rate_limit_warning)
        html_report = seal + markdown.markdown(analysis.strip(), extensions=["extra"])
        return html_report, thought
    except Exception as e:
        import traceback

        traceback.print_exc()
        error_msg = str(e)
        if "Auth" in error_msg or "Key" in error_msg:
            msg = f"Authentication Error: {error_msg}"
        else:
            msg = f"System Error: {error_msg}"
        raise gr.Error(msg)
def prioritize_open_issues(
    repo_url: str,
    llm_provider: str = "gemini",
    llm_model: str = "gemini-2.0-flash",
    llm_api_key: str | None = None,
    request: gr.Request = None,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Analyzes open issues from the cache to create a prioritized backlog.
    Returns a stream of status updates followed by the final report.

    This is a generator. Every item it yields is a ``(html, thought)`` pair: progress
    banners carry an empty thought, the final item carries the styled priority table and
    the model's reasoning text.

    Args:
        repo_url (str): The full URL of the GitHub repository.
        llm_provider (str, optional): The LLM provider. Defaults to 'gemini'.
        llm_model (str, optional): The model name. Defaults to 'gemini-2.0-flash'.
        llm_api_key (str, optional): API Key for the LLM. OPTIONAL. If not provided, uses server defaults.

    Raises:
        gr.Error: If the database query or the LLM call fails.
    """
    _validate_api_keys(llm_api_key, request)
    if not repo_url:
        error = "Please provide the repository URL."
        gr.Info(f"⚠️ {error}")
        # In a generator a `return value` is discarded, so the message has to be yielded
        # for it to reach the UI.
        yield error, gr.skip()
        return
    try:
        repo_slug = "/".join(repo_url.strip().replace("https://github.com/", "").split("/")[:2])
        yield f"<div style='padding:20px; color:#666;'>🔄 Connecting to database for {repo_slug}...</div>", ""
        conn = connect()
        try:
            cursor = conn.cursor()
            progress(0, desc="Fetching open issues from cache...")
            yield "<div style='padding:20px; color:#666;'>📥 Fetching open issues from cache...</div>", ""
            cursor.execute(
                "SELECT * FROM items WHERE repo = %s AND is_pr = FALSE AND state = 'open' ORDER BY comments DESC, reactions DESC LIMIT 50",
                (repo_slug,),
            )
            open_issues = cursor.fetchall()
        finally:
            # Release the connection before the slow LLM call, and also if the query fails.
            conn.close()
        if not open_issues:
            yield "No open issues found in the cache to prioritize.", ""
            return

        progress(0.5, desc="Preparing context for prioritization...")
        yield f"<div style='padding:20px; color:#666;'>📝 Preparing context for {len(open_issues)} issues...</div>", ""
        issues_list_summary = "".join(
            f'- **Issue #{issue["number"]}**: "{issue["title"]}"\n'
            f" - **Labels**: {issue['labels'] or 'None'}\n"
            f" - **Comments**: {issue['comments']}, **Reactions**: {issue['reactions']}\n"
            for issue in open_issues
        )
        prompt_context = {"issues_list_summary": issues_list_summary}
        final_prompt = SYSTEM_PROMPT_PRIORITIZER_EN.format(**prompt_context)

        progress(0.8, desc="Generating priority list with AI...")
        yield f"<div style='padding:20px; color:#2563EB;'>🧠 <b>{llm_model}</b> is analyzing priorities... This may take a moment.</div>", ""
        messages = [{"role": "user", "content": final_prompt}]
        raw_output = LLMFactory.call(
            llm_provider,
            llm_model,
            messages=messages,
            temperature=0.1,
            max_tokens=4096,
            api_key=llm_api_key,
        )
        thought, report = _extract_answer(raw_output)
        # Some models echo a bare "answer" label in front of the table; drop it.
        if report.strip().startswith("answer"):
            report = report.replace("answer", "", 1).strip()
        # A table emitted on a single line is not valid Markdown; restore the row breaks.
        if "\n" not in report and "|" in report:
            report = report.replace("| |", "|\n|")
            report = report.replace("Justification |", "Justification |\n")
            report = report.replace("Justification|", "Justification|\n")
        seal = _create_provider_seal(llm_provider, llm_model)
        html_table = markdown.markdown(report, extensions=["tables", "extra"])
        html_report = f"""
<style>
.prio-table table {{
width: 100%;
border-collapse: collapse;
font-family: var(--font);
font-size: var(--text-sm);
color: var(--body-text-color);
}}
.prio-table th {{
background-color: var(--background-fill-secondary);
padding: 10px;
border: 1px solid var(--border-color-primary);
text-align: left;
font-weight: 600;
}}
.prio-table td {{
padding: 10px;
border: 1px solid var(--border-color-primary);
background-color: transparent;
}}
.prio-table tr:nth-child(even) {{
background-color: var(--background-fill-secondary);
}}
.prio-table tr:hover {{
background-color: var(--background-fill-primary);
}}
</style>
<div class="prio-table">
{seal}
{html_table}
</div>
"""
        yield html_report, thought
    except Exception as e:
        import traceback

        traceback.print_exc()
        error_msg = str(e)
        msg = f"Authentication Error: {error_msg}" if "Auth" in error_msg or "Key" in error_msg else f"System Error: {error_msg}"
        raise gr.Error(msg)
def reply_and_close_issue(
    repo_url: str,
    issue_number: int,
    comment_body: str,
    close_issue: bool = False,
    github_token: str | None = None,
    request: gr.Request = None,
):
    """
    Posts a comment on a GitHub issue and optionally closes it.
    Updates the local database state immediately if closed.

    Args:
        repo_url (str): Full repository URL.
        issue_number (int): The issue number.
        comment_body (str): The markdown text to post as a comment.
        close_issue (bool): If True, changes the issue state to 'closed'.
        github_token (str): MANDATORY. A GitHub token with write permissions.

    Returns:
        str: A Markdown success log, or a plain error message when an input is missing.

    Raises:
        gr.Error: If GitHub rejects a request (404 and 401 get dedicated messages) or on any other failure.
    """
    if not github_token:
        error = "Write Permission Error: A GitHub Token is mandatory for posting comments or closing issues."
        gr.Info(f"⚠️ {error}")
        return error
    if not repo_url:
        error = "Please provide the repository URL."
        gr.Info(f"⚠️ {error}")
        return error
    if not issue_number:
        error = "Please provide the issue number."
        gr.Info(f"⚠️ {error}")
        return error
    try:
        # Tolerate trailing slashes, a '.git' suffix and deeper paths such as '.../issues/12'.
        slug_parts = [part for part in repo_url.strip().replace("https://github.com/", "").split("/") if part]
        if len(slug_parts) < 2:
            raise ValueError(f"Could not determine owner/repo from URL: {repo_url}")
        repo_slug = f"{slug_parts[0]}/{slug_parts[1].removesuffix('.git')}"
        headers = {
            "Authorization": f"token {github_token}",
            "Accept": "application/vnd.github.v3+json",
        }

        # 1. Post the Comment
        if comment_body and comment_body.strip():
            comment_url = f"https://api.github.com/repos/{repo_slug}/issues/{issue_number}/comments"
            comment_resp = requests.post(comment_url, headers=headers, json={"body": comment_body}, timeout=30)
            comment_resp.raise_for_status()
            action_log = f"✅ Comment posted on issue #{issue_number}."
        else:
            action_log = "ℹ️ No comment body provided, skipping comment."

        # 2. Close the Issue (if requested)
        if close_issue:
            issue_url = f"https://api.github.com/repos/{repo_slug}/issues/{issue_number}"
            # state_reason can be 'completed' or 'not_planned'
            close_resp = requests.patch(
                issue_url,
                headers=headers,
                json={"state": "closed", "state_reason": "completed"},
                timeout=30,
            )
            close_resp.raise_for_status()
            action_log += f"\n🔒 Issue #{issue_number} has been CLOSED."

            # 3. Update Local DB optimistically. Best effort: GitHub is already updated,
            # so a cache failure is only logged.
            try:
                conn = connect()
                try:
                    cursor = conn.cursor()
                    cursor.execute(
                        "UPDATE items SET state = 'closed' WHERE repo = %s AND number = %s",
                        (repo_slug, int(issue_number)),
                    )
                    conn.commit()
                finally:
                    # Close the connection even when the UPDATE or the commit fails.
                    conn.close()
                print(f"✅ Local DB updated: #{issue_number} -> closed")
            except Exception as db_e:
                print(f"⚠️ Failed to update local DB: {db_e}")
        return f"## Success\n{action_log}"
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code if e.response is not None else None
        if status_code == 404:
            raise gr.Error("Error 404: Repo or Issue not found (or Token lacks permission).")
        if status_code == 401:
            raise gr.Error("Error 401: Invalid GitHub Token.")
        raise gr.Error(f"GitHub API Error: {e}")
    except Exception as e:
        import traceback

        traceback.print_exc()
        raise gr.Error(f"System Error: {str(e)}")
def search_issues(
repo_url: str,
query: str = "",
issue_number: int | None = None,
status: str = "open",
verdict: str | None = None,
author: str | None = None, # Added author parameter
limit: int = 5
) -> list | dict:
"""
Searches for issues in the database based on multiple criteria.
Use this to find issues before getting their full details.
Args:
repo_url (str): The full repository URL.
query (str, optional): Text to search in title or body (e.g., "chatbot error").
issue_number (int, optional): Specific issue ID to find. If provided, ignores query.
status (str, optional): GitHub state ('open', 'closed', 'all'). Defaults to 'open'.
verdict (str, optional): AI Analysis Verdict ('resolved', 'duplicate', 'unresolved').
author (str, optional): Filter by issue creator's username (e.g., 'abidlabs').
limit (int, optional): Max results to return. Defaults to 5.
Returns:
list | dict: A list of dictionaries representing matching issues, or an error dict.
"""
if not repo_url:
return {"error": "Repo URL required."}
try:
repo_slug = repo_url.replace("https://github.com/", "").strip("/")
conn = connect()
cursor = conn.cursor()
params = []
# Base Query: Join Items (GitHub) with Reports (AI)
# We use LEFT JOIN so we can find non-analyzed issues too
sql = """
SELECT
i.number,
i.title,
i.user_login,
LEFT(i.body, 200) as snippet,
i.state as github_state,
COALESCE(r.verdict, 'pending_analysis') as ai_verdict
FROM items i
LEFT JOIN issue_reports r ON i.number = r.issue_number AND r.repo_url ILIKE %s
WHERE i.repo = %s
"""
# Param 1 (Join): '%slug%' to match full url
params.append(f"%{repo_slug}%")
# Param 2 (Where): slug
params.append(repo_slug)
# --- FILTERS ---
# 1. Specific ID (Strongest filter)
if issue_number:
sql += " AND i.number = %s"
params.append(issue_number)
else:
# 2. Status Filter
if status and status.lower() not in ["all", "none", ""]:
sql += " AND i.state = %s"
params.append(status)
# 3. Verdict Filter
if verdict == "None" or verdict == "":
verdict = None
if verdict:
# If filtering by verdict, we assume user wants analyzed issues
sql += " AND r.verdict = %s"
params.append(verdict)
# 4. Author Filter (New)
if author:
sql += " AND i.user_login = %s"
params.append(author)
# 5. Text Search
if query:
sql += " AND (i.title ILIKE %s OR i.body ILIKE %s)"
wildcard = f"%{query}%"
params.extend([wildcard, wildcard])
sql += " ORDER BY i.created_at DESC LIMIT %s"
params.append(limit)
cursor.execute(sql, tuple(params))
rows = cursor.fetchall()
conn.close()
if not rows:
return {"error": "No issues found matching criteria."}
# Convert to list of dicts (Return Python object, let Gradio handle JSON serialization)
result = [
{
"id": row["number"],
"title": row["title"],
"author": row["user_login"],
"snippet": row["snippet"] + "..." if row["snippet"] else "",
"state": row["github_state"],
"verdict": row["ai_verdict"],
}
for row in rows
]
return result
except Exception as e:
return {"error": f"Database Error: {str(e)}"}
def get_issue_report(repo_url: str, issue_number: int) -> dict | str:
    """
    Retrieves the full AI analysis report and proposed action for a specific issue.
    Use this AFTER searching to get details.

    Args:
        repo_url (str): The full repository URL.
        issue_number (int): The issue number.

    Returns:
        str: JSON object containing the full analysis body and action plan. When no report
            exists an ``{"error": ...}`` dict is returned instead, and a database failure
            yields a plain "Database Error: ..." string.
    """
    try:
        conn = connect()
        try:
            cursor = conn.cursor()
            # Check Report Table
            cursor.execute(
                "SELECT analysis_body, proposed_action, verdict, thought_process FROM issue_reports WHERE issue_number = %s AND repo_url = %s",
                (issue_number, repo_url),
            )
            row = cursor.fetchone()
        finally:
            # Close the connection even when the query fails.
            conn.close()
        if not row:
            return {"error": f"No AI report found for Issue #{issue_number}. It might be pending analysis."}
        return json.dumps(
            {
                "issue": issue_number,
                "verdict": row["verdict"],
                "thought": row["thought_process"],
                "report": row["analysis_body"],  # Full Markdown
                "action": row["proposed_action"],  # JSON
            },
            indent=2,
            default=str,
        )
    except Exception as e:
        return f"Database Error: {str(e)}"
def generate_theme(
    prompt: str,
    llm_provider: str = "gemini",
    llm_model: str = "gemini-2.0-flash",
    llm_api_key: str | None = None,
    theme_mode: str = "Designer",
    request: gr.Request = None,
):
    """
    Generates a Gradio theme based on a text prompt.

    Args:
        prompt (str): Description of the desired theme.
        llm_provider (str, optional): LLM provider. Defaults to 'gemini'.
        llm_model (str, optional): Model name. Defaults to 'gemini-2.0-flash'.
        llm_api_key (str, optional): API Key. **OPTIONAL**. If empty, uses server keys.
        theme_mode (str, optional): Theme mode. Defaults to 'Designer'. Use 'Engineer' if you want to specify the details and 'Designer' if you want the model to handle that; in this mode, use short prompts.

    Returns:
        tuple: ``(py_code, theme_object, status_message, css_str)`` - the Python source that
            recreates the theme, the theme object, a status message, and the theme's CSS.

    Raises:
        gr.Error: If the LLM call fails or its output cannot be validated as a theme.
    """
    _validate_api_keys(llm_api_key, request)
    try:
        SYSTEM_PROMPT = SYSTEM_PROMPT_THEME_GENERATOR_DESIGNER_EN if theme_mode == "Designer" else SYSTEM_PROMPT_THEME_GENERATOR_ENGINEER_EN
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]
        llm_output = LLMFactory.call(
            llm_provider=llm_provider,
            llm_model=llm_model,
            messages=messages,
            temperature=0.1,
            max_tokens=2048,
            api_key=llm_api_key,
        )
        if isinstance(llm_output, str) and llm_output.startswith("Error:"):
            raise Exception(llm_output)

        # Pull the payload out of a ```json fence. Any whitespace after the fence marker
        # (including "\r\n") and a missing closing fence are tolerated.
        fence_match = re.search(r"```json\s*(.*?)\s*(?:```|\Z)", llm_output, re.DOTALL)
        json_str = fence_match.group(1) if fence_match else llm_output

        try:
            raw_data = json.loads(json_str)
            valid_colors = {
                "red",
                "blue",
                "green",
                "orange",
                "amber",
                "yellow",
                "lime",
                "emerald",
                "teal",
                "cyan",
                "sky",
                "indigo",
                "violet",
                "purple",
                "fuchsia",
                "pink",
                "rose",
                "slate",
                "gray",
                "zinc",
                "neutral",
                "stone",
            }
            # Only sanitise when the payload has the expected shape; anything else is left
            # for the model validation below to report.
            c_args = raw_data.get("constructor_args") if isinstance(raw_data, dict) else None
            if isinstance(c_args, dict):
                for hue in ["primary_hue", "secondary_hue", "neutral_hue"]:
                    if hue in c_args and c_args[hue] not in valid_colors:
                        print(f"⚠️ Invalid color '{c_args[hue]}' detected for {hue}. Fallback to 'blue'.")
                        c_args[hue] = "blue"
            # Re-dump
            json_str = json.dumps(raw_data)
        except json.JSONDecodeError:
            # Leave json_str untouched; the validation below raises a descriptive error.
            pass

        theme_config = FullThemeModel.model_validate_json(json_str)
        constructor_kwargs = theme_config.constructor_args.model_dump(exclude_unset=True)
        set_kwargs = theme_config.set_args.model_dump(exclude_unset=True)

        # Clean up color refs: drop '*name_shade' references whose shade is not a number
        # between 50 and 950.
        for key in list(set_kwargs.keys()):
            value = set_kwargs[key]
            if isinstance(value, str) and value.startswith("*"):
                try:
                    name, shade = value[1:].rsplit("_", 1)
                    shade_num = int(shade)
                    if not (50 <= shade_num <= 950):
                        del set_kwargs[key]
                except (ValueError, IndexError):
                    del set_kwargs[key]

        # Generic font families stay plain strings; everything else becomes a GoogleFont.
        if "font" in constructor_kwargs:
            fonts = constructor_kwargs.get("font", [])
            constructor_kwargs["font"] = [(gr.themes.GoogleFont(name) if name not in ["ui-sans-serif", "system-ui", "sans-serif"] else name) for name in fonts]
        if "font_mono" in constructor_kwargs:
            mono_fonts = constructor_kwargs.get("font_mono", [])
            constructor_kwargs["font_mono"] = [
                (gr.themes.GoogleFont(name) if name not in ["ui-monospace", "Consolas", "monospace"] else name) for name in mono_fonts
            ]

        theme_object = gr.themes.Default(**constructor_kwargs)
        if set_kwargs:
            theme_object.set(**set_kwargs)
        status_message = "Theme generated successfully!"
        css_str = theme_object._get_theme_css()

        def format_arg(val):
            """Render a constructor/set argument as Python source."""
            if isinstance(val, str):
                # repr() escapes embedded quotes and backslashes, keeping the generated code valid.
                return repr(val)
            if isinstance(val, list):
                font_list = [(f"gr.themes.GoogleFont({f.name!r})" if isinstance(f, gr.themes.GoogleFont) else repr(f)) for f in val]
                return f"[{', '.join(font_list)}]"
            return str(val)

        init_args = ",\n ".join([f"{k}={format_arg(v)}" for k, v in constructor_kwargs.items()])
        set_args = ",\n ".join([f"{k}={format_arg(v)}" for k, v in set_kwargs.items()])
        set_call = f".set(\n {set_args}\n)" if set_kwargs else ""
        py_code = f"import gradio as gr\n\ntheme = gr.themes.Default(\n {init_args}\n){set_call}"
        return py_code, theme_object, str(status_message), css_str
    except Exception as e:
        import traceback

        traceback.print_exc()
        error_msg = str(e)
        if "Auth" in error_msg or "Key" in error_msg or "401" in error_msg:
            detailed_msg = f"Authentication Error: The provided API Key for {llm_provider} is invalid or expired."
        elif "429" in error_msg or "Rate" in error_msg:
            detailed_msg = "Rate Limit Error: The AI provider is busy. Please wait a moment."
        else:
            detailed_msg = f"System Error: {error_msg}"
        raise gr.Error(detailed_msg)
def handle_upload(theme_name, generated_theme, hf_token):
    """
    Publish the most recently generated theme to the Hugging Face Hub.

    The outcome is reported to the user through Gradio notifications rather
    than through a return value.

    Args:
        theme_name (str): Repository / theme name to publish under. Must not be empty.
        generated_theme (object): The theme object produced by the generator. Anything that
            is not a `gr.themes.Default` instance is treated as "no theme generated yet".
        hf_token (str): Hugging Face token used to authenticate the push. Must not be empty.

    Returns:
        None

    Raises:
        gr.Error: If pushing to the Hub (or reporting the result) fails for any reason.
    """
    theme_ready = isinstance(generated_theme, gr.themes.Default)
    if not theme_ready:
        gr.Info("⚠️ Please generate a theme first.")
        return None

    credentials_present = bool(theme_name) and bool(hf_token)
    if not credentials_present:
        gr.Info("⚠️ Please fill in the theme name and token.")
        return None

    try:
        hub_url = generated_theme.push_to_hub(repo_name=theme_name, hf_token=hf_token, theme_name=theme_name)
        # The last two path segments of the returned URL are used as the id shown to the user.
        owner_and_repo = hub_url.split("/")[-2:]
        space_id = "/".join(owner_and_repo)
        gr.Info(f"✅ Success! Theme uploaded to: ({hub_url}), To use: `theme='{space_id}'`")
    except Exception as e:
        raise gr.Error(f"❌ **Error:** {e}", visible=True)
    return None
def _process_sketch_logic(final_image, text_description, llm_provider, llm_model, llm_api_key):
    """
    Internal function: send a sketch image (plus optional text) to the LLM and return Python code.

    Args:
        final_image: The image object forwarded to `LLMFactory.call` as `image`.
        text_description (str | None): Optional extra description; "None" is sent when empty.
        llm_provider (str): Provider name forwarded to `LLMFactory.call`.
        llm_model (str): Model name forwarded to `LLMFactory.call`.
        llm_api_key (str | None): API key forwarded to `LLMFactory.call`.

    Returns:
        str: The generated code with surrounding whitespace stripped. If the reply contains a
        ```python fenced block, only the content of the first such block is returned. On any
        failure a single Python comment line describing the error is returned instead of raising.
    """
    try:
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT_SKETCH_EN},
            {"role": "user", "content": f"Additional text description: {text_description if text_description else 'None'}"},
        ]
        llm_output = LLMFactory.call(
            llm_provider=llm_provider, llm_model=llm_model, messages=messages, image=final_image, temperature=0.1, max_tokens=8192, api_key=llm_api_key
        )
        # The theme generator in this file treats an "Error:"-prefixed string coming back from
        # LLMFactory.call as a failure; do the same here so it is not returned as if it were code.
        if isinstance(llm_output, str) and llm_output.startswith("Error:"):
            raise RuntimeError(llm_output)
        # Tolerant fence extraction: accepts "\r\n" line endings or trailing characters after
        # "```python", and a missing closing fence (e.g. a reply truncated by max_tokens).
        fenced = re.search(r"```python[^\n]*\n(.*?)(?:\n```|\Z)", llm_output, re.DOTALL)
        code = fenced.group(1) if fenced else llm_output
        return code.strip()
    except Exception as e:
        return f"# An error occurred while generating the UI: {e}"
def generate_sketch_ui(sketch_image, text_desc, provider, model, key, request: gr.Request):
    """
    UI handler: turn an uploaded/pasted sketch image into Gradio code.

    Validates the API key for the incoming request first, then delegates to
    `_process_sketch_logic`. Returns a Python comment asking for an image when none was given.
    """
    _validate_api_keys(key, request)
    has_sketch = sketch_image is not None
    if has_sketch:
        return _process_sketch_logic(sketch_image, text_desc, provider, model, key)
    return "# Please draw or upload an image."
# 3. To MCP
def generate_sketch_to_ui(
    image_url: str,
    text_desc: str | None = None,
    llm_provider: str = "gemini",
    llm_model: str = "gemini-2.5-flash",
    llm_api_key: str | None = None,
    request: gr.Request = None,
):
    """
    Generates Gradio code from a sketch image URL.

    Args:
        image_url (str): The public URL of the sketch image. A base64 data URI
            (e.g. "data:image/png;base64,....") is also accepted.
        text_desc (str): Optional description.
        llm_provider (str, optional): LLM provider. Defaults to 'gemini'.
        llm_model (str, optional): Model name. Defaults to 'gemini-2.5-flash'.
        llm_api_key (str, optional): API Key. **OPTIONAL**. If empty, uses server keys.

    Returns:
        str: The generated Python code, or a single Python comment line starting with
        "# Error" when the image could not be obtained.
    """
    _validate_api_keys(llm_api_key, request)
    if not image_url:
        return "# Error: Please provide an image URL."
    try:
        # 1. Base64 payload (data URI or bare ";base64," marker)
        if image_url.startswith("data:image") or ";base64," in image_url:
            if "," in image_url:
                # Drop the "data:...;base64" header and keep only the encoded payload.
                image_url = image_url.split(",")[1]
            image_data = base64.b64decode(image_url)
            final_image = Image.open(io.BytesIO(image_data))
        # 2. Direct URL
        else:
            # NOTE(review): the URL is caller-supplied and fetched from the server; consider
            # restricting schemes/hosts if this endpoint is reachable by untrusted callers.
            print(f"API: Downloading sketch from {image_url}...")
            headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
            # The response is streamed, so it must be closed explicitly; the context manager
            # guarantees that even on the early HTML return below, where the body is never read.
            with requests.get(image_url, headers=headers, stream=True, timeout=10, allow_redirects=True) as resp:
                resp.raise_for_status()
                content_type = resp.headers.get("Content-Type", "")
                if "text/html" in content_type:
                    return "# Error: The URL returned an HTML page, not an image file. Please use a direct link (e.g., ending in .png or .jpg)."
                image_bytes = resp.content
            final_image = Image.open(io.BytesIO(image_bytes))
        return _process_sketch_logic(final_image, text_desc, llm_provider, llm_model, llm_api_key)
    except Exception as e:
        return f"# Error downloading image from URL: {str(e)}"
# endregion
# region HELPER FUNCTIONS
def _validate_api_keys(llm_api_key, request):
    """
    Require a user-supplied LLM API key for requests that carry a "referer" header.

    The check is skipped entirely when the USE_SERVER_KEYS environment variable is set to a
    truthy value ("true", "1" or "yes", case-insensitive, surrounding whitespace ignored).

    Args:
        llm_api_key (str | None): The key supplied by the caller.
        request: The incoming request object (may be None). Only `request.headers` is read.

    Raises:
        gr.Error: If server keys are disabled, the request has a "referer" header, and the
            supplied key is missing or blank.
    """
    raw_flag = os.getenv("USE_SERVER_KEYS", "false")
    # Compare after normalising, so the candidate list only needs lowercase spellings.
    use_server_keys = raw_flag.strip().lower() in ("true", "1", "yes")
    if use_server_keys:
        return

    # Only requests with a referer header are checked — presumably this separates the
    # browser demo UI from direct API/MCP calls (the error text below refers to the demo UI).
    if not request or not request.headers.get("referer"):
        return

    if not llm_api_key or not llm_api_key.strip():
        raise gr.Error("⚠️ LLM API Key Required! Please enter your own API Key to use this tool in the demo UI.")
def _add_interactive_inputs():
    """
    Build the "Interactive Controls" row of the theme preview app.

    Left panel: radio, two dropdowns and a checkbox. Right panel: an image and button samples.

    Returns:
        tuple: (radio, dropdown, multiselect dropdown, checkbox) components, in that order.
    """
    with gr.Row():
        with gr.Column(variant="panel", scale=1):
            gr.Markdown(value="### Interactive Controls")
            radio_input = gr.Radio(choices=["A", "B", "C"], label="Radio")
            single_dropdown = gr.Dropdown(choices=["Option 1", "Option 2"], show_label=False)
            multi_dropdown = gr.Dropdown(choices=["A", "B", "C"], multiselect=True, value=["A"], label="Multiple Dropdown")
            checkbox_input = gr.Checkbox(label="Checkbox")
        with gr.Column(variant="panel", scale=2):
            gr.Image(value="https://gradio-static-files.s3.us-west-2.amazonaws.com/header-image.jpg", label="Image", height=320)
            # One row of regular-size buttons ...
            with gr.Row():
                gr.Button(value="Primary", variant="primary")
                gr.Button(value="Secondary")
            # ... and one row of small ones.
            with gr.Row():
                gr.Button(value="Small", size="sm")
                gr.UploadButton(size="sm")
                gr.Button(value="Stop", variant="stop", size="sm")
    return radio_input, single_dropdown, multi_dropdown, checkbox_input
def _add_data_display_components():
    """Add a row of sample data-display components (Dataframe, JSON, Label, File) to the preview app."""
    with gr.Row():
        gr.Dataframe(value=[[1, 2], [3, 4]], label="Dataframe")
        gr.JSON(value={"a": 1})
        gr.Label(value={"cat": 0.7})
        gr.File()
def _add_media_components():
    """Add a row of sample media components (ColorPicker, Video, Gallery) to the preview app."""
    gallery_items = [("https://gradio-static-files.s3.us-west-2.amazonaws.com/lion.jpg", "lion")]
    with gr.Row():
        gr.ColorPicker(label="Color Picker")
        gr.Video(value="https://gradio-static-files.s3.us-west-2.amazonaws.com/world.mp4")
        gr.Gallery(value=gallery_items, height="200px")
def _add_chatbot_component():
    """
    Build the chat demo row of the theme preview app.

    Returns:
        tuple: (chatbot, message textbox, "Add Message" button) components, in that order.
    """
    initial_history = [{"role": "user", "content": "Hello"}]
    with gr.Row():
        with gr.Column(scale=2):
            chat_view = gr.Chatbot(value=initial_history, label="Chatbot", type="messages")
            message_box = gr.Textbox(show_label=False, placeholder="Type your message...")
            send_button = gr.Button(value="Add Message")
        with gr.Column(scale=1):
            with gr.Accordion(label="Settings"):
                gr.Slider(label="Temperature")
                gr.Checkbox(label="Streaming")
    return chat_view, message_box, send_button
def _create_example_app():
    """
    Build the sample application shown next to the theme generator.

    The app is a column (elem_id "app") holding three tabs: common inputs, data/media
    components, and a layout/chat demo.

    Returns:
        tuple: (chatbot, message textbox, "Add Message" button) from the chat demo tab.
    """
    example_rows = [
        ["A", "Option 1", ["B"], True],
        ["B", "Option 2", ["A", "C"], False],
    ]
    with gr.Column(scale=3, elem_id="app"):
        with gr.Tabs():
            with gr.TabItem(label="Common Inputs"):
                gr.Textbox(label="Text Box", info="A standard text field.", placeholder="Write something...")
                gr.Interface(
                    fn=lambda x: x,
                    inputs="number",
                    outputs="textbox",
                    title="Interface Component (Compact)",
                    api_name=False,
                )
                with gr.Row():
                    gr.Slider(label="Slider 1")
                    gr.Slider(label="Slider 2")
                gr.CheckboxGroup(choices=["A", "B", "C"], label="Checkbox Group")
                radio_input, single_dropdown, multi_dropdown, checkbox_input = _add_interactive_inputs()
                gr.Examples(
                    examples=example_rows,
                    inputs=[radio_input, single_dropdown, multi_dropdown, checkbox_input],
                    label="Input Examples",
                )
            with gr.TabItem(label="Data and Media"):
                _add_data_display_components()
                _add_media_components()
            with gr.TabItem(label="Layout and Chat"):
                chat_view, message_box, send_button = _add_chatbot_component()
    return chat_view, message_box, send_button
def _create_gradio_lite_html(python_code: str) -> str:
    """
    Wrap Python code in a Gradio-Lite page embedded through an iframe `srcdoc` attribute.

    The code passes through two HTML parsing layers before Gradio-Lite reads it, so it is
    escaped twice:
      1. as text content of the `<gradio-lite>` element in the inner document, and
      2. together with the whole inner document, as the value of the `srcdoc` attribute.
    Escaping only `<` and `>` once is not enough: a quote in the code would end the attribute,
    and attribute decoding would turn `&lt;` back into markup inside the inner document.

    Args:
        python_code (str): The Python source to preview. `demo.launch()` is appended when the
            code does not already contain it.

    Returns:
        str: An HTML fragment (a bordered `<div>` containing the iframe).
    """
    from html import escape  # stdlib; imported locally so the block is self-contained

    # To make sure demo.launch() is present for Lite to render
    if "demo.launch()" not in python_code:
        python_code += "\n\ndemo.launch()"

    # Layer 1: the code is plain text inside <gradio-lite>; quotes are harmless there.
    code_as_text = escape(python_code, quote=False)

    inner_document = f"""<!DOCTYPE html>
<html>
<head>
<script type="module" crossorigin src="https://gradio-lite-previews.s3.amazonaws.com/PINNED_HF_HUB/dist/lite.js"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@gradio/lite/dist/lite.css" />
<style>
body {{ margin: 0; padding: 0; height: 100vh; }}
gradio-lite {{ width: 100%; height: 100%; display: block; }}
</style>
</head>
<body>
<gradio-lite>
{code_as_text}
</gradio-lite>
</body>
</html>"""

    # Layer 2: the whole document becomes an attribute value, so &, <, > and both quote
    # characters must be escaped; the browser undoes this when it reads the attribute.
    srcdoc_value = escape(inner_document, quote=True)

    # NOTE(review): "allow-scripts" combined with "allow-same-origin" weakens the sandbox;
    # confirm whether Gradio-Lite actually needs same-origin access here.
    html_template = f"""
<div style="width: 100%; height: 600px; border: 1px solid #e5e7eb; border-radius: 8px; overflow: hidden;">
<iframe
srcdoc="{srcdoc_value}"
width="100%"
height="100%"
style="border: none;"
sandbox="allow-scripts allow-same-origin"
></iframe>
</div>
"""
    return html_template
def _get_mcp_header():
    """
    Return the static HTML banner shown at the top of the app.

    The result is a `<style>` block with the banner's CSS followed by the banner markup
    (title, subtitle and a footer row of tags).
    """
    stylesheet = """
<style>
.header-container {
background: linear-gradient(314deg, #64748b 0%, #373f4a 100%);
padding: 30px 20px;
border-radius: 16px;
color: white !important;
text-align: center;
box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.1), 0 4px 6px -2px rgba(0, 0, 0, 0.05);
margin-bottom: 25px;
font-family: 'Inter', -apple-system, sans-serif;
}
.header-content {
max-width: 800px;
margin: 0 auto;
}
.header-title {
color: white !important;
font-size: 2.5rem;
font-weight: 800;
margin: 0;
display: flex;
align-items: center;
justify-content: center;
gap: 15px;
letter-spacing: -0.02em;
text-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.header-icon {
font-size: 3rem;
filter: drop-shadow(0 2px 4px rgba(0,0,0,0.1));
}
.header-subtitle {
color: #e2e8f0 !important; /* Fallback para var(--neutral-200) */
font-size: 1.1rem;
font-weight: 400;
margin-top: 8px;
opacity: 0.9;
letter-spacing: 0.01em;
}
.header-footer {
color: #e2e8f0 !important;
margin-top: 25px;
padding-top: 15px;
border-top: 1px solid rgba(255, 255, 255, 0.2);
font-size: 0.85rem;
font-weight: 500;
opacity: 0.85;
display: flex;
justify-content: center;
flex-wrap: wrap;
gap: 15px;
}
.header-tag {
color: #e2e8f0 !important;
display: flex;
align-items: center;
gap: 6px;
}
.separator {
color: #e2e8f0 !important;
opacity: 0.4;
}
</style>
"""
    banner_markup = """<div class="header-container">
<div class="header-content">
<div class="header-title">
<span class="header-icon">🕵️‍♂️</span> GitRepo Inspector
</div>
<div class="header-subtitle">
MCP Server: AI Tools for GitHub Analysis & Automation
</div>
<div class="header-footer">
<span class="header-tag">Powered by Gradio 🚀</span>
<span class="separator">|</span>
<span class="header-tag">Official MCP Protocol 🔌</span>
<span class="separator">|</span>
<span class="header-tag">Gemini • SambaNova • OpenAI • Nebius</span>
</div>
</div>
</div>
"""
    # The stylesheet ends with a newline, so the concatenation reads as one document.
    return stylesheet + banner_markup
# endregion
# region MAIN LAYOUT DEFINITION
# Script injected into the page <head> (passed as `head=` to gr.Blocks below).
# It defines window.apply_gradio_theme(css), which creates a <style id="dynamic_theme_css">
# element on first use and replaces its contents with the given CSS text.
THEME_JS = """
<script>
window.apply_gradio_theme = function(css) {
console.log("🎨 Applying theme via Native JS...");
let style_tag = document.getElementById('dynamic_theme_css');
if (!style_tag) {
style_tag = document.createElement('style');
style_tag.id = 'dynamic_theme_css';
document.head.appendChild(style_tag);
}
style_tag.innerHTML = css;
}
</script>
"""
# App-level CSS (passed as `css=` to gr.Blocks below): full-width container, a scrollable
# "#app" element (the elem_id used by the preview column), and a min-height helper class
# used by the report outputs.
css = """
.gradio-container { max-width: none !important; }
.fillable { width: 100% !important; max-width: unset !important; }
#app { height: 100vh; overflow-y: scroll; }
.min-h-50px { min-height: 50px; }
"""
# Default look of the application itself (passed as `theme=` to gr.Blocks below).
theme = gr.themes.Default(
    primary_hue="blue",
    secondary_hue="teal",
    neutral_hue="neutral",
).set(
    # Page body, links and code
    body_background_fill="*neutral_100",
    body_background_fill_dark="*neutral_900",
    body_text_color="*neutral_700",
    body_text_color_dark="*neutral_200",
    body_text_weight="400",
    link_text_color="*primary_500",
    link_text_color_dark="*primary_400",
    code_background_fill="*neutral_100",
    code_background_fill_dark="*neutral_800",
    # Shadows
    shadow_drop="0 1px 3px rgba(0,0,0,0.1)",
    shadow_inset="inset 0 2px 4px rgba(0,0,0,0.05)",
    # Blocks and block labels
    block_background_fill="*neutral_50",
    block_background_fill_dark="*neutral_700",
    block_border_color="*neutral_200",
    block_border_color_dark="*neutral_600",
    block_border_width="1px",
    block_border_width_dark="1px",
    block_label_background_fill="*primary_50",
    block_label_background_fill_dark="*primary_600",
    block_label_text_color="*primary_600",
    block_label_text_color_dark="*primary_50",
    # Panels
    panel_background_fill="white",
    panel_background_fill_dark="*neutral_800",
    panel_border_color="*neutral_200",
    panel_border_color_dark="*neutral_700",
    panel_border_width="1px",
    panel_border_width_dark="1px",
    # Inputs and sliders
    input_background_fill="white",
    input_background_fill_dark="*neutral_800",
    input_border_color="*neutral_300",
    input_border_color_dark="*neutral_700",
    slider_color="*primary_500",
    slider_color_dark="*primary_400",
    # Primary buttons
    button_primary_background_fill="*primary_600",
    button_primary_background_fill_dark="*primary_500",
    button_primary_background_fill_hover="*primary_700",
    button_primary_background_fill_hover_dark="*primary_400",
    button_primary_border_color="transparent",
    button_primary_border_color_dark="transparent",
    button_primary_text_color="white",
    button_primary_text_color_dark="white",
    # Secondary buttons
    button_secondary_background_fill="*neutral_200",
    button_secondary_background_fill_dark="*neutral_600",
    button_secondary_background_fill_hover="*neutral_300",
    button_secondary_background_fill_hover_dark="*neutral_500",
    button_secondary_border_color="transparent",
    button_secondary_border_color_dark="transparent",
    button_secondary_text_color="*neutral_700",
    button_secondary_text_color_dark="*neutral_200",
)
# Main application: declares every component first (global configuration + one tab per
# tool), then wires the event handlers. Handlers registered with an `api_name` string are
# the ones given a named API endpoint; UI-only helpers use `api_name=False`.
with gr.Blocks(theme=theme, css=css, head=THEME_JS, title="GitRepo Inspector (MCP Server)") as app:
    # Hidden textbox that carries the generated theme CSS from `generate_theme` to the
    # browser-side `window.apply_gradio_theme` call (see "Bonus 1" wiring below).
    css_data_transport = gr.Textbox(visible=False)
    # Holds the generated theme object between "Generate Theme" and "Upload".
    generated_theme_state = gr.State(value=None)
    gr.HTML(_get_mcp_header())
    # GLOBAL CONFIGURATION
    with gr.Accordion("⚙️ Global Configuration (Configure tools here)", open=True):
        with gr.Row():
            global_github_token = gr.Textbox(
                label="GitHub Token",
                type="password",
                placeholder="Optional for public repos, required for higher rate limits.",
            )
            global_api_key = gr.Textbox(
                label="LLM API Key",
                type="password",
                placeholder="Required for UI demo. (Matches selected Provider)",
            )
        with gr.Row():
            global_provider = gr.Dropdown(
                choices=list(AVAILABLE_ISSUE_MODELS_BY_PROVIDER.keys()),
                value="gemini",
                label="LLM Provider (For all tools)",
            )
            global_model = gr.Dropdown(
                choices=AVAILABLE_ISSUE_MODELS_BY_PROVIDER["gemini"],
                value=AVAILABLE_ISSUE_MODELS_BY_PROVIDER["gemini"][0],
                label="Main Model (Text/Chat)",
                interactive=True,
                allow_custom_value=True,
            )
        # Invisible components: they exist only so `get_available_models` can be wired to
        # an event (and therefore to an API endpoint) below.
        with gr.Row():
            models_info_output = gr.JSON(label="Available Models Registry", visible=False)
            get_models_btn = gr.Button("Refresh Models Info", visible=False)
    # -----------------------------
    with gr.Tabs():
        with gr.TabItem("1. Sync Repo"):
            gr.Markdown("### 🔄 Synchronize Repository with Local Cache")
            gr.Markdown("Fetch open issues and recent releases to speed up analysis.")
            sync_repo_url = gr.Textbox(label="Repository URL", value="https://github.com/gradio-app/gradio")
            sync_button = gr.Button("Start Synchronization", variant="primary")
            sync_status_output = gr.Textbox(label="Sync Status", interactive=False)
        with gr.TabItem("2. Issue Analyzer"):
            gr.Markdown("### 🔍 Analyze a GitHub Issue")
            with gr.Row():
                issue_repo_url = gr.Textbox(
                    label="GitHub Repository URL",
                    value="https://github.com/gradio-app/gradio",
                )
                issue_number_input = gr.Number(label="Issue Number", precision=0)
            issue_analyze_button = gr.Button("Analyze Issue 🕵️", variant="primary")
            issue_report_output = gr.HTML(label="Analysis Report", elem_classes="min-h-50px")
            # Starts hidden; made visible by `change_cot_visibility` once there is content.
            with gr.Accordion("🧠 Thinking (CoT) Log", open=False, visible=False) as issue_cot_accordion:
                issue_thought_output = gr.Markdown()
        with gr.TabItem("3. Issue Duplicate Finder"):
            gr.Markdown("### 👯 Find Duplicate Issues")
            with gr.Row():
                dup_repo_url = gr.Textbox(label="Repository URL", value="https://github.com/gradio-app/gradio")
                dup_issue_number = gr.Number(label="Main Issue #", precision=0)
            dup_find_button = gr.Button("Find Duplicates 🔍", variant="primary")
            dup_report_output = gr.HTML(label="Duplicate Report", elem_classes="min-h-50px")
            with gr.Accordion("🧠 Thinking (CoT) Log", open=False, visible=False) as dup_accordion:
                dup_thought_output = gr.Markdown()
        with gr.TabItem("4. Issue Prioritizer"):
            gr.Markdown("### 🥇 Prioritize Backlog")
            prio_repo_url = gr.Textbox(label="Repository URL", value="https://github.com/gradio-app/gradio")
            prio_run_button = gr.Button("Generate Priority List ⚡", variant="primary")
            prio_report_output = gr.HTML(label="Prioritized Backlog", elem_classes="min-h-50px")
            with gr.Accordion("🧠 Thinking (CoT) Log", open=False, visible=False) as prio_cot_accordion:
                prio_thought_output = gr.Markdown()
        with gr.TabItem("5. Action Runner (Reply & Close)"):
            gr.Markdown("### ⚡ Action Runner")
            gr.Markdown("This tool allows the Agent (or you) to take action: post comments and close resolved issues.")
            with gr.Row():
                action_repo_url = gr.Textbox(label="Repository URL", value="https://github.com/gradio-app/gradio")
                action_issue_number = gr.Number(label="Issue Number", precision=0)
            action_comment = gr.Textbox(
                label="Comment Body (Markdown supported)",
                lines=5,
                placeholder="Ex: This issue is resolved in PR #123. Closing now.",
            )
            action_close_checkbox = gr.Checkbox(
                label="Close this issue?",
                value=False,
                info="Check to verify the fix and close the issue on GitHub.",
            )
            action_button = gr.Button("Execute Action 🚀", variant="stop")
            action_output = gr.Markdown(label="Execution Result")
        with gr.TabItem("6. Web Search"):
            gr.Markdown("### 🌍 Web Search Tool")
            gr.Markdown("Allows the agent to search the internet for documentation, error codes, and related discussions.")
            search_query_input = gr.Textbox(label="Search Query", placeholder="ex: pydantic validator error fix")
            search_btn = gr.Button("Search Web 🔍")
            search_output = gr.Markdown(label="Results")
        with gr.TabItem("7. Database Tools (Agentic Chatbot Only)"):
            gr.Markdown("### 🛠️ Read-Only Database Tools")
            gr.Markdown("These tools allow the Chatbot Agent to query the database directly.")
            with gr.Row():
                db_search_repo_url = gr.Textbox(label="Repository URL", value="https://github.com/gradio-app/gradio")
            with gr.Row():
                db_search_q = gr.Textbox(label="Query")
            with gr.Row():
                db_search_id = gr.Number(label="Issue ID (Optional)")
                db_search_author = gr.Textbox(label="Author (User Login)", placeholder="e.g. abidlabs")
                db_search_verdict = gr.Dropdown(["None", "resolved", "possibly_resolved", "duplicate", "unresolved", "error"], label="Verdict")
                db_search_status = gr.Dropdown(["open", "closed"], label="GitHub Status")
            db_search_btn = gr.Button("Search DB")
            db_out = gr.JSON(label="Results")
            with gr.Row():
                db_get_id = gr.Number(label="Issue ID to Fetch")
                db_get_btn = gr.Button("Get Report")
            db_report_out = gr.JSON(label="Full Report")
        with gr.TabItem("BONUS 1 - Theme Generator"):
            gr.Markdown("### 🖌️ Create a Theme with Natural Language")
            with gr.Row():
                with gr.Column(scale=1, min_width=450):
                    with gr.Group():
                        theme_mode = gr.Dropdown(
                            label="Theme Mode",
                            info="Use Engineer if you want to specify the details and Designer if you want the model to handle that; in this mode, use short prompts.",
                            choices=["Designer", "Engineer"],
                            value="Designer",
                        )
                        theme_prompt_input = gr.Textbox(
                            label="Describe your theme",
                            placeholder="Ex: I want a spring-inspired theme....",
                            lines=4,
                        )
                        theme_generate_button = gr.Button("Generate Theme ✨", variant="primary")
                        status_output = gr.Textbox(label="Status", interactive=False)
                    with gr.Tabs():
                        with gr.TabItem("Code to Use"):
                            python_code_output = gr.Code(label="Copy and paste", language="python")
                        with gr.TabItem("Publish to Hub"):
                            hub_theme_name = gr.Textbox(label="Theme Name", placeholder="my-amazing-theme")
                            hub_hf_token = gr.Textbox(label="HF Token", type="password", placeholder="hf_...")
                            upload_button = gr.Button("Upload 🚀")
                # Sample app rendered beside the controls so a generated theme can be previewed.
                chatbot_comp, msg_input_comp, add_msg_btn_comp = _create_example_app()
        with gr.TabItem("BONUS 2 - Sketch Image to Gradio UI"):
            gr.Markdown("### 🖼️ Create a UI from a Sketch")
            gr.Markdown("*Note: This tool uses the Global API Key, but requires a Vision Model (selected below).*")
            with gr.Row():
                sketch_model_dropdown = gr.Dropdown(
                    choices=AVAILABLE_SKETCH_MODELS_BY_PROVIDER["gemini"],
                    value=AVAILABLE_SKETCH_MODELS_BY_PROVIDER["gemini"][0],
                    label="Vision Model (Specific to Sketch Tool)",
                    interactive=True,
                    allow_custom_value=True,
                )
            with gr.Row():
                with gr.Column(scale=1):
                    sketch_input = gr.Image(type="pil", label="Upload the sketch", sources=["upload", "clipboard"])
                    # Hidden URL input + hidden trigger button: used only by the
                    # "generate_ui_from_sketch" endpoint wired below.
                    api_image_url = gr.Textbox(label="Image URL", visible=False)
                    text_desc_input = gr.Textbox(label="Additional Description (Optional)")
                    sketch_generate_button = gr.Button("1. Generate Code", variant="secondary")
                    api_trigger_btn = gr.Button("API Trigger", visible=False)
                    gr.Examples(
                        examples=[
                            ["./assets/sketch_1.png"],
                            ["./assets/sketch_2.webp"],
                        ],
                        inputs=[sketch_input],
                        label="Sketch Examples",
                    )
                with gr.Column(scale=2):
                    with gr.Tabs():
                        with gr.TabItem("🐍 Python Code"):
                            sketch_code_output = gr.Code(label="Generated UI Code", language="python", lines=20)
                        with gr.TabItem("👀 Live Preview"):
                            gr.HTML(
                                "<div style='color: red;'>The Live Preview uses gradio-lite which may fail; if this occurs, right-click in the visualization area and select 'Refresh Frame'.</div>"
                            )
                            sketch_preview_output = gr.HTML(label="Gradio-Lite Preview")
    # EVENTS & LOGIC
    get_models_btn.click(
        fn=get_available_models,
        inputs=None,
        outputs=[models_info_output],
        api_name="get_available_models",
    )

    def update_all_model_choices(provider):
        """Return updates for the text-model and vision-model dropdowns for `provider`.

        Each dropdown gets the provider's model list and its first entry as the value
        (or None when the provider has no models of that kind).
        """
        text_models = AVAILABLE_ISSUE_MODELS_BY_PROVIDER.get(provider, [])
        vision_models = AVAILABLE_SKETCH_MODELS_BY_PROVIDER.get(provider, [])
        return (
            gr.update(choices=text_models, value=text_models[0] if text_models else None),
            gr.update(choices=vision_models, value=vision_models[0] if vision_models else None),
        )

    def change_cot_visibility(data):
        """Show the target component when `data` is truthy, hide it otherwise."""
        if data:
            return gr.update(visible=True)
        return gr.update(visible=False)

    global_provider.change(
        fn=update_all_model_choices,
        inputs=[global_provider],
        outputs=[global_model, sketch_model_dropdown],
        api_name=False,
    )
    # Tool 1: Sync
    sync_button.click(
        fn=sync_repository,
        inputs=[sync_repo_url, global_github_token],
        outputs=[sync_status_output],
        api_name="sync_repository",
    )
    # Tool 2: Analyzer
    issue_analyze_button.click(
        fn=analyze_github_issue,
        inputs=[
            issue_repo_url,
            issue_number_input,
            global_provider,
            global_model,
            global_github_token,
            global_api_key,
        ],
        outputs=[issue_report_output, issue_thought_output],
        api_name="analyze_github_issue",
        show_progress_on=[issue_report_output],
    ).then(fn=change_cot_visibility, inputs=[issue_thought_output], outputs=[issue_cot_accordion], api_name=False)
    # Tool 3: Duplicate Finder
    dup_find_button.click(
        fn=find_duplicate_issues,
        inputs=[
            dup_repo_url,
            dup_issue_number,
            global_provider,
            global_model,
            global_github_token,
            global_api_key,
        ],
        outputs=[dup_report_output, dup_thought_output],
        api_name="find_duplicate_issues",
        show_progress_on=[dup_report_output],
    ).then(
        fn=change_cot_visibility,
        inputs=[dup_thought_output],
        outputs=[dup_accordion],
        api_name=False,
    )
    # Tool 4: Prioritizer
    # NOTE(review): this chain uses `.success(...)` while Tools 2 and 3 use `.then(...)` for
    # the same follow-up step — confirm whether the difference is intentional.
    prio_run_button.click(
        fn=prioritize_open_issues,
        inputs=[prio_repo_url, global_provider, global_model, global_api_key],
        outputs=[prio_report_output, prio_thought_output],
        api_name="prioritize_open_issues",
        show_progress_on=[prio_report_output],
    ).success(fn=change_cot_visibility, inputs=[prio_thought_output], outputs=[prio_cot_accordion], api_name=False)
    # Tool 5: Action Runner
    action_button.click(
        fn=reply_and_close_issue,
        inputs=[
            action_repo_url,
            action_issue_number,
            action_comment,
            action_close_checkbox,
            global_github_token,
        ],
        outputs=[action_output],
        api_name="reply_and_close_issue",
    )
    # Tool 6: Web Search
    search_btn.click(
        fn=web_search,
        inputs=[search_query_input],
        outputs=[search_output],
        api_name="web_search",
    )
    # Tool 7: DB Search
    db_search_btn.click(
        fn=search_issues, inputs=[
            db_search_repo_url,
            db_search_q,
            db_search_id,
            db_search_status,
            db_search_verdict,
            db_search_author
        ],
        outputs=[db_out],
        api_name="search_issues"
    )
    # Tool 8: Get Report
    # NOTE(review): the repository URL is read from the Sync tab (`sync_repo_url`), not from
    # this tab's `db_search_repo_url` — confirm this is intended.
    db_get_btn.click(fn=get_issue_report, inputs=[sync_repo_url, db_get_id], outputs=[db_report_out], api_name="get_issue_report")
    # Bonus 1: Theme Gen
    # Step 1 generates the theme and stores its CSS in the hidden textbox; step 2 runs only
    # in the browser (fn=None + js) and hands that CSS to window.apply_gradio_theme.
    theme_generate_button.click(
        fn=generate_theme,
        inputs=[
            theme_prompt_input,
            global_provider,
            global_model,
            global_api_key,
            theme_mode,
        ],
        outputs=[python_code_output, generated_theme_state, status_output, css_data_transport],
    ).then(
        fn=None,
        inputs=[css_data_transport],
        outputs=None,
        js="(css) => { window.apply_gradio_theme(css); }",
    )
    upload_button.click(
        fn=handle_upload,
        inputs=[hub_theme_name, generated_theme_state, hub_hf_token],
        api_name=False
    )
    # Bonus 2: Sketch
    ui_event = sketch_generate_button.click(
        fn=generate_sketch_ui,
        inputs=[sketch_input, text_desc_input, global_provider, sketch_model_dropdown, global_api_key],
        outputs=[sketch_code_output],
        api_name=False,
    )
    # After a successful generation, render the code in the Gradio-Lite preview tab.
    ui_event.success(fn=_create_gradio_lite_html, inputs=[sketch_code_output], outputs=[sketch_preview_output], api_name=False)
    api_trigger_btn.click(
        fn=generate_sketch_to_ui,
        inputs=[api_image_url, text_desc_input, global_provider, sketch_model_dropdown, global_api_key],
        outputs=[sketch_code_output],
        api_name="generate_ui_from_sketch",
    )

    def add_message_to_chat(history, message):
        """
        A simple function to add a user message to the chat history
        and respond with a canned message.
        Args:
        history (list): The current chat history.
        message (str): The user's new message.
        Returns:
        list: The updated chat history.
        """
        if not message:
            return history
        # The incoming list is extended in place and then returned.
        history.append({"role": "user", "content": message})
        time.sleep(0.5)
        history.append({"role": "assistant", "content": "Thank you for your message!"})
        return history

    # Chat Example Logic
    add_msg_btn_comp.click(
        fn=add_message_to_chat,
        inputs=[chatbot_comp, msg_input_comp],
        outputs=[chatbot_comp],
        api_name=False,
    ).then(fn=lambda: "", outputs=[msg_input_comp], api_name=False)
    # Init
    app.load(fn=_initialize_database, inputs=None, outputs=None, show_api=False)
# endregion
# Script entry point: launch the app with the MCP server enabled on port 7860.
if __name__ == "__main__":
    # NOTE(review): "." makes the current working directory an allowed path — confirm that
    # nothing sensitive (e.g. the .env file loaded above) becomes reachable through it.
    app.allowed_paths = ["."]
    app.launch(mcp_server=True, share=True, show_error=True, server_port=7860)