332 lines
14 KiB
Python
332 lines
14 KiB
Python
"""
|
|
tools/browser_tool.py — Playwright headless browser tool.
|
|
|
|
Read operations (fetch_page, screenshot) never require confirmation.
|
|
Interactive operations (click, fill, select, press) require confirmation
|
|
unless the target domain is in the user's browser_approved_domains list.
|
|
|
|
Sessions are stateful within a session_id: navigate with fetch_page first,
|
|
then use interactive ops without a url to act on the current page.
|
|
|
|
Requires: playwright package + `playwright install chromium`
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import ClassVar
|
|
|
|
from ..context_vars import current_task_id, current_session_id, web_tier2_enabled
|
|
from ..security import assert_domain_tier1, sanitize_external_content
|
|
from .base import BaseTool, ToolResult
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maximum number of characters of extracted page text returned by fetch_page.
_MAX_TEXT_CHARS = 25000

# Timeout handed to page.goto(), in milliseconds.
_TIMEOUT_MS = 30 * 1000

# User-agent string applied to every browser context this tool creates.
_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Operations that act on the page and are therefore subject to confirmation.
_INTERACTIVE_OPS = {
    "click",
    "fill",
    "select",
    "press",
}
|
|
|
|
|
|
async def _is_domain_approved(user_id: str, hostname: str) -> bool:
    """Return True if hostname (or a parent domain) is in the user's approved list.

    Matching is case-insensitive.  A stored entry matches the hostname itself
    and any subdomain of it; a leading wildcard prefix such as ``*.`` on the
    stored entry is ignored.  A trailing dot (the fully-qualified form
    ``example.com.``) is ignored on both sides.

    Args:
        user_id: Owner id used to filter ``browser_approved_domains``.
        hostname: Host part of the target URL.

    Returns:
        True when an approved entry covers ``hostname``; False otherwise,
        including when ``hostname`` is empty.
    """
    # Normalise before touching the database: an empty host can never be
    # approved, so there is no point in running the query for it.
    hostname = hostname.lower().rstrip(".")
    if not hostname:
        return False

    from ..database import get_pool

    pool = await get_pool()
    rows = await pool.fetch(
        "SELECT domain FROM browser_approved_domains WHERE owner_user_id = $1",
        user_id,
    )

    for row in rows:
        # lstrip("*.") removes any leading run of '*' and '.' characters,
        # turning "*.example.com" / ".example.com" into "example.com".
        domain = row["domain"].lower().lstrip("*.").rstrip(".")
        if not domain:
            # An entry that is blank after stripping (e.g. "*") must not be
            # treated as a suffix: "." + "" would match every trailing-dot host.
            continue
        if hostname == domain or hostname.endswith("." + domain):
            return True
    return False
|
|
|
|
|
|
class BrowserTool(BaseTool):
    """Headless Chromium tool built on Playwright.

    Read operations (``fetch_page``, ``screenshot``) never ask for confirmation.
    Interactive operations (``click``, ``fill``, ``select``, ``press``) ask for
    confirmation unless the target host is covered by the user's approved
    domains (see ``should_confirm``).

    A single Chromium instance is shared through class attributes and launched
    lazily.  Each session id gets its own browser context and page, cached in
    ``_sessions`` so that page state survives across calls.
    """

    name = "browser"
    description = (
        "Headless Chromium browser for JS-heavy pages and web interactions. "
        "Read ops: fetch_page (extract text), screenshot (PNG). "
        "Interactive ops: click, fill (type into field), select (dropdown), press (keyboard key). "
        "Interactive ops require confirmation unless the domain is in your Browser Trusted Domains list. "
        "Page state is kept across calls within the same session — navigate with fetch_page first, "
        "then use interactive ops (omit url to stay on the current page). "
        "Follows the same domain whitelist rules as the web tool."
    )
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": ["fetch_page", "screenshot", "click", "fill", "select", "press"],
                "description": (
                    "fetch_page: extract page text. "
                    "screenshot: capture PNG. "
                    "click: click an element (selector required). "
                    "fill: type into a field (selector + value required). "
                    "select: choose a <select> option (selector + value required). "
                    "press: press a keyboard key (key required; selector optional)."
                ),
            },
            "url": {
                "type": "string",
                "description": (
                    "URL to navigate to. Required for fetch_page and screenshot. "
                    "For interactive ops, omit to act on the current page."
                ),
            },
            "selector": {
                "type": "string",
                "description": "CSS selector for click / fill / select / press operations.",
            },
            "value": {
                "type": "string",
                "description": "Text to type (fill) or option value to select (select).",
            },
            "key": {
                "type": "string",
                "description": "Key name for press (e.g. 'Enter', 'Tab', 'Escape', 'ArrowDown').",
            },
            "wait_for": {
                "type": "string",
                "description": "CSS selector to wait for before acting (optional).",
            },
            "extract_selector": {
                "type": "string",
                "description": "CSS selector for text extraction in fetch_page (optional; defaults to full page body).",
            },
        },
        "required": ["operation"],
    }
    requires_confirmation = True  # default; read ops override via should_confirm()
    allowed_in_scheduled_tasks = False

    # Shared Playwright/browser instance (lazy-init)
    _playwright = None
    _browser = None
    _lock: ClassVar[asyncio.Lock] = asyncio.Lock()

    # Per-session pages: session_id → (context, page)
    _sessions: ClassVar[dict] = {}

    # Number of body-text characters returned as "page_preview" after click/press.
    _PREVIEW_CHARS: ClassVar[int] = 2000

    # ── Small shared helpers ──────────────────────────────────────────────────

    @staticmethod
    def _session_key() -> str:
        """Return the active session id, or ``"default"`` when none is set."""
        return current_session_id.get() or "default"

    @staticmethod
    def _current_page_url() -> str:
        """Return the URL of the current session's cached page, or ``""`` if unknown."""
        entry = BrowserTool._sessions.get(BrowserTool._session_key())
        if not entry:
            return ""
        try:
            return entry[1].url or ""
        except Exception:
            # Best effort: an unusable page object simply means "unknown URL".
            return ""

    @staticmethod
    def _validate_args(operation: str, url: str, selector: str, key: str) -> str | None:
        """Return an error message if required arguments are missing, else None."""
        if operation in ("fetch_page", "screenshot"):
            return None if url else f"'url' is required for {operation}"
        if operation in ("click", "fill", "select"):
            return None if selector else f"'selector' is required for {operation}"
        if operation == "press":
            return None if key else "'key' is required for press"
        return f"Unknown operation: {operation}"

    @staticmethod
    async def _settle(page) -> None:
        """Give a possible navigation up to 5 s to reach DOMContentLoaded (best effort)."""
        try:
            await page.wait_for_load_state("domcontentloaded", timeout=5_000)
        except Exception:
            # The action may not have navigated at all; that is not an error.
            pass

    @staticmethod
    async def _page_preview(page) -> str:
        """Return the first ``_PREVIEW_CHARS`` of body text, sanitized.

        The preview is external page content, so it goes through the same
        ``sanitize_external_content`` call that fetch_page applies to its text.
        """
        raw = (await page.inner_text("body"))[: BrowserTool._PREVIEW_CHARS]
        return await sanitize_external_content(raw, source="browser")

    # ── Confirmation logic ────────────────────────────────────────────────────

    async def should_confirm(self, operation: str = "", url: str = "", **_) -> bool:
        """Decide whether this call must be confirmed by the user.

        Read-only operations never need confirmation.  Interactive operations
        need it unless the target host (from ``url``, or from the session's
        current page when ``url`` is empty) is approved for the current user.
        Any situation where the target or the user cannot be determined
        results in confirmation being required.
        """
        if operation not in _INTERACTIVE_OPS:
            return False  # read-only ops never need confirmation

        # Explicit url wins; otherwise act on whatever page the session is on.
        target_url = url or self._current_page_url()
        if not target_url:
            return True  # Unknown target → confirm to be safe

        from urllib.parse import urlparse

        try:
            hostname = urlparse(target_url).hostname or ""
        except ValueError:
            # Malformed URL (e.g. a broken IPv6 literal) → confirm to be safe.
            return True
        if not hostname:
            return True

        from ..context_vars import current_user as _cu

        user = _cu.get()
        if not user:
            return True

        return not await _is_domain_approved(user.id, hostname)

    def confirmation_description(self, operation: str = "", url: str = "",
                                 selector: str = "", value: str = "", key: str = "", **_) -> str:
        """Return a one-line, human-readable summary of the action to confirm."""
        loc = url or "current page"
        if operation == "click":
            return f"Click '{selector}' on {loc}"
        if operation == "fill":
            # Keep the prompt short: show at most 40 characters of the typed text.
            display_val = value[:40] + "…" if len(value) > 40 else value
            return f"Type \"{display_val}\" into '{selector}' on {loc}"
        if operation == "select":
            return f"Select '{value}' in '{selector}' on {loc}"
        if operation == "press":
            return f"Press '{key}' on {loc}"
        return super().confirmation_description(operation=operation, url=url)

    # ── Session management ────────────────────────────────────────────────────

    async def _get_page(self, session_id: str, url: str | None = None):
        """Get or create a page for this session; navigate to url if given.

        A cached page that has been closed is discarded together with its
        context, and a fresh context/page pair is created in its place.
        """
        page = None
        entry = BrowserTool._sessions.get(session_id)
        if entry:
            context, page = entry
            if page.is_closed():
                # Drop the stale entry first so a failure below cannot leave a
                # dead page registered for this session.
                BrowserTool._sessions.pop(session_id, None)
                try:
                    await context.close()
                except Exception:
                    pass  # context may already be gone along with its page
                page = None

        if page is None:
            browser = await self._get_browser()
            context = await browser.new_context(user_agent=_USER_AGENT)
            page = await context.new_page()
            BrowserTool._sessions[session_id] = (context, page)

        if url:
            await page.goto(url, timeout=_TIMEOUT_MS, wait_until="domcontentloaded")

        return page

    async def _get_browser(self):
        """Return the shared Chromium instance, launching it if missing or disconnected."""
        async with BrowserTool._lock:
            if BrowserTool._browser is None or not BrowserTool._browser.is_connected():
                from playwright.async_api import async_playwright

                # On a relaunch, shut down the previous Playwright driver so it
                # is not left running alongside the new one.
                previous = BrowserTool._playwright
                if previous is not None:
                    BrowserTool._playwright = None
                    try:
                        await previous.stop()
                    except Exception as exc:
                        logger.debug("[browser] stopping previous Playwright failed: %s", exc)

                BrowserTool._playwright = await async_playwright().start()
                BrowserTool._browser = await BrowserTool._playwright.chromium.launch(
                    args=["--no-sandbox", "--disable-dev-shm-usage"],
                )
                logger.info("[browser] Chromium launched")
            return BrowserTool._browser

    # ── Domain access check ───────────────────────────────────────────────────

    async def _check_tier(self, url: str) -> ToolResult | None:
        """Return a failure ToolResult if ``url`` may not be accessed, else None.

        Access is allowed when ``assert_domain_tier1`` accepts the URL, when a
        task id is set in the current context, or when ``web_tier2_enabled``
        is set.
        """
        if await assert_domain_tier1(url):
            return None
        if current_task_id.get() is not None:
            return None
        if web_tier2_enabled.get():
            return None

        from urllib.parse import urlparse

        try:
            host = urlparse(url).hostname
        except ValueError:
            host = url  # unparseable: show the raw value in the message instead
        return ToolResult(
            success=False,
            error=(
                f"Domain '{host}' is not in the Tier 1 whitelist. "
                "Ask me to access a specific external page to enable Tier 2 access."
            ),
        )

    # ── Execute ───────────────────────────────────────────────────────────────

    async def execute(
        self,
        operation: str,
        url: str = "",
        selector: str = "",
        value: str = "",
        key: str = "",
        wait_for: str = "",
        extract_selector: str = "",
        **_,
    ) -> ToolResult:
        """Run one browser operation and return its result.

        Args:
            operation: One of fetch_page, screenshot, click, fill, select, press.
            url: Page to navigate to first.  Required for fetch_page and
                screenshot; when empty, interactive ops act on the session's
                current page.
            selector: CSS selector; required for click, fill and select,
                optional for press (defaults to ``body``).
            value: Text to type (fill) or option value to choose (select).
            key: Key name for press.
            wait_for: Optional CSS selector to wait for (up to 10 s) before acting.
            extract_selector: Optional CSS selector limiting fetch_page extraction.

        Returns:
            A ToolResult.  Missing arguments, an unknown operation, a missing
            Playwright install, a denied domain and browser errors are all
            reported as ``success=False`` rather than raised.
        """
        # Validate before touching the browser so a doomed call never navigates.
        problem = self._validate_args(operation, url, selector, key)
        if problem:
            return ToolResult(success=False, error=problem)

        try:
            from playwright.async_api import async_playwright  # noqa: F401
        except ImportError:
            return ToolResult(
                success=False,
                error="Playwright is not installed. Run: pip install playwright && playwright install chromium",
            )

        # Whitelist check against the explicit url or, failing that, the page
        # the session is currently on.
        target_url = url or self._current_page_url()
        if target_url:
            denied = await self._check_tier(target_url)
            if denied is not None:
                return denied

        session_id = self._session_key()

        try:
            page = await self._get_page(session_id, url or None)

            if wait_for:
                try:
                    await page.wait_for_selector(wait_for, timeout=10_000)
                except Exception as exc:
                    # Best effort: carry on with the operation even if the
                    # awaited element never showed up.
                    logger.debug("[browser] wait_for %r did not resolve: %s", wait_for, exc)

            # ── Read operations ──────────────────────────────────────────────

            if operation == "fetch_page":
                if extract_selector:
                    elements = await page.query_selector_all(extract_selector)
                    text = "\n".join([await el.inner_text() for el in elements])
                else:
                    text = await page.inner_text("body")
                text = text[:_MAX_TEXT_CHARS]
                text = await sanitize_external_content(text, source="browser")
                return ToolResult(success=True, data={"url": page.url, "text": text, "length": len(text)})

            if operation == "screenshot":
                import base64

                png = await page.screenshot(type="png")
                return ToolResult(success=True, data={"screenshot_base64": base64.b64encode(png).decode()})

            # ── Interactive operations ───────────────────────────────────────

            if operation == "click":
                await page.click(selector, timeout=10_000)
                await self._settle(page)
                preview = await self._page_preview(page)
                return ToolResult(success=True, data={"url": page.url, "page_preview": preview})

            if operation == "fill":
                await page.fill(selector, value, timeout=10_000)
                return ToolResult(success=True, data={"url": page.url, "filled": value})

            if operation == "select":
                await page.select_option(selector, value=value, timeout=10_000)
                return ToolResult(success=True, data={"url": page.url, "selected": value})

            if operation == "press":
                # With no selector the key is sent to the document body.
                await page.press(selector or "body", key, timeout=10_000)
                await self._settle(page)
                preview = await self._page_preview(page)
                return ToolResult(success=True, data={"url": page.url, "page_preview": preview})

            # Defensive fallback; _validate_args already rejects unknown operations.
            return ToolResult(success=False, error=f"Unknown operation: {operation}")

        except Exception as e:
            logger.warning("[browser] %s failed: %s", operation, e)
            return ToolResult(success=False, error=f"Browser error: {e}")
|