""" Hermes Web UI -- File upload: multipart parser and upload handler. """ import re as _re import email.parser from pathlib import Path from api.config import MAX_UPLOAD_BYTES from api.helpers import j, bad from api.models import get_session from api.workspace import safe_resolve_ws def parse_multipart(rfile, content_type, content_length) -> tuple: import re as _re, email.parser as _ep m = _re.search(r'boundary=([^;\s]+)', content_type) if not m: raise ValueError('No boundary in Content-Type') boundary = m.group(1).strip('"').encode() raw = rfile.read(content_length) fields = {} files = {} delimiter = b'--' + boundary end_marker = b'--' + boundary + b'--' parts = raw.split(delimiter) for part in parts[1:]: stripped = part.lstrip(b'\r\n') if stripped.startswith(b'--'): break sep = b'\r\n\r\n' if b'\r\n\r\n' in part else b'\n\n' if sep not in part: continue header_raw, body = part.split(sep, 1) if body.endswith(b'\r\n'): body = body[:-2] elif body.endswith(b'\n'): body = body[:-1] header_text = header_raw.lstrip(b'\r\n').decode('utf-8', errors='replace') msg = _ep.HeaderParser().parsestr(header_text) disp = msg.get('Content-Disposition', '') name_m = _re.search(r'name="([^"]*)"', disp) file_m = _re.search(r'filename="([^"]*)"', disp) if not name_m: continue name = name_m.group(1) if file_m: files[name] = (file_m.group(1), body) else: fields[name] = body.decode('utf-8', errors='replace') return fields, files def handle_upload(handler): import re as _re, traceback as _tb try: content_type = handler.headers.get('Content-Type', '') content_length = int(handler.headers.get('Content-Length', 0) or 0) if content_length > MAX_UPLOAD_BYTES: return j(handler, {'error': f'File too large (max {MAX_UPLOAD_BYTES//1024//1024}MB)'}, status=413) fields, files = parse_multipart(handler.rfile, content_type, content_length) session_id = fields.get('session_id', '') if 'file' not in files: return j(handler, {'error': 'No file field in request'}, status=400) filename, file_bytes = files['file'] if not filename: return j(handler, {'error': 'No filename in upload'}, status=400) try: s = get_session(session_id) except KeyError: return j(handler, {'error': 'Session not found'}, status=404) workspace = Path(s.workspace) safe_name = _re.sub(r'[^\w.\-]', '_', Path(filename).name)[:200] # Reject names that are purely dots (path traversal: ".." survives regex) if not safe_name or safe_name.strip('.') == '': return j(handler, {'error': 'Invalid filename'}, status=400) # Verify the resolved path stays within the workspace dest = safe_resolve_ws(workspace, safe_name) dest.write_bytes(file_bytes) return j(handler, {'filename': safe_name, 'path': str(dest), 'size': dest.stat().st_size}) except Exception as e: print('[webui] upload error: ' + _tb.format_exc(), flush=True) return j(handler, {'error': 'Upload failed'}, status=500)