147 lines
5.5 KiB
Python
147 lines
5.5 KiB
Python
"""
|
|
inbox/telegram_handler.py — Route Telegram /keyword messages to email handling agents.
|
|
|
|
Called by the global Telegram listener before normal trigger matching.
|
|
Returns True if the message was handled (consumed), False to fall through.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Built-in commands handled directly without agent dispatch
|
|
_BUILTIN = {"pause", "resume", "status"}
|
|
|
|
|
|
async def handle_keyword_message(
|
|
chat_id: str,
|
|
user_id: str | None,
|
|
keyword: str,
|
|
message: str,
|
|
) -> bool:
|
|
"""
|
|
Returns True if a matching email account was found and the message was handled.
|
|
message is the text AFTER the /keyword prefix (stripped).
|
|
"""
|
|
from ..database import get_pool
|
|
from .accounts import get_account, pause_account, resume_account
|
|
|
|
pool = await get_pool()
|
|
|
|
# Find email account matching keyword + chat_id (security: must match bound chat)
|
|
row = await pool.fetchrow(
|
|
"SELECT * FROM email_accounts WHERE telegram_keyword = $1 AND telegram_chat_id = $2",
|
|
keyword.lower(), str(chat_id),
|
|
)
|
|
if row is None:
|
|
return False
|
|
|
|
account_id = str(row["id"])
|
|
from .accounts import get_account as _get_account
|
|
account = await _get_account(account_id)
|
|
if account is None:
|
|
return False
|
|
label = account.get("label", keyword)
|
|
|
|
# ── Built-in commands ────────────────────────────────────────────────────
|
|
cmd = message.strip().lower().split()[0] if message.strip() else ""
|
|
|
|
if cmd == "pause":
|
|
await pause_account(account_id)
|
|
from ..inbox.listener import inbox_listener
|
|
inbox_listener.stop_account(account_id)
|
|
await _send_reply(chat_id, account, f"⏸ *{label}* listener paused. Send `/{keyword} resume` to restart.")
|
|
logger.info("[telegram-handler] paused account %s (%s)", account_id, label)
|
|
return True
|
|
|
|
if cmd == "resume":
|
|
await resume_account(account_id)
|
|
from ..inbox.listener import inbox_listener
|
|
from ..inbox.accounts import get_account as _get
|
|
updated = await _get(account_id)
|
|
if updated:
|
|
inbox_listener.start_account(account_id, updated)
|
|
await _send_reply(chat_id, account, f"▶ *{label}* listener resumed.")
|
|
logger.info("[telegram-handler] resumed account %s (%s)", account_id, label)
|
|
return True
|
|
|
|
if cmd == "status":
|
|
enabled = account.get("enabled", False)
|
|
paused = account.get("paused", False)
|
|
state = "paused" if paused else ("enabled" if enabled else "disabled")
|
|
reply = (
|
|
f"📊 *{label}* status\n"
|
|
f"State: {state}\n"
|
|
f"IMAP: {account.get('imap_username', '?')}\n"
|
|
f"Keyword: /{keyword}"
|
|
)
|
|
await _send_reply(chat_id, account, reply)
|
|
return True
|
|
|
|
# ── Agent dispatch ───────────────────────────────────────────────────────
|
|
agent_id = str(account.get("agent_id") or "")
|
|
if not agent_id:
|
|
await _send_reply(chat_id, account, f"⚠️ No agent configured for *{label}*.")
|
|
return True
|
|
|
|
# Build extra tools (same as email processing dispatch)
|
|
from ..tools.email_handling_tool import EmailHandlingTool
|
|
from ..tools.telegram_tool import BoundTelegramTool
|
|
extra_tools = [EmailHandlingTool(account=account)]
|
|
|
|
tg_chat_id = account.get("telegram_chat_id") or ""
|
|
tg_keyword = account.get("telegram_keyword") or ""
|
|
if tg_chat_id:
|
|
extra_tools.append(BoundTelegramTool(chat_id=tg_chat_id, reply_keyword=tg_keyword))
|
|
|
|
# Add BoundFilesystemTool scoped to user's provisioned folder
|
|
if user_id:
|
|
from ..users import get_user_folder
|
|
data_folder = await get_user_folder(str(user_id))
|
|
if data_folder:
|
|
from ..tools.bound_filesystem_tool import BoundFilesystemTool
|
|
extra_tools.append(BoundFilesystemTool(base_path=data_folder))
|
|
|
|
from ..agents.runner import agent_runner
|
|
|
|
task_message = (
|
|
f"The user sent you a message via Telegram:\n\n{message}\n\n"
|
|
f"Respond via Telegram (/{keyword}). "
|
|
f"Read your memory file first if you need context."
|
|
)
|
|
|
|
try:
|
|
await agent_runner.run_agent_and_wait(
|
|
agent_id,
|
|
override_message=task_message,
|
|
extra_tools=extra_tools,
|
|
force_only_extra_tools=True,
|
|
)
|
|
except Exception as e:
|
|
logger.error("[telegram-handler] agent dispatch failed for %s: %s", label, e)
|
|
await _send_reply(chat_id, account, f"⚠️ Error dispatching to *{label}* agent: {e}")
|
|
|
|
return True
|
|
|
|
|
|
async def _send_reply(chat_id: str, account: dict, text: str) -> None:
|
|
"""Send a Telegram reply using the account's bound token."""
|
|
import httpx
|
|
from ..database import credential_store, user_settings_store
|
|
|
|
token = await credential_store.get("telegram:bot_token")
|
|
if not token and account.get("user_id"):
|
|
token = await user_settings_store.get(str(account["user_id"]), "telegram_bot_token")
|
|
if not token:
|
|
return
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=10) as http:
|
|
await http.post(
|
|
f"https://api.telegram.org/bot{token}/sendMessage",
|
|
json={"chat_id": chat_id, "text": text, "parse_mode": "Markdown"},
|
|
)
|
|
except Exception as e:
|
|
logger.warning("[telegram-handler] reply send failed: %s", e)
|