""" telegram/triggers.py — CRUD for telegram_triggers and telegram_whitelist tables (async). """ from __future__ import annotations import uuid from datetime import datetime, timezone from typing import Any from ..database import _rowcount, get_pool def _now() -> str: return datetime.now(timezone.utc).isoformat() # ── Trigger rules ───────────────────────────────────────────────────────────── async def list_triggers(user_id: str | None = "GLOBAL") -> list[dict]: """ - user_id="GLOBAL" (default): global triggers (user_id IS NULL) - user_id=None: ALL triggers - user_id="": that user's triggers only """ pool = await get_pool() if user_id == "GLOBAL": rows = await pool.fetch( "SELECT t.*, a.name AS agent_name " "FROM telegram_triggers t LEFT JOIN agents a ON a.id = t.agent_id " "WHERE t.user_id IS NULL ORDER BY t.created_at" ) elif user_id is None: rows = await pool.fetch( "SELECT t.*, a.name AS agent_name " "FROM telegram_triggers t LEFT JOIN agents a ON a.id = t.agent_id " "ORDER BY t.created_at" ) else: rows = await pool.fetch( "SELECT t.*, a.name AS agent_name " "FROM telegram_triggers t LEFT JOIN agents a ON a.id = t.agent_id " "WHERE t.user_id = $1 ORDER BY t.created_at", user_id, ) return [dict(r) for r in rows] async def create_trigger( trigger_word: str, agent_id: str, description: str = "", enabled: bool = True, user_id: str | None = None, ) -> dict: now = _now() trigger_id = str(uuid.uuid4()) pool = await get_pool() await pool.execute( """ INSERT INTO telegram_triggers (id, trigger_word, agent_id, description, enabled, user_id, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) """, trigger_id, trigger_word, agent_id, description, enabled, user_id, now, now, ) return { "id": trigger_id, "trigger_word": trigger_word, "agent_id": agent_id, "description": description, "enabled": enabled, "user_id": user_id, "created_at": now, "updated_at": now, } async def update_trigger(id: str, **fields) -> bool: fields["updated_at"] = _now() set_parts = [] values: list[Any] = [] for i, (k, v) in enumerate(fields.items(), start=1): set_parts.append(f"{k} = ${i}") values.append(v) id_param = len(fields) + 1 values.append(id) pool = await get_pool() status = await pool.execute( f"UPDATE telegram_triggers SET {', '.join(set_parts)} WHERE id = ${id_param}", *values, ) return _rowcount(status) > 0 async def delete_trigger(id: str) -> bool: pool = await get_pool() status = await pool.execute("DELETE FROM telegram_triggers WHERE id = $1", id) return _rowcount(status) > 0 async def toggle_trigger(id: str) -> None: pool = await get_pool() await pool.execute( "UPDATE telegram_triggers SET enabled = NOT enabled, updated_at = $1 WHERE id = $2", _now(), id, ) async def get_enabled_triggers(user_id: str | None = "GLOBAL") -> list[dict]: """Return enabled triggers scoped to user_id.""" pool = await get_pool() if user_id == "GLOBAL": rows = await pool.fetch( "SELECT * FROM telegram_triggers WHERE enabled = TRUE AND user_id IS NULL" ) elif user_id is None: rows = await pool.fetch("SELECT * FROM telegram_triggers WHERE enabled = TRUE") else: rows = await pool.fetch( "SELECT * FROM telegram_triggers WHERE enabled = TRUE AND user_id = $1", user_id, ) return [dict(r) for r in rows] # ── Chat ID whitelist ───────────────────────────────────────────────────────── async def list_whitelist(user_id: str | None = "GLOBAL") -> list[dict]: """ - user_id="GLOBAL" (default): global whitelist (user_id IS NULL) - user_id=None: ALL whitelist entries - user_id="": that user's entries """ pool = await get_pool() if user_id == "GLOBAL": rows = await pool.fetch( "SELECT * FROM telegram_whitelist WHERE user_id IS NULL ORDER BY created_at" ) elif user_id is None: rows = await pool.fetch("SELECT * FROM telegram_whitelist ORDER BY created_at") else: rows = await pool.fetch( "SELECT * FROM telegram_whitelist WHERE user_id = $1 ORDER BY created_at", user_id, ) return [dict(r) for r in rows] async def add_to_whitelist( chat_id: str, label: str = "", user_id: str | None = None, ) -> dict: now = _now() chat_id = str(chat_id) pool = await get_pool() await pool.execute( """ INSERT INTO telegram_whitelist (chat_id, label, user_id, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (chat_id, user_id) NULLS NOT DISTINCT DO UPDATE SET label = EXCLUDED.label """, chat_id, label, user_id, now, ) return {"chat_id": chat_id, "label": label, "user_id": user_id, "created_at": now} async def remove_from_whitelist(chat_id: str, user_id: str | None = "GLOBAL") -> bool: """Remove whitelist entry. user_id="GLOBAL" deletes only global entry (user_id IS NULL).""" pool = await get_pool() if user_id == "GLOBAL": status = await pool.execute( "DELETE FROM telegram_whitelist WHERE chat_id = $1 AND user_id IS NULL", str(chat_id) ) elif user_id is None: status = await pool.execute( "DELETE FROM telegram_whitelist WHERE chat_id = $1", str(chat_id) ) else: status = await pool.execute( "DELETE FROM telegram_whitelist WHERE chat_id = $1 AND user_id = $2", str(chat_id), user_id, ) return _rowcount(status) > 0 async def is_allowed(chat_id: str | int, user_id: str | None = "GLOBAL") -> bool: """Check if chat_id is whitelisted. Scoped to user_id (or global if "GLOBAL").""" pool = await get_pool() if user_id == "GLOBAL": row = await pool.fetchrow( "SELECT 1 FROM telegram_whitelist WHERE chat_id = $1 AND user_id IS NULL", str(chat_id), ) elif user_id is None: row = await pool.fetchrow( "SELECT 1 FROM telegram_whitelist WHERE chat_id = $1", str(chat_id) ) else: row = await pool.fetchrow( "SELECT 1 FROM telegram_whitelist WHERE chat_id = $1 AND user_id = $2", str(chat_id), user_id, ) return row is not None