from __future__ import annotations

from datetime import datetime
from typing import Annotated, List

import gradio as gr
from ddgs import DDGS

from app import _log_call_end, _log_call_start, _search_rate_limiter, _truncate_for_log
from ._docstrings import autodoc

# Single source of truth for the LLM-facing tool description
TOOL_SUMMARY = (
    "Run a DuckDuckGo-backed search across text, news, images, videos, or books. "
    "Readable results include pagination hints and next_offset when more results are available. "
    "Use in combination with `Web_Fetch` to navigate the web."
)

_SAFESEARCH_LEVEL = "off"

# Defaults and choices for newly added parameters
BACKEND_CHOICES = [
    "auto",
    "duckduckgo",
    "bing",
    "brave",
    "yahoo",
    "wikipedia",
]

# Allowed backends per type (explicit selection set)
_ALLOWED_BACKENDS = {
    "text": ["duckduckgo", "bing", "brave", "yahoo", "wikipedia"],
    "news": ["duckduckgo", "bing", "yahoo"],
    "images": ["duckduckgo"],
    "videos": ["duckduckgo"],
    "books": ["annasarchive"],
}

# Auto order per type (used when backend == "auto"); wikipedia excluded for text
_AUTO_ORDER = {
    "text": ["duckduckgo", "bing", "brave", "yahoo"],
    "news": ["duckduckgo", "bing", "yahoo"],
    "images": ["duckduckgo"],
    "videos": ["duckduckgo"],
    "books": ["annasarchive"],
}

# Date filter choices: canonical values used by resolver
DATE_FILTER_CHOICES = ["any", "day", "week", "month", "year"]


def _resolve_backend(search_type: str, backend_choice: str) -> str:
    """Resolve backend string for DDGS based on search type and user choice.

    - If backend_choice is "auto", return a comma-separated fallback order for that type.
    - If backend_choice is not supported by the type, fall back to the first allowed backend.
    - Books endpoint uses only 'annasarchive'.
    """
    stype = search_type if search_type in _ALLOWED_BACKENDS else "text"
    allowed = _ALLOWED_BACKENDS[stype]
    if backend_choice == "auto":
        return ", ".join(_AUTO_ORDER[stype])
    if stype == "books":
        return "annasarchive"
    # Validate backend against allowed set for this type
    if backend_choice in allowed:
        return backend_choice
    # Fallback to first allowed backend
    return allowed[0]
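

# Illustrative resolutions under the tables above (expected return values, not exhaustive):
#   _resolve_backend("text", "auto")   -> "duckduckgo, bing, brave, yahoo"
#   _resolve_backend("news", "brave")  -> "duckduckgo"    (brave not allowed for news, falls back)
#   _resolve_backend("books", "bing")  -> "annasarchive"  (books always use annasarchive)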


def _resolve_timelimit(date_filter: str, search_type: str) -> str | None:
    """Map UI date filter to DDGS timelimit code per endpoint.

    Returns one of: None, 'd', 'w', 'm', 'y'. For news/videos (which support d/w/m),
    selecting 'year' will coerce to 'm' to stay within supported range.
    """
    normalized = (date_filter or "any").strip().lower()
    if normalized in ("any", "none", ""):
        return None
    mapping = {
        "day": "d",
        "week": "w",
        "month": "m",
        "year": "y",
    }
    code = mapping.get(normalized)
    if not code:
        return None
    if search_type in ("news", "videos") and code == "y":
        return "m"
    return code
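

# Illustrative mappings (expected return values given the coercion rule above):
#   _resolve_timelimit("week", "text")   -> "w"
#   _resolve_timelimit("year", "news")   -> "m"   (coerced: news/videos support only d/w/m)
#   _resolve_timelimit("any", "images")  -> None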


def _extract_date_from_snippet(snippet: str) -> str:
    if not snippet:
        return ""
    import re

    date_patterns = [
        r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b",
        r"\b([A-Za-z]{3,9}\s+\d{1,2},?\s+\d{4})\b",
        r"\b(\d{1,2}\s+[A-Za-z]{3,9}\s+\d{4})\b",
        r"\b(\d+\s+(?:day|week|month|year)s?\s+ago)\b",
        r"(?:Published|Updated|Posted):\s*([^,\n]+?)(?:[,\n]|$)",
    ]
    for pattern in date_patterns:
        matches = re.findall(pattern, snippet, re.IGNORECASE)
        if matches:
            return matches[0].strip()
    return ""
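

# Illustrative matches for the patterns above (the first matching pattern wins):
#   "Updated 2024-06-01 with new data"   -> "2024-06-01"
#   "May 3, 2024 - Analysts said ..."    -> "May 3, 2024"
#   "Posted 3 days ago by the newsroom"  -> "3 days ago"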


def _format_search_result(result: dict, search_type: str, index: int) -> List[str]:
    lines: List[str] = []
    if search_type == "text":
        title = result.get("title", "").strip()
        url = result.get("href", "").strip()
        snippet = result.get("body", "").strip()
        date = _extract_date_from_snippet(snippet)
        lines.append(f"{index}. {title}")
        lines.append(f" URL: {url}")
        if snippet:
            lines.append(f" Summary: {snippet}")
        if date:
            lines.append(f" Date: {date}")
    elif search_type == "news":
        title = result.get("title", "").strip()
        url = result.get("url", "").strip()
        body = result.get("body", "").strip()
        date = result.get("date", "").strip()
        source = result.get("source", "").strip()
        lines.append(f"{index}. {title}")
        lines.append(f" URL: {url}")
        if source:
            lines.append(f" Source: {source}")
        if date:
            lines.append(f" Date: {date}")
        if body:
            lines.append(f" Summary: {body}")
    elif search_type == "images":
        title = result.get("title", "").strip()
        image_url = result.get("image", "").strip()
        source_url = result.get("url", "").strip()
        source = result.get("source", "").strip()
        width = result.get("width", "")
        height = result.get("height", "")
        lines.append(f"{index}. {title}")
        lines.append(f" Image: {image_url}")
        lines.append(f" Source: {source_url}")
        if source:
            lines.append(f" Publisher: {source}")
        if width and height:
            lines.append(f" Dimensions: {width}x{height}")
    elif search_type == "videos":
        title = result.get("title", "").strip()
        description = result.get("description", "").strip()
        duration = result.get("duration", "").strip()
        published = result.get("published", "").strip()
        uploader = result.get("uploader", "").strip()
        embed_url = result.get("embed_url", "").strip()
        lines.append(f"{index}. {title}")
        if embed_url:
            lines.append(f" Video: {embed_url}")
        if uploader:
            lines.append(f" Uploader: {uploader}")
        if duration:
            lines.append(f" Duration: {duration}")
        if published:
            lines.append(f" Published: {published}")
        if description:
            lines.append(f" Description: {description}")
    elif search_type == "books":
        title = result.get("title", "").strip()
        url = result.get("url", "").strip()
        body = result.get("body", "").strip()
        lines.append(f"{index}. {title}")
        lines.append(f" URL: {url}")
        if body:
            lines.append(f" Description: {body}")
    return lines
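

# Illustrative return value for a "text" result at index 1 (title, URL, and snippet are placeholders):
#   ["1. Example Domain",
#    " URL: https://example.com/",
#    " Summary: Updated 2024-06-01. This domain is for use in illustrative examples.",
#    " Date: 2024-06-01"]   # the Date line appears only when a date-like string is detected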


def Web_Search(
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
    page: Annotated[int, "Page number for pagination (1-based, each page contains max_results items)."] = 1,
    offset: Annotated[int, "Result offset to start from (overrides page if > 0, for precise continuation)."] = 0,
    search_type: Annotated[str, "Type of search: 'text' (web pages), 'news', 'images', 'videos', or 'books'."] = "text",
    backend: Annotated[str, "Search backend or ordered fallbacks. Use 'auto' for recommended order."] = "auto",
    date_filter: Annotated[str, "Time filter: any, day, week, month, year."] = "any",
) -> str:
    _log_call_start(
        "Web_Search",
        query=query,
        max_results=max_results,
        page=page,
        search_type=search_type,
        offset=offset,
        backend=backend,
        date_filter=date_filter,
    )
    if not query or not query.strip():
        result = "No search query provided. Please enter a search term."
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result
    max_results = max(1, min(20, max_results))
    page = max(1, page)
    offset = max(0, offset)
    valid_types = ["text", "news", "images", "videos", "books"]
    if search_type not in valid_types:
        search_type = "text"
    if offset > 0:
        actual_offset = offset
        calculated_page = (offset // max_results) + 1
    else:
        actual_offset = (page - 1) * max_results
        calculated_page = page
    total_needed = actual_offset + max_results
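    # Worked example (illustrative): max_results=5, offset=12 -> actual_offset=12,
    # calculated_page=(12 // 5) + 1 = 3, total_needed=17; the backends below are asked for
    # total_needed + 10 items so the pagination hints can report what remains.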
    used_fallback = False
    original_search_type = search_type
    # Prepare cross-cutting parameters
    resolved_backend = _resolve_backend(search_type, (backend or "auto").lower())
    timelimit = _resolve_timelimit(date_filter, search_type)

    def _perform_search(stype: str) -> list[dict]:
        try:
            _search_rate_limiter.acquire()
            with DDGS() as ddgs:
                if stype == "text":
                    user_backend_choice = (backend or "auto").lower()
                    if user_backend_choice == "auto":
                        # Custom auto: DDG first, then append other engines
                        results: list[dict] = []
                        seen: set[str] = set()

                        def add_unique(items: list[dict], key_field: str) -> None:
                            for it in items or []:
                                url = (it.get(key_field, "") or "").strip()
                                if url and url not in seen:
                                    seen.add(url)
                                    results.append(it)

                        # First: duckduckgo
                        try:
                            ddg_items = list(
                                ddgs.text(
                                    query,
                                    max_results=total_needed + 10,
                                    safesearch=_SAFESEARCH_LEVEL,
                                    timelimit=timelimit,
                                    backend="duckduckgo",
                                )
                            )
                        except Exception:
                            ddg_items = []
                        add_unique(ddg_items, "href")
                        # Then: other engines appended (excluding duckduckgo)
                        for eng in [b for b in _AUTO_ORDER["text"] if b != "duckduckgo"]:
                            try:
                                extra = list(
                                    ddgs.text(
                                        query,
                                        max_results=total_needed + 10,
                                        safesearch=_SAFESEARCH_LEVEL,
                                        timelimit=timelimit,
                                        backend=eng,
                                    )
                                )
                            except Exception:
                                extra = []
                            add_unique(extra, "href")
                        return results
                    else:
                        raw_gen = ddgs.text(
                            query,
                            max_results=total_needed + 10,
                            safesearch=_SAFESEARCH_LEVEL,
                            timelimit=timelimit,
                            backend=resolved_backend,
                        )
                elif stype == "news":
                    user_backend_choice = (backend or "auto").lower()
                    if user_backend_choice == "auto":
                        # Custom auto: DDG first, then append other engines
                        results: list[dict] = []
                        seen: set[str] = set()

                        def add_unique(items: list[dict], key_field: str) -> None:
                            for it in items or []:
                                url = (it.get(key_field, "") or "").strip()
                                if url and url not in seen:
                                    seen.add(url)
                                    results.append(it)

                        # First: duckduckgo news
                        try:
                            ddg_news = list(
                                ddgs.news(
                                    query,
                                    max_results=total_needed + 10,
                                    safesearch=_SAFESEARCH_LEVEL,
                                    timelimit=timelimit,
                                    backend="duckduckgo",
                                )
                            )
                        except Exception:
                            ddg_news = []
                        add_unique(ddg_news, "url")
                        # Then: other news engines appended
                        for eng in [b for b in _AUTO_ORDER["news"] if b != "duckduckgo"]:
                            try:
                                extra = list(
                                    ddgs.news(
                                        query,
                                        max_results=total_needed + 10,
                                        safesearch=_SAFESEARCH_LEVEL,
                                        timelimit=timelimit,
                                        backend=eng,
                                    )
                                )
                            except Exception:
                                extra = []
                            add_unique(extra, "url")
                        return results
                    else:
                        raw_gen = ddgs.news(
                            query,
                            max_results=total_needed + 10,
                            safesearch=_SAFESEARCH_LEVEL,
                            timelimit=timelimit,
                            backend=_resolve_backend("news", (backend or "auto").lower()),
                        )
                elif stype == "images":
                    raw_gen = ddgs.images(
                        query,
                        max_results=total_needed + 10,
                        safesearch=_SAFESEARCH_LEVEL,
                        timelimit=timelimit,
                        backend=_resolve_backend("images", (backend or "auto").lower()),
                    )
                elif stype == "videos":
                    raw_gen = ddgs.videos(
                        query,
                        max_results=total_needed + 10,
                        safesearch=_SAFESEARCH_LEVEL,
                        timelimit=timelimit,
                        backend=_resolve_backend("videos", (backend or "auto").lower()),
                    )
                else:
                    raw_gen = ddgs.books(
                        query,
                        max_results=total_needed + 10,
                        backend=_resolve_backend("books", (backend or "auto").lower()),
                    )
                try:
                    return list(raw_gen)
                except Exception as inner_exc:
                    if "no results" in str(inner_exc).lower() or "not found" in str(inner_exc).lower():
                        return []
                    raise inner_exc
        except Exception as exc:
            error_msg = f"Search failed: {str(exc)[:200]}"
            lowered = str(exc).lower()
            if "blocked" in lowered or "rate" in lowered:
                error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes."
            elif "timeout" in lowered:
                error_msg = "Search timed out. Please try again with a simpler query."
            elif "network" in lowered or "connection" in lowered:
                error_msg = "Network connection error. Please check your internet connection and try again."
            elif "no results" in lowered or "not found" in lowered:
                return []
            raise Exception(error_msg)
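
    # Note on the "auto" branches above (illustrative): each engine's results are appended in
    # order and de-duplicated by URL, so a hit returned by both duckduckgo and bing is kept
    # once, in its duckduckgo position.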
    try:
        raw = _perform_search(search_type)
    except Exception as exc:
        result = f"Error: {exc}"
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result
    if not raw and search_type == "news":
        try:
            raw = _perform_search("text")
            if raw:
                used_fallback = True
                search_type = "text"
        except Exception:
            pass
    if not raw:
        # used_fallback is only set when the fallback returned results, so it can never be True
        # here; note the attempted 'text' fallback whenever the original type was news.
        fallback_note = " (also tried 'text' search as fallback)" if original_search_type == "news" else ""
        result = f"No {original_search_type} results found for query: {query}{fallback_note}"
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result
    paginated_results = raw[actual_offset: actual_offset + max_results]
    if not paginated_results:
        if actual_offset >= len(raw):
            result = f"Offset {actual_offset} exceeds available results ({len(raw)} total). Try offset=0 to start from beginning."
        else:
            result = f"No {original_search_type} results found on page {calculated_page} for query: {query}. Try page 1 or reduce page number."
        _log_call_end("Web_Search", _truncate_for_log(result))
        return result
    total_available = len(raw)
    start_num = actual_offset + 1
    end_num = actual_offset + len(paginated_results)
    next_offset = actual_offset + len(paginated_results)
    search_label = original_search_type.title()
    if used_fallback:
        search_label += " → Text (Smart Fallback)"
    now_dt = datetime.now().astimezone()
    date_str = now_dt.strftime("%A, %B %d, %Y %I:%M %p %Z").strip()
    if not date_str:
        date_str = now_dt.isoformat()
    pagination_info = f"Page {calculated_page}"
    if offset > 0:
        pagination_info = f"Offset {actual_offset} (≈ {pagination_info})"
    lines = [f"Current Date: {date_str}", f"{search_label} search results for: {query}"]
    if used_fallback:
        lines.append("📍 Note: News search returned no results, automatically searched general web content instead")
    lines.append(f"{pagination_info} (results {start_num}-{end_num} of ~{total_available}+ available)\n")
    for i, result in enumerate(paginated_results, start_num):
        result_lines = _format_search_result(result, search_type, i)
        lines.extend(result_lines)
        lines.append("")
    if total_available > end_num:
        lines.append("💡 More results available:")
        lines.append(f" • Next page: page={calculated_page + 1}")
        lines.append(f" • Next offset: offset={next_offset}")
        lines.append(f" • Use offset={next_offset} to continue exactly from result {next_offset + 1}")
    result = "\n".join(lines)
    search_info = f"type={original_search_type}"
    if used_fallback:
        search_info += "→text"
    _log_call_end("Web_Search", f"{search_info} page={calculated_page} offset={actual_offset} results={len(paginated_results)} chars={len(result)}")
    return result
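

# Example call (illustrative; actual results depend on the live backends):
#   Web_Search("large language models", max_results=3, search_type="news", date_filter="week")
# returns a text block that starts with the current date and a "News search results for: ..."
# header, followed by numbered results and pagination hints when more results are available.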


def build_interface() -> gr.Interface:
    return gr.Interface(
        fn=Web_Search,
        inputs=[
            gr.Textbox(label="Query", placeholder="topic OR site:example.com", max_lines=1, info="The search query"),
            gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results", info="Number of results to return (1–20)"),
            gr.Slider(minimum=1, maximum=10, value=1, step=1, label="Page", info="Page number for pagination (ignored if offset > 0)"),
            gr.Slider(
                minimum=0,
                maximum=1000,
                value=0,
                step=1,
                label="Offset",
                info="Result offset to start from (overrides page if > 0, use next_offset from previous search)",
            ),
            gr.Radio(
                label="Search Type",
                choices=["text", "news", "images", "videos", "books"],
                value="text",
                info="Type of content to search for",
            ),
            gr.Radio(
                label="Backend",
                choices=BACKEND_CHOICES,
                value="auto",
                info="Search engine backend or fallback order (auto applies recommended order)",
            ),
            gr.Radio(
                label="Date filter",
                choices=DATE_FILTER_CHOICES,
                value="any",
                info="Limit results to: day, week, month, or year (varies by type)",
            ),
        ],
        outputs=gr.Textbox(label="Search Results", interactive=False, lines=20, max_lines=20),
        title="Web Search",
        description=(
            "<div style=\"text-align:center\">Multi-type web search with readable output format, date detection, and flexible pagination. "
            "Supports text, news, images, videos, and books. Features smart fallback for news searches and precise offset control.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Search",
    )


__all__ = ["Web_Search", "build_interface"]