""" tools/contacts_tool.py — CardDAV contacts access (Mailcow / SOGo). Uses httpx directly for CardDAV protocol (caldav 2.x dropped AddressBook support). Address book URL format: {base_url}/SOGo/dav/{username}/Contacts/personal/ Read operations: list_contacts, search_contacts, get_contact (always available) Write operations: create_contact, update_contact, delete_contact → only available when credential_store key 'contacts:allow_write' == '1' → all write ops require user confirmation """ from __future__ import annotations import logging import uuid as _uuid from urllib.parse import urlparse from xml.etree import ElementTree as ET import httpx import vobject from ..context_vars import current_user from ..database import credential_store from .base import BaseTool, ToolResult logger = logging.getLogger(__name__) MAX_CONTACTS = 100 _CARDDAV_NS = "urn:ietf:params:xml:ns:carddav" _DAV_NS = "DAV:" # ── URL helpers ─────────────────────────────────────────────────────────────── def _sogo_carddav_url(base_url: str, username: str) -> str: """Return the SOGo CardDAV personal address book URL for the given user.""" if not base_url.startswith(("http://", "https://")): base_url = "https://" + base_url if "/SOGo/dav/" in base_url or base_url.rstrip("/").endswith("/Contacts/personal"): return base_url.rstrip("/") + "/" return f"{base_url.rstrip('/')}/SOGo/dav/{username}/Contacts/personal/" def _abs_href(href: str, base_url: str) -> str: """Convert a relative href to an absolute URL using base_url's scheme+host.""" if href.startswith("http://") or href.startswith("https://"): return href parsed = urlparse(base_url) return f"{parsed.scheme}://{parsed.netloc}{href}" # ── vCard helpers ───────────────────────────────────────────────────────────── def _vcard_to_dict(vcard_text: str, contact_url: str = "") -> dict: """Parse a vCard string into a structured dict.""" try: vc = vobject.readOne(vcard_text) except Exception: return {"id": contact_url, "raw": vcard_text[:200]} def _get(component: str) -> str: try: return str(getattr(vc, component).value) except Exception: return "" def _get_list(component: str) -> list[str]: try: items = vc.contents.get(component.lower(), []) return [str(item.value) for item in items] except Exception: return [] name = _get("fn") emails = _get_list("email") phones = _get_list("tel") org = _get("org") note = _get("note") uid = _get("uid") or contact_url result: dict = {"id": contact_url or uid, "name": name} if emails: result["email"] = emails[0] if len(emails) > 1: result["emails"] = emails if phones: result["phone"] = phones[0] if len(phones) > 1: result["phones"] = phones if org: result["organization"] = org if note: result["note"] = note[:200] return result # ── Credential helpers ──────────────────────────────────────────────────────── async def _get_carddav_config(user_id: str | None = None) -> dict: """ Per-user CardDAV config lookup — no system-wide fallback. If carddav_same_as_caldav is set, reuses the user's own CalDAV credentials. Otherwise uses the user's explicit CardDAV URL/username/password. Returns empty dict values when not configured. """ from .caldav_tool import _get_caldav_config if not user_id: return {"url": None, "username": None, "password": None} from ..database import user_settings_store same = await user_settings_store.get(user_id, "carddav_same_as_caldav") if same == "1": return await _get_caldav_config(user_id) carddav_url = await user_settings_store.get(user_id, "carddav_url") carddav_user = await user_settings_store.get(user_id, "carddav_username") carddav_pass = await user_settings_store.get(user_id, "carddav_password") return {"url": carddav_url, "username": carddav_user, "password": carddav_pass} # ── Low-level CardDAV HTTP ──────────────────────────────────────────────────── async def _carddav_report(abook_url: str, auth: tuple[str, str]) -> list[dict]: """ Issue an addressbook-query REPORT to fetch all vCards. Returns list of {"url": str, "vcard_text": str}. """ body = ( '' '' "" "" "" ) async with httpx.AsyncClient(auth=auth, timeout=30) as client: r = await client.request( "REPORT", abook_url, content=body, headers={"Content-Type": "application/xml; charset=utf-8", "Depth": "1"}, ) if r.status_code != 207: raise RuntimeError(f"CardDAV REPORT returned HTTP {r.status_code}") root = ET.fromstring(r.content) results = [] for resp in root.findall(f"{{{_DAV_NS}}}response"): href_el = resp.find(f"{{{_DAV_NS}}}href") status_el = resp.find(f"{{{_DAV_NS}}}propstat/{{{_DAV_NS}}}status") data_el = resp.find( f"{{{_DAV_NS}}}propstat/{{{_DAV_NS}}}prop/{{{_CARDDAV_NS}}}address-data" ) if href_el is None or data_el is None or not (data_el.text or "").strip(): continue if status_el is not None and "200" not in (status_el.text or ""): continue abs_url = _abs_href(href_el.text, abook_url) results.append({"url": abs_url, "vcard_text": data_el.text}) return results async def _carddav_put(url: str, vcard_text: str, auth: tuple[str, str], etag: str = "") -> None: headers: dict = {"Content-Type": "text/vcard; charset=utf-8"} if etag: headers["If-Match"] = etag async with httpx.AsyncClient(auth=auth, timeout=15) as client: r = await client.put(url, content=vcard_text.encode(), headers=headers) if r.status_code not in (200, 201, 204): raise RuntimeError(f"CardDAV PUT returned HTTP {r.status_code}") async def _carddav_delete(url: str, auth: tuple[str, str]) -> None: async with httpx.AsyncClient(auth=auth, timeout=15) as client: r = await client.delete(url) if r.status_code not in (200, 204): raise RuntimeError(f"CardDAV DELETE returned HTTP {r.status_code}") # ── Tool ────────────────────────────────────────────────────────────────────── _WRITE_OPS = {"create_contact", "update_contact", "delete_contact"} class ContactsTool(BaseTool): name = "contacts" description = ( "Search, read, and (if write access is enabled) modify contacts in the CardDAV address book. " "Read operations: list_contacts, search_contacts, get_contact. " "Write operations (require contacts:allow_write=1): create_contact, update_contact, delete_contact. " "Uses the same CalDAV server credentials." ) input_schema = { "type": "object", "properties": { "operation": { "type": "string", "enum": [ "list_contacts", "search_contacts", "get_contact", "create_contact", "update_contact", "delete_contact", ], "description": "Operation to perform.", }, "query": { "type": "string", "description": "Search query (name or email substring). Required for search_contacts.", }, "contact_id": { "type": "string", "description": "Contact URL or UID. Required for get_contact, update_contact, delete_contact.", }, "name": {"type": "string", "description": "Full display name. Required for create_contact."}, "email": {"type": "string", "description": "Primary email address."}, "phone": {"type": "string", "description": "Primary phone number."}, "organization": {"type": "string", "description": "Organization / company name."}, "note": {"type": "string", "description": "Free-text note."}, }, "required": ["operation"], } requires_confirmation = False allowed_in_scheduled_tasks = True async def execute( self, operation: str, query: str = "", contact_id: str = "", name: str = "", email: str = "", phone: str = "", organization: str = "", note: str = "", **_, ) -> ToolResult: user_id = None try: u = current_user.get() if u: user_id = u.id except Exception: pass # Gate write operations if operation in _WRITE_OPS: allow_write = False if user_id: from ..database import user_settings_store per_user = await user_settings_store.get(user_id, "contacts_allow_write") allow_write = (per_user == "1") if not allow_write: allow_write = (await credential_store.get("contacts:allow_write") == "1") if not allow_write: return ToolResult( success=False, error=( "Contact write access is disabled. " "Enable it in Settings → CalDAV/CardDAV." ), ) cfg = await _get_carddav_config(user_id) base_url = cfg.get("url") username = cfg.get("username") password = cfg.get("password") if not base_url or not username or not password: return ToolResult( success=False, error=( "CardDAV credentials not configured. " "Set them in Settings → CalDAV/CardDAV." ), ) abook_url = _sogo_carddav_url(base_url, username) auth = (username, password) if operation == "list_contacts": return await self._list_contacts(abook_url, auth) if operation == "search_contacts": if not query: return ToolResult(success=False, error="'query' is required for search_contacts") return await self._search_contacts(abook_url, auth, query) if operation == "get_contact": if not contact_id: return ToolResult(success=False, error="'contact_id' is required for get_contact") return await self._get_contact(abook_url, auth, contact_id) if operation == "create_contact": if not name: return ToolResult(success=False, error="'name' is required for create_contact") return await self._create_contact(abook_url, auth, name, email, phone, organization, note) if operation == "update_contact": if not contact_id: return ToolResult(success=False, error="'contact_id' is required for update_contact") return await self._update_contact(abook_url, auth, contact_id, name, email, phone, organization, note) if operation == "delete_contact": if not contact_id: return ToolResult(success=False, error="'contact_id' is required for delete_contact") return await self._delete_contact(abook_url, auth, contact_id) return ToolResult(success=False, error=f"Unknown operation: {operation}") # ── Read operations ──────────────────────────────────────────────────────── async def _list_contacts(self, abook_url: str, auth: tuple) -> ToolResult: try: items = await _carddav_report(abook_url, auth) contacts = [_vcard_to_dict(it["vcard_text"], it["url"]) for it in items[:MAX_CONTACTS]] contacts.sort(key=lambda c: c.get("name", "").lower()) return ToolResult(success=True, data={"contacts": contacts, "count": len(contacts)}) except Exception as e: return ToolResult(success=False, error=f"Failed to list contacts: {e}") async def _search_contacts(self, abook_url: str, auth: tuple, query: str) -> ToolResult: try: q_lower = query.lower() items = await _carddav_report(abook_url, auth) matches = [] for it in items: d = _vcard_to_dict(it["vcard_text"], it["url"]) name_match = q_lower in d.get("name", "").lower() email_match = any(q_lower in e.lower() for e in ([d.get("email", "")] + d.get("emails", []))) if name_match or email_match: matches.append(d) if len(matches) >= MAX_CONTACTS: break matches.sort(key=lambda c: c.get("name", "").lower()) return ToolResult(success=True, data={"contacts": matches, "count": len(matches), "query": query}) except Exception as e: return ToolResult(success=False, error=f"Failed to search contacts: {e}") async def _get_contact(self, abook_url: str, auth: tuple, contact_id: str) -> ToolResult: try: items = await _carddav_report(abook_url, auth) for it in items: d = _vcard_to_dict(it["vcard_text"], it["url"]) if d.get("id") == contact_id or contact_id in it["url"]: return ToolResult(success=True, data=d) return ToolResult(success=False, error=f"Contact '{contact_id}' not found") except Exception as e: return ToolResult(success=False, error=f"Failed to get contact: {e}") # ── Write operations ─────────────────────────────────────────────────────── def _build_vcard( self, name: str, email: str = "", phone: str = "", organization: str = "", note: str = "", uid: str | None = None, ) -> str: vc = vobject.vCard() vc.add("fn").value = name parts = name.split(" ", 1) n = vobject.vcard.Name(family=parts[-1], given=parts[0] if len(parts) > 1 else "") vc.add("n").value = n vc.add("uid").value = uid or str(_uuid.uuid4()) if email: email_obj = vc.add("email") email_obj.value = email email_obj.type_param = "INTERNET" if phone: tel_obj = vc.add("tel") tel_obj.value = phone tel_obj.type_param = "VOICE" if organization: vc.add("org").value = [organization] if note: vc.add("note").value = note return vc.serialize() async def _create_contact( self, abook_url: str, auth: tuple, name: str, email: str, phone: str, organization: str, note: str ) -> ToolResult: try: uid = str(_uuid.uuid4()) vcard_text = self._build_vcard(name, email, phone, organization, note, uid=uid) url = abook_url.rstrip("/") + f"/{uid}.vcf" await _carddav_put(url, vcard_text, auth) return ToolResult(success=True, data={"created": name, "id": url}) except Exception as e: return ToolResult(success=False, error=f"Failed to create contact: {e}") async def _update_contact( self, abook_url: str, auth: tuple, contact_id: str, name: str, email: str, phone: str, organization: str, note: str ) -> ToolResult: try: items = await _carddav_report(abook_url, auth) for it in items: d = _vcard_to_dict(it["vcard_text"], it["url"]) if d.get("id") == contact_id or contact_id in it["url"]: new_name = name or d.get("name", "") new_email = email or d.get("email", "") new_phone = phone or d.get("phone", "") new_org = organization or d.get("organization", "") new_note = note or d.get("note", "") # Extract UID from original vCard to reuse try: vc_orig = vobject.readOne(it["vcard_text"]) uid = str(vc_orig.uid.value) except Exception: uid = None new_vcard = self._build_vcard(new_name, new_email, new_phone, new_org, new_note, uid=uid) await _carddav_put(it["url"], new_vcard, auth) return ToolResult(success=True, data={"updated": new_name}) return ToolResult(success=False, error=f"Contact '{contact_id}' not found") except Exception as e: return ToolResult(success=False, error=f"Failed to update contact: {e}") async def _delete_contact(self, abook_url: str, auth: tuple, contact_id: str) -> ToolResult: try: items = await _carddav_report(abook_url, auth) for it in items: d = _vcard_to_dict(it["vcard_text"], it["url"]) if d.get("id") == contact_id or contact_id in it["url"]: await _carddav_delete(it["url"], auth) return ToolResult(success=True, data={"deleted": d.get("name", contact_id)}) return ToolResult(success=False, error=f"Contact '{contact_id}' not found") except Exception as e: return ToolResult(success=False, error=f"Failed to delete contact: {e}")