# Create your tasks here from __future__ import absolute_import, unicode_literals from apps.utils.tasks import CustomTask from celery import shared_task from pyalex import Works, config from apps.resm.models import Paper, PaperAbstract from apps.utils.snowflake import idWorker from django.core.cache import cache import requests from lxml import etree from celery import current_app from datetime import datetime import random # config.email = "caoqianming@foxmail.com" config.email = "caoqianming@ctc.ac.cn" config.max_retries = 0 config.retry_backoff_factor = 0.1 config.retry_http_codes = [429, 500, 503] # OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd" OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp" config.api_key = OPENALEX_KEY @shared_task(base=CustomTask) def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None): cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}" cache_cursor = cache.get(cache_key, "*") if keywords or search: pass else: raise Exception("keywords or search must be provided") # filter=keywords.id:clinker|cement pager = Works().filter( publication_year=publication_year, has_doi=True, type="article" ) if keywords: if "|" in keywords: keywords_list = keywords.split("|") else: keywords_list = [keywords] pager = pager.filter( keywords={"id": keywords_list} ) if search: pager = pager.filter( search=search ) pager = pager.select([ "id", "doi", "title", "publication_date", "open_access", "authorships", "primary_location", "publication_year", "display_name", "content_urls" ]).paginate(per_page=200, n_max=None, cursor=cache_cursor) next_cursor = pager._next_value for page in pager: papers = [] for record in page: if record["doi"] and (record["display_name"] or record["title"]): paper = Paper() paper.id = idWorker.get_id() paper.o_keywords = keywords paper.o_search = search paper.source = "openalex" paper.type = "article" paper.openalex_id = record["id"].split("/")[-1] paper.doi = record["doi"].replace("https://doi.org/", "") paper.title = record["display_name"] paper.publication_date = record["publication_date"] paper.publication_year = record["publication_year"] if record["open_access"]: paper.is_oa = record["open_access"]["is_oa"] paper.oa_url = record["open_access"]["oa_url"] if record["authorships"]: paper.first_author = record["authorships"][0]["author"]["display_name"] if record["authorships"][0]["institutions"]: paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"] if record["primary_location"] and record["primary_location"]["source"]: paper.publication_name = record["primary_location"]["source"]["display_name"] papers.append(paper) Paper.objects.bulk_create(papers, ignore_conflicts=True) cache.set(cache_key, next_cursor, timeout=None) if end_year is None: end_year = datetime.now().year if publication_year + 1 <= end_year: current_app.send_task( "apps.resm.tasks.get_paper_meta_from_openalex", kwargs={ "publication_year": publication_year + 1, "keywords": keywords, "search": search, "end_year": end_year }, countdown=5 ) ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf' # 常用的 User-Agent 列表 USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15", ] def get_random_headers(): """获取随机的请求头""" return { "User-Agent": random.choice(USER_AGENTS), "Accept": "application/pdf, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Referer": "https://www.google.com/", } def show_task_run(def_name: str): return cache.get(def_name, True) @shared_task(base=CustomTask) def get_abstract_from_elsevier(number_of_task:int = 20): def_name = get_abstract_from_elsevier.name if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(has_abstract=False) qs = qs.exclude( fail_reason__contains="elsevier_doi_not_found" ).exclude(fail_reason__contains="elsevier_abstract_not_found" ).exclude(fetch_status="downloading").order_by("publication_date") if not qs.exists(): return "done" params = { "apiKey": ELSEVIER_APIKEY, "httpAccept": "text/xml" } err_msg = "" with requests.Session() as req: for paper in qs[:number_of_task]: if not show_task_run(def_name): break original_status = paper.fetch_status if original_status == "downloading": continue paper.fetch_status = "downloading" paper.save(update_fields=["fetch_status", "update_time"]) try: try: res = req.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, timeout=(3, 15) ) except requests.RequestException: err_msg = "elsevier_request_error" break if res.status_code == 200: xml_str = res.text try: root = etree.fromstring(xml_str.encode("utf-8")) except etree.XMLSyntaxError: paper.save_fail_reason("elsevier_xml_error") continue ns = {"dc": "http://purl.org/dc/elements/1.1/", "ce": "http://www.elsevier.com/xml/common/dtd", "xocs": "http://www.elsevier.com/xml/xocs/dtd",} abstract = root.xpath("//dc:description/text()", namespaces=ns) if abstract: PaperAbstract.objects.update_or_create( paper=paper, defaults={ "abstract": abstract[0].strip(), "source": "elsevier" } ) paper.has_abstract = True paper.has_abstract_xml = True paper.fetch_status = "abstract_ready" else: paper.save_fail_reason("elsevier_abstract_not_found") continue paras = root.xpath("//ce:para", namespaces=ns) has_fulltext = len(paras) > 0 if has_fulltext is False: rawtexts = root.xpath("//xocs:rawtext/text()",namespaces=ns) if rawtexts and len(rawtexts[0].strip()) > 2000: has_fulltext = True if has_fulltext: paper.has_fulltext = True paper.has_fulltext_xml = True paper.fetch_status = "fulltext_ready" paper.save_file_xml(xml_str) paper.save(update_fields=["has_abstract", "has_abstract_xml", "has_fulltext", "has_fulltext_xml", "update_time", "fetch_status"]) elif res.status_code == 404: paper.save_fail_reason("elsevier_doi_not_found") finally: if paper.fetch_status == "downloading": paper.fetch_status = original_status paper.save(update_fields=["fetch_status", "update_time"]) qs_count = qs.count() if show_task_run(def_name) and qs_count > 0: current_app.send_task( "apps.resm.tasks.get_abstract_from_elsevier", kwargs={ "number_of_task": number_of_task, }, countdown=5, ) return f'{def_name}, {err_msg}, remaining {qs_count} papers' @shared_task(base=CustomTask) def get_pdf_from_elsevier(number_of_task=100): """ 获取elsevier全文 """ def_name = get_pdf_from_elsevier.name if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True) err_msg = "" with requests.Session() as req: for paper in qs[:number_of_task]: if not show_task_run(def_name): break params = { "apiKey": ELSEVIER_APIKEY, "httpAccept": "application/pdf" } try: res = req.get( f"https://api.elsevier.com/content/article/doi/{paper.doi}", params=params, timeout=(3, 15) ) except requests.RequestException: err_msg = "elsevier_request_error" break if res.status_code == 200: # 检查是否是PDF文件:检查魔数 %PDF 或 content-type is_pdf = ( res.content.startswith(b'%PDF') or res.headers.get("content-type", "").startswith("application/pdf") ) if is_pdf and len(res.content) > 1024: # 至少1KB paper.save_file_pdf(res.content) paper.has_fulltext_pdf = True paper.save(update_fields=["has_fulltext_pdf", "update_time"]) qs_count = qs.count() if show_task_run(def_name) and qs_count > 0: current_app.send_task( "apps.resm.tasks.get_pdf_from_elsevier", kwargs={ "number_of_task": number_of_task, }, countdown=5, ) return f'{def_name}, {err_msg}, remaining {qs_count} papers' def get_actual_running_count(): """获取实际在下载的任务数""" return Paper.objects.filter(fetch_status='downloading').count() def can_send_more(max_running): return get_actual_running_count() < max_running @shared_task(base=CustomTask) def send_download_fulltext_task(number_of_task=100): qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None).exclude( fetch_status='downloading' ) if not qs.exists(): return "done" qs0 = qs.order_by("?") # 分批发送任务,控制并发数量,避免过多请求 task_count = 0 for paper in qs0[:number_of_task]: if not can_send_more(number_of_task): break if paper.oa_url: # 使用 countdown 错开请求时间,避免过多并发 countdown = task_count * 1 # 每个任务间隔1秒 current_app.send_task( "apps.resm.tasks.download_pdf", kwargs={ "paper_id": paper.id, }, countdown=countdown, ) task_count += 1 return f"sent {task_count} download_pdf tasks" @shared_task(base=CustomTask) def download_pdf(paper_id): """ 下载单个论文的PDF """ paper = None original_status = None try: paper = Paper.objects.get(id=paper_id) original_status = paper.fetch_status if original_status == "downloading": return f"paper {paper_id} is already downloading" # 将状态改为downloading paper.fetch_status = 'downloading' paper.save(update_fields=['fetch_status', 'update_time']) msg = "没有下载渠道" current_from = "" if paper.oa_url: current_from = "oa_url" msg = save_pdf_from_oa_url(paper) # if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None: # current_from = "openalex" # msg = save_pdf_from_openalex(paper) return msg, current_from finally: if paper and paper.fetch_status == "downloading": paper.fetch_status = original_status paper.save(update_fields=['fetch_status', 'update_time']) def save_pdf_from_oa_url(paper:Paper): try: headers = get_random_headers() res = requests.get(paper.oa_url, headers=headers, timeout=(3, 15)) except requests.RequestException as e: paper.save_fail_reason("oa_url_request_error") return f"oa_url_request_error: {str(e)}" if res.status_code == 200: # 检查是否是PDF文件:检查魔数 %PDF 或 content-type is_pdf = ( res.content.startswith(b'%PDF') or res.headers.get("content-type", "").startswith("application/pdf") or res.headers.get("content-type", "") == "application/octet-stream" ) if is_pdf and len(res.content) > 1024: # 至少1KB paper.save_file_pdf(res.content) paper.has_fulltext = True paper.has_fulltext_pdf = True paper.fetch_status = "fulltext_ready" paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) return "success" def save_pdf_from_openalex(paper:Paper): if cache.get("openalex_api_exceed"): return "find cache Insufficient credits" # 尝试openalex下载 try: res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf", params={ "api_key": OPENALEX_KEY }) except requests.RequestException as e: paper.save_fail_reason("openalex_pdf_error") return f"openalex_pdf_error: {str(e)}" if res.status_code == 200: paper.save_file_pdf(res.content) paper.has_fulltext = True paper.has_fulltext_pdf = True paper.fetch_status = "fulltext_ready" paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) return "success" elif res.status_code == 429: if "Insufficient credits" in res.json().get("message", ""): cache.set("openalex_api_exceed", True, timeout=3600) return "Insufficient credits" def save_pdf_from_scihub(paper:Paper): pass # https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true