""" monitors/store.py — DB CRUD for watched_pages and rss_feeds tables. """ from __future__ import annotations import json from datetime import datetime, timezone from ..database import _rowcount, get_pool def _utcnow() -> str: return datetime.now(timezone.utc).isoformat() def _page_row(row) -> dict: return dict(row) def _feed_row(row) -> dict: d = dict(row) if isinstance(d.get("seen_item_ids"), str): try: d["seen_item_ids"] = json.loads(d["seen_item_ids"]) except Exception: d["seen_item_ids"] = [] return d # ─── Watched Pages ──────────────────────────────────────────────────────────── async def create_watched_page( name: str, url: str, schedule: str = "0 * * * *", css_selector: str | None = None, agent_id: str | None = None, notification_mode: str = "agent", owner_user_id: str | None = None, ) -> dict: pool = await get_pool() row = await pool.fetchrow( """ INSERT INTO watched_pages (name, url, schedule, css_selector, agent_id, notification_mode, owner_user_id, created_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8) RETURNING * """, name, url, schedule, css_selector, agent_id or None, notification_mode, owner_user_id, _utcnow(), ) return _page_row(row) async def list_watched_pages(owner_user_id: str | None = None) -> list[dict]: pool = await get_pool() if owner_user_id: rows = await pool.fetch( "SELECT * FROM watched_pages WHERE owner_user_id = $1 ORDER BY created_at DESC", owner_user_id, ) else: rows = await pool.fetch("SELECT * FROM watched_pages ORDER BY created_at DESC") return [_page_row(r) for r in rows] async def get_watched_page(page_id: str) -> dict | None: pool = await get_pool() row = await pool.fetchrow("SELECT * FROM watched_pages WHERE id = $1::uuid", page_id) return _page_row(row) if row else None async def update_watched_page(page_id: str, **fields) -> dict | None: allowed = {"name", "url", "schedule", "css_selector", "agent_id", "notification_mode", "enabled"} updates = {k: v for k, v in fields.items() if k in allowed} if not updates: return await get_watched_page(page_id) pool = await get_pool() set_clauses = ", ".join(f"{k} = ${i + 2}" for i, k in enumerate(updates)) await pool.execute( f"UPDATE watched_pages SET {set_clauses} WHERE id = $1::uuid", page_id, *updates.values(), ) return await get_watched_page(page_id) async def delete_watched_page(page_id: str) -> bool: pool = await get_pool() status = await pool.execute("DELETE FROM watched_pages WHERE id = $1::uuid", page_id) return _rowcount(status) > 0 async def update_page_check_result( page_id: str, content_hash: str | None, changed: bool, error: str | None = None, ) -> None: pool = await get_pool() now = _utcnow() if error: await pool.execute( "UPDATE watched_pages SET last_checked_at=$1, last_error=$2 WHERE id=$3::uuid", now, error, page_id, ) elif changed: await pool.execute( """UPDATE watched_pages SET last_checked_at=$1, last_content_hash=$2, last_changed_at=$1, last_error=NULL WHERE id=$3::uuid""", now, content_hash, page_id, ) else: await pool.execute( """UPDATE watched_pages SET last_checked_at=$1, last_content_hash=$2, last_error=NULL WHERE id=$3::uuid""", now, content_hash, page_id, ) # ─── RSS Feeds ──────────────────────────────────────────────────────────────── async def create_rss_feed( name: str, url: str, schedule: str = "0 */4 * * *", agent_id: str | None = None, notification_mode: str = "agent", max_items_per_run: int = 5, owner_user_id: str | None = None, ) -> dict: pool = await get_pool() row = await pool.fetchrow( """ INSERT INTO rss_feeds (name, url, schedule, agent_id, notification_mode, max_items_per_run, owner_user_id, created_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8) RETURNING * """, name, url, schedule, agent_id or None, notification_mode, max_items_per_run, owner_user_id, _utcnow(), ) return _feed_row(row) async def list_rss_feeds(owner_user_id: str | None = None) -> list[dict]: pool = await get_pool() if owner_user_id: rows = await pool.fetch( "SELECT * FROM rss_feeds WHERE owner_user_id = $1 ORDER BY created_at DESC", owner_user_id, ) else: rows = await pool.fetch("SELECT * FROM rss_feeds ORDER BY created_at DESC") return [_feed_row(r) for r in rows] async def get_rss_feed(feed_id: str) -> dict | None: pool = await get_pool() row = await pool.fetchrow("SELECT * FROM rss_feeds WHERE id = $1::uuid", feed_id) return _feed_row(row) if row else None async def update_rss_feed(feed_id: str, **fields) -> dict | None: allowed = {"name", "url", "schedule", "agent_id", "notification_mode", "max_items_per_run", "enabled"} updates = {k: v for k, v in fields.items() if k in allowed} if not updates: return await get_rss_feed(feed_id) pool = await get_pool() set_clauses = ", ".join(f"{k} = ${i + 2}" for i, k in enumerate(updates)) await pool.execute( f"UPDATE rss_feeds SET {set_clauses} WHERE id = $1::uuid", feed_id, *updates.values(), ) return await get_rss_feed(feed_id) async def delete_rss_feed(feed_id: str) -> bool: pool = await get_pool() status = await pool.execute("DELETE FROM rss_feeds WHERE id = $1::uuid", feed_id) return _rowcount(status) > 0 async def update_feed_fetch_result( feed_id: str, seen_item_ids: list[str], etag: str | None = None, last_modified: str | None = None, error: str | None = None, ) -> None: pool = await get_pool() now = _utcnow() if error: await pool.execute( "UPDATE rss_feeds SET last_fetched_at=$1, last_error=$2 WHERE id=$3::uuid", now, error, feed_id, ) else: await pool.execute( """UPDATE rss_feeds SET last_fetched_at=$1, seen_item_ids=$2, last_etag=$3, last_modified=$4, last_error=NULL WHERE id=$5::uuid""", now, seen_item_ids, etag, last_modified, feed_id, )