Compare commits

...

2 Commits

Author SHA1 Message Date
caoqianming c5636b5131 feat(resm): 期刊/关键词监控 PaperMonitor + 移除每日增量周期任务
- 新增 PaperMonitor model(type=journal/search/keyword、value、name、note、is_active、days、last_run、last_count)+ admin 管理
- 新增 monitor_papers 任务:遍历启用监控,journal→primary_location.source.issn / search→title_and_abstract / keyword→keywords.id,复用 _crawl_openalex_query 入库去重,每天 05:00 调度
- 迁移 0008 建表;0009 种子(8 本无机非金属材料期刊 + 5 英文方向词,note=无机非金属材料)并注册监控周期任务
- 移除 0007:update_paper_meta_from_openalex/elsevier 不再注册为每日周期任务(只需一次性回补,用 backfill_paper_meta_from_openalex);两任务函数保留供手动/回补调用

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 23:43:58 +08:00
caoqianming 7b38d4d234 feat(resm): 论文索引自动更新 + 通用 OpenAlex 抓取核心
- 新增通用核心 _crawl_openalex_query:单查询 cursor 分页 + 逐页游标 checkpoint + 停/续标志,全量抓取/每天增量/回补三者共用;顺手修复 get_paper_meta_from_openalex 原先把起始游标写回缓存、年中断点不能续传的 bug
- 新增 update_paper_meta_from_openalex:每天按 from_publication_date 增量(days=30)。from_created_date/from_updated_date 需 OpenAlex Premium,当前 key 无权限,故用发表日期
- 新增 update_paper_meta_from_elsevier:ScienceDirect Search(loadedAfter)补充 Elsevier 新刊
- 新增 backfill_paper_meta_from_openalex:按发表日期一次性回补,支持断点续传/配额暂停续跑
- tasks.py 凭证改从 settings 读取(集中到 gitignore 的 config/conf.py)
- migration 0007:注册两条每天的增量周期任务(OpenAlex 03:00 / Elsevier 04:00)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 15:12:04 +08:00
5 changed files with 546 additions and 57 deletions

View File

@ -1,3 +1,14 @@
from django.contrib import admin from django.contrib import admin
from apps.resm.models import PaperMonitor
# Register your models here. # Register your models here.
class PaperMonitorAdmin(admin.ModelAdmin):
    """Admin changelist configuration for PaperMonitor subscriptions."""

    # Columns of the changelist, in display order.
    list_display = [
        "type", "name", "value", "note",
        "is_active", "days", "last_run", "last_count",
    ]
    # Sidebar filters and free-text search targets.
    list_filter = ["type", "is_active", "note"]
    search_fields = ["name", "value", "note"]
    # These two columns can be edited directly on the changelist page.
    list_editable = ["is_active", "days"]
    ordering = ["type", "name"]


# Explicit registration; equivalent to decorating the class with @admin.register.
admin.site.register(PaperMonitor, PaperMonitorAdmin)

View File

@ -0,0 +1,40 @@
# Generated by Django 4.2.27 on 2026-06-21 15:23
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
    """Schema migration that creates the PaperMonitor table.

    Depends on the swappable user model (for the create_by / update_by
    foreign keys) and on the previous resm migration.
    """

    dependencies = [
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
        ('resm', '0006_pg_trgm_index'),
    ]

    operations = [
        migrations.CreateModel(
            name='PaperMonitor',
            fields=[
                # Bookkeeping columns: primary key, timestamps, soft-delete flag.
                ('id', models.CharField(editable=False, help_text='主键ID', max_length=20, primary_key=True, serialize=False, verbose_name='主键ID')),
                ('create_time', models.DateTimeField(default=django.utils.timezone.now, help_text='创建时间', verbose_name='创建时间')),
                ('update_time', models.DateTimeField(auto_now=True, help_text='修改时间', verbose_name='修改时间')),
                ('is_deleted', models.BooleanField(default=False, help_text='删除标记', verbose_name='删除标记')),
                # What to monitor: the kind of subscription and its value.
                ('type', models.CharField(choices=[('journal', '期刊(ISSN)'), ('search', '搜索词(标题/摘要)'), ('keyword', 'OpenAlex关键词ID')], db_index=True, max_length=20, verbose_name='监控类型')),
                ('value', models.CharField(max_length=500, verbose_name='监控值')),
                ('name', models.CharField(blank=True, max_length=200, null=True, verbose_name='名称')),
                ('note', models.CharField(blank=True, max_length=100, null=True, verbose_name='方向标注')),
                # Scheduling state: on/off switch, look-back window in days, last run info.
                ('is_active', models.BooleanField(db_index=True, default=True, verbose_name='启用')),
                ('days', models.IntegerField(default=30, verbose_name='回看窗口(天)')),
                ('last_run', models.DateTimeField(blank=True, null=True, verbose_name='上次运行时间')),
                ('last_count', models.IntegerField(default=0, verbose_name='上次拉取篇数(窗口内)')),
                # Audit foreign keys; set to NULL when the referenced user is deleted.
                ('create_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_create_by', to=settings.AUTH_USER_MODEL, verbose_name='创建人')),
                ('update_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_update_by', to=settings.AUTH_USER_MODEL, verbose_name='最后编辑人')),
            ],
            options={
                'verbose_name': '论文监控',
                'verbose_name_plural': '论文监控',
            },
        ),
    ]

View File

@ -0,0 +1,85 @@
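"""Seed data: journal / search-term monitors for the inorganic non-metallic
materials field, plus registration of the daily monitoring periodic task.
- 8 journals of the field (matched by ISSN; OpenAlex covers them across publishers)
- several English search terms (the OpenAlex corpus is English, so Chinese terms
  find nothing; `note` records the research field)
- registers a PeriodicTask for apps.resm.tasks.monitor_papers, run once a day at 05:00
Existing rows are looked up before anything is inserted, so the migration can be
re-run safely.
"""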
from django.db import migrations
from apps.utils.snowflake import idWorker
# Value stored in PaperMonitor.note for every seeded row (the research field).
NOTE = "无机非金属材料"
# Seeded journal monitors as (ISSN, journal name) pairs.
JOURNALS = [
    ("0272-8842", "Ceramics International"),
    ("0955-2219", "Journal of the European Ceramic Society"),
    ("0008-8846", "Cement and Concrete Research"),
    ("0958-9465", "Cement and Concrete Composites"),
    ("0950-0618", "Construction and Building Materials"),
    ("0022-3093", "Journal of Non-Crystalline Solids"),
    ("0002-7820", "Journal of the American Ceramic Society"),
    ("0022-2461", "Journal of Materials Science"),
]
# Seeded search monitors as (English search term, display name) pairs.
SEARCHES = [
    ("ceramics material", "陶瓷材料"),
    ("glass material", "玻璃材料"),
    ("cement", "水泥"),
    ("refractory material", "耐火材料"),
    ("crystalline material", "晶体材料"),
]
# Dotted task path and unique display name of the periodic task registered by seed().
MONITOR_TASK = "apps.resm.tasks.monitor_papers"
MONITOR_NAME = "resm: 论文监控(期刊/关键词)"
def seed(apps, schema_editor):
    """Insert the default monitors and register the daily periodic task.

    Forward step of the data migration:
      * one ``journal`` PaperMonitor per entry of JOURNALS and one ``search``
        PaperMonitor per entry of SEARCHES (note=NOTE, active, 7-day window);
      * a 05:00 crontab schedule and a PeriodicTask named MONITOR_NAME that
        points at MONITOR_TASK.

    Neither ``PaperMonitor(type, value)`` nor the CrontabSchedule fields are
    covered by a unique constraint in the code shown here, so ``get_or_create``
    would raise ``MultipleObjectsReturned`` once duplicates exist (for
    example rows added through the admin). Existence is therefore checked
    with ``filter()``, which keeps the step idempotent in that case too.

    Args:
        apps: historical app registry supplied by the migration framework.
        schema_editor: unused; part of the RunPython callable signature.
    """
    PaperMonitor = apps.get_model("resm", "PaperMonitor")
    seeds = [("journal", issn, name) for issn, name in JOURNALS]
    seeds += [("search", term, name) for term, name in SEARCHES]
    for monitor_type, value, name in seeds:
        if PaperMonitor.objects.filter(type=monitor_type, value=value).exists():
            continue  # already present (possibly more than once); leave untouched
        PaperMonitor.objects.create(
            id=idWorker.get_id(), type=monitor_type, value=value,
            name=name, note=NOTE, is_active=True, days=7,
        )

    CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
    crontab_fields = {
        "minute": "0", "hour": "5", "day_of_week": "*",
        "day_of_month": "*", "month_of_year": "*",
    }
    # Reuse any matching schedule; pick the oldest one deterministically.
    sched = CrontabSchedule.objects.filter(**crontab_fields).order_by("id").first()
    if sched is None:
        sched = CrontabSchedule.objects.create(**crontab_fields)

    # Looked up by name only, so re-running updates the existing task in place.
    PeriodicTask.objects.update_or_create(
        name=MONITOR_NAME,
        defaults={
            "task": MONITOR_TASK,
            "crontab": sched,
            "interval": None,
            "enabled": True,
            "description": "每天 05:00 拉取监控期刊/关键词的最新论文元数据(OpenAlex,跨出版商)",
        },
    )
def unseed(apps, schema_editor):
    """Reverse of seed(): delete the seeded monitors and the periodic task.

    The crontab schedule created by seed() is left in place.
    """
    monitors = apps.get_model("resm", "PaperMonitor").objects
    seeded_issns = [issn for issn, _name in JOURNALS]
    seeded_terms = [term for term, _name in SEARCHES]
    monitors.filter(type="journal", value__in=seeded_issns).delete()
    monitors.filter(type="search", value__in=seeded_terms).delete()

    periodic_tasks = apps.get_model("django_celery_beat", "PeriodicTask").objects
    periodic_tasks.filter(name=MONITOR_NAME).delete()
class Migration(migrations.Migration):
    """Data migration: runs seed() forwards and unseed() backwards."""

    dependencies = [
        # The PaperMonitor table must exist before rows are inserted.
        ("resm", "0008_papermonitor"),
        # seed() reads and writes django_celery_beat models.
        ("django_celery_beat", "__latest__"),
    ]

    operations = [
        migrations.RunPython(seed, unseed),
    ]

View File

@ -1,5 +1,5 @@
from django.db import models from django.db import models
from apps.utils.models import BaseModel from apps.utils.models import BaseModel, CommonAModel
from django.conf import settings from django.conf import settings
import os import os
# Create your models here. # Create your models here.
@ -107,3 +107,32 @@ class PaperAbstract(BaseModel):
max_length=20, max_length=20,
verbose_name="摘要来源" # openalex / elsevier / crossref verbose_name="摘要来源" # openalex / elsevier / crossref
) )
class PaperMonitor(CommonAModel):
    """Paper-monitoring subscription.

    The monitoring task walks the active rows, builds an OpenAlex filter from
    ``type``/``value`` and pulls metadata of papers published in the last
    ``days`` days via ``from_publication_date``. Journal monitors and
    keyword/search monitors share this table and are told apart by ``type``.
    """

    TYPE_JOURNAL = "journal"
    TYPE_SEARCH = "search"
    TYPE_KEYWORD = "keyword"
    TYPE_CHOICES = (
        (TYPE_JOURNAL, "期刊(ISSN)"),
        (TYPE_SEARCH, "搜索词(标题/摘要)"),
        (TYPE_KEYWORD, "OpenAlex关键词ID"),
    )

    type = models.CharField(
        verbose_name="监控类型", choices=TYPE_CHOICES, max_length=20, db_index=True)
    # ISSN, search term or OpenAlex keyword id, depending on `type`.
    value = models.CharField(verbose_name="监控值", max_length=500)
    name = models.CharField(verbose_name="名称", max_length=200, blank=True, null=True)
    # Research-field label, e.g. the one used by the seeded rows.
    note = models.CharField(verbose_name="方向标注", max_length=100, blank=True, null=True)
    is_active = models.BooleanField(verbose_name="启用", db_index=True, default=True)
    # Look-back window in days.
    days = models.IntegerField(verbose_name="回看窗口(天)", default=30)
    last_run = models.DateTimeField(verbose_name="上次运行时间", blank=True, null=True)
    last_count = models.IntegerField(verbose_name="上次拉取篇数(窗口内)", default=0)

    class Meta:
        verbose_name = "论文监控"
        verbose_name_plural = "论文监控"

    def __str__(self):
        # Fall back to the raw value when no display name was given.
        label = self.name or self.value
        return f"{self.get_type_display()}:{label}"

View File

@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask from apps.utils.tasks import CustomTask
from celery import shared_task from celery import shared_task
from pyalex import Works, config from pyalex import Works, config
from apps.resm.models import Paper, PaperAbstract from apps.resm.models import Paper, PaperAbstract, PaperMonitor
from apps.utils.snowflake import idWorker from apps.utils.snowflake import idWorker
from django.core.cache import cache from django.core.cache import cache
import requests import requests
@ -17,19 +17,19 @@ import sys
import os import os
from django.db.models import Q from django.db.models import Q
from django.utils import timezone from django.utils import timezone
from django.conf import settings
# config.email = "caoqianming@foxmail.com" # 凭证集中在 config/conf.py (经 settings 的 `from config.conf import *` 暴露)
config.email = "caoqianming@ctc.ac.cn" config.email = settings.OPENALEX_EMAIL
config.max_retries = 0 config.max_retries = 0
config.retry_backoff_factor = 0.1 config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503] config.retry_http_codes = [429, 500, 503]
# OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd" config.api_key = settings.OPENALEX_API_KEY
OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp" OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY # content.openalex.org PDF 下载
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"
ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf' ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
ELSEVIER_HEADERS = { ELSEVIER_HEADERS = {
"X-ELS-Insttoken": "135fa874aea9f0de11cad187ccb4878c", "X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN,
"X-ELS-APIKey": ELSEVIER_APIKEY, "X-ELS-APIKey": ELSEVIER_APIKEY,
} }
@ -52,22 +52,20 @@ def run_async(coro):
# Unix/Linux/Mac 上使用默认方式 # Unix/Linux/Mac 上使用默认方式
return asyncio.run(coro) return asyncio.run(coro)
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None): # OpenAlex 元数据抓取统一选择的字段,全量抓取与增量更新共用
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}" OPENALEX_SELECT_FIELDS = [
cache_cursor = cache.get(cache_key, "*") "id", "doi", "title", "publication_date",
if keywords or search: "open_access", "authorships", "primary_location", "publication_year",
pass "display_name", "content_urls"
else: ]
raise Exception("keywords or search must be provided")
# filter=keywords.id:clinker|cement
pager = Works().filter( def _apply_keyword_search(pager, keywords: str, search: str):
publication_year=publication_year, """把 keywords / search 过滤条件套到 pyalex 的 pager 上。
has_doi=True, keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter
type="article" """
)
if keywords: if keywords:
# 支持 '|' 表示 OR',' 表示 AND。去除空白项。
if "|" in keywords: if "|" in keywords:
keywords_list = [k.strip() for k in keywords.split("|") if k.strip()] keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
pager = pager.filter_or(keywords={"id": keywords_list}) pager = pager.filter_or(keywords={"id": keywords_list})
@ -75,43 +73,90 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
keywords_list = [k.strip() for k in keywords.split(",") if k.strip()] keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
pager = pager.filter(keywords={"id": keywords_list}) pager = pager.filter(keywords={"id": keywords_list})
else: else:
keywords_list = [keywords.strip()] pager = pager.filter(keywords={"id": [keywords.strip()]})
pager = pager.filter(keywords={"id": keywords_list})
if search: if search:
pager = pager.search_filter(title_and_abstract=search) pager = pager.search_filter(title_and_abstract=search)
pager = pager.select([ return pager
"id", "doi", "title", "publication_date",
"open_access", "authorships", "primary_location", "publication_year",
"display_name", "content_urls" def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
]).paginate(per_page=200, n_max=None, cursor=cache_cursor) """把 OpenAlex 单条 record 映射成未保存的 Paper 实例。"""
next_cursor = pager._next_value paper = Paper()
paper.id = idWorker.get_id()
paper.o_keywords = keywords
paper.o_search = search
paper.source = "openalex"
paper.type = "article"
paper.openalex_id = record["id"].split("/")[-1]
paper.doi = record["doi"].replace("https://doi.org/", "")
paper.title = record["display_name"]
paper.publication_date = record["publication_date"]
paper.publication_year = record["publication_year"]
if record["open_access"]:
paper.is_oa = record["open_access"]["is_oa"]
paper.oa_url = record["open_access"]["oa_url"]
if record["authorships"]:
paper.first_author = record["authorships"][0]["author"]["display_name"]
if record["authorships"][0]["institutions"]:
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
if record["primary_location"] and record["primary_location"]["source"]:
paper.publication_name = record["primary_location"]["source"]["display_name"]
return paper
def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
stop_key: str = None, n_max: int = None) -> int:
"""OpenAlex 单查询抓取核心,全量 / 增量 / 回补共用,保证 checkpoint 行为一致。
base_pager: 已套好 filter pyalex pager(/日期/收录日期等过滤由调用方决定)
cache_key: 给定则逐页把下一页游标checkpoint 进去(抓完写 "DONE", DONE 直接
跳过) ·断点续传; None 则不落游标,每次从头扫(适合短窗口增量)
stop_key: 给定且命中时,当前页抓完即停(应对配额耗尽 / 手动暂停),游标已 checkpoint
返回本次处理到的 record 条数
"""
if cache_key:
start = cache.get(cache_key, "*")
if start == "DONE":
return 0
else:
start = "*"
pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
per_page=200, n_max=n_max, cursor=start)
seen = 0
for page in pager: for page in pager:
papers = [] papers = [
for record in page: _build_paper_from_record(record, keywords, search)
if record["doi"] and (record["display_name"] or record["title"]): for record in page
paper = Paper() if record["doi"] and (record["display_name"] or record["title"])
paper.id = idWorker.get_id() ]
paper.o_keywords = keywords # ignore_conflicts:openalex_id / doi 已存在的自动跳过,只插新的
paper.o_search = search
paper.source = "openalex"
paper.type = "article"
paper.openalex_id = record["id"].split("/")[-1]
paper.doi = record["doi"].replace("https://doi.org/", "")
paper.title = record["display_name"]
paper.publication_date = record["publication_date"]
paper.publication_year = record["publication_year"]
if record["open_access"]:
paper.is_oa = record["open_access"]["is_oa"]
paper.oa_url = record["open_access"]["oa_url"]
if record["authorships"]:
paper.first_author = record["authorships"][0]["author"]["display_name"]
if record["authorships"][0]["institutions"]:
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
if record["primary_location"] and record["primary_location"]["source"]:
paper.publication_name = record["primary_location"]["source"]["display_name"]
papers.append(paper)
Paper.objects.bulk_create(papers, ignore_conflicts=True) Paper.objects.bulk_create(papers, ignore_conflicts=True)
cache.set(cache_key, next_cursor, timeout=None) seen += len(papers)
nv = pager._next_value # pyalex 在取完每页后才更新,None 表示已到末尾
if cache_key:
cache.set(cache_key, nv if nv is not None else "DONE", timeout=None)
if nv is None or len(page) == 0:
if cache_key:
cache.set(cache_key, "DONE", timeout=None)
break
if stop_key and cache.get(stop_key):
break
return seen
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
if not (keywords or search):
raise Exception("keywords or search must be provided")
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
base = Works().filter(
publication_year=publication_year,
has_doi=True,
type="article"
)
base = _apply_keyword_search(base, keywords, search)
_crawl_openalex_query(base, keywords, search, cache_key=cache_key,
stop_key="get_paper_meta_from_openalex_stop")
if cache.get("get_paper_meta_from_openalex_stop", None) is None: if cache.get("get_paper_meta_from_openalex_stop", None) is None:
if end_year is None: if end_year is None:
end_year = datetime.now().year end_year = datetime.now().year
@ -127,6 +172,285 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
countdown=5 countdown=5
) )
def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
                                    per_combo_max: int = None) -> int:
    """Crawl one (keywords, search) combination for recently published papers.

    Restricts OpenAlex works to articles with a DOI whose publication date is
    on or after ``from_publication_date``, applies the keyword / search
    filters, and hands the query to ``_crawl_openalex_query`` without a cache
    key, so no cursor is stored and every call scans from the start.

    Note (from the original author): an increment by ingestion time would
    need ``from_created_date`` / ``from_updated_date``, which require OpenAlex
    Premium; the publicly available ``from_publication_date`` is used instead.
    Papers are often indexed some days after publication, so the window
    should be wider than the scheduling interval.

    Returns:
        The record count reported by ``_crawl_openalex_query``.
    """
    date_filter = {
        "has_doi": True,
        "type": "article",
        "from_publication_date": from_publication_date,
    }
    query = _apply_keyword_search(Works().filter(**date_filter), keywords, search)
    return _crawl_openalex_query(query, keywords, search, n_max=per_combo_max)
@shared_task(base=CustomTask)
def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
    """Incrementally refresh the paper index (resm_paper) from OpenAlex.

    For every distinct (o_keywords, o_search) combination already stored on
    Paper rows, fetch the papers published within the last ``days`` days and
    insert them through ``_fetch_openalex_published_since``.

    Combinations are normalised (``None`` becomes ``""``), de-duplicated and
    sorted, the same way ``backfill_paper_meta_from_openalex`` does it.
    Without that, ``(None, "x")`` and ``("", "x")`` count as two combos and
    the same OpenAlex query is sent twice.

    Note (from the original author): ``from_publication_date`` is used
    because ``from_created_date`` / ``from_updated_date`` need OpenAlex
    Premium. The 30-day default covers the lag between publication and
    indexing.

    Args:
        days: look-back window in days, counted from now.
        per_combo_max: optional cap passed on as the per-combination maximum.

    Returns:
        A summary string with the number of combos, the growth of the Paper
        table and the start date used.
    """
    from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat()
    combos = sorted({
        (o_keywords or "", o_search or "")
        for o_keywords, o_search
        in Paper.objects.values_list("o_keywords", "o_search").distinct()
        if o_keywords or o_search  # rows with neither carry no query to replay
    })
    before = Paper.objects.count()
    for o_keywords, o_search in combos:
        _fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max)
    new_papers = Paper.objects.count() - before
    return f"openalex update: combos={len(combos)}, new_papers={new_papers}, since={from_publication_date}"
BACKFILL_STOP_KEY = "backfill_paper_meta_stop"
@shared_task(base=CustomTask)
def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
                                      combo_index: int = 0, combos=None):
    """One-off backfill of the paper index by publication date, resumable.

    This task only decides strategy: which query combinations to crawl, over
    which publication-date range, and how to chain. Paging and cursor
    checkpointing are delegated to ``_crawl_openalex_query``. Combinations
    are handled one per task run; when one finishes, the next is queued.

    Pause / resume:
      - pause:  cache.set("backfill_paper_meta_stop", True); the run stops
        after the current page and does not chain.
      - resume: cache.delete("backfill_paper_meta_stop"), then call
        backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01").
        Combinations marked DONE are skipped; the rest continue from the
        stored cursor.
    On any exception from the crawl the stop flag is set as well, so a later
    ``.delay`` (after clearing the flag) continues from the checkpoint.

    Args:
        from_publication_date: lower bound passed to the OpenAlex filter.
        to_publication_date: optional upper bound.
        combo_index: index in ``combos`` to start from.
        combos: list of [o_keywords, o_search] pairs; derived from the Paper
            table when omitted.

    Returns:
        A status string describing why this run ended.
    """
    if cache.get(BACKFILL_STOP_KEY):
        return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"

    if combos is None:
        # Normalise None to "" and de-duplicate; sorting keeps the order
        # stable between runs so combo_index stays meaningful.
        unique_pairs = set()
        for o_keywords, o_search in Paper.objects.values_list("o_keywords", "o_search").distinct():
            if o_keywords or o_search:
                unique_pairs.add((o_keywords or "", o_search or ""))
        combos = [list(pair) for pair in sorted(unique_pairs)]

    def ckey(kw, search):
        return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"

    # Advance past combinations whose cursor is already marked finished.
    while combo_index < len(combos):
        kw, search = combos[combo_index]
        if cache.get(ckey(kw, search)) != "DONE":
            break
        combo_index += 1

    if combo_index >= len(combos):
        return f"backfill done: {len(combos)} combos, from {from_publication_date}"

    kw, search = combos[combo_index]
    cursor_key = ckey(kw, search)
    query = Works().filter(
        has_doi=True, type="article", from_publication_date=from_publication_date,
    )
    if to_publication_date:
        query = query.filter(to_publication_date=to_publication_date)
    query = _apply_keyword_search(query, kw, search)

    try:
        _crawl_openalex_query(query, kw, search, cache_key=cursor_key,
                              stop_key=BACKFILL_STOP_KEY)
    except Exception as e:
        # Quota exhausted, network failure, etc.: raise the stop flag and
        # wait for a manual resume.
        cache.set(BACKFILL_STOP_KEY, True, timeout=None)
        return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. "
                f"clear {BACKFILL_STOP_KEY} then re-run to resume")

    if cache.get(cursor_key) != "DONE":
        # Interrupted by the stop key before the last page.
        return f"combo {combo_index}/{len(combos)} paused; cursor saved"

    # This combination is complete: queue the next one.
    next_kwargs = {
        "from_publication_date": from_publication_date,
        "to_publication_date": to_publication_date,
        "combo_index": combo_index + 1,
        "combos": combos,
    }
    current_app.send_task(
        "apps.resm.tasks.backfill_paper_meta_from_openalex",
        kwargs=next_kwargs,
        countdown=3,
    )
    return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next"
@shared_task(base=CustomTask)
def monitor_papers(monitor_id: str = None):
    """Run the journal / search / keyword monitors against OpenAlex.

    For each active PaperMonitor (only the one with ``monitor_id`` when given)
    an OpenAlex query for DOI-bearing articles published in the last
    ``days`` days (30 when ``days`` is falsy) is built according to its type:
      - journal: value is an ISSN, filtered on primary_location.source.issn
        (o_keywords / o_search stay empty);
      - search:  value is a search term, passed as the ``search`` argument
        and recorded as o_search;
      - keyword: value is an OpenAlex keyword id, passed as the ``keywords``
        argument and recorded as o_keywords.
    Rows with any other type are skipped. The query is crawled through
    ``_crawl_openalex_query`` without a cache key, then ``last_run`` and
    ``last_count`` are saved on the monitor.

    Returns:
        "; "-joined ``type:label=count`` entries, or "no active monitors".
    """
    monitors = PaperMonitor.objects.filter(is_active=True)
    if monitor_id:
        monitors = monitors.filter(id=monitor_id)

    summary = []
    for monitor in monitors:
        window_days = monitor.days or 30
        since = (timezone.now() - timedelta(days=window_days)).date().isoformat()
        query = Works().filter(has_doi=True, type="article", from_publication_date=since)

        o_keywords = o_search = ""
        if monitor.type == PaperMonitor.TYPE_JOURNAL:
            query = query.filter(primary_location={"source": {"issn": monitor.value}})
        elif monitor.type == PaperMonitor.TYPE_SEARCH:
            o_search = monitor.value
            query = _apply_keyword_search(query, "", o_search)
        elif monitor.type == PaperMonitor.TYPE_KEYWORD:
            o_keywords = monitor.value
            query = _apply_keyword_search(query, o_keywords, "")
        else:
            continue

        count = _crawl_openalex_query(query, o_keywords, o_search)
        monitor.last_run = timezone.now()
        monitor.last_count = count
        # update_time is listed explicitly so it is written along with the other two.
        monitor.save(update_fields=["last_run", "last_count", "update_time"])
        label = monitor.name or monitor.value
        summary.append(f"{monitor.type}:{label}={count}")

    return "; ".join(summary) or "no active monitors"
SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"
def _build_paper_from_sd_result(r, qs_text: str):
"""把 ScienceDirect Search 单条 result 映射成未保存的 Paper 实例。
SD Search 不返回 oa_url / 摘要 / openalex_id,字段比 OpenAlex
doi / title / 年份的直接返回 None 跳过(model 里这几个字段非空)
"""
doi = r.get("doi")
title = r.get("title")
if not doi or not title:
return None
pub_date = r.get("publicationDate") # YYYY-MM-DD
year = None
if pub_date:
try:
year = int(str(pub_date)[:4])
except (ValueError, TypeError):
year = None
if year is None:
return None
paper = Paper()
paper.id = idWorker.get_id()
paper.source = "elsevier"
paper.type = "article"
paper.o_search = qs_text
paper.doi = str(doi).replace("https://doi.org/", "")
paper.title = title
paper.publication_date = pub_date if len(str(pub_date)) == 10 else None
paper.publication_year = year
paper.publication_name = r.get("sourceTitle")
authors = r.get("authors") or []
if isinstance(authors, list) and authors:
# 取 order 最小的作者作为第一作者(部分作者可能被省略)
first = min(authors, key=lambda a: a.get("order", 1) if isinstance(a, dict) else 1)
if isinstance(first, dict):
paper.first_author = first.get("name")
paper.is_oa = bool(r.get("openAccess"))
return paper
def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
                          max_results: int = 200):
    """Query ScienceDirect Search (PUT) for articles loaded after a timestamp.

    Pages through the results ``show`` at a time, sorted by date, until a
    short or empty page is returned or ``max_results`` results have been
    consumed.

    Changes from the previous version:
      * at most ``max_results`` results are consumed even when that number
        is not a multiple of ``show`` (the last page is truncated locally,
        so the ``show`` value sent to the API is unchanged);
      * a 200 response whose body is not valid JSON is reported through the
        error string instead of raising.

    Args:
        qs_text: free-text query sent as ``qs``.
        loaded_after: timestamp string sent as ``loadedAfter``.
        show: page size requested from the API.
        max_results: upper bound on the number of results consumed.

    Returns:
        ``(papers, err_msg)``. ``err_msg`` is empty on success; otherwise
        ``papers`` holds whatever was collected before the failure.
    """
    headers = {
        **ELSEVIER_HEADERS,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    papers = []
    offset = 0
    with requests.Session() as req:
        while offset < max_results:
            body = {
                "qs": qs_text,
                "loadedAfter": loaded_after,
                "display": {"offset": offset, "show": show, "sortBy": "date"},
            }
            try:
                res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
                              headers=headers, timeout=(3, 30))
            except requests.RequestException as e:
                return papers, f"elsevier_search_request_error: {e}"
            if res.status_code in (401, 403):
                return papers, f"elsevier_search_no_entitlement: {res.status_code}"
            if res.status_code != 200:
                return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
            try:
                payload = res.json() or {}
            except ValueError:
                # Body was not JSON despite the 200 status.
                return papers, f"elsevier_search_error: invalid JSON {res.text[:200]}"
            results = payload.get("results") or []
            # An empty search may come back as a single {"error": ...} item;
            # treat that as the end of the results too.
            if not results or (len(results) == 1 and results[0].get("error")):
                break
            # Consume only what is left of the max_results budget.
            for r in results[:max_results - offset]:
                paper = _build_paper_from_sd_result(r, qs_text)
                if paper is not None:
                    papers.append(paper)
            if len(results) < show:
                break
            offset += show
    return papers, ""
@shared_task(base=CustomTask)
def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
    """Supplement the paper index with recent ScienceDirect search hits.

    Every non-empty distinct ``o_search`` text stored on Paper rows is sent
    to ``_search_sciencedirect`` with a ``loadedAfter`` cut-off ``days`` days
    before now, and the resulting papers are bulk-inserted with conflicts
    ignored. Only ``o_search`` is replayed; the original author notes that
    OpenAlex keyword ids cannot be used with ScienceDirect. The run stops
    early when an error reports missing entitlement, since later queries
    would fail the same way.

    Args:
        days: look-back window in days.
        per_qs_max: result cap handed to the search helper for each query.

    Returns:
        A summary string with the query count, the growth of the Paper
        table, the cut-off used and the list of error messages.
    """
    cutoff = timezone.now() - timedelta(days=days)
    loaded_after = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")

    stored_queries = (
        Paper.objects.exclude(o_search__isnull=True)
        .exclude(o_search="")
        .values_list("o_search", flat=True)
        .distinct()
    )
    qs_texts = []
    for raw in stored_queries:
        if raw and raw.strip():
            qs_texts.append(raw.strip())

    count_before = Paper.objects.count()
    errors = []
    for qs_text in qs_texts:
        found, failure = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
        if found:
            Paper.objects.bulk_create(found, ignore_conflicts=True)
        if not failure:
            continue
        errors.append(f"{qs_text}: {failure}")
        # Without Search entitlement the remaining queries are pointless.
        if "no_entitlement" in failure:
            break

    new_papers = Paper.objects.count() - count_before
    return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
            f"since={loaded_after}, errs={errors}")
# 常用的 User-Agent 列表 # 常用的 User-Agent 列表
USER_AGENTS = [ USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",