feat(/compress): manual session compression with focus topic — closes #469 (PR #619 by @franksong2702)

POST /api/session/compress with optional focus_topic. Transcript-inline cards: command, running, complete (collapsible green), reference. /compact alias kept. Fixes: var(--green) undefined color, focus_topic 500-char cap. Independent review by @nesquena (4 passes).
This commit is contained in:
nesquena-hermes
2026-04-17 23:55:04 -07:00
committed by GitHub
parent b1aa1cfa4d
commit b49de92893
11 changed files with 1221 additions and 25 deletions

View File

@@ -913,6 +913,9 @@ def handle_post(handler, parsed) -> bool:
handler, {"ok": True, "session": s.compact() | {"messages": s.messages}}
)
if parsed.path == "/api/session/compress":
return _handle_session_compress(handler, body)
if parsed.path == "/api/chat/start":
return _handle_chat_start(handler, body)
@@ -1321,7 +1324,6 @@ def handle_post(handler, parsed) -> bool:
return False # 404
# ── GET route helpers ─────────────────────────────────────────────────────────
# MIME types for static file serving. Hoisted to module scope to avoid
@@ -2481,6 +2483,264 @@ def _handle_clarify_respond(handler, body):
return j(handler, {"ok": True, "response": response})
def _handle_session_compress(handler, body):
def _visible_messages_for_anchor(messages):
out = []
for m in messages or []:
if not isinstance(m, dict):
continue
role = m.get("role")
if not role or role == "tool":
continue
content = m.get("content", "")
has_attachments = bool(m.get("attachments"))
if role == "assistant":
tool_calls = m.get("tool_calls")
has_tool_calls = isinstance(tool_calls, list) and len(tool_calls) > 0
has_tool_use = False
has_reasoning = bool(m.get("reasoning"))
if isinstance(content, list):
for p in content:
if not isinstance(p, dict):
continue
if p.get("type") == "tool_use":
has_tool_use = True
if p.get("type") in {"thinking", "reasoning"}:
has_reasoning = True
text = "\n".join(
str(p.get("text") or p.get("content") or "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
).strip()
else:
text = str(content or "").strip()
if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning:
out.append(m)
continue
if isinstance(content, list):
text = "\n".join(
str(p.get("text") or p.get("content") or "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
).strip()
else:
text = str(content or "").strip()
if text or has_attachments:
out.append(m)
return out
def _anchor_message_key(m):
if not isinstance(m, dict):
return None
role = str(m.get("role") or "")
if not role or role == "tool":
return None
content = m.get("content", "")
if isinstance(content, list):
text = "\n".join(
str(p.get("text") or p.get("content") or "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
)
else:
text = str(content or "")
norm = " ".join(text.split()).strip()[:160]
ts = m.get("_ts") or m.get("timestamp")
attachments = m.get("attachments")
attach_count = len(attachments) if isinstance(attachments, list) else 0
if not norm and not attach_count and not ts:
return None
return {"role": role, "ts": ts, "text": norm, "attachments": attach_count}
try:
require(body, "session_id")
except ValueError as e:
return bad(handler, str(e))
sid = str(body.get("session_id") or "").strip()
if not sid:
return bad(handler, "session_id is required")
# Cap focus_topic to 500 chars — matches the defensive input-size pattern
# used elsewhere (session title :80, first-exchange snippets :500) and
# prevents a user from forwarding an unbounded string into the compressor
# prompt path. No privilege boundary here (user prompting themself), just
# cheap bound-checking.
focus_topic = str(body.get("focus_topic") or body.get("topic") or "").strip()[:500] or None
try:
s = get_session(sid)
except KeyError:
return bad(handler, "Session not found", 404)
if getattr(s, "active_stream_id", None):
return bad(handler, "Session is still streaming; wait for the current turn to finish.", 409)
try:
from api.streaming import _sanitize_messages_for_api
messages = _sanitize_messages_for_api(s.messages)
if len(messages) < 4:
return bad(handler, "Not enough conversation to compress (need at least 4 messages).")
def _fallback_estimate_messages_tokens_rough(msgs):
"""Fallback heuristic token estimate when runtime metadata helpers are absent.
Uses whitespace token-like word counting only. This intentionally
over/under-estimates BPE token counts (roughly around x3/x4 scale),
and is only for resilient fallback behavior.
"""
total = 0
for m in msgs or []:
if not isinstance(m, dict):
continue
content = m.get("content", "")
if isinstance(content, list):
content_text = "\n".join(
str(p.get("text") or p.get("content") or "")
for p in content
if isinstance(p, dict)
)
else:
content_text = str(content or "")
total += len(content_text.split())
return max(1, total)
def _fallback_summarize_manual_compression(original_messages, compressed_messages, before_tokens, after_tokens, focus_topic=None):
"""Lightweight fallback summary to keep /session/compress usable in tests/runtime."""
after_tokens = after_tokens if after_tokens is not None else _fallback_estimate_messages_tokens_rough(compressed_messages)
headline = f"Compressed: {len(original_messages)} \u2192 {len(compressed_messages)} messages"
summary = {
"headline": headline,
"token_line": f"Rough transcript estimate: ~{before_tokens} \u2192 ~{after_tokens} tokens",
"note": f"Focus: {focus_topic}" if focus_topic else None,
}
summary["reference_message"] = (
f"[CONTEXT COMPACTION \u2014 REFERENCE ONLY] {headline}\n"
f"{summary['token_line']}\n"
+ (summary["note"] + "\n" if summary.get("note") else "")
+ "Compression completed."
)
return summary
def _estimate_messages_tokens_rough(msgs):
try:
from agent.model_metadata import estimate_messages_tokens_rough
return estimate_messages_tokens_rough(msgs)
except Exception:
return _fallback_estimate_messages_tokens_rough(msgs)
def _summarize_manual_compression(
original_messages,
compressed_messages,
before_tokens,
after_tokens,
focus_topic=None,
):
try:
from agent.manual_compression_feedback import summarize_manual_compression
return summarize_manual_compression(
original_messages,
compressed_messages,
before_tokens,
after_tokens,
)
except Exception:
return _fallback_summarize_manual_compression(
original_messages,
compressed_messages,
before_tokens,
after_tokens,
focus_topic,
)
import api.config as _cfg
import hermes_cli.runtime_provider as _runtime_provider
import run_agent as _run_agent
resolved_model, resolved_provider, resolved_base_url = _cfg.resolve_model_provider(s.model)
resolved_api_key = None
try:
_rt = _runtime_provider.resolve_runtime_provider(requested=resolved_provider)
resolved_api_key = _rt.get("api_key")
if not resolved_provider:
resolved_provider = _rt.get("provider")
if not resolved_base_url:
resolved_base_url = _rt.get("base_url")
except Exception as _e:
logger.warning("resolve_runtime_provider failed for compression: %s", _e)
if not resolved_api_key:
return bad(handler, "No provider configured -- cannot compress.")
with _cfg._get_session_agent_lock(sid):
original_messages = list(messages)
approx_tokens = _estimate_messages_tokens_rough(original_messages)
agent = _run_agent.AIAgent(
model=resolved_model,
provider=resolved_provider,
base_url=resolved_base_url,
api_key=resolved_api_key,
platform="cli",
quiet_mode=True,
enabled_toolsets=_resolve_cli_toolsets(),
session_id=sid,
)
compressed = agent.context_compressor.compress(
original_messages,
current_tokens=approx_tokens,
focus_topic=focus_topic,
)
new_tokens = _estimate_messages_tokens_rough(compressed)
summary = _summarize_manual_compression(
original_messages,
compressed,
approx_tokens,
new_tokens,
focus_topic=focus_topic,
)
s.messages = compressed
s.tool_calls = []
s.active_stream_id = None
s.pending_user_message = None
s.pending_attachments = []
s.pending_started_at = None
visible_after = _visible_messages_for_anchor(compressed)
s.compression_anchor_visible_idx = max(0, len(visible_after) - 1) if visible_after else None
s.compression_anchor_message_key = _anchor_message_key(visible_after[-1]) if visible_after else None
s.save()
session_payload = redact_session_data(
s.compact() | {
"messages": s.messages,
"tool_calls": s.tool_calls,
"active_stream_id": s.active_stream_id,
"pending_user_message": s.pending_user_message,
"pending_attachments": s.pending_attachments,
"pending_started_at": s.pending_started_at,
"compression_anchor_visible_idx": getattr(s, "compression_anchor_visible_idx", None),
"compression_anchor_message_key": getattr(s, "compression_anchor_message_key", None),
}
)
return j(
handler,
{
"ok": True,
"session": session_payload,
"summary": summary,
"focus_topic": focus_topic,
},
)
except Exception as e:
logger.warning("Manual session compression failed: %s", e)
return bad(handler, f"Compression failed: {_sanitize_error(e)}")
def _handle_skill_save(handler, body):
try:
require(body, "name", "content")