""" webhooks/endpoints.py — CRUD for inbound webhook trigger endpoints. Each endpoint has a secret token. When the token is presented via GET ?q=... or POST {"message": "..."}, the associated agent is triggered. """ from __future__ import annotations import secrets from datetime import datetime, timezone from ..database import _rowcount, get_pool def _utcnow() -> str: return datetime.now(timezone.utc).isoformat() async def create_endpoint( name: str, agent_id: str, description: str = "", allow_get: bool = True, owner_user_id: str | None = None, ) -> dict: """Create a new webhook endpoint. Returns the full row including the plaintext token.""" token = secrets.token_urlsafe(32) pool = await get_pool() row = await pool.fetchrow( """ INSERT INTO webhook_endpoints (name, token, agent_id, description, allow_get, owner_user_id, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * """, name, token, agent_id or None, description, allow_get, owner_user_id, _utcnow(), ) return dict(row) async def list_endpoints(owner_user_id: str | None = None) -> list[dict]: """List endpoints. Pass owner_user_id to scope to one user; None returns all (admin). Token never included.""" pool = await get_pool() if owner_user_id: rows = await pool.fetch( "SELECT * FROM webhook_endpoints WHERE owner_user_id = $1 ORDER BY created_at DESC", owner_user_id, ) else: rows = await pool.fetch( "SELECT * FROM webhook_endpoints ORDER BY created_at DESC" ) result = [] for row in rows: d = dict(row) d.pop("token", None) result.append(d) return result async def get_endpoint(endpoint_id: str, owner_user_id: str | None = None) -> dict | None: """Get one endpoint by ID. Token is not included. Pass owner_user_id to enforce ownership.""" pool = await get_pool() if owner_user_id: row = await pool.fetchrow( "SELECT * FROM webhook_endpoints WHERE id = $1::uuid AND owner_user_id = $2", endpoint_id, owner_user_id, ) else: row = await pool.fetchrow( "SELECT * FROM webhook_endpoints WHERE id = $1::uuid", endpoint_id ) if not row: return None d = dict(row) d.pop("token", None) return d async def get_by_token(token: str) -> dict | None: """Look up an enabled endpoint by its secret token (includes token field).""" pool = await get_pool() row = await pool.fetchrow( "SELECT * FROM webhook_endpoints WHERE token = $1 AND enabled = TRUE", token ) return dict(row) if row else None async def update_endpoint(endpoint_id: str, **fields) -> dict | None: allowed = {"name", "agent_id", "description", "allow_get", "enabled"} updates = {k: v for k, v in fields.items() if k in allowed} if not updates: return await get_endpoint(endpoint_id) pool = await get_pool() set_clauses = ", ".join(f"{k} = ${i + 2}" for i, k in enumerate(updates)) await pool.execute( f"UPDATE webhook_endpoints SET {set_clauses} WHERE id = $1::uuid", endpoint_id, *updates.values(), ) return await get_endpoint(endpoint_id) async def rotate_token(endpoint_id: str) -> str: """Generate and store a new token. Returns the new plaintext token.""" new_token = secrets.token_urlsafe(32) pool = await get_pool() await pool.execute( "UPDATE webhook_endpoints SET token = $1 WHERE id = $2::uuid", new_token, endpoint_id, ) return new_token async def delete_endpoint(endpoint_id: str, owner_user_id: str | None = None) -> bool: pool = await get_pool() if owner_user_id: status = await pool.execute( "DELETE FROM webhook_endpoints WHERE id = $1::uuid AND owner_user_id = $2", endpoint_id, owner_user_id, ) else: status = await pool.execute( "DELETE FROM webhook_endpoints WHERE id = $1::uuid", endpoint_id ) return _rowcount(status) > 0 async def record_trigger(endpoint_id: str) -> None: """Increment trigger_count and update last_triggered_at.""" pool = await get_pool() await pool.execute( """ UPDATE webhook_endpoints SET last_triggered_at = $1, trigger_count = COALESCE(trigger_count, 0) + 1 WHERE id = $2::uuid """, _utcnow(), endpoint_id, )