feat: 采用fetch_status记录下载状态
This commit is contained in:
parent
434659ba09
commit
aaa51f548b
|
|
@ -29,7 +29,7 @@ class Paper(BaseModel):
|
||||||
has_fulltext_pdf = models.BooleanField(default=False, db_index=True)
|
has_fulltext_pdf = models.BooleanField(default=False, db_index=True)
|
||||||
fetch_status = models.CharField(
|
fetch_status = models.CharField(
|
||||||
max_length=20,
|
max_length=20,
|
||||||
default="meta_only", # meta_only / abstract_ready / fulltext_ready / parsed / failed
|
default="meta_only", # meta_only / downloading / abstract_ready / fulltext_ready / parsed / failed
|
||||||
db_index=True
|
db_index=True
|
||||||
)
|
)
|
||||||
fail_reason = models.TextField(null=True, blank=True)
|
fail_reason = models.TextField(null=True, blank=True)
|
||||||
|
|
|
||||||
|
|
@ -117,16 +117,15 @@ def show_task_run(def_name: str):
|
||||||
return cache.get(def_name, True)
|
return cache.get(def_name, True)
|
||||||
|
|
||||||
@shared_task(base=CustomTask)
|
@shared_task(base=CustomTask)
|
||||||
def get_abstract_from_elsevier(publication_year: int = None, number_of_task:int = 20):
|
def get_abstract_from_elsevier(number_of_task:int = 20):
|
||||||
def_name = get_abstract_from_elsevier.name
|
def_name = get_abstract_from_elsevier.name
|
||||||
if not show_task_run(def_name):
|
if not show_task_run(def_name):
|
||||||
return "stoped"
|
return "stoped"
|
||||||
qs = Paper.objects.filter(has_abstract=False)
|
qs = Paper.objects.filter(has_abstract=False)
|
||||||
if publication_year is not None:
|
|
||||||
qs = qs.filter(publication_year=publication_year)
|
|
||||||
qs = qs.exclude(
|
qs = qs.exclude(
|
||||||
fail_reason__contains="elsevier_doi_not_found"
|
fail_reason__contains="elsevier_doi_not_found"
|
||||||
).exclude(fail_reason__contains="elsevier_abstract_not_found").order_by("publication_date")
|
).exclude(fail_reason__contains="elsevier_abstract_not_found"
|
||||||
|
).exclude(fetch_status="downloading").order_by("publication_date")
|
||||||
|
|
||||||
if not qs.exists():
|
if not qs.exists():
|
||||||
return "done"
|
return "done"
|
||||||
|
|
@ -256,19 +255,18 @@ def get_pdf_from_elsevier(number_of_task=100):
|
||||||
)
|
)
|
||||||
return f'{def_name}, {err_msg}, remaining {qs_count} papers'
|
return f'{def_name}, {err_msg}, remaining {qs_count} papers'
|
||||||
|
|
||||||
RUNNING_KEY = "download_pdf:running"
|
def get_actual_running_count():
|
||||||
|
"""获取实际在下载的任务数"""
|
||||||
|
return Paper.objects.filter(fetch_status='downloading').count()
|
||||||
|
|
||||||
def get_max_running():
|
def can_send_more(max_running):
|
||||||
return cache.get("download_pdf:max_running", 100)
|
return get_actual_running_count() < max_running
|
||||||
|
|
||||||
def can_send_more():
|
|
||||||
return cache.get(RUNNING_KEY, 0) < get_max_running()
|
|
||||||
|
|
||||||
@shared_task(base=CustomTask)
|
@shared_task(base=CustomTask)
|
||||||
def send_download_fulltext_task(number_of_task=100):
|
def send_download_fulltext_task(number_of_task=100):
|
||||||
if number_of_task != get_max_running():
|
qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None).exclude(
|
||||||
cache.set("download_pdf:max_running", number_of_task)
|
fetch_status='downloading'
|
||||||
qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None)
|
)
|
||||||
if not qs.exists():
|
if not qs.exists():
|
||||||
return "done"
|
return "done"
|
||||||
qs0 = qs.order_by("?")
|
qs0 = qs.order_by("?")
|
||||||
|
|
@ -276,16 +274,9 @@ def send_download_fulltext_task(number_of_task=100):
|
||||||
# 分批发送任务,控制并发数量,避免过多请求
|
# 分批发送任务,控制并发数量,避免过多请求
|
||||||
task_count = 0
|
task_count = 0
|
||||||
for paper in qs0[:number_of_task]:
|
for paper in qs0[:number_of_task]:
|
||||||
if not can_send_more():
|
if not can_send_more(number_of_task):
|
||||||
break
|
break
|
||||||
if paper.oa_url:
|
if paper.oa_url:
|
||||||
# 发送任务前先增加计数,确保计数准确
|
|
||||||
current_count = incr_running()
|
|
||||||
if current_count > number_of_task:
|
|
||||||
# 超过限制,回滚计数并停止
|
|
||||||
decr_running()
|
|
||||||
break
|
|
||||||
|
|
||||||
# 使用 countdown 错开请求时间,避免过多并发
|
# 使用 countdown 错开请求时间,避免过多并发
|
||||||
countdown = task_count * 1 # 每个任务间隔1秒
|
countdown = task_count * 1 # 每个任务间隔1秒
|
||||||
current_app.send_task(
|
current_app.send_task(
|
||||||
|
|
@ -296,27 +287,8 @@ def send_download_fulltext_task(number_of_task=100):
|
||||||
countdown=countdown,
|
countdown=countdown,
|
||||||
)
|
)
|
||||||
task_count += 1
|
task_count += 1
|
||||||
return f"sent {task_count} download_pdf tasks, running {cache.get(RUNNING_KEY, 0)}/{number_of_task}"
|
|
||||||
|
|
||||||
|
return f"sent {task_count} download_pdf tasks"
|
||||||
def incr_running():
|
|
||||||
try:
|
|
||||||
return cache.incr(RUNNING_KEY)
|
|
||||||
except ValueError:
|
|
||||||
cache.set(RUNNING_KEY, 1)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
|
|
||||||
def decr_running():
|
|
||||||
try:
|
|
||||||
current = cache.get(RUNNING_KEY, 0)
|
|
||||||
if current <= 0:
|
|
||||||
cache.set(RUNNING_KEY, 0)
|
|
||||||
return 0
|
|
||||||
return cache.decr(RUNNING_KEY)
|
|
||||||
except ValueError:
|
|
||||||
cache.set(RUNNING_KEY, 0)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(base=CustomTask)
|
@shared_task(base=CustomTask)
|
||||||
|
|
@ -324,19 +296,18 @@ def download_pdf(paper_id):
|
||||||
"""
|
"""
|
||||||
下载单个论文的PDF
|
下载单个论文的PDF
|
||||||
"""
|
"""
|
||||||
try:
|
paper = None
|
||||||
|
original_status = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
paper = Paper.objects.get(id=paper_id)
|
paper = Paper.objects.get(id=paper_id)
|
||||||
except Paper.DoesNotExist:
|
original_status = paper.fetch_status
|
||||||
return f"Paper {paper_id} not found"
|
if original_status == "downloading":
|
||||||
|
return f"paper {paper_id} is already downloading"
|
||||||
|
|
||||||
# 检查缓存,避免短时间内重复下载同一个paper
|
# 将状态改为downloading
|
||||||
cache_key = f"download_pdf_{paper_id}"
|
paper.fetch_status = 'downloading'
|
||||||
if cache.get(cache_key):
|
paper.save(update_fields=['fetch_status', 'update_time'])
|
||||||
return "already_processing"
|
|
||||||
|
|
||||||
# 设置处理中标记,防止并发重复处理
|
|
||||||
cache.set(cache_key, True, timeout=3600)
|
|
||||||
|
|
||||||
current_from = "oa_url"
|
current_from = "oa_url"
|
||||||
msg = save_pdf_from_oa_url(paper)
|
msg = save_pdf_from_oa_url(paper)
|
||||||
|
|
@ -345,7 +316,10 @@ def download_pdf(paper_id):
|
||||||
msg = save_pdf_from_openalex(paper)
|
msg = save_pdf_from_openalex(paper)
|
||||||
return msg, current_from
|
return msg, current_from
|
||||||
finally:
|
finally:
|
||||||
decr_running()
|
# 出错时恢复到原状态
|
||||||
|
if paper.fetch_status == "downloading" and paper.has_fulltext_pdf is False:
|
||||||
|
paper.fetch_status = original_status
|
||||||
|
paper.save(update_fields=['fetch_status', 'update_time'])
|
||||||
|
|
||||||
|
|
||||||
def save_pdf_from_oa_url(paper:Paper):
|
def save_pdf_from_oa_url(paper:Paper):
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue