paper_server/apps/resm/tasks.py

53 lines
2.4 KiB
Python

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from pyalex import Works, config
from itertools import chain
from apps.resm.models import Paper
from apps.utils.snowflake import idWorker
from django.core.cache import cache
config.email = "caoqianming@foxmail.com"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, search_key:str):
cache_key = f"openalex_cursor_{publication_year}_{search_key}"
cache_cursor = cache.get(cache_key, "*")
pager = Works().filter(
publication_year=publication_year,
type="article" # 将 type 移到 filter 中
).search(search_key).select([
"id", "doi", "title", "publication_date",
"open_access", "authorships", "primary_location", "publication_year"
]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
next_cursor = pager._next_value
for page in pager:
papers = []
for record in page:
if record["doi"]:
paper = Paper()
paper.id = idWorker.get_id()
paper.type = "article"
paper.openalex_id = record["id"].split("/")[-1]
paper.doi = record["doi"].replace("https://doi.org/", "")
paper.title = record["display_name"]
paper.publication_date = record["publication_date"]
paper.publication_year = record["publication_year"]
if record["open_access"]:
paper.is_oa = record["open_access"]["is_oa"]
paper.oa_url = record["open_access"]["oa_url"]
if record["authorships"]:
paper.first_author = record["authorships"][0]["author"]["display_name"]
if record["authorships"][0]["institutions"]:
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
if record["primary_location"] and record["primary_location"]["source"]:
paper.publication_name = record["primary_location"]["source"]["display_name"]
papers.append(paper)
Paper.objects.bulk_create(papers, ignore_conflicts=True)
cache.set(cache_key, next_cursor, timeout=None)