🚀 Initial commit: Rose's custom WebUI with modernization + agent attribution

This commit is contained in:
Rose
2026-04-20 10:36:59 +02:00
parent 3bdf430413
commit 99dd1f57ae
118 changed files with 41900 additions and 0 deletions

265
api/gateways.py Normal file
View File

@@ -0,0 +1,265 @@
"""
Gateway management API for Hermes WebUI.
Provides endpoints to list, start, stop, restart, and add gateway connections
like Telegram, OpenClaw, and other Hermes gateway types.
"""
import os
import re
import subprocess
import threading
import time
from pathlib import Path
from typing import Optional
# In-memory gateway registry (gateway_name -> info)
_gateways: dict[str, dict] = {}
_gateways_lock = threading.Lock()
# Track running gateway processes (gateway_name -> PID)
_gateway_pids: dict[str, int] = {}
def _get_hermes_home() -> Path:
"""Get the Hermes home directory."""
hermes_home = os.environ.get("HERMES_HOME", "")
if hermes_home and str(Path(hermes_home).parent) != "profiles":
return Path(hermes_home)
return Path.home() / ".hermes"
def _get_gateway_pid(name: str) -> Optional[int]:
"""Get PID of a running gateway process by name."""
try:
result = subprocess.run(
["pgrep", "-f", f"hermes.*gateway.*{name}"],
capture_output=True, text=True
)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split("\n")
return int(pids[0]) if pids else None
except Exception:
pass
return _gateway_pids.get(name)
def _is_gateway_running(name: str) -> bool:
"""Check if a gateway process is running."""
pid = _get_gateway_pid(name)
if pid:
try:
os.kill(pid, 0)
return True
except OSError:
pass
return False
def _get_gateway_info(name: str) -> str:
"""Get additional info about a gateway."""
pid = _get_gateway_pid(name)
if pid:
try:
result = subprocess.run(
["ps", "-p", str(pid), "-o", "etime=", "-o", "args="],
capture_output=True, text=True
)
if result.returncode == 0:
parts = result.stdout.strip().split(None, 1)
if len(parts) >= 2:
elapsed = parts[0]
return f"PID {pid} · running {elapsed}"
except Exception:
pass
return f"PID {pid}"
return ""
def _detect_telegram_gateway() -> dict:
"""Detect if Telegram gateway is configured and running."""
hermes_home = _get_hermes_home()
gateway_running = False
info = ""
# Check if there's a telegram gateway config
config_paths = [
hermes_home / "gateways" / "telegram",
hermes_home / "gateway" / "telegram",
hermes_home / ".env",
]
has_config = False
for p in config_paths:
if p.exists():
has_config = True
break
if has_config:
gateway_running = _is_gateway_running("telegram")
if gateway_running:
info = _get_gateway_info("telegram")
return {
"name": "telegram",
"type": "telegram",
"running": gateway_running,
"info": info,
"has_config": has_config,
}
def _detect_openclaw_gateway() -> dict:
"""Detect if OpenClaw gateway is configured and running."""
hermes_home = _get_hermes_home()
gateway_running = False
info = ""
config_paths = [
hermes_home / "gateways" / "openclaw",
hermes_home / "gateway" / "openclaw",
]
has_config = False
for p in config_paths:
if p.exists():
has_config = True
break
if has_config:
gateway_running = _is_gateway_running("openclaw")
if gateway_running:
info = _get_gateway_info("openclaw")
return {
"name": "openclaw",
"type": "openclaw",
"running": gateway_running,
"info": info,
"has_config": has_config,
}
def _discover_gateways() -> list[dict]:
"""Discover all available and configured gateways."""
gateways = []
# Always show telegram if detected
telegram = _detect_telegram_gateway()
gateways.append(telegram)
# Check for openclaw
openclaw = _detect_openclaw_gateway()
gateways.append(openclaw)
# Add any manually registered gateways
with _gateways_lock:
for name, info in _gateways.items():
if not any(g["name"] == name for g in gateways):
running = _is_gateway_running(name)
gw_info = _get_gateway_info(name) if running else ""
gateways.append({
"name": name,
"type": info.get("type", "unknown"),
"running": running,
"info": gw_info,
"has_config": True,
})
return gateways
def list_gateways_api() -> list[dict]:
"""List all gateways with their status."""
return _discover_gateways()
def start_gateway_api(name: str) -> dict:
"""Start a gateway by name."""
# Check if already running
if _is_gateway_running(name):
raise RuntimeError(f"Gateway '{name}' is already running")
hermes_home = _get_hermes_home()
# Determine the gateway type and command
if name == "telegram":
cmd = ["hermes", "gateway", "run", "--type", "telegram"]
elif name == "openclaw":
cmd = ["hermes", "gateway", "run", "--type", "openclaw"]
else:
cmd = ["hermes", "gateway", "run", "--name", name]
# Start the gateway process
try:
proc = subprocess.Popen(
cmd,
cwd=str(hermes_home),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
_gateway_pids[name] = proc.pid
# Give it a moment to start
time.sleep(1)
if proc.poll() is not None:
# Process already terminated
stdout, stderr = proc.communicate()
raise RuntimeError(f"Gateway failed to start: {stderr.decode()[:200]}")
return {"ok": True, "message": f"Gateway '{name}' started", "pid": proc.pid}
except FileNotFoundError:
raise RuntimeError("hermes CLI not found in PATH. Is Hermes installed?")
except Exception as e:
raise RuntimeError(f"Failed to start gateway: {e}")
def stop_gateway_api(name: str) -> dict:
"""Stop a gateway by name."""
pid = _get_gateway_pid(name)
if not pid:
raise RuntimeError(f"Gateway '{name}' is not running")
try:
os.kill(pid, 9) # SIGKILL
time.sleep(0.5)
if name in _gateway_pids:
del _gateway_pids[name]
return {"ok": True, "message": f"Gateway '{name}' stopped"}
except OSError as e:
raise RuntimeError(f"Failed to stop gateway: {e}")
def restart_gateway_api(name: str) -> dict:
"""Restart a gateway by name."""
# Check if running first
if not _is_gateway_running(name):
raise RuntimeError(f"Gateway '{name}' is not running")
# Stop it
stop_gateway_api(name)
time.sleep(1)
# Start it again
return start_gateway_api(name)
def add_gateway_api(name: str, gw_type: str = "telegram") -> dict:
"""Register a new gateway."""
# Validate name
if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9_-]{0,63}$", name):
raise ValueError("Invalid gateway name")
# Check if already exists
for g in _discover_gateways():
if g["name"] == name:
raise FileExistsError(f"Gateway '{name}' already exists")
with _gateways_lock:
_gateways[name] = {"type": gw_type, "registered_at": time.time()}
return {"ok": True, "message": f"Gateway '{name}' added as {gw_type}"}