Files
webui-develop/api/agents.py
Rose 00045314f8 Phase 4-6: Message Bus, Memory Search (ChromaDB), Token Tracking, Topology Graph
Phase 4: Message Bus Viewer
- Backend: get_message_bus_status(), send_bus_message() in agents.py
- Route: GET /api/agents/message-bus, POST /api/agents/{id}/bus-message
- Frontend: Message Bus tab in agent detail overlay

Phase 5: Memory Search (ChromaDB)
- Backend: _search_agent_memory(), _search_all_agents_memory() via ChromaDB rose_memory collection
- Route: GET /api/agents/memory/search, GET /api/agents/{id}/memory/search
- Frontend: Search bar added to Memory tab, renders confidence scores + topics

Phase 6: Token Tracking + Topology Graph
- Backend: get_agent_usage() reads ~/.hermes/agents/{id}/usage.json
- Route: GET /api/agents/{id}/usage
- Frontend: Usage tab with today/week/month token counts and cost
- Frontend: Topology tab with SVG radial graph of agent network
2026-04-20 14:45:11 +02:00

1070 lines
37 KiB
Python

"""
Rose Agents Panel API — Data layer for Hermes WebUI Agents extension.
Provides Rose + Tier-2 agent status, inbox management, soul/memory editing, and configuration.
"""
import json
import os
import subprocess
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from api.helpers import j
# ChromaDB for memory search
import chromadb
# ── Paths ──────────────────────────────────────────────────────────────────────
# Root of the per-user Hermes state directory; every other path hangs off it.
_HERMES_DIR = Path.home().joinpath(".hermes")
# One sub-directory per agent (soul.md, memory.md, inbox.json, ...).
_AGENTS_DIR = _HERMES_DIR.joinpath("agents")
# Message-bus CLI script that is run as a subprocess to count pending messages.
_INBOX_BUS = _HERMES_DIR.joinpath("scripts", "message_bus.py")
# ── Tier-2 Agent Registry ──────────────────────────────────────────────────────
# Display metadata for every Tier-2 agent, keyed by agent id. The id doubles as
# the agent's directory name under _AGENTS_DIR.
TIER2_AGENTS = {
    "lotus": dict(
        name="Lotus", emoji="🪷",
        domain="Health, Fitness & Recovery", color="#e91e63",
    ),
    "forget-me-not": dict(
        name="Forget-me-not", emoji="🌼",
        domain="Calendar, Time & Social", color="#ff9800",
    ),
    "sunflower": dict(
        name="Sunflower", emoji="🌻",
        domain="Finance, Wealth & Subscriptions", color="#ffeb3b",
    ),
    "iris": dict(
        name="Iris", emoji="⚜️",
        domain="Career, Learning & Focus", color="#9c27b0",
    ),
    "ivy": dict(
        name="Ivy", emoji="🌿",
        domain="Smart Home & Environment", color="#4caf50",
    ),
    "dandelion": dict(
        name="Dandelion", emoji="🛡️",
        domain="Communication Triage", color="#03a9f4",
    ),
    "root": dict(
        name="Root", emoji="🌳",
        domain="DevOps, Logs & System Health", color="#795548",
    ),
}

# Display metadata for Rose, the orchestrator (same keys as a Tier-2 entry).
ROSE_META = dict(
    name="Rose",
    emoji="🌹",
    domain="Orchestrator & Main Interface",
    color="#f44336",
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_process_status(agent_name: str) -> dict:
    """Report whether a process for *agent_name* is running, using ``pgrep -f``.

    The pattern matches command lines of the form ``hermes ... --agent <name>``
    or ``message_bus ... --agent <name>``.

    Args:
        agent_name: Agent id to look for; it is interpolated into the pgrep
            pattern as-is.

    Returns:
        ``{"running": bool, "pid": int | None}`` where ``pid`` is the first PID
        printed by pgrep. Any failure (pgrep missing, timeout, unparsable
        output) is reported as not running.
    """
    not_running = {"running": False, "pid": None}
    pattern = f"hermes.*--agent\\s+{agent_name}|message_bus.*--agent\\s+{agent_name}"
    try:
        result = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True,
            text=True,
            # Bound the call like the other subprocess calls in this module so
            # a wedged pgrep cannot hang the request.
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return not_running

    pids = result.stdout.split()
    if not pids:
        return not_running
    try:
        return {"running": True, "pid": int(pids[0])}
    except ValueError:
        return not_running
def _get_agent_status(agent_name: str) -> dict:
    """Classify an agent as active / idle / offline.

    The classification combines the process probe with the modification time
    of ``<agents dir>/<agent_name>/active_session.txt``:

    - ``offline``: no process, or process running but the session file is
      15 minutes old or more
    - ``active``: process running and the session file was touched < 5 min ago
    - ``idle``: process running and the file is 5-15 min old, or is missing
    - ``unknown``: process running but the session file could not be stat'ed

    Returns:
        ``{"status": str, "last_activity": str | None, "pid": int | None}``.
        ``last_activity`` is the session file's mtime as an ISO-8601 UTC
        timestamp with a ``Z`` suffix.
    """
    # Function-scope import: the module header only imports datetime/timedelta.
    from datetime import timezone

    proc = _get_process_status(agent_name)
    if not proc["running"]:
        return {"status": "offline", "last_activity": None, "pid": None}

    active_session_path = _AGENTS_DIR / agent_name / "active_session.txt"
    # Process running but no session file = treat as idle.
    status = "idle"
    last_activity = None
    if active_session_path.exists():
        try:
            mtime = active_session_path.stat().st_mtime
            # Convert to UTC before appending "Z"; a bare fromtimestamp() is
            # local time, which would make the "Z" suffix a lie.
            as_utc = datetime.fromtimestamp(mtime, tz=timezone.utc)
            last_activity = as_utc.replace(tzinfo=None).isoformat() + "Z"
            age_minutes = (time.time() - mtime) / 60
            if age_minutes < 5:
                status = "active"
            elif age_minutes < 15:
                status = "idle"
            else:
                status = "offline"
        except (OSError, OverflowError, ValueError):
            status = "unknown"
    return {"status": status, "last_activity": last_activity, "pid": proc["pid"]}
def _get_inbox_count(agent_name: str) -> int:
    """Ask the message-bus script how many messages are pending for an agent.

    Runs ``/usr/bin/python3 <message_bus.py> check --agent <agent_name>`` with
    a 5 second timeout and reads the ``pending`` field of its JSON output.

    Returns:
        The pending count, or 0 when the script exits non-zero, times out, or
        prints something that cannot be interpreted (best effort by design).
    """
    command = ["/usr/bin/python3", str(_INBOX_BUS), "check", "--agent", agent_name]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=5)
        if completed.returncode != 0:
            return 0
        return json.loads(completed.stdout).get("pending", 0)
    except Exception:
        return 0
def _read_inbox(agent_name: str, limit: int = 50) -> list[dict]:
    """Read messages from ``<agents dir>/<agent_name>/inbox.json``, newest first.

    The file may hold either a bare JSON list of messages or an object with a
    ``"messages"`` list; both layouts are accepted.

    Args:
        agent_name: Agent id whose inbox is read.
        limit: Maximum number of messages to return; values below 0 are
            treated as 0.

    Returns:
        Up to ``limit`` messages in reverse file order. An empty list when the
        file is missing, unreadable, not valid JSON, or has an unexpected
        top-level shape.
    """
    inbox_path = _AGENTS_DIR / agent_name / "inbox.json"
    if not inbox_path.exists():
        return []
    try:
        with open(inbox_path, "r") as f:
            data = json.load(f)
    except (OSError, ValueError):  # ValueError covers JSON and decoding errors
        return []

    messages = data.get("messages", []) if isinstance(data, dict) else data
    if not isinstance(messages, list):
        # e.g. a JSON string or number at top level: nothing usable, and
        # calling .get()/reversed() on it would raise or return garbage.
        return []
    # Reverse so newest first, return limited
    return messages[::-1][:max(limit, 0)]
def _read_file_safe(path: Path) -> str | None:
"""Read a file safely, return None if missing."""
try:
if path.exists():
return path.read_text()
except Exception:
pass
return None
def _write_file_safe(path: Path, content: str, backup: bool = True) -> dict:
    """Write *content* to *path*, optionally keeping the previous version.

    When ``backup`` is true and the file already exists, its current text is
    first copied to a sibling whose name has ``.backup`` appended
    (``soul.md`` -> ``soul.md.backup``).

    Returns:
        ``{"ok": True}`` on success, otherwise ``{"ok": False, "error": str}``.
    """
    try:
        if backup and path.exists():
            backup_path = path.with_suffix(path.suffix + ".backup")
            previous_text = path.read_text()
            backup_path.write_text(previous_text)
        path.write_text(content)
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
    return {"ok": True}
# ── API Functions ─────────────────────────────────────────────────────────────
def list_agents() -> dict:
    """Build the status summary for Rose plus every Tier-2 agent.

    Rose comes first and is always reported as ``"active"`` with no PID; only
    her ``last_activity`` is taken from the status probe. Tier-2 agents follow
    in registry order, and an agent's inbox is only counted when it is not
    offline.

    Returns:
        ``{"agents": [entry, ...]}``
    """
    def _entry(agent_id, meta, tier, status, pid, last_activity, inbox_count):
        # Single place that fixes the shape (and key order) of one list entry.
        return {
            "id": agent_id,
            "name": meta["name"],
            "emoji": meta["emoji"],
            "domain": meta["domain"],
            "color": meta["color"],
            "tier": tier,
            "status": status,
            "pid": pid,
            "last_activity": last_activity,
            "inbox_count": inbox_count,
        }

    # Rose (orchestrator — always "running" as it's the gateway itself)
    rose_probe = _get_agent_status("rose")
    rose_pending = _get_inbox_count("rose")
    roster = [
        _entry("rose", ROSE_META, "orchestrator", "active", None,
               rose_probe.get("last_activity"), rose_pending),
    ]

    for agent_id, meta in TIER2_AGENTS.items():
        probe = _get_agent_status(agent_id)
        pending = 0 if probe["status"] == "offline" else _get_inbox_count(agent_id)
        roster.append(
            _entry(agent_id, meta, "tier2", probe["status"], probe["pid"],
                   probe.get("last_activity"), pending)
        )
    return {"agents": roster}
def get_agent(agent_id: str) -> dict:
    """Return the full detail record for one agent.

    The record combines registry metadata, live status, the inbox, and the
    text of ``soul.md`` / ``memory.md``. Files are read from the agent's own
    directory, except for Rose whose files are read from the Hermes root.

    Returns:
        The detail dict, or ``{"error": ...}`` for an unknown agent id.
    """
    import re

    is_rose = agent_id == "rose"
    if not is_rose and agent_id not in TIER2_AGENTS:
        return {"error": f"Unknown agent: {agent_id}"}

    meta = TIER2_AGENTS.get(agent_id, ROSE_META)
    agent_dir = _HERMES_DIR if is_rose else _AGENTS_DIR / agent_id

    soul = _read_file_safe(agent_dir / "soul.md")
    memory = _read_file_safe(agent_dir / "memory.md")
    inbox_messages = _read_inbox(agent_id)
    status_info = _get_agent_status(agent_id)
    inbox_count = _get_inbox_count(agent_id)

    # Default model: first "model: <value>" occurrence anywhere in soul.md
    # (typically its YAML frontmatter), with optional surrounding quotes.
    default_model = None
    if soul:
        model_match = re.search(r'model:\s*["\']?([^"\'\n]+)["\']?', soul)
        if model_match:
            default_model = model_match.group(1).strip()

    # A file named "disabled" in the agent directory marks the agent disabled.
    disabled = agent_dir.exists() and (agent_dir / "disabled").exists()

    return {
        "id": agent_id,
        "name": meta["name"],
        "emoji": meta["emoji"],
        "domain": meta["domain"],
        "color": meta["color"],
        "tier": "orchestrator" if is_rose else "tier2",
        "status": status_info["status"],
        "last_activity": status_info.get("last_activity"),
        "pid": status_info["pid"],
        "inbox_count": inbox_count,
        "soul": soul or "",
        "memory": memory or "",
        "default_model": default_model,
        "disabled": disabled,
        "inbox": inbox_messages,
    }
def get_agent_status(agent_id: str) -> dict:
    """Return just the status triple (status, last_activity, pid) for an agent.

    Rose is reported as permanently active without probing; Tier-2 agents are
    probed. Unknown ids yield ``{"error": ...}``.
    """
    if agent_id == "rose":
        return {"status": "active", "last_activity": None, "pid": None}
    if agent_id in TIER2_AGENTS:
        return _get_agent_status(agent_id)
    return {"error": f"Unknown agent: {agent_id}"}
def get_agent_inbox(agent_id: str, limit: int = 50) -> dict:
    """Return up to *limit* inbox messages (newest first) for one agent.

    Returns:
        ``{"agent_id", "agent_name", "messages"}``, or ``{"error": ...}`` for
        an unknown agent id.
    """
    known = agent_id == "rose" or agent_id in TIER2_AGENTS
    if not known:
        return {"error": f"Unknown agent: {agent_id}"}
    display_name = TIER2_AGENTS.get(agent_id, ROSE_META)["name"]
    return {
        "agent_id": agent_id,
        "agent_name": display_name,
        "messages": _read_inbox(agent_id, limit),
    }
def update_agent_soul(agent_id: str, content: str) -> dict:
    """Replace a Tier-2 agent's ``soul.md`` with *content*.

    The agent directory is created if needed and the previous file is kept as
    a ``.backup`` copy. Rose's soul is deliberately not editable here.

    Returns:
        ``{"ok": True}`` or ``{"ok": False, "error": str}``.
    """
    if agent_id == "rose":
        return {"ok": False, "error": "Rose's soul.md cannot be edited via this API"}
    if agent_id not in TIER2_AGENTS:
        return {"ok": False, "error": f"Unknown agent: {agent_id}"}
    agent_dir = _AGENTS_DIR / agent_id
    # Ensure directory exists
    agent_dir.mkdir(parents=True, exist_ok=True)
    return _write_file_safe(agent_dir / "soul.md", content, backup=True)
def update_agent_memory(agent_id: str, content: str) -> dict:
    """Replace a Tier-2 agent's ``memory.md`` with *content*.

    The agent directory is created if needed and the previous file is kept as
    a ``.backup`` copy. Rose's memory is deliberately not editable here.

    Returns:
        ``{"ok": True}`` or ``{"ok": False, "error": str}``.
    """
    if agent_id == "rose":
        return {"ok": False, "error": "Rose's memory.md cannot be edited via this API"}
    if agent_id not in TIER2_AGENTS:
        return {"ok": False, "error": f"Unknown agent: {agent_id}"}
    agent_dir = _AGENTS_DIR / agent_id
    agent_dir.mkdir(parents=True, exist_ok=True)
    return _write_file_safe(agent_dir / "memory.md", content, backup=True)
def send_agent_message(agent_id: str, payload: dict) -> dict:
    """Append a message to an agent's inbox (simulates an inter-agent message).

    Args:
        agent_id: Recipient; "rose" or a Tier-2 agent id.
        payload: Optional keys ``from`` (default "rose"), ``type`` (default
            "request"), ``subject`` and ``content`` (default "").

    Returns:
        ``{"ok": True, "message_id": str}`` on success, otherwise
        ``{"ok": False, "error": str}``. The inbox is always written back as a
        bare JSON list, whatever layout it had before.
    """
    import uuid

    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"ok": False, "error": f"Unknown agent: {agent_id}"}

    agent_dir = _AGENTS_DIR / agent_id
    inbox_path = agent_dir / "inbox.json"
    agent_dir.mkdir(parents=True, exist_ok=True)

    # Start from the existing messages; a missing or corrupt inbox counts as empty.
    messages = []
    if inbox_path.exists():
        try:
            with open(inbox_path, "r") as fh:
                stored = json.load(fh)
            messages = stored if isinstance(stored, list) else stored.get("messages", [])
        except (json.JSONDecodeError, IOError):
            messages = []

    msg = {
        "id": uuid.uuid4().hex[:8],
        "from": payload.get("from", "rose"),
        "type": payload.get("type", "request"),
        "subject": payload.get("subject", ""),
        "content": payload.get("content", ""),
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "status": "unread",
    }
    messages.append(msg)

    try:
        with open(inbox_path, "w") as fh:
            json.dump(messages, fh, indent=2)
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "message_id": msg["id"]}
def ack_agent_message(agent_id: str, msg_id: str) -> dict:
    """Mark the first inbox message whose id equals *msg_id* as ``"read"``.

    Returns:
        ``{"ok": True}`` once the inbox has been rewritten, otherwise
        ``{"ok": False, "error": str}`` (unknown agent, missing inbox, unknown
        message id, or any read/write failure).
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"ok": False, "error": f"Unknown agent: {agent_id}"}

    inbox_path = _AGENTS_DIR / agent_id / "inbox.json"
    if not inbox_path.exists():
        return {"ok": False, "error": "Inbox not found"}

    try:
        with open(inbox_path, "r") as fh:
            stored = json.load(fh)
        messages = stored if isinstance(stored, list) else stored.get("messages", [])

        target = next((m for m in messages if m.get("id") == msg_id), None)
        if target is None:
            return {"ok": False, "error": f"Message {msg_id} not found"}
        target["status"] = "read"

        with open(inbox_path, "w") as fh:
            json.dump(messages, fh, indent=2)
        return {"ok": True}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
def set_agent_enabled(agent_id: str, enabled: bool) -> dict:
    """Enable or disable a Tier-2 agent via its ``disabled`` flag file.

    Disabling writes ``<agents dir>/<agent_id>/disabled`` (creating the agent
    directory if needed); enabling removes that file if present.

    Args:
        agent_id: Tier-2 agent id. Rose is rejected explicitly.
        enabled: True to enable, False to disable.

    Returns:
        ``{"ok": True, "disabled": bool}`` on success, otherwise
        ``{"ok": False, "error": str}``.
    """
    # Rose is not in TIER2_AGENTS, so she must be handled before the registry
    # check or the dedicated message below can never be returned.
    if agent_id == "rose":
        return {"ok": False, "error": "Rose cannot be disabled"}
    if agent_id not in TIER2_AGENTS:
        return {"ok": False, "error": f"Unknown agent: {agent_id}"}

    disabled_flag = _AGENTS_DIR / agent_id / "disabled"
    try:
        if enabled:
            disabled_flag.unlink(missing_ok=True)
        else:
            # The agent directory may not exist yet (nothing written so far).
            disabled_flag.parent.mkdir(parents=True, exist_ok=True)
            disabled_flag.write_text("disabled")
    except Exception as e:
        return {"ok": False, "error": str(e)}
    return {"ok": True, "disabled": not enabled}
def get_agent_config(agent_id: str) -> dict:
    """Return the on-disk configuration paths for one agent.

    For Rose the paths point at ``rose.md`` and ``memory.json`` in the Hermes
    root and are reported unconditionally. For Tier-2 agents ``soul_path`` and
    ``memory_path`` are None when the file does not exist, while
    ``inbox_path`` is always reported. Unknown ids yield ``{"error": ...}``.
    """
    if agent_id == "rose":
        return {
            "id": "rose",
            "name": "Rose",
            "soul_path": str(_HERMES_DIR / "rose.md"),
            "memory_path": str(_HERMES_DIR / "memory.json"),
        }
    if agent_id not in TIER2_AGENTS:
        return {"error": f"Unknown agent: {agent_id}"}

    agent_dir = _AGENTS_DIR / agent_id
    soul_path = agent_dir / "soul.md"
    memory_path = agent_dir / "memory.md"
    return {
        "id": agent_id,
        "name": TIER2_AGENTS[agent_id]["name"],
        "soul_path": str(soul_path) if soul_path.exists() else None,
        "memory_path": str(memory_path) if memory_path.exists() else None,
        "inbox_path": str(agent_dir / "inbox.json"),
    }
# ── Activity & Error Log ───────────────────────────────────────────────────────
# Closed set of event types accepted by _log_agent_activity().
ACTIVITY_EVENT_TYPES = [
    # agent lifecycle
    "agent_started",
    "agent_stopped",
    # messaging
    "message_sent",
    "message_received",
    # task execution
    "task_started",
    "task_completed",
    "task_failed",
    # errors and edits
    "error",
    "soul_updated",
    "memory_updated",
    # chat sessions
    "chat_started",
    "chat_ended",
    # maintenance
    "health_check",
    "config_updated",
]
def _log_agent_activity(agent_id: str, event_type: str, details: str = "") -> dict:
    """Append one event to the agent's ``activity.log``.

    Each event is one line: ``<ISO UTC timestamp>Z | <event_type> | <details>``.
    Rose's log lives in the Hermes root; Tier-2 logs live in the agent's own
    directory, which is created if missing.

    Args:
        agent_id: "rose" or a Tier-2 agent id.
        event_type: Must be one of ``ACTIVITY_EVENT_TYPES``.
        details: Free-form text. Line breaks are collapsed to single spaces so
            that one event always occupies exactly one line.

    Returns:
        ``{"ok": True}`` or ``{"ok": False, "error": str}``.
    """
    if event_type not in ACTIVITY_EVENT_TYPES:
        return {"ok": False, "error": f"Unknown event type: {event_type}"}
    if agent_id not in TIER2_AGENTS and agent_id != "rose":
        return {"ok": False, "error": f"Unknown agent: {agent_id}"}

    agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR
    log_path = agent_dir / "activity.log"
    # The log is parsed line by line, so an embedded newline would split one
    # event into several bogus records.
    flat_details = " ".join(str(details).splitlines())
    try:
        agent_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.utcnow().isoformat() + "Z"
        with open(log_path, "a") as f:
            f.write(f"{timestamp} | {event_type} | {flat_details}\n")
        return {"ok": True}
    except Exception as e:
        return {"ok": False, "error": str(e)}
def _get_activity_log(agent_id: str, limit: int = 50) -> list[dict]:
"""
Read recent activity events for an agent.
Returns list of {timestamp, type, details} sorted newest-first.
"""
if agent_id not in TIER2_AGENTS and agent_id != "rose":
return []
agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR
log_path = agent_dir / "activity.log"
if not log_path.exists():
return []
try:
lines = log_path.read_text().strip().split("\n")
events = []
for line in reversed(lines):
line = line.strip()
if not line:
continue
parts = line.split(" | ", 2)
if len(parts) >= 2:
events.append({
"timestamp": parts[0],
"type": parts[1],
"details": parts[2] if len(parts) > 2 else "",
})
if len(events) >= limit:
break
return events
except Exception:
return []
def _get_error_log(agent_id: str, limit: int = 20) -> list[dict]:
"""
Read error events from activity log (type == 'error').
Returns list of {timestamp, type, details} sorted newest-first.
"""
if agent_id not in TIER2_AGENTS and agent_id != "rose":
return []
agent_dir = _AGENTS_DIR / agent_id if agent_id != "rose" else _HERMES_DIR
log_path = agent_dir / "activity.log"
if not log_path.exists():
return []
try:
lines = log_path.read_text().strip().split("\n")
errors = []
for line in reversed(lines):
line = line.strip()
if not line or " | error | " not in line:
continue
parts = line.split(" | ", 2)
if len(parts) >= 2:
errors.append({
"timestamp": parts[0],
"type": parts[1],
"details": parts[2] if len(parts) > 2 else "",
})
if len(errors) >= limit:
break
return errors
except Exception:
return []
def get_agent_activity(agent_id: str, limit: int = 50) -> dict:
    """API: GET /api/agents/{id}/activity — recent activity events, newest first.

    Returns ``{"agent_id", "events"}``, or ``{"error": ...}`` for an unknown id.
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"error": f"Unknown agent: {agent_id}"}
    return {"agent_id": agent_id, "events": _get_activity_log(agent_id, limit)}
def get_agent_errors(agent_id: str, limit: int = 20) -> dict:
    """API: GET /api/agents/{id}/errors — recent error events, newest first.

    Returns ``{"agent_id", "errors"}``, or ``{"error": ...}`` for an unknown id.
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"error": f"Unknown agent: {agent_id}"}
    return {"agent_id": agent_id, "errors": _get_error_log(agent_id, limit)}
# ── Token / Cost Usage Tracking ───────────────────────────────────────────────
def _summarize_token_usage(token_usage: list, now: datetime) -> dict:
    """Aggregate raw usage entries into today / 7-day / 30-day totals plus history.

    Entries that are not dicts or whose ``date`` is not a ``YYYY-MM-DD`` string
    are skipped; missing or non-numeric token/cost fields count as zero.
    """
    def _number(value, default):
        # bool is an int subclass but is never a meaningful token count.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        return default

    week_ago = now - timedelta(days=7)
    month_ago = now - timedelta(days=30)
    totals = {"today": [0, 0.0], "week": [0, 0.0], "month": [0, 0.0]}
    dated_history = []

    for entry in token_usage:
        if not isinstance(entry, dict):
            continue
        date_str = entry.get("date", "")
        try:
            entry_date = datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            continue

        prompt = _number(entry.get("prompt_tokens"), 0)
        completion = _number(entry.get("completion_tokens"), 0)
        cost = _number(entry.get("cost_usd"), 0.0)
        total = prompt + completion
        dated_history.append((entry_date, {
            "date": date_str,
            "prompt_tokens": prompt,
            "completion_tokens": completion,
            "total_tokens": total,
            "cost_usd": cost,
        }))

        periods = []
        # Compare parsed dates so a non-zero-padded date still counts as today.
        if entry_date.date() == now.date():
            periods.append("today")
        if entry_date >= week_ago:
            periods.append("week")
        if entry_date >= month_ago:
            periods.append("month")
        for period in periods:
            totals[period][0] += total
            totals[period][1] += cost

    # Sort history newest-first
    dated_history.sort(key=lambda pair: pair[0], reverse=True)
    summary = {
        period: {"tokens": tokens, "cost": round(cost, 6)}
        for period, (tokens, cost) in totals.items()
    }
    summary["history"] = [record for _, record in dated_history]
    return summary


def _get_agent_usage(agent_id: str) -> dict:
    """Read ``<agents dir>/<agent_id>/usage.json`` and compute usage totals.

    The file is expected to hold ``{"token_usage": [entry, ...]}`` where each
    entry has ``date`` (YYYY-MM-DD), ``prompt_tokens``, ``completion_tokens``
    and ``cost_usd``. Periods are measured against the current UTC time.

    Returns:
        ``{agent_id, today, week, month, history}`` where each period is
        ``{"tokens": int, "cost": float}`` and ``history`` lists the valid
        entries newest first. All zeros with an empty history when the file is
        missing, unreadable or malformed. ``{"error": ...}`` for an unknown id.
    """
    if agent_id not in TIER2_AGENTS and agent_id != "rose":
        return {"error": f"Unknown agent: {agent_id}"}

    token_usage = []
    usage_path = _AGENTS_DIR / agent_id / "usage.json"
    if usage_path.exists():
        try:
            with open(usage_path, "r") as f:
                data = json.load(f)
        except (OSError, ValueError):
            data = None
        if isinstance(data, dict) and isinstance(data.get("token_usage"), list):
            token_usage = data["token_usage"]

    # An empty entry list naturally produces the all-zero result.
    return {"agent_id": agent_id, **_summarize_token_usage(token_usage, datetime.utcnow())}
def get_agent_usage(agent_id: str) -> dict:
    """API: GET /api/agents/{id}/usage — token and cost usage for one agent.

    Thin public wrapper; validation and aggregation happen in the private
    helper it delegates to.
    """
    usage = _get_agent_usage(agent_id)
    return usage
# ── Chat History ──────────────────────────────────────────────────────────────
def _get_chat_history(agent_id: str, limit: int = 20) -> list[dict]:
    """Summarise the most recently modified chat sessions.

    Sessions are the ``*.jsonl`` files in ``<hermes dir>/sessions``. The first
    line of each file is treated as metadata (``timestamp``, ``model``); the
    title is the first 80 characters of the first ``role == "user"`` message.

    NOTE(review): ``agent_id`` is not used for filtering here, so every agent
    gets the same session list — confirm whether that is intended.

    Returns:
        Up to ``limit`` dicts ``{session_id, title, message_count, created_at,
        last_message_at, model}``, newest first. Files that cannot be read or
        parsed are skipped.
    """
    sessions_dir = _HERMES_DIR / "sessions"
    if not sessions_dir.exists():
        return []

    newest_first = sorted(
        sessions_dir.glob("*.jsonl"),
        key=lambda path: path.stat().st_mtime,
        reverse=True,
    )

    history = []
    # Overscan: look at up to twice as many files as requested, since some
    # may be empty or unparsable.
    for session_file in newest_first[:limit * 2]:
        if len(history) >= limit:
            break
        try:
            with open(session_file) as fh:
                raw_lines = fh.readlines()
            if not raw_lines:
                continue

            # First line has metadata
            header = json.loads(raw_lines[0])
            created_at = header.get("timestamp", "")
            model = header.get("model", "unknown")

            # Every non-blank line counts as a message (metadata line included).
            non_blank = [ln for ln in raw_lines if ln.strip()]

            # Title = first user message preview
            title = "Chat"
            for ln in non_blank[1:]:
                try:
                    msg = json.loads(ln)
                    if msg.get("role") != "user":
                        continue
                    preview = str(msg.get("content", ""))[:80]
                    title = preview or "Chat"
                    break
                except Exception:
                    continue

            # Last message timestamp: taken from the last parsable line.
            last_msg = None
            for ln in reversed(non_blank):
                try:
                    last_msg = json.loads(ln).get("timestamp", created_at)
                except Exception:
                    continue
                break

            history.append({
                "session_id": session_file.stem,  # filename without .jsonl
                "title": title,
                "message_count": len(non_blank),
                "created_at": created_at,
                "last_message_at": last_msg or created_at,
                "model": model,
            })
        except Exception:
            continue
    return history[:limit]
def get_agent_chat_history(agent_id: str, limit: int = 20) -> dict:
    """API: GET /api/agents/{id}/chat-history — recent chat sessions.

    Returns ``{"agent_id", "sessions"}``, or ``{"error": ...}`` for an unknown id.
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"error": f"Unknown agent: {agent_id}"}
    return {"agent_id": agent_id, "sessions": _get_chat_history(agent_id, limit)}
# ── Health Check ───────────────────────────────────────────────────────────────
def _get_agent_health(agent_id: str) -> dict:
    """Return coarse health metrics for an agent, sampled from ``ps aux``.

    The first ``ps aux`` row that mentions "hermes" (and not "grep") supplies
    ``pid``, ``cpu_percent`` (the %CPU column) and ``memory_mb`` (the RSS
    column, KB converted to MB), and sets ``status`` to ``"active"``.
    Otherwise ``status`` stays ``"offline"``.

    NOTE(review): the process match does not involve ``agent_id``, so every
    agent reports the same process — confirm whether that is intended.
    NOTE(review): ``uptime_seconds`` is always 0 and ``threads`` is 0 or 1;
    neither is measured here.

    Returns:
        ``{agent_id, status, pid, uptime_seconds, cpu_percent, memory_mb,
        threads}``, or ``{"error": ...}`` for an unknown id.
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"error": f"Unknown agent: {agent_id}"}

    status, pid = "offline", None
    uptime_seconds, threads = 0, 0
    cpu_percent, memory_mb = 0.0, 0.0

    try:
        listing = subprocess.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        for row in listing.stdout.split("\n"):
            if "grep" in row or "hermes" not in row.lower():
                continue
            fields = row.split()
            if len(fields) < 11:
                continue
            pid = int(fields[1])
            cpu = float(fields[2])
            rss_kb = int(fields[5])
            # RSS is in KB, convert to MB
            memory_mb = rss_kb / 1024
            cpu_percent = cpu
            status = "active"
            threads = 1  # ps doesn't show threads in aux mode
            break
    except Exception:
        pass

    # If active_session.txt starts with an integer token, it replaces the PID
    # found above. NOTE(review): presumably the file holds a PID — verify.
    session_dir = _HERMES_DIR if agent_id == "rose" else _AGENTS_DIR / agent_id
    pid_file = session_dir / "active_session.txt"
    if pid_file.exists():
        try:
            pid = int(pid_file.read_text().strip().split()[0])
        except Exception:
            pass

    return {
        "agent_id": agent_id,
        "status": status,
        "pid": pid,
        "uptime_seconds": uptime_seconds,
        "cpu_percent": round(cpu_percent, 1),
        "memory_mb": round(memory_mb, 1),
        "threads": threads,
    }
def get_agent_health(agent_id: str) -> dict:
    """API: GET /api/agents/{id}/health — health metrics for one agent.

    Returns the metrics dict, or ``{"error": ...}`` for an unknown id.
    """
    known = agent_id == "rose" or agent_id in TIER2_AGENTS
    if known:
        return _get_agent_health(agent_id)
    return {"error": f"Unknown agent: {agent_id}"}
# ── Task Queue ─────────────────────────────────────────────────────────────────
def _get_task_queue(agent_id: str) -> list[dict]:
    """Load the task list from ``<agents dir>/<agent_id>/tasks.json``.

    Returns:
        The parsed JSON when its top level is a list; an empty list for an
        unknown agent, a missing or unreadable file, invalid JSON, or any
        other top-level JSON type.
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return []
    tasks_file = _AGENTS_DIR / agent_id / "tasks.json"
    if not tasks_file.exists():
        return []
    try:
        parsed = json.loads(tasks_file.read_text())
    except Exception:
        return []
    if isinstance(parsed, list):
        return parsed
    return []
def get_agent_tasks(agent_id: str) -> dict:
    """API: GET /api/agents/{id}/tasks — the agent's task queue.

    Returns ``{"agent_id", "tasks", "count"}``, or ``{"error": ...}`` for an
    unknown id.
    """
    if not (agent_id in TIER2_AGENTS or agent_id == "rose"):
        return {"error": f"Unknown agent: {agent_id}"}
    queue = _get_task_queue(agent_id)
    return {"agent_id": agent_id, "tasks": queue, "count": len(queue)}
# ── Message Bus ────────────────────────────────────────────────────────────────
def _inbox_summary(inbox_path: Path) -> tuple:
    """Return ``(message_count, last_message_preview)`` for one inbox file.

    Accepts both on-disk layouts used in this module: a bare JSON list of
    messages, or an object with a ``"messages"`` list. The preview holds the
    ``from`` / ``subject`` / ``timestamp`` of the final message, or None.
    """
    if not inbox_path.exists():
        return 0, None
    try:
        inbox = json.loads(inbox_path.read_text())
    except (OSError, ValueError):
        return 0, None

    messages = inbox.get("messages", []) if isinstance(inbox, dict) else inbox
    if not isinstance(messages, list) or not messages:
        return 0, None
    newest = messages[-1]  # writers append, so the final element is the latest
    if not isinstance(newest, dict):
        return len(messages), None
    preview = {
        "from": newest.get("from"),
        "subject": newest.get("subject"),
        "timestamp": newest.get("timestamp"),
    }
    return len(messages), preview


def _get_message_bus_overview() -> dict:
    """Return an overview of every agent inbox: message count and last message.

    Rose's inbox is read from ``<hermes dir>/inbox.json`` and listed first;
    Tier-2 inboxes are read from each agent's directory, in registry order.

    Returns:
        ``{"agents": [{agent_id, name, emoji, inbox_count, last_message}, ...]}``
    """
    rose_count, rose_last = _inbox_summary(_HERMES_DIR / "inbox.json")
    agents = [{
        "agent_id": "rose",
        "name": "Rose",
        "emoji": "🌹",
        "inbox_count": rose_count,
        "last_message": rose_last,
    }]
    for agent_id, meta in TIER2_AGENTS.items():
        count, last_msg = _inbox_summary(_AGENTS_DIR / agent_id / "inbox.json")
        agents.append({
            "agent_id": agent_id,
            "name": meta.get("name", agent_id),
            "emoji": meta.get("emoji", ""),
            "inbox_count": count,
            "last_message": last_msg,
        })
    return {"agents": agents}
def get_message_bus_overview() -> dict:
    """API: GET /api/agents/message-bus — overview of all agent inboxes.

    Thin public wrapper around the private overview builder.
    """
    overview = _get_message_bus_overview()
    return overview
def send_bus_message(target_agent: str, from_agent: str, subject: str, content: str) -> dict:
    """Append a ``request`` message to an agent's ``inbox.json``.

    NOTE: this module defines a second ``send_bus_message`` further down; the
    later definition replaces this one when the module is imported, so this
    version only takes effect if that one is renamed or removed.

    Rose's inbox is ``<hermes dir>/inbox.json``; Tier-2 inboxes live in the
    agent's directory. The inbox is written back in the layout it was read in
    (bare list, or object with a ``"messages"`` list); a missing or corrupt
    inbox is replaced by a fresh ``{"messages": [...]}`` object.

    Returns:
        ``{"ok": True, "message_id": str}`` on success, otherwise
        ``{"ok": False, "error": str}``.
    """
    if target_agent not in TIER2_AGENTS and target_agent != "rose":
        return {"ok": False, "error": f"Unknown agent: {target_agent}"}
    if target_agent == "rose":
        inbox_path = _HERMES_DIR / "inbox.json"
    else:
        inbox_path = _AGENTS_DIR / target_agent / "inbox.json"

    try:
        inbox_path.parent.mkdir(parents=True, exist_ok=True)
        inbox = json.loads(inbox_path.read_text()) if inbox_path.exists() else {"messages": []}
    except (OSError, ValueError):
        inbox = {"messages": []}

    # Other writers in this module store a bare list, so accept both layouts
    # instead of crashing on inbox["messages"].
    if isinstance(inbox, list):
        messages = inbox
    else:
        if not isinstance(inbox, dict):
            inbox = {}
        messages = inbox.get("messages")
        if not isinstance(messages, list):
            messages = inbox["messages"] = []

    now = datetime.now()
    msg_id = f"bus_{int(now.timestamp() * 1000)}"
    messages.append({
        "id": msg_id,
        "from": from_agent,
        "subject": subject,
        "content": content,
        "type": "request",
        "status": "unread",
        "timestamp": now.isoformat(),
    })
    try:
        inbox_path.write_text(json.dumps(inbox, indent=2))
    except OSError as e:
        return {"ok": False, "error": str(e)}
    return {"ok": True, "message_id": msg_id}
def get_agent_bus_messages(agent_id: str, limit: int = 50) -> dict:
    """API: GET /api/agents/{id}/bus-messages — raw inbox messages.

    Rose's inbox is ``<hermes dir>/inbox.json``; Tier-2 inboxes live in the
    agent's directory. Both on-disk layouts are accepted: a bare JSON list, or
    an object with a ``"messages"`` list.

    Args:
        agent_id: "rose" or a Tier-2 agent id.
        limit: Maximum number of messages; the last ``limit`` entries of the
            file are returned in file order. Zero or less returns none.

    Returns:
        ``{"agent_id", "messages"}`` (empty on a missing or unreadable inbox),
        or ``{"error": ...}`` for an unknown id.
    """
    if agent_id not in TIER2_AGENTS and agent_id != "rose":
        return {"error": f"Unknown agent: {agent_id}"}
    if agent_id == "rose":
        inbox_path = _HERMES_DIR / "inbox.json"
    else:
        inbox_path = _AGENTS_DIR / agent_id / "inbox.json"

    messages = []
    if inbox_path.exists():
        try:
            inbox = json.loads(inbox_path.read_text())
        except (OSError, ValueError):
            inbox = None
        stored = inbox.get("messages", []) if isinstance(inbox, dict) else inbox
        if isinstance(stored, list) and limit > 0:
            # Guard limit <= 0 explicitly: stored[-0:] would be the whole list.
            messages = stored[-limit:]
    return {"agent_id": agent_id, "messages": messages}
# ── Message Bus Viewer ────────────────────────────────────────────────────────
def get_message_bus_status() -> dict:
    """Return a snapshot of every agent's inbox — a full view of the message bus.

    For each Tier-2 agent and for Rose, up to 100 messages are read and the
    10 most recent are included.

    Returns:
        ``{"bus": {agent_id: {"count": int, "messages": [...]}}}`` where
        ``count`` is the number of messages read (at most 100) and ``messages``
        is newest first. An agent whose inbox cannot be read gets
        ``{"count": 0, "messages": [], "error": True}``.
    """
    bus = {}
    for agent_id in [*TIER2_AGENTS, "rose"]:
        try:
            messages = _read_inbox(agent_id, limit=100)
            bus[agent_id] = {
                "count": len(messages),
                # _read_inbox returns newest first, so the latest ten are at
                # the head of the list; [-10:] would pick the oldest ones.
                "messages": messages[:10],
            }
        except Exception:
            bus[agent_id] = {"count": 0, "messages": [], "error": True}
    return {"bus": bus}
def send_bus_message(to_agent: str, from_agent: str, subject: str, content: str, msg_type: str = "request") -> dict:
    """Append a message directly to an agent's ``inbox.json``.

    Rose's inbox is ``<hermes dir>/inbox.json``; Tier-2 inboxes live in the
    agent's directory, which is created if missing. Existing messages are
    kept whether the file holds a bare list or an object with a ``"messages"``
    list, and the file is written back in that same layout. A file that is
    not valid JSON is left untouched and reported as an error.

    Args:
        to_agent: Recipient; "rose" or a Tier-2 agent id.
        from_agent: Sender id recorded in the message.
        subject: Message subject.
        content: Message body.
        msg_type: Stored as the message's ``type`` field.

    Returns:
        ``{"ok": True, "message_id": str}`` on success, otherwise
        ``{"ok": False, "error": str}``.
    """
    # Function-scope imports: neither name is in the module's import header.
    import uuid
    from datetime import timezone

    if to_agent not in TIER2_AGENTS and to_agent != "rose":
        return {"ok": False, "error": f"Unknown agent: {to_agent}"}
    if to_agent == "rose":
        inbox_path = _HERMES_DIR / "inbox.json"
    else:
        inbox_path = _AGENTS_DIR / to_agent / "inbox.json"

    try:
        inbox_path.parent.mkdir(parents=True, exist_ok=True)
        stored = json.loads(inbox_path.read_text()) if inbox_path.exists() else []
        if isinstance(stored, dict) and isinstance(stored.get("messages"), list):
            # Object layout: append in place so the wrapper and its existing
            # messages survive instead of being discarded.
            messages = stored["messages"]
        elif isinstance(stored, list):
            messages = stored
        else:
            stored = messages = []

        # Real UTC, so that the "Z" suffix is truthful.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        msg = {
            "id": str(uuid.uuid4())[:8],
            "from": from_agent,
            "to": to_agent,
            "type": msg_type,
            "subject": subject,
            "content": content,
            "timestamp": utc_now.isoformat() + "Z",
            "status": "unread",
        }
        messages.append(msg)
        inbox_path.write_text(json.dumps(stored, indent=2))
        return {"ok": True, "message_id": msg["id"]}
    except Exception as e:
        return {"ok": False, "error": str(e)}
# ── Memory Search (ChromaDB) ───────────────────────────────────────────────────
def _get_chroma_client():
    """Return the shared ChromaDB HTTP client, creating it on first use.

    The client targets 127.0.0.1:8000 and is cached as an attribute on this
    function. NOTE(review): no lock guards the first-call initialisation, so
    two threads racing here could each build a client — confirm whether that
    matters before relying on thread-safety.
    """
    try:
        return _get_chroma_client._client
    except AttributeError:
        _get_chroma_client._client = chromadb.HttpClient(host="127.0.0.1", port=8000)
        return _get_chroma_client._client
def _search_agent_memory(agent_id: str, query: str, limit: int = 10) -> list:
    """Search the ``rose_memory`` collection for one agent's memories.

    The collection is queried for the ``limit`` nearest documents and only
    those whose metadata ``topic`` starts with ``agent_id`` are kept, so fewer
    than ``limit`` results may come back.

    Returns:
        A list of ``{id, topic, content, confidence, tags, vault_path}`` dicts;
        empty on any failure (best effort by design).
    """
    try:
        collection = _get_chroma_client().get_collection(name="rose_memory")
        results = collection.query(
            query_texts=[query],
            n_results=limit,
            include=["metadatas", "documents"],
        )
        hits = []
        documents = results.get("documents", [[]])[0] or []
        for index, document in enumerate(documents):
            metadata = (results.get("metadatas", [[{}]])[0] or [{}])[index] or {}
            topic = metadata.get("topic", "")
            if topic.startswith(agent_id):
                hits.append({
                    "id": (results.get("ids", [["?"]])[0] or ["?"])[index],
                    "topic": topic,
                    "content": document,
                    "confidence": float(metadata.get("confidence", 0.0)),
                    "tags": metadata.get("tags", ""),
                    "vault_path": metadata.get("vault_path", ""),
                })
        return hits
    except Exception:
        return []
def _search_all_agents_memory(query: str, limit: int = 20) -> list:
    """Search the ``rose_memory`` collection across all agents.

    Each hit is attributed to an agent through its metadata ``topic``: for a
    topic of the form ``agent-name/fact-name`` the agent is the part before
    the first slash; for a flat topic the whole topic is used.

    Returns:
        A list of ``{id, topic, agent, content, confidence, tags, vault_path}``
        dicts; empty on any failure (best effort by design).
    """
    try:
        collection = _get_chroma_client().get_collection(name="rose_memory")
        results = collection.query(
            query_texts=[query],
            n_results=limit,
            include=["metadatas", "documents"],
        )
        hits = []
        documents = results.get("documents", [[]])[0] or []
        for index, document in enumerate(documents):
            metadata = (results.get("metadatas", [[{}]])[0] or [{}])[index] or {}
            topic = metadata.get("topic", "")
            # Text before the first "/", or the whole topic when there is none.
            agent = topic.partition("/")[0]
            hits.append({
                "id": (results.get("ids", [["?"]])[0] or ["?"])[index],
                "topic": topic,
                "agent": agent,
                "content": document,
                "confidence": float(metadata.get("confidence", 0.0)),
                "tags": metadata.get("tags", ""),
                "vault_path": metadata.get("vault_path", ""),
            })
        return hits
    except Exception:
        return []