Files
oai/oai/mcp/manager.py
Rune Olsen b0cf88704e 2.1 (#2)
Final release of version 2.1.

Headlights:

### Core Features
- 🤖 Interactive chat with 300+ AI models via OpenRouter
- 🔍 Model selection with search and filtering
- 💾 Conversation save/load/export (Markdown, JSON, HTML)
- 📎 File attachments (images, PDFs, code files)
- 💰 Real-time cost tracking and credit monitoring
- 🎨 Rich terminal UI with syntax highlighting
- 📝 Persistent command history with search (Ctrl+R)
- 🌐 Online mode (web search capabilities)
- 🧠 Conversation memory toggle

### MCP Integration
- 🔧 **File Mode**: AI can read, search, and list local files
  - Automatic .gitignore filtering
  - Virtual environment exclusion
  - Large file handling (auto-truncates >50KB)

- ✍️ **Write Mode**: AI can modify files with permission
  - Create, edit, delete files
  - Move, copy, organize files
  - Always requires explicit opt-in

- 🗄️ **Database Mode**: AI can query SQLite databases
  - Read-only access (safe)
  - Schema inspection
  - Full SQL query support

Reviewed-on: #2
Co-authored-by: Rune Olsen <rune@rune.pm>
Co-committed-by: Rune Olsen <rune@rune.pm>
2026-02-03 09:02:44 +01:00

1366 lines
48 KiB
Python

"""
MCP Manager for oAI.
This module provides the high-level interface for managing MCP operations,
including server lifecycle, mode switching, folder/database management,
and tool schema generation.
"""
import asyncio
import datetime
import json
from pathlib import Path
from typing import Optional, List, Dict, Any
from oai.constants import MAX_TOOL_LOOPS
from oai.config.database import get_database
from oai.mcp.platform import CrossPlatformMCPConfig
from oai.mcp.server import MCPFilesystemServer
from oai.utils.logging import get_logger
class MCPManager:
"""
Manage MCP server lifecycle, tool calls, and mode switching.
This class provides the main interface for MCP functionality including:
- Enabling/disabling the MCP server
- Managing allowed folders and databases
- Switching between file and database modes
- Generating tool schemas for API requests
- Executing tool calls
Attributes:
enabled: Whether MCP is currently enabled
write_enabled: Whether write mode is enabled (non-persistent)
mode: Current mode ('files' or 'database')
selected_db_index: Index of selected database (if in database mode)
server: The MCPFilesystemServer instance
allowed_folders: List of allowed folder paths
databases: List of registered databases
config: Platform configuration
session_start_time: When the current session started
"""
def __init__(self):
"""Initialize the MCP manager."""
self.enabled = False
self.write_enabled = False # Off by default, resets each session
self.mode = "files"
self.selected_db_index: Optional[int] = None
self.server: Optional[MCPFilesystemServer] = None
self.allowed_folders: List[Path] = []
self.databases: List[Dict[str, Any]] = []
self.config = CrossPlatformMCPConfig()
self.session_start_time: Optional[datetime.datetime] = None
# Load persisted data
self._load_folders()
self._load_databases()
get_logger().info("MCP Manager initialized")
# =========================================================================
# PERSISTENCE
# =========================================================================
def _load_folders(self) -> None:
"""Load allowed folders from database."""
logger = get_logger()
db = get_database()
folders_json = db.get_mcp_config("allowed_folders")
if folders_json:
try:
folder_paths = json.loads(folders_json)
self.allowed_folders = [
self.config.normalize_path(p) for p in folder_paths
]
logger.info(f"Loaded {len(self.allowed_folders)} folders from config")
except Exception as e:
logger.error(f"Error loading MCP folders: {e}")
self.allowed_folders = []
def _save_folders(self) -> None:
"""Save allowed folders to database."""
db = get_database()
folder_paths = [str(p) for p in self.allowed_folders]
db.set_mcp_config("allowed_folders", json.dumps(folder_paths))
get_logger().info(f"Saved {len(self.allowed_folders)} folders to config")
def _load_databases(self) -> None:
"""Load databases from database."""
self.databases = get_database().get_mcp_databases()
get_logger().info(f"Loaded {len(self.databases)} databases from config")
# =========================================================================
# ENABLE/DISABLE
# =========================================================================
def enable(self) -> Dict[str, Any]:
"""
Enable MCP server.
Returns:
Dictionary containing:
- success: Whether operation succeeded
- folder_count: Number of allowed folders
- database_count: Number of registered databases
- message: Status message
- error: Error message if failed
"""
logger = get_logger()
if self.enabled:
return {
"success": False,
"error": "MCP is already enabled"
}
try:
self.server = MCPFilesystemServer(self.allowed_folders)
self.enabled = True
self.session_start_time = datetime.datetime.now()
get_database().set_mcp_config("mcp_enabled", "true")
logger.info("MCP Filesystem Server enabled")
return {
"success": True,
"folder_count": len(self.allowed_folders),
"database_count": len(self.databases),
"message": "MCP Filesystem Server started successfully"
}
except Exception as e:
logger.error(f"Error enabling MCP: {e}")
return {
"success": False,
"error": str(e)
}
def disable(self) -> Dict[str, Any]:
"""
Disable MCP server.
Returns:
Dictionary containing:
- success: Whether operation succeeded
- message: Status message
- error: Error message if failed
"""
logger = get_logger()
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled"
}
try:
self.server = None
self.enabled = False
self.session_start_time = None
self.mode = "files"
self.selected_db_index = None
get_database().set_mcp_config("mcp_enabled", "false")
logger.info("MCP Filesystem Server disabled")
return {
"success": True,
"message": "MCP Filesystem Server stopped"
}
except Exception as e:
logger.error(f"Error disabling MCP: {e}")
return {
"success": False,
"error": str(e)
}
# =========================================================================
# WRITE MODE
# =========================================================================
def enable_write(self) -> bool:
"""
Enable write mode.
This method should be called after user confirmation in the UI.
Returns:
True if write mode was enabled, False otherwise
"""
logger = get_logger()
if not self.enabled:
logger.warning("Attempted to enable write mode without MCP enabled")
return False
self.write_enabled = True
logger.info("MCP write mode enabled by user")
get_database().log_mcp_stat("write_mode_enabled", "", True)
return True
def disable_write(self) -> None:
"""Disable write mode."""
if self.write_enabled:
self.write_enabled = False
get_logger().info("MCP write mode disabled")
get_database().log_mcp_stat("write_mode_disabled", "", True)
# =========================================================================
# MODE SWITCHING
# =========================================================================
def switch_mode(
self,
new_mode: str,
db_index: Optional[int] = None
) -> Dict[str, Any]:
"""
Switch between files and database mode.
Args:
new_mode: 'files' or 'database'
db_index: Database number (1-based) if switching to database mode
Returns:
Dictionary containing:
- success: Whether operation succeeded
- mode: Current mode
- message: Status message
- tools: Available tools in this mode
- database: Database info (if database mode)
"""
logger = get_logger()
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled. Use /mcp on first"
}
if new_mode == "files":
self.mode = "files"
self.selected_db_index = None
logger.info("Switched to file mode")
return {
"success": True,
"mode": "files",
"message": "Switched to file mode",
"tools": ["read_file", "list_directory", "search_files"]
}
elif new_mode == "database":
if db_index is None:
return {
"success": False,
"error": "Database index required. Use /mcp db <number>"
}
if db_index < 1 or db_index > len(self.databases):
return {
"success": False,
"error": f"Invalid database number. Use 1-{len(self.databases)}"
}
self.mode = "database"
self.selected_db_index = db_index - 1 # Convert to 0-based
db = self.databases[self.selected_db_index]
logger.info(f"Switched to database mode: {db['name']}")
return {
"success": True,
"mode": "database",
"database": db,
"message": f"Switched to database #{db_index}: {db['name']}",
"tools": ["inspect_database", "search_database", "query_database"]
}
else:
return {
"success": False,
"error": f"Invalid mode: {new_mode}"
}
# =========================================================================
# FOLDER MANAGEMENT
# =========================================================================
def add_folder(self, folder_path: str) -> Dict[str, Any]:
"""
Add a folder to the allowed list.
Args:
folder_path: Path to the folder
Returns:
Dictionary containing:
- success: Whether operation succeeded
- path: Normalized path
- stats: Folder statistics
- total_folders: Total number of allowed folders
- warning: Optional warning message
"""
logger = get_logger()
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled. Use /mcp on first"
}
try:
path = self.config.normalize_path(folder_path)
# Validation
if not path.exists():
return {
"success": False,
"error": f"Directory does not exist: {path}"
}
if not path.is_dir():
return {
"success": False,
"error": f"Not a directory: {path}"
}
if self.config.is_system_directory(path):
return {
"success": False,
"error": f"Cannot add system directory: {path}"
}
if path in self.allowed_folders:
return {
"success": False,
"error": f"Folder already in allowed list: {path}"
}
# Check for nested paths
parent_folder = None
for existing in self.allowed_folders:
try:
path.relative_to(existing)
parent_folder = existing
break
except ValueError:
continue
# Get folder stats
stats = self.config.get_folder_stats(path)
# Add folder
self.allowed_folders.append(path)
self._save_folders()
# Update server and reload gitignores
if self.server:
self.server.allowed_folders = self.allowed_folders
self.server.reload_gitignores()
logger.info(f"Added folder to MCP: {path}")
result = {
"success": True,
"path": str(path),
"stats": stats,
"total_folders": len(self.allowed_folders)
}
if parent_folder:
result["warning"] = f"Note: {parent_folder} is already allowed (parent folder)"
return result
except Exception as e:
logger.error(f"Error adding folder: {e}")
return {
"success": False,
"error": str(e)
}
def remove_folder(self, folder_ref: str) -> Dict[str, Any]:
"""
Remove a folder from the allowed list.
Args:
folder_ref: Folder path or number (1-based)
Returns:
Dictionary containing:
- success: Whether operation succeeded
- path: Removed folder path
- total_folders: Remaining folder count
- warning: Optional warning message
"""
logger = get_logger()
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled. Use /mcp on first"
}
try:
# Check if it's a number
if folder_ref.isdigit():
index = int(folder_ref) - 1
if 0 <= index < len(self.allowed_folders):
path = self.allowed_folders[index]
else:
return {
"success": False,
"error": f"Invalid folder number: {folder_ref}"
}
else:
# Treat as path
path = self.config.normalize_path(folder_ref)
if path not in self.allowed_folders:
return {
"success": False,
"error": f"Folder not in allowed list: {path}"
}
# Remove folder
self.allowed_folders.remove(path)
self._save_folders()
# Update server and reload gitignores
if self.server:
self.server.allowed_folders = self.allowed_folders
self.server.reload_gitignores()
logger.info(f"Removed folder from MCP: {path}")
result = {
"success": True,
"path": str(path),
"total_folders": len(self.allowed_folders)
}
if len(self.allowed_folders) == 0:
result["warning"] = "No allowed folders remaining. Add one with /mcp add <folder>"
return result
except Exception as e:
logger.error(f"Error removing folder: {e}")
return {
"success": False,
"error": str(e)
}
def list_folders(self) -> Dict[str, Any]:
"""
List all allowed folders with stats.
Returns:
Dictionary containing:
- success: Whether operation succeeded
- folders: List of folder info
- total_folders: Total folder count
- total_files: Total file count across folders
- total_size_mb: Total size in MB
"""
try:
folders_info = []
total_files = 0
total_size = 0
for idx, folder in enumerate(self.allowed_folders, 1):
stats = self.config.get_folder_stats(folder)
folder_info = {
"number": idx,
"path": str(folder),
"exists": stats.get("exists", False)
}
if stats.get("exists"):
folder_info["file_count"] = stats["file_count"]
folder_info["size_mb"] = stats["size_mb"]
total_files += stats["file_count"]
total_size += stats["total_size"]
folders_info.append(folder_info)
return {
"success": True,
"folders": folders_info,
"total_folders": len(self.allowed_folders),
"total_files": total_files,
"total_size_mb": total_size / (1024 * 1024)
}
except Exception as e:
get_logger().error(f"Error listing folders: {e}")
return {
"success": False,
"error": str(e)
}
# =========================================================================
# DATABASE MANAGEMENT
# =========================================================================
def add_database(self, db_path: str) -> Dict[str, Any]:
"""
Add a SQLite database.
Args:
db_path: Path to the database file
Returns:
Dictionary containing:
- success: Whether operation succeeded
- database: Database info
- number: Database number
- message: Status message
"""
import sqlite3
logger = get_logger()
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled. Use /mcp on first"
}
try:
path = Path(db_path).resolve()
# Validation
if not path.exists():
return {
"success": False,
"error": f"Database file not found: {path}"
}
if not path.is_file():
return {
"success": False,
"error": f"Not a file: {path}"
}
# Check if already added
if any(db["path"] == str(path) for db in self.databases):
return {
"success": False,
"error": f"Database already added: {path.name}"
}
# Validate it's a SQLite database
try:
conn = sqlite3.connect(f"file:{path}?mode=ro", uri=True)
cursor = conn.cursor()
# Get tables
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
)
tables = [row[0] for row in cursor.fetchall()]
conn.close()
except sqlite3.DatabaseError as e:
return {
"success": False,
"error": f"Not a valid SQLite database: {e}"
}
# Get file size
db_size = path.stat().st_size
# Create database entry
db_info = {
"path": str(path),
"name": path.name,
"size": db_size,
"tables": tables,
"added": datetime.datetime.now().isoformat()
}
# Save to config
db_id = get_database().add_mcp_database(db_info)
db_info["id"] = db_id
# Add to list
self.databases.append(db_info)
logger.info(f"Added database: {path.name}")
return {
"success": True,
"database": db_info,
"number": len(self.databases),
"message": f"Added database #{len(self.databases)}: {path.name}"
}
except Exception as e:
logger.error(f"Error adding database: {e}")
return {
"success": False,
"error": str(e)
}
def remove_database(self, db_ref: str) -> Dict[str, Any]:
"""
Remove a database.
Args:
db_ref: Database number (1-based) or path
Returns:
Dictionary containing:
- success: Whether operation succeeded
- database: Removed database info
- message: Status message
- warning: Optional warning message
"""
logger = get_logger()
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled"
}
try:
# Check if it's a number
if db_ref.isdigit():
index = int(db_ref) - 1
if 0 <= index < len(self.databases):
db = self.databases[index]
else:
return {
"success": False,
"error": f"Invalid database number: {db_ref}"
}
else:
# Treat as path
path = str(Path(db_ref).resolve())
db = next((d for d in self.databases if d["path"] == path), None)
if not db:
return {
"success": False,
"error": f"Database not found: {db_ref}"
}
index = self.databases.index(db)
# If currently selected, deselect
if self.mode == "database" and self.selected_db_index == index:
self.mode = "files"
self.selected_db_index = None
# Remove from config
get_database().remove_mcp_database(db["path"])
# Remove from list
self.databases.pop(index)
logger.info(f"Removed database: {db['name']}")
result = {
"success": True,
"database": db,
"message": f"Removed database: {db['name']}"
}
if len(self.databases) == 0:
result["warning"] = "No databases remaining. Add one with /mcp add db <path>"
return result
except Exception as e:
logger.error(f"Error removing database: {e}")
return {
"success": False,
"error": str(e)
}
def list_databases(self) -> Dict[str, Any]:
"""
List all databases.
Returns:
Dictionary containing:
- success: Whether operation succeeded
- databases: List of database info
- count: Number of databases
"""
try:
db_list = []
for idx, db in enumerate(self.databases, 1):
db_info = {
"number": idx,
"name": db["name"],
"path": db["path"],
"size_mb": db["size"] / (1024 * 1024),
"tables": db["tables"],
"table_count": len(db["tables"]),
"added": db["added"]
}
# Check if file still exists
if not Path(db["path"]).exists():
db_info["warning"] = "File not found"
db_list.append(db_info)
return {
"success": True,
"databases": db_list,
"count": len(db_list)
}
except Exception as e:
get_logger().error(f"Error listing databases: {e}")
return {
"success": False,
"error": str(e)
}
# =========================================================================
# GITIGNORE
# =========================================================================
def toggle_gitignore(self, enabled: bool) -> Dict[str, Any]:
"""
Toggle .gitignore filtering.
Args:
enabled: Whether to enable gitignore filtering
Returns:
Dictionary containing:
- success: Whether operation succeeded
- message: Status message
- pattern_count: Number of patterns (if enabled)
"""
if not self.enabled:
return {
"success": False,
"error": "MCP is not enabled"
}
if not self.server:
return {
"success": False,
"error": "MCP server not running"
}
self.server.respect_gitignore = enabled
status = "enabled" if enabled else "disabled"
get_logger().info(f".gitignore filtering {status}")
return {
"success": True,
"message": f".gitignore filtering {status}",
"pattern_count": self.server.gitignore_parser.pattern_count if enabled else 0
}
# =========================================================================
# STATUS
# =========================================================================
def get_status(self) -> Dict[str, Any]:
"""
Get comprehensive MCP status.
Returns:
Dictionary containing full MCP status information
"""
try:
stats = get_database().get_mcp_stats()
uptime = None
if self.session_start_time:
delta = datetime.datetime.now() - self.session_start_time
hours = delta.seconds // 3600
minutes = (delta.seconds % 3600) // 60
uptime = f"{hours}h {minutes}m" if hours > 0 else f"{minutes}m"
folder_info = self.list_folders()
gitignore_status = "enabled" if (
self.server and self.server.respect_gitignore
) else "disabled"
gitignore_patterns = (
self.server.gitignore_parser.pattern_count if self.server else 0
)
# Current mode info
mode_info = {
"mode": self.mode,
"mode_display": (
"Files" if self.mode == "files"
else f"DB #{self.selected_db_index + 1}"
)
}
if self.mode == "database" and self.selected_db_index is not None:
mode_info["database"] = self.databases[self.selected_db_index]
return {
"success": True,
"enabled": self.enabled,
"write_enabled": self.write_enabled,
"uptime": uptime,
"mode_info": mode_info,
"folder_count": len(self.allowed_folders),
"database_count": len(self.databases),
"total_files": folder_info.get("total_files", 0),
"total_size_mb": folder_info.get("total_size_mb", 0),
"stats": stats,
"tools_available": self._get_current_tools(),
"gitignore_status": gitignore_status,
"gitignore_patterns": gitignore_patterns
}
except Exception as e:
get_logger().error(f"Error getting MCP status: {e}")
return {
"success": False,
"error": str(e)
}
def _get_current_tools(self) -> List[str]:
"""Get tools available in current mode."""
if not self.enabled:
return []
if self.mode == "files":
tools = ["read_file", "list_directory", "search_files"]
if self.write_enabled:
tools.extend([
"write_file", "edit_file", "delete_file",
"create_directory", "move_file", "copy_file"
])
return tools
elif self.mode == "database":
return ["inspect_database", "search_database", "query_database"]
return []
# =========================================================================
# TOOL EXECUTION
# =========================================================================
async def call_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]:
"""
Call an MCP tool.
Args:
tool_name: Name of the tool to call
**kwargs: Tool arguments
Returns:
Tool execution result
"""
if not self.enabled or not self.server:
return {"error": "MCP is not enabled"}
# Check write permissions for write operations
write_tools = {
"write_file", "edit_file", "delete_file",
"create_directory", "move_file", "copy_file"
}
if tool_name in write_tools and not self.write_enabled:
return {"error": "Write operations disabled. Enable with /mcp write on"}
try:
# File mode tools (read-only)
if tool_name == "read_file":
return await self.server.read_file(kwargs.get("file_path", ""))
elif tool_name == "list_directory":
return await self.server.list_directory(
kwargs.get("dir_path", ""),
kwargs.get("recursive", False)
)
elif tool_name == "search_files":
return await self.server.search_files(
kwargs.get("pattern", ""),
kwargs.get("search_path"),
kwargs.get("content_search", False)
)
# Database mode tools
elif tool_name == "inspect_database":
if self.mode != "database" or self.selected_db_index is None:
return {"error": "Not in database mode. Use /mcp db <number> first"}
db = self.databases[self.selected_db_index]
return await self.server.inspect_database(
db["path"],
kwargs.get("table_name")
)
elif tool_name == "search_database":
if self.mode != "database" or self.selected_db_index is None:
return {"error": "Not in database mode. Use /mcp db <number> first"}
db = self.databases[self.selected_db_index]
return await self.server.search_database(
db["path"],
kwargs.get("search_term", ""),
kwargs.get("table_name"),
kwargs.get("column_name")
)
elif tool_name == "query_database":
if self.mode != "database" or self.selected_db_index is None:
return {"error": "Not in database mode. Use /mcp db <number> first"}
db = self.databases[self.selected_db_index]
return await self.server.query_database(
db["path"],
kwargs.get("query", ""),
kwargs.get("limit")
)
# Write mode tools
elif tool_name == "write_file":
return await self.server.write_file(
kwargs.get("file_path", ""),
kwargs.get("content", "")
)
elif tool_name == "edit_file":
return await self.server.edit_file(
kwargs.get("file_path", ""),
kwargs.get("old_text", ""),
kwargs.get("new_text", "")
)
elif tool_name == "delete_file":
return await self.server.delete_file(
kwargs.get("file_path", ""),
kwargs.get("reason", "No reason provided"),
kwargs.get("confirm_callback")
)
elif tool_name == "create_directory":
return await self.server.create_directory(
kwargs.get("dir_path", "")
)
elif tool_name == "move_file":
return await self.server.move_file(
kwargs.get("source_path", ""),
kwargs.get("dest_path", "")
)
elif tool_name == "copy_file":
return await self.server.copy_file(
kwargs.get("source_path", ""),
kwargs.get("dest_path", "")
)
else:
return {"error": f"Unknown tool: {tool_name}"}
except Exception as e:
get_logger().error(f"Error calling MCP tool {tool_name}: {e}")
return {"error": str(e)}
# =========================================================================
# TOOL SCHEMA GENERATION
# =========================================================================
def get_tools_schema(self) -> List[Dict[str, Any]]:
"""
Get MCP tools as OpenAI function calling schema.
Returns the appropriate schema based on current mode.
Returns:
List of tool definitions for API request
"""
if not self.enabled:
return []
if self.mode == "files":
return self._get_file_tools_schema()
elif self.mode == "database":
return self._get_database_tools_schema()
return []
def _get_file_tools_schema(self) -> List[Dict[str, Any]]:
"""Get file mode tools schema."""
if len(self.allowed_folders) == 0:
return []
allowed_dirs_str = ", ".join(str(f) for f in self.allowed_folders)
# Base read-only tools
tools = [
{
"type": "function",
"function": {
"name": "search_files",
"description": (
f"Search for files in the user's local filesystem. "
f"Allowed directories: {allowed_dirs_str}. "
"Can search by filename pattern (e.g., '*.py' for Python files) "
"or search within file contents. Automatically respects .gitignore "
"patterns and excludes virtual environments."
),
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": (
"Search pattern. For filename search, use glob patterns "
"like '*.py', '*.txt', 'report*'. For content search, "
"use plain text to search for."
)
},
"content_search": {
"type": "boolean",
"description": (
"If true, searches inside file contents (SLOW). "
"If false, searches only filenames (FAST). Default is false."
),
"default": False
},
"search_path": {
"type": "string",
"description": (
"Optional: specific directory to search in. "
"If not provided, searches all allowed directories."
),
}
},
"required": ["pattern"]
}
}
},
{
"type": "function",
"function": {
"name": "read_file",
"description": (
"Read the complete contents of a text file. "
"Only works for files within allowed directories. "
"Maximum file size: 10MB. Files larger than 50KB are automatically truncated. "
"Respects .gitignore patterns."
),
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": (
"Full path to the file to read "
"(e.g., /Users/username/Documents/report.txt)"
)
}
},
"required": ["file_path"]
}
}
},
{
"type": "function",
"function": {
"name": "list_directory",
"description": (
"List all files and subdirectories in a directory. "
"Automatically filters out virtual environments, build artifacts, "
"and .gitignore patterns. Limited to 1000 items."
),
"parameters": {
"type": "object",
"properties": {
"dir_path": {
"type": "string",
"description": (
"Directory path to list "
"(e.g., ~/Documents or /Users/username/Projects)"
)
},
"recursive": {
"type": "boolean",
"description": (
"If true, lists files in subdirectories recursively. "
"Default is true. WARNING: Can be very large."
),
"default": True
}
},
"required": ["dir_path"]
}
}
}
]
# Add write tools if write mode is enabled
if self.write_enabled:
tools.extend(self._get_write_tools_schema(allowed_dirs_str))
return tools
def _get_write_tools_schema(self, allowed_dirs_str: str) -> List[Dict[str, Any]]:
"""Get write mode tools schema."""
return [
{
"type": "function",
"function": {
"name": "write_file",
"description": (
f"Create or overwrite a file with content. "
f"Allowed directories: {allowed_dirs_str}. "
"Automatically creates parent directories if needed."
),
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Full path to the file to create or overwrite"
},
"content": {
"type": "string",
"description": "The complete content to write to the file"
}
},
"required": ["file_path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "edit_file",
"description": (
"Make targeted edits by finding and replacing specific text. "
"The old_text must match exactly and appear only once in the file."
),
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Full path to the file to edit"
},
"old_text": {
"type": "string",
"description": (
"The exact text to find and replace. "
"Must match exactly and appear only once."
)
},
"new_text": {
"type": "string",
"description": "The new text to replace the old text with"
}
},
"required": ["file_path", "old_text", "new_text"]
}
}
},
{
"type": "function",
"function": {
"name": "delete_file",
"description": (
"Delete a file. ALWAYS requires user confirmation. "
"The user will see the file details and your reason before deciding."
),
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Full path to the file to delete"
},
"reason": {
"type": "string",
"description": (
"Clear explanation for why this file should be deleted. "
"The user will see this reason."
)
}
},
"required": ["file_path", "reason"]
}
}
},
{
"type": "function",
"function": {
"name": "create_directory",
"description": (
"Create a new directory (and parent directories if needed). "
"Returns success if directory already exists."
),
"parameters": {
"type": "object",
"properties": {
"dir_path": {
"type": "string",
"description": "Full path to the directory to create"
}
},
"required": ["dir_path"]
}
}
},
{
"type": "function",
"function": {
"name": "move_file",
"description": "Move or rename a file within allowed directories.",
"parameters": {
"type": "object",
"properties": {
"source_path": {
"type": "string",
"description": "Full path to the file to move/rename"
},
"dest_path": {
"type": "string",
"description": "Full path for the new location/name"
}
},
"required": ["source_path", "dest_path"]
}
}
},
{
"type": "function",
"function": {
"name": "copy_file",
"description": "Copy a file to a new location within allowed directories.",
"parameters": {
"type": "object",
"properties": {
"source_path": {
"type": "string",
"description": "Full path to the file to copy"
},
"dest_path": {
"type": "string",
"description": "Full path for the copy destination"
}
},
"required": ["source_path", "dest_path"]
}
}
}
]
def _get_database_tools_schema(self) -> List[Dict[str, Any]]:
"""Get database mode tools schema."""
if self.selected_db_index is None or self.selected_db_index >= len(self.databases):
return []
db = self.databases[self.selected_db_index]
db_name = db["name"]
tables_str = ", ".join(db["tables"])
return [
{
"type": "function",
"function": {
"name": "inspect_database",
"description": (
f"Inspect the schema of the currently selected database ({db_name}). "
f"Can get all tables or details for a specific table. "
f"Available tables: {tables_str}."
),
"parameters": {
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": (
f"Optional: specific table to inspect. "
f"If not provided, returns info for all tables. "
f"Available: {tables_str}"
)
}
},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "search_database",
"description": (
f"Search for a value across tables in the database ({db_name}). "
f"Performs partial matching across columns. "
f"Limited to {self.server.default_query_limit} results."
),
"parameters": {
"type": "object",
"properties": {
"search_term": {
"type": "string",
"description": "Value to search for (partial match supported)"
},
"table_name": {
"type": "string",
"description": f"Optional: limit search to specific table. Available: {tables_str}"
},
"column_name": {
"type": "string",
"description": "Optional: limit search to specific column"
}
},
"required": ["search_term"]
}
}
},
{
"type": "function",
"function": {
"name": "query_database",
"description": (
f"Execute a read-only SQL query on the database ({db_name}). "
f"Supports SELECT queries including JOINs, subqueries, CTEs. "
f"Maximum {self.server.max_query_results} rows. "
f"Timeout: {self.server.max_query_timeout} seconds. "
"INSERT/UPDATE/DELETE/DROP are blocked."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
f"SQL SELECT query to execute. "
f"Available tables: {tables_str}."
)
},
"limit": {
"type": "integer",
"description": (
f"Optional: max rows to return "
f"(default {self.server.default_query_limit}, "
f"max {self.server.max_query_results})"
)
}
},
"required": ["query"]
}
}
}
]
# Global MCP manager instance
_mcp_manager: Optional[MCPManager] = None
def get_mcp_manager() -> MCPManager:
"""
Get the global MCP manager instance.
Returns:
The shared MCPManager instance
"""
global _mcp_manager
if _mcp_manager is None:
_mcp_manager = MCPManager()
return _mcp_manager