paper_server/apps/resm/management/commands/fix_preview_pdf.py

167 lines
7.3 KiB
Python

"""一次性修复: 纠正被误标为全文 PDF 的历史记录(Elsevier 摘要预览页 / 损坏文件)。
背景:
Elsevier Article API 对未授权 / in-press 文章, application/pdf 端点会返回仅含
摘要的 1 页预览 PDF(魔数仍是 %PDF、体积也不小); 另有部分历史记录把 HTML 错误页 /
被截断的垃圾当 PDF 存了。旧抓取逻辑只校验魔数 + 体积, 都会误标 has_fulltext_pdf=True。
本命令核对本地 PDF, 分两类处理:
- 预览页(1 页): has_fulltext_pdf 置回 False; 文件**仅在 --delete-file 时**删除。
- 损坏文件(非 PDF / pypdf 解析失败): has_fulltext_pdf 置回 False; 文件**总是删除**
(dry-run 除外), 因为它根本不是有效全文, 留着无用且会污染下游解析。
两类在缺少 XML 全文(has_fulltext_xml=False)时, 一并把 has_fulltext 回退 False,
让其重新进入下载链路去找真正的全文; 并追加 fail_reason 标记供抓取任务排除。
性能:
读文件 + pypdf 解析是 CPU/IO 密集, 用 ProcessPoolExecutor 并行(--workers, 默认 CPU 核数);
数据库写入留在主进程串行(坏文件仅占少数, 非瓶颈, 也避免子进程共享 DB 连接)。
安全前提:
"损坏"只在铁证下判定 —— 文件不以 %PDF 开头, 或已装 pypdf 且解析直接失败。
若未装 pypdf 且魔数正常但页数判不出, 归为 unknown, **不处理、绝不删除**。
强烈建议先 `pip install pypdf` 再跑, 否则只能处理魔数明显不符的坏文件。
用法:
python manage.py fix_preview_pdf --dry-run
python manage.py fix_preview_pdf # 纠正标记 + 删坏文件, 保留预览页文件
python manage.py fix_preview_pdf --delete-file # 并删除预览页文件
python manage.py fix_preview_pdf --workers 16 # 指定并发进程数
"""
import os
from concurrent.futures import ProcessPoolExecutor
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone
from apps.resm.models import Paper
from apps.resm.pdf_utils import classify_pdf_file
def _pdf_path(doi, pub_date):
"""按 doi + publication_date 推算 PDF 落盘路径(不创建目录, 只读用)。"""
safe = doi.replace("/", "_")
if pub_date is None:
d = os.path.join(settings.BASE_DIR, "media/papers", "unknown")
else:
d = os.path.join(settings.BASE_DIR, "media/papers",
str(pub_date.year), str(pub_date.month), str(pub_date.day))
return os.path.join(d, f"{safe}.pdf")
def _batched(iterable, size):
batch = []
for item in iterable:
batch.append(item)
if len(batch) >= size:
yield batch
batch = []
if batch:
yield batch
class Command(BaseCommand):
help = "纠正被误标为全文的 Elsevier 预览页 / 损坏 PDF(多进程并发)"
def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true",
help="只统计, 不写库 / 不删文件")
parser.add_argument("--limit", type=int, default=0,
help="最多处理多少条 (0=不限)")
parser.add_argument("--delete-file", action="store_true",
help="同时删除预览页文件(坏文件无论该开关都会删)")
parser.add_argument("--workers", type=int, default=0,
help="并发进程数 (0=CPU 核数)")
parser.add_argument("--batch", type=int, default=2000,
help="每批处理多少条(控制内存与进度粒度)")
def handle(self, *args, **opts):
dry = opts["dry_run"]
limit = opts["limit"]
del_preview = opts["delete_file"]
batch = max(1, opts["batch"])
workers = opts["workers"] or (os.cpu_count() or 4)
qs = Paper.objects.filter(
has_fulltext_pdf=True, doi__startswith="10.1016"
).order_by("id")
total = qs.count()
self.stdout.write(
f"候选(has_fulltext_pdf=True 且 DOI 以 10.1016 开头): {total}; "
f"workers={workers} batch={batch}"
+ (" (dry-run)" if dry else ""))
rows_iter = qs.values(
"id", "doi", "publication_date", "has_fulltext_xml", "fail_reason"
).iterator(chunk_size=batch)
checked = preview = broken = only_pdf = deleted = 0
missing = unknown = 0
with ProcessPoolExecutor(max_workers=workers) as ex:
stop = False
for chunk in _batched(rows_iter, batch):
if stop:
break
paths = [_pdf_path(r["doi"], r["publication_date"]) for r in chunk]
results = ex.map(classify_pdf_file, paths, chunksize=32)
for r, (_path, kind, pages) in zip(chunk, results):
if limit and checked >= limit:
stop = True
break
checked += 1
if kind == "missing":
missing += 1
continue
if kind in ("ok", "unknown", "unreadable"):
if kind != "ok":
unknown += 1
continue
# kind in ('preview', 'broken'): 纠正标记
do_delete = (kind == "broken") or del_preview
only_pdf_case = not r["has_fulltext_xml"]
if kind == "preview":
preview += 1
tag = f"preview {pages}p"
reason = "elsevier_pdf_preview_only"
else:
broken += 1
tag = "broken"
reason = "pdf_broken"
if only_pdf_case:
only_pdf += 1
self.stdout.write(
f"[{tag}]{' (only-pdf)' if only_pdf_case else ''}"
f"{' +rm' if do_delete else ''} {r['doi']} {_path}")
if dry:
continue
fr = r["fail_reason"]
if reason not in (fr or ""):
fr = f"{fr};{reason}" if fr else f";{reason}"
upd = {"has_fulltext_pdf": False, "fail_reason": fr,
"update_time": timezone.now()}
if only_pdf_case:
upd["has_fulltext"] = False
Paper.objects.filter(id=r["id"]).update(**upd)
if do_delete:
try:
os.remove(_path)
deleted += 1
except OSError:
pass
self.stdout.write(
f" 进度 checked={checked}/{total} preview={preview} "
f"broken={broken} deleted={deleted} missing={missing} "
f"unknown={unknown}")
self.stdout.write(self.style.SUCCESS(
f"完成 检查={checked} 预览页={preview} 坏文件={broken} "
f"(无XML全文一并回退has_fulltext={only_pdf}) 删除文件={deleted} "
f"文件缺失={missing} 未知/跳过={unknown}"
+ (" (dry-run, 未写库)" if dry else "")
))