diff --git a/apps/resm/migrations/0007_auto_update_index_periodic_tasks.py b/apps/resm/migrations/0007_auto_update_index_periodic_tasks.py
new file mode 100644
index 0000000..53819d9
--- /dev/null
+++ b/apps/resm/migrations/0007_auto_update_index_periodic_tasks.py
@@ -0,0 +1,71 @@
+"""Register the periodic tasks that auto-update the paper index (django-celery-beat, DB scheduler).
+
+This project uses DatabaseScheduler, so periodic tasks are stored in the DB. This data migration
+idempotently creates two PeriodicTask rows that each run once a day:
+  - apps.resm.tasks.update_paper_meta_from_openalex  (03:00, primary incremental source)
+  - apps.resm.tasks.update_paper_meta_from_elsevier  (04:00, ScienceDirect supplement)
+Staggered hours avoid hitting both external APIs at once; update_or_create keeps the migration re-runnable.
+"""
+import json
+from django.db import migrations
+
+OPENALEX_TASK = "apps.resm.tasks.update_paper_meta_from_openalex"
+ELSEVIER_TASK = "apps.resm.tasks.update_paper_meta_from_elsevier"
+OPENALEX_NAME = "resm: 自动增量更新论文索引 (OpenAlex)"
+ELSEVIER_NAME = "resm: 自动增量更新论文索引 (Elsevier 补充)"
+
+
+def _crontab(CrontabSchedule, hour):
+    schedule, _ = CrontabSchedule.objects.get_or_create(
+        minute="0",
+        hour=str(hour),
+        day_of_week="*",
+        day_of_month="*",
+        month_of_year="*",
+    )
+    return schedule
+
+
+def create_periodic_tasks(apps, schema_editor):
+    CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
+    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
+
+    PeriodicTask.objects.update_or_create(
+        name=OPENALEX_NAME,
+        defaults={
+            "task": OPENALEX_TASK,
+            "crontab": _crontab(CrontabSchedule, 3),
+            "interval": None,
+            "kwargs": json.dumps({"days": 30}),
+            "enabled": True,
+            "description": "每天用 from_publication_date 拉取最近 30 天发表的论文,保持 resm_paper 索引更新(from_created_date 需 Premium)",
+        },
+    )
+    PeriodicTask.objects.update_or_create(
+        name=ELSEVIER_NAME,
+        defaults={
+            "task": ELSEVIER_TASK,
+            "crontab": _crontab(CrontabSchedule, 4),
+            "interval": None,
+            "kwargs": json.dumps({"days": 7}),
+            "enabled": True,
+            "description": "每天用 ScienceDirect Search 补充 Elsevier(10.1016)新刊,补 OpenAlex 收录延迟",
+        },
+    )
+
+
+def remove_periodic_tasks(apps, schema_editor):
+    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
+    PeriodicTask.objects.filter(name__in=[OPENALEX_NAME, ELSEVIER_NAME]).delete()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("resm", "0006_pg_trgm_index"),
+        ("django_celery_beat", "__latest__"),
+    ]
+
+    operations = [
+        migrations.RunPython(create_periodic_tasks, remove_periodic_tasks),
+    ]
diff --git a/apps/resm/tasks.py b/apps/resm/tasks.py
index f4219fe..9748b41 100644
--- a/apps/resm/tasks.py
+++ b/apps/resm/tasks.py
@@ -17,19 +17,19 @@ import sys
 import os
 from django.db.models import Q
 from django.utils import timezone
+from django.conf import settings
 
-# config.email = "caoqianming@foxmail.com"
-config.email = "caoqianming@ctc.ac.cn"
+# Credentials live in config/conf.py (exposed via settings' `from config.conf import *`); NOTE(review): rotate the keys that were hard-coded here, they remain in git history.
+config.email = settings.OPENALEX_EMAIL
 config.max_retries = 0
 config.retry_backoff_factor = 0.1
 config.retry_http_codes = [429, 500, 503]
-# OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd"
-OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp"
-config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"
+config.api_key = settings.OPENALEX_API_KEY
+OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY  # key for content.openalex.org PDF downloads
 
-ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'
+ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
 ELSEVIER_HEADERS = {
-    "X-ELS-Insttoken": "135fa874aea9f0de11cad187ccb4878c",
+    "X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN,
     "X-ELS-APIKey": ELSEVIER_APIKEY,
 }
 
@@ -52,22 +52,20 @@ def run_async(coro):
         # Unix/Linux/Mac 上使用默认方式
         return asyncio.run(coro)
 
-@shared_task(base=CustomTask)
-def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
-    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
-    cache_cursor = cache.get(cache_key, "*")
-    if keywords or search:
-        pass
-    else:
-        raise Exception("keywords or search must be provided")
-    # filter=keywords.id:clinker|cement
-    pager = Works().filter(
-        publication_year=publication_year,
-        has_doi=True,
-        type="article"
-    )
+
+# Fields selected for every OpenAlex metadata fetch; shared by the full crawl and the incremental update
+OPENALEX_SELECT_FIELDS = [
+    "id", "doi", "title", "publication_date",
+    "open_access", "authorships", "primary_location", "publication_year",
+    "display_name", "content_urls"
+]
+
+
+def _apply_keyword_search(pager, keywords: str, search: str):
+    """Apply the keywords / search filters to a pyalex pager and return the filtered pager.
+    keywords: '|' means OR, ',' means AND, a single value is filtered directly.
+    """
     if keywords:
-        # 支持 '|' 表示 OR,',' 表示 AND。去除空白项。
         if "|" in keywords:
             keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
             pager = pager.filter_or(keywords={"id": keywords_list})
@@ -75,43 +73,90 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
             keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
             pager = pager.filter(keywords={"id": keywords_list})
         else:
-            keywords_list = [keywords.strip()]
-            pager = pager.filter(keywords={"id": keywords_list})
+            pager = pager.filter(keywords={"id": [keywords.strip()]})
     if search:
         pager = pager.search_filter(title_and_abstract=search)
-    pager = pager.select([
-        "id", "doi", "title", "publication_date",
-        "open_access", "authorships", "primary_location", "publication_year",
-        "display_name", "content_urls"
-    ]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
-    next_cursor = pager._next_value
+    return pager
+
+
+def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
+    """Map a single OpenAlex record to an unsaved Paper instance."""
+    paper = Paper()
+    paper.id = idWorker.get_id()
+    paper.o_keywords = keywords
+    paper.o_search = search
+    paper.source = "openalex"
+    paper.type = "article"
+    paper.openalex_id = record["id"].split("/")[-1]
+    paper.doi = record["doi"].replace("https://doi.org/", "")
+    paper.title = record["display_name"] or record["title"]  # the crawl admits records that only carry "title"
+    paper.publication_date = record["publication_date"]
+    paper.publication_year = record["publication_year"]
+    if record["open_access"]:
+        paper.is_oa = record["open_access"]["is_oa"]
+        paper.oa_url = record["open_access"]["oa_url"]
+    if record["authorships"]:
+        paper.first_author = record["authorships"][0]["author"]["display_name"]
+        if record["authorships"][0]["institutions"]:
+            paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
+    if record["primary_location"] and record["primary_location"]["source"]:
+        paper.publication_name = record["primary_location"]["source"]["display_name"]
+    return paper
+
+
+def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
+                          stop_key: str = None, n_max: int = None) -> int:
+    """Core single-query OpenAlex crawl, shared by the full / incremental / backfill paths so checkpointing is uniform.
+
+    base_pager: pyalex pager with its filters already applied (year / date filters are the caller's choice).
+    cache_key:  if given, the next-page cursor is checkpointed under it after every page ("DONE" once finished; a key
+                already at "DONE" is skipped) so a run can resume; if None no cursor is stored and each call rescans.
+    stop_key:   if given and set in the cache, stop after the current page (quota exhausted / manual pause).
+    Returns the number of Paper instances built in this call (records without a DOI or title are not counted).
+    """
+    if cache_key:
+        start = cache.get(cache_key, "*")
+        if start == "DONE":
+            return 0
+    else:
+        start = "*"
+    pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
+        per_page=200, n_max=n_max, cursor=start)
+    seen = 0
     for page in pager:
-        papers = []
-        for record in page:
-            if record["doi"] and (record["display_name"] or record["title"]):
-                paper = Paper()
-                paper.id = idWorker.get_id()
-                paper.o_keywords = keywords
-                paper.o_search = search
-                paper.source = "openalex"
-                paper.type = "article"
-                paper.openalex_id = record["id"].split("/")[-1]
-                paper.doi = record["doi"].replace("https://doi.org/", "")
-                paper.title = record["display_name"]
-                paper.publication_date = record["publication_date"]
-                paper.publication_year = record["publication_year"]
-                if record["open_access"]:
-                    paper.is_oa = record["open_access"]["is_oa"]
-                    paper.oa_url = record["open_access"]["oa_url"]
-                if record["authorships"]:
-                    paper.first_author = record["authorships"][0]["author"]["display_name"]
-                    if record["authorships"][0]["institutions"]:
-                        paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
-                if record["primary_location"] and record["primary_location"]["source"]:
-                    paper.publication_name = record["primary_location"]["source"]["display_name"]
-                papers.append(paper)
+        papers = [
+            _build_paper_from_record(record, keywords, search)
+            for record in page
+            if record["doi"] and (record["display_name"] or record["title"])
+        ]
+        # ignore_conflicts: rows whose openalex_id / doi already exist are skipped, only new ones are inserted
         Paper.objects.bulk_create(papers, ignore_conflicts=True)
-        cache.set(cache_key, next_cursor, timeout=None)
+        seen += len(papers)
+        nv = pager._next_value  # pyalex updates this only after a page is fetched; None means the end was reached
+        if cache_key:
+            cache.set(cache_key, nv if nv is not None else "DONE", timeout=None)
+        if nv is None or len(page) == 0:
+            if cache_key:
+                cache.set(cache_key, "DONE", timeout=None)
+            break
+        if stop_key and cache.get(stop_key):
+            break
+    return seen
+
+
+@shared_task(base=CustomTask)
+def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
+    if not (keywords or search):
+        raise Exception("keywords or search must be provided")
+    cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
+    base = Works().filter(
+        publication_year=publication_year,
+        has_doi=True,
+        type="article"
+    )
+    base = _apply_keyword_search(base, keywords, search)
+    _crawl_openalex_query(base, keywords, search, cache_key=cache_key,
+                          stop_key="get_paper_meta_from_openalex_stop")
     if cache.get("get_paper_meta_from_openalex_stop", None) is None:
         if end_year is None:
             end_year = datetime.now().year
@@ -127,6 +172,248 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
                 countdown=5
             )
 
+
+def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
+                                    per_combo_max: int = None) -> int:
+    """For one (keywords, search) combo, fetch OpenAlex works published from
+    from_publication_date onward. Short-window scan; no cursor is stored (each call scans from the start).
+
+    Note: incrementing by ingestion time would need from_created_date / from_updated_date, but per the
+    original author both require OpenAlex Premium (the current key gets "Plan upgrade required"), so this
+    falls back to the public from_publication_date filter (day granularity). Cost: papers are often indexed
+    some days after publication, so the window must exceed the schedule interval or late-indexed ones are missed.
+    """
+    base = Works().filter(
+        has_doi=True,
+        type="article",
+        from_publication_date=from_publication_date,
+    )
+    base = _apply_keyword_search(base, keywords, search)
+    return _crawl_openalex_query(base, keywords, search, n_max=per_combo_max)
+
+
+@shared_task(base=CustomTask)
+def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
+    """Main task that incrementally refreshes the journal paper index (resm_paper).
+
+    Iterates every distinct (o_keywords, o_search) combo already stored (None normalised to "", ordering cleared
+    so DISTINCT really dedupes) and pulls papers published in the last ``days`` days via from_publication_date.
+    (Incrementing by from_created_date / from_updated_date would be ideal but is reported to need Premium,
+    which the current key lacks, hence the public from_publication_date filter.)
+    bulk_create(ignore_conflicts=True) avoids duplicates. days defaults to 30 because papers are often indexed
+    days to weeks after publication; the window must cover that lag or late-indexed papers are missed.
+    Scheduled once a day by django-celery-beat.
+    """
+    from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat()
+    combos = sorted({(k or "", s or "") for k, s in Paper.objects.values_list("o_keywords", "o_search").order_by().distinct()})
+    before = Paper.objects.count()
+    n_combos = 0
+    for o_keywords, o_search in combos:
+        o_keywords = o_keywords or ""
+        o_search = o_search or ""
+        if not o_keywords and not o_search:
+            continue
+        n_combos += 1
+        _fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max)
+    new_papers = Paper.objects.count() - before
+    return f"openalex update: combos={n_combos}, new_papers={new_papers}, since={from_publication_date}"
+
+
+BACKFILL_STOP_KEY = "backfill_paper_meta_stop"
+
+
+@shared_task(base=CustomTask)
+def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
+                                      combo_index: int = 0, combos=None):
+    """One-off backfill of the paper index by publication date, resumable (to cope with OpenAlex quota limits).
+
+    This task only decides strategy: which query combos, which publication-date range, how to chain.
+    Paging and cursor checkpointing are delegated to the shared core _crawl_openalex_query.
+    (o_keywords, o_search) combos are processed one at a time; the next is chained once one finishes,
+    so each task stays short, can be stopped at any time and resumes from its checkpoint.
+
+    Pause / resume (for quota handling):
+      - pause:  cache.set("backfill_paper_meta_stop", True)   # stops after the current page, no further chaining
+      - resume: cache.delete("backfill_paper_meta_stop"), then re-issue with the same date arguments, e.g.
+                backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01")
+        Combos already DONE are skipped; unfinished ones continue from the last checkpointed cursor.
+      On an exception during the crawl the cursor is kept and the stop flag is set; clear the flag, then re-issue .delay.
+    """
+    if cache.get(BACKFILL_STOP_KEY):
+        return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"
+
+    if combos is None:
+        # Normalise None to "" and dedupe; sorting keeps the combo order stable across resumes
+        combos = [list(c) for c in sorted({
+            (c[0] or "", c[1] or "")
+            for c in Paper.objects.values_list("o_keywords", "o_search").order_by().distinct()
+            if (c[0] or c[1])
+        })]
+
+    def ckey(kw, search):
+        return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"
+
+    # Skip combos that are already finished
+    while combo_index < len(combos):
+        kw, search = combos[combo_index]
+        if cache.get(ckey(kw, search)) == "DONE":
+            combo_index += 1
+            continue
+        break
+    if combo_index >= len(combos):
+        return f"backfill done: {len(combos)} combos, from {from_publication_date}"
+
+    kw, search = combos[combo_index]
+    cursor_key = ckey(kw, search)
+    base = Works().filter(
+        has_doi=True, type="article", from_publication_date=from_publication_date,
+    )
+    if to_publication_date:
+        base = base.filter(to_publication_date=to_publication_date)
+    base = _apply_keyword_search(base, kw, search)
+
+    try:
+        _crawl_openalex_query(base, kw, search, cache_key=cursor_key,
+                              stop_key=BACKFILL_STOP_KEY)
+    except Exception as e:
+        # Quota exhausted / network error etc.: the cursor is already checkpointed; set the stop flag and wait for a manual resume
+        cache.set(BACKFILL_STOP_KEY, True, timeout=None)
+        return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. "
+                f"clear {BACKFILL_STOP_KEY} then re-run to resume")
+
+    if cache.get(cursor_key) != "DONE":
+        # Interrupted by stop_key before finishing; the cursor is kept for the resume
+        return f"combo {combo_index}/{len(combos)} paused; cursor saved"
+
+    # This combo is finished; chain the next one
+    current_app.send_task(
+        "apps.resm.tasks.backfill_paper_meta_from_openalex",
+        kwargs={
+            "from_publication_date": from_publication_date,
+            "to_publication_date": to_publication_date,
+            "combo_index": combo_index + 1,
+            "combos": combos,
+        },
+        countdown=3,
+    )
+    return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next"
+
+
+SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"
+
+
+def _build_paper_from_sd_result(r, qs_text: str):
+    """Map one ScienceDirect Search result to an unsaved Paper instance.
+    Per the original author, SD Search returns no oa_url / abstract / openalex_id, so fewer fields are set than for OpenAlex.
+    Results without a doi, title or parsable year yield None (the author notes those model fields are non-null).
+    """
+    doi = r.get("doi")
+    title = r.get("title")
+    if not doi or not title:
+        return None
+    pub_date = r.get("publicationDate")  # YYYY-MM-DD
+    year = None
+    if pub_date:
+        try:
+            year = int(str(pub_date)[:4])
+        except (ValueError, TypeError):
+            year = None
+    if year is None:
+        return None
+
+    paper = Paper()
+    paper.id = idWorker.get_id()
+    paper.source = "elsevier"
+    paper.type = "article"
+    paper.o_search = qs_text
+    paper.doi = str(doi).replace("https://doi.org/", "")
+    paper.title = title
+    paper.publication_date = pub_date if len(str(pub_date)) == 10 else None
+    paper.publication_year = year
+    paper.publication_name = r.get("sourceTitle")
+    authors = r.get("authors") or []
+    if isinstance(authors, list) and authors:
+        # Use the author with the smallest "order" as first author (some authors may be omitted)
+        first = min(authors, key=lambda a: a.get("order", 1) if isinstance(a, dict) else 1)
+        if isinstance(first, dict):
+            paper.first_author = first.get("name")
+    paper.is_oa = bool(r.get("openAccess"))
+    return paper
+
+
+def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
+                          max_results: int = 200):
+    """Call ScienceDirect Search (PUT) for Elsevier articles loaded after ``loaded_after``.
+    Returns (papers, err_msg); a non-empty err_msg means the query failed (papers fetched so far are still returned).
+    """
+    headers = {
+        **ELSEVIER_HEADERS,
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+    papers = []
+    offset = 0
+    with requests.Session() as req:
+        while offset < max_results:
+            body = {
+                "qs": qs_text,
+                "loadedAfter": loaded_after,
+                "display": {"offset": offset, "show": show, "sortBy": "date"},
+            }
+            try:
+                res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
+                              headers=headers, timeout=(3, 30))
+            except requests.RequestException as e:
+                return papers, f"elsevier_search_request_error: {e}"
+            if res.status_code in (401, 403):
+                return papers, f"elsevier_search_no_entitlement: {res.status_code}"
+            if res.status_code != 200:
+                return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
+            results = (res.json() or {}).get("results") or []
+            # On an empty result SD may return a single {"error": ...} entry; treat that as the end too
+            if not results or (len(results) == 1 and results[0].get("error")):
+                break
+            for r in results:
+                paper = _build_paper_from_sd_result(r, qs_text)
+                if paper is not None:
+                    papers.append(paper)
+            if len(results) < show:
+                break
+            offset += show
+    return papers, ""
+
+
+@shared_task(base=CustomTask)
+def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
+    """ScienceDirect Search incremental task, a supplement to the OpenAlex incremental update.
+
+    Iterates the distinct plain-text queries already stored (o_search only: OpenAlex keyword ids cannot be
+    used with SD) and uses loadedAfter to pull journal articles Elsevier loaded in the last ``days`` days.
+    Covers only ScienceDirect's own content (mostly DOI prefix 10.1016), compensating for OpenAlex's
+    lag in indexing new Elsevier articles. Scheduled once a day by django-celery-beat.
+    """
+    loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
+    qs_texts = sorted({
+        s.strip() for s in Paper.objects.exclude(o_search__isnull=True)
+        .exclude(o_search="").values_list("o_search", flat=True).order_by().distinct()
+        if s and s.strip()
+    })
+    before = Paper.objects.count()
+    msgs = []
+    for qs_text in qs_texts:
+        papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
+        if papers:
+            Paper.objects.bulk_create(papers, ignore_conflicts=True)
+        if err:
+            msgs.append(f"{qs_text}: {err}")
+            # The key has no Search entitlement, so later queries are pointless; stop here
+            if "no_entitlement" in err:
+                break
+    new_papers = Paper.objects.count() - before
+    return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
+            f"since={loaded_after}, errs={msgs}")
+
+
 # 常用的 User-Agent 列表
 USER_AGENTS = [
     "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",