""" tools/caldav_tool.py — CalDAV calendar access (Mailcow / SOGo). Credential keys (set via /settings): mailcow_host — e.g. mail.yourdomain.com mailcow_username — e.g. you@yourdomain.com mailcow_password — account or app password caldav_calendar_name — optional display-name filter; if omitted uses first calendar found Uses principal discovery (/SOGo/dav//) to find calendars automatically. No hardcoded URL path — works regardless of internal calendar slug. All datetimes are stored as UTC internally. Display times are converted to the configured timezone (Europe/Oslo by default). create/update/delete require user confirmation. Max events returned: 100. """ from __future__ import annotations import logging import traceback from datetime import datetime, timedelta, timezone from typing import Any import caldav import vobject from dateutil import parser as dateutil_parser from ..config import settings from ..context_vars import current_user from ..database import credential_store from .base import BaseTool, ToolResult logger = logging.getLogger(__name__) async def _get_caldav_config(user_id: str | None = None) -> dict: """ Per-user CalDAV config lookup — no system-wide fallback. Each user (including admin) configures their own CalDAV credentials in Settings → CalDAV. Returns a dict with url, username, password, calendar_name (any may be None/empty). """ if user_id: from ..database import user_settings_store return { "url": await user_settings_store.get(user_id, "caldav_url"), "username": await user_settings_store.get(user_id, "caldav_username"), "password": await user_settings_store.get(user_id, "caldav_password"), "calendar_name": await user_settings_store.get(user_id, "caldav_calendar_name"), } return {"url": None, "username": None, "password": None, "calendar_name": None} MAX_EVENTS = 100 class CalDAVTool(BaseTool): name = "caldav" description = ( "Manage calendar events via CalDAV (Mailcow/SOGo). " "Operations: list_events, get_event, create_event, update_event, delete_event. " "create_event, update_event, and delete_event require user confirmation. " "All dates should be in ISO8601 format (e.g. '2026-02-17T14:00:00+01:00')." ) input_schema = { "type": "object", "properties": { "operation": { "type": "string", "enum": ["list_events", "get_event", "create_event", "update_event", "delete_event"], }, "start_date": { "type": "string", "description": "Start of date range for list_events (ISO8601)", }, "end_date": { "type": "string", "description": "End of date range for list_events (ISO8601). Default: 30 days after start", }, "event_id": { "type": "string", "description": "Event UID for get/update/delete operations", }, "summary": { "type": "string", "description": "Event title for create/update", }, "start": { "type": "string", "description": "Event start datetime (ISO8601) for create/update", }, "end": { "type": "string", "description": "Event end datetime (ISO8601) for create/update", }, "description": { "type": "string", "description": "Event description/notes for create/update", }, "location": { "type": "string", "description": "Event location for create/update", }, }, "required": ["operation"], } requires_confirmation = False # checked per-operation in execute() allowed_in_scheduled_tasks = True async def _get_client(self) -> tuple[caldav.DAVClient, caldav.Calendar]: """Return (client, calendar). Raises RuntimeError if credentials missing.""" # Resolve current user from context (may be None for scheduled/agent runs) user = current_user.get() user_id = user.id if user else None cfg = await _get_caldav_config(user_id=user_id) url = cfg.get("url") username = cfg.get("username") password = cfg.get("password") calendar_name = cfg.get("calendar_name") or "" if not url or not username or not password: raise RuntimeError( "CalDAV credentials not configured. " "Set them in Settings → CalDAV / CardDAV." ) # Normalise scheme — users often enter just the hostname if not url.startswith(("http://", "https://")): url = "https://" + url # Build principal URL: if the stored URL is already the full principal URL use it directly; # otherwise append the SOGo-style path. if "/SOGo/dav/" in url or url.rstrip("/").endswith(username): principal_url = url.rstrip("/") + "/" else: principal_url = f"{url.rstrip('/')}/SOGo/dav/{username}/" if calendar_name: logger.info("[caldav] Connecting — principal_url=%s username=%s calendar_filter=%r", principal_url, username, calendar_name) else: logger.info("[caldav] Connecting — principal_url=%s username=%s " "calendar_filter=(none set — will use first found; " "set 'caldav_calendar_name' credential to pick a specific one)", principal_url, username) client = caldav.DAVClient(url=principal_url, username=username, password=password) logger.debug("[caldav] Fetching principal…") principal = client.principal() logger.debug("[caldav] Principal URL: %s", principal.url) logger.debug("[caldav] Discovering calendars…") calendars = principal.calendars() if not calendars: logger.error("[caldav] No calendars found for %s", username) raise RuntimeError("No calendars found for this account") logger.info("[caldav] Found %d calendar(s): %s", len(calendars), ", ".join(f"{c.name!r} ({c.url})" for c in calendars)) if calendar_name: needle = calendar_name.lower() # Exact match first, then substring fallback match = next((c for c in calendars if (c.name or "").lower() == needle), None) if match is None: match = next((c for c in calendars if needle in (c.name or "").lower()), None) if match is None: names = ", ".join(c.name or "?" for c in calendars) logger.error("[caldav] Calendar %r not found. Available: %s", calendar_name, names) raise RuntimeError( f"Calendar '{calendar_name}' not found. Available: {names}" ) logger.info("[caldav] Using calendar %r url=%s", match.name, match.url) return client, match chosen = calendars[0] logger.info("[caldav] Using first calendar: %r url=%s", chosen.name, chosen.url) return client, chosen async def execute( self, operation: str, start_date: str = "", end_date: str = "", event_id: str = "", summary: str = "", start: str = "", end: str = "", description: str = "", location: str = "", **kwargs, ) -> ToolResult: logger.info("[caldav] execute operation=%s summary=%r start=%r end=%r event_id=%r", operation, summary, start, end, event_id) try: _, calendar = await self._get_client() except RuntimeError as e: logger.error("[caldav] Connection/credential error: %s", e) return ToolResult(success=False, error=str(e)) except Exception as e: logger.error("[caldav] Unexpected connection error: %s\n%s", e, traceback.format_exc()) return ToolResult(success=False, error=f"CalDAV connection error: {e}") if operation == "list_events": return self._list_events(calendar, start_date, end_date) if operation == "get_event": if not event_id: return ToolResult(success=False, error="event_id is required for get_event") return self._get_event(calendar, event_id) if operation == "create_event": if not (summary and start and end): return ToolResult(success=False, error="summary, start, and end are required for create_event") return self._create_event(calendar, summary, start, end, description, location) if operation == "update_event": if not event_id: return ToolResult(success=False, error="event_id is required for update_event") return self._update_event(calendar, event_id, summary, start, end, description, location) if operation == "delete_event": if not event_id: return ToolResult(success=False, error="event_id is required for delete_event") return self._delete_event(calendar, event_id) logger.warning("[caldav] Unknown operation: %r", operation) return ToolResult(success=False, error=f"Unknown operation: {operation!r}") # ── Read operations ─────────────────────────────────────────────────────── def _list_events(self, calendar: caldav.Calendar, start_date: str, end_date: str) -> ToolResult: try: if start_date: start_dt = dateutil_parser.parse(start_date).replace(tzinfo=timezone.utc) else: start_dt = datetime.now(timezone.utc) if end_date: end_dt = dateutil_parser.parse(end_date).replace(tzinfo=timezone.utc) else: end_dt = start_dt + timedelta(days=30) logger.info("[caldav] list_events range=%s → %s calendar_url=%s", start_dt.isoformat(), end_dt.isoformat(), calendar.url) events = calendar.date_search(start=start_dt, end=end_dt, expand=True) events = events[:MAX_EVENTS] logger.info("[caldav] list_events returned %d event(s)", len(events)) result = [] for event in events: summary = _get_property(event, "summary", "No title") ev_start = _get_dt_str(event, "dtstart") ev_end = _get_dt_str(event, "dtend") logger.debug("[caldav] event: %r start=%s end=%s", summary, ev_start, ev_end) result.append({ "id": _get_uid(event), "summary": summary, "start": ev_start, "end": ev_end, "location": _get_property(event, "location", ""), "description_preview": _get_property(event, "description", "")[:100], }) return ToolResult( success=True, data={"events": result, "count": len(result)}, ) except Exception as e: logger.error("[caldav] list_events failed: %s\n%s", e, traceback.format_exc()) return ToolResult(success=False, error=f"CalDAV list error: {e}") def _get_event(self, calendar: caldav.Calendar, event_id: str) -> ToolResult: logger.info("[caldav] get_event event_id=%s", event_id) try: event = _find_event(calendar, event_id) if event is None: logger.warning("[caldav] get_event: event not found: %s", event_id) return ToolResult(success=False, error=f"Event not found: {event_id}") data = { "id": _get_uid(event), "summary": _get_property(event, "summary", ""), "start": _get_dt_str(event, "dtstart"), "end": _get_dt_str(event, "dtend"), "location": _get_property(event, "location", ""), "description": _get_property(event, "description", ""), } logger.info("[caldav] get_event found: %r start=%s", data["summary"], data["start"]) return ToolResult(success=True, data=data) except Exception as e: logger.error("[caldav] get_event failed: %s\n%s", e, traceback.format_exc()) return ToolResult(success=False, error=f"CalDAV get error: {e}") # ── Write operations ────────────────────────────────────────────────────── def _create_event( self, calendar: caldav.Calendar, summary: str, start: str, end: str, description: str, location: str, ) -> ToolResult: import uuid logger.info("[caldav] create_event summary=%r start=%s end=%s location=%r calendar_url=%s", summary, start, end, location, calendar.url) try: start_dt = dateutil_parser.parse(start) end_dt = dateutil_parser.parse(end) logger.debug("[caldav] create_event parsed start_dt=%s end_dt=%s", start_dt, end_dt) uid = str(uuid.uuid4()) ical = _build_ical(uid, summary, start_dt, end_dt, description, location) logger.debug("[caldav] create_event ical payload:\n%s", ical) calendar.add_event(ical) logger.info("[caldav] create_event success uid=%s", uid) return ToolResult( success=True, data={"created": True, "uid": uid, "summary": summary}, ) except Exception as e: logger.error("[caldav] create_event failed: %s\n%s", e, traceback.format_exc()) return ToolResult(success=False, error=f"CalDAV create error: {e}") def _update_event( self, calendar: caldav.Calendar, event_id: str, summary: str, start: str, end: str, description: str, location: str, ) -> ToolResult: logger.info("[caldav] update_event event_id=%s summary=%r start=%s end=%s", event_id, summary, start, end) try: event = _find_event(calendar, event_id) if event is None: logger.warning("[caldav] update_event: event not found: %s", event_id) return ToolResult(success=False, error=f"Event not found: {event_id}") vevent = event.vobject_instance.vevent if summary: vevent.summary.value = summary if start: vevent.dtstart.value = dateutil_parser.parse(start) if end: vevent.dtend.value = dateutil_parser.parse(end) if description: if hasattr(vevent, "description"): vevent.description.value = description else: vevent.add("description").value = description if location: if hasattr(vevent, "location"): vevent.location.value = location else: vevent.add("location").value = location event.save() logger.info("[caldav] update_event success uid=%s", event_id) return ToolResult(success=True, data={"updated": True, "uid": event_id}) except Exception as e: logger.error("[caldav] update_event failed: %s\n%s", e, traceback.format_exc()) return ToolResult(success=False, error=f"CalDAV update error: {e}") def _delete_event(self, calendar: caldav.Calendar, event_id: str) -> ToolResult: logger.info("[caldav] delete_event event_id=%s", event_id) try: event = _find_event(calendar, event_id) if event is None: logger.warning("[caldav] delete_event: event not found: %s", event_id) return ToolResult(success=False, error=f"Event not found: {event_id}") event.delete() logger.info("[caldav] delete_event success uid=%s", event_id) return ToolResult(success=True, data={"deleted": True, "uid": event_id}) except Exception as e: logger.error("[caldav] delete_event failed: %s\n%s", e, traceback.format_exc()) return ToolResult(success=False, error=f"CalDAV delete error: {e}") def confirmation_description(self, operation: str = "", summary: str = "", event_id: str = "", **kwargs) -> str: if operation == "create_event": start = kwargs.get("start", "") return f"Create calendar event: '{summary}' at {start}" if operation == "update_event": return f"Update calendar event: {event_id}" + (f" → '{summary}'" if summary else "") if operation == "delete_event": return f"Permanently delete calendar event: {event_id}" return f"{operation}: {event_id or summary}" # ── Helpers ─────────────────────────────────────────────────────────────────── def _get_uid(event: caldav.Event) -> str: try: return str(event.vobject_instance.vevent.uid.value) except Exception: return "" def _get_property(event: caldav.Event, prop: str, default: str = "") -> str: try: return str(getattr(event.vobject_instance.vevent, prop).value) except Exception: return default def _get_dt_str(event: caldav.Event, prop: str) -> str: try: val = getattr(event.vobject_instance.vevent, prop).value if isinstance(val, datetime): return val.isoformat() return str(val) except Exception: return "" def _find_event(calendar: caldav.Calendar, uid: str) -> caldav.Event | None: """Find an event by UID. Returns None if not found.""" try: # Try direct URL lookup first for event in calendar.events(): if _get_uid(event) == uid: return event except Exception: pass return None def _build_ical( uid: str, summary: str, start: datetime, end: datetime, description: str, location: str, ) -> str: now = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") start_str = start.strftime("%Y%m%dT%H%M%S") end_str = end.strftime("%Y%m%dT%H%M%S") tz = "Europe/Oslo" lines = [ "BEGIN:VCALENDAR", "VERSION:2.0", "PRODID:-//aide//aide//EN", "BEGIN:VEVENT", f"UID:{uid}", f"DTSTAMP:{now}", f"DTSTART;TZID={tz}:{start_str}", f"DTEND;TZID={tz}:{end_str}", f"SUMMARY:{summary}", ] if description: lines.append(f"DESCRIPTION:{description.replace(chr(10), '\\n')}") if location: lines.append(f"LOCATION:{location}") lines += ["END:VEVENT", "END:VCALENDAR"] return "\r\n".join(lines)