diff --git a/api/gateway_watcher.py b/api/gateway_watcher.py new file mode 100644 index 0000000..59db57b --- /dev/null +++ b/api/gateway_watcher.py @@ -0,0 +1,225 @@ +""" +Hermes Web UI -- Gateway session watcher. + +Background daemon thread that polls state.db every 5 seconds for changes +to gateway sessions (telegram, discord, slack, etc.). When changes are +detected, it pushes notifications to all subscribed SSE clients. + +This enables real-time session list updates in the sidebar without +requiring any changes to hermes-agent. +""" +import hashlib +import json +import os +import queue +import sqlite3 +import threading +import time +from pathlib import Path + +from api.config import HOME + + +# ── State hash tracking ───────────────────────────────────────────────────── + +def _snapshot_hash(sessions: list) -> str: + """Create a lightweight hash of session IDs and timestamps for change detection.""" + key = '|'.join( + f"{s['session_id']}:{s.get('updated_at', 0)}:{s.get('message_count', 0)}" + for s in sorted(sessions, key=lambda x: x['session_id']) + ) + return hashlib.md5(key.encode()).hexdigest() + + +# ── DB resolution (shared pattern with state_sync.py) ────────────────────── + +def _get_state_db_path() -> Path: + """Resolve state.db path for the active profile.""" + try: + from api.profiles import get_active_hermes_home + hermes_home = Path(get_active_hermes_home()).expanduser().resolve() + except Exception: + hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve() + return hermes_home / 'state.db' + + +def _get_agent_sessions_from_db() -> list: + """Read all non-webui sessions from state.db. + Returns list of session dicts, or empty list on any error. + """ + db_path = _get_state_db_path() + if not db_path.exists(): + return [] + + try: + with sqlite3.connect(str(db_path)) as conn: + conn.row_factory = sqlite3.Row + cur = conn.cursor() + cur.execute(""" + SELECT s.id, s.title, s.model, s.message_count, + s.started_at, s.source, + MAX(m.timestamp) AS last_activity + FROM sessions s + LEFT JOIN messages m ON m.session_id = s.id + WHERE s.source IS NOT NULL AND s.source != 'webui' + GROUP BY s.id + ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC + LIMIT 200 + """) + sessions = [] + for row in cur.fetchall(): + sessions.append({ + 'session_id': row['id'], + 'title': row['title'] or 'Agent Session', + 'model': row['model'] or 'unknown', + 'message_count': row['message_count'] or 0, + 'created_at': row['started_at'], + 'updated_at': row['last_activity'] or row['started_at'], + 'source': row['source'] or 'cli', + }) + return sessions + except Exception: + return [] + + +# ── GatewayWatcher ────────────────────────────────────────────────────────── + +class GatewayWatcher: + """Background thread that polls state.db for agent session changes. + + Usage: + watcher = GatewayWatcher() + watcher.start() + q = watcher.subscribe() + # ... receive change events via q.get() ... + watcher.unsubscribe(q) + watcher.stop() + """ + + POLL_INTERVAL = 5 # seconds between polls + SUBSCRIBER_TIMEOUT = 30 # seconds before sending keepalive comment + + def __init__(self): + self._subscribers: list[queue.Queue] = [] + self._sub_lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._last_hash: str = '' + self._last_sessions: list = [] + + def start(self): + """Start the watcher daemon thread.""" + if self._thread and self._thread.is_alive(): + return + self._stop_event.clear() + self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='gateway-watcher') + self._thread.start() + + def stop(self): + """Stop the watcher thread.""" + self._stop_event.set() + # Wake up any subscribers + with self._sub_lock: + for q in self._subscribers: + try: + q.put(None) # sentinel + except Exception: + pass + if self._thread: + self._thread.join(timeout=3) + self._thread = None + + def subscribe(self) -> queue.Queue: + """Subscribe to change events. Returns a queue.Queue. + Events are dicts: {'type': 'sessions_changed', 'sessions': [...]} + A None sentinel means the watcher is stopping. + """ + q = queue.Queue(maxsize=10) + with self._sub_lock: + self._subscribers.append(q) + return q + + def unsubscribe(self, q: queue.Queue): + """Remove a subscriber queue.""" + with self._sub_lock: + try: + self._subscribers.remove(q) + except ValueError: + pass + + def _notify_subscribers(self, sessions: list): + """Push change event to all subscribers.""" + event = { + 'type': 'sessions_changed', + 'sessions': sessions, + } + with self._sub_lock: + dead = [] + for q in self._subscribers: + try: + q.put_nowait(event) + except queue.Full: + dead.append(q) # remove slow consumers + except Exception: + dead.append(q) + for q in dead: + try: + self._subscribers.remove(q) + except ValueError: + pass + # Send a None sentinel so the SSE handler unblocks, closes, + # and lets the browser's EventSource auto-reconnect. + try: + q.put_nowait(None) + except Exception: + pass + + def _poll_loop(self): + """Main polling loop. Runs in a daemon thread.""" + while not self._stop_event.is_set(): + try: + sessions = _get_agent_sessions_from_db() + current_hash = _snapshot_hash(sessions) + + if current_hash != self._last_hash: + self._last_hash = current_hash + self._last_sessions = sessions + self._notify_subscribers(sessions) + except Exception: + pass # never crash the watcher + + # Sleep in small increments so we can stop promptly + for _ in range(self.POLL_INTERVAL * 10): + if self._stop_event.is_set(): + return + time.sleep(0.1) + + +# ── Module-level singleton ───────────────────────────────────────────────── + +_watcher: GatewayWatcher | None = None +_watcher_lock = threading.Lock() + + +def start_watcher(): + """Start the global gateway watcher (idempotent).""" + global _watcher + with _watcher_lock: + if _watcher is None: + _watcher = GatewayWatcher() + _watcher.start() + + +def stop_watcher(): + """Stop the global gateway watcher.""" + global _watcher + with _watcher_lock: + if _watcher is not None: + _watcher.stop() + _watcher = None + + +def get_watcher() -> GatewayWatcher | None: + """Get the global watcher instance (or None if not started).""" + with _watcher_lock: + return _watcher diff --git a/api/models.py b/api/models.py index c89e9aa..072d531 100644 --- a/api/models.py +++ b/api/models.py @@ -269,6 +269,7 @@ def get_cli_sessions() -> list: MAX(m.timestamp) AS last_activity FROM sessions s LEFT JOIN messages m ON m.session_id = s.id + WHERE s.source IS NOT NULL AND s.source != 'webui' GROUP BY s.id ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC LIMIT 200 @@ -280,9 +281,11 @@ def get_cli_sessions() -> list: # the active CLI profile so sidebar filtering works either way. profile = _cli_profile # CLI DB has no profile column; use active profile + _source = row['source'] or 'cli' + _display_title = row['title'] or f'{_source.title()} Session' cli_sessions.append({ 'session_id': sid, - 'title': row['title'] or 'CLI Session', + 'title': _display_title, 'workspace': str(get_last_workspace()), 'model': row['model'] or 'unknown', 'message_count': row['message_count'] or 0, @@ -292,7 +295,7 @@ def get_cli_sessions() -> list: 'archived': False, 'project_id': None, 'profile': profile, - 'source_tag': 'cli', + 'source_tag': _source, 'is_cli_session': True, }) except Exception: diff --git a/api/routes.py b/api/routes.py index 610c3fc..7ef586d 100644 --- a/api/routes.py +++ b/api/routes.py @@ -327,6 +327,9 @@ def handle_get(handler, parsed) -> bool: if parsed.path == '/api/chat/stream': return _handle_sse_stream(handler, parsed) + if parsed.path == '/api/sessions/gateway/stream': + return _handle_gateway_sse_stream(handler) + if parsed.path == '/api/file/raw': return _handle_file_raw(handler, parsed) @@ -914,6 +917,52 @@ def _handle_sse_stream(handler, parsed): return True +def _handle_gateway_sse_stream(handler): + """SSE endpoint for real-time gateway session updates. + Streams change events from the gateway watcher background thread. + Only active when show_cli_sessions (show_agent_sessions) setting is enabled. + """ + # Check if the feature is enabled + settings = load_settings() + if not settings.get('show_cli_sessions'): + return j(handler, {'error': 'agent sessions not enabled'}, status=404) + + from api.gateway_watcher import get_watcher + watcher = get_watcher() + if watcher is None: + return j(handler, {'error': 'watcher not started'}, status=503) + + handler.send_response(200) + handler.send_header('Content-Type', 'text/event-stream; charset=utf-8') + handler.send_header('Cache-Control', 'no-cache') + handler.send_header('X-Accel-Buffering', 'no') + handler.send_header('Connection', 'keep-alive') + handler.end_headers() + + q = watcher.subscribe() + try: + # Send initial snapshot immediately + from api.models import get_cli_sessions + initial = get_cli_sessions() + _sse(handler, 'sessions_changed', {'sessions': initial}) + + while True: + try: + event_data = q.get(timeout=30) + except queue.Empty: + handler.wfile.write(b': keepalive\n\n') + handler.wfile.flush() + continue + if event_data is None: + break # watcher is stopping + _sse(handler, event_data.get('type', 'sessions_changed'), event_data) + except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError): + pass + finally: + watcher.unsubscribe(q) + return True + + def _handle_file_raw(handler, parsed): qs = parse_qs(parsed.query) sid = qs.get('session_id', [''])[0] diff --git a/server.py b/server.py index 1f07313..ea2d2cf 100644 --- a/server.py +++ b/server.py @@ -110,6 +110,14 @@ def main() -> None: STATE_DIR.mkdir(parents=True, exist_ok=True) SESSION_DIR.mkdir(parents=True, exist_ok=True) DEFAULT_WORKSPACE.mkdir(parents=True, exist_ok=True) + + # Start the gateway session watcher for real-time SSE updates + try: + from api.gateway_watcher import start_watcher + start_watcher() + except Exception as e: + print(f'[!!] WARNING: Gateway watcher failed to start: {e}', flush=True) + httpd = ThreadingHTTPServer((HOST, PORT), Handler) # ── TLS/HTTPS setup (optional) ───────────────────────────────────────── @@ -132,7 +140,15 @@ def main() -> None: print(f' Remote access: ssh -N -L {PORT}:127.0.0.1:{PORT} @', flush=True) print(f' Then open: {scheme}://localhost:{PORT}', flush=True) print('', flush=True) - httpd.serve_forever() + try: + httpd.serve_forever() + finally: + # Stop the gateway watcher on shutdown + try: + from api.gateway_watcher import stop_watcher + stop_watcher() + except Exception: + pass if __name__ == '__main__': main() diff --git a/static/boot.js b/static/boot.js index ab075d2..650a06b 100644 --- a/static/boot.js +++ b/static/boot.js @@ -385,11 +385,13 @@ function applyBotName(){ _initResizePanels(); const saved=localStorage.getItem('hermes-webui-session'); if(saved){ - try{await loadSession(saved);await renderSessionList();await checkInflightOnBoot(saved);return;} + try{await loadSession(saved);await renderSessionList();if(typeof startGatewaySSE==='function')startGatewaySSE();await checkInflightOnBoot(saved);return;} catch(e){localStorage.removeItem('hermes-webui-session');} } // no saved session - show empty state, wait for user to hit + $('emptyState').style.display=''; await renderSessionList(); + // Start real-time gateway session sync if setting is enabled + if(typeof startGatewaySSE==='function') startGatewaySSE(); })(); diff --git a/static/i18n.js b/static/i18n.js index 847ed74..b81255b 100644 --- a/static/i18n.js +++ b/static/i18n.js @@ -121,7 +121,7 @@ const LOCALES = { settings_label_theme: 'Theme', settings_label_language: 'Language', settings_label_token_usage: 'Show token usage', - settings_label_cli_sessions: 'Show CLI sessions', + settings_label_cli_sessions: 'Show agent sessions', settings_label_sync_insights: 'Sync to insights', settings_label_check_updates: 'Check for updates', settings_label_bot_name: 'Assistant Name', @@ -499,7 +499,7 @@ const LOCALES = { settings_label_theme: 'Theme', settings_label_language: 'Sprache', settings_label_token_usage: 'Token-Verbrauch anzeigen', - settings_label_cli_sessions: 'CLI-Sitzungen anzeigen', + settings_label_cli_sessions: 'Agent-Sitzungen anzeigen', settings_label_sync_insights: 'Mit Insights synchronisieren', settings_label_check_updates: 'Nach Updates suchen', settings_label_bot_name: 'Assistenten-Name', diff --git a/static/index.html b/static/index.html index 5033a15..9d9d046 100644 --- a/static/index.html +++ b/static/index.html @@ -405,9 +405,9 @@
-
Merges sessions from the Hermes CLI (state.db) into the session list. Click a CLI session to import it and continue the conversation.
+
Merges sessions from Hermes agent platforms (CLI, Telegram, Discord, Slack, etc.) into the session list. Agent sessions are view-only.