384 lines
15 KiB
Python
384 lines
15 KiB
Python
"""
|
|
tools/email_handling_tool.py — Read/organise email tool for handling accounts.
|
|
|
|
NOT in the global tool registry.
|
|
Instantiated at dispatch time with decrypted account credentials.
|
|
Passed as extra_tools to agent.run() with force_only_extra_tools=True.
|
|
|
|
Deliberately excludes: send, reply, forward, create draft, delete email,
|
|
expunge folder, any SMTP operation.
|
|
|
|
Uses imaplib (stdlib) via asyncio.to_thread — avoids aioimaplib auth issues
|
|
with some Dovecot configurations.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import email as email_lib
|
|
import imaplib
|
|
import logging
|
|
import re
|
|
|
|
from .base import BaseTool, ToolResult
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailHandlingTool(BaseTool):
    """IMAP tool that lets an agent read and organise mail for one account.

    Supported operations: ``list_emails``, ``read_email``, ``mark_email``,
    ``move_email``, ``list_folders`` and ``create_folder``.  There is no
    send / reply / forward / delete operation.

    Every operation opens its own short-lived ``IMAP4_SSL`` connection and
    runs the blocking ``imaplib`` calls in a worker thread through
    ``asyncio.to_thread``.

    Messages are addressed by IMAP UID (``UID SEARCH`` / ``UID FETCH`` /
    ``UID STORE`` / ``UID COPY``).  Because each operation uses a fresh
    connection, message sequence numbers would not reliably identify the
    same message between a ``list_emails`` call and a later
    ``read_email`` / ``mark_email`` / ``move_email`` call.
    """

    name = "email_handling"
    description = "Read, organise and manage emails within the configured folders."  # overridden in __init__
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": [
                    "list_emails",
                    "read_email",
                    "mark_email",
                    "move_email",
                    "list_folders",
                    "create_folder",
                ],
                "description": "The operation to perform",
            },
            "folder": {
                "type": "string",
                "description": "IMAP folder name (default: INBOX)",
            },
            "uid": {
                "type": "string",
                "description": "Email UID for read_email, mark_email, move_email",
            },
            "limit": {
                "type": "integer",
                "description": "Max emails to return for list_emails (default: 20, max: 100)",
            },
            "unread_only": {
                "type": "boolean",
                "description": "Only list unread emails (for list_emails)",
            },
            "search": {
                "type": "string",
                "description": "IMAP SEARCH criteria string (for list_emails)",
            },
            "flag": {
                "type": "string",
                "enum": ["read", "unread", "flagged", "unflagged", "spam"],
                "description": "Flag action for mark_email",
            },
            "source_folder": {
                "type": "string",
                "description": "Source folder for move_email",
            },
            "target_folder": {
                "type": "string",
                "description": "Target folder for move_email or parent for create_folder",
            },
            "name": {
                "type": "string",
                "description": "New folder name for create_folder",
            },
        },
        "required": ["operation"],
    }
    requires_confirmation = False
    allowed_in_scheduled_tasks = True

    # Limits applied to list_emails / read_email results.
    _DEFAULT_LIMIT = 20
    _MAX_LIMIT = 100
    _MAX_BODY_CHARS = 6000
    # Socket timeout so a stalled server cannot block a worker thread forever.
    _TIMEOUT_SECONDS = 30
    # A UID or UID set ("12", "3:7", "1,4,9", "5:*"); anything else is rejected
    # so caller-supplied text can never inject extra IMAP command tokens.
    _UID_SET_RE = re.compile(r"[0-9:,*]+")
    # CR/LF and other control characters would terminate or corrupt a command.
    _CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]")
    _LIST_FETCH_ITEMS = "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)])"

    def __init__(self, account: dict) -> None:
        """Bind the tool to one account.

        Args:
            account: dict with decrypted ``imap_host``, ``imap_username`` and
                ``imap_password``; optional ``imap_port`` (default 993) and
                ``monitored_folders``.

        ``monitored_folders`` controls the folder restriction: ``None`` or an
        empty list means no restriction, a non-empty list restricts access to
        those folders (and their ``/``-separated subfolders), and any other
        value falls back to ``["INBOX"]``.
        """
        self._host = account["imap_host"]
        self._port = int(account.get("imap_port") or 993)
        self._username = account["imap_username"]
        self._password = account["imap_password"]

        raw = account.get("monitored_folders")
        if raw is None:
            self._allowed_folders: list[str] | None = None  # None = no folder restriction
        elif isinstance(raw, list):
            # Copy so later mutation of the caller's list cannot widen access.
            self._allowed_folders = list(raw) or None
        else:
            self._allowed_folders = ["INBOX"]

        if self._allowed_folders:
            folder_list = ", ".join(repr(f) for f in self._allowed_folders)
            restriction = f"You may ONLY access these folders (and their subfolders): {folder_list}. Any attempt to read from or move to a folder outside this list will be rejected."
        else:
            restriction = "You may access all folders."

        self.description = (
            "Read, organise and manage emails. "
            "Operations: list_emails, read_email, mark_email, move_email, list_folders, create_folder. "
            "Cannot send, delete or permanently expunge emails. "
            f"{restriction}"
        )

    # ── Validation helpers ────────────────────────────────────────────────────

    def _check_folder(self, folder: str) -> str | None:
        """Return an error string if folder is outside allowed_folders, else None."""
        if self._allowed_folders is None:
            return None
        # NOTE(review): subfolders are matched with "/" as the hierarchy
        # delimiter only; servers that use "." will not match subfolders.
        for allowed in self._allowed_folders:
            if folder == allowed or folder.startswith(allowed.rstrip("/") + "/"):
                return None
        return (
            f"Folder {folder!r} is outside the allowed folders for this account: "
            + ", ".join(repr(f) for f in self._allowed_folders)
        )

    @classmethod
    def _quote_mailbox(cls, mailbox: str) -> str:
        """Return *mailbox* as an IMAP quoted string.

        imaplib sends mailbox arguments verbatim, so a name containing a
        space would otherwise be split into two arguments.

        Raises:
            ValueError: if the name contains control characters (CR/LF etc.).
        """
        if cls._CONTROL_CHARS_RE.search(mailbox):
            raise ValueError(f"Invalid folder name: {mailbox!r}")
        escaped = mailbox.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _clean_uid(uid: object) -> str:
        """Normalise a caller-supplied uid (str, int or None) to a stripped string."""
        return "" if uid is None else str(uid).strip()

    @classmethod
    def _uid_error(cls, uid: str) -> str | None:
        """Return an error string if *uid* is missing or malformed, else None."""
        if not uid:
            return "uid is required"
        if not cls._UID_SET_RE.fullmatch(uid):
            return f"Invalid uid: {uid!r}"
        return None

    # ── Connection helpers ────────────────────────────────────────────────────

    def _open(self) -> imaplib.IMAP4_SSL:
        """Open and authenticate a synchronous IMAP4_SSL connection."""
        M = imaplib.IMAP4_SSL(self._host, self._port, timeout=self._TIMEOUT_SECONDS)
        try:
            M.login(self._username, self._password)
        except Exception:
            # Do not leak the socket when authentication fails.
            _close(M)
            raise
        return M

    def _select(self, M: imaplib.IMAP4_SSL, folder: str, *, readonly: bool) -> str | None:
        """Select *folder*; return an error string on failure, else None."""
        typ, _ = M.select(self._quote_mailbox(folder), readonly=readonly)
        if typ != "OK":
            return f"Cannot open folder {folder!r}"
        return None

    @staticmethod
    def _has_capability(M: imaplib.IMAP4_SSL, capability: str) -> bool:
        """Ask the server (post-login) whether it advertises *capability*."""
        try:
            typ, data = M.capability()
        except imaplib.IMAP4.error:
            return False
        if typ != "OK":
            return False
        tokens: set[str] = set()
        for item in data or []:
            if isinstance(item, bytes):
                tokens.update(item.decode("ascii", "replace").upper().split())
        return capability.upper() in tokens

    # ── Response parsing helpers ──────────────────────────────────────────────

    @staticmethod
    def _split_fetch(data: list) -> tuple[str, bytes]:
        """Split a single-message FETCH response into (metadata text, literal bytes).

        The metadata text is the part that carries ``FLAGS (...)``; the
        literal is the header block or full message requested.
        """

        def as_text(value: object) -> str:
            if isinstance(value, (bytes, bytearray)):
                return bytes(value).decode("ascii", "replace")
            return str(value)

        first = data[0]
        if not isinstance(first, tuple):
            return as_text(first), b""
        meta = as_text(first[0])
        # Items returned after the literal (e.g. a trailing FLAGS) arrive in
        # the element that follows the tuple.
        if len(data) > 1 and isinstance(data[1], (bytes, bytearray)):
            meta = f"{meta} {as_text(data[1])}"
        return meta, first[1]

    @staticmethod
    def _is_unread(meta: str) -> bool:
        """True when the FETCH metadata does not list the \\Seen flag."""
        return "\\seen" not in meta.lower()

    @staticmethod
    def _header(msg: email_lib.message.Message, name: str) -> str:
        """Return header *name* as a plain ``str`` with RFC 2047 words decoded."""
        from email.header import decode_header, make_header

        value = msg.get(name, "")
        try:
            text = str(make_header(decode_header(value)))
        except (LookupError, UnicodeError, ValueError):
            # Unknown charset or malformed encoded-word: fall back to raw text.
            text = str(value)
        # Raw 8-bit headers surface as lone surrogates; make the result
        # safely encodable for whatever serialises the tool output.
        return text.encode("utf-8", "replace").decode("utf-8")

    # ── Dispatcher ────────────────────────────────────────────────────────────

    async def execute(self, operation: str = "", **kwargs) -> ToolResult:
        """Run *operation* with *kwargs* and always return a ToolResult.

        Unknown operations and any exception raised by an operation are
        reported as ``ToolResult(success=False, ...)`` rather than raised.
        """
        handlers = {
            "list_emails": self._list_emails,
            "read_email": self._read_email,
            "mark_email": self._mark_email,
            "move_email": self._move_email,
            "list_folders": self._list_folders,
            "create_folder": self._create_folder,
        }
        handler = handlers.get(operation) if isinstance(operation, str) else None
        if handler is None:
            return ToolResult(success=False, error=f"Unknown operation: {operation!r}")
        try:
            return await handler(**kwargs)
        except Exception as e:
            # Tool boundary: log with traceback, report the failure to the agent.
            logger.exception("[email_handling] %s error: %s", operation, e)
            return ToolResult(success=False, error=str(e))

    # ── Operations (run blocking imaplib calls in a thread) ───────────────────

    async def _list_emails(
        self,
        folder: str = "INBOX",
        limit: int = 20,
        unread_only: bool = False,
        search: str = "",
        **_,
    ) -> ToolResult:
        """Validate arguments for list_emails and run it in a worker thread."""
        folder = folder or "INBOX"
        err = self._check_folder(folder)
        if err:
            return ToolResult(success=False, error=err)

        search = (search or "").strip()
        if self._CONTROL_CHARS_RE.search(search):
            return ToolResult(success=False, error="search must not contain control characters")

        # Clamp to 1..MAX: a limit of 0 would make nums[-0:] return everything.
        limit = self._DEFAULT_LIMIT if limit is None else int(limit)
        limit = max(1, min(limit, self._MAX_LIMIT))

        return await asyncio.to_thread(
            self._sync_list_emails, folder, limit, bool(unread_only), search
        )

    def _sync_list_emails(
        self, folder: str, limit: int, unread_only: bool, search: str
    ) -> ToolResult:
        """Return header summaries of the newest *limit* matching messages."""
        # Adjacent IMAP search keys are ANDed, so UNSEEN can simply prefix
        # a caller-supplied criteria string.
        if search and unread_only:
            criteria = f"UNSEEN {search}"
        elif search:
            criteria = search
        else:
            criteria = "UNSEEN" if unread_only else "ALL"

        M = self._open()
        try:
            err = self._select(M, folder, readonly=True)
            if err:
                return ToolResult(success=False, error=err)

            typ, data = M.uid("SEARCH", criteria)
            if typ != "OK":
                return ToolResult(success=False, error=f"Search failed in folder {folder!r}")
            if not data or not data[0]:
                return ToolResult(success=True, data={"emails": [], "count": 0, "folder": folder})

            uids = sorted(
                (tok.decode("ascii") for tok in data[0].split() if tok.isdigit()),
                key=int,
            )
            uids = uids[-limit:][::-1]  # most recent first

            emails = []
            for uid in uids:
                typ2, msg_data = M.uid("FETCH", uid, self._LIST_FETCH_ITEMS)
                if typ2 != "OK" or not msg_data or not msg_data[0]:
                    continue
                meta, header_bytes = self._split_fetch(msg_data)
                msg = email_lib.message_from_bytes(header_bytes)
                emails.append({
                    "uid": uid,
                    "from": self._header(msg, "From"),
                    "to": self._header(msg, "To"),
                    "subject": self._header(msg, "Subject"),
                    "date": self._header(msg, "Date"),
                    "unread": self._is_unread(meta),
                })

            return ToolResult(success=True, data={"emails": emails, "count": len(emails), "folder": folder})
        finally:
            _close(M)

    async def _read_email(self, uid: str = "", folder: str = "INBOX", **_) -> ToolResult:
        """Validate arguments for read_email and run it in a worker thread."""
        uid = self._clean_uid(uid)
        folder = folder or "INBOX"
        err = self._uid_error(uid) or self._check_folder(folder)
        if err:
            return ToolResult(success=False, error=err)
        return await asyncio.to_thread(self._sync_read_email, uid, folder)

    def _sync_read_email(self, uid: str, folder: str) -> ToolResult:
        """Fetch one message by UID without setting \\Seen (BODY.PEEK)."""
        M = self._open()
        try:
            err = self._select(M, folder, readonly=True)
            if err:
                return ToolResult(success=False, error=err)

            typ, data = M.uid("FETCH", uid, "(FLAGS BODY.PEEK[])")
            if typ != "OK" or not data or not data[0]:
                return ToolResult(success=False, error=f"Cannot fetch message uid={uid}")

            meta, raw = self._split_fetch(data)
            msg = email_lib.message_from_bytes(raw)
            body = _extract_body(msg)

            return ToolResult(success=True, data={
                "uid": uid,
                "folder": folder,
                "from": self._header(msg, "From"),
                "to": self._header(msg, "To"),
                "cc": self._header(msg, "Cc"),
                "subject": self._header(msg, "Subject"),
                "date": self._header(msg, "Date"),
                "unread": self._is_unread(meta),
                "body": body[:self._MAX_BODY_CHARS],
            })
        finally:
            _close(M)

    async def _mark_email(
        self, uid: str = "", folder: str = "INBOX", flag: str = "read", **_
    ) -> ToolResult:
        """Validate arguments for mark_email and run it in a worker thread."""
        uid = self._clean_uid(uid)
        folder = folder or "INBOX"
        err = self._uid_error(uid) or self._check_folder(folder)
        if err:
            return ToolResult(success=False, error=err)

        flag_map = {
            "read": ("+FLAGS", "\\Seen"),
            "unread": ("-FLAGS", "\\Seen"),
            "flagged": ("+FLAGS", "\\Flagged"),
            "unflagged": ("-FLAGS", "\\Flagged"),
            "spam": ("+FLAGS", "Junk"),
        }
        if flag not in flag_map:
            return ToolResult(success=False, error=f"Unknown flag: {flag!r}")

        return await asyncio.to_thread(self._sync_mark_email, uid, folder, flag_map[flag])

    def _sync_mark_email(self, uid: str, folder: str, flag_op: tuple) -> ToolResult:
        """Apply ``flag_op = (action, imap_flag)`` to the message with *uid*."""
        action, imap_flag = flag_op
        M = self._open()
        try:
            err = self._select(M, folder, readonly=False)
            if err:
                return ToolResult(success=False, error=err)
            # IMAP4.uid() does not parenthesise the flag list the way
            # IMAP4.store() does, so do it explicitly.
            typ, _ = M.uid("STORE", uid, action, f"({imap_flag})")
            if typ != "OK":
                return ToolResult(success=False, error=f"Failed to mark email uid={uid}")
            return ToolResult(success=True, data={"uid": uid, "flag": action})
        finally:
            _close(M)

    async def _move_email(
        self,
        uid: str = "",
        source_folder: str = "INBOX",
        target_folder: str = "",
        **_,
    ) -> ToolResult:
        """Validate arguments for move_email and run it in a worker thread."""
        uid = self._clean_uid(uid)
        source_folder = source_folder or "INBOX"
        err = self._uid_error(uid)
        if err:
            return ToolResult(success=False, error=err)
        if not target_folder:
            return ToolResult(success=False, error="target_folder is required")
        for folder, label in ((source_folder, "source_folder"), (target_folder, "target_folder")):
            err = self._check_folder(folder)
            if err:
                return ToolResult(success=False, error=f"{label}: {err}")
        return await asyncio.to_thread(self._sync_move_email, uid, source_folder, target_folder)

    def _sync_move_email(self, uid: str, source_folder: str, target_folder: str) -> ToolResult:
        """Move a message: UID COPY to the target, then remove it from the source.

        Removal is scoped to the moved message with ``UID EXPUNGE`` when the
        server advertises UIDPLUS, so other messages that already carry
        ``\\Deleted`` in the source folder are left untouched.
        """
        M = self._open()
        try:
            err = self._select(M, source_folder, readonly=False)
            if err:
                return ToolResult(success=False, error=err)

            typ, _ = M.uid("COPY", uid, self._quote_mailbox(target_folder))
            if typ != "OK":
                return ToolResult(success=False, error=f"Failed to copy to {target_folder!r}")

            typ, _ = M.uid("STORE", uid, "+FLAGS", "(\\Deleted)")
            if typ != "OK":
                return ToolResult(
                    success=False,
                    error=(
                        f"Copied uid={uid} to {target_folder!r} but failed to "
                        f"remove it from {source_folder!r}"
                    ),
                )

            if self._has_capability(M, "UIDPLUS"):
                M.uid("EXPUNGE", uid)
            else:
                # NOTE: legacy fallback. A plain EXPUNGE also removes any
                # other message already flagged \Deleted in this folder.
                M.expunge()
            return ToolResult(success=True, data={"uid": uid, "moved_to": target_folder})
        finally:
            _close(M)

    async def _list_folders(self, **_) -> ToolResult:
        """Run list_folders in a worker thread."""
        return await asyncio.to_thread(self._sync_list_folders)

    def _sync_list_folders(self) -> ToolResult:
        """Return the account's folder names, filtered by the folder restriction."""
        M = self._open()
        try:
            typ, data = M.list()
            if typ != "OK":
                return ToolResult(success=False, error="Failed to list folders")

            folders = []
            for line in data:
                if not line:
                    continue
                line_str = line.decode() if isinstance(line, bytes) else str(line)
                # The mailbox name is the last token of a LIST line: either a
                # quoted string or a bare atom.
                match = re.search(r'"([^"]+)"\s*$', line_str) or re.search(r'(\S+)\s*$', line_str)
                if match:
                    name = match.group(1)
                    if name and name.upper() != "NIL":
                        folders.append(name)

            # Reuse the single access rule instead of duplicating it here.
            folders = [f for f in folders if self._check_folder(f) is None]

            return ToolResult(success=True, data={"folders": folders, "count": len(folders)})
        finally:
            _close(M)

    async def _create_folder(self, name: str = "", target_folder: str = "", **_) -> ToolResult:
        """Validate arguments for create_folder and run it in a worker thread."""
        if not name:
            return ToolResult(success=False, error="name is required")
        full_name = f"{target_folder}/{name}" if target_folder else name
        err = self._check_folder(full_name)
        if err:
            return ToolResult(success=False, error=err)
        return await asyncio.to_thread(self._sync_create_folder, full_name)

    def _sync_create_folder(self, full_name: str) -> ToolResult:
        """Create the mailbox *full_name* on the server."""
        M = self._open()
        try:
            typ, _ = M.create(self._quote_mailbox(full_name))
            if typ != "OK":
                return ToolResult(success=False, error=f"Failed to create folder {full_name!r}")
            return ToolResult(success=True, data={"folder": full_name, "created": True})
        finally:
            _close(M)
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _close(M: imaplib.IMAP4_SSL) -> None:
    """Log out of *M*, never raising.

    Called from ``finally`` blocks, so a failure here must not replace the
    result or exception of the operation that just ran.  The failure is
    still recorded at debug level instead of being dropped silently.
    """
    try:
        M.logout()
    except Exception:
        # Best-effort cleanup: the connection may already be closed or broken.
        logging.getLogger(__name__).debug("IMAP logout failed", exc_info=True)
|
|
|
|
|
|
def _extract_body(msg: email_lib.message.Message) -> str:
    """Return a plain-text rendering of the body of *msg*.

    For multipart messages the first non-empty ``text/plain`` part wins;
    otherwise the first ``text/html`` part is used with its tags stripped.
    Parts marked ``Content-Disposition: attachment`` are never treated as
    the body.  Single-part messages return their decoded payload, with tags
    stripped when the message is ``text/html``.

    Payloads are decoded with the charset the part declares, falling back
    to UTF-8 when none is declared or the declared one is unknown.
    Undecodable bytes are replaced rather than raising.  Returns ``""``
    when no text can be found.
    """

    def decode_part(part) -> str:
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Declared charset is not a codec Python knows.
            return payload.decode("utf-8", errors="replace")

    def strip_tags(markup: str) -> str:
        return re.sub(r"<[^>]+>", "", markup).strip()

    if not msg.is_multipart():
        text = decode_part(msg)
        if msg.get_content_type() == "text/html":
            return strip_tags(text)
        return text

    html_part = None
    for part in msg.walk():
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            text = decode_part(part)
            if text:
                return text
        elif content_type == "text/html" and html_part is None:
            # Remember the first HTML part in case no plain text turns up.
            html_part = part

    if html_part is not None:
        return strip_tags(decode_part(html_part))
    return ""
|