paper_server/apps/resm/tasks.py

805 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from pyalex import Works, config
from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from django.core.cache import cache
import requests
from lxml import etree
from celery import current_app
from datetime import datetime, timedelta
import random
from .d_oaurl import download_from_url_playwright
import asyncio
import sys
import os
from django.db.models import Q
from django.utils import timezone
from django.conf import settings
# OpenAlex (pyalex) client settings. Credentials live in config/conf.py and are
# exposed through Django settings (via `from config.conf import *`).
config.email = settings.OPENALEX_EMAIL
config.api_key = settings.OPENALEX_API_KEY
# pyalex automatic retries are disabled (max_retries=0); the backoff factor and
# retryable status codes presumably only matter if retries are re-enabled.
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]

# Key used for PDF downloads from content.openalex.org.
OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY

# Elsevier API credentials; ELSEVIER_HEADERS carries both the API key and the
# institution token for requests that pass it.
ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
ELSEVIER_HEADERS = dict(
    [
        ("X-ELS-Insttoken", settings.ELSEVIER_INST_TOKEN),
        ("X-ELS-APIKey", ELSEVIER_APIKEY),
    ]
)
def run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    Cross-platform helper for calling async code from synchronous Celery tasks.
    On Windows the Proactor event-loop policy is installed first, because
    asyncio subprocess support there needs the Proactor loop.

    ``asyncio.run`` is used on every platform. It creates a new loop, cancels
    tasks still pending when *coro* finishes, shuts down async generators and
    the default executor, and finally detaches the closed loop. The previous
    hand-rolled Windows branch left the closed loop installed as the current
    event loop and skipped that cleanup.

    Raises whatever *coro* raises. Like the original, it must not be called
    from inside a running event loop.
    """
    if sys.platform == "win32":
        # Needed so asyncio subprocesses work on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    return asyncio.run(coro)
# Work fields requested from OpenAlex; shared by the full crawl, the
# incremental update and the backfill so they all store the same columns.
OPENALEX_SELECT_FIELDS = (
    "id doi title publication_date "
    "open_access authorships primary_location publication_year "
    "display_name content_urls"
).split()
def _apply_keyword_search(pager, keywords: str, search: str):
"""把 keywords / search 过滤条件套到 pyalex 的 pager 上。
keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter。
"""
if keywords:
if "|" in keywords:
keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
pager = pager.filter_or(keywords={"id": keywords_list})
elif "," in keywords:
keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
pager = pager.filter(keywords={"id": keywords_list})
else:
pager = pager.filter(keywords={"id": [keywords.strip()]})
if search:
pager = pager.search_filter(title_and_abstract=search)
return pager
def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
    """Map one OpenAlex work record onto a new, unsaved Paper instance.

    Args:
        record: one OpenAlex work dict (fields as in OPENALEX_SELECT_FIELDS).
            The caller guarantees ``record["doi"]`` is truthy and that
            ``display_name`` or ``title`` is set.
        keywords, search: the query that found the record, stored as
            o_keywords / o_search so later incremental runs can replay it.

    Returns:
        A Paper with a fresh snowflake id. Open-access, first-author and venue
        fields are filled only when the record provides them.
    """
    paper = Paper()
    paper.id = idWorker.get_id()
    paper.o_keywords = keywords
    paper.o_search = search
    paper.source = "openalex"
    paper.type = "article"
    paper.openalex_id = record["id"].split("/")[-1]
    paper.doi = record["doi"].replace("https://doi.org/", "")
    # The crawler also accepts records whose display_name is empty when title is
    # set; fall back to title so such papers don't get an empty title.
    paper.title = record["display_name"] or record["title"]
    paper.publication_date = record["publication_date"]
    paper.publication_year = record["publication_year"]
    if record["open_access"]:
        paper.is_oa = record["open_access"]["is_oa"]
        paper.oa_url = record["open_access"]["oa_url"]
    if record["authorships"]:
        first = record["authorships"][0]
        paper.first_author = first["author"]["display_name"]
        if first["institutions"]:
            paper.first_author_institution = first["institutions"][0]["display_name"]
    if record["primary_location"] and record["primary_location"]["source"]:
        paper.publication_name = record["primary_location"]["source"]["display_name"]
    return paper
def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
                          stop_key: str = None, n_max: int = None) -> int:
    """Crawl one OpenAlex query into Paper rows.

    Shared by the full crawl, the incremental update and the backfill, so all
    three checkpoint the same way.

    Args:
        base_pager: pyalex Works query with every filter (year / dates / keywords)
            already applied by the caller.
        keywords, search: stored on each created Paper (o_keywords / o_search).
        cache_key: if given, the cursor of the *next* page is checkpointed into
            this cache key after each page ("DONE" once the query is exhausted),
            and a query already marked "DONE" is skipped entirely. This makes
            resuming after an interruption possible. If None, no cursor is
            stored and every call starts from the beginning (fine for short
            incremental windows).
        stop_key: if given and that cache flag is truthy, stop after the current
            page (quota exhausted / manual pause). The cursor is already saved.
        n_max: optional result cap forwarded to pyalex ``paginate``.

    Returns:
        Number of papers handed to bulk_create in this call. These are the
        records that had a DOI and a title, including ones skipped as existing
        duplicates.
    """
    cursor = cache.get(cache_key, "*") if cache_key else "*"
    if cursor == "DONE":
        return 0
    paginator = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
        per_page=200, n_max=n_max, cursor=cursor)
    total = 0
    for batch in paginator:
        rows = []
        for rec in batch:
            if not rec["doi"]:
                continue
            if not (rec["display_name"] or rec["title"]):
                continue
            rows.append(_build_paper_from_record(rec, keywords, search))
        # ignore_conflicts: rows whose openalex_id / doi already exist are skipped.
        Paper.objects.bulk_create(rows, ignore_conflicts=True)
        total += len(rows)
        # pyalex private attribute, updated only after each page is fetched;
        # None means there is no further page.
        next_cursor = paginator._next_value
        exhausted = next_cursor is None
        if cache_key:
            cache.set(cache_key, "DONE" if exhausted else next_cursor, timeout=None)
        if exhausted or not batch:
            if cache_key:
                cache.set(cache_key, "DONE", timeout=None)
            break
        if stop_key and cache.get(stop_key):
            break
    return total
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, keywords: str = "", search: str = "", end_year: int = None):
    """Full OpenAlex metadata crawl for one publication year, then chain to the next year.

    The crawl is resumable: the cursor is checkpointed per (year, keywords,
    search) through _crawl_openalex_query. Unless the stop flag
    "get_paper_meta_from_openalex_stop" is set, the task re-schedules itself for
    publication_year + 1 until end_year (defaults to the current year).

    Raises:
        ValueError: if neither keywords nor search is given.
    """
    stop_key = "get_paper_meta_from_openalex_stop"
    if not (keywords or search):
        raise ValueError("keywords or search must be provided")
    # NOTE: keywords and search are concatenated without a separator; kept as-is
    # so cursors checkpointed by earlier runs stay valid.
    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
    base = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"
    )
    base = _apply_keyword_search(base, keywords, search)
    _crawl_openalex_query(base, keywords, search, cache_key=cache_key,
                          stop_key=stop_key)
    # Use the same truthiness test as the crawler. Previously any non-None value
    # (even False) stopped the year chain while the crawler ignored it.
    if cache.get(stop_key):
        return
    if end_year is None:
        end_year = datetime.now().year
    if publication_year + 1 <= end_year:
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "keywords": keywords,
                "search": search,
                "end_year": end_year
            },
            countdown=5
        )
def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
                                    per_combo_max: int = None) -> int:
    """Fetch one (keywords, search) combo's articles published on/after a date.

    This is a short-window scan with no cursor checkpoint, so every call starts
    from the first page. Returns the count from _crawl_openalex_query.

    Note: ingesting by OpenAlex *indexing* time would be more accurate
    (from_created_date / from_updated_date). Those filters require OpenAlex
    Premium, and the current key gets "Plan upgrade required". So the publicly
    available from_publication_date (day granularity) is used instead.
    Papers are often indexed days after publication, so the window must be
    wider than the scheduling interval or late-indexed papers are missed.
    """
    recent_articles = Works().filter(
        has_doi=True,
        type="article",
        from_publication_date=from_publication_date,
    )
    narrowed = _apply_keyword_search(recent_articles, keywords, search)
    return _crawl_openalex_query(narrowed, keywords, search, n_max=per_combo_max)
@shared_task(base=CustomTask)
def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
    """Daily incremental refresh of the paper index (resm_paper) from OpenAlex.

    The task:
    - Replays every (o_keywords, o_search) combo already present in the table.
    - Pulls papers *published* in the last ``days`` days via from_publication_date.
    - Inserts them with bulk_create(ignore_conflicts=True), so it never duplicates.

    Incrementing by indexing time (from_created_date / from_updated_date) would
    be better but needs OpenAlex Premium, which the current key lacks.

    ``days`` defaults to 30 because OpenAlex often indexes papers days to weeks
    after publication; a narrower window misses late-indexed papers.

    Scheduled once a day by django-celery-beat.

    Returns:
        A summary string with the combo count, new rows and the date used.
    """
    since = (timezone.now() - timedelta(days=days)).date().isoformat()
    # order_by() clears any model default ordering, which would otherwise be
    # added to the SELECT and defeat DISTINCT. None and "" are then normalised
    # and de-duplicated, so (kw, None) and (kw, "") aren't crawled twice.
    # Sorting keeps the order stable (same as the backfill task).
    raw_combos = Paper.objects.order_by().values_list("o_keywords", "o_search").distinct()
    combos = sorted({
        (kw or "", term or "")
        for kw, term in raw_combos
        if kw or term
    })
    before = Paper.objects.count()
    for o_keywords, o_search in combos:
        _fetch_openalex_published_since(o_keywords, o_search, since, per_combo_max)
    new_papers = Paper.objects.count() - before
    return f"openalex update: combos={len(combos)}, new_papers={new_papers}, since={since}"
# Cache flag that pauses backfill_paper_meta_from_openalex (checked at start and after each page).
BACKFILL_STOP_KEY = "backfill_paper_meta_stop"


@shared_task(base=CustomTask)
def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
                                      combo_index: int = 0, combos=None):
    """One-off, resumable backfill of the paper index by publication date.

    Resumability is there to survive OpenAlex quota limits. This task only
    decides strategy: which query combos to crawl, which publication-date
    window, and how to chain. Paging and cursor checkpointing are delegated to
    _crawl_openalex_query.

    Combos (o_keywords, o_search) are processed one at a time. When one
    finishes, the task chains itself for the next, so each run is short and can
    be stopped/resumed at any point.

    Stop / resume (for quota limits):
      - Pause: ``cache.set("backfill_paper_meta_stop", True)``. The crawl stops
        after the current page and nothing further is chained.
      - Resume: ``cache.delete("backfill_paper_meta_stop")``, then
        ``backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01")``.
        Combos already marked DONE are skipped; unfinished ones continue from
        their checkpointed cursor.
    On quota exhaustion or network errors the cursor is kept and the stop flag
    is set; re-running .delay resumes from the checkpoint.

    Args:
        combos: list of [o_keywords, o_search] pairs. If None, derived from the
            Paper table: None is normalised to "", pairs are de-duplicated and
            sorted so resumed runs see the same order.
    """
    if cache.get(BACKFILL_STOP_KEY):
        return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"

    if combos is None:
        unique_pairs = set()
        for raw_kw, raw_search in Paper.objects.values_list("o_keywords", "o_search").distinct():
            if raw_kw or raw_search:
                unique_pairs.add((raw_kw or "", raw_search or ""))
        combos = [list(pair) for pair in sorted(unique_pairs)]

    def cursor_key_for(kw, search):
        return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"

    total = len(combos)
    # Advance past combos an earlier run already finished.
    while combo_index < total:
        kw, search = combos[combo_index]
        if cache.get(cursor_key_for(kw, search)) != "DONE":
            break
        combo_index += 1
    else:
        return f"backfill done: {total} combos, from {from_publication_date}"

    cursor_key = cursor_key_for(kw, search)
    query = Works().filter(
        has_doi=True, type="article", from_publication_date=from_publication_date,
    )
    if to_publication_date:
        query = query.filter(to_publication_date=to_publication_date)
    query = _apply_keyword_search(query, kw, search)

    progress = f"combo {combo_index}/{total}"
    try:
        _crawl_openalex_query(query, kw, search, cache_key=cursor_key,
                              stop_key=BACKFILL_STOP_KEY)
    except Exception as exc:
        # Quota / network error: the cursor is already checkpointed, so raise the
        # stop flag and wait for a manual resume.
        cache.set(BACKFILL_STOP_KEY, True, timeout=None)
        return (f"{progress} interrupted: {exc!r}; cursor saved. "
                f"clear {BACKFILL_STOP_KEY} then re-run to resume")

    if cache.get(cursor_key) != "DONE":
        # Interrupted by the stop flag before finishing; cursor kept for resume.
        return f"{progress} paused; cursor saved"

    # This combo is complete: chain the next one.
    current_app.send_task(
        "apps.resm.tasks.backfill_paper_meta_from_openalex",
        kwargs=dict(
            from_publication_date=from_publication_date,
            to_publication_date=to_publication_date,
            combo_index=combo_index + 1,
            combos=combos,
        ),
        countdown=3,
    )
    return f"{progress} done ({kw!r},{search!r}) -> next"
SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"


def _build_paper_from_sd_result(r, qs_text: str):
    """Map one ScienceDirect Search result onto a new, unsaved Paper.

    SD Search returns fewer fields than OpenAlex: no oa_url, abstract or
    openalex_id. Results missing a DOI, a title or a parseable 4-digit year
    (from ``publicationDate``, expected as YYYY-MM-DD) return None so the
    caller can skip them.

    The full date is stored only when it is exactly 10 characters long. The
    first author is the author with the smallest ``order`` value (missing order
    counts as 1), because SD may omit some authors.
    """
    doi, title = r.get("doi"), r.get("title")
    if not (doi and title):
        return None

    pub_date = r.get("publicationDate")  # YYYY-MM-DD
    try:
        year = int(str(pub_date)[:4]) if pub_date else None
    except (ValueError, TypeError):
        year = None
    if year is None:
        return None

    paper = Paper()
    paper.id = idWorker.get_id()
    paper.source = "elsevier"
    paper.type = "article"
    paper.o_search = qs_text
    paper.doi = str(doi).replace("https://doi.org/", "")
    paper.title = title
    full_date_len = len(str(pub_date))
    paper.publication_date = pub_date if full_date_len == 10 else None
    paper.publication_year = year
    paper.publication_name = r.get("sourceTitle")

    authors = r.get("authors") or []
    if isinstance(authors, list) and authors:
        def author_rank(author):
            return author.get("order", 1) if isinstance(author, dict) else 1

        lead = min(authors, key=author_rank)
        if isinstance(lead, dict):
            paper.first_author = lead.get("name")

    paper.is_oa = bool(r.get("openAccess"))
    return paper
def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
                          max_results: int = 200):
    """Query ScienceDirect Search (PUT) for Elsevier articles loaded after ``loaded_after``.

    Pages through results ``show`` at a time, sorted by date. It stops at an
    empty or short page, or once ``offset`` reaches ``max_results``. When
    ``max_results`` is not a multiple of ``show``, up to one extra page may be
    returned.

    Returns:
        ``(papers, err_msg)``: unsaved Paper instances plus an error string.
        A non-empty ``err_msg`` means the query failed part-way; the papers
        collected before the failure are still returned.
    """
    headers = {
        **ELSEVIER_HEADERS,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    papers = []
    offset = 0
    with requests.Session() as req:
        while offset < max_results:
            body = {
                "qs": qs_text,
                "loadedAfter": loaded_after,
                "display": {"offset": offset, "show": show, "sortBy": "date"},
            }
            try:
                res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
                              headers=headers, timeout=(3, 30))
            except requests.RequestException as e:
                return papers, f"elsevier_search_request_error: {e}"
            if res.status_code in (401, 403):
                return papers, f"elsevier_search_no_entitlement: {res.status_code}"
            if res.status_code != 200:
                return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
            try:
                payload = res.json()
            except ValueError:
                # A 200 with a non-JSON body (e.g. an HTML error page) used to
                # raise out of the whole task; report it like other errors.
                return papers, f"elsevier_search_bad_json: {res.text[:200]}"
            results = (payload.get("results") if isinstance(payload, dict) else None) or []
            # On an empty result set SD may return a single {"error": ...}; treat as end.
            if not results or (
                len(results) == 1 and isinstance(results[0], dict) and results[0].get("error")
            ):
                break
            for r in results:
                if not isinstance(r, dict):
                    continue
                paper = _build_paper_from_sd_result(r, qs_text)
                if paper is not None:
                    papers.append(paper)
            if len(results) < show:
                break
            offset += show
    return papers, ""
@shared_task(base=CustomTask)
def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
    """ScienceDirect Search incremental top-up, complementing the OpenAlex update.

    The task:
    - Replays every plain-text query already stored in ``o_search``. OpenAlex
      keyword ids are meaningless to SD, so ``o_keywords`` is not used.
    - Uses ``loadedAfter`` to fetch Elsevier journal articles loaded in the last
      ``days`` days.

    It covers only ScienceDirect's own content (essentially DOI prefix 10.1016)
    and makes up for OpenAlex's indexing lag on new Elsevier papers.

    Scheduled once a day by django-celery-beat. Stops early if the key turns
    out to have no Search entitlement.
    """
    loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
    raw_texts = (
        Paper.objects.exclude(o_search__isnull=True)
        .exclude(o_search="")
        .order_by()  # model default ordering would otherwise defeat DISTINCT
        .values_list("o_search", flat=True)
        .distinct()
    )
    # De-duplicate *after* stripping ("foo" and "foo " are the same query),
    # keeping first-seen order.
    qs_texts = list(dict.fromkeys(s.strip() for s in raw_texts if s and s.strip()))
    before = Paper.objects.count()
    msgs = []
    for qs_text in qs_texts:
        papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
        if papers:
            Paper.objects.bulk_create(papers, ignore_conflicts=True)
        if err:
            msgs.append(f"{qs_text}: {err}")
            # Without Search entitlement every later query fails too; stop here.
            if "no_entitlement" in err:
                break
    new_papers = Paper.objects.count() - before
    return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
            f"since={loaded_after}, errs={msgs}")
# Common browser User-Agent strings. Each is a complete UA (platform, engine and
# browser tokens). The previous entries stopped after "AppleWebKit/…", which
# matches no real browser and is easy for bot filters to flag.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 "
    "(KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1",
]


def get_random_headers():
    """Return browser-like request headers for a PDF download with a random User-Agent."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "application/pdf, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Referer": "https://www.google.com/",
    }
def show_task_run(def_name: str):
    """Return the run switch stored in cache under the task name *def_name*.

    When no value is cached the task is allowed to run (defaults to True).
    """
    enabled = cache.get(def_name, True)
    return enabled
@shared_task(base=CustomTask)
def get_pdf_from_openalex(number_of_task: int = 10):
    """Download OA PDFs from content.openalex.org in batches, re-queuing itself.

    Picks up to ``number_of_task`` OA papers without full text that are not
    being downloaded and have no previous ``openalex_pdf_*`` failure. It stops
    the batch early when the "openalex_api_exceed" cache flag is raised. It
    re-schedules itself in 2 s, or in 30 min while credits are exhausted, as
    long as its run switch is on.

    Returns:
        "stoped" / "done", or ``(last message, number of papers now having a PDF)``.
    """
    def_name = get_pdf_from_openalex.name
    if not show_task_run(def_name):
        return "stoped"
    # Exclude every openalex_pdf_* failure ("..._not_found" and "..._error"), not
    # just not_found. The slice is unordered, so permanently failing papers used
    # to be refetched in every batch, forever.
    qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude(
        fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_")[:number_of_task]
    if not qs.exists():
        return "done"
    count = 0
    msg = ""
    for paper in qs:
        if not show_task_run(def_name):
            break
        paper.fetch("downloading")
        try:
            msg = save_pdf_from_openalex(paper)
        finally:
            # Always release the lock, even if the download raised.
            paper.fetch_end()
        if paper.has_fulltext_pdf:
            count += 1
        if cache.get("openalex_api_exceed"):
            break
    # Retry in 30 minutes once credits are exhausted.
    countdown = 30 * 60 if cache.get("openalex_api_exceed") else 2
    if show_task_run(def_name):
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_openalex",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=countdown,
        )
    return msg, count
@shared_task(base=CustomTask)
def get_abstract_from_elsevier(number_of_task: int = 20, exclude_failed: bool = True):
    """Fetch abstracts, plus full-text XML when present, for 10.1016 DOIs.

    Uses the Elsevier Article Retrieval API. Picks up to ``number_of_task``
    random papers that:
    - have ``has_abstract=False``,
    - have a DOI starting with ``10.1016``,
    - are not currently downloading.
    It then re-queues itself while its run switch is on.

    Args:
        number_of_task: batch size per run.
        exclude_failed: True -> only papers with no fail_reason at all;
            False -> also papers whose fail_reason doesn't mention ``elsevier_``.

    On HTTP 429 (quota / rate limit) the batch stops without marking the paper
    failed and the next run is delayed by 30 minutes. Previously each paper got
    a permanent fail_reason, hiding it from the default (exclude_failed) query.

    Returns:
        "stoped", "done", or a summary of abstracts / full texts obtained.
    """
    def_name = get_abstract_from_elsevier.name
    if not show_task_run(def_name):
        return "stoped"
    qs = Paper.objects.filter(has_abstract=False)
    if exclude_failed:
        qs = qs.filter(fail_reason=None)
    else:
        qs = qs.exclude(fail_reason__contains="elsevier_")
    qs = qs.exclude(fetch_status="downloading"
                    ).filter(doi__startswith="10.1016").order_by("?")
    if not qs.exists():
        return "done"
    params = {
        "httpAccept": "text/xml"
    }
    # XML namespaces of the Elsevier article response (loop-invariant).
    ns = {"dc": "http://purl.org/dc/elements/1.1/",
          "ce": "http://www.elsevier.com/xml/common/dtd",
          "xocs": "http://www.elsevier.com/xml/xocs/dtd"}
    err_msg = ""
    count_abs = 0
    count_fulltext = 0
    rate_limited = False
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            if paper.fetch_status == "downloading":
                continue
            paper.fetch(status="downloading")
            try:
                try:
                    res = req.get(
                        f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                        params=params,
                        headers=ELSEVIER_HEADERS,
                        timeout=(3, 15)
                    )
                except requests.RequestException:
                    err_msg = "elsevier_request_error"
                    break
                if res.status_code == 200:
                    xml_str = res.text
                    try:
                        root = etree.fromstring(xml_str.encode("utf-8"))
                    except etree.XMLSyntaxError:
                        paper.save_fail_reason("elsevier_xml_error")
                        continue
                    abstract = root.xpath("//dc:description/text()", namespaces=ns)
                    if not abstract:
                        paper.save_fail_reason("elsevier_abstract_not_found")
                        continue
                    PaperAbstract.objects.update_or_create(
                        paper=paper,
                        defaults={
                            "abstract": abstract[0].strip(),
                            "source": "elsevier"
                        }
                    )
                    paper.has_abstract = True
                    paper.has_abstract_xml = True
                    count_abs += 1
                    # Full text = any <ce:para>, or else a xocs:rawtext longer than 2000 chars.
                    has_fulltext = len(root.xpath("//ce:para", namespaces=ns)) > 0
                    if not has_fulltext:
                        rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
                        if rawtexts and len(rawtexts[0].strip()) > 2000:
                            has_fulltext = True
                    if has_fulltext:
                        paper.has_fulltext = True
                        paper.has_fulltext_xml = True
                        count_fulltext += 1
                        # NOTE(review): restored from an unindented copy; confirm the XML
                        # file is meant to be saved only when full text was found.
                        paper.save_file_xml(xml_str)
                    paper.save(update_fields=["has_abstract",
                                              "has_abstract_xml", "has_fulltext",
                                              "has_fulltext_xml", "update_time"])
                elif res.status_code == 404:
                    paper.save_fail_reason("elsevier_doi_not_found")
                elif res.status_code == 429:
                    # Quota / rate limit is transient: don't mark the paper, stop the batch.
                    err_msg = f"elsevier_response_error: {res.status_code} {res.text}"
                    rate_limited = True
                    break
                else:
                    err_msg = f"elsevier_response_error: {res.status_code} {res.text}"
                    paper.save_fail_reason(f"elsevier_response_error: {res.status_code}")
            finally:
                paper.fetch_end()
    if show_task_run(def_name):
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
                "exclude_failed": exclude_failed
            },
            countdown=30 * 60 if rate_limited else 5,
        )
    return f'{err_msg}, get {count_abs} abstracts, {count_fulltext} fulltexts'
@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
    """Download Elsevier PDFs for papers that already have full text and an abstract.

    Processes up to ``number_of_task`` papers per run and re-queues itself while
    papers remain and its run switch is on.

    Failures are recorded in fail_reason ("elsevier_pdf_error: <status>" /
    "elsevier_pdf_not_pdf") and those papers are excluded from the queryset.
    Previously nothing was recorded, so the same unordered head of the queryset
    was refetched forever and the chain never ended.

    The request now sends ELSEVIER_HEADERS (API key + institution token) like
    every other Elsevier call in this module. HTTP 429 stops the batch and
    delays the next run by 30 minutes.

    Returns:
        "stoped", or a summary with the last error and the remaining count.
    """
    def_name = get_pdf_from_elsevier.name
    if not show_task_run(def_name):
        return "stoped"
    qs = Paper.objects.filter(
        has_fulltext=True, has_fulltext_pdf=False, has_abstract=True
    ).exclude(fail_reason__contains="elsevier_pdf_")
    params = {
        "httpAccept": "application/pdf"
    }
    err_msg = ""
    rate_limited = False
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    headers=ELSEVIER_HEADERS,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 429:
                # Quota / rate limit: transient, don't mark the paper.
                err_msg = "elsevier_response_error: 429"
                rate_limited = True
                break
            if res.status_code != 200:
                paper.save_fail_reason(f"elsevier_pdf_error: {res.status_code}")
                continue
            # Accept only a real PDF: %PDF magic bytes or a PDF content-type, and at least 1 KiB.
            is_pdf = (
                res.content.startswith(b'%PDF') or
                res.headers.get("content-type", "").startswith("application/pdf")
            )
            if is_pdf and len(res.content) > 1024:
                paper.save_file_pdf(res.content)
                paper.has_fulltext_pdf = True
                paper.save(update_fields=["has_fulltext_pdf", "update_time"])
            else:
                paper.save_fail_reason("elsevier_pdf_not_pdf")
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=30 * 60 if rate_limited else 5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'
def get_actual_running_count():
    """Return how many papers currently have fetch_status 'downloading'."""
    downloading = Paper.objects.filter(fetch_status='downloading')
    return downloading.count()
def can_send_more(max_running):
    """Return True while fewer than *max_running* papers are in the 'downloading' state."""
    running_now = get_actual_running_count()
    return running_now < max_running
@shared_task(base=CustomTask)
def send_download_fulltext_task(number_of_task=100):
    """Queue download_pdf tasks for a random batch of OA papers still lacking full text.

    Only papers with no fail_reason that are not already downloading are
    considered. Sending stops once ``number_of_task`` papers are in the
    'downloading' state. Successive tasks are staggered by 1 second each
    (countdown 0, 1, 2, …) to limit concurrent requests.

    Returns:
        "done" when nothing is pending, otherwise how many tasks were sent.
    """
    pending = Paper.objects.filter(has_fulltext=False, fail_reason=None, is_oa=True).exclude(
        fetch_status='downloading'
    )
    if not pending.exists():
        return "done"
    sent = 0
    for paper in pending.order_by("?")[:number_of_task]:
        if not can_send_more(number_of_task):
            break
        current_app.send_task(
            "apps.resm.tasks.download_pdf",
            kwargs={"paper_id": paper.id},
            countdown=sent,  # 1 second apart per task
        )
        sent += 1
    return f"sent {sent} download_pdf tasks"
@shared_task(base=CustomTask)
def release_working_paper(minutes=10):
    """Release papers stuck in 'downloading' whose update_time is older than *minutes*.

    Presumably cleans up after workers that died mid-download. The age filter
    now runs in the database against a single cutoff, instead of loading every
    downloading row and recomputing now() per row in Python. Each matching
    paper is released through ``fetch_end()``.

    Returns:
        A summary string with the number of released papers.
    """
    cutoff = timezone.now() - timedelta(minutes=minutes)
    stale = Paper.objects.filter(fetch_status="downloading", update_time__lt=cutoff)
    count = 0
    for paper in stale:
        paper.fetch_end()
        count += 1
    return f"release {count} papers"
@shared_task(base=CustomTask)
def download_pdf(paper_id):
    """Download the PDF for a single paper, trying the available sources in turn.

    Sources:
    - If ``oa_url`` points at a 10.1016 DOI, the Elsevier API is used;
      otherwise the OA URL itself.
    - Papers published in or before 2021 that still lack a PDF then fall back
      to Sci-Hub by DOI.
    If nothing worked and no fail_reason was recorded, the last message is saved
    as fail_reason.

    Returns:
        ``(message, source_tried_last)``, or None when another worker already
        holds the paper.

    Raises:
        Paper.DoesNotExist: if *paper_id* is unknown. Previously this was masked
        by an UnboundLocalError from the finally block.
    """
    paper = Paper.objects.get(id=paper_id)
    if paper.fetch_status == "downloading":
        # Another worker holds this paper. Return *without* fetch_end(); the old
        # code released the other worker's lock here.
        return
    try:
        paper.fetch("downloading")
        msg = "no_method_to_get_pdf"
        current_from = ""
        if paper.oa_url:
            if "https://doi.org/10.1016" in paper.oa_url:
                current_from = "elsevier"
                msg = save_pdf_from_elsevier(paper)
            else:
                current_from = "oa_url"
                msg = save_pdf_from_oa_url(paper)
        if (paper.has_fulltext_pdf is False
                and paper.publication_year is not None
                and paper.publication_year <= 2021):
            current_from = "scihub"
            msg = save_pdf_from_scihub(paper)
        if paper.fail_reason is None and paper.has_fulltext_pdf is False:
            paper.save_fail_reason(msg)
        return msg, current_from
    finally:
        paper.fetch_end()
def save_pdf_from_oa_url(paper: Paper):
    """Download ``paper.oa_url`` as a PDF, trying progressively heavier strategies.

    1. Plain ``requests`` GET with randomized browser-like headers. A
       200/201/202 response is accepted if it looks like a PDF (``%PDF`` magic,
       a PDF content-type, or application/octet-stream) and is larger than
       1 KiB. Any other 200/201/202 body is recorded as ``oa_url_not_pdf``.
    2. ``download_pdf_with_curl_cffi`` (intended for Cloudflare JS challenges).
    3. Playwright as the final fallback.

    Strategies 2 and 3 also run when strategy 1 fails at the transport level
    (timeout, TLS or connection error). Previously such an error returned
    immediately and the fallbacks never ran.

    Returns:
        "success", or an error string; on failure the reason is also stored via
        save_fail_reason.
    """
    from .d_oaurl import download_pdf_with_curl_cffi, download_from_url_playwright

    # Strategy 1: direct request.
    try:
        res = requests.get(paper.oa_url, headers=get_random_headers(), timeout=(3, 15))
    except requests.RequestException:
        res = None  # fall through to the heavier strategies below
    if res is not None and res.status_code in [200, 201, 202]:
        content_type = res.headers.get("content-type", "")
        # PDF check: %PDF magic bytes or content-type.
        is_pdf = (
            res.content.startswith(b'%PDF') or
            content_type.startswith("application/pdf") or
            content_type == "application/octet-stream"
        )
        if is_pdf and len(res.content) > 1024:  # at least 1 KiB
            paper.save_file_pdf(res.content, save_obj=True)
            return "success"
        paper.save_fail_reason("oa_url_not_pdf")
        return "oa_url_not_pdf"

    # Strategy 2: curl-cffi (handles Cloudflare JS challenges).
    paper_path = paper.init_paper_path("pdf")
    is_ok, err_msg = download_pdf_with_curl_cffi(paper.oa_url, paper_path)
    if not is_ok:
        # Strategy 3: Playwright as the last resort.
        is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path))
    if is_ok:
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
        return "success"
    paper.save_fail_reason(f"oa_url_all_methods_failed: {err_msg}")
    return f"oa_url_all_methods_failed: {err_msg}"
def save_pdf_from_openalex(paper: Paper):
    """Download a paper's PDF from content.openalex.org (consumes OpenAlex credits).

    Behaviour:
    - Skips the request while the "openalex_api_exceed" cache flag is set.
    - On a 429 whose message mentions "Insufficient credits", sets that flag
      for one hour.
    - On 404 and other errors, records a fail_reason.
    - Papers without an ``openalex_id`` (e.g. rows created from ScienceDirect
      search) are marked not found without spending a request. Previously they
      requested ".../works/None.pdf".

    The request has a timeout so a stalled download can no longer hang the
    worker indefinitely.

    Returns:
        "success", or an error string.
    """
    if cache.get("openalex_api_exceed"):
        return "find cache Insufficient credits"
    if not paper.openalex_id:
        paper.save_fail_reason("openalex_pdf_not_found")
        return "openalex_pdf_error: no openalex_id"
    try:
        res = requests.get(
            url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
            params={
                "api_key": OPENALEX_KEY
            },
            timeout=(3, 15),
        )
    except requests.RequestException as e:
        return f"openalex_pdf_error: {str(e)}"
    if res.status_code == 200:
        paper.save_file_pdf(res.content, save_obj=True)
        return "success"
    elif res.status_code == 429:
        # The 429 body may not be JSON; don't let decoding crash the task.
        try:
            body = res.json()
        except ValueError:
            body = None
        message = str(body.get("message", "")) if isinstance(body, dict) else res.text
        if "Insufficient credits" in message:
            cache.set("openalex_api_exceed", True, timeout=3600)
        return "openalex_pdf_error: Insufficient credits"
    elif res.status_code == 404:
        paper.save_fail_reason("openalex_pdf_not_found")
        return "openalex_pdf_error: 404"
    else:
        paper.save_fail_reason("openalex_pdf_error")
        return f"openalex_pdf_error: {res.status_code} {res.text}"
def save_pdf_from_elsevier(paper: Paper):
    """Download a paper's PDF through the Elsevier Article Retrieval API.

    The body is saved only if it is really a PDF: ``%PDF`` magic bytes or a PDF
    content-type, and larger than 1 KiB. This is the same check
    get_pdf_from_elsevier uses. Previously any 200 body (e.g. an XML/HTML
    response) was stored as the paper's PDF.

    Error text is truncated to 200 characters because download_pdf stores the
    returned message as fail_reason.

    Returns:
        "success", or an error string.
    """
    params = {
        "httpAccept": "application/pdf",
    }
    try:
        res = requests.get(
            f"https://api.elsevier.com/content/article/doi/{paper.doi}",
            params=params,
            headers=ELSEVIER_HEADERS,
            timeout=(3, 15)
        )
    except requests.RequestException as e:
        return f"elsevier_request_error: {str(e)}"
    if res.status_code != 200:
        return f"elsevier_status_error: {res.status_code} {res.text[:200]}"
    is_pdf = (
        res.content.startswith(b'%PDF') or
        res.headers.get("content-type", "").startswith("application/pdf")
    )
    if not (is_pdf and len(res.content) > 1024):
        return "elsevier_not_pdf"
    paper.save_file_pdf(res.content, save_obj=True)
    return "success"
def save_pdf_from_scihub(paper: Paper):
    """Download a paper's PDF from Sci-Hub by DOI into the paper's PDF path.

    On success, sets has_fulltext / has_fulltext_pdf and saves. "update_time"
    is now included in update_fields, as in every other save in this module,
    so the timestamp is refreshed. On failure, records the downloader's error
    as fail_reason.

    Returns:
        "success", or the downloader's error message.
    """
    from .d_scihub import download_paper_by_doi
    is_ok, err_msg = download_paper_by_doi(paper.doi, paper.init_paper_path("pdf"))
    if not is_ok:
        paper.save_fail_reason(err_msg)
        return err_msg
    paper.has_fulltext = True
    paper.has_fulltext_pdf = True
    paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
    return "success"
# https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true