171 lines
6.6 KiB
Python
171 lines
6.6 KiB
Python
"""
|
|
security.py — Hard-coded security constants and async enforcement functions.
|
|
|
|
IMPORTANT: The whitelists here are CODE, not config.
|
|
Changing them requires editing this file and restarting the server.
|
|
This is intentional — it prevents the agent from being tricked into
|
|
expanding its reach via prompt injection or UI manipulation.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from pathlib import Path
|
|
|
|
# ─── Enforcement functions (async — all use async DB stores) ──────────────────
|
|
|
|
class SecurityError(Exception):
    """Signals a failed security check.

    Always caught by the tool dispatcher, so raising it aborts the
    offending operation rather than crashing the server.
    """
|
|
|
|
|
|
async def assert_recipient_allowed(address: str) -> None:
    """Ensure *address* is present in the DB-backed email whitelist.

    Returns None when the address is whitelisted; raises SecurityError
    otherwise.
    """
    from .database import email_whitelist_store

    if await email_whitelist_store.get(address) is not None:
        return
    raise SecurityError(
        f"Email recipient '{address}' is not in the allowed list. "
        "Add it via Settings → Email Whitelist."
    )
|
|
|
|
|
|
async def assert_email_rate_limit(address: str) -> None:
    """Ensure today's send count for *address* is still under its daily cap.

    Delegates the counting to the whitelist store; raises SecurityError
    once the limit has been reached.
    """
    from .database import email_whitelist_store

    ok, sent_today, daily_cap = await email_whitelist_store.check_rate_limit(address)
    if ok:
        return
    raise SecurityError(
        f"Daily send limit reached for '{address}' ({sent_today}/{daily_cap} emails sent today)."
    )
|
|
|
|
|
|
async def assert_path_allowed(path: str | Path) -> Path:
    """
    Validate *path* against the sandbox rules and return it resolved.

    Symlinks are resolved (``os.path.realpath``) before any containment
    check, which defeats ``..``/symlink path-traversal tricks.

    Check order:
      1. Implicit allow — paths under the calling user's personal folder
         (``current_user_folder`` ContextVar set by the agent loop, or the
         folder derived from ``current_user`` for web-chat sessions).
      2. Explicit allow — the DB-managed filesystem whitelist.

    Raises:
        SecurityError: if the path cannot be resolved, no sandbox is
            configured, or the path is outside every allowed directory.

    Returns:
        The fully resolved Path.
    """
    import os

    # Resolve the raw path first so every containment check below operates
    # on the canonical (symlink-free) location.
    try:
        resolved = Path(os.path.realpath(str(path)))
    except Exception as exc:
        # Chain the cause so the original failure survives in tracebacks.
        raise SecurityError(f"Invalid path: {exc}") from exc

    def _is_under(child: Path, parent: Path) -> bool:
        # True iff `child` equals `parent` or lives somewhere beneath it.
        try:
            child.relative_to(parent)
            return True
        except ValueError:
            return False

    # --- Implicit allow: calling user's personal folder ---
    # 1. Agent context: current_user_folder ContextVar set by agent.py
    from .context_vars import current_user_folder as _cuf

    _folder = _cuf.get()
    if _folder:
        user_folder = Path(os.path.realpath(_folder))
        if _is_under(resolved, user_folder):
            return resolved

    # 2. Web-chat context: current_user ContextVar set by auth middleware
    from .context_vars import current_user as _cu

    _web_user = _cu.get()
    if _web_user and getattr(_web_user, "username", None):
        from .database import credential_store

        base = await credential_store.get("system:users_base_folder")
        if base:
            web_folder = Path(
                os.path.realpath(os.path.join(base.rstrip("/"), _web_user.username))
            )
            if _is_under(resolved, web_folder):
                return resolved

    # --- Explicit filesystem whitelist ---
    from .database import filesystem_whitelist_store

    sandboxes = await filesystem_whitelist_store.list()
    if not sandboxes:
        raise SecurityError(
            "Filesystem access is not configured. Add directories via Settings → Filesystem."
        )
    try:
        # NOTE(review): the raw `path` (not `resolved`) is passed through —
        # the store appears to do its own resolution; confirm before changing.
        allowed, resolved_str = await filesystem_whitelist_store.is_allowed(path)
    except ValueError as exc:
        raise SecurityError(str(exc)) from exc
    if not allowed:
        allowed_str = ", ".join(entry["path"] for entry in sandboxes)
        raise SecurityError(
            f"Path '{resolved_str}' is outside the allowed directories: {allowed_str}"
        )
    return Path(resolved_str)
|
|
|
|
|
|
async def assert_domain_tier1(url: str) -> bool:
    """Report whether *url*'s domain is in the DB-managed Tier 1 whitelist.

    Unlike the other enforcement helpers this never raises: it returns
    False for non-Tier-1 domains and leaves Tier 2 handling to the caller.
    """
    from .database import web_whitelist_store

    verdict = await web_whitelist_store.is_allowed(url)
    return verdict
|
|
|
|
|
|
# ─── Prompt injection sanitisation ───────────────────────────────────────────
|
|
|
|
# Base prompt-injection patterns, always applied to external content by
# sanitize_external_content(). Compiled once at import time from
# (regex, flags) pairs; 0 means no flags.
_INJECTION_PATTERNS = [
    re.compile(spec, flags)
    for spec, flags in (
        (r"<\s*tool_use\b", re.IGNORECASE),
        (r"<\s*system\b", re.IGNORECASE),
        (r"\bIGNORE\s+(PREVIOUS|ALL|ABOVE)\b", re.IGNORECASE),
        (r"\bFORGET\s+(PREVIOUS|ALL|ABOVE|YOUR)\b", re.IGNORECASE),
        (r"\bNEW\s+INSTRUCTIONS?\b", re.IGNORECASE),
        (r"\bYOU\s+ARE\s+NOW\b", re.IGNORECASE),
        (r"\bACT\s+AS\b", re.IGNORECASE),
        (r"\[SYSTEM\]", re.IGNORECASE),
        (r"<<<.*>>>", 0),
    )
]
|
|
|
|
# Extended prompt-injection patterns, applied only when the
# system:security_sanitize_enhanced option is enabled (see
# sanitize_external_content). Same (regex, flags) compile scheme as the
# base list; 0 means no flags.
_EXTENDED_INJECTION_PATTERNS = [
    re.compile(spec, flags)
    for spec, flags in (
        (r"\bDISREGARD\s+(YOUR|ALL|PREVIOUS|PRIOR)\b", re.IGNORECASE),
        (r"\bPRETEND\s+(YOU\s+ARE|TO\s+BE)\b", re.IGNORECASE),
        (r"\bYOUR\s+(NEW\s+)?(PRIMARY\s+)?DIRECTIVE\b", re.IGNORECASE),
        (r"\bSTOP\b.*\bNEW\s+(TASK|INSTRUCTIONS?)\b", re.IGNORECASE),
        (r"\[/?INST\]", re.IGNORECASE),
        (r"<\|im_start\|>|<\|im_end\|>", 0),
        (r"</?s>", 0),
        (r"\bJAILBREAK\b", re.IGNORECASE),
        (r"\bDAN\s+MODE\b", re.IGNORECASE),
    )
]
|
|
|
|
# Matches a run of 40+ base64-alphabet characters with optional "=" padding.
# Detection only: sanitize_external_content logs a hit but never redacts it,
# since long blobs may be legitimate (e.g. inline email-signature images).
_BASE64_BLOB_PATTERN = re.compile(r"(?:[A-Za-z0-9+/]{40,}={0,2})")
|
|
|
|
|
|
async def sanitize_external_content(text: str, source: str = "external") -> str:
    """
    Remove patterns that resemble prompt injection from external content.

    Every match of the base patterns is replaced with a redaction marker
    naming *source* (e.g. "[email: content redacted]"). When the
    system:security_sanitize_enhanced option is enabled, the extended
    pattern set is applied as well, and base64-like blobs are logged —
    but never redacted, since they may be legitimate (e.g. signatures).

    The enhanced pass is best-effort: any failure there (import, option
    lookup, DB) is logged and ignored so the base sanitisation result is
    still returned.

    Returns:
        The sanitised text.
    """
    import logging as _logging

    _logger = _logging.getLogger(__name__)

    # Hoisted: same marker for every substitution in both passes.
    redaction = f"[{source}: content redacted]"

    sanitized = text
    for pattern in _INJECTION_PATTERNS:
        sanitized = pattern.sub(redaction, sanitized)

    try:
        from .security_screening import is_option_enabled

        if await is_option_enabled("system:security_sanitize_enhanced"):
            for pattern in _EXTENDED_INJECTION_PATTERNS:
                sanitized = pattern.sub(redaction, sanitized)
            # NOTE(review): blob scan runs only in enhanced mode here —
            # confirm that scope is the intended one.
            if _BASE64_BLOB_PATTERN.search(sanitized):
                _logger.info(
                    "sanitize_external_content: base64-like blob detected in %s content "
                    "(not redacted — may be a legitimate email signature)",
                    source,
                )
    except Exception:
        # Best-effort: the enhanced pass must never break the base pass,
        # but log instead of swallowing silently so failures are visible.
        _logger.debug("enhanced sanitisation skipped", exc_info=True)

    return sanitized
|