Files
oai-web/server/tools/filesystem_tool.py
2026-04-08 12:43:24 +02:00

269 lines
11 KiB
Python

"""
tools/filesystem_tool.py — Sandboxed filesystem access.
All paths are validated against FILESYSTEM_SANDBOX_DIRS before any operation.
Symlinks are resolved before checking (prevents traversal attacks).
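Binary files are rejected when reading — text only, except recognised image
types, which are returned base64-encoded. write_file additionally accepts
base64 data URLs and writes the decoded bytes.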
"""
from __future__ import annotations
import os
import shutil
from pathlib import Path
from ..context_vars import current_user as _current_user_var
from ..security import SecurityError, assert_path_allowed, sanitize_external_content
from ..security_screening import get_content_limit, is_option_enabled
from .base import BaseTool, ToolResult
# File size gate in bytes (500 KB) — still enforced regardless of the truncation option.
MAX_FILE_SIZE = 500 * 1024

# Default character limit applied when the truncation option is enabled.
_DEFAULT_MAX_FILE_CHARS = 20_000

# Maximum number of entries reported by a directory listing.
MAX_DIR_ENTRIES = 200

# Image extensions and their MIME types — returned as base64 for vision-capable models
_IMAGE_MEDIA_TYPES: dict[str, str] = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    ".tif": "image/tiff",
}
class FilesystemTool(BaseTool):
    """Tool exposing sandboxed file and directory operations.

    ``execute`` validates the requested operation, runs the path through
    ``assert_path_allowed``, applies a per-user folder restriction for
    non-admin users, and then dispatches to one of the private helpers.
    Handled failures are reported as ``ToolResult(success=False, ...)``
    rather than raised.
    """

    name = "filesystem"
    description = (
        "Read and write files in the owner's designated directories. "
        "Operations: read_file, list_directory, write_file, delete_file, "
        "create_directory, delete_directory. "
        "Image files (jpg, png, gif, webp, etc.) are returned as visual content "
        "that vision-capable models can analyse. "
        "write_file, delete_file, and delete_directory require user confirmation."
    )
    input_schema = {
        "type": "object",
        "properties": {
            "operation": {
                "type": "string",
                "enum": ["read_file", "list_directory", "write_file", "delete_file",
                         "create_directory", "delete_directory"],
                "description": "The filesystem operation to perform",
            },
            "path": {
                "type": "string",
                "description": "Absolute or relative path to the file or directory",
            },
            "content": {
                "type": "string",
                "description": "File content for write_file operations",
            },
        },
        "required": ["operation", "path"],
    }
    # NOTE(review): nothing in this class changes this flag at runtime;
    # presumably the framework consults _CONFIRM_OPS per call — confirm.
    requires_confirmation = False  # set dynamically per operation
    allowed_in_scheduled_tasks = True

    # Operations that require confirmation
    _CONFIRM_OPS = {"write_file", "delete_file", "delete_directory"}

    # Every operation accepted by execute(); mirrors the schema enum above.
    _OPERATIONS = (
        "read_file",
        "list_directory",
        "write_file",
        "delete_file",
        "create_directory",
        "delete_directory",
    )

    async def execute(
        self,
        operation: str,
        path: str,
        content: str = "",
        **kwargs,
    ) -> ToolResult:
        """Validate the request and run the requested filesystem operation.

        Args:
            operation: One of the names in ``_OPERATIONS``.
            path: Path to operate on; checked by ``assert_path_allowed``.
            content: Text (or a base64 data URL) used by ``write_file``.
            **kwargs: Accepted and ignored.

        Returns:
            The ``ToolResult`` produced by the operation, or a failed
            ``ToolResult`` if the operation is unknown or access is denied.
        """
        # Check operation
        if operation not in self._OPERATIONS:
            return ToolResult(success=False, error=f"Unknown operation: {operation!r}")

        # Sandbox check
        try:
            safe_path = await assert_path_allowed(path)
        except SecurityError as e:
            return ToolResult(success=False, error=str(e))

        # Per-user folder restriction: non-admins may only access their personal folder
        denied = await self._check_user_folder(safe_path)
        if denied is not None:
            return denied

        # Dispatch. read_file is the only coroutine and write_file the only
        # helper taking content, so both are handled before the table lookup.
        if operation == "read_file":
            return await self._read_file(safe_path)
        if operation == "write_file":
            return self._write_file(safe_path, content)
        path_only_handlers = {
            "list_directory": self._list_directory,
            "delete_file": self._delete_file,
            "create_directory": self._create_directory,
            "delete_directory": self._delete_directory,
        }
        return path_only_handlers[operation](safe_path)

    async def _check_user_folder(self, safe_path: Path) -> ToolResult | None:
        """Enforce the personal-folder restriction for non-admin users.

        Returns:
            ``None`` when access is permitted (no current user, an admin, or
            a path inside the user's folder); otherwise a failed
            ``ToolResult`` describing the denial.
        """
        user = _current_user_var.get()
        if user is None or user.is_admin:
            return None

        # Function-scope import kept from the original code
        # (presumably avoids an import cycle — confirm).
        from ..database import credential_store

        base = await credential_store.get("system:users_base_folder")
        if not base:
            return ToolResult(success=False, error="Filesystem access is not available for your account.")

        try:
            resolved = safe_path.resolve()
            # Path() already ignores trailing separators; stripping "/" by
            # hand would turn a base of "/" into a relative path.
            # NOTE(review): assumes usernames contain no path separators or
            # ".." components — confirm they are validated upstream.
            user_root = (Path(base) / user.username).resolve()
        except Exception:
            # Fail closed: any problem while resolving means no access.
            return ToolResult(success=False, error="Access denied: could not verify path.")

        # Compare path components instead of string prefixes so the check
        # does not depend on the platform's separator.
        if resolved != user_root and user_root not in resolved.parents:
            return ToolResult(success=False, error="Access denied: path is outside your personal folder.")
        return None

    async def _read_file(self, path: Path) -> ToolResult:
        """Read a text file, or return an image file base64-encoded.

        Files larger than ``MAX_FILE_SIZE`` bytes are rejected. Files whose
        suffix is in ``_IMAGE_MEDIA_TYPES`` are returned as base64 data.
        Other files are decoded as UTF-8, optionally truncated to the
        configured character limit, and passed through
        ``sanitize_external_content`` before being returned.
        """
        if not path.exists():
            return ToolResult(success=False, error=f"File not found: {path}")
        if not path.is_file():
            return ToolResult(success=False, error=f"Not a file: {path}")

        try:
            size = path.stat().st_size
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except OSError as e:
            return ToolResult(success=False, error=f"Read failed: {e}")
        if size > MAX_FILE_SIZE:
            return ToolResult(
                success=False,
                error=f"File too large ({size:,} bytes). Max: {MAX_FILE_SIZE:,} bytes.",
            )

        # Image files: return as base64 for vision-capable models
        media_type = _IMAGE_MEDIA_TYPES.get(path.suffix.lower())
        if media_type:
            import base64

            try:
                raw = path.read_bytes()
            except PermissionError:
                return ToolResult(success=False, error=f"Permission denied: {path}")
            except OSError as e:
                return ToolResult(success=False, error=f"Read failed: {e}")
            return ToolResult(
                success=True,
                data={
                    "path": str(path),
                    "is_image": True,
                    "media_type": media_type,
                    "image_data": base64.b64encode(raw).decode("ascii"),
                    "size_bytes": size,
                },
            )

        try:
            text = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return ToolResult(success=False, error="Binary files are not supported. Text files only.")
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except OSError as e:
            return ToolResult(success=False, error=f"Read failed: {e}")

        # Apply configurable char limit — truncate rather than reject
        truncated = False
        if await is_option_enabled("system:security_truncation_enabled"):
            max_chars = await get_content_limit("system:security_max_file_chars", _DEFAULT_MAX_FILE_CHARS)
            total_chars = len(text)
            if total_chars > max_chars:
                text = text[:max_chars]
                text += f"\n\n[File truncated: showing first {max_chars:,} of {total_chars:,} total chars]"
                truncated = True

        # Sanitise before returning to agent
        text = await sanitize_external_content(text, source="file")
        return ToolResult(
            success=True,
            data={"path": str(path), "content": text, "size_bytes": size, "truncated": truncated},
        )

    def _list_directory(self, path: Path) -> ToolResult:
        """List up to ``MAX_DIR_ENTRIES`` entries of a directory, sorted by path.

        Each entry has ``name``, ``type`` ("directory" or "file") and
        ``size_bytes`` (``None`` unless the entry is a regular file). When
        the directory holds more entries than the limit, a final marker
        entry of type "truncated" states how many were omitted.

        The result data carries ``count`` (real entries listed, excluding
        the marker), ``total_entries`` and ``truncated``.
        """
        if not path.exists():
            return ToolResult(success=False, error=f"Directory not found: {path}")
        if not path.is_dir():
            return ToolResult(success=False, error=f"Not a directory: {path}")

        try:
            children = sorted(path.iterdir())
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except OSError as e:
            return ToolResult(success=False, error=f"List directory failed: {e}")

        entries = []
        for child in children[:MAX_DIR_ENTRIES]:
            is_dir = False
            size = None
            try:
                is_dir = child.is_dir()
                # Only stat regular files: stat() on a broken symlink raises,
                # which must not abort the whole listing.
                if child.is_file():
                    size = child.stat().st_size
            except PermissionError:
                return ToolResult(success=False, error=f"Permission denied: {path}")
            except OSError:
                # Entry vanished or became unreadable mid-listing; report it
                # with whatever was determined so far and no size.
                size = None
            entries.append({
                "name": child.name,
                "type": "directory" if is_dir else "file",
                "size_bytes": size,
            })

        total = len(children)
        listed = len(entries)
        omitted = total - listed
        if omitted > 0:
            entries.append({"name": f"... ({omitted} entries truncated)", "type": "truncated"})

        return ToolResult(
            success=True,
            data={
                "path": str(path),
                "entries": entries,
                "count": listed,
                "total_entries": total,
                "truncated": omitted > 0,
            },
        )

    def _write_file(self, path: Path, content: str) -> ToolResult:
        """Write *content* to *path*, creating parent directories as needed.

        Content that starts with ``data:`` and contains ``;base64,`` is
        treated as a data URL: the part after the marker is base64-decoded
        and written as bytes. Anything else is written as UTF-8 text.

        Returns:
            A ``ToolResult`` whose data holds ``path`` and ``bytes_written``,
            or a failed result describing the error.
        """
        try:
            payload = None
            # Support base64 data URLs for binary files (e.g. images from image_gen tool)
            if content.startswith("data:") and ";base64," in content:
                import base64 as _b64

                # Decode before touching the filesystem so an invalid payload
                # leaves no directories behind, and keep this try narrow so
                # write errors are not misreported as decode errors.
                try:
                    payload = _b64.b64decode(content.partition(";base64,")[2])
                except ValueError as e:  # binascii.Error is a ValueError subclass
                    return ToolResult(success=False, error=f"Failed to decode base64 data: {e}")

            path.parent.mkdir(parents=True, exist_ok=True)
            if payload is not None:
                path.write_bytes(payload)
                bytes_written = len(payload)
            else:
                path.write_text(content, encoding="utf-8")
                bytes_written = len(content.encode("utf-8"))
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except Exception as e:
            return ToolResult(success=False, error=f"Write failed: {e}")

        return ToolResult(
            success=True,
            data={"path": str(path), "bytes_written": bytes_written},
        )

    def _delete_file(self, path: Path) -> ToolResult:
        """Delete a single file; directories are refused."""
        if not path.exists():
            return ToolResult(success=False, error=f"File not found: {path}")
        if not path.is_file():
            return ToolResult(success=False, error="Can only delete files, not directories.")
        try:
            path.unlink()
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except Exception as e:
            return ToolResult(success=False, error=f"Delete failed: {e}")
        return ToolResult(success=True, data={"deleted": str(path)})

    def _create_directory(self, path: Path) -> ToolResult:
        """Create *path* and any missing parents; an existing directory is not an error."""
        try:
            path.mkdir(parents=True, exist_ok=True)
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except Exception as e:
            return ToolResult(success=False, error=f"Create directory failed: {e}")
        return ToolResult(success=True, data={"created": str(path)})

    def _delete_directory(self, path: Path) -> ToolResult:
        """Recursively delete a directory and everything beneath it."""
        if not path.exists():
            return ToolResult(success=False, error=f"Directory not found: {path}")
        if not path.is_dir():
            return ToolResult(success=False, error=f"Not a directory: {path}")
        try:
            shutil.rmtree(path)
        except PermissionError:
            return ToolResult(success=False, error=f"Permission denied: {path}")
        except Exception as e:
            return ToolResult(success=False, error=f"Delete directory failed: {e}")
        return ToolResult(success=True, data={"deleted": str(path)})

    def confirmation_description(self, operation: str = "", path: str = "", **kwargs) -> str:
        """Build the human-readable text for a confirmation prompt.

        Args:
            operation: The operation awaiting confirmation.
            path: The path as supplied by the caller.
            **kwargs: May carry ``content`` for ``write_file``; its first
                80 characters are shown as a preview.

        Returns:
            A description of what is about to happen.
        """
        if operation == "delete_file":
            return f"Permanently delete file: {path}"
        if operation == "delete_directory":
            return f"Permanently delete directory and all its contents: {path}"
        if operation == "write_file":
            # Tolerate a missing or None content value.
            content = kwargs.get("content") or ""
            content_preview = content[:80]
            # Only show an ellipsis when something was actually cut off.
            ellipsis = "..." if len(content) > 80 else ""
            return f"Write to file: {path}\nContent preview: {content_preview}{ellipsis}"
        return f"{operation}: {path}"