115 lines
3.7 KiB
Python
115 lines
3.7 KiB
Python
"""
|
|
agent/confirmation.py — Confirmation flow for side-effect tool calls.

When a tool has requires_confirmation=True, the agent loop calls
ConfirmationManager.request(). This suspends the tool call and returns
control to the web layer, which shows the user a Yes/No prompt.

The web route calls ConfirmationManager.respond() when the user decides.
The suspended coroutine then resumes with the user's decision.

Pending confirmations expire after TIMEOUT_SECONDS seconds; an expired
confirmation is treated as denied.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
|
|
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
# Seconds ConfirmationManager.request() waits for a user decision before
# giving up and treating the tool call as denied.
TIMEOUT_SECONDS = 300 # 5 minutes
|
|
|
|
|
|
@dataclass
class PendingConfirmation:
    """A tool call that is parked until the user approves or rejects it.

    The public fields describe the call and are what ``to_dict`` exposes.
    The two underscore-prefixed fields are coordination state: ``_event`` is
    set once a decision has been recorded and ``_approved`` holds that
    decision.
    """

    session_id: str
    tool_name: str
    arguments: dict
    # Human-readable summary shown to user
    description: str
    # Timezone-aware UTC timestamp taken when the instance is created.
    created_at: datetime = field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )
    # Kept out of repr(); the Event object carries no useful display value.
    _event: asyncio.Event = field(repr=False, default_factory=asyncio.Event)
    _approved: bool = field(default=False)

    def to_dict(self) -> dict:
        """Return the public fields as a dict, with ``created_at`` as ISO 8601 text.

        The coordination fields (``_event``, ``_approved``) are not included.
        """
        payload = {
            name: getattr(self, name)
            for name in ("session_id", "tool_name", "arguments", "description")
        }
        payload["created_at"] = self.created_at.isoformat()
        return payload
|
|
|
|
|
|
class ConfirmationManager:
    """
    Singleton-style manager. One instance shared across the app.

    Tracks at most one pending confirmation per session, keyed by session_id.
    Safe to share between coroutines running on a single asyncio event loop.
    It is NOT safe to call from other OS threads: the asyncio.Event used to
    wake the waiting coroutine is not thread-safe.
    """

    def __init__(self) -> None:
        # session_id -> confirmation currently awaiting the user's decision.
        self._pending: dict[str, PendingConfirmation] = {}

    async def request(
        self,
        session_id: str,
        tool_name: str,
        arguments: dict,
        description: str,
    ) -> bool:
        """
        Called by the agent loop when a tool requires confirmation.
        Suspends until the user responds (Yes/No) or the timeout expires.

        If the session already has an undecided confirmation, that older
        request is resolved as denied and replaced by this one; once it is
        replaced it can no longer be reached through respond().

        Returns True if approved, False if denied, timed out, or superseded
        by a newer request for the same session.
        """
        previous = self._pending.get(session_id)
        if previous is not None:
            # Entries are always removed when their request() finishes, so an
            # existing entry means another request() for this session is
            # still in flight.
            logger.warning(f"Overwriting stale pending confirmation for session {session_id}")
            if not previous._event.is_set():
                # Wake the superseded waiter now instead of leaving it
                # suspended until its timeout. A decision that was already
                # recorded is left untouched.
                previous._approved = False
                previous._event.set()

        confirmation = PendingConfirmation(
            session_id=session_id,
            tool_name=tool_name,
            arguments=arguments,
            description=description,
        )
        self._pending[session_id] = confirmation

        try:
            await asyncio.wait_for(confirmation._event.wait(), timeout=TIMEOUT_SECONDS)
            approved = confirmation._approved
        except asyncio.TimeoutError:
            logger.info(f"Confirmation timed out for session {session_id} / tool {tool_name}")
            approved = False
        finally:
            # Only remove our own entry. If a newer request replaced it, an
            # unconditional pop would delete that newer confirmation and the
            # user's answer to it would be lost.
            if self._pending.get(session_id) is confirmation:
                del self._pending[session_id]

        action = "approved" if approved else "denied/timed out"
        logger.info(f"Confirmation {action}: session={session_id} tool={tool_name}")
        return approved

    def respond(self, session_id: str, approved: bool) -> bool:
        """
        Called by the web route (/api/confirm) when the user clicks Yes or No.

        Returns True if the decision was recorded. Returns False if no
        pending confirmation exists for this session, or if a decision was
        already recorded and the waiting coroutine has not yet consumed it
        (the first decision wins).
        """
        confirmation = self._pending.get(session_id)
        if confirmation is None:
            logger.warning(f"No pending confirmation for session {session_id}")
            return False

        if confirmation._event.is_set():
            # The waiter is woken on a later loop iteration, so a second
            # click can arrive before it reads the decision. Do not let that
            # click overwrite the recorded answer.
            logger.warning(f"Confirmation for session {session_id} already answered; ignoring duplicate response")
            return False

        confirmation._approved = approved
        confirmation._event.set()
        return True

    def get_pending(self, session_id: str) -> PendingConfirmation | None:
        """Return the pending confirmation for session_id, or None if there is none."""
        return self._pending.get(session_id)

    def list_pending(self) -> list[dict]:
        """Return the to_dict() form of every pending confirmation."""
        return [c.to_dict() for c in self._pending.values()]
|
|
|
|
|
|
# Module-level singleton
|
|
# Created once at import time. Import this instance instead of constructing a
# new ConfirmationManager, so request() and respond() operate on the same
# pending map.
confirmation_manager = ConfirmationManager()
|