When the workspace root is a git repo, a badge in the panel header shows the current branch name, dirty file count, and ahead/behind status. Updates on every root directory load. Backend: - git_info_for_workspace() in api/workspace.py runs lightweight git commands (rev-parse, status --porcelain, rev-list) with 3s timeout - New GET /api/git-info endpoint returns branch, dirty count, modified, untracked, ahead, behind Frontend: - _refreshGitBadge() in workspace.js fetches git info on root load - Git badge element in panel header shows branch + status - Badge turns gold when workspace has uncommitted changes Inspired by PR #75 (@MartinNielsenDev). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
287 lines
11 KiB
Python
287 lines
11 KiB
Python
"""
|
|
Hermes Web UI -- Workspace and file system helpers.

Workspace lists and last-used workspace are stored per-profile so each
profile has its own workspace configuration. State files live at
``{profile_home}/webui_state/workspaces.json`` and
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
paths are used as fallback when no profile module is available.
|
|
"""
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from api.config import (
|
|
WORKSPACES_FILE as _GLOBAL_WS_FILE,
|
|
LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
|
|
DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
|
|
MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
|
|
)
|
|
|
|
|
|
# ── Profile-aware path resolution ───────────────────────────────────────────
|
|
|
|
def _profile_state_dir() -> Path:
    """Locate the ``webui_state`` directory of the active profile.

    For a named (non-default) profile this is
    ``get_active_hermes_home() / 'webui_state'``; the directory is created
    on demand.  For the default profile, an unset profile name, or when
    ``api.profiles`` cannot be imported, the directory that holds the global
    workspaces file is returned instead.
    """
    try:
        from api.profiles import get_active_hermes_home, get_active_profile_name

        profile = get_active_profile_name()
        uses_named_profile = bool(profile) and profile != 'default'
        if uses_named_profile:
            state_dir = get_active_hermes_home() / 'webui_state'
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir
    except ImportError:
        # No profile support available: fall back to the global location.
        pass
    return _GLOBAL_WS_FILE.parent
|
|
|
|
|
|
def _workspaces_file() -> Path:
    """Path of ``workspaces.json`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'workspaces.json'
|
|
|
|
|
|
def _last_workspace_file() -> Path:
    """Path of ``last_workspace.txt`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'last_workspace.txt'
|
|
|
|
|
|
def _profile_default_workspace() -> str:
    """Determine the default workspace directory from the profile's config.

    Config keys are consulted in priority order:
      1. 'workspace'          -- explicit webui workspace key
      2. 'default_workspace'  -- alternate explicit key
      3. 'terminal.cwd'       -- the terminal working directory

    A candidate is accepted only if, after ``~`` expansion and resolution,
    it is an existing directory.  If no candidate qualifies, or anything
    goes wrong while reading the config, the boot-time DEFAULT_WORKSPACE
    constant is returned as a string.
    """
    def _as_existing_dir(value):
        """Return *value* as a resolved directory string, or None if it is not a directory."""
        candidate = Path(str(value)).expanduser().resolve()
        return str(candidate) if candidate.is_dir() else None

    try:
        from api.config import get_config

        cfg = get_config()

        # Explicit webui workspace keys take precedence.
        for key in ('workspace', 'default_workspace'):
            configured = cfg.get(key)
            if not configured:
                continue
            found = _as_existing_dir(configured)
            if found is not None:
                return found

        # Otherwise fall through to the configured terminal working directory.
        terminal_cfg = cfg.get('terminal', {})
        if isinstance(terminal_cfg, dict):
            cwd = terminal_cfg.get('cwd', '')
            # '.' and '' carry no location information, so they are ignored.
            if cwd and str(cwd) not in ('.', ''):
                found = _as_existing_dir(cwd)
                if found is not None:
                    return found
    except Exception:
        # Deliberately best-effort: any config problem means "use the boot default".
        pass
    return str(_BOOT_DEFAULT_WORKSPACE)
|
|
|
|
|
|
# ── Public API ──────────────────────────────────────────────────────────────
|
|
|
|
def _clean_workspace_list(workspaces: list) -> list:
    """Sanitize a workspace list.

    The following entries are dropped:
      - entries that are not dicts, or that have no non-empty string 'path'
        (previously an empty path was silently replaced by '/' and survived
        as a filesystem-root workspace);
      - entries that look like test artifacts (path contains
        'test-workspace' or 'webui-mvp-test');
      - entries whose path cannot be resolved or is not an existing directory;
      - entries located at or under ``~/.hermes/profiles`` (a path inside a
        profile directory should not show up in a workspace list).

    Surviving entries are returned as fresh ``{'path', 'name'}`` dicts with
    the path resolved to an absolute string.  A name that is literally
    'default' (case-insensitive) becomes 'Home' to avoid confusion with the
    'default' profile name; a missing/None name becomes ''.

    Args:
        workspaces: iterable of workspace entries as loaded from JSON.

    Returns:
        The cleaned list (may be empty).
    """
    profiles_root = (Path.home() / '.hermes' / 'profiles').resolve()
    cleaned = []
    for entry in workspaces:
        # Hand-edited or corrupted state files may contain non-dict items;
        # skip them instead of failing the whole list.
        if not isinstance(entry, dict):
            continue

        raw_path = entry.get('path')
        if not raw_path or not isinstance(raw_path, str):
            continue

        # Skip test artifacts.
        if 'test-workspace' in raw_path or 'webui-mvp-test' in raw_path:
            continue

        try:
            resolved = Path(raw_path).resolve()
        except (OSError, ValueError, RuntimeError):
            # Unresolvable path (embedded NUL, symlink loop, ...): treat as stale.
            continue

        # Skip paths that no longer exist.
        if not resolved.is_dir():
            continue

        # Skip paths at or below the profiles directory (cross-profile leak).
        if resolved == profiles_root or profiles_root in resolved.parents:
            continue

        raw_name = entry.get('name')
        name = '' if raw_name is None else str(raw_name)
        # Rename the confusing 'default' label to 'Home'.
        if name.lower() == 'default':
            name = 'Home'

        cleaned.append({'path': str(resolved), 'name': name})
    return cleaned
|
|
|
|
|
|
def _migrate_global_workspaces() -> list:
    """Read the legacy global workspaces.json, clean it, and return the result.

    This is the migration path for users upgrading from a pre-profile
    version: their global file may contain cross-profile entries, test
    artifacts, and stale paths accumulated over time.

    When cleaning removed entries, the cleaned list is written back to the
    global file so future reads are already clean.  That rewrite is
    best-effort: if the file cannot be written (e.g. read-only), the cleaned
    list is still returned rather than being discarded.

    Returns:
        The cleaned workspace list, or ``[]`` when the global file does not
        exist or cannot be read, parsed, or cleaned.
    """
    if not _GLOBAL_WS_FILE.exists():
        return []

    try:
        raw = json.loads(_GLOBAL_WS_FILE.read_text(encoding='utf-8'))
        cleaned = _clean_workspace_list(raw)
        needs_rewrite = len(cleaned) != len(raw)
    except Exception:
        # Unreadable or malformed legacy file: nothing to migrate.
        return []

    if needs_rewrite:
        try:
            _GLOBAL_WS_FILE.write_text(
                json.dumps(cleaned, ensure_ascii=False, indent=2), encoding='utf-8'
            )
        except (OSError, ValueError):
            # Persisting is only an optimisation; a failed write (permissions,
            # unencodable path) must not lose the successfully cleaned list.
            pass
    return cleaned
|
|
|
|
|
|
def load_workspaces() -> list:
    """Load the workspace list for the active profile.

    Resolution order:
      1. The profile-local workspaces file, cleaned.  If cleaning changed the
         number of entries, the cleaned list is written back (best-effort).
         An empty cleaned list yields a single 'Home' entry.
      2. If that file is absent or unusable and the active profile is the
         default one (or profiles are unavailable), the cleaned legacy global
         list, when non-empty.
      3. Otherwise a single 'Home' entry pointing at the profile's default
         workspace.
    """
    state_file = _workspaces_file()
    if state_file.exists():
        try:
            stored = json.loads(state_file.read_text(encoding='utf-8'))
            kept = _clean_workspace_list(stored)
            if len(kept) != len(stored):
                # Persist the cleaned version so stale entries don't keep reappearing.
                try:
                    payload = json.dumps(kept, ensure_ascii=False, indent=2)
                    state_file.write_text(payload, encoding='utf-8')
                except Exception:
                    pass
            if kept:
                return kept
            return [{'path': _profile_default_workspace(), 'name': 'Home'}]
        except Exception:
            # Unusable profile-local file: continue with the fallbacks below.
            pass

    # No usable profile-local file.
    # DEFAULT profile: migrate from the legacy global file.
    # NAMED profiles: always start clean with just their own workspace.
    try:
        from api.profiles import get_active_profile_name

        active = get_active_profile_name()
        on_default_profile = active in ('default', None)
    except ImportError:
        on_default_profile = True

    legacy = _migrate_global_workspaces() if on_default_profile else []
    if legacy:
        return legacy

    # Fresh start: single entry from the profile's configured workspace, labeled "Home".
    return [{'path': _profile_default_workspace(), 'name': 'Home'}]
|
|
|
|
|
|
def save_workspaces(workspaces: list):
    """Write *workspaces* as indented UTF-8 JSON to the active profile's workspaces file.

    The parent directory is created first if it does not exist.
    """
    target = _workspaces_file()
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(workspaces, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding='utf-8')
|
|
|
|
|
|
def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The profile-local last-workspace file is consulted first, then the global
    one.  A stored value is used only if it is non-empty and names an
    existing directory; read errors are ignored.  When neither file yields a
    usable path, the profile's default workspace is returned.
    """
    for candidate in (_last_workspace_file(), _GLOBAL_LW_FILE):
        if not candidate.exists():
            continue
        try:
            remembered = candidate.read_text(encoding='utf-8').strip()
            if remembered and Path(remembered).is_dir():
                return remembered
        except Exception:
            # Unreadable file: try the next candidate.
            pass
    return _profile_default_workspace()
|
|
|
|
|
|
def set_last_workspace(path: str):
    """Record *path* as the last-used workspace for the active profile.

    Best-effort: the parent directory is created if needed, and any failure
    while locating or writing the file is silently ignored.
    """
    try:
        target = _last_workspace_file()
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(str(path), encoding='utf-8')
    except Exception:
        # Remembering the last workspace is a convenience only.
        return None
    return None
|
|
|
|
|
|
def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* against the workspace *root* and confine it there.

    Returns the fully resolved path.  Raises ValueError when the resolved
    path is not located at or below the resolved root (path traversal).
    """
    base = root.resolve()
    candidate = root.joinpath(requested).resolve()
    # relative_to() raises ValueError if candidate is outside base.
    candidate.relative_to(base)
    return candidate
|
|
|
|
|
|
def list_dir(workspace: Path, rel='.'):
    """List up to 200 entries of a directory inside a workspace.

    Directories are listed before files; within each group entries are
    ordered by case-insensitive name.

    Args:
        workspace: workspace root directory.
        rel: directory to list, relative to *workspace*.

    Returns:
        A list of dicts with keys 'name', 'path' (relative to the workspace
        root), 'type' ('dir' or 'file') and 'size' (bytes for regular files,
        otherwise None).

    Raises:
        ValueError: if *rel* resolves outside the workspace.
        FileNotFoundError: if the resolved target is not a directory.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")

    # `target` is fully resolved, so its children must be made relative to the
    # *resolved* root; using the raw `workspace` raises ValueError whenever
    # the workspace path goes through a symlink.
    root = workspace.resolve()

    ordered = sorted(target.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))
    entries = []
    for item in ordered[:200]:
        size = None
        if item.is_file():
            try:
                size = item.stat().st_size
            except OSError:
                # The file vanished or became unreadable mid-listing; keep the
                # entry and report an unknown size instead of failing.
                size = None
        entries.append({
            'name': item.name,
            'path': str(item.relative_to(root)),
            'type': 'dir' if item.is_dir() else 'file',
            'size': size,
        })
    return entries
|
|
|
|
|
|
def read_file_content(workspace: Path, rel: str):
    """Read a text file located inside a workspace.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Returns:
        A dict with 'path' (the *rel* argument as given), 'content', 'size'
        (bytes on disk) and 'lines' (number of newline characters plus one).

    Raises:
        ValueError: if *rel* resolves outside the workspace, or the file is
            larger than MAX_FILE_BYTES.
        FileNotFoundError: if the resolved target is not a regular file.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")

    size = target.stat().st_size
    if size > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({size} bytes, max {MAX_FILE_BYTES})")

    content = target.read_text(encoding='utf-8', errors='replace')
    line_count = len(content.split('\n'))
    return {
        'path': rel,
        'content': content,
        'size': size,
        'lines': line_count,
    }
|
|
|
|
|
|
# ── Git detection ──────────────────────────────────────────────────────────
|
|
|
|
def _run_git(args, cwd, timeout=3):
    """Run a git command and return its stdout, or None on failure.

    Args:
        args: git arguments, without the leading 'git'.
        cwd: directory in which to run the command.
        timeout: seconds to wait before giving up.

    Returns:
        stdout with trailing whitespace removed when git exits with status 0.
        None when git exits non-zero, times out, is not installed, or *cwd*
        cannot be used.
    """
    # These commands are used for background status polling, so they must not
    # compete with the user's own git operations for the index lock.
    env = dict(os.environ, GIT_OPTIONAL_LOCKS='0')
    try:
        proc = subprocess.run(
            ['git', *args], cwd=str(cwd), capture_output=True,
            # Decode explicitly: relying on the locale encoding can raise
            # UnicodeDecodeError, which is not handled below.
            text=True, encoding='utf-8', errors='replace',
            timeout=timeout, env=env,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError also covers FileNotFoundError (git missing / bad cwd).
        return None
    if proc.returncode != 0:
        return None
    # Only trailing whitespace is removed: leading whitespace is significant
    # in `git status --porcelain`, where the first column may be a space.
    return proc.stdout.rstrip()
|
|
|
|
|
|
def git_info_for_workspace(workspace: Path) -> 'dict | None':
    """Return git status information for a workspace directory.

    Args:
        workspace: directory expected to be the root of a git work tree.

    Returns:
        None when *workspace* has no '.git' entry or the current branch
        cannot be determined.  Otherwise a dict with:
          - 'branch':    output of ``git rev-parse --abbrev-ref HEAD``
          - 'dirty':     number of entries reported by ``git status --porcelain``
          - 'modified':  entries whose index or work-tree status is 'M'
          - 'untracked': entries marked '??'
          - 'ahead' / 'behind': commit counts relative to the upstream branch
            (0 when there is no upstream or the counts are unavailable)
          - 'is_git':    always True
    """
    if not (workspace / '.git').exists():
        return None

    branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
    if branch is None:
        return None

    # Status counts, gathered in a single pass over the porcelain output.
    status_out = _run_git(['status', '--porcelain'], workspace) or ''
    dirty = modified = untracked = 0
    for line in status_out.splitlines():
        if not line:
            continue
        dirty += 1
        if line.startswith('??'):
            untracked += 1
        elif 'M' in line[:2]:
            # The first two characters are the index/work-tree status pair.
            # Checking for 'M' in either column also counts combinations
            # such as 'RM' or 'MD' that a fixed whitelist would miss.
            modified += 1

    # Ahead/behind from one call: for `HEAD...@{u}` git prints
    # "<only-in-HEAD>\t<only-in-upstream>".
    ahead = behind = 0
    counts = _run_git(
        ['rev-list', '--left-right', '--count', 'HEAD...@{u}'], workspace
    )
    if counts:
        parts = counts.split()
        if len(parts) == 2 and all(part.isdigit() for part in parts):
            ahead, behind = int(parts[0]), int(parts[1])

    return {
        'branch': branch,
        'dirty': dirty,
        'modified': modified,
        'untracked': untracked,
        'ahead': ahead,
        'behind': behind,
        'is_git': True,
    }
|