Files
oai-web/server/tools/email_tool.py
2026-04-08 12:43:24 +02:00

398 lines
16 KiB
Python

"""
tools/email_tool.py — IMAP email reading + SMTP sending.
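Read operations: list_emails, read_email, list_whitelist — no confirmation required.
Send operation: send_email — whitelisted recipients only, requires confirmation.
Prompt injection guard: the body and subject returned by read_email are sanitised before being returned to the agent.
Max body length: 6,000 characters by default when the security truncation option is enabled (configurable), otherwise 10,000 characters; truncation is reported via the `truncated` flag.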
"""
from __future__ import annotations
import email as email_lib
import smtplib
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid, parseaddr
import imapclient
from bs4 import BeautifulSoup
from ..database import credential_store
from ..security import SecurityError, assert_email_rate_limit, assert_recipient_allowed, sanitize_external_content
from ..security_screening import get_content_limit, is_option_enabled
from .base import BaseTool, ToolResult
# Body cap used by read_email when the "system:security_truncation_enabled"
# option is off.
MAX_BODY_CHARS = 10_000 # legacy fallback when truncation option disabled
# Defaults handed to get_content_limit() when the truncation option is on; the
# stored settings "system:security_max_email_chars" and
# "system:security_max_subject_chars" are looked up with these as fallbacks.
_DEFAULT_MAX_EMAIL_CHARS = 6_000 # default when truncation option enabled
_DEFAULT_MAX_SUBJECT_CHARS = 200 # default subject limit when truncation option enabled
# Upper bound applied to the `limit` argument of the list_emails operation.
MAX_LIST_EMAILS = 50
class EmailTool(BaseTool):
    """Agent tool for reading mail over IMAP and sending mail over SMTP.

    The ``operation`` argument of :meth:`execute` selects one of:

    * ``list_emails`` – envelope metadata for a folder, newest first.
    * ``read_email`` – one message by UID; body and subject are truncated and
      passed through ``sanitize_external_content`` before being returned.
    * ``send_email`` – a multipart/alternative message; every recipient must
      pass ``assert_recipient_allowed`` and ``assert_email_rate_limit``.
    * ``list_whitelist`` – the approved recipient addresses.
    """

    name = "email"
    description = (
        "Read and send emails via IMAP/SMTP (Mailcow). "
        "Operations: list_emails (list inbox), read_email (read full message), "
        "send_email (send to one or more whitelisted recipients — requires confirmation), "
        "list_whitelist (return all approved recipient addresses). "
        "Email bodies are sanitised before being returned."
    )
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": ["list_emails", "read_email", "send_email", "list_whitelist"],
                "description": "The email operation to perform. list_whitelist returns all approved recipient addresses.",
            },
            "folder": {
                "type": "string",
                "description": "IMAP folder (default: INBOX)",
            },
            "limit": {
                "type": "integer",
                "description": f"Max emails to list (default 20, max {MAX_LIST_EMAILS})",
            },
            "unread_only": {
                "type": "boolean",
                "description": "Only list unread emails (default false)",
            },
            "email_id": {
                "type": "string",
                "description": "Email UID for read_email",
            },
            "to": {
                "anyOf": [
                    {"type": "string"},
                    {"type": "array", "items": {"type": "string"}},
                ],
                "description": "Recipient address or list of addresses for send_email (all must be whitelisted)",
            },
            "subject": {
                "type": "string",
                "description": "Email subject for send_email",
            },
            "body": {
                "type": "string",
                "description": "Email body text (plain text) for send_email",
            },
            "html_body": {
                "type": "string",
                "description": "Full HTML email body for send_email. If provided, used as the HTML part instead of the plain-text fallback wrapper. Include complete <html>...</html> with inline <style>.",
            },
            "reply_to_id": {
                "type": "string",
                # The value is written verbatim into In-Reply-To/References,
                # which carry Message-IDs, so ask for the message_id that
                # read_email returns rather than an IMAP UID.
                "description": "Message-ID of the email being replied to, as returned by read_email (sets In-Reply-To and References headers)",
            },
        },
        "required": ["operation"],
    }
    # Class-wide default is False because only send_email needs confirmation.
    # NOTE(review): execute() below performs no confirmation check itself —
    # presumably the framework gates send_email using
    # confirmation_description(); confirm against BaseTool.
    requires_confirmation = False
    allowed_in_scheduled_tasks = True

    async def _load_credentials(self) -> tuple[str, str, str, str, int]:
        """Return ``(imap_host, smtp_host, username, password, smtp_port)``.

        ``mailcow_imap_host`` / ``mailcow_smtp_host`` override ``mailcow_host``
        when set; the SMTP port defaults to 465.

        Raises:
            RuntimeError: if ``mailcow_smtp_port`` is not an integer. Callers
                in this class already report ``RuntimeError`` from this method
                as a failed ``ToolResult``, so a bad port is surfaced the same
                way instead of escaping as an uncaught ``ValueError``.
        """
        base_host = await credential_store.require("mailcow_host")
        username = await credential_store.require("mailcow_username")
        password = await credential_store.require("mailcow_password")
        imap_host = await credential_store.get("mailcow_imap_host") or base_host
        smtp_host = await credential_store.get("mailcow_smtp_host") or base_host
        raw_port = await credential_store.get("mailcow_smtp_port") or "465"
        try:
            smtp_port = int(raw_port)
        except (TypeError, ValueError):
            raise RuntimeError(f"Invalid mailcow_smtp_port: {raw_port!r}") from None
        return imap_host, smtp_host, username, password, smtp_port

    async def execute(
        self,
        operation: str,
        folder: str = "INBOX",
        limit: int = 20,
        unread_only: bool = False,
        email_id: str = "",
        to=None,
        subject: str = "",
        body: str = "",
        html_body: str = "",
        reply_to_id: str = "",
        **kwargs,
    ) -> ToolResult:
        """Validate the arguments for ``operation`` and dispatch to its handler.

        Returns a failed ``ToolResult`` (never raises) for an unknown
        operation or for missing/invalid arguments.
        """
        folder = folder or "INBOX"

        if operation == "list_emails":
            try:
                requested = int(limit)
            except (TypeError, ValueError):
                requested = 20
            # Clamp to 1..MAX_LIST_EMAILS. A negative value would otherwise
            # turn the later ``[:limit]`` slice into "everything but the last N".
            bounded = max(1, min(requested, MAX_LIST_EMAILS))
            return await self._list_emails(folder, bounded, unread_only)

        if operation == "read_email":
            if not email_id:
                return ToolResult(success=False, error="email_id is required for read_email")
            return await self._read_email(folder, email_id)

        if operation == "list_whitelist":
            return await self._list_whitelist()

        if operation == "send_email":
            # Normalise `to` into a list of stripped, non-empty address strings.
            if isinstance(to, str):
                candidates = [to]
            elif isinstance(to, (list, tuple)):
                candidates = list(to)
            else:
                candidates = []
            if any(not isinstance(r, str) for r in candidates):
                return ToolResult(success=False, error="to must be a string or a list of strings")
            recipients = [r.strip() for r in candidates if r.strip()]
            if not (recipients and subject and (body or html_body)):
                return ToolResult(success=False, error="to, subject, and body (or html_body) are required for send_email")
            return await self._send_email(recipients, subject, body, html_body, reply_to_id)

        return ToolResult(success=False, error=f"Unknown operation: {operation!r}")

    # ── IMAP ──────────────────────────────────────────────────────────────────

    async def _list_emails(self, folder: str, limit: int, unread_only: bool) -> ToolResult:
        """List up to ``limit`` messages of ``folder``, newest (highest UID) first.

        Each entry carries ``id`` (UID as string), ``from``, ``subject``,
        ``date``, ``unread`` and ``size_bytes``. Any failure while talking to
        the server is reported as a failed ``ToolResult``.
        """
        try:
            imap_host, _, username, password, _ = await self._load_credentials()
        except RuntimeError as e:
            return ToolResult(success=False, error=str(e))
        try:
            with imapclient.IMAPClient(imap_host, ssl=True, port=993) as client:
                client.login(username, password)
                client.select_folder(folder, readonly=True)
                criteria = ["UNSEEN"] if unread_only else ["ALL"]
                # UIDs grow as messages are added, so descending UID order is
                # newest-first regardless of the order the server answered in.
                uids = sorted(client.search(criteria), reverse=True)[:limit]
                messages = client.fetch(uids, ["ENVELOPE", "FLAGS", "RFC822.SIZE"]) if uids else {}

            emails = []
            # Walk the UID list rather than the response mapping: the mapping
            # follows the server's response order, not the requested order.
            for uid in uids:
                data = messages.get(uid)
                if not data:
                    continue
                env = data.get(b"ENVELOPE")
                if not env:
                    continue
                # Subjects are attacker-controlled text; treat them the same
                # way read_email does before handing them to the agent.
                subject = await sanitize_external_content(
                    _decode_header(env.subject), source="email_subject"
                )
                emails.append({
                    "id": str(uid),
                    # NOTE(review): the sender display name is also untrusted
                    # and is returned unsanitised here — consider screening it.
                    "from": _format_address(env.from_) if env.from_ else "",
                    "subject": subject,
                    "date": str(env.date) if env.date else "",
                    "unread": b"\\Seen" not in (data.get(b"FLAGS") or []),
                    "size_bytes": data.get(b"RFC822.SIZE", 0),
                })
            return ToolResult(
                success=True,
                data={"emails": emails, "count": len(emails)},
            )
        except Exception as e:
            return ToolResult(success=False, error=f"IMAP error: {e}")

    async def _read_email(self, folder: str, email_id: str) -> ToolResult:
        """Fetch message ``email_id`` (an IMAP UID) from ``folder``.

        The body is truncated (configured limit when the truncation option is
        enabled, otherwise ``MAX_BODY_CHARS``) and then body and subject are
        sanitised. ``truncated`` in the result reflects body truncation only.
        """
        try:
            imap_host, _, username, password, _ = await self._load_credentials()
        except RuntimeError as e:
            return ToolResult(success=False, error=str(e))
        try:
            uid = int(email_id)
        except (TypeError, ValueError):
            return ToolResult(success=False, error=f"Invalid email_id: {email_id!r}")
        try:
            # Keep the connection open only for the fetch itself; parsing and
            # sanitising below do not need the server.
            with imapclient.IMAPClient(imap_host, ssl=True, port=993) as client:
                client.login(username, password)
                client.select_folder(folder, readonly=True)
                messages = client.fetch([uid], ["RFC822"])
            if not messages or uid not in messages:
                return ToolResult(success=False, error=f"Email {email_id} not found")

            msg = email_lib.message_from_bytes(messages[uid][b"RFC822"])
            # Header values may be email.header.Header objects when the raw
            # header holds 8-bit data; coerce everything to plain strings.
            from_addr = _decode_header(msg.get("From", ""))
            subject = _decode_header(msg.get("Subject", ""))
            date = str(msg.get("Date", ""))
            message_id = str(msg.get("Message-ID", ""))
            body_text = _extract_email_body(msg)

            truncated = False
            if await is_option_enabled("system:security_truncation_enabled"):
                max_body = await get_content_limit("system:security_max_email_chars", _DEFAULT_MAX_EMAIL_CHARS)
                if len(body_text) > max_body:
                    body_text = body_text[:max_body]
                    truncated = True
                max_subj = await get_content_limit("system:security_max_subject_chars", _DEFAULT_MAX_SUBJECT_CHARS)
                if len(subject) > max_subj:
                    subject = subject[:max_subj] + " [subject truncated]"
            elif len(body_text) > MAX_BODY_CHARS:
                body_text = body_text[:MAX_BODY_CHARS]
                truncated = True

            # Sanitise — critical security step (also sanitises subject).
            body_text = await sanitize_external_content(body_text, source="email")
            subject = await sanitize_external_content(subject, source="email_subject")
            return ToolResult(
                success=True,
                data={
                    "id": email_id,
                    "from": from_addr,
                    "subject": subject,
                    "date": date,
                    "message_id": message_id,
                    "body": body_text,
                    "truncated": truncated,
                },
            )
        except Exception as e:
            return ToolResult(success=False, error=f"IMAP error: {e}")

    # ── Whitelist ─────────────────────────────────────────────────────────────

    async def _list_whitelist(self) -> ToolResult:
        """Return every address stored in ``email_whitelist_store``."""
        from ..database import email_whitelist_store

        entries = await email_whitelist_store.list()
        return ToolResult(
            success=True,
            data={"recipients": [e["email"] for e in entries], "count": len(entries)},
        )

    # ── SMTP ──────────────────────────────────────────────────────────────────

    async def _send_email(
        self,
        to: list[str],
        subject: str,
        body: str,
        html_body: str = "",
        reply_to_id: str = "",
    ) -> ToolResult:
        """Send a multipart/alternative message to ``to`` over SMTP.

        Every recipient is checked against the whitelist and the rate limiter
        first; a ``SecurityError`` from either aborts the send. Port 465 uses
        implicit TLS, any other port uses STARTTLS; both verify the server
        certificate. On success the result data holds ``sent``, ``to`` and
        ``subject``, plus ``refused`` when the server rejected some (but not
        all) recipients.
        """
        from html import escape as html_escape

        # Security: enforce whitelist + rate limit for every recipient.
        try:
            for addr in to:
                await assert_recipient_allowed(addr)
                await assert_email_rate_limit(addr)
        except SecurityError as e:
            return ToolResult(success=False, error=str(e))
        try:
            _, smtp_host, username, password, smtp_port = await self._load_credentials()
        except RuntimeError as e:
            return ToolResult(success=False, error=str(e))

        # Header values must be single-line; flatten any embedded line breaks
        # so they cannot be interpreted as additional headers.
        subject = " ".join(subject.splitlines())
        reply_to_id = " ".join(reply_to_id.splitlines()).strip()

        msg = MIMEMultipart("alternative")
        msg["From"] = username
        msg["To"] = ", ".join(to)
        msg["Subject"] = subject
        msg["Date"] = formatdate(localtime=True)
        msg["Message-ID"] = make_msgid()
        if reply_to_id:
            msg["In-Reply-To"] = reply_to_id
            msg["References"] = reply_to_id

        # Plain-text alternative: derive it from the HTML when the caller only
        # supplied html_body, so text-only clients do not see an empty message.
        plain_body = body
        if not plain_body and html_body:
            plain_body = BeautifulSoup(html_body, "html.parser").get_text(separator="\n")
        msg.attach(MIMEText(plain_body, "plain", "utf-8"))

        # HTML alternative: use the caller's markup, or wrap the plain text.
        # The text is escaped so characters like '<' and '&' render literally.
        if not html_body:
            html_body = f"<html><body><pre style='font-family:sans-serif'>{html_escape(body)}</pre></body></html>"
        msg.attach(MIMEText(html_body, "html", "utf-8"))

        try:
            tls_context = ssl.create_default_context()
            payload = msg.as_bytes()
            if smtp_port == 465:
                with smtplib.SMTP_SSL(smtp_host, smtp_port, context=tls_context, timeout=10) as smtp:
                    smtp.login(username, password)
                    refused = smtp.sendmail(username, to, payload)
            else:
                with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as smtp:
                    smtp.ehlo()
                    # Pass the verifying context explicitly: without it
                    # starttls() does not validate the server certificate.
                    smtp.starttls(context=tls_context)
                    smtp.login(username, password)
                    refused = smtp.sendmail(username, to, payload)
        except smtplib.SMTPAuthenticationError:
            return ToolResult(success=False, error="SMTP authentication failed. Check mailcow_password.")
        except smtplib.SMTPException as e:
            return ToolResult(success=False, error=f"SMTP error: {e}")
        except Exception as e:
            return ToolResult(success=False, error=f"Send error: {e}")

        result = {"sent": True, "to": to, "subject": subject}
        if refused:
            # sendmail() returns the recipients the server rejected when at
            # least one other recipient was accepted.
            result["refused"] = sorted(refused)
        return ToolResult(success=True, data=result)

    def confirmation_description(self, to=None, subject: str = "", body: str = "", **kwargs) -> str:
        """Build the human-readable summary shown when confirming a send.

        Shows the recipients, the subject and the first 200 characters of the
        body (falling back to ``html_body`` when no plain body was given); an
        ellipsis is appended only when the preview was actually cut.
        """
        if isinstance(to, (list, tuple)):
            to_str = ", ".join(str(r) for r in to)
        else:
            to_str = to or ""
        preview_source = body or kwargs.get("html_body") or ""
        preview = preview_source[:200]
        if len(preview_source) > 200:
            preview += "..."
        return f"Send email to {to_str}\nSubject: {subject}\n\n{preview}"
# ── Helpers ───────────────────────────────────────────────────────────────────
def _decode_header(value) -> str:
    """Return a header value as readable text.

    Accepts ``None`` (returns ``""``), ``bytes`` (decoded as UTF-8 with
    replacement of invalid sequences) or any other object (converted with
    ``str``). RFC 2047 encoded-words such as ``=?utf-8?q?caf=C3=A9?=`` are
    decoded; if decoding fails (malformed encoding, unknown charset) the
    undecoded text is returned unchanged rather than raising.
    """
    from email.errors import MessageError
    from email.header import decode_header, make_header

    if value is None:
        return ""
    if isinstance(value, bytes):
        text = value.decode("utf-8", errors="replace")
    else:
        text = str(value)
    # Cheap pre-check: only headers containing an encoded-word need decoding.
    if "=?" not in text:
        return text
    try:
        return str(make_header(decode_header(text)))
    except (MessageError, LookupError, UnicodeError):
        return text
def _format_address(addresses) -> str:
    """Render the first entry of an ENVELOPE address list.

    Produces ``"Name <mailbox@host>"`` when a display name is present and the
    bare address otherwise. The host part (and the ``@``) is omitted when the
    entry has no host. An empty or missing list yields ``""``.
    """
    if not addresses:
        return ""
    first = addresses[0]
    # Decode each component, mapping missing/empty ones to "".
    display, local_part, domain = (
        _decode_header(field) if field else ""
        for field in (first.name, first.mailbox, first.host)
    )
    if domain:
        address = f"{local_part}@{domain}"
    else:
        address = local_part
    if not display:
        return address
    return f"{display} <{address}>"
def _extract_email_body(msg: email_lib.message.Message) -> str:
    """Extract the readable body text of ``msg``.

    For multipart messages every ``text/plain`` and ``text/html`` leaf part
    that is not marked ``Content-Disposition: attachment`` is collected. A
    single-part message is treated as HTML when its type is ``text/html`` and
    as plain text otherwise. Non-empty plain text is preferred; if there is
    none, the HTML is reduced to text with ``<script>``/``<style>`` content
    removed. Returns ``""`` when nothing usable is found.
    """

    def _part_text(part) -> str:
        # Decode one leaf part; never raises for bad or unknown charsets.
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Charset label Python does not know: fall back to UTF-8 instead
            # of dropping the part or failing the whole read.
            return payload.decode("utf-8", errors="replace")

    plain_parts = []
    html_parts = []

    if msg.is_multipart():
        for part in msg.walk():
            if part.is_multipart():
                continue
            # Attached files (e.g. a .txt attachment) are not body text.
            if part.get_content_disposition() == "attachment":
                continue
            ct = part.get_content_type()
            if ct not in ("text/plain", "text/html"):
                continue
            text = _part_text(part)
            if not text:
                continue
            if ct == "text/plain":
                plain_parts.append(text)
            else:
                html_parts.append(text)
    else:
        text = _part_text(msg)
        if text:
            if msg.get_content_type() == "text/html":
                html_parts.append(text)
            else:
                plain_parts.append(text)

    if plain_parts:
        return "\n".join(plain_parts)
    if html_parts:
        soup = BeautifulSoup("\n".join(html_parts), "html.parser")
        # Script and stylesheet sources are not message text.
        for tag in soup(["script", "style"]):
            tag.decompose()
        return soup.get_text(separator="\n")
    return ""