277 lines
10 KiB
Python
277 lines
10 KiB
Python
"""
|
|
mcp.py — 2nd Brain MCP server.
|
|
|
|
Exposes four MCP tools over Streamable HTTP transport (the modern MCP protocol),
|
|
mounted on the existing FastAPI app at /brain-mcp. Access is protected by a
|
|
bearer key checked on every request.
|
|
|
|
Connect via:
|
|
Claude Desktop / Claude Code:
|
|
claude mcp add --transport http brain http://your-server:8080/brain-mcp/sse \\
|
|
--header "x-brain-key: YOUR_KEY"
|
|
Any MCP client supporting Streamable HTTP:
|
|
URL: http://your-server:8080/brain-mcp/sse
|
|
|
|
The key can be passed in any of these ways (the first non-empty one wins):
    ?key=...                     query parameter
    x-brain-key: ...             request header
    Authorization: Bearer ...    request header
|
|
|
|
Note: _session_manager must be started via its run() context manager in the
|
|
app lifespan (see main.py).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from contextvars import ContextVar
|
|
|
|
from mcp.server import Server
|
|
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
|
|
from mcp.types import TextContent, Tool
|
|
from starlette.requests import Request
|
|
from starlette.responses import Response
|
|
|
|
# Set per-request by handle_mcp; read by call_tool to scope DB queries.
# Defaults to None when no request has bound a user.
_mcp_user_id: ContextVar[str | None] = ContextVar("_mcp_user_id", default=None)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ── MCP Server definition ─────────────────────────────────────────────────────

# MCP server instance; the list_tools / call_tool handlers in this module are
# registered on it through its decorators.
_server = Server("open-brain")

# Session manager — started in main.py lifespan via _session_manager.run().
# Constructed with stateless=True.
_session_manager = StreamableHTTPSessionManager(_server, stateless=True)
|
|
|
|
|
|
def _bearer_token(header_value: str) -> str:
    """Return the credential carried by an ``Authorization`` header value.

    The ``Bearer`` scheme name is matched case-insensitively, so
    ``Bearer abc``, ``bearer abc`` and ``BEARER abc`` all yield ``abc``.
    A value that does not use the ``Bearer`` scheme is returned stripped but
    otherwise unchanged, so a raw key sent in the header keeps working.
    """
    value = header_value.strip()
    scheme, _, credential = value.partition(" ")
    if scheme.lower() == "bearer":
        return credential.strip()
    return value


async def _resolve_key(request: Request) -> str | None:
    """Resolve the key supplied with *request* to a user_id.

    The key is taken from the first non-empty of: the ``key`` query
    parameter, the ``x-brain-key`` header, the ``Authorization`` header.
    It is then looked up in ``user_settings`` among the rows whose ``key``
    column is ``brain_mcp_key``.

    Returns:
        The matching user_id as a string, or None when no key was supplied,
        the pool is not available, no row matches, or the lookup raised
        (the failure is logged and treated as "no match").
    """
    provided = (
        request.query_params.get("key")
        or request.headers.get("x-brain-key")
        or _bearer_token(request.headers.get("authorization", ""))
    )
    if not provided:
        return None

    try:
        # Imported at call time rather than at module import; presumably the
        # pool is only created during app startup — TODO confirm.
        from .database import _pool as _main_pool

        if not _main_pool:
            return None

        async with _main_pool.acquire() as conn:
            # The key is passed as a bound parameter, never interpolated.
            row = await conn.fetchrow(
                "SELECT user_id FROM user_settings WHERE key='brain_mcp_key' AND value=$1",
                provided,
            )
        if row:
            return str(row["user_id"])
    except Exception:
        # Deliberately best-effort: a broken lookup must deny access, not crash.
        logger.warning("Brain key lookup failed", exc_info=True)
    return None
|
|
|
|
|
|
async def _check_key(request: Request) -> bool:
    """Tell whether *request* carries a brain key that resolves to a user."""
    return (await _resolve_key(request)) is not None
|
|
|
|
|
|
# ── Tool definitions ──────────────────────────────────────────────────────────
|
|
|
|
@_server.list_tools()
async def list_tools() -> list[Tool]:
    """Describe the tools this server offers, each with its JSON input schema."""

    def prop(json_type, description, **extra):
        # One schema property; extras (default / enum) come after the description.
        return {"type": json_type, "description": description, **extra}

    def schema(properties, required=None):
        # Object schema; "required" is only emitted when something is required.
        spec = {"type": "object", "properties": properties}
        if required:
            spec["required"] = required
        return spec

    search_thoughts = Tool(
        name="search_thoughts",
        description=(
            "Search your 2nd Brain by meaning (semantic similarity). "
            "Finds thoughts even when exact keywords don't match. "
            "Returns results ranked by relevance."
        ),
        inputSchema=schema(
            {
                "query": prop(
                    "string",
                    "What to search for — describe it naturally.",
                ),
                "threshold": prop(
                    "number",
                    "Similarity threshold 0-1 (default 0.7). Lower = broader, more results.",
                    default=0.7,
                ),
                "limit": prop(
                    "integer",
                    "Max number of results (default 10).",
                    default=10,
                ),
            },
            required=["query"],
        ),
    )

    browse_recent = Tool(
        name="browse_recent",
        description=(
            "Browse the most recent thoughts in your 2nd Brain, "
            "optionally filtered by type (insight, person_note, task, reference, idea, other)."
        ),
        inputSchema=schema(
            {
                "limit": prop(
                    "integer",
                    "Max thoughts to return (default 20).",
                    default=20,
                ),
                "type_filter": prop(
                    "string",
                    "Filter by type: insight | person_note | task | reference | idea | other",
                    enum=["insight", "person_note", "task", "reference", "idea", "other"],
                ),
            }
        ),
    )

    get_stats = Tool(
        name="get_stats",
        description=(
            "Get an overview of your 2nd Brain: total thought count, "
            "breakdown by type, and most recent capture date."
        ),
        inputSchema=schema({}),
    )

    capture_thought = Tool(
        name="capture_thought",
        description=(
            "Save a new thought to your 2nd Brain. "
            "The thought is automatically embedded and classified. "
            "Use this from any AI client to capture without switching to Telegram."
        ),
        inputSchema=schema(
            {
                "content": prop(
                    "string",
                    "The thought to capture — write it naturally.",
                ),
            },
            required=["content"],
        ),
    )

    return [search_thoughts, browse_recent, get_stats, capture_thought]
|
|
|
|
|
|
def _format_thought(row: dict, *, with_similarity: bool = False) -> str:
    """Render one thought row as the text entry returned to the MCP client.

    Reads ``created_at`` (first 10 characters are shown), ``metadata``
    (``type`` and ``tags``), ``content`` and, when *with_similarity* is set,
    ``similarity``. A missing/None ``metadata`` or ``tags`` is treated as empty.
    """
    meta = row["metadata"] or {}
    tags = ", ".join(meta.get("tags") or [])
    header = f"[{row['created_at'][:10]}] ({meta.get('type', '?')}"
    if tags:
        header += f" — {tags}"
    header += ")"
    if with_similarity:
        header += f" similarity={row['similarity']}"
    return f"{header}\n{row['content']}\n"


@_server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Run the MCP tool *name* with *arguments* for the current request's user.

    Supported tools: ``search_thoughts``, ``browse_recent``, ``get_stats`` and
    ``capture_thought``. Every query is passed the user_id read from
    ``_mcp_user_id``.

    Returns:
        A one-element list of TextContent. Failures are not raised: they are
        logged and reported as text starting with ``"Error: "``.
    """

    def _text(body: str) -> list[TextContent]:
        return [TextContent(type="text", text=body)]

    def _fail(msg: str) -> list[TextContent]:
        return _text(f"Error: {msg}")

    try:
        from .brain.database import get_pool

        if get_pool() is None:
            return _fail("Brain DB not available — check BRAIN_DB_URL in .env")

        user_id = _mcp_user_id.get()
        if user_id is None:
            # Fail closed: without a bound user the queries below would be
            # called with user_id=None, which may not be scoped to one user.
            return _fail("Not authenticated — no user is bound to this request")

        if name == "search_thoughts":
            query = arguments.get("query")
            if not query:
                return _fail("'query' is required")

            from .brain.search import semantic_search

            results = await semantic_search(
                query,
                threshold=float(arguments.get("threshold", 0.7)),
                limit=int(arguments.get("limit", 10)),
                user_id=user_id,
            )
            if not results:
                return _text("No matching thoughts found.")
            lines = [f"Found {len(results)} thought(s):\n"]
            lines.extend(_format_thought(r, with_similarity=True) for r in results)
            return _text("\n".join(lines))

        if name == "browse_recent":
            from .brain.database import browse_thoughts

            results = await browse_thoughts(
                limit=int(arguments.get("limit", 20)),
                type_filter=arguments.get("type_filter"),
                user_id=user_id,
            )
            if not results:
                return _text("No thoughts captured yet.")
            lines = [f"{len(results)} recent thought(s):\n"]
            lines.extend(_format_thought(r) for r in results)
            return _text("\n".join(lines))

        if name == "get_stats":
            from .brain.database import get_stats

            stats = await get_stats(user_id=user_id)
            lines = [f"Total thoughts: {stats['total']}"]
            if stats["most_recent"]:
                lines.append(f"Most recent: {stats['most_recent'][:10]}")
            lines.append("\nBy type:")
            lines.extend(
                f"  {entry['type']}: {entry['count']}" for entry in stats["by_type"]
            )
            return _text("\n".join(lines))

        if name == "capture_thought":
            content = arguments.get("content")
            if not content:
                return _fail("'content' is required")

            from .brain.ingest import ingest_thought

            result = await ingest_thought(content, user_id=user_id)
            return _text(result["confirmation"])

        return _fail(f"Unknown tool: {name}")

    except Exception as e:
        # Boundary handler: keep the traceback in the log, send only the
        # message back to the client.
        logger.exception("MCP tool error (%s): %s", name, e)
        return _fail(str(e))
|
|
|
|
|
|
# ── Streamable HTTP transport and routing ─────────────────────────────────────
|
|
|
|
def create_mcp_app():
    """
    Build the raw ASGI callable that serves every /brain-mcp request.

    The transport is Streamable HTTP (modern MCP protocol): GET (SSE stream)
    and POST (JSON) are both handled at a single /sse endpoint.

    Mount it as a sub-app (app.mount("/brain-mcp", create_mcp_app())) so that
    handle_request can write straight to the ASGI send channel without
    Starlette attempting to send a second response afterwards.
    """

    async def handle_mcp(scope, receive, send):
        # Anything that is not an HTTP scope is ignored.
        if scope["type"] == "http":
            resolved_user = await _resolve_key(Request(scope, receive, send))
            if resolved_user is None:
                # No valid key: answer 401 and never reach the session manager.
                await Response("Unauthorized", status_code=401)(scope, receive, send)
            else:
                # Bind the user for the duration of this request only.
                reset_token = _mcp_user_id.set(resolved_user)
                try:
                    await _session_manager.handle_request(scope, receive, send)
                finally:
                    _mcp_user_id.reset(reset_token)

    return handle_mcp
|