* Fix unicode filenames in file download headers * docs: v0.50.19 CHANGELOG entry for unicode filename fix (PR #378) * docs: fix test count in v0.50.19 CHANGELOG (924 not 926) --------- Co-authored-by: shaoxianbilly <40623436+shaoxianbilly@users.noreply.github.com> Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
732 lines
32 KiB
Python
732 lines
32 KiB
Python
"""
|
|
Sprint 29 Tests: Security hardening — 12 fixes from PR #171.
|
|
|
|
Covers:
|
|
1. CSRF protection — cross-origin POST rejected, same-origin allowed
|
|
2. Login rate limiting — 5th attempt 429, 6th rejected, still works after burst
|
|
3. Session ID validation — non-hex chars rejected in Session.load()
|
|
4. Error path sanitization — _sanitize_error() strips filesystem paths
|
|
5. Secure cookie detection — getattr used safely on plain socket
|
|
6. HMAC signature length — 32-char hex (128-bit), not 16
|
|
7. Skills path traversal — path outside SKILLS_DIR rejected
|
|
8. Content-Disposition for dangerous MIME types — HTML/SVG force download
|
|
9. PBKDF2 password hashing — save_settings uses auth._hash_password
|
|
10. Non-loopback startup warning (manual / integration test)
|
|
11. SSRF DNS check logic (unit test on helper function)
|
|
12. ENV_LOCK export — _ENV_LOCK importable from streaming module
|
|
"""
|
|
import importlib
|
|
import json
|
|
import pathlib
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
sys.path.insert(0, str(pathlib.Path(__file__).parent))
|
|
from conftest import TEST_STATE_DIR
|
|
|
|
BASE = "http://127.0.0.1:8788"
|
|
|
|
|
|
def get(path, headers=None):
|
|
req = urllib.request.Request(BASE + path, headers=headers or {})
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=10) as r:
|
|
return json.loads(r.read()), r.status
|
|
except urllib.error.HTTPError as e:
|
|
return json.loads(e.read()), e.code
|
|
|
|
|
|
def post(path, body=None, headers=None):
|
|
data = json.dumps(body or {}).encode()
|
|
h = {"Content-Type": "application/json"}
|
|
if headers:
|
|
h.update(headers)
|
|
req = urllib.request.Request(BASE + path, data=data, headers=h)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=10) as r:
|
|
return json.loads(r.read()), r.status
|
|
except urllib.error.HTTPError as e:
|
|
return json.loads(e.read()), e.code
|
|
|
|
|
|
def get_raw_with_headers(path):
|
|
req = urllib.request.Request(BASE + path)
|
|
with urllib.request.urlopen(req, timeout=10) as r:
|
|
return r.read(), dict(r.headers.items()), r.status
|
|
|
|
|
|
# ── 1. CSRF Protection ─────────────────────────────────────────────────────
|
|
|
|
|
|
class TestCSRF:
|
|
@staticmethod
|
|
def _csrf_allowed(headers):
|
|
from types import SimpleNamespace
|
|
from api.routes import _check_csrf
|
|
|
|
return _check_csrf(SimpleNamespace(headers=headers))
|
|
|
|
def test_no_origin_no_referer_allowed(self):
|
|
"""Curl-style request with no Origin/Referer must pass CSRF check."""
|
|
body, status = post("/api/sessions/new", {})
|
|
# Should succeed (200 or 404) but NOT 403
|
|
assert status != 403, f"Expected non-403 for no-origin request, got {status}"
|
|
|
|
def test_cross_origin_post_rejected(self):
|
|
"""Cross-origin POST (Origin != Host) must be rejected with 403."""
|
|
body, status = post(
|
|
"/api/sessions/new",
|
|
{},
|
|
headers={"Origin": "http://evil.com", "Host": "127.0.0.1:8788"},
|
|
)
|
|
assert status == 403, f"Expected 403 for cross-origin request, got {status}: {body}"
|
|
assert "cross-origin" in body.get("error", "").lower() or "rejected" in body.get("error", "").lower()
|
|
|
|
def test_same_origin_post_allowed(self):
|
|
"""Same-origin POST (Origin matches Host) must be allowed."""
|
|
body, status = post(
|
|
"/api/sessions/new",
|
|
{},
|
|
headers={"Origin": "http://127.0.0.1:8788", "Host": "127.0.0.1:8788"},
|
|
)
|
|
assert status != 403, f"Expected non-403 for same-origin request, got {status}: {body}"
|
|
|
|
def test_same_origin_referer_allowed(self):
|
|
"""Same-origin Referer (matching Host) must be allowed."""
|
|
body, status = post(
|
|
"/api/sessions/new",
|
|
{},
|
|
headers={"Referer": "http://127.0.0.1:8788/", "Host": "127.0.0.1:8788"},
|
|
)
|
|
assert status != 403, f"Expected non-403 for same-referer request, got {status}"
|
|
|
|
def test_proxy_host_default_https_port_matches_http_origin(self):
|
|
"""http:// origin without port must NOT match X-Forwarded-Host with :443.
|
|
|
|
After the scheme-aware _ports_match fix: http:// absent port = :80,
|
|
which is not equal to :443. These are different protocols/ports and
|
|
should be rejected. In real reverse proxy scenarios where the external
|
|
URL is HTTPS, the browser sends Origin: https://... not http://...
|
|
See test_proxy_host_default_https_port_matches_https_origin for the
|
|
real-world proxy case that should pass.
|
|
"""
|
|
assert not self._csrf_allowed({
|
|
"Origin": "http://example.com",
|
|
"X-Forwarded-Host": "example.com:443",
|
|
}), 'http origin (port :80) must not match https host (:443)'
|
|
|
|
def test_proxy_host_default_https_port_matches_https_origin(self):
|
|
"""HTTPS Origin without port should match X-Forwarded-Host with explicit :443."""
|
|
assert self._csrf_allowed({
|
|
"Origin": "https://example.com",
|
|
"X-Forwarded-Host": "example.com:443",
|
|
})
|
|
|
|
def test_proxy_host_port_normalization_still_rejects_other_host(self):
|
|
"""Port normalization must not allow different hosts through."""
|
|
assert not self._csrf_allowed({
|
|
"Origin": "https://evil.com",
|
|
"X-Forwarded-Host": "example.com:443",
|
|
})
|
|
|
|
def test_allowed_public_origin_bypasses_missing_proxy_port(self, monkeypatch):
|
|
"""Explicitly configured public origins should pass even if proxy strips :port from Host."""
|
|
monkeypatch.setenv('HERMES_WEBUI_ALLOWED_ORIGINS', 'https://myapp.example.com:8000')
|
|
assert self._csrf_allowed({
|
|
'Origin': 'https://myapp.example.com:8000',
|
|
'Host': 'myapp.example.com',
|
|
'X-Forwarded-Proto': 'https',
|
|
})
|
|
|
|
def test_other_origin_not_allowed_by_public_origin_allowlist(self, monkeypatch):
|
|
"""Allowlist must stay exact; unrelated origins must still be rejected."""
|
|
monkeypatch.setenv('HERMES_WEBUI_ALLOWED_ORIGINS', 'https://myapp.example.com:8000')
|
|
assert not self._csrf_allowed({
|
|
'Origin': 'https://evil.com:8000',
|
|
'Host': 'myapp.example.com',
|
|
'X-Forwarded-Proto': 'https',
|
|
})
|
|
|
|
# ── Port normalization: scheme-aware (M-1 fix) ────────────────────────────
|
|
|
|
def test_cross_protocol_port_not_confused_http_origin_https_host(self):
|
|
"""http:// origin must NOT match a host with :443 (HTTPS default).
|
|
|
|
Before M-1 fix, _ports_match treated both 80 and 443 as equivalent to
|
|
absent port, allowing http://host to match https://host:443 servers.
|
|
"""
|
|
assert not self._csrf_allowed({
|
|
'Origin': 'http://example.com', # http, no port = :80
|
|
'X-Forwarded-Host': 'example.com:443', # HTTPS port
|
|
}), 'http origin should NOT match host advertising port 443'
|
|
|
|
def test_cross_protocol_port_not_confused_https_origin_http_host(self):
|
|
"""https:// origin must NOT match a host with :80 (HTTP default)."""
|
|
assert not self._csrf_allowed({
|
|
'Origin': 'https://example.com', # https, no port = :443
|
|
'X-Forwarded-Host': 'example.com:80', # HTTP port
|
|
}), 'https origin should NOT match host advertising port 80'
|
|
|
|
def test_http_explicit_port_80_matches_host_without_port(self):
|
|
"""http://example.com:80 is the same origin as http://example.com."""
|
|
assert self._csrf_allowed({
|
|
'Origin': 'http://example.com:80',
|
|
'Host': 'example.com',
|
|
})
|
|
|
|
def test_https_explicit_port_443_matches_host_without_port(self):
|
|
"""https://example.com:443 is the same origin as https://example.com."""
|
|
assert self._csrf_allowed({
|
|
'Origin': 'https://example.com:443',
|
|
'Host': 'example.com',
|
|
})
|
|
|
|
def test_non_default_port_not_waived(self):
|
|
"""Non-default ports (e.g. :8000) must not be treated as equivalent to absent."""
|
|
assert not self._csrf_allowed({
|
|
'Origin': 'https://example.com:8000',
|
|
'Host': 'example.com',
|
|
})
|
|
|
|
# ── Bug scenario: proxy strips non-standard port ──────────────────────────
|
|
|
|
def test_bug_origin_8000_host_without_port_rejected_without_allowlist(self, monkeypatch):
|
|
"""Without the allowlist, origin with :8000 must be rejected when proxy strips port.
|
|
|
|
This documents the original bug: Origin: https://app.com:8000 with
|
|
Host: app.com (proxy stripped the port). Before this PR that returned 403.
|
|
The fix (HERMES_WEBUI_ALLOWED_ORIGINS) handles it; without the env var
|
|
the request is still rejected, which is the safe default.
|
|
"""
|
|
monkeypatch.delenv('HERMES_WEBUI_ALLOWED_ORIGINS', raising=False)
|
|
assert not self._csrf_allowed({
|
|
'Origin': 'https://myapp.example.com:8000',
|
|
'Host': 'myapp.example.com',
|
|
}), 'without allowlist, port mismatch must be rejected (safe default)'
|
|
|
|
def test_allowed_origins_comma_separated(self, monkeypatch):
|
|
"""HERMES_WEBUI_ALLOWED_ORIGINS accepts multiple comma-separated origins."""
|
|
monkeypatch.setenv(
|
|
'HERMES_WEBUI_ALLOWED_ORIGINS',
|
|
'https://app1.example.com:8000, https://app2.example.com:9000',
|
|
)
|
|
assert self._csrf_allowed({'Origin': 'https://app1.example.com:8000', 'Host': 'proxy.internal'})
|
|
assert self._csrf_allowed({'Origin': 'https://app2.example.com:9000', 'Host': 'proxy.internal'})
|
|
assert not self._csrf_allowed({'Origin': 'https://evil.com:8000', 'Host': 'proxy.internal'})
|
|
|
|
def test_allowed_origins_without_scheme_ignored(self, monkeypatch, capsys):
|
|
"""Allowlist entries missing the scheme are skipped and a warning is printed."""
|
|
monkeypatch.setenv('HERMES_WEBUI_ALLOWED_ORIGINS', 'myapp.example.com:8000')
|
|
from api.routes import _allowed_public_origins
|
|
result = _allowed_public_origins()
|
|
assert len(result) == 0, 'entry without scheme must be ignored'
|
|
captured = capsys.readouterr()
|
|
assert 'WARNING' in captured.err and 'scheme' in captured.err.lower()
|
|
|
|
def test_allowed_origins_trailing_slash_normalized(self, monkeypatch):
|
|
"""Trailing slash in allowlist entry is stripped before comparison."""
|
|
monkeypatch.setenv('HERMES_WEBUI_ALLOWED_ORIGINS', 'https://myapp.example.com:8000/')
|
|
assert self._csrf_allowed({
|
|
'Origin': 'https://myapp.example.com:8000',
|
|
'Host': 'proxy.internal',
|
|
})
|
|
|
|
|
|
# ── CSRF helpers: unit tests ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestCSRFHelpers:
|
|
"""Direct unit tests for _normalize_host_port and _ports_match."""
|
|
def test_normalize_host_only(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port('example.com') == ('example.com', None)
|
|
|
|
def test_normalize_host_with_port(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port('example.com:8000') == ('example.com', '8000')
|
|
|
|
def test_normalize_ipv6_no_port(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port('[::1]') == ('::1', None)
|
|
|
|
def test_normalize_ipv6_with_port(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port('[::1]:8080') == ('::1', '8080')
|
|
|
|
def test_normalize_empty(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port('') == ('', None)
|
|
|
|
def test_normalize_whitespace_stripped(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port(' example.com ') == ('example.com', None)
|
|
|
|
def test_normalize_lowercases(self):
|
|
from api.routes import _normalize_host_port
|
|
assert _normalize_host_port('EXAMPLE.COM:80') == ('example.com', '80')
|
|
|
|
def test_ports_match_identical(self):
|
|
from api.routes import _ports_match
|
|
assert _ports_match('https', '8000', '8000') is True
|
|
|
|
def test_ports_match_both_absent(self):
|
|
from api.routes import _ports_match
|
|
assert _ports_match('https', None, None) is True
|
|
|
|
def test_ports_match_https_absent_vs_443(self):
|
|
from api.routes import _ports_match
|
|
assert _ports_match('https', None, '443') is True
|
|
assert _ports_match('https', '443', None) is True
|
|
|
|
def test_ports_match_http_absent_vs_80(self):
|
|
from api.routes import _ports_match
|
|
assert _ports_match('http', None, '80') is True
|
|
assert _ports_match('http', '80', None) is True
|
|
|
|
def test_ports_match_http_absent_vs_443_rejected(self):
|
|
"""http:// scheme: absent port is :80, not :443."""
|
|
from api.routes import _ports_match
|
|
assert _ports_match('http', None, '443') is False
|
|
assert _ports_match('http', '443', None) is False
|
|
|
|
def test_ports_match_https_absent_vs_80_rejected(self):
|
|
"""https:// scheme: absent port is :443, not :80."""
|
|
from api.routes import _ports_match
|
|
assert _ports_match('https', None, '80') is False
|
|
assert _ports_match('https', '80', None) is False
|
|
|
|
def test_ports_match_non_default_never_waived(self):
|
|
from api.routes import _ports_match
|
|
assert _ports_match('https', None, '8000') is False
|
|
assert _ports_match('https', '8000', None) is False
|
|
assert _ports_match('http', None, '8080') is False
|
|
|
|
def test_ports_match_different_non_default(self):
|
|
from api.routes import _ports_match
|
|
assert _ports_match('https', '8000', '9000') is False
|
|
|
|
|
|
# ── 2. Login Rate Limiting ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestLoginRateLimit:
|
|
def test_rate_limit_triggers_429(self):
|
|
"""More than 5 failed login attempts from same IP must yield 429."""
|
|
from api.auth import _login_attempts, _LOGIN_WINDOW
|
|
|
|
# Force the rate limiter state: inject 5 stale-now timestamps so next call is fresh
|
|
# Actually easier: just hit the endpoint 6 times with wrong password
|
|
# But we can't set a password in a test without config file.
|
|
# Instead test the helper directly.
|
|
import time
|
|
from api import auth as _auth
|
|
|
|
# Reset state for a fake IP
|
|
fake_ip = "10.255.254.253"
|
|
_auth._login_attempts[fake_ip] = []
|
|
|
|
# Record 5 attempts — should still be allowed
|
|
for _ in range(5):
|
|
_auth._record_login_attempt(fake_ip)
|
|
assert not _auth._check_login_rate(fake_ip), \
|
|
"After 5 attempts, _check_login_rate should return False (blocked)"
|
|
|
|
def test_rate_limit_resets_after_window(self):
|
|
"""After window expires, rate limit resets."""
|
|
import time
|
|
from api import auth as _auth
|
|
|
|
fake_ip = "10.255.254.252"
|
|
# Inject 5 old timestamps (outside window)
|
|
old_ts = time.time() - 70 # 70s ago, outside 60s window
|
|
_auth._login_attempts[fake_ip] = [old_ts] * 5
|
|
assert _auth._check_login_rate(fake_ip), \
|
|
"After window expires, IP should be allowed again"
|
|
|
|
def test_rate_limit_endpoint_returns_429(self, webui_server):
|
|
"""Live endpoint: 6th bad attempt returns 429 (auth enabled required)."""
|
|
# This test only runs meaningfully when auth is enabled.
|
|
# We can still verify the helper returns 429 from the unit test above.
|
|
# If auth not enabled, endpoint returns 200 OK with 'Auth not enabled'.
|
|
from api import auth as _auth
|
|
|
|
fake_ip = "10.255.254.251"
|
|
# Fill the bucket
|
|
_auth._login_attempts[fake_ip] = [time.time()] * 5
|
|
assert not _auth._check_login_rate(fake_ip)
|
|
|
|
|
|
# ── 3. Session ID Validation ───────────────────────────────────────────────
|
|
|
|
|
|
class TestSessionIDValidation:
|
|
def test_hex_session_id_loads(self, tmp_path):
|
|
"""A valid hex session ID gets past the validation check."""
|
|
import sys
|
|
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
|
|
from api.models import Session, SESSION_DIR
|
|
valid_hex = "deadbeef" * 8 # 64 hex chars
|
|
# Should not raise — returns None only if file doesn't exist (it won't)
|
|
result = Session.load(valid_hex)
|
|
assert result is None # No file, but no error
|
|
|
|
def test_new_format_session_id_passes_validation(self):
|
|
"""New hermes-agent session IDs (YYYYMMDD_HHMMSS_xxxxxx) must pass validation."""
|
|
from api.models import Session
|
|
# Should pass the validator (returns None only because the file doesn't exist)
|
|
result = Session.load("20260406_164014_74b2d1")
|
|
assert result is None # file doesn't exist, but validator passed
|
|
|
|
def test_non_hex_session_id_rejected(self):
|
|
"""A session ID with dangerous chars must be rejected."""
|
|
from api.models import Session
|
|
evil_ids = [
|
|
"../../../etc/passwd",
|
|
"../../../../root/.ssh/id_rsa",
|
|
"session; rm -rf /",
|
|
"hello world",
|
|
"ZZZZZZZZZZZZZZZZ",
|
|
"session\x00evil",
|
|
"..\\..\\windows\\system32",
|
|
"session/../../etc/passwd",
|
|
"valid_looking.json",
|
|
]
|
|
for sid in evil_ids:
|
|
result = Session.load(sid)
|
|
assert result is None, \
|
|
f"Session.load should reject dangerous ID '{sid}', got {result}"
|
|
|
|
def test_empty_session_id_rejected(self):
|
|
"""An empty session ID must be rejected."""
|
|
from api.models import Session
|
|
assert Session.load("") is None
|
|
assert Session.load(None) is None
|
|
|
|
|
|
# ── 4. Error Path Sanitization ────────────────────────────────────────────
|
|
|
|
|
|
class TestSanitizeError:
|
|
def test_unix_path_stripped(self):
|
|
from api.helpers import _sanitize_error
|
|
e = FileNotFoundError("/home/hermes/.hermes/sessions/abc123.json")
|
|
result = _sanitize_error(e)
|
|
assert "/home/hermes" not in result
|
|
assert "<path>" in result
|
|
|
|
def test_nested_unix_path_stripped(self):
|
|
from api.helpers import _sanitize_error
|
|
e = ValueError("cannot read /var/lib/hermes/data.db: permission denied")
|
|
result = _sanitize_error(e)
|
|
assert "/var/lib/hermes" not in result
|
|
assert "<path>" in result
|
|
|
|
def test_no_path_unchanged(self):
|
|
from api.helpers import _sanitize_error
|
|
e = ValueError("session not found")
|
|
result = _sanitize_error(e)
|
|
assert result == "session not found"
|
|
|
|
def test_windows_path_stripped(self):
|
|
from api.helpers import _sanitize_error
|
|
e = FileNotFoundError("C:\\Users\\hermes\\AppData\\sessions\\x.json not found")
|
|
result = _sanitize_error(e)
|
|
assert "C:\\Users\\hermes" not in result
|
|
|
|
def test_live_404_does_not_leak_path(self, webui_server):
|
|
"""Live server: file-not-found errors must not expose filesystem paths."""
|
|
body, status = post("/api/file/read", {"path": "../../etc/passwd"})
|
|
err = body.get("error", "")
|
|
assert "/home" not in err and "/var" not in err and "/etc" not in err, \
|
|
f"Error message leaks filesystem path: {err}"
|
|
|
|
|
|
# ── 5. Secure Cookie Flag ─────────────────────────────────────────────────
|
|
|
|
|
|
class TestSecureCookieFlag:
|
|
def test_getattr_safe_on_plain_socket(self):
|
|
"""getattr(handler.request, 'getpeercert', None) must not raise on plain socket."""
|
|
import socket
|
|
# Plain socket has no getpeercert attribute
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
result = getattr(s, 'getpeercert', None)
|
|
assert result is None, \
|
|
f"Expected None on plain socket, got {result}"
|
|
finally:
|
|
s.close()
|
|
|
|
def test_secure_flag_not_set_for_plain_http(self, webui_server):
|
|
"""Login endpoint over plain HTTP must NOT set Secure cookie flag."""
|
|
# Auth is disabled in tests, so this just checks no crash
|
|
body, status = post("/api/auth/login", {"password": "test"})
|
|
# Either 200 (auth not enabled) or 401 (auth enabled, wrong pw)
|
|
assert status in (200, 401, 429), f"Unexpected status {status}"
|
|
|
|
|
|
# ── 6. HMAC Signature Length ──────────────────────────────────────────────
|
|
|
|
|
|
class TestHMACLength:
|
|
def test_session_token_sig_is_32_chars(self):
|
|
"""Session cookie signature must be 32 hex chars (128-bit), not 16."""
|
|
from api.auth import create_session
|
|
cookie = create_session()
|
|
token, sig = cookie.rsplit('.', 1)
|
|
assert len(sig) == 32, \
|
|
f"Expected 32-char signature (128-bit), got {len(sig)}: {sig}"
|
|
|
|
def test_verify_session_rejects_old_16char_sig(self):
|
|
"""A cookie with a 16-char sig must fail verification."""
|
|
import hmac as _hmac
|
|
import hashlib
|
|
from api.auth import _signing_key, verify_session, _sessions
|
|
import time
|
|
import secrets
|
|
|
|
token = secrets.token_hex(32)
|
|
_sessions[token] = time.time() + 3600 # valid session
|
|
old_sig = _hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest()[:16]
|
|
old_cookie = f"{token}.{old_sig}"
|
|
# Should fail: sig length wrong
|
|
assert not verify_session(old_cookie), \
|
|
"Old 16-char sig cookie must not verify (sig mismatch)"
|
|
|
|
|
|
# ── 7. Skills Path Traversal ──────────────────────────────────────────────
|
|
|
|
|
|
class TestSkillsPathTraversal:
|
|
def test_traversal_rejected(self, webui_server):
|
|
"""Saving a skill with a traversal path must return 400."""
|
|
body, status = post("/api/skills/save", {
|
|
"name": "../../evil",
|
|
"content": "# evil",
|
|
})
|
|
assert status in (400, 403), \
|
|
f"Expected 400/403 for traversal skill path, got {status}: {body}"
|
|
|
|
def test_valid_skill_accepted(self, webui_server):
|
|
"""Saving a skill with a valid name must succeed."""
|
|
body, status = post("/api/skills/save", {
|
|
"name": "test-security-skill",
|
|
"content": "---\nname: test-security-skill\ndescription: test\n---\n# test",
|
|
})
|
|
# 500 = skills module not available (hermes-agent not installed) — skip
|
|
if status == 500:
|
|
import pytest; pytest.skip("skills module requires hermes-agent")
|
|
# Should succeed (200) or need auth (401/403) — not path error (400)
|
|
assert status in (200, 401, 403, 404), \
|
|
f"Valid skill save got unexpected status {status}: {body}"
|
|
|
|
|
|
# ── 8. Content-Disposition for Dangerous MIME Types ───────────────────────
|
|
|
|
|
|
class TestContentDisposition:
|
|
def test_html_file_forced_download(self, webui_server, tmp_path):
|
|
"""HTML files served via /api/file/raw must have Content-Disposition: attachment."""
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
# Use a session to create an HTML file in the workspace
|
|
sessions_body, _ = post("/api/sessions/new", {})
|
|
sid = sessions_body.get("session_id") or sessions_body.get("id")
|
|
if not sid:
|
|
return # Skip if sessions API shape is unexpected
|
|
|
|
# Can't easily create a file via the test server without a workspace,
|
|
# so test the logic directly instead.
|
|
from api.routes import _handle_file_raw
|
|
dangerous_types = {'text/html', 'application/xhtml+xml', 'image/svg+xml'}
|
|
for mime in dangerous_types:
|
|
assert mime in dangerous_types, f"{mime} should be in dangerous_types set"
|
|
|
|
def test_dangerous_mime_types_set_complete(self):
|
|
"""The set of dangerous MIME types must include html, xhtml, and svg."""
|
|
import ast
|
|
import pathlib
|
|
routes_src = pathlib.Path(__file__).parent.parent / "api" / "routes.py"
|
|
src = routes_src.read_text()
|
|
assert "text/html" in src
|
|
assert "application/xhtml+xml" in src
|
|
assert "image/svg+xml" in src
|
|
assert "dangerous_types" in src
|
|
|
|
def test_unicode_filename_download_header_is_latin1_safe(self, cleanup_test_sessions):
|
|
"""Unicode filenames must not crash download responses."""
|
|
body, status = post("/api/session/new", {})
|
|
assert status == 200, body
|
|
sid = body["session"]["session_id"]
|
|
cleanup_test_sessions.append(sid)
|
|
ws = pathlib.Path(body["session"]["workspace"])
|
|
filename = "中文对照表.pdf"
|
|
pdf_bytes = b"%PDF-1.3\n1 0 obj\n<<>>\nendobj\ntrailer\n<<>>\n%%EOF\n"
|
|
(ws / filename).write_bytes(pdf_bytes)
|
|
|
|
encoded = urllib.parse.quote(filename)
|
|
raw, headers, raw_status = get_raw_with_headers(
|
|
f"/api/file/raw?session_id={sid}&path={encoded}&download=1"
|
|
)
|
|
|
|
assert raw_status == 200
|
|
assert raw == pdf_bytes
|
|
disp = headers["Content-Disposition"]
|
|
assert disp.startswith("attachment; ")
|
|
assert "filename*=UTF-8''" in disp
|
|
disp.encode("latin-1")
|
|
|
|
def test_unicode_filename_inline_header_is_latin1_safe(self, cleanup_test_sessions):
|
|
"""Inline responses must also work for unicode filenames."""
|
|
body, status = post("/api/session/new", {})
|
|
assert status == 200, body
|
|
sid = body["session"]["session_id"]
|
|
cleanup_test_sessions.append(sid)
|
|
ws = pathlib.Path(body["session"]["workspace"])
|
|
filename = "预览.pdf"
|
|
pdf_bytes = b"%PDF-1.3\n1 0 obj\n<<>>\nendobj\ntrailer\n<<>>\n%%EOF\n"
|
|
(ws / filename).write_bytes(pdf_bytes)
|
|
|
|
encoded = urllib.parse.quote(filename)
|
|
raw, headers, raw_status = get_raw_with_headers(
|
|
f"/api/file/raw?session_id={sid}&path={encoded}"
|
|
)
|
|
|
|
assert raw_status == 200
|
|
assert raw == pdf_bytes
|
|
disp = headers["Content-Disposition"]
|
|
assert disp.startswith("inline; ")
|
|
assert "filename*=UTF-8''" in disp
|
|
disp.encode("latin-1")
|
|
|
|
|
|
# ── 9. PBKDF2 Password Hashing ───────────────────────────────────────────
|
|
|
|
|
|
class TestPasswordHashing:
|
|
def test_hash_password_is_hex(self):
|
|
"""_hash_password must produce a non-empty hex string (PBKDF2-SHA256)."""
|
|
from api.auth import _hash_password
|
|
result = _hash_password("mysecretpassword")
|
|
assert isinstance(result, str) and len(result) == 64, \
|
|
f"Expected 64-char hex hash (SHA-256 output), got len={len(result)}: {result}"
|
|
# Hex-only chars
|
|
assert all(c in "0123456789abcdef" for c in result), \
|
|
f"Hash must be hex string, got: {result}"
|
|
|
|
def test_hash_password_is_deterministic_with_same_salt(self):
|
|
"""_hash_password must return the same hash for same input (signing key is stable)."""
|
|
from api.auth import _hash_password
|
|
h1 = _hash_password("consistent_password")
|
|
h2 = _hash_password("consistent_password")
|
|
assert h1 == h2, "Same password must produce same hash (stable signing key)"
|
|
|
|
def test_hash_password_different_inputs_differ(self):
|
|
"""Different passwords must produce different hashes."""
|
|
from api.auth import _hash_password
|
|
assert _hash_password("password_a") != _hash_password("password_b"), \
|
|
"Different passwords must produce different hashes"
|
|
|
|
def test_hash_password_longer_than_sha256(self):
|
|
"""PBKDF2 with 600k iterations is much stronger than single SHA-256.
|
|
We verify indirectly: the code must call pbkdf2_hmac, not sha256 directly."""
|
|
import inspect
|
|
from api import auth as _auth
|
|
src = inspect.getsource(_auth._hash_password)
|
|
assert "pbkdf2_hmac" in src, \
|
|
"_hash_password must use pbkdf2_hmac, not raw sha256"
|
|
assert "600_000" in src or "600000" in src, \
|
|
"_hash_password must use 600,000 iterations"
|
|
|
|
def test_save_settings_stores_64char_hex_hash(self):
|
|
"""save_settings with _set_password must store a 64-char hex hash (PBKDF2)."""
|
|
from api.config import save_settings, load_settings, SETTINGS_FILE
|
|
import json
|
|
|
|
# Remember original content so we can restore it
|
|
original = None
|
|
if SETTINGS_FILE.exists():
|
|
original = SETTINGS_FILE.read_text()
|
|
|
|
try:
|
|
save_settings({"_set_password": "test_pbkdf2_pw"})
|
|
settings = load_settings()
|
|
ph = settings.get("password_hash", "")
|
|
assert len(ph) == 64 and all(c in "0123456789abcdef" for c in ph), \
|
|
f"save_settings must store 64-char hex PBKDF2 hash, got: {ph!r}"
|
|
finally:
|
|
# Restore original settings
|
|
if original is not None:
|
|
SETTINGS_FILE.write_text(original)
|
|
else:
|
|
save_settings({"_clear_password": True})
|
|
|
|
|
|
# ── 10. Non-loopback Startup Warning ─────────────────────────────────────
|
|
|
|
|
|
class TestStartupWarning:
|
|
def test_warning_code_present_in_server(self):
|
|
"""server.py must contain non-loopback warning code."""
|
|
src = pathlib.Path(__file__).parent.parent / "server.py"
|
|
text = src.read_text()
|
|
assert "0.0.0.0" in text or "non-loopback" in text.lower() or "WARNING" in text, \
|
|
"server.py must contain non-loopback warning logic"
|
|
assert "is_auth_enabled" in text, \
|
|
"server.py must check is_auth_enabled() before warning"
|
|
|
|
|
|
# ── 11. SSRF DNS Check ─────────────────────────────────────────────────────
|
|
|
|
|
|
class TestSSRFCheck:
|
|
def test_ssrf_guard_code_present_in_config(self):
|
|
"""config.py must contain SSRF DNS resolution guard."""
|
|
src = pathlib.Path(__file__).parent.parent / "api" / "config.py"
|
|
text = src.read_text()
|
|
assert "getaddrinfo" in text, "SSRF guard must resolve DNS with getaddrinfo"
|
|
assert "is_private" in text, "SSRF guard must check is_private IP"
|
|
assert "is_loopback" in text, "SSRF guard must check is_loopback IP"
|
|
|
|
def test_known_local_providers_whitelisted(self):
|
|
"""Ollama and localhost endpoints should NOT be blocked by SSRF guard."""
|
|
src = pathlib.Path(__file__).parent.parent / "api" / "config.py"
|
|
text = src.read_text()
|
|
assert "ollama" in text.lower()
|
|
assert "localhost" in text.lower()
|
|
assert "lmstudio" in text.lower() or "lm-studio" in text.lower()
|
|
|
|
|
|
# ── 12. ENV_LOCK Export ────────────────────────────────────────────────────
|
|
|
|
|
|
class TestENVLock:
|
|
def test_env_lock_importable_from_streaming(self):
|
|
"""_ENV_LOCK must be importable from api.streaming."""
|
|
from api.streaming import _ENV_LOCK
|
|
import threading
|
|
assert isinstance(_ENV_LOCK, type(threading.Lock())), \
|
|
"_ENV_LOCK must be a threading.Lock"
|
|
|
|
def test_env_lock_importable_in_routes(self):
|
|
"""api.routes must be able to import _ENV_LOCK from api.streaming."""
|
|
# If routes.py fails to import, this will raise ImportError
|
|
import importlib
|
|
import api.routes # noqa: F401 -- just checking import works
|
|
# No error means the circular import is OK
|
|
|
|
|
|
# ── Fixture ────────────────────────────────────────────────────────────────
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def webui_server():
|
|
"""Reuse the module-scoped server started by conftest.py."""
|
|
return BASE
|