398 lines
16 KiB
Python
398 lines
16 KiB
Python
"""
|
|
tools/email_tool.py — IMAP email reading + SMTP sending.
|
|
|
|
Read operations: list_emails, read_email — no confirmation required.
|
|
Send operation: send_email — whitelisted recipients only, requires confirmation.
|
|
|
|
Prompt injection guard: all email body text is sanitised before returning to agent.
|
|
Max body length: 10,000 characters (truncated with notice).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import email as email_lib
|
|
import smtplib
|
|
import ssl
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.utils import formatdate, make_msgid, parseaddr
|
|
|
|
import imapclient
|
|
from bs4 import BeautifulSoup
|
|
|
|
from ..database import credential_store
|
|
from ..security import SecurityError, assert_email_rate_limit, assert_recipient_allowed, sanitize_external_content
|
|
from ..security_screening import get_content_limit, is_option_enabled
|
|
from .base import BaseTool, ToolResult
|
|
|
|
MAX_BODY_CHARS = 10_000 # legacy fallback when truncation option disabled
|
|
_DEFAULT_MAX_EMAIL_CHARS = 6_000 # default when truncation option enabled
|
|
_DEFAULT_MAX_SUBJECT_CHARS = 200 # default subject limit when truncation option enabled
|
|
MAX_LIST_EMAILS = 50
|
|
|
|
|
|
class EmailTool(BaseTool):
|
|
name = "email"
|
|
description = (
|
|
"Read and send emails via IMAP/SMTP (Mailcow). "
|
|
"Operations: list_emails (list inbox), read_email (read full message), "
|
|
"send_email (send to one or more whitelisted recipients — requires confirmation), "
|
|
"list_whitelist (return all approved recipient addresses). "
|
|
"Email bodies are sanitised before being returned."
|
|
)
|
|
input_schema = {
|
|
"type": "object",
|
|
"properties": {
|
|
"operation": {
|
|
"type": "string",
|
|
"enum": ["list_emails", "read_email", "send_email", "list_whitelist"],
|
|
"description": "The email operation to perform. list_whitelist returns all approved recipient addresses.",
|
|
},
|
|
"folder": {
|
|
"type": "string",
|
|
"description": "IMAP folder (default: INBOX)",
|
|
},
|
|
"limit": {
|
|
"type": "integer",
|
|
"description": f"Max emails to list (default 20, max {MAX_LIST_EMAILS})",
|
|
},
|
|
"unread_only": {
|
|
"type": "boolean",
|
|
"description": "Only list unread emails (default false)",
|
|
},
|
|
"email_id": {
|
|
"type": "string",
|
|
"description": "Email UID for read_email",
|
|
},
|
|
"to": {
|
|
"anyOf": [
|
|
{"type": "string"},
|
|
{"type": "array", "items": {"type": "string"}},
|
|
],
|
|
"description": "Recipient address or list of addresses for send_email (all must be whitelisted)",
|
|
},
|
|
"subject": {
|
|
"type": "string",
|
|
"description": "Email subject for send_email",
|
|
},
|
|
"body": {
|
|
"type": "string",
|
|
"description": "Email body text (plain text) for send_email",
|
|
},
|
|
"html_body": {
|
|
"type": "string",
|
|
"description": "Full HTML email body for send_email. If provided, used as the HTML part instead of the plain-text fallback wrapper. Include complete <html>...</html> with inline <style>.",
|
|
},
|
|
"reply_to_id": {
|
|
"type": "string",
|
|
"description": "Email UID to reply to (sets In-Reply-To header)",
|
|
},
|
|
},
|
|
"required": ["operation"],
|
|
}
|
|
requires_confirmation = False # only send_email requires it — checked in execute()
|
|
allowed_in_scheduled_tasks = True
|
|
|
|
async def _load_credentials(self) -> tuple[str, str, str, str, int]:
|
|
"""Returns (imap_host, smtp_host, username, password, smtp_port)."""
|
|
base_host = await credential_store.require("mailcow_host")
|
|
username = await credential_store.require("mailcow_username")
|
|
password = await credential_store.require("mailcow_password")
|
|
imap_host = await credential_store.get("mailcow_imap_host") or base_host
|
|
smtp_host = await credential_store.get("mailcow_smtp_host") or base_host
|
|
smtp_port = int(await credential_store.get("mailcow_smtp_port") or "465")
|
|
return imap_host, smtp_host, username, password, smtp_port
|
|
|
|
async def execute(
|
|
self,
|
|
operation: str,
|
|
folder: str = "INBOX",
|
|
limit: int = 20,
|
|
unread_only: bool = False,
|
|
email_id: str = "",
|
|
to=None,
|
|
subject: str = "",
|
|
body: str = "",
|
|
html_body: str = "",
|
|
reply_to_id: str = "",
|
|
**kwargs,
|
|
) -> ToolResult:
|
|
if operation == "list_emails":
|
|
return await self._list_emails(folder, min(limit, MAX_LIST_EMAILS), unread_only)
|
|
if operation == "read_email":
|
|
if not email_id:
|
|
return ToolResult(success=False, error="email_id is required for read_email")
|
|
return await self._read_email(folder, email_id)
|
|
if operation == "list_whitelist":
|
|
return await self._list_whitelist()
|
|
if operation == "send_email":
|
|
# Normalise to → list[str]
|
|
if isinstance(to, list):
|
|
recipients = [r.strip() for r in to if r.strip()]
|
|
elif isinstance(to, str) and to.strip():
|
|
recipients = [to.strip()]
|
|
else:
|
|
recipients = []
|
|
if not (recipients and subject and (body or html_body)):
|
|
return ToolResult(success=False, error="to, subject, and body (or html_body) are required for send_email")
|
|
return await self._send_email(recipients, subject, body, html_body, reply_to_id)
|
|
|
|
return ToolResult(success=False, error=f"Unknown operation: {operation!r}")
|
|
|
|
# ── IMAP ──────────────────────────────────────────────────────────────────
|
|
|
|
async def _list_emails(self, folder: str, limit: int, unread_only: bool) -> ToolResult:
|
|
try:
|
|
imap_host, _, username, password, _ = await self._load_credentials()
|
|
except RuntimeError as e:
|
|
return ToolResult(success=False, error=str(e))
|
|
|
|
try:
|
|
with imapclient.IMAPClient(imap_host, ssl=True, port=993) as client:
|
|
client.login(username, password)
|
|
client.select_folder(folder, readonly=True)
|
|
|
|
criteria = ["UNSEEN"] if unread_only else ["ALL"]
|
|
uids = client.search(criteria)
|
|
|
|
# Most recent first, limited
|
|
uids = list(reversed(uids))[:limit]
|
|
if not uids:
|
|
return ToolResult(success=True, data={"emails": [], "count": 0})
|
|
|
|
messages = client.fetch(uids, ["ENVELOPE", "FLAGS", "RFC822.SIZE"])
|
|
emails = []
|
|
for uid, data in messages.items():
|
|
env = data.get(b"ENVELOPE")
|
|
if not env:
|
|
continue
|
|
|
|
from_addr = _format_address(env.from_) if env.from_ else ""
|
|
emails.append({
|
|
"id": str(uid),
|
|
"from": from_addr,
|
|
"subject": _decode_header(env.subject),
|
|
"date": str(env.date) if env.date else "",
|
|
"unread": b"\\Seen" not in (data.get(b"FLAGS") or []),
|
|
"size_bytes": data.get(b"RFC822.SIZE", 0),
|
|
})
|
|
|
|
# Sort by date desc (approximate — ENVELOPE date isn't always reliable)
|
|
return ToolResult(
|
|
success=True,
|
|
data={"emails": emails, "count": len(emails)},
|
|
)
|
|
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"IMAP error: {e}")
|
|
|
|
async def _read_email(self, folder: str, email_id: str) -> ToolResult:
|
|
try:
|
|
imap_host, _, username, password, _ = await self._load_credentials()
|
|
except RuntimeError as e:
|
|
return ToolResult(success=False, error=str(e))
|
|
|
|
try:
|
|
uid = int(email_id)
|
|
except ValueError:
|
|
return ToolResult(success=False, error=f"Invalid email_id: {email_id!r}")
|
|
|
|
try:
|
|
with imapclient.IMAPClient(imap_host, ssl=True, port=993) as client:
|
|
client.login(username, password)
|
|
client.select_folder(folder, readonly=True)
|
|
|
|
messages = client.fetch([uid], ["RFC822"])
|
|
if not messages or uid not in messages:
|
|
return ToolResult(success=False, error=f"Email {email_id} not found")
|
|
|
|
raw = messages[uid][b"RFC822"]
|
|
msg = email_lib.message_from_bytes(raw)
|
|
|
|
from_addr = msg.get("From", "")
|
|
subject = _decode_header(msg.get("Subject", ""))
|
|
date = msg.get("Date", "")
|
|
message_id = msg.get("Message-ID", "")
|
|
body_text = _extract_email_body(msg)
|
|
|
|
# Truncate body
|
|
truncated = False
|
|
if await is_option_enabled("system:security_truncation_enabled"):
|
|
max_body = await get_content_limit("system:security_max_email_chars", _DEFAULT_MAX_EMAIL_CHARS)
|
|
if len(body_text) > max_body:
|
|
body_text = body_text[:max_body]
|
|
truncated = True
|
|
# Truncate subject
|
|
max_subj = await get_content_limit("system:security_max_subject_chars", _DEFAULT_MAX_SUBJECT_CHARS)
|
|
if len(subject) > max_subj:
|
|
subject = subject[:max_subj] + " [subject truncated]"
|
|
elif len(body_text) > MAX_BODY_CHARS:
|
|
body_text = body_text[:MAX_BODY_CHARS]
|
|
truncated = True
|
|
|
|
# Sanitise — critical security step (also sanitises subject)
|
|
body_text = await sanitize_external_content(body_text, source="email")
|
|
subject = await sanitize_external_content(subject, source="email_subject")
|
|
|
|
return ToolResult(
|
|
success=True,
|
|
data={
|
|
"id": email_id,
|
|
"from": from_addr,
|
|
"subject": subject,
|
|
"date": date,
|
|
"message_id": message_id,
|
|
"body": body_text,
|
|
"truncated": truncated,
|
|
},
|
|
)
|
|
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"IMAP error: {e}")
|
|
|
|
# ── SMTP ──────────────────────────────────────────────────────────────────
|
|
|
|
async def _list_whitelist(self) -> ToolResult:
|
|
from ..database import email_whitelist_store
|
|
entries = await email_whitelist_store.list()
|
|
return ToolResult(
|
|
success=True,
|
|
data={"recipients": [e["email"] for e in entries], "count": len(entries)},
|
|
)
|
|
|
|
async def _send_email(
|
|
self,
|
|
to: list[str],
|
|
subject: str,
|
|
body: str,
|
|
html_body: str = "",
|
|
reply_to_id: str = "",
|
|
) -> ToolResult:
|
|
# Security: enforce whitelist + rate limit for every recipient
|
|
try:
|
|
for addr in to:
|
|
await assert_recipient_allowed(addr)
|
|
await assert_email_rate_limit(addr)
|
|
except SecurityError as e:
|
|
return ToolResult(success=False, error=str(e))
|
|
|
|
try:
|
|
_, smtp_host, username, password, smtp_port = await self._load_credentials()
|
|
except RuntimeError as e:
|
|
return ToolResult(success=False, error=str(e))
|
|
|
|
# Build MIME message
|
|
msg = MIMEMultipart("alternative")
|
|
msg["From"] = username
|
|
msg["To"] = ", ".join(to)
|
|
msg["Subject"] = subject
|
|
msg["Date"] = formatdate(localtime=True)
|
|
msg["Message-ID"] = make_msgid()
|
|
|
|
if reply_to_id:
|
|
msg["In-Reply-To"] = reply_to_id
|
|
msg["References"] = reply_to_id
|
|
|
|
# Plain text
|
|
msg.attach(MIMEText(body, "plain", "utf-8"))
|
|
# HTML version — use provided html_body if given, otherwise wrap plain text
|
|
if not html_body:
|
|
html_body = f"<html><body><pre style='font-family:sans-serif'>{body}</pre></body></html>"
|
|
msg.attach(MIMEText(html_body, "html", "utf-8"))
|
|
|
|
try:
|
|
if smtp_port == 465:
|
|
context = ssl.create_default_context()
|
|
with smtplib.SMTP_SSL(smtp_host, smtp_port, context=context, timeout=10) as smtp:
|
|
smtp.login(username, password)
|
|
smtp.sendmail(username, to, msg.as_bytes())
|
|
else:
|
|
with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as smtp:
|
|
smtp.ehlo()
|
|
smtp.starttls()
|
|
smtp.login(username, password)
|
|
smtp.sendmail(username, to, msg.as_bytes())
|
|
|
|
return ToolResult(
|
|
success=True,
|
|
data={"sent": True, "to": to, "subject": subject},
|
|
)
|
|
|
|
except smtplib.SMTPAuthenticationError:
|
|
return ToolResult(success=False, error="SMTP authentication failed. Check mailcow_password.")
|
|
except smtplib.SMTPException as e:
|
|
return ToolResult(success=False, error=f"SMTP error: {e}")
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"Send error: {e}")
|
|
|
|
def confirmation_description(self, to=None, subject: str = "", body: str = "", **kwargs) -> str:
|
|
if isinstance(to, list):
|
|
to_str = ", ".join(to)
|
|
else:
|
|
to_str = to or ""
|
|
return f"Send email to {to_str}\nSubject: {subject}\n\n{body[:200]}..."
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _decode_header(value) -> str:
|
|
"""Decode IMAP header value (may be bytes or string)."""
|
|
if value is None:
|
|
return ""
|
|
if isinstance(value, bytes):
|
|
try:
|
|
return value.decode("utf-8", errors="replace")
|
|
except Exception:
|
|
return str(value)
|
|
return str(value)
|
|
|
|
|
|
def _format_address(addresses) -> str:
|
|
"""Format IMAP ENVELOPE address list to 'Name <email>' string."""
|
|
if not addresses:
|
|
return ""
|
|
addr = addresses[0]
|
|
name = _decode_header(addr.name) if addr.name else ""
|
|
mailbox = _decode_header(addr.mailbox) if addr.mailbox else ""
|
|
host = _decode_header(addr.host) if addr.host else ""
|
|
email_addr = f"{mailbox}@{host}" if host else mailbox
|
|
return f"{name} <{email_addr}>" if name else email_addr
|
|
|
|
|
|
def _extract_email_body(msg: email_lib.message.Message) -> str:
|
|
"""Extract plain text from email, stripping HTML if needed."""
|
|
plain_parts = []
|
|
html_parts = []
|
|
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
charset = part.get_content_charset() or "utf-8"
|
|
if ct == "text/plain":
|
|
try:
|
|
plain_parts.append(part.get_payload(decode=True).decode(charset, errors="replace"))
|
|
except Exception:
|
|
pass
|
|
elif ct == "text/html":
|
|
try:
|
|
html_parts.append(part.get_payload(decode=True).decode(charset, errors="replace"))
|
|
except Exception:
|
|
pass
|
|
else:
|
|
ct = msg.get_content_type()
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
payload = msg.get_payload(decode=True) or b""
|
|
text = payload.decode(charset, errors="replace")
|
|
if ct == "text/html":
|
|
html_parts.append(text)
|
|
else:
|
|
plain_parts.append(text)
|
|
|
|
if plain_parts:
|
|
return "\n".join(plain_parts)
|
|
|
|
if html_parts:
|
|
soup = BeautifulSoup("\n".join(html_parts), "html.parser")
|
|
return soup.get_text(separator="\n")
|
|
|
|
return ""
|