290 lines
10 KiB
Python
290 lines
10 KiB
Python
"""
|
|
Heartbeat System API for WebUI.
|
|
Provides endpoints to manage heartbeats and monitor the manager/watchdog.
|
|
"""
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# All heartbeat state, scripts and logs live under ~/.hermes.
_HERMES_HOME = Path.home().joinpath(".hermes")
_SCRIPTS_DIR = _HERMES_HOME.joinpath("scripts")
_LOGS_DIR = _HERMES_HOME.joinpath("logs")

# Heartbeat state directory and the JSON registry stored inside it.
HEARTBEAT_DIR = _HERMES_HOME.joinpath("heartbeat")
REGISTRY_FILE = HEARTBEAT_DIR.joinpath("registry.json")

# Scripts launched as subprocesses by this module.
MANAGER_SCRIPT = _SCRIPTS_DIR.joinpath("heartbeat_manager.py")
HB_API = _SCRIPTS_DIR.joinpath("heartbeat_api.py")

# Log files whose tails are exposed through the API.
WATCHDOG_LOG = _LOGS_DIR.joinpath("heartbeat_watchdog.log")
MANAGER_LOG = _LOGS_DIR.joinpath("heartbeat_manager.log")
|
|
|
|
def _run_api(args: list) -> dict:
    """Run heartbeat_api.py with the given CLI arguments and return a result dict.

    Args:
        args: Command-line arguments appended after the script path.

    Returns:
        On exit code 0: the first JSON object found in stdout (the whole
        output is tried first, then each line), or
        ``{"ok": True, "output": <stdout>}`` when stdout holds no JSON object.
        Otherwise ``{"error": <message>}`` built from stderr, the exit code,
        a timeout, or the exception raised while launching the process.
        This function never raises.
    """
    try:
        result = subprocess.run(
            [sys.executable, str(HB_API), *args],
            capture_output=True,
            text=True,
            timeout=30,
            cwd=str(Path.home() / ".hermes"),
        )
    except subprocess.TimeoutExpired:
        return {"error": "Command timed out"}
    except Exception as e:
        # Boundary handler: launch failures (missing cwd, bad argument
        # types, ...) are reported to the caller instead of raised.
        return {"error": str(e)}

    if result.returncode != 0:
        stderr = result.stderr.strip()
        return {"error": stderr or f"Exit code {result.returncode}"}

    stdout = result.stdout.strip()

    # Try the whole output first so pretty-printed (multi-line) JSON is
    # understood, then fall back to scanning line by line for a JSON object
    # mixed in with plain-text log output.
    candidates = [stdout]
    candidates.extend(line.strip() for line in stdout.splitlines())
    for candidate in candidates:
        if not candidate.startswith("{"):
            continue
        try:
            parsed = json.loads(candidate)
        except (ValueError, RecursionError):
            # Looks like JSON but is not; a stray "{...}" log line must not
            # turn a successful command into an error.
            continue
        if isinstance(parsed, dict):
            return parsed

    # Plain text output = success
    return {"ok": True, "output": stdout}
|
|
|
|
|
|
def _load_registry() -> dict:
    """Load the heartbeat registry from REGISTRY_FILE.

    Returns:
        The parsed registry dict, always with a list under ``"heartbeats"``.
        A missing, unreadable or malformed file, or one whose top level is
        not a JSON object, yields ``{"heartbeats": []}`` so callers can
        iterate without further checks.
    """
    try:
        data = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {"heartbeats": []}

    if not isinstance(data, dict):
        return {"heartbeats": []}
    if not isinstance(data.get("heartbeats"), list):
        # Callers iterate this value; anything but a list would crash them.
        data["heartbeats"] = []
    return data
|
|
|
|
|
|
def _manager_pid() -> str | None:
    """Return the PID of a running heartbeat manager, or None.

    Uses ``pgrep -f heartbeat_manager.py`` and returns the first PID it
    prints, as a string. Returns None when nothing matches, when ``pgrep``
    is not available on this system, or when it does not answer in time.

    NOTE: ``pgrep -f`` matches the full command line, so any process whose
    arguments contain "heartbeat_manager.py" is reported, not only the daemon.
    """
    try:
        result = subprocess.run(
            ["pgrep", "-f", "heartbeat_manager.py"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        # pgrep missing or hung: report "not running" rather than crash.
        return None

    pids = result.stdout.split()
    if result.returncode == 0 and pids:
        return pids[0]
    return None
|
|
|
|
|
|
def _log_tail(log_path: Path, lines: int) -> str:
    """Return the last ``lines`` lines of ``log_path`` joined with newlines.

    Returns an empty string when ``lines`` is not positive or the file is
    missing or unreadable. Undecodable bytes are replaced rather than
    discarding the whole tail.
    """
    if lines <= 0:
        # Guard needed because seq[-0:] is the whole sequence, not an empty one.
        return ""
    try:
        text = log_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
    return "\n".join(text.splitlines()[-lines:])


def _manager_log_tail(lines: int = 20) -> str:
    """Return the last ``lines`` lines of the manager log ("" if unavailable)."""
    return _log_tail(MANAGER_LOG, lines)


def _watchdog_log_tail(lines: int = 10) -> str:
    """Return the last ``lines`` lines of the watchdog log ("" if unavailable)."""
    return _log_tail(WATCHDOG_LOG, lines)
|
|
|
|
|
|
# ── Public API ──────────────────────────────────────────────────────────────
|
|
|
|
def handle_get(path: str) -> dict:
|
|
"""Handle GET /api/heartbeats/* routes."""
|
|
if path == "/api/heartbeats":
|
|
# List all heartbeats with status summary + manager info
|
|
registry = _load_registry()
|
|
heartbeats = registry.get("heartbeats", [])
|
|
by_status = {}
|
|
by_priority = {}
|
|
by_source = {}
|
|
pending_due = 0
|
|
now = datetime.now().isoformat()
|
|
|
|
for hb in heartbeats:
|
|
s = hb.get("status", "unknown")
|
|
by_status[s] = by_status.get(s, 0) + 1
|
|
p = hb.get("priority", "normal")
|
|
by_priority[p] = by_priority.get(p, 0) + 1
|
|
src = hb.get("source", "unknown")
|
|
by_source[src] = by_source.get(src, 0) + 1
|
|
if s == "pending" and hb.get("trigger_at", "") <= now:
|
|
pending_due += 1
|
|
|
|
# Manager info
|
|
pid = _manager_pid()
|
|
|
|
return {
|
|
"heartbeats": heartbeats,
|
|
"total": len(heartbeats),
|
|
"pending_due_count": pending_due,
|
|
"by_status": by_status,
|
|
"by_priority": by_priority,
|
|
"by_source": by_source,
|
|
"_manager": {
|
|
"running": pid is not None,
|
|
"pid": pid,
|
|
}
|
|
}
|
|
|
|
if path == "/api/heartbeats/manager":
|
|
pid = _manager_pid()
|
|
return {
|
|
"running": pid is not None,
|
|
"pid": pid,
|
|
"log_tail": _manager_log_tail(15),
|
|
}
|
|
|
|
if path == "/api/heartbeats/watchdog":
|
|
return {
|
|
"log_tail": _watchdog_log_tail(10),
|
|
}
|
|
|
|
if path == "/api/heartbeats/stats":
|
|
# Compute firing stats from log files
|
|
import glob, re
|
|
log_dir = HEARTBEAT_DIR / "logs"
|
|
fired_24h = 0
|
|
fired_total = 0
|
|
now = datetime.now()
|
|
day_ago = datetime.fromtimestamp(now.timestamp() - 86400)
|
|
|
|
for log_file in glob.glob(str(log_dir / "heartbeat_*.log")):
|
|
try:
|
|
for line in Path(log_file).read_text().splitlines():
|
|
if "processed heartbeat" in line or "fired" in line.lower():
|
|
# Parse timestamp from log line: [2026-04-28 08:17:54]
|
|
m = re.match(r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]", line)
|
|
if m:
|
|
fired_total += 1
|
|
try:
|
|
dt = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
|
|
if dt >= day_ago:
|
|
fired_24h += 1
|
|
except: pass
|
|
except: pass
|
|
|
|
# Next scheduled heartbeat
|
|
registry = _load_registry()
|
|
next_hb = None
|
|
for hb in registry.get("heartbeats", []):
|
|
if hb.get("status") == "pending":
|
|
ta = hb.get("trigger_at", "")
|
|
if ta and (next_hb is None or ta < next_hb):
|
|
next_hb = ta
|
|
|
|
# Load heartbeat.json config
|
|
config_file = Path.home() / ".hermes" / "config" / "heartbeat.json"
|
|
config = {}
|
|
if config_file.exists():
|
|
try:
|
|
with config_file.open(encoding="utf-8") as f:
|
|
config = json.loads(f.read())
|
|
except: pass
|
|
|
|
return {
|
|
"fired_total": fired_total,
|
|
"fired_24h": fired_24h,
|
|
"next_scheduled": next_hb,
|
|
"config": config,
|
|
}
|
|
|
|
# GET /api/heartbeats/{id}
|
|
if path.startswith("/api/heartbeats/"):
|
|
hb_id = path.split("/")[-1]
|
|
if hb_id in ("manager", "watchdog"):
|
|
return {"error": "Not found"}, 404
|
|
registry = _load_registry()
|
|
for hb in registry.get("heartbeats", []):
|
|
if hb.get("id") == hb_id:
|
|
return hb
|
|
return {"error": f"Heartbeat {hb_id} not found"}, 404
|
|
|
|
return None # Not handled
|
|
|
|
|
|
def handle_post(path: str, body: dict) -> dict:
|
|
"""Handle POST /api/heartbeats/* routes."""
|
|
if path == "/api/heartbeats":
|
|
# Create heartbeat
|
|
source = body.get("source", "webui")
|
|
action = body.get("action", "rose_continue")
|
|
instruction = body.get("instruction", "")
|
|
minutes = int(body.get("minutes", 5))
|
|
priority = body.get("priority")
|
|
mode = body.get("mode", "silent")
|
|
recurring = bool(body.get("recurring", False))
|
|
interval_minutes = int(body.get("interval_minutes", minutes)) if recurring else None
|
|
max_iterations = int(body["max_iterations"]) if body.get("max_iterations") else None
|
|
|
|
args = [
|
|
"create",
|
|
"--source", source,
|
|
"--action", action,
|
|
"--instruction", instruction,
|
|
"--minutes", str(minutes),
|
|
"--mode", mode,
|
|
]
|
|
if priority:
|
|
args += ["--priority", priority]
|
|
if recurring:
|
|
args.append("--recurring")
|
|
if interval_minutes:
|
|
args += ["--interval-minutes", str(interval_minutes)]
|
|
if max_iterations:
|
|
args += ["--max-iterations", str(max_iterations)]
|
|
|
|
result = _run_api(args)
|
|
return result
|
|
|
|
if path == "/api/heartbeats/manager/restart":
|
|
pid = _manager_pid()
|
|
if pid:
|
|
subprocess.run(["kill", pid], capture_output=True)
|
|
subprocess.Popen(
|
|
[sys.executable, str(MANAGER_SCRIPT), "--daemon"],
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
|
start_new_session=True, cwd=str(Path.home() / ".hermes")
|
|
)
|
|
return {"ok": True, "message": "Manager restart initiated"}
|
|
|
|
if path.startswith("/api/heartbeats/") and path.endswith("/cancel"):
|
|
hb_id = path.split("/")[-2]
|
|
result = _run_api(["cancel", "--id", hb_id])
|
|
return result
|
|
|
|
if path.startswith("/api/heartbeats/") and path.endswith("/fire"):
|
|
# Manual fire (for testing)
|
|
hb_id = path.split("/")[-2]
|
|
# Simulate fire by updating trigger_at to now
|
|
registry = _load_registry()
|
|
for hb in registry.get("heartbeats", []):
|
|
if hb.get("id") == hb_id:
|
|
hb["trigger_at"] = datetime.now().isoformat()
|
|
REGISTRY_FILE.write_text(json.dumps(registry, indent=2))
|
|
return {"ok": True, "message": f"Heartbeat {hb_id} fire time set to now"}
|
|
return {"error": f"Heartbeat {hb_id} not found"}, 404
|
|
|
|
if path == "/api/heartbeats/config":
|
|
# Update heartbeat config (quiet hours, intervals, telegram)
|
|
config_file = Path.home() / ".hermes" / "config" / "heartbeat.json"
|
|
config = {}
|
|
if config_file.exists():
|
|
try:
|
|
with config_file.open(encoding="utf-8") as f:
|
|
config = json.loads(f.read())
|
|
except: pass
|
|
for key in ("quiet_hours", "daemon_interval_seconds", "intervals", "telegram", "critical_override"):
|
|
if key in body:
|
|
config[key] = body[key]
|
|
config_file.write_text(json.dumps(config, indent=2, ensure_ascii=False))
|
|
return {"ok": True, "config": config}
|
|
|
|
return None # Not handled
|
|
|
|
|
|
def handle_delete(path: str) -> dict:
|
|
"""Handle DELETE /api/heartbeats/{id}."""
|
|
if path.startswith("/api/heartbeats/"):
|
|
hb_id = path.split("/")[-1]
|
|
if hb_id in ("manager", "watchdog"):
|
|
return {"error": "Cannot delete system endpoint"}, 400
|
|
result = _run_api(["cancel", "--id", hb_id])
|
|
return result
|
|
return None
|