"""
tools/webhook_tool.py — Outbound webhook tool.

Lets an agent POST a JSON payload to a named, pre-configured target stored in
the webhook_targets table. Target URLs and their optional auth secrets are
managed via Settings → Webhooks → Outbound Targets.
"""
from __future__ import annotations

import json

import httpx

from ..context_vars import current_user
from .base import BaseTool, ToolResult


class WebhookTool(BaseTool):
    """Agent tool that lists outbound webhook targets and POSTs JSON to them."""

    name = "webhook"
    description = (
        "Send a JSON payload to a configured outbound webhook target. "
        "Use this to notify external services (e.g. Home Assistant, Zapier, custom APIs). "
        "List available targets with operation='list_targets', then send with operation='send'."
    )
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": ["send", "list_targets"],
                "description": "Operation: 'send' to POST to a target, 'list_targets' to see available targets.",
            },
            "target": {
                "type": "string",
                "description": "Target name (as configured in Settings → Webhooks). Required for 'send'.",
            },
            "payload": {
                "type": "object",
                "description": "JSON payload to POST. Required for 'send'.",
            },
        },
        "required": ["operation"],
    }
    requires_confirmation = True
    allowed_in_scheduled_tasks = True

    async def execute(self, operation: str, target: str = "", payload: dict | None = None, **_) -> ToolResult:
        """Dispatch to the requested operation.

        Args:
            operation: Either ``"send"`` or ``"list_targets"``.
            target: Name of the webhook target; must be non-empty for ``"send"``.
            payload: JSON body to POST; a missing/empty value is sent as ``{}``.

        Returns:
            The result of the operation, or a failed ``ToolResult`` when the
            operation is unknown or ``target`` is missing for ``"send"``.
        """
        if operation == "send":
            if target:
                return await self._send(target, payload or {})
            return ToolResult(success=False, error="'target' is required for send operation")

        if operation == "list_targets":
            return await self._list_targets()

        return ToolResult(success=False, error=f"Unknown operation: {operation}")

    def _current_user_id(self) -> str | None:
        """Return the id of the user in ``current_user``, or ``None``.

        Any failure while reading the context variable or the user's ``id``
        is treated as "no user" rather than propagated.
        """
        try:
            user = current_user.get()
            if not user:
                return None
            return user.id
        except Exception:
            return None

    async def _list_targets(self) -> ToolResult:
        """List the targets visible to the current user.

        With a current user, both that user's targets and global
        (NULL-owner) targets are returned, user-owned ones first. Without a
        user only global targets are returned. Disabled targets are included;
        each entry carries its ``enabled`` flag.
        """
        # Imported here, as in the rest of this class, rather than at module level.
        from ..database import get_pool

        pool = await get_pool()
        owner_id = self._current_user_id()

        if not owner_id:
            records = await pool.fetch(
                "SELECT name, url, enabled FROM webhook_targets WHERE owner_user_id IS NULL ORDER BY name"
            )
        else:
            records = await pool.fetch(
                """
                SELECT name, url, enabled
                FROM webhook_targets
                WHERE (owner_user_id = $1 OR owner_user_id IS NULL)
                ORDER BY owner_user_id NULLS LAST, name
                """,
                owner_id,
            )

        columns = ("name", "url", "enabled")
        listing = [{column: record[column] for column in columns} for record in records]
        return ToolResult(success=True, data={"targets": listing})

    async def _find_target(self, pool, target_name: str, owner_id):
        """Fetch the enabled target row named *target_name*, or a falsy value.

        A target owned by *owner_id* wins over a global (NULL-owner) target
        with the same name; the global one is only queried when no enabled
        user-owned match was found.
        """
        if owner_id:
            owned = await pool.fetchrow(
                "SELECT * FROM webhook_targets WHERE name = $1 AND owner_user_id = $2 AND enabled = TRUE",
                target_name,
                owner_id,
            )
            if owned:
                return owned

        return await pool.fetchrow(
            "SELECT * FROM webhook_targets WHERE name = $1 AND owner_user_id IS NULL AND enabled = TRUE",
            target_name,
        )

    async def _send(self, target_name: str, payload: dict) -> ToolResult:
        """POST *payload* as JSON to the enabled target called *target_name*.

        Returns:
            A failed ``ToolResult`` when no enabled target matches, the
            request times out, or the request raises. Otherwise a successful
            ``ToolResult`` carrying the HTTP status code, an ``ok`` flag and
            at most the first 500 characters of the response body. Note that
            a non-2xx response is still reported with ``success=True``;
            callers must inspect ``ok``.
        """
        from ..database import get_pool

        pool = await get_pool()
        record = await self._find_target(pool, target_name, self._current_user_id())
        if not record:
            return ToolResult(
                success=False,
                error=f"No enabled webhook target named '{target_name}'. Use list_targets to see available targets.",
            )

        endpoint: str = record["url"]
        token: str | None = record.get("secret_header")

        request_headers = {"Content-Type": "application/json"}
        if token:
            # The stored secret is sent as a bearer token.
            request_headers["Authorization"] = f"Bearer {token}"

        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(endpoint, json=payload, headers=request_headers)
                # Cap the echoed body so a large response does not flood the result.
                excerpt = (response.text or "")[:500]
                return ToolResult(
                    success=True,
                    data={
                        "status_code": response.status_code,
                        "ok": response.is_success,
                        "response": excerpt,
                    },
                )
        except httpx.TimeoutException:
            return ToolResult(success=False, error=f"Request to '{target_name}' timed out after 15 seconds")
        except Exception as e:
            return ToolResult(success=False, error=f"Request to '{target_name}' failed: {e}")

    def confirmation_description(self, operation: str = "", target: str = "", payload: dict | None = None, **_) -> str:
        """Return the human-readable summary shown when confirming a call.

        For ``"send"`` the summary names the target and includes at most the
        first 100 characters of the JSON-encoded payload; every other
        operation is described as listing targets.
        """
        if operation != "send":
            return "List webhook targets"

        preview = json.dumps(payload or {})[:100]
        return f"POST to webhook target '{target}' with payload: {preview}"