paper_server/apps/resm/tasks.py

944 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from pyalex import Works, config
from apps.resm.models import Paper, PaperAbstract, PaperMonitor
from apps.utils.snowflake import idWorker
from django.core.cache import cache
import requests
from lxml import etree
from celery import current_app
from datetime import datetime, timedelta
import random
import re
from .d_oaurl import download_from_url_playwright
import asyncio
import sys
import os
from django.db.models import Q
from django.utils import timezone
from django.conf import settings
# Credentials are centralised in config/conf.py and exposed through Django settings
# (settings does `from config.conf import *`).
# pyalex global client configuration (applies to every Works() query in this module).
config.email = settings.OPENALEX_EMAIL
# NOTE(review): max_retries = 0 presumably disables pyalex's automatic retries, so
# 429/5xx errors surface immediately to the caller — confirm for the installed pyalex.
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = settings.OPENALEX_API_KEY
OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY # API key for PDF downloads from content.openalex.org
ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
# Authentication headers sent with every api.elsevier.com request (API key + institution token).
ELSEVIER_HEADERS = {
    "X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN,
    "X-ELS-APIKey": ELSEVIER_APIKEY,
}
def run_async(coro):
    """Drive ``coro`` to completion on a brand-new event loop and return its result.

    Works around asyncio subprocess support on Windows. There the Proactor loop policy
    is installed, and a dedicated loop is created, made current and closed afterwards.
    Every other platform simply delegates to ``asyncio.run``. Exceptions raised by the
    coroutine propagate to the caller.
    """
    if sys.platform != 'win32':
        # Unix/Linux/macOS: asyncio.run owns the whole loop lifecycle.
        return asyncio.run(coro)
    # Windows: the Proactor event loop is the one that supports subprocesses.
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    windows_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(windows_loop)
    try:
        return windows_loop.run_until_complete(coro)
    finally:
        windows_loop.close()
# Fields requested from OpenAlex (pyalex `select`) for metadata crawling. They are shared
# by the full crawl, the incremental update and the backfill, which all go through
# _crawl_openalex_query. NOTE: "content_urls" is selected but not read by
# _build_paper_from_record.
OPENALEX_SELECT_FIELDS = [
    "id", "doi", "title", "publication_date",
    "open_access", "authorships", "primary_location", "publication_year",
    "display_name", "content_urls"
]
def _apply_keyword_search(pager, keywords: str, search: str):
"""把 keywords / search 过滤条件套到 pyalex 的 pager 上。
keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter。
"""
if keywords:
if "|" in keywords:
keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
pager = pager.filter_or(keywords={"id": keywords_list})
elif "," in keywords:
keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
pager = pager.filter(keywords={"id": keywords_list})
else:
pager = pager.filter(keywords={"id": [keywords.strip()]})
if search:
pager = pager.search_filter(title_and_abstract=search)
return pager
def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
    """Map a single OpenAlex work record onto a new, unsaved Paper.

    Args:
        record: one work dict as returned by pyalex for OPENALEX_SELECT_FIELDS.
            ``record["doi"]`` must be a non-empty string. _crawl_openalex_query only
            passes records that have a DOI and a ``display_name`` or a ``title``.
        keywords: stored on the paper as ``o_keywords`` (the originating query).
        search: stored on the paper as ``o_search`` (the originating query).

    Returns:
        A Paper with a fresh snowflake id, ``source="openalex"`` and
        ``type="article"``. The optional open_access / authorships /
        primary_location.source parts are only copied when present.
    """
    paper = Paper()
    paper.id = idWorker.get_id()
    paper.o_keywords = keywords
    paper.o_search = search
    paper.source = "openalex"
    paper.type = "article"
    paper.openalex_id = record["id"].split("/")[-1]  # last path segment of the id
    paper.doi = record["doi"].replace("https://doi.org/", "")
    # The crawl filter admits records whose display_name is empty as long as "title"
    # is set. Fall back to it so such records do not end up with title=None.
    paper.title = record["display_name"] or record["title"]
    paper.publication_date = record["publication_date"]
    paper.publication_year = record["publication_year"]
    if record["open_access"]:
        paper.is_oa = record["open_access"]["is_oa"]
        paper.oa_url = record["open_access"]["oa_url"]
    if record["authorships"]:
        first_authorship = record["authorships"][0]
        paper.first_author = first_authorship["author"]["display_name"]
        if first_authorship["institutions"]:
            paper.first_author_institution = first_authorship["institutions"][0]["display_name"]
    if record["primary_location"] and record["primary_location"]["source"]:
        paper.publication_name = record["primary_location"]["source"]["display_name"]
    return paper
def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
                          stop_key: str = None, n_max: int = None) -> int:
    """Core OpenAlex cursor crawl for a single query.

    Shared by the full crawl, the incremental update and the backfill, so that
    checkpointing behaves identically everywhere.

    base_pager: a pyalex Works query with its filters already applied. Year, date and
        other filters are chosen by the caller.
    keywords, search: stored on every created Paper as o_keywords / o_search.
    cache_key: when given, the cursor of the *next* page is checkpointed into the
        cache after every page, and "DONE" is written once the query is exhausted.
        A key that already holds "DONE" makes this call a no-op. This gives true
        resumable crawling. When None, no cursor is stored and every call starts
        from the first page, which suits short incremental windows.
    stop_key: when given and truthy in the cache, stop after the current page
        (quota exhaustion / manual pause). The cursor has already been checkpointed.
    n_max: forwarded to pyalex ``paginate(n_max=...)``.

    Returns the number of Paper objects built and handed to bulk_create. These are
    the records with a DOI and a title, including any that ignore_conflicts skipped
    as duplicates.
    """
    if cache_key:
        start = cache.get(cache_key, "*")  # "*" = first page of cursor pagination
        if start == "DONE":
            return 0
    else:
        start = "*"
    pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
        per_page=200, n_max=n_max, cursor=start)
    seen = 0
    for page in pager:
        papers = [
            _build_paper_from_record(record, keywords, search)
            for record in page
            if record["doi"] and (record["display_name"] or record["title"])
        ]
        # ignore_conflicts: rows whose openalex_id / doi already exist are skipped; only new ones are inserted
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        seen += len(papers)
        # Private pyalex attribute: updated only after each page has been fetched; None
        # means the end was reached. NOTE(review): may break on pyalex upgrades.
        nv = pager._next_value
        if cache_key:
            # Checkpoint only after this page is persisted, so a resume never skips a page.
            cache.set(cache_key, nv if nv is not None else "DONE", timeout=None)
        if nv is None or len(page) == 0:
            if cache_key:
                cache.set(cache_key, "DONE", timeout=None)
            break
        if stop_key and cache.get(stop_key):
            break
    return seen
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, keywords: str = "", search: str = "", end_year: int = None):
    """Crawl OpenAlex article metadata for one publication year, then chain the next year.

    The crawl for ``publication_year`` is resumable: its cursor is checkpointed under
    ``openalex_cursor_<year>_<keywords><search>``. Unless the
    ``get_paper_meta_from_openalex_stop`` cache flag exists, the task then re-queues
    itself for ``publication_year + 1``, up to ``end_year`` (default: the current year).

    Raises Exception when neither ``keywords`` nor ``search`` is given.
    """
    if not keywords and not search:
        raise Exception("keywords or search must be provided")
    stop_key = "get_paper_meta_from_openalex_stop"
    query = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"
    )
    query = _apply_keyword_search(query, keywords, search)
    _crawl_openalex_query(
        query, keywords, search,
        cache_key=f"openalex_cursor_{publication_year}_{keywords}{search}",
        stop_key=stop_key,
    )
    if cache.get(stop_key, None) is not None:
        return  # stopped manually: do not chain the next year
    last_year = datetime.now().year if end_year is None else end_year
    next_year = publication_year + 1
    if next_year > last_year:
        return
    current_app.send_task(
        "apps.resm.tasks.get_paper_meta_from_openalex",
        kwargs={
            "publication_year": next_year,
            "keywords": keywords,
            "search": search,
            "end_year": last_year
        },
        countdown=5
    )
def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
                                    per_combo_max: int = None) -> int:
    """Ingest OpenAlex articles published on/after ``from_publication_date`` for one query.

    The query is a single (keywords, search) combination. This is a short-window scan
    without a cursor checkpoint, so every call rescans from the first page. At most
    ``per_combo_max`` records are requested (forwarded as ``n_max``). Returns the count
    reported by _crawl_openalex_query.

    Note: ingesting by OpenAlex *indexing* time (from_created_date / from_updated_date)
    would fit better. Those filters require OpenAlex Premium, and the current key gets
    "Plan upgrade required". The publicly available from_publication_date (day
    granularity) is used instead. The cost: works often reach OpenAlex several days
    after publication, so the window must be wider than the scheduling interval or
    late-indexed works are missed.
    """
    query = Works().filter(
        has_doi=True,
        type="article",
        from_publication_date=from_publication_date,
    )
    query = _apply_keyword_search(query, keywords, search)
    # no cache_key: always scan the short window from the start
    return _crawl_openalex_query(query, keywords, search, n_max=per_combo_max)
@shared_task(base=CustomTask)
def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
    """Main incremental-update task for the journal paper index (resm_paper).

    For every (o_keywords, o_search) query combination already present in the table,
    fetch OpenAlex articles *published* within the last ``days`` days
    (from_publication_date filter) and insert them. Indexing-time filters
    (from_created_date / from_updated_date) would fit better but need OpenAlex
    Premium, which the current key lacks.

    bulk_create(ignore_conflicts=True) prevents duplicates. The 30-day default
    covers the days-to-weeks lag before OpenAlex indexes new works; a narrower
    window misses late-indexed ones. Scheduled once a day by django-celery-beat.

    None and "" are treated as the same value, and each distinct non-empty
    combination is queried exactly once per run, in a stable sorted order (same
    normalisation as backfill_paper_meta_from_openalex).

    Returns a summary string with the number of combinations, the number of new
    papers and the window start.
    """
    from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat()
    # .order_by() clears any default model ordering. Otherwise the ordering columns
    # take part in DISTINCT and the "distinct" combos repeat once per row.
    rows = Paper.objects.order_by().values_list("o_keywords", "o_search").distinct()
    combos = sorted({
        (o_keywords or "", o_search or "")
        for o_keywords, o_search in rows
        if o_keywords or o_search
    })
    before = Paper.objects.count()
    for o_keywords, o_search in combos:
        _fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max)
    new_papers = Paper.objects.count() - before
    return f"openalex update: combos={len(combos)}, new_papers={new_papers}, since={from_publication_date}"
# Cache flag that pauses the backfill: checked at the start of each task and after each crawled page.
BACKFILL_STOP_KEY = "backfill_paper_meta_stop"


@shared_task(base=CustomTask)
def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
                                      combo_index: int = 0, combos=None):
    """One-off, resumable backfill of the paper index by publication date.

    Built to cope with OpenAlex quota limits. This task only owns the strategy: which
    query combinations to crawl, over which publication-date range, and how to chain.
    Paging and cursor checkpointing reuse the shared core _crawl_openalex_query.

    (o_keywords, o_search) combinations are processed one at a time. When one is
    finished, the next is chained as a new task, so each task is short, can be stopped
    at any moment and resumes from its checkpoint.

    Stop / resume (for quota limits):
      - pause:  cache.set("backfill_paper_meta_stop", True)  # stops after the current page, no further chaining
      - resume: cache.delete("backfill_paper_meta_stop"), then again
                backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01")
        Combinations already marked DONE are skipped; unfinished ones continue from
        their last checkpointed cursor.
    On quota exhaustion or network errors the cursor is kept and the stop flag is set.
    Re-running .delay (after clearing the flag) resumes from the checkpoint.

    combo_index / combos: internal chaining state. ``combos`` is a list of
        [o_keywords, o_search] pairs, built from the Paper table when None.
    Returns a human-readable status string.
    """
    if cache.get(BACKFILL_STOP_KEY):
        return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"
    if combos is None:
        # Normalise None to "" and deduplicate; sorting keeps the combo order identical across resumes
        combos = [list(c) for c in sorted({
            (c[0] or "", c[1] or "")
            for c in Paper.objects.values_list("o_keywords", "o_search").distinct()
            if (c[0] or c[1])
        })]

    def ckey(kw, search):
        # Cursor checkpoint key, unique per date window and query combination.
        return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"

    # Skip combinations that are already finished
    while combo_index < len(combos):
        kw, search = combos[combo_index]
        if cache.get(ckey(kw, search)) == "DONE":
            combo_index += 1
            continue
        break
    if combo_index >= len(combos):
        return f"backfill done: {len(combos)} combos, from {from_publication_date}"
    kw, search = combos[combo_index]
    cursor_key = ckey(kw, search)
    base = Works().filter(
        has_doi=True, type="article", from_publication_date=from_publication_date,
    )
    if to_publication_date:
        base = base.filter(to_publication_date=to_publication_date)
    base = _apply_keyword_search(base, kw, search)
    try:
        _crawl_openalex_query(base, kw, search, cache_key=cursor_key,
                              stop_key=BACKFILL_STOP_KEY)
    except Exception as e:
        # Quota exhausted / network error etc.: the cursor is already checkpointed;
        # set the stop flag and wait for a manual resume
        cache.set(BACKFILL_STOP_KEY, True, timeout=None)
        return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. "
                f"clear {BACKFILL_STOP_KEY} then re-run to resume")
    if cache.get(cursor_key) != "DONE":
        # Interrupted by stop_key before finishing; the cursor is kept for the resume
        return f"combo {combo_index}/{len(combos)} paused; cursor saved"
    # This combination is finished: chain the next one
    current_app.send_task(
        "apps.resm.tasks.backfill_paper_meta_from_openalex",
        kwargs={
            "from_publication_date": from_publication_date,
            "to_publication_date": to_publication_date,
            "combo_index": combo_index + 1,
            "combos": combos,
        },
        countdown=3,
    )
    return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next"
@shared_task(base=CustomTask)
def monitor_papers(monitor_id: str = None):
    """Journal / keyword monitoring over the active PaperMonitor rows.

    For each active monitor (only ``monitor_id`` when given), an OpenAlex filter is
    built from its type. Papers published within the last ``m.days`` days (30 when
    unset) are then ingested. Journal and keyword monitors share this task:

      - journal: value = ISSN -> primary_location.source.issn (o_keywords / o_search left empty)
      - search : value = English search terms -> title_and_abstract.search (stored as o_search)
      - keyword: value = OpenAlex keyword id -> keywords.id (stored as o_keywords)

    Monitors of any other type are skipped.

    Uses _crawl_openalex_query in short-window mode (no cache_key, so every run
    rescans from the first page). bulk_create(ignore_conflicts=True) dedupes against
    existing Papers. A monitor whose crawl raises (e.g. invalid ISSN / keyword id,
    quota, network) is reported as ``<type>:<label>=error <repr>``. It does not
    prevent the remaining monitors from running, and its last_run / last_count are
    left unchanged. Scheduled daily by beat.

    Returns a "; "-joined summary, or "no active monitors" when nothing ran.
    """
    qs = PaperMonitor.objects.filter(is_active=True)
    if monitor_id:
        qs = qs.filter(id=monitor_id)
    results = []
    for m in qs:
        from_pub = (timezone.now() - timedelta(days=m.days or 30)).date().isoformat()
        base = Works().filter(has_doi=True, type="article", from_publication_date=from_pub)
        kw, search = "", ""
        if m.type == PaperMonitor.TYPE_JOURNAL:
            base = base.filter(primary_location={"source": {"issn": m.value}})
        elif m.type == PaperMonitor.TYPE_SEARCH:
            search = m.value
            base = _apply_keyword_search(base, "", search)
        elif m.type == PaperMonitor.TYPE_KEYWORD:
            kw = m.value
            base = _apply_keyword_search(base, kw, "")
        else:
            continue
        label = m.name or m.value
        try:
            seen = _crawl_openalex_query(base, kw, search)
        except Exception as e:
            # Isolate failures: one broken monitor must not starve the others.
            results.append(f"{m.type}:{label}=error {e!r}")
            continue
        m.last_run = timezone.now()
        m.last_count = seen
        m.save(update_fields=["last_run", "last_count", "update_time"])
        results.append(f"{m.type}:{label}={seen}")
    return "; ".join(results) or "no active monitors"
# ScienceDirect Search API endpoint; _search_sciencedirect calls it with PUT and a JSON body.
SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"
def _build_paper_from_sd_result(r, qs_text: str):
    """Convert one ScienceDirect Search result into an unsaved Paper, or None.

    SD Search returns fewer fields than OpenAlex: no oa_url, abstract or openalex_id.
    Results without a doi or title, or whose ``publicationDate`` does not yield a
    year, are skipped by returning None (those model fields are required).
    ``qs_text`` is stored as ``o_search``. The publication date is only kept when it
    is a full ``YYYY-MM-DD`` string. The first author is the one with the lowest
    ``order``; authors may be partially omitted by the API.
    """
    doi, title = r.get("doi"), r.get("title")
    if not (doi and title):
        return None
    pub_date = r.get("publicationDate")  # expected "YYYY-MM-DD"
    try:
        year = int(str(pub_date)[:4]) if pub_date else None
    except (ValueError, TypeError):
        year = None
    if year is None:
        return None

    paper = Paper()
    paper.id = idWorker.get_id()
    paper.source = "elsevier"
    paper.type = "article"
    paper.o_search = qs_text
    paper.doi = str(doi).replace("https://doi.org/", "")
    paper.title = title
    paper.publication_date = None if len(str(pub_date)) != 10 else pub_date
    paper.publication_year = year
    paper.publication_name = r.get("sourceTitle")

    authors = r.get("authors") or []
    if isinstance(authors, list) and authors:
        def author_order(author):
            return author.get("order", 1) if isinstance(author, dict) else 1

        lead = min(authors, key=author_order)
        if isinstance(lead, dict):
            paper.first_author = lead.get("name")
    paper.is_oa = bool(r.get("openAccess"))
    return paper
def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
                          max_results: int = 200):
    """Query ScienceDirect Search (PUT) for Elsevier articles loaded after ``loaded_after``.

    Results are paged ``show`` at a time, sorted by date. No more than
    ``max_results`` results are requested in total: the last page is shortened when
    ``max_results`` is not a multiple of ``show``.

    Returns ``(papers, err_msg)``. ``papers`` are unsaved Paper objects from
    _build_paper_from_sd_result; results lacking doi/title/year are dropped.
    ``err_msg`` is "" on success. On error, the papers collected so far are still
    returned. Error prefixes:

    - ``elsevier_search_request_error``: network failure;
    - ``elsevier_search_no_entitlement``: HTTP 401/403;
    - ``elsevier_search_error``: any other non-200 status;
    - ``elsevier_search_bad_json``: a 200 response whose body is not JSON.
    """
    headers = {
        **ELSEVIER_HEADERS,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    papers = []
    offset = 0
    with requests.Session() as req:
        while offset < max_results:
            # Never ask for more than what is left of the max_results budget.
            page_size = min(show, max_results - offset)
            body = {
                "qs": qs_text,
                "loadedAfter": loaded_after,
                "display": {"offset": offset, "show": page_size, "sortBy": "date"},
            }
            try:
                res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
                              headers=headers, timeout=(3, 30))
            except requests.RequestException as e:
                return papers, f"elsevier_search_request_error: {e}"
            if res.status_code in (401, 403):
                return papers, f"elsevier_search_no_entitlement: {res.status_code}"
            if res.status_code != 200:
                return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
            try:
                payload = res.json() or {}
            except ValueError:
                # A 200 with a non-JSON body (e.g. a proxy/HTML error page) must not crash the task.
                return papers, f"elsevier_search_bad_json: {res.text[:200]}"
            results = payload.get("results") or []
            # With no hits SD may return a single {"error": ...} entry; treat it as the end too
            if not results or (len(results) == 1 and results[0].get("error")):
                break
            for r in results:
                paper = _build_paper_from_sd_result(r, qs_text)
                if paper is not None:
                    papers.append(paper)
            if len(results) < page_size:
                break  # short page: no more results
            offset += page_size
    return papers, ""
@shared_task(base=CustomTask)
def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
    """Incremental ScienceDirect Search task that complements the OpenAlex update.

    Iterates over the plain-text queries (o_search) already stored in the table.
    OpenAlex keyword ids cannot be used with SD, so only o_search is used. For each
    query, Elsevier journal articles loaded within the last ``days`` days are fetched
    via ``loadedAfter``, at most ``per_qs_max`` per query.

    This only covers ScienceDirect's own content (essentially DOI prefix 10.1016). It
    bridges OpenAlex's indexing lag for new Elsevier papers. Scheduled once a day by
    django-celery-beat.

    Each query text is stripped and sent at most once per run. Iteration stops early
    when the key turns out to have no Search entitlement.

    Returns a summary string including the per-query error messages.
    """
    loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
    raw_values = (
        Paper.objects.exclude(o_search__isnull=True)
        .exclude(o_search="")
        .order_by()  # drop default ordering so DISTINCT applies to o_search alone
        .values_list("o_search", flat=True)
        .distinct()
    )
    # Strip, then deduplicate while keeping first-seen order (" x" and "x" collapse).
    qs_texts = list(dict.fromkeys(s.strip() for s in raw_values if s and s.strip()))
    before = Paper.objects.count()
    msgs = []
    for qs_text in qs_texts:
        papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
        if papers:
            Paper.objects.bulk_create(papers, ignore_conflicts=True)
        if err:
            msgs.append(f"{qs_text}: {err}")
            # The key has no Search entitlement, so further queries are pointless: stop here
            if "no_entitlement" in err:
                break
    new_papers = Paper.objects.count() - before
    return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
            f"since={loaded_after}, errs={msgs}")
# Pool of common browser User-Agent strings; get_random_headers picks one at random.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15",
]
def get_random_headers():
    """Build browser-like request headers for PDF downloads.

    The User-Agent is drawn at random from USER_AGENTS. PDFs are preferred in
    Accept, and the Referer is a Google search.
    """
    user_agent = random.choice(USER_AGENTS)
    headers = {"User-Agent": user_agent}
    headers["Accept"] = "application/pdf, */*"
    headers["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8"
    headers["Referer"] = "https://www.google.com/"
    return headers
def show_task_run(def_name: str):
    """Return the run switch cached under ``def_name``; an unset key means enabled (True).

    The self-rescheduling chains check this before and during each round and stop
    when it is falsy.
    """
    enabled = cache.get(def_name, True)
    return enabled
# "Keep-alive" for the self-rescheduling fetch chains. Every round of a chain refreshes an
# alive heartbeat. The periodic beat task (ensure_fetch_running) uses it to judge whether
# the chain is still running. It re-ignites the chain when it has died (restart, crash,
# idle expiry) and has not been stopped manually.
ALIVE_TTL = 120  # heartbeat TTL in seconds: must exceed the chain's longest self-resend interval (60s) to leave beat a window to judge
# Chains supervised by ensure_fetch_running.
MANAGED_FETCH_TASKS = [
    "apps.resm.tasks.get_abstract_from_elsevier",
    "apps.resm.tasks.get_pdf_from_openalex",
]
def touch_alive(def_name: str):
    """Refresh the heartbeat of chain ``def_name``; it expires after ALIVE_TTL seconds."""
    heartbeat_key = def_name + ":alive"
    cache.set(heartbeat_key, 1, timeout=ALIVE_TTL)
def is_alive(def_name: str):
    """True while the heartbeat written by touch_alive for ``def_name`` has not expired."""
    heartbeat = cache.get(def_name + ":alive")
    return heartbeat is not None
@shared_task(base=CustomTask)
def ensure_fetch_running():
    """Periodic beat igniter for the managed fetch chains.

    Every chain in MANAGED_FETCH_TASKS whose run switch is on but whose heartbeat has
    expired is queued again, so the chains heal themselves after a restart. Returns
    the list of task names that were started.
    """
    to_start = [
        task_name
        for task_name in MANAGED_FETCH_TASKS
        if show_task_run(task_name) and not is_alive(task_name)
    ]
    for task_name in to_start:
        current_app.send_task(task_name)
    return f"ensure_fetch_running started: {to_start}"
@shared_task(base=CustomTask)
def get_pdf_from_openalex(number_of_task: int = 10):
    """Self-rescheduling chain that downloads OA PDFs through the OpenAlex content API.

    Each round selects up to ``number_of_task`` papers that:

    - are open access and have no full text yet;
    - are not currently being downloaded;
    - have not already failed with ``openalex_pdf_not_found``.

    save_pdf_from_openalex is called for each of them. While the run switch
    (show_task_run) is on, the task re-queues itself: after 2s normally, or after 60s
    while the ``openalex_api_exceed`` back-off flag is set. In the back-off state it
    also skips the API entirely and only keeps its alive heartbeat fresh.

    Returns one of:

    - "stoped" when switched off;
    - "done" when nothing is left (no resend; ensure_fetch_running re-ignites the chain);
    - the back-off message while throttled;
    - otherwise ``(last_message, pdf_count)``.
    """
    def_name = get_pdf_from_openalex.name
    if not show_task_run(def_name):
        return "stoped"
    touch_alive(def_name)
    # Back-off in progress: do not call the API. Re-send slowly only to keep the alive
    # heartbeat until the exceed flag expires. Seeing alive, beat will not start a
    # second chain that would run straight into the rate limit.
    if cache.get("openalex_api_exceed"):
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_openalex",
            kwargs={"number_of_task": number_of_task},
            countdown=60,
        )
        return "openalex_api_exceed, backing off"
    count = 0
    qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude(
        fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_not_found")[:number_of_task]
    if not qs.exists():
        return "done"  # no self-resend; left to the beat poller to re-ignite
    msg = ""
    for paper in qs:
        if not show_task_run(def_name):
            break
        paper.fetch("downloading")
        try:
            msg = save_pdf_from_openalex(paper)
        finally:
            # Always release the paper, as get_abstract_from_elsevier does. Otherwise an
            # exception leaves it "downloading" until release_working_paper times it out.
            paper.fetch_end()
        if paper.has_fulltext_pdf:
            count += 1
        if cache.get("openalex_api_exceed"):
            break
    if show_task_run(def_name):
        # Just hit the rate limit: slow down to 60s (still keeping alive); otherwise a tight 2s
        countdown = 60 if cache.get("openalex_api_exceed") else 2
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_openalex",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=countdown,
        )
    return msg, count
def _elsevier_fetch_xml(req, paper):
    """Fetch the Elsevier article XML for ``paper.doi`` and record abstract / full-text flags.

    On success:

    - the abstract (``dc:description``) is upserted into PaperAbstract;
    - has_abstract and has_abstract_xml are set;
    - when the XML has body paragraphs (``ce:para``) or a raw text longer than 2000
      characters, the paper is also flagged as full text and the XML is stored via
      ``paper.save_file_xml``.

    Args:
        req: a ``requests.Session`` (or anything with a compatible ``get``).
        paper: the Paper being processed.

    Returns:
        ``(got_abstract, got_fulltext, err)``.

        ``err`` is ``"request_error"`` for transient failures: a network exception,
        or HTTP 429 (throttling / quota). No fail_reason is persisted in that case, so
        the paper is retried later. Otherwise ``err`` is None.

        Permanent failures (404, other non-200 statuses, malformed XML, missing
        abstract) are recorded with ``paper.save_fail_reason`` and return
        ``(False, False, None)``.
    """
    try:
        res = req.get(
            f"https://api.elsevier.com/content/article/doi/{paper.doi}",
            params={"httpAccept": "text/xml"},
            headers=ELSEVIER_HEADERS,
            timeout=(3, 15),
        )
    except requests.RequestException:
        return False, False, "request_error"
    if res.status_code == 429:
        # Throttling / quota exhaustion is transient. Persisting a fail_reason would hide
        # the paper for good from get_abstract_from_elsevier(exclude_failed=True), so
        # report it like a network error and let the caller back off.
        return False, False, "request_error"
    if res.status_code == 404:
        paper.save_fail_reason("elsevier_doi_not_found")
        return False, False, None
    if res.status_code != 200:
        paper.save_fail_reason(f"elsevier_response_error: {res.status_code}")
        return False, False, None
    # requests decodes text/* bodies that carry no charset parameter as ISO-8859-1,
    # which garbles non-ASCII text. XML defaults to UTF-8, so use that instead.
    if "charset" not in res.headers.get("content-type", "").lower():
        res.encoding = "utf-8"
    xml_str = res.text
    try:
        # Parse the raw bytes so lxml honours the document's own encoding declaration.
        root = etree.fromstring(res.content)
    except etree.XMLSyntaxError:
        paper.save_fail_reason("elsevier_xml_error")
        return False, False, None
    ns = {"dc": "http://purl.org/dc/elements/1.1/",
          "ce": "http://www.elsevier.com/xml/common/dtd",
          "xocs": "http://www.elsevier.com/xml/xocs/dtd"}
    abstract = root.xpath("//dc:description/text()", namespaces=ns)
    if not abstract:
        paper.save_fail_reason("elsevier_abstract_not_found")
        return False, False, None
    PaperAbstract.objects.update_or_create(
        paper=paper,
        defaults={"abstract": abstract[0].strip(), "source": "elsevier"},
    )
    paper.has_abstract = True
    paper.has_abstract_xml = True
    # Full text = structured body paragraphs, or failing that a long raw-text dump.
    has_fulltext = len(root.xpath("//ce:para", namespaces=ns)) > 0
    if not has_fulltext:
        rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
        if rawtexts and len(rawtexts[0].strip()) > 2000:
            has_fulltext = True
    if has_fulltext:
        paper.has_fulltext = True
        paper.has_fulltext_xml = True
        paper.save_file_xml(xml_str)
    paper.save(update_fields=["has_abstract", "has_abstract_xml",
                              "has_fulltext", "has_fulltext_xml", "update_time"])
    return True, has_fulltext, None
def _pdf_page_count(content: bytes):
"""返回 PDF 页数; 无法确定时返回 None。
优先用 pypdf 精确解析; 未安装或解析异常时退化为字节扫描
(对未压缩对象树有效, Elsevier 的摘要预览页正属此类)。"""
try:
from io import BytesIO
from pypdf import PdfReader
return len(PdfReader(BytesIO(content), strict=False).pages)
except ImportError:
pass
except Exception:
return None
try:
counts = [int(m) for m in re.findall(rb"/Count\s+(\d+)", content)]
if counts:
return max(counts)
n = len(re.findall(rb"/Type\s*/Page(?![sR])", content))
if n:
return n
except Exception:
pass
return None
def _is_elsevier_preview_pdf(content: bytes) -> bool:
    """Detect Elsevier's one-page "abstract preview" PDF.

    For unentitled or in-press articles, the Article API's application/pdf endpoint
    can return a 1-page preview that contains only the abstract. It still starts with
    %PDF and is not tiny, while the full-text XML may be fine.

    Returns True only when the page count is known and is at most 1. An unknown
    count yields False, so a genuine full text is never rejected.
    """
    pages = _pdf_page_count(content)
    if pages is None:
        return False  # unknown: be lenient
    return pages <= 1
def _elsevier_fetch_pdf(req, paper):
    """Download the PDF for ``paper.doi`` from the Elsevier Article API; True when stored.

    The body is only stored when all of these hold:

    - the response is a 200;
    - it looks like a PDF (``%PDF`` magic or an ``application/pdf`` content type);
    - it is larger than 1 KB;
    - it is not the one-page abstract preview.

    A preview is recorded as fail_reason ``elsevier_pdf_preview_only``, so it is not
    mistaken for ``has_fulltext_pdf=True``. Network errors return False.
    """
    try:
        res = req.get(
            f"https://api.elsevier.com/content/article/doi/{paper.doi}",
            params={"httpAccept": "application/pdf"},
            headers=ELSEVIER_HEADERS,
            timeout=(3, 15),
        )
    except requests.RequestException:
        return False
    if res.status_code != 200:
        return False
    body = res.content
    content_type = res.headers.get("content-type", "")
    looks_like_pdf = body.startswith(b'%PDF') or content_type.startswith("application/pdf")
    if not looks_like_pdf or len(body) <= 1024:  # require at least 1 KB
        return False
    if _is_elsevier_preview_pdf(body):
        paper.save_fail_reason("elsevier_pdf_preview_only")
        return False
    paper.save_file_pdf(body, save_obj=True)
    return True
@shared_task(base=CustomTask)
def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True,
                               pdf_number_of_task:int = 20):
    """Combined Elsevier task (one endpoint per DOI); re-queues itself while its switch is on.

    Phase 1: for papers without an abstract, fetch the XML (abstract / full-text
    flags). When the XML carries full text, also fetch the PDF inline for the same DOI.
    Phase 2: backfill PDFs for papers already flagged as full text but still missing
    a PDF. The former get_pdf_from_elsevier task has been merged into this one.
    Only DOIs with the 10.1016 (Elsevier) prefix are considered.

    number_of_task: per-round cap for phase 1 (abstract + inline PDF).
    exclude_failed: when True, phase 1 only takes papers with no fail_reason at all.
        When False, it also retries papers whose fail_reason does not mention "elsevier_".
    pdf_number_of_task: per-round cap for phase 2 (PDF backfill).

    Returns one of:

    - "stoped" when switched off;
    - "done" when both queues are empty (no resend; the beat task
      ensure_fetch_running re-ignites the chain);
    - otherwise a summary string.

    While the switch is on it re-queues itself: after 30s when a network error
    interrupted phase 1 (phase 2 is then skipped), otherwise after 5s.
    """
    def_name = get_abstract_from_elsevier.name
    if not show_task_run(def_name):
        return "stoped"
    touch_alive(def_name)
    # Papers still missing an abstract (the PDF is fetched alongside when full text exists)
    qs = Paper.objects.filter(has_abstract=False)
    if exclude_failed:
        qs = qs.filter(fail_reason=None)
    else:
        qs = qs.exclude(fail_reason__contains="elsevier_")
    qs = qs.exclude(fetch_status="downloading"
                    ).filter(doi__startswith="10.1016").order_by("?")
    # PDF backfill: papers flagged as full text whose PDF has not been downloaded yet
    qs_pdf = Paper.objects.filter(
        has_fulltext=True, has_fulltext_pdf=False, has_abstract=True
    ).exclude(fetch_status="downloading"
    ).exclude(fail_reason__contains="elsevier_pdf_preview_only"
    ).filter(doi__startswith="10.1016")
    if not qs.exists() and not qs_pdf.exists():
        return "done" # no self-resend; left to the beat poller to re-ignite
    err_msg = ""
    count_abs = 0
    count_fulltext = 0
    count_pdf = 0
    with requests.Session() as req:
        # Phase 1: abstract + inline PDF
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            if paper.fetch_status == "downloading":  # defensive: qs already excludes these
                continue
            paper.fetch(status="downloading")
            try:
                got_abs, got_fulltext, err = _elsevier_fetch_xml(req, paper)
                if err == "request_error":
                    err_msg = "elsevier_request_error"
                    break
                if got_abs:
                    count_abs += 1
                if got_fulltext:
                    count_fulltext += 1
                    if _elsevier_fetch_pdf(req, paper):
                        count_pdf += 1
            finally:
                paper.fetch_end()
        # Phase 2: PDF backfill (only if phase 1 was not interrupted by a network error)
        if err_msg == "":
            for paper in qs_pdf[:pdf_number_of_task]:
                if not show_task_run(def_name):
                    break
                if paper.fetch_status == "downloading":  # defensive: qs_pdf already excludes these
                    continue
                paper.fetch(status="downloading")
                try:
                    if _elsevier_fetch_pdf(req, paper):
                        count_pdf += 1
                finally:
                    paper.fetch_end()
    if show_task_run(def_name):
        countdown = 30 if err_msg else 5 # slow down the retry after a network error
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
                "exclude_failed": exclude_failed,
                "pdf_number_of_task": pdf_number_of_task,
            },
            countdown=countdown,
        )
    return f'{err_msg}, abs {count_abs}, fulltext {count_fulltext}, pdf {count_pdf}'
def get_actual_running_count():
    """Number of papers currently flagged fetch_status='downloading'."""
    downloading = Paper.objects.filter(fetch_status='downloading')
    return downloading.count()
def can_send_more(max_running):
    """True when fewer than ``max_running`` papers are currently downloading."""
    running = get_actual_running_count()
    return running < max_running
@shared_task(base=CustomTask)
def send_download_fulltext_task(number_of_task=100):
    """Queue download_pdf tasks for open-access papers that still lack full text.

    ``number_of_task`` is both the batch size and the cap on concurrently downloading
    papers. Only ``number_of_task - <papers currently downloading>`` tasks are queued,
    chosen at random and staggered 1 second apart.

    Only papers that this download pipeline has not tried yet are considered;
    download_pdf marks its terminal failures with ``download_pdf_tried``.
    fail_reason=None is deliberately NOT required. Otherwise papers that picked up a
    fail_reason from the OpenAlex keep-alive chain would be hidden for good, and their
    oa_url / elsevier / scihub fallbacks would never be tried.

    Returns "done" when nothing is eligible, else ``"sent <n> download_pdf tasks"``.
    """
    qs = Paper.objects.filter(has_fulltext=False, is_oa=True).exclude(
        fetch_status='downloading'
    ).exclude(fail_reason__contains="download_pdf_tried")
    if not qs.exists():
        return "done"
    # Compute the free slots once, up front. The tasks below are queued with a
    # countdown, so they do not mark papers "downloading" before this loop ends. A
    # per-iteration can_send_more() check therefore could not stop the loop, and a
    # full batch would be queued on top of whatever is already running.
    free_slots = number_of_task - get_actual_running_count()
    task_count = 0
    if free_slots <= 0:
        return f"sent {task_count} download_pdf tasks"
    for paper in qs.order_by("?")[:free_slots]:
        current_app.send_task(
            "apps.resm.tasks.download_pdf",
            kwargs={
                "paper_id": paper.id,
            },
            countdown=task_count,  # stagger the tasks: one per second
        )
        task_count += 1
    return f"sent {task_count} download_pdf tasks"
@shared_task(base=CustomTask)
def release_working_paper(minutes=10):
    """Release papers stuck in fetch_status="downloading" for longer than ``minutes``.

    Every such paper whose update_time is older than the cutoff gets ``fetch_end()``.
    Presumably this resets its status, so that the selectors, which all exclude
    "downloading", can pick it up again.

    The age filter runs in the database, with the cutoff computed once. Rows with a
    NULL update_time are not matched, whereas the previous Python comparison would
    raise on them.

    Returns ``"release <n> papers"``.
    """
    cutoff = timezone.now() - timedelta(minutes=minutes)
    stale = Paper.objects.filter(fetch_status="downloading", update_time__lt=cutoff)
    count = 0
    for paper in stale:
        paper.fetch_end()
        count += 1
    return f"release {count} papers"
@shared_task(base=CustomTask)
def download_pdf(paper_id):
    """Download the PDF of a single paper, trying each applicable source in turn.

    Sources are tried in this order:

    1. The paper's ``oa_url``: via the Elsevier API when it is an
       ``https://doi.org/10.1016`` link, otherwise via save_pdf_from_oa_url.
    2. Sci-Hub, for papers published in 2021 or earlier that still have no PDF.

    When every attempt fails, a ``download_pdf_tried:<msg>`` fail_reason is recorded,
    so send_download_fulltext_task stops re-queuing the paper.

    Returns None when another worker is already downloading the paper, otherwise
    ``(msg, current_from)``: the last attempt's message and its source name.
    Raises Paper.DoesNotExist for an unknown ``paper_id``.
    """
    paper = Paper.objects.get(id=paper_id)
    if paper.fetch_status == "downloading":
        # Another worker owns this paper. Return *without* fetch_end(): releasing here
        # would clear the other worker's "downloading" status while it is still running.
        return
    try:
        paper.fetch("downloading")
        msg = "no_method_to_get_pdf"
        current_from = ""
        if paper.oa_url:
            if "https://doi.org/10.1016" in paper.oa_url:
                current_from = "elsevier"
                msg = save_pdf_from_elsevier(paper)
            else:
                current_from = "oa_url"
                msg = save_pdf_from_oa_url(paper)
        if paper.has_fulltext_pdf is False and paper.publication_year <= 2021:
            current_from = "scihub"
            msg = save_pdf_from_scihub(paper)
        # The OpenAlex content API is intentionally not tried here (that call was
        # disabled); the get_pdf_from_openalex chain covers that source.
        if paper.has_fulltext_pdf is False:
            # Terminal marker, appended even if the OpenAlex keep-alive chain already wrote a
            # fail_reason; send_download_fulltext_task excludes papers carrying it. This
            # prevents endless retries of this pipeline (oa_url/elsevier/scihub), and keeps
            # an OpenAlex fail_reason from masking this pipeline's first attempt.
            paper.save_fail_reason(f"download_pdf_tried:{msg}")
        return msg, current_from
    finally:
        paper.fetch_end()
def save_pdf_from_oa_url(paper: Paper):
    """Download ``paper``'s PDF from its ``oa_url``, escalating through three strategies.

    1. Plain ``requests`` GET with randomised browser-like headers.
       - A network error is recorded as ``oa_url_request_error`` and ends the attempt.
       - A 200/201/202 response is final: it is stored when it looks like a PDF
         (``%PDF`` magic, or an ``application/pdf`` / ``application/octet-stream``
         content type, and larger than 1 KB); otherwise ``oa_url_not_pdf`` is
         recorded.
    2. Any other status goes to curl-cffi, which handles Cloudflare JS challenges.
    3. Playwright is the final fallback.

    Returns "success" or the failure message, which is also stored as fail_reason
    (except for the request-error message, whose stored reason is the bare prefix).
    """
    from .d_oaurl import download_pdf_with_curl_cffi, download_from_url_playwright

    # Strategy 1: direct request
    try:
        res = requests.get(paper.oa_url, headers=get_random_headers(), timeout=(3, 15))
    except requests.RequestException as e:
        paper.save_fail_reason("oa_url_request_error")
        return f"oa_url_request_error: {str(e)}"
    if res.status_code in (200, 201, 202):
        content_type = res.headers.get("content-type", "")
        pdf_like = (
            res.content.startswith(b'%PDF')
            or content_type.startswith("application/pdf")
            or content_type == "application/octet-stream"
        )
        if not (pdf_like and len(res.content) > 1024):  # require at least 1 KB
            paper.save_fail_reason("oa_url_not_pdf")
            return "oa_url_not_pdf"
        paper.save_file_pdf(res.content, save_obj=True)
        return "success"

    def mark_pdf_saved():
        # The downloaders wrote the file to paper_path themselves; only persist the flags.
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
        return "success"

    paper_path = paper.init_paper_path("pdf")
    # Strategy 2: curl-cffi (handles Cloudflare JS challenges)
    ok, err_msg = download_pdf_with_curl_cffi(paper.oa_url, paper_path)
    if ok:
        return mark_pdf_saved()
    # Strategy 3: Playwright as the last resort
    ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
    if ok:
        return mark_pdf_saved()
    paper.save_fail_reason(f"oa_url_all_methods_failed: {err_msg}")
    return f"oa_url_all_methods_failed: {err_msg}"
def save_pdf_from_openalex(paper:Paper):
    """Download ``paper``'s PDF from the OpenAlex content API (content.openalex.org).

    Outcomes:

    - Skipped while the ``openalex_api_exceed`` back-off flag is set.
    - Papers without an ``openalex_id`` (e.g. ScienceDirect Search imports) get the
      same terminal reason a 404 would give (``openalex_pdf_not_found``) without any
      request being made. Otherwise the URL would be ``.../works/None.pdf``.
    - HTTP 200: the body is stored.
    - HTTP 429: sets the back-off flag, for 1 hour on "Insufficient credits" and
      2 minutes otherwise.
    - HTTP 404: recorded as ``openalex_pdf_not_found``.
    - Any other status: recorded as ``openalex_pdf_error``.

    Returns "success" or a descriptive message.
    """
    if cache.get("openalex_api_exceed"):
        return "find cache Insufficient credits"
    if not paper.openalex_id:
        paper.save_fail_reason("openalex_pdf_not_found")
        return "openalex_pdf_error: missing openalex_id"
    # Try the OpenAlex download
    try:
        res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
                           params={
                               "api_key": OPENALEX_KEY
                           },
                           timeout=(3, 15))
    except requests.RequestException as e:
        return f"openalex_pdf_error: {str(e)}"
    if res.status_code == 200:
        paper.save_file_pdf(res.content, save_obj=True)
        return "success"
    elif res.status_code == 429:
        try:
            message = res.json().get("message", "")
        except ValueError:
            message = res.text
        if "Insufficient credits" in message:
            # Credits exhausted: back off for 1 hour
            cache.set("openalex_api_exceed", True, timeout=3600)
            return "openalex_pdf_error: Insufficient credits"
        # Plain rate limiting (too many requests): short 2-minute back-off so we do not hit 429 again immediately
        cache.set("openalex_api_exceed", True, timeout=120)
        return f"openalex_pdf_error: 429 {message[:100]}"
    elif res.status_code == 404:
        paper.save_fail_reason("openalex_pdf_not_found")
        return "openalex_pdf_error: 404"
    else:
        paper.save_fail_reason("openalex_pdf_error")
        return f"openalex_pdf_error: {res.status_code} {res.text}"
def save_pdf_from_elsevier(paper:Paper):
    """Download ``paper``'s PDF from the Elsevier Article API (by DOI) and store it.

    The body is only stored when it looks like a real PDF: ``%PDF`` magic or an
    ``application/pdf`` content type, and larger than 1 KB. It must also not be
    Elsevier's one-page abstract preview. These are the same checks as
    _elsevier_fetch_pdf. Previously any 200 body, including HTML/XML error pages, was
    saved as a PDF.

    Returns one of:

    - "success";
    - "elsevier_not_pdf";
    - "elsevier_pdf_preview_only" (also stored as fail_reason);
    - an ``elsevier_request_error`` / ``elsevier_status_error`` message.
    """
    params = {
        "httpAccept": "application/pdf",
    }
    try:
        res = requests.get(
            f"https://api.elsevier.com/content/article/doi/{paper.doi}",
            params=params,
            headers=ELSEVIER_HEADERS,
            timeout=(3, 15)
        )
    except requests.RequestException as e:
        return f"elsevier_request_error: {str(e)}"
    if res.status_code != 200:
        return f"elsevier_status_error: {res.status_code} {res.text}"
    content = res.content
    is_pdf = (
        content.startswith(b'%PDF') or
        res.headers.get("content-type", "").startswith("application/pdf")
    )
    if not is_pdf or len(content) <= 1024:  # require at least 1 KB of PDF data
        return "elsevier_not_pdf"
    if _is_elsevier_preview_pdf(content):
        paper.save_fail_reason("elsevier_pdf_preview_only")
        return "elsevier_pdf_preview_only"
    paper.save_file_pdf(content, save_obj=True)
    return "success"
def save_pdf_from_scihub(paper:Paper):
    """Try to download ``paper``'s PDF from Sci-Hub by DOI into the paper's PDF path.

    On success the full-text flags are persisted and "success" is returned. Otherwise
    the downloader's error message is stored as fail_reason and returned.
    """
    from .d_scihub import download_paper_by_doi
    is_ok, err_msg = download_paper_by_doi(paper.doi, paper.init_paper_path("pdf"))
    if is_ok:
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        # List update_time, as the other savers (e.g. save_pdf_from_oa_url) do. Assuming
        # it is an auto-updated timestamp, Django only writes it when it appears in
        # update_fields, and release_working_paper relies on its age.
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
        return "success"
    paper.save_fail_reason(err_msg)
    return err_msg
# Example Sci-Hub mirror PDF URL:
# https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true