""" inbox/telegram_handler.py — Route Telegram /keyword messages to email handling agents. Called by the global Telegram listener before normal trigger matching. Returns True if the message was handled (consumed), False to fall through. """ from __future__ import annotations import logging logger = logging.getLogger(__name__) # Built-in commands handled directly without agent dispatch _BUILTIN = {"pause", "resume", "status"} async def handle_keyword_message( chat_id: str, user_id: str | None, keyword: str, message: str, ) -> bool: """ Returns True if a matching email account was found and the message was handled. message is the text AFTER the /keyword prefix (stripped). """ from ..database import get_pool from .accounts import get_account, pause_account, resume_account pool = await get_pool() # Find email account matching keyword + chat_id (security: must match bound chat) row = await pool.fetchrow( "SELECT * FROM email_accounts WHERE telegram_keyword = $1 AND telegram_chat_id = $2", keyword.lower(), str(chat_id), ) if row is None: return False account_id = str(row["id"]) from .accounts import get_account as _get_account account = await _get_account(account_id) if account is None: return False label = account.get("label", keyword) # ── Built-in commands ──────────────────────────────────────────────────── cmd = message.strip().lower().split()[0] if message.strip() else "" if cmd == "pause": await pause_account(account_id) from ..inbox.listener import inbox_listener inbox_listener.stop_account(account_id) await _send_reply(chat_id, account, f"⏸ *{label}* listener paused. Send `/{keyword} resume` to restart.") logger.info("[telegram-handler] paused account %s (%s)", account_id, label) return True if cmd == "resume": await resume_account(account_id) from ..inbox.listener import inbox_listener from ..inbox.accounts import get_account as _get updated = await _get(account_id) if updated: inbox_listener.start_account(account_id, updated) await _send_reply(chat_id, account, f"▶ *{label}* listener resumed.") logger.info("[telegram-handler] resumed account %s (%s)", account_id, label) return True if cmd == "status": enabled = account.get("enabled", False) paused = account.get("paused", False) state = "paused" if paused else ("enabled" if enabled else "disabled") reply = ( f"📊 *{label}* status\n" f"State: {state}\n" f"IMAP: {account.get('imap_username', '?')}\n" f"Keyword: /{keyword}" ) await _send_reply(chat_id, account, reply) return True # ── Agent dispatch ─────────────────────────────────────────────────────── agent_id = str(account.get("agent_id") or "") if not agent_id: await _send_reply(chat_id, account, f"⚠️ No agent configured for *{label}*.") return True # Build extra tools (same as email processing dispatch) from ..tools.email_handling_tool import EmailHandlingTool from ..tools.telegram_tool import BoundTelegramTool extra_tools = [EmailHandlingTool(account=account)] tg_chat_id = account.get("telegram_chat_id") or "" tg_keyword = account.get("telegram_keyword") or "" if tg_chat_id: extra_tools.append(BoundTelegramTool(chat_id=tg_chat_id, reply_keyword=tg_keyword)) # Add BoundFilesystemTool scoped to user's provisioned folder if user_id: from ..users import get_user_folder data_folder = await get_user_folder(str(user_id)) if data_folder: from ..tools.bound_filesystem_tool import BoundFilesystemTool extra_tools.append(BoundFilesystemTool(base_path=data_folder)) from ..agents.runner import agent_runner task_message = ( f"The user sent you a message via Telegram:\n\n{message}\n\n" f"Respond via Telegram (/{keyword}). " f"Read your memory file first if you need context." ) try: await agent_runner.run_agent_and_wait( agent_id, override_message=task_message, extra_tools=extra_tools, force_only_extra_tools=True, ) except Exception as e: logger.error("[telegram-handler] agent dispatch failed for %s: %s", label, e) await _send_reply(chat_id, account, f"⚠️ Error dispatching to *{label}* agent: {e}") return True async def _send_reply(chat_id: str, account: dict, text: str) -> None: """Send a Telegram reply using the account's bound token.""" import httpx from ..database import credential_store, user_settings_store token = await credential_store.get("telegram:bot_token") if not token and account.get("user_id"): token = await user_settings_store.get(str(account["user_id"]), "telegram_bot_token") if not token: return try: async with httpx.AsyncClient(timeout=10) as http: await http.post( f"https://api.telegram.org/bot{token}/sendMessage", json={"chat_id": chat_id, "text": text, "parse_mode": "Markdown"}, ) except Exception as e: logger.warning("[telegram-handler] reply send failed: %s", e)