# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from pyalex import Works, config from apps.resm.models import Paper, PaperAbstract, PaperMonitor from apps.utils.snowflake import idWorker from django.core.cache import cache import requests from lxml import etree from celery import current_app from datetime import datetime, timedelta import random from .d_oaurl import download_from_url_playwright import asyncio import sys import os from django.db.models import Q from django.utils import timezone from django.conf import settings # 凭证集中在 config/conf.py (经 settings 的 `from config.conf import *` 暴露) config.email = settings.OPENALEX_EMAIL config.max_retries = 0 config.retry_backoff_factor = 0.1 config.retry_http_codes = [429, 500, 503] config.api_key = settings.OPENALEX_API_KEY OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY # content.openalex.org PDF 下载 ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY ELSEVIER_HEADERS = { "X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN, "X-ELS-APIKey": ELSEVIER_APIKEY, } def run_async(coro): """ 跨平台运行异步任务,解决 Windows 上 asyncio subprocess 问题 """ if sys.platform == 'win32': # Windows 上需要使用 ProactorEventLoop 来支持 subprocess policy = asyncio.WindowsProactorEventLoopPolicy() asyncio.set_event_loop_policy(policy) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() else: # Unix/Linux/Mac 上使用默认方式 return asyncio.run(coro) # OpenAlex 元数据抓取统一选择的字段,全量抓取与增量更新共用 OPENALEX_SELECT_FIELDS = [ "id", "doi", "title", "publication_date", "open_access", "authorships", "primary_location", "publication_year", "display_name", "content_urls" ] def _apply_keyword_search(pager, keywords: str, search: str): """把 keywords / search 过滤条件套到 pyalex 的 pager 上。 keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter。 """ if keywords: if "|" in keywords: keywords_list = [k.strip() for k in keywords.split("|") if k.strip()] pager = pager.filter_or(keywords={"id": keywords_list}) elif "," in keywords: keywords_list = [k.strip() for k in keywords.split(",") if k.strip()] pager = pager.filter(keywords={"id": keywords_list}) else: pager = pager.filter(keywords={"id": [keywords.strip()]}) if search: pager = pager.search_filter(title_and_abstract=search) return pager def _build_paper_from_record(record, keywords: str, search: str) -> Paper: """把 OpenAlex 单条 record 映射成未保存的 Paper 实例。""" paper = Paper() paper.id = idWorker.get_id() paper.o_keywords = keywords paper.o_search = search paper.source = "openalex" paper.type = "article" paper.openalex_id = record["id"].split("/")[-1] paper.doi = record["doi"].replace("https://doi.org/", "") paper.title = record["display_name"] paper.publication_date = record["publication_date"] paper.publication_year = record["publication_year"] if record["open_access"]: paper.is_oa = record["open_access"]["is_oa"] paper.oa_url = record["open_access"]["oa_url"] if record["authorships"]: paper.first_author = record["authorships"][0]["author"]["display_name"] if record["authorships"][0]["institutions"]: paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"] if record["primary_location"] and record["primary_location"]["source"]: paper.publication_name = record["primary_location"]["source"]["display_name"] return paper def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None, stop_key: str = None, n_max: int = None) -> int: """OpenAlex 单查询抓取核心,全量 / 增量 / 回补共用,保证 checkpoint 行为一致。 base_pager: 已套好 filter 的 pyalex pager(年/日期/收录日期等过滤由调用方决定)。 cache_key: 给定则逐页把「下一页游标」checkpoint 进去(抓完写 "DONE",已 DONE 直接 跳过)→ 真·断点续传;为 None 则不落游标,每次从头扫(适合短窗口增量)。 stop_key: 给定且命中时,当前页抓完即停(应对配额耗尽 / 手动暂停),游标已 checkpoint。 返回本次处理到的 record 条数。 """ if cache_key: start = cache.get(cache_key, "*") if start == "DONE": return 0 else: start = "*" pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate( per_page=200, n_max=n_max, cursor=start) seen = 0 for page in pager: papers = [ _build_paper_from_record(record, keywords, search) for record in page if record["doi"] and (record["display_name"] or record["title"]) ] # ignore_conflicts:openalex_id / doi 已存在的自动跳过,只插新的 Paper.objects.bulk_create(papers, ignore_conflicts=True) seen += len(papers) nv = pager._next_value # pyalex 在取完每页后才更新,None 表示已到末尾 if cache_key: cache.set(cache_key, nv if nv is not None else "DONE", timeout=None) if nv is None or len(page) == 0: if cache_key: cache.set(cache_key, "DONE", timeout=None) break if stop_key and cache.get(stop_key): break return seen @shared_task(base=CustomTask) def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None): if not (keywords or search): raise Exception("keywords or search must be provided") cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}" base = Works().filter( publication_year=publication_year, has_doi=True, type="article" ) base = _apply_keyword_search(base, keywords, search) _crawl_openalex_query(base, keywords, search, cache_key=cache_key, stop_key="get_paper_meta_from_openalex_stop") if cache.get("get_paper_meta_from_openalex_stop", None) is None: if end_year is None: end_year = datetime.now().year if publication_year + 1 <= end_year: current_app.send_task( "apps.resm.tasks.get_paper_meta_from_openalex", kwargs={ "publication_year": publication_year + 1, "keywords": keywords, "search": search, "end_year": end_year }, countdown=5 ) def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str, per_combo_max: int = None) -> int: """针对单个 (keywords, search) 组合,拉取 OpenAlex 中 publication_date 在 from_publication_date 之后发表的论文。短窗口扫描,不落游标(每次从头扫)。 注:更贴切的「按收录时间增量」要用 from_created_date / from_updated_date,但这两个 过滤项需 OpenAlex Premium(当前 key 返回 "Plan upgrade required"),故退而用公开可用的 from_publication_date(发表日期,天级粒度)。代价:论文常在发表后若干天才被 OpenAlex 收录,所以窗口要留得比调度间隔大,否则会漏掉「晚收录」的文。 """ base = Works().filter( has_doi=True, type="article", from_publication_date=from_publication_date, ) base = _apply_keyword_search(base, keywords, search) return _crawl_openalex_query(base, keywords, search, n_max=per_combo_max) @shared_task(base=CustomTask) def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None): """自动增量更新期刊论文索引(resm_paper)的主任务。 遍历库里已抓过的每个 (o_keywords, o_search) 查询组合,用 OpenAlex 的 from_publication_date 过滤拉取最近 days 天内"发表"的论文并入库。 (理想是按收录时间 from_created_date / from_updated_date 增量,但需 Premium, 当前 key 无权限,故用公开可用的 from_publication_date。) bulk_create(ignore_conflicts=True) 保证不重复。days 默认 30:论文常在发表后数天到 数周才被 OpenAlex 收录,窗口要足够大覆盖这个滞后,否则会漏「晚收录」的文。 由 django-celery-beat 每天调度一次。 """ from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat() combos = list(Paper.objects.values_list("o_keywords", "o_search").distinct()) before = Paper.objects.count() n_combos = 0 for o_keywords, o_search in combos: o_keywords = o_keywords or "" o_search = o_search or "" if not o_keywords and not o_search: continue n_combos += 1 _fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max) new_papers = Paper.objects.count() - before return f"openalex update: combos={n_combos}, new_papers={new_papers}, since={from_publication_date}" BACKFILL_STOP_KEY = "backfill_paper_meta_stop" @shared_task(base=CustomTask) def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None, combo_index: int = 0, combos=None): """按发表日期一次性回补论文索引,支持断点续传(应对 OpenAlex 配额限制)。 本任务只负责策略:抓哪些查询组合、按什么发表日期区间、怎么 chain。 具体的分页抓取与游标 checkpoint 复用通用核心 _crawl_openalex_query。 顺序逐个 (o_keywords, o_search) 组合处理,一个组合抓完再 chain 下一个, 单个 task 短小、可随时停、从断点续。 停 / 续(应对配额): - 暂停: cache.set("backfill_paper_meta_stop", True) # 当前页抓完即停,不再 chain - 续跑: cache.delete("backfill_paper_meta_stop") 后重新 backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01") 已 DONE 的组合自动跳过,未完成的从上次 checkpoint 的游标继续。 配额耗尽 / 网络异常时也会保留游标并置停标志,重新 .delay 即从断点继续。 """ if cache.get(BACKFILL_STOP_KEY): return "paused (backfill_paper_meta_stop set); clear it then re-run to resume" if combos is None: # None 归一成 "" 并去重;排序保证每次续跑组合顺序一致 combos = [list(c) for c in sorted({ (c[0] or "", c[1] or "") for c in Paper.objects.values_list("o_keywords", "o_search").distinct() if (c[0] or c[1]) })] def ckey(kw, search): return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}" # 跳过已完成的组合 while combo_index < len(combos): kw, search = combos[combo_index] if cache.get(ckey(kw, search)) == "DONE": combo_index += 1 continue break if combo_index >= len(combos): return f"backfill done: {len(combos)} combos, from {from_publication_date}" kw, search = combos[combo_index] cursor_key = ckey(kw, search) base = Works().filter( has_doi=True, type="article", from_publication_date=from_publication_date, ) if to_publication_date: base = base.filter(to_publication_date=to_publication_date) base = _apply_keyword_search(base, kw, search) try: _crawl_openalex_query(base, kw, search, cache_key=cursor_key, stop_key=BACKFILL_STOP_KEY) except Exception as e: # 配额耗尽 / 网络等:游标已 checkpoint,置停标志,等手动续跑 cache.set(BACKFILL_STOP_KEY, True, timeout=None) return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. " f"clear {BACKFILL_STOP_KEY} then re-run to resume") if cache.get(cursor_key) != "DONE": # 被 stop_key 打断,没抓完;游标已保留,等续跑 return f"combo {combo_index}/{len(combos)} paused; cursor saved" # 该组合已抓完,chain 下一个 current_app.send_task( "apps.resm.tasks.backfill_paper_meta_from_openalex", kwargs={ "from_publication_date": from_publication_date, "to_publication_date": to_publication_date, "combo_index": combo_index + 1, "combos": combos, }, countdown=3, ) return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next" @shared_task(base=CustomTask) def monitor_papers(monitor_id: str = None): """期刊 / 关键词监控:遍历启用的 PaperMonitor,按 type 拼 OpenAlex 过滤,用 from_publication_date 拉最近 days 天的最新论文元数据入库。期刊与关键词监控共用本任务。 - journal: value=ISSN -> 过滤 primary_location.source.issn(o_keywords/o_search 留空) - search : value=英文搜索词 -> title_and_abstract.search(写回 o_search) - keyword: value=OpenAlex keyword id -> keywords.id(写回 o_keywords) 复用 _crawl_openalex_query(短窗口,cache_key=None 每次从头扫);bulk_create ignore_conflicts 自动与现有 Paper 去重。monitor_id 给定则只跑该条。由 beat 每天调度。 """ qs = PaperMonitor.objects.filter(is_active=True) if monitor_id: qs = qs.filter(id=monitor_id) results = [] for m in qs: from_pub = (timezone.now() - timedelta(days=m.days or 30)).date().isoformat() base = Works().filter(has_doi=True, type="article", from_publication_date=from_pub) kw, search = "", "" if m.type == PaperMonitor.TYPE_JOURNAL: base = base.filter(primary_location={"source": {"issn": m.value}}) elif m.type == PaperMonitor.TYPE_SEARCH: search = m.value base = _apply_keyword_search(base, "", search) elif m.type == PaperMonitor.TYPE_KEYWORD: kw = m.value base = _apply_keyword_search(base, kw, "") else: continue seen = _crawl_openalex_query(base, kw, search) m.last_run = timezone.now() m.last_count = seen m.save(update_fields=["last_run", "last_count", "update_time"]) results.append(f"{m.type}:{m.name or m.value}={seen}") return "; ".join(results) or "no active monitors" SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect" def _build_paper_from_sd_result(r, qs_text: str): """把 ScienceDirect Search 单条 result 映射成未保存的 Paper 实例。 SD Search 不返回 oa_url / 摘要 / openalex_id,字段比 OpenAlex 少。 缺 doi / title / 年份的直接返回 None 跳过(model 里这几个字段非空)。 """ doi = r.get("doi") title = r.get("title") if not doi or not title: return None pub_date = r.get("publicationDate") # YYYY-MM-DD year = None if pub_date: try: year = int(str(pub_date)[:4]) except (ValueError, TypeError): year = None if year is None: return None paper = Paper() paper.id = idWorker.get_id() paper.source = "elsevier" paper.type = "article" paper.o_search = qs_text paper.doi = str(doi).replace("https://doi.org/", "") paper.title = title paper.publication_date = pub_date if len(str(pub_date)) == 10 else None paper.publication_year = year paper.publication_name = r.get("sourceTitle") authors = r.get("authors") or [] if isinstance(authors, list) and authors: # 取 order 最小的作者作为第一作者(部分作者可能被省略) first = min(authors, key=lambda a: a.get("order", 1) if isinstance(a, dict) else 1) if isinstance(first, dict): paper.first_author = first.get("name") paper.is_oa = bool(r.get("openAccess")) return paper def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100, max_results: int = 200): """调用 ScienceDirect Search(PUT)拉取 loaded_after 之后新入库的 Elsevier 文章。 返回 (papers, err_msg)。err_msg 非空表示该 query 出错(已尽量带回已拿到的部分)。 """ headers = { **ELSEVIER_HEADERS, "Content-Type": "application/json", "Accept": "application/json", } papers = [] offset = 0 with requests.Session() as req: while offset < max_results: body = { "qs": qs_text, "loadedAfter": loaded_after, "display": {"offset": offset, "show": show, "sortBy": "date"}, } try: res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body, headers=headers, timeout=(3, 30)) except requests.RequestException as e: return papers, f"elsevier_search_request_error: {e}" if res.status_code in (401, 403): return papers, f"elsevier_search_no_entitlement: {res.status_code}" if res.status_code != 200: return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}" results = (res.json() or {}).get("results") or [] # 空结果时 SD 可能返回单个 {"error": ...},一并视为结束 if not results or (len(results) == 1 and results[0].get("error")): break for r in results: paper = _build_paper_from_sd_result(r, qs_text) if paper is not None: papers.append(paper) if len(results) < show: break offset += show return papers, "" @shared_task(base=CustomTask) def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200): """ScienceDirect Search 增量补充任务,作为 OpenAlex 增量的补充。 遍历库里已有的纯文本查询(o_search,OpenAlex 的 keyword id 无法用于 SD, 故只用 o_search),用 loadedAfter 拉取最近 days 天 Elsevier 新入库的期刊文章。 只覆盖 ScienceDirect 自家内容(基本是 DOI 10.1016),弥补 OpenAlex 对 Elsevier 新刊收录的延迟。由 django-celery-beat 每天调度一次。 """ loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ") qs_texts = [ s.strip() for s in Paper.objects.exclude(o_search__isnull=True) .exclude(o_search="").values_list("o_search", flat=True).distinct() if s and s.strip() ] before = Paper.objects.count() msgs = [] for qs_text in qs_texts: papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max) if papers: Paper.objects.bulk_create(papers, ignore_conflicts=True) if err: msgs.append(f"{qs_text}: {err}") # key 没 Search 权限,后续 query 也没意义,直接停 if "no_entitlement" in err: break new_papers = Paper.objects.count() - before return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, " f"since={loaded_after}, errs={msgs}") # 常用的 User-Agent 列表 USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15", ] def get_random_headers(): """获取随机的请求头""" return { "User-Agent": random.choice(USER_AGENTS), "Accept": "application/pdf, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Referer": "https://www.google.com/", } def show_task_run(def_name: str): return cache.get(def_name, True) @shared_task(base=CustomTask) def get_pdf_from_openalex(number_of_task: int =10): def_name = get_pdf_from_openalex.name if not show_task_run(def_name): return "stoped" count = 0 qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude( fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_not_found")[:number_of_task] if not qs.exists(): return "done" msg = "" for paper in qs: if not show_task_run(def_name): break paper.fetch("downloading") msg = save_pdf_from_openalex(paper) paper.fetch_end() if paper.has_fulltext_pdf: count += 1 if cache.get("openalex_api_exceed"): break countdown = 2 if cache.get("openalex_api_exceed"): countdown = 30 * 60 # 30分钟后重试 if show_task_run(def_name): current_app.send_task( "apps.resm.tasks.get_pdf_from_openalex", kwargs={ "number_of_task": number_of_task, }, countdown=countdown, ) return msg, count @shared_task(base=CustomTask) def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True): def_name = get_abstract_from_elsevier.name if not show_task_run(def_name): return "stoped" # qs = Paper.objects.filter(Q(has_abstract=False)|Q(has_fulltext_xml=False)) qs = Paper.objects.filter(has_abstract=False) # qs = qs.exclude( # fail_reason__contains="elsevier_doi_not_found" # ).exclude(fail_reason__contains="elsevier_abstract_not_found" # ) if exclude_failed: qs = qs.filter(fail_reason=None) else: qs = qs.exclude(fail_reason__contains="elsevier_") qs = qs.exclude(fetch_status="downloading" ).filter(doi__startswith="10.1016").order_by("?") if not qs.exists(): return "done" params = { "httpAccept": "text/xml" } err_msg = "" count_abs = 0 count_fulltext = 0 with requests.Session() as req: for paper in qs[:number_of_task]: if not show_task_run(def_name): break if paper.fetch_status == "downloading": continue paper.fetch(status="downloading") try: try: res = req.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, headers = ELSEVIER_HEADERS, timeout=(3, 15) ) except requests.RequestException: err_msg = "elsevier_request_error" break if res.status_code == 200: xml_str = res.text try: root = etree.fromstring(xml_str.encode("utf-8")) except etree.XMLSyntaxError: paper.save_fail_reason("elsevier_xml_error") continue ns = {"dc": "http://purl.org/dc/elements/1.1/", "ce": "http://www.elsevier.com/xml/common/dtd", "xocs": "http://www.elsevier.com/xml/xocs/dtd",} abstract = root.xpath("//dc:description/text()", namespaces=ns) if abstract: PaperAbstract.objects.update_or_create( paper=paper, defaults={ "abstract": abstract[0].strip(), "source": "elsevier" } ) paper.has_abstract = True paper.has_abstract_xml = True count_abs += 1 else: paper.save_fail_reason("elsevier_abstract_not_found") continue paras = root.xpath("//ce:para", namespaces=ns) has_fulltext = len(paras) > 0 if has_fulltext is False: rawtexts = root.xpath("//xocs:rawtext/text()",namespaces=ns) if rawtexts and len(rawtexts[0].strip()) > 2000: has_fulltext = True if has_fulltext: paper.has_fulltext = True paper.has_fulltext_xml = True count_fulltext += 1 paper.save_file_xml(xml_str) paper.save(update_fields=["has_abstract", "has_abstract_xml", "has_fulltext", "has_fulltext_xml", "update_time"]) elif res.status_code == 404: paper.save_fail_reason("elsevier_doi_not_found") else: err_msg = f"elsevier_response_error: {res.status_code} {res.text}" paper.save_fail_reason(f"elsevier_response_error: {res.status_code}") finally: paper.fetch_end() if show_task_run(def_name): current_app.send_task( "apps.resm.tasks.get_abstract_from_elsevier", kwargs={ "number_of_task": number_of_task, "exclude_failed": exclude_failed }, countdown=5, ) return f'{err_msg}, get {count_abs} abstracts, {count_fulltext} fulltexts' @shared_task(base=CustomTask) def get_pdf_from_elsevier(number_of_task=100): """ 获取elsevier全文 """ def_name = get_pdf_from_elsevier.name if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True) err_msg = "" with requests.Session() as req: for paper in qs[:number_of_task]: if not show_task_run(def_name): break params = { "apiKey": ELSEVIER_APIKEY, "httpAccept": "application/pdf" } try: res = req.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, timeout=(3, 15) ) except requests.RequestException: err_msg = "elsevier_request_error" break if res.status_code == 200: # 检查是否是PDF文件:检查魔数 %PDF 或 content-type is_pdf = ( res.content.startswith(b'%PDF') or res.headers.get("content-type", "").startswith("application/pdf") ) if is_pdf and len(res.content) > 1024: # 至少1KB paper.save_file_pdf(res.content) paper.has_fulltext_pdf = True paper.save(update_fields=["has_fulltext_pdf", "update_time"]) qs_count = qs.count() if show_task_run(def_name) and qs_count > 0: current_app.send_task( "apps.resm.tasks.get_pdf_from_elsevier", kwargs={ "number_of_task": number_of_task, }, countdown=5, ) return f'{def_name}, {err_msg}, remaining {qs_count} papers' def get_actual_running_count(): """获取实际在下载的任务数""" return Paper.objects.filter(fetch_status='downloading').count() def can_send_more(max_running): return get_actual_running_count() < max_running @shared_task(base=CustomTask) def send_download_fulltext_task(number_of_task=100): qs = Paper.objects.filter(has_fulltext=False, fail_reason=None, is_oa=True).exclude( fetch_status='downloading' ) if not qs.exists(): return "done" qs0 = qs.order_by("?") # 分批发送任务,控制并发数量,避免过多请求 task_count = 0 for paper in qs0[:number_of_task]: if not can_send_more(number_of_task): break # 使用 countdown 错开请求时间,避免过多并发 countdown = task_count * 1 # 每个任务间隔1秒 current_app.send_task( "apps.resm.tasks.download_pdf", kwargs={ "paper_id": paper.id, }, countdown=countdown, ) task_count += 1 return f"sent {task_count} download_pdf tasks" @shared_task(base=CustomTask) def release_working_paper(minutes=10): qs = Paper.objects.filter(fetch_status="downloading") count = 0 for paper in qs: if paper.update_time < timezone.now() - timedelta(minutes=minutes): paper.fetch_end() count += 1 return f"release {count} papers" @shared_task(base=CustomTask) def download_pdf(paper_id): """ 下载单个论文的PDF """ try: paper = Paper.objects.get(id=paper_id) if paper.fetch_status == "downloading": return paper.fetch("downloading") msg = "no_method_to_get_pdf" current_from = "" if paper.oa_url: if "https://doi.org/10.1016" in paper.oa_url: current_from = "elsevier" msg = save_pdf_from_elsevier(paper) else: current_from = "oa_url" msg = save_pdf_from_oa_url(paper) if paper.has_fulltext_pdf is False and paper.publication_year <= 2021: current_from = "scihub" msg = save_pdf_from_scihub(paper) # if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None: # current_from = "openalex" # msg = save_pdf_from_openalex(paper) if paper.fail_reason is None and paper.has_fulltext_pdf is False: paper.save_fail_reason(msg) return msg, current_from finally: paper.fetch_end() def save_pdf_from_oa_url(paper: Paper): from .d_oaurl import download_pdf_with_curl_cffi, download_from_url_playwright # 策略1: 直接请求 try: headers = get_random_headers() res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15)) except requests.RequestException as e: paper.save_fail_reason("oa_url_request_error") return f"oa_url_request_error: {str(e)}" if res.status_code in [200, 201, 202]: # 检查是否是PDF文件:检查魔数 %PDF 或 content-type is_pdf = ( res.content.startswith(b'%PDF') or res.headers.get("content-type", "").startswith("application/pdf") or res.headers.get("content-type", "") == "application/octet-stream" ) if is_pdf and len(res.content) > 1024: # 至少1KB paper.save_file_pdf(res.content, save_obj=True) return "success" else: paper.save_fail_reason("oa_url_not_pdf") return "oa_url_not_pdf" # 策略2: curl-cffi(处理 Cloudflare JS 挑战) paper_path = paper.init_paper_path("pdf") is_ok, err_msg = download_pdf_with_curl_cffi(paper.oa_url, paper_path) if is_ok: paper.has_fulltext = True paper.has_fulltext_pdf = True paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"]) return "success" # 策略3: Playwright(最终回退) is_ok, err_msg = run_async(download_from_url_playwright(paper.oa_url, paper_path)) if is_ok: paper.has_fulltext = True paper.has_fulltext_pdf = True paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"]) return "success" paper.save_fail_reason(f"oa_url_all_methods_failed: {err_msg}") return f"oa_url_all_methods_failed: {err_msg}" def save_pdf_from_openalex(paper:Paper): if cache.get("openalex_api_exceed"): return "find cache Insufficient credits" # 尝试openalex下载 try: res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf", params={ "api_key": OPENALEX_KEY }) except requests.RequestException as e: return f"openalex_pdf_error: {str(e)}" if res.status_code == 200: paper.save_file_pdf(res.content, save_obj=True) return "success" elif res.status_code == 429: if "Insufficient credits" in res.json().get("message", ""): cache.set("openalex_api_exceed", True, timeout=3600) return "openalex_pdf_error: Insufficient credits" elif res.status_code == 404: paper.save_fail_reason("openalex_pdf_not_found") return "openalex_pdf_error: 404" else: paper.save_fail_reason("openalex_pdf_error") return f"openalex_pdf_error: {res.status_code} {res.text}" def save_pdf_from_elsevier(paper:Paper): params = { "httpAccept": "application/pdf", } try: res = requests.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, headers=ELSEVIER_HEADERS, timeout=(3, 15) ) except requests.RequestException as e: return f"elsevier_request_error: {str(e)}" if res.status_code == 200: paper.save_file_pdf(res.content, save_obj=True) return "success" else: return f"elsevier_status_error: {res.status_code} {res.text}" def save_pdf_from_scihub(paper:Paper): from .d_scihub import download_paper_by_doi is_ok, err_msg = download_paper_by_doi(paper.doi, paper.init_paper_path("pdf")) if is_ok: paper.has_fulltext = True paper.has_fulltext_pdf = True paper.save(update_fields=["has_fulltext", "has_fulltext_pdf"]) return "success" else: paper.save_fail_reason(err_msg) return err_msg # https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true