Files
oai-web/server/tools/email_handling_tool.py
2026-04-08 12:43:24 +02:00

384 lines
15 KiB
Python

"""
tools/email_handling_tool.py — Read/organise email tool for handling accounts.
NOT in the global tool registry.
Instantiated at dispatch time with decrypted account credentials.
Passed as extra_tools to agent.run() with force_only_extra_tools=True.
Deliberately excludes: send, reply, forward, create draft, delete email,
expunge folder, any SMTP operation.
Uses imaplib (stdlib) via asyncio.to_thread — avoids aioimaplib auth issues
with some Dovecot configurations.
"""
from __future__ import annotations
import asyncio
import email as email_lib
import imaplib
import logging
import re
from .base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
class EmailHandlingTool(BaseTool):
    """Read/organise-only IMAP tool bound to one account's credentials.

    Every operation opens its own short-lived ``imaplib.IMAP4_SSL`` connection
    inside ``asyncio.to_thread`` and logs out afterwards. Because connections
    are not shared between calls, messages are always addressed by IMAP **UID**
    (stable for the lifetime of a mailbox) rather than by sequence number
    (which shifts whenever a message is expunged).

    Folder access can be limited through the account's ``monitored_folders``
    list; see ``_check_folder``.

    NOTE(review): mailbox names are sent as quoted ASCII strings. Non-ASCII
    names would need IMAP modified UTF-7 encoding, which is not handled here.
    """

    name = "email_handling"
    description = "Read, organise and manage emails within the configured folders."  # overridden in __init__
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": [
                    "list_emails",
                    "read_email",
                    "mark_email",
                    "move_email",
                    "list_folders",
                    "create_folder",
                ],
                "description": "The operation to perform",
            },
            "folder": {
                "type": "string",
                "description": "IMAP folder name (default: INBOX)",
            },
            "uid": {
                "type": "string",
                "description": "Email UID for read_email, mark_email, move_email",
            },
            "limit": {
                "type": "integer",
                "description": "Max emails to return for list_emails (default: 20, max: 100)",
            },
            "unread_only": {
                "type": "boolean",
                "description": "Only list unread emails (for list_emails)",
            },
            "search": {
                "type": "string",
                "description": "IMAP SEARCH criteria string (for list_emails)",
            },
            "flag": {
                "type": "string",
                "enum": ["read", "unread", "flagged", "unflagged", "spam"],
                "description": "Flag action for mark_email",
            },
            "source_folder": {
                "type": "string",
                "description": "Source folder for move_email",
            },
            "target_folder": {
                "type": "string",
                "description": "Target folder for move_email or parent for create_folder",
            },
            "name": {
                "type": "string",
                "description": "New folder name for create_folder",
            },
        },
        "required": ["operation"],
    }
    requires_confirmation = False
    allowed_in_scheduled_tasks = True

    # Paging bounds for list_emails (same numbers the schema advertises).
    _DEFAULT_LIMIT = 20
    _MAX_LIMIT = 100
    # Maximum number of body characters returned by read_email.
    _MAX_BODY_CHARS = 6000
    # Socket timeout so a stalled server cannot pin a worker thread forever.
    _TIMEOUT_SECONDS = 30
    # A single UID or an IMAP sequence-set of UIDs ("12", "3,7", "10:20", "5:*").
    _UID_SET_RE = re.compile(r"[0-9*]+(?:[:,][0-9*]+)*")

    def __init__(self, account: dict) -> None:
        """Bind the tool to one account.

        account: dict with decrypted imap_host/port/username/password and an
        optional ``monitored_folders`` entry:

        * missing / ``None`` / empty list -> no folder restriction
        * non-empty list                  -> only those folders (and subfolders)
        * any other value                 -> restricted to ``["INBOX"]``
        """
        self._host = account["imap_host"]
        self._port = int(account.get("imap_port") or 993)
        self._username = account["imap_username"]
        self._password = account["imap_password"]

        raw = account.get("monitored_folders")
        if raw is None:
            self._allowed_folders: list[str] | None = None  # None = no folder restriction
        elif isinstance(raw, list):
            self._allowed_folders = raw if raw else None
        else:
            self._allowed_folders = ["INBOX"]

        if self._allowed_folders:
            folder_list = ", ".join(repr(f) for f in self._allowed_folders)
            restriction = f"You may ONLY access these folders (and their subfolders): {folder_list}. Any attempt to read from or move to a folder outside this list will be rejected."
        else:
            restriction = "You may access all folders."
        self.description = (
            f"Read, organise and manage emails. "
            f"Operations: list_emails, read_email, mark_email, move_email, list_folders, create_folder. "
            f"Cannot send, delete or permanently expunge emails. "
            f"{restriction}"
        )

    # ── Validation helpers ────────────────────────────────────────────────────

    def _check_folder(self, folder: str) -> str | None:
        """Return an error string if folder is outside allowed_folders, else None.

        A folder is allowed when it equals an allowed entry or starts with that
        entry followed by "/".
        """
        if self._allowed_folders is None:
            return None
        for allowed in self._allowed_folders:
            if folder == allowed or folder.startswith(allowed.rstrip("/") + "/"):
                return None
        return (
            f"Folder {folder!r} is outside the allowed folders for this account: "
            + ", ".join(repr(f) for f in self._allowed_folders)
        )

    @staticmethod
    def _unsafe_chars(label: str, value: str) -> str | None:
        """Return an error string if value contains CR, LF or NUL, else None.

        These values are interpolated into IMAP command lines; a line break
        would let a caller smuggle in an additional command.
        """
        if any(ch in value for ch in "\r\n\x00"):
            return f"{label} must not contain line breaks or NUL characters"
        return None

    def _folder_error(self, folder: str, label: str = "folder") -> str | None:
        """Combine the control-character check with the allow-list check."""
        return self._unsafe_chars(label, folder) or self._check_folder(folder)

    @classmethod
    def _clean_uid(cls, uid) -> tuple[str, str | None]:
        """Normalise uid to a string; return (uid, error-or-None)."""
        text = str(uid).strip() if uid is not None else ""
        if not text:
            return "", "uid is required"
        if not cls._UID_SET_RE.fullmatch(text):
            return "", f"uid must be a numeric UID (or UID set), got {uid!r}"
        return text, None

    # ── IMAP helpers ──────────────────────────────────────────────────────────

    @staticmethod
    def _quote(mailbox: str) -> str:
        """Return mailbox as an IMAP quoted string.

        Names containing spaces or other special characters are not valid as
        bare atoms, so every mailbox name is sent quoted. A name that is
        already wrapped in double quotes is passed through untouched.
        """
        if len(mailbox) >= 2 and mailbox.startswith('"') and mailbox.endswith('"'):
            return mailbox
        escaped = mailbox.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def _open(self) -> imaplib.IMAP4_SSL:
        """Open and authenticate a synchronous IMAP4_SSL connection."""
        M = imaplib.IMAP4_SSL(self._host, self._port, timeout=self._TIMEOUT_SECONDS)
        try:
            M.login(self._username, self._password)
        except Exception:
            # Do not leak the TLS socket when authentication fails.
            _close(M)
            raise
        return M

    def _select(self, M: imaplib.IMAP4_SSL, folder: str, *, readonly: bool) -> str | None:
        """SELECT/EXAMINE folder; return an error string on failure, else None."""
        typ, data = M.select(self._quote(folder), readonly=readonly)
        if typ == "OK":
            return None
        detail = ""
        if data and isinstance(data[-1], bytes):
            detail = data[-1].decode("utf-8", errors="replace")
        return f"Cannot open folder {folder!r}" + (f": {detail}" if detail else "")

    @staticmethod
    def _has_capability(M: imaplib.IMAP4_SSL, name: str) -> bool:
        """Ask the server (post-login) whether it advertises capability name."""
        typ, data = M.capability()
        if typ != "OK" or not data:
            return False
        words: set[str] = set()
        for item in data:
            if isinstance(item, bytes):
                words.update(item.decode("ascii", errors="replace").upper().split())
        return name.upper() in words

    @staticmethod
    def _uid_exists(M: imaplib.IMAP4_SSL, uid: str) -> bool:
        """Return True if the selected folder holds at least one message for uid."""
        typ, data = M.uid("FETCH", uid, "(UID)")
        # imaplib reports "no untagged FETCH response" as [None].
        return typ == "OK" and any(data or [])

    @staticmethod
    def _parse_fetch(data) -> tuple[str, bytes] | None:
        """Split an imaplib FETCH result into (metadata text, literal payload).

        The literal arrives as the second element of a tuple item; everything
        else (including a trailing ``b' FLAGS (...))'`` item when the server
        sends FLAGS after the literal) is concatenated into the metadata text.
        Returns None when the response carries no literal.
        """
        meta_parts: list[bytes] = []
        payload = None
        for item in data or []:
            if isinstance(item, tuple):
                if isinstance(item[0], bytes):
                    meta_parts.append(item[0])
                if payload is None:
                    payload = item[1]
            elif isinstance(item, bytes):
                meta_parts.append(item)
        if not isinstance(payload, bytes):
            return None
        return b" ".join(meta_parts).decode("ascii", errors="replace"), payload

    @staticmethod
    def _is_unread(meta: str) -> bool:
        """Return True when the FETCH metadata does not list the \\Seen flag."""
        return "\\seen" not in meta.lower()

    @staticmethod
    def _header(msg, name: str) -> str:
        """Return header name decoded from RFC 2047 encoded-words to plain text."""
        # Function-scope import: only ``email`` itself is imported at file level.
        from email.header import decode_header, make_header

        raw = msg.get(name, "")
        if not raw:
            return ""
        try:
            text = str(make_header(decode_header(raw)))
        except Exception:
            # Malformed encoded-words / unknown charsets: show the raw header
            # rather than failing the whole operation.
            text = str(raw)
        # Collapse the line folding used for long header values.
        return " ".join(text.split())

    # ── Dispatch ──────────────────────────────────────────────────────────────

    async def execute(self, operation: str = "", **kwargs) -> ToolResult:
        """Run operation and convert any exception into a failed ToolResult."""
        try:
            if operation == "list_emails":
                return await self._list_emails(**kwargs)
            elif operation == "read_email":
                return await self._read_email(**kwargs)
            elif operation == "mark_email":
                return await self._mark_email(**kwargs)
            elif operation == "move_email":
                return await self._move_email(**kwargs)
            elif operation == "list_folders":
                return await self._list_folders()
            elif operation == "create_folder":
                return await self._create_folder(**kwargs)
            else:
                return ToolResult(success=False, error=f"Unknown operation: {operation!r}")
        except Exception as e:
            logger.exception("[email_handling] %s error: %s", operation, e)
            return ToolResult(success=False, error=str(e))

    # ── Operations (run blocking imaplib calls in a thread) ───────────────────

    async def _list_emails(
        self,
        folder: str = "INBOX",
        limit: int = 20,
        unread_only: bool = False,
        search: str = "",
        **_,
    ) -> ToolResult:
        """Validate arguments, then list message summaries, newest first."""
        folder = folder or "INBOX"
        search = (search or "").strip()
        err = self._folder_error(folder) or self._unsafe_chars("search", search)
        if err:
            return ToolResult(success=False, error=err)
        try:
            limit = self._DEFAULT_LIMIT if limit is None else int(limit)
        except (TypeError, ValueError):
            return ToolResult(success=False, error=f"limit must be an integer, got {limit!r}")
        # Clamp to 1..MAX: a limit of 0 would turn the slice below into "all".
        limit = max(1, min(limit, self._MAX_LIMIT))
        return await asyncio.to_thread(
            self._sync_list_emails, folder, limit, bool(unread_only), search
        )

    def _sync_list_emails(
        self, folder: str, limit: int, unread_only: bool, search: str
    ) -> ToolResult:
        M = self._open()
        try:
            err = self._select(M, folder, readonly=True)
            if err:
                return ToolResult(success=False, error=err)

            if search and unread_only:
                # Adjacent IMAP search keys are ANDed together.
                criteria = f"UNSEEN {search}"
            else:
                criteria = search or ("UNSEEN" if unread_only else "ALL")

            typ, data = M.uid("SEARCH", None, criteria)
            if typ != "OK":
                return ToolResult(success=False, error=f"Search failed in folder {folder!r}")
            if not data or not data[0]:
                return ToolResult(success=True, data={"emails": [], "count": 0, "folder": folder})

            uids = data[0].split()[-limit:][::-1]  # most recent first
            emails = []
            for raw_uid in uids:
                uid = raw_uid.decode() if isinstance(raw_uid, bytes) else str(raw_uid)
                typ2, msg_data = M.uid(
                    "FETCH", uid, "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)])"
                )
                parsed = self._parse_fetch(msg_data) if typ2 == "OK" else None
                if parsed is None:
                    continue
                meta, header_bytes = parsed
                msg = email_lib.message_from_bytes(header_bytes)
                emails.append({
                    "uid": uid,
                    "from": self._header(msg, "From"),
                    "to": self._header(msg, "To"),
                    "subject": self._header(msg, "Subject"),
                    "date": self._header(msg, "Date"),
                    "unread": self._is_unread(meta),
                })
            return ToolResult(success=True, data={"emails": emails, "count": len(emails), "folder": folder})
        finally:
            _close(M)

    async def _read_email(self, uid: str = "", folder: str = "INBOX", **_) -> ToolResult:
        """Validate arguments, then fetch one message without marking it read."""
        uid, err = self._clean_uid(uid)
        if err:
            return ToolResult(success=False, error=err)
        folder = folder or "INBOX"
        err = self._folder_error(folder)
        if err:
            return ToolResult(success=False, error=err)
        return await asyncio.to_thread(self._sync_read_email, uid, folder)

    def _sync_read_email(self, uid: str, folder: str) -> ToolResult:
        M = self._open()
        try:
            err = self._select(M, folder, readonly=True)
            if err:
                return ToolResult(success=False, error=err)
            # BODY.PEEK leaves the \Seen flag untouched.
            typ, data = M.uid("FETCH", uid, "(FLAGS BODY.PEEK[])")
            parsed = self._parse_fetch(data) if typ == "OK" else None
            if parsed is None:
                return ToolResult(success=False, error=f"Cannot fetch message uid={uid}")
            meta, raw = parsed
            msg = email_lib.message_from_bytes(raw)
            body = _extract_body(msg)
            return ToolResult(success=True, data={
                "uid": uid,
                "folder": folder,
                "from": self._header(msg, "From"),
                "to": self._header(msg, "To"),
                "cc": self._header(msg, "Cc"),
                "subject": self._header(msg, "Subject"),
                "date": self._header(msg, "Date"),
                "unread": self._is_unread(meta),
                "body": body[: self._MAX_BODY_CHARS],
            })
        finally:
            _close(M)

    async def _mark_email(
        self, uid: str = "", folder: str = "INBOX", flag: str = "read", **_
    ) -> ToolResult:
        """Validate arguments, then add or remove one flag on a message."""
        uid, err = self._clean_uid(uid)
        if err:
            return ToolResult(success=False, error=err)
        folder = folder or "INBOX"
        err = self._folder_error(folder)
        if err:
            return ToolResult(success=False, error=err)
        flag_map = {
            "read": ("+FLAGS", "\\Seen"),
            "unread": ("-FLAGS", "\\Seen"),
            "flagged": ("+FLAGS", "\\Flagged"),
            "unflagged": ("-FLAGS", "\\Flagged"),
            "spam": ("+FLAGS", "Junk"),
        }
        if flag not in flag_map:
            return ToolResult(success=False, error=f"Unknown flag: {flag!r}")
        return await asyncio.to_thread(self._sync_mark_email, uid, folder, flag_map[flag])

    def _sync_mark_email(self, uid: str, folder: str, flag_op: tuple) -> ToolResult:
        action, imap_flag = flag_op
        M = self._open()
        try:
            err = self._select(M, folder, readonly=False)
            if err:
                return ToolResult(success=False, error=err)
            # UID STORE on an unknown UID answers OK without doing anything,
            # so check first instead of reporting a false success.
            if not self._uid_exists(M, uid):
                return ToolResult(success=False, error=f"No message with uid={uid} in {folder!r}")
            typ, _ = M.uid("STORE", uid, action, f"({imap_flag})")
            if typ != "OK":
                return ToolResult(success=False, error=f"Failed to mark email uid={uid}")
            return ToolResult(success=True, data={"uid": uid, "flag": action})
        finally:
            _close(M)

    async def _move_email(
        self,
        uid: str = "",
        source_folder: str = "INBOX",
        target_folder: str = "",
        **_,
    ) -> ToolResult:
        """Validate arguments, then move a message between two allowed folders."""
        uid, err = self._clean_uid(uid)
        if err:
            return ToolResult(success=False, error=err)
        if not target_folder:
            return ToolResult(success=False, error="target_folder is required")
        source_folder = source_folder or "INBOX"
        for folder, label in ((source_folder, "source_folder"), (target_folder, "target_folder")):
            err = self._folder_error(folder, label)
            if err:
                return ToolResult(success=False, error=f"{label}: {err}")
        return await asyncio.to_thread(self._sync_move_email, uid, source_folder, target_folder)

    def _sync_move_email(self, uid: str, source_folder: str, target_folder: str) -> ToolResult:
        M = self._open()
        try:
            err = self._select(M, source_folder, readonly=False)
            if err:
                return ToolResult(success=False, error=err)
            if not self._uid_exists(M, uid):
                return ToolResult(
                    success=False, error=f"No message with uid={uid} in {source_folder!r}"
                )

            typ, _ = M.uid("COPY", uid, self._quote(target_folder))
            if typ != "OK":
                return ToolResult(success=False, error=f"Failed to copy to {target_folder!r}")

            typ, _ = M.uid("STORE", uid, "+FLAGS", "(\\Deleted)")
            if typ != "OK":
                return ToolResult(
                    success=False,
                    error=(
                        f"Copied uid={uid} to {target_folder!r} but could not "
                        f"remove it from {source_folder!r}"
                    ),
                )

            if self._has_capability(M, "UIDPLUS"):
                # UID EXPUNGE removes only this message; a plain EXPUNGE would
                # also purge every other message already flagged \Deleted.
                typ, _ = M.uid("EXPUNGE", uid)
                if typ != "OK":
                    logger.warning(
                        "[email_handling] UID EXPUNGE failed for uid=%s in %r; "
                        "message stays flagged \\Deleted",
                        uid, source_folder,
                    )
            else:
                logger.warning(
                    "[email_handling] server lacks UIDPLUS; falling back to a "
                    "folder-wide EXPUNGE of %r",
                    source_folder,
                )
                M.expunge()
            return ToolResult(success=True, data={"uid": uid, "moved_to": target_folder})
        finally:
            _close(M)

    async def _list_folders(self, **_) -> ToolResult:
        """List the folders visible to this account (filtered by the allow-list)."""
        return await asyncio.to_thread(self._sync_list_folders)

    def _sync_list_folders(self) -> ToolResult:
        M = self._open()
        try:
            typ, data = M.list()
            if typ != "OK":
                return ToolResult(success=False, error="Failed to list folders")
            folders = []
            for item in data:
                if not item:
                    continue
                if isinstance(item, tuple):
                    # The server sent the mailbox name as a literal: it is the
                    # second element of the tuple.
                    raw_name = item[1]
                    name = (
                        raw_name.decode("utf-8", errors="replace")
                        if isinstance(raw_name, bytes)
                        else str(raw_name)
                    )
                else:
                    line_str = (
                        item.decode("utf-8", errors="replace")
                        if isinstance(item, bytes)
                        else str(item)
                    )
                    # The mailbox name is the last token: quoted, or a bare atom.
                    match = re.search(r'"([^"]+)"\s*$', line_str) or re.search(r'(\S+)\s*$', line_str)
                    name = match.group(1) if match else ""
                if name and name.upper() != "NIL":
                    folders.append(name)
            if self._allowed_folders is not None:
                folders = [f for f in folders if self._check_folder(f) is None]
            return ToolResult(success=True, data={"folders": folders, "count": len(folders)})
        finally:
            _close(M)

    async def _create_folder(self, name: str = "", target_folder: str = "", **_) -> ToolResult:
        """Validate arguments, then create name (optionally under target_folder)."""
        if not name:
            return ToolResult(success=False, error="name is required")
        # NOTE(review): "/" is assumed as the hierarchy delimiter here and in
        # _check_folder; servers using "." would need the delimiter from LIST.
        full_name = f"{target_folder}/{name}" if target_folder else name
        err = self._folder_error(full_name, "name")
        if err:
            return ToolResult(success=False, error=err)
        return await asyncio.to_thread(self._sync_create_folder, full_name)

    def _sync_create_folder(self, full_name: str) -> ToolResult:
        M = self._open()
        try:
            typ, _ = M.create(self._quote(full_name))
            if typ != "OK":
                return ToolResult(success=False, error=f"Failed to create folder {full_name!r}")
            return ToolResult(success=True, data={"folder": full_name, "created": True})
        finally:
            _close(M)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _close(M: imaplib.IMAP4_SSL) -> None:
try:
M.logout()
except Exception:
pass
def _extract_body(msg: email_lib.message.Message) -> str:
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
return payload.decode("utf-8", errors="replace") if payload else ""
for part in msg.walk():
if part.get_content_type() == "text/html":
payload = part.get_payload(decode=True)
html = payload.decode("utf-8", errors="replace") if payload else ""
return re.sub(r"<[^>]+>", "", html).strip()
else:
payload = msg.get_payload(decode=True)
return payload.decode("utf-8", errors="replace") if payload else ""
return ""