Files
webui/api/workspace.py
nesquena-hermes 37850a4dfd fix: workspace list cleaner — all 1055 tests pass (#418)
* fix: workspace list cleaner — allow own-profile paths, remove brittle string filter

Two bugs in _clean_workspace_list() caused workspace adds to silently vanish
on the next load, making the duplicate-check test and workspace rename test fail:

1. Brittle string filter: 'if test-workspace in path or webui-mvp-test in path:
   continue' — removed. The test server's workspace IS under these paths, so any
   workspace added during testing got silently dropped on the next load_workspaces()
   call. The p.is_dir() check already handles non-existent paths.

2. Cross-profile filter too broad: 'if p is under ~/.hermes/profiles/: skip' —
   this correctly blocked cross-profile leakage but also blocked the current
   profile's own paths (e.g. ~/.hermes/profiles/webui/webui-mvp-test/...).
   Fixed: only skip if the path is under profiles/ AND under a DIFFERENT profile's
   directory. Paths under the current profile's own home are kept.

* docs: v0.50.36 release — version badge and CHANGELOG

---------

Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
2026-04-14 00:14:25 -07:00

367 lines
14 KiB
Python

"""
Hermes Web UI -- Workspace and file system helpers.
Workspace lists and last-used workspace are stored per-profile so each
profile has its own workspace configuration. State files live at
``{profile_home}/webui_state/workspaces.json`` and
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
paths are used as fallback when no profile module is available.
"""
import json
import logging
import os
import subprocess
from pathlib import Path
logger = logging.getLogger(__name__)
from api.config import (
WORKSPACES_FILE as _GLOBAL_WS_FILE,
LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
)
# ── Profile-aware path resolution ───────────────────────────────────────────
def _profile_state_dir() -> Path:
    """Locate the ``webui_state`` directory of the active profile.

    When the profiles module reports a named (non-'default') profile, the
    directory ``<active hermes home>/webui_state`` is created if missing and
    returned. In every other case -- default profile, empty profile name, or
    the profiles module cannot be imported -- the directory that contains the
    global workspaces file is returned.
    """
    try:
        from api.profiles import get_active_hermes_home, get_active_profile_name
        active = get_active_profile_name()
        if active and active != 'default':
            state_dir = get_active_hermes_home() / 'webui_state'
            # Make sure the directory is usable before handing it out.
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir
    except ImportError:
        logger.debug("Failed to import profiles module, using global state dir")
    # Default profile / no profile support: share the global state location.
    return _GLOBAL_WS_FILE.parent
def _workspaces_file() -> Path:
    """Path of ``workspaces.json`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'workspaces.json'
def _last_workspace_file() -> Path:
    """Path of ``last_workspace.txt`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'last_workspace.txt'
def _profile_default_workspace() -> str:
    """Pick the default workspace directory from the loaded configuration.

    Configuration values are tried in this order, and the first one that
    resolves to an existing directory wins:
      1. 'workspace'
      2. 'default_workspace'
      3. 'cwd' inside the 'terminal' mapping (ignored when it is '.' or empty)
    If none qualifies, or reading the configuration raises, the boot-time
    DEFAULT_WORKSPACE constant is returned as a string.
    """
    def _existing_dir(raw):
        # Expand ``~`` and resolve; only an existing directory is acceptable.
        candidate = Path(str(raw)).expanduser().resolve()
        return str(candidate) if candidate.is_dir() else None

    try:
        from api.config import get_config
        config = get_config()
        # Explicit workspace keys take precedence over the terminal setting.
        for option in ('workspace', 'default_workspace'):
            configured = config.get(option)
            if not configured:
                continue
            found = _existing_dir(configured)
            if found is not None:
                return found
        # Otherwise fall back to the terminal working directory, if usable.
        terminal_section = config.get('terminal', {})
        if isinstance(terminal_section, dict):
            cwd = terminal_section.get('cwd', '')
            if cwd and str(cwd) not in ('.', ''):
                found = _existing_dir(cwd)
                if found is not None:
                    return found
    except Exception:
        logger.debug("Failed to load profile default workspace config")
    return str(_BOOT_DEFAULT_WORKSPACE)
# ── Public API ──────────────────────────────────────────────────────────────
def _clean_workspace_list(workspaces: list) -> list:
"""Sanitize a workspace list:
- Remove entries whose paths no longer exist on disk.
- Remove entries whose paths live inside another profile's directory
(e.g. ~/.hermes/profiles/X/... should not appear on a different profile).
- Rename any entry whose name is literally 'default' to 'Home' (avoids
confusion with the 'default' profile name).
Returns the cleaned list (may be empty).
"""
hermes_profiles = (Path.home() / '.hermes' / 'profiles').resolve()
result = []
for w in workspaces:
path = w.get('path', '')
name = w.get('name', '')
p = Path(path).resolve() if path else Path('/')
# Skip paths that no longer exist
if not p.is_dir():
continue
# Skip paths inside a DIFFERENT profile's directory (cross-profile leak).
# Allow paths inside the CURRENT profile's own directory (e.g. test workspaces
# created under ~/.hermes/profiles/webui/webui-mvp-test/).
try:
p.relative_to(hermes_profiles)
# p is under ~/.hermes/profiles/ — only skip if it's under a DIFFERENT profile
try:
from api.profiles import get_active_hermes_home
own_profile_dir = get_active_hermes_home().resolve()
p.relative_to(own_profile_dir)
# p is under our own profile dir — keep it
except (ValueError, Exception):
continue # under profiles/ but not our own — cross-profile leak, skip
except ValueError:
pass # not under profiles/ at all — keep it
# Rename confusing 'default' label to 'Home'
if name.lower() == 'default':
name = 'Home'
result.append({'path': str(p), 'name': name})
return result
def _migrate_global_workspaces() -> list:
    """Load and sanitize the legacy global workspaces file.

    Returns an empty list when the global file does not exist or when reading,
    parsing, cleaning, or rewriting it raises. Otherwise the parsed content is
    passed through ``_clean_workspace_list``; if cleaning changed the number
    of entries, the cleaned list is written back to the global file so later
    reads start from the cleaned data. The cleaned list is returned.
    """
    if not _GLOBAL_WS_FILE.exists():
        return []
    try:
        legacy = json.loads(_GLOBAL_WS_FILE.read_text(encoding='utf-8'))
        sanitized = _clean_workspace_list(legacy)
        entries_were_dropped = len(sanitized) != len(legacy)
        if entries_were_dropped:
            # Persist the cleaned list so the stale entries do not come back.
            payload = json.dumps(sanitized, ensure_ascii=False, indent=2)
            _GLOBAL_WS_FILE.write_text(payload, encoding='utf-8')
        return sanitized
    except Exception:
        return []
def load_workspaces() -> list:
    """Return the workspace list for the active profile.

    Resolution order:
      1. The profile's own workspaces file, cleaned via
         ``_clean_workspace_list`` (and rewritten when the entry count
         changed). An empty result is replaced by a single 'Home' entry.
      2. If that file is missing or unreadable and the active profile is the
         default one (or the profiles module is unavailable), the cleaned
         legacy global list, when non-empty.
      3. A single 'Home' entry pointing at the profile's default workspace.
    """
    def _home_only() -> list:
        # Fresh start: one entry for the configured default workspace.
        return [{'path': _profile_default_workspace(), 'name': 'Home'}]

    store = _workspaces_file()
    if store.exists():
        try:
            stored = json.loads(store.read_text(encoding='utf-8'))
            usable = _clean_workspace_list(stored)
            if len(usable) != len(stored):
                # Best effort: write the cleaned list back so dropped entries
                # do not reappear on the next load.
                try:
                    store.write_text(
                        json.dumps(usable, ensure_ascii=False, indent=2),
                        encoding='utf-8',
                    )
                except Exception:
                    logger.debug("Failed to persist cleaned workspace list")
            if usable:
                return usable
            return _home_only()
        except Exception:
            logger.debug("Failed to load workspaces from %s", store)

    # No usable profile-local file. Only the default profile inherits the
    # legacy global list; named profiles start with their own workspace only.
    try:
        from api.profiles import get_active_profile_name
        on_default_profile = get_active_profile_name() in ('default', None)
    except ImportError:
        on_default_profile = True

    inherited = _migrate_global_workspaces() if on_default_profile else []
    if inherited:
        return inherited
    return _home_only()
def save_workspaces(workspaces: list) -> None:
    """Write *workspaces* as indented UTF-8 JSON to the active profile's workspaces file.

    The parent directory is created first if it does not exist yet.
    """
    target = _workspaces_file()
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(workspaces, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding='utf-8')
def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The active profile's last-workspace file is consulted first, then the
    global one. A stored value is accepted only if it is non-empty and names
    an existing directory. When neither file yields such a value, the
    profile's default workspace is returned.
    """
    profile_file = _last_workspace_file()
    # (file to read, logger.debug arguments used if reading it fails)
    attempts = (
        (profile_file, ("Failed to read last workspace from %s", profile_file)),
        (_GLOBAL_LW_FILE, ("Failed to read global last workspace",)),
    )
    for source, log_args in attempts:
        if not source.exists():
            continue
        try:
            remembered = source.read_text(encoding='utf-8').strip()
            if remembered and Path(remembered).is_dir():
                return remembered
        except Exception:
            logger.debug(*log_args)
    return _profile_default_workspace()
def set_last_workspace(path: str) -> None:
    """Record *path* as the active profile's last-used workspace.

    Best effort: any failure while locating, creating, or writing the state
    file is logged at debug level and otherwise ignored.
    """
    try:
        target = _last_workspace_file()
        target.parent.mkdir(parents=True, exist_ok=True)
        text = str(path)
        target.write_text(text, encoding='utf-8')
    except Exception:
        logger.debug("Failed to set last workspace")
def resolve_trusted_workspace(path: str | Path | None = None) -> Path:
"""Resolve and validate a workspace path.
A path is trusted if it satisfies at least one of:
(A) It is under the user's home directory (Path.home()).
Works cross-platform: ~/... on Linux/macOS, C:\\Users\\... on Windows.
(B) It is already in the profile's saved workspace list.
This covers self-hosted deployments where workspaces live outside home
(e.g. /data/projects, /opt/workspace) — once a workspace is saved by
an admin, it can be reused without re-validation.
Additionally enforced regardless of (A)/(B):
1. The path must exist.
2. The path must be a directory.
3. The path must not be a known system root (/etc, /usr, /var, /bin, /sbin,
/boot, /proc, /sys, /dev, /root on Linux/macOS; Windows system dirs).
This prevents even admin-saved workspaces from pointing at OS internals.
None/empty path falls back to the boot-time DEFAULT_WORKSPACE, which is always
trusted (it was validated at server startup).
"""
_BLOCKED_SYSTEM_ROOTS = {
# Linux / macOS
Path('/etc'), Path('/usr'), Path('/var'), Path('/bin'), Path('/sbin'),
Path('/boot'), Path('/proc'), Path('/sys'), Path('/dev'), Path('/root'),
Path('/lib'), Path('/lib64'), Path('/opt/homebrew'),
}
if path in (None, ""):
return Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
candidate = Path(path).expanduser().resolve()
if not candidate.exists():
raise ValueError(f"Path does not exist: {candidate}")
if not candidate.is_dir():
raise ValueError(f"Path is not a directory: {candidate}")
# Block known system roots and their children
for blocked in _BLOCKED_SYSTEM_ROOTS:
try:
candidate.relative_to(blocked)
raise ValueError(f"Path points to a system directory: {candidate}")
except ValueError as e:
if "system directory" in str(e):
raise
# relative_to raised ValueError = candidate is NOT under blocked = safe
# (A) Trusted if under the user's home directory — cross-platform via Path.home()
try:
candidate.relative_to(Path.home().resolve())
return candidate
except ValueError:
pass
# (B) Trusted if already in the saved workspace list — covers non-home installs
try:
saved = load_workspaces()
saved_paths = {Path(w["path"]).resolve() for w in saved if w.get("path")}
if candidate in saved_paths:
return candidate
except Exception:
pass
raise ValueError(
f"Path is outside the user home directory and not in the saved workspace "
f"list: {candidate}. Add it via Settings → Workspaces first."
)
def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* against the workspace *root* and return the result.

    Raises ValueError when the resolved location is not inside the resolved
    root (path traversal).
    """
    base = root.resolve()
    target = (root / requested).resolve()
    # relative_to() raises ValueError if target is not located under base.
    target.relative_to(base)
    return target
def list_dir(workspace: Path, rel: str = '.', max_entries: int = 200):
    """List the entries of a directory inside a workspace.

    Args:
        workspace: Workspace root directory.
        rel: Directory to list, relative to *workspace*.
        max_entries: Maximum number of entries returned (default 200).

    Returns:
        A list of dicts with keys 'name', 'path' (relative to the resolved
        workspace root), 'type' ('dir' or 'file') and 'size' (bytes for
        regular files, otherwise None). Non-files are listed before files;
        within each group entries are ordered case-insensitively by name.

    Raises:
        ValueError: If *rel* resolves outside the workspace.
        FileNotFoundError: If the resolved target is not a directory.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")
    # ``target`` is fully resolved, so relative paths must be computed against
    # the resolved root as well; an unresolved root (symlinked or relative
    # workspace path) would make relative_to() raise ValueError.
    root = workspace.resolve()
    ordered = sorted(target.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))
    entries = []
    for item in ordered[:max(max_entries, 0)]:
        is_regular_file = item.is_file()
        entries.append({
            'name': item.name,
            'path': str(item.relative_to(root)),
            'type': 'dir' if item.is_dir() else 'file',
            'size': item.stat().st_size if is_regular_file else None,
        })
    return entries
def read_file_content(workspace: Path, rel: str) -> dict:
    """Read a text file located inside a workspace.

    Returns a dict with 'path' (the *rel* argument as given), 'content'
    (decoded as UTF-8 with undecodable bytes replaced), 'size' (bytes on
    disk) and 'lines' (newline count plus one).

    Raises ValueError if *rel* escapes the workspace or the file exceeds
    MAX_FILE_BYTES, and FileNotFoundError if the target is not a file.
    """
    file_path = safe_resolve_ws(workspace, rel)
    if not file_path.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")
    byte_count = file_path.stat().st_size
    # Check the size before reading so oversized files are never loaded.
    if byte_count > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({byte_count} bytes, max {MAX_FILE_BYTES})")
    text = file_path.read_text(encoding='utf-8', errors='replace')
    line_total = text.count('\n') + 1
    return {'path': rel, 'content': text, 'size': byte_count, 'lines': line_total}
# ── Git detection ──────────────────────────────────────────────────────────
def _run_git(args, cwd, timeout=3):
"""Run a git command and return stdout, or None on failure."""
try:
r = subprocess.run(
['git'] + args, cwd=str(cwd), capture_output=True,
text=True, timeout=timeout,
)
return r.stdout.strip() if r.returncode == 0 else None
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return None
def git_info_for_workspace(workspace: Path) -> dict:
"""Return git info for a workspace directory, or None if not a git repo."""
if not (workspace / '.git').exists():
return None
branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
if branch is None:
return None
# Status counts
status_out = _run_git(['status', '--porcelain'], workspace) or ''
lines = [l for l in status_out.splitlines() if l]
# git status --porcelain: XY format where X=index, Y=worktree
modified = sum(1 for l in lines if len(l) >= 2 and (l[0] in 'MAR' or l[1] in 'MAR'))
untracked = sum(1 for l in lines if l.startswith('??'))
dirty = len(lines)
# Ahead/behind
ahead = _run_git(['rev-list', '--count', '@{u}..HEAD'], workspace)
behind = _run_git(['rev-list', '--count', 'HEAD..@{u}'], workspace)
return {
'branch': branch,
'dirty': dirty,
'modified': modified,
'untracked': untracked,
'ahead': int(ahead) if ahead and ahead.isdigit() else 0,
'behind': int(behind) if behind and behind.isdigit() else 0,
'is_git': True,
}