# paper_server/apps/resm/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals

import os
from datetime import datetime

import requests
from celery import current_app, shared_task
from django.conf import settings
from django.core.cache import cache
from lxml import etree
from pyalex import Works, config

from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from apps.utils.tasks import CustomTask

config.email = "caoqianming@foxmail.com"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"


@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, search_key: str, end_year: int = None):
    # Resume from the cursor cached by a previous (possibly interrupted) run.
    cache_key = f"openalex_cursor_{publication_year}_{search_key}"
    cache_cursor = cache.get(cache_key, "*")
    pager = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"  # type goes into the filter
    ).search(search_key).select([
        "id", "doi", "title", "publication_date",
        "open_access", "authorships", "primary_location", "publication_year",
        "display_name"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.search_word_first = search_key
                paper.source = "openalex"
                paper.type = "article"
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"] or record["title"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # Persist the cursor after each page (pager._next_value is pyalex's
        # internal cursor) so a restart picks up where this run stopped.
        if pager._next_value:
            cache.set(cache_key, pager._next_value, timeout=None)
    # Chain to the next publication year until end_year (default: current year).
    if end_year is None:
        end_year = datetime.now().year
    if publication_year + 1 <= end_year:
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "search_key": search_key,
                "end_year": end_year,
            },
            countdown=5,
        )
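
# Usage sketch (hypothetical invocation, e.g. from a Django shell or a
# management command; the search_key value is just an example):
#   get_paper_meta_from_openalex.delay(publication_year=2015, search_key="perovskite")
# Each run harvests one year, then re-queues itself with countdown=5 until
# end_year (defaulting to the current year) is reached.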


ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'


def is_elsevier_abstract_task_enabled():
    # Cache-backed kill switch; the task is treated as enabled when unset.
    return cache.get("elsevier_abstract_task_enabled", True)
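
# The flag can be flipped at runtime to pause/resume the crawl without
# revoking queued tasks (sketch, using the cache key from the helper above):
#   cache.set("elsevier_abstract_task_enabled", False, timeout=None)  # pause
#   cache.set("elsevier_abstract_task_enabled", True, timeout=None)   # resume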


@shared_task(base=CustomTask)
def get_abstract_from_elsevier(publication_year: int = None, number_of_task: int = 100):
    if not is_elsevier_abstract_task_enabled():
        return "stopped"
    qs = Paper.objects.filter(has_abstract=False)
    if publication_year is not None:
        qs = qs.filter(publication_year=publication_year)
    # Skip papers that already failed permanently on Elsevier.
    qs = qs.exclude(
        fail_reason="elsevier_doi_not_found"
    ).exclude(fail_reason="elsevier_abstract_not_found").order_by("publication_date")
    if not qs.exists():
        return "done"
    params = {
        "apiKey": ELSEVIER_APIKEY,
        "httpAccept": "text/xml",
    }
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not is_elsevier_abstract_task_enabled():
                break
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15),
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                xml_str = res.text
                try:
                    root = etree.fromstring(xml_str.encode("utf-8"))
                except etree.XMLSyntaxError:
                    paper.fail_reason = "elsevier_xml_error"
                    paper.save(update_fields=["fail_reason", "update_time"])
                    continue
                ns = {"dc": "http://purl.org/dc/elements/1.1/",
                      "ce": "http://www.elsevier.com/xml/common/dtd"}
                abstract = root.xpath("//dc:description/text()", namespaces=ns)
                if abstract:
                    PaperAbstract.objects.update_or_create(
                        paper=paper,
                        defaults={
                            "abstract": abstract[0].strip(),
                            "source": "elsevier",
                        },
                    )
                    paper.has_abstract = True
                    paper.has_abstract_xml = True
                    paper.fetch_status = "abstract_ready"
                else:
                    paper.fail_reason = "elsevier_abstract_not_found"
                    paper.save(update_fields=["fail_reason", "update_time"])
                    continue
                # <ce:para> elements are only present when full text is available.
                paras = root.xpath("//ce:para", namespaces=ns)
                has_fulltext = len(paras) > 0
                if has_fulltext:
                    paper.has_fulltext = True
                    paper.has_fulltext_xml = True
                    paper.fetch_status = "fulltext_ready"
                # Archive the raw XML under media/papers/<year>/<month>/<day>/.
                publication_date = paper.publication_date
                paper_dir = os.path.join(
                    settings.BASE_DIR,
                    "media/papers",
                    str(publication_date.year),
                    str(publication_date.month),
                    str(publication_date.day),
                )
                os.makedirs(paper_dir, exist_ok=True)
                safe_doi = paper.doi.replace("/", "_")
                paper_file = os.path.join(paper_dir, f"{safe_doi}.xml")
                with open(paper_file, "wb") as f:
                    f.write(xml_str.encode("utf-8"))
                paper.save(update_fields=["has_abstract",
                                          "has_abstract_xml", "has_fulltext",
                                          "has_fulltext_xml", "update_time", "fetch_status", "fail_reason"])
            elif res.status_code == 404:
                paper.fail_reason = "elsevier_doi_not_found"
                paper.save(update_fields=["fail_reason", "update_time"])
    # Re-queue the next batch unless the kill switch was flipped mid-run.
    if is_elsevier_abstract_task_enabled():
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "publication_year": publication_year,
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{err_msg}, remaining {qs.count()} papers'
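
# Usage sketch (hypothetical year/batch values): drain missing abstracts in
# batches; the task re-queues itself until the queryset is empty or the
# kill switch is off:
#   get_abstract_from_elsevier.delay(publication_year=2020, number_of_task=100)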