# paper_server/apps/resm/tasks.py

# Create your tasks here
from __future__ import absolute_import, unicode_literals

import os

import requests
from celery import current_app, shared_task
from django.conf import settings
from django.core.cache import cache
from lxml import etree
from pyalex import Works, config

from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from apps.utils.tasks import CustomTask

config.email = "caoqianming@foxmail.com"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"
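# NOTE: with max_retries=0 the retry_* settings above are effectively inert;
# pyalex will not retry 429/500/503 itself, so transient failures surface to
# the Celery layer (presumably handled by the CustomTask base class).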


@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, search_key: str, end_year: int = 2026):
    """Fetch paper metadata for one publication year from OpenAlex, then
    chain the next year until end_year. The pagination cursor is cached per
    (year, search_key) so an interrupted run resumes where it left off."""
    cache_key = f"openalex_cursor_{publication_year}_{search_key}"
    cache_cursor = cache.get(cache_key, "*")
    pager = Works().filter(
        publication_year=publication_year,
        type="article"  # type belongs in the filter, not the search term
    ).search(search_key).select([
        "id", "doi", "title", "publication_date",
        "open_access", "authorships", "primary_location", "publication_year",
        "display_name"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.search_word_first = search_key
                paper.source = "openalex"
                paper.type = "article"
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # Persist the cursor only after the page has been stored, so a retry
        # re-fetches at most one page. pager._next_value must be read inside
        # the loop; reading it once before iteration would cache a cursor
        # that never advances.
        cache.set(cache_key, pager._next_value, timeout=None)
    if publication_year + 1 <= end_year:
        get_paper_meta_from_openalex.delay(publication_year + 1, search_key, end_year)
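
# Usage sketch (illustrative search key; not part of this module):
#
#     get_paper_meta_from_openalex.delay(2015, "remote sensing", end_year=2026)
#
# Each year is dispatched as its own Celery task, so a crash loses at most
# one year's progress, and the cached cursor lets that year resume mid-stream.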

ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'


@shared_task(base=CustomTask)
def get_abstract_from_elsevier(publication_year: int = None, number_of_task: int = 100):
    """Fetch abstracts (and full text, when present) for up to
    number_of_task papers from the Elsevier Article Retrieval API,
    then re-enqueue itself until no candidate papers remain."""
    qs = Paper.objects.filter(has_abstract=False)
    if publication_year is not None:
        qs = qs.filter(publication_year=publication_year)
    qs = qs.exclude(
        fail_reason="elsevier_doi_not_found"
    ).order_by("publication_date")
    if not qs.exists():
        return "done"
    params = {
        "apiKey": ELSEVIER_APIKEY,
        "httpAccept": "text/xml"
    }
    err_msg = ""
    with requests.Session() as session:
        for paper in qs[:number_of_task]:
            try:
                res = session.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                xml_str = res.text
                try:
                    root = etree.fromstring(xml_str.encode("utf-8"))
                except etree.XMLSyntaxError:
                    paper.fail_reason = "elsevier_xml_error"
                    paper.save(update_fields=["fail_reason"])
                    continue
                ns = {"dc": "http://purl.org/dc/elements/1.1/",
                      "ce": "http://www.elsevier.com/xml/common/dtd"}
                abstract = root.xpath("//dc:description/text()", namespaces=ns)
                if abstract:
                    PaperAbstract.objects.update_or_create(
                        paper=paper,
                        defaults={
                            "abstract": abstract[0].strip(),
                            "source": "elsevier"
                        }
                    )
                    paper.has_abstract = True
                    paper.has_abstract_xml = True
                    paper.fetch_status = "abstract_ready"
                    # ce:para elements only appear when the response carries
                    # body text, not just the abstract.
                    paras = root.xpath("//ce:para", namespaces=ns)
                    if paras:
                        paper.has_fulltext = True
                        paper.has_fulltext_xml = True
                        paper.fetch_status = "fulltext_ready"
                    publication_date = paper.publication_date
                    paper_dir = os.path.join(
                        settings.BASE_DIR,
                        "media/papers",
                        str(publication_date.year),
                        str(publication_date.month),
                        str(publication_date.day)
                    )
                    os.makedirs(paper_dir, exist_ok=True)
                    safe_doi = paper.doi.replace("/", "_")
                    paper_file = os.path.join(paper_dir, f"{safe_doi}.xml")
                    with open(paper_file, "wb") as f:
                        f.write(xml_str.encode("utf-8"))
                    paper.save(update_fields=[
                        "has_abstract", "has_abstract_xml", "has_fulltext",
                        "has_fulltext_xml", "fetch_status", "update_time"
                    ])
            elif res.status_code == 404:
                paper.fail_reason = "elsevier_doi_not_found"
                paper.save(update_fields=["fail_reason"])
    # Re-enqueue by task name so the next batch runs after a short pause.
    current_app.send_task(
        "apps.resm.tasks.get_abstract_from_elsevier",
        kwargs={
            "publication_year": publication_year,
            "number_of_task": number_of_task,
        },
        countdown=5,
    )
    return f'{err_msg}, remaining {qs.count()} papers'
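
# Usage sketch (values illustrative):
#
#     get_abstract_from_elsevier.delay(publication_year=2015, number_of_task=100)
#
# The task drains the backlog in batches of number_of_task, re-enqueuing
# itself with a five-second countdown until the queryset is empty ("done"),
# which keeps request pressure on the Elsevier API roughly constant.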