paper_server/apps/resm/tasks.py

# Create your tasks here
from __future__ import absolute_import, unicode_literals

from datetime import datetime

import requests
from celery import current_app, shared_task
from django.core.cache import cache
from lxml import etree
from pyalex import Works, config

from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from apps.utils.tasks import CustomTask

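# pyalex client settings for the OpenAlex calls below: the email joins the
# polite pool, max_retries=0 disables pyalex's own HTTP retries (leaving
# retry policy to the task layer), and the api_key raises the rate limits.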
config.email = "caoqianming@foxmail.com"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"


@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, keywords: str = "", search: str = "", end_year: int = None):
    """Harvest one year of article metadata from OpenAlex, then queue the next year."""
    if not (keywords or search):
        raise ValueError("keywords or search must be provided")
    # Resume from the cursor cached by a previous (possibly interrupted) run.
    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
    cache_cursor = cache.get(cache_key, "*")
    # filter=keywords.id:clinker|cement
    pager = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"
    )
    if keywords:
        keywords_list = keywords.split("|") if "|" in keywords else [keywords]
        pager = pager.filter(keywords={"id": keywords_list})
    if search:
        pager = pager.filter(search=search)
    pager = pager.select([
        "id", "doi", "title", "publication_date",
        "open_access", "authorships", "primary_location", "publication_year",
        "display_name", "content_urls"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.o_keywords = keywords
                paper.o_search = search
                paper.source = "openalex"
                paper.type = "article"
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # Cache the *current* next-page cursor after every page; reading
        # pager._next_value once before the loop would pin the resume point
        # to the first page forever.
        cache.set(cache_key, pager._next_value, timeout=None)
    # Chain into the following year until end_year (default: this year).
    if end_year is None:
        end_year = datetime.now().year
    if publication_year + 1 <= end_year:
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "keywords": keywords,
                "search": search,
                "end_year": end_year
            },
            countdown=5
        )
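

# A minimal invocation sketch (the argument values are made-up examples):
# kick off the self-chaining harvest at 2015 and let it run year by year to
# the current year, matching works tagged with either keyword:
#
#   get_paper_meta_from_openalex.delay(2015, keywords="cement|clinker")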


ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'


def show_task_run(def_name: str):
    # Cache-backed kill switch: each task polls this between items and stops
    # once a falsy value has been stored under its own task name.
    return cache.get(def_name, True)
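

# To flip the switch for a running task (the cache key is the task's dotted
# path, which is Celery's default name for a shared_task):
#
#   cache.set("apps.resm.tasks.get_abstract_from_elsevier", False, timeout=None)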


@shared_task(base=CustomTask)
def get_abstract_from_elsevier(publication_year: int = None, number_of_task: int = 100):
    """Fetch abstracts (and, when present, full-text XML) from the Elsevier article API."""
    def_name = get_abstract_from_elsevier.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(has_abstract=False)
    if publication_year is not None:
        qs = qs.filter(publication_year=publication_year)
    # Skip papers that already failed permanently on Elsevier.
    qs = qs.exclude(
        fail_reason__contains="elsevier_doi_not_found"
    ).exclude(
        fail_reason__contains="elsevier_abstract_not_found"
    ).order_by("publication_date")
    if not qs.exists():
        return "done"
    params = {
        "apiKey": ELSEVIER_APIKEY,
        "httpAccept": "text/xml"
    }
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                xml_str = res.text
                try:
                    root = etree.fromstring(xml_str.encode("utf-8"))
                except etree.XMLSyntaxError:
                    paper.save_fail_reason("elsevier_xml_error")
                    continue
                ns = {
                    "dc": "http://purl.org/dc/elements/1.1/",
                    "ce": "http://www.elsevier.com/xml/common/dtd",
                    "xocs": "http://www.elsevier.com/xml/xocs/dtd",
                }
                abstract = root.xpath("//dc:description/text()", namespaces=ns)
                if abstract:
                    PaperAbstract.objects.update_or_create(
                        paper=paper,
                        defaults={
                            "abstract": abstract[0].strip(),
                            "source": "elsevier"
                        }
                    )
                    paper.has_abstract = True
                    paper.has_abstract_xml = True
                    paper.fetch_status = "abstract_ready"
                else:
                    paper.save_fail_reason("elsevier_abstract_not_found")
                    continue
                # Treat the response as full text if it carries <ce:para>
                # elements, or failing that a reasonably long <xocs:rawtext>.
                paras = root.xpath("//ce:para", namespaces=ns)
                has_fulltext = len(paras) > 0
                if not has_fulltext:
                    rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
                    if rawtexts and len(rawtexts[0].strip()) > 2000:
                        has_fulltext = True
                if has_fulltext:
                    paper.has_fulltext = True
                    paper.has_fulltext_xml = True
                    paper.fetch_status = "fulltext_ready"
                    paper.save_file_xml(xml_str)
                paper.save(update_fields=["has_abstract",
                                          "has_abstract_xml", "has_fulltext",
                                          "has_fulltext_xml", "update_time", "fetch_status"])
            elif res.status_code == 404:
                paper.save_fail_reason("elsevier_doi_not_found")
    # Re-enqueue while unfetched papers remain and the kill switch is off.
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "publication_year": publication_year,
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'
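

# The dc:description XPath above in isolation (self-contained, with a
# made-up minimal document; real Elsevier payloads are far larger):
#
#   from lxml import etree
#   doc = etree.fromstring(
#       b'<doc xmlns:dc="http://purl.org/dc/elements/1.1/">'
#       b'<dc:description> An abstract. </dc:description></doc>'
#   )
#   doc.xpath("//dc:description/text()",
#             namespaces={"dc": "http://purl.org/dc/elements/1.1/"})
#   # -> [' An abstract. ']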


@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
    """Fetch Elsevier full-text PDFs for papers whose full text is already known to exist."""
    def_name = get_pdf_from_elsevier.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False)
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            params = {
                "apiKey": ELSEVIER_APIKEY,
                "httpAccept": "application/pdf"
            }
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            # Only store the body when Elsevier actually returned a PDF.
            if res.status_code == 200 and res.headers.get("content-type") == "application/pdf":
                paper.save_file_pdf(res.content)
                paper.has_fulltext_pdf = True
                paper.save(update_fields=["has_fulltext_pdf", "update_time"])
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'
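

# Note on the batching pattern shared by these fetch tasks: each run handles
# at most number_of_task rows and then re-sends itself with a 5-second
# countdown, so a large backlog drains in small batches that can be halted
# at any point via the cache kill switch.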


@shared_task(base=CustomTask)
def get_pdf_from_oa_url(number_of_task=100):
    """Download PDFs directly from the open-access URLs recorded by the OpenAlex harvest."""
    def_name = get_pdf_from_oa_url.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude(
        fail_reason__contains="oa_url_request_error"
    ).exclude(fail_reason__contains="oa_url_not_pdf")
    # err_msg is kept for a return format consistent with the other tasks;
    # this task records failures per paper instead.
    err_msg = ""
    for paper in qs[:number_of_task]:
        if paper.oa_url:
            try:
                res = requests.get(paper.oa_url, timeout=(3, 15))
            except requests.RequestException:
                paper.save_fail_reason("oa_url_request_error")
                continue
            if res.status_code == 200 and res.headers.get("content-type") == "application/pdf":
                paper.save_file_pdf(res.content)
                paper.has_fulltext = True
                paper.has_fulltext_pdf = True
                paper.fetch_status = "fulltext_ready"
                paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
            else:
                paper.save_fail_reason("oa_url_not_pdf")
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_oa_url",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'
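

# These tasks re-enqueue themselves, so a single trigger is normally enough.
# If periodic kick-offs are preferred, a Celery beat entry along these lines
# would work (sketch only; "app" stands for whatever Celery instance this
# project defines):
#
#   app.conf.beat_schedule = {
#       "refresh-elsevier-abstracts": {
#           "task": "apps.resm.tasks.get_abstract_from_elsevier",
#           "schedule": 3600.0,
#       },
#   }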