406 lines
15 KiB
Python
406 lines
15 KiB
Python
"""定时任务调度核心(DESIGN §8.5)。
|
|
|
|
纯逻辑层:cron→next_run 计算、due 任务认领、跑完记账、确定性兜底投递。
|
|
**不碰 asyncio / broker / _run_agent_bg**(那些 web 专属编排留在 web/app.py 的
|
|
lifespan `_scheduler_loop`,仿 _disk_scanner 调本模块)。
|
|
|
|
为什么 claim 时就推进 next_run_at:守护循环每 ~30s 扫一次,若不在认领时把 job 的
|
|
next_run_at 推到下一个 cron 点,run 还没跑完时下一 tick 会把同一 job 重复触发。
|
|
claim+advance 一把事务做掉 → 天然防重复触发(at-most-once per slot)。
|
|
"""
|
|
from __future__ import annotations

import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import UUID, uuid4

from croniter import croniter
from sqlalchemy import select, update

from .storage import session_scope
from .storage.models import ScheduledJob
|
|
|
|
# Allowed values for ScheduledJob.mode (checked by create_job / update_job).
_MODES = ("isolated", "persistent")

try:
    from zoneinfo import ZoneInfo
except ImportError:  # pragma: no cover  (zoneinfo needs Python >= 3.9; project targets 3.11+)
    ZoneInfo = None  # type: ignore

# After this many consecutive failed runs, record_result disables the job so a
# permanently broken schedule cannot keep firing (DESIGN §8.5 expiry safety bound).
FAILURE_DISABLE_THRESHOLD: int = 5

# Upper bound on jobs claimed per scheduler tick, so a burst of jobs sharing the
# same slot is not dispatched all at once.
CLAIM_LIMIT: int = 20
|
|
|
|
|
|
def validate_cron(expr: str) -> None:
    """Reject an invalid cron expression (input check for schedule creation).

    Raises:
        JobError: if *expr* is empty or not accepted by ``croniter.is_valid``.
            ``JobError`` subclasses ``ValueError``, so callers that catch
            ``ValueError`` keep working. Raising ``JobError`` lets the CRUD layer
            (create_job / update_job) surface a bad cron as a validation error
            (tool ``[Error]`` line / REST 4xx) instead of an unexpected exception.
    """
    expr = (expr or "").strip()
    if not expr or not croniter.is_valid(expr):
        # JobError is defined further down this module; it is resolved at call time.
        raise JobError(f"非法 cron 表达式: {expr!r}(需标准 5 段,如 '0 8 * * *')")
|
|
|
|
|
|
def _tzinfo(tz: str):
    """Resolve an IANA zone name to a tzinfo.

    An empty *tz* means Asia/Shanghai, and so does any name ZoneInfo rejects.
    When zoneinfo is unavailable (ZoneInfo is None) this always returns UTC.
    """
    if ZoneInfo is not None:
        try:
            return ZoneInfo(tz or "Asia/Shanghai")
        except Exception:
            # Unknown / malformed zone name: fall back to the project default.
            return ZoneInfo("Asia/Shanghai")
    return timezone.utc
|
|
|
|
|
|
def compute_next_run(cron: str, tz: str, after: Optional[datetime] = None) -> datetime:
    """Return the next fire time of *cron* after *after* (default: now) as aware UTC.

    croniter keeps the tzinfo of its base time, so the base is first converted
    to the job's own wall-clock zone. That makes '0 8 * * *' mean 08:00 in *tz*
    rather than 08:00 UTC (DESIGN §8.5 time-zone pitfall).
    """
    local_zone = _tzinfo(tz)
    start = after if after is not None else datetime.now(timezone.utc)
    upcoming = croniter(cron, start.astimezone(local_zone)).get_next(datetime)
    return upcoming.astimezone(timezone.utc)
|
|
|
|
|
|
def _snapshot(job: ScheduledJob) -> dict[str, Any]:
    """Copy the fields the orchestrator needs from an ORM row into a plain dict.

    The result no longer depends on the session, so the orchestration layer can
    use it without triggering lazy loads from another thread. Optional text
    fields are normalised to "" and a missing timeout to 0.
    """
    snap: dict[str, Any] = {
        attr: getattr(job, attr)
        for attr in (
            "job_id", "user_id", "name", "prompt", "cron", "tz", "mode", "bound_task_id",
        )
    }
    snap["skill"] = job.skill or ""
    snap["model_profile"] = job.model_profile or ""
    snap["notify"] = job.notify
    snap["timeout_seconds"] = job.timeout_seconds or 0
    return snap
|
|
|
|
|
|
def claim_due_jobs(now: Optional[datetime] = None, limit: int = CLAIM_LIMIT) -> list[dict[str, Any]]:
    """Claim due jobs in one transaction and return plain snapshots to run.

    - Due means: enabled, not soft-deleted, and ``next_run_at <= now``. At most
      *limit* rows are taken, earliest ``next_run_at`` first, selected with
      ``FOR UPDATE SKIP LOCKED`` so rows locked by another claimer are skipped.
    - Expired jobs (``expires_at <= now``) are disabled with last_status
      "expired" and not returned (safety bound).
    - Every other job has ``next_run_at`` advanced to the next cron point after
      *now* before it is returned. Advancing inside the claim transaction is
      what stops the next tick from re-firing a job whose run is still going.
    - If the next cron point cannot be computed, the job is disabled with an
      error and is NOT returned. Previously it was still dispatched, and that
      run's ``record_result("ok")`` then overwrote the auto-disable reason.

    Returns:
        List of ``_snapshot`` dicts, safe to use after the session closes.
    """
    now = now or datetime.now(timezone.utc)
    claimed: list[dict[str, Any]] = []
    with session_scope() as s:
        rows = s.execute(
            select(ScheduledJob)
            .where(
                ScheduledJob.enabled.is_(True),
                ScheduledJob.deleted_at.is_(None),
                ScheduledJob.next_run_at <= now,
            )
            .order_by(ScheduledJob.next_run_at)
            .limit(limit)
            .with_for_update(skip_locked=True)
        ).scalars().all()
        for job in rows:
            if job.expires_at is not None and job.expires_at <= now:
                job.enabled = False
                job.last_status = "expired"
                continue
            try:
                job.next_run_at = compute_next_run(job.cron, job.tz, after=now)
            except Exception:
                # The cron stopped evaluating (it was validated at create time).
                # Disable the job so it can't jam every tick, and don't dispatch it,
                # so the error below stays visible to the user.
                job.enabled = False
                job.last_status = "error"
                job.last_error = "cron 计算失败,已自动停用"
                continue
            claimed.append(_snapshot(job))
    return claimed
|
|
|
|
|
|
def record_result(
    job_id: UUID,
    *,
    status: str,
    task_id: Optional[UUID],
    error: Optional[str] = None,
) -> None:
    """Write a finished (or skipped) run back onto the job's last_* fields.

    - "ok": bump run_count and reset the consecutive-failure counter.
    - "error": bump run_count and the failure counter; at
      FAILURE_DISABLE_THRESHOLD consecutive failures the job is disabled.
    - anything else (e.g. "skipped" because the persistent task was busy):
      only last_* fields change; counters stay put and the job waits for its
      next cron slot.

    A job_id that no longer exists is ignored.
    """
    finished_at = datetime.now(timezone.utc)
    with session_scope() as s:
        job = s.get(ScheduledJob, job_id)
        if job is None:
            return
        job.last_run_at = finished_at
        job.last_status = status
        job.last_error = error
        if task_id is not None:
            job.last_task_id = task_id
        if status not in ("ok", "error"):
            return
        job.run_count = (job.run_count or 0) + 1
        if status == "ok":
            job.consecutive_failures = 0
            return
        job.consecutive_failures = (job.consecutive_failures or 0) + 1
        if job.consecutive_failures >= FAILURE_DISABLE_THRESHOLD:
            job.enabled = False
            job.last_error = (
                f"连续失败 {job.consecutive_failures} 次,已自动停用。最后错误: {error}"
            )
|
|
|
|
|
|
def build_run_message(snapshot: dict[str, Any]) -> str:
    """Wrap the job's prompt as a user message for the agent.

    The message starts with a marker line naming the job and the current local
    time in the job's zone, then a blank line, then the prompt verbatim.
    """
    label = snapshot.get("name") or "定时任务"
    local_zone = _tzinfo(snapshot.get("tz") or "Asia/Shanghai")
    stamp = datetime.now(local_zone).strftime("%Y-%m-%d %H:%M")
    header = f"[定时任务「{label}」自动触发 · {stamp}]"
    return header + "\n\n" + f"{snapshot['prompt']}"
|
|
|
|
|
|
# ───────────── 第 3 层确定性兜底投递(notify) ─────────────
|
|
|
|
def _newest_artifact(working_dir: Path) -> Optional[Path]:
    """Return the most recently modified regular file under *working_dir*.

    Any path with a component (directory or file name) starting with "." is
    ignored, e.g. hidden files and ``.preview`` caches. Hidden directories are
    pruned while walking rather than traversed and filtered afterwards, so a
    large ``.git`` / ``.venv`` inside the working dir costs nothing.

    Returns:
        The newest file's path (rooted at *working_dir*), or None if the
        directory does not exist or contains no eligible file.
    """
    if not working_dir.is_dir():
        return None
    best: Optional[Path] = None
    best_mtime = -1.0
    for root, dirnames, filenames in os.walk(working_dir):
        # Edit in place: os.walk (top-down) will not descend into removed entries.
        dirnames[:] = [d for d in dirnames if not d.startswith(".")]
        for fname in filenames:
            if fname.startswith("."):
                continue
            p = Path(root) / fname
            if not p.is_file():
                continue
            try:
                m = p.stat().st_mtime
            except OSError:
                # Vanished or unreadable between listing and stat: skip it.
                continue
            if m > best_mtime:
                best, best_mtime = p, m
    return best
|
|
|
|
|
|
def deliver_notify(
    notify: Optional[dict[str, Any]],
    *,
    job_name: str,
    working_dir: Path,
    tz: str,
) -> None:
    """Deterministically deliver the job's notification, independent of the agent.

    Only ``channel == "email"`` with a non-empty ``to`` is handled; anything
    else is a no-op. The newest artifact in *working_dir* is attached using a
    fixed template. If there is no artifact, a plain-text notice is sent saying
    the job ran.

    Blocking I/O (SMTP). Per the design, the orchestrator runs this in an
    executor and logs failures; exceptions from sending propagate.
    """
    if not notify or notify.get("channel") != "email":
        return
    recipient = notify.get("to")
    if not recipient:
        return

    # Deferred import: avoids a top-level core -> tools import cycle.
    from tools.send_email import send_email_smtp

    stamp = datetime.now(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M")
    attachment = _newest_artifact(working_dir)
    if attachment is None:
        send_email_smtp(
            recipient,
            f"[定时任务] {job_name} · {stamp}(无产物文件)",
            f"定时任务「{job_name}」已于 {stamp} 执行,本次未产生文件产物。",
        )
        return
    send_email_smtp(
        recipient,
        f"[定时任务] {job_name} · {stamp}",
        f"定时任务「{job_name}」已于 {stamp} 执行,产物见附件:{attachment.name}。",
        [attachment],
    )
|
|
|
|
|
|
# ───────────── CRUD 服务层(对话工具 + REST 端点共用,DESIGN §8.5)─────────────
|
|
#
|
|
# tools/schedule.py(对话)与 web/app.py 的 /v1/schedules(前端只读+停用/删除)都调
|
|
# 这一层,避免两条创建路径逻辑漂移。函数内自管 session、返回可序列化 dict(脱离
|
|
# session,跨 web/工具线程安全);校验失败抛 JobError → 工具转 [Error] 行 / REST 转 4xx。
|
|
|
|
# Chinese weekday labels, Monday-first (index 0 = 周一 … index 6 = 周日).
WEEKDAYS_CN = [f"周{day}" for day in "一二三四五六日"]
|
|
|
|
|
|
class JobError(ValueError):
    """A job failed validation or could not be found in the CRUD layer.

    Subclasses ``ValueError``. Callers turn it into a user-facing error:
    tools emit an ``[Error]`` line and REST responds 400/404.
    """
|
|
|
|
|
|
def describe_cron(cron: str, tz: str) -> str:
    """Render a common 5-field cron shape as a short Chinese phrase for display.

    Recognised shapes: daily at HH:MM, weekly on one weekday, monthly on one
    day, every N minutes, and every N hours at minute M. Anything else is
    returned unchanged. This includes interval schedules that are narrowed by
    month or weekday, so the description never claims more than the expression
    says (e.g. '*/5 * * * 1' is not "every 5 minutes").

    Args:
        cron: Cron expression; anything other than 5 fields is returned as-is.
        tz: The job's zone. Currently unused, kept for interface stability.
    """
    parts = (cron or "").split()
    if len(parts) != 5:
        return cron
    mi, ho, dom, mon, dow = parts
    hhmm = None
    if mi.isdigit() and ho.isdigit():
        hhmm = f"{int(ho):02d}:{int(mi):02d}"
    if hhmm and dom == "*" and mon == "*" and dow == "*":
        return f"每天 {hhmm}"
    if hhmm and dom == "*" and mon == "*" and dow.isdigit():
        wd = int(dow) % 7  # cron weekday: 0 and 7 both mean Sunday
        idx = 6 if wd == 0 else wd - 1  # WEEKDAYS_CN is Monday-first
        return f"每{WEEKDAYS_CN[idx]} {hhmm}"
    if hhmm and dom.isdigit() and mon == "*" and dow == "*":
        return f"每月 {int(dom)} 号 {hhmm}"
    # Interval shapes are only described when no date field narrows them.
    unrestricted = dom == "*" and mon == "*" and dow == "*"
    if unrestricted and mi.startswith("*/") and ho == "*":
        return f"每 {mi[2:]} 分钟"
    if unrestricted and mi.isdigit() and ho.startswith("*/"):
        return f"每 {ho[2:]} 小时(第 {int(mi)} 分)"
    return cron
|
|
|
|
|
|
def job_to_dict(job: ScheduledJob) -> dict[str, Any]:
    """Serialise an ORM row into the JSON-safe dict used by the API and tools.

    UUIDs become strings, and datetimes become ISO-8601 strings. A naive
    datetime is treated as UTC. Missing counters and optional text fields are
    normalised to 0 / "".
    """

    def as_iso(value: Optional[datetime]) -> Optional[str]:
        if value is None:
            return None
        if not value.tzinfo:
            value = value.replace(tzinfo=timezone.utc)
        return value.isoformat()

    full_id = str(job.job_id)
    last_task = str(job.last_task_id) if job.last_task_id else None
    return {
        "job_id": full_id,
        "short_id": full_id[:8],
        "name": job.name,
        "prompt": job.prompt,
        "cron": job.cron,
        "schedule_desc": describe_cron(job.cron, job.tz),
        "tz": job.tz,
        "mode": job.mode,
        "skill": job.skill or "",
        "model_profile": job.model_profile or "",
        "notify": job.notify,
        "enabled": job.enabled,
        "timeout_seconds": job.timeout_seconds or 0,
        "next_run_at": as_iso(job.next_run_at),
        "last_run_at": as_iso(job.last_run_at),
        "last_status": job.last_status,
        "last_error": job.last_error,
        "last_task_id": last_task,
        "consecutive_failures": job.consecutive_failures or 0,
        "run_count": job.run_count or 0,
        "expires_at": as_iso(job.expires_at),
        "created_at": as_iso(job.created_at),
    }
|
|
|
|
|
|
def _validate_tz(tz: str) -> str:
    """Normalise and check an IANA zone name; empty or blank means Asia/Shanghai.

    Raises:
        JobError: if zoneinfo is available and rejects the name.
    """
    name = (tz or "Asia/Shanghai").strip() or "Asia/Shanghai"
    if ZoneInfo is None:
        # No zoneinfo: nothing to check against, accept as given.
        return name
    try:
        ZoneInfo(name)
    except Exception:
        raise JobError(f"未知时区: {name!r}(用 IANA 名,如 'Asia/Shanghai')")
    return name
|
|
|
|
|
|
def list_jobs(user_id: UUID) -> list[dict[str, Any]]:
    """Return all non-deleted jobs of *user_id*, newest first, as plain dicts."""
    query = (
        select(ScheduledJob)
        .where(ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None))
        .order_by(ScheduledJob.created_at.desc())
    )
    with session_scope() as s:
        jobs = s.execute(query).scalars().all()
        # Serialise while the session is still open, so nothing lazy-loads later.
        return [job_to_dict(job) for job in jobs]
|
|
|
|
|
|
def create_job(
    user_id: UUID,
    *,
    name: str,
    prompt: str,
    cron: str,
    tz: str = "Asia/Shanghai",
    mode: str = "isolated",
    skill: str = "",
    notify: Optional[dict[str, Any]] = None,
    model_profile: str = "",
    timeout_seconds: int = 0,
) -> dict[str, Any]:
    """Validate the inputs, insert a new scheduled job and return its snapshot.

    Strings are stripped. ``mode`` is lower-cased and must be in ``_MODES``.
    ``cron`` and ``tz`` are validated, and ``next_run_at`` is set to the first
    fire time after now. ``timeout_seconds`` must be a non-negative integer
    (0 / empty means "no explicit timeout").

    Raises:
        JobError: on empty name/prompt, bad mode, bad timeout or unknown zone.
            A bad cron raises from ``validate_cron``.
    """
    name = (name or "").strip()
    prompt = (prompt or "").strip()
    cron = (cron or "").strip()
    mode = (mode or "isolated").strip().lower()
    if not name:
        raise JobError("name 不能为空")
    if not prompt:
        raise JobError("prompt 不能为空")
    if mode not in _MODES:
        raise JobError(f"mode 必须是 {_MODES} 之一")
    # Validate here so a bad value becomes a JobError (tool/REST-friendly)
    # rather than a bare ValueError, and a negative timeout is never stored.
    try:
        timeout = int(timeout_seconds or 0)
    except (TypeError, ValueError):
        raise JobError(f"timeout_seconds 必须是非负整数: {timeout_seconds!r}") from None
    if timeout < 0:
        raise JobError(f"timeout_seconds 必须是非负整数: {timeout_seconds!r}")
    validate_cron(cron)
    tz = _validate_tz(tz)
    next_run = compute_next_run(cron, tz)
    job = ScheduledJob(
        job_id=uuid4(), user_id=user_id, name=name, prompt=prompt, cron=cron, tz=tz,
        mode=mode, skill=(skill or "").strip(), notify=notify,
        model_profile=(model_profile or "").strip(),
        timeout_seconds=timeout, next_run_at=next_run,
    )
    with session_scope() as s:
        s.add(job)
        s.flush()
        # Serialise inside the session so DB-populated columns load before it closes.
        return job_to_dict(job)
|
|
|
|
|
|
def _resolve(s, user_id: UUID, id_str: str) -> ScheduledJob:
    """Find one of *user_id*'s non-deleted jobs by full UUID or unique id prefix.

    Matching is case-insensitive. ``str(UUID)`` is always lowercase hex, so an
    uppercase id pasted by a user would otherwise never match. A full UUID is
    simply its own (unique) prefix.

    Raises:
        JobError: empty id, no match, or an ambiguous prefix.
    """
    jid = (id_str or "").strip()
    if not jid:
        raise JobError("job_id 不能为空")
    needle = jid.lower()
    rows = s.execute(
        select(ScheduledJob).where(
            ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None)
        )
    ).scalars().all()
    matches = [j for j in rows if str(j.job_id).startswith(needle)]
    if not matches:
        raise JobError(f"没找到 id 以 {jid!r} 开头的定时任务")
    if len(matches) > 1:
        ids = ", ".join(str(j.job_id)[:8] for j in matches)
        raise JobError(f"id 前缀 {jid!r} 匹配到多个({ids}),请用更长的 id")
    return matches[0]
|
|
|
|
|
|
# Fields update_job will accept; any other keyword passed to it is dropped.
_EDITABLE = set(
    "name prompt cron tz mode skill notify model_profile timeout_seconds enabled".split()
)
|
|
|
|
|
|
def update_job(user_id: UUID, id_str: str, **fields: Any) -> dict[str, Any]:
    """Update editable fields of one of *user_id*'s jobs and return its snapshot.

    Keys outside ``_EDITABLE`` and ``None`` values are ignored. Values are
    normalised the same way ``create_job`` normalises them: stripped strings,
    non-empty name/prompt, and a non-negative integer timeout. This keeps the
    tool and REST write paths from drifting apart.

    ``next_run_at`` is recomputed from now when ``cron`` or ``tz`` changes, or
    when a disabled job is re-enabled. Otherwise a job re-enabled after a pause
    would keep its stale past ``next_run_at`` and fire on the very next tick.
    Re-enabling or changing the cron also resets the consecutive-failure count.

    Raises:
        JobError: nothing to update, a field fails validation, or *id_str*
            matches zero or several jobs. A bad cron raises from ``validate_cron``.
    """
    fields = {k: v for k, v in fields.items() if k in _EDITABLE and v is not None}
    if not fields:
        raise JobError("没有可更新的字段")
    for key in ("name", "prompt"):
        if key in fields:
            fields[key] = str(fields[key]).strip()
            if not fields[key]:
                raise JobError(f"{key} 不能为空")
    for key in ("skill", "model_profile"):
        if key in fields:
            fields[key] = str(fields[key]).strip()
    if "timeout_seconds" in fields:
        raw = fields["timeout_seconds"]
        try:
            timeout = int(raw or 0)
        except (TypeError, ValueError):
            raise JobError(f"timeout_seconds 必须是非负整数: {raw!r}") from None
        if timeout < 0:
            raise JobError(f"timeout_seconds 必须是非负整数: {raw!r}")
        fields["timeout_seconds"] = timeout
    if "mode" in fields:
        fields["mode"] = str(fields["mode"]).strip().lower()
        if fields["mode"] not in _MODES:
            raise JobError(f"mode 必须是 {_MODES} 之一")
    if "cron" in fields:
        fields["cron"] = str(fields["cron"]).strip()
        validate_cron(fields["cron"])
    if "tz" in fields:
        fields["tz"] = _validate_tz(str(fields["tz"]))

    with session_scope() as s:
        job = _resolve(s, user_id, id_str)
        was_enabled = bool(job.enabled)
        for k, v in fields.items():
            setattr(job, k, v)
        # Only a real disabled -> enabled transition reschedules. Re-sending
        # enabled=True to an active job must not skip a slot that is currently due.
        reenabling = fields.get("enabled") is True and not was_enabled
        if "cron" in fields or "tz" in fields or reenabling:
            job.next_run_at = compute_next_run(job.cron, job.tz)
        # Re-enabled / rescheduled: give it a clean slate on the failure counter.
        if fields.get("enabled") is True or "cron" in fields:
            job.consecutive_failures = 0
        s.flush()
        return job_to_dict(job)
|
|
|
|
|
|
def set_enabled(user_id: UUID, id_str: str, enabled: bool) -> dict[str, Any]:
    """Enable or disable a job; a thin wrapper over ``update_job``."""
    flag = bool(enabled)
    return update_job(user_id, id_str, enabled=flag)
|
|
|
|
|
|
def cancel_job(user_id: UUID, id_str: str) -> dict[str, Any]:
    """Soft-delete a job: stamp ``deleted_at`` (UTC now) and disable it.

    Returns the job's final snapshot.

    Raises:
        JobError: if *id_str* matches zero or several of the user's jobs.
    """
    with session_scope() as s:
        target = _resolve(s, user_id, id_str)
        target.deleted_at = datetime.now(timezone.utc)
        target.enabled = False
        s.flush()
        return job_to_dict(target)
|