Files
oai-web/server/security.py
2026-04-08 12:43:24 +02:00

171 lines
6.6 KiB
Python

"""
security.py — Hard-coded security constants and async enforcement functions.
IMPORTANT: The whitelists here are CODE, not config.
Changing them requires editing this file and restarting the server.
This is intentional — it prevents the agent from being tricked into
expanding its reach via prompt injection or UI manipulation.
"""
from __future__ import annotations
import re
from pathlib import Path
# ─── Enforcement functions (async — all use async DB stores) ──────────────────
class SecurityError(Exception):
    """A security check failed. Always caught by the tool dispatcher."""
async def assert_recipient_allowed(address: str) -> None:
    """
    Raise SecurityError if the email address is not in the DB whitelist.

    Args:
        address: Candidate recipient email address.

    Raises:
        SecurityError: when the address has no whitelist entry.
    """
    from .database import email_whitelist_store

    record = await email_whitelist_store.get(address)
    if record is not None:
        return
    raise SecurityError(
        f"Email recipient '{address}' is not in the allowed list. "
        "Add it via Settings → Email Whitelist."
    )
async def assert_email_rate_limit(address: str) -> None:
    """
    Raise SecurityError if the daily send limit for this address is exceeded.

    Args:
        address: Recipient email address whose quota is checked.

    Raises:
        SecurityError: when the whitelist store reports the daily limit reached.
    """
    from .database import email_whitelist_store

    ok, count, limit = await email_whitelist_store.check_rate_limit(address)
    if ok:
        return
    raise SecurityError(
        f"Daily send limit reached for '{address}' ({count}/{limit} emails sent today)."
    )
async def assert_path_allowed(path: str | Path) -> Path:
    """
    Raise SecurityError if the path is outside all sandbox directories.

    Resolves symlinks before checking (prevents path traversal) and returns
    the resolved Path.

    Implicit allow: paths under the calling user's personal folder are always
    permitted (set via the current_user_folder context var by the agent loop,
    or derived from current_user for web-chat sessions).

    Args:
        path: Raw path from a tool call (string or Path).

    Returns:
        The fully resolved Path when access is allowed.

    Raises:
        SecurityError: if the path is invalid, no sandbox is configured, or
            the path falls outside every allowed directory.
    """
    import os

    # Resolve the raw path up front so containment checks can't be bypassed
    # with symlinks or ".." segments.  (Path is already imported at module
    # level — the previous local re-import as _Path was redundant.)
    try:
        resolved = Path(os.path.realpath(str(path)))
    except Exception as e:
        raise SecurityError(f"Invalid path: {e}") from e

    def _is_under(child: Path, parent: Path) -> bool:
        # True when `child` is `parent` itself or nested anywhere below it.
        try:
            child.relative_to(parent)
            return True
        except ValueError:
            return False

    # --- Implicit allow: calling user's personal folder ---
    # 1. Agent context: current_user_folder ContextVar set by agent.py
    from .context_vars import current_user_folder as _cuf
    _folder = _cuf.get()
    if _folder:
        user_folder = Path(os.path.realpath(_folder))
        if _is_under(resolved, user_folder):
            return resolved

    # 2. Web-chat context: current_user ContextVar set by auth middleware
    from .context_vars import current_user as _cu
    _web_user = _cu.get()
    if _web_user and getattr(_web_user, "username", None):
        from .database import credential_store
        base = await credential_store.get("system:users_base_folder")
        if base:
            web_folder = Path(
                os.path.realpath(os.path.join(base.rstrip("/"), _web_user.username))
            )
            if _is_under(resolved, web_folder):
                return resolved

    # --- Explicit filesystem whitelist ---
    from .database import filesystem_whitelist_store
    sandboxes = await filesystem_whitelist_store.list()
    if not sandboxes:
        raise SecurityError(
            "Filesystem access is not configured. Add directories via Settings → Filesystem."
        )
    try:
        allowed, resolved_str = await filesystem_whitelist_store.is_allowed(path)
    except ValueError as e:
        # Chain the cause so logs show why the store rejected the path.
        raise SecurityError(str(e)) from e
    if not allowed:
        allowed_str = ", ".join(entry["path"] for entry in sandboxes)
        raise SecurityError(
            f"Path '{resolved_str}' is outside the allowed directories: {allowed_str}"
        )
    return Path(resolved_str)
async def assert_domain_tier1(url: str) -> bool:
    """
    Return True if the URL's domain is in the Tier 1 whitelist (DB-managed).

    Returns False rather than raising — callers decide how to handle Tier 2.
    """
    from .database import web_whitelist_store

    is_tier1 = await web_whitelist_store.is_allowed(url)
    return is_tier1
# ─── Prompt injection sanitisation ───────────────────────────────────────────
# Patterns that resemble prompt-injection attempts in external content.
# Always applied by sanitize_external_content().
_INJECTION_PATTERNS = [
    re.compile(r"<\s*tool_use\b", re.IGNORECASE),    # fake tool-call markup
    re.compile(r"<\s*system\b", re.IGNORECASE),      # fake system-prompt markup
    re.compile(r"\bIGNORE\s+(PREVIOUS|ALL|ABOVE)\b", re.IGNORECASE),
    re.compile(r"\bFORGET\s+(PREVIOUS|ALL|ABOVE|YOUR)\b", re.IGNORECASE),
    re.compile(r"\bNEW\s+INSTRUCTIONS?\b", re.IGNORECASE),
    re.compile(r"\bYOU\s+ARE\s+NOW\b", re.IGNORECASE),
    re.compile(r"\bACT\s+AS\b", re.IGNORECASE),
    re.compile(r"\[SYSTEM\]", re.IGNORECASE),
    re.compile(r"<<<.*>>>"),                         # delimiter-style injection
]

# Stricter patterns, applied only when system:security_sanitize_enhanced is
# enabled (higher false-positive risk, e.g. legitimate text mentioning "act as").
_EXTENDED_INJECTION_PATTERNS = [
    re.compile(r"\bDISREGARD\s+(YOUR|ALL|PREVIOUS|PRIOR)\b", re.IGNORECASE),
    re.compile(r"\bPRETEND\s+(YOU\s+ARE|TO\s+BE)\b", re.IGNORECASE),
    re.compile(r"\bYOUR\s+(NEW\s+)?(PRIMARY\s+)?DIRECTIVE\b", re.IGNORECASE),
    re.compile(r"\bSTOP\b.*\bNEW\s+(TASK|INSTRUCTIONS?)\b", re.IGNORECASE),
    re.compile(r"\[/?INST\]", re.IGNORECASE),        # chat-template tokens
    re.compile(r"<\|im_start\|>|<\|im_end\|>"),      # ChatML template tokens
    re.compile(r"</?s>"),
    re.compile(r"\bJAILBREAK\b", re.IGNORECASE),
    re.compile(r"\bDAN\s+MODE\b", re.IGNORECASE),
]

# Long base64-like runs are logged but never redacted (see function below).
_BASE64_BLOB_PATTERN = re.compile(r"(?:[A-Za-z0-9+/]{40,}={0,2})")


async def sanitize_external_content(text: str, source: str = "external") -> str:
    """
    Remove patterns that resemble prompt injection from external content.

    Each match is replaced with "[<source>: content redacted]" so readers can
    see that something was removed. When system:security_sanitize_enhanced is
    enabled, the extended pattern list is also applied and base64-like blobs
    are logged (not redacted — they may be legitimate, e.g. email signatures).

    Args:
        text: Raw external content (email body, web page, file contents, ...).
        source: Label used in the redaction placeholder and log messages.

    Returns:
        The sanitized text.
    """
    import logging as _logging
    _logger = _logging.getLogger(__name__)

    sanitized = text
    for pattern in _INJECTION_PATTERNS:
        sanitized = pattern.sub(f"[{source}: content redacted]", sanitized)

    # Enhanced screening is best-effort: a failure here (e.g. the option
    # store being unavailable) must not block delivery of the base-sanitized
    # content — but it should not be silently invisible either.
    try:
        from .security_screening import is_option_enabled
        if await is_option_enabled("system:security_sanitize_enhanced"):
            for pattern in _EXTENDED_INJECTION_PATTERNS:
                sanitized = pattern.sub(f"[{source}: content redacted]", sanitized)
            if _BASE64_BLOB_PATTERN.search(sanitized):
                _logger.info(
                    "sanitize_external_content: base64-like blob detected in %s content "
                    "(not redacted — may be a legitimate email signature)",
                    source,
                )
    except Exception:
        # Previously a bare `pass`; log so operators can tell that enhanced
        # screening did not run, while keeping the best-effort contract.
        _logger.debug(
            "sanitize_external_content: enhanced screening unavailable",
            exc_info=True,
        )
    return sanitized