# File: oai/oai/utils/web_search.py
# 248 lines, 7.8 KiB, Python

"""
Web search utilities for oAI.
Provides web search capabilities for all providers (not just OpenRouter).
Uses DuckDuckGo by default (no API key needed).
"""
import json
import re
from typing import Dict, List, Optional
from urllib.parse import quote_plus
import requests
from oai.utils.logging import get_logger
logger = get_logger()
class WebSearchResult:
    """Container for a single search result.

    Attributes:
        title: Result title as shown by the search provider.
        url: Target URL of the result.
        snippet: Short text excerpt for the result (may be empty).
    """

    def __init__(self, title: str, url: str, snippet: str):
        self.title = title
        self.url = url
        self.snippet = snippet

    def __repr__(self) -> str:
        # ``!r`` lets Python pick the quoting and escape embedded quotes, so
        # the representation stays unambiguous for values such as "It's ...".
        return f"{type(self).__name__}(title={self.title!r}, url={self.url!r})"
class WebSearchProvider:
    """Common interface shared by all web search backends."""

    def search(self, query: str, num_results: int = 5) -> List[WebSearchResult]:
        """
        Run a web search; concrete providers override this method.

        Args:
            query: Search query
            num_results: Number of results to return

        Returns:
            List of search results

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError
class DuckDuckGoSearch(WebSearchProvider):
    """DuckDuckGo search provider (no API key needed).

    Results are scraped from the DuckDuckGo HTML endpoint with regular
    expressions, so parsing is best-effort and tied to that page's markup.
    """

    # One result container, up to the start of the next container.  The \Z
    # alternative lets the last container on the page match as well; without
    # it the final result is only captured when more matching markup follows.
    _RESULT_BLOCK_RE = re.compile(
        r'<div class="result results_links.*?'
        r'(?=<div class="result results_links|<div id="links"|\Z)',
        re.DOTALL,
    )
    # Title link: group 1 is the href, group 2 the link content, which may
    # itself contain inline tags.
    _TITLE_RE = re.compile(
        r'<a[^>]*class="result__a"[^>]*href="([^"]+)"[^>]*>(.*?)</a>',
        re.DOTALL,
    )
    # Snippet link: group 1 is the link content (may contain inline tags).
    _SNIPPET_RE = re.compile(
        r'<a[^>]*class="result__snippet"[^>]*>(.*?)</a>',
        re.DOTALL,
    )
    _TAG_RE = re.compile(r'<[^>]+>')
    # DuckDuckGo redirect links carry the real target in the ``uddg`` parameter.
    _REDIRECT_TARGET_RE = re.compile(r'uddg=([^&]+)')

    def __init__(self):
        # A shared session reuses connections and carries the browser-like
        # User-Agent header on every request.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        })

    @classmethod
    def _parse_results(cls, page: str, num_results: int) -> List[WebSearchResult]:
        """Extract at most ``num_results`` results from a DuckDuckGo HTML page."""
        from html import unescape
        from urllib.parse import unquote

        def clean(fragment: str) -> str:
            # Drop inline markup first, then decode HTML entities, so text
            # wrapped in tags such as <b> is kept instead of being lost.
            return unescape(cls._TAG_RE.sub('', fragment).strip())

        results: List[WebSearchResult] = []
        for block in cls._RESULT_BLOCK_RE.findall(page):
            # Count parsed results rather than raw blocks, so a block that
            # fails to parse does not use up one of the requested slots.
            if len(results) >= num_results:
                break

            title_match = cls._TITLE_RE.search(block)
            if not title_match:
                continue

            link = title_match.group(1)
            redirect = cls._REDIRECT_TARGET_RE.search(link)
            if redirect:
                # Replace the redirect link with the percent-decoded target.
                link = unquote(redirect.group(1))

            snippet_match = cls._SNIPPET_RE.search(block)
            results.append(WebSearchResult(
                title=clean(title_match.group(2)),
                url=link,
                snippet=clean(snippet_match.group(1)) if snippet_match else "",
            ))
        return results

    def search(self, query: str, num_results: int = 5) -> List[WebSearchResult]:
        """
        Search using DuckDuckGo HTML interface.

        Args:
            query: Search query
            num_results: Number of results to return (default: 5)

        Returns:
            List of search results; an empty list if the request or the
            parsing fails (the failure is logged, not raised).
        """
        try:
            url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            results = self._parse_results(response.text, num_results)
            logger.info(f"DuckDuckGo search: found {len(results)} results for '{query}'")
            return results
        except requests.RequestException as e:
            logger.error(f"DuckDuckGo search failed: {e}")
            return []
        except Exception as e:
            # Best-effort scraper: any parsing problem yields no results
            # instead of propagating to the caller.
            logger.error(f"Error parsing DuckDuckGo results: {e}")
            return []
class GoogleCustomSearch(WebSearchProvider):
    """Google Custom Search API provider (requires API key)."""

    def __init__(self, api_key: str, search_engine_id: str):
        """
        Initialize Google Custom Search.

        Args:
            api_key: Google API key
            search_engine_id: Custom Search Engine ID
        """
        self.api_key = api_key
        self.search_engine_id = search_engine_id

    def _redact(self, error: Exception) -> str:
        """Return the error text with the API key masked.

        The key is sent as a query parameter, and request errors may embed
        the full request URL in their message, so it must not reach the log.
        """
        message = str(error)
        if self.api_key:
            for secret in (self.api_key, quote_plus(self.api_key)):
                message = message.replace(secret, "***")
        return message

    def search(self, query: str, num_results: int = 5) -> List[WebSearchResult]:
        """
        Search using Google Custom Search API.

        Args:
            query: Search query
            num_results: Number of results to return (capped at 10 per request)

        Returns:
            List of search results; an empty list if ``num_results`` is not
            positive or if the request or response handling fails (failures
            are logged, not raised).
        """
        if num_results <= 0:
            # Nothing was asked for; skip the request entirely.
            return []

        try:
            url = "https://www.googleapis.com/customsearch/v1"
            params = {
                'key': self.api_key,
                'cx': self.search_engine_id,
                'q': query,
                'num': min(num_results, 10)  # Google allows max 10
            }
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            # ``or []`` also covers an explicit null "items" value.
            results = [
                WebSearchResult(
                    title=item.get('title', ''),
                    url=item.get('link', ''),
                    snippet=item.get('snippet', ''),
                )
                for item in (data.get('items') or [])
            ]
            logger.info(f"Google Custom Search: found {len(results)} results for '{query}'")
            return results
        except requests.RequestException as e:
            logger.error(f"Google Custom Search failed: {self._redact(e)}")
            return []
        except (ValueError, AttributeError, TypeError) as e:
            # Body was not valid JSON or not shaped as expected; treat it
            # like any other failed search instead of raising to the caller.
            logger.error(f"Error parsing Google Custom Search results: {self._redact(e)}")
            return []
def perform_web_search(
    query: str,
    num_results: int = 5,
    provider: str = "duckduckgo",
    **kwargs
) -> List[WebSearchResult]:
    """
    Perform a web search using the specified provider.

    Args:
        query: Search query
        num_results: Number of results to return (default: 5)
        provider: Search provider ("duckduckgo" or "google"); matched
            case-insensitively. Any other value falls back to DuckDuckGo
            with a warning.
        **kwargs: Provider-specific arguments. For Google:
            ``google_api_key`` (or ``api_key``) and
            ``google_search_engine_id`` (or ``search_engine_id``).

    Returns:
        List of search results
    """
    provider_name = str(provider or "duckduckgo").strip().lower()

    if provider_name == "google":
        # Accept the short key names too: they are the ones the warning
        # below (and the earlier documentation) tell callers to supply.
        api_key = kwargs.get("google_api_key") or kwargs.get("api_key")
        search_engine_id = (
            kwargs.get("google_search_engine_id") or kwargs.get("search_engine_id")
        )
        if api_key and search_engine_id:
            return GoogleCustomSearch(api_key, search_engine_id).search(query, num_results)
        logger.warning("Google search requires api_key and search_engine_id, falling back to DuckDuckGo")
    elif provider_name != "duckduckgo":
        logger.warning(f"Unknown search provider '{provider}', falling back to DuckDuckGo")

    return DuckDuckGoSearch().search(query, num_results)
def format_search_results(results: List["WebSearchResult"], max_length: int = 2000) -> str:
    """
    Format search results for inclusion in AI prompt.

    Results are listed in order. When they do not all fit, as many leading
    results as possible are kept and a "... (N more results truncated)"
    notice is appended. Room for that notice is reserved, so the output stays
    within ``max_length`` unless ``max_length`` is too small to hold even the
    header plus the notice.

    Args:
        results: List of search results (objects with ``title``, ``url`` and
            ``snippet`` attributes)
        max_length: Maximum total length of formatted results

    Returns:
        Formatted string with search results
    """
    if not results:
        return "No search results found."

    header = "**Web Search Results:**\n\n"

    entries = []
    for index, result in enumerate(results, 1):
        lines = [f"{index}. **{result.title}**\n", f" URL: {result.url}\n"]
        if result.snippet:
            lines.append(f" {result.snippet}\n")
        lines.append("\n")
        entries.append("".join(lines))

    # Everything fits: no truncation notice is needed.
    if len(header) + sum(len(entry) for entry in entries) <= max_length:
        return (header + "".join(entries)).strip()

    # Truncation is unavoidable. Keep an entry only if the notice that would
    # follow it still fits within the budget.
    total = len(entries)
    parts = [header]
    used = len(header)
    kept = 0
    for entry in entries:
        notice_after = f"... ({total - kept - 1} more results truncated)\n"
        if used + len(entry) + len(notice_after) > max_length:
            break
        parts.append(entry)
        used += len(entry)
        kept += 1

    parts.append(f"... ({total - kept} more results truncated)\n")
    return "".join(parts).strip()