""" mcp.py — 2nd Brain MCP server. Exposes four MCP tools over Streamable HTTP transport (the modern MCP protocol), mounted on the existing FastAPI app at /brain-mcp. Access is protected by a bearer key checked on every request. Connect via: Claude Desktop / Claude Code: claude mcp add --transport http brain http://your-server:8080/brain-mcp/sse \\ --header "x-brain-key: YOUR_KEY" Any MCP client supporting Streamable HTTP: URL: http://your-server:8080/brain-mcp/sse The key can be passed as: ?key=... query parameter x-brain-key: ... request header Authorization: Bearer ... Note: _session_manager must be started via its run() context manager in the app lifespan (see main.py). """ from __future__ import annotations import logging import os from typing import Any from contextvars import ContextVar from mcp.server import Server from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.types import TextContent, Tool from starlette.requests import Request from starlette.responses import Response # Set per-request by handle_mcp; read by call_tool to scope DB queries. _mcp_user_id: ContextVar[str | None] = ContextVar("_mcp_user_id", default=None) logger = logging.getLogger(__name__) # ── MCP Server definition ───────────────────────────────────────────────────── _server = Server("open-brain") # Session manager — started in main.py lifespan via _session_manager.run() _session_manager = StreamableHTTPSessionManager(_server, stateless=True) async def _resolve_key(request: Request) -> str | None: """Resolve the provided key to a user_id, or None if invalid/missing. Looks up the key in user_settings["brain_mcp_key"] across all users. Returns the matching user_id, or None if no match. """ provided = ( request.query_params.get("key") or request.headers.get("x-brain-key") or request.headers.get("authorization", "").removeprefix("Bearer ").strip() or "" ) if not provided: return None try: from .database import _pool as _main_pool if _main_pool: async with _main_pool.acquire() as conn: row = await conn.fetchrow( "SELECT user_id FROM user_settings WHERE key='brain_mcp_key' AND value=$1", provided, ) if row: return str(row["user_id"]) except Exception: logger.warning("Brain key lookup failed", exc_info=True) return None async def _check_key(request: Request) -> bool: """Return True if the request carries a valid per-user brain key.""" user_id = await _resolve_key(request) return user_id is not None # ── Tool definitions ────────────────────────────────────────────────────────── @_server.list_tools() async def list_tools() -> list[Tool]: return [ Tool( name="search_thoughts", description=( "Search your 2nd Brain by meaning (semantic similarity). " "Finds thoughts even when exact keywords don't match. " "Returns results ranked by relevance." ), inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "What to search for — describe it naturally.", }, "threshold": { "type": "number", "description": "Similarity threshold 0-1 (default 0.7). Lower = broader, more results.", "default": 0.7, }, "limit": { "type": "integer", "description": "Max number of results (default 10).", "default": 10, }, }, "required": ["query"], }, ), Tool( name="browse_recent", description=( "Browse the most recent thoughts in your 2nd Brain, " "optionally filtered by type (insight, person_note, task, reference, idea, other)." ), inputSchema={ "type": "object", "properties": { "limit": { "type": "integer", "description": "Max thoughts to return (default 20).", "default": 20, }, "type_filter": { "type": "string", "description": "Filter by type: insight | person_note | task | reference | idea | other", "enum": ["insight", "person_note", "task", "reference", "idea", "other"], }, }, }, ), Tool( name="get_stats", description=( "Get an overview of your 2nd Brain: total thought count, " "breakdown by type, and most recent capture date." ), inputSchema={"type": "object", "properties": {}}, ), Tool( name="capture_thought", description=( "Save a new thought to your 2nd Brain. " "The thought is automatically embedded and classified. " "Use this from any AI client to capture without switching to Telegram." ), inputSchema={ "type": "object", "properties": { "content": { "type": "string", "description": "The thought to capture — write it naturally.", }, }, "required": ["content"], }, ), ] @_server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: import json async def _fail(msg: str) -> list[TextContent]: return [TextContent(type="text", text=f"Error: {msg}")] try: from .brain.database import get_pool if get_pool() is None: return await _fail("Brain DB not available — check BRAIN_DB_URL in .env") user_id = _mcp_user_id.get() if name == "search_thoughts": from .brain.search import semantic_search results = await semantic_search( arguments["query"], threshold=float(arguments.get("threshold", 0.7)), limit=int(arguments.get("limit", 10)), user_id=user_id, ) if not results: return [TextContent(type="text", text="No matching thoughts found.")] lines = [f"Found {len(results)} thought(s):\n"] for r in results: meta = r["metadata"] tags = ", ".join(meta.get("tags", [])) lines.append( f"[{r['created_at'][:10]}] ({meta.get('type', '?')}" + (f" — {tags}" if tags else "") + f") similarity={r['similarity']}\n{r['content']}\n" ) return [TextContent(type="text", text="\n".join(lines))] elif name == "browse_recent": from .brain.database import browse_thoughts results = await browse_thoughts( limit=int(arguments.get("limit", 20)), type_filter=arguments.get("type_filter"), user_id=user_id, ) if not results: return [TextContent(type="text", text="No thoughts captured yet.")] lines = [f"{len(results)} recent thought(s):\n"] for r in results: meta = r["metadata"] tags = ", ".join(meta.get("tags", [])) lines.append( f"[{r['created_at'][:10]}] ({meta.get('type', '?')}" + (f" — {tags}" if tags else "") + f")\n{r['content']}\n" ) return [TextContent(type="text", text="\n".join(lines))] elif name == "get_stats": from .brain.database import get_stats stats = await get_stats(user_id=user_id) lines = [f"Total thoughts: {stats['total']}"] if stats["most_recent"]: lines.append(f"Most recent: {stats['most_recent'][:10]}") lines.append("\nBy type:") for entry in stats["by_type"]: lines.append(f" {entry['type']}: {entry['count']}") return [TextContent(type="text", text="\n".join(lines))] elif name == "capture_thought": from .brain.ingest import ingest_thought result = await ingest_thought(arguments["content"], user_id=user_id) return [TextContent(type="text", text=result["confirmation"])] else: return await _fail(f"Unknown tool: {name}") except Exception as e: logger.error("MCP tool error (%s): %s", name, e) return await _fail(str(e)) # ── Streamable HTTP transport and routing ───────────────────────────────────── def create_mcp_app(): """ Return a raw ASGI app that handles all /brain-mcp requests. Uses Streamable HTTP transport (modern MCP protocol) which handles both GET (SSE stream) and POST (JSON) requests at a single /sse endpoint. Must be mounted as a sub-app (app.mount("/brain-mcp", create_mcp_app())) so handle_request can write directly to the ASGI send channel without Starlette trying to send a second response afterwards. """ async def handle_mcp(scope, receive, send): if scope["type"] != "http": return request = Request(scope, receive, send) user_id = await _resolve_key(request) if user_id is None: response = Response("Unauthorized", status_code=401) await response(scope, receive, send) return token = _mcp_user_id.set(user_id) try: await _session_manager.handle_request(scope, receive, send) finally: _mcp_user_id.reset(token) return handle_mcp