""" tools/filesystem_tool.py — Sandboxed filesystem access. All paths are validated against FILESYSTEM_SANDBOX_DIRS before any operation. Symlinks are resolved before checking (prevents traversal attacks). Binary files are rejected — text only. """ from __future__ import annotations import os import shutil from pathlib import Path from ..context_vars import current_user as _current_user_var from ..security import SecurityError, assert_path_allowed, sanitize_external_content from ..security_screening import get_content_limit, is_option_enabled from .base import BaseTool, ToolResult MAX_FILE_SIZE = 500 * 1024 # 500 KB (file size gate — still enforced regardless of truncation option) _DEFAULT_MAX_FILE_CHARS = 20_000 # default char limit when truncation option is enabled MAX_DIR_ENTRIES = 200 # Image extensions and their MIME types — returned as base64 for vision-capable models _IMAGE_MEDIA_TYPES: dict[str, str] = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", ".bmp": "image/bmp", ".tiff": "image/tiff", ".tif": "image/tiff", } class FilesystemTool(BaseTool): name = "filesystem" description = ( "Read and write files in the owner's designated directories. " "Operations: read_file, list_directory, write_file, delete_file, " "create_directory, delete_directory. " "Image files (jpg, png, gif, webp, etc.) are returned as visual content " "that vision-capable models can analyse. " "write_file, delete_file, and delete_directory require user confirmation." ) input_schema = { "type": "object", "properties": { "operation": { "type": "string", "enum": ["read_file", "list_directory", "write_file", "delete_file", "create_directory", "delete_directory"], "description": "The filesystem operation to perform", }, "path": { "type": "string", "description": "Absolute or relative path to the file or directory", }, "content": { "type": "string", "description": "File content for write_file operations", }, }, "required": ["operation", "path"], } requires_confirmation = False # set dynamically per operation allowed_in_scheduled_tasks = True # Operations that require confirmation _CONFIRM_OPS = {"write_file", "delete_file", "delete_directory"} async def execute( self, operation: str, path: str, content: str = "", **kwargs, ) -> ToolResult: # Check operation if operation not in ("read_file", "list_directory", "write_file", "delete_file", "create_directory", "delete_directory"): return ToolResult(success=False, error=f"Unknown operation: {operation!r}") # Sandbox check try: safe_path = await assert_path_allowed(path) except SecurityError as e: return ToolResult(success=False, error=str(e)) # Per-user folder restriction: non-admins may only access their personal folder _user = _current_user_var.get() if _user is not None and not _user.is_admin: from ..database import credential_store base = await credential_store.get("system:users_base_folder") if not base: return ToolResult(success=False, error="Filesystem access is not available for your account.") user_folder = Path(base.rstrip("/")) / _user.username try: resolved = safe_path.resolve() user_folder_resolved = user_folder.resolve() if not str(resolved).startswith(str(user_folder_resolved) + "/") and resolved != user_folder_resolved: return ToolResult(success=False, error="Access denied: path is outside your personal folder.") except Exception: return ToolResult(success=False, error="Access denied: could not verify path.") # Dispatch if operation == "read_file": return await self._read_file(safe_path) elif operation == "list_directory": return self._list_directory(safe_path) elif operation == "write_file": return self._write_file(safe_path, content) elif operation == "delete_file": return self._delete_file(safe_path) elif operation == "create_directory": return self._create_directory(safe_path) elif operation == "delete_directory": return self._delete_directory(safe_path) async def _read_file(self, path: Path) -> ToolResult: if not path.exists(): return ToolResult(success=False, error=f"File not found: {path}") if not path.is_file(): return ToolResult(success=False, error=f"Not a file: {path}") size = path.stat().st_size if size > MAX_FILE_SIZE: return ToolResult( success=False, error=f"File too large ({size:,} bytes). Max: {MAX_FILE_SIZE:,} bytes.", ) # Image files: return as base64 for vision-capable models media_type = _IMAGE_MEDIA_TYPES.get(path.suffix.lower()) if media_type: try: import base64 image_data = base64.b64encode(path.read_bytes()).decode("ascii") except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") return ToolResult( success=True, data={ "path": str(path), "is_image": True, "media_type": media_type, "image_data": image_data, "size_bytes": size, }, ) try: text = path.read_text(encoding="utf-8") except UnicodeDecodeError: return ToolResult(success=False, error="Binary files are not supported. Text files only.") except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") # Apply configurable char limit — truncate rather than reject truncated = False if await is_option_enabled("system:security_truncation_enabled"): max_chars = await get_content_limit("system:security_max_file_chars", _DEFAULT_MAX_FILE_CHARS) total_chars = len(text) if total_chars > max_chars: text = text[:max_chars] text += f"\n\n[File truncated: showing first {max_chars:,} of {total_chars:,} total chars]" truncated = True # Sanitise before returning to agent text = await sanitize_external_content(text, source="file") return ToolResult( success=True, data={"path": str(path), "content": text, "size_bytes": size, "truncated": truncated}, ) def _list_directory(self, path: Path) -> ToolResult: if not path.exists(): return ToolResult(success=False, error=f"Directory not found: {path}") if not path.is_dir(): return ToolResult(success=False, error=f"Not a directory: {path}") try: entries = [] for i, entry in enumerate(sorted(path.iterdir())): if i >= MAX_DIR_ENTRIES: entries.append({"name": f"... ({i} entries truncated)", "type": "truncated"}) break stat = entry.stat() entries.append({ "name": entry.name, "type": "directory" if entry.is_dir() else "file", "size_bytes": stat.st_size if entry.is_file() else None, }) except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") return ToolResult( success=True, data={"path": str(path), "entries": entries, "count": len(entries)}, ) def _write_file(self, path: Path, content: str) -> ToolResult: try: path.parent.mkdir(parents=True, exist_ok=True) # Support base64 data URLs for binary files (e.g. images from image_gen tool) if content.startswith("data:") and ";base64," in content: import base64 as _b64 try: _, b64_data = content.split(",", 1) raw_bytes = _b64.b64decode(b64_data) path.write_bytes(raw_bytes) return ToolResult(success=True, data={"path": str(path), "bytes_written": len(raw_bytes)}) except Exception as e: return ToolResult(success=False, error=f"Failed to decode base64 data: {e}") path.write_text(content, encoding="utf-8") except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") except Exception as e: return ToolResult(success=False, error=f"Write failed: {e}") return ToolResult( success=True, data={"path": str(path), "bytes_written": len(content.encode())}, ) def _delete_file(self, path: Path) -> ToolResult: if not path.exists(): return ToolResult(success=False, error=f"File not found: {path}") if not path.is_file(): return ToolResult(success=False, error="Can only delete files, not directories.") try: path.unlink() except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") except Exception as e: return ToolResult(success=False, error=f"Delete failed: {e}") return ToolResult(success=True, data={"deleted": str(path)}) def _create_directory(self, path: Path) -> ToolResult: try: path.mkdir(parents=True, exist_ok=True) except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") except Exception as e: return ToolResult(success=False, error=f"Create directory failed: {e}") return ToolResult(success=True, data={"created": str(path)}) def _delete_directory(self, path: Path) -> ToolResult: if not path.exists(): return ToolResult(success=False, error=f"Directory not found: {path}") if not path.is_dir(): return ToolResult(success=False, error=f"Not a directory: {path}") try: shutil.rmtree(path) except PermissionError: return ToolResult(success=False, error=f"Permission denied: {path}") except Exception as e: return ToolResult(success=False, error=f"Delete directory failed: {e}") return ToolResult(success=True, data={"deleted": str(path)}) def confirmation_description(self, operation: str = "", path: str = "", **kwargs) -> str: if operation == "delete_file": return f"Permanently delete file: {path}" if operation == "delete_directory": return f"Permanently delete directory and all its contents: {path}" if operation == "write_file": content_preview = kwargs.get("content", "")[:80] return f"Write to file: {path}\nContent preview: {content_preview}..." return f"{operation}: {path}"