# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from pyalex import Works, config from itertools import chain from apps.resm.models import Paper from apps.utils.snowflake import idWorker config.email = "caoqianming@foxmail.com" config.max_retries = 0 config.retry_backoff_factor = 0.1 config.retry_http_codes = [429, 500, 503] config.api_key = "4KJZdkCFA0uFb6IsYKc8cd" @shared_task(base=CustomTask) def get_paper_meta_from_openalex(publication_year:int, search_key:str): query = Works().filter( publication_year=publication_year, type="article" # 将 type 移到 filter 中 ).search(search_key).select([ "id", "doi", "title", "publication_date", "open_access", "authorships", "primary_location", "publication_year" ]) papers = [] for record in chain(*query.paginate(per_page=200)): if record["doi"]: paper = Paper() paper.id = idWorker.get_id() paper.type = "article" paper.openalex_id = record["id"].split("/")[-1] paper.doi = record["doi"].replace("https://doi.org/", "") paper.title = record["title"] paper.publication_date = record["publication_date"] paper.publication_year = record["publication_year"] if record["open_access"]: paper.is_oa = record["open_access"]["is_oa"] paper.oa_url = record["open_access"]["oa_url"] if record["authorships"]: paper.first_author = record["authorships"][0]["author"]["display_name"] if record["authorships"][0]["institutions"]: paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"] if record["primary_location"] and record["primary_location"]["source"]: paper.publication_name = record["primary_location"]["source"]["display_name"] papers.append(paper) Paper.objects.bulk_create(papers, ignore_conflicts=True)