""" Rose Agents Panel API — Data layer for Hermes WebUI Agents extension. Provides Rose + Tier-2 agent status, inbox management, soul/memory editing, and configuration. """ import json import os import subprocess import threading import time from datetime import datetime, timedelta from pathlib import Path from typing import Any from api.helpers import j # ── Paths ────────────────────────────────────────────────────────────────────── _HERMES_DIR = Path.home() / ".hermes" _AGENTS_DIR = _HERMES_DIR / "agents" _INBOX_BUS = _HERMES_DIR / "scripts" / "message_bus.py" # ── Tier-2 Agent Registry ────────────────────────────────────────────────────── TIER2_AGENTS = { "lotus": {"name": "Lotus", "emoji": "🪷", "domain": "Health, Fitness & Recovery", "color": "#e91e63"}, "forget-me-not": {"name": "Forget-me-not", "emoji": "🌼", "domain": "Calendar, Time & Social", "color": "#ff9800"}, "sunflower": {"name": "Sunflower", "emoji": "🌻", "domain": "Finance, Wealth & Subscriptions","color": "#ffeb3b"}, "iris": {"name": "Iris", "emoji": "⚜️", "domain": "Career, Learning & Focus", "color": "#9c27b0"}, "ivy": {"name": "Ivy", "emoji": "🌿", "domain": "Smart Home & Environment", "color": "#4caf50"}, "dandelion": {"name": "Dandelion", "emoji": "🛡️", "domain": "Communication Triage", "color": "#03a9f4"}, "root": {"name": "Root", "emoji": "🌳", "domain": "DevOps, Logs & System Health", "color": "#795548"}, } ROSE_META = { "name": "Rose", "emoji": "🌹", "domain": "Orchestrator & Main Interface", "color": "#f44336", } # ── Helpers ─────────────────────────────────────────────────────────────────── def _get_process_status(agent_name: str) -> dict: """Check if an agent process is running via ps.""" try: result = subprocess.run( ["pgrep", "-f", f"hermes.*--agent\\s+{agent_name}|message_bus.*--agent\\s+{agent_name}"], capture_output=True, text=True ) running = bool(result.stdout.strip()) pid = int(result.stdout.strip().split()[0]) if running else None return {"running": running, "pid": pid} except Exception: return {"running": False, "pid": None} def _get_agent_status(agent_name: str) -> dict: """ Determine agent status: active / idle / offline. - active: process running AND recent activity (< 5 min) - idle: process running BUT no recent activity (5-15 min) - offline: no process """ proc = _get_process_status(agent_name) active_session_path = _AGENTS_DIR / agent_name / "active_session.txt" if not proc["running"]: return {"status": "offline", "last_activity": None, "pid": None} last_activity = None if active_session_path.exists(): try: mtime = active_session_path.stat().st_mtime last_activity = datetime.fromtimestamp(mtime).isoformat() + "Z" age_minutes = (time.time() - mtime) / 60 if age_minutes < 5: status = "active" elif age_minutes < 15: status = "idle" else: status = "offline" except Exception: status = "unknown" else: # Process running but no session file = treat as idle status = "idle" last_activity = None return {"status": status, "last_activity": last_activity, "pid": proc["pid"]} def _get_inbox_count(agent_name: str) -> int: """Count pending (unread) messages in agent inbox.""" try: result = subprocess.run( ["/usr/bin/python3", str(_INBOX_BUS), "check", "--agent", agent_name], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: data = json.loads(result.stdout) return data.get("pending", 0) except Exception: pass return 0 def _read_inbox(agent_name: str, limit: int = 50) -> list[dict]: """Read messages from agent inbox.""" inbox_path = _AGENTS_DIR / agent_name / "inbox.json" if not inbox_path.exists(): return [] try: with open(inbox_path, "r") as f: data = json.load(f) messages = data if isinstance(data, list) else data.get("messages", []) # Reverse so newest first, return limited return list(reversed(messages))[:limit] except (json.JSONDecodeError, IOError): return [] def _read_file_safe(path: Path) -> str | None: """Read a file safely, return None if missing.""" try: if path.exists(): return path.read_text() except Exception: pass return None def _write_file_safe(path: Path, content: str, backup: bool = True) -> dict: """Write a file safely with optional backup. Returns dict with success/error.""" try: if backup and path.exists(): backup_path = path.with_suffix(path.suffix + ".backup") backup_path.write_text(path.read_text()) path.write_text(content) return {"ok": True} except Exception as e: return {"ok": False, "error": str(e)} # ── API Functions ───────────────────────────────────────────────────────────── def list_agents() -> dict: """Return status summary for Rose + all Tier-2 agents.""" agents = [] # Rose (orchestrator — always "running" as it's the gateway itself) rose_status = _get_agent_status("rose") rose_inbox_count = _get_inbox_count("rose") agents.append({ "id": "rose", "name": ROSE_META["name"], "emoji": ROSE_META["emoji"], "domain": ROSE_META["domain"], "color": ROSE_META["color"], "tier": "orchestrator", "status": "active", # Rose is always running "pid": None, "last_activity": rose_status.get("last_activity"), "inbox_count": rose_inbox_count, }) # Tier-2 agents for agent_id, meta in TIER2_AGENTS.items(): status_info = _get_agent_status(agent_id) inbox_count = _get_inbox_count(agent_id) if status_info["status"] != "offline" else 0 agents.append({ "id": agent_id, "name": meta["name"], "emoji": meta["emoji"], "domain": meta["domain"], "color": meta["color"], "tier": "tier2", "status": status_info["status"], "pid": status_info["pid"], "last_activity": status_info.get("last_activity"), "inbox_count": inbox_count, }) return {"agents": agents} def get_agent(agent_id: str) -> dict: """Return full detail for one agent: soul.md, memory.md, inbox, config.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"error": f"Unknown agent: {agent_id}"} meta = TIER2_AGENTS.get(agent_id, ROSE_META) agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR soul = _read_file_safe(agent_dir / "soul.md") memory = _read_file_safe(agent_dir / "memory.md") inbox_messages = _read_inbox(agent_id) status_info = _get_agent_status(agent_id) inbox_count = _get_inbox_count(agent_id) # Default model — extract from soul.md YAML frontmatter if present default_model = None if soul: import re m = re.search(r'model:\s*["\']?([^"\'\n]+)["\']?', soul) if m: default_model = m.group(1).strip() # Disabled flag disabled = (agent_dir / "disabled").exists() if agent_dir.exists() else False return { "id": agent_id, "name": meta["name"], "emoji": meta["emoji"], "domain": meta["domain"], "color": meta["color"], "tier": "orchestrator" if agent_id == "rose" else "tier2", "status": status_info["status"], "last_activity": status_info.get("last_activity"), "pid": status_info["pid"], "inbox_count": inbox_count, "soul": soul or "", "memory": memory or "", "default_model": default_model, "disabled": disabled, "inbox": inbox_messages, } def get_agent_status(agent_id: str) -> dict: """Return only the status info for one agent.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"error": f"Unknown agent: {agent_id}"} if agent_id == "rose": return {"status": "active", "last_activity": None, "pid": None} return _get_agent_status(agent_id) def get_agent_inbox(agent_id: str, limit: int = 50) -> dict: """Return inbox messages for a specific agent.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"error": f"Unknown agent: {agent_id}"} messages = _read_inbox(agent_id, limit) meta = TIER2_AGENTS.get(agent_id, ROSE_META) return { "agent_id": agent_id, "agent_name": meta["name"], "messages": messages, } def update_agent_soul(agent_id: str, content: str) -> dict: """Write soul.md for an agent. Returns {ok, error}.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"ok": False, "error": f"Unknown agent: {agent_id}"} if agent_id == "rose": return {"ok": False, "error": "Rose's soul.md cannot be edited via this API"} soul_path = _AGENTS_DIR / agent_id / "soul.md" # Ensure directory exists (_AGENTS_DIR / agent_id).mkdir(parents=True, exist_ok=True) return _write_file_safe(soul_path, content, backup=True) def update_agent_memory(agent_id: str, content: str) -> dict: """Write memory.md for an agent. Returns {ok, error}.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"ok": False, "error": f"Unknown agent: {agent_id}"} if agent_id == "rose": return {"ok": False, "error": "Rose's memory.md cannot be edited via this API"} memory_path = _AGENTS_DIR / agent_id / "memory.md" (_AGENTS_DIR / agent_id).mkdir(parents=True, exist_ok=True) return _write_file_safe(memory_path, content, backup=True) def send_agent_message(agent_id: str, payload: dict) -> dict: """ Add a message to an agent's inbox (simulates inter-agent message). payload: {from, type, subject, content} """ if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"ok": False, "error": f"Unknown agent: {agent_id}"} inbox_path = _AGENTS_DIR / agent_id / "inbox.json" (_AGENTS_DIR / agent_id).mkdir(parents=True, exist_ok=True) # Load existing inbox if inbox_path.exists(): try: with open(inbox_path, "r") as f: data = json.load(f) messages = data if isinstance(data, list) else data.get("messages", []) except (json.JSONDecodeError, IOError): messages = [] else: messages = [] # Add new message import uuid msg = { "id": uuid.uuid4().hex[:8], "from": payload.get("from", "rose"), "type": payload.get("type", "request"), "subject": payload.get("subject", ""), "content": payload.get("content", ""), "timestamp": datetime.utcnow().isoformat() + "Z", "status": "unread", } messages.append(msg) try: with open(inbox_path, "w") as f: json.dump(messages, f, indent=2) return {"ok": True, "message_id": msg["id"]} except Exception as e: return {"ok": False, "error": str(e)} def ack_agent_message(agent_id: str, msg_id: str) -> dict: """Mark a message as acknowledged/read in an agent's inbox.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"ok": False, "error": f"Unknown agent: {agent_id}"} inbox_path = _AGENTS_DIR / agent_id / "inbox.json" if not inbox_path.exists(): return {"ok": False, "error": "Inbox not found"} try: with open(inbox_path, "r") as f: messages = json.load(f) if not isinstance(messages, list): messages = messages.get("messages", []) found = False for msg in messages: if msg.get("id") == msg_id: msg["status"] = "read" found = True break if not found: return {"ok": False, "error": f"Message {msg_id} not found"} with open(inbox_path, "w") as f: json.dump(messages, f, indent=2) return {"ok": True} except Exception as e: return {"ok": False, "error": str(e)} def set_agent_enabled(agent_id: str, enabled: bool) -> dict: """Enable or disable an agent. Disabled agents won't respond.""" if agent_id not in TIER2_AGENTS: return {"ok": False, "error": f"Unknown agent: {agent_id}"} if agent_id == "rose": return {"ok": False, "error": "Rose cannot be disabled"} disabled_flag = _AGENTS_DIR / agent_id / "disabled" try: if enabled: if disabled_flag.exists(): disabled_flag.unlink() else: disabled_flag.write_text("disabled") return {"ok": True, "disabled": not enabled} except Exception as e: return {"ok": False, "error": str(e)} def get_agent_config(agent_id: str) -> dict: """Return configuration paths/info for a specific agent.""" if agent_id == "rose": return { "id": "rose", "name": "Rose", "soul_path": str(_HERMES_DIR / "rose.md"), "memory_path": str(_HERMES_DIR / "memory.json"), } elif agent_id in TIER2_AGENTS: soul_path = _AGENTS_DIR / agent_id / "soul.md" memory_path = _AGENTS_DIR / agent_id / "memory.md" inbox_path = _AGENTS_DIR / agent_id / "inbox.json" return { "id": agent_id, "name": TIER2_AGENTS[agent_id]["name"], "soul_path": str(soul_path) if soul_path.exists() else None, "memory_path": str(memory_path) if memory_path.exists() else None, "inbox_path": str(inbox_path), } return {"error": f"Unknown agent: {agent_id}"} # ── Activity & Error Log ─────────────────────────────────────────────────────── ACTIVITY_EVENT_TYPES = [ "agent_started", "agent_stopped", "message_sent", "message_received", "task_started", "task_completed", "task_failed", "error", "soul_updated", "memory_updated", "chat_started", "chat_ended", "health_check", "config_updated", ] def _log_agent_activity(agent_id: str, event_type: str, details: str = "") -> dict: """ Write an activity event to the agent's activity.log file. Log format: ISO timestamp | type | details Returns {ok: True} or {ok: False, error: ...} """ if event_type not in ACTIVITY_EVENT_TYPES: return {"ok": False, "error": f"Unknown event type: {event_type}"} if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"ok": False, "error": f"Unknown agent: {agent_id}"} agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR log_path = agent_dir / "activity.log" agent_dir.mkdir(parents=True, exist_ok=True) try: timestamp = datetime.utcnow().isoformat() + "Z" line = f"{timestamp} | {event_type} | {details}\n" with open(log_path, "a") as f: f.write(line) return {"ok": True} except Exception as e: return {"ok": False, "error": str(e)} def _get_activity_log(agent_id: str, limit: int = 50) -> list[dict]: """ Read recent activity events for an agent. Returns list of {timestamp, type, details} sorted newest-first. """ if agent_id not in TIER2_AGENTS and agent_id != "rose": return [] agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR log_path = agent_dir / "activity.log" if not log_path.exists(): return [] try: lines = log_path.read_text().strip().split("\n") events = [] for line in reversed(lines): line = line.strip() if not line: continue parts = line.split(" | ", 2) if len(parts) >= 2: events.append({ "timestamp": parts[0], "type": parts[1], "details": parts[2] if len(parts) > 2 else "", }) if len(events) >= limit: break return events except Exception: return [] def _get_error_log(agent_id: str, limit: int = 20) -> list[dict]: """ Read error events from activity log (type == 'error'). Returns list of {timestamp, type, details} sorted newest-first. """ if agent_id not in TIER2_AGENTS and agent_id != "rose": return [] agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR log_path = agent_dir / "activity.log" if not log_path.exists(): return [] try: lines = log_path.read_text().strip().split("\n") errors = [] for line in reversed(lines): line = line.strip() if not line or " | error | " not in line: continue parts = line.split(" | ", 2) if len(parts) >= 2: errors.append({ "timestamp": parts[0], "type": parts[1], "details": parts[2] if len(parts) > 2 else "", }) if len(errors) >= limit: break return errors except Exception: return [] def get_agent_activity(agent_id: str, limit: int = 50) -> dict: """API: GET /api/agents/{id}/activity — return activity log.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"error": f"Unknown agent: {agent_id}"} events = _get_activity_log(agent_id, limit) return { "agent_id": agent_id, "events": events, } def get_agent_errors(agent_id: str, limit: int = 20) -> dict: """API: GET /api/agents/{id}/errors — return error log.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"error": f"Unknown agent: {agent_id}"} errors = _get_error_log(agent_id, limit) return { "agent_id": agent_id, "errors": errors, } # ── Chat History ────────────────────────────────────────────────────────────── def _get_chat_history(agent_id: str, limit: int = 20) -> list[dict]: """ Read chat sessions from JSONL files and return history for a specific agent. Sessions are sorted newest-first. Returns list of {session_id, title, message_count, created_at, last_message_at, model}. """ sessions_dir = _HERMES_DIR / "sessions" if not sessions_dir.exists(): return [] sessions = sorted(sessions_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True) history = [] for session_file in sessions[:limit * 2]: # overscan if len(history) >= limit: break try: with open(session_file) as f: lines = f.readlines() if not lines: continue # First line has metadata metadata = json.loads(lines[0]) created_at = metadata.get("timestamp", "") model = metadata.get("model", "unknown") # Count messages message_count = sum(1 for l in lines if l.strip()) # Title = first user message preview title = "Chat" for line in lines[1:]: if line.strip(): try: msg = json.loads(line) if msg.get("role") == "user": content = str(msg.get("content", ""))[:80] title = content if content else "Chat" break except Exception: pass # Last message timestamp last_msg = None for line in reversed(lines): if line.strip(): try: last_msg = json.loads(line).get("timestamp", created_at) break except Exception: pass session_id = session_file.stem # filename without .jsonl history.append({ "session_id": session_id, "title": title, "message_count": message_count, "created_at": created_at, "last_message_at": last_msg or created_at, "model": model, }) except Exception: continue return history[:limit] def get_agent_chat_history(agent_id: str, limit: int = 20) -> dict: """API: GET /api/agents/{id}/chat-history — return chat history for agent.""" if agent_id not in TIER2_AGENTS and agent_id != "rose": return {"error": f"Unknown agent: {agent_id}"} history = _get_chat_history(agent_id, limit) return { "agent_id": agent_id, "sessions": history, }