feat: real-time gateway session sync (Phase 1) (#274)
* feat: add real-time gateway session sync (Phase 1) - Add gateway_watcher.py: background daemon polling state.db every 5s for gateway session changes (telegram, discord, slack, etc.) - Extend get_cli_sessions() to include all non-webui sources - Add SSE endpoint /api/sessions/gateway/stream for real-time push - Add dynamic source badges (telegram=blue, discord=purple, slack=dark purple) - Rename 'Show CLI sessions' to 'Show agent sessions' - Wire watcher lifecycle into server start/stop - 10 tests covering metadata, filtering, SSE, and watcher lifecycle - Activated via the same checkbox as CLI session import Addresses GitHub issue #272 * fix: SSE event name mismatch, TLS attribute, remove PLAN.md - Fix critical SSE bug: frontend listened for 'gateway_session_update' but backend sends 'sessions_changed' -- events were silently dropped - Fix frontend field check: data.changed -> data.sessions (matches the actual payload structure from gateway_watcher) - Fix TLS: ssl.TLSv1_2 -> ssl.TLSVersion.TLSv1_2 (the bare attribute does not exist, would crash TLS setup and silently fall back to HTTP) - Remove PLAN.md: implementation plan should not be committed to repo Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: test isolation and slow-consumer sentinel in gateway sync tests/test_gateway_sync.py: - Fix _get_test_state_dir() path mismatch: the function was computing HERMES_HOME/webui-mvp-test but conftest.py sets HERMES_HOME=TEST_STATE_DIR, so state.db was written to a double-nested path the server never read. Now uses HERMES_WEBUI_STATE_DIR first (which conftest sets directly to TEST_STATE_DIR), fixing the 7/10 test failures in full-suite ordering. - Fix conn cleanup: removed conn.close() from inside try blocks so the connection stays valid for _remove_test_sessions() in the finally block. Previously the closed conn caused ProgrammingError in finally (swallowed by bare except), leaving ghost sessions in state.db on test failure. api/gateway_watcher.py: - Fix slow-consumer queue eviction: when a subscriber queue fills (>10 events) and is removed from _subscribers, now puts a None sentinel into it so the SSE handler unblocks and closes the connection, letting EventSource auto-reconnect. Without this the connection stayed open but received no further events. * fix: test isolation — set HERMES_WEBUI_TEST_STATE_DIR in conftest The gateway sync tests write directly to state.db and must use the same path the test server reads from. Previously they computed the path independently, which broke when test_auth_sessions.py set a different HERMES_WEBUI_STATE_DIR in the test-process environment at import time. tests/conftest.py: - Set HERMES_WEBUI_TEST_STATE_DIR=TEST_STATE_DIR in the test process's os.environ (via setdefault) so gateway tests can read it reliably. Using setdefault preserves any explicit override the caller may pass. tests/test_gateway_sync.py: - Simplify _get_test_state_dir(): check HERMES_WEBUI_TEST_STATE_DIR first (now reliably set by conftest), fall back to HERMES_HOME/webui-mvp-test. Remove the workaround that tried to snapshot HERMES_HOME at import time. Result: 658/658 tests pass in full-suite ordering (was 651 pass / 7 fail). --------- Co-authored-by: bergeouss <bergeouss@users.noreply.github.com> Co-authored-by: Nathan Esquenazi <nesquena@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
225
api/gateway_watcher.py
Normal file
225
api/gateway_watcher.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""
|
||||
Hermes Web UI -- Gateway session watcher.
|
||||
|
||||
Background daemon thread that polls state.db every 5 seconds for changes
|
||||
to gateway sessions (telegram, discord, slack, etc.). When changes are
|
||||
detected, it pushes notifications to all subscribed SSE clients.
|
||||
|
||||
This enables real-time session list updates in the sidebar without
|
||||
requiring any changes to hermes-agent.
|
||||
"""
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from api.config import HOME
|
||||
|
||||
|
||||
# ── State hash tracking ─────────────────────────────────────────────────────
|
||||
|
||||
def _snapshot_hash(sessions: list) -> str:
|
||||
"""Create a lightweight hash of session IDs and timestamps for change detection."""
|
||||
key = '|'.join(
|
||||
f"{s['session_id']}:{s.get('updated_at', 0)}:{s.get('message_count', 0)}"
|
||||
for s in sorted(sessions, key=lambda x: x['session_id'])
|
||||
)
|
||||
return hashlib.md5(key.encode()).hexdigest()
|
||||
|
||||
|
||||
# ── DB resolution (shared pattern with state_sync.py) ──────────────────────
|
||||
|
||||
def _get_state_db_path() -> Path:
|
||||
"""Resolve state.db path for the active profile."""
|
||||
try:
|
||||
from api.profiles import get_active_hermes_home
|
||||
hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
|
||||
except Exception:
|
||||
hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
|
||||
return hermes_home / 'state.db'
|
||||
|
||||
|
||||
def _get_agent_sessions_from_db() -> list:
|
||||
"""Read all non-webui sessions from state.db.
|
||||
Returns list of session dicts, or empty list on any error.
|
||||
"""
|
||||
db_path = _get_state_db_path()
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
try:
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
SELECT s.id, s.title, s.model, s.message_count,
|
||||
s.started_at, s.source,
|
||||
MAX(m.timestamp) AS last_activity
|
||||
FROM sessions s
|
||||
LEFT JOIN messages m ON m.session_id = s.id
|
||||
WHERE s.source IS NOT NULL AND s.source != 'webui'
|
||||
GROUP BY s.id
|
||||
ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC
|
||||
LIMIT 200
|
||||
""")
|
||||
sessions = []
|
||||
for row in cur.fetchall():
|
||||
sessions.append({
|
||||
'session_id': row['id'],
|
||||
'title': row['title'] or 'Agent Session',
|
||||
'model': row['model'] or 'unknown',
|
||||
'message_count': row['message_count'] or 0,
|
||||
'created_at': row['started_at'],
|
||||
'updated_at': row['last_activity'] or row['started_at'],
|
||||
'source': row['source'] or 'cli',
|
||||
})
|
||||
return sessions
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
# ── GatewayWatcher ──────────────────────────────────────────────────────────
|
||||
|
||||
class GatewayWatcher:
|
||||
"""Background thread that polls state.db for agent session changes.
|
||||
|
||||
Usage:
|
||||
watcher = GatewayWatcher()
|
||||
watcher.start()
|
||||
q = watcher.subscribe()
|
||||
# ... receive change events via q.get() ...
|
||||
watcher.unsubscribe(q)
|
||||
watcher.stop()
|
||||
"""
|
||||
|
||||
POLL_INTERVAL = 5 # seconds between polls
|
||||
SUBSCRIBER_TIMEOUT = 30 # seconds before sending keepalive comment
|
||||
|
||||
def __init__(self):
|
||||
self._subscribers: list[queue.Queue] = []
|
||||
self._sub_lock = threading.Lock()
|
||||
self._stop_event = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
self._last_hash: str = ''
|
||||
self._last_sessions: list = []
|
||||
|
||||
def start(self):
|
||||
"""Start the watcher daemon thread."""
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._stop_event.clear()
|
||||
self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='gateway-watcher')
|
||||
self._thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the watcher thread."""
|
||||
self._stop_event.set()
|
||||
# Wake up any subscribers
|
||||
with self._sub_lock:
|
||||
for q in self._subscribers:
|
||||
try:
|
||||
q.put(None) # sentinel
|
||||
except Exception:
|
||||
pass
|
||||
if self._thread:
|
||||
self._thread.join(timeout=3)
|
||||
self._thread = None
|
||||
|
||||
def subscribe(self) -> queue.Queue:
|
||||
"""Subscribe to change events. Returns a queue.Queue.
|
||||
Events are dicts: {'type': 'sessions_changed', 'sessions': [...]}
|
||||
A None sentinel means the watcher is stopping.
|
||||
"""
|
||||
q = queue.Queue(maxsize=10)
|
||||
with self._sub_lock:
|
||||
self._subscribers.append(q)
|
||||
return q
|
||||
|
||||
def unsubscribe(self, q: queue.Queue):
|
||||
"""Remove a subscriber queue."""
|
||||
with self._sub_lock:
|
||||
try:
|
||||
self._subscribers.remove(q)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def _notify_subscribers(self, sessions: list):
|
||||
"""Push change event to all subscribers."""
|
||||
event = {
|
||||
'type': 'sessions_changed',
|
||||
'sessions': sessions,
|
||||
}
|
||||
with self._sub_lock:
|
||||
dead = []
|
||||
for q in self._subscribers:
|
||||
try:
|
||||
q.put_nowait(event)
|
||||
except queue.Full:
|
||||
dead.append(q) # remove slow consumers
|
||||
except Exception:
|
||||
dead.append(q)
|
||||
for q in dead:
|
||||
try:
|
||||
self._subscribers.remove(q)
|
||||
except ValueError:
|
||||
pass
|
||||
# Send a None sentinel so the SSE handler unblocks, closes,
|
||||
# and lets the browser's EventSource auto-reconnect.
|
||||
try:
|
||||
q.put_nowait(None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _poll_loop(self):
|
||||
"""Main polling loop. Runs in a daemon thread."""
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
sessions = _get_agent_sessions_from_db()
|
||||
current_hash = _snapshot_hash(sessions)
|
||||
|
||||
if current_hash != self._last_hash:
|
||||
self._last_hash = current_hash
|
||||
self._last_sessions = sessions
|
||||
self._notify_subscribers(sessions)
|
||||
except Exception:
|
||||
pass # never crash the watcher
|
||||
|
||||
# Sleep in small increments so we can stop promptly
|
||||
for _ in range(self.POLL_INTERVAL * 10):
|
||||
if self._stop_event.is_set():
|
||||
return
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
# ── Module-level singleton ─────────────────────────────────────────────────
|
||||
|
||||
_watcher: GatewayWatcher | None = None
|
||||
_watcher_lock = threading.Lock()
|
||||
|
||||
|
||||
def start_watcher():
|
||||
"""Start the global gateway watcher (idempotent)."""
|
||||
global _watcher
|
||||
with _watcher_lock:
|
||||
if _watcher is None:
|
||||
_watcher = GatewayWatcher()
|
||||
_watcher.start()
|
||||
|
||||
|
||||
def stop_watcher():
|
||||
"""Stop the global gateway watcher."""
|
||||
global _watcher
|
||||
with _watcher_lock:
|
||||
if _watcher is not None:
|
||||
_watcher.stop()
|
||||
_watcher = None
|
||||
|
||||
|
||||
def get_watcher() -> GatewayWatcher | None:
|
||||
"""Get the global watcher instance (or None if not started)."""
|
||||
with _watcher_lock:
|
||||
return _watcher
|
||||
@@ -269,6 +269,7 @@ def get_cli_sessions() -> list:
|
||||
MAX(m.timestamp) AS last_activity
|
||||
FROM sessions s
|
||||
LEFT JOIN messages m ON m.session_id = s.id
|
||||
WHERE s.source IS NOT NULL AND s.source != 'webui'
|
||||
GROUP BY s.id
|
||||
ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC
|
||||
LIMIT 200
|
||||
@@ -280,9 +281,11 @@ def get_cli_sessions() -> list:
|
||||
# the active CLI profile so sidebar filtering works either way.
|
||||
profile = _cli_profile # CLI DB has no profile column; use active profile
|
||||
|
||||
_source = row['source'] or 'cli'
|
||||
_display_title = row['title'] or f'{_source.title()} Session'
|
||||
cli_sessions.append({
|
||||
'session_id': sid,
|
||||
'title': row['title'] or 'CLI Session',
|
||||
'title': _display_title,
|
||||
'workspace': str(get_last_workspace()),
|
||||
'model': row['model'] or 'unknown',
|
||||
'message_count': row['message_count'] or 0,
|
||||
@@ -292,7 +295,7 @@ def get_cli_sessions() -> list:
|
||||
'archived': False,
|
||||
'project_id': None,
|
||||
'profile': profile,
|
||||
'source_tag': 'cli',
|
||||
'source_tag': _source,
|
||||
'is_cli_session': True,
|
||||
})
|
||||
except Exception:
|
||||
|
||||
@@ -327,6 +327,9 @@ def handle_get(handler, parsed) -> bool:
|
||||
if parsed.path == '/api/chat/stream':
|
||||
return _handle_sse_stream(handler, parsed)
|
||||
|
||||
if parsed.path == '/api/sessions/gateway/stream':
|
||||
return _handle_gateway_sse_stream(handler)
|
||||
|
||||
if parsed.path == '/api/file/raw':
|
||||
return _handle_file_raw(handler, parsed)
|
||||
|
||||
@@ -914,6 +917,52 @@ def _handle_sse_stream(handler, parsed):
|
||||
return True
|
||||
|
||||
|
||||
def _handle_gateway_sse_stream(handler):
|
||||
"""SSE endpoint for real-time gateway session updates.
|
||||
Streams change events from the gateway watcher background thread.
|
||||
Only active when show_cli_sessions (show_agent_sessions) setting is enabled.
|
||||
"""
|
||||
# Check if the feature is enabled
|
||||
settings = load_settings()
|
||||
if not settings.get('show_cli_sessions'):
|
||||
return j(handler, {'error': 'agent sessions not enabled'}, status=404)
|
||||
|
||||
from api.gateway_watcher import get_watcher
|
||||
watcher = get_watcher()
|
||||
if watcher is None:
|
||||
return j(handler, {'error': 'watcher not started'}, status=503)
|
||||
|
||||
handler.send_response(200)
|
||||
handler.send_header('Content-Type', 'text/event-stream; charset=utf-8')
|
||||
handler.send_header('Cache-Control', 'no-cache')
|
||||
handler.send_header('X-Accel-Buffering', 'no')
|
||||
handler.send_header('Connection', 'keep-alive')
|
||||
handler.end_headers()
|
||||
|
||||
q = watcher.subscribe()
|
||||
try:
|
||||
# Send initial snapshot immediately
|
||||
from api.models import get_cli_sessions
|
||||
initial = get_cli_sessions()
|
||||
_sse(handler, 'sessions_changed', {'sessions': initial})
|
||||
|
||||
while True:
|
||||
try:
|
||||
event_data = q.get(timeout=30)
|
||||
except queue.Empty:
|
||||
handler.wfile.write(b': keepalive\n\n')
|
||||
handler.wfile.flush()
|
||||
continue
|
||||
if event_data is None:
|
||||
break # watcher is stopping
|
||||
_sse(handler, event_data.get('type', 'sessions_changed'), event_data)
|
||||
except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
|
||||
pass
|
||||
finally:
|
||||
watcher.unsubscribe(q)
|
||||
return True
|
||||
|
||||
|
||||
def _handle_file_raw(handler, parsed):
|
||||
qs = parse_qs(parsed.query)
|
||||
sid = qs.get('session_id', [''])[0]
|
||||
|
||||
18
server.py
18
server.py
@@ -110,6 +110,14 @@ def main() -> None:
|
||||
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
||||
DEFAULT_WORKSPACE.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Start the gateway session watcher for real-time SSE updates
|
||||
try:
|
||||
from api.gateway_watcher import start_watcher
|
||||
start_watcher()
|
||||
except Exception as e:
|
||||
print(f'[!!] WARNING: Gateway watcher failed to start: {e}', flush=True)
|
||||
|
||||
httpd = ThreadingHTTPServer((HOST, PORT), Handler)
|
||||
|
||||
# ── TLS/HTTPS setup (optional) ─────────────────────────────────────────
|
||||
@@ -132,7 +140,15 @@ def main() -> None:
|
||||
print(f' Remote access: ssh -N -L {PORT}:127.0.0.1:{PORT} <user>@<your-server>', flush=True)
|
||||
print(f' Then open: {scheme}://localhost:{PORT}', flush=True)
|
||||
print('', flush=True)
|
||||
httpd.serve_forever()
|
||||
try:
|
||||
httpd.serve_forever()
|
||||
finally:
|
||||
# Stop the gateway watcher on shutdown
|
||||
try:
|
||||
from api.gateway_watcher import stop_watcher
|
||||
stop_watcher()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
@@ -385,11 +385,13 @@ function applyBotName(){
|
||||
_initResizePanels();
|
||||
const saved=localStorage.getItem('hermes-webui-session');
|
||||
if(saved){
|
||||
try{await loadSession(saved);await renderSessionList();await checkInflightOnBoot(saved);return;}
|
||||
try{await loadSession(saved);await renderSessionList();if(typeof startGatewaySSE==='function')startGatewaySSE();await checkInflightOnBoot(saved);return;}
|
||||
catch(e){localStorage.removeItem('hermes-webui-session');}
|
||||
}
|
||||
// no saved session - show empty state, wait for user to hit +
|
||||
$('emptyState').style.display='';
|
||||
await renderSessionList();
|
||||
// Start real-time gateway session sync if setting is enabled
|
||||
if(typeof startGatewaySSE==='function') startGatewaySSE();
|
||||
})();
|
||||
|
||||
|
||||
@@ -121,7 +121,7 @@ const LOCALES = {
|
||||
settings_label_theme: 'Theme',
|
||||
settings_label_language: 'Language',
|
||||
settings_label_token_usage: 'Show token usage',
|
||||
settings_label_cli_sessions: 'Show CLI sessions',
|
||||
settings_label_cli_sessions: 'Show agent sessions',
|
||||
settings_label_sync_insights: 'Sync to insights',
|
||||
settings_label_check_updates: 'Check for updates',
|
||||
settings_label_bot_name: 'Assistant Name',
|
||||
@@ -499,7 +499,7 @@ const LOCALES = {
|
||||
settings_label_theme: 'Theme',
|
||||
settings_label_language: 'Sprache',
|
||||
settings_label_token_usage: 'Token-Verbrauch anzeigen',
|
||||
settings_label_cli_sessions: 'CLI-Sitzungen anzeigen',
|
||||
settings_label_cli_sessions: 'Agent-Sitzungen anzeigen',
|
||||
settings_label_sync_insights: 'Mit Insights synchronisieren',
|
||||
settings_label_check_updates: 'Nach Updates suchen',
|
||||
settings_label_bot_name: 'Assistenten-Name',
|
||||
|
||||
@@ -405,9 +405,9 @@
|
||||
<div class="settings-field">
|
||||
<label style="display:flex;align-items:center;gap:8px;cursor:pointer">
|
||||
<input type="checkbox" id="settingsShowCliSessions" style="width:15px;height:15px;accent-color:var(--accent)">
|
||||
<span data-i18n="settings_label_cli_sessions">Show CLI sessions in sidebar</span>
|
||||
<span data-i18n="settings_label_cli_sessions">Show agent sessions in sidebar</span>
|
||||
</label>
|
||||
<div style="font-size:11px;color:var(--muted);margin-top:4px" data-i18n="settings_desc_cli_sessions">Merges sessions from the Hermes CLI (state.db) into the session list. Click a CLI session to import it and continue the conversation.</div>
|
||||
<div style="font-size:11px;color:var(--muted);margin-top:4px" data-i18n="settings_desc_cli_sessions">Merges sessions from Hermes agent platforms (CLI, Telegram, Discord, Slack, etc.) into the session list. Agent sessions are view-only.</div>
|
||||
</div>
|
||||
<div class="settings-field">
|
||||
<label style="display:flex;align-items:center;gap:8px;cursor:pointer">
|
||||
|
||||
@@ -1113,6 +1113,8 @@ async function saveSettings(andClose){
|
||||
if(typeof applyBotName==='function') applyBotName();
|
||||
if(typeof setLocale==='function') setLocale(language);
|
||||
if(typeof applyLocaleToDOM==='function') applyLocaleToDOM();
|
||||
// Restart gateway SSE when agent session setting changes
|
||||
if(typeof startGatewaySSE==='function'){if(showCliSessions)startGatewaySSE();else if(typeof stopGatewaySSE==='function')stopGatewaySSE();}
|
||||
_settingsDirty=false; _settingsThemeOnOpen=theme;
|
||||
const bar=$('settingsUnsavedBar'); if(bar) bar.style.display='none';
|
||||
renderMessages();
|
||||
|
||||
@@ -248,6 +248,35 @@ async function renderSessionList(){
|
||||
}catch(e){console.warn('renderSessionList',e);}
|
||||
}
|
||||
|
||||
// ── Gateway session SSE (real-time sync for agent sessions) ──
|
||||
let _gatewaySSE = null;
|
||||
|
||||
function startGatewaySSE(){
|
||||
stopGatewaySSE();
|
||||
if(!window._showCliSessions) return;
|
||||
try{
|
||||
_gatewaySSE = new EventSource('/api/sessions/gateway/stream');
|
||||
_gatewaySSE.addEventListener('sessions_changed', (ev) => {
|
||||
try{
|
||||
const data = JSON.parse(ev.data);
|
||||
if(data.sessions){
|
||||
renderSessionList(); // re-fetch and re-render
|
||||
}
|
||||
}catch(e){ /* ignore parse errors */ }
|
||||
});
|
||||
_gatewaySSE.onerror = () => {
|
||||
// EventSource auto-reconnects; no action needed
|
||||
};
|
||||
}catch(e){ /* SSE not available */ }
|
||||
}
|
||||
|
||||
function stopGatewaySSE(){
|
||||
if(_gatewaySSE){
|
||||
_gatewaySSE.close();
|
||||
_gatewaySSE = null;
|
||||
}
|
||||
}
|
||||
|
||||
let _searchDebounceTimer = null;
|
||||
let _contentSearchResults = []; // results from /api/sessions/search content scan
|
||||
|
||||
@@ -409,6 +438,7 @@ function renderSessionListFromCache(){
|
||||
const el=document.createElement('div');
|
||||
const isActive=S.session&&s.session_id===S.session.session_id;
|
||||
el.className='session-item'+(isActive?' active':'')+(isActive&&S.session&&S.session._flash?' new-flash':'')+(s.archived?' archived':'')+(s.is_cli_session?' cli-session':'');
|
||||
if(s.source_tag) el.dataset.source=s.source_tag;
|
||||
if(isActive&&S.session&&S.session._flash)delete S.session._flash;
|
||||
const rawTitle=s.title||'Untitled';
|
||||
const tags=(rawTitle.match(/#[\w-]+/g)||[]);
|
||||
|
||||
@@ -862,13 +862,13 @@ body.resizing{user-select:none;cursor:col-resize;}
|
||||
|
||||
.bg-error-banner{background:rgba(229,62,62,.15);border:1px solid rgba(229,62,62,.3);color:#fca5a5;padding:8px 16px;font-size:12px;display:flex;align-items:center;justify-content:space-between;gap:12px;border-radius:0;}
|
||||
|
||||
/* ── CLI session items in sidebar ── */
|
||||
/* ── CLI / Agent session items in sidebar ── */
|
||||
.session-item.cli-session {
|
||||
border-left-color: var(--gold);
|
||||
padding-right: 40px; /* make room for the session actions trigger */
|
||||
}
|
||||
.session-item.cli-session::after {
|
||||
content: 'cli';
|
||||
content: attr(data-source);
|
||||
font-size: 9px;
|
||||
font-weight: 600;
|
||||
text-transform: uppercase;
|
||||
@@ -882,3 +882,10 @@ body.resizing{user-select:none;cursor:col-resize;}
|
||||
.session-item.cli-session:hover::after {
|
||||
display: none; /* hide badge on hover so session-actions icons are fully reachable */
|
||||
}
|
||||
/* Source-specific colors for gateway sessions */
|
||||
.session-item.cli-session[data-source="telegram"] { border-left-color: #0088cc; }
|
||||
.session-item.cli-session[data-source="telegram"]::after { color: #0088cc; }
|
||||
.session-item.cli-session[data-source="discord"] { border-left-color: #5865F2; }
|
||||
.session-item.cli-session[data-source="discord"]::after { color: #5865F2; }
|
||||
.session-item.cli-session[data-source="slack"] { border-left-color: #4A154B; }
|
||||
.session-item.cli-session[data-source="slack"]::after { color: #4A154B; }
|
||||
|
||||
@@ -238,6 +238,13 @@ def test_server():
|
||||
# Isolated cron state
|
||||
(TEST_STATE_DIR / 'cron').mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Expose TEST_STATE_DIR to the test process itself so that tests which write
|
||||
# directly to state.db (e.g. test_gateway_sync.py) always use the same path
|
||||
# as the server. Other test files (test_auth_sessions.py) may override
|
||||
# HERMES_WEBUI_STATE_DIR for their own purposes, but HERMES_WEBUI_TEST_STATE_DIR
|
||||
# is reserved for this mapping and is never overridden by individual test files.
|
||||
os.environ.setdefault('HERMES_WEBUI_TEST_STATE_DIR', str(TEST_STATE_DIR))
|
||||
|
||||
env = os.environ.copy()
|
||||
env.update({
|
||||
"HERMES_WEBUI_PORT": str(TEST_PORT),
|
||||
|
||||
364
tests/test_gateway_sync.py
Normal file
364
tests/test_gateway_sync.py
Normal file
@@ -0,0 +1,364 @@
|
||||
"""
|
||||
Tests for Phase 1: Real-time Gateway Session Sync.
|
||||
|
||||
Tests are ordered TDD-style:
|
||||
1. Gateway sessions appear in /api/sessions when setting enabled
|
||||
2. Gateway sessions excluded when setting disabled
|
||||
3. Gateway sessions have correct metadata (source_tag, is_cli_session)
|
||||
4. SSE stream endpoint opens and receives events
|
||||
5. Watcher detects new sessions inserted into state.db
|
||||
6. Settings UI has renamed label
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
|
||||
BASE = "http://127.0.0.1:8788"
|
||||
|
||||
|
||||
def get(path):
|
||||
with urllib.request.urlopen(BASE + path, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
|
||||
|
||||
def post(path, body=None):
|
||||
data = json.dumps(body or {}).encode()
|
||||
req = urllib.request.Request(BASE + path, data=data,
|
||||
headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as r:
|
||||
return json.loads(r.read()), r.status
|
||||
except urllib.error.HTTPError as e:
|
||||
try:
|
||||
return json.loads(e.read()), e.code
|
||||
except Exception:
|
||||
return {}, e.code
|
||||
|
||||
|
||||
def _get_test_state_dir():
|
||||
"""Return the test state directory (matches conftest.py TEST_STATE_DIR).
|
||||
|
||||
conftest.py sets HERMES_WEBUI_TEST_STATE_DIR in the test-process environment
|
||||
(via os.environ.setdefault) so that tests writing directly to state.db always
|
||||
use the same path the test server was started with. If the env var is not
|
||||
set (e.g. when running this file standalone), fall back to the conftest
|
||||
formula: HERMES_HOME/webui-mvp-test.
|
||||
"""
|
||||
explicit = os.getenv('HERMES_WEBUI_TEST_STATE_DIR')
|
||||
if explicit:
|
||||
return pathlib.Path(explicit)
|
||||
hermes_home = pathlib.Path(os.getenv('HERMES_HOME', str(pathlib.Path.home() / '.hermes')))
|
||||
return hermes_home / 'webui-mvp-test' # matches conftest.py TEST_STATE_DIR formula
|
||||
|
||||
|
||||
def _get_state_db_path():
|
||||
"""Return path to the test state.db."""
|
||||
return _get_test_state_dir() / 'state.db'
|
||||
|
||||
|
||||
def _ensure_state_db():
|
||||
"""Create state.db with sessions and messages tables if it doesn't exist.
|
||||
Returns a connection. Does NOT delete existing data (safe for parallel tests).
|
||||
"""
|
||||
db_path = _get_state_db_path()
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS sessions (
|
||||
id TEXT PRIMARY KEY,
|
||||
source TEXT NOT NULL,
|
||||
user_id TEXT,
|
||||
model TEXT,
|
||||
started_at REAL NOT NULL,
|
||||
message_count INTEGER DEFAULT 0,
|
||||
title TEXT
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id TEXT NOT NULL,
|
||||
role TEXT NOT NULL,
|
||||
content TEXT,
|
||||
timestamp REAL NOT NULL
|
||||
);
|
||||
""")
|
||||
conn.commit()
|
||||
return conn
|
||||
|
||||
|
||||
def _insert_gateway_session(conn, session_id='20260401_120000_abcdefgh', source='telegram',
|
||||
title='Telegram Chat', model='anthropic/claude-sonnet-4-5',
|
||||
started_at=None, message_count=2):
|
||||
"""Insert a gateway session into state.db."""
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO sessions (id, source, title, model, started_at, message_count) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(session_id, source, title, model, started_at or time.time(), message_count)
|
||||
)
|
||||
# Delete any existing messages for this session (idempotent re-insert)
|
||||
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
|
||||
# Insert some messages
|
||||
conn.execute(
|
||||
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, 'user', ?, ?)",
|
||||
(session_id, 'Hello from Telegram', started_at or time.time())
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, 'assistant', ?, ?)",
|
||||
(session_id, 'Hi there!', (started_at or time.time()) + 1)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def _remove_test_sessions(conn, *session_ids):
|
||||
"""Remove specific test sessions from state.db (parallel-safe cleanup)."""
|
||||
for sid in session_ids:
|
||||
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
|
||||
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
|
||||
conn.commit()
|
||||
|
||||
|
||||
def _cleanup_state_db():
|
||||
"""Remove state.db if it exists (only used for tests that need a blank slate)."""
|
||||
db_path = _get_state_db_path()
|
||||
for p in [db_path, db_path.parent / 'state.db-wal', db_path.parent / 'state.db-shm']:
|
||||
try:
|
||||
p.unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ── Tests ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_gateway_sessions_appear_when_enabled():
|
||||
"""Gateway sessions from state.db appear in /api/sessions when show_cli_sessions is on."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='gw_test_tg_001', source='telegram', title='TG Test Chat')
|
||||
|
||||
# Enable the setting
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
gw_ids = [s['session_id'] for s in sessions if s.get('session_id') == 'gw_test_tg_001']
|
||||
assert len(gw_ids) == 1, f"Expected gateway session gw_test_tg_001, got {[s['session_id'] for s in sessions]}"
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'gw_test_tg_001')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_sessions_excluded_when_disabled():
|
||||
"""Gateway sessions are NOT returned when show_cli_sessions is off."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='gw_test_dc_001', source='discord', title='DC Test Chat')
|
||||
|
||||
# Ensure setting is off
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
gw_ids = [s['session_id'] for s in sessions if s.get('session_id') == 'gw_test_dc_001']
|
||||
assert len(gw_ids) == 0, "Gateway session should not appear when setting is off"
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'gw_test_dc_001')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def test_gateway_session_has_correct_metadata():
|
||||
"""Gateway sessions include source_tag and is_cli_session fields."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='gw_meta_001', source='telegram', title='Meta Test')
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
gw = next((s for s in sessions if s['session_id'] == 'gw_meta_001'), None)
|
||||
assert gw is not None, "Gateway session not found"
|
||||
assert gw.get('source_tag') == 'telegram', f"Expected source_tag=telegram, got {gw.get('source_tag')}"
|
||||
assert gw.get('is_cli_session') is True, "is_cli_session should be True for agent sessions"
|
||||
assert gw.get('title') == 'Meta Test'
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'gw_meta_001')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_session_has_message_count():
|
||||
"""Gateway sessions report correct message_count from state.db."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='gw_msg_001', source='discord', title='Msg Count Test', message_count=5)
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
gw = next((s for s in sessions if s['session_id'] == 'gw_msg_001'), None)
|
||||
assert gw is not None
|
||||
assert gw.get('message_count') == 5, f"Expected message_count=5, got {gw.get('message_count')}"
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'gw_msg_001')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_sessions_multiple_sources():
|
||||
"""Sessions from multiple gateway sources (telegram, discord, slack) all appear."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='gw_multi_tg', source='telegram', title='TG Chat')
|
||||
_insert_gateway_session(conn, session_id='gw_multi_dc', source='discord', title='DC Chat')
|
||||
_insert_gateway_session(conn, session_id='gw_multi_sl', source='slack', title='SL Chat')
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
gw_ids = {s['session_id'] for s in sessions if s.get('session_id') in ('gw_multi_tg', 'gw_multi_dc', 'gw_multi_sl')}
|
||||
assert len(gw_ids) == 3, f"Expected 3 gateway sessions, got {len(gw_ids)}: {gw_ids}"
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'gw_multi_tg', 'gw_multi_dc', 'gw_multi_sl')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_session_messages_readable():
|
||||
"""Gateway session messages can be loaded via /api/session."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='gw_read_001', source='telegram', title='Readable')
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get(f'/api/session?session_id=gw_read_001')
|
||||
assert status == 200
|
||||
msgs = data.get('session', {}).get('messages', [])
|
||||
assert len(msgs) >= 2, f"Expected at least 2 messages, got {len(msgs)}"
|
||||
assert msgs[0].get('role') == 'user'
|
||||
assert msgs[0].get('content') == 'Hello from Telegram'
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'gw_read_001')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_sse_stream_endpoint_exists():
|
||||
"""GET /api/sessions/gateway/stream returns a response (200 or 200-range)."""
|
||||
# The SSE endpoint requires show_cli_sessions to be enabled
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
try:
|
||||
req = urllib.request.Request(BASE + '/api/sessions/gateway/stream')
|
||||
with urllib.request.urlopen(req, timeout=5) as r:
|
||||
assert r.status in (200, 204), f"Expected 200/204, got {r.status}"
|
||||
# SSE should have content-type text/event-stream
|
||||
ctype = r.headers.get('Content-Type', '')
|
||||
assert 'text/event-stream' in ctype, f"Expected text/event-stream, got {ctype}"
|
||||
except Exception as e:
|
||||
# Timeout is acceptable — means the connection is held open (SSE behavior)
|
||||
if 'timed out' in str(e).lower() or 'timeout' in str(e).lower():
|
||||
pass # Good: SSE keeps the connection open
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_webui_sessions_not_duplicated():
|
||||
"""If a session_id exists both in WebUI store and state.db, it's not duplicated."""
|
||||
# Create a WebUI session with a known ID
|
||||
body = {}
|
||||
d, _ = post('/api/session/new', body)
|
||||
webui_sid = d['session']['session_id']
|
||||
|
||||
try:
|
||||
# Insert the same session_id into state.db as a gateway session
|
||||
conn = _ensure_state_db()
|
||||
_insert_gateway_session(conn, session_id=webui_sid, source='telegram', title='Dup Test')
|
||||
conn.close()
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
matching = [s for s in sessions if s['session_id'] == webui_sid]
|
||||
assert len(matching) == 1, f"Expected 1 entry for {webui_sid}, got {len(matching)}"
|
||||
finally:
|
||||
try:
|
||||
conn2 = sqlite3.connect(str(_get_state_db_path()))
|
||||
_remove_test_sessions(conn2, webui_sid)
|
||||
conn2.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/session/delete', {'session_id': webui_sid})
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_sessions_no_state_db():
|
||||
"""When state.db doesn't exist, /api/sessions works fine (no gateway sessions)."""
|
||||
_cleanup_state_db()
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
try:
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
# Should succeed with just webui sessions (or empty)
|
||||
assert 'sessions' in data
|
||||
finally:
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_cli_sessions_still_work():
|
||||
"""CLI sessions (source='cli') still appear alongside gateway sessions."""
|
||||
conn = _ensure_state_db()
|
||||
try:
|
||||
_insert_gateway_session(conn, session_id='cli_legacy_001', source='cli', title='CLI Legacy')
|
||||
_insert_gateway_session(conn, session_id='gw_new_001', source='telegram', title='GW New')
|
||||
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
|
||||
data, status = get('/api/sessions')
|
||||
assert status == 200
|
||||
sessions = data.get('sessions', [])
|
||||
agent_ids = {s['session_id'] for s in sessions if s.get('session_id') in ('cli_legacy_001', 'gw_new_001')}
|
||||
assert len(agent_ids) == 2, f"Expected 2 agent sessions (cli + gateway), got {len(agent_ids)}"
|
||||
finally:
|
||||
try:
|
||||
_remove_test_sessions(conn, 'cli_legacy_001', 'gw_new_001')
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
Reference in New Issue
Block a user