#!/usr/bin/python3 -W ignore::DeprecationWarning
import sys
import os
import requests
import time # For response time tracking
from pathlib import Path
from typing import Optional, List, Dict, Any
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.markdown import Markdown
from rich.live import Live
from openrouter import OpenRouter
import pyperclip
import mimetypes
import base64
import re
import sqlite3
import json
import datetime
import logging
from logging.handlers import RotatingFileHandler # Added for log rotation
from prompt_toolkit import PromptSession
from prompt_toolkit.history import FileHistory
from rich.logging import RichHandler
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from packaging import version as pkg_version
import io # Added for custom handler
# App version. Changes by author with new releases.
version = '1.9.5'
app = typer.Typer()

# Application identification for OpenRouter (sent as HTTP headers on API calls)
APP_NAME = "oAI"
APP_URL = "https://iurl.no/oai"

# Paths
home = Path.home()
config_dir = home / '.config' / 'oai'
cache_dir = home / '.cache' / 'oai'
history_file = config_dir / 'history.txt'  # Persistent input history file
database = config_dir / 'oai_config.db'  # SQLite store for config + saved sessions
log_file = config_dir / 'oai.log'

# Create dirs if needed
config_dir.mkdir(parents=True, exist_ok=True)
cache_dir.mkdir(parents=True, exist_ok=True)

# Rich console for chat UI (separate from logging)
console = Console()

# Valid commands list for validation
VALID_COMMANDS = {
    '/retry', '/online', '/memory', '/paste', '/export', '/save', '/load',
    '/delete', '/list', '/prev', '/next', '/stats', '/middleout', '/reset',
    '/info', '/model', '/maxtoken', '/system', '/config', '/credits', '/clear', '/cl', '/help'
}

# Supported code file extensions
SUPPORTED_CODE_EXTENSIONS = {
    '.py', '.js', '.ts', '.cs', '.java', '.c', '.cpp', '.h', '.hpp',
    '.rb', '.ruby', '.php', '.swift', '.kt', '.kts', '.go',
    '.sh', '.bat', '.ps1', '.R', '.scala', '.pl', '.lua', '.dart',
    '.elm', '.xml', '.json', '.yaml', '.yml', '.md', '.txt'
}

# Session metrics constants (per 1M tokens, in USD; adjustable)
MODEL_PRICING = {
    'input': 3.0,   # $3/M input tokens (adjustable)
    'output': 15.0  # $15/M output tokens (adjustable)
}
LOW_CREDIT_RATIO = 0.1   # Warn if credits left < 10% of total
LOW_CREDIT_AMOUNT = 1.0  # Warn if credits left < $1 in absolute terms
HIGH_COST_WARNING = "cost_warning_threshold"  # Configurable key for cost threshold, default $0.01

# DB configuration.
# NOTE: a duplicate `database = config_dir / 'oai_config.db'` assignment that
# previously appeared here was removed — the path is defined once above.
DB_FILE = str(database)
def create_table_if_not_exists():
    """Ensure the config and conversation_sessions tables exist.

    Idempotent: safe to call before every DB access. Also makes sure the
    config directory itself exists.
    """
    os.makedirs(config_dir, exist_ok=True)
    # DDL statements are CREATE TABLE IF NOT EXISTS, so re-running is a no-op.
    schema = (
        '''CREATE TABLE IF NOT EXISTS config (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL
    )''',
        '''CREATE TABLE IF NOT EXISTS conversation_sessions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        data TEXT NOT NULL -- JSON of session_history
    )''',
    )
    with sqlite3.connect(DB_FILE) as conn:
        for ddl in schema:
            conn.execute(ddl)
        conn.commit()
def get_config(key: str) -> Optional[str]:
    """Return the stored config value for *key*, or None when unset."""
    create_table_if_not_exists()
    with sqlite3.connect(DB_FILE) as conn:
        row = conn.execute('SELECT value FROM config WHERE key = ?', (key,)).fetchone()
    return row[0] if row else None
def set_config(key: str, value: str):
    """Insert or overwrite the config *value* stored under *key*."""
    create_table_if_not_exists()
    with sqlite3.connect(DB_FILE) as conn:
        conn.execute(
            'INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)',
            (key, value),
        )
        conn.commit()
# ============================================================================
# ROTATING RICH HANDLER - Combines RotatingFileHandler with Rich formatting
# ============================================================================
class RotatingRichHandler(RotatingFileHandler):
    """Rotating log-file handler whose records are rendered by Rich.

    Each record is formatted by a ``RichHandler`` attached to an in-memory
    ``Console`` (backed by a ``StringIO`` buffer); the rendered text is then
    appended to the rotating file managed by ``RotatingFileHandler``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # In-memory console used purely as a rendering surface (no terminal).
        self.rich_console = Console(file=io.StringIO(), width=120, force_terminal=False)
        self.rich_handler = RichHandler(
            console=self.rich_console,
            show_time=True,
            show_path=True,
            rich_tracebacks=True,
            tracebacks_suppress=['requests', 'openrouter', 'urllib3', 'httpx', 'openai']
        )

    def emit(self, record):
        """Render *record* with Rich, then write the text to the log file."""
        try:
            self.rich_handler.emit(record)
            buffer = self.rich_console.file
            rendered = buffer.getvalue()
            # Reset the buffer so the next record starts from a clean slate.
            buffer.seek(0)
            buffer.truncate(0)
            if rendered:
                self.stream.write(rendered)
                self.flush()
        except Exception:
            self.handleError(record)
# Get log configuration from DB (defaults: 10 MB per file, 2 rotated backups)
LOG_MAX_SIZE_MB = int(get_config('log_max_size_mb') or "10")
LOG_BACKUP_COUNT = int(get_config('log_backup_count') or "2")
# Create the custom rotating handler that writes Rich-formatted records
app_handler = RotatingRichHandler(
    filename=str(log_file),
    maxBytes=LOG_MAX_SIZE_MB * 1024 * 1024,  # Convert MB to bytes
    backupCount=LOG_BACKUP_COUNT,
    encoding='utf-8'
)
# Root logger passes everything (NOTSET) to the handler; the handler's
# RichHandler does the actual formatting, so the format string is minimal.
logging.basicConfig(
    level=logging.NOTSET,
    format="%(message)s",  # Rich formats it
    datefmt="[%X]",
    handlers=[app_handler]
)
# Application logger used for user-action/audit messages throughout the app.
app_logger = logging.getLogger("oai_app")
app_logger.setLevel(logging.INFO)
# ============================================================================
# END OF LOGGING SETUP
# ============================================================================
# Module-level logger for internal diagnostics (e.g. update checks).
logger = logging.getLogger(__name__)
def check_for_updates(current_version: str) -> str:
    """
    Check if a new version is available using semantic versioning.

    Uses ``packaging.version`` for comparison so that e.g. 1.10.0 compares
    greater than 1.9.5 (a plain string compare would get this wrong).

    Args:
        current_version: The locally running version string (e.g. "1.9.5").

    Returns:
        Formatted Rich-markup status string for display. On any failure the
        plain version banner is returned so startup is never blocked.
    """
    # Single fallback banner; previously duplicated across six except clauses.
    fallback = f"[bold green]oAI version {current_version}[/]"
    try:
        response = requests.get(
            'https://gitlab.pm/api/v1/repos/rune/oai/releases/latest',
            headers={"Content-Type": "application/json"},
            timeout=1.0,  # Keep startup snappy even on slow networks
            allow_redirects=True
        )
        response.raise_for_status()
        data = response.json()
        version_online = data.get('tag_name', '').lstrip('v')
        if not version_online:
            logger.warning("No version found in API response")
            return fallback
        current = pkg_version.parse(current_version)
        latest = pkg_version.parse(version_online)
        if latest > current:
            logger.info(f"Update available: {current_version} → {version_online}")
            return (f"[bold green]oAI version {current_version} [/]"
                    f"[bold red](Update available: {current_version} → {version_online})[/]")
        logger.debug(f"Already up to date: {current_version}")
        return f"[bold green]oAI version {current_version} (up to date)[/]"
    # Specific subclasses first (HTTPError/ConnectionError/Timeout are all
    # RequestException subclasses) so each failure mode is logged distinctly.
    except requests.exceptions.HTTPError as e:
        logger.warning(f"HTTP error checking for updates: {e.response.status_code}")
    except requests.exceptions.ConnectionError:
        logger.warning("Network error checking for updates (offline?)")
    except requests.exceptions.Timeout:
        logger.warning("Timeout checking for updates")
    except requests.exceptions.RequestException as e:
        logger.warning(f"Request error checking for updates: {type(e).__name__}")
    except (KeyError, ValueError) as e:
        logger.warning(f"Invalid API response checking for updates: {e}")
    except Exception as e:
        logger.error(f"Unexpected error checking for updates: {e}")
    return fallback
def save_conversation(name: str, data: List[Dict[str, str]]):
    """Persist a snapshot of the conversation history to the DB under *name*."""
    row = (name, datetime.datetime.now().isoformat(), json.dumps(data))
    with sqlite3.connect(DB_FILE) as conn:
        conn.execute(
            'INSERT INTO conversation_sessions (name, timestamp, data) VALUES (?, ?, ?)',
            row,
        )
        conn.commit()
def load_conversation(name: str) -> Optional[List[Dict[str, str]]]:
    """Load the most recently saved history for *name*, or None if absent."""
    query = ('SELECT data FROM conversation_sessions '
             'WHERE name = ? ORDER BY timestamp DESC LIMIT 1')
    with sqlite3.connect(DB_FILE) as conn:
        row = conn.execute(query, (name,)).fetchone()
    return json.loads(row[0]) if row else None
def delete_conversation(name: str) -> int:
    """Delete every saved session named *name*; return the number removed."""
    with sqlite3.connect(DB_FILE) as conn:
        deleted = conn.execute(
            'DELETE FROM conversation_sessions WHERE name = ?', (name,)
        ).rowcount
        conn.commit()
    return deleted
def list_conversations() -> List[Dict[str, Any]]:
    """Return metadata for each saved conversation, newest-saved first.

    Each item holds 'name', 'timestamp' (latest save) and 'message_count'.
    """
    query = '''
            SELECT name, MAX(timestamp) as last_saved, data
            FROM conversation_sessions
            GROUP BY name
            ORDER BY last_saved DESC
        '''
    with sqlite3.connect(DB_FILE) as conn:
        rows = conn.execute(query).fetchall()
    # The stored payload is the JSON-encoded session history; its length is
    # the message count shown in the /list table.
    return [
        {
            'name': name,
            'timestamp': last_saved,
            'message_count': len(json.loads(payload)),
        }
        for name, last_saved, payload in rows
    ]
def estimate_cost(input_tokens: int, output_tokens: int) -> float:
    """Estimate request cost in USD from token counts (pricing is per 1M tokens)."""
    per_million = 1_000_000
    input_cost = input_tokens * MODEL_PRICING['input'] / per_million
    output_cost = output_tokens * MODEL_PRICING['output'] / per_million
    return input_cost + output_cost
def has_web_search_capability(model: Dict[str, Any]) -> bool:
    """Check if model supports web search based on supported_parameters."""
    # Web search is typically indicated by 'tools' parameter support.
    return "tools" in model.get("supported_parameters", [])
def has_image_capability(model: Dict[str, Any]) -> bool:
    """Check if model supports image input based on input modalities."""
    modalities = model.get("architecture", {}).get("input_modalities", [])
    return "image" in modalities
def supports_online_mode(model: Dict[str, Any]) -> bool:
    """Check if model supports :online suffix for web search.

    Equivalent to has_web_search_capability: models exposing the 'tools'
    parameter can take the :online model-ID suffix.
    """
    return "tools" in model.get("supported_parameters", [])
def get_effective_model_id(base_model_id: str, online_enabled: bool) -> str:
    """Return the model ID to send to the API, appending ':online' when enabled.

    The suffix is never duplicated if the base ID already carries it.
    """
    needs_suffix = online_enabled and not base_model_id.endswith(':online')
    return f"{base_model_id}:online" if needs_suffix else base_model_id
def export_as_markdown(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str:
    """Export conversation history as a Markdown document.

    Each entry becomes a '## Message N' section with the user prompt and
    assistant response, separated by horizontal rules.
    """
    export_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    lines = ["# Conversation Export", ""]
    if session_system_prompt:
        lines += [f"**System Prompt:** {session_system_prompt}", ""]
    lines += [f"**Export Date:** {export_date}", "", "---", ""]
    for i, entry in enumerate(session_history, 1):
        lines += [
            f"## Message {i}", "",
            "**User:**", "",
            entry['prompt'], "",
            "**Assistant:**", "",
            entry['response'], "",
            "---", "",
        ]
    return "\n".join(lines)
def export_as_json(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str:
    """Export conversation history as pretty-printed JSON (UTF-8 preserved)."""
    payload = {
        "export_date": datetime.datetime.now().isoformat(),
        "system_prompt": session_system_prompt,
        "message_count": len(session_history),
        "messages": session_history,
    }
    # ensure_ascii=False keeps non-ASCII text readable in the exported file.
    return json.dumps(payload, indent=2, ensure_ascii=False)
def export_as_html(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str:
    """Export conversation history as a standalone HTML document.

    Fixes: the previous escape_html performed identity replacements (the
    HTML entities had been lost, e.g. '&' -> '&'), so user/assistant text
    was emitted unescaped; the template literals had also lost their tags.
    NOTE(review): the surrounding markup is reconstructed from the visible
    structure (system-prompt block, per-message user/assistant sections) —
    confirm styling/classes against the original release if available.
    """
    def escape_html(text: str) -> str:
        # '&' must be escaped first so the later entities are not re-escaped.
        return (text.replace('&', '&amp;')
                    .replace('<', '&lt;')
                    .replace('>', '&gt;')
                    .replace('"', '&quot;')
                    .replace("'", '&#x27;'))

    export_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    html_parts = [
        "<!DOCTYPE html>",
        "<html>",
        "<head>",
        '    <meta charset="utf-8">',
        "    <title>Conversation Export</title>",
        "</head>",
        "<body>",
        "    <h1>Conversation Export</h1>",
        f"    <p><em>Exported: {export_date}</em></p>",
    ]
    if session_system_prompt:
        html_parts.extend([
            '    <div class="system-prompt">',
            "        <h2>⚙️ System Prompt</h2>",
            f"        <pre>{escape_html(session_system_prompt)}</pre>",
            "    </div>",
        ])
    for i, entry in enumerate(session_history, 1):
        html_parts.extend([
            '    <div class="message">',
            f"        <h2>Message {i} of {len(session_history)}</h2>",
            '        <div class="user">',
            "            <h3>👤 User</h3>",
            f"            <pre>{escape_html(entry['prompt'])}</pre>",
            "        </div>",
            '        <div class="assistant">',
            "            <h3>🤖 Assistant</h3>",
            f"            <pre>{escape_html(entry['response'])}</pre>",
            "        </div>",
            "    </div>",
        ])
    html_parts.extend([
        "</body>",
        "</html>",
    ])
    return "\n".join(html_parts)
# Load configs from the SQLite config table; each value falls back to a
# built-in default when unset.
API_KEY = get_config('api_key')
OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1"
STREAM_ENABLED = get_config('stream_enabled') or "on"
DEFAULT_MODEL_ID = get_config('default_model')
MAX_TOKEN = int(get_config('max_token') or "100000")
COST_WARNING_THRESHOLD = float(get_config(HIGH_COST_WARNING) or "0.01")  # Configurable cost threshold for alerts
DEFAULT_ONLINE_MODE = get_config('default_online_mode') or "off"  # New: Default online mode setting
# Fetch models with app identification headers
models_data = []
text_models = []
try:
    # Send app-identification headers; include Authorization only when a key
    # is configured (the models endpoint also works unauthenticated).
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "HTTP-Referer": APP_URL,
        "X-Title": APP_NAME
    } if API_KEY else {
        "HTTP-Referer": APP_URL,
        "X-Title": APP_NAME
    }
    response = requests.get(f"{OPENROUTER_BASE_URL}/models", headers=headers)
    response.raise_for_status()
    models_data = response.json()["data"]
    # Keep models that either declare no modalities or whose modalities list
    # does not include "video" (text/image models only).
    text_models = [m for m in models_data if "modalities" not in m or "video" not in (m.get("modalities") or [])]
    selected_model_default = None
    if DEFAULT_MODEL_ID:
        # Resolve the configured default model against the fetched list.
        selected_model_default = next((m for m in text_models if m["id"] == DEFAULT_MODEL_ID), None)
        if not selected_model_default:
            console.print(f"[bold yellow]Warning: Default model '{DEFAULT_MODEL_ID}' unavailable. Use '/config model'.[/]")
except Exception as e:
    # Any failure (network, auth, response shape) leaves the model lists
    # empty; chat() reports "No models available" and exits.
    models_data = []
    text_models = []
    app_logger.error(f"Failed to fetch models: {e}")
def get_credits(api_key: str, base_url: str = OPENROUTER_BASE_URL) -> Optional[Dict[str, str]]:
    """Fetch account credit totals from the OpenRouter /credits endpoint.

    Returns a dict of pre-formatted dollar strings ('total_credits',
    'used_credits', 'credits_left'), or None when no key is configured or
    the request fails (the error is printed to the console in that case).
    """
    if not api_key:
        return None
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": APP_URL,
        "X-Title": APP_NAME,
    }
    try:
        response = requests.get(f"{base_url}/credits", headers=request_headers)
        response.raise_for_status()
        payload = response.json().get('data', {})
        total = float(payload.get('total_credits', 0))
        used = float(payload.get('total_usage', 0))
        return {
            'total_credits': f"${total:.2f}",
            'used_credits': f"${used:.2f}",
            'credits_left': f"${total - used:.2f}",
        }
    except Exception as e:
        console.print(f"[bold red]Error fetching credits: {e}[/]")
        return None
def check_credit_alerts(credits_data: Optional[Dict[str, str]]) -> List[str]:
    """Check and return list of credit-related alerts.

    Args:
        credits_data: Output of get_credits() — pre-formatted dollar strings
            ('$X.YZ') — or None when credits could not be fetched.

    Returns:
        Zero or one alert strings: a critical alert below the absolute
        LOW_CREDIT_AMOUNT floor, else a low-credit alert below the
        LOW_CREDIT_RATIO fraction of total credits.
    """
    alerts = []
    if not credits_data:
        return alerts
    # Values arrive formatted as '$X.YZ'; strip the sign to compare numerically.
    credits_left_value = float(credits_data['credits_left'].strip('$'))
    total_credits_value = float(credits_data['total_credits'].strip('$'))
    if credits_left_value < LOW_CREDIT_AMOUNT:
        alerts.append(f"Critical credit alert: Less than ${LOW_CREDIT_AMOUNT:.2f} left ({credits_data['credits_left']})")
    elif credits_left_value < total_credits_value * LOW_CREDIT_RATIO:
        # Fix: derive the percentage from LOW_CREDIT_RATIO instead of the
        # hardcoded "10%", so the message tracks the constant.
        alerts.append(f"Low credit alert: Credits left < {LOW_CREDIT_RATIO:.0%} of total ({credits_data['credits_left']})")
    return alerts
def clear_screen():
    """Clear the terminal via ANSI escape codes, falling back to newline padding.

    Fixes: the previous bare ``except:`` would also swallow KeyboardInterrupt
    and SystemExit; narrowed to ``Exception``.
    """
    try:
        # ESC[H moves the cursor home; ESC[J clears to end of screen.
        print("\033[H\033[J", end="", flush=True)
    except Exception:
        # Best-effort fallback for streams that reject the escape write.
        print("\n" * 100)
def display_paginated_table(table: Table, title: str):
    """Display a table with pagination support using Rich console for colored output, repeating header on each page.

    Renders the table to Rich segments, splits the segments into lines,
    separates the header rows from the data rows, and prints the data one
    screenful at a time with the header repeated on every page. Paging is
    driven by single-key input (SPACE = next page, anything else = stop)
    using raw terminal mode where available.
    """
    # Get terminal height (subtract some lines for prompt and margins)
    try:
        terminal_height = os.get_terminal_size().lines - 8
    except:
        terminal_height = 20 # Fallback if terminal size can't be determined
    # Create a segment-based approach to capture Rich-rendered output
    from rich.segment import Segment
    # Render the table to segments (style-carrying text fragments)
    segments = list(console.render(table))
    # Convert segments to lines while preserving style; a '\n' segment marks
    # the end of a rendered line.
    current_line_segments = []
    all_lines = []
    for segment in segments:
        if segment.text == '\n':
            all_lines.append(current_line_segments)
            current_line_segments = []
        else:
            current_line_segments.append(segment)
    # Add last line if not empty
    if current_line_segments:
        all_lines.append(current_line_segments)
    total_lines = len(all_lines)
    # If fits on one screen after segment analysis, print once and return
    if total_lines <= terminal_height:
        console.print(Panel(table, title=title, title_align="left"))
        return
    # Separate header from data rows
    header_lines = []
    data_lines = []
    # Find where the header ends
    header_end_index = 0
    found_header_text = False
    for i, line_segments in enumerate(all_lines):
        # Check if this line contains header-style text (the table is built
        # with header_style="bold magenta" elsewhere in this file)
        has_header_style = any(
            seg.style and ('bold' in str(seg.style) or 'magenta' in str(seg.style))
            for seg in line_segments
        )
        if has_header_style:
            found_header_text = True
        # After finding header text, the next line with box-drawing chars is the separator
        if found_header_text and i > 0:
            line_text = ''.join(seg.text for seg in line_segments)
            if any(char in line_text for char in ['─', '━', '┼', '╪', '┤', '├']):
                header_end_index = i
                break
    # If we found a header separator, split there
    if header_end_index > 0:
        header_lines = all_lines[:header_end_index + 1]
        data_lines = all_lines[header_end_index + 1:]
    else:
        # Fallback: assume first 3 lines are header (top border, column
        # titles, separator)
        header_lines = all_lines[:min(3, len(all_lines))]
        data_lines = all_lines[min(3, len(all_lines)):]
    # Calculate how many data lines fit per page (header repeats each page)
    lines_per_page = terminal_height - len(header_lines)
    # Display with pagination
    current_line = 0
    page_number = 1
    while current_line < len(data_lines):
        # Clear screen for each page
        clear_screen()
        # Print title
        console.print(f"[bold cyan]{title} (Page {page_number})[/]")
        # Print header on every page, segment by segment to keep styling
        for line_segments in header_lines:
            for segment in line_segments:
                console.print(segment.text, style=segment.style, end="")
            console.print()
        # Calculate how many data lines to show on this page
        end_line = min(current_line + lines_per_page, len(data_lines))
        # Print data lines for this page
        for line_segments in data_lines[current_line:end_line]:
            for segment in line_segments:
                console.print(segment.text, style=segment.style, end="")
            console.print()
        # Update position
        current_line = end_line
        page_number += 1
        # If there's more content, wait for user
        if current_line < len(data_lines):
            console.print(f"\n[dim yellow]--- Press SPACE for next page, or any other key to finish (Page {page_number - 1}, showing {end_line}/{len(data_lines)} data rows) ---[/dim yellow]")
            try:
                # Raw-mode single-keypress read (POSIX terminals only)
                import sys
                import tty
                import termios
                fd = sys.stdin.fileno()
                old_settings = termios.tcgetattr(fd)
                try:
                    tty.setraw(fd)
                    char = sys.stdin.read(1)
                    if char != ' ':
                        break
                finally:
                    # Always restore the terminal mode, even on break
                    termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            except:
                # Fallback for Windows or if termios not available: empty
                # input (just Enter) continues, anything else stops
                input_char = input().strip()
                if input_char != '':
                    break
        else:
            break
@app.command()
def chat():
global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED, MAX_TOKEN, COST_WARNING_THRESHOLD, DEFAULT_ONLINE_MODE, LOG_MAX_SIZE_MB, LOG_BACKUP_COUNT
session_max_token = 0
session_system_prompt = ""
session_history = []
current_index = -1
total_input_tokens = 0
total_output_tokens = 0
total_cost = 0.0
message_count = 0
middle_out_enabled = False # Session-level middle-out transform flag
conversation_memory_enabled = True # Memory ON by default
memory_start_index = 0 # Track when memory was last enabled
saved_conversations_cache = [] # Cache for /list results to use with /load by number
online_mode_enabled = DEFAULT_ONLINE_MODE == "on" # Initialize from config
app_logger.info("Starting new chat session with memory enabled")
if not API_KEY:
console.print("[bold red]API key not found. Use '/config api'.[/]")
try:
new_api_key = typer.prompt("Enter API key")
if new_api_key.strip():
set_config('api_key', new_api_key.strip())
API_KEY = new_api_key.strip()
console.print("[bold green]API key saved. Re-run.[/]")
else:
raise typer.Exit()
except:
console.print("[bold red]No API key. Exiting.[/]")
raise typer.Exit()
if not text_models:
console.print("[bold red]No models available. Check API key/URL.[/]")
raise typer.Exit()
# Check for credit alerts at startup
credits_data = get_credits(API_KEY, OPENROUTER_BASE_URL)
startup_credit_alerts = check_credit_alerts(credits_data)
if startup_credit_alerts:
startup_alert_msg = " | ".join(startup_credit_alerts)
console.print(f"[bold red]⚠️ Startup {startup_alert_msg}[/]")
app_logger.warning(f"Startup credit alerts: {startup_alert_msg}")
selected_model = selected_model_default
# Initialize OpenRouter client
client = OpenRouter(api_key=API_KEY)
if selected_model:
online_status = "enabled" if online_mode_enabled else "disabled"
console.print(f"[bold blue]Welcome to oAI![/] [bold red]Active model: {selected_model['name']}[/] [dim cyan](Online mode: {online_status})[/]")
else:
console.print("[bold blue]Welcome to oAI![/] [italic blue]Select a model with '/model'.[/]")
if not selected_model:
console.print("[bold yellow]No model selected. Use '/model'.[/]")
# Persistent input history
session = PromptSession(history=FileHistory(str(history_file)))
while True:
try:
user_input = session.prompt("You> ", auto_suggest=AutoSuggestFromHistory()).strip()
# Handle // escape sequence - convert to single / and treat as regular text
if user_input.startswith("//"):
user_input = user_input[1:] # Remove first slash, keep the rest
# Check for unknown commands
elif user_input.startswith("/") and user_input.lower() not in ["exit", "quit", "bye"]:
command_word = user_input.split()[0].lower() if user_input.split() else user_input.lower()
if not any(command_word.startswith(cmd) for cmd in VALID_COMMANDS):
console.print(f"[bold red]Unknown command: {command_word}[/]")
console.print("[bold yellow]Type /help to see all available commands.[/]")
app_logger.warning(f"Unknown command attempted: {command_word}")
continue
if user_input.lower() in ["exit", "quit", "bye"]:
total_tokens = total_input_tokens + total_output_tokens
app_logger.info(f"Session ended. Total messages: {message_count}, Total tokens: {total_tokens}, Total cost: ${total_cost:.4f}")
console.print("[bold yellow]Goodbye![/]")
return
# Commands with logging
if user_input.lower() == "/retry":
if not session_history:
console.print("[bold red]No history to retry.[/]")
app_logger.warning("Retry attempted with no history")
continue
last_prompt = session_history[-1]['prompt']
console.print("[bold green]Retrying last prompt...[/]")
app_logger.info(f"Retrying prompt: {last_prompt[:100]}...")
user_input = last_prompt
elif user_input.lower().startswith("/online"):
args = user_input[8:].strip()
if not args:
status = "enabled" if online_mode_enabled else "disabled"
default_status = "enabled" if DEFAULT_ONLINE_MODE == "on" else "disabled"
console.print(f"[bold blue]Online mode (web search) {status}.[/]")
console.print(f"[dim blue]Default setting: {default_status} (use '/config online on|off' to change)[/]")
if selected_model:
if supports_online_mode(selected_model):
console.print(f"[dim green]Current model '{selected_model['name']}' supports online mode.[/]")
else:
console.print(f"[dim yellow]Current model '{selected_model['name']}' does not support online mode.[/]")
continue
if args.lower() == "on":
if not selected_model:
console.print("[bold red]No model selected. Select a model first with '/model'.[/]")
continue
if not supports_online_mode(selected_model):
console.print(f"[bold red]Model '{selected_model['name']}' does not support online mode (web search).[/]")
console.print("[dim yellow]Online mode requires models with 'tools' parameter support.[/]")
app_logger.warning(f"Online mode activation failed - model {selected_model['id']} doesn't support it")
continue
online_mode_enabled = True
console.print("[bold green]Online mode enabled for this session. Model will use web search capabilities.[/]")
console.print(f"[dim blue]Effective model ID: {get_effective_model_id(selected_model['id'], True)}[/]")
app_logger.info(f"Online mode enabled for model {selected_model['id']}")
elif args.lower() == "off":
online_mode_enabled = False
console.print("[bold green]Online mode disabled for this session. Model will not use web search.[/]")
if selected_model:
console.print(f"[dim blue]Effective model ID: {selected_model['id']}[/]")
app_logger.info("Online mode disabled")
else:
console.print("[bold yellow]Usage: /online on|off (or /online to view status)[/]")
continue
elif user_input.lower().startswith("/memory"):
args = user_input[8:].strip()
if not args:
status = "enabled" if conversation_memory_enabled else "disabled"
history_count = len(session_history) - memory_start_index if conversation_memory_enabled and memory_start_index < len(session_history) else 0
console.print(f"[bold blue]Conversation memory {status}.[/]")
if conversation_memory_enabled:
console.print(f"[dim blue]Tracking {history_count} message(s) since memory enabled.[/]")
else:
console.print(f"[dim yellow]Memory disabled. Each request is independent (saves tokens/cost).[/]")
continue
if args.lower() == "on":
conversation_memory_enabled = True
memory_start_index = len(session_history)
console.print("[bold green]Conversation memory enabled. Will remember conversations from this point forward.[/]")
console.print(f"[dim blue]Memory will track messages starting from index {memory_start_index}.[/]")
app_logger.info(f"Conversation memory enabled at index {memory_start_index}")
elif args.lower() == "off":
conversation_memory_enabled = False
console.print("[bold green]Conversation memory disabled. API calls will not include history (lower cost).[/]")
console.print(f"[dim yellow]Note: Messages are still saved locally but not sent to API.[/]")
app_logger.info("Conversation memory disabled")
else:
console.print("[bold yellow]Usage: /memory on|off (or /memory to view status)[/]")
continue
elif user_input.lower().startswith("/paste"):
optional_prompt = user_input[7:].strip()
try:
clipboard_content = pyperclip.paste()
except Exception as e:
console.print(f"[bold red]Failed to access clipboard: {e}[/]")
app_logger.error(f"Clipboard access error: {e}")
continue
if not clipboard_content or not clipboard_content.strip():
console.print("[bold red]Clipboard is empty.[/]")
app_logger.warning("Paste attempted with empty clipboard")
continue
try:
clipboard_content.encode('utf-8')
preview_lines = clipboard_content.split('\n')[:10]
preview_text = '\n'.join(preview_lines)
if len(clipboard_content.split('\n')) > 10:
preview_text += "\n... (content truncated for preview)"
char_count = len(clipboard_content)
line_count = len(clipboard_content.split('\n'))
console.print(Panel(
preview_text,
title=f"[bold cyan]📋 Clipboard Content Preview ({char_count} chars, {line_count} lines)[/]",
title_align="left",
border_style="cyan"
))
if optional_prompt:
final_prompt = f"{optional_prompt}\n\n```\n{clipboard_content}\n```"
console.print(f"[dim blue]Sending with prompt: '{optional_prompt}'[/]")
else:
final_prompt = clipboard_content
console.print("[dim blue]Sending clipboard content without additional prompt[/]")
user_input = final_prompt
app_logger.info(f"Pasted content from clipboard: {char_count} chars, {line_count} lines, with prompt: {bool(optional_prompt)}")
except UnicodeDecodeError:
console.print("[bold red]Clipboard contains non-text (binary) data. Only plain text is supported.[/]")
app_logger.error("Paste failed - clipboard contains binary data")
continue
except Exception as e:
console.print(f"[bold red]Error processing clipboard content: {e}[/]")
app_logger.error(f"Clipboard processing error: {e}")
continue
elif user_input.lower().startswith("/export"):
args = user_input[8:].strip().split(maxsplit=1)
if len(args) != 2:
console.print("[bold red]Usage: /export [/]")
console.print("[bold yellow]Formats: md (Markdown), json (JSON), html (HTML)[/]")
console.print("[bold yellow]Example: /export md my_conversation.md[/]")
continue
export_format = args[0].lower()
filename = args[1]
if not session_history:
console.print("[bold red]No conversation history to export.[/]")
continue
if export_format not in ['md', 'json', 'html']:
console.print("[bold red]Invalid format. Use: md, json, or html[/]")
continue
try:
if export_format == 'md':
content = export_as_markdown(session_history, session_system_prompt)
elif export_format == 'json':
content = export_as_json(session_history, session_system_prompt)
elif export_format == 'html':
content = export_as_html(session_history, session_system_prompt)
export_path = Path(filename).expanduser()
with open(export_path, 'w', encoding='utf-8') as f:
f.write(content)
console.print(f"[bold green]✅ Conversation exported to: {export_path.absolute()}[/]")
console.print(f"[dim blue]Format: {export_format.upper()} | Messages: {len(session_history)} | Size: {len(content)} bytes[/]")
app_logger.info(f"Conversation exported as {export_format} to {export_path} ({len(session_history)} messages)")
except Exception as e:
console.print(f"[bold red]Export failed: {e}[/]")
app_logger.error(f"Export error: {e}")
continue
elif user_input.lower().startswith("/save"):
args = user_input[6:].strip()
if not args:
console.print("[bold red]Usage: /save [/]")
continue
if not session_history:
console.print("[bold red]No history to save.[/]")
continue
save_conversation(args, session_history)
console.print(f"[bold green]Conversation saved as '{args}'.[/]")
app_logger.info(f"Conversation saved as '{args}' with {len(session_history)} messages")
continue
elif user_input.lower().startswith("/load"):
args = user_input[6:].strip()
if not args:
console.print("[bold red]Usage: /load [/]")
console.print("[bold yellow]Tip: Use /list to see numbered conversations[/]")
continue
conversation_name = None
if args.isdigit():
conv_number = int(args)
if saved_conversations_cache and 1 <= conv_number <= len(saved_conversations_cache):
conversation_name = saved_conversations_cache[conv_number - 1]['name']
console.print(f"[bold cyan]Loading conversation #{conv_number}: '{conversation_name}'[/]")
else:
console.print(f"[bold red]Invalid conversation number: {conv_number}[/]")
console.print(f"[bold yellow]Use /list to see available conversations (1-{len(saved_conversations_cache) if saved_conversations_cache else 0})[/]")
continue
else:
conversation_name = args
loaded_data = load_conversation(conversation_name)
if not loaded_data:
console.print(f"[bold red]Conversation '{conversation_name}' not found.[/]")
app_logger.warning(f"Load failed for '{conversation_name}' - not found")
continue
session_history = loaded_data
current_index = len(session_history) - 1
if conversation_memory_enabled:
memory_start_index = 0
total_input_tokens = 0
total_output_tokens = 0
total_cost = 0.0
message_count = 0
console.print(f"[bold green]Conversation '{conversation_name}' loaded with {len(session_history)} messages.[/]")
app_logger.info(f"Conversation '{conversation_name}' loaded with {len(session_history)} messages")
continue
elif user_input.lower().startswith("/delete"):
args = user_input[8:].strip()
if not args:
console.print("[bold red]Usage: /delete [/]")
console.print("[bold yellow]Tip: Use /list to see numbered conversations[/]")
continue
conversation_name = None
if args.isdigit():
conv_number = int(args)
if saved_conversations_cache and 1 <= conv_number <= len(saved_conversations_cache):
conversation_name = saved_conversations_cache[conv_number - 1]['name']
console.print(f"[bold cyan]Deleting conversation #{conv_number}: '{conversation_name}'[/]")
else:
console.print(f"[bold red]Invalid conversation number: {conv_number}[/]")
console.print(f"[bold yellow]Use /list to see available conversations (1-{len(saved_conversations_cache) if saved_conversations_cache else 0})[/]")
continue
else:
conversation_name = args
try:
confirm = typer.confirm(f"Delete conversation '{conversation_name}'? This cannot be undone.", default=False)
if not confirm:
console.print("[bold yellow]Deletion cancelled.[/]")
continue
except (EOFError, KeyboardInterrupt):
console.print("\n[bold yellow]Deletion cancelled.[/]")
continue
deleted_count = delete_conversation(conversation_name)
if deleted_count > 0:
console.print(f"[bold green]Conversation '{conversation_name}' deleted ({deleted_count} version(s) removed).[/]")
app_logger.info(f"Conversation '{conversation_name}' deleted - {deleted_count} version(s)")
if saved_conversations_cache:
saved_conversations_cache = [c for c in saved_conversations_cache if c['name'] != conversation_name]
else:
console.print(f"[bold red]Conversation '{conversation_name}' not found.[/]")
app_logger.warning(f"Delete failed for '{conversation_name}' - not found")
continue
elif user_input.lower() == "/list":
conversations = list_conversations()
if not conversations:
console.print("[bold yellow]No saved conversations found.[/]")
app_logger.info("User viewed conversation list - empty")
saved_conversations_cache = []
continue
saved_conversations_cache = conversations
table = Table("No.", "Name", "Messages", "Last Saved", show_header=True, header_style="bold magenta")
for idx, conv in enumerate(conversations, 1):
try:
dt = datetime.datetime.fromisoformat(conv['timestamp'])
formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S')
except:
formatted_time = conv['timestamp']
table.add_row(
str(idx),
conv['name'],
str(conv['message_count']),
formatted_time
)
console.print(Panel(table, title=f"[bold green]Saved Conversations ({len(conversations)} total)[/]", title_align="left", subtitle="[dim]Use /load or /delete to manage conversations[/]", subtitle_align="right"))
app_logger.info(f"User viewed conversation list - {len(conversations)} conversations")
continue
elif user_input.lower() == "/prev":
# Step backwards through this session's prompt/response history.
if not session_history or current_index <= 0:
console.print("[bold red]No previous response.[/]")
continue
current_index -= 1
prev_response = session_history[current_index]['response']
md = Markdown(prev_response)
# Title shows 1-based position within the full history.
console.print(Panel(md, title=f"[bold green]Previous Response ({current_index + 1}/{len(session_history)})[/]", title_align="left"))
app_logger.debug(f"Viewed previous response at index {current_index}")
continue
elif user_input.lower() == "/next":
# Step forwards again after one or more /prev calls.
if not session_history or current_index >= len(session_history) - 1:
console.print("[bold red]No next response.[/]")
continue
current_index += 1
next_response = session_history[current_index]['response']
md = Markdown(next_response)
console.print(Panel(md, title=f"[bold green]Next Response ({current_index + 1}/{len(session_history)})[/]", title_align="left"))
app_logger.debug(f"Viewed next response at index {current_index}")
continue
elif user_input.lower() == "/stats":
credits = get_credits(API_KEY, OPENROUTER_BASE_URL)
credits_left = credits['credits_left'] if credits else "Unknown"
stats = f"Total Input: {total_input_tokens}, Total Output: {total_output_tokens}, Total Tokens: {total_input_tokens + total_output_tokens}, Total Cost: ${total_cost:.4f}, Avg Cost/Message: ${total_cost / message_count:.4f}" if message_count > 0 else "No messages."
table = Table("Metric", "Value", show_header=True, header_style="bold magenta")
table.add_row("Session Stats", stats)
table.add_row("Credits Left", credits_left)
console.print(Panel(table, title="[bold green]Session Cost Summary[/]", title_align="left"))
app_logger.info(f"User viewed stats: {stats}")
warnings = check_credit_alerts(credits)
if warnings:
warning_text = '|'.join(warnings)
console.print(f"[bold red]⚠️ {warning_text}[/]")
app_logger.warning(f"Warnings in stats: {warning_text}")
continue
elif user_input.lower().startswith("/middleout"):
# Toggle OpenRouter's "middle-out" prompt-compression transform for
# this session (applied to api_params["transforms"] when sending).
# NOTE(review): "/middleout" is 10 chars, so [11:] assumes a space at
# index 10; a fused argument like "/middleouton" loses its first char.
args = user_input[11:].strip()
if not args:
# Bare "/middleout" just reports the current state.
console.print(f"[bold blue]Middle-out transform {'enabled' if middle_out_enabled else 'disabled'}.[/]")
continue
if args.lower() == "on":
middle_out_enabled = True
console.print("[bold green]Middle-out transform enabled.[/]")
elif args.lower() == "off":
middle_out_enabled = False
console.print("[bold green]Middle-out transform disabled.[/]")
else:
console.print("[bold yellow]Usage: /middleout on|off (or /middleout to view status)[/]")
continue
elif user_input.lower() == "/reset":
confirm = typer.confirm("Reset conversation context? This clears history and prompt.", default=False)
if not confirm:
console.print("[bold yellow]Reset cancelled.[/]")
continue
session_history = []
current_index = -1
session_system_prompt = ""
memory_start_index = 0
total_input_tokens = 0
total_output_tokens = 0
total_cost = 0.0
message_count = 0
console.print("[bold green]Conversation context reset.[/]")
app_logger.info("Conversation context reset by user")
continue
elif user_input.lower().startswith("/info"):
# Show a detail sheet for the current model, or for a model looked up
# by exact id, canonical slug, or case-insensitive name substring.
args = user_input[6:].strip()
if not args:
if not selected_model:
# NOTE(review): this message appears truncated ("'/info '.") —
# likely lost a "<model_id>" placeholder at some point.
console.print("[bold red]No model selected and no model ID provided. Use '/model' first or '/info '.[/]")
continue
model_to_show = selected_model
else:
# First match wins: exact id, then canonical slug, then name substring.
model_to_show = next((m for m in models_data if m["id"] == args or m.get("canonical_slug") == args or args.lower() in m["name"].lower()), None)
if not model_to_show:
console.print(f"[bold red]Model '{args}' not found.[/]")
continue
# All nested metadata is read defensively with .get() defaults, since
# the OpenRouter model listing does not guarantee every field.
pricing = model_to_show.get("pricing", {})
architecture = model_to_show.get("architecture", {})
supported_params = ", ".join(model_to_show.get("supported_parameters", [])) or "None"
top_provider = model_to_show.get("top_provider", {})
table = Table("Property", "Value", show_header=True, header_style="bold magenta")
table.add_row("ID", model_to_show["id"])
table.add_row("Name", model_to_show["name"])
table.add_row("Description", model_to_show.get("description", "N/A"))
table.add_row("Context Length", str(model_to_show.get("context_length", "N/A")))
table.add_row("Online Support", "Yes" if supports_online_mode(model_to_show) else "No")
table.add_row("Pricing - Prompt ($/M tokens)", pricing.get("prompt", "N/A"))
table.add_row("Pricing - Completion ($/M tokens)", pricing.get("completion", "N/A"))
table.add_row("Pricing - Request ($)", pricing.get("request", "N/A"))
table.add_row("Pricing - Image ($)", pricing.get("image", "N/A"))
table.add_row("Input Modalities", ", ".join(architecture.get("input_modalities", [])) or "None")
table.add_row("Output Modalities", ", ".join(architecture.get("output_modalities", [])) or "None")
table.add_row("Supported Parameters", supported_params)
table.add_row("Top Provider Context Length", str(top_provider.get("context_length", "N/A")))
table.add_row("Max Completion Tokens", str(top_provider.get("max_completion_tokens", "N/A")))
table.add_row("Moderated", "Yes" if top_provider.get("is_moderated", False) else "No")
console.print(Panel(table, title=f"[bold green]Model Info: {model_to_show['name']}[/]", title_align="left"))
continue
elif user_input.startswith("/model"):
# Interactive model picker: optional search term filters the list,
# then the user selects by number (0 cancels).
app_logger.info("User initiated model selection")
args = user_input[7:].strip()
search_term = args if args else ""
filtered_models = text_models
if search_term:
# Case-insensitive substring match on name or id.
filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()]
if not filtered_models:
console.print(f"[bold red]No models match '{search_term}'. Try '/model'.[/]")
continue
table = Table("No.", "Name", "ID", "Image", "Online", show_header=True, header_style="bold magenta")
for i, model in enumerate(filtered_models, 1):
image_support = "[green]✓[/green]" if has_image_capability(model) else "[red]✗[/red]"
online_support = "[green]✓[/green]" if supports_online_mode(model) else "[red]✗[/red]"
table.add_row(str(i), model["name"], model["id"], image_support, online_support)
title = f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]"
display_paginated_table(table, title)
# Prompt until a valid number is given; 0 aborts without changing
# the currently selected model.
while True:
try:
choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
if choice == 0:
break
if 1 <= choice <= len(filtered_models):
selected_model = filtered_models[choice - 1]
# Apply default online mode if model supports it
if supports_online_mode(selected_model) and DEFAULT_ONLINE_MODE == "on":
online_mode_enabled = True
console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
console.print("[dim cyan]✓ Online mode auto-enabled (default setting). Use '/online off' to disable for this session.[/]")
else:
# Any model change resets the session online flag.
online_mode_enabled = False
console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
if supports_online_mode(selected_model):
console.print("[dim green]✓ This model supports online mode. Use '/online on' to enable web search.[/]")
app_logger.info(f"Model selected: {selected_model['name']} ({selected_model['id']}), Online: {online_mode_enabled}")
break
console.print("[bold red]Invalid choice. Try again.[/]")
except ValueError:
console.print("[bold red]Invalid input. Enter a number.[/]")
continue
elif user_input.startswith("/maxtoken"):
args = user_input[10:].strip()
if not args:
console.print(f"[bold blue]Current session max tokens: {session_max_token}[/]")
continue
try:
new_limit = int(args)
if new_limit < 1:
console.print("[bold red]Session token limit must be at least 1.[/]")
continue
if new_limit > MAX_TOKEN:
console.print(f"[bold yellow]Cannot exceed stored max ({MAX_TOKEN}). Capping.[/]")
new_limit = MAX_TOKEN
session_max_token = new_limit
console.print(f"[bold green]Session max tokens set to: {session_max_token}[/]")
except ValueError:
console.print("[bold red]Invalid token limit. Provide a positive integer.[/]")
continue
elif user_input.startswith("/system"):
args = user_input[8:].strip()
if not args:
if session_system_prompt:
console.print(f"[bold blue]Current session system prompt:[/] {session_system_prompt}")
else:
console.print("[bold blue]No session system prompt set.[/]")
continue
if args.lower() == "clear":
session_system_prompt = ""
console.print("[bold green]Session system prompt cleared.[/]")
else:
session_system_prompt = args
console.print(f"[bold green]Session system prompt set to: {session_system_prompt}[/]")
continue
elif user_input.startswith("/config"):
# Configuration subcommands. Note args is lowercased, so search terms
# passed to "/config model" are matched case-insensitively anyway.
args = user_input[8:].strip().lower()
# Update-check result is shown as the subtitle of the config panel below.
update = check_for_updates(version)
# --- /config api: replace the stored OpenRouter API key -------------
if args == "api":
try:
new_api_key = typer.prompt("Enter new API key")
if new_api_key.strip():
set_config('api_key', new_api_key.strip())
API_KEY = new_api_key.strip()
# Re-create the client so the new key takes effect immediately.
client = OpenRouter(api_key=API_KEY)
console.print("[bold green]API key updated![/]")
else:
console.print("[bold yellow]No change.[/]")
except Exception as e:
console.print(f"[bold red]Error updating API key: {e}[/]")
# --- /config url: replace the stored API base URL -------------------
elif args == "url":
try:
new_url = typer.prompt("Enter new base URL")
if new_url.strip():
set_config('base_url', new_url.strip())
OPENROUTER_BASE_URL = new_url.strip()
console.print("[bold green]Base URL updated![/]")
else:
console.print("[bold yellow]No change.[/]")
except Exception as e:
console.print(f"[bold red]Error updating URL: {e}[/]")
# --- /config costwarning [value]: per-response cost alert threshold -
elif args.startswith("costwarning"):
sub_args = args[11:].strip()
if not sub_args:
console.print(f"[bold blue]Stored cost warning threshold: ${COST_WARNING_THRESHOLD:.4f}[/]")
continue
try:
new_threshold = float(sub_args)
if new_threshold < 0:
console.print("[bold red]Cost warning threshold must be >= 0.[/]")
continue
# NOTE(review): HIGH_COST_WARNING is presumably the DB key
# constant for this setting — confirm against its definition.
set_config(HIGH_COST_WARNING, str(new_threshold))
COST_WARNING_THRESHOLD = new_threshold
console.print(f"[bold green]Cost warning threshold set to ${COST_WARNING_THRESHOLD:.4f}[/]")
except ValueError:
console.print("[bold red]Invalid cost threshold. Provide a valid number.[/]")
# --- /config stream on|off: persisted streaming toggle --------------
elif args.startswith("stream"):
sub_args = args[7:].strip()
if sub_args in ["on", "off"]:
set_config('stream_enabled', sub_args)
STREAM_ENABLED = sub_args
console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]")
else:
console.print("[bold yellow]Usage: /config stream on|off[/]")
# --- /config log [size_mb]: rotating-log size limit (needs restart) -
elif args.startswith("log"):
sub_args = args[4:].strip()
if not sub_args:
console.print(f"[bold blue]Current log file size limit: {LOG_MAX_SIZE_MB} MB[/]")
console.print(f"[bold blue]Log backup count: {LOG_BACKUP_COUNT} files[/]")
console.print(f"[dim yellow]Total max disk usage: ~{LOG_MAX_SIZE_MB * (LOG_BACKUP_COUNT + 1)} MB[/]")
continue
try:
new_size_mb = int(sub_args)
if new_size_mb < 1:
console.print("[bold red]Log size must be at least 1 MB.[/]")
continue
if new_size_mb > 100:
console.print("[bold yellow]Warning: Log size > 100MB. Capping at 100MB.[/]")
new_size_mb = 100
set_config('log_max_size_mb', str(new_size_mb))
LOG_MAX_SIZE_MB = new_size_mb
console.print(f"[bold green]Log size limit set to {new_size_mb} MB.[/]")
# The RotatingFileHandler is built at startup, so the new size
# only applies after a restart.
console.print("[bold yellow]⚠️ Restart the application for this change to take effect.[/]")
app_logger.info(f"Log size limit updated to {new_size_mb} MB (requires restart)")
except ValueError:
console.print("[bold red]Invalid size. Provide a number in MB.[/]")
# --- /config online on|off: default online mode for new selections --
elif args.startswith("online"):
sub_args = args[7:].strip()
if not sub_args:
current_default = "enabled" if DEFAULT_ONLINE_MODE == "on" else "disabled"
console.print(f"[bold blue]Default online mode: {current_default}[/]")
console.print("[dim yellow]This sets the default for new models. Use '/online on|off' to override in current session.[/]")
continue
if sub_args in ["on", "off"]:
set_config('default_online_mode', sub_args)
DEFAULT_ONLINE_MODE = sub_args
console.print(f"[bold green]Default online mode {'enabled' if sub_args == 'on' else 'disabled'}.[/]")
console.print("[dim blue]Note: This affects new model selections. Current session unchanged.[/]")
app_logger.info(f"Default online mode set to {sub_args}")
else:
console.print("[bold yellow]Usage: /config online on|off[/]")
# --- /config maxtoken [value]: persisted max token limit ------------
elif args.startswith("maxtoken"):
sub_args = args[9:].strip()
if not sub_args:
console.print(f"[bold blue]Stored max token limit: {MAX_TOKEN}[/]")
continue
try:
new_max = int(sub_args)
if new_max < 1:
console.print("[bold red]Max token limit must be at least 1.[/]")
continue
if new_max > 1000000:
console.print("[bold yellow]Capped at 1M for safety.[/]")
new_max = 1000000
set_config('max_token', str(new_max))
MAX_TOKEN = new_max
# Clamp the session cap so it never exceeds the new stored max.
if session_max_token > MAX_TOKEN:
session_max_token = MAX_TOKEN
console.print(f"[bold yellow]Session adjusted to {session_max_token}.[/]")
console.print(f"[bold green]Stored max token limit updated to: {MAX_TOKEN}[/]")
except ValueError:
console.print("[bold red]Invalid token limit.[/]")
# --- /config model [search]: pick the startup default model ---------
elif args.startswith("model"):
sub_args = args[6:].strip()
search_term = sub_args if sub_args else ""
filtered_models = text_models
if search_term:
filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()]
if not filtered_models:
console.print(f"[bold red]No models match '{search_term}'. Try without search.[/]")
continue
table = Table("No.", "Name", "ID", "Image", "Online", show_header=True, header_style="bold magenta")
for i, model in enumerate(filtered_models, 1):
image_support = "[green]✓[/green]" if has_image_capability(model) else "[red]✗[/red]"
online_support = "[green]✓[/green]" if supports_online_mode(model) else "[red]✗[/red]"
table.add_row(str(i), model["name"], model["id"], image_support, online_support)
title = f"[bold green]Available Models for Default ({'All' if not search_term else f'Search: {search_term}'})[/]"
display_paginated_table(table, title)
while True:
try:
choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
if choice == 0:
break
if 1 <= choice <= len(filtered_models):
# Persists only the default; the live session model is untouched.
default_model = filtered_models[choice - 1]
set_config('default_model', default_model["id"])
current_name = selected_model['name'] if selected_model else "None"
console.print(f"[bold cyan]Default model set to: {default_model['name']} ({default_model['id']}). Current unchanged: {current_name}[/]")
break
console.print("[bold red]Invalid choice. Try again.[/]")
except ValueError:
console.print("[bold red]Invalid input. Enter a number.[/]")
# --- bare /config: dump every setting into one panel ----------------
else:
DEFAULT_MODEL_ID = get_config('default_model')
memory_status = "Enabled" if conversation_memory_enabled else "Disabled"
memory_tracked = len(session_history) - memory_start_index if conversation_memory_enabled else 0
table = Table("Setting", "Value", show_header=True, header_style="bold magenta", width=console.width - 10)
table.add_row("API Key", API_KEY or "[Not set]")
table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]")
table.add_row("DB Path", str(database) or "[Not set]")
table.add_row("Logfile", str(log_file) or "[Not set]")
table.add_row("Log Size Limit", f"{LOG_MAX_SIZE_MB} MB")
table.add_row("Log Backups", str(LOG_BACKUP_COUNT))
table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled")
table.add_row("Default Model", DEFAULT_MODEL_ID or "[Not set]")
table.add_row("Current Model", "[Not set]" if selected_model is None else str(selected_model["name"]))
table.add_row("Default Online Mode", "Enabled" if DEFAULT_ONLINE_MODE == "on" else "Disabled")
table.add_row("Session Online Mode", "Enabled" if online_mode_enabled else "Disabled")
table.add_row("Max Token", str(MAX_TOKEN))
table.add_row("Session Token", "[Not set]" if session_max_token == 0 else str(session_max_token))
table.add_row("Session System Prompt", session_system_prompt or "[Not set]")
table.add_row("Cost Warning Threshold", f"${COST_WARNING_THRESHOLD:.4f}")
table.add_row("Middle-out Transform", "Enabled" if middle_out_enabled else "Disabled")
table.add_row("Conversation Memory", f"{memory_status} ({memory_tracked} tracked)" if conversation_memory_enabled else memory_status)
table.add_row("History Size", str(len(session_history)))
table.add_row("Current History Index", str(current_index) if current_index >= 0 else "[None]")
table.add_row("App Name", APP_NAME)
table.add_row("App URL", APP_URL)
credits = get_credits(API_KEY, OPENROUTER_BASE_URL)
if credits:
table.add_row("Total Credits", credits['total_credits'])
table.add_row("Used Credits", credits['used_credits'])
table.add_row("Credits Left", credits['credits_left'])
else:
table.add_row("Total Credits", "[Unavailable - Check API key]")
table.add_row("Used Credits", "[Unavailable - Check API key]")
table.add_row("Credits Left", "[Unavailable - Check API key]")
# Subtitle carries the result of the version/update check above.
console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left", subtitle="%s" %(update), subtitle_align="right"))
continue
if user_input.lower() == "/credits":
credits = get_credits(API_KEY, OPENROUTER_BASE_URL)
if credits:
console.print(f"[bold green]Credits left: {credits['credits_left']}[/]")
alerts = check_credit_alerts(credits)
if alerts:
for alert in alerts:
console.print(f"[bold red]⚠️ {alert}[/]")
else:
console.print("[bold red]Unable to fetch credits. Check your API key or network.[/]")
continue
if user_input.lower() == "/clear" or user_input.lower() == "/cl":
# Clear the terminal and re-print a compact status header.
clear_screen()
DEFAULT_MODEL_ID = get_config('default_model')
# session_max_token == 0 means no per-session cap has been set.
token_value = session_max_token if session_max_token != 0 else " Not set"
console.print(f"[bold cyan]Token limits: Max= {MAX_TOKEN}, Session={token_value}[/]")
console.print("[bold blue]Active model[/] [bold red]%s[/]" %(str(selected_model["name"]) if selected_model else "None"))
if online_mode_enabled:
console.print("[bold cyan]Online mode: Enabled (web search active)[/]")
continue
if user_input.lower() == "/help":
help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan", width=console.width - 10)
# SESSION COMMANDS
help_table.add_row(
"[bold yellow]━━━ SESSION COMMANDS ━━━[/]",
"",
""
)
help_table.add_row(
"/clear or /cl",
"Clear the terminal screen for a clean interface. You can also use the keycombo [bold]ctrl+l[/]",
"/clear\n/cl"
)
help_table.add_row(
"/help",
"Show this help menu with all available commands.",
"/help"
)
help_table.add_row(
"/memory [on|off]",
"Toggle conversation memory. ON sends history (AI remembers), OFF sends only current message (saves cost).",
"/memory\n/memory off"
)
help_table.add_row(
"/next",
"View the next response in history.",
"/next"
)
help_table.add_row(
"/online [on|off]",
"Enable/disable online mode (web search) for current session. Overrides default setting.",
"/online on\n/online off"
)
help_table.add_row(
"/paste [prompt]",
"Paste plain text/code from clipboard and send to AI. Optional prompt can be added.",
"/paste\n/paste Explain this code"
)
help_table.add_row(
"/prev",
"View the previous response in history.",
"/prev"
)
help_table.add_row(
"/reset",
"Clear conversation history and reset system prompt (resets session metrics). Requires confirmation.",
"/reset"
)
help_table.add_row(
"/retry",
"Resend the last prompt from history.",
"/retry"
)
# MODEL COMMANDS
help_table.add_row(
"[bold yellow]━━━ MODEL COMMANDS ━━━[/]",
"",
""
)
help_table.add_row(
"/info [model_id]",
"Display detailed info (pricing, modalities, context length, online support, etc.) for current or specified model.",
"/info\n/info gpt-4o"
)
help_table.add_row(
"/model [search]",
"Select or change the current model for the session. Shows image and online capabilities. Supports searching by name or ID.",
"/model\n/model gpt"
)
# CONFIGURATION COMMANDS
help_table.add_row(
"[bold yellow]━━━ CONFIGURATION ━━━[/]",
"",
""
)
help_table.add_row(
"/config",
"View all current configurations, including limits, credits, and history.",
"/config"
)
help_table.add_row(
"/config api",
"Set or update the OpenRouter API key.",
"/config api"
)
help_table.add_row(
"/config costwarning [value]",
"Set the cost warning threshold. Alerts when response exceeds this cost (in USD).",
"/config costwarning 0.05"
)
help_table.add_row(
"/config log [size_mb]",
"Set log file size limit in MB. Older logs are rotated automatically. Requires restart.",
"/config log 20"
)
help_table.add_row(
"/config maxtoken [value]",
"Set stored max token limit (persisted in DB). View current if no value provided.",
"/config maxtoken 50000"
)
help_table.add_row(
"/config model [search]",
"Set default model that loads on startup. Shows image and online capabilities. Doesn't change current session model.",
"/config model gpt"
)
help_table.add_row(
"/config online [on|off]",
"Set default online mode for new model selections. Use '/online on|off' to override current session.",
"/config online on"
)
help_table.add_row(
"/config stream [on|off]",
"Enable or disable response streaming.",
"/config stream off"
)
help_table.add_row(
"/config url",
"Set or update the base URL for OpenRouter API.",
"/config url"
)
# TOKEN & SYSTEM COMMANDS
help_table.add_row(
"[bold yellow]━━━ TOKEN & SYSTEM ━━━[/]",
"",
""
)
help_table.add_row(
"/maxtoken [value]",
"Set temporary session token limit (≤ stored max). View current if no value provided.",
"/maxtoken 2000"
)
help_table.add_row(
"/middleout [on|off]",
"Enable/disable middle-out transform to compress prompts exceeding context size.",
"/middleout on"
)
help_table.add_row(
"/system [prompt|clear]",
"Set session-level system prompt to guide AI behavior. Use 'clear' to reset.",
"/system You are a Python expert"
)
# CONVERSATION MANAGEMENT
help_table.add_row(
"[bold yellow]━━━ CONVERSATION MGMT ━━━[/]",
"",
""
)
help_table.add_row(
"/delete ",
"Delete a saved conversation by name or number (from /list). Requires confirmation.",
"/delete my_chat\n/delete 3"
)
help_table.add_row(
"/export ",
"Export conversation to file. Formats: md (Markdown), json (JSON), html (HTML).",
"/export md notes.md\n/export html report.html"
)
help_table.add_row(
"/list",
"List all saved conversations with numbers, message counts, and timestamps.",
"/list"
)
help_table.add_row(
"/load ",
"Load a saved conversation by name or number (from /list). Resets session metrics.",
"/load my_chat\n/load 3"
)
help_table.add_row(
"/save ",
"Save current conversation history to database.",
"/save my_chat"
)
# MONITORING & STATS
help_table.add_row(
"[bold yellow]━━━ MONITORING & STATS ━━━[/]",
"",
""
)
help_table.add_row(
"/credits",
"Display credits left on your OpenRouter account with alerts.",
"/credits"
)
help_table.add_row(
"/stats",
"Display session cost summary: tokens, cost, credits left, and warnings.",
"/stats"
)
# INPUT METHODS
help_table.add_row(
"[bold yellow]━━━ INPUT METHODS ━━━[/]",
"",
""
)
help_table.add_row(
"@/path/to/file",
"Attach files to messages: images (PNG, JPG, etc.), PDFs, and code files (.py, .js, etc.).",
"Debug @script.py\nSummarize @document.pdf\nAnalyze @image.png"
)
help_table.add_row(
"Clipboard paste",
"Use /paste to send clipboard content (plain text/code) to AI.",
"/paste\n/paste Explain this"
)
help_table.add_row(
"// escape",
"Start message with // to send a literal / character (e.g., //command sends '/command' as text, not a command)",
"//help sends '/help' as text"
)
# EXIT
help_table.add_row(
"[bold yellow]━━━ EXIT ━━━[/]",
"",
""
)
help_table.add_row(
"exit | quit | bye",
"Quit the chat application and display session summary.",
"exit"
)
console.print(Panel(
help_table,
title="[bold cyan]oAI Chat Help (Version %s)[/]" % version,
title_align="center",
subtitle="💡 Tip: Commands are case-insensitive • Memory ON by default (toggle with /memory) • Use // to escape / at start of input • Visit: https://iurl.no/oai",
subtitle_align="center",
border_style="cyan"
))
continue
# Fallthrough: input was not a command, so treat it as a chat prompt.
if not selected_model:
console.print("[bold yellow]Select a model first with '/model'.[/]")
continue
# Process file attachments with PDF support
content_blocks = []
text_part = user_input
file_attachments = []
# Each "@path" token becomes an attachment. NOTE(review): the regex
# cannot match paths containing spaces.
for match in re.finditer(r'@([^\s]+)', user_input):
file_path = match.group(1)
expanded_path = os.path.expanduser(os.path.abspath(file_path))
if not os.path.exists(expanded_path) or os.path.isdir(expanded_path):
# NOTE(review): on failure the @token is still stripped from the
# prompt text by the re.sub below, so the model never sees it.
console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]")
continue
file_size = os.path.getsize(expanded_path)
# Hard 10 MB cap per attachment.
if file_size > 10 * 1024 * 1024:
console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]")
continue
mime_type, _ = mimetypes.guess_type(expanded_path)
file_ext = os.path.splitext(expanded_path)[1].lower()
try:
with open(expanded_path, 'rb') as f:
file_data = f.read()
# Handle images
if mime_type and mime_type.startswith('image/'):
# Only attach if the selected model declares image input support.
modalities = selected_model.get("architecture", {}).get("input_modalities", [])
if "image" not in modalities:
console.print("[bold red]Selected model does not support image attachments.[/]")
console.print(f"[dim yellow]Supported modalities: {', '.join(modalities) if modalities else 'text only'}[/]")
continue
b64_data = base64.b64encode(file_data).decode('utf-8')
content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}})
console.print(f"[dim green]✓ Image attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]")
# Handle PDFs
elif mime_type == 'application/pdf' or file_ext == '.pdf':
modalities = selected_model.get("architecture", {}).get("input_modalities", [])
supports_pdf = any(mod in modalities for mod in ["document", "pdf", "file"])
if not supports_pdf:
console.print("[bold red]Selected model does not support PDF attachments.[/]")
console.print(f"[dim yellow]Supported modalities: {', '.join(modalities) if modalities else 'text only'}[/]")
continue
b64_data = base64.b64encode(file_data).decode('utf-8')
# PDFs are sent through the image_url block with a PDF data URI.
content_blocks.append({"type": "image_url", "image_url": {"url": f"data:application/pdf;base64,{b64_data}"}})
console.print(f"[dim green]✓ PDF attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]")
# Handle code/text files
elif (mime_type == 'text/plain' or file_ext in SUPPORTED_CODE_EXTENSIONS):
# May raise UnicodeDecodeError for binary content; handled below.
text_content = file_data.decode('utf-8')
content_blocks.append({"type": "text", "text": f"Code File: {os.path.basename(expanded_path)}\n\n{text_content}"})
console.print(f"[dim green]✓ Code file attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]")
else:
console.print(f"[bold red]Unsupported file type ({mime_type}) for {expanded_path}.[/]")
console.print("[bold yellow]Supported types: images (PNG, JPG, etc.), PDFs, and code files (.py, .js, etc.)[/]")
continue
file_attachments.append(file_path)
app_logger.info(f"File attached: {os.path.basename(expanded_path)}, Type: {mime_type or file_ext}, Size: {file_size / 1024:.1f} KB")
except UnicodeDecodeError:
console.print(f"[bold red]Cannot decode {expanded_path} as UTF-8. File may be binary or use unsupported encoding.[/]")
app_logger.error(f"UTF-8 decode error for {expanded_path}")
continue
except Exception as e:
console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]")
app_logger.error(f"File read error for {expanded_path}: {e}")
continue
# Strip every @token from the prompt text (successful or not).
text_part = re.sub(r'@([^\s]+)', '', text_part).strip()
# Build message content
if text_part or content_blocks:
message_content = []
if text_part:
message_content.append({"type": "text", "text": text_part})
message_content.extend(content_blocks)
else:
console.print("[bold red]Prompt cannot be empty.[/]")
continue
# Build API messages with conversation history if memory is enabled
api_messages = []
if session_system_prompt:
api_messages.append({"role": "system", "content": session_system_prompt})
if conversation_memory_enabled:
# Replay prior turns from memory_start_index so the model keeps context.
for i in range(memory_start_index, len(session_history)):
history_entry = session_history[i]
api_messages.append({
"role": "user",
"content": history_entry['prompt']
})
api_messages.append({
"role": "assistant",
"content": history_entry['response']
})
api_messages.append({"role": "user", "content": message_content})
# Get effective model ID with :online suffix if enabled
effective_model_id = get_effective_model_id(selected_model["id"], online_mode_enabled)
# Build API params
api_params = {
"model": effective_model_id,
"messages": api_messages,
"stream": STREAM_ENABLED == "on",
# App attribution headers recommended by OpenRouter.
"http_headers": {
"HTTP-Referer": APP_URL,
"X-Title": APP_NAME
}
}
if session_max_token > 0:
api_params["max_tokens"] = session_max_token
if middle_out_enabled:
api_params["transforms"] = ["middle-out"]
# Log API request
file_count = len(file_attachments)
history_messages_count = len(session_history) - memory_start_index if conversation_memory_enabled else 0
memory_status = "ON" if conversation_memory_enabled else "OFF"
online_status = "ON" if online_mode_enabled else "OFF"
app_logger.info(f"API Request: Model '{effective_model_id}' (Online: {online_status}), Prompt length: {len(text_part)} chars, {file_count} file(s) attached, Memory: {memory_status}, History sent: {history_messages_count} messages, Transforms: middle-out {'enabled' if middle_out_enabled else 'disabled'}, App: {APP_NAME} ({APP_URL}).")
# Send and handle response
is_streaming = STREAM_ENABLED == "on"
if is_streaming:
console.print("[bold green]Streaming response...[/] [dim](Press Ctrl+C to cancel)[/]")
if online_mode_enabled:
console.print("[dim cyan]🌐 Online mode active - model has web search access[/]")
console.print("")
else:
console.print("[bold green]Thinking...[/]", end="\r")
start_time = time.time()
try:
response = client.chat.send(**api_params)
app_logger.info(f"API call successful for model '{effective_model_id}'")
except Exception as e:
console.print(f"[bold red]Error sending request: {e}[/]")
app_logger.error(f"API Error: {type(e).__name__}: {e}")
continue
response_time = time.time() - start_time
full_response = ""
if is_streaming:
try:
# Live re-renders the accumulated markdown as chunks arrive.
with Live("", console=console, refresh_per_second=10, auto_refresh=True) as live:
for chunk in response:
if hasattr(chunk, 'error') and chunk.error:
console.print(f"\n[bold red]Stream error: {chunk.error.message}[/]")
app_logger.error(f"Stream error: {chunk.error.message}")
break
if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
full_response += content_chunk
md = Markdown(full_response)
live.update(md)
console.print("")
except KeyboardInterrupt:
# Cancelled mid-stream: partial response is discarded.
console.print("\n[bold yellow]Streaming cancelled![/]")
app_logger.info("Streaming cancelled by user")
continue
else:
full_response = response.choices[0].message.content if response.choices else ""
# Overwrite the "Thinking..." status line with spaces.
console.print(f"\r{' ' * 20}\r", end="")
if full_response:
if not is_streaming:
md = Markdown(full_response)
console.print(Panel(md, title="[bold green]AI Response[/]", title_align="left", border_style="green"))
# NOTE(review): stores the raw user_input (including @file tokens),
# not text_part, so replayed history contains the markers as text.
session_history.append({'prompt': user_input, 'response': full_response})
current_index = len(session_history) - 1
# Process metrics
# Usage attribute names are SDK-specific; fall back to a local
# estimate when cost is not reported.
usage = getattr(response, 'usage', None)
input_tokens = usage.input_tokens if usage and hasattr(usage, 'input_tokens') else 0
output_tokens = usage.output_tokens if usage and hasattr(usage, 'output_tokens') else 0
msg_cost = usage.total_cost_usd if usage and hasattr(usage, 'total_cost_usd') else estimate_cost(input_tokens, output_tokens)
total_input_tokens += input_tokens
total_output_tokens += output_tokens
total_cost += msg_cost
message_count += 1
app_logger.info(f"Response: Tokens - I:{input_tokens} O:{output_tokens} T:{input_tokens + output_tokens}, Cost: ${msg_cost:.4f}, Time: {response_time:.2f}s, Online: {online_mode_enabled}")
# Per-message metrics display
if conversation_memory_enabled:
context_count = len(session_history) - memory_start_index
context_info = f", Context: {context_count} msg(s)" if context_count > 1 else ""
else:
context_info = ", Memory: OFF"
online_info = " 🌐" if online_mode_enabled else ""
console.print(f"\n[dim blue]📊 Metrics: {input_tokens + output_tokens} tokens | ${msg_cost:.4f} | {response_time:.2f}s{context_info}{online_info} | Session: {total_input_tokens + total_output_tokens} tokens | ${total_cost:.4f}[/]")
# Cost and credit alerts
warnings = []
if msg_cost > COST_WARNING_THRESHOLD:
warnings.append(f"High cost alert: This response exceeded configurable threshold ${COST_WARNING_THRESHOLD:.4f}")
credits_data = get_credits(API_KEY, OPENROUTER_BASE_URL)
if credits_data:
warning_alerts = check_credit_alerts(credits_data)
warnings.extend(warning_alerts)
if warnings:
warning_text = ' | '.join(warnings)
console.print(f"[bold red]⚠️ {warning_text}[/]")
app_logger.warning(f"Warnings triggered: {warning_text}")
console.print("")
# Optional clipboard copy; EOF/Ctrl+C here just skips copying.
try:
copy_choice = input("💾 Type 'c' to copy response, or press Enter to continue: ").strip().lower()
if copy_choice == "c":
pyperclip.copy(full_response)
console.print("[bold green]✅ Response copied to clipboard![/]")
except (EOFError, KeyboardInterrupt):
pass
console.print("")
else:
console.print("[bold red]No response received.[/]")
app_logger.error("No response from API")
except KeyboardInterrupt:
console.print("\n[bold yellow]Input interrupted. Continuing...[/]")
app_logger.warning("Input interrupted by Ctrl+C")
continue
except EOFError:
console.print("\n[bold yellow]Goodbye![/]")
total_tokens = total_input_tokens + total_output_tokens
app_logger.info(f"Session ended via EOF. Total messages: {message_count}, Total tokens: {total_tokens}, Total cost: ${total_cost:.4f}")
return
except Exception as e:
console.print(f"[bold red]Error: {e}[/]")
console.print("[bold yellow]Try again or select a model.[/]")
app_logger.error(f"Unexpected error: {type(e).__name__}: {e}")
# Script entry point: clear the terminal, then hand control to the Typer app.
if __name__ == "__main__":
clear_screen()
app()