Files
webui/tests/test_sprint43.py
nesquena-hermes dd17a0e9b7 security: bandit fixes B310/B324/B110 + QuietHTTPServer (#354)
* security: fix bandit security issues (B310, B324)

- Add usedforsecurity=False to MD5 hash in gateway_watcher.py
- Add URL scheme validation to prevent file:// access in config.py
- Add URL validation to bootstrap.py health check
- Add nosec comments where runtime validation exists

* fix: handle ConnectionResetError gracefully and add debug logging

- Add QuietHTTPServer class to suppress noisy connection reset errors
  caused by clients disconnecting abruptly (fixes log spam from
  'ConnectionResetError: [Errno 54] Connection reset by peer')

- Replace silent 'pass' statements with logger.debug() calls across
  api/auth.py, api/config.py, api/gateway_watcher.py, api/models.py,
  and api/onboarding.py for better observability during troubleshooting

- All tests pass (25 passed in test_regressions.py)

* chore: add debug logging to profiles and routes modules

- Replace silent 'pass' statements with logger.debug() calls in
  api/profiles.py for better error visibility during profile switching
  and module patching

- Add logger initialization to api/routes.py

* security: fix B110 bare except/pass issues (bandit security scan)

- Replace bare except/pass patterns with logger.debug() calls
- Fixes CWE-703 (improper check/handling of exceptional conditions)
- Files affected: routes.py, state_sync.py, streaming.py, workspace.py, server.py
- All tests pass successfully

* security: bandit fixes B310/B324/B110 + QuietHTTPServer (#354)

- api/gateway_watcher.py: MD5 usedforsecurity=False (B324)
- api/config.py, bootstrap.py: URL scheme validation before urlopen (B310)
- 12 files: replace bare except/pass with logger.debug() (B110)
- server.py: QuietHTTPServer suppresses client disconnect log noise
- server.py: fix sys.exc_info() (was traceback.sys.exc_info(), impl detail)
- tests/test_sprint43.py: 19 new tests covering all security fixes
- CHANGELOG.md: v0.50.14 entry; 841 tests total (up from 822)

---------

Co-authored-by: lawrencel1ng <lawrence.ling@global.ntt>
Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
2026-04-13 11:11:56 -07:00

254 lines
10 KiB
Python

"""
Sprint 43 Tests: Bandit security fixes — B310, B324, B110 + QuietHTTPServer (PR #354).
Covers:
- gateway_watcher.py: MD5 uses usedforsecurity=False (B324)
- config.py: URL scheme validation before urlopen (B310)
- bootstrap.py: URL scheme validation in wait_for_health (B310)
- server.py: QuietHTTPServer class exists and extends ThreadingHTTPServer
- server.py: QuietHTTPServer.handle_error suppresses client disconnect errors
- server.py: QuietHTTPServer uses sys.exc_info() not traceback.sys.exc_info()
- Logging: each of the 8 modules checked here defines a module-level logger (B110 remediation)
- routes.py: session titles redacted in /api/sessions list response
"""
import ast
import pathlib
import re
import sys
import unittest
# Directory two levels above this test file; every path below is resolved
# relative to it.
REPO_ROOT = pathlib.Path(__file__).parent.parent


def _read_source(*parts):
    """Return the text of the file at ``REPO_ROOT/<parts...>``, decoded as UTF-8.

    The encoding is passed explicitly so the result does not depend on the
    platform's locale encoding, which ``Path.read_text()`` falls back to when
    no encoding is given.

    Raises:
        OSError: if the file cannot be opened (e.g. it does not exist).
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    return REPO_ROOT.joinpath(*parts).read_text(encoding="utf-8")


# Source text of each module under inspection, loaded once at import time.
# The tests below only search these strings; the modules are never imported.
GATEWAY_WATCHER_PY = _read_source("api", "gateway_watcher.py")
CONFIG_PY = _read_source("api", "config.py")
BOOTSTRAP_PY = _read_source("bootstrap.py")
SERVER_PY = _read_source("server.py")
ROUTES_PY = _read_source("api", "routes.py")
AUTH_PY = _read_source("api", "auth.py")
PROFILES_PY = _read_source("api", "profiles.py")
STREAMING_PY = _read_source("api", "streaming.py")
WORKSPACE_PY = _read_source("api", "workspace.py")
STATE_SYNC_PY = _read_source("api", "state_sync.py")
# ── B324: MD5 usedforsecurity=False ─────────────────────────────────────────
class TestMD5SecurityFix(unittest.TestCase):
    """B324: the MD5 call in gateway_watcher.py must pass usedforsecurity=False.

    Both tests search the module's source text (GATEWAY_WATCHER_PY); the
    module itself is never imported or executed.
    """

    def test_gateway_watcher_md5_usedforsecurity_false(self):
        """The kwarg text ``usedforsecurity=False`` appears in the module (PR #354)."""
        expected_kwarg = "usedforsecurity=False"
        self.assertIn(
            expected_kwarg,
            GATEWAY_WATCHER_PY,
            msg="gateway_watcher.py: MD5 must use usedforsecurity=False (B324)",
        )

    def test_gateway_watcher_md5_pattern(self):
        """A ``hashlib.md5(`` call is eventually closed by ``usedforsecurity=False)``."""
        # DOTALL lets the lazy ``.*?`` run across newlines, and the lazy match
        # tolerates nested parentheses inside the first argument.
        md5_call = re.compile(
            r"hashlib\.md5\(.*?usedforsecurity=False\)",
            re.DOTALL,
        )
        hit = md5_call.search(GATEWAY_WATCHER_PY)
        self.assertIsNotNone(
            hit,
            msg="MD5 call must include usedforsecurity=False kwarg",
        )
# ── B310: URL scheme validation ──────────────────────────────────────────────
class TestUrlSchemeValidation(unittest.TestCase):
    """B310: URL schemes must be validated before urllib.request.urlopen is called.

    Every test searches the source text of api/config.py (CONFIG_PY) or
    bootstrap.py (BOOTSTRAP_PY); nothing is imported or executed.
    """

    def test_config_scheme_validation_present(self):
        """config.py references ``parsed_url.scheme`` in a ``not in (`` check."""
        self.assertIn(
            "parsed_url.scheme",
            CONFIG_PY,
            msg="config.py: URL scheme validation missing (B310)",
        )
        # The scheme reference must be followed by ``not in (``, i.e. an
        # allow-list style membership test.
        allow_list_check = r'parsed_url\.scheme\s+not\s+in\s+\('
        self.assertRegex(
            CONFIG_PY,
            allow_list_check,
            msg="config.py: scheme check must use 'not in (...)' pattern",
        )

    def test_config_urlopen_has_nosec(self):
        """config.py carries a ``nosec B310`` marker somewhere in its text."""
        marker = "nosec B310"
        self.assertIn(
            marker,
            CONFIG_PY,
            msg="config.py: urlopen must have # nosec B310 after scheme validation",
        )

    def test_bootstrap_scheme_validation_present(self):
        """bootstrap.py contains the invalid-URL message and a startswith(...http) check."""
        self.assertIn(
            "Invalid health check URL",
            BOOTSTRAP_PY,
            msg="bootstrap.py: URL scheme validation missing in wait_for_health (B310)",
        )
        # ``[^)]+`` keeps the match inside a single startswith(...) call.
        startswith_http = r'url\.startswith\([^)]+http'
        self.assertRegex(
            BOOTSTRAP_PY,
            startswith_http,
            msg="bootstrap.py: must check url starts with http:// or https://",
        )

    def test_bootstrap_urlopen_has_nosec(self):
        """bootstrap.py carries a ``nosec B310`` marker somewhere in its text."""
        marker = "nosec B310"
        self.assertIn(
            marker,
            BOOTSTRAP_PY,
            msg="bootstrap.py: urlopen must have # nosec B310 after scheme validation",
        )

    def test_config_allows_http_and_https(self):
        """Both ``"http"`` and ``"https"`` appear as double-quoted literals in config.py."""
        # Checked in order; the first missing scheme fails the test.
        for scheme in ("http", "https"):
            self.assertIn(
                f'"{scheme}"',
                CONFIG_PY,
                f"config.py: {scheme} must be in allowed schemes",
            )
# ── B110: Bare except/pass → logger.debug() ─────────────────────────────────
class TestBareExceptLogging(unittest.TestCase):
    """B110: silent except/pass blocks are expected to log via logger.debug().

    These are text-presence checks on module source; they do not parse the
    code or inspect individual except blocks.
    """

    # (display name, source text) pairs for every module that must define
    # a logger.
    MODULES_REQUIRING_LOGGER = [
        ("api/auth.py", AUTH_PY),
        ("api/config.py", CONFIG_PY),
        ("api/gateway_watcher.py", GATEWAY_WATCHER_PY),
        ("api/profiles.py", PROFILES_PY),
        ("api/streaming.py", STREAMING_PY),
        ("api/workspace.py", WORKSPACE_PY),
        ("api/state_sync.py", STATE_SYNC_PY),
        ("api/routes.py", ROUTES_PY),
    ]

    def test_module_level_loggers_present(self):
        """Each listed module contains ``logger = logging.getLogger(__name__)``."""
        logger_stmt = "logger = logging.getLogger(__name__)"
        for module_name, source_text in self.MODULES_REQUIRING_LOGGER:
            # subTest reports every failing module instead of stopping at
            # the first one.
            with self.subTest(module=module_name):
                self.assertIn(
                    logger_stmt,
                    source_text,
                    msg=f"{module_name}: module-level logger missing (B110 fix requires logger)",
                )

    def test_gateway_watcher_no_bare_pass_in_except(self):
        """gateway_watcher.py uses ``logger.debug`` at least once.

        Only the presence of the call text is checked; remaining
        ``except: pass`` blocks would not be detected by this test.
        """
        self.assertIn(
            "logger.debug",
            GATEWAY_WATCHER_PY,
            msg="gateway_watcher.py: must use logger.debug not bare pass (B110)",
        )

    def test_profiles_reload_dotenv_logs_on_error(self):
        """profiles.py contains both the env-key reset and the dotenv failure message."""
        # The reset statement and the log message text are each looked up
        # anywhere in the module; their position is not verified.
        reset_stmt = "_loaded_profile_env_keys = set()"
        self.assertIn(
            reset_stmt,
            PROFILES_PY,
            msg="profiles.py: _reload_dotenv except must reset _loaded_profile_env_keys",
        )
        log_text = "Failed to reload dotenv"
        self.assertIn(
            log_text,
            PROFILES_PY,
            msg="profiles.py: _reload_dotenv except must log a warning",
        )
# ── QuietHTTPServer ──────────────────────────────────────────────────────────
class TestQuietHTTPServer(unittest.TestCase):
    """server.py: QuietHTTPServer suppresses client disconnect noise.

    All tests search the source text of server.py (SERVER_PY); the server
    module is never imported or started.
    """

    def test_quiet_http_server_class_exists(self):
        """QuietHTTPServer must be defined in server.py."""
        self.assertIn(
            "class QuietHTTPServer",
            SERVER_PY,
            "server.py: QuietHTTPServer class missing (PR #354)",
        )

    def test_quiet_http_server_extends_threading_http_server(self):
        """QuietHTTPServer must extend ThreadingHTTPServer."""
        self.assertRegex(
            SERVER_PY,
            r"class QuietHTTPServer\(ThreadingHTTPServer\)",
            "QuietHTTPServer must extend ThreadingHTTPServer",
        )

    def test_quiet_http_server_used_as_server(self):
        """QuietHTTPServer must be called somewhere after its class definition."""
        class_stmt = "class QuietHTTPServer"
        class_pos = SERVER_PY.find(class_stmt)
        self.assertNotEqual(
            class_pos,
            -1,
            "server.py: QuietHTTPServer class missing (PR #354)",
        )
        # Search *past* the class name. The definition line itself reads
        # ``class QuietHTTPServer(``, so a slice that starts at the ``class``
        # keyword would always contain ``QuietHTTPServer(`` and the check
        # would pass even if the class were never instantiated.
        after_class_name = SERVER_PY[class_pos + len(class_stmt):]
        self.assertIn(
            "QuietHTTPServer(",
            after_class_name,
            "main() must use QuietHTTPServer, not ThreadingHTTPServer directly",
        )

    def test_handle_error_suppresses_connection_reset(self):
        """server.py must mention ConnectionResetError and BrokenPipeError."""
        self.assertIn(
            "ConnectionResetError",
            SERVER_PY,
            "QuietHTTPServer.handle_error must handle ConnectionResetError",
        )
        self.assertIn(
            "BrokenPipeError",
            SERVER_PY,
            "QuietHTTPServer.handle_error must handle BrokenPipeError",
        )

    def test_uses_sys_exc_info_not_traceback_sys(self):
        """server.py must call sys.exc_info(), not traceback.sys.exc_info()."""
        # Order matters: "traceback.sys.exc_info()" also contains
        # "sys.exc_info()", so the forbidden form is ruled out first.
        self.assertNotIn(
            "traceback.sys.exc_info()",
            SERVER_PY,
            "server.py: must use sys.exc_info() not traceback.sys.exc_info()",
        )
        self.assertIn(
            "sys.exc_info()",
            SERVER_PY,
            "server.py: handle_error must call sys.exc_info()",
        )

    def test_sys_imported_in_server(self):
        """server.py must have an ``import sys`` line (needed for sys.exc_info)."""
        # ``\b`` stops look-alike modules such as ``import sysconfig`` from
        # satisfying the check.
        self.assertIsNotNone(
            re.search(r"^import sys\b", SERVER_PY, re.MULTILINE),
            "server.py: sys must be imported",
        )

    def test_handle_error_calls_super(self):
        """server.py must contain the delegating super().handle_error(...) call."""
        self.assertIn(
            "super().handle_error(request, client_address)",
            SERVER_PY,
            "QuietHTTPServer.handle_error must delegate to super for real errors",
        )
# ── Session title redaction in /api/sessions ────────────────────────────────
class TestSessionTitleRedaction(unittest.TestCase):
    """routes.py: session titles are expected to pass through _redact_text.

    Both tests search the source text of api/routes.py (ROUTES_PY) only.
    """

    def test_redact_text_called_on_session_titles(self):
        """A ``_redact_text(...)`` call whose arguments mention ``title`` exists."""
        # ``[^)]*`` confines the match to one call's argument list, and
        # ``\btitle\b`` requires ``title`` as a whole word inside it.
        redact_title_call = r'_redact_text\([^)]*\btitle\b[^)]*\)'
        self.assertRegex(
            ROUTES_PY,
            redact_title_call,
            msg="routes.py: session titles must be redacted via _redact_text in /api/sessions",
        )

    def test_redact_text_imported_in_routes(self):
        """The name ``_redact_text`` appears in routes.py.

        Only the presence of the name is checked; the import statement
        itself is not matched.
        """
        helper_name = "_redact_text"
        self.assertIn(
            helper_name,
            ROUTES_PY,
            msg="routes.py: _redact_text must be imported from api.helpers",
        )