"""
context_vars.py — asyncio ContextVars for per-request state.

Set by the agent loop before dispatching tool calls.
Read by tools that need session/task context (e.g. WebTool for Tier 2 check).

Using ContextVar is safe in async code — each asyncio task runs in its own
copy of the context taken when the task is created, so a value set inside one
task does not leak into sibling tasks.

Every variable below has an explicit default, so ``.get()`` never raises
``LookupError`` when nothing has been set.
"""
from __future__ import annotations

from contextvars import ContextVar
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only needed for the ``current_user`` annotation; never imported at runtime.
    from .auth import CurrentUser

# Current session ID (None for anonymous/scheduled)
current_session_id: ContextVar[str | None] = ContextVar("session_id", default=None)

# Current authenticated user (None for scheduled/API-key-less tasks)
current_user: ContextVar[CurrentUser | None] = ContextVar("current_user", default=None)

# Current task ID (None for interactive sessions)
current_task_id: ContextVar[str | None] = ContextVar("task_id", default=None)

# Whether Tier 2 web access is enabled for this session.
# Set True when the agent determines the user is requesting external web access.
web_tier2_enabled: ContextVar[bool] = ContextVar("web_tier2_enabled", default=False)

# Absolute path to the calling user's personal folder (e.g. /users/rune).
# Set by agent.py at run start so assert_path_allowed can implicitly allow it.
current_user_folder: ContextVar[str | None] = ContextVar("current_user_folder", default=None)