"""Smoke: paper_server → zcbot research skill 三步链路。 跑法: .venv/Scripts/python.exe scripts/smoke_paper_skill.py 依赖:paper_server 已 redeploy(list 端点返 abstract / retrieve 端点可用 / filterset_class 生效)。 不动 DB,不动 workspace;PDF 落到系统临时目录,跑完即丢。 三步: 1) search(keyword="cement", limit=5) — 验 list shape + abstract 字段 2) get_paper() — 验 retrieve 端点(原 viewset 没挂 mixin 是 404 bug) 3) fetch_pdf(<有 PDF 的那条>, tmp_dir) ×2 — 验文件落盘 + 复用(第二次应跳过下载直接复用) 任一步异常都打印后 continue 下一步,保证整条链路看一遍。 """ from __future__ import annotations import os import sys import tempfile import time from pathlib import Path ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) # 读 .env 注入(PAPER_SERVER_URL 可在这里覆盖,默认 http://paper.xxhhcty.xyz:8080) env_file = ROOT / ".env" if env_file.exists(): for line in env_file.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") os.environ.setdefault(k.strip(), v.strip()) from skills.research.paper import _BASE_URL, fetch_pdf, fetch_xml, get_paper, search def _hr(title: str) -> None: print() print("=" * 60) print(f"[{title}]") print("=" * 60) def _truncate(s: str, n: int = 200) -> str: if not s: return "(empty)" s = s.replace("\n", " ") return s if len(s) <= n else s[:n] + "...(+%d chars)" % (len(s) - n) def step0_trgm_speed() -> None: _hr("step 0: search(keyword='cement', limit=3) 不带 filter — 验证 pg_trgm GIN 索引生效") print("note: 加 pg_trgm 索引前会 30s+ timeout(ILIKE 跨 3 列全表扫);加后应几百 ms 内返") t0 = time.time() try: results = search(keyword="cement", limit=3) dt = (time.time() - t0) * 1000 print(f"[OK] returned {len(results)} in {dt:.0f}ms") if dt > 5000: print(f"[WARN] >5s,pg_trgm 索引可能没建对 — 检查 migration 0006_pg_trgm_index 是否跑了") elif dt < 1000: print("[OK] <1s,pg_trgm 索引生效") except Exception as e: print(f"[FAIL] {type(e).__name__}: {e}") print(" (若是 ReadTimeout,pg_trgm 没生效;请确认 migration 跑了 + DB 有 superuser 装扩展)") def step1_search() -> list[dict]: _hr("step 1: search(keyword='cement', has_pdf=True, limit=5)") print(f"PAPER_SERVER_URL = {_BASE_URL}") print("note: 带 has_pdf=True 走 has_fulltext_pdf 索引先过滤,纯 keyword 大词会 ILIKE 全表扫导致 30s timeout (paper_server 既有问题)") t0 = time.time() try: results = search(keyword="cement", has_pdf=True, limit=5) except Exception as e: print(f"[FAIL] {type(e).__name__}: {e}") return [] dt = (time.time() - t0) * 1000 print(f"[OK] returned {len(results)} papers in {dt:.0f}ms") if not results: print("[WARN] 0 results — keyword 'cement' + has_pdf=True 居然没命中?后两步会用空集 fallback") return [] # 验返回 shape:必含 16 字段 expected = { "id", "doi", "title", "first_author", "first_author_institution", "publication_year", "publication_date", "publication_name", "has_fulltext_pdf", "has_fulltext_xml", "has_abstract", "is_oa", "type", "abstract", "pdf_url", "xml_url", } actual = set(results[0].keys()) missing = expected - actual extra = actual - expected if missing: print(f"[FAIL] 字段缺失:{missing}") if extra: print(f"[INFO] 字段冗余(应被 _LIST_FIELDS 过滤掉):{extra}") if not missing and not extra: print("[OK] 16 字段完整,无冗余") # 抽样打前 3 条 print() for i, p in enumerate(results[:3], 1): print(f"--- paper {i} ---") print(f" id = {p['id']}") print(f" doi = {p['doi']}") print(f" title = {_truncate(p['title'], 100)}") print(f" year = {p['publication_year']} / date={p['publication_date']} / type={p['type']} / is_oa={p['is_oa']}") print(f" has_abstract = {p['has_abstract']}") print(f" has_fulltext_pdf = {p['has_fulltext_pdf']} / has_fulltext_xml = {p['has_fulltext_xml']}") print(f" pdf_url = {p['pdf_url'] or '(empty)'}") print(f" xml_url = {p['xml_url'] or '(empty)'}") print(f" abstract = {_truncate(p['abstract'], 150)}") return results def step1b_abstract_filled() -> None: _hr("step 1b: search(has_abstract=True, limit=3) — 验证 abstract 字段在 list 里确实能装文本") try: t0 = time.time() results = search(keyword="hydration", has_pdf=True, limit=3) dt = (time.time() - t0) * 1000 print(f"[OK] returned {len(results)} in {dt:.0f}ms") if not results: print("[WARN] hydration+has_pdf 也是 0 结果,跳过") return for i, p in enumerate(results, 1): print(f"--- paper {i} --- has_abstract={p['has_abstract']}") print(f" title = {_truncate(p['title'], 80)}") print(f" abstract = {_truncate(p['abstract'], 200)}") any_filled = any(p["abstract"] for p in results if p.get("has_abstract")) if any_filled: print("\n[OK] abstract 字段实际装载了文本(non-empty)— list 加 abstract 改动真实生效") else: print('\n[INFO] 本批候选 has_abstract 全为 False / abstract 全空;不算 fail,只说明本批走的 serializer 的 "无 PaperAbstract 行返空串" 分支') except Exception as e: print(f"[FAIL] {type(e).__name__}: {e}") def step2_get_paper(papers: list[dict]) -> None: _hr("step 2: get_paper()") if not papers: print("[SKIP] step 1 无结果,无法挑 doi") return doi = papers[0]["doi"] print(f"querying doi = {doi}") try: t0 = time.time() paper = get_paper(doi) dt = (time.time() - t0) * 1000 print(f"[OK] retrieve in {dt:.0f}ms") except Exception as e: print(f"[FAIL] {type(e).__name__}: {e}") print(" (若是 404,可能 redeploy 没生效 / retrieve mixin 没挂;原 bug 就是这个)") return print(f" id = {paper.get('id')}") print(f" title = {_truncate(paper.get('title') or '', 100)}") print(f" has_abstract = {paper.get('has_abstract')}") print(f" abstract = {_truncate(paper.get('abstract') or '', 200)}") if paper.get("id") != papers[0]["id"]: print(f"[WARN] retrieve 返回的 id ({paper.get('id')}) ≠ list 里的 id ({papers[0]['id']})") def step3_fetch_pdf(papers: list[dict]) -> None: _hr("step 3: fetch_pdf(轮询候选直到命中真实可下载, tmp_dir) ×2") candidates = [p for p in papers if p.get("has_fulltext_pdf")] if not candidates: print("[SKIP] step 1 候选里没有 has_fulltext_pdf=True,跳过(P1 timeout 也会落到这里)") return import httpx as _httpx tmp_dir = Path(tempfile.mkdtemp(prefix="zcbot_smoke_paper_")) print(f"tmp working_dir = {tmp_dir}") print(f"候选数: {len(candidates)}") success = False for i, paper in enumerate(candidates, 1): pid = paper["id"] print(f"\n--- 尝试候选 {i}/{len(candidates)}: id={pid} doi={paper['doi']} ---") try: t0 = time.time() rel1 = fetch_pdf(pid, str(tmp_dir)) dt1 = (time.time() - t0) * 1000 abs_path = tmp_dir / rel1 size = abs_path.stat().st_size print(f"[OK] 第 1 次下载: rel={rel1} size={size/1024:.1f}KB in {dt1:.0f}ms") t0 = time.time() rel2 = fetch_pdf(pid, str(tmp_dir)) dt2 = (time.time() - t0) * 1000 print(f"[OK] 第 2 次复用: rel={rel2} in {dt2:.0f}ms (期望 <100ms)") if dt2 > 1000: print(f"[WARN] 第 2 次 >1s,可能没走复用分支?") if rel1 != rel2: print(f"[FAIL] 两次返回路径不一致:{rel1} vs {rel2}") success = True break except _httpx.HTTPStatusError as e: if e.response.status_code == 404: print(f"[SKIP] paper_pdf_view 404 — DB has_fulltext_pdf=True 但磁盘文件缺失(paper_server 数据一致性问题,继续下一个)") continue print(f"[FAIL] HTTPStatusError {e.response.status_code}: {e}") break except RuntimeError as e: print(f"[INFO] {e}") continue except Exception as e: print(f"[FAIL] {type(e).__name__}: {e}") break if not success: print(f"\n[WARN] {len(candidates)} 个候选全部 404 — paper_server 侧 DB/disk 不一致问题严重,fetch_pdf 链路逻辑未真实验证(本地实现按 has_fulltext_pdf=True 是放行的,服务器侧最终又 404 拦下)") print(f"(tmp_dir 留着自查:{tmp_dir})") def step4_fetch_xml() -> None: _hr("step 4: fetch_xml(, tmp_dir) ×2 — 验证 XML 直链 + 复用") try: candidates = search(keyword="cement", limit=20) except Exception as e: print(f"[FAIL] 拉候选失败: {type(e).__name__}: {e}") return xml_candidates = [p for p in candidates if p.get("has_fulltext_xml") and p.get("xml_url")] print(f"候选 {len(candidates)} 条中,has_fulltext_xml=True 且 xml_url 非空: {len(xml_candidates)} 条") if not xml_candidates: print("[INFO] 库里 cement 主题没 has_fulltext_xml 候选,试别的关键词") for kw in ("hydration", "concrete", "polymer"): try: more = search(keyword=kw, limit=20) xml_candidates = [p for p in more if p.get("has_fulltext_xml") and p.get("xml_url")] if xml_candidates: print(f" '{kw}' 拉到 {len(xml_candidates)} 条 XML 候选") break except Exception: continue if not xml_candidates: print("[SKIP] 多关键词试了都没 XML 候选;fetch_xml 链路未验证") return tmp_dir = Path(tempfile.mkdtemp(prefix="zcbot_smoke_xml_")) print(f"tmp working_dir = {tmp_dir}") import httpx as _httpx success = False for i, paper in enumerate(xml_candidates[:5], 1): pid = paper["id"] print(f"\n--- 尝试候选 {i}: id={pid} doi={paper['doi']} xml_url={paper['xml_url']} ---") try: t0 = time.time() rel1 = fetch_xml(pid, str(tmp_dir)) dt1 = (time.time() - t0) * 1000 size = (tmp_dir / rel1).stat().st_size print(f"[OK] 第 1 次下载: rel={rel1} size={size/1024:.1f}KB in {dt1:.0f}ms") t0 = time.time() rel2 = fetch_xml(pid, str(tmp_dir)) dt2 = (time.time() - t0) * 1000 print(f"[OK] 第 2 次复用: rel={rel2} in {dt2:.0f}ms (期望 <100ms)") if rel1 != rel2: print(f"[FAIL] 两次路径不一致:{rel1} vs {rel2}") success = True break except _httpx.HTTPStatusError as e: if e.response.status_code == 404: print(f"[SKIP] media 静态 URL 404 — paper_server disk 文件缺失,继续下一个") continue print(f"[FAIL] HTTPStatusError {e.response.status_code}: {e}") break except Exception as e: print(f"[FAIL] {type(e).__name__}: {e}") break if not success: print(f"\n[WARN] 5 个候选全部 disk 缺失;fetch_xml 客户端代码本身简单(直链 stream + 复用),不阻塞改动验证") print(f"(tmp_dir 留着自查:{tmp_dir})") def main() -> int: print("=" * 60) print("zcbot research skill smoke") print("=" * 60) try: step0_trgm_speed() except Exception as e: print(f"[FAIL step 0] {type(e).__name__}: {e}") try: papers = step1_search() except Exception as e: print(f"[FAIL step 1] {type(e).__name__}: {e}") papers = [] try: step1b_abstract_filled() except Exception as e: print(f"[FAIL step 1b] {type(e).__name__}: {e}") try: step2_get_paper(papers) except Exception as e: print(f"[FAIL step 2] {type(e).__name__}: {e}") try: step3_fetch_pdf(papers) except Exception as e: print(f"[FAIL step 3] {type(e).__name__}: {e}") try: step4_fetch_xml() except Exception as e: print(f"[FAIL step 4] {type(e).__name__}: {e}") print() print("=" * 60) print("smoke done") print("=" * 60) return 0 if __name__ == "__main__": sys.exit(main())