diff --git a/apps/resm/management/commands/fix_preview_pdf.py b/apps/resm/management/commands/fix_preview_pdf.py index 89eb0f8..326cc63 100644 --- a/apps/resm/management/commands/fix_preview_pdf.py +++ b/apps/resm/management/commands/fix_preview_pdf.py @@ -12,6 +12,10 @@ 两类在缺少 XML 全文(has_fulltext_xml=False)时, 一并把 has_fulltext 回退 False, 让其重新进入下载链路去找真正的全文; 并追加 fail_reason 标记供抓取任务排除。 +性能: + 读文件 + pypdf 解析是 CPU/IO 密集, 用 ProcessPoolExecutor 并行(--workers, 默认 CPU 核数); + 数据库写入留在主进程串行(坏文件仅占少数, 非瓶颈, 也避免子进程共享 DB 连接)。 + 安全前提: "损坏"只在铁证下判定 —— 文件不以 %PDF 开头, 或已装 pypdf 且解析直接失败。 若未装 pypdf 且魔数正常但页数判不出, 归为 unknown, **不处理、绝不删除**。 @@ -19,19 +23,45 @@ 用法: python manage.py fix_preview_pdf --dry-run - python manage.py fix_preview_pdf # 纠正标记 + 删除坏文件, 保留预览页文件 - python manage.py fix_preview_pdf --delete-file # 并删除预览页文件 + python manage.py fix_preview_pdf # 纠正标记 + 删坏文件, 保留预览页文件 + python manage.py fix_preview_pdf --delete-file # 并删除预览页文件 + python manage.py fix_preview_pdf --workers 16 # 指定并发进程数 """ import os +from concurrent.futures import ProcessPoolExecutor +from django.conf import settings from django.core.management.base import BaseCommand +from django.utils import timezone from apps.resm.models import Paper -from apps.resm.tasks import _inspect_pdf +from apps.resm.pdf_utils import classify_pdf_file + + +def _pdf_path(doi, pub_date): + """按 doi + publication_date 推算 PDF 落盘路径(不创建目录, 只读用)。""" + safe = doi.replace("/", "_") + if pub_date is None: + d = os.path.join(settings.BASE_DIR, "media/papers", "unknown") + else: + d = os.path.join(settings.BASE_DIR, "media/papers", + str(pub_date.year), str(pub_date.month), str(pub_date.day)) + return os.path.join(d, f"{safe}.pdf") + + +def _batched(iterable, size): + batch = [] + for item in iterable: + batch.append(item) + if len(batch) >= size: + yield batch + batch = [] + if batch: + yield batch class Command(BaseCommand): - help = "纠正被误标为全文的 Elsevier 预览页 / 损坏 PDF" + help = "纠正被误标为全文的 Elsevier 预览页 / 损坏 PDF(多进程并发)" def add_arguments(self, parser): parser.add_argument("--dry-run", action="store_true", @@ -40,80 +70,96 @@ class Command(BaseCommand): help="最多处理多少条 (0=不限)") parser.add_argument("--delete-file", action="store_true", help="同时删除预览页文件(坏文件无论该开关都会删)") + parser.add_argument("--workers", type=int, default=0, + help="并发进程数 (0=CPU 核数)") + parser.add_argument("--batch", type=int, default=2000, + help="每批处理多少条(控制内存与进度粒度)") def handle(self, *args, **opts): dry = opts["dry_run"] limit = opts["limit"] del_preview = opts["delete_file"] + batch = max(1, opts["batch"]) + workers = opts["workers"] or (os.cpu_count() or 4) qs = Paper.objects.filter( has_fulltext_pdf=True, doi__startswith="10.1016" ).order_by("id") total = qs.count() self.stdout.write( - f"候选(has_fulltext_pdf=True 且 DOI 以 10.1016 开头): {total}") + f"候选(has_fulltext_pdf=True 且 DOI 以 10.1016 开头): {total}; " + f"workers={workers} batch={batch}" + + (" (dry-run)" if dry else "")) + + rows_iter = qs.values( + "id", "doi", "publication_date", "has_fulltext_xml", "fail_reason" + ).iterator(chunk_size=batch) checked = preview = broken = only_pdf = deleted = 0 missing = unknown = 0 - for paper in qs.iterator(): - if limit and checked >= limit: - break - checked += 1 - path = paper.init_paper_path("pdf") - if not os.path.exists(path): - missing += 1 - continue - try: - with open(path, "rb") as f: - content = f.read() - except OSError: - unknown += 1 - continue + with ProcessPoolExecutor(max_workers=workers) as ex: + stop = False + for chunk in _batched(rows_iter, batch): + if stop: + break + paths = [_pdf_path(r["doi"], r["publication_date"]) for r in chunk] + results = ex.map(classify_pdf_file, paths, chunksize=32) + for r, (_path, kind, pages) in zip(chunk, results): + if limit and checked >= limit: + stop = True + break + checked += 1 - kind, pages = _inspect_pdf(content) - if kind == "ok": - continue - if kind == "unknown": - unknown += 1 - continue + if kind == "missing": + missing += 1 + continue + if kind in ("ok", "unknown", "unreadable"): + if kind != "ok": + unknown += 1 + continue - # kind in ('preview', 'broken'): 都要纠正标记 - do_delete = (kind == "broken") or del_preview - only_pdf_case = not paper.has_fulltext_xml - if kind == "preview": - preview += 1 - tag = f"preview {pages}p" - else: - broken += 1 - tag = "broken" - if only_pdf_case: - only_pdf += 1 - self.stdout.write( - f"[{tag}]{' (only-pdf)' if only_pdf_case else ''}" - f"{' +rm' if do_delete else ''} {paper.doi} {path}") - if dry: - continue + # kind in ('preview', 'broken'): 纠正标记 + do_delete = (kind == "broken") or del_preview + only_pdf_case = not r["has_fulltext_xml"] + if kind == "preview": + preview += 1 + tag = f"preview {pages}p" + reason = "elsevier_pdf_preview_only" + else: + broken += 1 + tag = "broken" + reason = "pdf_broken" + if only_pdf_case: + only_pdf += 1 + self.stdout.write( + f"[{tag}]{' (only-pdf)' if only_pdf_case else ''}" + f"{' +rm' if do_delete else ''} {r['doi']} {_path}") + if dry: + continue - paper.has_fulltext_pdf = False - update_fields = ["has_fulltext_pdf", "update_time"] - # 没有 XML 全文时, has_fulltext 之前只是被这张假/坏 PDF 置上的, 一并回退 - if only_pdf_case: - paper.has_fulltext = False - update_fields.insert(0, "has_fulltext") - paper.save(update_fields=update_fields) - reason = "elsevier_pdf_preview_only" if kind == "preview" else "pdf_broken" - if reason not in (paper.fail_reason or ""): - paper.save_fail_reason(reason) - if do_delete: - try: - os.remove(path) - deleted += 1 - except OSError: - pass + fr = r["fail_reason"] + if reason not in (fr or ""): + fr = f"{fr};{reason}" if fr else f";{reason}" + upd = {"has_fulltext_pdf": False, "fail_reason": fr, + "update_time": timezone.now()} + if only_pdf_case: + upd["has_fulltext"] = False + Paper.objects.filter(id=r["id"]).update(**upd) + if do_delete: + try: + os.remove(_path) + deleted += 1 + except OSError: + pass + + self.stdout.write( + f" 进度 checked={checked}/{total} preview={preview} " + f"broken={broken} deleted={deleted} missing={missing} " + f"unknown={unknown}") self.stdout.write(self.style.SUCCESS( - f"检查={checked} 预览页={preview} 坏文件={broken} " + f"完成 检查={checked} 预览页={preview} 坏文件={broken} " f"(无XML全文一并回退has_fulltext={only_pdf}) 删除文件={deleted} " f"文件缺失={missing} 未知/跳过={unknown}" + (" (dry-run, 未写库)" if dry else "") diff --git a/apps/resm/pdf_utils.py b/apps/resm/pdf_utils.py new file mode 100644 index 0000000..ea12943 --- /dev/null +++ b/apps/resm/pdf_utils.py @@ -0,0 +1,94 @@ +"""PDF 解析/分类工具(纯 stdlib + pypdf, 不依赖 Django)。 + +独立成模块, 以便 ProcessPoolExecutor 的子进程能安全导入(fork/spawn 均可), +不会牵连 Django 模型与配置。tasks.py 从这里复用这些函数。 +""" +import os +import re + + +def _pdf_page_count(content: bytes): + """返回 PDF 页数; 无法确定时返回 None。 + + 优先用 pypdf 精确解析; 未安装或解析异常时退化为字节扫描 + (对未压缩对象树有效, Elsevier 的摘要预览页正属此类)。""" + try: + from io import BytesIO + import logging + # 坏 PDF 会让 pypdf 刷大量恢复日志, 这里只关心页数, 静音其 logger + logging.getLogger("pypdf").setLevel(logging.CRITICAL) + from pypdf import PdfReader + return len(PdfReader(BytesIO(content), strict=False).pages) + except ImportError: + pass + except Exception: + return None + try: + counts = [int(m) for m in re.findall(rb"/Count\s+(\d+)", content)] + if counts: + return max(counts) + n = len(re.findall(rb"/Type\s*/Page(?![sR])", content)) + if n: + return n + except Exception: + pass + return None + + +def _is_elsevier_preview_pdf(content: bytes) -> bool: + """判断 Elsevier 返回的 PDF 是否为"摘要预览页"。 + + Elsevier Article API 对未授权 / in-press 文章, application/pdf 端点会返回 + 仅含摘要的 1 页预览 PDF(魔数仍是 %PDF、体积也不小), 全文 XML 却可能正常。 + 判据: 能确定页数且 <= 1 页。无法确定页数时返回 False(从宽, 不误杀真全文)。""" + pages = _pdf_page_count(content) + return pages is not None and pages <= 1 + + +def _inspect_pdf(content: bytes): + """对历史落库的 PDF 文件分类, 返回 (kind, pages)。 + + kind: + 'broken' - 非 PDF(魔数不符)或 pypdf 解析直接失败 -> 可安全删除重抓 + 'preview' - 1 页摘要预览页 + 'ok' - 多页, 视为真全文, 不处理 + 'unknown' - 魔数正常但页数判不出(通常因未装 pypdf) -> 不处理, 绝不当坏文件 + pages: 页数; 无法确定为 None。""" + if not content or b"%PDF" not in content[:1024]: + return "broken", 0 + try: + from io import BytesIO + import logging + logging.getLogger("pypdf").setLevel(logging.CRITICAL) + from pypdf import PdfReader + except ImportError: + # 没装 pypdf: 只能靠字节扫描, 判不出就 unknown(从宽, 不误判为坏) + pages = _pdf_page_count(content) + if pages is None: + return "unknown", None + return ("preview" if pages <= 1 else "ok"), pages + try: + pages = len(PdfReader(BytesIO(content), strict=False).pages) + except Exception: + return "broken", None + if pages <= 0: + return "broken", pages + return ("preview" if pages == 1 else "ok"), pages + + +def classify_pdf_file(path: str): + """并发 worker 入口: 读取并分类单个 PDF 文件路径。 + + 返回 (path, kind, pages)。除 _inspect_pdf 的四种 kind 外, 另有 IO 结果: + 'missing' - 文件不存在 + 'unreadable' - 打开失败(权限等) + 设计为纯函数(仅 stdlib + pypdf), 可被进程池安全 pickle / 导入。""" + try: + if not os.path.exists(path): + return path, "missing", None + with open(path, "rb") as f: + content = f.read() + except OSError: + return path, "unreadable", None + kind, pages = _inspect_pdf(content) + return path, kind, pages diff --git a/apps/resm/tasks.py b/apps/resm/tasks.py index 6308551..cb54933 100644 --- a/apps/resm/tasks.py +++ b/apps/resm/tasks.py @@ -11,7 +11,7 @@ from lxml import etree from celery import current_app from datetime import datetime, timedelta import random -import re +from .pdf_utils import _is_elsevier_preview_pdf from .d_oaurl import download_from_url_playwright import asyncio import sys @@ -600,76 +600,6 @@ def _elsevier_fetch_xml(req, paper): return True, has_fulltext, None -def _pdf_page_count(content: bytes): - """返回 PDF 页数; 无法确定时返回 None。 - - 优先用 pypdf 精确解析; 未安装或解析异常时退化为字节扫描 - (对未压缩对象树有效, Elsevier 的摘要预览页正属此类)。""" - try: - from io import BytesIO - import logging - # 坏 PDF 会让 pypdf 刷大量恢复日志(incorrect header / Cannot find /Root 等), - # 这里只关心页数, 静音其 logger 避免污染输出。 - logging.getLogger("pypdf").setLevel(logging.CRITICAL) - from pypdf import PdfReader - return len(PdfReader(BytesIO(content), strict=False).pages) - except ImportError: - pass - except Exception: - return None - try: - counts = [int(m) for m in re.findall(rb"/Count\s+(\d+)", content)] - if counts: - return max(counts) - n = len(re.findall(rb"/Type\s*/Page(?![sR])", content)) - if n: - return n - except Exception: - pass - return None - - -def _is_elsevier_preview_pdf(content: bytes) -> bool: - """判断 Elsevier 返回的 PDF 是否为"摘要预览页"。 - - Elsevier Article API 对未授权 / in-press 文章, application/pdf 端点会返回 - 仅含摘要的 1 页预览 PDF(魔数仍是 %PDF、体积也不小), 全文 XML 却可能正常。 - 判据: 能确定页数且 <= 1 页。无法确定页数时返回 False(从宽, 不误杀真全文)。""" - pages = _pdf_page_count(content) - return pages is not None and pages <= 1 - - -def _inspect_pdf(content: bytes): - """对历史落库的 PDF 文件分类, 返回 (kind, pages)。 - - kind: - 'broken' - 非 PDF(魔数不符)或 pypdf 解析直接失败 -> 可安全删除重抓 - 'preview' - 1 页摘要预览页 - 'ok' - 多页, 视为真全文, 不处理 - 'unknown' - 魔数正常但页数判不出(通常因未装 pypdf) -> 不处理, 绝不当坏文件 - pages: 页数; 无法确定为 None。""" - if not content or b"%PDF" not in content[:1024]: - return "broken", 0 - try: - from io import BytesIO - import logging - logging.getLogger("pypdf").setLevel(logging.CRITICAL) - from pypdf import PdfReader - except ImportError: - # 没装 pypdf: 只能靠字节扫描, 判不出就 unknown(从宽, 不误判为坏) - pages = _pdf_page_count(content) - if pages is None: - return "unknown", None - return ("preview" if pages <= 1 else "ok"), pages - try: - pages = len(PdfReader(BytesIO(content), strict=False).pages) - except Exception: - return "broken", None - if pages <= 0: - return "broken", pages - return ("preview" if pages == 1 else "ok"), pages - - def _elsevier_fetch_pdf(req, paper): """同一 DOI 取 PDF, 成功落库返回 True。""" try: