From 13c231637fa185a790deee5ded2ce93298fdbd2c Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 29 Jan 2026 13:53:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=9D=E8=AF=81download=5Fpdf?= =?UTF-8?q?=E4=B8=8D=E8=B6=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/resm/tasks.py | 99 ++++++++++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 39 deletions(-) diff --git a/apps/resm/tasks.py b/apps/resm/tasks.py index e0f8305..79e4615 100644 --- a/apps/resm/tasks.py +++ b/apps/resm/tasks.py @@ -256,27 +256,34 @@ def get_pdf_from_elsevier(number_of_task=100): ) return f'{def_name}, {err_msg}, remaining {qs_count} papers' +RUNNING_KEY = "download_pdf:running" +MAX_RUNNING = 20 + +def can_send_more(): + return cache.get(RUNNING_KEY, 0) < MAX_RUNNING @shared_task(base=CustomTask) def send_download_fulltext_task(number_of_task=100): - def_name = send_download_fulltext_task.name - if not show_task_run(def_name): - return "stoped" qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None) - if not qs.exists(): return "done" - qs0 = qs.order_by("?") # 分批发送任务,控制并发数量,避免过多请求 task_count = 0 for paper in qs0[:number_of_task]: - if not show_task_run(def_name): + if not can_send_more(): break if paper.oa_url: + # 发送任务前先增加计数,确保计数准确 + current_count = incr_running() + if current_count > MAX_RUNNING: + # 超过限制,回滚计数并停止 + decr_running() + break + # 使用 countdown 错开请求时间,避免过多并发 - countdown = task_count * 2 # 每个任务间隔2秒 + countdown = task_count * 1 # 每个任务间隔1秒 current_app.send_task( "apps.resm.tasks.download_pdf", kwargs={ @@ -285,17 +292,27 @@ def send_download_fulltext_task(number_of_task=100): countdown=countdown, ) task_count += 1 + return f"sent {task_count} download_pdf tasks, running {cache.get(RUNNING_KEY, 0)}/{MAX_RUNNING}" - qs_count = qs.count() - if show_task_run(def_name) and qs_count > 0: - current_app.send_task( - def_name, - kwargs={ - "number_of_task": number_of_task, - }, - countdown=10, # 等待当前批次完成后再继续 - ) - return f'{def_name}, sent {task_count} tasks, remaining {qs_count} papers' + +def incr_running(): + try: + return cache.incr(RUNNING_KEY) + except ValueError: + cache.set(RUNNING_KEY, 1) + return 1 + + +def decr_running(): + try: + current = cache.get(RUNNING_KEY, 0) + if current <= 0: + cache.set(RUNNING_KEY, 0) + return 0 + return cache.decr(RUNNING_KEY) + except ValueError: + cache.set(RUNNING_KEY, 0) + return 0 @shared_task(base=CustomTask) @@ -304,24 +321,27 @@ def download_pdf(paper_id): 下载单个论文的PDF """ try: - paper = Paper.objects.get(id=paper_id) - except Paper.DoesNotExist: - return f"Paper {paper_id} not found" - - # 检查缓存,避免短时间内重复下载同一个paper - cache_key = f"download_pdf_{paper_id}" - if cache.get(cache_key): - return "already_processing" + try: + paper = Paper.objects.get(id=paper_id) + except Paper.DoesNotExist: + return f"Paper {paper_id} not found" + + # 检查缓存,避免短时间内重复下载同一个paper + cache_key = f"download_pdf_{paper_id}" + if cache.get(cache_key): + return "already_processing" - # 设置处理中标记,防止并发重复处理 - cache.set(cache_key, True, timeout=3600) - - current_from = "oa_url" - msg = save_pdf_from_oa_url(paper) - if paper.has_fulltext_pdf is False: - current_from = "openalex" - msg = save_pdf_from_openalex(paper) - return msg, current_from + # 设置处理中标记,防止并发重复处理 + cache.set(cache_key, True, timeout=3600) + + current_from = "oa_url" + msg = save_pdf_from_oa_url(paper) + if paper.has_fulltext_pdf is False: + current_from = "openalex" + msg = save_pdf_from_openalex(paper) + return msg, current_from + finally: + decr_running() def save_pdf_from_oa_url(paper:Paper): @@ -347,7 +367,9 @@ def save_pdf_from_oa_url(paper:Paper): paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) return "success" -def save_pdf_from_openalex(paper:Paper, retry=1): +def save_pdf_from_openalex(paper:Paper): + if cache.get("openalex_api_exceed"): + return "Insufficient credits" # 尝试openalex下载 try: res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf", @@ -365,10 +387,9 @@ def save_pdf_from_openalex(paper:Paper, retry=1): paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) return "success" elif res.status_code == 429: - if retry > 1: - time.sleep(random.randint(1, 10)) - retry -= 1 - return save_pdf_from_openalex(paper, retry=retry) + if "Insufficient credits" in res.json().get("message", ""): + cache.set("openalex_api_exceed", True, timeout=3600) + return "Insufficient credits" # https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true \ No newline at end of file