266 lines
7.4 KiB
Python
266 lines
7.4 KiB
Python
"""
|
|
Gateway management API for Hermes WebUI.
|
|
|
|
Provides endpoints to list, start, stop, restart, and add gateway connections
|
|
like Telegram, OpenClaw, and other Hermes gateway types.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Registry of manually added gateways, keyed by gateway name. Each value is
# that gateway's info dict.
_gateways: dict[str, dict] = {}
# Held around reads and writes of _gateways.
_gateways_lock = threading.Lock()

# PIDs of gateway processes, keyed by gateway name.
_gateway_pids: dict[str, int] = {}
|
|
|
|
|
|
def _get_hermes_home() -> Path:
    """Return the Hermes home directory.

    The ``HERMES_HOME`` environment variable is used when it is set, non-empty,
    and does not point at a directory directly inside a ``profiles`` folder.
    In every other case the default ``~/.hermes`` is returned.

    Returns:
        The directory to treat as the Hermes home.
    """
    hermes_home = os.environ.get("HERMES_HOME", "")
    # Compare the parent's *name*: comparing the whole parent path to
    # "profiles" only matched the relative path "profiles/<x>", so absolute
    # profile directories such as ~/.hermes/profiles/dev were never detected.
    if hermes_home and Path(hermes_home).parent.name != "profiles":
        return Path(hermes_home)
    return Path.home() / ".hermes"
|
|
|
|
|
|
def _get_gateway_pid(name: str) -> Optional[int]:
    """Look up the PID of the gateway process called *name*.

    ``pgrep -f`` is asked for processes whose command line matches
    ``hermes.*gateway.*<name>``; the first PID it prints wins. When ``pgrep``
    finds nothing, fails, or cannot be run, the PID recorded in
    ``_gateway_pids`` (if any) is returned instead.

    Args:
        name: Gateway name to look for.

    Returns:
        The PID, or ``None`` when no process is found and none is recorded.
    """
    # The name is matched literally: without escaping, regex metacharacters in
    # it (for example ".*") would make the pattern match unrelated gateways.
    pattern = f"hermes.*gateway.*{re.escape(name)}"
    try:
        result = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True, text=True
        )
        if result.returncode == 0:
            pids = result.stdout.split()
            if pids:
                return int(pids[0])
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: pgrep missing, unusable arguments, or unparsable
        # output all fall back to the recorded PID below.
        pass
    return _gateway_pids.get(name)
|
|
|
|
|
|
def _is_gateway_running(name: str) -> bool:
    """Report whether the gateway called *name* has a live process.

    The PID comes from ``_get_gateway_pid`` and is probed with signal 0, which
    checks for existence without delivering a signal.

    Args:
        name: Gateway name to check.

    Returns:
        ``True`` if a PID was found and the process exists, else ``False``.
    """
    pid = _get_gateway_pid(name)
    if not pid:
        return False
    try:
        os.kill(pid, 0)
    except PermissionError:
        # The process exists but belongs to another user; it is still running.
        return True
    except OSError:
        return False
    return True
|
|
|
|
|
|
def _get_gateway_info(name: str) -> str:
    """Build a short status string for the gateway called *name*.

    Returns ``""`` when no PID is known. Otherwise ``ps`` is asked for the
    process's elapsed time and arguments; when it reports both, the result is
    ``"PID <pid> · running <elapsed>"``. If ``ps`` fails, raises, or prints
    fewer than two fields, the result is just ``"PID <pid>"``.
    """
    gateway_pid = _get_gateway_pid(name)
    if not gateway_pid:
        return ""

    try:
        ps = subprocess.run(
            ["ps", "-p", str(gateway_pid), "-o", "etime=", "-o", "args="],
            capture_output=True, text=True
        )
        # First field is the elapsed time, the remainder is the command line.
        fields = ps.stdout.strip().split(None, 1) if ps.returncode == 0 else []
        if len(fields) >= 2:
            return f"PID {gateway_pid} · running {fields[0]}"
    except Exception:
        pass
    return f"PID {gateway_pid}"
|
|
|
|
|
|
def _detect_telegram_gateway() -> dict:
    """Describe the Telegram gateway's configuration and run state.

    The gateway counts as configured when any of ``gateways/telegram``,
    ``gateway/telegram`` or ``.env`` exists under the Hermes home. The process
    is only probed when it is configured, and the info string is only built
    when it is running.

    Returns:
        A dict with the keys ``name``, ``type``, ``running``, ``info`` and
        ``has_config``.
    """
    home = _get_hermes_home()
    candidates = (
        home / "gateways" / "telegram",
        home / "gateway" / "telegram",
        home / ".env",
    )

    configured = any(path.exists() for path in candidates)
    is_running = configured and _is_gateway_running("telegram")
    details = _get_gateway_info("telegram") if is_running else ""

    return {
        "name": "telegram",
        "type": "telegram",
        "running": is_running,
        "info": details,
        "has_config": configured,
    }
|
|
|
|
|
|
def _detect_openclaw_gateway() -> dict:
    """Describe the OpenClaw gateway's configuration and run state.

    The gateway counts as configured when ``gateways/openclaw`` or
    ``gateway/openclaw`` exists under the Hermes home. The process is only
    probed when it is configured, and the info string is only built when it is
    running.

    Returns:
        A dict with the keys ``name``, ``type``, ``running``, ``info`` and
        ``has_config``.
    """
    home = _get_hermes_home()
    candidates = (
        home / "gateways" / "openclaw",
        home / "gateway" / "openclaw",
    )

    configured = any(path.exists() for path in candidates)
    is_running = configured and _is_gateway_running("openclaw")
    details = _get_gateway_info("openclaw") if is_running else ""

    return {
        "name": "openclaw",
        "type": "openclaw",
        "running": is_running,
        "info": details,
        "has_config": configured,
    }
|
|
|
|
|
|
def _discover_gateways() -> list[dict]:
    """Collect status entries for every known gateway.

    The Telegram and OpenClaw entries always come first, in that order, whether
    or not they are configured. Manually registered gateways follow, skipping
    any whose name is already listed.

    Returns:
        A list of dicts, each with the keys ``name``, ``type``, ``running``,
        ``info`` and ``has_config``.
    """
    gateways = [_detect_telegram_gateway(), _detect_openclaw_gateway()]
    listed = {g["name"] for g in gateways}

    # Snapshot the registry and release the lock before probing processes:
    # each probe spawns pgrep/ps, and holding the lock across those calls
    # would stall every other thread that touches the registry.
    with _gateways_lock:
        registered = [
            (name, info.get("type", "unknown"))
            for name, info in _gateways.items()
        ]

    for name, gw_type in registered:
        if name in listed:
            continue
        running = _is_gateway_running(name)
        gateways.append({
            "name": name,
            "type": gw_type,
            "running": running,
            "info": _get_gateway_info(name) if running else "",
            "has_config": True,
        })

    return gateways
|
|
|
|
|
|
def list_gateways_api() -> list[dict]:
    """Return the status entries of all gateways found by discovery."""
    gateways = _discover_gateways()
    return gateways
|
|
|
|
|
|
def _drain_gateway_stderr(stream, head: bytearray, limit: int = 4096) -> None:
    """Read *stream* to EOF, keeping only its first *limit* bytes in *head*."""
    try:
        while True:
            chunk = stream.read1(4096)
            if not chunk:
                break
            room = limit - len(head)
            if room > 0:
                head.extend(chunk[:room])
    except (OSError, ValueError):
        # The pipe was closed underneath us; there is nothing left to drain.
        pass
    finally:
        stream.close()


def start_gateway_api(name: str) -> dict:
    """Start the gateway called *name* as a detached ``hermes`` process.

    ``telegram`` and ``openclaw`` are launched with ``--type <name>``; any
    other name is launched with ``--name <name>``. The process runs in its own
    session with the Hermes home as working directory, and its PID is recorded
    in ``_gateway_pids``. The call waits up to one second to catch a gateway
    that exits straight away.

    Args:
        name: Gateway name to start.

    Returns:
        ``{"ok": True, "message": ..., "pid": <pid>}`` once the process has
        survived the startup wait.

    Raises:
        RuntimeError: If the gateway is already running, the ``hermes``
            executable cannot be found, the process cannot be spawned, or it
            exits during the startup wait (the message then carries the first
            200 characters of its stderr).
    """
    if _is_gateway_running(name):
        raise RuntimeError(f"Gateway '{name}' is already running")

    hermes_home = _get_hermes_home()

    if name in ("telegram", "openclaw"):
        cmd = ["hermes", "gateway", "run", "--type", name]
    else:
        cmd = ["hermes", "gateway", "run", "--name", name]

    try:
        proc = subprocess.Popen(
            cmd,
            cwd=str(hermes_home),
            # stdout was never read; an unread pipe eventually fills up and
            # blocks the gateway on its next write, so discard it.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            start_new_session=True
        )
    except FileNotFoundError as e:
        raise RuntimeError("hermes CLI not found in PATH. Is Hermes installed?") from e
    except Exception as e:
        raise RuntimeError(f"Failed to start gateway: {e}") from e

    _gateway_pids[name] = proc.pid

    # Keep stderr drained for the lifetime of the process so it can never
    # block on a full pipe, while retaining the start of the output for the
    # startup error message.
    stderr_head = bytearray()
    drainer = threading.Thread(
        target=_drain_gateway_stderr,
        args=(proc.stderr, stderr_head),
        name=f"gateway-stderr-{name}",
        daemon=True,
    )
    drainer.start()

    try:
        # Returns as soon as the process exits, so a failed start is reported
        # without sitting out the full second.
        proc.wait(timeout=1)
    except subprocess.TimeoutExpired:
        return {"ok": True, "message": f"Gateway '{name}' started", "pid": proc.pid}

    # The process is already gone: forget its PID so later lookups do not
    # fall back to a dead (and possibly reused) process ID.
    _gateway_pids.pop(name, None)
    drainer.join(timeout=1)
    detail = bytes(stderr_head).decode(errors="replace")[:200]
    raise RuntimeError(f"Gateway failed to start: {detail}")
|
|
|
|
|
|
def _gateway_process_gone(pid: int) -> bool:
    """Return True once *pid* no longer refers to a live process."""
    try:
        # Reap the process if it is our own child; an unreaped child lingers
        # as a zombie and would still answer the signal-0 probe below.
        if os.waitpid(pid, os.WNOHANG)[0] == pid:
            return True
    except ChildProcessError:
        # Not our child, or already reaped: fall through to the probe.
        pass
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return True
    except PermissionError:
        # Exists, but owned by another user.
        return False
    return False


def stop_gateway_api(name: str, grace_period: float = 5.0) -> dict:
    """Stop the gateway called *name*.

    The process is first sent SIGTERM so it can shut down cleanly. If it is
    still alive after *grace_period* seconds it is sent SIGKILL. The PID
    recorded in ``_gateway_pids`` is removed afterwards.

    Args:
        name: Gateway name to stop.
        grace_period: Seconds to wait after SIGTERM before sending SIGKILL.

    Returns:
        ``{"ok": True, "message": ...}`` once the stop has been carried out.

    Raises:
        RuntimeError: If no process is found for the gateway, or if a signal
            cannot be delivered to it.
    """
    # Local import: signal is not among this module's top-level imports.
    import signal

    pid = _get_gateway_pid(name)
    if not pid:
        raise RuntimeError(f"Gateway '{name}' is not running")

    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # Only a stale recorded PID was left; drop it and report the truth.
        _gateway_pids.pop(name, None)
        raise RuntimeError(f"Gateway '{name}' is not running") from None
    except OSError as e:
        raise RuntimeError(f"Failed to stop gateway: {e}") from e

    deadline = time.monotonic() + max(grace_period, 0.0)
    while not _gateway_process_gone(pid):
        if time.monotonic() >= deadline:
            try:
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                pass  # Exited between the liveness check and the kill.
            except OSError as e:
                raise RuntimeError(f"Failed to stop gateway: {e}") from e
            time.sleep(0.5)
            _gateway_process_gone(pid)  # Reap it if it was our child.
            break
        time.sleep(0.1)

    _gateway_pids.pop(name, None)
    return {"ok": True, "message": f"Gateway '{name}' stopped"}
|
|
|
|
|
|
def restart_gateway_api(name: str) -> dict:
    """Stop and then start the gateway called *name*.

    Only a gateway that is currently running can be restarted. A one-second
    pause separates the stop from the start.

    Returns:
        Whatever ``start_gateway_api`` returns for the new process.

    Raises:
        RuntimeError: If the gateway is not running, or if stopping or
            starting it fails.
    """
    if _is_gateway_running(name):
        stop_gateway_api(name)
        time.sleep(1)
        return start_gateway_api(name)

    raise RuntimeError(f"Gateway '{name}' is not running")
|
|
|
|
|
|
def add_gateway_api(name: str, gw_type: str = "telegram") -> dict:
    """Register a new gateway in the in-memory registry.

    Args:
        name: Gateway name: 1-64 characters, starting with a letter or digit,
            followed by letters, digits, ``_`` or ``-``.
        gw_type: Gateway type stored with the registration.

    Returns:
        ``{"ok": True, "message": ...}`` on success.

    Raises:
        ValueError: If *name* is not a valid gateway name.
        FileExistsError: If a gateway with that name already exists.
    """
    # fullmatch, not match with "$": "$" also matches just before a trailing
    # newline, which let names such as "abc\n" through.
    if not isinstance(name, str) or not re.fullmatch(
        r"[a-zA-Z0-9][a-zA-Z0-9_-]{0,63}", name
    ):
        raise ValueError("Invalid gateway name")

    if any(g["name"] == name for g in _discover_gateways()):
        raise FileExistsError(f"Gateway '{name}' already exists")

    with _gateways_lock:
        # Re-check under the lock: another thread may have registered the same
        # name since discovery ran, and it must not be silently overwritten.
        if name in _gateways:
            raise FileExistsError(f"Gateway '{name}' already exists")
        _gateways[name] = {"type": gw_type, "registered_at": time.time()}

    return {"ok": True, "message": f"Gateway '{name}' added as {gw_type}"}
|