Compare commits

..

No commits in common. "c5636b5131ea918bc7a504a7c3109dbce4c4a881" and "6a5a5d7b6bdbe7b8eb802335c3dd540872f017fb" have entirely different histories.

5 changed files with 57 additions and 546 deletions

View File

@ -1,14 +1,3 @@
from django.contrib import admin
from apps.resm.models import PaperMonitor
# Register your models here.
@admin.register(PaperMonitor)
class PaperMonitorAdmin(admin.ModelAdmin):
list_display = ("type", "name", "value", "note", "is_active",
"days", "last_run", "last_count")
list_filter = ("type", "is_active", "note")
search_fields = ("name", "value", "note")
list_editable = ("is_active", "days")
ordering = ("type", "name")

View File

@ -1,40 +0,0 @@
# Generated by Django 4.2.27 on 2026-06-21 15:23
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('resm', '0006_pg_trgm_index'),
]
operations = [
migrations.CreateModel(
name='PaperMonitor',
fields=[
('id', models.CharField(editable=False, help_text='主键ID', max_length=20, primary_key=True, serialize=False, verbose_name='主键ID')),
('create_time', models.DateTimeField(default=django.utils.timezone.now, help_text='创建时间', verbose_name='创建时间')),
('update_time', models.DateTimeField(auto_now=True, help_text='修改时间', verbose_name='修改时间')),
('is_deleted', models.BooleanField(default=False, help_text='删除标记', verbose_name='删除标记')),
('type', models.CharField(choices=[('journal', '期刊(ISSN)'), ('search', '搜索词(标题/摘要)'), ('keyword', 'OpenAlex关键词ID')], db_index=True, max_length=20, verbose_name='监控类型')),
('value', models.CharField(max_length=500, verbose_name='监控值')),
('name', models.CharField(blank=True, max_length=200, null=True, verbose_name='名称')),
('note', models.CharField(blank=True, max_length=100, null=True, verbose_name='方向标注')),
('is_active', models.BooleanField(db_index=True, default=True, verbose_name='启用')),
('days', models.IntegerField(default=30, verbose_name='回看窗口(天)')),
('last_run', models.DateTimeField(blank=True, null=True, verbose_name='上次运行时间')),
('last_count', models.IntegerField(default=0, verbose_name='上次拉取篇数(窗口内)')),
('create_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_create_by', to=settings.AUTH_USER_MODEL, verbose_name='创建人')),
('update_by', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_update_by', to=settings.AUTH_USER_MODEL, verbose_name='最后编辑人')),
],
options={
'verbose_name': '论文监控',
'verbose_name_plural': '论文监控',
},
),
]

View File

@ -1,85 +0,0 @@
"""种子数据:无机非金属材料方向的期刊 / 关键词监控,并注册每周的监控周期任务。
- 8 本方向期刊( ISSN,OpenAlex 跨出版商都能抓)
- 若干英文搜索词(OpenAlex 语料是英文,中文词搜不到,故用英文;note 标注方向)
- 注册 PeriodicTask: apps.resm.tasks.monitor_papers,每天 05:00 跑一次
get_or_create / update_or_create 保证迁移可安全重跑
"""
from django.db import migrations
from apps.utils.snowflake import idWorker
NOTE = "无机非金属材料"
JOURNALS = [
("0272-8842", "Ceramics International"),
("0955-2219", "Journal of the European Ceramic Society"),
("0008-8846", "Cement and Concrete Research"),
("0958-9465", "Cement and Concrete Composites"),
("0950-0618", "Construction and Building Materials"),
("0022-3093", "Journal of Non-Crystalline Solids"),
("0002-7820", "Journal of the American Ceramic Society"),
("0022-2461", "Journal of Materials Science"),
]
SEARCHES = [
("ceramics material", "陶瓷材料"),
("glass material", "玻璃材料"),
("cement", "水泥"),
("refractory material", "耐火材料"),
("crystalline material", "晶体材料"),
]
MONITOR_TASK = "apps.resm.tasks.monitor_papers"
MONITOR_NAME = "resm: 论文监控(期刊/关键词)"
def seed(apps, schema_editor):
PaperMonitor = apps.get_model("resm", "PaperMonitor")
for issn, name in JOURNALS:
PaperMonitor.objects.get_or_create(
type="journal", value=issn,
defaults={"id": idWorker.get_id(), "name": name, "note": NOTE,
"is_active": True, "days": 7},
)
for term, name in SEARCHES:
PaperMonitor.objects.get_or_create(
type="search", value=term,
defaults={"id": idWorker.get_id(), "name": name, "note": NOTE,
"is_active": True, "days": 7},
)
CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
sched, _ = CrontabSchedule.objects.get_or_create(
minute="0", hour="5", day_of_week="*", day_of_month="*", month_of_year="*",
)
PeriodicTask.objects.update_or_create(
name=MONITOR_NAME,
defaults={
"task": MONITOR_TASK,
"crontab": sched,
"interval": None,
"enabled": True,
"description": "每天 05:00 拉取监控期刊/关键词的最新论文元数据(OpenAlex,跨出版商)",
},
)
def unseed(apps, schema_editor):
PaperMonitor = apps.get_model("resm", "PaperMonitor")
PaperMonitor.objects.filter(type="journal", value__in=[i for i, _ in JOURNALS]).delete()
PaperMonitor.objects.filter(type="search", value__in=[t for t, _ in SEARCHES]).delete()
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.filter(name=MONITOR_NAME).delete()
class Migration(migrations.Migration):
dependencies = [
("resm", "0008_papermonitor"),
("django_celery_beat", "__latest__"),
]
operations = [
migrations.RunPython(seed, unseed),
]

View File

@ -1,5 +1,5 @@
from django.db import models
from apps.utils.models import BaseModel, CommonAModel
from apps.utils.models import BaseModel
from django.conf import settings
import os
# Create your models here.
@ -107,32 +107,3 @@ class PaperAbstract(BaseModel):
max_length=20,
verbose_name="摘要来源" # openalex / elsevier / crossref
)
class PaperMonitor(CommonAModel):
"""论文监控订阅:监控任务遍历启用项,按 type 拼 OpenAlex 过滤,用
from_publication_date 拉最近 days 天的最新论文元数据入库(走通用核心,自动去重)
期刊监控与关键词监控共用本表, type 区分"""
TYPE_JOURNAL = "journal"
TYPE_SEARCH = "search"
TYPE_KEYWORD = "keyword"
TYPE_CHOICES = (
(TYPE_JOURNAL, "期刊(ISSN)"),
(TYPE_SEARCH, "搜索词(标题/摘要)"),
(TYPE_KEYWORD, "OpenAlex关键词ID"),
)
type = models.CharField("监控类型", max_length=20, choices=TYPE_CHOICES, db_index=True)
value = models.CharField("监控值", max_length=500) # ISSN / 搜索词 / keyword id
name = models.CharField("名称", max_length=200, null=True, blank=True)
note = models.CharField("方向标注", max_length=100, null=True, blank=True) # 如 无机非金属材料
is_active = models.BooleanField("启用", default=True, db_index=True)
days = models.IntegerField("回看窗口(天)", default=30)
last_run = models.DateTimeField("上次运行时间", null=True, blank=True)
last_count = models.IntegerField("上次拉取篇数(窗口内)", default=0)
class Meta:
verbose_name = "论文监控"
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.get_type_display()}:{self.name or self.value}"

View File

@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from apps.utils.tasks import CustomTask
from celery import shared_task
from pyalex import Works, config
from apps.resm.models import Paper, PaperAbstract, PaperMonitor
from apps.resm.models import Paper, PaperAbstract
from apps.utils.snowflake import idWorker
from django.core.cache import cache
import requests
@ -17,19 +17,19 @@ import sys
import os
from django.db.models import Q
from django.utils import timezone
from django.conf import settings
# 凭证集中在 config/conf.py (经 settings 的 `from config.conf import *` 暴露)
config.email = settings.OPENALEX_EMAIL
# config.email = "caoqianming@foxmail.com"
config.email = "caoqianming@ctc.ac.cn"
config.max_retries = 0
config.retry_backoff_factor = 0.1
config.retry_http_codes = [429, 500, 503]
config.api_key = settings.OPENALEX_API_KEY
OPENALEX_KEY = settings.OPENALEX_CONTENT_KEY # content.openalex.org PDF 下载
# OPENALEX_KEY = "4KJZdkCFA0uFb6IsYKc8cd"
OPENALEX_KEY = "NPimoE2ecdWmfdhH8abxEp"
config.api_key = "4KJZdkCFA0uFb6IsYKc8cd"
ELSEVIER_APIKEY = settings.ELSEVIER_API_KEY
ELSEVIER_APIKEY = 'aa8868cac9e27d6153ab0a0acd7b50bf'
ELSEVIER_HEADERS = {
"X-ELS-Insttoken": settings.ELSEVIER_INST_TOKEN,
"X-ELS-Insttoken": "135fa874aea9f0de11cad187ccb4878c",
"X-ELS-APIKey": ELSEVIER_APIKEY,
}
@ -52,20 +52,22 @@ def run_async(coro):
# Unix/Linux/Mac 上使用默认方式
return asyncio.run(coro)
# OpenAlex 元数据抓取统一选择的字段,全量抓取与增量更新共用
OPENALEX_SELECT_FIELDS = [
"id", "doi", "title", "publication_date",
"open_access", "authorships", "primary_location", "publication_year",
"display_name", "content_urls"
]
def _apply_keyword_search(pager, keywords: str, search: str):
"""把 keywords / search 过滤条件套到 pyalex 的 pager 上。
keywords: '|' 表示 OR,',' 表示 AND,单值直接 filter
"""
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
cache_cursor = cache.get(cache_key, "*")
if keywords or search:
pass
else:
raise Exception("keywords or search must be provided")
# filter=keywords.id:clinker|cement
pager = Works().filter(
publication_year=publication_year,
has_doi=True,
type="article"
)
if keywords:
# 支持 '|' 表示 OR',' 表示 AND。去除空白项。
if "|" in keywords:
keywords_list = [k.strip() for k in keywords.split("|") if k.strip()]
pager = pager.filter_or(keywords={"id": keywords_list})
@ -73,90 +75,43 @@ def _apply_keyword_search(pager, keywords: str, search: str):
keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
pager = pager.filter(keywords={"id": keywords_list})
else:
pager = pager.filter(keywords={"id": [keywords.strip()]})
keywords_list = [keywords.strip()]
pager = pager.filter(keywords={"id": keywords_list})
if search:
pager = pager.search_filter(title_and_abstract=search)
return pager
def _build_paper_from_record(record, keywords: str, search: str) -> Paper:
"""把 OpenAlex 单条 record 映射成未保存的 Paper 实例。"""
paper = Paper()
paper.id = idWorker.get_id()
paper.o_keywords = keywords
paper.o_search = search
paper.source = "openalex"
paper.type = "article"
paper.openalex_id = record["id"].split("/")[-1]
paper.doi = record["doi"].replace("https://doi.org/", "")
paper.title = record["display_name"]
paper.publication_date = record["publication_date"]
paper.publication_year = record["publication_year"]
if record["open_access"]:
paper.is_oa = record["open_access"]["is_oa"]
paper.oa_url = record["open_access"]["oa_url"]
if record["authorships"]:
paper.first_author = record["authorships"][0]["author"]["display_name"]
if record["authorships"][0]["institutions"]:
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
if record["primary_location"] and record["primary_location"]["source"]:
paper.publication_name = record["primary_location"]["source"]["display_name"]
return paper
def _crawl_openalex_query(base_pager, keywords: str, search: str, cache_key: str = None,
stop_key: str = None, n_max: int = None) -> int:
"""OpenAlex 单查询抓取核心,全量 / 增量 / 回补共用,保证 checkpoint 行为一致。
base_pager: 已套好 filter pyalex pager(/日期/收录日期等过滤由调用方决定)
cache_key: 给定则逐页把下一页游标checkpoint 进去(抓完写 "DONE", DONE 直接
跳过) ·断点续传; None 则不落游标,每次从头扫(适合短窗口增量)
stop_key: 给定且命中时,当前页抓完即停(应对配额耗尽 / 手动暂停),游标已 checkpoint
返回本次处理到的 record 条数
"""
if cache_key:
start = cache.get(cache_key, "*")
if start == "DONE":
return 0
else:
start = "*"
pager = base_pager.select(OPENALEX_SELECT_FIELDS).paginate(
per_page=200, n_max=n_max, cursor=start)
seen = 0
pager = pager.select([
"id", "doi", "title", "publication_date",
"open_access", "authorships", "primary_location", "publication_year",
"display_name", "content_urls"
]).paginate(per_page=200, n_max=None, cursor=cache_cursor)
next_cursor = pager._next_value
for page in pager:
papers = [
_build_paper_from_record(record, keywords, search)
for record in page
if record["doi"] and (record["display_name"] or record["title"])
]
# ignore_conflicts:openalex_id / doi 已存在的自动跳过,只插新的
papers = []
for record in page:
if record["doi"] and (record["display_name"] or record["title"]):
paper = Paper()
paper.id = idWorker.get_id()
paper.o_keywords = keywords
paper.o_search = search
paper.source = "openalex"
paper.type = "article"
paper.openalex_id = record["id"].split("/")[-1]
paper.doi = record["doi"].replace("https://doi.org/", "")
paper.title = record["display_name"]
paper.publication_date = record["publication_date"]
paper.publication_year = record["publication_year"]
if record["open_access"]:
paper.is_oa = record["open_access"]["is_oa"]
paper.oa_url = record["open_access"]["oa_url"]
if record["authorships"]:
paper.first_author = record["authorships"][0]["author"]["display_name"]
if record["authorships"][0]["institutions"]:
paper.first_author_institution = record["authorships"][0]["institutions"][0]["display_name"]
if record["primary_location"] and record["primary_location"]["source"]:
paper.publication_name = record["primary_location"]["source"]["display_name"]
papers.append(paper)
Paper.objects.bulk_create(papers, ignore_conflicts=True)
seen += len(papers)
nv = pager._next_value # pyalex 在取完每页后才更新,None 表示已到末尾
if cache_key:
cache.set(cache_key, nv if nv is not None else "DONE", timeout=None)
if nv is None or len(page) == 0:
if cache_key:
cache.set(cache_key, "DONE", timeout=None)
break
if stop_key and cache.get(stop_key):
break
return seen
@shared_task(base=CustomTask)
def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:str="", end_year:int=None):
if not (keywords or search):
raise Exception("keywords or search must be provided")
cache_key = f"openalex_cursor_{publication_year}_{keywords}{search}"
base = Works().filter(
publication_year=publication_year,
has_doi=True,
type="article"
)
base = _apply_keyword_search(base, keywords, search)
_crawl_openalex_query(base, keywords, search, cache_key=cache_key,
stop_key="get_paper_meta_from_openalex_stop")
cache.set(cache_key, next_cursor, timeout=None)
if cache.get("get_paper_meta_from_openalex_stop", None) is None:
if end_year is None:
end_year = datetime.now().year
@ -172,285 +127,6 @@ def get_paper_meta_from_openalex(publication_year:int, keywords:str="", search:s
countdown=5
)
def _fetch_openalex_published_since(keywords: str, search: str, from_publication_date: str,
per_combo_max: int = None) -> int:
"""针对单个 (keywords, search) 组合,拉取 OpenAlex 中 publication_date 在
from_publication_date 之后发表的论文短窗口扫描,不落游标(每次从头扫)
:更贴切的按收录时间增量要用 from_created_date / from_updated_date,但这两个
过滤项需 OpenAlex Premium(当前 key 返回 "Plan upgrade required"),故退而用公开可用的
from_publication_date(发表日期,天级粒度)代价:论文常在发表后若干天才被 OpenAlex
收录,所以窗口要留得比调度间隔大,否则会漏掉晚收录的文
"""
base = Works().filter(
has_doi=True,
type="article",
from_publication_date=from_publication_date,
)
base = _apply_keyword_search(base, keywords, search)
return _crawl_openalex_query(base, keywords, search, n_max=per_combo_max)
@shared_task(base=CustomTask)
def update_paper_meta_from_openalex(days: int = 30, per_combo_max: int = None):
"""自动增量更新期刊论文索引(resm_paper)的主任务。
遍历库里已抓过的每个 (o_keywords, o_search) 查询组合, OpenAlex
from_publication_date 过滤拉取最近 days 天内"发表"的论文并入库
(理想是按收录时间 from_created_date / from_updated_date 增量,但需 Premium,
当前 key 无权限,故用公开可用的 from_publication_date)
bulk_create(ignore_conflicts=True) 保证不重复days 默认 30:论文常在发表后数天到
数周才被 OpenAlex 收录,窗口要足够大覆盖这个滞后,否则会漏晚收录的文
django-celery-beat 每天调度一次
"""
from_publication_date = (timezone.now() - timedelta(days=days)).date().isoformat()
combos = list(Paper.objects.values_list("o_keywords", "o_search").distinct())
before = Paper.objects.count()
n_combos = 0
for o_keywords, o_search in combos:
o_keywords = o_keywords or ""
o_search = o_search or ""
if not o_keywords and not o_search:
continue
n_combos += 1
_fetch_openalex_published_since(o_keywords, o_search, from_publication_date, per_combo_max)
new_papers = Paper.objects.count() - before
return f"openalex update: combos={n_combos}, new_papers={new_papers}, since={from_publication_date}"
BACKFILL_STOP_KEY = "backfill_paper_meta_stop"
@shared_task(base=CustomTask)
def backfill_paper_meta_from_openalex(from_publication_date: str, to_publication_date: str = None,
combo_index: int = 0, combos=None):
"""按发表日期一次性回补论文索引,支持断点续传(应对 OpenAlex 配额限制)。
本任务只负责策略:抓哪些查询组合按什么发表日期区间怎么 chain
具体的分页抓取与游标 checkpoint 复用通用核心 _crawl_openalex_query
顺序逐个 (o_keywords, o_search) 组合处理,一个组合抓完再 chain 下一个,
单个 task 短小可随时停从断点续
/ (应对配额):
- 暂停: cache.set("backfill_paper_meta_stop", True) # 当前页抓完即停,不再 chain
- 续跑: cache.delete("backfill_paper_meta_stop") 后重新
backfill_paper_meta_from_openalex.delay(from_publication_date="2026-02-01")
DONE 的组合自动跳过,未完成的从上次 checkpoint 的游标继续
配额耗尽 / 网络异常时也会保留游标并置停标志,重新 .delay 即从断点继续
"""
if cache.get(BACKFILL_STOP_KEY):
return "paused (backfill_paper_meta_stop set); clear it then re-run to resume"
if combos is None:
# None 归一成 "" 并去重;排序保证每次续跑组合顺序一致
combos = [list(c) for c in sorted({
(c[0] or "", c[1] or "")
for c in Paper.objects.values_list("o_keywords", "o_search").distinct()
if (c[0] or c[1])
})]
def ckey(kw, search):
return f"backfill_cursor_{from_publication_date}|{to_publication_date}|{kw}|{search}"
# 跳过已完成的组合
while combo_index < len(combos):
kw, search = combos[combo_index]
if cache.get(ckey(kw, search)) == "DONE":
combo_index += 1
continue
break
if combo_index >= len(combos):
return f"backfill done: {len(combos)} combos, from {from_publication_date}"
kw, search = combos[combo_index]
cursor_key = ckey(kw, search)
base = Works().filter(
has_doi=True, type="article", from_publication_date=from_publication_date,
)
if to_publication_date:
base = base.filter(to_publication_date=to_publication_date)
base = _apply_keyword_search(base, kw, search)
try:
_crawl_openalex_query(base, kw, search, cache_key=cursor_key,
stop_key=BACKFILL_STOP_KEY)
except Exception as e:
# 配额耗尽 / 网络等:游标已 checkpoint,置停标志,等手动续跑
cache.set(BACKFILL_STOP_KEY, True, timeout=None)
return (f"combo {combo_index}/{len(combos)} interrupted: {e!r}; cursor saved. "
f"clear {BACKFILL_STOP_KEY} then re-run to resume")
if cache.get(cursor_key) != "DONE":
# 被 stop_key 打断,没抓完;游标已保留,等续跑
return f"combo {combo_index}/{len(combos)} paused; cursor saved"
# 该组合已抓完,chain 下一个
current_app.send_task(
"apps.resm.tasks.backfill_paper_meta_from_openalex",
kwargs={
"from_publication_date": from_publication_date,
"to_publication_date": to_publication_date,
"combo_index": combo_index + 1,
"combos": combos,
},
countdown=3,
)
return f"combo {combo_index}/{len(combos)} done ({kw!r},{search!r}) -> next"
@shared_task(base=CustomTask)
def monitor_papers(monitor_id: str = None):
"""期刊 / 关键词监控:遍历启用的 PaperMonitor,按 type 拼 OpenAlex 过滤,用
from_publication_date 拉最近 days 天的最新论文元数据入库期刊与关键词监控共用本任务
- journal: value=ISSN -> 过滤 primary_location.source.issn(o_keywords/o_search 留空)
- search : value=英文搜索词 -> title_and_abstract.search(写回 o_search)
- keyword: value=OpenAlex keyword id -> keywords.id(写回 o_keywords)
复用 _crawl_openalex_query(短窗口,cache_key=None 每次从头扫);bulk_create
ignore_conflicts 自动与现有 Paper 去重monitor_id 给定则只跑该条 beat 每天调度
"""
qs = PaperMonitor.objects.filter(is_active=True)
if monitor_id:
qs = qs.filter(id=monitor_id)
results = []
for m in qs:
from_pub = (timezone.now() - timedelta(days=m.days or 30)).date().isoformat()
base = Works().filter(has_doi=True, type="article", from_publication_date=from_pub)
kw, search = "", ""
if m.type == PaperMonitor.TYPE_JOURNAL:
base = base.filter(primary_location={"source": {"issn": m.value}})
elif m.type == PaperMonitor.TYPE_SEARCH:
search = m.value
base = _apply_keyword_search(base, "", search)
elif m.type == PaperMonitor.TYPE_KEYWORD:
kw = m.value
base = _apply_keyword_search(base, kw, "")
else:
continue
seen = _crawl_openalex_query(base, kw, search)
m.last_run = timezone.now()
m.last_count = seen
m.save(update_fields=["last_run", "last_count", "update_time"])
results.append(f"{m.type}:{m.name or m.value}={seen}")
return "; ".join(results) or "no active monitors"
SCIENCEDIRECT_SEARCH_URL = "https://api.elsevier.com/content/search/sciencedirect"
def _build_paper_from_sd_result(r, qs_text: str):
"""把 ScienceDirect Search 单条 result 映射成未保存的 Paper 实例。
SD Search 不返回 oa_url / 摘要 / openalex_id,字段比 OpenAlex
doi / title / 年份的直接返回 None 跳过(model 里这几个字段非空)
"""
doi = r.get("doi")
title = r.get("title")
if not doi or not title:
return None
pub_date = r.get("publicationDate") # YYYY-MM-DD
year = None
if pub_date:
try:
year = int(str(pub_date)[:4])
except (ValueError, TypeError):
year = None
if year is None:
return None
paper = Paper()
paper.id = idWorker.get_id()
paper.source = "elsevier"
paper.type = "article"
paper.o_search = qs_text
paper.doi = str(doi).replace("https://doi.org/", "")
paper.title = title
paper.publication_date = pub_date if len(str(pub_date)) == 10 else None
paper.publication_year = year
paper.publication_name = r.get("sourceTitle")
authors = r.get("authors") or []
if isinstance(authors, list) and authors:
# 取 order 最小的作者作为第一作者(部分作者可能被省略)
first = min(authors, key=lambda a: a.get("order", 1) if isinstance(a, dict) else 1)
if isinstance(first, dict):
paper.first_author = first.get("name")
paper.is_oa = bool(r.get("openAccess"))
return paper
def _search_sciencedirect(qs_text: str, loaded_after: str, show: int = 100,
max_results: int = 200):
"""调用 ScienceDirect Search(PUT)拉取 loaded_after 之后新入库的 Elsevier 文章。
返回 (papers, err_msg)err_msg 非空表示该 query 出错(已尽量带回已拿到的部分)
"""
headers = {
**ELSEVIER_HEADERS,
"Content-Type": "application/json",
"Accept": "application/json",
}
papers = []
offset = 0
with requests.Session() as req:
while offset < max_results:
body = {
"qs": qs_text,
"loadedAfter": loaded_after,
"display": {"offset": offset, "show": show, "sortBy": "date"},
}
try:
res = req.put(SCIENCEDIRECT_SEARCH_URL, json=body,
headers=headers, timeout=(3, 30))
except requests.RequestException as e:
return papers, f"elsevier_search_request_error: {e}"
if res.status_code in (401, 403):
return papers, f"elsevier_search_no_entitlement: {res.status_code}"
if res.status_code != 200:
return papers, f"elsevier_search_error: {res.status_code} {res.text[:200]}"
results = (res.json() or {}).get("results") or []
# 空结果时 SD 可能返回单个 {"error": ...},一并视为结束
if not results or (len(results) == 1 and results[0].get("error")):
break
for r in results:
paper = _build_paper_from_sd_result(r, qs_text)
if paper is not None:
papers.append(paper)
if len(results) < show:
break
offset += show
return papers, ""
@shared_task(base=CustomTask)
def update_paper_meta_from_elsevier(days: int = 7, per_qs_max: int = 200):
"""ScienceDirect Search 增量补充任务,作为 OpenAlex 增量的补充。
遍历库里已有的纯文本查询(o_search,OpenAlex keyword id 无法用于 SD,
故只用 o_search), loadedAfter 拉取最近 days Elsevier 新入库的期刊文章
只覆盖 ScienceDirect 自家内容(基本是 DOI 10.1016),弥补 OpenAlex
Elsevier 新刊收录的延迟 django-celery-beat 每天调度一次
"""
loaded_after = (timezone.now() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
qs_texts = [
s.strip() for s in Paper.objects.exclude(o_search__isnull=True)
.exclude(o_search="").values_list("o_search", flat=True).distinct()
if s and s.strip()
]
before = Paper.objects.count()
msgs = []
for qs_text in qs_texts:
papers, err = _search_sciencedirect(qs_text, loaded_after, max_results=per_qs_max)
if papers:
Paper.objects.bulk_create(papers, ignore_conflicts=True)
if err:
msgs.append(f"{qs_text}: {err}")
# key 没 Search 权限,后续 query 也没意义,直接停
if "no_entitlement" in err:
break
new_papers = Paper.objects.count() - before
return (f"elsevier update: qs={len(qs_texts)}, new_papers={new_papers}, "
f"since={loaded_after}, errs={msgs}")
# 常用的 User-Agent 列表
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",