""" tools/email_handling_tool.py — Read/organise email tool for handling accounts. NOT in the global tool registry. Instantiated at dispatch time with decrypted account credentials. Passed as extra_tools to agent.run() with force_only_extra_tools=True. Deliberately excludes: send, reply, forward, create draft, delete email, expunge folder, any SMTP operation. Uses imaplib (stdlib) via asyncio.to_thread — avoids aioimaplib auth issues with some Dovecot configurations. """ from __future__ import annotations import asyncio import email as email_lib import imaplib import logging import re from .base import BaseTool, ToolResult logger = logging.getLogger(__name__) class EmailHandlingTool(BaseTool): name = "email_handling" description = "Read, organise and manage emails within the configured folders." # overridden in __init__ input_schema = { "type": "object", "properties": { "operation": { "type": "string", "enum": [ "list_emails", "read_email", "mark_email", "move_email", "list_folders", "create_folder", ], "description": "The operation to perform", }, "folder": { "type": "string", "description": "IMAP folder name (default: INBOX)", }, "uid": { "type": "string", "description": "Email UID for read_email, mark_email, move_email", }, "limit": { "type": "integer", "description": "Max emails to return for list_emails (default: 20, max: 100)", }, "unread_only": { "type": "boolean", "description": "Only list unread emails (for list_emails)", }, "search": { "type": "string", "description": "IMAP SEARCH criteria string (for list_emails)", }, "flag": { "type": "string", "enum": ["read", "unread", "flagged", "unflagged", "spam"], "description": "Flag action for mark_email", }, "source_folder": { "type": "string", "description": "Source folder for move_email", }, "target_folder": { "type": "string", "description": "Target folder for move_email or parent for create_folder", }, "name": { "type": "string", "description": "New folder name for create_folder", }, }, "required": ["operation"], } requires_confirmation = False allowed_in_scheduled_tasks = True def __init__(self, account: dict) -> None: """account: dict with decrypted imap_host/port/username/password.""" self._host = account["imap_host"] self._port = int(account.get("imap_port") or 993) self._username = account["imap_username"] self._password = account["imap_password"] raw = account.get("monitored_folders") if raw is None: self._allowed_folders: list[str] | None = None # None = no folder restriction elif isinstance(raw, list): self._allowed_folders = raw if raw else None else: self._allowed_folders = ["INBOX"] if self._allowed_folders: folder_list = ", ".join(repr(f) for f in self._allowed_folders) restriction = f"You may ONLY access these folders (and their subfolders): {folder_list}. Any attempt to read from or move to a folder outside this list will be rejected." else: restriction = "You may access all folders." self.description = ( f"Read, organise and manage emails. " f"Operations: list_emails, read_email, mark_email, move_email, list_folders, create_folder. " f"Cannot send, delete or permanently expunge emails. " f"{restriction}" ) def _check_folder(self, folder: str) -> str | None: """Return an error string if folder is outside allowed_folders, else None.""" if self._allowed_folders is None: return None for allowed in self._allowed_folders: if folder == allowed or folder.startswith(allowed.rstrip("/") + "/"): return None return ( f"Folder {folder!r} is outside the allowed folders for this account: " + ", ".join(repr(f) for f in self._allowed_folders) ) def _open(self) -> imaplib.IMAP4_SSL: """Open and authenticate a synchronous IMAP4_SSL connection.""" M = imaplib.IMAP4_SSL(self._host, self._port) M.login(self._username, self._password) return M async def execute(self, operation: str = "", **kwargs) -> ToolResult: try: if operation == "list_emails": return await self._list_emails(**kwargs) elif operation == "read_email": return await self._read_email(**kwargs) elif operation == "mark_email": return await self._mark_email(**kwargs) elif operation == "move_email": return await self._move_email(**kwargs) elif operation == "list_folders": return await self._list_folders() elif operation == "create_folder": return await self._create_folder(**kwargs) else: return ToolResult(success=False, error=f"Unknown operation: {operation!r}") except Exception as e: logger.error("[email_handling] %s error: %s", operation, e) return ToolResult(success=False, error=str(e)) # ── Operations (run blocking imaplib calls in a thread) ─────────────────── async def _list_emails( self, folder: str = "INBOX", limit: int = 20, unread_only: bool = False, search: str = "", **_, ) -> ToolResult: err = self._check_folder(folder) if err: return ToolResult(success=False, error=err) limit = min(int(limit), 100) return await asyncio.to_thread( self._sync_list_emails, folder, limit, unread_only, search ) def _sync_list_emails( self, folder: str, limit: int, unread_only: bool, search: str ) -> ToolResult: M = self._open() try: M.select(folder, readonly=True) criteria = search if search else ("UNSEEN" if unread_only else "ALL") typ, data = M.search(None, criteria) if typ != "OK" or not data or not data[0]: return ToolResult(success=True, data={"emails": [], "count": 0, "folder": folder}) nums = data[0].split() nums = nums[-limit:][::-1] # most recent first emails = [] for num in nums: typ2, msg_data = M.fetch( num, "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)])" ) if typ2 != "OK" or not msg_data or not msg_data[0]: continue flags_str = str(msg_data[0][0]) if isinstance(msg_data[0], tuple) else str(msg_data[0]) header_bytes = msg_data[0][1] if isinstance(msg_data[0], tuple) else b"" msg = email_lib.message_from_bytes(header_bytes) is_unread = "\\Seen" not in flags_str emails.append({ "uid": num.decode() if isinstance(num, bytes) else str(num), "from": msg.get("From", ""), "to": msg.get("To", ""), "subject": msg.get("Subject", ""), "date": msg.get("Date", ""), "unread": is_unread, }) return ToolResult(success=True, data={"emails": emails, "count": len(emails), "folder": folder}) finally: _close(M) async def _read_email(self, uid: str = "", folder: str = "INBOX", **_) -> ToolResult: if not uid: return ToolResult(success=False, error="uid is required") err = self._check_folder(folder) if err: return ToolResult(success=False, error=err) return await asyncio.to_thread(self._sync_read_email, uid, folder) def _sync_read_email(self, uid: str, folder: str) -> ToolResult: M = self._open() try: M.select(folder, readonly=True) typ, data = M.fetch(uid, "(FLAGS BODY.PEEK[])") if typ != "OK" or not data or not data[0]: return ToolResult(success=False, error=f"Cannot fetch message uid={uid}") flags_str = str(data[0][0]) if isinstance(data[0], tuple) else str(data[0]) raw = data[0][1] if isinstance(data[0], tuple) else b"" msg = email_lib.message_from_bytes(raw) is_unread = "\\Seen" not in flags_str body = _extract_body(msg) return ToolResult(success=True, data={ "uid": uid, "folder": folder, "from": msg.get("From", ""), "to": msg.get("To", ""), "cc": msg.get("Cc", ""), "subject": msg.get("Subject", ""), "date": msg.get("Date", ""), "unread": is_unread, "body": body[:6000], }) finally: _close(M) async def _mark_email( self, uid: str = "", folder: str = "INBOX", flag: str = "read", **_ ) -> ToolResult: if not uid: return ToolResult(success=False, error="uid is required") err = self._check_folder(folder) if err: return ToolResult(success=False, error=err) flag_map = { "read": ("+FLAGS", "\\Seen"), "unread": ("-FLAGS", "\\Seen"), "flagged": ("+FLAGS", "\\Flagged"), "unflagged": ("-FLAGS", "\\Flagged"), "spam": ("+FLAGS", "Junk"), } if flag not in flag_map: return ToolResult(success=False, error=f"Unknown flag: {flag!r}") return await asyncio.to_thread(self._sync_mark_email, uid, folder, flag_map[flag]) def _sync_mark_email(self, uid: str, folder: str, flag_op: tuple) -> ToolResult: action, imap_flag = flag_op M = self._open() try: M.select(folder) typ, _ = M.store(uid, action, imap_flag) if typ != "OK": return ToolResult(success=False, error=f"Failed to mark email uid={uid}") return ToolResult(success=True, data={"uid": uid, "flag": action}) finally: _close(M) async def _move_email( self, uid: str = "", source_folder: str = "INBOX", target_folder: str = "", **_, ) -> ToolResult: if not uid: return ToolResult(success=False, error="uid is required") if not target_folder: return ToolResult(success=False, error="target_folder is required") for folder, label in ((source_folder, "source_folder"), (target_folder, "target_folder")): err = self._check_folder(folder) if err: return ToolResult(success=False, error=f"{label}: {err}") return await asyncio.to_thread(self._sync_move_email, uid, source_folder, target_folder) def _sync_move_email(self, uid: str, source_folder: str, target_folder: str) -> ToolResult: M = self._open() try: M.select(source_folder) typ, _ = M.copy(uid, target_folder) if typ != "OK": return ToolResult(success=False, error=f"Failed to copy to {target_folder!r}") M.store(uid, "+FLAGS", "\\Deleted") M.expunge() return ToolResult(success=True, data={"uid": uid, "moved_to": target_folder}) finally: _close(M) async def _list_folders(self, **_) -> ToolResult: return await asyncio.to_thread(self._sync_list_folders) def _sync_list_folders(self) -> ToolResult: M = self._open() try: typ, data = M.list() if typ != "OK": return ToolResult(success=False, error="Failed to list folders") folders = [] for line in data: if not line: continue line_str = line.decode() if isinstance(line, bytes) else str(line) match = re.search(r'"([^"]+)"\s*$', line_str) or re.search(r'(\S+)\s*$', line_str) if match: name = match.group(1) if name and name.upper() != "NIL": folders.append(name) if self._allowed_folders is not None: folders = [ f for f in folders if any( f == a or f.startswith(a.rstrip("/") + "/") for a in self._allowed_folders ) ] return ToolResult(success=True, data={"folders": folders, "count": len(folders)}) finally: _close(M) async def _create_folder(self, name: str = "", target_folder: str = "", **_) -> ToolResult: if not name: return ToolResult(success=False, error="name is required") full_name = f"{target_folder}/{name}" if target_folder else name err = self._check_folder(full_name) if err: return ToolResult(success=False, error=err) return await asyncio.to_thread(self._sync_create_folder, full_name) def _sync_create_folder(self, full_name: str) -> ToolResult: M = self._open() try: typ, _ = M.create(full_name) if typ != "OK": return ToolResult(success=False, error=f"Failed to create folder {full_name!r}") return ToolResult(success=True, data={"folder": full_name, "created": True}) finally: _close(M) # ── Helpers ─────────────────────────────────────────────────────────────────── def _close(M: imaplib.IMAP4_SSL) -> None: try: M.logout() except Exception: pass def _extract_body(msg: email_lib.message.Message) -> str: if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": payload = part.get_payload(decode=True) return payload.decode("utf-8", errors="replace") if payload else "" for part in msg.walk(): if part.get_content_type() == "text/html": payload = part.get_payload(decode=True) html = payload.decode("utf-8", errors="replace") if payload else "" return re.sub(r"<[^>]+>", "", html).strip() else: payload = msg.get_payload(decode=True) return payload.decode("utf-8", errors="replace") if payload else "" return ""