""" tools/telegram_tool.py — Outbound Telegram messages for agents. Sends to whitelisted chat IDs only. No confirmation required. """ from __future__ import annotations import httpx from ..database import credential_store from ..telegram.triggers import is_allowed from .base import BaseTool, ToolResult _API = "https://api.telegram.org/bot{token}/sendMessage" class TelegramTool(BaseTool): name = "telegram" description = ( "Send a Telegram message to a whitelisted chat ID. " "Use for notifications, alerts, and replies to Telegram users. " "chat_id must be in the Telegram whitelist." ) input_schema = { "type": "object", "properties": { "chat_id": { "type": "string", "description": "Telegram chat ID to send the message to", }, "message": { "type": "string", "description": "Message text (plain text, max 4096 characters)", }, }, "required": ["chat_id", "message"], } requires_confirmation = False allowed_in_scheduled_tasks = True async def execute(self, chat_id: str, message: str, **kwargs) -> ToolResult: # Resolve current user for per-user token and whitelist check from ..context_vars import current_user as _current_user user = _current_user.get(None) # Try global token first, then per-user token token = await credential_store.get("telegram:bot_token") if not token and user: from ..database import user_settings_store token = await user_settings_store.get(user.id, "telegram_bot_token") if not token: return ToolResult( success=False, error="Telegram is not configured. Add telegram:bot_token in Settings → Credentials.", ) # Security: chat_id must be whitelisted (check user scope, then global) allowed = await is_allowed(chat_id, user_id=user.id if user else "GLOBAL") if not allowed and user: allowed = await is_allowed(chat_id, user_id="GLOBAL") if not allowed: # Return whitelisted IDs so the model can retry with a real one from ..telegram.triggers import list_whitelist global_ids = [r["chat_id"] for r in await list_whitelist("GLOBAL")] user_ids = [r["chat_id"] for r in await list_whitelist(user.id)] if user else [] all_ids = list(dict.fromkeys(global_ids + user_ids)) # deduplicate, preserve order hint = f" Whitelisted chat_ids: {', '.join(all_ids)}." if all_ids else " No chat IDs are whitelisted yet — add one in Settings → Telegram." return ToolResult( success=False, error=f"chat_id '{chat_id}' is not in the Telegram whitelist.{hint}", ) try: async with httpx.AsyncClient(timeout=10) as http: resp = await http.post( _API.format(token=token), json={"chat_id": chat_id, "text": message[:4096]}, ) resp.raise_for_status() data = resp.json() if not data.get("ok"): return ToolResult(success=False, error=f"Telegram API error: {data}") return ToolResult(success=True, data={"sent": True, "chat_id": chat_id}) except httpx.HTTPStatusError as e: return ToolResult(success=False, error=f"Telegram HTTP error: {e.response.status_code}") except Exception as e: return ToolResult(success=False, error=f"Telegram error: {e}") class BoundTelegramTool(TelegramTool): """TelegramTool with a pre-configured chat_id — used by email handling accounts. The model only supplies the message; chat_id is fixed at account configuration time so the model cannot send to arbitrary chats. """ def __init__(self, chat_id: str, reply_keyword: str | None = None) -> None: self._bound_chat_id = chat_id self._reply_keyword = reply_keyword hint = f"\n\n💬 Reply: /{reply_keyword} " if reply_keyword else "" self._reply_hint = hint self.description = ( f"Send a Telegram message to the configured chat (chat_id {chat_id}). " "Only supply the message text — the destination is fixed. " f"A reply hint will be appended automatically{' (/' + reply_keyword + ')' if reply_keyword else ''}." ) self.input_schema = { "type": "object", "properties": { "message": { "type": "string", "description": "Message text (plain text, max 4096 characters)", }, }, "required": ["message"], } async def execute(self, message: str, **kwargs) -> ToolResult: # type: ignore[override] full_message = message + self._reply_hint if self._reply_hint else message return await super().execute(chat_id=self._bound_chat_id, message=full_message, **kwargs)