feat: 优化fetch_status
This commit is contained in:
parent
bfcc6d77fc
commit
1f5def2821
|
|
@ -0,0 +1,18 @@
|
||||||
|
# Generated by Django 4.2.27 on 2026-01-30 02:37
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('resm', '0004_alter_paper_fail_reason'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='paper',
|
||||||
|
name='fetch_status',
|
||||||
|
field=models.CharField(blank=True, max_length=20, null=True),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
@ -27,11 +27,7 @@ class Paper(BaseModel):
|
||||||
has_fulltext = models.BooleanField(default=False, db_index=True)
|
has_fulltext = models.BooleanField(default=False, db_index=True)
|
||||||
has_fulltext_xml = models.BooleanField(default=False, db_index=True)
|
has_fulltext_xml = models.BooleanField(default=False, db_index=True)
|
||||||
has_fulltext_pdf = models.BooleanField(default=False, db_index=True)
|
has_fulltext_pdf = models.BooleanField(default=False, db_index=True)
|
||||||
fetch_status = models.CharField(
|
fetch_status = models.CharField(max_length=20, null=True, blank=True) # downloading
|
||||||
max_length=20,
|
|
||||||
default="meta_only", # meta_only / downloading / abstract_ready / fulltext_ready / parsed / failed
|
|
||||||
db_index=True
|
|
||||||
)
|
|
||||||
fail_reason = models.TextField(null=True, blank=True)
|
fail_reason = models.TextField(null=True, blank=True)
|
||||||
|
|
||||||
source = models.CharField(
|
source = models.CharField(
|
||||||
|
|
@ -63,11 +59,15 @@ class Paper(BaseModel):
|
||||||
with open(paper_file, "wb") as f:
|
with open(paper_file, "wb") as f:
|
||||||
f.write(content.encode("utf-8"))
|
f.write(content.encode("utf-8"))
|
||||||
|
|
||||||
def save_file_pdf(self, content):
|
def save_file_pdf(self, content, save_obj=False):
|
||||||
safe_doi = self.doi.replace("/", "_")
|
safe_doi = self.doi.replace("/", "_")
|
||||||
paper_file = os.path.join(self.init_save_dir(), f"{safe_doi}.pdf")
|
paper_file = os.path.join(self.init_save_dir(), f"{safe_doi}.pdf")
|
||||||
with open(paper_file, "wb") as f:
|
with open(paper_file, "wb") as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
|
if save_obj:
|
||||||
|
self.has_fulltext = True
|
||||||
|
self.has_fulltext_pdf = True
|
||||||
|
self.save(update_fields=["has_fulltext", "has_fulltext_pdf", "update_time"])
|
||||||
|
|
||||||
def save_fail_reason(self, reason):
|
def save_fail_reason(self, reason):
|
||||||
if self.fail_reason:
|
if self.fail_reason:
|
||||||
|
|
@ -76,6 +76,14 @@ class Paper(BaseModel):
|
||||||
self.fail_reason = f";{reason}"
|
self.fail_reason = f";{reason}"
|
||||||
self.save(update_fields=["fail_reason", "update_time"])
|
self.save(update_fields=["fail_reason", "update_time"])
|
||||||
|
|
||||||
|
def fetch(self, status:str):
|
||||||
|
self.fetch_status = status
|
||||||
|
self.save(update_fields=["fetch_status", "update_time"])
|
||||||
|
|
||||||
|
def fetch_end(self):
|
||||||
|
self.fetch_status = None
|
||||||
|
self.save(update_fields=["fetch_status", "update_time"])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PaperAbstract(BaseModel):
|
class PaperAbstract(BaseModel):
|
||||||
|
|
|
||||||
|
|
@ -140,11 +140,9 @@ def get_abstract_from_elsevier(number_of_task:int = 20):
|
||||||
for paper in qs[:number_of_task]:
|
for paper in qs[:number_of_task]:
|
||||||
if not show_task_run(def_name):
|
if not show_task_run(def_name):
|
||||||
break
|
break
|
||||||
original_status = paper.fetch_status
|
if paper.fetch_status == "downloading":
|
||||||
if original_status == "downloading":
|
|
||||||
continue
|
continue
|
||||||
paper.fetch_status = "downloading"
|
paper.fetch(status="downloading")
|
||||||
paper.save(update_fields=["fetch_status", "update_time"])
|
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
res = req.get(
|
res = req.get(
|
||||||
|
|
@ -177,7 +175,6 @@ def get_abstract_from_elsevier(number_of_task:int = 20):
|
||||||
)
|
)
|
||||||
paper.has_abstract = True
|
paper.has_abstract = True
|
||||||
paper.has_abstract_xml = True
|
paper.has_abstract_xml = True
|
||||||
paper.fetch_status = "abstract_ready"
|
|
||||||
else:
|
else:
|
||||||
paper.save_fail_reason("elsevier_abstract_not_found")
|
paper.save_fail_reason("elsevier_abstract_not_found")
|
||||||
continue
|
continue
|
||||||
|
|
@ -191,19 +188,17 @@ def get_abstract_from_elsevier(number_of_task:int = 20):
|
||||||
if has_fulltext:
|
if has_fulltext:
|
||||||
paper.has_fulltext = True
|
paper.has_fulltext = True
|
||||||
paper.has_fulltext_xml = True
|
paper.has_fulltext_xml = True
|
||||||
paper.fetch_status = "fulltext_ready"
|
|
||||||
|
|
||||||
paper.save_file_xml(xml_str)
|
paper.save_file_xml(xml_str)
|
||||||
paper.save(update_fields=["has_abstract",
|
paper.save(update_fields=["has_abstract",
|
||||||
"has_abstract_xml", "has_fulltext",
|
"has_abstract_xml", "has_fulltext",
|
||||||
"has_fulltext_xml", "update_time", "fetch_status"])
|
"has_fulltext_xml", "update_time"])
|
||||||
|
save_pdf_from_elsevier(paper)
|
||||||
|
|
||||||
elif res.status_code == 404:
|
elif res.status_code == 404:
|
||||||
paper.save_fail_reason("elsevier_doi_not_found")
|
paper.save_fail_reason("elsevier_doi_not_found")
|
||||||
finally:
|
finally:
|
||||||
if paper.fetch_status == "downloading":
|
paper.fetch_end()
|
||||||
paper.fetch_status = original_status
|
|
||||||
paper.save(update_fields=["fetch_status", "update_time"])
|
|
||||||
|
|
||||||
qs_count = qs.count()
|
qs_count = qs.count()
|
||||||
if show_task_run(def_name) and qs_count > 0:
|
if show_task_run(def_name) and qs_count > 0:
|
||||||
|
|
@ -274,7 +269,7 @@ def can_send_more(max_running):
|
||||||
|
|
||||||
@shared_task(base=CustomTask)
|
@shared_task(base=CustomTask)
|
||||||
def send_download_fulltext_task(number_of_task=100):
|
def send_download_fulltext_task(number_of_task=100):
|
||||||
qs = Paper.objects.filter(is_oa=True, has_fulltext=False, fail_reason=None).exclude(
|
qs = Paper.objects.filter(has_fulltext=False, fail_reason=None).exclude(
|
||||||
fetch_status='downloading'
|
fetch_status='downloading'
|
||||||
)
|
)
|
||||||
if not qs.exists():
|
if not qs.exists():
|
||||||
|
|
@ -286,17 +281,16 @@ def send_download_fulltext_task(number_of_task=100):
|
||||||
for paper in qs0[:number_of_task]:
|
for paper in qs0[:number_of_task]:
|
||||||
if not can_send_more(number_of_task):
|
if not can_send_more(number_of_task):
|
||||||
break
|
break
|
||||||
if paper.oa_url:
|
# 使用 countdown 错开请求时间,避免过多并发
|
||||||
# 使用 countdown 错开请求时间,避免过多并发
|
countdown = task_count * 1 # 每个任务间隔1秒
|
||||||
countdown = task_count * 1 # 每个任务间隔1秒
|
current_app.send_task(
|
||||||
current_app.send_task(
|
"apps.resm.tasks.download_pdf",
|
||||||
"apps.resm.tasks.download_pdf",
|
kwargs={
|
||||||
kwargs={
|
"paper_id": paper.id,
|
||||||
"paper_id": paper.id,
|
},
|
||||||
},
|
countdown=countdown,
|
||||||
countdown=countdown,
|
)
|
||||||
)
|
task_count += 1
|
||||||
task_count += 1
|
|
||||||
|
|
||||||
return f"sent {task_count} download_pdf tasks"
|
return f"sent {task_count} download_pdf tasks"
|
||||||
|
|
||||||
|
|
@ -306,31 +300,26 @@ def download_pdf(paper_id):
|
||||||
"""
|
"""
|
||||||
下载单个论文的PDF
|
下载单个论文的PDF
|
||||||
"""
|
"""
|
||||||
paper = None
|
|
||||||
original_status = None
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
paper = Paper.objects.get(id=paper_id)
|
paper = Paper.objects.get(id=paper_id)
|
||||||
original_status = paper.fetch_status
|
if paper.fetch_status == "downloading":
|
||||||
if original_status == "downloading":
|
return
|
||||||
return f"paper {paper_id} is already downloading"
|
paper.fetch("downloading")
|
||||||
|
|
||||||
# 将状态改为downloading
|
|
||||||
paper.fetch_status = 'downloading'
|
|
||||||
paper.save(update_fields=['fetch_status', 'update_time'])
|
|
||||||
msg = "没有下载渠道"
|
msg = "没有下载渠道"
|
||||||
current_from = ""
|
current_from = ""
|
||||||
if paper.oa_url:
|
if paper.oa_url:
|
||||||
current_from = "oa_url"
|
if "https://doi.org/10.1016" in paper.oa_url:
|
||||||
msg = save_pdf_from_oa_url(paper)
|
current_from = "elsevier"
|
||||||
|
msg = save_pdf_from_elsevier(paper)
|
||||||
|
else:
|
||||||
|
current_from = "oa_url"
|
||||||
|
msg = save_pdf_from_oa_url(paper)
|
||||||
# if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
|
# if paper.has_fulltext_pdf is False and cache.get("openalex_api_exceed") is None:
|
||||||
# current_from = "openalex"
|
# current_from = "openalex"
|
||||||
# msg = save_pdf_from_openalex(paper)
|
# msg = save_pdf_from_openalex(paper)
|
||||||
return msg, current_from
|
return msg, current_from
|
||||||
finally:
|
finally:
|
||||||
if paper and paper.fetch_status == "downloading":
|
paper.fetch_end()
|
||||||
paper.fetch_status = original_status
|
|
||||||
paper.save(update_fields=['fetch_status', 'update_time'])
|
|
||||||
|
|
||||||
|
|
||||||
def save_pdf_from_oa_url(paper:Paper):
|
def save_pdf_from_oa_url(paper:Paper):
|
||||||
|
|
@ -349,12 +338,9 @@ def save_pdf_from_oa_url(paper:Paper):
|
||||||
res.headers.get("content-type", "") == "application/octet-stream"
|
res.headers.get("content-type", "") == "application/octet-stream"
|
||||||
)
|
)
|
||||||
if is_pdf and len(res.content) > 1024: # 至少1KB
|
if is_pdf and len(res.content) > 1024: # 至少1KB
|
||||||
paper.save_file_pdf(res.content)
|
paper.save_file_pdf(res.content, save_obj=True)
|
||||||
paper.has_fulltext = True
|
|
||||||
paper.has_fulltext_pdf = True
|
|
||||||
paper.fetch_status = "fulltext_ready"
|
|
||||||
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
|
|
||||||
return "success"
|
return "success"
|
||||||
|
return f"oa_url_pdf_error: {res.status_code}"
|
||||||
|
|
||||||
def save_pdf_from_openalex(paper:Paper):
|
def save_pdf_from_openalex(paper:Paper):
|
||||||
if cache.get("openalex_api_exceed"):
|
if cache.get("openalex_api_exceed"):
|
||||||
|
|
@ -366,19 +352,34 @@ def save_pdf_from_openalex(paper:Paper):
|
||||||
"api_key": OPENALEX_KEY
|
"api_key": OPENALEX_KEY
|
||||||
})
|
})
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
paper.save_fail_reason("openalex_pdf_error")
|
|
||||||
return f"openalex_pdf_error: {str(e)}"
|
return f"openalex_pdf_error: {str(e)}"
|
||||||
if res.status_code == 200:
|
if res.status_code == 200:
|
||||||
paper.save_file_pdf(res.content)
|
paper.save_file_pdf(res.content, save_obj=True)
|
||||||
paper.has_fulltext = True
|
|
||||||
paper.has_fulltext_pdf = True
|
|
||||||
paper.fetch_status = "fulltext_ready"
|
|
||||||
paper.save(update_fields=["has_fulltext", "has_fulltext_pdf", "fetch_status", "update_time"])
|
|
||||||
return "success"
|
return "success"
|
||||||
elif res.status_code == 429:
|
elif res.status_code == 429:
|
||||||
if "Insufficient credits" in res.json().get("message", ""):
|
if "Insufficient credits" in res.json().get("message", ""):
|
||||||
cache.set("openalex_api_exceed", True, timeout=3600)
|
cache.set("openalex_api_exceed", True, timeout=3600)
|
||||||
return "Insufficient credits"
|
return "openalex_pdf_error: Insufficient credits"
|
||||||
|
|
||||||
|
|
||||||
|
def save_pdf_from_elsevier(paper:Paper):
|
||||||
|
params = {
|
||||||
|
"apiKey": ELSEVIER_APIKEY,
|
||||||
|
"httpAccept": "application/pdf"
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
res = requests.get(
|
||||||
|
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
|
||||||
|
params=params,
|
||||||
|
timeout=(3, 15)
|
||||||
|
)
|
||||||
|
except requests.RequestException as e:
|
||||||
|
return f"elsevier_request_error: {str(e)}"
|
||||||
|
if res.status_code == 200:
|
||||||
|
paper.save_file_pdf(res.content, save_obj=True)
|
||||||
|
return "success"
|
||||||
|
else:
|
||||||
|
return f"elsevier_status_error: {res.status_code}"
|
||||||
|
|
||||||
def save_pdf_from_scihub(paper:Paper):
|
def save_pdf_from_scihub(paper:Paper):
|
||||||
pass
|
pass
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue