120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
"""
|
|
tools/bash_tool.py — Sandboxed bash command execution.
|
|
|
|
Runs shell commands in a working directory that must be within the
|
|
filesystem whitelist. Captures stdout, stderr, and exit code.
|
|
|
|
Requires confirmation in interactive sessions. For scheduled tasks and
|
|
agents, declare "bash" in allowed_tools to enable it without confirmation.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from ..context_vars import current_user as _current_user_var
|
|
from ..security import SecurityError, assert_path_allowed
|
|
from .base import BaseTool, ToolResult
|
|
|
|
DEFAULT_TIMEOUT = 30 # seconds
|
|
MAX_TIMEOUT = 120 # seconds
|
|
MAX_OUTPUT_BYTES = 50_000 # 50 KB per stream
|
|
|
|
|
|
class BashTool(BaseTool):
|
|
name = "bash"
|
|
description = (
|
|
"Execute a shell command in a sandboxed working directory. "
|
|
"working_directory must be within the filesystem whitelist. "
|
|
"Returns stdout, stderr, and exit_code. "
|
|
"Use for running scripts, CLI tools, and automation tasks."
|
|
)
|
|
input_schema = {
|
|
"type": "object",
|
|
"properties": {
|
|
"command": {
|
|
"type": "string",
|
|
"description": "Shell command to execute (run via /bin/bash -c)",
|
|
},
|
|
"working_directory": {
|
|
"type": "string",
|
|
"description": (
|
|
"Absolute path to run the command in. "
|
|
"Must be within the filesystem whitelist."
|
|
),
|
|
},
|
|
"timeout": {
|
|
"type": "integer",
|
|
"description": (
|
|
f"Timeout in seconds (default {DEFAULT_TIMEOUT}, max {MAX_TIMEOUT}). "
|
|
"The command is killed if it exceeds this limit."
|
|
),
|
|
},
|
|
},
|
|
"required": ["command", "working_directory"],
|
|
}
|
|
requires_confirmation = True
|
|
allowed_in_scheduled_tasks = True
|
|
|
|
async def execute(
|
|
self,
|
|
command: str,
|
|
working_directory: str,
|
|
timeout: int = DEFAULT_TIMEOUT,
|
|
**kwargs,
|
|
) -> ToolResult:
|
|
# Defence-in-depth: reject non-admin users regardless of how the tool was invoked
|
|
_user = _current_user_var.get()
|
|
if _user is not None and not _user.is_admin:
|
|
return ToolResult(success=False, error="Bash tool requires administrator privileges.")
|
|
|
|
# Validate working directory against filesystem whitelist
|
|
try:
|
|
safe_cwd = await assert_path_allowed(working_directory)
|
|
except SecurityError as e:
|
|
return ToolResult(success=False, error=str(e))
|
|
|
|
if not safe_cwd.is_dir():
|
|
return ToolResult(
|
|
success=False,
|
|
error=f"Working directory does not exist: {working_directory}",
|
|
)
|
|
|
|
timeout = max(1, min(int(timeout), MAX_TIMEOUT))
|
|
|
|
try:
|
|
proc = await asyncio.create_subprocess_shell(
|
|
command,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
cwd=str(safe_cwd),
|
|
)
|
|
try:
|
|
stdout_bytes, stderr_bytes = await asyncio.wait_for(
|
|
proc.communicate(), timeout=timeout
|
|
)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
await proc.communicate()
|
|
return ToolResult(
|
|
success=False,
|
|
error=f"Command timed out after {timeout}s",
|
|
)
|
|
|
|
stdout = stdout_bytes[:MAX_OUTPUT_BYTES].decode("utf-8", errors="replace")
|
|
stderr = stderr_bytes[:MAX_OUTPUT_BYTES].decode("utf-8", errors="replace")
|
|
exit_code = proc.returncode
|
|
|
|
return ToolResult(
|
|
success=exit_code == 0,
|
|
data={"stdout": stdout, "stderr": stderr, "exit_code": exit_code},
|
|
error=f"Exit code {exit_code}: {(stderr or stdout)[:500]}" if exit_code != 0 else None,
|
|
)
|
|
|
|
except Exception as e:
|
|
return ToolResult(success=False, error=f"Failed to run command: {e}")
|
|
|
|
def confirmation_description(
|
|
self, command: str = "", working_directory: str = "", **kwargs
|
|
) -> str:
|
|
return f"Run shell command in {working_directory}:\n{command}"
|