""" Heartbeat System API for WebUI. Provides endpoints to manage heartbeats and monitor the manager/watchdog. """ import json import subprocess import sys from datetime import datetime from pathlib import Path from typing import Any HEARTBEAT_DIR = Path.home() / ".hermes" / "heartbeat" REGISTRY_FILE = HEARTBEAT_DIR / "registry.json" MANAGER_SCRIPT = Path.home() / ".hermes" / "scripts" / "heartbeat_manager.py" WATCHDOG_LOG = Path.home() / ".hermes" / "logs" / "heartbeat_watchdog.log" MANAGER_LOG = Path.home() / ".hermes" / "logs" / "heartbeat_manager.log" HB_API = Path.home() / ".hermes" / "scripts" / "heartbeat_api.py" def _run_api(args: list) -> dict: """Run heartbeat_api.py with given args, return parsed JSON.""" try: result = subprocess.run( [sys.executable, str(HB_API)] + args, capture_output=True, text=True, timeout=30, cwd=str(Path.home() / ".hermes") ) if result.returncode == 0: stdout = result.stdout.strip() # Try to parse JSON from stdout for line in stdout.splitlines(): line = line.strip() if line.startswith("{"): return json.loads(line) # Plain text output = success return {"ok": True, "output": stdout} # Error case stderr = result.stderr.strip() if stderr: return {"error": stderr} return {"error": f"Exit code {result.returncode}"} except subprocess.TimeoutExpired: return {"error": "Command timed out"} except Exception as e: return {"error": str(e)} def _load_registry() -> dict: try: with REGISTRY_FILE.open(encoding="utf-8") as f: return json.loads(f.read()) except Exception: return {"heartbeats": []} def _manager_pid() -> str | None: result = subprocess.run( ["pgrep", "-f", "heartbeat_manager.py"], capture_output=True, text=True ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip().split()[0] return None def _manager_log_tail(lines: int = 20) -> str: try: if MANAGER_LOG.exists(): all_lines = MANAGER_LOG.read_text().splitlines() return "\n".join(all_lines[-lines:]) except Exception: pass return "" def _watchdog_log_tail(lines: int = 10) -> str: try: if WATCHDOG_LOG.exists(): all_lines = WATCHDOG_LOG.read_text().splitlines() return "\n".join(all_lines[-lines:]) except Exception: pass return "" # ── Public API ────────────────────────────────────────────────────────────── def handle_get(path: str) -> dict: """Handle GET /api/heartbeats/* routes.""" if path == "/api/heartbeats": # List all heartbeats with status summary + manager info registry = _load_registry() heartbeats = registry.get("heartbeats", []) by_status = {} by_priority = {} by_source = {} pending_due = 0 now = datetime.now().isoformat() for hb in heartbeats: s = hb.get("status", "unknown") by_status[s] = by_status.get(s, 0) + 1 p = hb.get("priority", "normal") by_priority[p] = by_priority.get(p, 0) + 1 src = hb.get("source", "unknown") by_source[src] = by_source.get(src, 0) + 1 if s == "pending" and hb.get("trigger_at", "") <= now: pending_due += 1 # Manager info pid = _manager_pid() return { "heartbeats": heartbeats, "total": len(heartbeats), "pending_due_count": pending_due, "by_status": by_status, "by_priority": by_priority, "by_source": by_source, "_manager": { "running": pid is not None, "pid": pid, } } if path == "/api/heartbeats/manager": pid = _manager_pid() return { "running": pid is not None, "pid": pid, "log_tail": _manager_log_tail(15), } if path == "/api/heartbeats/watchdog": return { "log_tail": _watchdog_log_tail(10), } if path == "/api/heartbeats/stats": # Compute firing stats from log files import glob, re log_dir = HEARTBEAT_DIR / "logs" fired_24h = 0 fired_total = 0 now = datetime.now() day_ago = datetime.fromtimestamp(now.timestamp() - 86400) for log_file in glob.glob(str(log_dir / "heartbeat_*.log")): try: for line in Path(log_file).read_text().splitlines(): if "processed heartbeat" in line or "fired" in line.lower(): # Parse timestamp from log line: [2026-04-28 08:17:54] m = re.match(r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]", line) if m: fired_total += 1 try: dt = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S") if dt >= day_ago: fired_24h += 1 except: pass except: pass # Next scheduled heartbeat registry = _load_registry() next_hb = None for hb in registry.get("heartbeats", []): if hb.get("status") == "pending": ta = hb.get("trigger_at", "") if ta and (next_hb is None or ta < next_hb): next_hb = ta # Load heartbeat.json config config_file = Path.home() / ".hermes" / "config" / "heartbeat.json" config = {} if config_file.exists(): try: with config_file.open(encoding="utf-8") as f: config = json.loads(f.read()) except: pass return { "fired_total": fired_total, "fired_24h": fired_24h, "next_scheduled": next_hb, "config": config, } # GET /api/heartbeats/{id} if path.startswith("/api/heartbeats/"): hb_id = path.split("/")[-1] if hb_id in ("manager", "watchdog"): return {"error": "Not found"}, 404 registry = _load_registry() for hb in registry.get("heartbeats", []): if hb.get("id") == hb_id: return hb return {"error": f"Heartbeat {hb_id} not found"}, 404 return None # Not handled def handle_post(path: str, body: dict) -> dict: """Handle POST /api/heartbeats/* routes.""" if path == "/api/heartbeats": # Create heartbeat source = body.get("source", "webui") action = body.get("action", "rose_continue") instruction = body.get("instruction", "") minutes = int(body.get("minutes", 5)) priority = body.get("priority") mode = body.get("mode", "silent") recurring = bool(body.get("recurring", False)) interval_minutes = int(body.get("interval_minutes", minutes)) if recurring else None max_iterations = int(body["max_iterations"]) if body.get("max_iterations") else None args = [ "create", "--source", source, "--action", action, "--instruction", instruction, "--minutes", str(minutes), "--mode", mode, ] if priority: args += ["--priority", priority] if recurring: args.append("--recurring") if interval_minutes: args += ["--interval-minutes", str(interval_minutes)] if max_iterations: args += ["--max-iterations", str(max_iterations)] result = _run_api(args) return result if path == "/api/heartbeats/manager/restart": pid = _manager_pid() if pid: subprocess.run(["kill", pid], capture_output=True) subprocess.Popen( [sys.executable, str(MANAGER_SCRIPT), "--daemon"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, cwd=str(Path.home() / ".hermes") ) return {"ok": True, "message": "Manager restart initiated"} if path.startswith("/api/heartbeats/") and path.endswith("/cancel"): hb_id = path.split("/")[-2] result = _run_api(["cancel", "--id", hb_id]) return result if path.startswith("/api/heartbeats/") and path.endswith("/fire"): # Manual fire (for testing) hb_id = path.split("/")[-2] # Simulate fire by updating trigger_at to now registry = _load_registry() for hb in registry.get("heartbeats", []): if hb.get("id") == hb_id: hb["trigger_at"] = datetime.now().isoformat() REGISTRY_FILE.write_text(json.dumps(registry, indent=2)) return {"ok": True, "message": f"Heartbeat {hb_id} fire time set to now"} return {"error": f"Heartbeat {hb_id} not found"}, 404 if path == "/api/heartbeats/config": # Update heartbeat config (quiet hours, intervals, telegram) config_file = Path.home() / ".hermes" / "config" / "heartbeat.json" config = {} if config_file.exists(): try: with config_file.open(encoding="utf-8") as f: config = json.loads(f.read()) except: pass for key in ("quiet_hours", "daemon_interval_seconds", "intervals", "telegram", "critical_override"): if key in body: config[key] = body[key] config_file.write_text(json.dumps(config, indent=2, ensure_ascii=False)) return {"ok": True, "config": config} return None # Not handled def handle_delete(path: str) -> dict: """Handle DELETE /api/heartbeats/{id}.""" if path.startswith("/api/heartbeats/"): hb_id = path.split("/")[-1] if hb_id in ("manager", "watchdog"): return {"error": "Cannot delete system endpoint"}, 400 result = _run_api(["cancel", "--id", hb_id]) return result return None