Files
webui/api/streaming.py
Nathan Esquenazi 4422a87de9 fix: resolve _ENV_LOCK deadlock that blocks chat after first message
The v0.39.0 security sprint introduced _ENV_LOCK to protect env var
mutations in the streaming path. The implementation held the lock for
the entire agent run (potentially minutes), then tried to re-acquire
it in the finally block — a guaranteed deadlock on any non-reentrant
threading.Lock().

Result: first message completes (done event fires before finally hits),
but the lock is never released. Every subsequent chat/start POST blocks
forever waiting for that lock.

Fix: narrow the lock scope to just the env mutation. Set the vars inside
the with block, then let the lock release before the agent starts. The
finally block re-acquires cleanly since it no longer re-enters an
already-held lock.

No logic change — only the critical section boundary moves.
2026-04-08 14:22:39 +00:00

439 lines
21 KiB
Python

"""
Hermes Web UI -- SSE streaming engine and agent thread runner.
Includes Sprint 10 cancel support via CANCEL_FLAGS.
"""
import json
import os
import queue
import threading
import time
import traceback
from pathlib import Path
from api.config import (
STREAMS, STREAMS_LOCK, CANCEL_FLAGS, CLI_TOOLSETS,
LOCK, SESSIONS, SESSION_DIR,
_get_session_agent_lock, _set_thread_env, _clear_thread_env,
resolve_model_provider,
)
# Global lock for os.environ writes. Per-session locks (_agent_lock) prevent
# concurrent runs of the SAME session, but two DIFFERENT sessions can still
# interleave their os.environ writes. This global lock serializes the env
# save/restore around the entire agent run.
_ENV_LOCK = threading.Lock()
# Lazy import to avoid circular deps -- hermes-agent is on sys.path via api/config.py
try:
from run_agent import AIAgent
except ImportError:
AIAgent = None
from api.models import get_session, title_from
from api.workspace import set_last_workspace
# Fields that are safe to send to LLM provider APIs.
# Everything else (attachments, timestamp, _ts, etc.) is display-only
# metadata added by the webui and must be stripped before the API call.
_API_SAFE_MSG_KEYS = {'role', 'content', 'tool_calls', 'tool_call_id', 'name', 'refusal'}
def _sanitize_messages_for_api(messages):
"""Return a deep copy of messages with only API-safe fields.
The webui stores extra metadata on messages (attachments, timestamp, _ts)
for display purposes. Some providers (e.g. Z.AI/GLM) reject unknown fields
instead of ignoring them, causing HTTP 400 errors on subsequent messages.
"""
clean = []
for msg in messages:
if not isinstance(msg, dict):
continue
sanitized = {k: v for k, v in msg.items() if k in _API_SAFE_MSG_KEYS}
if sanitized.get('role'):
clean.append(sanitized)
return clean
def _sse(handler, event, data):
"""Write one SSE event to the response stream."""
payload = f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
handler.wfile.write(payload.encode('utf-8'))
handler.wfile.flush()
def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, attachments=None):
"""Run agent in background thread, writing SSE events to STREAMS[stream_id]."""
q = STREAMS.get(stream_id)
if q is None:
return
# Sprint 10: create a cancel event for this stream
cancel_event = threading.Event()
with STREAMS_LOCK:
CANCEL_FLAGS[stream_id] = cancel_event
def put(event, data):
# If cancelled, drop all further events except the cancel event itself
if cancel_event.is_set() and event not in ('cancel', 'error'):
return
try:
q.put_nowait((event, data))
except Exception:
pass
try:
s = get_session(session_id)
s.workspace = str(Path(workspace).expanduser().resolve())
s.model = model
_agent_lock = _get_session_agent_lock(session_id)
# TD1: set thread-local env context so concurrent sessions don't clobber globals
# Check for pre-flight cancel (user cancelled before agent even started)
if cancel_event.is_set():
put('cancel', {'message': 'Cancelled before start'})
return
# Resolve profile home for this agent run (snapshot at start)
try:
from api.profiles import get_active_hermes_home
_profile_home = str(get_active_hermes_home())
except ImportError:
_profile_home = os.environ.get('HERMES_HOME', '')
_set_thread_env(
TERMINAL_CWD=str(s.workspace),
HERMES_EXEC_ASK='1',
HERMES_SESSION_KEY=session_id,
HERMES_HOME=_profile_home,
)
# Still set process-level env as fallback for tools that bypass thread-local
# Acquire lock only for the env mutation, then release before the agent runs.
# The finally block re-acquires to restore — keeping critical sections short
# and preventing a deadlock where the restore would re-enter the same lock.
with _ENV_LOCK:
old_cwd = os.environ.get('TERMINAL_CWD')
old_exec_ask = os.environ.get('HERMES_EXEC_ASK')
old_session_key = os.environ.get('HERMES_SESSION_KEY')
old_hermes_home = os.environ.get('HERMES_HOME')
os.environ['TERMINAL_CWD'] = str(s.workspace)
os.environ['HERMES_EXEC_ASK'] = '1'
os.environ['HERMES_SESSION_KEY'] = session_id
if _profile_home:
os.environ['HERMES_HOME'] = _profile_home
# Lock released — agent runs without holding it
try:
def on_token(text):
if text is None:
return # end-of-stream sentinel
put('token', {'text': text})
def on_tool(name, preview, args):
args_snap = {}
if isinstance(args, dict):
for k, v in list(args.items())[:4]:
s2 = str(v); args_snap[k] = s2[:120]+('...' if len(s2)>120 else '')
put('tool', {'name': name, 'preview': preview, 'args': args_snap})
# also check for pending approval and surface it immediately
from tools.approval import has_pending as _has_pending, _pending, _lock
if _has_pending(session_id):
with _lock:
p = dict(_pending.get(session_id, {}))
if p:
put('approval', p)
if AIAgent is None:
raise ImportError("AIAgent not available -- check that hermes-agent is on sys.path")
resolved_model, resolved_provider, resolved_base_url = resolve_model_provider(model)
# Resolve API key via Hermes runtime provider (matches gateway behaviour).
# Pass the resolved provider so non-default providers get their own credentials.
resolved_api_key = None
try:
from hermes_cli.runtime_provider import resolve_runtime_provider
_rt = resolve_runtime_provider(requested=resolved_provider)
resolved_api_key = _rt.get("api_key")
if not resolved_provider:
resolved_provider = _rt.get("provider")
if not resolved_base_url:
resolved_base_url = _rt.get("base_url")
except Exception as _e:
print(f"[webui] WARNING: resolve_runtime_provider failed: {_e}", flush=True)
# Read per-profile config at call time (not module-level snapshot)
from api.config import get_config as _get_config
_cfg = _get_config()
# Per-profile toolsets (fall back to module-level CLI_TOOLSETS)
_pt = _cfg.get('platform_toolsets', {})
_toolsets = _pt.get('cli', CLI_TOOLSETS) if isinstance(_pt, dict) else CLI_TOOLSETS
# Fallback model from profile config (e.g. for rate-limit recovery)
_fallback = _cfg.get('fallback_model') or None
if _fallback:
# Resolve the fallback through our provider logic too
fb_model = _fallback.get('model', '')
fb_provider = _fallback.get('provider', '')
fb_base_url = _fallback.get('base_url')
_fallback_resolved = {
'model': fb_model,
'provider': fb_provider,
'base_url': fb_base_url,
}
else:
_fallback_resolved = None
agent = AIAgent(
model=resolved_model,
provider=resolved_provider,
base_url=resolved_base_url,
api_key=resolved_api_key,
platform='cli',
quiet_mode=True,
enabled_toolsets=_toolsets,
fallback_model=_fallback_resolved,
session_id=session_id,
stream_delta_callback=on_token,
tool_progress_callback=on_tool,
)
# Prepend workspace context so the agent always knows which directory
# to use for file operations, regardless of session age or AGENTS.md defaults.
workspace_ctx = f"[Workspace: {s.workspace}]\n"
workspace_system_msg = (
f"Active workspace at session start: {s.workspace}\n"
"Every user message is prefixed with [Workspace: /absolute/path] indicating the "
"workspace the user has selected in the web UI at the time they sent that message. "
"This tag is the single authoritative source of the active workspace and updates "
"with every message. It overrides any prior workspace mentioned in this system "
"prompt, memory, or conversation history. Always use the value from the most recent "
"[Workspace: ...] tag as your default working directory for ALL file operations: "
"write_file, read_file, search_files, terminal workdir, and patch. "
"Never fall back to a hardcoded path when this tag is present."
)
# Resolve personality prompt from config.yaml agent.personalities
# (matches hermes-agent CLI behavior — passes via ephemeral_system_prompt)
_personality_prompt = None
_pname = getattr(s, 'personality', None)
if _pname:
_agent_cfg = _cfg.get('agent', {})
_personalities = _agent_cfg.get('personalities', {})
if isinstance(_personalities, dict) and _pname in _personalities:
_pval = _personalities[_pname]
if isinstance(_pval, dict):
_parts = [_pval.get('system_prompt', '') or _pval.get('prompt', '')]
if _pval.get('tone'):
_parts.append(f'Tone: {_pval["tone"]}')
if _pval.get('style'):
_parts.append(f'Style: {_pval["style"]}')
_personality_prompt = '\n'.join(p for p in _parts if p)
else:
_personality_prompt = str(_pval)
# Pass personality via ephemeral_system_prompt (agent's own mechanism)
if _personality_prompt:
agent.ephemeral_system_prompt = _personality_prompt
result = agent.run_conversation(
user_message=workspace_ctx + msg_text,
system_message=workspace_system_msg,
conversation_history=_sanitize_messages_for_api(s.messages),
task_id=session_id,
persist_user_message=msg_text,
)
s.messages = result.get('messages') or s.messages
# ── Handle context compression side effects ──
# If compression fired inside run_conversation, the agent may have
# rotated its session_id. Detect and fix the mismatch so the WebUI
# continues writing to the correct session file.
_agent_sid = getattr(agent, 'session_id', None)
_compressed = False
if _agent_sid and _agent_sid != session_id:
old_sid = session_id
new_sid = _agent_sid
# Rename the session file
old_path = SESSION_DIR / f'{old_sid}.json'
new_path = SESSION_DIR / f'{new_sid}.json'
s.session_id = new_sid
with LOCK:
if old_sid in SESSIONS:
SESSIONS[new_sid] = SESSIONS.pop(old_sid)
if old_path.exists() and not new_path.exists():
try:
old_path.rename(new_path)
except OSError:
pass
_compressed = True
# Also detect compression via the result dict or compressor state
if not _compressed:
_compressor = getattr(agent, 'context_compressor', None)
if _compressor and getattr(_compressor, 'compression_count', 0) > 0:
_compressed = True
# Notify the frontend that compression happened
if _compressed:
put('compressed', {
'message': 'Context auto-compressed to continue the conversation',
})
# Stamp 'timestamp' on any messages that don't have one yet
_now = time.time()
for _m in s.messages:
if isinstance(_m, dict) and not _m.get('timestamp') and not _m.get('_ts'):
_m['timestamp'] = int(_now)
s.title = title_from(s.messages, s.title)
# Read token/cost usage from the agent object (if available)
input_tokens = getattr(agent, 'session_prompt_tokens', 0) or 0
output_tokens = getattr(agent, 'session_completion_tokens', 0) or 0
estimated_cost = getattr(agent, 'session_estimated_cost_usd', None)
s.input_tokens = (s.input_tokens or 0) + input_tokens
s.output_tokens = (s.output_tokens or 0) + output_tokens
if estimated_cost:
s.estimated_cost = (s.estimated_cost or 0) + estimated_cost
# Extract tool call metadata grouped by assistant message index
# Each tool call gets assistant_msg_idx so the client can render
# cards inline with the assistant bubble that triggered them.
tool_calls = []
pending_names = {} # tool_call_id -> name
pending_args = {} # tool_call_id -> args dict
pending_asst_idx = {} # tool_call_id -> index in s.messages
for msg_idx, m in enumerate(s.messages):
if m.get('role') == 'assistant':
c = m.get('content', '')
# Anthropic format: content is a list with type=tool_use blocks
if isinstance(c, list):
for p in c:
if isinstance(p, dict) and p.get('type') == 'tool_use':
tid = p.get('id', '')
pending_names[tid] = p.get('name', '')
pending_args[tid] = p.get('input', {})
pending_asst_idx[tid] = msg_idx
# OpenAI format: tool_calls as top-level field on the message
for tc in m.get('tool_calls', []):
if not isinstance(tc, dict):
continue
tid = tc.get('id', '') or tc.get('call_id', '')
fn = tc.get('function', {})
name = fn.get('name', '')
try:
import json as _j
args = _j.loads(fn.get('arguments', '{}') or '{}')
except Exception:
args = {}
if tid and name:
pending_names[tid] = name
pending_args[tid] = args
pending_asst_idx[tid] = msg_idx
elif m.get('role') == 'tool':
tid = m.get('tool_call_id') or m.get('tool_use_id', '')
name = pending_names.get(tid, '')
if not name or name == 'tool':
continue # skip unresolvable tool entries
asst_idx = pending_asst_idx.get(tid, -1)
args = pending_args.get(tid, {})
raw = str(m.get('content', ''))
try:
rd = json.loads(raw)
snippet = str(rd.get('output') or rd.get('result') or rd.get('error') or raw)[:200]
except Exception:
snippet = raw[:200]
# Truncate args values for storage
args_snap = {}
if isinstance(args, dict):
for k, v in list(args.items())[:6]:
s2 = str(v)
args_snap[k] = s2[:120] + ('...' if len(s2) > 120 else '')
tool_calls.append({
'name': name, 'snippet': snippet, 'tid': tid,
'assistant_msg_idx': asst_idx, 'args': args_snap,
})
s.tool_calls = tool_calls
# Tag the matching user message with attachment filenames for display on reload
# Only tag a user message whose content relates to this turn's text
# (msg_text is the full message including the [Attached files: ...] suffix)
if attachments:
for m in reversed(s.messages):
if m.get('role') == 'user':
content = str(m.get('content', ''))
# Match if content is part of the sent message or vice-versa
base_text = msg_text.split('\n\n[Attached files:')[0].strip()
if base_text[:60] in content or content[:60] in msg_text:
m['attachments'] = attachments
break
s.save()
# Sync to state.db for /insights (opt-in setting)
try:
from api.config import load_settings as _load_settings
if _load_settings().get('sync_to_insights'):
from api.state_sync import sync_session_usage
sync_session_usage(
session_id=s.session_id,
input_tokens=s.input_tokens or 0,
output_tokens=s.output_tokens or 0,
estimated_cost=s.estimated_cost,
model=model,
title=s.title,
message_count=len(s.messages),
)
except Exception:
pass # never crash the stream for sync failures
usage = {'input_tokens': input_tokens, 'output_tokens': output_tokens, 'estimated_cost': estimated_cost}
# Include context window data from the agent's compressor for the UI indicator
_cc = getattr(agent, 'context_compressor', None)
if _cc:
usage['context_length'] = getattr(_cc, 'context_length', 0) or 0
usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0
usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0
put('done', {'session': s.compact() | {'messages': s.messages, 'tool_calls': tool_calls}, 'usage': usage})
finally:
with _ENV_LOCK:
if old_cwd is None: os.environ.pop('TERMINAL_CWD', None)
else: os.environ['TERMINAL_CWD'] = old_cwd
if old_exec_ask is None: os.environ.pop('HERMES_EXEC_ASK', None)
else: os.environ['HERMES_EXEC_ASK'] = old_exec_ask
if old_session_key is None: os.environ.pop('HERMES_SESSION_KEY', None)
else: os.environ['HERMES_SESSION_KEY'] = old_session_key
if old_hermes_home is None: os.environ.pop('HERMES_HOME', None)
else: os.environ['HERMES_HOME'] = old_hermes_home
except Exception as e:
print('[webui] stream error:\n' + traceback.format_exc(), flush=True)
err_str = str(e)
# Detect rate limit errors specifically so the client can show a helpful card
# rather than the generic "Connection lost" message
is_rate_limit = 'rate limit' in err_str.lower() or '429' in err_str or 'RateLimitError' in type(e).__name__
if is_rate_limit:
put('apperror', {
'message': err_str,
'type': 'rate_limit',
'hint': 'Rate limit reached. The fallback model (if configured) was also exhausted. Try again in a moment.',
})
else:
put('apperror', {'message': err_str, 'type': 'error'})
finally:
_clear_thread_env() # TD1: always clear thread-local context
with STREAMS_LOCK:
STREAMS.pop(stream_id, None)
CANCEL_FLAGS.pop(stream_id, None)
# ============================================================
# SECTION: HTTP Request Handler
# do_GET: read-only API endpoints + SSE stream + static HTML
# do_POST: mutating endpoints (session CRUD, chat, upload, approval)
# Routing is a flat if/elif chain. See ARCHITECTURE.md section 4.1.
# ============================================================
def cancel_stream(stream_id: str) -> bool:
"""Signal an in-flight stream to cancel. Returns True if the stream existed."""
with STREAMS_LOCK:
if stream_id not in STREAMS:
return False
flag = CANCEL_FLAGS.get(stream_id)
if flag:
flag.set()
# Put a cancel sentinel into the queue so the SSE handler wakes up
q = STREAMS.get(stream_id)
if q:
try:
q.put_nowait(('cancel', {'message': 'Cancelled by user'}))
except Exception:
pass
return True