# (removed scraper page metadata: "503 lines / 19 KiB / Python")
# Create your tasks here
|
||
from __future__ import absolute_import, unicode_literals
|
||
from apps.utils.tasks import CustomTask
|
||
from celery import shared_task
|
||
from pyalex import Works, config
|
||
from apps.resm.models import Paper, PaperAbstract
|
||
from apps.utils.snowflake import idWorker
|
||
from django.core.cache import cache
|
||
import requests
|
||
from lxml import etree
|
||
from celery import current_app
|
||
from datetime import datetime, timedelta
|
||
import random
|
||
from .d_oaurl import download_from_url_playwright
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
from django.db.models import Q
|
||
from django.utils import timezone
|
||
|
||
# pyalex (OpenAlex client) configuration: identify ourselves and disable
# automatic retries (429/5xx are handled by the calling tasks instead).
# config.email = "caoqianming@foxmail.com"
config.email = "caoqianming@ctc.ac.cn"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
# Key for the OpenAlex content (PDF) API, used by save_pdf_from_openalex.
# NOTE(review): hardcoded credentials — these should come from settings or
# environment variables, not source control.
# OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd"
OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp"
# NOTE(review): this metadata-API key equals the old, commented-out
# OPENALEX_KEY above rather than the current one — confirm this is intended.
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"

# Elsevier article API credentials; the institution token gates fulltext
# access. NOTE(review): hardcoded secrets — move to settings/env.
ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'
ELSEVIER_HEADERS = {
    "X-ELS-Insttoken": "135fa874aea9f0de11cad187ccb4878c",
    "X-ELS-APIKey": ELSEVIER_APIKEY,
}
|
||
|
||
|
||
def run_async(coro):
    """Run an async coroutine to completion, portably across platforms.

    On Windows, asyncio subprocess support requires the Proactor event
    loop, so the Proactor policy is installed and a fresh loop is driven
    manually; elsewhere asyncio.run() is used directly.
    """
    if sys.platform != 'win32':
        # Unix/Linux/Mac: the default event loop is fine.
        return asyncio.run(coro)
    # Windows: force the Proactor policy so subprocess support works.
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        return event_loop.run_until_complete(coro)
    finally:
        event_loop.close()
|
||
|
||
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
    """Harvest paper metadata for one publication year from OpenAlex.

    Pages through the Works API with cursor pagination (resumable via a
    cached cursor), bulk-inserts Paper rows, then reschedules itself for
    the next year up to ``end_year``.

    :param publication_year: year to harvest.
    :param keywords: OpenAlex keyword ids; '|' means OR, ',' means AND.
    :param search: full-text search over title and abstract.
    :param end_year: last year to harvest (inclusive); None means the
        current year.
    :raises Exception: when neither keywords nor search is provided.
    """
    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
    cache_cursor = cache.get(cache_key, "*")
    if not (keywords or search):
        raise Exception("keywords or search must be provided")
    # filter=keywords.id:clinker|cement
    pager = Works().filter(
        publication_year=publication_year,
        has_doi=True,
        type="article"
    )
    if keywords:
        # '|' means OR, ',' means AND; blank items are dropped.
        if "|" in keywords:
            keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
            pager = pager.filter_or(keywords={"id": keywords_list})
        elif "," in keywords:
            keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
            pager = pager.filter(keywords={"id": keywords_list})
        else:
            keywords_list = [keywords.strip()]
            pager = pager.filter(keywords={"id": keywords_list})
    if search:
        pager = pager.search_filter(title_and_abstract=search)
    pager = pager.select([
        "id", "doi", "title", "publication_date",
        "open_access", "authorships", "primary_location", "publication_year",
        "display_name", "content_urls"
    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
    for page in pager:
        papers = []
        for record in page:
            # Require a DOI and some title text before keeping a record.
            if record["doi"] and (record["display_name"] or record["title"]):
                paper = Paper()
                paper.id = idWorker.get_id()
                paper.o_keywords = keywords
                paper.o_search = search
                paper.source = "openalex"
                paper.type = "article"
                paper.openalex_id = record["id"].split("/")[-1]
                paper.doi = record["doi"].replace("https://doi.org/", "")
                paper.title = record["display_name"]
                paper.publication_date = record["publication_date"]
                paper.publication_year = record["publication_year"]
                if record["open_access"]:
                    paper.is_oa = record["open_access"]["is_oa"]
                    paper.oa_url = record["open_access"]["oa_url"]
                if record["authorships"]:
                    paper.first_author = record["authorships"][0]["author"]["display_name"]
                    if record["authorships"][0]["institutions"]:
                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
                if record["primary_location"] and record["primary_location"]["source"]:
                    paper.publication_name = record["primary_location"]["source"]["display_name"]
                papers.append(paper)
        Paper.objects.bulk_create(papers, ignore_conflicts=True)
        # Fix: persist the cursor after EVERY page so a crashed run resumes
        # from the next unprocessed page. The original captured
        # pager._next_value once before iterating and cached that single,
        # pre-iteration value after the loop, so resumption restarted from
        # the beginning. NOTE(review): relies on pyalex's paginator
        # advancing _next_value as pages are fetched — confirm against the
        # pinned pyalex version.
        cache.set(cache_key, pager._next_value, timeout=None)
    if end_year is None:
        end_year = datetime.now().year
    # Chain to the next year until end_year is reached.
    if publication_year + 1 <= end_year:
        current_app.send_task(
            "apps.resm.tasks.get_paper_meta_from_openalex",
            kwargs={
                "publication_year": publication_year + 1,
                "keywords": keywords,
                "search": search,
                "end_year": end_year
            },
            countdown=5
        )
|
||
|
||
# Pool of common browser User-Agent strings used to vary outgoing requests.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15",
]


def get_random_headers():
    """Build download request headers with a randomly chosen User-Agent."""
    agent = random.choice(USER_AGENTS)
    return {
        "User-Agent": agent,
        "Accept": "application/pdf, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Referer": "https://www.google.com/",
    }
|
||
|
||
def show_task_run(def_name: str):
    """Return the run/stop flag for a task name; unset means run (True)."""
    flag = cache.get(def_name, True)
    return flag
|
||
|
||
@shared_task(base=CustomTask)
def get_pdf_from_openalex(number_of_task: int =10):
    """Batch-download PDFs via the OpenAlex content API.

    Picks up to ``number_of_task`` open-access papers without fulltext,
    downloads each through save_pdf_from_openalex(), then reschedules
    itself until no candidates remain or the task is switched off via
    the cache flag checked by show_task_run().

    :param number_of_task: batch size per run.
    :return: "stoped"/"done", or (last message, success count).
    """
    def_name = get_pdf_from_openalex.name
    # Kill switch: a falsy cache entry under the task name stops the chain.
    if not show_task_run(def_name):
        return "stoped"
    count = 0
    qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude(
        fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_not_found")[:number_of_task]
    if not qs.exists():
        return "done"
    msg = ""
    for paper in qs:
        if not show_task_run(def_name):
            break
        # Mark as downloading so concurrent tasks skip this paper.
        paper.fetch("downloading")
        msg = save_pdf_from_openalex(paper)
        paper.fetch_end()
        if paper.has_fulltext_pdf:
            count += 1
        # Stop early once the OpenAlex credit quota is flagged as exceeded.
        if cache.get("openalex_api_exceed"):
            break
    countdown = 2
    if cache.get("openalex_api_exceed"):
        countdown = 30 * 60  # retry after 30 minutes
    # Re-enqueue the next batch unless the task was switched off.
    if show_task_run(def_name):
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_openalex",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=countdown,
        )
    return msg, count
|
||
|
||
@shared_task(base=CustomTask)
def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True):
    """Fetch abstracts (and, when present, XML fulltext) from Elsevier.

    Selects papers with an Elsevier DOI (prefix 10.1016) that still lack
    an abstract, queries the Elsevier article API for XML, stores the
    abstract and the raw XML, flags fulltext availability, then
    reschedules itself while the run flag stays on.

    :param number_of_task: batch size per run.
    :param exclude_failed: skip papers that already have a fail_reason.
    :return: "stoped"/"done" or a summary string with counts.
    """
    def_name = get_abstract_from_elsevier.name
    if not show_task_run(def_name):
        return "stoped"
    # qs = Paper.objects.filter(Q(has_abstract=False)|Q(has_fulltext_xml=False))
    qs = Paper.objects.filter(has_abstract=False)
    # qs = qs.exclude(
    #     fail_reason__contains="elsevier_doi_not_found"
    # ).exclude(fail_reason__contains="elsevier_abstract_not_found"
    # )
    if exclude_failed:
        qs = qs.filter(fail_reason=None)
    # Only Elsevier DOIs; random order spreads load across reruns.
    qs = qs.exclude(fetch_status="downloading"
                    ).filter(doi__startswith="10.1016").order_by("?")

    if not qs.exists():
        return "done"

    params = {
        "httpAccept": "text/xml"
    }
    err_msg = ""
    count_abs = 0
    count_fulltext = 0
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            # Re-check: another worker may have claimed it since the query.
            if paper.fetch_status == "downloading":
                continue
            paper.fetch(status="downloading")
            try:
                try:
                    res = req.get(
                        f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                        params=params,
                        headers = ELSEVIER_HEADERS,
                        timeout=(3, 15)
                    )
                except requests.RequestException:
                    # Network-level failure: abort the whole batch.
                    err_msg = "elsevier_request_error"
                    break
                if res.status_code == 200:
                    xml_str = res.text
                    try:
                        root = etree.fromstring(xml_str.encode("utf-8"))
                    except etree.XMLSyntaxError:
                        paper.save_fail_reason("elsevier_xml_error")
                        continue

                    ns = {"dc": "http://purl.org/dc/elements/1.1/",
                          "ce": "http://www.elsevier.com/xml/common/dtd",
                          "xocs": "http://www.elsevier.com/xml/xocs/dtd",}
                    abstract = root.xpath("//dc:description/text()", namespaces=ns)
                    if abstract:
                        PaperAbstract.objects.update_or_create(
                            paper=paper,
                            defaults={
                                "abstract": abstract[0].strip(),
                                "source": "elsevier"
                            }
                        )
                        paper.has_abstract = True
                        paper.has_abstract_xml = True
                        count_abs += 1
                    else:
                        paper.save_fail_reason("elsevier_abstract_not_found")
                        continue

                    # Fulltext detection: either structured <ce:para>
                    # elements or a sufficiently long raw-text body.
                    paras = root.xpath("//ce:para", namespaces=ns)
                    has_fulltext = len(paras) > 0
                    if has_fulltext is False:
                        rawtexts = root.xpath("//xocs:rawtext/text()",namespaces=ns)
                        if rawtexts and len(rawtexts[0].strip()) > 2000:
                            has_fulltext = True
                    if has_fulltext:
                        paper.has_fulltext = True
                        paper.has_fulltext_xml = True
                        count_fulltext += 1

                    paper.save_file_xml(xml_str)
                    paper.save(update_fields=["has_abstract",
                                              "has_abstract_xml", "has_fulltext",
                                              "has_fulltext_xml", "update_time"])

                elif res.status_code == 404:
                    paper.save_fail_reason("elsevier_doi_not_found")
                else:
                    err_msg = f"elsevier_response_error: {res.status_code} {res.text}"
            finally:
                # Always release the downloading lock, even on continue/break.
                paper.fetch_end()

    if show_task_run(def_name):
        current_app.send_task(
            "apps.resm.tasks.get_abstract_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{err_msg}, get {count_abs} abstracts, {count_fulltext} fulltexts'
|
||
|
||
|
||
@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
    """Download full-text PDFs through the Elsevier article API.

    Picks papers already known to have fulltext (XML) and an abstract but
    no stored PDF, fetches each PDF, then reschedules itself while any
    candidates remain.

    :param number_of_task: batch size per run.
    :return: summary string with the last error and remaining count.
    """
    def_name = get_pdf_from_elsevier.name
    if not show_task_run(def_name):
        return "stoped"
    qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True)
    err_msg = ""
    with requests.Session() as req:
        for paper in qs[:number_of_task]:
            if not show_task_run(def_name):
                break
            params = {
                "apiKey": ELSEVIER_APIKEY,
                "httpAccept": "application/pdf"
            }
            try:
                res = req.get(
                    f"https://api.elsevier.com/content/article/doi/{paper.doi}",
                    params=params,
                    # Fix: send the institution token too, consistent with
                    # every other Elsevier call in this module — it was
                    # missing here, so token-gated content could be denied.
                    headers=ELSEVIER_HEADERS,
                    timeout=(3, 15)
                )
            except requests.RequestException:
                err_msg = "elsevier_request_error"
                break
            if res.status_code == 200:
                # PDF sanity check: %PDF magic bytes or content-type.
                is_pdf = (
                    res.content.startswith(b'%PDF') or
                    res.headers.get("content-type", "").startswith("application/pdf")
                )
                if is_pdf and len(res.content) > 1024:  # at least 1 KiB
                    paper.save_file_pdf(res.content)
                    paper.has_fulltext_pdf = True
                    paper.save(update_fields=["has_fulltext_pdf", "update_time"])
                else:
                    # Fix: a non-PDF 200 response used to be dropped
                    # silently, leaving no trace in the task result.
                    err_msg = "elsevier_pdf_invalid_content"
            else:
                # Fix: non-200 responses used to be ignored entirely.
                err_msg = f"elsevier_status_error: {res.status_code}"
    qs_count = qs.count()
    # NOTE(review): papers that keep failing stay in qs, so this task can
    # reschedule indefinitely; consider recording a fail_reason for them.
    if show_task_run(def_name) and qs_count > 0:
        current_app.send_task(
            "apps.resm.tasks.get_pdf_from_elsevier",
            kwargs={
                "number_of_task": number_of_task,
            },
            countdown=5,
        )
    return f'{def_name}, {err_msg}, remaining {qs_count} papers'
|
||
|
||
def get_actual_running_count():
    """Count papers currently marked 'downloading' (downloads in flight)."""
    in_flight = Paper.objects.filter(fetch_status='downloading')
    return in_flight.count()
|
||
|
||
def can_send_more(max_running):
    """Whether the number of in-flight downloads is below ``max_running``."""
    running = get_actual_running_count()
    return running < max_running
|
||
|
||
@shared_task(base=CustomTask)
def send_download_fulltext_task(number_of_task=100):
    """Fan out download_pdf tasks for open-access papers missing fulltext.

    Tasks are staggered one second apart, and dispatch stops once the
    number of downloads already in flight reaches ``number_of_task``.
    """
    candidates = Paper.objects.filter(has_fulltext=False, fail_reason=None, is_oa=True).exclude(
        fetch_status='downloading'
    )
    if not candidates.exists():
        return "done"
    shuffled = candidates.order_by("?")

    # Dispatch in a controlled, staggered fashion to limit concurrency.
    sent = 0
    for paper in shuffled[:number_of_task]:
        if not can_send_more(number_of_task):
            break
        current_app.send_task(
            "apps.resm.tasks.download_pdf",
            kwargs={
                "paper_id": paper.id,
            },
            # One second between tasks to avoid a burst of requests.
            countdown=sent * 1,
        )
        sent += 1

    return f"sent {sent} download_pdf tasks"
|
||
|
||
@shared_task(base=CustomTask)
def release_working_paper(minutes=10):
    """Release papers stuck in the 'downloading' state.

    A paper whose update_time is older than ``minutes`` is assumed to
    belong to a dead or stalled worker, so its fetch state is reset.

    :param minutes: staleness threshold in minutes.
    :return: summary of how many papers were released.
    """
    # Hoist the cutoff out of the loop (the original recomputed
    # timezone.now() per row) and let the database do the filtering.
    cutoff = timezone.now() - timedelta(minutes=minutes)
    qs = Paper.objects.filter(fetch_status="downloading", update_time__lt=cutoff)
    count = 0
    for paper in qs:
        paper.fetch_end()
        count += 1
    return f"release {count} papers"
|
||
|
||
@shared_task(base=CustomTask)
def download_pdf(paper_id):
    """Download the PDF for a single paper, trying sources in order.

    Order: publisher oa_url (Elsevier API for 10.1016 DOIs, direct fetch
    otherwise), then Sci-Hub for papers published up to 2021, then the
    OpenAlex content API (unless its credit quota is flagged exceeded).

    :param paper_id: primary key of the Paper to fetch.
    :return: (message, source) describing the last attempt, or None when
        the paper is already claimed by another worker.
    :raises Paper.DoesNotExist: when paper_id is unknown.
    """
    # Fix: the lookup and the "downloading" guard used to sit inside the
    # try/finally, so (a) Paper.DoesNotExist left `paper` unbound and made
    # the finally clause raise NameError, and (b) the early return still
    # ran paper.fetch_end(), releasing ANOTHER worker's downloading lock.
    # Claim the lock before entering the try block instead.
    paper = Paper.objects.get(id=paper_id)
    if paper.fetch_status == "downloading":
        return
    paper.fetch("downloading")
    try:
        msg = "no_method_to_get_pdf"
        current_from = ""
        if paper.oa_url:
            if "https://doi.org/10.1016" in paper.oa_url:
                current_from = "elsevier"
                msg = save_pdf_from_elsevier(paper)
            else:
                current_from = "oa_url"
                msg = save_pdf_from_oa_url(paper)
        if paper.has_fulltext_pdf is False and paper.publication_year <= 2021:
            current_from = "scihub"
            msg = save_pdf_from_scihub(paper)
        if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
            current_from = "openalex"
            msg = save_pdf_from_openalex(paper)
        if paper.fail_reason is None and paper.has_fulltext_pdf is False:
            # Record the last failure so this paper is not retried forever
            # by the fan-out task.
            paper.save_fail_reason(msg)
        return msg, current_from
    finally:
        # Always release the downloading lock we claimed above.
        paper.fetch_end()
|
||
|
||
|
||
def save_pdf_from_oa_url(paper:Paper):
    """Try to download a paper's PDF directly from its open-access URL.

    :param paper: Paper with a non-empty oa_url.
    :return: "success" when a PDF was saved, otherwise an error message;
        on failure a fail_reason may also be recorded on the paper.
    """
    try:
        headers = get_random_headers()
        res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15))
    except requests.RequestException as e:
        paper.save_fail_reason("oa_url_request_error")
        return f"oa_url_request_error: {str(e)}"

    if res.status_code in [200, 201, 202]:
        # PDF sanity check: %PDF magic bytes or a PDF/binary content-type.
        is_pdf = (
            res.content.startswith(b'%PDF') or
            res.headers.get("content-type", "").startswith("application/pdf") or
            res.headers.get("content-type", "") == "application/octet-stream"
        )
        if is_pdf and len(res.content) > 1024:  # at least 1 KiB
            paper.save_file_pdf(res.content, save_obj=True)
            return "success"
        else:
            paper.save_fail_reason("oa_url_not_pdf")
            return "oa_url_not_pdf"
    elif res.status_code == 403:
        # Blocked by the publisher. A Playwright browser fallback
        # (download_from_url_playwright via run_async) exists in this
        # module but is currently disabled.
        paper.save_fail_reason("oa_url_need_play")
    # Fix: typo "oa_url_pdf_oerror" -> "oa_url_pdf_error" in the catch-all
    # message returned for 403 and any other unexpected status.
    return f"oa_url_pdf_error: {res.status_code}"
|
||
|
||
def save_pdf_from_openalex(paper:Paper):
    """Download a paper's PDF from the OpenAlex content API.

    Honors the "openalex_api_exceed" cache flag to stop calling the API
    once credits run out, and sets that flag for an hour when the API
    reports insufficient credits.

    :param paper: Paper with an openalex_id.
    :return: "success" or an error message.
    """
    if cache.get("openalex_api_exceed"):
        return "find cache Insufficient credits"
    # Try the OpenAlex content endpoint.
    try:
        res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
                           params={
                               "api_key": OPENALEX_KEY
                           },
                           # Fix: every other request in this module bounds
                           # connect/read time; without a timeout a stalled
                           # download hangs the worker indefinitely.
                           timeout=(3, 15))
    except requests.RequestException as e:
        return f"openalex_pdf_error: {str(e)}"
    if res.status_code == 200:
        paper.save_file_pdf(res.content, save_obj=True)
        return "success"
    elif res.status_code == 429:
        # Fix: guard against non-JSON 429 bodies, which made res.json()
        # raise and crash the task.
        try:
            message = res.json().get("message", "")
        except ValueError:
            message = ""
        if "Insufficient credits" in message:
            cache.set("openalex_api_exceed", True, timeout=3600)
        return "openalex_pdf_error: Insufficient credits"
    elif res.status_code == 404:
        paper.save_fail_reason("openalex_pdf_not_found")
        return "openalex_pdf_error: 404"
    else:
        paper.save_fail_reason("openalex_pdf_error")
        return f"openalex_pdf_error: {res.status_code} {res.text}"
|
||
|
||
|
||
def save_pdf_from_elsevier(paper:Paper):
    """Fetch a paper's PDF through the Elsevier article API and store it.

    :param paper: Paper with an Elsevier DOI.
    :return: "success" when the PDF was saved, otherwise an error message.
    """
    article_url = f"https://api.elsevier.com/content/article/doi/{paper.doi}"
    try:
        res = requests.get(
            article_url,
            params={"httpAccept": "application/pdf"},
            headers=ELSEVIER_HEADERS,
            timeout=(3, 15)
        )
    except requests.RequestException as e:
        return f"elsevier_request_error: {str(e)}"
    if res.status_code != 200:
        return f"elsevier_status_error: {res.status_code} {res.text}"
    paper.save_file_pdf(res.content, save_obj=True)
    return "success"
|
||
|
||
def save_pdf_from_scihub(paper:Paper):
    """Try to download a paper's PDF from Sci-Hub by DOI.

    On success, marks the paper as having fulltext and a PDF; otherwise
    records the downloader's error message as the fail reason.

    :param paper: Paper with a DOI.
    :return: "success" or the downloader's error message.
    """
    from .d_scihub import download_paper_by_doi
    is_ok, err_msg = download_paper_by_doi(paper.doi, paper.init_paper_path("pdf"))
    if is_ok:
        paper.has_fulltext = True
        paper.has_fulltext_pdf = True
        # Fix: include "update_time" like every other save in this module —
        # when update_fields is given, Django only refreshes auto_now
        # fields that are listed explicitly (assumes update_time is
        # auto_now; confirm against the Paper model).
        paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
        return "success"
    else:
        paper.save_fail_reason(err_msg)
        return err_msg

# example: https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true