979 lines
39 KiB
Python
979 lines
39 KiB
Python
# Create your tasks here
|
||
from __future__ import absolute_import, unicode_literals
|
||
from apps.utils.tasks import CustomTask
|
||
from celery import shared_task
|
||
from pyalex import Works, config
|
||
from apps.resm.models import Paper, PaperAbstract, PaperMonitor
|
||
from apps.utils.snowflake import idWorker
|
||
from django.core.cache import cache
|
||
import requests
|
||
from lxml import etree
|
||
from celery import current_app
|
||
from datetime import datetime, timedelta
|
||
import random
|
||
import re
|
||
from .d_oaurl import download_from_url_playwright
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
from django.db.models import Q
|
||
from django.utils import timezone
|
||
from django.conf import settings
|
||
|
||
# 凭证集中在 config/conf.py (经 settings 的 `from config.conf import *` 暴露)
|
||
config.email = settings.OPENALEX_EMAIL
|
||
config.max_retries = 0
|
||
config.retry_backoff_factor = 0.1
|
||
config.retry_http_codes = [429, 500, 503]
|
||
config.api_key = settings.OPENALEX_API_KEY
|
||
OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY # content.openalex.org PDF 下载
|
||
|
||
ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
|
||
ELSEVIER_HEADERS = {
|
||
"X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN,
|
||
"X-ELS-APIKey": ELSEVIER_APIKEY,
|
||
}
|
||
|
||
|
||
def run_async(coro):
|
||
"""
|
||
跨平台运行异步任务,解决 Windows 上 asyncio subprocess 问题
|
||
"""
|
||
if sys.platform == 'win32':
|
||
# Windows 上需要使用 ProactorEventLoop 来支持 subprocess
|
||
policy = asyncio.WindowsProactorEventLoopPolicy()
|
||
asyncio.set_event_loop_policy(policy)
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
try:
|
||
return loop.run_until_complete(coro)
|
||
finally:
|
||
loop.close()
|
||
else:
|
||
# Unix/Linux/Mac 上使用默认方式
|
||
return asyncio.run(coro)
|
||
|
||
|
||
# OpenAlex 元数据抓取统一选择的字段,全量抓取与增量更新共用
|
||
OPENALEX_SELECT_FIELDS = [
|
||
"id", "doi", "title", "publication_date",
|
||
"open_access", "authorships", "primary_location", "publication_year",
|
||
"display_name", "content_urls"
|
||
]
|
||
|
||
|
||
def _apply_keyword_search(pager, keywords: str, search: str):
|
||
"""把 keywords / search 过滤条件套到 pyalex 的 pager 上。
|
||
keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter。
|
||
"""
|
||
if keywords:
|
||
if "|" in keywords:
|
||
keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
|
||
pager = pager.filter_or(keywords={"id": keywords_list})
|
||
elif "," in keywords:
|
||
keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
|
||
pager = pager.filter(keywords={"id": keywords_list})
|
||
else:
|
||
pager = pager.filter(keywords={"id": [keywords.strip()]})
|
||
if search:
|
||
pager = pager.search_filter(title_and_abstract=search)
|
||
return pager
|
||
|
||
|
||
def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
|
||
"""把 OpenAlex 单条 record 映射成未保存的 Paper 实例。"""
|
||
paper = Paper()
|
||
paper.id = idWorker.get_id()
|
||
paper.o_keywords = keywords
|
||
paper.o_search = search
|
||
paper.source = "openalex"
|
||
paper.type = "article"
|
||
paper.openalex_id = record["id"].split("/")[-1]
|
||
paper.doi = record["doi"].replace("https://doi.org/", "")
|
||
paper.title = record["display_name"]
|
||
paper.publication_date = record["publication_date"]
|
||
paper.publication_year = record["publication_year"]
|
||
if record["open_access"]:
|
||
paper.is_oa = record["open_access"]["is_oa"]
|
||
paper.oa_url = record["open_access"]["oa_url"]
|
||
if record["authorships"]:
|
||
paper.first_author = record["authorships"][0]["author"]["display_name"]
|
||
if record["authorships"][0]["institutions"]:
|
||
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
|
||
if record["primary_location"] and record["primary_location"]["source"]:
|
||
paper.publication_name = record["primary_location"]["source"]["display_name"]
|
||
return paper
|
||
|
||
|
||
def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
|
||
stop_key: str = None, n_max: int = None) -> int:
|
||
"""OpenAlex 单查询抓取核心,全量 / 增量 / 回补共用,保证 checkpoint 行为一致。
|
||
|
||
base_pager: 已套好 filter 的 pyalex pager(年/日期/收录日期等过滤由调用方决定)。
|
||
cache_key: 给定则逐页把「下一页游标」checkpoint 进去(抓完写 "DONE",已 DONE 直接
|
||
跳过)→ 真·断点续传;为 None 则不落游标,每次从头扫(适合短窗口增量)。
|
||
stop_key: 给定且命中时,当前页抓完即停(应对配额耗尽 / 手动暂停),游标已 checkpoint。
|
||
返回本次处理到的 record 条数。
|
||
"""
|
||
if cache_key:
|
||
start = cache.get(cache_key, "*")
|
||
if start == "DONE":
|
||
return 0
|
||
else:
|
||
start = "*"
|
||
pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
|
||
per_page=200, n_max=n_max, cursor=start)
|
||
seen = 0
|
||
for page in pager:
|
||
papers = [
|
||
_build_paper_from_record(record, keywords, search)
|
||
for record in page
|
||
if record["doi"] and (record["display_name"] or record["title"])
|
||
]
|
||
# ignore_conflicts:openalex_id / doi 已存在的自动跳过,只插新的
|
||
Paper.objects.bulk_create(papers, ignore_conflicts=True)
|
||
seen += len(papers)
|
||
nv = pager._next_value # pyalex 在取完每页后才更新,None 表示已到末尾
|
||
if cache_key:
|
||
cache.set(cache_key, nv if nv is not None else "DONE", timeout=None)
|
||
if nv is None or len(page) == 0:
|
||
if cache_key:
|
||
cache.set(cache_key, "DONE", timeout=None)
|
||
break
|
||
if stop_key and cache.get(stop_key):
|
||
break
|
||
return seen
|
||
|
||
|
||
@shared_task(base=CustomTask)
|
||
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
|
||
if not (keywords or search):
|
||
raise Exception("keywords or search must be provided")
|
||
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
|
||
base = Works().filter(
|
||
publication_year=publication_year,
|
||
has_doi=True,
|
||
type="article"
|
||
)
|
||
base = _apply_keyword_search(base, keywords, search)
|
||
_crawl_openalex_query(base, keywords, search, cache_key=cache_key,
|
||
stop_key="get_paper_meta_from_openalex_stop")
|
||
if cache.get("get_paper_meta_from_openalex_stop", None) is None:
|
||
if end_year is None:
|
||
end_year = datetime.now().year
|
||
if publication_year + 1 <= end_year:
|
||
current_app.send_task(
|
||
"apps.resm.tasks.get_paper_meta_from_openalex",
|
||
kwargs={
|
||
"publication_year": publication_year + 1,
|
||
"keywords": keywords,
|
||
"search": search,
|
||
"end_year": end_year
|
||
},
|
||
countdown=5
|
||
)
|
||
|
||
|
||
def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
|
||
per_combo_max: int = None) -> int:
|
||
"""针对单个 (keywords, search) 组合,拉取 OpenAlex 中 publication_date 在
|
||
from_publication_date 之后发表的论文。短窗口扫描,不落游标(每次从头扫)。
|
||
|
||
注:更贴切的「按收录时间增量」要用 from_created_date / from_updated_date,但这两个
|
||
过滤项需 OpenAlex Premium(当前 key 返回 "Plan upgrade required"),故退而用公开可用的
|
||
from_publication_date(发表日期,天级粒度)。代价:论文常在发表后若干天才被 OpenAlex
|
||
收录,所以窗口要留得比调度间隔大,否则会漏掉「晚收录」的文。
|
||
"""
|
||
base = Works().filter(
|
||
has_doi=True,
|
||
type="article",
|
||
from_publication_date=from_publication_date,
|
||
)
|
||
base = _apply_keyword_search(base, keywords, search)
|
||
return _crawl_openalex_query(base, keywords, search, n_max=per_combo_max)
|
||
|
||
|
||
@shared_task(base=CustomTask)
|
||
def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
|
||
"""自动增量更新期刊论文索引(resm_paper)的主任务。
|
||
|
||
遍历库里已抓过的每个 (o_keywords, o_search) 查询组合,用 OpenAlex 的
|
||
from_publication_date 过滤拉取最近 days 天内"发表"的论文并入库。
|
||
(理想是按收录时间 from_created_date / from_updated_date 增量,但需 Premium,
|
||
当前 key 无权限,故用公开可用的 from_publication_date。)
|
||
bulk_create(ignore_conflicts=True) 保证不重复。days 默认 30:论文常在发表后数天到
|
||
数周才被 OpenAlex 收录,窗口要足够大覆盖这个滞后,否则会漏「晚收录」的文。
|
||
由 django-celery-beat 每天调度一次。
|
||
"""
|
||
from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat()
|
||
combos = list(Paper.objects.values_list("o_keywords", "o_search").distinct())
|
||
before = Paper.objects.count()
|
||
n_combos = 0
|
||
for o_keywords, o_search in combos:
|
||
o_keywords = o_keywords or ""
|
||
o_search = o_search or ""
|
||
if not o_keywords and not o_search:
|
||
continue
|
||
n_combos += 1
|
||
_fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max)
|
||
new_papers = Paper.objects.count() - before
|
||
return f"openalex update: combos={n_combos}, new_papers={new_papers}, since={from_publication_date}"
|
||
|
||
|
||
BACKFILL_STOP_KEY = "backfill_paper_meta_stop"
|
||
|
||
|
||
@shared_task(base=CustomTask)
|
||
def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
|
||
combo_index: int = 0, combos=None):
|
||
"""按发表日期一次性回补论文索引,支持断点续传(应对 OpenAlex 配额限制)。
|
||
|
||
本任务只负责策略:抓哪些查询组合、按什么发表日期区间、怎么 chain。
|
||
具体的分页抓取与游标 checkpoint 复用通用核心 _crawl_openalex_query。
|
||
顺序逐个 (o_keywords, o_search) 组合处理,一个组合抓完再 chain 下一个,
|
||
单个 task 短小、可随时停、从断点续。
|
||
|
||
停 / 续(应对配额):
|
||
- 暂停: cache.set("backfill_paper_meta_stop", True) # 当前页抓完即停,不再 chain
|
||
- 续跑: cache.delete("backfill_paper_meta_stop") 后重新
|
||
backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01")
|
||
已 DONE 的组合自动跳过,未完成的从上次 checkpoint 的游标继续。
|
||
配额耗尽 / 网络异常时也会保留游标并置停标志,重新 .delay 即从断点继续。
|
||
"""
|
||
if cache.get(BACKFILL_STOP_KEY):
|
||
return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"
|
||
|
||
if combos is None:
|
||
# None 归一成 "" 并去重;排序保证每次续跑组合顺序一致
|
||
combos = [list(c) for c in sorted({
|
||
(c[0] or "", c[1] or "")
|
||
for c in Paper.objects.values_list("o_keywords", "o_search").distinct()
|
||
if (c[0] or c[1])
|
||
})]
|
||
|
||
def ckey(kw, search):
|
||
return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"
|
||
|
||
# 跳过已完成的组合
|
||
while combo_index < len(combos):
|
||
kw, search = combos[combo_index]
|
||
if cache.get(ckey(kw, search)) == "DONE":
|
||
combo_index += 1
|
||
continue
|
||
break
|
||
if combo_index >= len(combos):
|
||
return f"backfill done: {len(combos)} combos, from {from_publication_date}"
|
||
|
||
kw, search = combos[combo_index]
|
||
cursor_key = ckey(kw, search)
|
||
base = Works().filter(
|
||
has_doi=True, type="article", from_publication_date=from_publication_date,
|
||
)
|
||
if to_publication_date:
|
||
base = base.filter(to_publication_date=to_publication_date)
|
||
base = _apply_keyword_search(base, kw, search)
|
||
|
||
try:
|
||
_crawl_openalex_query(base, kw, search, cache_key=cursor_key,
|
||
stop_key=BACKFILL_STOP_KEY)
|
||
except Exception as e:
|
||
# 配额耗尽 / 网络等:游标已 checkpoint,置停标志,等手动续跑
|
||
cache.set(BACKFILL_STOP_KEY, True, timeout=None)
|
||
return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. "
|
||
f"clear {BACKFILL_STOP_KEY} then re-run to resume")
|
||
|
||
if cache.get(cursor_key) != "DONE":
|
||
# 被 stop_key 打断,没抓完;游标已保留,等续跑
|
||
return f"combo {combo_index}/{len(combos)} paused; cursor saved"
|
||
|
||
# 该组合已抓完,chain 下一个
|
||
current_app.send_task(
|
||
"apps.resm.tasks.backfill_paper_meta_from_openalex",
|
||
kwargs={
|
||
"from_publication_date": from_publication_date,
|
||
"to_publication_date": to_publication_date,
|
||
"combo_index": combo_index + 1,
|
||
"combos": combos,
|
||
},
|
||
countdown=3,
|
||
)
|
||
return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next"
|
||
|
||
|
||
@shared_task(base=CustomTask)
|
||
def monitor_papers(monitor_id: str = None):
|
||
"""期刊 / 关键词监控:遍历启用的 PaperMonitor,按 type 拼 OpenAlex 过滤,用
|
||
from_publication_date 拉最近 days 天的最新论文元数据入库。期刊与关键词监控共用本任务。
|
||
|
||
- journal: value=ISSN -> 过滤 primary_location.source.issn(o_keywords/o_search 留空)
|
||
- search : value=英文搜索词 -> title_and_abstract.search(写回 o_search)
|
||
- keyword: value=OpenAlex keyword id -> keywords.id(写回 o_keywords)
|
||
复用 _crawl_openalex_query(短窗口,cache_key=None 每次从头扫);bulk_create
|
||
ignore_conflicts 自动与现有 Paper 去重。monitor_id 给定则只跑该条。由 beat 每天调度。
|
||
"""
|
||
qs = PaperMonitor.objects.filter(is_active=True)
|
||
if monitor_id:
|
||
qs = qs.filter(id=monitor_id)
|
||
results = []
|
||
for m in qs:
|
||
from_pub = (timezone.now() - timedelta(days=m.days or 30)).date().isoformat()
|
||
base = Works().filter(has_doi=True, type="article", from_publication_date=from_pub)
|
||
kw, search = "", ""
|
||
if m.type == PaperMonitor.TYPE_JOURNAL:
|
||
base = base.filter(primary_location={"source": {"issn": m.value}})
|
||
elif m.type == PaperMonitor.TYPE_SEARCH:
|
||
search = m.value
|
||
base = _apply_keyword_search(base, "", search)
|
||
elif m.type == PaperMonitor.TYPE_KEYWORD:
|
||
kw = m.value
|
||
base = _apply_keyword_search(base, kw, "")
|
||
else:
|
||
continue
|
||
seen = _crawl_openalex_query(base, kw, search)
|
||
m.last_run = timezone.now()
|
||
m.last_count = seen
|
||
m.save(update_fields=["last_run", "last_count", "update_time"])
|
||
results.append(f"{m.type}:{m.name or m.value}={seen}")
|
||
return "; ".join(results) or "no active monitors"
|
||
|
||
|
||
SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"
|
||
|
||
|
||
def _build_paper_from_sd_result(r, qs_text: str):
|
||
"""把 ScienceDirect Search 单条 result 映射成未保存的 Paper 实例。
|
||
SD Search 不返回 oa_url / 摘要 / openalex_id,字段比 OpenAlex 少。
|
||
缺 doi / title / 年份的直接返回 None 跳过(model 里这几个字段非空)。
|
||
"""
|
||
doi = r.get("doi")
|
||
title = r.get("title")
|
||
if not doi or not title:
|
||
return None
|
||
pub_date = r.get("publicationDate") # YYYY-MM-DD
|
||
year = None
|
||
if pub_date:
|
||
try:
|
||
year = int(str(pub_date)[:4])
|
||
except (ValueError, TypeError):
|
||
year = None
|
||
if year is None:
|
||
return None
|
||
|
||
paper = Paper()
|
||
paper.id = idWorker.get_id()
|
||
paper.source = "elsevier"
|
||
paper.type = "article"
|
||
paper.o_search = qs_text
|
||
paper.doi = str(doi).replace("https://doi.org/", "")
|
||
paper.title = title
|
||
paper.publication_date = pub_date if len(str(pub_date)) == 10 else None
|
||
paper.publication_year = year
|
||
paper.publication_name = r.get("sourceTitle")
|
||
authors = r.get("authors") or []
|
||
if isinstance(authors, list) and authors:
|
||
# 取 order 最小的作者作为第一作者(部分作者可能被省略)
|
||
first = min(authors, key=lambda a: a.get("order", 1) if isinstance(a, dict) else 1)
|
||
if isinstance(first, dict):
|
||
paper.first_author = first.get("name")
|
||
paper.is_oa = bool(r.get("openAccess"))
|
||
return paper
|
||
|
||
|
||
def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
|
||
max_results: int = 200):
|
||
"""调用 ScienceDirect Search(PUT)拉取 loaded_after 之后新入库的 Elsevier 文章。
|
||
返回 (papers, err_msg)。err_msg 非空表示该 query 出错(已尽量带回已拿到的部分)。
|
||
"""
|
||
headers = {
|
||
**ELSEVIER_HEADERS,
|
||
"Content-Type": "application/json",
|
||
"Accept": "application/json",
|
||
}
|
||
papers = []
|
||
offset = 0
|
||
with requests.Session() as req:
|
||
while offset < max_results:
|
||
body = {
|
||
"qs": qs_text,
|
||
"loadedAfter": loaded_after,
|
||
"display": {"offset": offset, "show": show, "sortBy": "date"},
|
||
}
|
||
try:
|
||
res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
|
||
headers=headers, timeout=(3, 30))
|
||
except requests.RequestException as e:
|
||
return papers, f"elsevier_search_request_error: {e}"
|
||
if res.status_code in (401, 403):
|
||
return papers, f"elsevier_search_no_entitlement: {res.status_code}"
|
||
if res.status_code != 200:
|
||
return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
|
||
results = (res.json() or {}).get("results") or []
|
||
# 空结果时 SD 可能返回单个 {"error": ...},一并视为结束
|
||
if not results or (len(results) == 1 and results[0].get("error")):
|
||
break
|
||
for r in results:
|
||
paper = _build_paper_from_sd_result(r, qs_text)
|
||
if paper is not None:
|
||
papers.append(paper)
|
||
if len(results) < show:
|
||
break
|
||
offset += show
|
||
return papers, ""
|
||
|
||
|
||
@shared_task(base=CustomTask)
|
||
def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
|
||
"""ScienceDirect Search 增量补充任务,作为 OpenAlex 增量的补充。
|
||
|
||
遍历库里已有的纯文本查询(o_search,OpenAlex 的 keyword id 无法用于 SD,
|
||
故只用 o_search),用 loadedAfter 拉取最近 days 天 Elsevier 新入库的期刊文章。
|
||
只覆盖 ScienceDirect 自家内容(基本是 DOI 10.1016),弥补 OpenAlex 对
|
||
Elsevier 新刊收录的延迟。由 django-celery-beat 每天调度一次。
|
||
"""
|
||
loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
qs_texts = [
|
||
s.strip() for s in Paper.objects.exclude(o_search__isnull=True)
|
||
.exclude(o_search="").values_list("o_search", flat=True).distinct()
|
||
if s and s.strip()
|
||
]
|
||
before = Paper.objects.count()
|
||
msgs = []
|
||
for qs_text in qs_texts:
|
||
papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
|
||
if papers:
|
||
Paper.objects.bulk_create(papers, ignore_conflicts=True)
|
||
if err:
|
||
msgs.append(f"{qs_text}: {err}")
|
||
# key 没 Search 权限,后续 query 也没意义,直接停
|
||
if "no_entitlement" in err:
|
||
break
|
||
new_papers = Paper.objects.count() - before
|
||
return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
|
||
f"since={loaded_after}, errs={msgs}")
|
||
|
||
|
||
# 常用的 User-Agent 列表
|
||
USER_AGENTS = [
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
|
||
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15",
|
||
]
|
||
|
||
def get_random_headers():
|
||
"""获取随机的请求头"""
|
||
return {
|
||
"User-Agent": random.choice(USER_AGENTS),
|
||
"Accept": "application/pdf, */*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
||
"Referer": "https://www.google.com/",
|
||
}
|
||
|
||
def show_task_run(def_name: str):
|
||
return cache.get(def_name, True)
|
||
|
||
# 自触发抓取链的"常驻保活": 链每跑一轮刷一次 alive 心跳, beat 周期任务据此
|
||
# 判断链是否还活着, 死了(重启/崩溃/空闲过期)且未被手动停止时重新点火。
|
||
ALIVE_TTL = 120 # alive 过期(秒): 需 > 链最大自重发间隔(60s), 给 beat 留判定窗口
|
||
MANAGED_FETCH_TASKS = [
|
||
"apps.resm.tasks.get_abstract_from_elsevier",
|
||
"apps.resm.tasks.get_pdf_from_openalex",
|
||
]
|
||
|
||
def touch_alive(def_name: str):
|
||
"""标记该自触发链仍在运行。"""
|
||
cache.set(def_name + ":alive", 1, timeout=ALIVE_TTL)
|
||
|
||
def is_alive(def_name: str):
|
||
return cache.get(def_name + ":alive") is not None
|
||
|
||
@shared_task(base=CustomTask)
|
||
def ensure_fetch_running():
|
||
"""beat 周期点火: 链死了且未被手动停止则重新点起, 保证重启后自愈。"""
|
||
started = []
|
||
for name in MANAGED_FETCH_TASKS:
|
||
if show_task_run(name) and not is_alive(name):
|
||
current_app.send_task(name)
|
||
started.append(name)
|
||
return f"ensure_fetch_running started: {started}"
|
||
|
||
@shared_task(base=CustomTask)
|
||
def get_pdf_from_openalex(number_of_task: int =10):
|
||
def_name = get_pdf_from_openalex.name
|
||
if not show_task_run(def_name):
|
||
return "stoped"
|
||
touch_alive(def_name)
|
||
|
||
# 限流退避中: 不打 API, 慢节奏自重发只为维持 alive, 等 exceed 标记自然过期。
|
||
# 这样 beat 看到 alive 仍在, 不会另起一条链去撞限流。
|
||
if cache.get("openalex_api_exceed"):
|
||
current_app.send_task(
|
||
"apps.resm.tasks.get_pdf_from_openalex",
|
||
kwargs={"number_of_task": number_of_task},
|
||
countdown=60,
|
||
)
|
||
return "openalex_api_exceed, backing off"
|
||
|
||
count = 0
|
||
qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude(
|
||
fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_not_found")[:number_of_task]
|
||
if not qs.exists():
|
||
return "done" # 不自重发, 交给 beat 轮询拉起
|
||
msg = ""
|
||
for paper in qs:
|
||
if not show_task_run(def_name):
|
||
break
|
||
paper.fetch("downloading")
|
||
msg = save_pdf_from_openalex(paper)
|
||
paper.fetch_end()
|
||
if paper.has_fulltext_pdf:
|
||
count += 1
|
||
if cache.get("openalex_api_exceed"):
|
||
break
|
||
if show_task_run(def_name):
|
||
# 刚撞上限流则放慢到 60s(仍维持 alive), 否则紧凑 2s
|
||
countdown = 60 if cache.get("openalex_api_exceed") else 2
|
||
current_app.send_task(
|
||
"apps.resm.tasks.get_pdf_from_openalex",
|
||
kwargs={
|
||
"number_of_task": number_of_task,
|
||
},
|
||
countdown=countdown,
|
||
)
|
||
return msg, count
|
||
|
||
def _elsevier_fetch_xml(req, paper):
|
||
"""同一 DOI 取 XML, 解析摘要/全文标记并落库。
|
||
返回 (got_abstract, got_fulltext, err): err 仅在网络异常时为 'request_error'。"""
|
||
try:
|
||
res = req.get(
|
||
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
|
||
params={"httpAccept": "text/xml"},
|
||
headers=ELSEVIER_HEADERS,
|
||
timeout=(3, 15),
|
||
)
|
||
except requests.RequestException:
|
||
return False, False, "request_error"
|
||
|
||
if res.status_code == 404:
|
||
paper.save_fail_reason("elsevier_doi_not_found")
|
||
return False, False, None
|
||
if res.status_code != 200:
|
||
paper.save_fail_reason(f"elsevier_response_error: {res.status_code}")
|
||
return False, False, None
|
||
|
||
xml_str = res.text
|
||
try:
|
||
root = etree.fromstring(xml_str.encode("utf-8"))
|
||
except etree.XMLSyntaxError:
|
||
paper.save_fail_reason("elsevier_xml_error")
|
||
return False, False, None
|
||
|
||
ns = {"dc": "http://purl.org/dc/elements/1.1/",
|
||
"ce": "http://www.elsevier.com/xml/common/dtd",
|
||
"xocs": "http://www.elsevier.com/xml/xocs/dtd"}
|
||
abstract = root.xpath("//dc:description/text()", namespaces=ns)
|
||
if not abstract:
|
||
paper.save_fail_reason("elsevier_abstract_not_found")
|
||
return False, False, None
|
||
|
||
PaperAbstract.objects.update_or_create(
|
||
paper=paper,
|
||
defaults={"abstract": abstract[0].strip(), "source": "elsevier"},
|
||
)
|
||
paper.has_abstract = True
|
||
paper.has_abstract_xml = True
|
||
|
||
paras = root.xpath("//ce:para", namespaces=ns)
|
||
has_fulltext = len(paras) > 0
|
||
if has_fulltext is False:
|
||
rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
|
||
if rawtexts and len(rawtexts[0].strip()) > 2000:
|
||
has_fulltext = True
|
||
if has_fulltext:
|
||
paper.has_fulltext = True
|
||
paper.has_fulltext_xml = True
|
||
|
||
paper.save_file_xml(xml_str)
|
||
paper.save(update_fields=["has_abstract", "has_abstract_xml",
|
||
"has_fulltext", "has_fulltext_xml", "update_time"])
|
||
return True, has_fulltext, None
|
||
|
||
|
||
def _pdf_page_count(content: bytes):
|
||
"""返回 PDF 页数; 无法确定时返回 None。
|
||
|
||
优先用 pypdf 精确解析; 未安装或解析异常时退化为字节扫描
|
||
(对未压缩对象树有效, Elsevier 的摘要预览页正属此类)。"""
|
||
try:
|
||
from io import BytesIO
|
||
import logging
|
||
# 坏 PDF 会让 pypdf 刷大量恢复日志(incorrect header / Cannot find /Root 等),
|
||
# 这里只关心页数, 静音其 logger 避免污染输出。
|
||
logging.getLogger("pypdf").setLevel(logging.CRITICAL)
|
||
from pypdf import PdfReader
|
||
return len(PdfReader(BytesIO(content), strict=False).pages)
|
||
except ImportError:
|
||
pass
|
||
except Exception:
|
||
return None
|
||
try:
|
||
counts = [int(m) for m in re.findall(rb"/Count\s+(\d+)", content)]
|
||
if counts:
|
||
return max(counts)
|
||
n = len(re.findall(rb"/Type\s*/Page(?![sR])", content))
|
||
if n:
|
||
return n
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
|
||
def _is_elsevier_preview_pdf(content: bytes) -> bool:
|
||
"""判断 Elsevier 返回的 PDF 是否为"摘要预览页"。
|
||
|
||
Elsevier Article API 对未授权 / in-press 文章, application/pdf 端点会返回
|
||
仅含摘要的 1 页预览 PDF(魔数仍是 %PDF、体积也不小), 全文 XML 却可能正常。
|
||
判据: 能确定页数且 <= 1 页。无法确定页数时返回 False(从宽, 不误杀真全文)。"""
|
||
pages = _pdf_page_count(content)
|
||
return pages is not None and pages <= 1
|
||
|
||
|
||
def _inspect_pdf(content: bytes):
|
||
"""对历史落库的 PDF 文件分类, 返回 (kind, pages)。
|
||
|
||
kind:
|
||
'broken' - 非 PDF(魔数不符)或 pypdf 解析直接失败 -> 可安全删除重抓
|
||
'preview' - 1 页摘要预览页
|
||
'ok' - 多页, 视为真全文, 不处理
|
||
'unknown' - 魔数正常但页数判不出(通常因未装 pypdf) -> 不处理, 绝不当坏文件
|
||
pages: 页数; 无法确定为 None。"""
|
||
if not content or b"%PDF" not in content[:1024]:
|
||
return "broken", 0
|
||
try:
|
||
from io import BytesIO
|
||
import logging
|
||
logging.getLogger("pypdf").setLevel(logging.CRITICAL)
|
||
from pypdf import PdfReader
|
||
except ImportError:
|
||
# 没装 pypdf: 只能靠字节扫描, 判不出就 unknown(从宽, 不误判为坏)
|
||
pages = _pdf_page_count(content)
|
||
if pages is None:
|
||
return "unknown", None
|
||
return ("preview" if pages <= 1 else "ok"), pages
|
||
try:
|
||
pages = len(PdfReader(BytesIO(content), strict=False).pages)
|
||
except Exception:
|
||
return "broken", None
|
||
if pages <= 0:
|
||
return "broken", pages
|
||
return ("preview" if pages == 1 else "ok"), pages
|
||
|
||
|
||
def _elsevier_fetch_pdf(req, paper):
|
||
"""同一 DOI 取 PDF, 成功落库返回 True。"""
|
||
try:
|
||
res = req.get(
|
||
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
|
||
params={"httpAccept": "application/pdf"},
|
||
headers=ELSEVIER_HEADERS,
|
||
timeout=(3, 15),
|
||
)
|
||
except requests.RequestException:
|
||
return False
|
||
if res.status_code == 200:
|
||
# 检查是否是PDF文件:检查魔数 %PDF 或 content-type
|
||
is_pdf = (
|
||
res.content.startswith(b'%PDF') or
|
||
res.headers.get("content-type", "").startswith("application/pdf")
|
||
)
|
||
if is_pdf and len(res.content) > 1024: # 至少1KB
|
||
# 排除"摘要预览页"(1 页): 否则会被误标 has_fulltext_pdf=True
|
||
if _is_elsevier_preview_pdf(res.content):
|
||
paper.save_fail_reason("elsevier_pdf_preview_only")
|
||
return False
|
||
paper.save_file_pdf(res.content, save_obj=True)
|
||
return True
|
||
return False
|
||
|
||
|
||
@shared_task(base=CustomTask)
|
||
def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True,
|
||
pdf_number_of_task:int = 20):
|
||
"""Elsevier 单端点合并任务: 同一 DOI 先取 XML(摘要/全文标记), 若有全文则内联
|
||
再取一次 PDF; 并补抓历史上已有全文标记但缺 PDF 的论文。原 get_pdf_from_elsevier 已并入。
|
||
number_of_task: 阶段1(摘要+内联 PDF)每轮上限; pdf_number_of_task: 阶段2(存量补 PDF)每轮上限。"""
|
||
def_name = get_abstract_from_elsevier.name
|
||
if not show_task_run(def_name):
|
||
return "stoped"
|
||
touch_alive(def_name)
|
||
|
||
# 待抓摘要(并顺带取 PDF)
|
||
qs = Paper.objects.filter(has_abstract=False)
|
||
if exclude_failed:
|
||
qs = qs.filter(fail_reason=None)
|
||
else:
|
||
qs = qs.exclude(fail_reason__contains="elsevier_")
|
||
qs = qs.exclude(fetch_status="downloading"
|
||
).filter(doi__startswith="10.1016").order_by("?")
|
||
|
||
# 存量补 PDF: 已有全文标记但还没下到 PDF
|
||
qs_pdf = Paper.objects.filter(
|
||
has_fulltext=True, has_fulltext_pdf=False, has_abstract=True
|
||
).exclude(fetch_status="downloading"
|
||
).exclude(fail_reason__contains="elsevier_pdf_preview_only"
|
||
).filter(doi__startswith="10.1016")
|
||
|
||
if not qs.exists() and not qs_pdf.exists():
|
||
return "done" # 不自重发, 交给 beat 轮询拉起
|
||
|
||
err_msg = ""
|
||
count_abs = 0
|
||
count_fulltext = 0
|
||
count_pdf = 0
|
||
with requests.Session() as req:
|
||
# 阶段1: 摘要 + 内联 PDF
|
||
for paper in qs[:number_of_task]:
|
||
if not show_task_run(def_name):
|
||
break
|
||
if paper.fetch_status == "downloading":
|
||
continue
|
||
paper.fetch(status="downloading")
|
||
try:
|
||
got_abs, got_fulltext, err = _elsevier_fetch_xml(req, paper)
|
||
if err == "request_error":
|
||
err_msg = "elsevier_request_error"
|
||
break
|
||
if got_abs:
|
||
count_abs += 1
|
||
if got_fulltext:
|
||
count_fulltext += 1
|
||
if _elsevier_fetch_pdf(req, paper):
|
||
count_pdf += 1
|
||
finally:
|
||
paper.fetch_end()
|
||
|
||
# 阶段2: 存量补 PDF (摘要阶段未因网络异常中断才继续)
|
||
if err_msg == "":
|
||
for paper in qs_pdf[:pdf_number_of_task]:
|
||
if not show_task_run(def_name):
|
||
break
|
||
if paper.fetch_status == "downloading":
|
||
continue
|
||
paper.fetch(status="downloading")
|
||
try:
|
||
if _elsevier_fetch_pdf(req, paper):
|
||
count_pdf += 1
|
||
finally:
|
||
paper.fetch_end()
|
||
|
||
if show_task_run(def_name):
|
||
countdown = 30 if err_msg else 5 # 网络异常时放慢重试
|
||
current_app.send_task(
|
||
"apps.resm.tasks.get_abstract_from_elsevier",
|
||
kwargs={
|
||
"number_of_task": number_of_task,
|
||
"exclude_failed": exclude_failed,
|
||
"pdf_number_of_task": pdf_number_of_task,
|
||
},
|
||
countdown=countdown,
|
||
)
|
||
return f'{err_msg}, abs {count_abs}, fulltext {count_fulltext}, pdf {count_pdf}'
|
||
|
||
def get_actual_running_count():
|
||
"""获取实际在下载的任务数"""
|
||
return Paper.objects.filter(fetch_status='downloading').count()
|
||
|
||
def can_send_more(max_running):
|
||
return get_actual_running_count() < max_running
|
||
|
||
@shared_task(base=CustomTask)
|
||
def send_download_fulltext_task(number_of_task=100):
|
||
# 只排除"本下载链路已尝试过"的论文(download_pdf 终态会打 download_pdf_tried 标记),
|
||
# 不再用 fail_reason=None —— 否则被 openalex 保活链失败标记蹭上 fail_reason 的论文会被
|
||
# 永久遮蔽, 其 oa_url/elsevier/scihub 兜底路径永远不会被尝试。
|
||
qs = Paper.objects.filter(has_fulltext=False, is_oa=True).exclude(
|
||
fetch_status='downloading'
|
||
).exclude(fail_reason__contains="download_pdf_tried")
|
||
if not qs.exists():
|
||
return "done"
|
||
qs0 = qs.order_by("?")
|
||
|
||
# 分批发送任务,控制并发数量,避免过多请求
|
||
task_count = 0
|
||
for paper in qs0[:number_of_task]:
|
||
if not can_send_more(number_of_task):
|
||
break
|
||
# 使用 countdown 错开请求时间,避免过多并发
|
||
countdown = task_count * 1 # 每个任务间隔1秒
|
||
current_app.send_task(
|
||
"apps.resm.tasks.download_pdf",
|
||
kwargs={
|
||
"paper_id": paper.id,
|
||
},
|
||
countdown=countdown,
|
||
)
|
||
task_count += 1
|
||
|
||
return f"sent {task_count} download_pdf tasks"
|
||
|
||
@shared_task(base=CustomTask)
|
||
def release_working_paper(minutes=10):
|
||
qs = Paper.objects.filter(fetch_status="downloading")
|
||
count = 0
|
||
for paper in qs:
|
||
if paper.update_time < timezone.now() - timedelta(minutes=minutes):
|
||
paper.fetch_end()
|
||
count += 1
|
||
return f"release {count} papers"
|
||
|
||
@shared_task(base=CustomTask)
|
||
def download_pdf(paper_id):
|
||
"""
|
||
下载单个论文的PDF
|
||
"""
|
||
try:
|
||
paper = Paper.objects.get(id=paper_id)
|
||
if paper.fetch_status == "downloading":
|
||
return
|
||
paper.fetch("downloading")
|
||
msg = "no_method_to_get_pdf"
|
||
current_from = ""
|
||
if paper.oa_url:
|
||
if "https://doi.org/10.1016" in paper.oa_url:
|
||
current_from = "elsevier"
|
||
msg = save_pdf_from_elsevier(paper)
|
||
else:
|
||
current_from = "oa_url"
|
||
msg = save_pdf_from_oa_url(paper)
|
||
if paper.has_fulltext_pdf is False and paper.publication_year <= 2021:
|
||
current_from = "scihub"
|
||
msg = save_pdf_from_scihub(paper)
|
||
# if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
|
||
# current_from = "openalex"
|
||
# msg = save_pdf_from_openalex(paper)
|
||
if paper.has_fulltext_pdf is False:
|
||
# 终态标记: 无论该论文之前是否已被 openalex 保活链写过 fail_reason, 这里都追加
|
||
# 一条带 download_pdf_tried 的终态, 供 send_download_fulltext_task 据此排除。
|
||
# 既防本下载链路(oa_url/elsevier/scihub)对同一篇无限重试, 也避免 openalex 链
|
||
# 留下的 fail_reason 把本链路的首次尝试给遮蔽掉。
|
||
paper.save_fail_reason(f"download_pdf_tried:{msg}")
|
||
return msg, current_from
|
||
finally:
|
||
paper.fetch_end()
|
||
|
||
|
||
def save_pdf_from_oa_url(paper: Paper):
|
||
from .d_oaurl import download_pdf_with_curl_cffi, download_from_url_playwright
|
||
|
||
# 策略1: 直接请求
|
||
try:
|
||
headers = get_random_headers()
|
||
res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
|
||
except requests.RequestException as e:
|
||
paper.save_fail_reason("oa_url_request_error")
|
||
return f"oa_url_request_error: {str(e)}"
|
||
|
||
if res.status_code in [200, 201, 202]:
|
||
# 检查是否是PDF文件:检查魔数 %PDF 或 content-type
|
||
is_pdf = (
|
||
res.content.startswith(b'%PDF') or
|
||
res.headers.get("content-type", "").startswith("application/pdf") or
|
||
res.headers.get("content-type", "") == "application/octet-stream"
|
||
)
|
||
if is_pdf and len(res.content) > 1024: # 至少1KB
|
||
paper.save_file_pdf(res.content, save_obj=True)
|
||
return "success"
|
||
else:
|
||
paper.save_fail_reason("oa_url_not_pdf")
|
||
return "oa_url_not_pdf"
|
||
|
||
# 策略2: curl-cffi(处理 Cloudflare JS 挑战)
|
||
paper_path = paper.init_paper_path("pdf")
|
||
is_ok, err_msg = download_pdf_with_curl_cffi(paper.oa_url, paper_path)
|
||
if is_ok:
|
||
paper.has_fulltext = True
|
||
paper.has_fulltext_pdf = True
|
||
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
|
||
return "success"
|
||
|
||
# 策略3: Playwright(最终回退)
|
||
is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
|
||
if is_ok:
|
||
paper.has_fulltext = True
|
||
paper.has_fulltext_pdf = True
|
||
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
|
||
return "success"
|
||
|
||
paper.save_fail_reason(f"oa_url_all_methods_failed: {err_msg}")
|
||
return f"oa_url_all_methods_failed: {err_msg}"
|
||
|
||
def save_pdf_from_openalex(paper:Paper):
|
||
if cache.get("openalex_api_exceed"):
|
||
return "find cache Insufficient credits"
|
||
# 尝试openalex下载
|
||
try:
|
||
res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
|
||
params={
|
||
"api_key": OPENALEX_KEY
|
||
},
|
||
timeout=(3, 15))
|
||
except requests.RequestException as e:
|
||
return f"openalex_pdf_error: {str(e)}"
|
||
if res.status_code == 200:
|
||
paper.save_file_pdf(res.content, save_obj=True)
|
||
return "success"
|
||
elif res.status_code == 429:
|
||
try:
|
||
message = res.json().get("message", "")
|
||
except ValueError:
|
||
message = res.text
|
||
if "Insufficient credits" in message:
|
||
# 额度耗尽: 退避 1 小时
|
||
cache.set("openalex_api_exceed", True, timeout=3600)
|
||
return "openalex_pdf_error: Insufficient credits"
|
||
# 普通限流(请求过频): 短退避 2 分钟, 避免立刻重试再撞 429
|
||
cache.set("openalex_api_exceed", True, timeout=120)
|
||
return f"openalex_pdf_error: 429 {message[:100]}"
|
||
elif res.status_code == 404:
|
||
paper.save_fail_reason("openalex_pdf_not_found")
|
||
return "openalex_pdf_error: 404"
|
||
else:
|
||
paper.save_fail_reason("openalex_pdf_error")
|
||
return f"openalex_pdf_error: {res.status_code} {res.text}"
|
||
|
||
|
||
def save_pdf_from_elsevier(paper:Paper):
|
||
params = {
|
||
"httpAccept": "application/pdf",
|
||
}
|
||
try:
|
||
res = requests.get(
|
||
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
|
||
params=params,
|
||
headers=ELSEVIER_HEADERS,
|
||
timeout=(3, 15)
|
||
)
|
||
except requests.RequestException as e:
|
||
return f"elsevier_request_error: {str(e)}"
|
||
if res.status_code == 200:
|
||
if _is_elsevier_preview_pdf(res.content):
|
||
paper.save_fail_reason("elsevier_pdf_preview_only")
|
||
return "elsevier_pdf_preview_only"
|
||
paper.save_file_pdf(res.content, save_obj=True)
|
||
return "success"
|
||
else:
|
||
return f"elsevier_status_error: {res.status_code} {res.text}"
|
||
|
||
def save_pdf_from_scihub(paper:Paper):
|
||
from .d_scihub import download_paper_by_doi
|
||
is_ok, err_msg = download_paper_by_doi(paper.doi, paper.init_paper_path("pdf"))
|
||
if is_ok:
|
||
paper.has_fulltext = True
|
||
paper.has_fulltext_pdf = True
|
||
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf"])
|
||
return "success"
|
||
else:
|
||
paper.save_fail_reason(err_msg)
|
||
return err_msg
|
||
# https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true |