diff --git a/apps/resm/migrations/0010_seed_ensure_fetch_running.py b/apps/resm/migrations/0010_seed_ensure_fetch_running.py new file mode 100644 index 0000000..654fbfc --- /dev/null +++ b/apps/resm/migrations/0010_seed_ensure_fetch_running.py @@ -0,0 +1,43 @@ +"""注册抓取链常驻保活的 beat 周期任务。 + +apps.resm.tasks.ensure_fetch_running 每 60 秒检查各自触发抓取链的 alive 心跳, +链已死(重启/崩溃/空闲过期)且未被手动停止时重新点火, 保证重启后自愈。 +update_or_create / get_or_create 保证迁移可安全重跑。 +""" +from django.db import migrations + +ENSURE_TASK = "apps.resm.tasks.ensure_fetch_running" +ENSURE_NAME = "resm: 抓取链常驻保活(beat 点火)" + + +def seed(apps, schema_editor): + IntervalSchedule = apps.get_model("django_celery_beat", "IntervalSchedule") + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + sched, _ = IntervalSchedule.objects.get_or_create(every=60, period="seconds") + PeriodicTask.objects.update_or_create( + name=ENSURE_NAME, + defaults={ + "task": ENSURE_TASK, + "interval": sched, + "crontab": None, + "enabled": True, + "description": "每 60 秒检查抓取自触发链 alive 心跳, 链已死且未手动停止则重新点火, 保证重启自愈", + }, + ) + + +def unseed(apps, schema_editor): + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + PeriodicTask.objects.filter(name=ENSURE_NAME).delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ("resm", "0009_seed_monitors_and_schedule"), + ("django_celery_beat", "__latest__"), + ] + + operations = [ + migrations.RunPython(seed, unseed), + ] diff --git a/apps/resm/tasks.py b/apps/resm/tasks.py index 3d91283..bb9b7b0 100644 --- a/apps/resm/tasks.py +++ b/apps/resm/tasks.py @@ -471,16 +471,53 @@ def get_random_headers(): def show_task_run(def_name: str): return cache.get(def_name, True) +# 自触发抓取链的"常驻保活": 链每跑一轮刷一次 alive 心跳, beat 周期任务据此 +# 判断链是否还活着, 死了(重启/崩溃/空闲过期)且未被手动停止时重新点火。 +ALIVE_TTL = 120 # alive 过期(秒): 需 > 链最大自重发间隔(60s), 给 beat 留判定窗口 +MANAGED_FETCH_TASKS = [ + "apps.resm.tasks.get_abstract_from_elsevier", + "apps.resm.tasks.get_pdf_from_openalex", +] + +def touch_alive(def_name: str): + """标记该自触发链仍在运行。""" + cache.set(def_name + ":alive", 1, timeout=ALIVE_TTL) + +def is_alive(def_name: str): + return cache.get(def_name + ":alive") is not None + +@shared_task(base=CustomTask) +def ensure_fetch_running(): + """beat 周期点火: 链死了且未被手动停止则重新点起, 保证重启后自愈。""" + started = [] + for name in MANAGED_FETCH_TASKS: + if show_task_run(name) and not is_alive(name): + current_app.send_task(name) + started.append(name) + return f"ensure_fetch_running started: {started}" + @shared_task(base=CustomTask) def get_pdf_from_openalex(number_of_task: int =10): def_name = get_pdf_from_openalex.name if not show_task_run(def_name): return "stoped" + touch_alive(def_name) + + # 限流退避中: 不打 API, 慢节奏自重发只为维持 alive, 等 exceed 标记自然过期。 + # 这样 beat 看到 alive 仍在, 不会另起一条链去撞限流。 + if cache.get("openalex_api_exceed"): + current_app.send_task( + "apps.resm.tasks.get_pdf_from_openalex", + kwargs={"number_of_task": number_of_task}, + countdown=60, + ) + return "openalex_api_exceed, backing off" + count = 0 qs = Paper.objects.filter(is_oa=True, has_fulltext=False).exclude( fetch_status="downloading").exclude(fail_reason__contains="openalex_pdf_not_found")[:number_of_task] if not qs.exists(): - return "done" + return "done" # 不自重发, 交给 beat 轮询拉起 msg = "" for paper in qs: if not show_task_run(def_name): @@ -492,10 +529,9 @@ def get_pdf_from_openalex(number_of_task: int =10): count += 1 if cache.get("openalex_api_exceed"): break - countdown = 2 - if cache.get("openalex_api_exceed"): - countdown = 30 * 60 # 30分钟后重试 if show_task_run(def_name): + # 刚撞上限流则放慢到 60s(仍维持 alive), 否则紧凑 2s + countdown = 60 if cache.get("openalex_api_exceed") else 2 current_app.send_task( "apps.resm.tasks.get_pdf_from_openalex", kwargs={ @@ -505,17 +541,100 @@ def get_pdf_from_openalex(number_of_task: int =10): ) return msg, count +def _elsevier_fetch_xml(req, paper): + """同一 DOI 取 XML, 解析摘要/全文标记并落库。 + 返回 (got_abstract, got_fulltext, err): err 仅在网络异常时为 'request_error'。""" + try: + res = req.get( + f"https://api.elsevier.com/content/article/doi/{paper.doi}", + params={"httpAccept": "text/xml"}, + headers=ELSEVIER_HEADERS, + timeout=(3, 15), + ) + except requests.RequestException: + return False, False, "request_error" + + if res.status_code == 404: + paper.save_fail_reason("elsevier_doi_not_found") + return False, False, None + if res.status_code != 200: + paper.save_fail_reason(f"elsevier_response_error: {res.status_code}") + return False, False, None + + xml_str = res.text + try: + root = etree.fromstring(xml_str.encode("utf-8")) + except etree.XMLSyntaxError: + paper.save_fail_reason("elsevier_xml_error") + return False, False, None + + ns = {"dc": "http://purl.org/dc/elements/1.1/", + "ce": "http://www.elsevier.com/xml/common/dtd", + "xocs": "http://www.elsevier.com/xml/xocs/dtd"} + abstract = root.xpath("//dc:description/text()", namespaces=ns) + if not abstract: + paper.save_fail_reason("elsevier_abstract_not_found") + return False, False, None + + PaperAbstract.objects.update_or_create( + paper=paper, + defaults={"abstract": abstract[0].strip(), "source": "elsevier"}, + ) + paper.has_abstract = True + paper.has_abstract_xml = True + + paras = root.xpath("//ce:para", namespaces=ns) + has_fulltext = len(paras) > 0 + if has_fulltext is False: + rawtexts = root.xpath("//xocs:rawtext/text()", namespaces=ns) + if rawtexts and len(rawtexts[0].strip()) > 2000: + has_fulltext = True + if has_fulltext: + paper.has_fulltext = True + paper.has_fulltext_xml = True + + paper.save_file_xml(xml_str) + paper.save(update_fields=["has_abstract", "has_abstract_xml", + "has_fulltext", "has_fulltext_xml", "update_time"]) + return True, has_fulltext, None + + +def _elsevier_fetch_pdf(req, paper): + """同一 DOI 取 PDF, 成功落库返回 True。""" + try: + res = req.get( + f"https://api.elsevier.com/content/article/doi/{paper.doi}", + params={"httpAccept": "application/pdf"}, + headers=ELSEVIER_HEADERS, + timeout=(3, 15), + ) + except requests.RequestException: + return False + if res.status_code == 200: + # 检查是否是PDF文件:检查魔数 %PDF 或 content-type + is_pdf = ( + res.content.startswith(b'%PDF') or + res.headers.get("content-type", "").startswith("application/pdf") + ) + if is_pdf and len(res.content) > 1024: # 至少1KB + paper.save_file_pdf(res.content, save_obj=True) + return True + return False + + @shared_task(base=CustomTask) -def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True): +def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True, + pdf_number_of_task:int = 20): + """Elsevier 单端点合并任务: 同一 DOI 先取 XML(摘要/全文标记), 若有全文则内联 + 再取一次 PDF; 并补抓历史上已有全文标记但缺 PDF 的论文。原 get_pdf_from_elsevier 已并入。 + number_of_task: 阶段1(摘要+内联 PDF)每轮上限; pdf_number_of_task: 阶段2(存量补 PDF)每轮上限。""" def_name = get_abstract_from_elsevier.name if not show_task_run(def_name): return "stoped" - # qs = Paper.objects.filter(Q(has_abstract=False)|Q(has_fulltext_xml=False)) + touch_alive(def_name) + + # 待抓摘要(并顺带取 PDF) qs = Paper.objects.filter(has_abstract=False) - # qs = qs.exclude( - # fail_reason__contains="elsevier_doi_not_found" - # ).exclude(fail_reason__contains="elsevier_abstract_not_found" - # ) if exclude_failed: qs = qs.filter(fail_reason=None) else: @@ -523,16 +642,20 @@ def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True qs = qs.exclude(fetch_status="downloading" ).filter(doi__startswith="10.1016").order_by("?") - if not qs.exists(): - return "done" + # 存量补 PDF: 已有全文标记但还没下到 PDF + qs_pdf = Paper.objects.filter( + has_fulltext=True, has_fulltext_pdf=False, has_abstract=True + ).exclude(fetch_status="downloading").filter(doi__startswith="10.1016") + + if not qs.exists() and not qs_pdf.exists(): + return "done" # 不自重发, 交给 beat 轮询拉起 - params = { - "httpAccept": "text/xml" - } err_msg = "" count_abs = 0 count_fulltext = 0 + count_pdf = 0 with requests.Session() as req: + # 阶段1: 摘要 + 内联 PDF for paper in qs[:number_of_task]: if not show_task_run(def_name): break @@ -540,127 +663,45 @@ def get_abstract_from_elsevier(number_of_task:int = 20, exclude_failed:bool=True continue paper.fetch(status="downloading") try: - try: - res = req.get( - f"https://api.elsevier.com/content/article/doi/{paper.doi}", - params=params, - headers = ELSEVIER_HEADERS, - timeout=(3, 15) - ) - except requests.RequestException: + got_abs, got_fulltext, err = _elsevier_fetch_xml(req, paper) + if err == "request_error": err_msg = "elsevier_request_error" break - if res.status_code == 200: - xml_str = res.text - try: - root = etree.fromstring(xml_str.encode("utf-8")) - except etree.XMLSyntaxError: - paper.save_fail_reason("elsevier_xml_error") - continue - - ns = {"dc": "http://purl.org/dc/elements/1.1/", - "ce": "http://www.elsevier.com/xml/common/dtd", - "xocs": "http://www.elsevier.com/xml/xocs/dtd",} - abstract = root.xpath("//dc:description/text()", namespaces=ns) - if abstract: - PaperAbstract.objects.update_or_create( - paper=paper, - defaults={ - "abstract": abstract[0].strip(), - "source": "elsevier" - } - ) - paper.has_abstract = True - paper.has_abstract_xml = True - count_abs += 1 - else: - paper.save_fail_reason("elsevier_abstract_not_found") - continue - - paras = root.xpath("//ce:para", namespaces=ns) - has_fulltext = len(paras) > 0 - if has_fulltext is False: - rawtexts = root.xpath("//xocs:rawtext/text()",namespaces=ns) - if rawtexts and len(rawtexts[0].strip()) > 2000: - has_fulltext = True - if has_fulltext: - paper.has_fulltext = True - paper.has_fulltext_xml = True - count_fulltext += 1 - - paper.save_file_xml(xml_str) - paper.save(update_fields=["has_abstract", - "has_abstract_xml", "has_fulltext", - "has_fulltext_xml", "update_time"]) - - elif res.status_code == 404: - paper.save_fail_reason("elsevier_doi_not_found") - else: - err_msg = f"elsevier_response_error: {res.status_code} {res.text}" - paper.save_fail_reason(f"elsevier_response_error: {res.status_code}") + if got_abs: + count_abs += 1 + if got_fulltext: + count_fulltext += 1 + if _elsevier_fetch_pdf(req, paper): + count_pdf += 1 finally: paper.fetch_end() + # 阶段2: 存量补 PDF (摘要阶段未因网络异常中断才继续) + if err_msg == "": + for paper in qs_pdf[:pdf_number_of_task]: + if not show_task_run(def_name): + break + if paper.fetch_status == "downloading": + continue + paper.fetch(status="downloading") + try: + if _elsevier_fetch_pdf(req, paper): + count_pdf += 1 + finally: + paper.fetch_end() if show_task_run(def_name): + countdown = 30 if err_msg else 5 # 网络异常时放慢重试 current_app.send_task( "apps.resm.tasks.get_abstract_from_elsevier", kwargs={ "number_of_task": number_of_task, - "exclude_failed": exclude_failed + "exclude_failed": exclude_failed, + "pdf_number_of_task": pdf_number_of_task, }, - countdown=5, + countdown=countdown, ) - return f'{err_msg}, get {count_abs} abstracts, {count_fulltext} fulltexts' - - -@shared_task(base=CustomTask) -def get_pdf_from_elsevier(number_of_task=100): - """ - 获取elsevier全文 - """ - def_name = get_pdf_from_elsevier.name - if not show_task_run(def_name): - return "stoped" - qs = Paper.objects.filter(has_fulltext=True, has_fulltext_pdf=False, has_abstract=True) - err_msg = "" - with requests.Session() as req: - for paper in qs[:number_of_task]: - if not show_task_run(def_name): - break - params = { - "apiKey": ELSEVIER_APIKEY, - "httpAccept": "application/pdf" - } - try: - res = req.get( - f"https://api.elsevier.com/content/article/doi/{paper.doi}", - params=params, - timeout=(3, 15) - ) - except requests.RequestException: - err_msg = "elsevier_request_error" - break - if res.status_code == 200: - # 检查是否是PDF文件:检查魔数 %PDF 或 content-type - is_pdf = ( - res.content.startswith(b'%PDF') or - res.headers.get("content-type", "").startswith("application/pdf") - ) - if is_pdf and len(res.content) > 1024: # 至少1KB - paper.save_file_pdf(res.content) - paper.has_fulltext_pdf = True - paper.save(update_fields=["has_fulltext_pdf", "update_time"]) - qs_count = qs.count() - if show_task_run(def_name) and qs_count > 0: - current_app.send_task( - "apps.resm.tasks.get_pdf_from_elsevier", - kwargs={ - "number_of_task": number_of_task, - }, - countdown=5, - ) - return f'{def_name}, {err_msg}, remaining {qs_count} papers' + return f'{err_msg}, abs {count_abs}, fulltext {count_fulltext}, pdf {count_pdf}' def get_actual_running_count(): """获取实际在下载的任务数""" @@ -798,9 +839,17 @@ def save_pdf_from_openalex(paper:Paper): paper.save_file_pdf(res.content, save_obj=True) return "success" elif res.status_code == 429: - if "Insufficient credits" in res.json().get("message", ""): + try: + message = res.json().get("message", "") + except ValueError: + message = res.text + if "Insufficient credits" in message: + # 额度耗尽: 退避 1 小时 cache.set("openalex_api_exceed", True, timeout=3600) return "openalex_pdf_error: Insufficient credits" + # 普通限流(请求过频): 短退避 2 分钟, 避免立刻重试再撞 429 + cache.set("openalex_api_exceed", True, timeout=120) + return f"openalex_pdf_error: 429 {message[:100]}" elif res.status_code == 404: paper.save_fail_reason("openalex_pdf_not_found") return "openalex_pdf_error: 404"