From aaa51f548b6b6775183ce32a424115cfdb2d2a9f Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 29 Jan 2026 17:52:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=87=E7=94=A8fetch=5Fstatus?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E4=B8=8B=E8=BD=BD=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/resm/models.py | 2 +- apps/resm/tasks.py | 82 ++++++++++++++++----------------------------- 2 files changed, 29 insertions(+), 55 deletions(-) diff --git a/apps/resm/models.py b/apps/resm/models.py index cf9cd70..e2e48a0 100644 --- a/apps/resm/models.py +++ b/apps/resm/models.py @@ -29,7 +29,7 @@ class Paper(BaseModel): has_fulltext_pdf = models.BooleanField(default=False, db_index=True) fetch_status = models.CharField( max_length=20, - default="meta_only", # meta_only / abstract_ready / fulltext_ready / parsed / failed + default="meta_only", # meta_only / downloading / abstract_ready / fulltext_ready / parsed / failed db_index=True ) fail_reason = models.TextField(null=True, blank=True) diff --git a/apps/resm/tasks.py b/apps/resm/tasks.py index 06fe7a3..6727542 100644 --- a/apps/resm/tasks.py +++ b/apps/resm/tasks.py @@ -117,16 +117,15 @@ def show_task_run(def_name: str): return cache.get(def_name, True) @shared_task(base=CustomTask) -def get_abstract_from_elsevier(publication_year: int = None, number_of_task:int = 20): +def get_abstract_from_elsevier(number_of_task:int = 20): def_name = get_abstract_from_elsevier.name if not show_task_run(def_name): return "stoped" qs = Paper.objects.filter(has_abstract=False) - if publication_year is not None: - qs = qs.filter(publication_year=publication_year) qs = qs.exclude( fail_reason__contains="elsevier_doi_not_found" - ).exclude(fail_reason__contains="elsevier_abstract_not_found").order_by("publication_date") + ).exclude(fail_reason__contains="elsevier_abstract_not_found" + ).exclude(fetch_status="downloading").order_by("publication_date") if not qs.exists(): return "done" @@ -256,19 +255,18 @@ def get_pdf_from_elsevier(number_of_task=100): ) return f'{def_name}, {err_msg}, remaining {qs_count} papers' -RUNNING_KEY = "download_pdf:running" +def get_actual_running_count(): + """获取实际在下载的任务数""" + return Paper.objects.filter(fetch_status='downloading').count() -def get_max_running(): - return cache.get("download_pdf:max_running", 100) - -def can_send_more(): - return cache.get(RUNNING_KEY, 0) < get_max_running() +def can_send_more(max_running): + return get_actual_running_count() < max_running @shared_task(base=CustomTask) def send_download_fulltext_task(number_of_task=100): - if number_of_task != get_max_running(): - cache.set("download_pdf:max_running", number_of_task) - qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None) + qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None).exclude( + fetch_status='downloading' + ) if not qs.exists(): return "done" qs0 = qs.order_by("?") @@ -276,16 +274,9 @@ def send_download_fulltext_task(number_of_task=100): # 分批发送任务,控制并发数量,避免过多请求 task_count = 0 for paper in qs0[:number_of_task]: - if not can_send_more(): + if not can_send_more(number_of_task): break if paper.oa_url: - # 发送任务前先增加计数,确保计数准确 - current_count = incr_running() - if current_count > number_of_task: - # 超过限制,回滚计数并停止 - decr_running() - break - # 使用 countdown 错开请求时间,避免过多并发 countdown = task_count * 1 # 每个任务间隔1秒 current_app.send_task( @@ -296,27 +287,8 @@ def send_download_fulltext_task(number_of_task=100): countdown=countdown, ) task_count += 1 - return f"sent {task_count} download_pdf tasks, running {cache.get(RUNNING_KEY, 0)}/{number_of_task}" - - -def incr_running(): - try: - return cache.incr(RUNNING_KEY) - except ValueError: - cache.set(RUNNING_KEY, 1) - return 1 - - -def decr_running(): - try: - current = cache.get(RUNNING_KEY, 0) - if current <= 0: - cache.set(RUNNING_KEY, 0) - return 0 - return cache.decr(RUNNING_KEY) - except ValueError: - cache.set(RUNNING_KEY, 0) - return 0 + + return f"sent {task_count} download_pdf tasks" @shared_task(base=CustomTask) @@ -324,19 +296,18 @@ def download_pdf(paper_id): """ 下载单个论文的PDF """ + paper = None + original_status = None + try: - try: - paper = Paper.objects.get(id=paper_id) - except Paper.DoesNotExist: - return f"Paper {paper_id} not found" + paper = Paper.objects.get(id=paper_id) + original_status = paper.fetch_status + if original_status == "downloading": + return f"paper {paper_id} is already downloading" - # 检查缓存,避免短时间内重复下载同一个paper - cache_key = f"download_pdf_{paper_id}" - if cache.get(cache_key): - return "already_processing" - - # 设置处理中标记,防止并发重复处理 - cache.set(cache_key, True, timeout=3600) + # 将状态改为downloading + paper.fetch_status = 'downloading' + paper.save(update_fields=['fetch_status', 'update_time']) current_from = "oa_url" msg = save_pdf_from_oa_url(paper) @@ -345,7 +316,10 @@ def download_pdf(paper_id): msg = save_pdf_from_openalex(paper) return msg, current_from finally: - decr_running() + # 出错时恢复到原状态 + if paper.fetch_status == "downloading" and paper.has_fulltext_pdf is False: + paper.fetch_status = original_status + paper.save(update_fields=['fetch_status', 'update_time']) def save_pdf_from_oa_url(paper:Paper):