Hermes WebUI v0.1.0 — initial public release
This commit is contained in:
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
240
tests/conftest.py
Normal file
240
tests/conftest.py
Normal file
@@ -0,0 +1,240 @@
|
||||
"""
|
||||
Shared pytest fixtures for webui-mvp tests.
|
||||
|
||||
TEST ISOLATION:
|
||||
Tests run against a SEPARATE server instance on port 8788 with a
|
||||
completely separate state directory. Production data is never touched.
|
||||
The test state dir is wiped before each full test run and again on teardown.
|
||||
|
||||
PATH DISCOVERY:
|
||||
No hardcoded paths. Discovery order:
|
||||
1. Environment variables (HERMES_WEBUI_AGENT_DIR, HERMES_WEBUI_PYTHON, etc.)
|
||||
2. Sibling checkout heuristics relative to this repo
|
||||
3. Common install paths (~/.hermes/hermes-agent)
|
||||
4. System python3 as a last resort
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import pytest
|
||||
|
||||
# ── Repo root discovery ────────────────────────────────────────────────────
|
||||
# conftest.py lives at <repo>/tests/conftest.py
|
||||
TESTS_DIR = pathlib.Path(__file__).parent.resolve()
|
||||
REPO_ROOT = TESTS_DIR.parent.resolve()
|
||||
HOME = pathlib.Path.home()
|
||||
HERMES_HOME = pathlib.Path(os.getenv('HERMES_HOME', str(HOME / '.hermes')))
|
||||
|
||||
# ── Test server config ────────────────────────────────────────────────────
|
||||
TEST_PORT = int(os.getenv('HERMES_WEBUI_TEST_PORT', '8788'))
|
||||
TEST_BASE = f"http://127.0.0.1:{TEST_PORT}"
|
||||
TEST_STATE_DIR = pathlib.Path(os.getenv(
|
||||
'HERMES_WEBUI_TEST_STATE_DIR',
|
||||
str(HERMES_HOME / 'webui-mvp-test')
|
||||
))
|
||||
TEST_WORKSPACE = TEST_STATE_DIR / 'test-workspace'
|
||||
|
||||
# ── Server script: always relative to repo root ───────────────────────────
|
||||
SERVER_SCRIPT = REPO_ROOT / 'server.py'
|
||||
if not SERVER_SCRIPT.exists():
|
||||
raise RuntimeError(
|
||||
f"server.py not found at {SERVER_SCRIPT}. "
|
||||
"Is conftest.py in the tests/ subdirectory of the repo?"
|
||||
)
|
||||
|
||||
# ── Hermes agent discovery (mirrors api/config._discover_agent_dir) ───────
|
||||
def _discover_agent_dir() -> pathlib.Path:
|
||||
candidates = [
|
||||
os.getenv('HERMES_WEBUI_AGENT_DIR', ''),
|
||||
str(HERMES_HOME / 'hermes-agent'),
|
||||
str(REPO_ROOT.parent / 'hermes-agent'),
|
||||
str(HOME / '.hermes' / 'hermes-agent'),
|
||||
str(HOME / 'hermes-agent'),
|
||||
]
|
||||
for c in candidates:
|
||||
if not c:
|
||||
continue
|
||||
p = pathlib.Path(c).expanduser()
|
||||
if p.exists() and (p / 'run_agent.py').exists():
|
||||
return p.resolve()
|
||||
return None
|
||||
|
||||
# ── Python discovery (mirrors api/config._discover_python) ────────────────
|
||||
def _discover_python(agent_dir) -> str:
|
||||
if os.getenv('HERMES_WEBUI_PYTHON'):
|
||||
return os.getenv('HERMES_WEBUI_PYTHON')
|
||||
if agent_dir:
|
||||
venv_py = agent_dir / 'venv' / 'bin' / 'python'
|
||||
if venv_py.exists():
|
||||
return str(venv_py)
|
||||
local_venv = REPO_ROOT / '.venv' / 'bin' / 'python'
|
||||
if local_venv.exists():
|
||||
return str(local_venv)
|
||||
return shutil.which('python3') or shutil.which('python') or 'python3'
|
||||
|
||||
HERMES_AGENT = _discover_agent_dir()
|
||||
VENV_PYTHON = _discover_python(HERMES_AGENT)
|
||||
|
||||
# Work dir: agent dir if found, else repo root
|
||||
WORKDIR = str(HERMES_AGENT) if HERMES_AGENT else str(REPO_ROOT)
|
||||
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _post(base, path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(
|
||||
base + path, data=data, headers={"Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read())
|
||||
except urllib.error.HTTPError as e:
|
||||
try:
|
||||
return json.loads(e.read())
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _wait_for_server(base, timeout=20):
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
with urllib.request.urlopen(base + "/health", timeout=2) as r:
|
||||
if json.loads(r.read()).get("status") == "ok":
|
||||
return True
|
||||
except Exception:
|
||||
time.sleep(0.3)
|
||||
return False
|
||||
|
||||
|
||||
# ── Session-scoped test server ────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def test_server():
|
||||
"""
|
||||
Start an isolated test server on TEST_PORT with a clean state directory.
|
||||
Paths are discovered dynamically -- no hardcoded absolute path assumptions.
|
||||
"""
|
||||
# Clean slate
|
||||
if TEST_STATE_DIR.exists():
|
||||
shutil.rmtree(TEST_STATE_DIR)
|
||||
TEST_STATE_DIR.mkdir(parents=True)
|
||||
TEST_WORKSPACE.mkdir(parents=True)
|
||||
|
||||
# Symlink real skills into test home so skill-related tests work,
|
||||
# but all write-heavy state stays isolated.
|
||||
real_skills = HERMES_HOME / 'skills'
|
||||
test_skills = TEST_STATE_DIR / 'skills'
|
||||
if real_skills.exists() and not test_skills.exists():
|
||||
test_skills.symlink_to(real_skills)
|
||||
|
||||
# Isolated cron state
|
||||
(TEST_STATE_DIR / 'cron').mkdir(parents=True, exist_ok=True)
|
||||
|
||||
env = os.environ.copy()
|
||||
env.update({
|
||||
"HERMES_WEBUI_PORT": str(TEST_PORT),
|
||||
"HERMES_WEBUI_HOST": "127.0.0.1",
|
||||
"HERMES_WEBUI_STATE_DIR": str(TEST_STATE_DIR),
|
||||
"HERMES_WEBUI_DEFAULT_WORKSPACE": str(TEST_WORKSPACE),
|
||||
"HERMES_WEBUI_DEFAULT_MODEL": "openai/gpt-5.4-mini",
|
||||
"HERMES_HOME": str(TEST_STATE_DIR),
|
||||
})
|
||||
|
||||
# Pass agent dir if discovered so server.py doesn't have to re-discover
|
||||
if HERMES_AGENT:
|
||||
env["HERMES_WEBUI_AGENT_DIR"] = str(HERMES_AGENT)
|
||||
|
||||
proc = subprocess.Popen(
|
||||
[VENV_PYTHON, str(SERVER_SCRIPT)],
|
||||
cwd=WORKDIR,
|
||||
env=env,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
|
||||
if not _wait_for_server(TEST_BASE, timeout=20):
|
||||
proc.kill()
|
||||
pytest.fail(
|
||||
f"Test server on port {TEST_PORT} did not start within 20s.\n"
|
||||
f" server.py : {SERVER_SCRIPT}\n"
|
||||
f" python : {VENV_PYTHON}\n"
|
||||
f" agent dir : {HERMES_AGENT}\n"
|
||||
f" workdir : {WORKDIR}\n"
|
||||
)
|
||||
|
||||
yield proc
|
||||
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
|
||||
try:
|
||||
shutil.rmtree(TEST_STATE_DIR)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ── Test base URL ─────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_url():
|
||||
return TEST_BASE
|
||||
|
||||
|
||||
# ── Per-test session cleanup ──────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_test_sessions():
|
||||
"""
|
||||
Yields a list for tests to register created session IDs.
|
||||
Deletes all registered sessions after each test.
|
||||
Resets last_workspace to the test workspace to prevent state bleed.
|
||||
"""
|
||||
created: list[str] = []
|
||||
yield created
|
||||
|
||||
for sid in created:
|
||||
try:
|
||||
_post(TEST_BASE, "/api/session/delete", {"session_id": sid})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
_post(TEST_BASE, "/api/sessions/cleanup_zero_message")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
last_ws_file = TEST_STATE_DIR / "last_workspace.txt"
|
||||
last_ws_file.write_text(str(TEST_WORKSPACE), encoding='utf-8')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ── Convenience helpers ────────────────────────────────────────────────────────
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
"""
|
||||
Create a session on the test server and register it for cleanup.
|
||||
|
||||
Usage:
|
||||
def test_something(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
"""
|
||||
body = {}
|
||||
if ws:
|
||||
body["workspace"] = str(ws)
|
||||
d = _post(TEST_BASE, "/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
ws_path = pathlib.Path(d["session"]["workspace"])
|
||||
created_list.append(sid)
|
||||
return sid, ws_path
|
||||
416
tests/test_regressions.py
Normal file
416
tests/test_regressions.py
Normal file
@@ -0,0 +1,416 @@
|
||||
"""
|
||||
Regression tests -- one test per bug that was introduced and fixed.
|
||||
These tests exist specifically to prevent those bugs from silently returning.
|
||||
|
||||
Each test is tagged with the sprint/commit where the bug was found and fixed.
|
||||
"""
|
||||
import json
|
||||
import pathlib
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
|
||||
|
||||
BASE = "http://127.0.0.1:8788"
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def get_raw(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read(), r.headers.get("Content-Type",""), r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(
|
||||
BASE + path, data=data, headers={"Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session(created_list):
|
||||
d, _ = post("/api/session/new", {})
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid
|
||||
|
||||
|
||||
# ── R1: uuid not imported in server.py (Sprint 10 split regression) ──────────
|
||||
|
||||
def test_chat_start_returns_stream_id(cleanup_test_sessions):
|
||||
"""R1: chat/start must return stream_id -- catches missing uuid import.
|
||||
When uuid was missing, this returned 500 (NameError).
|
||||
"""
|
||||
sid = make_session(cleanup_test_sessions)
|
||||
data, status = post("/api/chat/start", {
|
||||
"session_id": sid,
|
||||
"message": "ping",
|
||||
"model": "openai/gpt-5.4-mini",
|
||||
})
|
||||
# Must return 200 with a stream_id -- not 500
|
||||
assert status == 200, f"chat/start failed with {status}: {data}"
|
||||
assert "stream_id" in data, "stream_id missing from chat/start response"
|
||||
assert len(data["stream_id"]) > 8, "stream_id looks invalid"
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
cleanup_test_sessions.clear()
|
||||
|
||||
|
||||
# ── R2: AIAgent not imported in api/streaming.py (Sprint 10 split regression) ─
|
||||
|
||||
def test_chat_stream_opens_successfully(cleanup_test_sessions):
|
||||
"""R2: After chat/start, GET /api/chat/stream must return 200 (SSE opens).
|
||||
When AIAgent was missing, the thread crashed immediately, popped STREAMS,
|
||||
and the SSE GET returned 404.
|
||||
"""
|
||||
sid = make_session(cleanup_test_sessions)
|
||||
data, status = post("/api/chat/start", {
|
||||
"session_id": sid,
|
||||
"message": "say: hello",
|
||||
"model": "openai/gpt-5.4-mini",
|
||||
})
|
||||
assert status == 200, f"chat/start failed: {data}"
|
||||
stream_id = data["stream_id"]
|
||||
|
||||
# Open the SSE stream -- must return 200, not 404
|
||||
# We only check headers (don't read the full stream body)
|
||||
req = urllib.request.Request(BASE + f"/api/chat/stream?stream_id={stream_id}")
|
||||
try:
|
||||
r = urllib.request.urlopen(req, timeout=3)
|
||||
assert r.status == 200, f"SSE stream returned {r.status} (expected 200)"
|
||||
ct = r.headers.get("Content-Type", "")
|
||||
assert "text/event-stream" in ct, f"Wrong Content-Type: {ct}"
|
||||
r.close()
|
||||
except urllib.error.HTTPError as e:
|
||||
assert False, f"SSE stream returned {e.code} -- AIAgent may not be imported"
|
||||
except Exception:
|
||||
pass # timeout or connection close after brief read is fine
|
||||
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
cleanup_test_sessions.clear()
|
||||
|
||||
|
||||
# ── R3: Session.__init__ missing tool_calls param (Sprint 10 split regression) ─
|
||||
|
||||
def test_session_with_tool_calls_in_json_loads_ok(cleanup_test_sessions):
|
||||
"""R3: Sessions that have tool_calls in their JSON must load without 500.
|
||||
When tool_calls=None was missing from Session.__init__, loading such sessions
|
||||
threw TypeError: unexpected keyword argument.
|
||||
"""
|
||||
sid = make_session(cleanup_test_sessions)
|
||||
|
||||
# Manually inject tool_calls into the session's JSON file
|
||||
sessions_dir = pathlib.Path.home() / ".hermes" / "webui-mvp-test" / "sessions"
|
||||
session_file = sessions_dir / f"{sid}.json"
|
||||
if session_file.exists():
|
||||
d = json.loads(session_file.read_text())
|
||||
d["tool_calls"] = [
|
||||
{"name": "terminal", "snippet": "test output", "tid": "test_tid_001", "assistant_msg_idx": 1}
|
||||
]
|
||||
session_file.write_text(json.dumps(d))
|
||||
|
||||
# Loading the session must return 200, not 500
|
||||
data, status = get(f"/api/session?session_id={urllib.parse.quote(sid)}")
|
||||
assert status == 200, f"Session with tool_calls returned {status}: {data}"
|
||||
assert data["session"]["session_id"] == sid
|
||||
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
cleanup_test_sessions.clear()
|
||||
|
||||
|
||||
# ── R4: has_pending not imported in streaming.py (Sprint 10 split regression) ─
|
||||
|
||||
def test_streaming_py_imports_has_pending(cleanup_test_sessions):
|
||||
"""R4: api/streaming.py must import or define has_pending.
|
||||
When missing, the approval check mid-stream caused NameError.
|
||||
"""
|
||||
src = (REPO_ROOT / "api/streaming.py").read_text()
|
||||
assert "has_pending" in src, "has_pending not found in api/streaming.py"
|
||||
# Verify it's imported (not just used)
|
||||
assert "import" in src and "has_pending" in src, \
|
||||
"has_pending must be imported in api/streaming.py"
|
||||
|
||||
|
||||
def test_aiagent_imported_in_streaming(cleanup_test_sessions):
|
||||
"""R2b: api/streaming.py must import AIAgent.
|
||||
When missing, the streaming thread crashed immediately after being spawned.
|
||||
"""
|
||||
src = (REPO_ROOT / "api/streaming.py").read_text()
|
||||
assert "AIAgent" in src, "AIAgent not referenced in api/streaming.py"
|
||||
assert "from run_agent import AIAgent" in src or "import AIAgent" in src, \
|
||||
"AIAgent must be imported in api/streaming.py"
|
||||
|
||||
|
||||
# ── R5: SSE loop did not break on cancel event (Sprint 10 bug) ───────────────
|
||||
|
||||
def test_cancel_nonexistent_stream_returns_not_cancelled(cleanup_test_sessions):
|
||||
"""R5a: Cancel endpoint works and returns cancelled:false for unknown stream."""
|
||||
data, status = get("/api/chat/cancel?stream_id=nonexistent_test_xyz")
|
||||
assert status == 200
|
||||
assert data["ok"] is True
|
||||
assert data["cancelled"] is False
|
||||
|
||||
|
||||
def test_server_py_sse_loop_breaks_on_cancel(cleanup_test_sessions):
|
||||
"""R5b: server.py SSE loop must include 'cancel' in the break condition.
|
||||
When missing, the connection hung after the cancel event was processed.
|
||||
"""
|
||||
src = (REPO_ROOT / "server.py").read_text()
|
||||
# Find the SSE break condition
|
||||
import re
|
||||
m = re.search(r"if event in \([^)]+\):\s*break", src)
|
||||
assert m, "SSE break condition not found in server.py"
|
||||
assert "cancel" in m.group(), \
|
||||
f"'cancel' missing from SSE break condition: {m.group()}"
|
||||
|
||||
|
||||
# ── R6: Test cron isolation (Sprint 10) ──────────────────────────────────────
|
||||
|
||||
def test_real_jobs_json_not_polluted_by_tests(cleanup_test_sessions):
|
||||
"""R6: Test runs must not write to the real ~/.hermes/cron/jobs.json.
|
||||
When HERMES_HOME isolation was missing, every test run added test-job-* entries.
|
||||
"""
|
||||
real_jobs_path = pathlib.Path.home() / ".hermes" / "cron" / "jobs.json"
|
||||
if not real_jobs_path.exists():
|
||||
return # no jobs file at all -- fine
|
||||
|
||||
jobs = json.loads(real_jobs_path.read_text())
|
||||
if isinstance(jobs, dict):
|
||||
jobs = jobs.get("jobs", [])
|
||||
|
||||
test_jobs = [j for j in jobs if j.get("name", "").startswith("test-job-")]
|
||||
assert len(test_jobs) == 0, \
|
||||
f"Real jobs.json contains {len(test_jobs)} test-job-* entries: " \
|
||||
f"{[j['name'] for j in test_jobs]}"
|
||||
|
||||
|
||||
# ── General: api modules all importable ──────────────────────────────────────
|
||||
|
||||
def test_all_api_modules_importable(cleanup_test_sessions):
|
||||
"""All api/ modules must be importable without NameError or ImportError.
|
||||
Catches missing imports introduced during future module splits.
|
||||
"""
|
||||
import ast, pathlib
|
||||
api_dir = REPO_ROOT / "api"
|
||||
for module_file in api_dir.glob("*.py"):
|
||||
src = module_file.read_text()
|
||||
try:
|
||||
ast.parse(src)
|
||||
except SyntaxError as e:
|
||||
assert False, f"{module_file.name} has syntax error: {e}"
|
||||
|
||||
|
||||
def test_server_py_importable(cleanup_test_sessions):
|
||||
"""server.py must parse without syntax errors after any split."""
|
||||
import ast, pathlib
|
||||
src = (REPO_ROOT / "server.py").read_text()
|
||||
try:
|
||||
ast.parse(src)
|
||||
except SyntaxError as e:
|
||||
assert False, f"server.py has syntax error: {e}"
|
||||
|
||||
# ── R7: Cross-session busy state bleed ───────────────────────────────────────
|
||||
|
||||
def test_loadSession_resets_busy_state_for_idle_session(cleanup_test_sessions):
|
||||
"""R7: sessions.js loadSession for a non-inflight session must reset S.busy to false.
|
||||
When missing, switching from a busy session to an idle one left the Send button
|
||||
disabled, showed the wrong activity bar, and pointed Cancel at the wrong stream.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/sessions.js").read_text()
|
||||
# The fix adds explicit S.busy=false in the non-inflight else branch
|
||||
assert "S.busy=false;" in src, "sessions.js loadSession must set S.busy=false when loading a non-inflight session"
|
||||
# btnSend must be explicitly re-enabled
|
||||
assert "$('btnSend').disabled=false;" in src, "sessions.js loadSession must enable btnSend for non-inflight sessions"
|
||||
|
||||
|
||||
def test_done_handler_guards_setbusy_with_inflight_check(cleanup_test_sessions):
|
||||
"""R7b: messages.js done/error handlers must not call setBusy(false) if the
|
||||
currently viewed session is itself still in-flight.
|
||||
When missing, finishing session A while viewing in-flight session B would
|
||||
disable B's Send button.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
# The fix wraps setBusy(false) in a guard
|
||||
assert "INFLIGHT[S.session.session_id]" in src, "messages.js must guard setBusy(false) with INFLIGHT check for current session"
|
||||
|
||||
|
||||
def test_cancel_button_not_cleared_across_sessions(cleanup_test_sessions):
|
||||
"""R7c: The Cancel button and activeStreamId must only be cleared when the
|
||||
done/error event belongs to the currently viewed session.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
# Both clear operations must be inside the activeSid === S.session guard
|
||||
# We check for the pattern added by the fix
|
||||
assert "S.session.session_id===activeSid" in src, "messages.js must guard activeStreamId/Cancel clearing with session identity check"
|
||||
|
||||
# ── R8: Session delete does not invalidate index (ghost sessions) ─────────────
|
||||
|
||||
def test_deleted_session_does_not_appear_in_list(cleanup_test_sessions):
|
||||
"""R8: After deleting a session, it must not appear in /api/sessions.
|
||||
When _index.json was not invalidated on delete, the session reappeared
|
||||
in the list even after the JSON file was removed.
|
||||
"""
|
||||
# Create a session with a title so it shows in the list
|
||||
d, _ = post("/api/session/new", {})
|
||||
sid = d["session"]["session_id"]
|
||||
post("/api/session/rename", {"session_id": sid, "title": "regression-test-delete-R8"})
|
||||
|
||||
# Verify it appears
|
||||
sessions, _ = get("/api/sessions")
|
||||
ids_before = [s["session_id"] for s in sessions["sessions"]]
|
||||
assert sid in ids_before, "Session must appear in list before delete"
|
||||
|
||||
# Delete it
|
||||
result, status = post("/api/session/delete", {"session_id": sid})
|
||||
assert status == 200 and result.get("ok") is True
|
||||
|
||||
# Verify it no longer appears -- even after a second fetch (index rebuild)
|
||||
sessions2, _ = get("/api/sessions")
|
||||
ids_after = [s["session_id"] for s in sessions2["sessions"]]
|
||||
assert sid not in ids_after, f"Deleted session {sid} still appears in list -- index not invalidated on delete"
|
||||
|
||||
|
||||
def test_server_delete_invalidates_index(cleanup_test_sessions):
|
||||
"""R8b: server.py session/delete handler must unlink _index.json.
|
||||
Static check that the fix is in place.
|
||||
"""
|
||||
src = (REPO_ROOT / "server.py").read_text()
|
||||
# Find the delete handler and verify it unlinks the index
|
||||
delete_idx = src.find("if parsed.path == '/api/session/delete':")
|
||||
assert delete_idx >= 0, "session/delete handler not found"
|
||||
delete_block = src[delete_idx:delete_idx+600]
|
||||
assert "SESSION_INDEX_FILE" in delete_block, "server.py session/delete must invalidate SESSION_INDEX_FILE"
|
||||
|
||||
|
||||
# ── R9: Token/tool SSE events write to wrong session after switch ─────────────
|
||||
|
||||
def test_token_handler_guards_session_id(cleanup_test_sessions):
|
||||
"""R9a: The SSE token event handler must check activeSid before writing to DOM.
|
||||
When missing, tokens from session A would render into session B's message area
|
||||
if the user switched sessions mid-stream.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
# Find the token event handler
|
||||
token_idx = src.find("es.addEventListener('token'")
|
||||
assert token_idx >= 0, "token event handler not found"
|
||||
token_block = src[token_idx:token_idx+300]
|
||||
assert "activeSid" in token_block, "token handler must check activeSid before writing to DOM"
|
||||
assert "S.session.session_id!==activeSid" in token_block or "S.session.session_id===activeSid" in token_block, "token handler must compare current session to activeSid"
|
||||
|
||||
|
||||
def test_tool_handler_guards_session_id(cleanup_test_sessions):
|
||||
"""R9b: The SSE tool event handler must check activeSid before writing to DOM.
|
||||
When missing, tool cards from session A would render into session B's message area.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
tool_idx = src.find("es.addEventListener('tool'")
|
||||
assert tool_idx >= 0, "tool event handler not found"
|
||||
tool_block = src[tool_idx:tool_idx+400]
|
||||
assert "activeSid" in tool_block, "tool handler must check activeSid before writing to DOM"
|
||||
|
||||
# ── R10: respondApproval uses wrong session_id after switch (multi-session) ─
|
||||
|
||||
def test_respond_approval_uses_approval_session_id(cleanup_test_sessions):
|
||||
"""R10: respondApproval must use the session_id of the session that triggered
|
||||
the approval, not S.session.session_id (which may be a different session
|
||||
if the user switched while approval was pending).
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
# The fix introduces _approvalSessionId to track the correct session
|
||||
assert "_approvalSessionId" in src, "messages.js must use _approvalSessionId in respondApproval"
|
||||
# respondApproval must use _approvalSessionId, not S.session.session_id directly
|
||||
idx = src.find("async function respondApproval(")
|
||||
assert idx >= 0, "respondApproval not found"
|
||||
fn_body = src[idx:idx+300]
|
||||
assert "_approvalSessionId" in fn_body, "respondApproval must read _approvalSessionId, not S.session.session_id"
|
||||
|
||||
|
||||
# ── R11: Activity bar shows cross-session tool status ─────────────────────
|
||||
|
||||
def test_tool_status_only_shown_for_current_session(cleanup_test_sessions):
|
||||
"""R11: The activity bar setStatus() call in the tool SSE handler must only
|
||||
fire when the user is viewing the session that triggered the tool.
|
||||
When missing, session A's tool names would appear in session B's activity bar.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
# Find the tool event handler
|
||||
tool_idx = src.find("es.addEventListener('tool'")
|
||||
assert tool_idx >= 0
|
||||
tool_block = src[tool_idx:tool_idx+400]
|
||||
# setStatus must be inside the activeSid guard, not before it
|
||||
status_pos = tool_block.find("setStatus(")
|
||||
guard_pos = tool_block.find("S.session.session_id===activeSid")
|
||||
assert guard_pos >= 0, "tool handler must guard with activeSid check"
|
||||
# The guard must appear BEFORE or AROUND the setStatus call
|
||||
# (status only fires for the current session)
|
||||
assert status_pos > tool_block.find("activeSid"), "setStatus in tool handler must be inside the activeSid guard"
|
||||
|
||||
|
||||
# ── R12: Live tool cards lost on switch-away and switch-back ──────────────
|
||||
|
||||
def test_loadSession_inflight_restores_live_tool_cards(cleanup_test_sessions):
|
||||
"""R12: When switching back to an in-flight session, live tool cards in
|
||||
#liveToolCards must be restored from S.toolCalls.
|
||||
When missing, tool cards disappeared on switch-away even though the session
|
||||
was still processing.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/sessions.js").read_text()
|
||||
# INFLIGHT branch must call appendLiveToolCard
|
||||
inflight_idx = src.find("if(INFLIGHT[sid]){")
|
||||
assert inflight_idx >= 0, "INFLIGHT branch not found in loadSession"
|
||||
inflight_block = src[inflight_idx:inflight_idx+500]
|
||||
assert "appendLiveToolCard" in inflight_block, "loadSession INFLIGHT branch must restore live tool cards via appendLiveToolCard"
|
||||
assert "clearLiveToolCards" in inflight_block, "loadSession INFLIGHT branch must clear old live cards before restoring"
|
||||
|
||||
# ── R13: renderMessages() called before S.busy=false in done handler ────────
|
||||
|
||||
def test_done_handler_sets_busy_false_before_renderMessages(cleanup_test_sessions):
|
||||
"""R13: In the done handler, S.busy must be set to false BEFORE renderMessages()
|
||||
is called for the active session. The !S.busy guard in renderMessages() controls
|
||||
whether settled tool cards are rendered. When S.busy=true during renderMessages(),
|
||||
tool cards are skipped entirely after a response completes.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
done_idx = src.find("es.addEventListener('done'")
|
||||
assert done_idx >= 0
|
||||
done_block = src[done_idx:done_idx+1500]
|
||||
# S.busy=false must appear before renderMessages() within the done handler
|
||||
busy_pos = done_block.find("S.busy=false;")
|
||||
render_pos = done_block.find("renderMessages()")
|
||||
assert busy_pos >= 0, "done handler must set S.busy=false before renderMessages()"
|
||||
assert busy_pos < render_pos, f"S.busy=false (pos {busy_pos}) must come before renderMessages() (pos {render_pos})"
|
||||
|
||||
|
||||
# ── R14: send() uses stale modelSelect.value instead of session model ────────
|
||||
|
||||
def test_send_uses_session_model_as_authoritative_source(cleanup_test_sessions):
|
||||
"""R14: send() must use S.session.model as the authoritative model, not just
|
||||
$('modelSelect').value. When a session was created with a model not in the
|
||||
current dropdown list, the select value would be stale after switching sessions,
|
||||
causing the wrong model to be sent.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/messages.js").read_text()
|
||||
# The model field in the chat/start payload must prefer S.session.model
|
||||
chat_start_idx = src.find("/api/chat/start")
|
||||
assert chat_start_idx >= 0
|
||||
payload_block = src[chat_start_idx:chat_start_idx+300]
|
||||
assert "S.session.model" in payload_block, "send() must use S.session.model in the chat/start payload"
|
||||
|
||||
|
||||
# ── R15: newSession does not clear live tool cards ────────────────────────────
|
||||
|
||||
def test_newSession_clears_live_tool_cards(cleanup_test_sessions):
|
||||
"""R15: newSession() must call clearLiveToolCards() so live cards from a
|
||||
previous in-flight session don't persist when starting a fresh conversation.
|
||||
"""
|
||||
src = (REPO_ROOT / "static/sessions.js").read_text()
|
||||
new_sess_idx = src.find("async function newSession(")
|
||||
assert new_sess_idx >= 0
|
||||
# Find end of newSession (next async function)
|
||||
next_fn = src.find("async function ", new_sess_idx + 10)
|
||||
new_sess_body = src[new_sess_idx:next_fn]
|
||||
assert "clearLiveToolCards" in new_sess_body, "newSession() must call clearLiveToolCards() to clear stale live cards"
|
||||
437
tests/test_sprint1.py
Normal file
437
tests/test_sprint1.py
Normal file
@@ -0,0 +1,437 @@
|
||||
"""
|
||||
Sprint 1 test suite for the Hermes WebUI.
|
||||
|
||||
Tests use the ISOLATED test server running on http://127.0.0.1:8788.
|
||||
Production server (port 8787) and your real conversations are never touched.
|
||||
Start the server before running:
|
||||
<repo>/start.sh
|
||||
# wait 2 seconds
|
||||
pytest webui-mvp/tests/test_sprint1.py -v
|
||||
|
||||
All tests are HTTP-level: they call real API endpoints and verify responses.
|
||||
No mocking required for session CRUD, upload parser, or approval API.
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
import tempfile
|
||||
import pathlib
|
||||
|
||||
# Allow importing server modules directly for unit tests
|
||||
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent.parent))
|
||||
|
||||
BASE = "http://127.0.0.1:8788" # test server (isolated from production)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# HTTP helpers
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def get(path):
|
||||
url = BASE + path
|
||||
with urllib.request.urlopen(url, timeout=10) as r:
|
||||
return json.loads(r.read())
|
||||
|
||||
|
||||
def post(path, body=None):
|
||||
url = BASE + path
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(url, data=data,
|
||||
headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
|
||||
def post_multipart(path, fields, files):
|
||||
"""Post a multipart/form-data request. files: {name: (filename, bytes)}"""
|
||||
boundary = uuid.uuid4().hex.encode()
|
||||
body = b""
|
||||
for name, value in fields.items():
|
||||
body += b"--" + boundary + b"\r\n"
|
||||
body += f"Content-Disposition: form-data; name=\"{name}\"\r\n\r\n".encode()
|
||||
body += value.encode() + b"\r\n"
|
||||
for name, (filename, data) in files.items():
|
||||
body += b"--" + boundary + b"\r\n"
|
||||
body += f"Content-Disposition: form-data; name=\"{name}\"; filename=\"{filename}\"\r\n".encode()
|
||||
body += b"Content-Type: application/octet-stream\r\n\r\n"
|
||||
body += data + b"\r\n"
|
||||
body += b"--" + boundary + b"--\r\n"
|
||||
req = urllib.request.Request(BASE + path, data=body,
|
||||
headers={"Content-Type": f"multipart/form-data; boundary={boundary.decode()}"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
"""Create a session and register it with the cleanup fixture."""
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Health check (prerequisite for all tests)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_health():
|
||||
"""Server must be running and healthy."""
|
||||
data = get("/health")
|
||||
assert data["status"] == "ok", f"health not ok: {data}"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# B11: /api/session GET footgun fix
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_session_get_no_id_returns_400():
|
||||
"""B11: GET /api/session with no session_id must return 400, not silently create."""
|
||||
try:
|
||||
data = get("/api/session")
|
||||
# If we get here, the server returned 200 (old broken behavior)
|
||||
assert False, f"Expected 400 but got 200: {data}"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400, f"Expected 400, got {e.code}"
|
||||
body = json.loads(e.read())
|
||||
assert "error" in body
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Session CRUD
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_session_create_and_load():
|
||||
"""Create a session, verify it appears in /api/sessions, load it."""
|
||||
data, status = post("/api/session/new", {"model": "openai/gpt-5.4-mini"})
|
||||
assert status == 200, f"Expected 200, got {status}: {data}"
|
||||
assert "session" in data
|
||||
sid = data["session"]["session_id"]
|
||||
assert len(sid) == 12 # uuid4().hex[:12]
|
||||
|
||||
# Give it a title so it's visible in the session list (empty Untitled sessions are filtered)
|
||||
post("/api/session/rename", {"session_id": sid, "title": "test-create-verify"})
|
||||
|
||||
# Verify it appears in /api/sessions list
|
||||
sessions = get("/api/sessions")
|
||||
sids = [s["session_id"] for s in sessions["sessions"]]
|
||||
assert sid in sids, f"New session {sid} not in sessions list"
|
||||
|
||||
# Load it directly
|
||||
loaded = get(f"/api/session?session_id={sid}")
|
||||
assert loaded["session"]["session_id"] == sid
|
||||
assert loaded["session"]["messages"] == []
|
||||
|
||||
# Cleanup
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
|
||||
|
||||
def test_session_update():
|
||||
"""Create session, update workspace and model, verify persisted."""
|
||||
data, _ = post("/api/session/new", {})
|
||||
sid = data["session"]["session_id"]
|
||||
|
||||
updated, status = post("/api/session/update", {
|
||||
"session_id": sid,
|
||||
"workspace": "/tmp",
|
||||
"model": "anthropic/claude-sonnet-4.6"
|
||||
})
|
||||
assert status == 200
|
||||
assert updated["session"]["model"] == "anthropic/claude-sonnet-4.6"
|
||||
|
||||
# Reload and verify persistence
|
||||
reloaded = get(f"/api/session?session_id={sid}")
|
||||
assert reloaded["session"]["model"] == "anthropic/claude-sonnet-4.6"
|
||||
|
||||
|
||||
def test_session_delete():
|
||||
"""Create session, delete it, verify it no longer loads."""
|
||||
data, _ = post("/api/session/new", {})
|
||||
sid = data["session"]["session_id"]
|
||||
|
||||
result, status = post("/api/session/delete", {"session_id": sid})
|
||||
assert status == 200
|
||||
assert result.get("ok") is True
|
||||
|
||||
# Trying to load it should now 404/500 (KeyError -> 500 in current handler)
|
||||
try:
|
||||
get(f"/api/session?session_id={sid}")
|
||||
assert False, "Expected error loading deleted session"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code in (404, 500), f"Expected 404 or 500, got {e.code}"
|
||||
|
||||
|
||||
def test_session_delete_nonexistent():
|
||||
"""Deleting a nonexistent session should return ok:True (idempotent)."""
|
||||
result, status = post("/api/session/delete", {"session_id": "doesnotexist"})
|
||||
assert status == 200
|
||||
assert result.get("ok") is True
|
||||
|
||||
|
||||
def test_sessions_list_sorted():
|
||||
"""Sessions list should be sorted most-recently-updated first."""
|
||||
# Create two sessions with a title so they're visible (empty Untitled sessions are filtered)
|
||||
a, _ = post("/api/session/new", {})
|
||||
time.sleep(0.05)
|
||||
b, _ = post("/api/session/new", {})
|
||||
sid_a = a["session"]["session_id"]
|
||||
sid_b = b["session"]["session_id"]
|
||||
post("/api/session/rename", {"session_id": sid_a, "title": "test-sort-a"})
|
||||
time.sleep(0.05)
|
||||
post("/api/session/rename", {"session_id": sid_b, "title": "test-sort-b"})
|
||||
|
||||
sessions = get("/api/sessions")
|
||||
sids = [s["session_id"] for s in sessions["sessions"]]
|
||||
|
||||
# b was updated more recently, should appear before a
|
||||
assert sids.index(sid_b) < sids.index(sid_a), \
|
||||
"Sessions not sorted by updated_at desc"
|
||||
|
||||
# Cleanup
|
||||
post("/api/session/delete", {"session_id": sid_a})
|
||||
post("/api/session/delete", {"session_id": sid_b})
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Upload parser unit tests (pure function, no HTTP)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_parse_multipart_text_file():
|
||||
"""parse_multipart correctly parses a text file field."""
|
||||
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent.parent))
|
||||
# Import the function directly from the server module
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"server",
|
||||
str(pathlib.Path(__file__).parent.parent / "server.py")
|
||||
)
|
||||
# We only need parse_multipart; import it without running the server
|
||||
# Parse manually by reading the source and exec only the function
|
||||
src = pathlib.Path(__file__).parent.parent.joinpath("api/upload.py").read_text()
|
||||
# Extract and exec parse_multipart
|
||||
import re
|
||||
# Find the function
|
||||
m = re.search(r"(def parse_multipart\(.*?)(?=\ndef )", src, re.DOTALL)
|
||||
assert m, "Could not find parse_multipart in server.py"
|
||||
ns = {}
|
||||
exec("import re as _re, email.parser as _ep\n" + m.group(1), ns)
|
||||
parse_multipart = ns["parse_multipart"]
|
||||
|
||||
# Build a minimal multipart body
|
||||
boundary = b"testboundary"
|
||||
body = (
|
||||
b"--testboundary\r\n"
|
||||
b"Content-Disposition: form-data; name=\"session_id\"\r\n\r\n"
|
||||
b"abc123\r\n"
|
||||
b"--testboundary\r\n"
|
||||
b"Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n"
|
||||
b"Content-Type: text/plain\r\n\r\n"
|
||||
b"hello world\r\n"
|
||||
b"--testboundary--\r\n"
|
||||
)
|
||||
fields, files = parse_multipart(
|
||||
io.BytesIO(body),
|
||||
"multipart/form-data; boundary=testboundary",
|
||||
len(body)
|
||||
)
|
||||
assert fields.get("session_id") == "abc123", f"fields: {fields}"
|
||||
assert "file" in files, f"files: {files}"
|
||||
filename, content = files["file"]
|
||||
assert filename == "hello.txt"
|
||||
assert content == b"hello world"
|
||||
|
||||
|
||||
def test_parse_multipart_binary_file():
|
||||
"""parse_multipart handles binary (PNG header bytes) without corruption."""
|
||||
src = pathlib.Path(__file__).parent.parent.joinpath("api/upload.py").read_text()
|
||||
import re
|
||||
m = re.search(r"(def parse_multipart\(.*?)(?=\ndef )", src, re.DOTALL)
|
||||
ns = {}
|
||||
exec("import re as _re, email.parser as _ep\n" + m.group(1), ns)
|
||||
parse_multipart = ns["parse_multipart"]
|
||||
|
||||
# Fake PNG: first 8 bytes of PNG magic
|
||||
png_magic = b"\x89PNG\r\n\x1a\n"
|
||||
boundary = b"binboundary"
|
||||
body = (
|
||||
b"--binboundary\r\n"
|
||||
b"Content-Disposition: form-data; name=\"session_id\"\r\n\r\n"
|
||||
b"sess1\r\n"
|
||||
b"--binboundary\r\n"
|
||||
b"Content-Disposition: form-data; name=\"file\"; filename=\"test.png\"\r\n"
|
||||
b"Content-Type: image/png\r\n\r\n" + png_magic + b"\r\n"
|
||||
b"--binboundary--\r\n"
|
||||
)
|
||||
fields, files = parse_multipart(
|
||||
io.BytesIO(body),
|
||||
"multipart/form-data; boundary=binboundary",
|
||||
len(body)
|
||||
)
|
||||
assert "file" in files
|
||||
filename, content = files["file"]
|
||||
assert filename == "test.png"
|
||||
assert content == png_magic, f"Binary content corrupted: {content!r}"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# File upload via HTTP
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_upload_text_file(cleanup_test_sessions):
|
||||
"""Upload a text file to a session workspace, verify it appears in /api/list."""
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
|
||||
result, status = post_multipart("/api/upload", {"session_id": sid}, {
|
||||
"file": ("test_upload.txt", b"sprint1 test content")
|
||||
})
|
||||
assert status == 200, f"Upload failed {status}: {result}"
|
||||
assert "filename" in result
|
||||
assert result["size"] == len(b"sprint1 test content")
|
||||
|
||||
# Verify file appears in listing
|
||||
listing = get(f"/api/list?session_id={sid}&path=.")
|
||||
names = [e["name"] for e in listing["entries"]]
|
||||
assert result["filename"] in names, f"{result['filename']} not in {names}"
|
||||
# Cleanup the uploaded file
|
||||
post("/api/file/delete", {"session_id": sid, "path": result["filename"]})
|
||||
|
||||
|
||||
def test_upload_too_large(cleanup_test_sessions):
|
||||
"""Uploading a file over MAX_UPLOAD_BYTES is rejected (413 or connection closed)."""
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
|
||||
# 21MB > 20MB limit
|
||||
big = b"x" * (21 * 1024 * 1024)
|
||||
try:
|
||||
result, status = post_multipart("/api/upload", {"session_id": sid}, {
|
||||
"file": ("big.bin", big)
|
||||
})
|
||||
# If we get a response it should be 413
|
||||
assert status == 413, f"Expected 413, got {status}: {result}"
|
||||
except (urllib.error.URLError, ConnectionResetError, BrokenPipeError):
|
||||
# Server closed connection after reading Content-Length > limit before body
|
||||
# This is also valid rejection behavior
|
||||
pass
|
||||
|
||||
|
||||
def test_upload_no_file_field(cleanup_test_sessions):
|
||||
"""Upload with no file field returns 400."""
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post_multipart("/api/upload", {"session_id": sid}, {})
|
||||
assert status == 400, f"Expected 400, got {status}: {result}"
|
||||
|
||||
|
||||
def test_upload_bad_session():
|
||||
"""Upload to nonexistent session returns 404."""
|
||||
result, status = post_multipart("/api/upload", {"session_id": "nosuchsession"}, {
|
||||
"file": ("x.txt", b"data")
|
||||
})
|
||||
assert status == 404, f"Expected 404, got {status}: {result}"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Approval API
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_approval_pending_none():
|
||||
"""GET /api/approval/pending for a session with no pending entry returns null."""
|
||||
data = get("/api/approval/pending?session_id=no_such_session")
|
||||
assert data["pending"] is None
|
||||
|
||||
|
||||
def test_approval_submit_and_respond():
|
||||
"""Inject a pending approval via server endpoint, retrieve it, respond with deny."""
|
||||
test_sid = f"test-approval-{uuid.uuid4().hex[:6]}"
|
||||
cmd = "rm -rf /tmp/testdir"
|
||||
key = "recursive_delete"
|
||||
|
||||
# Inject into server process via test endpoint (shared module state)
|
||||
inject = get(f"/api/approval/inject_test?session_id={urllib.parse.quote(test_sid)}&pattern_key={key}&command={urllib.parse.quote(cmd)}")
|
||||
assert inject["ok"] is True
|
||||
|
||||
# Poll should now show the pending entry
|
||||
data = get(f"/api/approval/pending?session_id={urllib.parse.quote(test_sid)}")
|
||||
assert data["pending"] is not None, "Pending entry not visible after inject"
|
||||
assert data["pending"]["command"] == cmd
|
||||
|
||||
# Respond with deny
|
||||
result, status = post("/api/approval/respond", {
|
||||
"session_id": test_sid,
|
||||
"choice": "deny"
|
||||
})
|
||||
assert status == 200
|
||||
assert result["ok"] is True
|
||||
assert result["choice"] == "deny"
|
||||
|
||||
# Pending should be gone
|
||||
data2 = get(f"/api/approval/pending?session_id={urllib.parse.quote(test_sid)}")
|
||||
assert data2["pending"] is None, "Pending entry should be cleared after respond"
|
||||
|
||||
|
||||
def test_approval_respond_allow_session():
|
||||
"""Inject pending entry, respond with session choice, verify cleared (approved)."""
|
||||
test_sid = f"test-approval-sess-{uuid.uuid4().hex[:6]}"
|
||||
|
||||
inject = get(f"/api/approval/inject_test?session_id={urllib.parse.quote(test_sid)}&pattern_key=force_kill&command=pkill+-9+someproc")
|
||||
assert inject["ok"] is True
|
||||
|
||||
result, status = post("/api/approval/respond", {
|
||||
"session_id": test_sid,
|
||||
"choice": "session"
|
||||
})
|
||||
assert status == 200
|
||||
assert result["ok"] is True
|
||||
assert result["choice"] == "session"
|
||||
|
||||
# After session approval, pending should be cleared
|
||||
data = get(f"/api/approval/pending?session_id={urllib.parse.quote(test_sid)}")
|
||||
assert data["pending"] is None, "Pending entry should be cleared after session approval"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Stream status endpoint (B4/B5)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_stream_status_unknown_id():
|
||||
"""GET /api/chat/stream/status for unknown stream_id returns active:false."""
|
||||
data = get("/api/chat/stream/status?stream_id=doesnotexist")
|
||||
assert data["active"] is False
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# File browser
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def test_list_dir(cleanup_test_sessions):
|
||||
"""List workspace directory for a session."""
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
listing = get(f"/api/list?session_id={sid}&path=.")
|
||||
assert "entries" in listing
|
||||
assert isinstance(listing["entries"], list)
|
||||
|
||||
|
||||
def test_list_dir_path_traversal(cleanup_test_sessions):
|
||||
"""Path traversal via ../.. should be blocked (500 or 400)."""
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
try:
|
||||
listing = get(f"/api/list?session_id={sid}&path=../../etc")
|
||||
# If server returns entries outside workspace root, that is a bug
|
||||
# (safe_resolve should raise ValueError)
|
||||
assert False, f"Expected error for path traversal, got: {listing}"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code in (400, 404, 500), f"Expected 400/404/500 for traversal, got {e.code}"
|
||||
139
tests/test_sprint10.py
Normal file
139
tests/test_sprint10.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""
|
||||
Sprint 10 Tests: server.py split, cancel endpoint, cron history, tool card polish.
|
||||
"""
|
||||
import json, pathlib, urllib.error, urllib.request, urllib.parse
|
||||
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
|
||||
|
||||
BASE = "http://127.0.0.1:8788"
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def get_text(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read().decode(), r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data,
|
||||
headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session(created_list):
|
||||
d, _ = post("/api/session/new", {})
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid
|
||||
|
||||
# ── server.py split: api/ modules served / importable ─────────────────────
|
||||
|
||||
def test_health_still_works(cleanup_test_sessions):
|
||||
data, status = get("/health")
|
||||
assert status == 200
|
||||
assert data["status"] == "ok"
|
||||
assert "uptime_seconds" in data
|
||||
assert "active_streams" in data
|
||||
|
||||
def test_api_modules_exist(cleanup_test_sessions):
|
||||
"""All api/ module files must exist on disk."""
|
||||
base = REPO_ROOT / "api"
|
||||
for mod in ["__init__.py", "config.py", "helpers.py", "models.py",
|
||||
"workspace.py", "upload.py", "streaming.py"]:
|
||||
assert (base / mod).exists(), f"Missing api/{mod}"
|
||||
|
||||
def test_server_py_under_750_lines(cleanup_test_sessions):
|
||||
"""server.py should be under 750 lines after the split."""
|
||||
lines = len((REPO_ROOT / "server.py").read_text().splitlines())
|
||||
assert lines < 750, f"server.py is {lines} lines -- split may not have landed"
|
||||
|
||||
def test_api_config_has_cancel_flags(cleanup_test_sessions):
|
||||
src = (REPO_ROOT / "api/config.py").read_text()
|
||||
assert "CANCEL_FLAGS" in src
|
||||
assert "STREAMS" in src
|
||||
|
||||
def test_session_crud_still_works(cleanup_test_sessions):
|
||||
"""Full session lifecycle works after split."""
|
||||
created = []
|
||||
sid = make_session(created)
|
||||
data, status = get(f"/api/session?session_id={urllib.parse.quote(sid)}")
|
||||
assert status == 200
|
||||
assert data["session"]["session_id"] == sid
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
|
||||
def test_static_files_still_served(cleanup_test_sessions):
|
||||
for f in ["ui.js", "workspace.js", "sessions.js", "messages.js", "panels.js", "boot.js"]:
|
||||
src, status = get_text(f"/static/{f}")
|
||||
assert status == 200, f"/static/{f} returned {status}"
|
||||
assert len(src) > 100
|
||||
|
||||
# ── Cancel endpoint ────────────────────────────────────────────────────────
|
||||
|
||||
def test_cancel_requires_stream_id(cleanup_test_sessions):
|
||||
try:
|
||||
data, status = get("/api/chat/cancel")
|
||||
assert status == 400
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_cancel_nonexistent_stream(cleanup_test_sessions):
|
||||
data, status = get("/api/chat/cancel?stream_id=nonexistent_xyz")
|
||||
assert status == 200
|
||||
assert data["ok"] is True
|
||||
assert data["cancelled"] is False
|
||||
|
||||
def test_cancel_button_in_html(cleanup_test_sessions):
|
||||
src, _ = get_text("/")
|
||||
assert "btnCancel" in src
|
||||
assert "cancelStream" in src
|
||||
|
||||
def test_cancel_function_in_boot_js(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/boot.js")
|
||||
assert "async function cancelStream(" in src
|
||||
assert "/api/chat/cancel" in src
|
||||
|
||||
# ── Cron history ───────────────────────────────────────────────────────────
|
||||
|
||||
def test_crons_output_limit_param(cleanup_test_sessions):
|
||||
"""Server accepts limit parameter > 1."""
|
||||
data, status = get("/api/crons/output?job_id=nonexistent&limit=20")
|
||||
# 404 or 200 with empty -- both acceptable for nonexistent job
|
||||
assert status in (200, 404)
|
||||
|
||||
def test_cron_history_button_in_panels_js(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/panels.js")
|
||||
assert "loadCronHistory" in src
|
||||
assert "All runs" in src
|
||||
|
||||
def test_cron_output_snippet_helper(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/panels.js")
|
||||
assert "_cronOutputSnippet" in src
|
||||
|
||||
# ── Tool card polish ───────────────────────────────────────────────────────
|
||||
|
||||
def test_tool_card_running_dot_in_css(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/style.css")
|
||||
assert "tool-card-running-dot" in src
|
||||
|
||||
def test_tool_card_show_more_in_ui_js(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/ui.js")
|
||||
assert "Show more" in src
|
||||
assert "tool-card-more" in src
|
||||
|
||||
def test_tool_card_smart_truncation_in_ui_js(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/ui.js")
|
||||
assert "displaySnippet" in src
|
||||
assert "lastBreak" in src
|
||||
|
||||
def test_cancel_sse_event_handler_in_messages_js(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/messages.js")
|
||||
assert "addEventListener('cancel'" in src
|
||||
assert "Task cancelled" in src
|
||||
|
||||
def test_active_stream_id_tracked(cleanup_test_sessions):
|
||||
src, _ = get_text("/static/messages.js")
|
||||
assert "S.activeStreamId" in src
|
||||
106
tests/test_sprint2.py
Normal file
106
tests/test_sprint2.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Sprint 2 tests: image preview, file types, markdown. Uses cleanup_test_sessions fixture."""
|
||||
import io, json, uuid, urllib.request, urllib.error, pathlib
|
||||
|
||||
BASE = "http://127.0.0.1:8788" # test server (isolated from production)
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def get_raw(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read(), r.headers.get('Content-Type', ''), r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
"""Create a session and register it with the cleanup fixture."""
|
||||
import pathlib as _pathlib
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, _pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
|
||||
|
||||
def test_raw_endpoint_serves_png(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
png = (b"\x89PNG\r\n\x1a\n" b"\x00\x00\x00\rIHDR\x00\x00\x00\x01"
|
||||
b"\x00\x00\x00\x01\x08\x02\x00\x00\x00"
|
||||
b"\x90wS\xde\x00\x00\x00\x0cIDATx\x9cc"
|
||||
b"\xf8\x0f\x00\x00\x01\x01\x00\x05\x18"
|
||||
b"\xd8N\x00\x00\x00\x00IEND\xaeB`\x82")
|
||||
(ws / "test.png").write_bytes(png)
|
||||
raw, ct, status = get_raw(f"/api/file/raw?session_id={sid}&path=test.png")
|
||||
assert status == 200
|
||||
assert "image/png" in ct
|
||||
assert raw == png
|
||||
|
||||
def test_raw_endpoint_serves_jpeg(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
jpeg = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xd9"
|
||||
(ws / "photo.jpg").write_bytes(jpeg)
|
||||
raw, ct, status = get_raw(f"/api/file/raw?session_id={sid}&path=photo.jpg")
|
||||
assert status == 200
|
||||
assert "image/jpeg" in ct
|
||||
|
||||
def test_raw_endpoint_serves_svg(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
svg = b"<svg xmlns=\"http://www.w3.org/2000/svg\" width=\"100\" height=\"100\"><circle/></svg>"
|
||||
(ws / "icon.svg").write_bytes(svg)
|
||||
raw, ct, status = get_raw(f"/api/file/raw?session_id={sid}&path=icon.svg")
|
||||
assert status == 200
|
||||
assert "image/svg" in ct
|
||||
|
||||
def test_raw_endpoint_path_traversal_blocked(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
try:
|
||||
get_raw(f"/api/file/raw?session_id={sid}&path=../../etc/passwd")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code in (400, 500)
|
||||
|
||||
def test_raw_endpoint_missing_file_returns_404(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
try:
|
||||
get_raw(f"/api/file/raw?session_id={sid}&path=no_such_file.png")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code in (404, 500)
|
||||
|
||||
def test_md_file_returns_text_via_api_file(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
md = "# Hello\n\nThis is **bold**.\n"
|
||||
(ws / "README.md").write_text(md)
|
||||
data, status = get(f"/api/file?session_id={sid}&path=README.md")
|
||||
assert status == 200
|
||||
assert data["content"] == md
|
||||
|
||||
def test_md_file_with_table(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
md = "| Name | Value |\n|------|-------|\n| foo | bar |\n"
|
||||
(ws / "table.md").write_text(md)
|
||||
data, status = get(f"/api/file?session_id={sid}&path=table.md")
|
||||
assert status == 200
|
||||
assert "| Name | Value |" in data["content"]
|
||||
|
||||
def test_file_listing_includes_images(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
(ws / "photo.png").write_bytes(b"fake png")
|
||||
(ws / "notes.md").write_text("# Notes")
|
||||
(ws / "script.py").write_text("print('hello')")
|
||||
data, status = get(f"/api/list?session_id={sid}&path=.")
|
||||
assert status == 200
|
||||
names = {e["name"]: e for e in data["entries"]}
|
||||
assert "photo.png" in names
|
||||
assert "notes.md" in names
|
||||
assert "script.py" in names
|
||||
144
tests/test_sprint3.py
Normal file
144
tests/test_sprint3.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""Sprint 3 tests: cron API, skills API, memory API, input validation."""
|
||||
import json, uuid, urllib.request, urllib.error
|
||||
|
||||
BASE = "http://127.0.0.1:8788" # test server (isolated from production)
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
"""Create a session and register it with the cleanup fixture."""
|
||||
import pathlib as _pathlib
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, _pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
|
||||
def test_crons_list():
|
||||
data, status = get("/api/crons")
|
||||
assert status == 200
|
||||
assert "jobs" in data
|
||||
|
||||
def test_crons_list_has_required_fields():
|
||||
data, _ = get("/api/crons")
|
||||
if not data["jobs"]: return
|
||||
job = data["jobs"][0]
|
||||
for field in ("id", "name", "prompt", "enabled", "schedule_display"):
|
||||
assert field in job
|
||||
|
||||
def test_crons_output_requires_job_id():
|
||||
try:
|
||||
get("/api/crons/output")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_crons_output_real_job():
|
||||
data, _ = get("/api/crons")
|
||||
if not data["jobs"]: return
|
||||
job_id = data["jobs"][0]["id"]
|
||||
out, status = get(f"/api/crons/output?job_id={job_id}&limit=3")
|
||||
assert status == 200
|
||||
assert "outputs" in out
|
||||
|
||||
def test_crons_pause_requires_job_id():
|
||||
result, status = post("/api/crons/pause", {})
|
||||
assert status in (400, 404)
|
||||
|
||||
def test_crons_resume_requires_job_id():
|
||||
result, status = post("/api/crons/resume", {})
|
||||
assert status in (400, 404)
|
||||
|
||||
def test_crons_run_nonexistent():
|
||||
result, status = post("/api/crons/run", {"job_id": "doesnotexist999"})
|
||||
assert status == 404
|
||||
|
||||
def test_skills_list():
|
||||
data, status = get("/api/skills")
|
||||
assert status == 200
|
||||
assert len(data["skills"]) > 0
|
||||
|
||||
def test_skills_list_has_required_fields():
|
||||
data, _ = get("/api/skills")
|
||||
skill = data["skills"][0]
|
||||
assert "name" in skill and "description" in skill
|
||||
|
||||
def test_skills_content_known():
|
||||
data, status = get("/api/skills/content?name=dogfood")
|
||||
assert status == 200
|
||||
assert len(data["content"]) > 0
|
||||
|
||||
def test_skills_content_requires_name():
|
||||
try:
|
||||
get("/api/skills/content")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_skills_search_returns_subset():
|
||||
data, _ = get("/api/skills")
|
||||
assert len(data["skills"]) > 5
|
||||
|
||||
def test_memory_returns_both_files():
|
||||
data, status = get("/api/memory")
|
||||
assert status == 200
|
||||
assert "memory" in data and "user" in data
|
||||
|
||||
def test_memory_content_is_string():
|
||||
data, _ = get("/api/memory")
|
||||
assert isinstance(data["memory"], str)
|
||||
assert isinstance(data["user"], str)
|
||||
|
||||
def test_memory_has_mtime():
|
||||
data, _ = get("/api/memory")
|
||||
assert "memory_mtime" in data and "user_mtime" in data
|
||||
|
||||
def test_session_update_requires_session_id():
|
||||
result, status = post("/api/session/update", {"model": "openai/gpt-5.4-mini"})
|
||||
assert status == 400
|
||||
|
||||
def test_session_delete_requires_session_id():
|
||||
result, status = post("/api/session/delete", {})
|
||||
assert status == 400
|
||||
|
||||
def test_chat_start_requires_session_id():
|
||||
result, status = post("/api/chat/start", {"message": "hello"})
|
||||
assert status == 400
|
||||
|
||||
def test_chat_start_requires_message(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/chat/start", {"session_id": sid, "message": ""})
|
||||
assert status == 400
|
||||
|
||||
def test_session_update_unknown_id_returns_404():
|
||||
result, status = post("/api/session/update", {"session_id": "nosuchsession", "model": "openai/gpt-5.4-mini"})
|
||||
assert status == 404
|
||||
|
||||
def test_session_search_returns_matches(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
post("/api/session/rename", {"session_id": sid, "title": f"unique-s3-{sid}"})
|
||||
data, status = get(f"/api/sessions/search?q=unique-s3-{sid}")
|
||||
assert status == 200
|
||||
sids = [s["session_id"] for s in data["sessions"]]
|
||||
assert sid in sids
|
||||
|
||||
def test_session_search_empty_query_returns_all():
|
||||
data, status = get("/api/sessions/search?q=")
|
||||
assert status == 200 and "sessions" in data
|
||||
|
||||
def test_session_search_no_results():
|
||||
data, status = get("/api/sessions/search?q=zzznomatchzzz9999")
|
||||
assert status == 200 and data["sessions"] == []
|
||||
156
tests/test_sprint4.py
Normal file
156
tests/test_sprint4.py
Normal file
@@ -0,0 +1,156 @@
|
||||
"""Sprint 4 tests: relocation, session rename, search, file ops, validation."""
|
||||
import json, pathlib, uuid, urllib.request, urllib.error
|
||||
|
||||
BASE = "http://127.0.0.1:8788" # test server (isolated from production)
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def get_raw(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read(), r.headers.get("Content-Type",""), r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
"""Create a session and register it with the cleanup fixture."""
|
||||
import pathlib as _pathlib
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, _pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
|
||||
def test_server_running_from_new_location():
|
||||
data, status = get("/health")
|
||||
assert status == 200 and data["status"] == "ok"
|
||||
|
||||
def test_static_css_served():
|
||||
raw, ct, status = get_raw("/static/style.css")
|
||||
assert status == 200 and "text/css" in ct and b"--bg" in raw
|
||||
|
||||
def test_static_unknown_file_404():
|
||||
try:
|
||||
get_raw("/static/doesnotexist.xyz")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 404
|
||||
|
||||
def test_session_rename(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/session/rename", {"session_id": sid, "title": "Renamed Session"})
|
||||
assert status == 200 and result["session"]["title"] == "Renamed Session"
|
||||
|
||||
def test_session_rename_persists(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
post("/api/session/rename", {"session_id": sid, "title": "Persisted"})
|
||||
loaded, _ = get(f"/api/session?session_id={sid}")
|
||||
assert loaded["session"]["title"] == "Persisted"
|
||||
|
||||
def test_session_rename_truncates(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/session/rename", {"session_id": sid, "title": "A" * 200})
|
||||
assert status == 200 and len(result["session"]["title"]) <= 80
|
||||
|
||||
def test_session_rename_requires_fields():
|
||||
result, status = post("/api/session/rename", {"session_id": "x"})
|
||||
assert status == 400
|
||||
result2, status2 = post("/api/session/rename", {"title": "hi"})
|
||||
assert status2 == 400
|
||||
|
||||
def test_session_rename_unknown_id():
|
||||
result, status = post("/api/session/rename", {"session_id": "nosuchid", "title": "hi"})
|
||||
assert status == 404
|
||||
|
||||
def test_session_search_returns_matches(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
uid = uuid.uuid4().hex[:8]
|
||||
post("/api/session/rename", {"session_id": sid, "title": f"s4-search-{uid}"})
|
||||
data, status = get(f"/api/sessions/search?q=s4-search-{uid}")
|
||||
assert status == 200
|
||||
sids = [s["session_id"] for s in data["sessions"]]
|
||||
assert sid in sids
|
||||
|
||||
def test_session_search_empty_query_returns_all():
|
||||
data, status = get("/api/sessions/search?q=")
|
||||
assert status == 200 and "sessions" in data
|
||||
|
||||
def test_session_search_no_results():
|
||||
data, status = get("/api/sessions/search?q=zzznomatchzzz9999")
|
||||
assert status == 200 and data["sessions"] == []
|
||||
|
||||
def test_file_create(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
fname = f"test_{uuid.uuid4().hex[:6]}.txt"
|
||||
result, status = post("/api/file/create", {"session_id": sid, "path": fname, "content": "hello sprint4"})
|
||||
assert status == 200 and result["ok"] is True
|
||||
assert (ws / fname).read_text() == "hello sprint4"
|
||||
|
||||
def test_file_create_requires_fields(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/file/create", {"session_id": sid})
|
||||
assert status == 400
|
||||
result2, status2 = post("/api/file/create", {"path": "x.txt"})
|
||||
assert status2 == 400
|
||||
|
||||
def test_file_create_duplicate_rejected(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
fname = f"dup_{uuid.uuid4().hex[:6]}.txt"
|
||||
post("/api/file/create", {"session_id": sid, "path": fname, "content": ""})
|
||||
result, status = post("/api/file/create", {"session_id": sid, "path": fname, "content": ""})
|
||||
assert status == 400
|
||||
|
||||
def test_file_delete(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
(ws / "to_delete.txt").write_text("bye")
|
||||
result, status = post("/api/file/delete", {"session_id": sid, "path": "to_delete.txt"})
|
||||
assert status == 200 and not (ws / "to_delete.txt").exists()
|
||||
|
||||
def test_file_delete_missing_returns_404(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/file/delete", {"session_id": sid, "path": "nosuchfile.txt"})
|
||||
assert status == 404
|
||||
|
||||
def test_file_delete_path_traversal_blocked(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/file/delete", {"session_id": sid, "path": "../../etc/passwd"})
|
||||
assert status in (400, 500)
|
||||
|
||||
def test_list_requires_session_id():
|
||||
try:
|
||||
get("/api/list?path=.")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_file_requires_session_id():
|
||||
try:
|
||||
get("/api/file?path=readme.txt")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_file_requires_path(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
try:
|
||||
get(f"/api/file?session_id={sid}")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_new_session_inherits_workspace(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
post("/api/session/update", {"session_id": sid, "workspace": "/tmp", "model": "openai/gpt-5.4-mini"})
|
||||
sid2, _ = make_session_tracked(cleanup_test_sessions)
|
||||
data, _ = get(f"/api/session?session_id={sid2}")
|
||||
assert data["session"]["workspace"] == "/tmp"
|
||||
140
tests/test_sprint5.py
Normal file
140
tests/test_sprint5.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""Sprint 5 tests: workspace CRUD, file save, session index, JS serving."""
|
||||
import json, pathlib, uuid, urllib.request, urllib.error
|
||||
|
||||
BASE = "http://127.0.0.1:8788" # test server (isolated from production)
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def get_raw(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read(), r.headers.get("Content-Type",""), r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
"""Create a session and register it with the cleanup fixture."""
|
||||
import pathlib as _pathlib
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, _pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
|
||||
def test_server_running_from_new_location():
|
||||
data, status = get("/health")
|
||||
assert status == 200 and data["status"] == "ok"
|
||||
|
||||
def test_app_js_served():
|
||||
"""Sprint 9: app.js replaced by modules. Verify ui.js (contains renderMd) is served."""
|
||||
raw, ct, status = get_raw("/static/ui.js")
|
||||
assert status == 200 and "javascript" in ct and b"renderMd" in raw
|
||||
|
||||
def test_workspaces_list():
|
||||
data, status = get("/api/workspaces")
|
||||
assert status == 200 and "workspaces" in data and "last" in data
|
||||
|
||||
def test_workspace_add_valid():
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
result, status = post("/api/workspaces/add", {"path": "/tmp", "name": "Temp"})
|
||||
assert status == 200 and any(w["path"]=="/tmp" for w in result["workspaces"])
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
|
||||
def test_workspace_add_validates_existence():
|
||||
result, status = post("/api/workspaces/add", {"path": "/tmp/does_not_exist_xyz_999"})
|
||||
assert status == 400
|
||||
|
||||
def test_workspace_add_validates_is_dir():
|
||||
result, status = post("/api/workspaces/add", {"path": "/etc/hostname"})
|
||||
assert status == 400
|
||||
|
||||
def test_workspace_add_no_duplicate():
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
post("/api/workspaces/add", {"path": "/tmp"})
|
||||
result, status = post("/api/workspaces/add", {"path": "/tmp"})
|
||||
assert status == 400 and "already" in result.get("error","").lower()
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
|
||||
def test_workspace_add_requires_path():
|
||||
result, status = post("/api/workspaces/add", {})
|
||||
assert status == 400
|
||||
|
||||
def test_workspace_remove():
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
post("/api/workspaces/add", {"path": "/tmp", "name": "Temp"})
|
||||
result, status = post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
assert status == 200 and "/tmp" not in [w["path"] for w in result["workspaces"]]
|
||||
|
||||
def test_workspace_rename():
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
post("/api/workspaces/add", {"path": "/tmp", "name": "Temp"})
|
||||
result, status = post("/api/workspaces/rename", {"path": "/tmp", "name": "My Temp"})
|
||||
assert status == 200
|
||||
assert {w["path"]: w["name"] for w in result["workspaces"]}.get("/tmp") == "My Temp"
|
||||
post("/api/workspaces/remove", {"path": "/tmp"})
|
||||
|
||||
def test_workspace_rename_unknown():
|
||||
result, status = post("/api/workspaces/rename", {"path": "/no/such/path", "name": "X"})
|
||||
assert status == 404
|
||||
|
||||
def test_last_workspace_updates_on_session_update(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
post("/api/session/update", {"session_id": sid, "workspace": "/tmp", "model": "openai/gpt-5.4-mini"})
|
||||
data, _ = get("/api/workspaces")
|
||||
assert data["last"] == "/tmp"
|
||||
|
||||
def test_file_save(cleanup_test_sessions):
|
||||
sid, ws = make_session_tracked(cleanup_test_sessions)
|
||||
fname = f"save_{uuid.uuid4().hex[:6]}.txt"
|
||||
(ws / fname).write_text("original content")
|
||||
result, status = post("/api/file/save", {"session_id": sid, "path": fname, "content": "updated"})
|
||||
assert status == 200 and (ws / fname).read_text() == "updated"
|
||||
|
||||
def test_file_save_requires_fields(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/file/save", {"session_id": sid})
|
||||
assert status == 400
|
||||
|
||||
def test_file_save_nonexistent_returns_404(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/file/save", {"session_id": sid, "path": "no_such.txt", "content": ""})
|
||||
assert status == 404
|
||||
|
||||
def test_file_save_path_traversal_blocked(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/file/save", {"session_id": sid, "path": "../../etc/passwd", "content": ""})
|
||||
assert status in (400, 500)
|
||||
|
||||
def test_session_index_created_after_save(cleanup_test_sessions):
|
||||
# Index is created in the TEST state dir, not the production dir
|
||||
test_state_dir = pathlib.Path.home() / ".hermes" / "webui-mvp-test"
|
||||
index_path = test_state_dir / "sessions" / "_index.json"
|
||||
make_session_tracked(cleanup_test_sessions)
|
||||
# Index may not exist yet if cleanup already wiped it -- just check the endpoint works
|
||||
data, status = get("/api/sessions")
|
||||
assert status == 200
|
||||
assert isinstance(data["sessions"], list)
|
||||
|
||||
def test_sessions_endpoint_returns_sorted():
|
||||
data, status = get("/api/sessions")
|
||||
assert status == 200
|
||||
sessions = data["sessions"]
|
||||
if len(sessions) >= 2:
|
||||
assert sessions[0]["updated_at"] >= sessions[1]["updated_at"]
|
||||
|
||||
def test_new_session_inherits_last_workspace(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
post("/api/session/update", {"session_id": sid, "workspace": "/tmp", "model": "openai/gpt-5.4-mini"})
|
||||
sid2, _ = make_session_tracked(cleanup_test_sessions)
|
||||
d, _ = get(f"/api/session?session_id={sid2}")
|
||||
assert d["session"]["workspace"] == "/tmp"
|
||||
151
tests/test_sprint6.py
Normal file
151
tests/test_sprint6.py
Normal file
@@ -0,0 +1,151 @@
|
||||
"""Sprint 6 tests: Escape from editor, Phase D validation, HTML extraction, cron create, session export."""
|
||||
import json, uuid, pathlib, urllib.request, urllib.error
|
||||
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
|
||||
|
||||
BASE = "http://127.0.0.1:8788" # isolated test server
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
def get_raw(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read(), r.headers, r.status
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
# ── Phase E: HTML served from static/index.html ──
|
||||
|
||||
def test_index_html_served():
|
||||
raw, headers, status = get_raw("/")
|
||||
assert status == 200
|
||||
assert b"sidebarResize" in raw, "Resize handle not found in HTML"
|
||||
assert b"cronCreateForm" in raw, "Cron create form not found in HTML"
|
||||
assert b"btnExportJSON" in raw, "Export JSON button not found in HTML"
|
||||
|
||||
def test_index_html_file_exists():
|
||||
p = REPO_ROOT / "static/index.html"
|
||||
assert p.exists(), "static/index.html does not exist"
|
||||
assert p.stat().st_size > 5000, "index.html seems too small"
|
||||
|
||||
def test_server_py_has_no_html_string():
|
||||
txt = (REPO_ROOT / "server.py").read_text()
|
||||
assert 'HTML = r"""' not in txt, "server.py still contains inline HTML string"
|
||||
assert "doctype html" not in txt.lower(), "server.py still contains raw HTML"
|
||||
|
||||
# ── Phase D: remaining endpoint validation ──
|
||||
|
||||
def test_approval_respond_requires_session_id():
|
||||
result, status = post("/api/approval/respond", {"choice": "deny"})
|
||||
assert status == 400
|
||||
|
||||
def test_approval_respond_rejects_invalid_choice(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
result, status = post("/api/approval/respond", {"session_id": sid, "choice": "INVALID"})
|
||||
assert status == 400
|
||||
|
||||
def test_file_raw_requires_session_id():
|
||||
try:
|
||||
get_raw("/api/file/raw?path=test.png")
|
||||
assert False, "Expected 400"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_file_raw_unknown_session():
|
||||
try:
|
||||
get_raw("/api/file/raw?session_id=nosuchsession&path=test.png")
|
||||
assert False, "Expected 404"
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 404
|
||||
|
||||
# ── Cron create ──
|
||||
|
||||
def test_cron_create_requires_prompt():
|
||||
result, status = post("/api/crons/create", {"schedule": "0 9 * * *"})
|
||||
assert status == 400
|
||||
assert "prompt" in result.get("error", "").lower()
|
||||
|
||||
def test_cron_create_requires_schedule():
|
||||
result, status = post("/api/crons/create", {"prompt": "Say hello"})
|
||||
assert status == 400
|
||||
assert "schedule" in result.get("error", "").lower()
|
||||
|
||||
def test_cron_create_invalid_schedule():
|
||||
result, status = post("/api/crons/create", {
|
||||
"prompt": "Say hello", "schedule": "not_a_valid_schedule_xyz"
|
||||
})
|
||||
assert status == 400
|
||||
|
||||
def test_cron_create_success():
|
||||
uid = uuid.uuid4().hex[:6]
|
||||
result, status = post("/api/crons/create", {
|
||||
"name": f"test-job-{uid}",
|
||||
"prompt": "Just say 'hello' and nothing else.",
|
||||
"schedule": "every 999h", # far future -- won't actually run during test
|
||||
"deliver": "local",
|
||||
})
|
||||
assert status == 200, f"Expected 200 got {status}: {result}"
|
||||
assert result["ok"] is True
|
||||
assert "job" in result
|
||||
job_id = result["job"]["id"]
|
||||
# Verify it appears in the cron list
|
||||
jobs, _ = get("/api/crons")
|
||||
ids = [j["id"] for j in jobs["jobs"]]
|
||||
assert job_id in ids, f"Created job {job_id} not in list"
|
||||
|
||||
# ── Session export ──
|
||||
|
||||
def test_session_export_requires_session_id():
|
||||
try:
|
||||
get_raw("/api/session/export")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 400
|
||||
|
||||
def test_session_export_unknown_session():
|
||||
try:
|
||||
get_raw("/api/session/export?session_id=nosuchsession")
|
||||
assert False
|
||||
except urllib.error.HTTPError as e:
|
||||
assert e.code == 404
|
||||
|
||||
def test_session_export_returns_json(cleanup_test_sessions):
|
||||
sid, _ = make_session_tracked(cleanup_test_sessions)
|
||||
raw, headers, status = get_raw(f"/api/session/export?session_id={sid}")
|
||||
assert status == 200
|
||||
assert "application/json" in headers.get("Content-Type", "")
|
||||
data = json.loads(raw)
|
||||
assert data["session_id"] == sid
|
||||
assert "messages" in data
|
||||
assert "title" in data
|
||||
|
||||
# ── Resizable panels: static files present ──
|
||||
|
||||
def test_static_index_has_resize_handles():
|
||||
raw, _, status = get_raw("/")
|
||||
assert status == 200
|
||||
assert b"sidebarResize" in raw
|
||||
assert b"rightpanelResize" in raw
|
||||
|
||||
def test_app_js_has_resize_logic():
|
||||
"""Sprint 9: app.js replaced by modules. Resize logic lives in boot.js."""
|
||||
raw, _, status = get_raw("/static/boot.js")
|
||||
assert status == 200
|
||||
assert b"_initResizePanels" in raw
|
||||
assert b"hermes-sidebar-w" in raw
|
||||
assert b"hermes-panel-w" in raw
|
||||
130
tests/test_sprint7.py
Normal file
130
tests/test_sprint7.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""
|
||||
Sprint 7 Tests: Cron CRUD, Skill CRUD, Memory Write, Session Content Search, Health
|
||||
"""
|
||||
import json, pathlib, urllib.error, urllib.parse, urllib.request
|
||||
|
||||
BASE = "http://127.0.0.1:8788"
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read())
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list, ws=None):
|
||||
body = {}
|
||||
if ws: body["workspace"] = str(ws)
|
||||
d, _ = post("/api/session/new", body)
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid, pathlib.Path(d["session"]["workspace"])
|
||||
|
||||
# ── Health (Phase G) ──────────────────────────────────────────────
|
||||
|
||||
def test_health_has_active_streams():
|
||||
data = get("/health")
|
||||
assert "active_streams" in data
|
||||
assert isinstance(data["active_streams"], int) and data["active_streams"] >= 0
|
||||
|
||||
def test_health_has_uptime_seconds():
|
||||
data = get("/health")
|
||||
assert "uptime_seconds" in data
|
||||
assert isinstance(data["uptime_seconds"], (int, float)) and data["uptime_seconds"] >= 0
|
||||
|
||||
# ── Session content search ────────────────────────────────────────
|
||||
|
||||
def test_session_search_empty_returns_all(cleanup_test_sessions):
|
||||
data = get("/api/sessions/search?q=")
|
||||
assert "sessions" in data
|
||||
|
||||
def test_session_search_content_params_accepted(cleanup_test_sessions):
|
||||
data = get("/api/sessions/search?q=hello&content=1&depth=3")
|
||||
assert "sessions" in data and "query" in data and data["query"] == "hello"
|
||||
|
||||
def test_session_search_returns_count(cleanup_test_sessions):
|
||||
data = get("/api/sessions/search?q=nonexistent_xyz_9999&content=1")
|
||||
assert "count" in data and data["count"] == 0
|
||||
|
||||
# ── Cron update ───────────────────────────────────────────────────
|
||||
|
||||
def test_cron_update_requires_job_id(cleanup_test_sessions):
|
||||
data, status = post("/api/crons/update", {"name": "test"})
|
||||
assert status == 400
|
||||
|
||||
def test_cron_update_unknown_job_404(cleanup_test_sessions):
|
||||
data, status = post("/api/crons/update", {"job_id": "nonexistent_abc123"})
|
||||
assert status == 404
|
||||
|
||||
# ── Cron delete ───────────────────────────────────────────────────
|
||||
|
||||
def test_cron_delete_requires_job_id(cleanup_test_sessions):
|
||||
data, status = post("/api/crons/delete", {})
|
||||
assert status == 400
|
||||
|
||||
def test_cron_delete_unknown_404(cleanup_test_sessions):
|
||||
data, status = post("/api/crons/delete", {"job_id": "nonexistent_xyz999"})
|
||||
assert status == 404
|
||||
|
||||
# ── Skill save ────────────────────────────────────────────────────
|
||||
|
||||
def test_skill_save_requires_name(cleanup_test_sessions):
|
||||
data, status = post("/api/skills/save", {"content": "# test"})
|
||||
assert status == 400
|
||||
|
||||
def test_skill_save_requires_content(cleanup_test_sessions):
|
||||
data, status = post("/api/skills/save", {"name": "test-no-content"})
|
||||
assert status == 400
|
||||
|
||||
def test_skill_save_invalid_name_rejected(cleanup_test_sessions):
|
||||
data, status = post("/api/skills/save", {"name": "../../../etc/passwd", "content": "bad"})
|
||||
assert status == 400
|
||||
|
||||
def test_skill_save_delete_roundtrip(cleanup_test_sessions):
|
||||
skill_name = "test-sprint7-skill"
|
||||
content = "---\nname: test-sprint7-skill\ndescription: Sprint 7 test.\ntags: [test]\n---\n\n# Test\n\nSprint 7 test skill."
|
||||
data, status = post("/api/skills/save", {"name": skill_name, "content": content})
|
||||
assert status == 200 and data.get("ok") is True
|
||||
skill_path = pathlib.Path(data["path"])
|
||||
assert skill_path.exists() and skill_path.read_text() == content
|
||||
del_data, del_status = post("/api/skills/delete", {"name": skill_name})
|
||||
assert del_status == 200 and del_data.get("ok") is True
|
||||
assert not skill_path.exists()
|
||||
|
||||
def test_skill_delete_requires_name(cleanup_test_sessions):
|
||||
data, status = post("/api/skills/delete", {})
|
||||
assert status == 400
|
||||
|
||||
def test_skill_delete_unknown_404(cleanup_test_sessions):
|
||||
data, status = post("/api/skills/delete", {"name": "nonexistent-skill-xyz-9999"})
|
||||
assert status == 404
|
||||
|
||||
# ── Memory write ──────────────────────────────────────────────────
|
||||
|
||||
def test_memory_write_requires_section(cleanup_test_sessions):
|
||||
data, status = post("/api/memory/write", {"content": "test"})
|
||||
assert status == 400
|
||||
|
||||
def test_memory_write_requires_content(cleanup_test_sessions):
|
||||
data, status = post("/api/memory/write", {"section": "memory"})
|
||||
assert status == 400
|
||||
|
||||
def test_memory_write_invalid_section(cleanup_test_sessions):
|
||||
data, status = post("/api/memory/write", {"section": "invalid", "content": "test"})
|
||||
assert status == 400
|
||||
|
||||
def test_memory_write_read_roundtrip(cleanup_test_sessions):
|
||||
original = get("/api/memory").get("memory", "")
|
||||
test_content = "# Sprint 7 Test\nWritten by test_memory_write_read_roundtrip."
|
||||
data, status = post("/api/memory/write", {"section": "memory", "content": test_content})
|
||||
assert status == 200 and data.get("ok") is True
|
||||
read_back = get("/api/memory").get("memory")
|
||||
assert read_back == test_content
|
||||
# Restore
|
||||
post("/api/memory/write", {"section": "memory", "content": original})
|
||||
125
tests/test_sprint8.py
Normal file
125
tests/test_sprint8.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""
|
||||
Sprint 8 Tests: Edit/regenerate, clear conversation, truncate, reconnect banner fix, syntax highlight.
|
||||
"""
|
||||
import json, pathlib, urllib.error, urllib.parse, urllib.request
|
||||
|
||||
BASE = "http://127.0.0.1:8788"
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read())
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
def make_session_tracked(created_list):
|
||||
d, _ = post("/api/session/new", {})
|
||||
sid = d["session"]["session_id"]
|
||||
created_list.append(sid)
|
||||
return sid
|
||||
|
||||
# ── /api/session/clear ─────────────────────────────────────────────
|
||||
|
||||
def test_session_clear_requires_session_id(cleanup_test_sessions):
|
||||
data, status = post("/api/session/clear", {})
|
||||
assert status == 400
|
||||
|
||||
def test_session_clear_unknown_session_404(cleanup_test_sessions):
|
||||
data, status = post("/api/session/clear", {"session_id": "nonexistent_xyz"})
|
||||
assert status == 404
|
||||
|
||||
def test_session_clear_wipes_messages(cleanup_test_sessions):
|
||||
created = []
|
||||
sid = make_session_tracked(created)
|
||||
# Inject a fake message directly into the session via rename (to give it a title first)
|
||||
post("/api/session/rename", {"session_id": sid, "title": "clear-test"})
|
||||
# Manually load and verify session exists
|
||||
sess = get(f"/api/session?session_id={urllib.parse.quote(sid)}")
|
||||
assert sess["session"]["session_id"] == sid
|
||||
# Clear it
|
||||
data, status = post("/api/session/clear", {"session_id": sid})
|
||||
assert status == 200, f"Expected 200, got {status}: {data}"
|
||||
assert data.get("ok") is True
|
||||
assert data.get("session") is not None
|
||||
# Load again and verify messages empty
|
||||
sess2 = get(f"/api/session?session_id={urllib.parse.quote(sid)}")
|
||||
assert sess2["session"]["messages"] == []
|
||||
assert sess2["session"]["title"] == "Untitled"
|
||||
# Cleanup
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
|
||||
def test_session_clear_returns_session_compact(cleanup_test_sessions):
|
||||
created = []
|
||||
sid = make_session_tracked(created)
|
||||
data, status = post("/api/session/clear", {"session_id": sid})
|
||||
assert status == 200
|
||||
assert "session" in data
|
||||
assert data["session"]["session_id"] == sid
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
|
||||
# ── /api/session/truncate ──────────────────────────────────────────
|
||||
|
||||
def test_session_truncate_requires_session_id(cleanup_test_sessions):
|
||||
data, status = post("/api/session/truncate", {"keep_count": 2})
|
||||
assert status == 400
|
||||
|
||||
def test_session_truncate_requires_keep_count(cleanup_test_sessions):
|
||||
data, status = post("/api/session/truncate", {"session_id": "xyz"})
|
||||
assert status == 400
|
||||
|
||||
def test_session_truncate_unknown_session_404(cleanup_test_sessions):
|
||||
data, status = post("/api/session/truncate", {"session_id": "nonexistent_xyz", "keep_count": 0})
|
||||
assert status == 404
|
||||
|
||||
def test_session_truncate_returns_messages(cleanup_test_sessions):
|
||||
created = []
|
||||
sid = make_session_tracked(created)
|
||||
data, status = post("/api/session/truncate", {"session_id": sid, "keep_count": 0})
|
||||
assert status == 200
|
||||
assert data.get("ok") is True
|
||||
assert "messages" in data["session"]
|
||||
assert data["session"]["messages"] == []
|
||||
post("/api/session/delete", {"session_id": sid})
|
||||
|
||||
# ── Static files contain new features ─────────────────────────────
|
||||
|
||||
def test_app_js_contains_edit_message(cleanup_test_sessions):
|
||||
"""Verify editMessage function is present in ui.js (Sprint 9: module split)."""
|
||||
with urllib.request.urlopen(BASE + "/static/ui.js", timeout=10) as r:
|
||||
src = r.read().decode()
|
||||
assert "editMessage" in src
|
||||
assert "msg-edit-area" in src
|
||||
|
||||
def test_app_js_contains_regenerate(cleanup_test_sessions):
|
||||
with urllib.request.urlopen(BASE + "/static/ui.js", timeout=10) as r:
|
||||
src = r.read().decode()
|
||||
assert "regenerateResponse" in src
|
||||
|
||||
def test_app_js_contains_clear_conversation(cleanup_test_sessions):
|
||||
with urllib.request.urlopen(BASE + "/static/panels.js", timeout=10) as r:
|
||||
src = r.read().decode()
|
||||
assert "clearConversation" in src
|
||||
assert "api/session/clear" in src
|
||||
|
||||
def test_app_js_contains_highlight_code(cleanup_test_sessions):
|
||||
with urllib.request.urlopen(BASE + "/static/ui.js", timeout=10) as r:
|
||||
src = r.read().decode()
|
||||
assert "highlightCode" in src
|
||||
assert "Prism" in src
|
||||
|
||||
def test_index_html_contains_prism(cleanup_test_sessions):
|
||||
with urllib.request.urlopen(BASE + "/", timeout=10) as r:
|
||||
src = r.read().decode()
|
||||
assert "prismjs" in src.lower()
|
||||
|
||||
def test_index_html_contains_clear_button(cleanup_test_sessions):
|
||||
with urllib.request.urlopen(BASE + "/", timeout=10) as r:
|
||||
src = r.read().decode()
|
||||
assert "btnClearConv" in src
|
||||
assert "clearConversation" in src
|
||||
115
tests/test_sprint9.py
Normal file
115
tests/test_sprint9.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""
|
||||
Sprint 9 Tests: app.js module split verification, tool cards, todo panel.
|
||||
Run: python -m pytest tests/test_sprint9.py -v
|
||||
"""
|
||||
import json, pathlib, urllib.error, urllib.request
|
||||
|
||||
BASE = "http://127.0.0.1:8788"
|
||||
|
||||
def get_text(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return r.read().decode()
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read())
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data,
|
||||
headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
return json.loads(e.read()), e.code
|
||||
|
||||
# ── Module split: all 6 files served ──────────────────────────────────────
|
||||
|
||||
def test_ui_js_served(cleanup_test_sessions):
|
||||
src = get_text("/static/ui.js")
|
||||
assert len(src) > 1000
|
||||
assert "function setBusy" in src
|
||||
assert "function syncTopbar" in src
|
||||
assert "const S=" in src or "const S =" in src
|
||||
|
||||
def test_workspace_js_served(cleanup_test_sessions):
|
||||
src = get_text("/static/workspace.js")
|
||||
assert "async function api(" in src
|
||||
assert "async function loadDir(" in src
|
||||
assert "async function openFile(" in src # renderFileTree is in ui.js
|
||||
|
||||
def test_sessions_js_served(cleanup_test_sessions):
|
||||
src = get_text("/static/sessions.js")
|
||||
assert "async function newSession(" in src
|
||||
assert "async function loadSession(" in src
|
||||
assert "async function renderSessionList(" in src
|
||||
|
||||
def test_messages_js_served(cleanup_test_sessions):
|
||||
src = get_text("/static/messages.js")
|
||||
assert "async function send(" in src
|
||||
assert "function transcript(" in src
|
||||
|
||||
def test_panels_js_served(cleanup_test_sessions):
|
||||
src = get_text("/static/panels.js")
|
||||
assert "async function switchPanel(" in src
|
||||
assert "async function loadCrons(" in src
|
||||
assert "async function loadSkills(" in src
|
||||
assert "async function loadMemory(" in src
|
||||
|
||||
def test_boot_js_served(cleanup_test_sessions):
|
||||
src = get_text("/static/boot.js")
|
||||
assert "btnSend" in src
|
||||
assert "btnNewChat" in src
|
||||
# boot IIFE
|
||||
assert "(async()=>{" in src or "(async () => {" in src
|
||||
|
||||
def test_app_js_no_longer_referenced_in_html(cleanup_test_sessions):
|
||||
"""index.html must not reference the old monolithic app.js."""
|
||||
html = get_text("/")
|
||||
assert 'src="/static/app.js"' not in html
|
||||
# All 6 modules must be present
|
||||
for module in ["ui.js", "workspace.js", "sessions.js", "messages.js", "panels.js", "boot.js"]:
|
||||
assert f'src="/static/{module}"' in html, f"Missing {module} in index.html"
|
||||
|
||||
def test_module_load_order_correct(cleanup_test_sessions):
|
||||
"""ui.js must appear before sessions.js which must appear before boot.js."""
|
||||
html = get_text("/")
|
||||
ui_pos = html.find('src="/static/ui.js"')
|
||||
ws_pos = html.find('src="/static/workspace.js"')
|
||||
sess_pos = html.find('src="/static/sessions.js"')
|
||||
msg_pos = html.find('src="/static/messages.js"')
|
||||
panels_pos = html.find('src="/static/panels.js"')
|
||||
boot_pos = html.find('src="/static/boot.js"')
|
||||
assert ui_pos < ws_pos < sess_pos < msg_pos < panels_pos < boot_pos
|
||||
|
||||
def test_no_duplicate_function_definitions(cleanup_test_sessions):
|
||||
"""No function name should appear in more than one module."""
|
||||
import re
|
||||
modules = ["ui.js", "workspace.js", "sessions.js", "messages.js", "panels.js", "boot.js"]
|
||||
seen = {}
|
||||
for m in modules:
|
||||
src = get_text(f"/static/{m}")
|
||||
fns = re.findall(r'(?:async )?function ([a-zA-Z_$][a-zA-Z0-9_$]*)\s*\(', src)
|
||||
for fn in fns:
|
||||
if fn in seen:
|
||||
assert False, f"Duplicate function {fn} in both {seen[fn]} and {m}"
|
||||
seen[fn] = m
|
||||
assert len(seen) > 50, f"Expected 50+ functions, got {len(seen)}"
|
||||
|
||||
def test_all_functions_present_across_modules(cleanup_test_sessions):
|
||||
"""Key functions must be present somewhere in the split modules."""
|
||||
import re
|
||||
modules = ["ui.js", "workspace.js", "sessions.js", "messages.js", "panels.js", "boot.js"]
|
||||
all_src = ""
|
||||
for m in modules:
|
||||
all_src += get_text(f"/static/{m}")
|
||||
required = [
|
||||
"setBusy", "syncTopbar", "renderMessages", "send", "loadSession",
|
||||
"newSession", "renderSessionList", "loadDir", "switchPanel",
|
||||
"loadCrons", "loadSkills", "loadMemory", "editMessage",
|
||||
"regenerateResponse", "clearConversation", "highlightCode",
|
||||
"toggleSkillForm", "submitSkillSave", "toggleMemoryEdit",
|
||||
]
|
||||
for fn in required:
|
||||
assert fn in all_src, f"Function {fn} missing from all modules"
|
||||
Reference in New Issue
Block a user