paper_server/apps/resm/tasks.py

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from pyalex import Works, config
from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from django.core.cache import cache
import requests
from lxml import etree
from celery import current_app
from datetime import datetime
import random

config.email = "caoqianming@foxmail.com"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd"
config.api_key = OPENALEX_KEY
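# Note: with max_retries=0, pyalex will not retry the HTTP codes listed in
# retry_http_codes; the tasks below handle failures themselves (cursor
# caching for OpenAlex, per-paper fail reasons for Elsevier).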


@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year: int, keywords: str = "", search: str = "", end_year: int = None):
    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
    cache_cursor = cache.get(cache_key, "*")
    if not (keywords or search):
        raise Exception("keywords or search must be provided")
    # filter=keywords.id:clinker|cement
    pager = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"
    )
    if keywords:
        keywords_list = keywords.split("|") if "|" in keywords else [keywords]
        pager = pager.filter(
            keywords={"id": keywords_list}
        )
    if search:
        pager = pager.filter(
            search=search
        )
    pager = pager.select([
        "id", "doi", "title", "publication_date",
        "open_access", "authorships", "primary_location", "publication_year",
        "display_name", "content_urls"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.o_keywords = keywords
                paper.o_search = search
                paper.source = "openalex"
                paper.type = "article"
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # Cache the cursor of the page after this one, so an interrupted
        # task resumes where it left off instead of refetching.
        cache.set(cache_key, pager._next_value, timeout=None)
    if end_year is None:
        end_year = datetime.now().year
    if publication_year + 1 <= end_year:
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "keywords": keywords,
                "search": search,
                "end_year": end_year
            },
            countdown=5
        )
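

# Example usage (a sketch): start a year-by-year harvest; the task re-queues
# itself with publication_year + 1 until end_year is reached:
#
#   get_paper_meta_from_openalex.delay(
#       publication_year=2015, keywords="clinker|cement", end_year=2024)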


ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'

# Common browser User-Agent strings to rotate through
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15",
]


def get_random_headers():
    """Build request headers with a randomly chosen User-Agent."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "application/pdf, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Referer": "https://www.google.com/",
    }


def show_task_run(def_name: str):
    """Return the run flag for a task; defaults to True when no flag is cached."""
    return cache.get(def_name, True)
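

# The fetch tasks below poll show_task_run() between requests, so a running
# task chain can be paused by setting its task-name key to False, e.g.
# (a sketch, run from a Django shell):
#
#   cache.set("apps.resm.tasks.get_abstract_from_elsevier", False, timeout=None)
#   cache.delete("apps.resm.tasks.get_abstract_from_elsevier")  # resume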


@shared_task(base=CustomTask)
def get_abstract_from_elsevier(publication_year: int = None, number_of_task: int = 100):
    def_name = get_abstract_from_elsevier.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(has_abstract=False)
    if publication_year is not None:
        qs = qs.filter(publication_year=publication_year)
    qs = qs.exclude(
        fail_reason__contains="elsevier_doi_not_found"
    ).exclude(fail_reason__contains="elsevier_abstract_not_found").order_by("publication_date")
    if not qs.exists():
        return "done"
    params = {
        "apiKey": ELSEVIER_APIKEY,
        "httpAccept": "text/xml"
    }
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                xml_str = res.text
                try:
                    root = etree.fromstring(xml_str.encode("utf-8"))
                except etree.XMLSyntaxError:
                    paper.save_fail_reason("elsevier_xml_error")
                    continue
                ns = {
                    "dc": "http://purl.org/dc/elements/1.1/",
                    "ce": "http://www.elsevier.com/xml/common/dtd",
                    "xocs": "http://www.elsevier.com/xml/xocs/dtd",
                }
                abstract = root.xpath("//dc:description/text()", namespaces=ns)
                if abstract:
                    PaperAbstract.objects.update_or_create(
                        paper=paper,
                        defaults={
                            "abstract": abstract[0].strip(),
                            "source": "elsevier"
                        }
                    )
                    paper.has_abstract = True
                    paper.has_abstract_xml = True
                    paper.fetch_status = "abstract_ready"
                else:
                    paper.save_fail_reason("elsevier_abstract_not_found")
                    continue
                # Treat the document as full text if it contains <ce:para>
                # elements, or a sufficiently long <xocs:rawtext> body.
                paras = root.xpath("//ce:para", namespaces=ns)
                has_fulltext = len(paras) > 0
                if not has_fulltext:
                    rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
                    if rawtexts and len(rawtexts[0].strip()) > 2000:
                        has_fulltext = True
                if has_fulltext:
                    paper.has_fulltext = True
                    paper.has_fulltext_xml = True
                    paper.fetch_status = "fulltext_ready"
                    paper.save_file_xml(xml_str)
                paper.save(update_fields=["has_abstract",
                                          "has_abstract_xml", "has_fulltext",
                                          "has_fulltext_xml", "update_time", "fetch_status"])
            elif res.status_code == 404:
                paper.save_fail_reason("elsevier_doi_not_found")
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "publication_year": publication_year,
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'


@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
    """
    Fetch full-text PDFs from Elsevier.
    """
    def_name = get_pdf_from_elsevier.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True)
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            params = {
                "apiKey": ELSEVIER_APIKEY,
                "httpAccept": "application/pdf"
            }
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                # Verify the payload really is a PDF: check the %PDF magic
                # bytes or the content-type header
                is_pdf = (
                    res.content.startswith(b'%PDF') or
                    res.headers.get("content-type", "").startswith("application/pdf")
                )
                if is_pdf and len(res.content) > 1024:  # at least 1 KB
                    paper.save_file_pdf(res.content)
                    paper.has_fulltext_pdf = True
                    paper.save(update_fields=["has_fulltext_pdf", "update_time"])
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'


@shared_task(base=CustomTask)
def send_download_fulltext_task(number_of_task=100):
    def_name = send_download_fulltext_task.name
    if not show_task_run(def_name):
        return "stopped"
    qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None)
    if not qs.exists():
        return "done"
    qs0 = qs.order_by("?")
    # Dispatch tasks in batches to limit concurrency and avoid flooding the source
    task_count = 0
    for paper in qs0[:number_of_task]:
        if not show_task_run(def_name):
            break
        if paper.oa_url:
            # Use countdown to stagger requests: 2 seconds between tasks
            countdown = task_count * 2
            current_app.send_task(
                "apps.resm.tasks.download_pdf",
                kwargs={
                    "paper_id": paper.id,
                },
                countdown=countdown,
            )
            task_count += 1
    qs_count = qs.count()
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            def_name,
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=60,  # let the current batch finish before continuing
        )
    return f'{def_name}, sent {task_count} tasks, remaining {qs_count} papers'


@shared_task(base=CustomTask)
def download_pdf(paper_id):
    """
    Download the PDF for a single paper.
    """
    try:
        paper = Paper.objects.get(id=paper_id)
    except Paper.DoesNotExist:
        return f"Paper {paper_id} not found"
    # Check the cache so the same paper is not downloaded twice in a short window
    cache_key = f"download_pdf_{paper_id}"
    if cache.get(cache_key):
        return "already_processing"
    # Set an in-progress marker to guard against concurrent duplicate processing
    cache.set(cache_key, True, timeout=3600)
    msg = save_pdf_from_oa_url(paper)
    if paper.has_fulltext_pdf is False:
        msg = save_pdf_from_openalex(paper)
    return msg


def save_pdf_from_oa_url(paper: Paper):
    try:
        headers = get_random_headers()
        res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
    except requests.RequestException as e:
        paper.save_fail_reason("oa_url_request_error")
        return f"oa_url_request_error: {str(e)}"
    if res.status_code == 200:
        # Verify the payload really is a PDF: check the %PDF magic bytes
        # or the content-type header
        is_pdf = (
            res.content.startswith(b'%PDF') or
            res.headers.get("content-type", "").startswith("application/pdf") or
            res.headers.get("content-type", "") == "application/octet-stream"
        )
        if is_pdf and len(res.content) > 1024:  # at least 1 KB
            paper.save_file_pdf(res.content)
            paper.has_fulltext = True
            paper.has_fulltext_pdf = True
            paper.fetch_status = "fulltext_ready"
            paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
            return "success"
    return f"oa_url_failed: {res.status_code}"


def save_pdf_from_openalex(paper: Paper):
    # Try downloading from OpenAlex's hosted content
    try:
        res = requests.get(
            url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
            params={
                "api_key": OPENALEX_KEY
            },
            timeout=(3, 15)
        )
    except requests.RequestException as e:
        paper.save_fail_reason("openalex_pdf_error")
        return f"openalex_pdf_error: {str(e)}"
    if res.status_code == 200:
        paper.save_file_pdf(res.content)
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        paper.fetch_status = "fulltext_ready"
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
        return "success"
    return f"openalex_pdf_failed: {res.status_code}"


# https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true
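

# A possible further fallback implied by the URL above. Hypothetical helper,
# not wired into download_pdf; the mirror and its URL scheme are assumptions
# taken from the comment line, not a confirmed part of this project.
def save_pdf_from_scibban(paper: Paper):
    url = f"https://sci.bban.top/pdf/{paper.doi}.pdf?download=true"
    try:
        res = requests.get(url, headers=get_random_headers(), timeout=(3, 15))
    except requests.RequestException as e:
        return f"scibban_request_error: {str(e)}"
    if res.status_code == 200 and res.content.startswith(b'%PDF'):
        paper.save_file_pdf(res.content)
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        paper.fetch_status = "fulltext_ready"
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf",
                                  "fetch_status", "update_time"])
        return "success"
    return f"scibban_failed: {res.status_code}"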