# Create your tasks here
from __future__ import absolute_import, unicode_literals

import os
from datetime import datetime
from itertools import chain

import requests
from celery import current_app, shared_task
from django.conf import settings
from django.core.cache import cache
from lxml import etree
from pyalex import Works, config

from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from apps.utils.tasks import CustomTask

# pyalex client configuration.
# NOTE(review): the contact email and API key are secrets hardcoded in source
# control -- they should be moved to settings / environment variables.
config.email = "caoqianming@foxmail.com"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"


@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, search_key: str, end_year: int = None):
    """Harvest paper metadata from OpenAlex for one publication year.

    Pages through the OpenAlex Works API (cursor pagination) for articles
    with a DOI matching ``search_key``, bulk-inserting ``Paper`` rows.
    The cursor is cached per (year, search_key) so an interrupted run
    resumes where it left off. When the year is done, the task re-enqueues
    itself for ``publication_year + 1`` until ``end_year`` (defaults to the
    current year) is reached.

    :param publication_year: year to harvest.
    :param search_key: OpenAlex full-text search expression.
    :param end_year: last year to process (inclusive); ``None`` means now.
    """
    cache_key = f"openalex_cursor_{publication_year}_{search_key}"
    # "*" is the OpenAlex sentinel for "start from the first page".
    cache_cursor = cache.get(cache_key, "*")
    pager = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"  # type belongs inside filter()
    ).search(search_key).select([
        "id", "doi", "title", "publication_date", "open_access",
        "authorships", "primary_location", "publication_year", "display_name"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            # Skip records without a DOI or any usable title.
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.search_word_first = search_key
                paper.source = "openalex"
                paper.type = "article"
                # OpenAlex id is a full URL; keep only the trailing identifier.
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        # ignore_conflicts: re-runs may re-fetch already-stored papers.
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # BUGFIX: read the paginator's next cursor *after* each page is
        # consumed. The original captured pager._next_value once before the
        # loop, so the cached cursor never advanced past the starting
        # position and a resume would re-fetch everything.
        cache.set(cache_key, pager._next_value, timeout=None)
    if end_year is None:
        end_year = datetime.now().year
    if publication_year + 1 <= end_year:
        # Chain the next year with a short delay instead of recursing,
        # keeping each task invocation small.
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "search_key": search_key,
                "end_year": end_year
            },
            countdown=5
        )


# NOTE(review): hardcoded secret -- move to settings/environment.
ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'


def is_elsevier_abstract_task_enabled():
    """Kill switch for the abstract-fetch task, toggled via the cache."""
    return cache.get("elsevier_abstract_task_enabled", True)


@shared_task(base=CustomTask)
def get_abstract_from_elsevier(publication_year: int = None, number_of_task: int = 100):
    """Fetch abstracts (and detect full text) from the Elsevier article API.

    Processes up to ``number_of_task`` papers lacking an abstract, skipping
    papers already marked as permanently failed. Re-enqueues itself while
    work remains and the kill switch is on.

    :param publication_year: optional filter; ``None`` processes all years.
    :param number_of_task: batch size per invocation.
    :return: status string with the remaining queue size.
    """
    if not is_elsevier_abstract_task_enabled():
        return "stoped"
    qs = Paper.objects.filter(has_abstract=False)
    if publication_year is not None:
        qs = qs.filter(publication_year=publication_year)
    # Permanent failures are excluded so the task cannot loop on them.
    qs = qs.exclude(
        fail_reason="elsevier_doi_not_found"
    ).exclude(fail_reason="elsevier_abstract_not_found").order_by("publication_date")
    if not qs.exists():
        return "done"
    params = {
        "apiKey": ELSEVIER_APIKEY,
        "httpAccept": "text/xml"
    }
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            # Re-check the kill switch between requests so operators can
            # stop a long batch mid-flight.
            if not is_elsevier_abstract_task_enabled():
                break
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                # Network trouble: abort the batch; the re-enqueue below
                # will retry later.
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                xml_str = res.text
                try:
                    root = etree.fromstring(xml_str.encode("utf-8"))
                except etree.XMLSyntaxError:
                    paper.fail_reason = "elsevier_xml_error"
                    paper.save(update_fields=["fail_reason", "update_time"])
                    continue
                ns = {"dc": "http://purl.org/dc/elements/1.1/",
                      "ce": "http://www.elsevier.com/xml/common/dtd",
                      "xocs": "http://www.elsevier.com/xml/xocs/dtd", }
                abstract = root.xpath("//dc:description/text()", namespaces=ns)
                if abstract:
                    PaperAbstract.objects.update_or_create(
                        paper=paper,
                        defaults={
                            "abstract": abstract[0].strip(),
                            "source": "elsevier"
                        }
                    )
                    paper.has_abstract = True
                    paper.has_abstract_xml = True
                    paper.fetch_status = "abstract_ready"
                else:
                    paper.fail_reason = "elsevier_abstract_not_found"
                    paper.save(update_fields=["fail_reason", "update_time"])
                    continue
                # Full-text heuristic: structured <ce:para> elements, or a
                # sufficiently long raw-text body.
                paras = root.xpath("//ce:para", namespaces=ns)
                has_fulltext = len(paras) > 0
                if has_fulltext is False:
                    rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
                    if rawtexts and len(rawtexts[0].strip()) > 2000:
                        has_fulltext = True
                if has_fulltext:
                    paper.has_fulltext = True
                    paper.has_fulltext_xml = True
                    paper.fetch_status = "fulltext_ready"
                    paper.save_file_xml(xml_str)
                paper.save(update_fields=["has_abstract", "has_abstract_xml",
                                          "has_fulltext", "has_fulltext_xml",
                                          "update_time", "fetch_status", "fail_reason"])
            elif res.status_code == 404:
                paper.fail_reason = "elsevier_doi_not_found"
                paper.save(update_fields=["fail_reason", "update_time"])
    qs_count = qs.count()
    if is_elsevier_abstract_task_enabled() and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "publication_year": publication_year,
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{err_msg}, remaining {qs_count} papers'


def is_elsevier_pdf_task_enabled():
    """Kill switch for the PDF-fetch task, toggled via the cache."""
    return cache.get("elsevier_pdf_task_enabled", True)


@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
    """Fetch full-text PDFs from Elsevier for papers with known full text.

    Processes up to ``number_of_task`` papers and re-enqueues itself while
    any remain. TODO(review): papers whose PDF request permanently fails
    are never marked, so they are retried on every batch -- consider
    recording a fail_reason like the abstract task does.

    :param number_of_task: batch size per invocation.
    :return: status string with the remaining queue size.
    """
    if not is_elsevier_pdf_task_enabled():
        return "stoped"
    qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False)
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not is_elsevier_pdf_task_enabled():
                break
            params = {
                "apiKey": ELSEVIER_APIKEY,
                "httpAccept": "application/pdf"
            }
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            # BUGFIX: res.headers["content-type"] raised KeyError when the
            # header was missing, and an exact == comparison missed values
            # like "application/pdf;charset=UTF-8". Use a tolerant check.
            if res.status_code == 200 and res.headers.get("content-type", "").startswith("application/pdf"):
                paper.save_file_pdf(res.content)
                paper.has_fulltext_pdf = True
                paper.save(update_fields=["has_fulltext_pdf", "update_time"])
    qs_count = qs.count()
    if is_elsevier_pdf_task_enabled() and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{err_msg}, remaining {qs_count} papers'