feat(resm): 合并 elsevier 抓取任务 + 抓取链常驻保活 + openalex 限流退避

- 合并 get_pdf_from_elsevier 进 get_abstract_from_elsevier: 同一 DOI 取 XML 后
  发现全文则内联取 PDF, 并补抓存量缺 PDF 论文; 阶段2 批量上限拆为 pdf_number_of_task
- 新增 ensure_fetch_running beat 任务 + alive 心跳: 自触发链重启/崩溃/空闲后自愈
- get_pdf_from_openalex: 限流期间慢节奏刷 alive 不打 API; 普通 429 也退避
- migration 0010 注册 ensure_fetch_running 每 60s 周期任务

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-23 10:06:30 +08:00
parent 643cb97e4a
commit 12f97fc47f
2 changed files with 216 additions and 124 deletions

View File

@ -0,0 +1,43 @@
"""注册抓取链常驻保活的 beat 周期任务。
apps.resm.tasks.ensure_fetch_running 60 秒检查各自触发抓取链的 alive 心跳,
链已死(重启/崩溃/空闲过期)且未被手动停止时重新点火, 保证重启后自愈
update_or_create / get_or_create 保证迁移可安全重跑
"""
from django.db import migrations
ENSURE_TASK = "apps.resm.tasks.ensure_fetch_running"
ENSURE_NAME = "resm: 抓取链常驻保活(beat 点火)"
def seed(apps, schema_editor):
IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule")
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
sched, _ = IntervalSchedule.objects.get_or_create(every=60, period="seconds")
PeriodicTask.objects.update_or_create(
name=ENSURE_NAME,
defaults={
"task": ENSURE_TASK,
"interval": sched,
"crontab": None,
"enabled": True,
"description": "每 60 秒检查抓取自触发链 alive 心跳, 链已死且未手动停止则重新点火, 保证重启自愈",
},
)
def unseed(apps, schema_editor):
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.filter(name=ENSURE_NAME).delete()
class Migration(migrations.Migration):
dependencies = [
("resm", "0009_seed_monitors_and_schedule"),
("django_celery_beat", "__latest__"),
]
operations = [
migrations.RunPython(seed, unseed),
]

View File

@ -471,16 +471,53 @@ def get_random_headers():
def show_task_run(def_name: str):
return cache.get(def_name, True)
# 自触发抓取链的"常驻保活": 链每跑一轮刷一次 alive 心跳, beat 周期任务据此
# 判断链是否还活着, 死了(重启/崩溃/空闲过期)且未被手动停止时重新点火。
ALIVE_TTL = 120 # alive 过期(秒): 需 > 链最大自重发间隔(60s), 给 beat 留判定窗口
MANAGED_FETCH_TASKS = [
"apps.resm.tasks.get_abstract_from_elsevier",
"apps.resm.tasks.get_pdf_from_openalex",
]
def touch_alive(def_name: str):
"""标记该自触发链仍在运行。"""
cache.set(def_name + ":alive", 1, timeout=ALIVE_TTL)
def is_alive(def_name: str):
return cache.get(def_name + ":alive") is not None
@shared_task(base=CustomTask)
def ensure_fetch_running():
"""beat 周期点火: 链死了且未被手动停止则重新点起, 保证重启后自愈。"""
started = []
for name in MANAGED_FETCH_TASKS:
if show_task_run(name) and not is_alive(name):
current_app.send_task(name)
started.append(name)
return f"ensure_fetch_running started: {started}"
@shared_task(base=CustomTask)
def get_pdf_from_openalex(number_of_task: int =10):
def_name = get_pdf_from_openalex.name
if not show_task_run(def_name):
return "stoped"
touch_alive(def_name)
# 限流退避中: 不打 API, 慢节奏自重发只为维持 alive, 等 exceed 标记自然过期。
# 这样 beat 看到 alive 仍在, 不会另起一条链去撞限流。
if cache.get("openalex_api_exceed"):
current_app.send_task(
"apps.resm.tasks.get_pdf_from_openalex",
kwargs={"number_of_task": number_of_task},
countdown=60,
)
return "openalex_api_exceed, backing off"
count = 0
qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude(
fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_not_found")[:number_of_task]
if not qs.exists():
return "done"
return "done" # 不自重发, 交给 beat 轮询拉起
msg = ""
for paper in qs:
if not show_task_run(def_name):
@ -492,10 +529,9 @@ def get_pdf_from_openalex(number_of_task: int =10):
count += 1
if cache.get("openalex_api_exceed"):
break
countdown = 2
if cache.get("openalex_api_exceed"):
countdown = 30 * 60 # 30分钟后重试
if show_task_run(def_name):
# 刚撞上限流则放慢到 60s(仍维持 alive), 否则紧凑 2s
countdown = 60 if cache.get("openalex_api_exceed") else 2
current_app.send_task(
"apps.resm.tasks.get_pdf_from_openalex",
kwargs={
@ -505,17 +541,100 @@ def get_pdf_from_openalex(number_of_task: int =10):
)
return msg, count
def _elsevier_fetch_xml(req, paper):
"""同一 DOI 取 XML, 解析摘要/全文标记并落库。
返回 (got_abstract, got_fulltext, err): err 仅在网络异常时为 'request_error'"""
try:
res = req.get(
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
params={"httpAccept": "text/xml"},
headers=ELSEVIER_HEADERS,
timeout=(3, 15),
)
except requests.RequestException:
return False, False, "request_error"
if res.status_code == 404:
paper.save_fail_reason("elsevier_doi_not_found")
return False, False, None
if res.status_code != 200:
paper.save_fail_reason(f"elsevier_response_error: {res.status_code}")
return False, False, None
xml_str = res.text
try:
root = etree.fromstring(xml_str.encode("utf-8"))
except etree.XMLSyntaxError:
paper.save_fail_reason("elsevier_xml_error")
return False, False, None
ns = {"dc": "http://purl.org/dc/elements/1.1/",
"ce": "http://www.elsevier.com/xml/common/dtd",
"xocs": "http://www.elsevier.com/xml/xocs/dtd"}
abstract = root.xpath("//dc:description/text()", namespaces=ns)
if not abstract:
paper.save_fail_reason("elsevier_abstract_not_found")
return False, False, None
PaperAbstract.objects.update_or_create(
paper=paper,
defaults={"abstract": abstract[0].strip(), "source": "elsevier"},
)
paper.has_abstract = True
paper.has_abstract_xml = True
paras = root.xpath("//ce:para", namespaces=ns)
has_fulltext = len(paras) > 0
if has_fulltext is False:
rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns)
if rawtexts and len(rawtexts[0].strip()) > 2000:
has_fulltext = True
if has_fulltext:
paper.has_fulltext = True
paper.has_fulltext_xml = True
paper.save_file_xml(xml_str)
paper.save(update_fields=["has_abstract", "has_abstract_xml",
"has_fulltext", "has_fulltext_xml", "update_time"])
return True, has_fulltext, None
def _elsevier_fetch_pdf(req, paper):
"""同一 DOI 取 PDF, 成功落库返回 True。"""
try:
res = req.get(
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
params={"httpAccept": "application/pdf"},
headers=ELSEVIER_HEADERS,
timeout=(3, 15),
)
except requests.RequestException:
return False
if res.status_code == 200:
# 检查是否是PDF文件检查魔数 %PDF 或 content-type
is_pdf = (
res.content.startswith(b'%PDF') or
res.headers.get("content-type", "").startswith("application/pdf")
)
if is_pdf and len(res.content) > 1024: # 至少1KB
paper.save_file_pdf(res.content, save_obj=True)
return True
return False
@shared_task(base=CustomTask)
def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True):
def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True,
pdf_number_of_task:int = 20):
"""Elsevier 单端点合并任务: 同一 DOI 先取 XML(摘要/全文标记), 若有全文则内联
再取一次 PDF; 并补抓历史上已有全文标记但缺 PDF 的论文 get_pdf_from_elsevier 已并入
number_of_task: 阶段1(摘要+内联 PDF)每轮上限; pdf_number_of_task: 阶段2(存量补 PDF)每轮上限"""
def_name = get_abstract_from_elsevier.name
if not show_task_run(def_name):
return "stoped"
# qs = Paper.objects.filter(Q(has_abstract=False)|Q(has_fulltext_xml=False))
touch_alive(def_name)
# 待抓摘要(并顺带取 PDF)
qs = Paper.objects.filter(has_abstract=False)
# qs = qs.exclude(
# fail_reason__contains="elsevier_doi_not_found"
# ).exclude(fail_reason__contains="elsevier_abstract_not_found"
# )
if exclude_failed:
qs = qs.filter(fail_reason=None)
else:
@ -523,16 +642,20 @@ def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True
qs = qs.exclude(fetch_status="downloading"
).filter(doi__startswith="10.1016").order_by("?")
if not qs.exists():
return "done"
# 存量补 PDF: 已有全文标记但还没下到 PDF
qs_pdf = Paper.objects.filter(
has_fulltext=True, has_fulltext_pdf=False, has_abstract=True
).exclude(fetch_status="downloading").filter(doi__startswith="10.1016")
if not qs.exists() and not qs_pdf.exists():
return "done" # 不自重发, 交给 beat 轮询拉起
params = {
"httpAccept": "text/xml"
}
err_msg = ""
count_abs = 0
count_fulltext = 0
count_pdf = 0
with requests.Session() as req:
# 阶段1: 摘要 + 内联 PDF
for paper in qs[:number_of_task]:
if not show_task_run(def_name):
break
@ -540,127 +663,45 @@ def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True
continue
paper.fetch(status="downloading")
try:
try:
res = req.get(
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
params=params,
headers = ELSEVIER_HEADERS,
timeout=(3, 15)
)
except requests.RequestException:
got_abs, got_fulltext, err = _elsevier_fetch_xml(req, paper)
if err == "request_error":
err_msg = "elsevier_request_error"
break
if res.status_code == 200:
xml_str = res.text
try:
root = etree.fromstring(xml_str.encode("utf-8"))
except etree.XMLSyntaxError:
paper.save_fail_reason("elsevier_xml_error")
continue
ns = {"dc": "http://purl.org/dc/elements/1.1/",
"ce": "http://www.elsevier.com/xml/common/dtd",
"xocs": "http://www.elsevier.com/xml/xocs/dtd",}
abstract = root.xpath("//dc:description/text()", namespaces=ns)
if abstract:
PaperAbstract.objects.update_or_create(
paper=paper,
defaults={
"abstract": abstract[0].strip(),
"source": "elsevier"
}
)
paper.has_abstract = True
paper.has_abstract_xml = True
count_abs += 1
else:
paper.save_fail_reason("elsevier_abstract_not_found")
continue
paras = root.xpath("//ce:para", namespaces=ns)
has_fulltext = len(paras) > 0
if has_fulltext is False:
rawtexts = root.xpath("//xocs:rawtext/text()",namespaces=ns)
if rawtexts and len(rawtexts[0].strip()) > 2000:
has_fulltext = True
if has_fulltext:
paper.has_fulltext = True
paper.has_fulltext_xml = True
count_fulltext += 1
paper.save_file_xml(xml_str)
paper.save(update_fields=["has_abstract",
"has_abstract_xml", "has_fulltext",
"has_fulltext_xml", "update_time"])
elif res.status_code == 404:
paper.save_fail_reason("elsevier_doi_not_found")
else:
err_msg = f"elsevier_response_error: {res.status_code} {res.text}"
paper.save_fail_reason(f"elsevier_response_error: {res.status_code}")
if got_abs:
count_abs += 1
if got_fulltext:
count_fulltext += 1
if _elsevier_fetch_pdf(req, paper):
count_pdf += 1
finally:
paper.fetch_end()
# 阶段2: 存量补 PDF (摘要阶段未因网络异常中断才继续)
if err_msg == "":
for paper in qs_pdf[:pdf_number_of_task]:
if not show_task_run(def_name):
break
if paper.fetch_status == "downloading":
continue
paper.fetch(status="downloading")
try:
if _elsevier_fetch_pdf(req, paper):
count_pdf += 1
finally:
paper.fetch_end()
if show_task_run(def_name):
countdown = 30 if err_msg else 5 # 网络异常时放慢重试
current_app.send_task(
"apps.resm.tasks.get_abstract_from_elsevier",
kwargs={
"number_of_task": number_of_task,
"exclude_failed": exclude_failed
"exclude_failed": exclude_failed,
"pdf_number_of_task": pdf_number_of_task,
},
countdown=5,
countdown=countdown,
)
return f'{err_msg}, get {count_abs} abstracts, {count_fulltext} fulltexts'
@shared_task(base=CustomTask)
def get_pdf_from_elsevier(number_of_task=100):
"""
获取elsevier全文
"""
def_name = get_pdf_from_elsevier.name
if not show_task_run(def_name):
return "stoped"
qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True)
err_msg = ""
with requests.Session() as req:
for paper in qs[:number_of_task]:
if not show_task_run(def_name):
break
params = {
"apiKey": ELSEVIER_APIKEY,
"httpAccept": "application/pdf"
}
try:
res = req.get(
f"https://api.elsevier.com/content/article/doi/{paper.doi}",
params=params,
timeout=(3, 15)
)
except requests.RequestException:
err_msg = "elsevier_request_error"
break
if res.status_code == 200:
# 检查是否是PDF文件检查魔数 %PDF 或 content-type
is_pdf = (
res.content.startswith(b'%PDF') or
res.headers.get("content-type", "").startswith("application/pdf")
)
if is_pdf and len(res.content) > 1024: # 至少1KB
paper.save_file_pdf(res.content)
paper.has_fulltext_pdf = True
paper.save(update_fields=["has_fulltext_pdf", "update_time"])
qs_count = qs.count()
if show_task_run(def_name) and qs_count > 0:
current_app.send_task(
"apps.resm.tasks.get_pdf_from_elsevier",
kwargs={
"number_of_task": number_of_task,
},
countdown=5,
)
return f'{def_name}, {err_msg}, remaining {qs_count} papers'
return f'{err_msg}, abs {count_abs}, fulltext {count_fulltext}, pdf {count_pdf}'
def get_actual_running_count():
"""获取实际在下载的任务数"""
@ -798,9 +839,17 @@ def save_pdf_from_openalex(paper:Paper):
paper.save_file_pdf(res.content, save_obj=True)
return "success"
elif res.status_code == 429:
if "Insufficient credits" in res.json().get("message", ""):
try:
message = res.json().get("message", "")
except ValueError:
message = res.text
if "Insufficient credits" in message:
# 额度耗尽: 退避 1 小时
cache.set("openalex_api_exceed", True, timeout=3600)
return "openalex_pdf_error: Insufficient credits"
# 普通限流(请求过频): 短退避 2 分钟, 避免立刻重试再撞 429
cache.set("openalex_api_exceed", True, timeout=120)
return f"openalex_pdf_error: 429 {message[:100]}"
elif res.status_code == 404:
paper.save_fail_reason("openalex_pdf_not_found")
return "openalex_pdf_error: 404"