feat: redesign chat transcript + fix streaming/persistence lifecycle — v0.50.70 (PR #587 by @aronprins)
Redesign chat transcript + fix streaming/persistence lifecycle — v0.50.70 Squash-merges PR #587 by @aronprins (Aron Prins). Full credit to @aronprins for all feature and fix work. Transcript redesign: unified --msg-rail/--msg-max CSS variables, user turns as tinted cards, thinking cards as bordered panels, error card treatment, day-change separators, composer fade. Approval/clarify as composer flyouts: cards slide up from behind composer top, overflow:hidden + translateY clip prevents travel visibility, focus({preventScroll:true}). Streaming lifecycle: DOM order user→thinking→tool cards→response, no mid-stream jump. Live tool cards inserted before [data-live-assistant]. Persistence: reasoning attached before s.save(), _restore_reasoning_metadata on reload, role=tool rows preserved in S.messages, CLI-session tool-result fallback. Workspace panel FOUC fix: [data-workspace-panel] set at parse time. Docs: docs/ui-ux/index.html + two-stage-proposal.html. Maintainer additions (433b867): CHANGELOG v0.50.70, version badge, usage badge loop simplification. Reviewed and approved by @nesquena (independent review). 1361 tests passing.
This commit is contained in:
276
api/streaming.py
276
api/streaming.py
@@ -10,6 +10,7 @@ import re
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import copy
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
@@ -539,6 +540,183 @@ def _sanitize_messages_for_api(messages):
|
||||
return clean
|
||||
|
||||
|
||||
def _api_safe_message_positions(messages):
    """Return [(original_index, sanitized_message)] for API-safe messages.

    A message survives sanitization when it still has a truthy ``role`` after
    being projected onto ``_API_SAFE_MSG_KEYS``. Tool rows additionally need a
    ``tool_call_id`` that some assistant message's ``tool_calls`` announced;
    orphaned tool results are dropped.
    """
    # First pass: collect every tool-call id declared by an assistant message.
    known_ids = set()
    for entry in messages:
        if isinstance(entry, dict) and entry.get('role') == 'assistant':
            for call in entry.get('tool_calls') or []:
                if isinstance(call, dict):
                    call_id = call.get('id') or call.get('call_id') or ''
                    if call_id:
                        known_ids.add(call_id)

    # Second pass: keep API-safe projections, remembering original positions.
    positions = []
    for pos, entry in enumerate(messages):
        if not isinstance(entry, dict):
            continue
        if entry.get('role') == 'tool':
            ref = entry.get('tool_call_id') or ''
            # Tool result without a matching assistant call id: skip it.
            if not ref or ref not in known_ids:
                continue
        projected = {k: v for k, v in entry.items() if k in _API_SAFE_MSG_KEYS}
        if projected.get('role'):
            positions.append((pos, projected))
    return positions
|
||||
|
||||
|
||||
def _restore_reasoning_metadata(previous_messages, updated_messages):
    """Carry forward assistant reasoning metadata lost during API-safe history sanitization.

    The provider-facing history strips WebUI-only fields like `reasoning`. When the
    agent returns its new full message history, prior assistant messages come back
    without that metadata unless we merge it back in by API-history position.

    Returns a (possibly re-grown) copy of ``updated_messages``; matched messages
    are mutated in place to regain their ``reasoning`` field.
    """
    # Nothing to merge if either side is empty.
    if not previous_messages or not updated_messages:
        return updated_messages
    # Work on a copy: reasoning-only messages may be re-inserted below.
    updated_messages = list(updated_messages)
    # Previous history as the provider saw it: [(original_index, sanitized)].
    prev_safe = _api_safe_message_positions(previous_messages)

    def _safe_projection(msg):
        # API-safe view of a message, used to detect "same message" by value.
        # NOTE(review): `msg.get('role')` is loop-invariant inside this
        # comprehension — a roleless dict projects to {} instead of None.
        # Two roleless dicts would compare equal; confirm that cannot occur
        # here (prev_safe entries always carry a role).
        if not isinstance(msg, dict):
            return None
        return {k: v for k, v in msg.items() if k in _API_SAFE_MSG_KEYS and msg.get('role')}

    def _reasoning_only_assistant(msg):
        # True for an assistant turn that carried ONLY a reasoning trace:
        # it has `reasoning`, no tool calls, and no textual content.
        if not isinstance(msg, dict) or msg.get('role') != 'assistant' or not msg.get('reasoning'):
            return False
        if msg.get('tool_calls'):
            return False
        return not _message_text(msg.get('content'))

    # Walk the previous API-safe history and the updated history in lockstep;
    # positions are assumed to align because the provider received exactly
    # the API-safe sequence.
    safe_pos = 0
    while safe_pos < len(prev_safe):
        prev_idx, _ = prev_safe[safe_pos]
        prev_msg = previous_messages[prev_idx]
        cur_msg = updated_messages[safe_pos] if safe_pos < len(updated_messages) else None

        if isinstance(prev_msg, dict) and isinstance(cur_msg, dict) and _safe_projection(prev_msg) == _safe_projection(cur_msg):
            # Same message at the same API position: re-attach the stripped
            # `reasoning` field (in-place mutation of the updated message).
            if prev_msg.get('role') == 'assistant' and prev_msg.get('reasoning') and not cur_msg.get('reasoning'):
                cur_msg['reasoning'] = prev_msg['reasoning']
            safe_pos += 1
            continue

        if _reasoning_only_assistant(prev_msg):
            # A reasoning-only assistant turn is missing from the updated
            # history at this position — re-insert a deep copy so its
            # thinking card survives the turn.
            updated_messages.insert(safe_pos, copy.deepcopy(prev_msg))
            safe_pos += 1
            continue

        # No match and nothing to restore for this slot — advance.
        safe_pos += 1
    return updated_messages
|
||||
|
||||
|
||||
def _tool_result_snippet(raw) -> str:
|
||||
"""Extract a compact result preview from a stored tool message payload."""
|
||||
text = str(raw or '')
|
||||
try:
|
||||
data = json.loads(text)
|
||||
if isinstance(data, dict):
|
||||
return str(data.get('output') or data.get('result') or data.get('error') or text)[:200]
|
||||
except Exception:
|
||||
pass
|
||||
return text[:200]
|
||||
|
||||
|
||||
def _truncate_tool_args(args, limit: int = 6) -> dict:
|
||||
"""Truncate tool args for compact session persistence."""
|
||||
out = {}
|
||||
if not isinstance(args, dict):
|
||||
return out
|
||||
for k, v in list(args.items())[:limit]:
|
||||
s = str(v)
|
||||
out[k] = s[:120] + ('...' if len(s) > 120 else '')
|
||||
return out
|
||||
|
||||
|
||||
def _nearest_assistant_msg_idx(messages, msg_idx: int) -> int:
|
||||
"""Find the closest preceding assistant message index for a tool result."""
|
||||
for idx in range(msg_idx - 1, -1, -1):
|
||||
msg = messages[idx]
|
||||
if isinstance(msg, dict) and msg.get('role') == 'assistant':
|
||||
return idx
|
||||
return -1
|
||||
|
||||
|
||||
def _extract_tool_calls_from_messages(messages, live_tool_calls=None):
    """Build persisted tool-call summaries from final messages plus live progress fallback.

    Phase 1 scans the final message history: assistant messages register
    pending tool-call metadata (both Anthropic ``tool_use`` content blocks and
    OpenAI top-level ``tool_calls``), and each ``role == 'tool'`` row is
    resolved against that metadata by tool-call id. Phase 2 pairs any
    unresolved tool rows with ``live_tool_calls`` (streamed progress events)
    by position, so summaries survive even when the final history kept bare
    tool rows without explicit call ids.

    Returns a list of dicts with keys ``name``, ``snippet``, ``tid``,
    ``assistant_msg_idx`` and ``args``.
    """
    tool_calls = []
    pending_names = {}      # tool_call_id -> tool name
    pending_args = {}       # tool_call_id -> parsed args dict
    pending_asst_idx = {}   # tool_call_id -> index of declaring assistant message
    tool_msg_sequence = []  # every role=tool row in order, with resolution status

    for msg_idx, m in enumerate(messages or []):
        if not isinstance(m, dict):
            continue
        role = m.get('role')
        if role == 'assistant':
            content = m.get('content', '')
            # Anthropic format: content is a list containing tool_use blocks.
            if isinstance(content, list):
                for part in content:
                    if isinstance(part, dict) and part.get('type') == 'tool_use':
                        tid = part.get('id', '')
                        if tid:
                            pending_names[tid] = part.get('name', '')
                            pending_args[tid] = part.get('input', {})
                            pending_asst_idx[tid] = msg_idx
            # OpenAI format: tool_calls as a top-level field on the message.
            for tc in m.get('tool_calls', []):
                if not isinstance(tc, dict):
                    continue
                tid = tc.get('id', '') or tc.get('call_id', '')
                fn = tc.get('function', {})
                name = fn.get('name', '')
                try:
                    # Arguments arrive JSON-encoded; fall back to {} on bad JSON.
                    args = json.loads(fn.get('arguments', '{}') or '{}')
                except Exception:
                    args = {}
                if tid and name:
                    pending_names[tid] = name
                    pending_args[tid] = args
                    pending_asst_idx[tid] = msg_idx
        elif role == 'tool':
            tid = m.get('tool_call_id') or m.get('tool_use_id', '')
            raw = m.get('content', '')
            # Track every tool row so phase 2 can backfill unresolved ones.
            seq = {'msg_idx': msg_idx, 'raw': raw, 'resolved': False}
            if tid:
                name = pending_names.get(tid, '')
                # 'tool' is treated as a placeholder name, not a real tool.
                if name and name != 'tool':
                    tool_calls.append({
                        'name': name,
                        'snippet': _tool_result_snippet(raw),
                        'tid': tid,
                        'assistant_msg_idx': pending_asst_idx.get(tid, -1),
                        'args': _truncate_tool_args(pending_args.get(tid, {})),
                    })
                    seq['resolved'] = True
            tool_msg_sequence.append(seq)

    # Phase 2: live-progress fallback. 'clarify' events are UI-only and skipped.
    live = [tc for tc in (live_tool_calls or []) if isinstance(tc, dict) and tc.get('name') and tc.get('name') != 'clarify']
    if live:
        for seq_idx, seq in enumerate(tool_msg_sequence):
            if seq.get('resolved'):
                continue
            if seq_idx >= len(live):
                break
            # NOTE(review): pairing is positional — unresolved row i pairs with
            # live[i], not with the i-th *unresolved* live entry. If earlier
            # rows were resolved by id, this may skip live entries; confirm
            # this alignment is intended.
            live_tc = live[seq_idx]
            tool_calls.append({
                'name': live_tc.get('name', 'tool'),
                'snippet': _tool_result_snippet(seq.get('raw', '')),
                'tid': live_tc.get('tid', '') or '',
                # No recorded assistant index for fallback rows: use the
                # nearest preceding assistant message instead.
                'assistant_msg_idx': _nearest_assistant_msg_idx(messages, seq.get('msg_idx', -1)),
                'args': _truncate_tool_args(live_tc.get('args', {}), limit=4),
            })

    return tool_calls
|
||||
|
||||
|
||||
def _sse(handler, event, data):
|
||||
"""Write one SSE event to the response stream."""
|
||||
payload = f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
||||
@@ -704,6 +882,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
try:
|
||||
_token_sent = False # tracks whether any streamed tokens were sent
|
||||
_reasoning_text = '' # accumulates reasoning/thinking trace for persistence
|
||||
_live_tool_calls = [] # tool progress fallback when final messages omit tool IDs
|
||||
|
||||
def on_token(text):
|
||||
nonlocal _token_sent
|
||||
@@ -749,6 +928,10 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
args_snap[k] = s2[:120] + ('...' if len(s2) > 120 else '')
|
||||
|
||||
if event_type in (None, 'tool.started'):
|
||||
_live_tool_calls.append({
|
||||
'name': name,
|
||||
'args': args if isinstance(args, dict) else {},
|
||||
})
|
||||
put('tool', {
|
||||
'event_type': event_type or 'tool.started',
|
||||
'name': name,
|
||||
@@ -769,6 +952,14 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
return
|
||||
|
||||
if event_type == 'tool.completed':
|
||||
for live_tc in reversed(_live_tool_calls):
|
||||
if live_tc.get('done'):
|
||||
continue
|
||||
if not name or live_tc.get('name') == name:
|
||||
live_tc['done'] = True
|
||||
live_tc['duration'] = cb_kwargs.get('duration')
|
||||
live_tc['is_error'] = bool(cb_kwargs.get('is_error', False))
|
||||
break
|
||||
put('tool_complete', {
|
||||
'event_type': event_type,
|
||||
'name': name,
|
||||
@@ -903,6 +1094,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
# Pass personality via ephemeral_system_prompt (agent's own mechanism)
|
||||
if _personality_prompt:
|
||||
agent.ephemeral_system_prompt = _personality_prompt
|
||||
_previous_messages = list(s.messages or [])
|
||||
result = agent.run_conversation(
|
||||
user_message=workspace_ctx + msg_text,
|
||||
system_message=workspace_system_msg,
|
||||
@@ -910,7 +1102,10 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
task_id=session_id,
|
||||
persist_user_message=msg_text,
|
||||
)
|
||||
s.messages = result.get('messages') or s.messages
|
||||
s.messages = _restore_reasoning_metadata(
|
||||
_previous_messages,
|
||||
result.get('messages') or s.messages,
|
||||
)
|
||||
|
||||
# ── Detect silent agent failure (no assistant reply produced) ──
|
||||
# When the agent catches an auth/network error internally it may return
|
||||
@@ -1011,63 +1206,12 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
s.output_tokens = (s.output_tokens or 0) + output_tokens
|
||||
if estimated_cost:
|
||||
s.estimated_cost = (s.estimated_cost or 0) + estimated_cost
|
||||
# Extract tool call metadata grouped by assistant message index
|
||||
# Each tool call gets assistant_msg_idx so the client can render
|
||||
# cards inline with the assistant bubble that triggered them.
|
||||
tool_calls = []
|
||||
pending_names = {} # tool_call_id -> name
|
||||
pending_args = {} # tool_call_id -> args dict
|
||||
pending_asst_idx = {} # tool_call_id -> index in s.messages
|
||||
for msg_idx, m in enumerate(s.messages):
|
||||
if m.get('role') == 'assistant':
|
||||
c = m.get('content', '')
|
||||
# Anthropic format: content is a list with type=tool_use blocks
|
||||
if isinstance(c, list):
|
||||
for p in c:
|
||||
if isinstance(p, dict) and p.get('type') == 'tool_use':
|
||||
tid = p.get('id', '')
|
||||
pending_names[tid] = p.get('name', '')
|
||||
pending_args[tid] = p.get('input', {})
|
||||
pending_asst_idx[tid] = msg_idx
|
||||
# OpenAI format: tool_calls as top-level field on the message
|
||||
for tc in m.get('tool_calls', []):
|
||||
if not isinstance(tc, dict):
|
||||
continue
|
||||
tid = tc.get('id', '') or tc.get('call_id', '')
|
||||
fn = tc.get('function', {})
|
||||
name = fn.get('name', '')
|
||||
try:
|
||||
import json as _j
|
||||
args = _j.loads(fn.get('arguments', '{}') or '{}')
|
||||
except Exception:
|
||||
args = {}
|
||||
if tid and name:
|
||||
pending_names[tid] = name
|
||||
pending_args[tid] = args
|
||||
pending_asst_idx[tid] = msg_idx
|
||||
elif m.get('role') == 'tool':
|
||||
tid = m.get('tool_call_id') or m.get('tool_use_id', '')
|
||||
name = pending_names.get(tid, '')
|
||||
if not name or name == 'tool':
|
||||
continue # skip unresolvable tool entries
|
||||
asst_idx = pending_asst_idx.get(tid, -1)
|
||||
args = pending_args.get(tid, {})
|
||||
raw = str(m.get('content', ''))
|
||||
try:
|
||||
rd = json.loads(raw)
|
||||
snippet = str(rd.get('output') or rd.get('result') or rd.get('error') or raw)[:200]
|
||||
except Exception:
|
||||
snippet = raw[:200]
|
||||
# Truncate args values for storage
|
||||
args_snap = {}
|
||||
if isinstance(args, dict):
|
||||
for k, v in list(args.items())[:6]:
|
||||
s2 = str(v)
|
||||
args_snap[k] = s2[:120] + ('...' if len(s2) > 120 else '')
|
||||
tool_calls.append({
|
||||
'name': name, 'snippet': snippet, 'tid': tid,
|
||||
'assistant_msg_idx': asst_idx, 'args': args_snap,
|
||||
})
|
||||
# Persist tool-call summaries even when the final message history only
|
||||
# kept bare tool rows and omitted explicit assistant tool_call IDs.
|
||||
tool_calls = _extract_tool_calls_from_messages(
|
||||
s.messages,
|
||||
live_tool_calls=_live_tool_calls,
|
||||
)
|
||||
s.tool_calls = tool_calls
|
||||
s.active_stream_id = None
|
||||
s.pending_user_message = None
|
||||
@@ -1085,6 +1229,15 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
if base_text[:60] in content or content[:60] in msg_text:
|
||||
m['attachments'] = attachments
|
||||
break
|
||||
# Persist reasoning trace in the session so it survives reload.
|
||||
# Must run BEFORE s.save() — otherwise the mutation lives only in
|
||||
# memory until the next turn's save, and the last-turn thinking card
|
||||
# is lost when the user reloads immediately after a response.
|
||||
if _reasoning_text and s.messages:
|
||||
for _rm in reversed(s.messages):
|
||||
if isinstance(_rm, dict) and _rm.get('role') == 'assistant':
|
||||
_rm['reasoning'] = _reasoning_text
|
||||
break
|
||||
s.save()
|
||||
# Sync to state.db for /insights (opt-in setting)
|
||||
try:
|
||||
@@ -1109,12 +1262,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
usage['context_length'] = getattr(_cc, 'context_length', 0) or 0
|
||||
usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0
|
||||
usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0
|
||||
# Persist reasoning trace in the session so it survives reload
|
||||
if _reasoning_text and s.messages:
|
||||
for _rm in reversed(s.messages):
|
||||
if isinstance(_rm, dict) and _rm.get('role') == 'assistant':
|
||||
_rm['reasoning'] = _reasoning_text
|
||||
break
|
||||
# (reasoning trace already attached + saved above, before s.save())
|
||||
raw_session = s.compact() | {'messages': s.messages, 'tool_calls': tool_calls}
|
||||
put('done', {'session': redact_session_data(raw_session), 'usage': usage})
|
||||
if _should_bg_title and _u0 and _a0:
|
||||
|
||||
Reference in New Issue
Block a user