Files
oai-web/server/context_vars.py
2026-04-08 12:43:24 +02:00

34 lines
1.3 KiB
Python

"""
context_vars.py — asyncio ContextVars for per-request state.
Set by the agent loop before dispatching tool calls.
Read by tools that need session/task context (e.g. WebTool for Tier 2 check).
Using ContextVar is safe in async code — each task gets its own copy.
"""
from __future__ import annotations

from contextvars import ContextVar
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Needed only for the annotation on `current_user`. TYPE_CHECKING is False
    # at runtime, so `.auth` is never imported when this module is loaded; the
    # annotation stays valid because `from __future__ import annotations`
    # defers annotation evaluation.
    from .auth import CurrentUser

# Current session ID (None for anonymous/scheduled)
current_session_id: ContextVar[str | None] = ContextVar("session_id", default=None)

# Current authenticated user (None for scheduled/API-key-less tasks)
current_user: ContextVar[CurrentUser | None] = ContextVar("current_user", default=None)

# Current task ID (None for interactive sessions)
current_task_id: ContextVar[str | None] = ContextVar("task_id", default=None)

# Whether Tier 2 web access is enabled for this session.
# Set True when the agent determines the user is requesting external web access.
web_tier2_enabled: ContextVar[bool] = ContextVar("web_tier2_enabled", default=False)

# Absolute path to the calling user's personal folder (e.g. /users/rune).
# Set by agent.py at run start so assert_path_allowed can implicitly allow it.
current_user_folder: ContextVar[str | None] = ContextVar(
    "current_user_folder", default=None
)