perf(resm): fix_preview_pdf 多进程并发扫描

读文件 + pypdf 解析是 CPU/IO 密集, 17 万条串行太慢。改用 ProcessPoolExecutor
并行分类, DB 写入留主进程串行(坏文件仅少数, 非瓶颈, 也避免子进程共享 DB 连接)。

- 新增 apps/resm/pdf_utils.py: 抽出 _pdf_page_count / _is_elsevier_preview_pdf /
  _inspect_pdf / classify_pdf_file, 不依赖 Django, 进程池 fork/spawn 均可安全导入
- tasks.py: 改为从 pdf_utils 导入, 删除内联定义
- 命令新增 --workers(默认 CPU 核数) / --batch; 用 .values() 流式分批, 逐批打印进度;
  DB 写入改用 filter().update() 一次完成, 不再加载模型实例

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-29 13:24:45 +08:00
parent 70bac5c22c
commit 88b51f97b0
3 changed files with 199 additions and 129 deletions

View File

@ -12,6 +12,10 @@
两类在缺少 XML 全文(has_fulltext_xml=False), 一并把 has_fulltext 回退 False, 两类在缺少 XML 全文(has_fulltext_xml=False), 一并把 has_fulltext 回退 False,
让其重新进入下载链路去找真正的全文; 并追加 fail_reason 标记供抓取任务排除 让其重新进入下载链路去找真正的全文; 并追加 fail_reason 标记供抓取任务排除
性能:
读文件 + pypdf 解析是 CPU/IO 密集, ProcessPoolExecutor 并行(--workers, 默认 CPU 核数);
数据库写入留在主进程串行(坏文件仅占少数, 非瓶颈, 也避免子进程共享 DB 连接)
安全前提: 安全前提:
"损坏"只在铁证下判定 文件不以 %PDF 开头, 或已装 pypdf 且解析直接失败 "损坏"只在铁证下判定 文件不以 %PDF 开头, 或已装 pypdf 且解析直接失败
若未装 pypdf 且魔数正常但页数判不出, 归为 unknown, **不处理绝不删除** 若未装 pypdf 且魔数正常但页数判不出, 归为 unknown, **不处理绝不删除**
@ -19,19 +23,45 @@
用法: 用法:
python manage.py fix_preview_pdf --dry-run python manage.py fix_preview_pdf --dry-run
python manage.py fix_preview_pdf # 纠正标记 + 删除坏文件, 保留预览页文件 python manage.py fix_preview_pdf # 纠正标记 + 删坏文件, 保留预览页文件
python manage.py fix_preview_pdf --delete-file # 并删除预览页文件 python manage.py fix_preview_pdf --delete-file # 并删除预览页文件
python manage.py fix_preview_pdf --workers 16 # 指定并发进程数
""" """
import os import os
from concurrent.futures import ProcessPoolExecutor
from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.utils import timezone
from apps.resm.models import Paper from apps.resm.models import Paper
from apps.resm.tasks import _inspect_pdf from apps.resm.pdf_utils import classify_pdf_file
def _pdf_path(doi, pub_date):
"""按 doi + publication_date 推算 PDF 落盘路径(不创建目录, 只读用)。"""
safe = doi.replace("/", "_")
if pub_date is None:
d = os.path.join(settings.BASE_DIR, "media/papers", "unknown")
else:
d = os.path.join(settings.BASE_DIR, "media/papers",
str(pub_date.year), str(pub_date.month), str(pub_date.day))
return os.path.join(d, f"{safe}.pdf")
def _batched(iterable, size):
batch = []
for item in iterable:
batch.append(item)
if len(batch) >= size:
yield batch
batch = []
if batch:
yield batch
class Command(BaseCommand): class Command(BaseCommand):
help = "纠正被误标为全文的 Elsevier 预览页 / 损坏 PDF" help = "纠正被误标为全文的 Elsevier 预览页 / 损坏 PDF(多进程并发)"
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true", parser.add_argument("--dry-run", action="store_true",
@ -40,80 +70,96 @@ class Command(BaseCommand):
help="最多处理多少条 (0=不限)") help="最多处理多少条 (0=不限)")
parser.add_argument("--delete-file", action="store_true", parser.add_argument("--delete-file", action="store_true",
help="同时删除预览页文件(坏文件无论该开关都会删)") help="同时删除预览页文件(坏文件无论该开关都会删)")
parser.add_argument("--workers", type=int, default=0,
help="并发进程数 (0=CPU 核数)")
parser.add_argument("--batch", type=int, default=2000,
help="每批处理多少条(控制内存与进度粒度)")
def handle(self, *args, **opts): def handle(self, *args, **opts):
dry = opts["dry_run"] dry = opts["dry_run"]
limit = opts["limit"] limit = opts["limit"]
del_preview = opts["delete_file"] del_preview = opts["delete_file"]
batch = max(1, opts["batch"])
workers = opts["workers"] or (os.cpu_count() or 4)
qs = Paper.objects.filter( qs = Paper.objects.filter(
has_fulltext_pdf=True, doi__startswith="10.1016" has_fulltext_pdf=True, doi__startswith="10.1016"
).order_by("id") ).order_by("id")
total = qs.count() total = qs.count()
self.stdout.write( self.stdout.write(
f"候选(has_fulltext_pdf=True 且 DOI 以 10.1016 开头): {total}") f"候选(has_fulltext_pdf=True 且 DOI 以 10.1016 开头): {total}; "
f"workers={workers} batch={batch}"
+ (" (dry-run)" if dry else ""))
rows_iter = qs.values(
"id", "doi", "publication_date", "has_fulltext_xml", "fail_reason"
).iterator(chunk_size=batch)
checked = preview = broken = only_pdf = deleted = 0 checked = preview = broken = only_pdf = deleted = 0
missing = unknown = 0 missing = unknown = 0
for paper in qs.iterator():
if limit and checked >= limit:
break
checked += 1
path = paper.init_paper_path("pdf") with ProcessPoolExecutor(max_workers=workers) as ex:
if not os.path.exists(path): stop = False
missing += 1 for chunk in _batched(rows_iter, batch):
continue if stop:
try: break
with open(path, "rb") as f: paths = [_pdf_path(r["doi"], r["publication_date"]) for r in chunk]
content = f.read() results = ex.map(classify_pdf_file, paths, chunksize=32)
except OSError: for r, (_path, kind, pages) in zip(chunk, results):
unknown += 1 if limit and checked >= limit:
continue stop = True
break
checked += 1
kind, pages = _inspect_pdf(content) if kind == "missing":
if kind == "ok": missing += 1
continue continue
if kind == "unknown": if kind in ("ok", "unknown", "unreadable"):
unknown += 1 if kind != "ok":
continue unknown += 1
continue
# kind in ('preview', 'broken'): 都要纠正标记 # kind in ('preview', 'broken'): 纠正标记
do_delete = (kind == "broken") or del_preview do_delete = (kind == "broken") or del_preview
only_pdf_case = not paper.has_fulltext_xml only_pdf_case = not r["has_fulltext_xml"]
if kind == "preview": if kind == "preview":
preview += 1 preview += 1
tag = f"preview {pages}p" tag = f"preview {pages}p"
else: reason = "elsevier_pdf_preview_only"
broken += 1 else:
tag = "broken" broken += 1
if only_pdf_case: tag = "broken"
only_pdf += 1 reason = "pdf_broken"
self.stdout.write( if only_pdf_case:
f"[{tag}]{' (only-pdf)' if only_pdf_case else ''}" only_pdf += 1
f"{' +rm' if do_delete else ''} {paper.doi} {path}") self.stdout.write(
if dry: f"[{tag}]{' (only-pdf)' if only_pdf_case else ''}"
continue f"{' +rm' if do_delete else ''} {r['doi']} {_path}")
if dry:
continue
paper.has_fulltext_pdf = False fr = r["fail_reason"]
update_fields = ["has_fulltext_pdf", "update_time"] if reason not in (fr or ""):
# 没有 XML 全文时, has_fulltext 之前只是被这张假/坏 PDF 置上的, 一并回退 fr = f"{fr};{reason}" if fr else f";{reason}"
if only_pdf_case: upd = {"has_fulltext_pdf": False, "fail_reason": fr,
paper.has_fulltext = False "update_time": timezone.now()}
update_fields.insert(0, "has_fulltext") if only_pdf_case:
paper.save(update_fields=update_fields) upd["has_fulltext"] = False
reason = "elsevier_pdf_preview_only" if kind == "preview" else "pdf_broken" Paper.objects.filter(id=r["id"]).update(**upd)
if reason not in (paper.fail_reason or ""): if do_delete:
paper.save_fail_reason(reason) try:
if do_delete: os.remove(_path)
try: deleted += 1
os.remove(path) except OSError:
deleted += 1 pass
except OSError:
pass self.stdout.write(
f" 进度 checked={checked}/{total} preview={preview} "
f"broken={broken} deleted={deleted} missing={missing} "
f"unknown={unknown}")
self.stdout.write(self.style.SUCCESS( self.stdout.write(self.style.SUCCESS(
f"检查={checked} 预览页={preview} 坏文件={broken} " f"完成 检查={checked} 预览页={preview} 坏文件={broken} "
f"(无XML全文一并回退has_fulltext={only_pdf}) 删除文件={deleted} " f"(无XML全文一并回退has_fulltext={only_pdf}) 删除文件={deleted} "
f"文件缺失={missing} 未知/跳过={unknown}" f"文件缺失={missing} 未知/跳过={unknown}"
+ (" (dry-run, 未写库)" if dry else "") + (" (dry-run, 未写库)" if dry else "")

94
apps/resm/pdf_utils.py Normal file
View File

@ -0,0 +1,94 @@
"""PDF 解析/分类工具(纯 stdlib + pypdf, 不依赖 Django)。
独立成模块, 以便 ProcessPoolExecutor 的子进程能安全导入(fork/spawn 均可),
不会牵连 Django 模型与配置tasks.py 从这里复用这些函数
"""
import os
import re
def _pdf_page_count(content: bytes):
"""返回 PDF 页数; 无法确定时返回 None。
优先用 pypdf 精确解析; 未安装或解析异常时退化为字节扫描
(对未压缩对象树有效, Elsevier 的摘要预览页正属此类)"""
try:
from io import BytesIO
import logging
# 坏 PDF 会让 pypdf 刷大量恢复日志, 这里只关心页数, 静音其 logger
logging.getLogger("pypdf").setLevel(logging.CRITICAL)
from pypdf import PdfReader
return len(PdfReader(BytesIO(content), strict=False).pages)
except ImportError:
pass
except Exception:
return None
try:
counts = [int(m) for m in re.findall(rb"/Count\s+(\d+)", content)]
if counts:
return max(counts)
n = len(re.findall(rb"/Type\s*/Page(?![sR])", content))
if n:
return n
except Exception:
pass
return None
def _is_elsevier_preview_pdf(content: bytes) -> bool:
"""判断 Elsevier 返回的 PDF 是否为"摘要预览页"
Elsevier Article API 对未授权 / in-press 文章, application/pdf 端点会返回
仅含摘要的 1 页预览 PDF(魔数仍是 %PDF体积也不小), 全文 XML 却可能正常
判据: 能确定页数且 <= 1 无法确定页数时返回 False(从宽, 不误杀真全文)"""
pages = _pdf_page_count(content)
return pages is not None and pages <= 1
def _inspect_pdf(content: bytes):
"""对历史落库的 PDF 文件分类, 返回 (kind, pages)。
kind:
'broken' - PDF(魔数不符) pypdf 解析直接失败 -> 可安全删除重抓
'preview' - 1 页摘要预览页
'ok' - 多页, 视为真全文, 不处理
'unknown' - 魔数正常但页数判不出(通常因未装 pypdf) -> 不处理, 绝不当坏文件
pages: 页数; 无法确定为 None"""
if not content or b"%PDF" not in content[:1024]:
return "broken", 0
try:
from io import BytesIO
import logging
logging.getLogger("pypdf").setLevel(logging.CRITICAL)
from pypdf import PdfReader
except ImportError:
# 没装 pypdf: 只能靠字节扫描, 判不出就 unknown(从宽, 不误判为坏)
pages = _pdf_page_count(content)
if pages is None:
return "unknown", None
return ("preview" if pages <= 1 else "ok"), pages
try:
pages = len(PdfReader(BytesIO(content), strict=False).pages)
except Exception:
return "broken", None
if pages <= 0:
return "broken", pages
return ("preview" if pages == 1 else "ok"), pages
def classify_pdf_file(path: str):
"""并发 worker 入口: 读取并分类单个 PDF 文件路径。
返回 (path, kind, pages) _inspect_pdf 的四种 kind , 另有 IO 结果:
'missing' - 文件不存在
'unreadable' - 打开失败(权限等)
设计为纯函数( stdlib + pypdf), 可被进程池安全 pickle / 导入"""
try:
if not os.path.exists(path):
return path, "missing", None
with open(path, "rb") as f:
content = f.read()
except OSError:
return path, "unreadable", None
kind, pages = _inspect_pdf(content)
return path, kind, pages

View File

@ -11,7 +11,7 @@ from lxml import etree
from celery import current_app from celery import current_app
from datetime import datetime, timedelta from datetime import datetime, timedelta
import random import random
import re from .pdf_utils import _is_elsevier_preview_pdf
from .d_oaurl import download_from_url_playwright from .d_oaurl import download_from_url_playwright
import asyncio import asyncio
import sys import sys
@ -600,76 +600,6 @@ def _elsevier_fetch_xml(req, paper):
return True, has_fulltext, None return True, has_fulltext, None
def _pdf_page_count(content: bytes):
"""返回 PDF 页数; 无法确定时返回 None。
优先用 pypdf 精确解析; 未安装或解析异常时退化为字节扫描
(对未压缩对象树有效, Elsevier 的摘要预览页正属此类)"""
try:
from io import BytesIO
import logging
# 坏 PDF 会让 pypdf 刷大量恢复日志(incorrect header / Cannot find /Root 等),
# 这里只关心页数, 静音其 logger 避免污染输出。
logging.getLogger("pypdf").setLevel(logging.CRITICAL)
from pypdf import PdfReader
return len(PdfReader(BytesIO(content), strict=False).pages)
except ImportError:
pass
except Exception:
return None
try:
counts = [int(m) for m in re.findall(rb"/Count\s+(\d+)", content)]
if counts:
return max(counts)
n = len(re.findall(rb"/Type\s*/Page(?![sR])", content))
if n:
return n
except Exception:
pass
return None
def _is_elsevier_preview_pdf(content: bytes) -> bool:
"""判断 Elsevier 返回的 PDF 是否为"摘要预览页"
Elsevier Article API 对未授权 / in-press 文章, application/pdf 端点会返回
仅含摘要的 1 页预览 PDF(魔数仍是 %PDF体积也不小), 全文 XML 却可能正常
判据: 能确定页数且 <= 1 无法确定页数时返回 False(从宽, 不误杀真全文)"""
pages = _pdf_page_count(content)
return pages is not None and pages <= 1
def _inspect_pdf(content: bytes):
"""对历史落库的 PDF 文件分类, 返回 (kind, pages)。
kind:
'broken' - PDF(魔数不符) pypdf 解析直接失败 -> 可安全删除重抓
'preview' - 1 页摘要预览页
'ok' - 多页, 视为真全文, 不处理
'unknown' - 魔数正常但页数判不出(通常因未装 pypdf) -> 不处理, 绝不当坏文件
pages: 页数; 无法确定为 None"""
if not content or b"%PDF" not in content[:1024]:
return "broken", 0
try:
from io import BytesIO
import logging
logging.getLogger("pypdf").setLevel(logging.CRITICAL)
from pypdf import PdfReader
except ImportError:
# 没装 pypdf: 只能靠字节扫描, 判不出就 unknown(从宽, 不误判为坏)
pages = _pdf_page_count(content)
if pages is None:
return "unknown", None
return ("preview" if pages <= 1 else "ok"), pages
try:
pages = len(PdfReader(BytesIO(content), strict=False).pages)
except Exception:
return "broken", None
if pages <= 0:
return "broken", pages
return ("preview" if pages == 1 else "ok"), pages
def _elsevier_fetch_pdf(req, paper): def _elsevier_fetch_pdf(req, paper):
"""同一 DOI 取 PDF, 成功落库返回 True。""" """同一 DOI 取 PDF, 成功落库返回 True。"""
try: try: