""" Hermes Web UI -- Route handlers for GET and POST endpoints. Extracted from server.py (Sprint 11) so server.py is a thin shell. """ import html as _html import json import logging import os import queue import sys import threading import time import uuid from pathlib import Path from urllib.parse import parse_qs logger = logging.getLogger(__name__) from api.config import ( STATE_DIR, SESSION_DIR, DEFAULT_WORKSPACE, DEFAULT_MODEL, SESSIONS, SESSIONS_MAX, LOCK, STREAMS, STREAMS_LOCK, CANCEL_FLAGS, SERVER_START_TIME, CLI_TOOLSETS, _INDEX_HTML_PATH, get_available_models, IMAGE_EXTS, MD_EXTS, MIME_MAP, MAX_FILE_BYTES, MAX_UPLOAD_BYTES, CHAT_LOCK, load_settings, save_settings, ) from api.helpers import ( require, bad, safe_resolve, j, t, read_body, _security_headers, _sanitize_error, redact_session_data, _redact_text, ) # ── CSRF: validate Origin/Referer on POST ──────────────────────────────────── import re as _re def _normalize_host_port(value: str) -> tuple[str, str | None]: """Split a host or host:port string into (hostname, port|None). Handles IPv6 bracket notation, e.g. [::1]:8080.""" value = value.strip().lower() if not value: return '', None if value.startswith('['): end = value.find(']') if end != -1: host = value[1:end] rest = value[end + 1 :] if rest.startswith(':') and rest[1:].isdigit(): return host, rest[1:] return host, None if value.count(':') == 1: host, port = value.rsplit(':', 1) if port.isdigit(): return host, port return value, None def _ports_match(origin_scheme: str, origin_port: str | None, allowed_port: str | None) -> bool: """Return True when two ports should be considered equivalent, scheme-aware. Treats an absent port as the scheme default: port 80 for http, port 443 for https. Port 80 is NOT treated as equivalent to 443 (different protocols = different origins). """ if origin_port == allowed_port: return True # Determine the default port for the origin's scheme default = '443' if origin_scheme == 'https' else '80' if not origin_port and allowed_port == default: return True if not allowed_port and origin_port == default: return True return False def _allowed_public_origins() -> set[str]: """Parse HERMES_WEBUI_ALLOWED_ORIGINS env var (comma-separated) into a set. Each entry must include the scheme, e.g. https://myapp.example.com:8000. Entries without a scheme are silently skipped and a warning is printed. """ raw = os.getenv('HERMES_WEBUI_ALLOWED_ORIGINS', '') result = set() for value in raw.split(','): value = value.strip().rstrip('/').lower() if not value: continue if not (value.startswith('http://') or value.startswith('https://')): import sys print( f"[webui] WARNING: HERMES_WEBUI_ALLOWED_ORIGINS entry {value!r} is missing " f"the scheme (expected https://hostname or http://hostname). Entry ignored.", flush=True, file=sys.stderr, ) continue result.add(value) return result def _check_csrf(handler) -> bool: """Reject cross-origin POST requests. Returns True if OK.""" origin = handler.headers.get("Origin", "") referer = handler.headers.get("Referer", "") host = handler.headers.get("Host", "") if not origin and not referer: return True # non-browser clients (curl, agent) have no Origin target = origin or referer # Extract host:port from origin/referer m = _re.match(r"^https?://([^/]+)", target) if not m: return False origin_host = m.group(1) origin_scheme = m.group(0).split('://')[0].lower() # 'http' or 'https' origin_name, origin_port = _normalize_host_port(origin_host) # Check against explicitly allowed public origins (env var) origin_value = m.group(0).rstrip('/').lower() if origin_value in _allowed_public_origins(): return True # Allow same-origin: check Host, X-Forwarded-Host (reverse proxy), and # X-Real-Host against the origin. Reverse proxies (Caddy, nginx) set # X-Forwarded-Host to the client's original Host header. allowed_hosts = [ h.strip() for h in [ host, handler.headers.get("X-Forwarded-Host", ""), handler.headers.get("X-Real-Host", ""), ] if h.strip() ] for allowed in allowed_hosts: allowed_name, allowed_port = _normalize_host_port(allowed) if origin_name == allowed_name and _ports_match(origin_scheme, origin_port, allowed_port): return True return False from api.models import ( Session, get_session, new_session, all_sessions, title_from, _write_session_index, SESSION_INDEX_FILE, load_projects, save_projects, import_cli_session, get_cli_sessions, get_cli_session_messages, ) from api.workspace import ( load_workspaces, save_workspaces, get_last_workspace, set_last_workspace, list_dir, read_file_content, safe_resolve_ws, ) from api.upload import handle_upload, handle_transcribe from api.streaming import _sse, _run_agent_streaming, cancel_stream from api.onboarding import ( apply_onboarding_setup, get_onboarding_status, complete_onboarding, ) # Approval system (optional -- graceful fallback if agent not available) try: from tools.approval import ( submit_pending, approve_session, approve_permanent, save_permanent_allowlist, is_approved, _pending, _lock, _permanent_approved, resolve_gateway_approval, ) except ImportError: submit_pending = lambda *a, **k: None approve_session = lambda *a, **k: None approve_permanent = lambda *a, **k: None save_permanent_allowlist = lambda *a, **k: None is_approved = lambda *a, **k: True resolve_gateway_approval = lambda *a, **k: 0 _pending = {} _lock = threading.Lock() _permanent_approved = set() # ── Login page locale strings ───────────────────────────────────────────────── # Add entries here to support more languages on the login page. # The key must match the 'language' setting value (from static/i18n.js LOCALES). _LOGIN_LOCALE = { "en": { "lang": "en", "title": "Sign in", "subtitle": "Enter your password to continue", "placeholder": "Password", "btn": "Sign in", "invalid_pw": "Invalid password", "conn_failed": "Connection failed", }, "zh": { "lang": "zh-CN", "title": "\u767b\u5f55", "subtitle": "\u8f93\u5165\u5bc6\u7801\u7ee7\u7eed\u4f7f\u7528", "placeholder": "\u5bc6\u7801", "btn": "\u767b\u5f55", "invalid_pw": "\u5bc6\u7801\u9519\u8bef", "conn_failed": "\u8fde\u63a5\u5931\u8d25", }, } # ── Login page (self-contained, no external deps) ──────────────────────────── _LOGIN_PAGE_HTML = """ {{BOT_NAME}} — {{LOGIN_TITLE}}

{{BOT_NAME}}

{{LOGIN_SUBTITLE}}

""" # ── GET routes ──────────────────────────────────────────────────────────────── def handle_get(handler, parsed) -> bool: """Handle all GET routes. Returns True if handled, False for 404.""" if parsed.path in ("/", "/index.html"): return t( handler, _INDEX_HTML_PATH.read_text(encoding="utf-8"), content_type="text/html; charset=utf-8", ) if parsed.path == "/login": _settings = load_settings() _bn = _html.escape(_settings.get("bot_name") or "Hermes") _lang = _settings.get("language", "en") _login_strings = _LOGIN_LOCALE.get(_lang, _LOGIN_LOCALE["en"]) _page = ( _LOGIN_PAGE_HTML.replace("{{BOT_NAME}}", _bn) .replace("{{BOT_NAME_INITIAL}}", _bn[0].upper()) .replace("{{LANG}}", _html.escape(_login_strings["lang"])) .replace("{{LOGIN_TITLE}}", _html.escape(_login_strings["title"])) .replace("{{LOGIN_SUBTITLE}}", _html.escape(_login_strings["subtitle"])) .replace( "{{LOGIN_PLACEHOLDER}}", _html.escape(_login_strings["placeholder"]) ) .replace("{{LOGIN_BTN}}", _html.escape(_login_strings["btn"])) .replace("{{LOGIN_INVALID_PW}}", _html.escape(_login_strings["invalid_pw"])) .replace( "{{LOGIN_CONN_FAILED}}", _html.escape(_login_strings["conn_failed"]) ) ) return t(handler, _page, content_type="text/html; charset=utf-8") if parsed.path == "/api/auth/status": from api.auth import is_auth_enabled, parse_cookie, verify_session logged_in = False if is_auth_enabled(): cv = parse_cookie(handler) logged_in = bool(cv and verify_session(cv)) return j(handler, {"auth_enabled": is_auth_enabled(), "logged_in": logged_in}) if parsed.path == "/favicon.ico": handler.send_response(204) handler.end_headers() return True if parsed.path == "/health": with STREAMS_LOCK: n_streams = len(STREAMS) return j( handler, { "status": "ok", "sessions": len(SESSIONS), "active_streams": n_streams, "uptime_seconds": round(time.time() - SERVER_START_TIME, 1), }, ) if parsed.path == "/api/models": return j(handler, get_available_models()) if parsed.path == "/api/models/live": return _handle_live_models(handler, parsed) if parsed.path == "/api/settings": settings = load_settings() # Never expose the stored password hash to clients settings.pop("password_hash", None) return j(handler, settings) if parsed.path == "/api/onboarding/status": return j(handler, get_onboarding_status()) if parsed.path.startswith("/static/"): return _serve_static(handler, parsed) if parsed.path == "/api/session": sid = parse_qs(parsed.query).get("session_id", [""])[0] if not sid: return j(handler, {"error": "session_id is required"}, status=400) try: s = get_session(sid) raw = s.compact() | { "messages": s.messages, "tool_calls": getattr(s, "tool_calls", []), "active_stream_id": getattr(s, "active_stream_id", None), "pending_user_message": getattr(s, "pending_user_message", None), "pending_attachments": getattr(s, "pending_attachments", []), "pending_started_at": getattr(s, "pending_started_at", None), } return j(handler, {"session": redact_session_data(raw)}) except KeyError: # Not a WebUI session -- try CLI store msgs = get_cli_session_messages(sid) if msgs: cli_meta = None for cs in get_cli_sessions(): if cs["session_id"] == sid: cli_meta = cs break sess = { "session_id": sid, "title": (cli_meta or {}).get("title", "CLI Session"), "workspace": (cli_meta or {}).get("workspace", ""), "model": (cli_meta or {}).get("model", "unknown"), "message_count": len(msgs), "created_at": (cli_meta or {}).get("created_at", 0), "updated_at": (cli_meta or {}).get("updated_at", 0), "pinned": False, "archived": False, "project_id": None, "profile": (cli_meta or {}).get("profile"), "is_cli_session": True, "messages": msgs, "tool_calls": [], } return j(handler, {"session": redact_session_data(sess)}) return bad(handler, "Session not found", 404) if parsed.path == "/api/sessions": webui_sessions = all_sessions() settings = load_settings() if settings.get("show_cli_sessions"): cli = get_cli_sessions() webui_ids = {s["session_id"] for s in webui_sessions} deduped_cli = [s for s in cli if s["session_id"] not in webui_ids] else: deduped_cli = [] merged = webui_sessions + deduped_cli merged.sort(key=lambda s: s.get("updated_at", 0) or 0, reverse=True) safe_merged = [] for s in merged: item = dict(s) if isinstance(item.get("title"), str): item["title"] = _redact_text(item["title"]) safe_merged.append(item) return j(handler, {"sessions": safe_merged, "cli_count": len(deduped_cli)}) if parsed.path == "/api/projects": return j(handler, {"projects": load_projects()}) if parsed.path == "/api/session/export": return _handle_session_export(handler, parsed) if parsed.path == "/api/workspaces": return j( handler, {"workspaces": load_workspaces(), "last": get_last_workspace()} ) if parsed.path == "/api/sessions/search": return _handle_sessions_search(handler, parsed) if parsed.path == "/api/list": return _handle_list_dir(handler, parsed) if parsed.path == "/api/personalities": # Read personalities from config.yaml agent.personalities section # (matches hermes-agent CLI behavior, not filesystem SOUL.md approach) from api.config import reload_config as _reload_cfg _reload_cfg() # pick up config.yaml changes without server restart from api.config import get_config as _get_cfg _cfg = _get_cfg() agent_cfg = _cfg.get("agent", {}) raw_personalities = agent_cfg.get("personalities", {}) personalities = [] if isinstance(raw_personalities, dict): for name, value in raw_personalities.items(): desc = "" if isinstance(value, dict): desc = value.get("description", "") elif isinstance(value, str): desc = value[:80] + ("..." if len(value) > 80 else "") personalities.append({"name": name, "description": desc}) return j(handler, {"personalities": personalities}) if parsed.path == "/api/git-info": qs = parse_qs(parsed.query) sid = qs.get("session_id", [""])[0] if not sid: return bad(handler, "session_id required") try: s = get_session(sid) except KeyError: return bad(handler, "Session not found", 404) from api.workspace import git_info_for_workspace info = git_info_for_workspace(Path(s.workspace)) return j(handler, {"git": info}) if parsed.path == "/api/updates/check": settings = load_settings() if not settings.get("check_for_updates", True): return j(handler, {"disabled": True}) qs = parse_qs(parsed.query) force = qs.get("force", ["0"])[0] == "1" # ?simulate=1 returns fake behind counts for UI testing (localhost only) if ( qs.get("simulate", ["0"])[0] == "1" and handler.client_address[0] == "127.0.0.1" ): return j( handler, { "webui": { "name": "webui", "behind": 3, "current_sha": "abc1234", "latest_sha": "def5678", "branch": "master", }, "agent": { "name": "agent", "behind": 1, "current_sha": "aaa0001", "latest_sha": "bbb0002", "branch": "master", }, "checked_at": 0, }, ) from api.updates import check_for_updates return j(handler, check_for_updates(force=force)) if parsed.path == "/api/chat/stream/status": stream_id = parse_qs(parsed.query).get("stream_id", [""])[0] return j(handler, {"active": stream_id in STREAMS, "stream_id": stream_id}) if parsed.path == "/api/chat/cancel": stream_id = parse_qs(parsed.query).get("stream_id", [""])[0] if not stream_id: return bad(handler, "stream_id required") cancelled = cancel_stream(stream_id) return j(handler, {"ok": True, "cancelled": cancelled, "stream_id": stream_id}) if parsed.path == "/api/chat/stream": return _handle_sse_stream(handler, parsed) if parsed.path == '/api/sessions/gateway/stream': return _handle_gateway_sse_stream(handler) if parsed.path == "/api/file/raw": return _handle_file_raw(handler, parsed) if parsed.path == "/api/file": return _handle_file_read(handler, parsed) if parsed.path == "/api/approval/pending": return _handle_approval_pending(handler, parsed) if parsed.path == "/api/approval/inject_test": # Loopback-only: used by automated tests; blocked from any remote client if handler.client_address[0] != "127.0.0.1": return j(handler, {"error": "not found"}, status=404) return _handle_approval_inject(handler, parsed) # ── Cron API (GET) ── if parsed.path == "/api/crons": from cron.jobs import list_jobs return j(handler, {"jobs": list_jobs(include_disabled=True)}) if parsed.path == "/api/crons/output": return _handle_cron_output(handler, parsed) if parsed.path == "/api/crons/recent": return _handle_cron_recent(handler, parsed) # ── Skills API (GET) ── if parsed.path == "/api/skills": from tools.skills_tool import skills_list as _skills_list raw = _skills_list() data = json.loads(raw) if isinstance(raw, str) else raw return j(handler, {"skills": data.get("skills", [])}) if parsed.path == "/api/skills/content": from tools.skills_tool import skill_view as _skill_view, SKILLS_DIR qs = parse_qs(parsed.query) name = qs.get("name", [""])[0] if not name: return j(handler, {"error": "name required"}, status=400) file_path = qs.get("file", [""])[0] if file_path: # Serve a linked file from the skill directory import re as _re if _re.search(r"[*?\[\]]", name): return bad(handler, "Invalid skill name", 400) skill_dir = None for p in SKILLS_DIR.rglob(name): if p.is_dir(): skill_dir = p break if not skill_dir: return bad(handler, "Skill not found", 404) target = (skill_dir / file_path).resolve() try: target.relative_to(skill_dir.resolve()) except ValueError: return bad(handler, "Invalid file path", 400) if not target.exists() or not target.is_file(): return bad(handler, "File not found", 404) return j( handler, {"content": target.read_text(encoding="utf-8"), "path": file_path}, ) raw = _skill_view(name) data = json.loads(raw) if isinstance(raw, str) else raw if "linked_files" not in data: data["linked_files"] = {} return j(handler, data) # ── Memory API (GET) ── if parsed.path == "/api/memory": return _handle_memory_read(handler) # ── Profile API (GET) ── if parsed.path == "/api/profiles": from api.profiles import list_profiles_api, get_active_profile_name return j( handler, {"profiles": list_profiles_api(), "active": get_active_profile_name()}, ) if parsed.path == "/api/profile/active": from api.profiles import get_active_profile_name, get_active_hermes_home return j( handler, {"name": get_active_profile_name(), "path": str(get_active_hermes_home())}, ) return False # 404 # ── POST routes ─────────────────────────────────────────────────────────────── def handle_post(handler, parsed) -> bool: """Handle all POST routes. Returns True if handled, False for 404.""" # CSRF: reject cross-origin browser requests if not _check_csrf(handler): return j(handler, {"error": "Cross-origin request rejected"}, status=403) if parsed.path == "/api/upload": return handle_upload(handler) if parsed.path == "/api/transcribe": return handle_transcribe(handler) body = read_body(handler) if parsed.path == "/api/session/new": s = new_session(workspace=body.get("workspace"), model=body.get("model")) return j(handler, {"session": s.compact() | {"messages": s.messages}}) if parsed.path == "/api/sessions/cleanup": return _handle_sessions_cleanup(handler, body, zero_only=False) if parsed.path == "/api/sessions/cleanup_zero_message": return _handle_sessions_cleanup(handler, body, zero_only=True) if parsed.path == "/api/session/rename": try: require(body, "session_id", "title") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) s.title = str(body["title"]).strip()[:80] or "Untitled" s.save() return j(handler, {"session": s.compact()}) if parsed.path == "/api/personality/set": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) if "name" not in body: return bad(handler, "Missing required field: name") sid = body["session_id"] name = body["name"].strip() try: s = get_session(sid) except KeyError: return bad(handler, "Session not found", 404) # Resolve personality from config.yaml agent.personalities section # (matches hermes-agent CLI behavior) prompt = "" if name: from api.config import reload_config as _reload_cfg2 _reload_cfg2() # pick up config changes without restart from api.config import get_config as _get_cfg2 _cfg2 = _get_cfg2() agent_cfg = _cfg2.get("agent", {}) raw_personalities = agent_cfg.get("personalities", {}) if not isinstance(raw_personalities, dict) or name not in raw_personalities: return bad( handler, f'Personality "{name}" not found in config.yaml', 404 ) value = raw_personalities[name] # Resolve prompt using the same logic as hermes-agent cli.py if isinstance(value, dict): parts = [value.get("system_prompt", "") or value.get("prompt", "")] if value.get("tone"): parts.append(f"Tone: {value['tone']}") if value.get("style"): parts.append(f"Style: {value['style']}") prompt = "\n".join(p for p in parts if p) else: prompt = str(value) s.personality = name if name else None s.save() return j(handler, {"ok": True, "personality": s.personality, "prompt": prompt}) if parsed.path == "/api/session/update": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) new_ws = str(Path(body.get("workspace", s.workspace)).expanduser().resolve()) s.workspace = new_ws s.model = body.get("model", s.model) s.save() set_last_workspace(new_ws) return j(handler, {"session": s.compact() | {"messages": s.messages}}) if parsed.path == "/api/session/delete": sid = body.get("session_id", "") if not sid: return bad(handler, "session_id is required") # Delete from WebUI session store with LOCK: SESSIONS.pop(sid, None) p = SESSION_DIR / f"{sid}.json" try: p.unlink(missing_ok=True) except Exception: logger.debug("Failed to unlink session file %s", p) try: SESSION_INDEX_FILE.unlink(missing_ok=True) except Exception: logger.debug("Failed to unlink session index") # Also delete from CLI state.db (for CLI sessions shown in sidebar) try: from api.models import delete_cli_session delete_cli_session(sid) except Exception: logger.debug("Failed to delete CLI session %s", sid) return j(handler, {"ok": True}) if parsed.path == "/api/session/clear": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) s.messages = [] s.tool_calls = [] s.title = "Untitled" s.save() return j(handler, {"ok": True, "session": s.compact()}) if parsed.path == "/api/session/truncate": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) if body.get("keep_count") is None: return bad(handler, "Missing required field(s): keep_count") try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) keep = int(body["keep_count"]) s.messages = s.messages[:keep] s.save() return j( handler, {"ok": True, "session": s.compact() | {"messages": s.messages}} ) if parsed.path == "/api/chat/start": return _handle_chat_start(handler, body) if parsed.path == "/api/chat": return _handle_chat_sync(handler, body) # ── Cron API (POST) ── if parsed.path == "/api/crons/create": return _handle_cron_create(handler, body) if parsed.path == "/api/crons/update": return _handle_cron_update(handler, body) if parsed.path == "/api/crons/delete": return _handle_cron_delete(handler, body) if parsed.path == "/api/crons/run": return _handle_cron_run(handler, body) if parsed.path == "/api/crons/pause": return _handle_cron_pause(handler, body) if parsed.path == "/api/crons/resume": return _handle_cron_resume(handler, body) # ── File ops (POST) ── if parsed.path == "/api/file/delete": return _handle_file_delete(handler, body) if parsed.path == "/api/file/save": return _handle_file_save(handler, body) if parsed.path == "/api/file/create": return _handle_file_create(handler, body) if parsed.path == "/api/file/rename": return _handle_file_rename(handler, body) if parsed.path == "/api/file/create-dir": return _handle_create_dir(handler, body) # ── Workspace management (POST) ── if parsed.path == "/api/workspaces/add": return _handle_workspace_add(handler, body) if parsed.path == "/api/workspaces/remove": return _handle_workspace_remove(handler, body) if parsed.path == "/api/workspaces/rename": return _handle_workspace_rename(handler, body) # ── Approval (POST) ── if parsed.path == "/api/approval/respond": return _handle_approval_respond(handler, body) # ── Skills (POST) ── if parsed.path == "/api/skills/save": return _handle_skill_save(handler, body) if parsed.path == "/api/skills/delete": return _handle_skill_delete(handler, body) # ── Memory (POST) ── if parsed.path == "/api/memory/write": return _handle_memory_write(handler, body) # ── Profile API (POST) ── if parsed.path == "/api/profile/switch": name = body.get("name", "").strip() if not name: return bad(handler, "name is required") try: from api.profiles import switch_profile, _validate_profile_name if name != 'default': _validate_profile_name(name) result = switch_profile(name) return j(handler, result) except (ValueError, FileNotFoundError) as e: return bad(handler, _sanitize_error(e), 404) except RuntimeError as e: return bad(handler, str(e), 409) if parsed.path == "/api/profile/create": name = body.get("name", "").strip() if not name: return bad(handler, "name is required") import re as _re if not _re.match(r"^[a-z0-9][a-z0-9_-]{0,63}$", name): return bad( handler, "Invalid profile name: lowercase letters, numbers, hyphens, underscores only", ) clone_from = body.get("clone_from") if clone_from is not None: clone_from = str(clone_from).strip() if not _re.match(r"^[a-z0-9][a-z0-9_-]{0,63}$", clone_from): return bad(handler, "Invalid clone_from name") base_url = body.get("base_url", "").strip() if body.get("base_url") else None api_key = body.get("api_key", "").strip() if body.get("api_key") else None if base_url and not base_url.startswith(("http://", "https://")): return bad(handler, "base_url must start with http:// or https://") try: from api.profiles import create_profile_api result = create_profile_api( name, clone_from=clone_from, clone_config=bool(body.get("clone_config", False)), base_url=base_url, api_key=api_key, ) return j(handler, {"ok": True, "profile": result}) except (ValueError, FileExistsError, RuntimeError) as e: return bad(handler, str(e)) if parsed.path == "/api/profile/delete": name = body.get("name", "").strip() if not name: return bad(handler, "name is required") try: from api.profiles import delete_profile_api, _validate_profile_name _validate_profile_name(name) result = delete_profile_api(name) return j(handler, result) except (ValueError, FileNotFoundError) as e: return bad(handler, _sanitize_error(e)) except RuntimeError as e: return bad(handler, str(e), 409) # ── Settings (POST) ── if parsed.path == "/api/settings": if "bot_name" in body: body["bot_name"] = (str(body["bot_name"]) or "").strip() or "Hermes" saved = save_settings(body) saved.pop("password_hash", None) # never expose hash to client return j(handler, saved) if parsed.path == "/api/onboarding/setup": # Writing API keys to disk - restrict to local/private networks unless auth is active. # In Docker, requests arrive from the bridge network (172.x.x.x), not 127.0.0.1, # even when the user accesses via localhost:8787 on the host. # Behind a reverse proxy (nginx/Caddy/Traefik) or SSH tunnel, X-Forwarded-For # carries the real origin IP — read it first before falling back to the raw socket addr. # HERMES_WEBUI_ONBOARDING_OPEN=1 lets operators on remote servers explicitly bypass # the check when they control network access themselves (e.g. firewall + VPN). from api.auth import is_auth_enabled import os as _os if not is_auth_enabled() and not _os.getenv("HERMES_WEBUI_ONBOARDING_OPEN"): import ipaddress try: # Prefer forwarded headers set by reverse proxies _xff = handler.headers.get("X-Forwarded-For", "").split(",")[0].strip() _xri = handler.headers.get("X-Real-IP", "").strip() _raw = handler.client_address[0] _ip_str = _xff or _xri or _raw addr = ipaddress.ip_address(_ip_str) is_local = addr.is_loopback or addr.is_private except ValueError: is_local = False if not is_local: return bad(handler, "Onboarding setup is only available from local networks when auth is not enabled. To bypass this on a remote server, set HERMES_WEBUI_ONBOARDING_OPEN=1.", 403) try: return j(handler, apply_onboarding_setup(body)) except ValueError as e: return bad(handler, str(e)) except RuntimeError as e: return bad(handler, str(e), 500) if parsed.path == "/api/onboarding/complete": return j(handler, complete_onboarding()) # ── Session pin (POST) ── if parsed.path == "/api/session/pin": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) s.pinned = bool(body.get("pinned", True)) s.save() return j(handler, {"ok": True, "session": s.compact()}) # ── Session archive (POST) ── if parsed.path == "/api/session/archive": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) s.archived = bool(body.get("archived", True)) s.save() return j(handler, {"ok": True, "session": s.compact()}) # ── Session move to project (POST) ── if parsed.path == "/api/session/move": try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) s.project_id = body.get("project_id") or None s.save() return j(handler, {"ok": True, "session": s.compact()}) # ── Project CRUD (POST) ── if parsed.path == "/api/projects/create": try: require(body, "name") except ValueError as e: return bad(handler, str(e)) import re as _re name = body["name"].strip()[:128] if not name: return bad(handler, "name required") color = body.get("color") if color and not _re.match(r"^#[0-9a-fA-F]{3,8}$", color): return bad(handler, "Invalid color format") projects = load_projects() proj = { "project_id": uuid.uuid4().hex[:12], "name": name, "color": color, "created_at": time.time(), } projects.append(proj) save_projects(projects) return j(handler, {"ok": True, "project": proj}) if parsed.path == "/api/projects/rename": try: require(body, "project_id", "name") except ValueError as e: return bad(handler, str(e)) import re as _re projects = load_projects() proj = next( (p for p in projects if p["project_id"] == body["project_id"]), None ) if not proj: return bad(handler, "Project not found", 404) proj["name"] = body["name"].strip()[:128] if "color" in body: color = body["color"] if color and not _re.match(r"^#[0-9a-fA-F]{3,8}$", color): return bad(handler, "Invalid color format") proj["color"] = color save_projects(projects) return j(handler, {"ok": True, "project": proj}) if parsed.path == "/api/projects/delete": try: require(body, "project_id") except ValueError as e: return bad(handler, str(e)) projects = load_projects() proj = next( (p for p in projects if p["project_id"] == body["project_id"]), None ) if not proj: return bad(handler, "Project not found", 404) projects = [p for p in projects if p["project_id"] != body["project_id"]] save_projects(projects) # Unassign all sessions that belonged to this project if SESSION_INDEX_FILE.exists(): try: index = json.loads(SESSION_INDEX_FILE.read_text(encoding="utf-8")) for entry in index: if entry.get("project_id") == body["project_id"]: try: s = get_session(entry["session_id"]) s.project_id = None s.save() except Exception: logger.debug("Failed to update session %s", entry.get("session_id")) except Exception: logger.debug("Failed to load session index for project unlink") return j(handler, {"ok": True}) # ── Session import from JSON (POST) ── if parsed.path == "/api/session/import": return _handle_session_import(handler, body) # ── Self-update (POST) ── if parsed.path == "/api/updates/apply": target = body.get("target", "") if target not in ("webui", "agent"): return bad(handler, 'target must be "webui" or "agent"') from api.updates import apply_update return j(handler, apply_update(target)) # ── CLI session import (POST) ── if parsed.path == "/api/session/import_cli": return _handle_session_import_cli(handler, body) # ── Auth endpoints (POST) ── if parsed.path == "/api/auth/login": from api.auth import ( verify_password, create_session, set_auth_cookie, is_auth_enabled, ) from api.auth import _check_login_rate, _record_login_attempt if not is_auth_enabled(): return j(handler, {"ok": True, "message": "Auth not enabled"}) client_ip = handler.client_address[0] if not _check_login_rate(client_ip): return j( handler, {"error": "Too many attempts. Try again in a minute."}, status=429, ) password = body.get("password", "") if not verify_password(password): _record_login_attempt(client_ip) return bad(handler, "Invalid password", 401) cookie_val = create_session() handler.send_response(200) handler.send_header("Content-Type", "application/json") handler.send_header("Cache-Control", "no-store") _security_headers(handler) set_auth_cookie(handler, cookie_val) handler.end_headers() handler.wfile.write(json.dumps({"ok": True}).encode()) return True if parsed.path == "/api/auth/logout": from api.auth import clear_auth_cookie, invalidate_session, parse_cookie cookie_val = parse_cookie(handler) if cookie_val: invalidate_session(cookie_val) handler.send_response(200) handler.send_header("Content-Type", "application/json") handler.send_header("Cache-Control", "no-store") _security_headers(handler) clear_auth_cookie(handler) handler.end_headers() handler.wfile.write(json.dumps({"ok": True}).encode()) return True return False # 404 # ── GET route helpers ───────────────────────────────────────────────────────── def _serve_static(handler, parsed): static_root = (Path(__file__).parent.parent / "static").resolve() # Strip the leading '/static/' prefix, then resolve and sandbox rel = parsed.path[len("/static/") :] static_file = (static_root / rel).resolve() try: static_file.relative_to(static_root) except ValueError: return j(handler, {"error": "not found"}, status=404) if not static_file.exists() or not static_file.is_file(): return j(handler, {"error": "not found"}, status=404) ext = static_file.suffix.lower() ct = {"css": "text/css", "js": "application/javascript", "html": "text/html"}.get( ext.lstrip("."), "text/plain" ) handler.send_response(200) handler.send_header("Content-Type", f"{ct}; charset=utf-8") handler.send_header("Cache-Control", "no-store") raw = static_file.read_bytes() handler.send_header("Content-Length", str(len(raw))) handler.end_headers() handler.wfile.write(raw) return True def _handle_session_export(handler, parsed): sid = parse_qs(parsed.query).get("session_id", [""])[0] if not sid: return bad(handler, "session_id is required") try: s = get_session(sid) except KeyError: return bad(handler, "Session not found", 404) safe = redact_session_data(s.__dict__) payload = json.dumps(safe, ensure_ascii=False, indent=2) handler.send_response(200) handler.send_header("Content-Type", "application/json; charset=utf-8") handler.send_header( "Content-Disposition", f'attachment; filename="hermes-{sid}.json"' ) handler.send_header("Content-Length", str(len(payload.encode("utf-8")))) handler.send_header("Cache-Control", "no-store") handler.end_headers() handler.wfile.write(payload.encode("utf-8")) return True def _handle_sessions_search(handler, parsed): qs = parse_qs(parsed.query) q = qs.get("q", [""])[0].lower().strip() content_search = qs.get("content", ["1"])[0] == "1" depth = int(qs.get("depth", ["5"])[0]) if not q: safe_sessions = [] for s in all_sessions(): item = dict(s) if isinstance(item.get("title"), str): item["title"] = _redact_text(item["title"]) safe_sessions.append(item) return j(handler, {"sessions": safe_sessions}) results = [] for s in all_sessions(): title_match = q in (s.get("title") or "").lower() if title_match: item = dict(s, match_type="title") if isinstance(item.get("title"), str): item["title"] = _redact_text(item["title"]) results.append(item) continue if content_search: try: sess = get_session(s["session_id"]) msgs = sess.messages[:depth] if depth else sess.messages for m in msgs: c = m.get("content") or "" if isinstance(c, list): c = " ".join( p.get("text", "") for p in c if isinstance(p, dict) and p.get("type") == "text" ) if q in str(c).lower(): item = dict(s, match_type="content") if isinstance(item.get("title"), str): item["title"] = _redact_text(item["title"]) results.append(item) break except (KeyError, Exception): pass return j(handler, {"sessions": results, "query": q, "count": len(results)}) def _handle_list_dir(handler, parsed): qs = parse_qs(parsed.query) sid = qs.get("session_id", [""])[0] if not sid: return bad(handler, "session_id is required") try: s = get_session(sid) workspace = s.workspace except KeyError: # Fallback for CLI sessions not loaded in WebUI memory try: cli_meta = None for cs in get_cli_sessions(): if cs["session_id"] == sid: cli_meta = cs break if not cli_meta: return bad(handler, "Session not found", 404) workspace = cli_meta.get("workspace", "") except Exception: return bad(handler, "Session not found", 404) try: return j( handler, { "entries": list_dir(Path(workspace), qs.get("path", ["."])[0]), "path": qs.get("path", ["."])[0], }, ) except (FileNotFoundError, ValueError) as e: return bad(handler, _sanitize_error(e), 404) def _handle_sse_stream(handler, parsed): stream_id = parse_qs(parsed.query).get("stream_id", [""])[0] q = STREAMS.get(stream_id) if q is None: return j(handler, {"error": "stream not found"}, status=404) handler.send_response(200) handler.send_header("Content-Type", "text/event-stream; charset=utf-8") handler.send_header("Cache-Control", "no-cache") handler.send_header("X-Accel-Buffering", "no") handler.send_header("Connection", "keep-alive") handler.end_headers() try: while True: try: event, data = q.get(timeout=30) except queue.Empty: handler.wfile.write(b": heartbeat\n\n") handler.wfile.flush() continue _sse(handler, event, data) if event in ("done", "error", "cancel"): break except (BrokenPipeError, ConnectionResetError): pass return True def _handle_gateway_sse_stream(handler): """SSE endpoint for real-time gateway session updates. Streams change events from the gateway watcher background thread. Only active when show_cli_sessions (show_agent_sessions) setting is enabled. """ # Check if the feature is enabled settings = load_settings() if not settings.get('show_cli_sessions'): return j(handler, {'error': 'agent sessions not enabled'}, status=404) from api.gateway_watcher import get_watcher watcher = get_watcher() if watcher is None: return j(handler, {'error': 'watcher not started'}, status=503) handler.send_response(200) handler.send_header('Content-Type', 'text/event-stream; charset=utf-8') handler.send_header('Cache-Control', 'no-cache') handler.send_header('X-Accel-Buffering', 'no') handler.send_header('Connection', 'keep-alive') handler.end_headers() q = watcher.subscribe() try: # Send initial snapshot immediately from api.models import get_cli_sessions initial = get_cli_sessions() _sse(handler, 'sessions_changed', {'sessions': initial}) while True: try: event_data = q.get(timeout=30) except queue.Empty: handler.wfile.write(b': keepalive\n\n') handler.wfile.flush() continue if event_data is None: break # watcher is stopping _sse(handler, event_data.get('type', 'sessions_changed'), event_data) except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError): pass finally: watcher.unsubscribe(q) return True def _content_disposition_value(disposition: str, filename: str) -> str: """Build a latin-1-safe Content-Disposition value with RFC 5987 filename*.""" import urllib.parse as _up safe_name = Path(filename).name.replace("\r", "").replace("\n", "") ascii_fallback = "".join( ch if 32 <= ord(ch) < 127 and ch not in {'"', '\\'} else "_" for ch in safe_name ).strip(" .") if not ascii_fallback: suffix = Path(safe_name).suffix ascii_suffix = "".join( ch if 32 <= ord(ch) < 127 and ch not in {'"', '\\'} else "_" for ch in suffix ) ascii_fallback = f"download{ascii_suffix}" if ascii_suffix else "download" quoted_name = _up.quote(safe_name, safe="") return ( f'{disposition}; filename="{ascii_fallback}"; ' f"filename*=UTF-8''{quoted_name}" ) def _handle_file_raw(handler, parsed): qs = parse_qs(parsed.query) sid = qs.get("session_id", [""])[0] if not sid: return bad(handler, "session_id is required") try: s = get_session(sid) except KeyError: return bad(handler, "Session not found", 404) rel = qs.get("path", [""])[0] force_download = qs.get("download", [""])[0] == "1" target = safe_resolve(Path(s.workspace), rel) if not target.exists() or not target.is_file(): return j(handler, {"error": "not found"}, status=404) ext = target.suffix.lower() mime = MIME_MAP.get(ext, "application/octet-stream") raw_bytes = target.read_bytes() handler.send_response(200) handler.send_header("Content-Type", mime) handler.send_header("Content-Length", str(len(raw_bytes))) handler.send_header("Cache-Control", "no-store") # Security: force download for dangerous MIME types to prevent XSS dangerous_types = {"text/html", "application/xhtml+xml", "image/svg+xml"} if force_download or mime in dangerous_types: handler.send_header( "Content-Disposition", _content_disposition_value("attachment", target.name), ) else: handler.send_header( "Content-Disposition", _content_disposition_value("inline", target.name), ) handler.end_headers() handler.wfile.write(raw_bytes) return True def _handle_file_read(handler, parsed): qs = parse_qs(parsed.query) sid = qs.get("session_id", [""])[0] if not sid: return bad(handler, "session_id is required") try: s = get_session(sid) except KeyError: return bad(handler, "Session not found", 404) rel = qs.get("path", [""])[0] if not rel: return bad(handler, "path is required") try: return j(handler, read_file_content(Path(s.workspace), rel)) except (FileNotFoundError, ValueError) as e: return bad(handler, _sanitize_error(e), 404) def _handle_approval_pending(handler, parsed): sid = parse_qs(parsed.query).get("session_id", [""])[0] with _lock: p = _pending.get(sid) if p: return j(handler, {"pending": dict(p)}) return j(handler, {"pending": None}) def _handle_approval_inject(handler, parsed): """Inject a fake pending approval -- loopback-only, used by automated tests.""" qs = parse_qs(parsed.query) sid = qs.get("session_id", [""])[0] key = qs.get("pattern_key", ["test_pattern"])[0] cmd = qs.get("command", ["rm -rf /tmp/test"])[0] if sid: submit_pending( sid, { "command": cmd, "pattern_key": key, "pattern_keys": [key], "description": "test pattern", }, ) return j(handler, {"ok": True, "session_id": sid}) return j(handler, {"error": "session_id required"}, status=400) def _handle_live_models(handler, parsed): """Fetch the live model list from a provider's /v1/models endpoint. Returns the provider's actual model catalog so the UI can show all available models, not just the hardcoded fallback list. Query params: provider (optional) — provider ID to fetch for; defaults to active base_url (optional) — override the base URL for the provider Providers that don't expose a /v1/models endpoint (Anthropic) are not supported here — the caller should fall back to the static list. Supported: openai, openrouter, custom (any OpenAI-compatible endpoint). """ import urllib.request as _ur import ipaddress as _ip import socket as _sock from urllib.parse import urlparse as _up qs = parse_qs(parsed.query) provider = (qs.get("provider", [""])[0] or "").lower().strip() base_url_override = (qs.get("base_url", [""])[0] or "").strip() try: from api.config import get_config as _gc, resolve_model_provider as _rmp cfg = _gc() active_provider = cfg.get("model", {}).get("provider") or "" if not provider: provider = active_provider # Resolve API key and base URL for this provider api_key = None base_url = base_url_override or "" try: from hermes_cli.runtime_provider import resolve_runtime_provider rt = resolve_runtime_provider(requested=provider) api_key = rt.get("api_key") if not base_url: base_url = rt.get("base_url") or "" except Exception: pass # Determine the /v1/models endpoint URL if not base_url: if provider in ("openai", "openai-codex", "copilot"): base_url = "https://api.openai.com/v1" elif provider == "openrouter": base_url = "https://openrouter.ai/api/v1" elif provider in ("anthropic",): # Anthropic doesn't support /v1/models in a standard way return j(handler, {"error": "not_supported", "models": []}) elif provider in ("google", "gemini"): return j(handler, {"error": "not_supported", "models": []}) else: # Generic OpenAI-compatible — try common paths base_url = "" if not base_url: return j(handler, {"error": "no_base_url", "models": []}) # Build URL safely base_url = base_url.rstrip("/") if base_url.endswith("/v1"): endpoint_url = base_url + "/models" elif "/v1" in base_url: endpoint_url = base_url.rstrip("/") + "/models" else: endpoint_url = base_url + "/v1/models" # Validate scheme (B310 guard) parsed_ep = _up(endpoint_url) if parsed_ep.scheme not in ("http", "https"): return j(handler, {"error": "invalid_scheme", "models": []}, status=400) # SSRF guard: block private IPs (allow known local provider hostnames). # Use exact hostname match — NOT substring — to prevent bypass via # hostnames like evil-ollama.attacker.com containing "ollama". _KNOWN_LOCAL_HOSTS = {"localhost", "127.0.0.1", "0.0.0.0", "::1"} if parsed_ep.hostname: hostname_lower = (parsed_ep.hostname or "").lower() try: for _, _, _, _, addr in _sock.getaddrinfo(parsed_ep.hostname, None): addr_obj = _ip.ip_address(addr[0]) if addr_obj.is_private or addr_obj.is_loopback: if hostname_lower not in _KNOWN_LOCAL_HOSTS: return j(handler, {"error": "ssrf_blocked", "models": []}, status=400) except _sock.gaierror: pass # Fetch models req = _ur.Request(endpoint_url, method="GET") req.add_header("User-Agent", "HermesWebUI/1.0") if api_key: req.add_header("Authorization", f"Bearer {api_key}") with _ur.urlopen(req, timeout=8) as resp: # nosec B310 raw = resp.read().decode("utf-8") import json as _json data = _json.loads(raw) raw_models = data.get("data") or data.get("models") or [] # Normalise to {id, label} list; filter to text-generation models models = [] seen = set() for m in raw_models: if not isinstance(m, dict): continue mid = m.get("id") or m.get("name") or "" if not mid or mid in seen: continue # Skip embedding/image/audio models for direct providers obj_type = (m.get("object") or "").lower() if obj_type and obj_type not in ("model",): continue # Heuristic: skip obvious non-chat models if any(skip in mid.lower() for skip in ("embed", "tts", "whisper", "dall-e", "davinci-edit", "babbage", "ada", "curie")): continue seen.add(mid) label = m.get("name") or m.get("display_name") or mid # For OpenAI, the id IS the label — clean it up if label == mid: label = mid.replace("-", " ").replace(".", ".").title() # Restore original casing for well-known names for known in ("GPT", "o1", "o3", "o4", "gpt"): label = label.replace(known.title(), known) models.append({"id": mid, "label": label}) # Sort: newest (higher version numbers) first via lexicographic sort on reversed id models.sort(key=lambda m: m["id"], reverse=True) return j(handler, {"provider": provider, "models": models, "count": len(models)}) except Exception as _e: logger.debug("Failed to fetch live models for %s: %s", provider, _e) return j(handler, {"error": str(_e), "models": []}) def _handle_cron_output(handler, parsed): from cron.jobs import OUTPUT_DIR as CRON_OUT qs = parse_qs(parsed.query) job_id = qs.get("job_id", [""])[0] limit = int(qs.get("limit", ["5"])[0]) if not job_id: return j(handler, {"error": "job_id required"}, status=400) out_dir = CRON_OUT / job_id outputs = [] if out_dir.exists(): files = sorted(out_dir.glob("*.md"), reverse=True)[:limit] for f in files: try: txt = f.read_text(encoding="utf-8", errors="replace") outputs.append({"filename": f.name, "content": txt[:8000]}) except Exception: logger.debug("Failed to read cron output file %s", f) return j(handler, {"job_id": job_id, "outputs": outputs}) def _handle_cron_recent(handler, parsed): """Return cron jobs that have completed since a given timestamp.""" import datetime qs = parse_qs(parsed.query) since = float(qs.get("since", ["0"])[0]) try: from cron.jobs import list_jobs jobs = list_jobs(include_disabled=True) completions = [] for job in jobs: last_run = job.get("last_run_at") if not last_run: continue if isinstance(last_run, str): try: ts = datetime.datetime.fromisoformat( last_run.replace("Z", "+00:00") ).timestamp() except (ValueError, TypeError): continue else: ts = float(last_run) if ts > since: completions.append( { "job_id": job.get("id", ""), "name": job.get("name", "Unknown"), "status": job.get("last_status", "unknown"), "completed_at": ts, } ) return j(handler, {"completions": completions, "since": since}) except ImportError: return j(handler, {"completions": [], "since": since}) def _handle_memory_read(handler): try: from api.profiles import get_active_hermes_home mem_dir = get_active_hermes_home() / "memories" except ImportError: mem_dir = Path.home() / ".hermes" / "memories" mem_file = mem_dir / "MEMORY.md" user_file = mem_dir / "USER.md" memory = ( mem_file.read_text(encoding="utf-8", errors="replace") if mem_file.exists() else "" ) user = ( user_file.read_text(encoding="utf-8", errors="replace") if user_file.exists() else "" ) return j( handler, { "memory": _redact_text(memory), "user": _redact_text(user), "memory_path": str(mem_file), "user_path": str(user_file), "memory_mtime": mem_file.stat().st_mtime if mem_file.exists() else None, "user_mtime": user_file.stat().st_mtime if user_file.exists() else None, }, ) # ── POST route helpers ──────────────────────────────────────────────────────── def _handle_sessions_cleanup(handler, body, zero_only=False): cleaned = 0 for p in SESSION_DIR.glob("*.json"): if p.name.startswith("_"): continue try: s = Session.load(p.stem) if zero_only: should_delete = s and len(s.messages) == 0 else: should_delete = s and s.title == "Untitled" and len(s.messages) == 0 if should_delete: with LOCK: SESSIONS.pop(p.stem, None) p.unlink(missing_ok=True) cleaned += 1 except Exception: logger.debug("Failed to clean up session file %s", p) if SESSION_INDEX_FILE.exists(): SESSION_INDEX_FILE.unlink(missing_ok=True) return j(handler, {"ok": True, "cleaned": cleaned}) def _handle_chat_start(handler, body): try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) msg = str(body.get("message", "")).strip() if not msg: return bad(handler, "message is required") attachments = [str(a) for a in (body.get("attachments") or [])][:20] workspace = str(Path(body.get("workspace") or s.workspace).expanduser().resolve()) model = body.get("model") or s.model stream_id = uuid.uuid4().hex s.workspace = workspace s.model = model s.active_stream_id = stream_id s.pending_user_message = msg s.pending_attachments = attachments s.pending_started_at = time.time() s.save() set_last_workspace(workspace) q = queue.Queue() with STREAMS_LOCK: STREAMS[stream_id] = q thr = threading.Thread( target=_run_agent_streaming, args=(s.session_id, msg, model, workspace, stream_id, attachments), daemon=True, ) thr.start() return j(handler, {"stream_id": stream_id, "session_id": s.session_id}) def _handle_chat_sync(handler, body): """Fallback synchronous chat endpoint (POST /api/chat). Not used by frontend.""" from api.config import _get_session_agent_lock s = get_session(body["session_id"]) msg = str(body.get("message", "")).strip() if not msg: return j(handler, {"error": "empty message"}, status=400) workspace = Path(body.get("workspace") or s.workspace).expanduser().resolve() s.workspace = str(workspace) s.model = body.get("model") or s.model from api.streaming import _ENV_LOCK with _ENV_LOCK: old_cwd = os.environ.get("TERMINAL_CWD") os.environ["TERMINAL_CWD"] = str(workspace) old_exec_ask = os.environ.get("HERMES_EXEC_ASK") old_session_key = os.environ.get("HERMES_SESSION_KEY") os.environ["HERMES_EXEC_ASK"] = "1" os.environ["HERMES_SESSION_KEY"] = s.session_id try: from run_agent import AIAgent with CHAT_LOCK: from api.config import resolve_model_provider _model, _provider, _base_url = resolve_model_provider(s.model) # Resolve API key via Hermes runtime provider (matches gateway behaviour) _api_key = None try: from hermes_cli.runtime_provider import resolve_runtime_provider _rt = resolve_runtime_provider(requested=_provider) _api_key = _rt.get("api_key") # Also use runtime provider/base_url if the webui config didn't resolve them if not _provider: _provider = _rt.get("provider") if not _base_url: _base_url = _rt.get("base_url") except Exception as _e: print( f"[webui] WARNING: resolve_runtime_provider failed: {_e}", flush=True, ) agent = AIAgent( model=_model, provider=_provider, base_url=_base_url, api_key=_api_key, platform="cli", quiet_mode=True, enabled_toolsets=CLI_TOOLSETS, session_id=s.session_id, ) workspace_ctx = f"[Workspace: {s.workspace}]\n" workspace_system_msg = ( f"Active workspace at session start: {s.workspace}\n" "Every user message is prefixed with [Workspace: /absolute/path] indicating the " "workspace the user has selected in the web UI at the time they sent that message. " "This tag is the single authoritative source of the active workspace and updates " "with every message. It overrides any prior workspace mentioned in this system " "prompt, memory, or conversation history. Always use the value from the most recent " "[Workspace: ...] tag as your default working directory for ALL file operations: " "write_file, read_file, search_files, terminal workdir, and patch. " "Never fall back to a hardcoded path when this tag is present." ) from api.streaming import _sanitize_messages_for_api result = agent.run_conversation( user_message=workspace_ctx + msg, system_message=workspace_system_msg, conversation_history=_sanitize_messages_for_api(s.messages), task_id=s.session_id, persist_user_message=msg, ) finally: with _ENV_LOCK: if old_cwd is None: os.environ.pop("TERMINAL_CWD", None) else: os.environ["TERMINAL_CWD"] = old_cwd if old_exec_ask is None: os.environ.pop("HERMES_EXEC_ASK", None) else: os.environ["HERMES_EXEC_ASK"] = old_exec_ask if old_session_key is None: os.environ.pop("HERMES_SESSION_KEY", None) else: os.environ["HERMES_SESSION_KEY"] = old_session_key s.messages = result.get("messages") or s.messages # Only auto-generate title when still default; preserves user renames if s.title == "Untitled": s.title = title_from(s.messages, s.title) s.save() # Sync to state.db for /insights (opt-in setting) try: if load_settings().get("sync_to_insights"): from api.state_sync import sync_session_usage sync_session_usage( session_id=s.session_id, input_tokens=s.input_tokens or 0, output_tokens=s.output_tokens or 0, estimated_cost=s.estimated_cost, model=s.model, title=s.title, message_count=len(s.messages), ) except Exception: logger.debug("Failed to update session cost tracking") return j( handler, { "answer": result.get("final_response") or "", "status": "done" if result.get("completed", True) else "partial", "session": s.compact() | {"messages": s.messages}, "result": {k: v for k, v in result.items() if k != "messages"}, }, ) def _handle_cron_create(handler, body): try: require(body, "prompt", "schedule") except ValueError as e: return bad(handler, str(e)) try: from cron.jobs import create_job job = create_job( prompt=body["prompt"], schedule=body["schedule"], name=body.get("name") or None, deliver=body.get("deliver") or "local", skills=body.get("skills") or [], model=body.get("model") or None, ) return j(handler, {"ok": True, "job": job}) except Exception as e: return j(handler, {"error": str(e)}, status=400) def _handle_cron_update(handler, body): try: require(body, "job_id") except ValueError as e: return bad(handler, str(e)) from cron.jobs import update_job updates = {k: v for k, v in body.items() if k != "job_id" and v is not None} job = update_job(body["job_id"], updates) if not job: return bad(handler, "Job not found", 404) return j(handler, {"ok": True, "job": job}) def _handle_cron_delete(handler, body): try: require(body, "job_id") except ValueError as e: return bad(handler, str(e)) from cron.jobs import remove_job ok = remove_job(body["job_id"]) if not ok: return bad(handler, "Job not found", 404) return j(handler, {"ok": True, "job_id": body["job_id"]}) def _handle_cron_run(handler, body): job_id = body.get("job_id", "") if not job_id: return bad(handler, "job_id required") from cron.jobs import get_job from cron.scheduler import run_job job = get_job(job_id) if not job: return bad(handler, "Job not found", 404) threading.Thread(target=run_job, args=(job,), daemon=True).start() return j(handler, {"ok": True, "job_id": job_id, "status": "triggered"}) def _handle_cron_pause(handler, body): job_id = body.get("job_id", "") if not job_id: return bad(handler, "job_id required") from cron.jobs import pause_job result = pause_job(job_id, reason=body.get("reason")) if result: return j(handler, {"ok": True, "job": result}) return bad(handler, "Job not found", 404) def _handle_cron_resume(handler, body): job_id = body.get("job_id", "") if not job_id: return bad(handler, "job_id required") from cron.jobs import resume_job result = resume_job(job_id) if result: return j(handler, {"ok": True, "job": result}) return bad(handler, "Job not found", 404) def _handle_file_delete(handler, body): try: require(body, "session_id", "path") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) try: target = safe_resolve(Path(s.workspace), body["path"]) if not target.exists(): return bad(handler, "File not found", 404) if target.is_dir(): return bad(handler, "Cannot delete directories via this endpoint") target.unlink() return j(handler, {"ok": True, "path": body["path"]}) except (ValueError, PermissionError) as e: return bad(handler, _sanitize_error(e)) def _handle_file_save(handler, body): try: require(body, "session_id", "path") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) try: target = safe_resolve(Path(s.workspace), body["path"]) if not target.exists(): return bad(handler, "File not found", 404) if target.is_dir(): return bad(handler, "Cannot save: path is a directory") target.write_text(body.get("content", ""), encoding="utf-8") return j( handler, {"ok": True, "path": body["path"], "size": target.stat().st_size} ) except (ValueError, PermissionError) as e: return bad(handler, _sanitize_error(e)) def _handle_file_create(handler, body): try: require(body, "session_id", "path") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) try: target = safe_resolve(Path(s.workspace), body["path"]) if target.exists(): return bad(handler, "File already exists") target.parent.mkdir(parents=True, exist_ok=True) target.write_text(body.get("content", ""), encoding="utf-8") return j( handler, {"ok": True, "path": str(target.relative_to(Path(s.workspace)))} ) except (ValueError, PermissionError) as e: return bad(handler, _sanitize_error(e)) def _handle_file_rename(handler, body): try: require(body, "session_id", "path", "new_name") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) try: source = safe_resolve(Path(s.workspace), body["path"]) if not source.exists(): return bad(handler, "File not found", 404) new_name = body["new_name"].strip() if not new_name or "/" in new_name or ".." in new_name: return bad(handler, "Invalid file name") dest = source.parent / new_name if dest.exists(): return bad(handler, f'A file named "{new_name}" already exists') source.rename(dest) new_rel = str(dest.relative_to(Path(s.workspace))) return j(handler, {"ok": True, "old_path": body["path"], "new_path": new_rel}) except (ValueError, PermissionError, OSError) as e: return bad(handler, _sanitize_error(e)) def _handle_create_dir(handler, body): try: require(body, "session_id", "path") except ValueError as e: return bad(handler, str(e)) try: s = get_session(body["session_id"]) except KeyError: return bad(handler, "Session not found", 404) try: target = safe_resolve(Path(s.workspace), body["path"]) if target.exists(): return bad(handler, "Path already exists") target.mkdir(parents=True) return j( handler, {"ok": True, "path": str(target.relative_to(Path(s.workspace)))} ) except (ValueError, PermissionError, OSError) as e: return bad(handler, _sanitize_error(e)) def _handle_workspace_add(handler, body): path_str = body.get("path", "").strip() name = body.get("name", "").strip() if not path_str: return bad(handler, "path is required") p = Path(path_str).expanduser().resolve() if not p.exists(): return bad(handler, f"Path does not exist: {p}") if not p.is_dir(): return bad(handler, f"Path is not a directory: {p}") wss = load_workspaces() if any(w["path"] == str(p) for w in wss): return bad(handler, "Workspace already in list") wss.append({"path": str(p), "name": name or p.name}) save_workspaces(wss) return j(handler, {"ok": True, "workspaces": wss}) def _handle_workspace_remove(handler, body): path_str = body.get("path", "").strip() if not path_str: return bad(handler, "path is required") wss = load_workspaces() wss = [w for w in wss if w["path"] != path_str] save_workspaces(wss) return j(handler, {"ok": True, "workspaces": wss}) def _handle_workspace_rename(handler, body): path_str = body.get("path", "").strip() name = body.get("name", "").strip() if not path_str or not name: return bad(handler, "path and name are required") wss = load_workspaces() for w in wss: if w["path"] == path_str: w["name"] = name break else: return bad(handler, "Workspace not found", 404) save_workspaces(wss) return j(handler, {"ok": True, "workspaces": wss}) def _handle_approval_respond(handler, body): sid = body.get("session_id", "") if not sid: return bad(handler, "session_id is required") choice = body.get("choice", "deny") if choice not in ("once", "session", "always", "deny"): return bad(handler, f"Invalid choice: {choice}") # Pop the legacy polling-mode pending entry (no-op when gateway path is active). with _lock: pending = _pending.pop(sid, None) if pending: keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")] if choice in ("once", "session"): for k in keys: approve_session(sid, k) elif choice == "always": for k in keys: approve_session(sid, k) approve_permanent(k) save_permanent_allowlist(_permanent_approved) # Unblock the agent thread waiting in the gateway approval queue. # This is the primary signal when streaming is active — the agent # thread is parked in entry.event.wait() and needs to be woken up. resolve_gateway_approval(sid, choice, resolve_all=False) return j(handler, {"ok": True, "choice": choice}) def _handle_skill_save(handler, body): try: require(body, "name", "content") except ValueError as e: return bad(handler, str(e)) skill_name = body["name"].strip().lower().replace(" ", "-") if not skill_name or "/" in skill_name or ".." in skill_name: return bad(handler, "Invalid skill name") category = body.get("category", "").strip() if category and ("/" in category or ".." in category): return bad(handler, "Invalid category") from tools.skills_tool import SKILLS_DIR if category: skill_dir = SKILLS_DIR / category / skill_name else: skill_dir = SKILLS_DIR / skill_name # Validate resolved path stays within SKILLS_DIR try: skill_dir.resolve().relative_to(SKILLS_DIR.resolve()) except ValueError: return bad(handler, "Invalid skill path") skill_dir.mkdir(parents=True, exist_ok=True) skill_file = skill_dir / "SKILL.md" skill_file.write_text(body["content"], encoding="utf-8") return j(handler, {"ok": True, "name": skill_name, "path": str(skill_file)}) def _handle_skill_delete(handler, body): try: require(body, "name") except ValueError as e: return bad(handler, str(e)) from tools.skills_tool import SKILLS_DIR import shutil matches = list(SKILLS_DIR.rglob(f"{body['name']}/SKILL.md")) if not matches: return bad(handler, "Skill not found", 404) skill_dir = matches[0].parent shutil.rmtree(str(skill_dir)) return j(handler, {"ok": True, "name": body["name"]}) def _handle_memory_write(handler, body): try: require(body, "section", "content") except ValueError as e: return bad(handler, str(e)) try: from api.profiles import get_active_hermes_home mem_dir = get_active_hermes_home() / "memories" except ImportError: mem_dir = Path.home() / ".hermes" / "memories" mem_dir.mkdir(parents=True, exist_ok=True) section = body["section"] if section == "memory": target = mem_dir / "MEMORY.md" elif section == "user": target = mem_dir / "USER.md" else: return bad(handler, 'section must be "memory" or "user"') target.write_text(body["content"], encoding="utf-8") return j(handler, {"ok": True, "section": section, "path": str(target)}) def _handle_session_import_cli(handler, body): """Import a single CLI session into the WebUI store.""" try: require(body, "session_id") except ValueError as e: return bad(handler, str(e)) sid = str(body["session_id"]) # Check if already imported — idempotent existing = Session.load(sid) if existing: return j( handler, { "session": existing.compact() | { "messages": existing.messages, "is_cli_session": True, }, "imported": False, }, ) # Fetch messages from CLI store msgs = get_cli_session_messages(sid) if not msgs: return bad(handler, "Session not found in CLI store", 404) # Derive title from first user message title = title_from(msgs, "CLI Session") model = "unknown" # Get profile, model, and timestamps from CLI session metadata profile = None created_at = None updated_at = None for cs in get_cli_sessions(): if cs["session_id"] == sid: profile = cs.get("profile") model = cs.get("model", "unknown") created_at = cs.get("created_at") updated_at = cs.get("updated_at") break s = import_cli_session( sid, title, msgs, model, profile=profile, created_at=created_at, updated_at=updated_at, ) s.is_cli_session = True s._cli_origin = sid s.save(touch_updated_at=False) return j( handler, { "session": s.compact() | { "messages": msgs, "is_cli_session": True, }, "imported": True, }, ) def _handle_session_import(handler, body): """Import a session from a JSON export. Creates a new session with a new ID.""" if not body or not isinstance(body, dict): return bad(handler, "Request body must be a JSON object") messages = body.get("messages") if not isinstance(messages, list): return bad(handler, 'JSON must contain a "messages" array') title = body.get("title", "Imported session") workspace = body.get("workspace", str(DEFAULT_WORKSPACE)) model = body.get("model", DEFAULT_MODEL) s = Session( title=title, workspace=workspace, model=model, messages=messages, tool_calls=body.get("tool_calls", []), ) s.pinned = body.get("pinned", False) with LOCK: SESSIONS[s.session_id] = s SESSIONS.move_to_end(s.session_id) while len(SESSIONS) > SESSIONS_MAX: SESSIONS.popitem(last=False) s.save() return j(handler, {"ok": True, "session": s.compact() | {"messages": s.messages}})