paper_server/apps/resm/tasks.py

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from pyalex import Works, config
from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from django.core.cache import cache
import requests
from lxml import etree
from celery import current_app
from datetime import datetime
import random
import time
# config.email = "caoqianming@foxmail.com"
config.email = "caoqianming@ctc.ac.cn"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
# OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd"
OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp"
config.api_key = OPENALEX_KEY
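# Note: with max_retries=0, pyalex does not retry failed requests itself; the
# retry_http_codes above only take effect if max_retries is raised. Retrying is
# otherwise left to Celery and the self-requeueing tasks below.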


@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, keywords: str = "", search: str = "", end_year: int = None):
    if not (keywords or search):
        raise Exception("keywords or search must be provided")
    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
    cache_cursor = cache.get(cache_key, "*")
    # filter=keywords.id:clinker|cement
    pager = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"
    )
    if keywords:
        keywords_list = keywords.split("|") if "|" in keywords else [keywords]
        pager = pager.filter(keywords={"id": keywords_list})
    if search:
        pager = pager.filter(search=search)
    pager = pager.select([
        "id", "doi", "title", "publication_date",
        "open_access", "authorships", "primary_location", "publication_year",
        "display_name", "content_urls"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.o_keywords = keywords
                paper.o_search = search
                paper.source = "openalex"
                paper.type = "article"
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # Read the cursor after each page so an interrupted crawl resumes where
        # it left off (reading it once before the loop would persist a stale value)
        cache.set(cache_key, pager._next_value, timeout=None)
    if end_year is None:
        end_year = datetime.now().year
    if publication_year + 1 <= end_year:
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "keywords": keywords,
                "search": search,
                "end_year": end_year
            },
            countdown=5
        )
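
# Illustrative kickoff (argument values are examples, not from the codebase):
#   get_paper_meta_from_openalex.delay(2015, keywords="cement|clinker", end_year=2024)
# Each run persists its pagination cursor and, once a year is exhausted, queues
# the task again for the next year.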


ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'
# Pool of common User-Agent strings
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15",
]


def get_random_headers():
    """Build request headers with a randomly chosen User-Agent."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "application/pdf, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Referer": "https://www.google.com/",
    }


def show_task_run(def_name: str):
    """Return False when the task's cache flag has been set to stop it."""
    return cache.get(def_name, True)
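
# To pause a self-requeueing task, set its dotted task name to False in the
# cache (illustrative):
#   cache.set("apps.resm.tasks.get_abstract_from_elsevier", False, timeout=None)
# Delete the key (or set it to True) to let the task run again.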


@shared_task(base=CustomTask)
def get_abstract_from_elsevier(number_of_task: int = 20):
    """Fetch abstracts (and, when present, full-text XML) from the Elsevier article API."""
    def_name = get_abstract_from_elsevier.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(has_abstract=False)
    qs = qs.exclude(
        fail_reason__contains="elsevier_doi_not_found"
    ).exclude(
        fail_reason__contains="elsevier_abstract_not_found"
    ).exclude(fetch_status="downloading").order_by("publication_date")
    if not qs.exists():
        return "done"
    params = {
        "apiKey": ELSEVIER_APIKEY,
        "httpAccept": "text/xml"
    }
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            original_status = paper.fetch_status
            if original_status == "downloading":
                continue
            paper.fetch_status = "downloading"
            paper.save(update_fields=["fetch_status", "update_time"])
            try:
                try:
                    res = req.get(
                        f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                        params=params,
                        timeout=(3, 15)
                    )
                except requests.RequestException:
                    err_msg = "elsevier_request_error"
                    break
                if res.status_code == 200:
                    xml_str = res.text
                    try:
                        root = etree.fromstring(xml_str.encode("utf-8"))
                    except etree.XMLSyntaxError:
                        paper.save_fail_reason("elsevier_xml_error")
                        continue
                    ns = {
                        "dc": "http://purl.org/dc/elements/1.1/",
                        "ce": "http://www.elsevier.com/xml/common/dtd",
                        "xocs": "http://www.elsevier.com/xml/xocs/dtd",
                    }
                    abstract = root.xpath("//dc:description/text()", namespaces=ns)
                    if abstract:
                        PaperAbstract.objects.update_or_create(
                            paper=paper,
                            defaults={
                                "abstract": abstract[0].strip(),
                                "source": "elsevier"
                            }
                        )
                        paper.has_abstract = True
                        paper.has_abstract_xml = True
                        paper.fetch_status = "abstract_ready"
                    else:
                        paper.save_fail_reason("elsevier_abstract_not_found")
                        continue
                    paras = root.xpath("//ce:para", namespaces=ns)
                    has_fulltext = len(paras) > 0
                    if not has_fulltext:
                        # No structured paragraphs; a long raw text body still counts as full text
                        rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
                        if rawtexts and len(rawtexts[0].strip()) > 2000:
                            has_fulltext = True
                    if has_fulltext:
                        paper.has_fulltext = True
                        paper.has_fulltext_xml = True
                        paper.fetch_status = "fulltext_ready"
                        paper.save_file_xml(xml_str)
                    paper.save(update_fields=["has_abstract",
                                              "has_abstract_xml", "has_fulltext",
                                              "has_fulltext_xml", "update_time", "fetch_status"])
                elif res.status_code == 404:
                    paper.save_fail_reason("elsevier_doi_not_found")
            finally:
                # Restore the previous status unless this iteration already updated it
                if paper.fetch_status == "downloading":
                    paper.fetch_status = original_status
                    paper.save(update_fields=["fetch_status", "update_time"])
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'
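
# Illustrative: drain the abstract backlog in batches of 50 (the value is an
# example); the task re-queues itself until the queryset is empty or its
# cache flag is set to False:
#   get_abstract_from_elsevier.delay(number_of_task=50)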


@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
    """
    Fetch full-text PDFs from Elsevier.
    """
    def_name = get_pdf_from_elsevier.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True)
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            params = {
                "apiKey": ELSEVIER_APIKEY,
                "httpAccept": "application/pdf"
            }
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                # Verify it is a PDF: check the %PDF magic bytes or the content-type header
                is_pdf = (
                    res.content.startswith(b'%PDF') or
                    res.headers.get("content-type", "").startswith("application/pdf")
                )
                if is_pdf and len(res.content) > 1024:  # at least 1 KiB
                    paper.save_file_pdf(res.content)
                    paper.has_fulltext_pdf = True
                    paper.save(update_fields=["has_fulltext_pdf", "update_time"])
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'


def get_actual_running_count():
    """Count the papers currently being downloaded."""
    return Paper.objects.filter(fetch_status='downloading').count()


def can_send_more(max_running):
    return get_actual_running_count() < max_running
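
# Design note: the concurrency gate counts rows in the "downloading" state
# rather than keeping an in-memory counter, so the budget stays consistent
# across workers; stale "downloading" rows temporarily shrink it until their
# status is restored.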


@shared_task(base=CustomTask)
def send_download_fulltext_task(number_of_task=100):
    qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None).exclude(
        fetch_status='downloading'
    )
    if not qs.exists():
        return "done"
    qs0 = qs.order_by("?")
    # Dispatch in batches and cap concurrency to avoid flooding the sources
    task_count = 0
    for paper in qs0[:number_of_task]:
        if not can_send_more(number_of_task):
            break
        if paper.oa_url:
            # Stagger the requests with countdown to limit concurrent downloads
            countdown = task_count * 1  # one second between tasks
            current_app.send_task(
                "apps.resm.tasks.download_pdf",
                kwargs={
                    "paper_id": paper.id,
                },
                countdown=countdown,
            )
            task_count += 1
    return f"sent {task_count} download_pdf tasks"


@shared_task(base=CustomTask)
def download_pdf(paper_id):
    """
    Download the PDF for a single paper.
    """
    paper = None
    original_status = None
    current_from = None
    try:
        paper = Paper.objects.get(id=paper_id)
        original_status = paper.fetch_status
        if original_status == "downloading":
            return f"paper {paper_id} is already downloading"
        # Mark the paper as downloading so concurrent tasks skip it
        paper.fetch_status = 'downloading'
        paper.save(update_fields=['fetch_status', 'update_time'])
        msg = "no download channel available"
        if paper.oa_url:
            current_from = "oa_url"
            msg = save_pdf_from_oa_url(paper)
        # if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
        #     current_from = "openalex"
        #     msg = save_pdf_from_openalex(paper)
        return msg, current_from
    finally:
        # Restore the previous status unless a downloader already updated it;
        # guard against paper being None when the initial lookup failed
        if paper is not None and paper.fetch_status == "downloading":
            paper.fetch_status = original_status
            paper.save(update_fields=['fetch_status', 'update_time'])


def save_pdf_from_oa_url(paper: Paper):
    try:
        headers = get_random_headers()
        res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
    except requests.RequestException as e:
        paper.save_fail_reason("oa_url_request_error")
        return f"oa_url_request_error: {str(e)}"
    if res.status_code == 200:
        # Verify it is a PDF: check the %PDF magic bytes or the content-type header
        is_pdf = (
            res.content.startswith(b'%PDF') or
            res.headers.get("content-type", "").startswith("application/pdf") or
            res.headers.get("content-type", "") == "application/octet-stream"
        )
        if is_pdf and len(res.content) > 1024:  # at least 1 KiB
            paper.save_file_pdf(res.content)
            paper.has_fulltext = True
            paper.has_fulltext_pdf = True
            paper.fetch_status = "fulltext_ready"
            paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
            return "success"
    # Fall through: non-200 response, or the body was not a usable PDF
    return f"oa_url_no_pdf_{res.status_code}"


def save_pdf_from_openalex(paper: Paper):
    if cache.get("openalex_api_exceed"):
        return "insufficient credits (cached)"
    # Try downloading through the OpenAlex content API
    try:
        res = requests.get(
            url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
            params={
                "api_key": OPENALEX_KEY
            },
            timeout=(3, 15)
        )
    except requests.RequestException as e:
        paper.save_fail_reason("openalex_pdf_error")
        return f"openalex_pdf_error: {str(e)}"
    if res.status_code == 200:
        paper.save_file_pdf(res.content)
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        paper.fetch_status = "fulltext_ready"
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
        return "success"
    elif res.status_code == 429:
        if "Insufficient credits" in res.json().get("message", ""):
            cache.set("openalex_api_exceed", True, timeout=3600)
            return "Insufficient credits"
    return f"openalex_status_{res.status_code}"


def save_pdf_from_scihub(paper: Paper):
    pass
    # https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true
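
# A minimal sketch of one possible implementation, assuming the mirror noted
# above serves PDFs at /pdf/<doi>.pdf (host and URL pattern are untested
# assumptions taken from the comment, not a confirmed API):
#
#   def save_pdf_from_scihub(paper: Paper):
#       res = requests.get(
#           f"https://sci.bban.top/pdf/{paper.doi}.pdf",
#           params={"download": "true"},
#           headers=get_random_headers(),
#           timeout=(3, 15),
#       )
#       if res.status_code == 200 and res.content.startswith(b"%PDF"):
#           paper.save_file_pdf(res.content)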