Files
oai-web/server/tools/contacts_tool.py
Rune Olsen 7b0a9ccc2b Settings: add dedicated DAV/Pushover tabs, fix CalDAV/CardDAV bugs
- Add admin DAV tab (rename from CalDAV/CardDAV) and Pushover tab
  - Add per-user Pushover tab (User Key only; App Token stays admin-managed)
  - Remove system-wide CalDAV/CardDAV fallback — per-user config only
  - Rewrite contacts_tool.py using httpx directly (caldav 2.x dropped AddressBook)
  - Fix CardDAV REPORT/PROPFIND using SOGo URL pattern
  - Fix CalDAV/CardDAV test endpoints (POST method, URL scheme normalization)
  - Fix Show Password button — API now returns actual credential values
  - Convert Credentials tab to generic key-value store; dedicated keys
    (CalDAV, Pushover, trusted_proxy) excluded via _DEDICATED_CRED_KEYS
2026-04-10 12:06:23 +02:00

424 lines
18 KiB
Python

"""
tools/contacts_tool.py — CardDAV contacts access (Mailcow / SOGo).
Uses httpx directly for CardDAV protocol (caldav 2.x dropped AddressBook support).
Address book URL format: {base_url}/SOGo/dav/{username}/Contacts/personal/
Read operations: list_contacts, search_contacts, get_contact (always available)
Write operations: create_contact, update_contact, delete_contact
→ only available when the per-user setting 'contacts_allow_write' or the credential_store key 'contacts:allow_write' is '1'
→ all write ops require user confirmation
"""
from __future__ import annotations
import logging
import uuid as _uuid
from urllib.parse import urlparse
from xml.etree import ElementTree as ET
import httpx
import vobject
from ..context_vars import current_user
from ..database import credential_store
from .base import BaseTool, ToolResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Upper bound on the number of contacts returned by list/search operations.
MAX_CONTACTS = 100
# XML namespace URIs used to build qualified ElementTree tag names when
# parsing the multistatus response of the CardDAV REPORT.
_CARDDAV_NS = "urn:ietf:params:xml:ns:carddav"
_DAV_NS = "DAV:"
# ── URL helpers ───────────────────────────────────────────────────────────────
def _sogo_carddav_url(base_url: str, username: str) -> str:
"""Return the SOGo CardDAV personal address book URL for the given user."""
if not base_url.startswith(("http://", "https://")):
base_url = "https://" + base_url
if "/SOGo/dav/" in base_url or base_url.rstrip("/").endswith("/Contacts/personal"):
return base_url.rstrip("/") + "/"
return f"{base_url.rstrip('/')}/SOGo/dav/{username}/Contacts/personal/"
def _abs_href(href: str, base_url: str) -> str:
"""Convert a relative href to an absolute URL using base_url's scheme+host."""
if href.startswith("http://") or href.startswith("https://"):
return href
parsed = urlparse(base_url)
return f"{parsed.scheme}://{parsed.netloc}{href}"
# ── vCard helpers ─────────────────────────────────────────────────────────────
def _vcard_to_dict(vcard_text: str, contact_url: str = "") -> dict:
"""Parse a vCard string into a structured dict."""
try:
vc = vobject.readOne(vcard_text)
except Exception:
return {"id": contact_url, "raw": vcard_text[:200]}
def _get(component: str) -> str:
try:
return str(getattr(vc, component).value)
except Exception:
return ""
def _get_list(component: str) -> list[str]:
try:
items = vc.contents.get(component.lower(), [])
return [str(item.value) for item in items]
except Exception:
return []
name = _get("fn")
emails = _get_list("email")
phones = _get_list("tel")
org = _get("org")
note = _get("note")
uid = _get("uid") or contact_url
result: dict = {"id": contact_url or uid, "name": name}
if emails:
result["email"] = emails[0]
if len(emails) > 1:
result["emails"] = emails
if phones:
result["phone"] = phones[0]
if len(phones) > 1:
result["phones"] = phones
if org:
result["organization"] = org
if note:
result["note"] = note[:200]
return result
# ── Credential helpers ────────────────────────────────────────────────────────
async def _get_carddav_config(user_id: str | None = None) -> dict:
    """Look up the CardDAV connection settings for one user.

    There is no system-wide fallback. When the user's
    ``carddav_same_as_caldav`` setting is ``"1"`` the result of
    ``_get_caldav_config(user_id)`` is returned; otherwise the user's own
    ``carddav_url`` / ``carddav_username`` / ``carddav_password`` settings
    are read.

    Returns:
        A dict with the keys ``url``, ``username`` and ``password``. All three
        are ``None`` when no ``user_id`` is given.
    """
    # Imported inside the function, as in the original, rather than at module level.
    from .caldav_tool import _get_caldav_config

    if user_id:
        from ..database import user_settings_store

        reuse_caldav = await user_settings_store.get(user_id, "carddav_same_as_caldav")
        if reuse_caldav == "1":
            return await _get_caldav_config(user_id)

        setting_keys = (
            ("url", "carddav_url"),
            ("username", "carddav_username"),
            ("password", "carddav_password"),
        )
        config: dict = {}
        for field, setting in setting_keys:
            config[field] = await user_settings_store.get(user_id, setting)
        return config

    return dict.fromkeys(("url", "username", "password"))
# ── Low-level CardDAV HTTP ────────────────────────────────────────────────────
def _parse_multistatus(xml_bytes: bytes, abook_url: str) -> list[dict]:
    """Extract vCard entries from the body of a 207 multistatus response.

    Each ``<D:response>`` may contain several ``<D:propstat>`` elements with
    different statuses, so the status is checked on the same propstat that
    carries the ``address-data``.
    """
    root = ET.fromstring(xml_bytes)
    results: list[dict] = []
    for resp in root.findall(f"{{{_DAV_NS}}}response"):
        href = (resp.findtext(f"{{{_DAV_NS}}}href") or "").strip()
        if not href:
            # No usable resource URL: nothing we could address later.
            continue
        for propstat in resp.findall(f"{{{_DAV_NS}}}propstat"):
            prop = propstat.find(f"{{{_DAV_NS}}}prop")
            if prop is None:
                continue
            vcard_text = prop.findtext(f"{{{_CARDDAV_NS}}}address-data") or ""
            if not vcard_text.strip():
                continue
            status = propstat.findtext(f"{{{_DAV_NS}}}status")
            if status is not None and "200" not in status:
                continue
            etag = (prop.findtext(f"{{{_DAV_NS}}}getetag") or "").strip()
            results.append(
                {
                    "url": _abs_href(href, abook_url),
                    "vcard_text": vcard_text,
                    "etag": etag,
                }
            )
            break  # one vCard per response element
    return results


async def _carddav_report(abook_url: str, auth: tuple[str, str]) -> list[dict]:
    """
    Issue an addressbook-query REPORT to fetch all vCards.

    Args:
        abook_url: URL of the address book collection.
        auth: ``(username, password)`` pair passed to httpx as the client's auth.

    Returns:
        List of ``{"url": str, "vcard_text": str, "etag": str}``; ``etag`` is
        an empty string when the server did not report one.

    Raises:
        RuntimeError: if the server answers with anything other than HTTP 207.
        xml.etree.ElementTree.ParseError: if the response body is not valid XML.
    """
    body = (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<C:addressbook-query xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:carddav">'
        "<D:prop><D:getetag/><C:address-data/></D:prop>"
        "<C:filter/>"
        "</C:addressbook-query>"
    )
    async with httpx.AsyncClient(auth=auth, timeout=30) as client:
        r = await client.request(
            "REPORT",
            abook_url,
            content=body,
            headers={"Content-Type": "application/xml; charset=utf-8", "Depth": "1"},
        )
    if r.status_code != 207:
        raise RuntimeError(f"CardDAV REPORT returned HTTP {r.status_code}")
    return _parse_multistatus(r.content, abook_url)
async def _carddav_put(url: str, vcard_text: str, auth: tuple[str, str], etag: str = "") -> None:
    """Upload a vCard to ``url`` with an HTTP PUT.

    When ``etag`` is non-empty it is sent as an ``If-Match`` header.

    Raises:
        RuntimeError: if the response status is not 200, 201 or 204.
    """
    request_headers: dict = {
        "Content-Type": "text/vcard; charset=utf-8",
        **({"If-Match": etag} if etag else {}),
    }
    async with httpx.AsyncClient(auth=auth, timeout=15) as client:
        response = await client.put(url, content=vcard_text.encode(), headers=request_headers)
    status = response.status_code
    if status in (200, 201, 204):
        return
    raise RuntimeError(f"CardDAV PUT returned HTTP {status}")
async def _carddav_delete(url: str, auth: tuple[str, str]) -> None:
    """Remove the resource at ``url`` with an HTTP DELETE.

    Raises:
        RuntimeError: if the response status is not 200 or 204.
    """
    async with httpx.AsyncClient(auth=auth, timeout=15) as client:
        response = await client.delete(url)
    status = response.status_code
    if status in (200, 204):
        return
    raise RuntimeError(f"CardDAV DELETE returned HTTP {status}")
# ── Tool ──────────────────────────────────────────────────────────────────────
# Operations that modify the address book; ContactsTool.execute() rejects
# them unless contact write access has been enabled.
_WRITE_OPS = {"create_contact", "update_contact", "delete_contact"}
class ContactsTool(BaseTool):
    """Tool exposing a user's CardDAV address book.

    Read operations (``list_contacts``, ``search_contacts``, ``get_contact``)
    are always available. Write operations (``create_contact``,
    ``update_contact``, ``delete_contact``) are rejected unless the per-user
    setting ``contacts_allow_write`` or the credential-store key
    ``contacts:allow_write`` is ``"1"``.
    """

    name = "contacts"
    description = (
        "Search, read, and (if write access is enabled) modify contacts in the CardDAV address book. "
        "Read operations: list_contacts, search_contacts, get_contact. "
        "Write operations (require contacts:allow_write=1): create_contact, update_contact, delete_contact. "
        "Uses the same CalDAV server credentials."
    )
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": [
                    "list_contacts", "search_contacts", "get_contact",
                    "create_contact", "update_contact", "delete_contact",
                ],
                "description": "Operation to perform.",
            },
            "query": {
                "type": "string",
                "description": "Search query (name or email substring). Required for search_contacts.",
            },
            "contact_id": {
                "type": "string",
                "description": "Contact URL or UID. Required for get_contact, update_contact, delete_contact.",
            },
            "name": {"type": "string", "description": "Full display name. Required for create_contact."},
            "email": {"type": "string", "description": "Primary email address."},
            "phone": {"type": "string", "description": "Primary phone number."},
            "organization": {"type": "string", "description": "Organization / company name."},
            "note": {"type": "string", "description": "Free-text note."},
        },
        "required": ["operation"],
    }
    # NOTE(review): the module docstring says write ops require user
    # confirmation, yet this flag is False — presumably confirmation is
    # enforced elsewhere; confirm.
    requires_confirmation = False
    allowed_in_scheduled_tasks = True

    async def execute(
        self,
        operation: str,
        query: str = "",
        contact_id: str = "",
        name: str = "",
        email: str = "",
        phone: str = "",
        organization: str = "",
        note: str = "",
        **_,
    ) -> ToolResult:
        """Validate the request, resolve credentials and dispatch ``operation``.

        Returns a ``ToolResult``; failures (disabled write access, missing
        credentials, missing arguments, unknown operation, server errors) are
        reported via ``success=False`` and an ``error`` message rather than by
        raising.
        """
        user_id = None
        try:
            u = current_user.get()
            if u:
                user_id = u.id
        except Exception:
            # Best effort: without a user we fall through to the
            # "credentials not configured" error below.
            logger.debug("contacts tool: no current user available", exc_info=True)

        # Gate write operations
        if operation in _WRITE_OPS:
            allow_write = False
            if user_id:
                from ..database import user_settings_store
                per_user = await user_settings_store.get(user_id, "contacts_allow_write")
                allow_write = (per_user == "1")
            if not allow_write:
                allow_write = (await credential_store.get("contacts:allow_write") == "1")
            if not allow_write:
                return ToolResult(
                    success=False,
                    error=(
                        "Contact write access is disabled. "
                        "Enable it in Settings → CalDAV/CardDAV."
                    ),
                )

        cfg = await _get_carddav_config(user_id)
        base_url = cfg.get("url")
        username = cfg.get("username")
        password = cfg.get("password")
        if not base_url or not username or not password:
            return ToolResult(
                success=False,
                error=(
                    "CardDAV credentials not configured. "
                    "Set them in Settings → CalDAV/CardDAV."
                ),
            )

        abook_url = _sogo_carddav_url(base_url, username)
        auth = (username, password)

        if operation == "list_contacts":
            return await self._list_contacts(abook_url, auth)
        if operation == "search_contacts":
            if not query:
                return ToolResult(success=False, error="'query' is required for search_contacts")
            return await self._search_contacts(abook_url, auth, query)
        if operation == "get_contact":
            if not contact_id:
                return ToolResult(success=False, error="'contact_id' is required for get_contact")
            return await self._get_contact(abook_url, auth, contact_id)
        if operation == "create_contact":
            if not name:
                return ToolResult(success=False, error="'name' is required for create_contact")
            return await self._create_contact(abook_url, auth, name, email, phone, organization, note)
        if operation == "update_contact":
            if not contact_id:
                return ToolResult(success=False, error="'contact_id' is required for update_contact")
            return await self._update_contact(abook_url, auth, contact_id, name, email, phone, organization, note)
        if operation == "delete_contact":
            if not contact_id:
                return ToolResult(success=False, error="'contact_id' is required for delete_contact")
            return await self._delete_contact(abook_url, auth, contact_id)
        return ToolResult(success=False, error=f"Unknown operation: {operation}")

    # ── Contact lookup helpers ─────────────────────────────────────────────────

    @staticmethod
    def _url_identifies(url: str, contact_id: str) -> bool:
        """Return True if ``contact_id`` names the resource at ``url``.

        Accepted forms: the full URL, its path, or a trailing part of the path
        starting at a ``/`` boundary (e.g. the file name), with or without the
        ``.vcf`` extension, percent-encoded or decoded. Arbitrary substrings
        do not match, so a short id cannot select an unrelated contact.
        """
        from urllib.parse import unquote

        path = urlparse(url).path
        tail = contact_id.lstrip("/")
        for form in {url, path, unquote(url), unquote(path)}:
            if form == contact_id:
                return True
            if not tail:
                continue
            stem = form[:-4] if form.lower().endswith(".vcf") else form
            if form.endswith("/" + tail) or stem.endswith("/" + tail):
                return True
        return False

    @staticmethod
    def _vcard_uid(vcard_text: str) -> str:
        """Return the UID of a vCard, or "" if it is missing or unparsable."""
        try:
            return str(vobject.readOne(vcard_text).uid.value)
        except Exception:
            return ""

    def _find_contact(self, items: list[dict], contact_id: str) -> dict | None:
        """Find the REPORT item identified by ``contact_id`` (URL first, then UID)."""
        # URL comparison needs no vCard parsing, so try it across all items first.
        for it in items:
            if self._url_identifies(it["url"], contact_id):
                return it
        for it in items:
            if self._vcard_uid(it["vcard_text"]) == contact_id:
                return it
        return None

    # ── Read operations ────────────────────────────────────────────────────────

    async def _list_contacts(self, abook_url: str, auth: tuple) -> ToolResult:
        """Return up to MAX_CONTACTS contacts, sorted by name."""
        try:
            items = await _carddav_report(abook_url, auth)
            contacts = [_vcard_to_dict(it["vcard_text"], it["url"]) for it in items[:MAX_CONTACTS]]
            contacts.sort(key=lambda c: c.get("name", "").lower())
            return ToolResult(success=True, data={"contacts": contacts, "count": len(contacts)})
        except Exception as e:
            return ToolResult(success=False, error=f"Failed to list contacts: {e}")

    async def _search_contacts(self, abook_url: str, auth: tuple, query: str) -> ToolResult:
        """Return contacts whose name or any email contains ``query`` (case-insensitive)."""
        try:
            q_lower = query.lower()
            items = await _carddav_report(abook_url, auth)
            matches = []
            for it in items:
                d = _vcard_to_dict(it["vcard_text"], it["url"])
                name_match = q_lower in d.get("name", "").lower()
                email_match = any(q_lower in e.lower() for e in ([d.get("email", "")] + d.get("emails", [])))
                if name_match or email_match:
                    matches.append(d)
                    if len(matches) >= MAX_CONTACTS:
                        break
            matches.sort(key=lambda c: c.get("name", "").lower())
            return ToolResult(success=True, data={"contacts": matches, "count": len(matches), "query": query})
        except Exception as e:
            return ToolResult(success=False, error=f"Failed to search contacts: {e}")

    async def _get_contact(self, abook_url: str, auth: tuple, contact_id: str) -> ToolResult:
        """Return the summary dict of the contact identified by ``contact_id``."""
        try:
            items = await _carddav_report(abook_url, auth)
            it = self._find_contact(items, contact_id)
            if it is None:
                return ToolResult(success=False, error=f"Contact '{contact_id}' not found")
            return ToolResult(success=True, data=_vcard_to_dict(it["vcard_text"], it["url"]))
        except Exception as e:
            return ToolResult(success=False, error=f"Failed to get contact: {e}")

    # ── Write operations ───────────────────────────────────────────────────────

    @staticmethod
    def _structured_name(name: str):
        """Derive the structured N value from a display name.

        The text before the first space becomes the given name and the rest
        the family name; a single-word name is stored as the family name.
        """
        parts = name.split(" ", 1)
        return vobject.vcard.Name(family=parts[-1], given=parts[0] if len(parts) > 1 else "")

    @staticmethod
    def _set_first(vc, key: str, value, type_param: str = ""):
        """Set the first ``key`` property of ``vc``, adding it if absent.

        ``type_param`` is applied only to a newly added line so existing
        parameters on the card are preserved.
        """
        existing = vc.contents.get(key)
        if existing:
            line = existing[0]
        else:
            line = vc.add(key)
            if type_param:
                line.type_param = type_param
        line.value = value
        return line

    def _build_vcard(
        self,
        name: str,
        email: str = "",
        phone: str = "",
        organization: str = "",
        note: str = "",
        uid: str | None = None,
    ) -> str:
        """Serialize a new vCard from the given fields.

        A random UUID is used as UID when ``uid`` is not supplied; empty
        optional fields are omitted.
        """
        vc = vobject.vCard()
        vc.add("fn").value = name
        vc.add("n").value = self._structured_name(name)
        vc.add("uid").value = uid or str(_uuid.uuid4())
        if email:
            email_obj = vc.add("email")
            email_obj.value = email
            email_obj.type_param = "INTERNET"
        if phone:
            tel_obj = vc.add("tel")
            tel_obj.value = phone
            tel_obj.type_param = "VOICE"
        if organization:
            vc.add("org").value = [organization]
        if note:
            vc.add("note").value = note
        return vc.serialize()

    async def _create_contact(
        self, abook_url: str, auth: tuple, name: str, email: str, phone: str, organization: str, note: str
    ) -> ToolResult:
        """Create a new contact stored as ``{uid}.vcf`` in the address book."""
        try:
            uid = str(_uuid.uuid4())
            vcard_text = self._build_vcard(name, email, phone, organization, note, uid=uid)
            url = abook_url.rstrip("/") + f"/{uid}.vcf"
            await _carddav_put(url, vcard_text, auth)
            return ToolResult(success=True, data={"created": name, "id": url})
        except Exception as e:
            return ToolResult(success=False, error=f"Failed to create contact: {e}")

    async def _update_contact(
        self, abook_url: str, auth: tuple, contact_id: str, name: str, email: str, phone: str, organization: str, note: str
    ) -> ToolResult:
        """Update the supplied fields of an existing contact.

        The stored vCard is edited in place: only non-empty arguments replace
        the corresponding (first) property, and every other property on the
        card — additional emails/phones, addresses, the full note — is kept.
        Rebuilding the card from the summary dict would drop or truncate them.
        """
        try:
            items = await _carddav_report(abook_url, auth)
            target = self._find_contact(items, contact_id)
            if target is None:
                return ToolResult(success=False, error=f"Contact '{contact_id}' not found")

            try:
                vc = vobject.readOne(target["vcard_text"])
            except Exception as e:
                # Refuse to overwrite data we cannot read with a stub card.
                return ToolResult(
                    success=False,
                    error=f"Failed to update contact: existing vCard could not be parsed: {e}",
                )

            if name:
                self._set_first(vc, "fn", name)
                self._set_first(vc, "n", self._structured_name(name))
            if email:
                self._set_first(vc, "email", email, type_param="INTERNET")
            if phone:
                self._set_first(vc, "tel", phone, type_param="VOICE")
            if organization:
                org_lines = vc.contents.get("org")
                if org_lines:
                    # Keep the representation the parsed line already uses.
                    current = org_lines[0]
                    current.value = organization if isinstance(current.value, str) else [organization]
                else:
                    vc.add("org").value = [organization]
            if note:
                self._set_first(vc, "note", note)
            if not vc.contents.get("uid"):
                vc.add("uid").value = str(_uuid.uuid4())

            # If-Match (when the REPORT supplied an ETag) prevents overwriting
            # a card that changed on the server since it was read.
            await _carddav_put(target["url"], vc.serialize(), auth, etag=target.get("etag") or "")

            fn_line = getattr(vc, "fn", None)
            display = name or (str(fn_line.value) if fn_line is not None else "")
            return ToolResult(success=True, data={"updated": display})
        except Exception as e:
            return ToolResult(success=False, error=f"Failed to update contact: {e}")

    async def _delete_contact(self, abook_url: str, auth: tuple, contact_id: str) -> ToolResult:
        """Delete the contact identified by ``contact_id``."""
        try:
            items = await _carddav_report(abook_url, auth)
            target = self._find_contact(items, contact_id)
            if target is None:
                return ToolResult(success=False, error=f"Contact '{contact_id}' not found")
            d = _vcard_to_dict(target["vcard_text"], target["url"])
            await _carddav_delete(target["url"], auth)
            return ToolResult(success=True, data={"deleted": d.get("name", contact_id)})
        except Exception as e:
            return ToolResult(success=False, error=f"Failed to delete contact: {e}")