Hermes Web UI — Sprints 11-14: multi-provider models, settings, session QoL, alerts, polish

Sprint 11 (v0.13): multi-provider model support, streaming smoothness
- Dynamic model dropdown populated from configured API keys (OpenAI, Anthropic,
  Google, DeepSeek, GLM, Kimi, MiniMax, OpenRouter, Nous Portal)
- Scroll pinning during streaming (no forced scroll when user has scrolled up)
- All route handlers extracted to api/routes.py (server.py now ~76 lines)

Sprint 12 (v0.14): settings panel, SSE reconnect, session QoL
- Settings panel (gear icon) -- persist default model and workspace server-side
- SSE auto-reconnect on network blips
- Pin/star sessions to top of sidebar
- Import session from JSON export

Sprint 13 (v0.15): cron alerts, background errors, session duplicate, tab title
- Cron completion alerts: toast per completion + unread badge on Tasks tab
- Background agent error banner when a non-active session errors mid-stream
- Session duplicate button
- Browser tab title reflects active session name

Sprint 14 (v0.16): Mermaid diagrams, file ops, session archive/tags, timestamps
- Mermaid diagram rendering inline (dark theme, lazy CDN load)
- File rename (double-click in file tree) and create folder
- Session archive (hide without deleting, toggle to show)
- Session tags -- #hashtag in title becomes colored chip + click-to-filter
- Message timestamps (HH:MM on hover, full date as tooltip)

Test suite: 224 tests across 14 sprint files + regression gate, 0 failures.
This commit is contained in:
Hermes
2026-03-31 07:02:47 +00:00
parent 732d227b97
commit 7019c25021
29 changed files with 2871 additions and 1122 deletions

View File

@@ -156,14 +156,17 @@ def test_cancel_nonexistent_stream_returns_not_cancelled(cleanup_test_sessions):
def test_server_py_sse_loop_breaks_on_cancel(cleanup_test_sessions):
"""R5b: server.py SSE loop must include 'cancel' in the break condition.
"""R5b: SSE loop must include 'cancel' in the break condition.
When missing, the connection hung after the cancel event was processed.
Sprint 11: logic moved from server.py to api/routes.py -- check both.
"""
src = (REPO_ROOT / "server.py").read_text()
# Find the SSE break condition
import re
m = re.search(r"if event in \([^)]+\):\s*break", src)
assert m, "SSE break condition not found in server.py"
# Check server.py first, then api/routes.py (Sprint 11 extracted routes)
src = (REPO_ROOT / "server.py").read_text()
routes_src = (REPO_ROOT / "api" / "routes.py").read_text() if (REPO_ROOT / "api" / "routes.py").exists() else ""
combined = src + routes_src
m = re.search(r"if event in \([^)]+\):\s*break", combined)
assert m, "SSE break condition not found in server.py or api/routes.py"
assert "cancel" in m.group(), \
f"'cancel' missing from SSE break condition: {m.group()}"
@@ -275,16 +278,21 @@ def test_deleted_session_does_not_appear_in_list(cleanup_test_sessions):
def test_server_delete_invalidates_index(cleanup_test_sessions):
"""R8b: server.py session/delete handler must unlink _index.json.
"""R8b: session/delete handler must unlink _index.json.
Static check that the fix is in place.
Sprint 11: handler moved from server.py to api/routes.py -- check both.
"""
src = (REPO_ROOT / "server.py").read_text()
# Find the delete handler and verify it unlinks the index
delete_idx = src.find("if parsed.path == '/api/session/delete':")
assert delete_idx >= 0, "session/delete handler not found"
delete_block = src[delete_idx:delete_idx+600]
assert "SESSION_INDEX_FILE" in delete_block, "server.py session/delete must invalidate SESSION_INDEX_FILE"
routes_src = (REPO_ROOT / "api" / "routes.py").read_text() if (REPO_ROOT / "api" / "routes.py").exists() else ""
# Find the delete handler in either file
for label, text in [("server.py", src), ("api/routes.py", routes_src)]:
delete_idx = text.find("if parsed.path == '/api/session/delete':")
if delete_idx >= 0:
delete_block = text[delete_idx:delete_idx+600]
assert "SESSION_INDEX_FILE" in delete_block, \
f"{label} session/delete must invalidate SESSION_INDEX_FILE"
return
assert False, "session/delete handler not found in server.py or api/routes.py"
# ── R9: Token/tool SSE events write to wrong session after switch ─────────────
@@ -292,25 +300,36 @@ def test_token_handler_guards_session_id(cleanup_test_sessions):
"""R9a: The SSE token event handler must check activeSid before writing to DOM.
When missing, tokens from session A would render into session B's message area
if the user switched sessions mid-stream.
Sprint 12: handler moved into _wireSSE(source), so search source.addEventListener.
"""
src = (REPO_ROOT / "static/messages.js").read_text()
# Find the token event handler
token_idx = src.find("es.addEventListener('token'")
# Sprint 12 refactored es.addEventListener -> source.addEventListener inside _wireSSE()
token_idx = src.find("source.addEventListener('token'")
if token_idx < 0:
token_idx = src.find("es.addEventListener('token'")
assert token_idx >= 0, "token event handler not found"
token_block = src[token_idx:token_idx+300]
assert "activeSid" in token_block, "token handler must check activeSid before writing to DOM"
assert "S.session.session_id!==activeSid" in token_block or "S.session.session_id===activeSid" in token_block, "token handler must compare current session to activeSid"
assert "activeSid" in token_block, \
"token handler must check activeSid before writing to DOM"
assert "S.session.session_id!==activeSid" in token_block or \
"S.session.session_id===activeSid" in token_block, \
"token handler must compare current session to activeSid"
def test_tool_handler_guards_session_id(cleanup_test_sessions):
"""R9b: The SSE tool event handler must check activeSid before writing to DOM.
When missing, tool cards from session A would render into session B's message area.
Sprint 12: handler moved into _wireSSE(source), so search source.addEventListener.
"""
src = (REPO_ROOT / "static/messages.js").read_text()
tool_idx = src.find("es.addEventListener('tool'")
tool_idx = src.find("source.addEventListener('tool'")
if tool_idx < 0:
tool_idx = src.find("es.addEventListener('tool'")
assert tool_idx >= 0, "tool event handler not found"
tool_block = src[tool_idx:tool_idx+400]
assert "activeSid" in tool_block, "tool handler must check activeSid before writing to DOM"
assert "activeSid" in tool_block, \
"tool handler must check activeSid before writing to DOM"
# ── R10: respondApproval uses wrong session_id after switch (multi-session) ─
@@ -337,8 +356,10 @@ def test_tool_status_only_shown_for_current_session(cleanup_test_sessions):
When missing, session A's tool names would appear in session B's activity bar.
"""
src = (REPO_ROOT / "static/messages.js").read_text()
# Find the tool event handler
tool_idx = src.find("es.addEventListener('tool'")
# Sprint 12: handler moved into _wireSSE(source)
tool_idx = src.find("source.addEventListener('tool'")
if tool_idx < 0:
tool_idx = src.find("es.addEventListener('tool'")
assert tool_idx >= 0
tool_block = src[tool_idx:tool_idx+400]
# setStatus must be inside the activeSid guard, not before it
@@ -347,8 +368,8 @@ def test_tool_status_only_shown_for_current_session(cleanup_test_sessions):
assert guard_pos >= 0, "tool handler must guard with activeSid check"
# The guard must appear BEFORE or AROUND the setStatus call
# (status only fires for the current session)
assert status_pos > tool_block.find("activeSid"), "setStatus in tool handler must be inside the activeSid guard"
assert status_pos > tool_block.find("activeSid"), \
"setStatus in tool handler must be inside the activeSid guard"
# ── R12: Live tool cards lost on switch-away and switch-back ──────────────
@@ -375,7 +396,10 @@ def test_done_handler_sets_busy_false_before_renderMessages(cleanup_test_session
tool cards are skipped entirely after a response completes.
"""
src = (REPO_ROOT / "static/messages.js").read_text()
done_idx = src.find("es.addEventListener('done'")
# Sprint 12: handler moved into _wireSSE(source)
done_idx = src.find("source.addEventListener('done'")
if done_idx < 0:
done_idx = src.find("es.addEventListener('done'")
assert done_idx >= 0
done_block = src[done_idx:done_idx+1500]
# S.busy=false must appear before renderMessages() within the done handler

View File

@@ -1,5 +1,5 @@
"""
Sprint 1 test suite for the Hermes WebUI.
Sprint 1 test suite for the Hermes Web UI.
Tests use the ISOLATED test server running on http://127.0.0.1:8788.
Production server (port 8787) and your real conversations are never touched.

96
tests/test_sprint11.py Normal file
View File

@@ -0,0 +1,96 @@
"""
Sprint 11 Tests: multi-provider model support, streaming smoothness, routes extraction.
"""
import json, pathlib, urllib.error, urllib.request, urllib.parse
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
BASE = "http://127.0.0.1:8788"
def get(path):
    """GET ``BASE + path`` and return ``(decoded_json_body, http_status)``.

    No HTTP error handling is done here: an error status propagates as the
    exception raised by ``urlopen``.
    """
    url = BASE + path
    with urllib.request.urlopen(url, timeout=10) as resp:
        payload = json.loads(resp.read())
        return payload, resp.status
def post(path, body=None):
    """POST *body* as JSON to ``BASE + path``; return ``(decoded_json, status)``.

    A falsy *body* is sent as an empty JSON object.  An ``HTTPError`` is not
    propagated: its body is decoded as JSON and returned with the error code,
    so callers can assert on 4xx responses directly.
    """
    encoded = json.dumps(body or {}).encode()
    request = urllib.request.Request(
        BASE + path,
        data=encoded,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as resp:
            return json.loads(resp.read()), resp.status
    except urllib.error.HTTPError as err:
        return json.loads(err.read()), err.code
# ── /api/models endpoint ──────────────────────────────────────────────────
def test_models_endpoint_returns_200():
    """GET /api/models answers with HTTP 200."""
    _, http_status = get("/api/models")
    assert http_status == 200
def test_models_has_required_fields():
    """The /api/models payload carries groups, default_model and active_provider."""
    body, _ = get("/api/models")
    for field in ('groups', 'default_model', 'active_provider'):
        assert field in body
def test_models_groups_structure():
    """'groups' is a non-empty list; every group names a provider and a non-empty models list."""
    body, _ = get("/api/models")
    groups = body['groups']
    assert isinstance(groups, list)
    assert len(groups) > 0
    for entry in groups:
        assert 'provider' in entry
        assert 'models' in entry
        models = entry['models']
        assert isinstance(models, list)
        assert len(models) > 0
def test_models_model_structure():
    """Every model entry exposes non-empty string 'id' and 'label' fields."""
    body, _ = get("/api/models")
    # Flatten lazily so groups are visited in the same order as a nested loop.
    every_model = (m for g in body['groups'] for m in g['models'])
    for entry in every_model:
        assert 'id' in entry
        assert 'label' in entry
        model_id = entry['id']
        model_label = entry['label']
        assert isinstance(model_id, str)
        assert isinstance(model_label, str)
        assert len(model_id) > 0
        assert len(model_label) > 0
def test_models_default_model_not_empty():
    """'default_model' in the /api/models payload is a non-empty string."""
    body, _ = get("/api/models")
    default = body['default_model']
    assert isinstance(default, str)
    assert len(default) > 0
def test_models_at_least_one_provider():
    """The model listing contains at least one provider group."""
    body, _ = get("/api/models")
    provider_names = []
    for entry in body['groups']:
        provider_names.append(entry['provider'])
    assert len(provider_names) >= 1
def test_models_no_duplicate_ids():
    """Within any one provider group, no model id occurs twice."""
    body, _ = get("/api/models")
    for group in body['groups']:
        ids = []
        for entry in group['models']:
            ids.append(entry['id'])
        # A set collapses repeats, so equal lengths mean every id is unique.
        assert len(ids) == len(set(ids)), f"Duplicate model IDs in {group['provider']}: {ids}"
def test_session_preserves_unlisted_model():
    """A model string set via /api/session/update is returned unchanged on reload.

    The model used here is an arbitrary custom string, not one taken from the
    /api/models listing.
    """
    created, _ = post("/api/session/new", {})
    sid = created['session']['session_id']
    try:
        custom_model = 'custom-provider/test-model-999'
        update = {
            'session_id': sid,
            'model': custom_model,
            'workspace': created['session']['workspace'],
        }
        post("/api/session/update", update)
        # Fetch the session again and compare the stored model.
        reloaded, _ = get(f"/api/session?session_id={sid}")
        assert reloaded['session']['model'] == custom_model
    finally:
        # Always remove the throwaway session.
        post("/api/session/delete", {'session_id': sid})

179
tests/test_sprint12.py Normal file
View File

@@ -0,0 +1,179 @@
"""
Sprint 12 Tests: settings panel, session pinning, session import, SSE reconnect.
"""
import json, pathlib, urllib.error, urllib.request, urllib.parse
BASE = "http://127.0.0.1:8788"
def get(path):
    """Fetch ``BASE + path`` with GET; return the parsed JSON body and the HTTP status.

    Error statuses are not caught here; whatever ``urlopen`` raises propagates.
    """
    target = BASE + path
    with urllib.request.urlopen(target, timeout=10) as response:
        parsed = json.loads(response.read())
        return parsed, response.status
def post(path, body=None):
    """Send *body* as a JSON POST to ``BASE + path``; return ``(parsed_json, status)``.

    A falsy *body* becomes ``{}``.  When the server replies with an HTTP error,
    the error body is parsed as JSON and returned together with the error code
    instead of raising.
    """
    raw = json.dumps(body or {}).encode()
    request = urllib.request.Request(
        BASE + path,
        data=raw,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            return json.loads(response.read()), response.status
    except urllib.error.HTTPError as http_err:
        return json.loads(http_err.read()), http_err.code
def make_session(created_list):
    """Create a session via /api/session/new and return its session id.

    The id is also appended to *created_list* so the caller can delete it
    during cleanup.
    """
    reply, _ = post("/api/session/new", {})
    new_id = reply["session"]["session_id"]
    created_list.append(new_id)
    return new_id
# ── Settings API ──────────────────────────────────────────────────────────
def test_settings_get_returns_defaults():
    """GET /api/settings answers 200 and includes default_model and default_workspace."""
    settings, code = get("/api/settings")
    assert code == 200
    for key in ('default_model', 'default_workspace'):
        assert key in settings
def test_settings_post_persists():
    """POST /api/settings saves the value and returns the merged settings.

    The default model configured before the test is captured first and put
    back in a ``finally`` block, so a failing assertion cannot leave the
    throwaway value on the shared test server, and the restored value is
    whatever was actually there rather than a hard-coded guess.
    """
    before, _ = get("/api/settings")
    original_model = before['default_model']
    try:
        d, status = post("/api/settings", {"default_model": "test/model-123"})
        assert status == 200
        assert d['default_model'] == 'test/model-123'
        # Read back through GET to confirm the value was stored, not just echoed.
        d2, _ = get("/api/settings")
        assert d2['default_model'] == 'test/model-123'
    finally:
        # Restore the pre-test value even when an assertion above failed.
        post("/api/settings", {"default_model": original_model})
def test_settings_partial_update():
    """POST /api/settings with partial data doesn't clobber other fields.

    Only ``default_model`` is posted; ``default_workspace`` must read back
    unchanged.  The original default model is restored in a ``finally`` block
    so the server is left as it was found, whether or not the assertions pass.
    """
    d1, _ = get("/api/settings")
    original_ws = d1['default_workspace']
    original_model = d1['default_model']
    try:
        post("/api/settings", {"default_model": "anthropic/claude-sonnet-4.6"})
        d2, _ = get("/api/settings")
        assert d2['default_model'] == 'anthropic/claude-sonnet-4.6'
        assert d2['default_workspace'] == original_ws
    finally:
        # Restore the pre-test value even when an assertion above failed.
        post("/api/settings", {"default_model": original_model})
# ── Session Pinning ───────────────────────────────────────────────────────
def test_pin_session():
    """Pinning through /api/session/pin returns ok and a session with pinned=True."""
    tracked = []
    try:
        sid = make_session(tracked)
        reply, code = post("/api/session/pin", {"session_id": sid, "pinned": True})
        assert code == 200
        assert reply['ok'] is True
        assert reply['session']['pinned'] is True
    finally:
        # Delete every session this test created.
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_unpin_session():
    """A pinned session reports pinned=False after posting pinned=False."""
    tracked = []
    try:
        sid = make_session(tracked)
        # Pin first so the unpin call has something to undo.
        post("/api/session/pin", {"session_id": sid, "pinned": True})
        reply, code = post("/api/session/pin", {"session_id": sid, "pinned": False})
        assert code == 200
        assert reply['session']['pinned'] is False
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_pinned_in_session_list():
    """A pinned, titled session appears exactly once in /api/sessions with pinned=True."""
    tracked = []
    try:
        sid = make_session(tracked)
        # Give it a title and pin it so it shows in the list.
        post("/api/session/rename", {"session_id": sid, "title": "Pinned Test"})
        post("/api/session/pin", {"session_id": sid, "pinned": True})
        listing, _ = get("/api/sessions")
        hits = []
        for entry in listing['sessions']:
            if entry['session_id'] == sid:
                hits.append(entry)
        assert len(hits) == 1
        assert hits[0]['pinned'] is True
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_pinned_persists_on_reload():
    """After pinning, fetching the session via /api/session still reports pinned=True."""
    tracked = []
    try:
        sid = make_session(tracked)
        post("/api/session/pin", {"session_id": sid, "pinned": True})
        reloaded, _ = get(f"/api/session?session_id={sid}")
        assert reloaded['session']['pinned'] is True
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
# ── Session Import ────────────────────────────────────────────────────────
def test_import_session_basic():
    """Importing a JSON payload yields a new session with its title, messages and model."""
    messages = [
        {"role": "user", "content": "Hello from import"},
        {"role": "assistant", "content": "Hi there!"},
    ]
    payload = {"title": "Imported Test", "messages": messages, "model": "test/import-model"}
    reply, code = post("/api/session/import", payload)
    assert code == 200
    assert reply['ok'] is True
    imported = reply['session']
    sid = imported['session_id']
    try:
        assert imported['title'] == 'Imported Test'
        assert len(imported['messages']) == 2
        # Fetch the session separately to check the model was stored.
        reloaded, _ = get(f"/api/session?session_id={sid}")
        assert reloaded['session']['model'] == 'test/import-model'
    finally:
        post("/api/session/delete", {"session_id": sid})
def test_import_requires_messages():
    """An import payload without a 'messages' key is rejected with HTTP 400."""
    _, code = post("/api/session/import", {"title": "No messages"})
    assert code == 400
def test_import_creates_new_id():
    """A session_id supplied in the import payload is not reused for the new session."""
    payload = {
        "session_id": "should_be_ignored",
        "title": "ID Test",
        "messages": [{"role": "user", "content": "test"}],
    }
    reply, _ = post("/api/session/import", payload)
    sid = reply['session']['session_id']
    try:
        # The server must assign its own id rather than the one in the payload.
        assert sid != "should_be_ignored"
    finally:
        post("/api/session/delete", {"session_id": sid})
def test_import_with_pinned():
    """A payload imported with pinned=True reads back as a pinned session."""
    payload = {
        "title": "Pinned Import",
        "messages": [{"role": "user", "content": "test"}],
        "pinned": True,
    }
    reply, _ = post("/api/session/import", payload)
    sid = reply['session']['session_id']
    try:
        reloaded, _ = get(f"/api/session?session_id={sid}")
        assert reloaded['session']['pinned'] is True
    finally:
        post("/api/session/delete", {"session_id": sid})

120
tests/test_sprint13.py Normal file
View File

@@ -0,0 +1,120 @@
"""
Sprint 13 Tests: cron recent endpoint, session duplicate, background alerts.
"""
import json, pathlib, urllib.error, urllib.request
BASE = "http://127.0.0.1:8788"
def get(path):
    """Perform a GET on ``BASE + path``; return ``(json_body, http_status)``.

    HTTP error statuses are not handled here and surface as the exception
    raised by ``urlopen``.
    """
    address = BASE + path
    with urllib.request.urlopen(address, timeout=10) as reply:
        decoded = json.loads(reply.read())
        return decoded, reply.status
def post(path, body=None):
    """POST JSON to ``BASE + path`` and return ``(json_body, http_status)``.

    ``None`` or any other falsy *body* is serialized as ``{}``.  HTTP error
    replies are returned (decoded body plus error code) rather than raised.
    """
    serialized = json.dumps(body or {}).encode()
    request = urllib.request.Request(
        BASE + path,
        data=serialized,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as reply:
            return json.loads(reply.read()), reply.status
    except urllib.error.HTTPError as failure:
        return json.loads(failure.read()), failure.code
def make_session(created_list):
    """Create a session and return ``(session_id, session_dict)``.

    The new id is appended to *created_list* so the caller can clean it up.
    """
    reply, _ = post("/api/session/new", {})
    session = reply["session"]
    new_id = session["session_id"]
    created_list.append(new_id)
    return new_id, session
# ── Cron recent endpoint ──────────────────────────────────────────────────
def test_crons_recent_returns_200():
    """/api/crons/recent?since=0 answers 200 with a 'completions' list and a 'since' field."""
    body, code = get("/api/crons/recent?since=0")
    assert code == 200
    assert 'completions' in body
    completions = body['completions']
    assert isinstance(completions, list)
    assert 'since' in body
def test_crons_recent_with_future_since():
    """A 'since' timestamp well in the future yields an empty completions list."""
    import time
    future = time.time() + 99999
    body, _ = get(f"/api/crons/recent?since={future}")
    assert body['completions'] == []
def test_crons_recent_default_since():
    """Omitting the 'since' query parameter still returns 200 with a 'completions' key."""
    body, code = get("/api/crons/recent")
    assert code == 200
    assert 'completions' in body
# ── Session duplicate ─────────────────────────────────────────────────────
def test_duplicate_session():
    """A new session created with another session's workspace/model gets a distinct id.

    The test mimics duplication by calling /api/session/new with the source
    session's workspace and model, then checks both values are carried over.
    """
    tracked = []
    try:
        sid, source = make_session(tracked)
        workspace = source["workspace"]
        # Give the source session a specific model.
        post("/api/session/update", {
            "session_id": sid, "model": "test/dup-model",
            "workspace": workspace,
        })
        # Create the copy from the same workspace/model pair.
        copy_reply, code = post("/api/session/new", {
            "workspace": workspace, "model": "test/dup-model",
        })
        assert code == 200
        copy = copy_reply["session"]
        new_sid = copy["session_id"]
        tracked.append(new_sid)
        assert new_sid != sid
        assert copy["model"] == "test/dup-model"
        assert copy["workspace"] == workspace
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
# ── Session pinned field preserved across operations ──────────────────────
def test_pinned_survives_update():
    """A session stays pinned after a later /api/session/update call."""
    tracked = []
    try:
        sid, session = make_session(tracked)
        post("/api/session/pin", {"session_id": sid, "pinned": True})
        # Change the model; the pinned flag must be unaffected.
        update = {
            "session_id": sid, "model": "test/other",
            "workspace": session["workspace"],
        }
        post("/api/session/update", update)
        reloaded, _ = get(f"/api/session?session_id={sid}")
        assert reloaded["session"]["pinned"] is True
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
# ── Workspace symlink validation ──────────────────────────────────────────
def test_workspace_add_rejects_nonexistent():
    """/api/workspaces/add answers 400 for a path that does not exist."""
    _, code = post("/api/workspaces/add", {"path": "/nonexistent/path/12345"})
    assert code == 400
def test_workspace_add_accepts_real_dir():
    """/api/workspaces/add accepts a freshly created temporary directory."""
    import shutil
    import tempfile
    scratch = tempfile.mkdtemp()
    try:
        reply, code = post("/api/workspaces/add", {"path": scratch, "name": "test-ws"})
        assert code == 200
        assert reply["ok"] is True
    finally:
        # Unregister the workspace first, then remove the directory itself.
        post("/api/workspaces/remove", {"path": scratch})
        shutil.rmtree(scratch, ignore_errors=True)

153
tests/test_sprint14.py Normal file
View File

@@ -0,0 +1,153 @@
"""
Sprint 14 Tests: file rename, folder create, session archive, session tags, mermaid, timestamps.
"""
import json, os, pathlib, shutil, tempfile, urllib.error, urllib.request
BASE = "http://127.0.0.1:8788"
def get(path):
    """Request ``BASE + path`` with GET and return ``(parsed_json, status)``.

    Nothing is caught here: an HTTP error status propagates as the exception
    raised by ``urlopen``.
    """
    full_url = BASE + path
    with urllib.request.urlopen(full_url, timeout=10) as res:
        data = json.loads(res.read())
        return data, res.status
def post(path, body=None):
    """POST *body* (JSON-encoded) to ``BASE + path``; return ``(parsed_json, status)``.

    A falsy *body* is replaced by an empty object.  An ``HTTPError`` response
    is converted into a normal return value: its JSON body and its status code.
    """
    blob = json.dumps(body or {}).encode()
    request = urllib.request.Request(
        BASE + path,
        data=blob,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as res:
            return json.loads(res.read()), res.status
    except urllib.error.HTTPError as exc:
        return json.loads(exc.read()), exc.code
def make_session(created_list):
    """Create a session; return its id together with the session dict.

    The id is also appended to *created_list* for later cleanup.
    """
    reply, _ = post("/api/session/new", {})
    session = reply["session"]
    new_id = session["session_id"]
    created_list.append(new_id)
    return new_id, session
# ── File rename ───────────────────────────────────────────────────────────
def test_file_rename():
    """Renaming an existing file returns ok and a new_path containing the new name."""
    tracked = []
    try:
        sid, _session = make_session(tracked)
        # The file has to exist before it can be renamed.
        post("/api/file/create", {"session_id": sid, "path": "rename_test.txt", "content": "hello"})
        rename_request = {
            "session_id": sid, "path": "rename_test.txt", "new_name": "renamed.txt",
        }
        reply, code = post("/api/file/rename", rename_request)
        assert code == 200
        assert reply["ok"] is True
        assert "renamed.txt" in reply["new_path"]
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_file_rename_rejects_path_traversal():
    """A new_name containing a parent-directory component is refused with HTTP 400."""
    tracked = []
    try:
        sid, _session = make_session(tracked)
        post("/api/file/create", {"session_id": sid, "path": "safe.txt", "content": ""})
        rename_request = {
            "session_id": sid, "path": "safe.txt", "new_name": "../evil.txt",
        }
        _, code = post("/api/file/rename", rename_request)
        assert code == 400
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_file_rename_rejects_existing():
    """Renaming a file onto the name of another existing file returns HTTP 400."""
    tracked = []
    try:
        sid, _session = make_session(tracked)
        # Create both the source file and the file whose name it will collide with.
        post("/api/file/create", {"session_id": sid, "path": "a.txt", "content": "a"})
        post("/api/file/create", {"session_id": sid, "path": "b.txt", "content": "b"})
        rename_request = {
            "session_id": sid, "path": "a.txt", "new_name": "b.txt",
        }
        _, code = post("/api/file/rename", rename_request)
        assert code == 400
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
# ── Folder create ─────────────────────────────────────────────────────────
def test_create_dir():
    """/api/file/create-dir answers 200 with ok=True for a new folder name."""
    tracked = []
    try:
        sid, _session = make_session(tracked)
        folder_request = {
            "session_id": sid, "path": "test_folder",
        }
        reply, code = post("/api/file/create-dir", folder_request)
        assert code == 200
        assert reply["ok"] is True
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_create_dir_rejects_existing():
    """Creating the same folder a second time returns HTTP 400."""
    tracked = []
    try:
        sid, _session = make_session(tracked)
        folder_request = {"session_id": sid, "path": "dup_folder"}
        # The first call creates the folder; the second must be refused.
        post("/api/file/create-dir", folder_request)
        _, code = post("/api/file/create-dir", folder_request)
        assert code == 400
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
# ── Session archive ───────────────────────────────────────────────────────
def test_archive_session():
    """Posting archived=True to /api/session/archive returns a session with archived=True."""
    tracked = []
    try:
        sid, _ = make_session(tracked)
        reply, code = post("/api/session/archive", {"session_id": sid, "archived": True})
        assert code == 200
        assert reply["session"]["archived"] is True
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_unarchive_session():
    """An archived session reports archived=False after posting archived=False."""
    tracked = []
    try:
        sid, _ = make_session(tracked)
        # Archive first so the second call has something to undo.
        post("/api/session/archive", {"session_id": sid, "archived": True})
        reply, code = post("/api/session/archive", {"session_id": sid, "archived": False})
        assert code == 200
        assert reply["session"]["archived"] is False
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})
def test_archived_in_compact():
    """An archived, titled session reports archived=True when fetched again.

    NOTE(review): the test name suggests the session *list* is being checked,
    but the read-back uses the single-session endpoint
    (/api/session?session_id=...), not /api/sessions -- confirm which was
    intended.
    """
    tracked = []
    try:
        sid, _ = make_session(tracked)
        post("/api/session/rename", {"session_id": sid, "title": "Archive Test"})
        post("/api/session/archive", {"session_id": sid, "archived": True})
        reloaded, _ = get(f"/api/session?session_id={sid}")
        assert reloaded["session"]["archived"] is True
    finally:
        for leftover in tracked:
            post("/api/session/delete", {"session_id": leftover})