* security: fix bandit security issues (B310, B324) - Add usedforsecurity=False to MD5 hash in gateway_watcher.py - Add URL scheme validation to prevent file:// access in config.py - Add URL validation to bootstrap.py health check - Add nosec comments where runtime validation exists * fix: handle ConnectionResetError gracefully and add debug logging - Add QuietHTTPServer class to suppress noisy connection reset errors caused by clients disconnecting abruptly (fixes log spam from 'ConnectionResetError: [Errno 54] Connection reset by peer') - Replace silent 'pass' statements with logger.debug() calls across api/auth.py, api/config.py, api/gateway_watcher.py, api/models.py, and api/onboarding.py for better observability during troubleshooting - All tests pass (25 passed in test_regressions.py) * chore: add debug logging to profiles and routes modules - Replace silent 'pass' statements with logger.debug() calls in api/profiles.py for better error visibility during profile switching and module patching - Add logger initialization to api/routes.py * security: fix B110 bare except/pass issues (bandit security scan) - Replace bare except/pass patterns with logger.debug() calls - Fixes CWE-703 (improper check/handling of exceptional conditions) - Files affected: routes.py, state_sync.py, streaming.py, workspace.py, server.py - All tests pass successfully * security: bandit fixes B310/B324/B110 + QuietHTTPServer (#354) - api/gateway_watcher.py: MD5 usedforsecurity=False (B324) - api/config.py, bootstrap.py: URL scheme validation before urlopen (B310) - 12 files: replace bare except/pass with logger.debug() (B110) - server.py: QuietHTTPServer suppresses client disconnect log noise - server.py: fix sys.exc_info() (was traceback.sys.exc_info(), impl detail) - tests/test_sprint43.py: 19 new tests covering all security fixes - CHANGELOG.md: v0.50.14 entry; 841 tests total (up from 822) --------- Co-authored-by: lawrencel1ng <lawrence.ling@global.ntt> Co-authored-by: Nathan 
Esquenazi <nesquena@gmail.com>
292 lines
11 KiB
Python
292 lines
11 KiB
Python
"""
|
|
Hermes Web UI -- Workspace and file system helpers.
|
|
|
|
Workspace lists and last-used workspace are stored per-profile so each
|
|
profile has its own workspace configuration. State files live at
|
|
``{profile_home}/webui_state/workspaces.json`` and
|
|
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
|
|
paths are used as fallback when no profile module is available.
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from api.config import (
|
|
WORKSPACES_FILE as _GLOBAL_WS_FILE,
|
|
LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
|
|
DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
|
|
MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
|
|
)
|
|
|
|
|
|
# ── Profile-aware path resolution ───────────────────────────────────────────
|
|
|
|
def _profile_state_dir() -> Path:
    """Locate the ``webui_state`` directory that belongs to the active profile.

    A named (non-default) profile gets ``<hermes home>/webui_state``, which is
    created on demand. The default profile, an empty profile name, or a missing
    ``api.profiles`` module all fall back to the directory that holds the
    global workspaces file.
    """
    try:
        from api.profiles import get_active_profile_name, get_active_hermes_home
        profile = get_active_profile_name()
        if not profile or profile == 'default':
            return _GLOBAL_WS_FILE.parent
        state_dir = get_active_hermes_home() / 'webui_state'
        state_dir.mkdir(parents=True, exist_ok=True)
        return state_dir
    except ImportError:
        logger.debug("Failed to import profiles module, using global state dir")
    return _GLOBAL_WS_FILE.parent
|
|
|
|
|
|
def _workspaces_file() -> Path:
    """Build the path to ``workspaces.json`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'workspaces.json'
|
|
|
|
|
|
def _last_workspace_file() -> Path:
    """Build the path to ``last_workspace.txt`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'last_workspace.txt'
|
|
|
|
|
|
def _profile_default_workspace() -> str:
    """Pick the default workspace directory from the active configuration.

    Candidates are tried in this order, and the first one that resolves to an
    existing directory wins:
      1. 'workspace'          -- explicit webui workspace key
      2. 'default_workspace'  -- alternate explicit key
      3. 'terminal.cwd'       -- terminal working dir (ignored when '.' or empty)

    If none qualifies, or reading the configuration fails for any reason, the
    boot-time DEFAULT_WORKSPACE constant is returned as a string.
    """
    def _as_existing_dir(value):
        """Expand ``~`` and resolve *value*; return it as a str if it is a directory, else None."""
        candidate = Path(str(value)).expanduser().resolve()
        return str(candidate) if candidate.is_dir() else None

    try:
        from api.config import get_config
        config = get_config()

        # Explicit webui workspace keys take priority.
        for key in ('workspace', 'default_workspace'):
            configured = config.get(key)
            if not configured:
                continue
            found = _as_existing_dir(configured)
            if found is not None:
                return found

        # Otherwise fall through to the terminal's configured working directory.
        terminal = config.get('terminal', {})
        if isinstance(terminal, dict):
            cwd = terminal.get('cwd', '')
            if cwd and str(cwd) not in ('.', ''):
                found = _as_existing_dir(cwd)
                if found is not None:
                    return found
    except Exception:
        # Best effort: any failure here simply means "use the boot default".
        logger.debug("Failed to load profile default workspace config")
    return str(_BOOT_DEFAULT_WORKSPACE)
|
|
|
|
|
|
# ── Public API ──────────────────────────────────────────────────────────────
|
|
|
|
def _clean_workspace_list(workspaces: list) -> list:
    """Sanitize a workspace list loaded from disk.

    Entries are dropped when they:
      - are not dicts, or have no usable (non-empty string) 'path';
      - look like test artifacts ('test-workspace' / 'webui-mvp-test' in the path);
      - point at a path that is not an existing directory;
      - resolve to ``~/.hermes/profiles`` or anything beneath it (such paths
        belong to a named profile and must not leak into other lists).

    Surviving entries are rebuilt as ``{'path': <resolved path>, 'name': <name>}``.
    A name that is literally 'default' (any case) is renamed to 'Home' to avoid
    confusion with the 'default' profile name; a non-string name becomes ''.

    Args:
        workspaces: list (or tuple) of workspace entries as stored in JSON.

    Returns:
        The cleaned list, possibly empty.

    Raises:
        TypeError: if *workspaces* is not a list or tuple. A wrongly shaped
            file is reported as an error rather than silently "cleaned" to an
            empty list, so callers do not overwrite it.
    """
    if not isinstance(workspaces, (list, tuple)):
        raise TypeError(
            f"workspaces must be a list, got {type(workspaces).__name__}"
        )

    hermes_profiles = (Path.home() / '.hermes' / 'profiles').resolve()
    result = []
    for w in workspaces:
        # One malformed entry must not abort cleaning of the whole list.
        if not isinstance(w, dict):
            continue
        path = w.get('path')
        # A missing/empty path used to be mapped to '/', which always exists
        # and therefore survived as a workspace at the filesystem root.
        if not isinstance(path, str) or not path:
            continue
        # Skip test artifacts
        if 'test-workspace' in path or 'webui-mvp-test' in path:
            continue
        p = Path(path).resolve()
        # Skip paths that no longer exist
        if not p.is_dir():
            continue
        # Skip paths inside the profiles directory (cross-profile leak)
        if p == hermes_profiles or hermes_profiles in p.parents:
            continue
        name = w.get('name')
        if not isinstance(name, str):
            name = ''
        # Rename confusing 'default' label to 'Home'
        if name.lower() == 'default':
            name = 'Home'
        result.append({'path': str(p), 'name': name})
    return result
|
|
|
|
|
|
def _migrate_global_workspaces() -> list:
    """Read the legacy global workspaces.json, clean it, and return the result.

    This is the migration path for users upgrading from a pre-profile version:
    their global file may contain cross-profile entries, test artifacts, and
    stale paths accumulated over time.

    When cleaning removed entries, the cleaned list is written back so future
    reads are already clean. That rewrite is best-effort: if it fails (for
    example the file is read-only) the cleaned list is still returned instead
    of being discarded.

    Returns:
        The cleaned workspace list, or ``[]`` when the legacy file is absent
        or cannot be read, parsed, or cleaned.
    """
    if not _GLOBAL_WS_FILE.exists():
        return []
    try:
        raw = json.loads(_GLOBAL_WS_FILE.read_text(encoding='utf-8'))
        cleaned = _clean_workspace_list(raw)
        needs_rewrite = len(cleaned) != len(raw)
    except Exception:
        logger.debug("Failed to load legacy global workspaces from %s", _GLOBAL_WS_FILE)
        return []
    if needs_rewrite:
        # Rewrite the cleaned version so future reads are already clean.
        try:
            _GLOBAL_WS_FILE.write_text(
                json.dumps(cleaned, ensure_ascii=False, indent=2), encoding='utf-8'
            )
        except Exception:
            logger.debug("Failed to persist cleaned global workspace list")
    return cleaned
|
|
|
|
|
|
def load_workspaces() -> list:
    """Return the workspace list for the active profile.

    Resolution order:
      1. The profile's own workspaces file, cleaned. If cleaning dropped
         entries, the cleaned list is written back (best effort). An empty
         result is replaced by a single 'Home' entry.
      2. If that file is missing or unreadable and the active profile is the
         default one (or the profiles module is unavailable): the cleaned
         legacy global list, when it is non-empty.
      3. Otherwise a single 'Home' entry pointing at the profile's default
         workspace.
    """
    store = _workspaces_file()
    if store.exists():
        try:
            stored = json.loads(store.read_text(encoding='utf-8'))
            usable = _clean_workspace_list(stored)
            if len(usable) != len(stored):
                # Persist the cleaned version so stale entries don't keep reappearing.
                try:
                    store.write_text(
                        json.dumps(usable, ensure_ascii=False, indent=2), encoding='utf-8'
                    )
                except Exception:
                    logger.debug("Failed to persist cleaned workspace list")
            if usable:
                return usable
            return [{'path': _profile_default_workspace(), 'name': 'Home'}]
        except Exception:
            logger.debug("Failed to load workspaces from %s", store)

    # No usable profile-local file.
    # DEFAULT profile: migrate from the legacy global file (one-time cleanup).
    # NAMED profiles: always start clean with just their own workspace.
    try:
        from api.profiles import get_active_profile_name
        on_default_profile = get_active_profile_name() in ('default', None)
    except ImportError:
        on_default_profile = True

    legacy = _migrate_global_workspaces() if on_default_profile else []
    # Fresh start when nothing was migrated: one entry labeled "Home".
    return legacy or [{'path': _profile_default_workspace(), 'name': 'Home'}]
|
|
|
|
|
|
def save_workspaces(workspaces: list) -> None:
    """Write *workspaces* as indented JSON to the active profile's workspaces file.

    The parent directory is created first when missing. Serialization and
    filesystem errors are not caught here.
    """
    destination = _workspaces_file()
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(workspaces, ensure_ascii=False, indent=2)
    destination.write_text(serialized, encoding='utf-8')
|
|
|
|
|
|
def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The profile's last-workspace file is consulted first, then the global one.
    A stored value is accepted only when it is non-empty and names an existing
    directory. If neither source yields one, the profile's default workspace
    is returned.
    """
    profile_file = _last_workspace_file()
    # (file, debug message, message args) -- tried in order, evaluated lazily.
    sources = (
        (profile_file, "Failed to read last workspace from %s", (profile_file,)),
        (_GLOBAL_LW_FILE, "Failed to read global last workspace", ()),
    )
    for source, failure_msg, failure_args in sources:
        if not source.exists():
            continue
        try:
            remembered = source.read_text(encoding='utf-8').strip()
            if remembered and Path(remembered).is_dir():
                return remembered
        except Exception:
            logger.debug(failure_msg, *failure_args)
    return _profile_default_workspace()
|
|
|
|
|
|
def set_last_workspace(path: str) -> None:
    """Record *path* as the last-used workspace for the active profile.

    Best effort: any failure is logged at debug level and otherwise ignored.
    """
    try:
        marker = _last_workspace_file()
        state_dir = marker.parent
        state_dir.mkdir(parents=True, exist_ok=True)
        marker.write_text(str(path), encoding='utf-8')
    except Exception:
        logger.debug("Failed to set last workspace")
|
|
|
|
|
|
def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* against the workspace *root* and confine it there.

    Returns the fully resolved path. Raises ValueError when the resolved path
    is not located at or beneath the resolved root (path traversal).
    """
    base = root.resolve()
    candidate = (root / requested).resolve()
    # relative_to() raises ValueError when candidate is not inside base.
    candidate.relative_to(base)
    return candidate
|
|
|
|
|
|
def list_dir(workspace: Path, rel: str='.'):
    """List the entries of a directory inside *workspace*.

    Args:
        workspace: workspace root directory.
        rel: path of the directory to list, relative to *workspace*.

    Returns:
        A list of at most 200 dicts with keys 'name', 'path' (relative to the
        resolved workspace root), 'type' ('dir' or 'file') and 'size' (bytes
        for regular files, otherwise None). Non-files sort before files, each
        group ordered by case-insensitive name.

    Raises:
        ValueError: if *rel* resolves outside the workspace.
        FileNotFoundError: if the resolved target is not a directory.
    """
    # The target is fully resolved, so relative paths must be computed against
    # the resolved root as well; using the raw `workspace` raises ValueError
    # whenever it is a relative path or reaches the directory via a symlink.
    root = workspace.resolve()
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")
    entries = []
    for item in sorted(target.iterdir(), key=lambda p: (p.is_file(), p.name.lower())):
        entries.append({
            'name': item.name,
            'path': str(item.relative_to(root)),
            'type': 'dir' if item.is_dir() else 'file',
            'size': item.stat().st_size if item.is_file() else None,
        })
        # Cap the listing size.
        if len(entries) >= 200:
            break
    return entries
|
|
|
|
|
|
def read_file_content(workspace: Path, rel: str) -> dict:
    """Read a text file located inside *workspace*.

    Returns a dict with 'path' (the *rel* argument as given), 'content'
    (decoded as UTF-8 with undecodable bytes replaced), 'size' (bytes on disk)
    and 'lines' (newline count plus one).

    Raises ValueError when *rel* escapes the workspace or the file exceeds
    MAX_FILE_BYTES, and FileNotFoundError when the target is not a file.
    """
    file_path = safe_resolve_ws(workspace, rel)
    if not file_path.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")

    size = file_path.stat().st_size
    if size > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({size} bytes, max {MAX_FILE_BYTES})")

    text = file_path.read_text(encoding='utf-8', errors='replace')
    line_count = text.count('\n') + 1
    return {'path': rel, 'content': text, 'size': size, 'lines': line_count}
|
|
|
|
|
|
# ── Git detection ──────────────────────────────────────────────────────────
|
|
|
|
def _run_git(args, cwd, timeout=3):
    """Run ``git <args>`` in *cwd* and return its stripped stdout.

    Returns None when git exits non-zero, the command times out, or the
    process cannot be started (OSError, including a missing git binary).
    """
    command = ['git'] + args
    try:
        completed = subprocess.run(
            command,
            cwd=str(cwd),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip()
|
|
|
|
|
|
def git_info_for_workspace(workspace: Path) -> dict:
    """Summarize the git state of *workspace*.

    Returns None when the directory has no ``.git`` entry or the current
    branch cannot be determined. Otherwise returns a dict with:
      - 'branch':    output of ``git rev-parse --abbrev-ref HEAD``
      - 'dirty':     number of non-empty ``git status --porcelain`` lines
      - 'modified':  lines whose index or worktree status is M, A or R
      - 'untracked': lines starting with '??'
      - 'ahead' / 'behind': commit counts relative to the upstream, 0 when
        unavailable
      - 'is_git':    always True
    """
    def _as_count(text):
        """Convert a git count string to int, treating missing/odd output as 0."""
        return int(text) if text and text.isdigit() else 0

    if not (workspace / '.git').exists():
        return None

    branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
    if branch is None:
        return None

    porcelain = _run_git(['status', '--porcelain'], workspace) or ''
    rows = [row for row in porcelain.splitlines() if row]

    # Porcelain rows use an XY prefix: X = index status, Y = worktree status.
    modified = 0
    untracked = 0
    for row in rows:
        if len(row) >= 2 and (row[0] in 'MAR' or row[1] in 'MAR'):
            modified += 1
        if row.startswith('??'):
            untracked += 1

    # Commits ahead of / behind the configured upstream.
    ahead = _run_git(['rev-list', '--count', '@{u}..HEAD'], workspace)
    behind = _run_git(['rev-list', '--count', 'HEAD..@{u}'], workspace)

    info = {}
    info['branch'] = branch
    info['dirty'] = len(rows)
    info['modified'] = modified
    info['untracked'] = untracked
    info['ahead'] = _as_count(ahead)
    info['behind'] = _as_count(behind)
    info['is_git'] = True
    return info
|