paper_server/scripts/sd_download.py

312 lines
11 KiB
Python

#!/usr/bin/env python
"""独立脚本: 从 ScienceDirect 网页下载排版 PDF。
与 apps.resm 解耦, 独立运行。核心难点是 Cloudflare 人机校验: Playwright 自建
浏览器带自动化指纹会被 Turnstile 识破而死循环("are you a robot"), 因此推荐
连接你手动启动的真实 Chrome(由真人过一次验证), 脚本只负责驱动它下载。
前提: 运行方 IP 在机构订阅网段(ScienceDirect 按 IP 授权)。
依赖: playwright, requests, lxml, pypdf(可选, 用于精确判页数)。
凭证: 默认从项目 config/conf.py 读取 ELSEVIER_API_KEY / ELSEVIER_INST_TOKEN,
也可用 --pii 直接给 PII 跳过取号。
用法(推荐 CDP 模式):
1) 单独起一个带调试端口的 Chrome(独立档案, 不影响日常浏览器):
Windows (PowerShell):
& "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe" `
--remote-debugging-port=9222 --user-data-dir="D:\\chrome-sd-profile"
Linux:
google-chrome --remote-debugging-port=9222 --user-data-dir=/tmp/sd-profile
2) 在该 Chrome 里手动打开任一 ScienceDirect 文章, 亲手过掉 Cloudflare 验证。
3) 运行本脚本(可一次多篇):
python scripts/sd_download.py 10.1016/j.conbuildmat.2026.146897 \
--cdp http://localhost:9222 --out ./sd_pdfs
不加 --cdp 时脚本自行启动浏览器(大概率被 Cloudflare 拦, 仅调试用)。
提示: 批量爬 ScienceDirect 违反 Elsevier 条款且可能导致机构 IP 被封, 仅供少量补抓。
"""
import argparse
import asyncio
import os
import re
import sys
from io import BytesIO
# Put the project root (the parent of this script's directory) at the front
# of sys.path so that the credentials in config/conf.py can be imported.
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)
# User-Agent sent by the self-launched (non-CDP) browser context.
_UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
       "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")

# Extra Chromium flags used only when this script launches the browser itself.
_STEALTH_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--no-first-run", "--no-default-browser-check",
    "--disable-infobars", "--disable-extensions", "--disable-notifications",
]

# Lower-case substrings of a page title that mark a challenge interstitial.
# A bare "moment" is deliberately NOT listed: it also matches genuine article
# titles (e.g. "bending moment capacity of ..."), which made such articles
# look like a challenge page that never clears.
_CHALLENGE_KW = ("just a moment", "checking your browser",
                 "attention required")
# ------------------------------ PII / PDF 工具 ------------------------------
def get_creds():
    """Load the Elsevier credentials from the project's ``config/conf.py``.

    Returns:
        The tuple ``(ELSEVIER_API_KEY, ELSEVIER_INST_TOKEN)`` imported from
        ``config.conf``. If that import fails for any reason, a warning is
        printed and ``(None, None)`` is returned instead.
    """
    try:
        from config.conf import ELSEVIER_API_KEY, ELSEVIER_INST_TOKEN
    except Exception as e:
        print(f"[warn] 读取 config/conf.py 凭证失败: {e!r}")
        return None, None
    return ELSEVIER_API_KEY, ELSEVIER_INST_TOKEN
def fetch_pii(doi):
    """Look up the normalized PII of an article via the Elsevier article API.

    The API is queried by DOI with ``httpAccept=text/xml``; the first ``pii``
    element found in the XML response is returned with every
    non-alphanumeric character removed.

    Args:
        doi: DOI string, interpolated as-is into the request URL.

    Returns:
        The normalized PII string, or ``None`` when no API key is available,
        the request fails, the status is not 200, the body is not parseable
        XML, or no ``pii`` element is present.
    """
    import requests
    from lxml import etree

    api_key, inst_token = get_creds()
    if not api_key:
        return None

    req_headers = {"X-ELS-APIKey": api_key}
    if inst_token:
        req_headers["X-ELS-Insttoken"] = inst_token

    try:
        resp = requests.get(
            f"https://api.elsevier.com/content/article/doi/{doi}",
            params={"httpAccept": "text/xml"},
            headers=req_headers,
            timeout=(3, 30),
        )
    except requests.RequestException as e:
        print(f"[warn] 取 PII 请求失败: {e!r}")
        return None

    if resp.status_code != 200:
        print(f"[warn] 取 PII 非 200: {resp.status_code}")
        return None

    try:
        tree = etree.fromstring(resp.content)
    except Exception:
        return None

    # Match on local-name() so the lookup works whatever namespace prefix
    # the response uses for the pii element.
    pii_texts = tree.xpath("//*[local-name()='pii']/text()")
    if not pii_texts:
        return None
    return re.sub(r"[^A-Za-z0-9]", "", pii_texts[0])
def pdf_page_count(content: bytes):
    """Return the number of pages in a PDF byte string, or ``None`` if unknown.

    ``pypdf`` is used when it can be imported. If ``pypdf`` is installed but
    fails on this input, ``None`` is returned without trying the fallback.
    Only when ``pypdf`` is missing does a raw byte scan run: the largest
    ``/Count N`` value if any exists, otherwise the number of ``/Type /Page``
    objects.
    """
    try:
        import logging
        # Silence pypdf's own warnings about malformed files.
        logging.getLogger("pypdf").setLevel(logging.CRITICAL)
        from pypdf import PdfReader
        reader = PdfReader(BytesIO(content), strict=False)
        return len(reader.pages)
    except ImportError:
        pass  # pypdf not installed: fall through to the byte scan
    except Exception:
        return None

    try:
        declared = re.findall(rb"/Count\s+(\d+)", content)
        if declared:
            return max(int(c) for c in declared)
        # The lookahead rejects "/Pages" (and "/PageR...") so only
        # individual page objects are counted.
        page_objs = re.findall(rb"/Type\s*/Page(?![sR])", content)
        if page_objs:
            return len(page_objs)
    except Exception:
        pass
    return None
def classify(content: bytes):
    """Classify downloaded bytes as a ``(kind, pages)`` pair.

    Kinds:
        ``"broken"``  - empty, no ``%PDF`` marker in the first 1024 bytes
                        (pages is 0), or a page count <= 0.
        ``"unknown"`` - looks like a PDF but the page count could not be
                        determined (pages is ``None``).
        ``"preview"`` - exactly one page.
        ``"ok"``      - more than one page.
    """
    looks_like_pdf = bool(content) and b"%PDF" in content[:1024]
    if not looks_like_pdf:
        return "broken", 0

    n_pages = pdf_page_count(content)
    if n_pages is None:
        return "unknown", None
    if n_pages <= 0:
        return "broken", n_pages
    if n_pages == 1:
        return "preview", n_pages
    return "ok", n_pages
# ------------------------------ 浏览器下载 ------------------------------
async def _wait_challenge_cleared(page, rounds=30, interval=2000):
    """Poll the page title until it no longer looks like a challenge page.

    Each round waits ``interval`` milliseconds and then reads the title.
    The challenge counts as cleared once the title is non-empty and contains
    none of the ``_CHALLENGE_KW`` substrings (case-insensitive).

    Returns:
        ``True`` as soon as a cleared title is seen, ``False`` after
        ``rounds`` unsuccessful rounds.
    """
    for _attempt in range(rounds):
        try:
            await page.wait_for_timeout(interval)
        except Exception:
            pass

        try:
            raw_title = await page.title()
        except Exception:
            # Title not readable right now (e.g. mid-navigation); retry.
            continue

        lowered = (raw_title or "").lower()
        if not lowered:
            continue
        if any(kw in lowered for kw in _CHALLENGE_KW):
            continue
        return True
    return False
async def _find_pdfft_href(page):
    """Return the ``href`` of the first link whose URL contains ``pdfft``.

    The lookup runs as JavaScript in the page. ``None`` is returned when no
    such anchor exists or when the evaluation raises.
    """
    js_lookup = (
        "() => { const a=document.querySelector('a[href*=\"pdfft\"]');"
        " return a ? a.href : null; }"
    )
    try:
        found = await page.evaluate(js_lookup)
    except Exception:
        return None
    return found
async def _grab_pdf(page, url, timeout):
    """Fetch the PDF bytes behind ``url`` using the given page.

    Two strategies are tried in order:

    1. Navigate to ``url`` while waiting for a browser download event
       (waiting at most ``min(timeout, 45000)`` ms) and read the downloaded
       file from disk.
    2. Navigate to ``url`` with ``?download=true`` removed, and capture the
       body of any response whose content-type contains ``application/pdf``,
       polling up to 12 times at one-second intervals.

    Args:
        page: Playwright page used for navigation.
        url: PDF URL to open.
        timeout: Navigation timeout in milliseconds.

    Returns:
        The PDF bytes, or ``None`` if neither strategy produced any data.
    """
    # Strategy 1: the browser download event.
    try:
        async with page.expect_download(timeout=min(timeout, 45000)) as dl_info:
            try:
                await page.goto(url, timeout=timeout)
            except Exception:
                # Errors from goto are ignored here; the download event
                # awaited below decides whether this strategy succeeded.
                pass
        dl = await dl_info.value
        tmp_path = await dl.path()
        if tmp_path:
            with open(tmp_path, "rb") as fh:
                payload = fh.read()
            if payload:
                return payload
    except Exception:
        pass

    # Strategy 2: sniff an inline application/pdf response.
    sniffed = {}

    async def _on_response(response):
        try:
            ctype = response.headers.get("content-type", "")
            if "application/pdf" in ctype:
                sniffed["b"] = await response.body()
        except Exception:
            pass

    page.on("response", _on_response)
    try:
        try:
            await page.goto(url.replace("?download=true", ""), timeout=timeout)
        except Exception:
            pass
        polls = 0
        while polls < 12 and not sniffed.get("b"):
            await page.wait_for_timeout(1000)
            polls += 1
    finally:
        try:
            page.remove_listener("response", _on_response)
        except Exception:
            pass
    return sniffed.get("b")
async def download_one(pii, save_path, cdp_url=None, headless=False,
                       timeout=60000):
    """Download the full-text PDF for one PII through a Chromium browser.

    When ``cdp_url`` is given, the function attaches to that already-running
    browser over CDP, reusing its first context if it has one. Otherwise it
    launches a browser itself, preferring the ``chrome`` channel and falling
    back to the default Chromium.

    Args:
        pii: Article PII used to build the ScienceDirect URLs.
        save_path: Destination file; parent directories are created.
        cdp_url: CDP endpoint of an existing browser, or a falsy value.
        headless: Headless flag for the self-launched browser.
        timeout: Default page timeout in milliseconds.

    Returns:
        ``(True, "ok ...")`` after the file has been written, or
        ``(False, reason)`` when the challenge page never cleared, no PDF
        was captured, or the captured bytes were not classified ``"ok"``.
    """
    from playwright.async_api import async_playwright

    article = f"https://www.sciencedirect.com/science/article/pii/{pii}"
    pdf_url = f"{article}/pdfft?download=true"
    use_cdp = bool(cdp_url)

    async with async_playwright() as pw:
        if use_cdp:
            browser = await pw.chromium.connect_over_cdp(cdp_url)
        else:
            try:
                browser = await pw.chromium.launch(
                    headless=headless, channel="chrome", args=_STEALTH_ARGS)
            except Exception:
                browser = await pw.chromium.launch(
                    headless=headless, args=_STEALTH_ARGS)

        page = None
        try:
            if use_cdp:
                if browser.contexts:
                    context = browser.contexts[0]
                else:
                    context = await browser.new_context()
            else:
                context = await browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent=_UA, locale="en-US", accept_downloads=True)

            page = await context.new_page()
            page.set_default_timeout(timeout)
            if not use_cdp:
                # Only the self-launched browser gets the webdriver override.
                await page.add_init_script(
                    "Object.defineProperty(navigator,'webdriver',{get:()=>false});")

            try:
                await page.goto(article, wait_until="domcontentloaded",
                                timeout=40000)
            except Exception:
                pass

            # More polling rounds in CDP mode than for a self-launched browser.
            max_rounds = 60 if use_cdp else 20
            cleared = await _wait_challenge_cleared(page, rounds=max_rounds)
            if not cleared:
                return False, "cloudflare_not_cleared"

            # Load the article once more to get a clean article page.
            try:
                await page.goto(article, wait_until="domcontentloaded",
                                timeout=40000)
                await page.wait_for_timeout(4000)
            except Exception:
                pass

            target = await _find_pdfft_href(page) or pdf_url
            body = await _grab_pdf(page, target, timeout)
            if not body:
                return False, "no_pdf_captured"

            kind, pages = classify(body)
            if kind != "ok":
                head = body[:160].decode("utf-8", "replace").replace("\n", " ")
                return False, f"not_fulltext kind={kind} pages={pages} len={len(body)} head={head!r}"

            out_dir = os.path.dirname(os.path.abspath(save_path))
            os.makedirs(out_dir, exist_ok=True)
            with open(save_path, "wb") as out:
                out.write(body)
            return True, f"ok pages={pages} len={len(body)}"
        finally:
            try:
                # In CDP mode, close the tab this function opened.
                if use_cdp and page:
                    await page.close()
                await browser.close()
            except Exception:
                pass
# ------------------------------ CLI ------------------------------
async def _run(args):
    """Download every DOI in ``args.doi`` and print a per-item and final report.

    For each DOI the PII comes from ``--pii`` (honoured only when exactly one
    DOI was given) or from ``fetch_pii``. The PDF is saved in ``args.out``
    under the DOI with filesystem-unsafe characters replaced by ``_``.

    A failure on one DOI, including an exception raised by ``download_one``,
    is reported as ``[FAIL]`` and does not stop the remaining DOIs.

    Args:
        args: Parsed CLI namespace with ``doi``, ``pii``, ``out``, ``cdp``,
            ``headless`` and ``timeout`` (seconds).
    """
    os.makedirs(args.out, exist_ok=True)

    # One explicit PII cannot belong to several DOIs; using it for all of
    # them would save the same article under different names.
    pii_override = args.pii
    if pii_override and len(args.doi) > 1:
        print("[warn] --pii 仅单篇时有效, 给了多个 DOI, 已忽略 --pii")
        pii_override = ""

    ok_n = 0
    for doi in args.doi:
        doi = doi.strip()
        pii = pii_override or fetch_pii(doi)
        if not pii:
            print(f"[FAIL] {doi}: 取不到 PII")
            continue

        # Replace "/" plus the characters Windows forbids in file names.
        file_name = re.sub(r'[\\/:*?"<>|]', "_", doi) + ".pdf"
        save = os.path.join(args.out, file_name)
        print(f"[..] {doi} PII={pii} -> {save}")

        try:
            ok, msg = await download_one(pii, save, cdp_url=args.cdp or None,
                                         headless=args.headless,
                                         timeout=args.timeout * 1000)
        except Exception as e:
            # E.g. the CDP endpoint is unreachable or the file cannot be
            # written: report it and carry on with the next DOI.
            ok, msg = False, f"exception {e!r}"

        if ok:
            ok_n += 1
            print(f"[OK] {doi}: {msg}")
        else:
            print(f"[FAIL] {doi}: {msg}")
    print(f"完成: {ok_n}/{len(args.doi)} 成功")
def main():
    """Parse the command line and run the download loop under asyncio."""
    parser = argparse.ArgumentParser(
        description="从 ScienceDirect 网页下载 PDF(独立脚本)")
    parser.add_argument("doi", nargs="+", help="一个或多个 DOI")
    parser.add_argument(
        "--cdp", default="",
        help="连接手动启动的 Chrome, 如 http://localhost:9222(推荐)")
    parser.add_argument("--out", default="./sd_pdfs", help="PDF 输出目录")
    parser.add_argument("--pii", default="",
                        help="直接指定 PII(仅单篇时用, 跳过 API 取号)")
    parser.add_argument("--headless", action="store_true",
                        help="无头(非 CDP 模式, 调试用)")
    parser.add_argument("--timeout", type=int, default=60,
                        help="单步超时(秒)")
    args = parser.parse_args()

    # On Windows, explicitly select the proactor event loop policy.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    asyncio.run(_run(args))


if __name__ == "__main__":
    main()