"""Render a task's PG ``messages`` rows plus task metadata as a .docx transcript.

Layout:
- A meta table at the top of the document (task id / name / skill /
  description / status / model / profile / created / updated / message count /
  tokens / working directory / export time).
- One group of paragraphs per message, all left-aligned, small font size; the
  roles are told apart by bold headers in different colors.
- The assistant's ``reasoning_content`` is included by default, in gray italics.
- Tool results keep the first ``head`` characters, an omission marker and the
  last ``tail`` characters.
- ``tool_calls`` are shown as the function name followed by the arguments JSON.

Entry points:
- Top-level function
  ``export_chat_to_docx(task_id, working_dir=None, out_path=None, ...)``.
- Per the original module notes, the CLI sub-command ``python cli.py export``
  and the REPL ``/export`` command both go through it.

After §7 B Step 3, both the meta data and the messages are read from PG
(state.json has been retired).
"""

from __future__ import annotations

import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from uuid import UUID

from core.task import TaskState
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Cm, Pt, RGBColor

# ───────────────────────── Palette ─────────────────────────
# A GitHub-ish palette with high contrast on Word's light background:
# easy on the eyes, yet the roles stay distinguishable.
COLOR_USER = RGBColor(0x6F, 0x42, 0xC1)  # purple
COLOR_ASSISTANT = RGBColor(0x1F, 0x6F, 0xEB)  # blue
COLOR_TOOL_CALL = RGBColor(0xBF, 0x63, 0x10)  # orange (darkened for legibility)
COLOR_TOOL_RESULT = RGBColor(0x1A, 0x7F, 0x37)  # green
COLOR_REASONING = RGBColor(0x6E, 0x76, 0x81)  # mid gray, used with italics
COLOR_SYSTEM = RGBColor(0x57, 0x60, 0x6A)  # dark gray
COLOR_META_LABEL = RGBColor(0x57, 0x60, 0x6A)

# Code points that the XML 1.0 ``Char`` production forbids. Tool output often
# carries them (NUL bytes, ESC from ANSI color sequences, ...), and the XML
# layer underneath python-docx refuses to store such text.
_XML_ILLEGAL_CHARS_RE = re.compile(
    r"[\x00-\x08\x0b\x0c\x0e-\x1f\ud800-\udfff\ufffe\uffff]"
)


# ───────────────────────── Text helpers ─────────────────────────
def _sanitize_for_xml(text: str) -> str:
    """Return *text* with XML-illegal code points removed and CRLF folded to LF."""
    # NOTE(review): CRLF is folded so a Windows line ending yields one line
    # break; how a bare "\r" is rendered depends on the python-docx version.
    return _XML_ILLEGAL_CHARS_RE.sub("", text.replace("\r\n", "\n"))


def _dump_json(value: Any) -> str:
    """Pretty-print *value* as JSON, falling back to ``str()`` if not serializable."""
    try:
        return json.dumps(value, ensure_ascii=False, indent=2)
    except (TypeError, ValueError):
        return str(value)


def _to_text(value: Any) -> str:
    """Coerce a message field to plain text.

    Strings pass through and ``None`` becomes ``""``. A list/tuple is joined
    line by line: string items as-is, dict items via their ``"text"`` entry
    when that is a string, anything else as JSON.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        # Presumably a list of content parts such as {"type": "text",
        # "text": ...}; verify against the producers of Message.payload.
        pieces = []
        for part in value:
            if isinstance(part, str):
                pieces.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                pieces.append(part["text"])
            else:
                pieces.append(_dump_json(part))
        return "\n".join(pieces)
    return _dump_json(value)


# ───────────────────────── Font helpers ─────────────────────────
def _set_run_fonts(run, *, cn_font: str = "宋体", en_font: str = "Times New Roman") -> None:
    """Set the East-Asian and the ASCII/hAnsi font of *run* via ``w:rFonts``."""
    rPr = run._element.get_or_add_rPr()
    rFonts = rPr.find(qn("w:rFonts"))
    if rFonts is None:
        rFonts = OxmlElement("w:rFonts")
        rPr.append(rFonts)
    rFonts.set(qn("w:eastAsia"), cn_font)
    rFonts.set(qn("w:ascii"), en_font)
    rFonts.set(qn("w:hAnsi"), en_font)


def _preserve_spaces(run) -> None:
    """Mark every ``w:t`` of *run* with ``xml:space="preserve"``.

    Runs of spaces must survive: code blocks and JSON indentation rely on them.
    """
    for t in run._element.iter(qn("w:t")):
        t.set(qn("xml:space"), "preserve")


# ───────────────────────── Document skeleton ─────────────────────────
def _init_doc() -> Document:
    """Create an empty document with A4 page geometry and a compact Normal style."""
    doc = Document()

    section = doc.sections[0]
    section.page_height = Cm(29.7)
    section.page_width = Cm(21)
    section.top_margin = Cm(2.0)
    section.bottom_margin = Cm(2.0)
    section.left_margin = Cm(2.5)
    section.right_margin = Cm(2.0)

    normal = doc.styles["Normal"]
    normal.font.name = "Times New Roman"
    normal.font.size = Pt(9.5)
    pf = normal.paragraph_format
    pf.line_spacing = 1.3
    pf.space_before = Pt(0)
    pf.space_after = Pt(0)
    pf.first_line_indent = None
    return doc


# ───────────────────────── Paragraph primitives ─────────────────────────
def _add_role_header(doc: Document, label: str, color: RGBColor) -> None:
    """Append a bold, colored, left-aligned header paragraph showing *label*."""
    p = doc.add_paragraph()
    pf = p.paragraph_format
    pf.first_line_indent = None
    pf.space_before = Pt(8)
    pf.space_after = Pt(2)
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT

    # The label may embed ids taken from message payloads, so sanitize it too.
    run = p.add_run(_sanitize_for_xml(_to_text(label)))
    run.font.size = Pt(10.5)
    run.font.bold = True
    run.font.color.rgb = color
    _set_run_fonts(run, cn_font="黑体", en_font="Consolas")


def _add_text(
    doc: Document,
    text: str,
    *,
    color: Optional[RGBColor] = None,
    italic: bool = False,
    mono: bool = False,
    size: Pt = Pt(9.5),
    indent_left: Optional[Pt] = None,
) -> None:
    """Append *text* as one paragraph, keeping ``\\n`` as line breaks.

    Nothing is added when *text* is empty (before or after sanitizing).

    Args:
        doc: Target document.
        text: Text to write; non-string values are coerced with ``_to_text``.
        color: Optional font color applied to every run.
        italic: Render the text in italics.
        mono: Use the monospace pair (新宋体 + Consolas) and preserve spaces.
        size: Font size of the text runs.
        indent_left: Optional left indent of the paragraph.
    """
    if not text:
        return
    # Without sanitizing, a single control character in the text makes the
    # XML layer raise and aborts the whole export.
    text = _sanitize_for_xml(_to_text(text))
    if not text:
        return

    p = doc.add_paragraph()
    pf = p.paragraph_format
    pf.first_line_indent = None
    pf.line_spacing = 1.25
    pf.space_before = Pt(0)
    pf.space_after = Pt(2)
    if indent_left is not None:
        pf.left_indent = indent_left
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT

    cn_font = "新宋体" if mono else "宋体"
    en_font = "Consolas" if mono else "Times New Roman"

    for i, line in enumerate(text.split("\n")):
        if i > 0:
            # A dedicated run carries the line break between two lines.
            p.add_run().add_break()
        run = p.add_run(line)
        run.font.size = size
        if color is not None:
            run.font.color.rgb = color
        if italic:
            run.italic = True
        _set_run_fonts(run, cn_font=cn_font, en_font=en_font)
        if mono:
            _preserve_spaces(run)


# ───────────────────────── Tool-result trimming ─────────────────────────
def _truncate_with_ellipsis(text: str, head: int, tail: int) -> str:
    """Keep the first *head* and last *tail* characters around an omission marker.

    ``None`` yields ``""``. Text no longer than ``head + tail + 80`` characters
    is returned unchanged, since trimming would save next to nothing. Negative
    *head* / *tail* are treated as 0.
    """
    if text is None:
        return ""
    head = max(head, 0)
    tail = max(tail, 0)
    if len(text) <= head + tail + 80:
        return text
    omitted = len(text) - head - tail
    # ``text[-0:]`` is the WHOLE string, so tail == 0 needs its own branch.
    tail_part = text[-tail:] if tail else ""
    return f"{text[:head]}\n\n... [omitted {omitted} chars] ...\n\n{tail_part}"


def _format_args(args_str: str) -> str:
    """Pretty-print tool-call arguments.

    A string holding valid JSON is re-indented; any other string is returned
    unchanged. A non-string value (for example an already-decoded dict) is
    rendered as JSON so that the caller always receives text.
    """
    if not args_str:
        return ""
    if not isinstance(args_str, str):
        return _dump_json(args_str)
    try:
        return json.dumps(json.loads(args_str), ensure_ascii=False, indent=2)
    except (ValueError, RecursionError):
        # Not JSON (JSONDecodeError is a ValueError) or nested too deeply.
        return args_str


# ───────────────────────── Meta block ─────────────────────────
def _fill_meta_cell(cell, text: str, *, bold: bool = False, color: Optional[RGBColor] = None) -> None:
    """Replace the content of table *cell* with one small 宋体/Times run."""
    cell.text = ""
    paragraph = cell.paragraphs[0]
    paragraph.paragraph_format.first_line_indent = None
    paragraph.paragraph_format.line_spacing = 1.15
    run = paragraph.add_run(_sanitize_for_xml(text))
    run.font.size = Pt(9)
    if bold:
        run.font.bold = True
    if color is not None:
        run.font.color.rgb = color
    _set_run_fonts(run, cn_font="宋体", en_font="Times New Roman")


def _add_meta_block(
    doc: Document, meta: dict, task_state: dict, n_msgs: int, working_dir: Optional[Path]
) -> None:
    """Append the title paragraph and the two-column meta table.

    Args:
        doc: Target document.
        meta: Reads ``id``, ``model``, ``model_profile`` and ``created_at``;
            each falls back to the matching ``task_state`` entry when empty.
        task_state: Task fields as a dict; reads ``task_id``, ``name``,
            ``description``, ``skill``, ``status``, ``updated_at``,
            ``tokens_prompt`` and ``tokens_completion`` in addition.
        n_msgs: Number shown in the message-count row.
        working_dir: Shown in the working-directory row; a placeholder is used
            when it is ``None``.
    """
    task_id = meta.get("id") or task_state.get("task_id") or "?"

    p = doc.add_paragraph()
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
    p.paragraph_format.first_line_indent = None
    p.paragraph_format.space_before = Pt(0)
    p.paragraph_format.space_after = Pt(4)
    run = p.add_run(_sanitize_for_xml(f"Task 对话记录 - {task_id}"))
    run.font.size = Pt(14)
    run.font.bold = True
    _set_run_fonts(run, cn_font="黑体", en_font="Consolas")

    name = task_state.get("name") or ""
    desc = task_state.get("description") or ""
    skill = task_state.get("skill") or ""
    status = task_state.get("status") or ""
    model = meta.get("model") or task_state.get("model") or ""
    profile = meta.get("model_profile") or task_state.get("model_profile") or ""
    created = meta.get("created_at") or task_state.get("created_at") or ""
    updated = task_state.get("updated_at") or ""
    # ``or 0`` rather than a .get() default: a key that is present but None
    # would otherwise break the addition below.
    tp = task_state.get("tokens_prompt") or 0
    tc = task_state.get("tokens_completion") or 0

    rows = [
        ("Task ID", task_id),
        ("任务名", name),
        ("Skill", skill),
        ("描述", desc),
        ("状态", status),
        ("模型", model),
        ("Profile", profile),
        ("创建时间", created),
        ("更新时间", updated),
        ("消息数", str(n_msgs)),
        ("Tokens", f"{tp} prompt / {tc} completion / {tp + tc} total"),
        ("工作目录", str(working_dir) if working_dir else "(未绑)"),
        ("导出时间", datetime.now().isoformat(timespec="seconds")),
    ]

    table = doc.add_table(rows=len(rows), cols=2)
    try:
        table.style = "Light Grid Accent 1"
    except KeyError:
        # The template has no such style; keep the default table look.
        pass

    for table_row, (label, value) in zip(table.rows, rows):
        label_cell, value_cell = table_row.cells[0], table_row.cells[1]
        _fill_meta_cell(label_cell, label, bold=True, color=COLOR_META_LABEL)
        _fill_meta_cell(value_cell, str(value) if value else "-")


# ───────────────────────── Single-message rendering ─────────────────────────
def _render_message(
    doc: Document,
    msg: dict,
    *,
    include_reasoning: bool,
    tool_head: int,
    tool_tail: int,
) -> None:
    """Append the paragraphs for one message dict, dispatching on ``msg["role"]``.

    Args:
        doc: Target document.
        msg: Message payload; reads ``role``, ``content`` and, depending on the
            role, ``reasoning_content``, ``provider_specific_fields``,
            ``tool_calls`` and ``tool_call_id``.
        include_reasoning: Whether to render the assistant's reasoning text.
        tool_head: Characters kept from the start of a tool result.
        tool_tail: Characters kept from the end of a tool result.
    """
    role = msg.get("role")
    content = _to_text(msg.get("content") or "")

    if role == "system":
        _add_role_header(doc, "[system]", COLOR_SYSTEM)
        # System prompts usually run 2-5 KB, so they are trimmed on export too.
        content = _truncate_with_ellipsis(content, 1500, 500)
        _add_text(doc, content, color=COLOR_SYSTEM, size=Pt(8.5), mono=True)
        return

    if role == "user":
        _add_role_header(doc, "[user]", COLOR_USER)
        _add_text(doc, content, size=Pt(10))
        return

    if role == "assistant":
        _add_role_header(doc, "[assistant]", COLOR_ASSISTANT)

        if include_reasoning:
            rc = msg.get("reasoning_content") or ""
            if not rc:
                # Fall back to the copy nested under provider_specific_fields.
                psf = msg.get("provider_specific_fields") or {}
                rc = psf.get("reasoning_content") or ""
            if rc:
                _add_text(
                    doc,
                    "▎reasoning",
                    color=COLOR_REASONING,
                    size=Pt(8.5),
                    italic=True,
                )
                _add_text(
                    doc,
                    _to_text(rc),
                    color=COLOR_REASONING,
                    size=Pt(9),
                    italic=True,
                    indent_left=Pt(12),
                )

        if content:
            _add_text(doc, content, size=Pt(10))

        for call in msg.get("tool_calls") or []:
            fn_obj = call.get("function") or {}
            fn = fn_obj.get("name", "?")
            args = fn_obj.get("arguments", "")
            cid = call.get("id", "")
            _add_text(
                doc,
                f"▎tool_call -> {fn} ({cid})",
                color=COLOR_TOOL_CALL,
                size=Pt(9),
                italic=True,
            )
            _add_text(
                doc,
                _format_args(args),
                color=COLOR_TOOL_CALL,
                size=Pt(8.5),
                mono=True,
                indent_left=Pt(12),
            )
        return

    if role == "tool":
        cid = msg.get("tool_call_id", "")
        _add_role_header(doc, f"[tool result] ({cid})", COLOR_TOOL_RESULT)
        truncated = _truncate_with_ellipsis(content, tool_head, tool_tail)
        _add_text(
            doc,
            truncated,
            color=COLOR_TOOL_RESULT,
            size=Pt(8.5),
            mono=True,
            indent_left=Pt(12),
        )
        return

    # Fallback: unknown role.
    _add_role_header(doc, f"[{role or 'unknown'}]", COLOR_SYSTEM)
    _add_text(doc, content, size=Pt(9.5))


# ───────────────────────── Top-level entry point ─────────────────────────
def export_chat_to_docx(
    task_id: UUID,
    working_dir: Optional[Path] = None,
    out_path: Optional[Path] = None,
    *,
    include_system: bool = False,
    include_reasoning: bool = True,
    tool_head: int = 1000,
    tool_tail: int = 500,
) -> Path:
    """Render a task's conversation to a .docx file and return the written path.

    ``task_id`` is the primary key: the messages are selected from the
    ``Message`` table ordered by ``idx`` and the task fields come from
    ``TaskState.load``.

    Args:
        task_id: Task whose messages and metadata are exported.
        working_dir: Directory for the default output path. When ``None``, the
            task's stored ``working_dir`` is used, converted through
            ``core.paths.from_db_path``.
        out_path: Output file. When ``None``, defaults to
            ``working_dir / chat_<task_id>.docx``. ``str`` values are accepted
            for both path arguments.
        include_system: Also render ``system`` messages (skipped by default).
        include_reasoning: Render the assistant's reasoning text.
        tool_head: Characters kept from the start of each tool result.
        tool_tail: Characters kept from the end of each tool result.

    Returns:
        The path the document was saved to.

    Raises:
        ValueError: If neither ``out_path`` nor any working directory is
            available, so there is nowhere to put the file.
    """
    from dataclasses import asdict

    from sqlalchemy import select

    from core.storage import session_scope
    from core.storage.models import Message as MessageRow

    with session_scope() as s:
        rows = (
            s.execute(
                select(MessageRow)
                .where(MessageRow.task_id == task_id)
                .order_by(MessageRow.idx)
            )
            .scalars()
            .all()
        )
        # Copy the payloads while the session is still open.
        messages = [dict(r.payload) for r in rows]

    st = TaskState.load(task_id)
    task_state: dict = asdict(st) if st is not None else {}

    if working_dir is None:
        wd_str = task_state.get("working_dir", "")
        if wd_str:
            # wd_str is in its DB form (relative to ROOT, or absolute);
            # from_db_path restores the absolute Path.
            from core.paths import from_db_path

            working_dir = from_db_path(wd_str)
        # Otherwise working_dir stays None; that is only an error when
        # out_path is missing as well (the .docx must land somewhere).
    else:
        working_dir = Path(working_dir)

    if out_path is None:
        if working_dir is None:
            raise ValueError(f"task {task_id} 无 working_dir 且未指定 out_path —— 无处放 .docx")
        out_path = working_dir / f"chat_{task_id}.docx"
    else:
        out_path = Path(out_path)

    meta = {
        "id": str(task_id),
        "model": task_state.get("model", ""),
        "model_profile": task_state.get("model_profile", ""),
        "created_at": task_state.get("created_at", ""),
    }

    doc = _init_doc()
    _add_meta_block(doc, meta, task_state, len(messages), working_dir)
    doc.add_paragraph()  # one blank line between the meta table and the body

    for msg in messages:
        if msg.get("role") == "system" and not include_system:
            continue
        _render_message(
            doc,
            msg,
            include_reasoning=include_reasoning,
            tool_head=tool_head,
            tool_tail=tool_tail,
        )

    out_path.parent.mkdir(parents=True, exist_ok=True)
    doc.save(str(out_path))
    return out_path