feat: harden clarify dialog flow and refresh recovery

This commit is contained in:
Frank Song
2026-04-15 13:10:50 +08:00
parent 45d3dc0f68
commit ccba2f5c01
11 changed files with 1066 additions and 9 deletions

128
api/clarify.py Normal file
View File

@@ -0,0 +1,128 @@
"""Clarify prompt state for the WebUI.
This mirrors the approval flow structure, but the response is a free-form
clarification string instead of an approval decision.
"""
from __future__ import annotations
import threading
from typing import Optional
_lock = threading.Lock()
_pending: dict[str, dict] = {}
_gateway_queues: dict[str, list] = {}
_gateway_notify_cbs: dict[str, object] = {}
class _ClarifyEntry:
"""One pending clarify request inside a session."""
__slots__ = ("event", "data", "result")
def __init__(self, data: dict):
self.event = threading.Event()
self.data = data
self.result: Optional[str] = None
def register_gateway_notify(session_key: str, cb) -> None:
"""Register a per-session callback for sending clarify requests to the UI."""
with _lock:
_gateway_notify_cbs[session_key] = cb
def _clear_queue_locked(session_key: str) -> list[_ClarifyEntry]:
entries = _gateway_queues.pop(session_key, [])
_pending.pop(session_key, None)
return entries
def unregister_gateway_notify(session_key: str) -> None:
"""Unregister the per-session callback and unblock any waiting clarify prompt."""
with _lock:
_gateway_notify_cbs.pop(session_key, None)
entries = _clear_queue_locked(session_key)
for entry in entries:
entry.event.set()
def clear_pending(session_key: str) -> int:
"""Clear any pending clarify prompts for the session without removing the callback."""
with _lock:
entries = _clear_queue_locked(session_key)
for entry in entries:
entry.event.set()
return len(entries)
def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
"""Queue a pending clarify request and notify the UI callback if registered."""
with _lock:
queue = _gateway_queues.setdefault(session_key, [])
# De-duplicate while unresolved: if the most recent pending clarify is
# semantically identical, reuse it instead of stacking duplicates.
if queue:
last = queue[-1]
if (
str(last.data.get("question", "")) == str(data.get("question", ""))
and list(last.data.get("choices_offered") or [])
== list(data.get("choices_offered") or [])
):
entry = last
cb = _gateway_notify_cbs.get(session_key)
# Keep _pending aligned to the oldest unresolved entry.
_pending[session_key] = queue[0].data
if cb:
try:
cb(dict(entry.data))
except Exception:
pass
return entry
entry = _ClarifyEntry(data)
queue.append(entry)
_pending[session_key] = queue[0].data
cb = _gateway_notify_cbs.get(session_key)
if cb:
try:
cb(data)
except Exception:
pass
return entry
def get_pending(session_key: str) -> dict | None:
"""Return the oldest pending clarify request for this session, if any."""
with _lock:
queue = _gateway_queues.get(session_key) or []
if queue:
return dict(queue[0].data)
pending = _pending.get(session_key)
return dict(pending) if pending else None
def has_pending(session_key: str) -> bool:
with _lock:
return bool(_gateway_queues.get(session_key))
def resolve_clarify(session_key: str, response: str, resolve_all: bool = False) -> int:
"""Resolve the oldest pending clarify request for a session."""
with _lock:
queue = _gateway_queues.get(session_key)
if not queue:
_pending.pop(session_key, None)
return 0
entries = list(queue) if resolve_all else [queue.pop(0)]
if queue:
_pending[session_key] = queue[0].data
else:
_clear_queue_locked(session_key)
count = 0
for entry in entries:
entry.result = response
entry.event.set()
count += 1
return count

View File

@@ -214,6 +214,18 @@ except ImportError:
_lock = threading.Lock()
_permanent_approved = set()
# Clarify prompts (optional -- graceful fallback if agent not available)
try:
from api.clarify import (
submit_pending as submit_clarify_pending,
get_pending as get_clarify_pending,
resolve_clarify,
)
except ImportError:
submit_clarify_pending = lambda *a, **k: None
get_clarify_pending = lambda *a, **k: None
resolve_clarify = lambda *a, **k: 0
# ── Login page locale strings ─────────────────────────────────────────────────
# Add entries here to support more languages on the login page.
@@ -603,6 +615,15 @@ def handle_get(handler, parsed) -> bool:
return j(handler, {"error": "not found"}, status=404)
return _handle_approval_inject(handler, parsed)
if parsed.path == "/api/clarify/pending":
return _handle_clarify_pending(handler, parsed)
if parsed.path == "/api/clarify/inject_test":
# Loopback-only: used by automated tests; blocked from any remote client
if handler.client_address[0] != "127.0.0.1":
return j(handler, {"error": "not found"}, status=404)
return _handle_clarify_inject(handler, parsed)
# ── Cron API (GET) ──
if parsed.path == "/api/crons":
from cron.jobs import list_jobs
@@ -911,6 +932,10 @@ def handle_post(handler, parsed) -> bool:
if parsed.path == "/api/approval/respond":
return _handle_approval_respond(handler, body)
# ── Clarify (POST) ──
if parsed.path == "/api/clarify/respond":
return _handle_clarify_respond(handler, body)
# ── Skills (POST) ──
if parsed.path == "/api/skills/save":
return _handle_skill_save(handler, body)
@@ -1672,6 +1697,34 @@ def _handle_approval_inject(handler, parsed):
return j(handler, {"error": "session_id required"}, status=400)
def _handle_clarify_pending(handler, parsed):
sid = parse_qs(parsed.query).get("session_id", [""])[0]
pending = get_clarify_pending(sid)
if pending:
return j(handler, {"pending": pending})
return j(handler, {"pending": None})
def _handle_clarify_inject(handler, parsed):
"""Inject a fake pending clarify prompt -- loopback-only, used by automated tests."""
qs = parse_qs(parsed.query)
sid = qs.get("session_id", [""])[0]
question = qs.get("question", ["Which option?"])[0]
choices = qs.get("choices", [])
if sid:
submit_clarify_pending(
sid,
{
"question": question,
"choices_offered": choices,
"session_id": sid,
"kind": "clarify",
},
)
return j(handler, {"ok": True, "session_id": sid})
return j(handler, {"error": "session_id required"}, status=400)
def _handle_live_models(handler, parsed):
"""Return the live model list for a provider.
@@ -1892,6 +1945,24 @@ def _handle_chat_start(handler, body):
except ValueError as e:
return bad(handler, str(e))
model = body.get("model") or s.model
# Prevent duplicate runs in the same session while a stream is still active.
# This commonly happens after page refresh/reconnect races and can produce
# duplicated clarify cards for what appears to be a single user request.
current_stream_id = getattr(s, "active_stream_id", None)
if current_stream_id:
with STREAMS_LOCK:
current_active = current_stream_id in STREAMS
if current_active:
return j(
handler,
{
"error": "session already has an active stream",
"active_stream_id": current_stream_id,
},
status=409,
)
# Stale stream id from a previous run; clear and continue.
s.active_stream_id = None
stream_id = uuid.uuid4().hex
s.workspace = workspace
s.model = model
@@ -2303,6 +2374,22 @@ def _handle_approval_respond(handler, body):
return j(handler, {"ok": True, "choice": choice})
def _handle_clarify_respond(handler, body):
sid = body.get("session_id", "")
if not sid:
return bad(handler, "session_id is required")
response = body.get("response")
if response is None:
response = body.get("answer")
if response is None:
response = body.get("choice")
response = str(response or "").strip()
if not response:
return bad(handler, "response is required")
resolve_clarify(sid, response, resolve_all=False)
return j(handler, {"ok": True, "response": response})
def _handle_skill_save(handler, body):
try:
require(body, "name", "content")

View File

@@ -88,6 +88,16 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
if q is None:
return
# ── MCP Server Discovery (lazy import, idempotent) ──
# discover_mcp_tools() is called here (rather than at server startup) so that
# the hermes-agent package is fully initialized before we try to connect.
# It is safe to call multiple times — already-connected servers are skipped.
try:
from tools.mcp_tool import discover_mcp_tools
discover_mcp_tools()
except Exception:
pass # MCP not available or not configured — non-fatal
# Sprint 10: create a cancel event for this stream
cancel_event = threading.Event()
with STREAMS_LOCK:
@@ -162,6 +172,65 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
except ImportError:
logger.debug("Approval module not available, falling back to polling")
_clarify_registered = False
_unreg_clarify_notify = None
try:
from api.clarify import (
register_gateway_notify as _reg_clarify_notify,
unregister_gateway_notify as _unreg_clarify_notify,
)
def _clarify_notify_cb(clarify_data):
put('clarify', clarify_data)
_reg_clarify_notify(session_id, _clarify_notify_cb)
_clarify_registered = True
except ImportError:
logger.debug("Clarify module not available, falling back to polling")
def _clarify_callback_impl(question, choices, sid, cancel_evt, put_event):
"""Bridge Hermes clarify prompts to the WebUI."""
timeout = 120
choices_list = [str(choice) for choice in (choices or [])]
data = {
'question': str(question or ''),
'choices_offered': choices_list,
'session_id': sid,
'kind': 'clarify',
'requested_at': time.time(),
}
try:
from api.clarify import submit_pending as _submit_clarify_pending, clear_pending as _clear_clarify_pending
except ImportError:
return (
"The user did not provide a response within the time limit. "
"Use your best judgement to make the choice and proceed."
)
entry = _submit_clarify_pending(sid, data)
deadline = time.monotonic() + timeout
while True:
if cancel_evt.is_set():
_clear_clarify_pending(sid)
return (
"The user did not provide a response within the time limit. "
"Use your best judgement to make the choice and proceed."
)
remaining = deadline - time.monotonic()
if remaining <= 0:
_clear_clarify_pending(sid)
return (
"The user did not provide a response within the time limit. "
"Use your best judgement to make the choice and proceed."
)
if entry.event.wait(timeout=min(1.0, remaining)):
response = str(entry.result or "").strip()
return (
response
or "The user did not provide a response within the time limit. "
"Use your best judgement to make the choice and proceed."
)
try:
_token_sent = False # tracks whether any streamed tokens were sent
_reasoning_text = '' # accumulates reasoning/thinking trace for persistence
@@ -304,6 +373,11 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
stream_delta_callback=on_token,
reasoning_callback=on_reasoning,
tool_progress_callback=on_tool,
clarify_callback=(
lambda question, choices: _clarify_callback_impl(
question, choices, session_id, cancel_event, put
)
),
)
# Store agent instance for cancel/interrupt propagation
@@ -565,6 +639,11 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
_unreg_notify(session_id)
except Exception:
logger.debug("Failed to unregister approval callback")
if _clarify_registered and _unreg_clarify_notify is not None:
try:
_unreg_clarify_notify(session_id)
except Exception:
logger.debug("Failed to unregister clarify callback")
with _ENV_LOCK:
if old_cwd is None: os.environ.pop('TERMINAL_CWD', None)
else: os.environ['TERMINAL_CWD'] = old_cwd
@@ -660,6 +739,15 @@ def cancel_stream(stream_id: str) -> bool:
f"cancel_event flag set, will be checked on agent startup"
)
# Clear any pending clarify prompt so the blocked tool call can unwind.
try:
from api.clarify import clear_pending as _clear_clarify_pending
if agent and getattr(agent, "session_id", None):
_clear_clarify_pending(agent.session_id)
except Exception:
logger.debug("Failed to clear clarify prompt during cancel")
# Put a cancel sentinel into the queue so the SSE handler wakes up
q = STREAMS.get(stream_id)
if q: