feat: 保证download_pdf不超

This commit is contained in:
caoqianming 2026-01-29 13:53:01 +08:00
parent 3afdbc995c
commit 13c231637f
1 changed files with 60 additions and 39 deletions

View File

@ -256,27 +256,34 @@ def get_pdf_from_elsevier(number_of_task=100):
) )
return f'{def_name}, {err_msg}, remaining {qs_count} papers' return f'{def_name}, {err_msg}, remaining {qs_count} papers'
RUNNING_KEY = "download_pdf:running"
MAX_RUNNING = 20
def can_send_more():
return cache.get(RUNNING_KEY, 0) < MAX_RUNNING
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
def send_download_fulltext_task(number_of_task=100): def send_download_fulltext_task(number_of_task=100):
def_name = send_download_fulltext_task.name
if not show_task_run(def_name):
return "stoped"
qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None) qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None)
if not qs.exists(): if not qs.exists():
return "done" return "done"
qs0 = qs.order_by("?") qs0 = qs.order_by("?")
# 分批发送任务,控制并发数量,避免过多请求 # 分批发送任务,控制并发数量,避免过多请求
task_count = 0 task_count = 0
for paper in qs0[:number_of_task]: for paper in qs0[:number_of_task]:
if not show_task_run(def_name): if not can_send_more():
break break
if paper.oa_url: if paper.oa_url:
# 发送任务前先增加计数,确保计数准确
current_count = incr_running()
if current_count > MAX_RUNNING:
# 超过限制,回滚计数并停止
decr_running()
break
# 使用 countdown 错开请求时间,避免过多并发 # 使用 countdown 错开请求时间,避免过多并发
countdown = task_count * 2 # 每个任务间隔2秒 countdown = task_count * 1 # 每个任务间隔1
current_app.send_task( current_app.send_task(
"apps.resm.tasks.download_pdf", "apps.resm.tasks.download_pdf",
kwargs={ kwargs={
@ -285,17 +292,27 @@ def send_download_fulltext_task(number_of_task=100):
countdown=countdown, countdown=countdown,
) )
task_count += 1 task_count += 1
return f"sent {task_count} download_pdf tasks, running {cache.get(RUNNING_KEY, 0)}/{MAX_RUNNING}"
qs_count = qs.count()
if show_task_run(def_name) and qs_count > 0: def incr_running():
current_app.send_task( try:
def_name, return cache.incr(RUNNING_KEY)
kwargs={ except ValueError:
"number_of_task": number_of_task, cache.set(RUNNING_KEY, 1)
}, return 1
countdown=10, # 等待当前批次完成后再继续
)
return f'{def_name}, sent {task_count} tasks, remaining {qs_count} papers' def decr_running():
try:
current = cache.get(RUNNING_KEY, 0)
if current <= 0:
cache.set(RUNNING_KEY, 0)
return 0
return cache.decr(RUNNING_KEY)
except ValueError:
cache.set(RUNNING_KEY, 0)
return 0
@shared_task(base=CustomTask) @shared_task(base=CustomTask)
@ -303,6 +320,7 @@ def download_pdf(paper_id):
""" """
下载单个论文的PDF 下载单个论文的PDF
""" """
try:
try: try:
paper = Paper.objects.get(id=paper_id) paper = Paper.objects.get(id=paper_id)
except Paper.DoesNotExist: except Paper.DoesNotExist:
@ -322,6 +340,8 @@ def download_pdf(paper_id):
current_from = "openalex" current_from = "openalex"
msg = save_pdf_from_openalex(paper) msg = save_pdf_from_openalex(paper)
return msg, current_from return msg, current_from
finally:
decr_running()
def save_pdf_from_oa_url(paper:Paper): def save_pdf_from_oa_url(paper:Paper):
@ -347,7 +367,9 @@ def save_pdf_from_oa_url(paper:Paper):
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
return "success" return "success"
def save_pdf_from_openalex(paper:Paper, retry=1): def save_pdf_from_openalex(paper:Paper):
if cache.get("openalex_api_exceed"):
return "Insufficient credits"
# 尝试openalex下载 # 尝试openalex下载
try: try:
res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf", res = requests.get(url=f"https://content.openalex.org/works/{paper.openalex_id}.pdf",
@ -365,10 +387,9 @@ def save_pdf_from_openalex(paper:Paper, retry=1):
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"]) paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
return "success" return "success"
elif res.status_code == 429: elif res.status_code == 429:
if retry > 1: if "Insufficient credits" in res.json().get("message", ""):
time.sleep(random.randint(1, 10)) cache.set("openalex_api_exceed", True, timeout=3600)
retry -= 1 return "Insufficient credits"
return save_pdf_from_openalex(paper, retry=retry)
# https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true # https://sci.bban.top/pdf/10.1016/j.conbuildmat.2020.121016.pdf?download=true