* fix: warn on provider/model mismatch, surface auth errors (#266) Fixes #266 — WebUI silently ignores provider/model selection mismatch. The problem: selecting an OpenRouter (or Anthropic/OpenAI) model while Hermes is configured for a different provider (e.g. local Ollama) sends the request to the wrong endpoint, which returns a 401 Unauthorized error with no UI indication of why. Three-layer fix: 1. api/streaming.py — detect 401/auth errors explicitly Added is_auth_error detection covering '401', 'AuthenticationError', 'authentication', 'unauthorized', 'invalid api key', and the specific Ollama error string 'no cookie auth credentials'. Auth errors emit apperror with type='auth_mismatch' and a hint pointing to 'hermes model'. 2. static/ui.js — expose active_provider and warn on selection - populateModelDropdown() stores data.active_provider from /api/models as window._activeProvider (the field was already in the response but the frontend never used it) - New _checkProviderMismatch(modelId) helper: compares the selected model's slash-prefix (e.g. 'openai/' from 'openai/gpt-4o') against the active provider. Skips the check for 'openrouter' and 'custom' to avoid false positives on configs that legitimately route any model. 3. static/boot.js — warn on model dropdown change modelSelect.onchange calls _checkProviderMismatch() and shows a toast when the selected model looks incompatible with the configured provider. 4. static/messages.js — distinct UI label for auth errors apperror handler now distinguishes type='auth_mismatch' and shows 'Provider mismatch' as the error label instead of 'Error'. 5. static/i18n.js — provider_mismatch_warning and provider_mismatch_label keys added to all 5 locales (en, es, de, zh-Hans, zh-Hant). Tests: 21 new tests in tests/test_provider_mismatch.py covering all five change areas. 679/679 total pass (658 baseline + 21 new). * fix: t() call args spread + use i18n label for auth mismatch 1. 
ui.js: _checkProviderMismatch passed [modelId, ap] as a single array arg to t(). Since t(key, ...args) spreads, the function received the array as m and undefined as p. Fixed to pass as separate args: t('provider_mismatch_warning', modelId, ap). 2. messages.js: 'Provider mismatch' label was hardcoded instead of using t('provider_mismatch_label'). Now uses the i18n key with fallback for when t() isn't available. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Nathan Esquenazi <nesquena@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
544 lines
25 KiB
Python
544 lines
25 KiB
Python
"""
|
|
Hermes Web UI -- SSE streaming engine and agent thread runner.
|
|
Includes Sprint 10 cancel support via CANCEL_FLAGS.
|
|
"""
|
|
import json
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from pathlib import Path
|
|
|
|
from api.config import (
|
|
STREAMS, STREAMS_LOCK, CANCEL_FLAGS, AGENT_INSTANCES, CLI_TOOLSETS,
|
|
LOCK, SESSIONS, SESSION_DIR,
|
|
_get_session_agent_lock, _set_thread_env, _clear_thread_env,
|
|
resolve_model_provider,
|
|
)
|
|
from api.helpers import redact_session_data
|
|
|
|
# Global lock for os.environ writes. Per-session locks (_agent_lock) prevent
# concurrent runs of the SAME session, but two DIFFERENT sessions can still
# interleave their os.environ writes. This global lock serializes the env
# save/restore around the entire agent run.
# NOTE: os.environ is process-global, so this lock must be held for every
# save/mutate/restore of the env keys touched in _run_agent_streaming().
_ENV_LOCK = threading.Lock()
|
|
|
|
# Lazy import to avoid circular deps -- hermes-agent is on sys.path via api/config.py
try:
    from run_agent import AIAgent
except ImportError:
    # Leave a None placeholder; _get_ai_agent() retries this import later,
    # after auto_install_agent_deps() may have installed missing packages.
    AIAgent = None
|
|
|
|
def _get_ai_agent():
    """Return the AIAgent class, retrying the import if it failed at load time.

    auto_install_agent_deps() in server.py may install missing packages after
    this module is first imported (common in Docker with a volume-mounted agent).
    Retrying the import here picks up those newly installed packages without
    requiring a server restart.
    """
    global AIAgent
    # Fast path: the module-level import (or a previous retry) already succeeded.
    if AIAgent is not None:
        return AIAgent
    try:
        from run_agent import AIAgent as _loaded  # noqa: PLC0415
    except ImportError:
        # Dependencies still missing — caller sees None, same as before.
        return AIAgent
    AIAgent = _loaded
    return AIAgent
|
|
from api.models import get_session, title_from
|
|
from api.workspace import set_last_workspace
|
|
|
|
# Fields that are safe to send to LLM provider APIs.
|
|
# Everything else (attachments, timestamp, _ts, etc.) is display-only
|
|
# metadata added by the webui and must be stripped before the API call.
|
|
_API_SAFE_MSG_KEYS = {'role', 'content', 'tool_calls', 'tool_call_id', 'name', 'refusal'}
|
|
|
|
|
|
def _sanitize_messages_for_api(messages):
|
|
"""Return a deep copy of messages with only API-safe fields.
|
|
|
|
The webui stores extra metadata on messages (attachments, timestamp, _ts)
|
|
for display purposes. Some providers (e.g. Z.AI/GLM) reject unknown fields
|
|
instead of ignoring them, causing HTTP 400 errors on subsequent messages.
|
|
"""
|
|
clean = []
|
|
for msg in messages:
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
sanitized = {k: v for k, v in msg.items() if k in _API_SAFE_MSG_KEYS}
|
|
if sanitized.get('role'):
|
|
clean.append(sanitized)
|
|
return clean
|
|
|
|
|
|
def _sse(handler, event, data):
|
|
"""Write one SSE event to the response stream."""
|
|
payload = f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
|
handler.wfile.write(payload.encode('utf-8'))
|
|
handler.wfile.flush()
|
|
|
|
|
|
def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, attachments=None):
    """Run agent in background thread, writing SSE events to STREAMS[stream_id].

    Parameters:
        session_id: key of the chat session being driven.
        msg_text: full user message (may carry an '[Attached files: ...]' suffix).
        model: model id selected in the UI; resolved via resolve_model_provider().
        workspace: directory the agent should use as its working directory.
        stream_id: key into STREAMS / CANCEL_FLAGS / AGENT_INSTANCES for this run.
        attachments: optional list of attachment descriptors to tag on the
            matching user message for display on reload.

    Emits SSE events via the stream queue: token, tool, approval, compressed,
    cancel, apperror, done. Always clears thread-local env context and the
    stream-keyed registries on exit.
    """
    q = STREAMS.get(stream_id)
    if q is None:
        return  # stream was already torn down; nothing to write to

    # Sprint 10: create a cancel event for this stream
    cancel_event = threading.Event()
    with STREAMS_LOCK:
        CANCEL_FLAGS[stream_id] = cancel_event

    def put(event, data):
        # If cancelled, drop all further events except the cancel event itself
        if cancel_event.is_set() and event not in ('cancel', 'error'):
            return
        try:
            # put_nowait: never block the agent thread on a full/closed queue
            q.put_nowait((event, data))
        except Exception:
            pass

    try:
        s = get_session(session_id)
        s.workspace = str(Path(workspace).expanduser().resolve())
        s.model = model

        # NOTE(review): lock object is fetched but never acquired here —
        # presumably acquisition happens elsewhere; confirm intent.
        _agent_lock = _get_session_agent_lock(session_id)
        # TD1: set thread-local env context so concurrent sessions don't clobber globals
        # Check for pre-flight cancel (user cancelled before agent even started)
        if cancel_event.is_set():
            put('cancel', {'message': 'Cancelled before start'})
            return

        # Resolve profile home for this agent run (snapshot at start)
        try:
            from api.profiles import get_active_hermes_home
            _profile_home = str(get_active_hermes_home())
        except ImportError:
            _profile_home = os.environ.get('HERMES_HOME', '')

        _set_thread_env(
            TERMINAL_CWD=str(s.workspace),
            HERMES_EXEC_ASK='1',
            HERMES_SESSION_KEY=session_id,
            HERMES_HOME=_profile_home,
        )
        # Still set process-level env as fallback for tools that bypass thread-local
        # Acquire lock only for the env mutation, then release before the agent runs.
        # The finally block re-acquires to restore — keeping critical sections short
        # and preventing a deadlock where the restore would re-enter the same lock.
        with _ENV_LOCK:
            # Snapshot previous values so the finally block can restore exactly.
            old_cwd = os.environ.get('TERMINAL_CWD')
            old_exec_ask = os.environ.get('HERMES_EXEC_ASK')
            old_session_key = os.environ.get('HERMES_SESSION_KEY')
            old_hermes_home = os.environ.get('HERMES_HOME')
            os.environ['TERMINAL_CWD'] = str(s.workspace)
            os.environ['HERMES_EXEC_ASK'] = '1'
            os.environ['HERMES_SESSION_KEY'] = session_id
            if _profile_home:
                os.environ['HERMES_HOME'] = _profile_home
        # Lock released — agent runs without holding it
        # Register a gateway-style notify callback so the approval system can
        # push the `approval` SSE event the moment a dangerous command is
        # detected, without waiting for the next on_tool() poll cycle.
        # Without this, the agent thread blocks inside the terminal tool
        # waiting for approval that the UI never knew to ask for, leaving
        # the chat stuck in "Thinking…" forever.
        _approval_registered = False
        _unreg_notify = None
        try:
            from tools.approval import (
                register_gateway_notify as _reg_notify,
                unregister_gateway_notify as _unreg_notify,
            )

            def _approval_notify_cb(approval_data):
                put('approval', approval_data)

            _reg_notify(session_id, _approval_notify_cb)
            _approval_registered = True
        except ImportError:
            pass  # approval module not available — fall back to polling

        try:
            def on_token(text):
                if text is None:
                    return  # end-of-stream sentinel
                put('token', {'text': text})

            def on_tool(name, preview, args):
                # Truncate argument values so tool cards stay small on the wire.
                args_snap = {}
                if isinstance(args, dict):
                    for k, v in list(args.items())[:4]:
                        s2 = str(v); args_snap[k] = s2[:120]+('...' if len(s2)>120 else '')
                put('tool', {'name': name, 'preview': preview, 'args': args_snap})
                # Fallback: poll for pending approval in case notify_cb wasn't
                # registered (e.g. older approval module without gateway support).
                try:
                    from tools.approval import has_pending as _has_pending, _pending, _lock
                    if _has_pending(session_id):
                        with _lock:
                            p = dict(_pending.get(session_id, {}))
                        if p:
                            put('approval', p)
                except ImportError:
                    pass

            _AIAgent = _get_ai_agent()
            if _AIAgent is None:
                raise ImportError("AIAgent not available -- check that hermes-agent is on sys.path")
            resolved_model, resolved_provider, resolved_base_url = resolve_model_provider(model)

            # Resolve API key via Hermes runtime provider (matches gateway behaviour).
            # Pass the resolved provider so non-default providers get their own credentials.
            resolved_api_key = None
            try:
                from hermes_cli.runtime_provider import resolve_runtime_provider
                _rt = resolve_runtime_provider(requested=resolved_provider)
                resolved_api_key = _rt.get("api_key")
                # Only fill in provider/base_url when resolve_model_provider left them empty.
                if not resolved_provider:
                    resolved_provider = _rt.get("provider")
                if not resolved_base_url:
                    resolved_base_url = _rt.get("base_url")
            except Exception as _e:
                print(f"[webui] WARNING: resolve_runtime_provider failed: {_e}", flush=True)

            # Read per-profile config at call time (not module-level snapshot)
            from api.config import get_config as _get_config
            _cfg = _get_config()

            # Per-profile toolsets (fall back to module-level CLI_TOOLSETS)
            _pt = _cfg.get('platform_toolsets', {})
            _toolsets = _pt.get('cli', CLI_TOOLSETS) if isinstance(_pt, dict) else CLI_TOOLSETS

            # Fallback model from profile config (e.g. for rate-limit recovery)
            _fallback = _cfg.get('fallback_model') or None
            if _fallback:
                # Resolve the fallback through our provider logic too
                fb_model = _fallback.get('model', '')
                fb_provider = _fallback.get('provider', '')
                fb_base_url = _fallback.get('base_url')
                _fallback_resolved = {
                    'model': fb_model,
                    'provider': fb_provider,
                    'base_url': fb_base_url,
                }
            else:
                _fallback_resolved = None

            agent = _AIAgent(
                model=resolved_model,
                provider=resolved_provider,
                base_url=resolved_base_url,
                api_key=resolved_api_key,
                platform='cli',
                quiet_mode=True,
                enabled_toolsets=_toolsets,
                fallback_model=_fallback_resolved,
                session_id=session_id,
                stream_delta_callback=on_token,
                tool_progress_callback=on_tool,
            )

            # Store agent instance for cancel/interrupt propagation
            with STREAMS_LOCK:
                AGENT_INSTANCES[stream_id] = agent
            # Check if cancel was requested during agent initialization
            if stream_id in CANCEL_FLAGS and CANCEL_FLAGS[stream_id].is_set():
                # Cancel arrived during agent creation - interrupt immediately
                try:
                    agent.interrupt("Cancelled before start")
                except Exception:
                    pass
                put('cancel', {'message': 'Cancelled by user'})
                return

            # Prepend workspace context so the agent always knows which directory
            # to use for file operations, regardless of session age or AGENTS.md defaults.
            workspace_ctx = f"[Workspace: {s.workspace}]\n"
            workspace_system_msg = (
                f"Active workspace at session start: {s.workspace}\n"
                "Every user message is prefixed with [Workspace: /absolute/path] indicating the "
                "workspace the user has selected in the web UI at the time they sent that message. "
                "This tag is the single authoritative source of the active workspace and updates "
                "with every message. It overrides any prior workspace mentioned in this system "
                "prompt, memory, or conversation history. Always use the value from the most recent "
                "[Workspace: ...] tag as your default working directory for ALL file operations: "
                "write_file, read_file, search_files, terminal workdir, and patch. "
                "Never fall back to a hardcoded path when this tag is present."
            )
            # Resolve personality prompt from config.yaml agent.personalities
            # (matches hermes-agent CLI behavior — passes via ephemeral_system_prompt)
            _personality_prompt = None
            _pname = getattr(s, 'personality', None)
            if _pname:
                _agent_cfg = _cfg.get('agent', {})
                _personalities = _agent_cfg.get('personalities', {})
                if isinstance(_personalities, dict) and _pname in _personalities:
                    _pval = _personalities[_pname]
                    if isinstance(_pval, dict):
                        # Dict form: combine system_prompt/prompt with optional tone/style.
                        _parts = [_pval.get('system_prompt', '') or _pval.get('prompt', '')]
                        if _pval.get('tone'):
                            _parts.append(f'Tone: {_pval["tone"]}')
                        if _pval.get('style'):
                            _parts.append(f'Style: {_pval["style"]}')
                        _personality_prompt = '\n'.join(p for p in _parts if p)
                    else:
                        # String form: use verbatim.
                        _personality_prompt = str(_pval)
            # Pass personality via ephemeral_system_prompt (agent's own mechanism)
            if _personality_prompt:
                agent.ephemeral_system_prompt = _personality_prompt
            result = agent.run_conversation(
                user_message=workspace_ctx + msg_text,
                system_message=workspace_system_msg,
                conversation_history=_sanitize_messages_for_api(s.messages),
                task_id=session_id,
                persist_user_message=msg_text,
            )
            s.messages = result.get('messages') or s.messages

            # ── Handle context compression side effects ──
            # If compression fired inside run_conversation, the agent may have
            # rotated its session_id. Detect and fix the mismatch so the WebUI
            # continues writing to the correct session file.
            _agent_sid = getattr(agent, 'session_id', None)
            _compressed = False
            if _agent_sid and _agent_sid != session_id:
                old_sid = session_id
                new_sid = _agent_sid
                # Rename the session file
                old_path = SESSION_DIR / f'{old_sid}.json'
                new_path = SESSION_DIR / f'{new_sid}.json'
                s.session_id = new_sid
                with LOCK:
                    if old_sid in SESSIONS:
                        SESSIONS[new_sid] = SESSIONS.pop(old_sid)
                if old_path.exists() and not new_path.exists():
                    try:
                        old_path.rename(new_path)
                    except OSError:
                        pass
                _compressed = True
            # Also detect compression via the result dict or compressor state
            if not _compressed:
                _compressor = getattr(agent, 'context_compressor', None)
                if _compressor and getattr(_compressor, 'compression_count', 0) > 0:
                    _compressed = True
            # Notify the frontend that compression happened
            if _compressed:
                put('compressed', {
                    'message': 'Context auto-compressed to continue the conversation',
                })

            # Stamp 'timestamp' on any messages that don't have one yet
            _now = time.time()
            for _m in s.messages:
                if isinstance(_m, dict) and not _m.get('timestamp') and not _m.get('_ts'):
                    _m['timestamp'] = int(_now)
            s.title = title_from(s.messages, s.title)
            # Read token/cost usage from the agent object (if available)
            input_tokens = getattr(agent, 'session_prompt_tokens', 0) or 0
            output_tokens = getattr(agent, 'session_completion_tokens', 0) or 0
            estimated_cost = getattr(agent, 'session_estimated_cost_usd', None)
            s.input_tokens = (s.input_tokens or 0) + input_tokens
            s.output_tokens = (s.output_tokens or 0) + output_tokens
            if estimated_cost:
                s.estimated_cost = (s.estimated_cost or 0) + estimated_cost
            # Extract tool call metadata grouped by assistant message index
            # Each tool call gets assistant_msg_idx so the client can render
            # cards inline with the assistant bubble that triggered them.
            tool_calls = []
            pending_names = {}     # tool_call_id -> name
            pending_args = {}      # tool_call_id -> args dict
            pending_asst_idx = {}  # tool_call_id -> index in s.messages
            for msg_idx, m in enumerate(s.messages):
                if m.get('role') == 'assistant':
                    c = m.get('content', '')
                    # Anthropic format: content is a list with type=tool_use blocks
                    if isinstance(c, list):
                        for p in c:
                            if isinstance(p, dict) and p.get('type') == 'tool_use':
                                tid = p.get('id', '')
                                pending_names[tid] = p.get('name', '')
                                pending_args[tid] = p.get('input', {})
                                pending_asst_idx[tid] = msg_idx
                    # OpenAI format: tool_calls as top-level field on the message
                    for tc in m.get('tool_calls', []):
                        if not isinstance(tc, dict):
                            continue
                        tid = tc.get('id', '') or tc.get('call_id', '')
                        fn = tc.get('function', {})
                        name = fn.get('name', '')
                        try:
                            import json as _j
                            args = _j.loads(fn.get('arguments', '{}') or '{}')
                        except Exception:
                            args = {}
                        if tid and name:
                            pending_names[tid] = name
                            pending_args[tid] = args
                            pending_asst_idx[tid] = msg_idx
                elif m.get('role') == 'tool':
                    tid = m.get('tool_call_id') or m.get('tool_use_id', '')
                    name = pending_names.get(tid, '')
                    if not name or name == 'tool':
                        continue  # skip unresolvable tool entries
                    asst_idx = pending_asst_idx.get(tid, -1)
                    args = pending_args.get(tid, {})
                    raw = str(m.get('content', ''))
                    try:
                        rd = json.loads(raw)
                        snippet = str(rd.get('output') or rd.get('result') or rd.get('error') or raw)[:200]
                    except Exception:
                        snippet = raw[:200]
                    # Truncate args values for storage
                    args_snap = {}
                    if isinstance(args, dict):
                        for k, v in list(args.items())[:6]:
                            s2 = str(v)
                            args_snap[k] = s2[:120] + ('...' if len(s2) > 120 else '')
                    tool_calls.append({
                        'name': name, 'snippet': snippet, 'tid': tid,
                        'assistant_msg_idx': asst_idx, 'args': args_snap,
                    })
            s.tool_calls = tool_calls
            # Tag the matching user message with attachment filenames for display on reload
            # Only tag a user message whose content relates to this turn's text
            # (msg_text is the full message including the [Attached files: ...] suffix)
            if attachments:
                for m in reversed(s.messages):
                    if m.get('role') == 'user':
                        content = str(m.get('content', ''))
                        # Match if content is part of the sent message or vice-versa
                        base_text = msg_text.split('\n\n[Attached files:')[0].strip()
                        if base_text[:60] in content or content[:60] in msg_text:
                            m['attachments'] = attachments
                        break
            s.save()
            # Sync to state.db for /insights (opt-in setting)
            try:
                from api.config import load_settings as _load_settings
                if _load_settings().get('sync_to_insights'):
                    from api.state_sync import sync_session_usage
                    sync_session_usage(
                        session_id=s.session_id,
                        input_tokens=s.input_tokens or 0,
                        output_tokens=s.output_tokens or 0,
                        estimated_cost=s.estimated_cost,
                        model=model,
                        title=s.title,
                        message_count=len(s.messages),
                    )
            except Exception:
                pass  # never crash the stream for sync failures
            usage = {'input_tokens': input_tokens, 'output_tokens': output_tokens, 'estimated_cost': estimated_cost}
            # Include context window data from the agent's compressor for the UI indicator
            _cc = getattr(agent, 'context_compressor', None)
            if _cc:
                usage['context_length'] = getattr(_cc, 'context_length', 0) or 0
                usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0
                usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0
            raw_session = s.compact() | {'messages': s.messages, 'tool_calls': tool_calls}
            put('done', {'session': redact_session_data(raw_session), 'usage': usage})
        finally:
            # Unregister the gateway approval callback and unblock any threads
            # still waiting on approval (e.g. stream cancelled mid-approval).
            if _approval_registered and _unreg_notify is not None:
                try:
                    _unreg_notify(session_id)
                except Exception:
                    pass
            # Restore the exact pre-run os.environ state under the global lock.
            with _ENV_LOCK:
                if old_cwd is None: os.environ.pop('TERMINAL_CWD', None)
                else: os.environ['TERMINAL_CWD'] = old_cwd
                if old_exec_ask is None: os.environ.pop('HERMES_EXEC_ASK', None)
                else: os.environ['HERMES_EXEC_ASK'] = old_exec_ask
                if old_session_key is None: os.environ.pop('HERMES_SESSION_KEY', None)
                else: os.environ['HERMES_SESSION_KEY'] = old_session_key
                if old_hermes_home is None: os.environ.pop('HERMES_HOME', None)
                else: os.environ['HERMES_HOME'] = old_hermes_home

    except Exception as e:
        print('[webui] stream error:\n' + traceback.format_exc(), flush=True)
        err_str = str(e)
        # Detect rate limit errors specifically so the client can show a helpful card
        # rather than the generic "Connection lost" message
        is_rate_limit = 'rate limit' in err_str.lower() or '429' in err_str or 'RateLimitError' in type(e).__name__
        # Auth failures (401 etc.) usually mean a provider/model mismatch —
        # surface them distinctly so the UI can explain, instead of a generic error.
        is_auth_error = (
            '401' in err_str
            or 'AuthenticationError' in type(e).__name__
            or 'authentication' in err_str.lower()
            or 'unauthorized' in err_str.lower()
            or 'invalid api key' in err_str.lower()
            or 'no cookie auth credentials' in err_str.lower()
        )
        if is_rate_limit:
            put('apperror', {
                'message': err_str,
                'type': 'rate_limit',
                'hint': 'Rate limit reached. The fallback model (if configured) was also exhausted. Try again in a moment.',
            })
        elif is_auth_error:
            put('apperror', {
                'message': err_str,
                'type': 'auth_mismatch',
                'hint': (
                    'The selected model may not be supported by your configured provider. '
                    'Run `hermes model` in your terminal to switch providers, then restart the WebUI.'
                ),
            })
        else:
            put('apperror', {'message': err_str, 'type': 'error'})
    finally:
        _clear_thread_env()  # TD1: always clear thread-local context
        with STREAMS_LOCK:
            STREAMS.pop(stream_id, None)
            CANCEL_FLAGS.pop(stream_id, None)
            AGENT_INSTANCES.pop(stream_id, None)  # Clean up agent instance reference
|
|
|
|
# ============================================================
|
|
# SECTION: HTTP Request Handler
|
|
# do_GET: read-only API endpoints + SSE stream + static HTML
|
|
# do_POST: mutating endpoints (session CRUD, chat, upload, approval)
|
|
# Routing is a flat if/elif chain. See ARCHITECTURE.md section 4.1.
|
|
# ============================================================
|
|
|
|
|
|
def cancel_stream(stream_id: str) -> bool:
    """Signal an in-flight stream to cancel.

    Sets the WebUI-layer cancel flag, interrupts the live AIAgent instance
    (if it has been registered yet), and pushes a 'cancel' sentinel into the
    stream queue so the SSE handler wakes up immediately.

    Args:
        stream_id: key into STREAMS / CANCEL_FLAGS / AGENT_INSTANCES.

    Returns:
        True if the stream existed (cancel signalled), False otherwise.
    """
    # Single local import (was duplicated in two branches); lazy %-args below
    # avoid formatting work when DEBUG logging is disabled.
    import logging
    logger = logging.getLogger(__name__)

    with STREAMS_LOCK:
        if stream_id not in STREAMS:
            return False

        # Set WebUI layer cancel flag
        flag = CANCEL_FLAGS.get(stream_id)
        if flag:
            flag.set()

        # Interrupt the AIAgent instance to stop tool execution
        agent = AGENT_INSTANCES.get(stream_id)
        if agent:
            try:
                agent.interrupt("Cancelled by user")
            except Exception as e:
                # Log but don't block the cancel flow
                logger.debug("Failed to interrupt agent for stream %s: %s", stream_id, e)
        else:
            # Agent not yet stored - cancel_event flag will be checked by agent thread
            logger.debug(
                "Cancel requested for stream %s before agent ready - "
                "cancel_event flag set, will be checked on agent startup",
                stream_id,
            )

        # Put a cancel sentinel into the queue so the SSE handler wakes up
        q = STREAMS.get(stream_id)
        if q:
            try:
                q.put_nowait(('cancel', {'message': 'Cancelled by user'}))
            except Exception:
                pass
        return True
|