# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from pyalex import Works, config from apps.resm.models import Paper, PaperAbstract from apps.utils.snowflake import idWorker from django.core.cache import cache import requests from lxml import etree from celery import current_app from datetime import datetime config.email = "caoqianming@foxmail.com" config.max_retries = 0 config.retry_backoff_factor = 0.1 config.retry_http_codes = [429, 500, 503] config.api_key = "4KJZdkCFA0uFb6IsYKc8cd" @shared_task(base=CustomTask) def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None): cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}" cache_cursor = cache.get(cache_key, "*") if keywords or search: pass else: raise Exception("keywords or search must be provided") # filter=keywords.id:clinker|cement pager = Works().filter( publication_year=publication_year, has_doi=True, type="article" ) if keywords: if "|" in keywords: keywords_list = keywords.split("|") else: keywords_list = [keywords] pager = pager.filter( keywords={"id": keywords_list} ) if search: pager = pager.filter( search=search ) pager = pager.select([ "id", "doi", "title", "publication_date", "open_access", "authorships", "primary_location", "publication_year", "display_name", "content_urls" ]).paginate(per_page=200, n_max=None, cursor=cache_cursor) next_cursor = pager._next_value for page in pager: papers = [] for record in page: if record["doi"] and (record["display_name"] or record["title"]): paper = Paper() paper.id = idWorker.get_id() paper.o_keywords = keywords paper.o_search = search paper.source = "openalex" paper.type = "article" paper.openalex_id = record["id"].split("/")[-1] paper.doi = record["doi"].replace("https://doi.org/", "") paper.title = record["display_name"] paper.publication_date = record["publication_date"] paper.publication_year = record["publication_year"] if record["open_access"]: paper.is_oa = record["open_access"]["is_oa"] paper.oa_url = record["open_access"]["oa_url"] if record["authorships"]: paper.first_author = record["authorships"][0]["author"]["display_name"] if record["authorships"][0]["institutions"]: paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"] if record["primary_location"] and record["primary_location"]["source"]: paper.publication_name = record["primary_location"]["source"]["display_name"] papers.append(paper) Paper.objects.bulk_create(papers, ignore_conflicts=True) cache.set(cache_key, next_cursor, timeout=None) if end_year is None: end_year = datetime.now().year if publication_year + 1 <= end_year: current_app.send_task( "apps.resm.tasks.get_paper_meta_from_openalex", kwargs={ "publication_year": publication_year + 1, "keywords": keywords, "search": search, "end_year": end_year }, countdown=5 ) ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf' def show_task_run(def_name: str): return cache.get(def_name, True) @shared_task(base=CustomTask) def get_abstract_from_elsevier(publication_year: int = None, number_of_task:int = 100): def_name = get_abstract_from_elsevier.__name__ if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(has_abstract=False) if publication_year is not None: qs = qs.filter(publication_year=publication_year) qs = qs.exclude( fail_reason__contains="elsevier_doi_not_found" ).exclude(fail_reason__contains="elsevier_abstract_not_found").order_by("publication_date") if not qs.exists(): return "done" params = { "apiKey": ELSEVIER_APIKEY, "httpAccept": "text/xml" } err_msg = "" with requests.Session() as req: for paper in qs[:number_of_task]: if not show_task_run(def_name): break try: res = req.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, timeout=(3, 15) ) except requests.RequestException: err_msg = "elsevier_request_error" break if res.status_code == 200: xml_str = res.text try: root = etree.fromstring(xml_str.encode("utf-8")) except etree.XMLSyntaxError: paper.save_fail_reason("elsevier_xml_error") continue ns = {"dc": "http://purl.org/dc/elements/1.1/", "ce": "http://www.elsevier.com/xml/common/dtd", "xocs": "http://www.elsevier.com/xml/xocs/dtd",} abstract = root.xpath("//dc:description/text()", namespaces=ns) if abstract: PaperAbstract.objects.update_or_create( paper=paper, defaults={ "abstract": abstract[0].strip(), "source": "elsevier" } ) paper.has_abstract = True paper.has_abstract_xml = True paper.fetch_status = "abstract_ready" else: paper.save_fail_reason("elsevier_abstract_not_found") continue paras = root.xpath("//ce:para", namespaces=ns) has_fulltext = len(paras) > 0 if has_fulltext is False: rawtexts = root.xpath("//xocs:rawtext/text()",namespaces=ns) if rawtexts and len(rawtexts[0].strip()) > 2000: has_fulltext = True if has_fulltext: paper.has_fulltext = True paper.has_fulltext_xml = True paper.fetch_status = "fulltext_ready" paper.save_file_xml(xml_str) paper.save(update_fields=["has_abstract", "has_abstract_xml", "has_fulltext", "has_fulltext_xml", "update_time", "fetch_status"]) elif res.status_code == 404: paper.save_fail_reason("elsevier_doi_not_found") qs_count = qs.count() if show_task_run(def_name) and qs_count > 0: current_app.send_task( "apps.resm.tasks.get_abstract_from_elsevier", kwargs={ "publication_year": publication_year, "number_of_task": number_of_task, }, countdown=5, ) return f'{err_msg}, remaining {qs_count} papers' @shared_task(base=CustomTask) def get_pdf_from_elsevier(number_of_task=100): """ 获取elsevier全文 """ def_name = get_pdf_from_elsevier.__name__ if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False) err_msg = "" with requests.Session() as req: for paper in qs[:number_of_task]: if not show_task_run(def_name): break params = { "apiKey": ELSEVIER_APIKEY, "httpAccept": "application/pdf" } try: res = req.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, timeout=(3, 15) ) except requests.RequestException: err_msg = "elsevier_request_error" break if res.status_code == 200 and res.headers["content-type"] == "application/pdf": paper.save_file_pdf(res.content) paper.has_fulltext_pdf = True paper.save(update_fields=["has_fulltext_pdf", "update_time"]) qs_count = qs.count() if show_task_run(def_name) and qs_count > 0: current_app.send_task( "apps.resm.tasks.get_pdf_from_elsevier", kwargs={ "number_of_task": number_of_task, }, countdown=5, ) return f'{err_msg}, remaining {qs_count} papers' @shared_task(base=CustomTask) def get_pdf_from_oa_url(number_of_task=100): def_name = get_pdf_from_oa_url.__name__ if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude( fail_reason__contains="oa_url_request_error" ).exclude(fail_reason__contains="oa_url_not_pdf") err_msg = "" for paper in qs[:number_of_task]: if paper.oa_url: try: res = requests.get(paper.oa_url, timeout=(3, 15)) except requests.RequestException: paper.save_fail_reason("oa_url_request_error") continue if res.status_code == 200 and res.headers["content-type"] == "application/pdf": paper.save_file_pdf(res.content) paper.has_fulltext = True paper.has_fulltext_pdf = True paper.fetch_status = "fulltext_ready" paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) else: paper.save_fail_reason("oa_url_not_pdf") qs_count = qs.count() if show_task_run(def_name) and qs_count > 0: current_app.send_task( "apps.resm.tasks.get_pdf_from_oa_url", kwargs={ "number_of_task": number_of_task, }, countdown=5, ) return f'{err_msg}, remaining {qs_count} papers'