Compare commits
2 Commits
6a5a5d7b6b
...
c5636b5131
| Author | SHA1 | Date |
|---|---|---|
|
|
c5636b5131 | |
|
|
7b38d4d234 |
|
|
@ -1,3 +1,14 @@
|
|||
from django.contrib import admin
|
||||
from apps.resm.models import PaperMonitor
|
||||
|
||||
# Register your models here.
|
||||
|
||||
|
||||
@admin.register(PaperMonitor)
|
||||
class PaperMonitorAdmin(admin.ModelAdmin):
|
||||
list_display = ("type", "name", "value", "note", "is_active",
|
||||
"days", "last_run", "last_count")
|
||||
list_filter = ("type", "is_active", "note")
|
||||
search_fields = ("name", "value", "note")
|
||||
list_editable = ("is_active", "days")
|
||||
ordering = ("type", "name")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
# Generated by Django 4.2.27 on 2026-06-21 15:23
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
import django.utils.timezone
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
('resm', '0006_pg_trgm_index'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='PaperMonitor',
|
||||
fields=[
|
||||
('id', models.CharField(editable=False, help_text='主键ID', max_length=20, primary_key=True, serialize=False, verbose_name='主键ID')),
|
||||
('create_time', models.DateTimeField(default=django.utils.timezone.now, help_text='创建时间', verbose_name='创建时间')),
|
||||
('update_time', models.DateTimeField(auto_now=True, help_text='修改时间', verbose_name='修改时间')),
|
||||
('is_deleted', models.BooleanField(default=False, help_text='删除标记', verbose_name='删除标记')),
|
||||
('type', models.CharField(choices=[('journal', '期刊(ISSN)'), ('search', '搜索词(标题/摘要)'), ('keyword', 'OpenAlex关键词ID')], db_index=True, max_length=20, verbose_name='监控类型')),
|
||||
('value', models.CharField(max_length=500, verbose_name='监控值')),
|
||||
('name', models.CharField(blank=True, max_length=200, null=True, verbose_name='名称')),
|
||||
('note', models.CharField(blank=True, max_length=100, null=True, verbose_name='方向标注')),
|
||||
('is_active', models.BooleanField(db_index=True, default=True, verbose_name='启用')),
|
||||
('days', models.IntegerField(default=30, verbose_name='回看窗口(天)')),
|
||||
('last_run', models.DateTimeField(blank=True, null=True, verbose_name='上次运行时间')),
|
||||
('last_count', models.IntegerField(default=0, verbose_name='上次拉取篇数(窗口内)')),
|
||||
('create_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_create_by', to=settings.AUTH_USER_MODEL, verbose_name='创建人')),
|
||||
('update_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_update_by', to=settings.AUTH_USER_MODEL, verbose_name='最后编辑人')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': '论文监控',
|
||||
'verbose_name_plural': '论文监控',
|
||||
},
|
||||
),
|
||||
]
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
"""种子数据:无机非金属材料方向的期刊 / 关键词监控,并注册每周的监控周期任务。
|
||||
|
||||
- 8 本方向期刊(按 ISSN,OpenAlex 跨出版商都能抓)
|
||||
- 若干英文搜索词(OpenAlex 语料是英文,中文词搜不到,故用英文;note 标注方向)
|
||||
- 注册 PeriodicTask: apps.resm.tasks.monitor_papers,每天 05:00 跑一次
|
||||
get_or_create / update_or_create 保证迁移可安全重跑。
|
||||
"""
|
||||
from django.db import migrations
|
||||
from apps.utils.snowflake import idWorker
|
||||
|
||||
NOTE = "无机非金属材料"
|
||||
|
||||
JOURNALS = [
|
||||
("0272-8842", "Ceramics International"),
|
||||
("0955-2219", "Journal of the European Ceramic Society"),
|
||||
("0008-8846", "Cement and Concrete Research"),
|
||||
("0958-9465", "Cement and Concrete Composites"),
|
||||
("0950-0618", "Construction and Building Materials"),
|
||||
("0022-3093", "Journal of Non-Crystalline Solids"),
|
||||
("0002-7820", "Journal of the American Ceramic Society"),
|
||||
("0022-2461", "Journal of Materials Science"),
|
||||
]
|
||||
|
||||
SEARCHES = [
|
||||
("ceramics material", "陶瓷材料"),
|
||||
("glass material", "玻璃材料"),
|
||||
("cement", "水泥"),
|
||||
("refractory material", "耐火材料"),
|
||||
("crystalline material", "晶体材料"),
|
||||
]
|
||||
|
||||
MONITOR_TASK = "apps.resm.tasks.monitor_papers"
|
||||
MONITOR_NAME = "resm: 论文监控(期刊/关键词)"
|
||||
|
||||
|
||||
def seed(apps, schema_editor):
|
||||
PaperMonitor = apps.get_model("resm", "PaperMonitor")
|
||||
for issn, name in JOURNALS:
|
||||
PaperMonitor.objects.get_or_create(
|
||||
type="journal", value=issn,
|
||||
defaults={"id": idWorker.get_id(), "name": name, "note": NOTE,
|
||||
"is_active": True, "days": 7},
|
||||
)
|
||||
for term, name in SEARCHES:
|
||||
PaperMonitor.objects.get_or_create(
|
||||
type="search", value=term,
|
||||
defaults={"id": idWorker.get_id(), "name": name, "note": NOTE,
|
||||
"is_active": True, "days": 7},
|
||||
)
|
||||
|
||||
CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
|
||||
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
|
||||
sched, _ = CrontabSchedule.objects.get_or_create(
|
||||
minute="0", hour="5", day_of_week="*", day_of_month="*", month_of_year="*",
|
||||
)
|
||||
PeriodicTask.objects.update_or_create(
|
||||
name=MONITOR_NAME,
|
||||
defaults={
|
||||
"task": MONITOR_TASK,
|
||||
"crontab": sched,
|
||||
"interval": None,
|
||||
"enabled": True,
|
||||
"description": "每天 05:00 拉取监控期刊/关键词的最新论文元数据(OpenAlex,跨出版商)",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def unseed(apps, schema_editor):
|
||||
PaperMonitor = apps.get_model("resm", "PaperMonitor")
|
||||
PaperMonitor.objects.filter(type="journal", value__in=[i for i, _ in JOURNALS]).delete()
|
||||
PaperMonitor.objects.filter(type="search", value__in=[t for t, _ in SEARCHES]).delete()
|
||||
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
|
||||
PeriodicTask.objects.filter(name=MONITOR_NAME).delete()
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("resm", "0008_papermonitor"),
|
||||
("django_celery_beat", "__latest__"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(seed, unseed),
|
||||
]
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
from django.db import models
|
||||
from apps.utils.models import BaseModel
|
||||
from apps.utils.models import BaseModel, CommonAModel
|
||||
from django.conf import settings
|
||||
import os
|
||||
# Create your models here.
|
||||
|
|
@ -107,3 +107,32 @@ class PaperAbstract(BaseModel):
|
|||
max_length=20,
|
||||
verbose_name="摘要来源" # openalex / elsevier / crossref
|
||||
)
|
||||
|
||||
|
||||
class PaperMonitor(CommonAModel):
|
||||
"""论文监控订阅:监控任务遍历启用项,按 type 拼 OpenAlex 过滤,用
|
||||
from_publication_date 拉最近 days 天的最新论文元数据入库(走通用核心,自动去重)。
|
||||
期刊监控与关键词监控共用本表,靠 type 区分。"""
|
||||
TYPE_JOURNAL = "journal"
|
||||
TYPE_SEARCH = "search"
|
||||
TYPE_KEYWORD = "keyword"
|
||||
TYPE_CHOICES = (
|
||||
(TYPE_JOURNAL, "期刊(ISSN)"),
|
||||
(TYPE_SEARCH, "搜索词(标题/摘要)"),
|
||||
(TYPE_KEYWORD, "OpenAlex关键词ID"),
|
||||
)
|
||||
type = models.CharField("监控类型", max_length=20, choices=TYPE_CHOICES, db_index=True)
|
||||
value = models.CharField("监控值", max_length=500) # ISSN / 搜索词 / keyword id
|
||||
name = models.CharField("名称", max_length=200, null=True, blank=True)
|
||||
note = models.CharField("方向标注", max_length=100, null=True, blank=True) # 如 无机非金属材料
|
||||
is_active = models.BooleanField("启用", default=True, db_index=True)
|
||||
days = models.IntegerField("回看窗口(天)", default=30)
|
||||
last_run = models.DateTimeField("上次运行时间", null=True, blank=True)
|
||||
last_count = models.IntegerField("上次拉取篇数(窗口内)", default=0)
|
||||
|
||||
class Meta:
|
||||
verbose_name = "论文监控"
|
||||
verbose_name_plural = verbose_name
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.get_type_display()}:{self.name or self.value}"
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
|
|||
from apps.utils.tasks import CustomTask
|
||||
from celery import shared_task
|
||||
from pyalex import Works, config
|
||||
from apps.resm.models import Paper, PaperAbstract
|
||||
from apps.resm.models import Paper, PaperAbstract, PaperMonitor
|
||||
from apps.utils.snowflake import idWorker
|
||||
from django.core.cache import cache
|
||||
import requests
|
||||
|
|
@ -17,19 +17,19 @@ import sys
|
|||
import os
|
||||
from django.db.models import Q
|
||||
from django.utils import timezone
|
||||
from django.conf import settings
|
||||
|
||||
# config.email = "caoqianming@foxmail.com"
|
||||
config.email = "caoqianming@ctc.ac.cn"
|
||||
# 凭证集中在 config/conf.py (经 settings 的 `from config.conf import *` 暴露)
|
||||
config.email = settings.OPENALEX_EMAIL
|
||||
config.max_retries = 0
|
||||
config.retry_backoff_factor = 0.1
|
||||
config.retry_http_codes = [429, 500, 503]
|
||||
# OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd"
|
||||
OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp"
|
||||
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"
|
||||
config.api_key = settings.OPENALEX_API_KEY
|
||||
OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY # content.openalex.org PDF 下载
|
||||
|
||||
ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'
|
||||
ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
|
||||
ELSEVIER_HEADERS = {
|
||||
"X-ELS-Insttoken": "135fa874aea9f0de11cad187ccb4878c",
|
||||
"X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN,
|
||||
"X-ELS-APIKey": ELSEVIER_APIKEY,
|
||||
}
|
||||
|
||||
|
|
@ -52,22 +52,20 @@ def run_async(coro):
|
|||
# Unix/Linux/Mac 上使用默认方式
|
||||
return asyncio.run(coro)
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
|
||||
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
|
||||
cache_cursor = cache.get(cache_key, "*")
|
||||
if keywords or search:
|
||||
pass
|
||||
else:
|
||||
raise Exception("keywords or search must be provided")
|
||||
# filter=keywords.id:clinker|cement
|
||||
pager = Works().filter(
|
||||
publication_year=publication_year,
|
||||
has_doi=True,
|
||||
type="article"
|
||||
)
|
||||
|
||||
# OpenAlex 元数据抓取统一选择的字段,全量抓取与增量更新共用
|
||||
OPENALEX_SELECT_FIELDS = [
|
||||
"id", "doi", "title", "publication_date",
|
||||
"open_access", "authorships", "primary_location", "publication_year",
|
||||
"display_name", "content_urls"
|
||||
]
|
||||
|
||||
|
||||
def _apply_keyword_search(pager, keywords: str, search: str):
|
||||
"""把 keywords / search 过滤条件套到 pyalex 的 pager 上。
|
||||
keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter。
|
||||
"""
|
||||
if keywords:
|
||||
# 支持 '|' 表示 OR,',' 表示 AND。去除空白项。
|
||||
if "|" in keywords:
|
||||
keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
|
||||
pager = pager.filter_or(keywords={"id": keywords_list})
|
||||
|
|
@ -75,43 +73,90 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
|
|||
keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
|
||||
pager = pager.filter(keywords={"id": keywords_list})
|
||||
else:
|
||||
keywords_list = [keywords.strip()]
|
||||
pager = pager.filter(keywords={"id": keywords_list})
|
||||
pager = pager.filter(keywords={"id": [keywords.strip()]})
|
||||
if search:
|
||||
pager = pager.search_filter(title_and_abstract=search)
|
||||
pager = pager.select([
|
||||
"id", "doi", "title", "publication_date",
|
||||
"open_access", "authorships", "primary_location", "publication_year",
|
||||
"display_name", "content_urls"
|
||||
]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
|
||||
next_cursor = pager._next_value
|
||||
return pager
|
||||
|
||||
|
||||
def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
|
||||
"""把 OpenAlex 单条 record 映射成未保存的 Paper 实例。"""
|
||||
paper = Paper()
|
||||
paper.id = idWorker.get_id()
|
||||
paper.o_keywords = keywords
|
||||
paper.o_search = search
|
||||
paper.source = "openalex"
|
||||
paper.type = "article"
|
||||
paper.openalex_id = record["id"].split("/")[-1]
|
||||
paper.doi = record["doi"].replace("https://doi.org/", "")
|
||||
paper.title = record["display_name"]
|
||||
paper.publication_date = record["publication_date"]
|
||||
paper.publication_year = record["publication_year"]
|
||||
if record["open_access"]:
|
||||
paper.is_oa = record["open_access"]["is_oa"]
|
||||
paper.oa_url = record["open_access"]["oa_url"]
|
||||
if record["authorships"]:
|
||||
paper.first_author = record["authorships"][0]["author"]["display_name"]
|
||||
if record["authorships"][0]["institutions"]:
|
||||
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
|
||||
if record["primary_location"] and record["primary_location"]["source"]:
|
||||
paper.publication_name = record["primary_location"]["source"]["display_name"]
|
||||
return paper
|
||||
|
||||
|
||||
def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
|
||||
stop_key: str = None, n_max: int = None) -> int:
|
||||
"""OpenAlex 单查询抓取核心,全量 / 增量 / 回补共用,保证 checkpoint 行为一致。
|
||||
|
||||
base_pager: 已套好 filter 的 pyalex pager(年/日期/收录日期等过滤由调用方决定)。
|
||||
cache_key: 给定则逐页把「下一页游标」checkpoint 进去(抓完写 "DONE",已 DONE 直接
|
||||
跳过)→ 真·断点续传;为 None 则不落游标,每次从头扫(适合短窗口增量)。
|
||||
stop_key: 给定且命中时,当前页抓完即停(应对配额耗尽 / 手动暂停),游标已 checkpoint。
|
||||
返回本次处理到的 record 条数。
|
||||
"""
|
||||
if cache_key:
|
||||
start = cache.get(cache_key, "*")
|
||||
if start == "DONE":
|
||||
return 0
|
||||
else:
|
||||
start = "*"
|
||||
pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
|
||||
per_page=200, n_max=n_max, cursor=start)
|
||||
seen = 0
|
||||
for page in pager:
|
||||
papers = []
|
||||
for record in page:
|
||||
if record["doi"] and (record["display_name"] or record["title"]):
|
||||
paper = Paper()
|
||||
paper.id = idWorker.get_id()
|
||||
paper.o_keywords = keywords
|
||||
paper.o_search = search
|
||||
paper.source = "openalex"
|
||||
paper.type = "article"
|
||||
paper.openalex_id = record["id"].split("/")[-1]
|
||||
paper.doi = record["doi"].replace("https://doi.org/", "")
|
||||
paper.title = record["display_name"]
|
||||
paper.publication_date = record["publication_date"]
|
||||
paper.publication_year = record["publication_year"]
|
||||
if record["open_access"]:
|
||||
paper.is_oa = record["open_access"]["is_oa"]
|
||||
paper.oa_url = record["open_access"]["oa_url"]
|
||||
if record["authorships"]:
|
||||
paper.first_author = record["authorships"][0]["author"]["display_name"]
|
||||
if record["authorships"][0]["institutions"]:
|
||||
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
|
||||
if record["primary_location"] and record["primary_location"]["source"]:
|
||||
paper.publication_name = record["primary_location"]["source"]["display_name"]
|
||||
papers.append(paper)
|
||||
papers = [
|
||||
_build_paper_from_record(record, keywords, search)
|
||||
for record in page
|
||||
if record["doi"] and (record["display_name"] or record["title"])
|
||||
]
|
||||
# ignore_conflicts:openalex_id / doi 已存在的自动跳过,只插新的
|
||||
Paper.objects.bulk_create(papers, ignore_conflicts=True)
|
||||
cache.set(cache_key, next_cursor, timeout=None)
|
||||
seen += len(papers)
|
||||
nv = pager._next_value # pyalex 在取完每页后才更新,None 表示已到末尾
|
||||
if cache_key:
|
||||
cache.set(cache_key, nv if nv is not None else "DONE", timeout=None)
|
||||
if nv is None or len(page) == 0:
|
||||
if cache_key:
|
||||
cache.set(cache_key, "DONE", timeout=None)
|
||||
break
|
||||
if stop_key and cache.get(stop_key):
|
||||
break
|
||||
return seen
|
||||
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
|
||||
if not (keywords or search):
|
||||
raise Exception("keywords or search must be provided")
|
||||
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
|
||||
base = Works().filter(
|
||||
publication_year=publication_year,
|
||||
has_doi=True,
|
||||
type="article"
|
||||
)
|
||||
base = _apply_keyword_search(base, keywords, search)
|
||||
_crawl_openalex_query(base, keywords, search, cache_key=cache_key,
|
||||
stop_key="get_paper_meta_from_openalex_stop")
|
||||
if cache.get("get_paper_meta_from_openalex_stop", None) is None:
|
||||
if end_year is None:
|
||||
end_year = datetime.now().year
|
||||
|
|
@ -127,6 +172,285 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
|
|||
countdown=5
|
||||
)
|
||||
|
||||
|
||||
def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
|
||||
per_combo_max: int = None) -> int:
|
||||
"""针对单个 (keywords, search) 组合,拉取 OpenAlex 中 publication_date 在
|
||||
from_publication_date 之后发表的论文。短窗口扫描,不落游标(每次从头扫)。
|
||||
|
||||
注:更贴切的「按收录时间增量」要用 from_created_date / from_updated_date,但这两个
|
||||
过滤项需 OpenAlex Premium(当前 key 返回 "Plan upgrade required"),故退而用公开可用的
|
||||
from_publication_date(发表日期,天级粒度)。代价:论文常在发表后若干天才被 OpenAlex
|
||||
收录,所以窗口要留得比调度间隔大,否则会漏掉「晚收录」的文。
|
||||
"""
|
||||
base = Works().filter(
|
||||
has_doi=True,
|
||||
type="article",
|
||||
from_publication_date=from_publication_date,
|
||||
)
|
||||
base = _apply_keyword_search(base, keywords, search)
|
||||
return _crawl_openalex_query(base, keywords, search, n_max=per_combo_max)
|
||||
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
|
||||
"""自动增量更新期刊论文索引(resm_paper)的主任务。
|
||||
|
||||
遍历库里已抓过的每个 (o_keywords, o_search) 查询组合,用 OpenAlex 的
|
||||
from_publication_date 过滤拉取最近 days 天内"发表"的论文并入库。
|
||||
(理想是按收录时间 from_created_date / from_updated_date 增量,但需 Premium,
|
||||
当前 key 无权限,故用公开可用的 from_publication_date。)
|
||||
bulk_create(ignore_conflicts=True) 保证不重复。days 默认 30:论文常在发表后数天到
|
||||
数周才被 OpenAlex 收录,窗口要足够大覆盖这个滞后,否则会漏「晚收录」的文。
|
||||
由 django-celery-beat 每天调度一次。
|
||||
"""
|
||||
from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat()
|
||||
combos = list(Paper.objects.values_list("o_keywords", "o_search").distinct())
|
||||
before = Paper.objects.count()
|
||||
n_combos = 0
|
||||
for o_keywords, o_search in combos:
|
||||
o_keywords = o_keywords or ""
|
||||
o_search = o_search or ""
|
||||
if not o_keywords and not o_search:
|
||||
continue
|
||||
n_combos += 1
|
||||
_fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max)
|
||||
new_papers = Paper.objects.count() - before
|
||||
return f"openalex update: combos={n_combos}, new_papers={new_papers}, since={from_publication_date}"
|
||||
|
||||
|
||||
BACKFILL_STOP_KEY = "backfill_paper_meta_stop"
|
||||
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
|
||||
combo_index: int = 0, combos=None):
|
||||
"""按发表日期一次性回补论文索引,支持断点续传(应对 OpenAlex 配额限制)。
|
||||
|
||||
本任务只负责策略:抓哪些查询组合、按什么发表日期区间、怎么 chain。
|
||||
具体的分页抓取与游标 checkpoint 复用通用核心 _crawl_openalex_query。
|
||||
顺序逐个 (o_keywords, o_search) 组合处理,一个组合抓完再 chain 下一个,
|
||||
单个 task 短小、可随时停、从断点续。
|
||||
|
||||
停 / 续(应对配额):
|
||||
- 暂停: cache.set("backfill_paper_meta_stop", True) # 当前页抓完即停,不再 chain
|
||||
- 续跑: cache.delete("backfill_paper_meta_stop") 后重新
|
||||
backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01")
|
||||
已 DONE 的组合自动跳过,未完成的从上次 checkpoint 的游标继续。
|
||||
配额耗尽 / 网络异常时也会保留游标并置停标志,重新 .delay 即从断点继续。
|
||||
"""
|
||||
if cache.get(BACKFILL_STOP_KEY):
|
||||
return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"
|
||||
|
||||
if combos is None:
|
||||
# None 归一成 "" 并去重;排序保证每次续跑组合顺序一致
|
||||
combos = [list(c) for c in sorted({
|
||||
(c[0] or "", c[1] or "")
|
||||
for c in Paper.objects.values_list("o_keywords", "o_search").distinct()
|
||||
if (c[0] or c[1])
|
||||
})]
|
||||
|
||||
def ckey(kw, search):
|
||||
return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"
|
||||
|
||||
# 跳过已完成的组合
|
||||
while combo_index < len(combos):
|
||||
kw, search = combos[combo_index]
|
||||
if cache.get(ckey(kw, search)) == "DONE":
|
||||
combo_index += 1
|
||||
continue
|
||||
break
|
||||
if combo_index >= len(combos):
|
||||
return f"backfill done: {len(combos)} combos, from {from_publication_date}"
|
||||
|
||||
kw, search = combos[combo_index]
|
||||
cursor_key = ckey(kw, search)
|
||||
base = Works().filter(
|
||||
has_doi=True, type="article", from_publication_date=from_publication_date,
|
||||
)
|
||||
if to_publication_date:
|
||||
base = base.filter(to_publication_date=to_publication_date)
|
||||
base = _apply_keyword_search(base, kw, search)
|
||||
|
||||
try:
|
||||
_crawl_openalex_query(base, kw, search, cache_key=cursor_key,
|
||||
stop_key=BACKFILL_STOP_KEY)
|
||||
except Exception as e:
|
||||
# 配额耗尽 / 网络等:游标已 checkpoint,置停标志,等手动续跑
|
||||
cache.set(BACKFILL_STOP_KEY, True, timeout=None)
|
||||
return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. "
|
||||
f"clear {BACKFILL_STOP_KEY} then re-run to resume")
|
||||
|
||||
if cache.get(cursor_key) != "DONE":
|
||||
# 被 stop_key 打断,没抓完;游标已保留,等续跑
|
||||
return f"combo {combo_index}/{len(combos)} paused; cursor saved"
|
||||
|
||||
# 该组合已抓完,chain 下一个
|
||||
current_app.send_task(
|
||||
"apps.resm.tasks.backfill_paper_meta_from_openalex",
|
||||
kwargs={
|
||||
"from_publication_date": from_publication_date,
|
||||
"to_publication_date": to_publication_date,
|
||||
"combo_index": combo_index + 1,
|
||||
"combos": combos,
|
||||
},
|
||||
countdown=3,
|
||||
)
|
||||
return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next"
|
||||
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
def monitor_papers(monitor_id: str = None):
|
||||
"""期刊 / 关键词监控:遍历启用的 PaperMonitor,按 type 拼 OpenAlex 过滤,用
|
||||
from_publication_date 拉最近 days 天的最新论文元数据入库。期刊与关键词监控共用本任务。
|
||||
|
||||
- journal: value=ISSN -> 过滤 primary_location.source.issn(o_keywords/o_search 留空)
|
||||
- search : value=英文搜索词 -> title_and_abstract.search(写回 o_search)
|
||||
- keyword: value=OpenAlex keyword id -> keywords.id(写回 o_keywords)
|
||||
复用 _crawl_openalex_query(短窗口,cache_key=None 每次从头扫);bulk_create
|
||||
ignore_conflicts 自动与现有 Paper 去重。monitor_id 给定则只跑该条。由 beat 每天调度。
|
||||
"""
|
||||
qs = PaperMonitor.objects.filter(is_active=True)
|
||||
if monitor_id:
|
||||
qs = qs.filter(id=monitor_id)
|
||||
results = []
|
||||
for m in qs:
|
||||
from_pub = (timezone.now() - timedelta(days=m.days or 30)).date().isoformat()
|
||||
base = Works().filter(has_doi=True, type="article", from_publication_date=from_pub)
|
||||
kw, search = "", ""
|
||||
if m.type == PaperMonitor.TYPE_JOURNAL:
|
||||
base = base.filter(primary_location={"source": {"issn": m.value}})
|
||||
elif m.type == PaperMonitor.TYPE_SEARCH:
|
||||
search = m.value
|
||||
base = _apply_keyword_search(base, "", search)
|
||||
elif m.type == PaperMonitor.TYPE_KEYWORD:
|
||||
kw = m.value
|
||||
base = _apply_keyword_search(base, kw, "")
|
||||
else:
|
||||
continue
|
||||
seen = _crawl_openalex_query(base, kw, search)
|
||||
m.last_run = timezone.now()
|
||||
m.last_count = seen
|
||||
m.save(update_fields=["last_run", "last_count", "update_time"])
|
||||
results.append(f"{m.type}:{m.name or m.value}={seen}")
|
||||
return "; ".join(results) or "no active monitors"
|
||||
|
||||
|
||||
SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"
|
||||
|
||||
|
||||
def _build_paper_from_sd_result(r, qs_text: str):
|
||||
"""把 ScienceDirect Search 单条 result 映射成未保存的 Paper 实例。
|
||||
SD Search 不返回 oa_url / 摘要 / openalex_id,字段比 OpenAlex 少。
|
||||
缺 doi / title / 年份的直接返回 None 跳过(model 里这几个字段非空)。
|
||||
"""
|
||||
doi = r.get("doi")
|
||||
title = r.get("title")
|
||||
if not doi or not title:
|
||||
return None
|
||||
pub_date = r.get("publicationDate") # YYYY-MM-DD
|
||||
year = None
|
||||
if pub_date:
|
||||
try:
|
||||
year = int(str(pub_date)[:4])
|
||||
except (ValueError, TypeError):
|
||||
year = None
|
||||
if year is None:
|
||||
return None
|
||||
|
||||
paper = Paper()
|
||||
paper.id = idWorker.get_id()
|
||||
paper.source = "elsevier"
|
||||
paper.type = "article"
|
||||
paper.o_search = qs_text
|
||||
paper.doi = str(doi).replace("https://doi.org/", "")
|
||||
paper.title = title
|
||||
paper.publication_date = pub_date if len(str(pub_date)) == 10 else None
|
||||
paper.publication_year = year
|
||||
paper.publication_name = r.get("sourceTitle")
|
||||
authors = r.get("authors") or []
|
||||
if isinstance(authors, list) and authors:
|
||||
# 取 order 最小的作者作为第一作者(部分作者可能被省略)
|
||||
first = min(authors, key=lambda a: a.get("order", 1) if isinstance(a, dict) else 1)
|
||||
if isinstance(first, dict):
|
||||
paper.first_author = first.get("name")
|
||||
paper.is_oa = bool(r.get("openAccess"))
|
||||
return paper
|
||||
|
||||
|
||||
def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
|
||||
max_results: int = 200):
|
||||
"""调用 ScienceDirect Search(PUT)拉取 loaded_after 之后新入库的 Elsevier 文章。
|
||||
返回 (papers, err_msg)。err_msg 非空表示该 query 出错(已尽量带回已拿到的部分)。
|
||||
"""
|
||||
headers = {
|
||||
**ELSEVIER_HEADERS,
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
papers = []
|
||||
offset = 0
|
||||
with requests.Session() as req:
|
||||
while offset < max_results:
|
||||
body = {
|
||||
"qs": qs_text,
|
||||
"loadedAfter": loaded_after,
|
||||
"display": {"offset": offset, "show": show, "sortBy": "date"},
|
||||
}
|
||||
try:
|
||||
res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
|
||||
headers=headers, timeout=(3, 30))
|
||||
except requests.RequestException as e:
|
||||
return papers, f"elsevier_search_request_error: {e}"
|
||||
if res.status_code in (401, 403):
|
||||
return papers, f"elsevier_search_no_entitlement: {res.status_code}"
|
||||
if res.status_code != 200:
|
||||
return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
|
||||
results = (res.json() or {}).get("results") or []
|
||||
# 空结果时 SD 可能返回单个 {"error": ...},一并视为结束
|
||||
if not results or (len(results) == 1 and results[0].get("error")):
|
||||
break
|
||||
for r in results:
|
||||
paper = _build_paper_from_sd_result(r, qs_text)
|
||||
if paper is not None:
|
||||
papers.append(paper)
|
||||
if len(results) < show:
|
||||
break
|
||||
offset += show
|
||||
return papers, ""
|
||||
|
||||
|
||||
@shared_task(base=CustomTask)
|
||||
def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
|
||||
"""ScienceDirect Search 增量补充任务,作为 OpenAlex 增量的补充。
|
||||
|
||||
遍历库里已有的纯文本查询(o_search,OpenAlex 的 keyword id 无法用于 SD,
|
||||
故只用 o_search),用 loadedAfter 拉取最近 days 天 Elsevier 新入库的期刊文章。
|
||||
只覆盖 ScienceDirect 自家内容(基本是 DOI 10.1016),弥补 OpenAlex 对
|
||||
Elsevier 新刊收录的延迟。由 django-celery-beat 每天调度一次。
|
||||
"""
|
||||
loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
qs_texts = [
|
||||
s.strip() for s in Paper.objects.exclude(o_search__isnull=True)
|
||||
.exclude(o_search="").values_list("o_search", flat=True).distinct()
|
||||
if s and s.strip()
|
||||
]
|
||||
before = Paper.objects.count()
|
||||
msgs = []
|
||||
for qs_text in qs_texts:
|
||||
papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
|
||||
if papers:
|
||||
Paper.objects.bulk_create(papers, ignore_conflicts=True)
|
||||
if err:
|
||||
msgs.append(f"{qs_text}: {err}")
|
||||
# key 没 Search 权限,后续 query 也没意义,直接停
|
||||
if "no_entitlement" in err:
|
||||
break
|
||||
new_papers = Paper.objects.count() - before
|
||||
return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
|
||||
f"since={loaded_after}, errs={msgs}")
|
||||
|
||||
|
||||
# 常用的 User-Agent 列表
|
||||
USER_AGENTS = [
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||||
|
|
|
|||
Loading…
Reference in New Issue