- Add admin DAV tab (rename from CalDAV/CardDAV) and Pushover tab
- Add per-user Pushover tab (User Key only; App Token stays admin-managed)
- Remove system-wide CalDAV/CardDAV fallback — per-user config only
- Rewrite contacts_tool.py using httpx directly (caldav 2.x dropped AddressBook)
- Fix CardDAV REPORT/PROPFIND using SOGo URL pattern
- Fix CalDAV/CardDAV test endpoints (POST method, URL scheme normalization)
- Fix Show Password button — API now returns actual credential values
- Convert Credentials tab to generic key-value store; dedicated keys
(CalDAV, Pushover, trusted_proxy) excluded via _DEDICATED_CRED_KEYS
471 lines
19 KiB
Python
471 lines
19 KiB
Python
"""
tools/caldav_tool.py — CalDAV calendar access (Mailcow / SOGo).

Per-user settings keys (set via /settings):
  caldav_url            — server URL or bare hostname, e.g. mail.yourdomain.com
  caldav_username       — e.g. you@yourdomain.com
  caldav_password       — account or app password
  caldav_calendar_name  — optional display-name filter; if omitted uses first calendar found

There is no system-wide fallback: every user configures their own credentials.

Uses principal discovery (/SOGo/dav/<username>/) to find calendars automatically.
No hardcoded calendar path — works regardless of internal calendar slug.

All datetimes are stored as UTC internally. Display times are converted
to the configured timezone (Europe/Oslo by default).

create/update/delete require user confirmation.
Max events returned: 100.
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import traceback
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import caldav
|
|
import vobject
|
|
from dateutil import parser as dateutil_parser
|
|
|
|
from ..config import settings
|
|
from ..context_vars import current_user
|
|
from ..database import credential_store
|
|
from .base import BaseTool, ToolResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def _get_caldav_config(user_id: str | None = None) -> dict:
    """
    Load the CalDAV connection settings stored for a single user.

    There is intentionally no system-wide fallback: when no user_id is given
    the result carries the same keys with every value set to None.

    Returns a dict with the keys url, username, password and calendar_name;
    individual values may be None/empty when the user has not set them.
    """
    if not user_id:
        return dict.fromkeys(("url", "username", "password", "calendar_name"))

    # Imported lazily, exactly as the lookup is only needed for real users.
    from ..database import user_settings_store

    setting_keys = (
        ("url", "caldav_url"),
        ("username", "caldav_username"),
        ("password", "caldav_password"),
        ("calendar_name", "caldav_calendar_name"),
    )
    config = {}
    for field, store_key in setting_keys:
        config[field] = await user_settings_store.get(user_id, store_key)
    return config
|
|
|
|
# Upper bound on the number of events list_events returns in one call.
MAX_EVENTS = 100
|
|
|
|
|
|
class CalDAVTool(BaseTool):
    """
    Calendar tool backed by a CalDAV server (Mailcow / SOGo).

    Supports list_events, get_event, create_event, update_event and
    delete_event. Connection settings are resolved per user through
    _get_caldav_config() on every call; there is no cached client.
    """

    name = "caldav"
    description = (
        "Manage calendar events via CalDAV (Mailcow/SOGo). "
        "Operations: list_events, get_event, create_event, update_event, delete_event. "
        "create_event, update_event, and delete_event require user confirmation. "
        "All dates should be in ISO8601 format (e.g. '2026-02-17T14:00:00+01:00')."
    )
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": ["list_events", "get_event", "create_event", "update_event", "delete_event"],
            },
            "start_date": {
                "type": "string",
                "description": "Start of date range for list_events (ISO8601)",
            },
            "end_date": {
                "type": "string",
                "description": "End of date range for list_events (ISO8601). Default: 30 days after start",
            },
            "event_id": {
                "type": "string",
                "description": "Event UID for get/update/delete operations",
            },
            "summary": {
                "type": "string",
                "description": "Event title for create/update",
            },
            "start": {
                "type": "string",
                "description": "Event start datetime (ISO8601) for create/update",
            },
            "end": {
                "type": "string",
                "description": "Event end datetime (ISO8601) for create/update",
            },
            "description": {
                "type": "string",
                "description": "Event description/notes for create/update",
            },
            "location": {
                "type": "string",
                "description": "Event location for create/update",
            },
        },
        "required": ["operation"],
    }
    # NOTE(review): execute() below contains no confirmation check of its own;
    # presumably the framework gates write operations using
    # confirmation_description() — confirm against BaseTool / the dispatcher.
    requires_confirmation = False
    allowed_in_scheduled_tasks = True

    async def _get_client(self) -> tuple[caldav.DAVClient, caldav.Calendar]:
        """
        Return (client, calendar) for the current user.

        The calendar is the one matching the configured calendar name
        (exact case-insensitive match first, then substring), or the first
        calendar found when no name is configured.

        Raises RuntimeError if credentials are missing, the account has no
        calendars, or the configured calendar name matches nothing.
        """
        # Resolve current user from context (may be None for scheduled/agent runs)
        user = current_user.get()
        user_id = user.id if user else None

        cfg = await _get_caldav_config(user_id=user_id)
        url = cfg.get("url")
        username = cfg.get("username")
        password = cfg.get("password")
        calendar_name = cfg.get("calendar_name") or ""

        if not url or not username or not password:
            raise RuntimeError(
                "CalDAV credentials not configured. "
                "Set them in Settings → CalDAV / CardDAV."
            )

        # Normalise scheme — users often enter just the hostname
        if not url.startswith(("http://", "https://")):
            url = "https://" + url

        # Build principal URL: if the stored URL is already the full principal URL use it directly;
        # otherwise append the SOGo-style path.
        if "/SOGo/dav/" in url or url.rstrip("/").endswith(username):
            principal_url = url.rstrip("/") + "/"
        else:
            principal_url = f"{url.rstrip('/')}/SOGo/dav/{username}/"

        if calendar_name:
            logger.info("[caldav] Connecting — principal_url=%s username=%s calendar_filter=%r",
                        principal_url, username, calendar_name)
        else:
            logger.info("[caldav] Connecting — principal_url=%s username=%s "
                        "calendar_filter=(none set — will use first found; "
                        "set 'caldav_calendar_name' credential to pick a specific one)",
                        principal_url, username)

        client = caldav.DAVClient(url=principal_url, username=username, password=password)

        logger.debug("[caldav] Fetching principal…")
        principal = client.principal()
        logger.debug("[caldav] Principal URL: %s", principal.url)

        logger.debug("[caldav] Discovering calendars…")
        calendars = principal.calendars()
        if not calendars:
            logger.error("[caldav] No calendars found for %s", username)
            raise RuntimeError("No calendars found for this account")

        logger.info("[caldav] Found %d calendar(s): %s",
                    len(calendars),
                    ", ".join(f"{c.name!r} ({c.url})" for c in calendars))

        if calendar_name:
            needle = calendar_name.lower()
            # Exact match first, then substring fallback
            match = next((c for c in calendars if (c.name or "").lower() == needle), None)
            if match is None:
                match = next((c for c in calendars if needle in (c.name or "").lower()), None)
            if match is None:
                names = ", ".join(c.name or "?" for c in calendars)
                logger.error("[caldav] Calendar %r not found. Available: %s", calendar_name, names)
                raise RuntimeError(
                    f"Calendar '{calendar_name}' not found. Available: {names}"
                )
            logger.info("[caldav] Using calendar %r url=%s", match.name, match.url)
            return client, match

        chosen = calendars[0]
        logger.info("[caldav] Using first calendar: %r url=%s", chosen.name, chosen.url)
        return client, chosen

    async def execute(
        self,
        operation: str,
        start_date: str = "",
        end_date: str = "",
        event_id: str = "",
        summary: str = "",
        start: str = "",
        end: str = "",
        description: str = "",
        location: str = "",
        **kwargs,
    ) -> ToolResult:
        """
        Connect to the user's calendar and dispatch to the requested operation.

        Always returns a ToolResult; connection problems, missing required
        arguments and unknown operations are reported as success=False
        rather than raised.
        """
        logger.info("[caldav] execute operation=%s summary=%r start=%r end=%r event_id=%r",
                    operation, summary, start, end, event_id)

        try:
            _, calendar = await self._get_client()
        except RuntimeError as e:
            # Raised by _get_client for configuration problems; message is user-facing.
            logger.error("[caldav] Connection/credential error: %s", e)
            return ToolResult(success=False, error=str(e))
        except Exception as e:
            logger.error("[caldav] Unexpected connection error: %s\n%s", e, traceback.format_exc())
            return ToolResult(success=False, error=f"CalDAV connection error: {e}")

        if operation == "list_events":
            return self._list_events(calendar, start_date, end_date)
        if operation == "get_event":
            if not event_id:
                return ToolResult(success=False, error="event_id is required for get_event")
            return self._get_event(calendar, event_id)
        if operation == "create_event":
            if not (summary and start and end):
                return ToolResult(success=False, error="summary, start, and end are required for create_event")
            return self._create_event(calendar, summary, start, end, description, location)
        if operation == "update_event":
            if not event_id:
                return ToolResult(success=False, error="event_id is required for update_event")
            return self._update_event(calendar, event_id, summary, start, end, description, location)
        if operation == "delete_event":
            if not event_id:
                return ToolResult(success=False, error="event_id is required for delete_event")
            return self._delete_event(calendar, event_id)

        logger.warning("[caldav] Unknown operation: %r", operation)
        return ToolResult(success=False, error=f"Unknown operation: {operation!r}")

    # ── Internal helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _parse_utc(value: str) -> datetime:
        """
        Parse a date/time string into an offset-aware UTC datetime.

        Inputs carrying an explicit offset are converted to UTC so the instant
        is preserved; inputs without one are assumed to already be UTC.
        """
        parsed = dateutil_parser.parse(value)
        if parsed.utcoffset() is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @staticmethod
    def _set_vevent_value(vevent: Any, prop: str, value: Any) -> None:
        """Set property `prop` on a vobject VEVENT, adding it when absent."""
        if hasattr(vevent, prop):
            getattr(vevent, prop).value = value
        else:
            vevent.add(prop).value = value

    # ── Read operations ───────────────────────────────────────────────────────

    def _list_events(self, calendar: caldav.Calendar, start_date: str, end_date: str) -> ToolResult:
        """
        List events between start_date and end_date (at most MAX_EVENTS).

        start_date defaults to now, end_date to 30 days after the start.
        Each entry holds id, summary, start, end, location and the first
        100 characters of the description.
        """
        try:
            start_dt = self._parse_utc(start_date) if start_date else datetime.now(timezone.utc)
            end_dt = self._parse_utc(end_date) if end_date else start_dt + timedelta(days=30)

            logger.info("[caldav] list_events range=%s → %s calendar_url=%s",
                        start_dt.isoformat(), end_dt.isoformat(), calendar.url)

            events = calendar.date_search(start=start_dt, end=end_dt, expand=True)
            events = events[:MAX_EVENTS]

            logger.info("[caldav] list_events returned %d event(s)", len(events))

            result = []
            for event in events:
                summary = _get_property(event, "summary", "No title")
                ev_start = _get_dt_str(event, "dtstart")
                ev_end = _get_dt_str(event, "dtend")
                logger.debug("[caldav] event: %r start=%s end=%s", summary, ev_start, ev_end)
                result.append({
                    "id": _get_uid(event),
                    "summary": summary,
                    "start": ev_start,
                    "end": ev_end,
                    "location": _get_property(event, "location", ""),
                    "description_preview": _get_property(event, "description", "")[:100],
                })

            return ToolResult(
                success=True,
                data={"events": result, "count": len(result)},
            )

        except Exception as e:
            logger.error("[caldav] list_events failed: %s\n%s", e, traceback.format_exc())
            return ToolResult(success=False, error=f"CalDAV list error: {e}")

    def _get_event(self, calendar: caldav.Calendar, event_id: str) -> ToolResult:
        """Return the full details of the event whose UID equals event_id."""
        logger.info("[caldav] get_event event_id=%s", event_id)
        try:
            event = _find_event(calendar, event_id)
            if event is None:
                logger.warning("[caldav] get_event: event not found: %s", event_id)
                return ToolResult(success=False, error=f"Event not found: {event_id}")

            data = {
                "id": _get_uid(event),
                "summary": _get_property(event, "summary", ""),
                "start": _get_dt_str(event, "dtstart"),
                "end": _get_dt_str(event, "dtend"),
                "location": _get_property(event, "location", ""),
                "description": _get_property(event, "description", ""),
            }
            logger.info("[caldav] get_event found: %r start=%s", data["summary"], data["start"])
            return ToolResult(success=True, data=data)
        except Exception as e:
            logger.error("[caldav] get_event failed: %s\n%s", e, traceback.format_exc())
            return ToolResult(success=False, error=f"CalDAV get error: {e}")

    # ── Write operations ──────────────────────────────────────────────────────

    def _create_event(
        self,
        calendar: caldav.Calendar,
        summary: str,
        start: str,
        end: str,
        description: str,
        location: str,
    ) -> ToolResult:
        """Create a new event with a freshly generated UUID4 UID and return that UID."""
        import uuid

        logger.info("[caldav] create_event summary=%r start=%s end=%s location=%r calendar_url=%s",
                    summary, start, end, location, calendar.url)
        try:
            start_dt = dateutil_parser.parse(start)
            end_dt = dateutil_parser.parse(end)
            logger.debug("[caldav] create_event parsed start_dt=%s end_dt=%s", start_dt, end_dt)

            uid = str(uuid.uuid4())
            ical = _build_ical(uid, summary, start_dt, end_dt, description, location)
            logger.debug("[caldav] create_event ical payload:\n%s", ical)

            calendar.add_event(ical)
            logger.info("[caldav] create_event success uid=%s", uid)

            return ToolResult(
                success=True,
                data={"created": True, "uid": uid, "summary": summary},
            )
        except Exception as e:
            logger.error("[caldav] create_event failed: %s\n%s", e, traceback.format_exc())
            return ToolResult(success=False, error=f"CalDAV create error: {e}")

    def _update_event(
        self,
        calendar: caldav.Calendar,
        event_id: str,
        summary: str,
        start: str,
        end: str,
        description: str,
        location: str,
    ) -> ToolResult:
        """
        Update the given fields of an existing event and save it.

        Empty arguments leave the corresponding property untouched. Every
        property is added when the stored event does not have it yet, so
        events lacking SUMMARY or DTEND can be updated as well.
        """
        logger.info("[caldav] update_event event_id=%s summary=%r start=%s end=%s",
                    event_id, summary, start, end)
        try:
            event = _find_event(calendar, event_id)
            if event is None:
                logger.warning("[caldav] update_event: event not found: %s", event_id)
                return ToolResult(success=False, error=f"Event not found: {event_id}")

            vevent = event.vobject_instance.vevent

            if summary:
                self._set_vevent_value(vevent, "summary", summary)
            if start:
                self._set_vevent_value(vevent, "dtstart", dateutil_parser.parse(start))
            if end:
                # DTEND and DURATION must not both appear in a VEVENT, so an
                # explicit end replaces any stored duration.
                if hasattr(vevent, "duration"):
                    del vevent.duration
                self._set_vevent_value(vevent, "dtend", dateutil_parser.parse(end))
            if description:
                self._set_vevent_value(vevent, "description", description)
            if location:
                self._set_vevent_value(vevent, "location", location)

            event.save()
            logger.info("[caldav] update_event success uid=%s", event_id)
            return ToolResult(success=True, data={"updated": True, "uid": event_id})

        except Exception as e:
            logger.error("[caldav] update_event failed: %s\n%s", e, traceback.format_exc())
            return ToolResult(success=False, error=f"CalDAV update error: {e}")

    def _delete_event(self, calendar: caldav.Calendar, event_id: str) -> ToolResult:
        """Delete the event whose UID equals event_id."""
        logger.info("[caldav] delete_event event_id=%s", event_id)
        try:
            event = _find_event(calendar, event_id)
            if event is None:
                logger.warning("[caldav] delete_event: event not found: %s", event_id)
                return ToolResult(success=False, error=f"Event not found: {event_id}")
            event.delete()
            logger.info("[caldav] delete_event success uid=%s", event_id)
            return ToolResult(success=True, data={"deleted": True, "uid": event_id})
        except Exception as e:
            logger.error("[caldav] delete_event failed: %s\n%s", e, traceback.format_exc())
            return ToolResult(success=False, error=f"CalDAV delete error: {e}")

    def confirmation_description(self, operation: str = "", summary: str = "", event_id: str = "", **kwargs) -> str:
        """Return a one-line human-readable description of the requested operation."""
        if operation == "create_event":
            start = kwargs.get("start", "")
            return f"Create calendar event: '{summary}' at {start}"
        if operation == "update_event":
            return f"Update calendar event: {event_id}" + (f" → '{summary}'" if summary else "")
        if operation == "delete_event":
            return f"Permanently delete calendar event: {event_id}"
        return f"{operation}: {event_id or summary}"
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _get_uid(event: caldav.Event) -> str:
    """Return the event's UID as a string, or "" when it cannot be read."""
    try:
        uid_prop = event.vobject_instance.vevent.uid
        return str(uid_prop.value)
    except Exception:
        # Best effort: any failure to reach the UID yields an empty id.
        return ""
|
|
|
|
|
|
def _get_property(event: caldav.Event, prop: str, default: str = "") -> str:
    """Return the named VEVENT property as a string, or `default` if it cannot be read."""
    try:
        component = event.vobject_instance.vevent
        raw_value = getattr(component, prop).value
        return str(raw_value)
    except Exception:
        # Best effort: a missing property or malformed event yields the default.
        return default
|
|
|
|
|
|
def _get_dt_str(event: caldav.Event, prop: str) -> str:
    """
    Return the named VEVENT date property as text.

    datetime values are rendered with isoformat(); anything else (for
    example a plain date) goes through str(). Returns "" when the
    property cannot be read.
    """
    try:
        moment = getattr(event.vobject_instance.vevent, prop).value
        return moment.isoformat() if isinstance(moment, datetime) else str(moment)
    except Exception:
        # Best effort: a missing property or malformed event yields "".
        return ""
|
|
|
|
|
|
def _find_event(calendar: caldav.Calendar, uid: str) -> caldav.Event | None:
    """
    Find an event by UID by scanning every event in the calendar.

    Returns the first event whose UID equals `uid`, or None when there is no
    match, when `uid` is empty, or when the calendar could not be read. A
    read failure is logged with its traceback so it can be told apart from a
    genuine "not found" in the logs.
    """
    if not uid:
        # An empty uid would otherwise match any event whose UID is unreadable.
        return None
    try:
        for event in calendar.events():
            if _get_uid(event) == uid:
                return event
    except Exception:
        # Keep the best-effort contract (never raise), but do not hide the cause.
        logger.exception("[caldav] event lookup failed for uid=%s", uid)
    return None
|
|
|
|
|
|
def _build_ical(
    uid: str,
    summary: str,
    start: datetime,
    end: datetime,
    description: str,
    location: str,
    tz: str = "Europe/Oslo",
) -> str:
    """
    Build the iCalendar (VCALENDAR with a single VEVENT) text for a new event.

    Offset-aware start/end values are written in UTC form so the instant is
    preserved; naive values are written as local time tagged with TZID=`tz`.
    SUMMARY, DESCRIPTION and LOCATION are escaped as iCalendar TEXT so that
    commas, semicolons, backslashes and line breaks cannot corrupt the
    payload. DESCRIPTION and LOCATION are omitted when empty.

    Returns the payload with CRLF line separators.
    """
    now = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//aide//aide//EN",
        "BEGIN:VEVENT",
        f"UID:{uid}",
        f"DTSTAMP:{now}",
        _format_ical_dt("DTSTART", start, tz),
        _format_ical_dt("DTEND", end, tz),
        f"SUMMARY:{_escape_ical_text(summary)}",
    ]
    if description:
        lines.append(f"DESCRIPTION:{_escape_ical_text(description)}")
    if location:
        lines.append(f"LOCATION:{_escape_ical_text(location)}")
    lines += ["END:VEVENT", "END:VCALENDAR"]
    return "\r\n".join(lines)


def _format_ical_dt(prop: str, value: datetime, tz: str) -> str:
    """Render one date-time property line: UTC form if aware, TZID form if naive."""
    if value.utcoffset() is not None:
        stamp = value.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        return f"{prop}:{stamp}"
    stamp = value.strftime("%Y%m%dT%H%M%S")
    return f"{prop};TZID={tz}:{stamp}"


def _escape_ical_text(value: str) -> str:
    """Escape a string for use as an iCalendar TEXT value."""
    # Backslashes first, so the escapes added below are not escaped again.
    escaped = value.replace("\\", "\\\\").replace(";", "\\;").replace(",", "\\,")
    return escaped.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n")
|