""" monitors/rss_monitor.py — RSS / Atom feed monitor. Polls feeds on a cron schedule, tracks seen item IDs, and dispatches an agent (or Pushover) when new items appear. Sends ETag / If-Modified-Since headers for bandwidth efficiency. """ from __future__ import annotations import logging from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from ..config import settings from . import store logger = logging.getLogger(__name__) def _item_id(entry) -> str: """Return a stable ID for a feed entry (id → link → title).""" return entry.get("id") or entry.get("link") or entry.get("title") or "" def _format_items(entries: list) -> str: """Format new feed entries as a readable message for agents.""" lines = [] for e in entries: title = e.get("title", "(no title)") link = e.get("link", "") summary = e.get("summary", "")[:500] lines.append(f"- {title}\n {link}") if summary: lines.append(f" {summary}") return "\n\n".join(lines) class RssFeedManager: """Manages APScheduler jobs for all rss_feeds entries.""" def __init__(self) -> None: self._scheduler: AsyncIOScheduler | None = None def init(self, scheduler: AsyncIOScheduler) -> None: self._scheduler = scheduler async def start_all(self) -> None: feeds = await store.list_rss_feeds() for feed in feeds: if feed["enabled"]: self._add_job(feed) logger.info("[rss-monitor] Registered %d RSS feed jobs", len([f for f in feeds if f["enabled"]])) def _add_job(self, feed: dict) -> None: if not self._scheduler: return try: self._scheduler.add_job( self._fetch_feed, trigger=CronTrigger.from_crontab(feed["schedule"], timezone=settings.timezone), id=f"rss:{feed['id']}", args=[str(feed["id"])], replace_existing=True, misfire_grace_time=300, ) except Exception as e: logger.error("[rss-monitor] Failed to schedule feed '%s': %s", feed["name"], e) def reschedule(self, feed: dict) -> None: if not self._scheduler: return job_id = f"rss:{feed['id']}" try: self._scheduler.remove_job(job_id) except Exception: pass if feed.get("enabled"): self._add_job(feed) def remove(self, feed_id: str) -> None: if not self._scheduler: return try: self._scheduler.remove_job(f"rss:{feed_id}") except Exception: pass async def fetch_now(self, feed_id: str) -> dict: """Force-fetch a feed immediately (UI-triggered). Returns status dict.""" return await self._fetch_feed(feed_id) async def _fetch_feed(self, feed_id: str) -> dict: import feedparser feed_row = await store.get_rss_feed(feed_id) if not feed_row: return {"error": "Feed not found"} logger.info("[rss-monitor] Fetching '%s' (%s)", feed_row["name"], feed_row["url"]) # Build request with conditional headers for bandwidth efficiency request_headers = {} if feed_row.get("last_etag"): request_headers["ETag"] = feed_row["last_etag"] if feed_row.get("last_modified"): request_headers["If-Modified-Since"] = feed_row["last_modified"] try: import httpx async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: resp = await client.get(feed_row["url"], headers=request_headers) if resp.status_code == 304: logger.info("[rss-monitor] '%s' unchanged (304)", feed_row["name"]) await store.update_feed_fetch_result(feed_id, feed_row.get("seen_item_ids") or []) return {"new_items": 0, "status": "not_modified"} resp.raise_for_status() parsed = feedparser.parse(resp.text) etag = resp.headers.get("etag") last_modified = resp.headers.get("last-modified") except Exception as e: error_msg = str(e)[:200] logger.warning("[rss-monitor] Failed to fetch '%s': %s", feed_row["url"], error_msg) await store.update_feed_fetch_result(feed_id, feed_row.get("seen_item_ids") or [], error=error_msg) return {"error": error_msg} seen = set(feed_row.get("seen_item_ids") or []) max_items = feed_row.get("max_items_per_run") or 5 new_entries = [e for e in parsed.entries if _item_id(e) and _item_id(e) not in seen][:max_items] # Update seen IDs (keep last 500 to prevent unbounded growth) all_ids = list(seen | {_item_id(e) for e in parsed.entries if _item_id(e)}) all_ids = all_ids[-500:] await store.update_feed_fetch_result( feed_id, seen_item_ids=all_ids, etag=etag, last_modified=last_modified, ) if new_entries: logger.info("[rss-monitor] %d new items in '%s'", len(new_entries), feed_row["name"]) await self._dispatch_new_items(feed_row, new_entries) return {"new_items": len(new_entries)} async def _dispatch_new_items(self, feed_row: dict, entries: list) -> None: mode = feed_row.get("notification_mode", "agent") count = len(entries) items_text = _format_items(entries) message = ( f"{count} new item{'s' if count != 1 else ''} in feed: {feed_row['name']}\n" f"URL: {feed_row['url']}\n\n" f"{items_text}" ) if mode in ("pushover", "both"): try: from ..tools.pushover_tool import PushoverTool await PushoverTool().execute( title=f"RSS: {count} new in {feed_row['name']}", message=items_text[:512], priority=0, ) except Exception as e: logger.warning("[rss-monitor] Pushover notify failed: %s", e) if mode in ("agent", "both"): agent_id = feed_row.get("agent_id") if agent_id: try: from ..agents.runner import agent_runner await agent_runner.run_agent_now( agent_id=agent_id, override_message=message, ) except Exception as e: logger.warning("[rss-monitor] Agent dispatch failed: %s", e) rss_monitor = RssFeedManager()