zcbot/core/scheduler.py

432 lines
16 KiB
Python

"""定时任务调度核心(DESIGN §8.5)。
纯逻辑层:cron→next_run 计算、due 任务认领、跑完记账、确定性兜底投递。
**不碰 asyncio / broker / _run_agent_bg**(那些 web 专属编排留在 web/app.py 的
lifespan `_scheduler_loop`,仿 _disk_scanner 调本模块)。
为什么 claim 时就推进 next_run_at:守护循环每 ~30s 扫一次,若不在认领时把 job 的
next_run_at 推到下一个 cron 点,run 还没跑完时下一 tick 会把同一 job 重复触发。
claim+advance 一把事务做掉 → 天然防重复触发(at-most-once per slot)。
"""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import UUID, uuid4
from croniter import croniter
from sqlalchemy import select, update
from .storage import session_scope
from .storage.models import ScheduledJob
_MODES = ("isolated", "persistent")
try:
from zoneinfo import ZoneInfo
except ImportError: # pragma: no cover (py<3.9 不支持,本项目 3.11+)
ZoneInfo = None # type: ignore
# 连续失败到这个数自动停(防僵尸定时任务,DESIGN §8.5 expiry 安全界)
FAILURE_DISABLE_THRESHOLD = 5
# 单次 tick 最多认领多少 job(防一批同点任务一次性涌入)
CLAIM_LIMIT = 20
def validate_cron(expr: str) -> None:
    """Raise ValueError unless *expr* is a cron expression croniter accepts.

    Used as input validation for the ``schedule_create`` tool. ``None`` and
    whitespace-only input count as empty and are rejected. The error message
    shows the stripped expression.
    """
    cleaned = expr.strip() if expr else ""
    if cleaned and croniter.is_valid(cleaned):
        return
    raise ValueError(f"非法 cron 表达式: {cleaned!r}(需标准 5 段,如 '0 8 * * *')")
def _tzinfo(tz: str):
if ZoneInfo is None:
return timezone.utc
try:
return ZoneInfo(tz or "Asia/Shanghai")
except Exception:
return ZoneInfo("Asia/Shanghai")
def compute_next_run(cron: str, tz: str, after: Optional[datetime] = None) -> datetime:
    """Return the next fire time of *cron* after *after*, as a UTC-aware datetime.

    The expression is evaluated on the wall clock of *tz*. croniter keeps the
    tzinfo of its base time, so the base is converted into the job's zone first.
    That way '0 8 * * *' means 08:00 local time in that zone, not 08:00 UTC
    (§8.5 timezone pitfall).

    Args:
        cron: cron expression, expected to be validated already (validate_cron).
        tz: IANA zone name; empty/unknown names fall back as in _tzinfo.
        after: reference instant, defaults to the current UTC time. A naive
            value is taken to be UTC, matching how job_to_dict treats naive DB
            datetimes. ``datetime.astimezone`` would otherwise interpret it as
            the host's local time.
    """
    tzinfo = _tzinfo(tz)
    base = after if after is not None else datetime.now(timezone.utc)
    if base.tzinfo is None:
        base = base.replace(tzinfo=timezone.utc)
    nxt = croniter(cron, base.astimezone(tzinfo)).get_next(datetime)
    return nxt.astimezone(timezone.utc)
def _snapshot(job: ScheduledJob) -> dict[str, Any]:
"""把 ORM 行拍成普通 dict,脱离 session 给编排层用(避免跨线程 lazy-load)。"""
return {
"job_id": job.job_id,
"user_id": job.user_id,
"name": job.name,
"prompt": job.prompt,
"cron": job.cron,
"tz": job.tz,
"mode": job.mode,
"bound_task_id": job.bound_task_id,
"skill": job.skill or "",
"model_profile": job.model_profile or "",
"notify": job.notify,
"timeout_seconds": job.timeout_seconds or 0,
}
def claim_due_jobs(now: Optional[datetime] = None, limit: int = CLAIM_LIMIT) -> list[dict[str, Any]]:
    """Claim due jobs: select them and advance next_run_at in one session.

    - Due means: enabled AND deleted_at IS NULL AND next_run_at <= now.
    - Jobs whose expires_at <= now are disabled (enabled=False, last_status
      "expired") and not returned. This is the expiry safety bound.
    - Every other selected job gets next_run_at moved to its next cron slot
      after *now* in the same session_scope. The following tick therefore
      cannot fire the same slot again. Its snapshot is returned for the
      orchestration layer to run.

    Rows are selected ``FOR UPDATE SKIP LOCKED`` (with_for_update(skip_locked=True)).

    Args:
        now: reference instant, defaults to the current UTC time. A naive value
            is treated as UTC for the Python-side comparisons and cron math.
        limit: maximum number of due rows selected per call, expired rows included.

    Returns:
        Detached snapshot dicts (see _snapshot) of the jobs to run.
    """
    now = now or datetime.now(timezone.utc)
    # Aware twin of `now` for Python-side comparisons and cron math. Naive
    # datetimes are UTC by this module's convention (see job_to_dict).
    now_utc = now if now.tzinfo is not None else now.replace(tzinfo=timezone.utc)
    claimed: list[dict[str, Any]] = []
    with session_scope() as s:
        rows = s.execute(
            select(ScheduledJob)
            .where(
                ScheduledJob.enabled.is_(True),
                ScheduledJob.deleted_at.is_(None),
                ScheduledJob.next_run_at <= now,
            )
            .order_by(ScheduledJob.next_run_at)
            .limit(limit)
            .with_for_update(skip_locked=True)
        ).scalars().all()
        for job in rows:
            expires_at = job.expires_at
            if expires_at is not None and expires_at.tzinfo is None:
                # Comparing a naive DB value with an aware `now` would raise
                # TypeError and abort the whole tick.
                expires_at = expires_at.replace(tzinfo=timezone.utc)
            if expires_at is not None and expires_at <= now_utc:
                job.enabled = False
                job.last_status = "expired"
                continue
            claimed.append(_snapshot(job))
            try:
                job.next_run_at = compute_next_run(job.cron, job.tz, after=now_utc)
            except Exception:
                # The cron unexpectedly fails to compute (create already validated it).
                # Disable the job so it cannot wedge the loop; this slot still runs once.
                job.enabled = False
                job.last_status = "error"
                job.last_error = "cron 计算失败,已自动停用"
    return claimed
def record_result(
    job_id: UUID,
    *,
    status: str,
    task_id: Optional[UUID],
    error: Optional[str] = None,
) -> None:
    """Write a finished (or skipped) run back onto the job's last_* columns.

    - "ok": increments run_count and resets consecutive_failures.
    - "error": increments run_count and consecutive_failures. Reaching
      FAILURE_DISABLE_THRESHOLD disables the job and rewrites last_error.
    - any other status (e.g. "skipped" while a persistent task is busy) leaves
      both counters alone, so the next cron slot simply tries again.

    An unknown job_id is silently ignored. last_task_id is only overwritten
    when task_id is given.
    """
    finished_at = datetime.now(timezone.utc)
    with session_scope() as s:
        job = s.get(ScheduledJob, job_id)
        if job is None:
            return
        job.last_run_at = finished_at
        job.last_status = status
        job.last_error = error
        if task_id is not None:
            job.last_task_id = task_id
        if status not in ("ok", "error"):
            return
        job.run_count = (job.run_count or 0) + 1
        if status == "ok":
            job.consecutive_failures = 0
            return
        failures = (job.consecutive_failures or 0) + 1
        job.consecutive_failures = failures
        if failures >= FAILURE_DISABLE_THRESHOLD:
            job.enabled = False
            job.last_error = (
                f"连续失败 {failures} 次,已自动停用。最后错误: {error}"
            )
def build_run_message(snapshot: dict[str, Any]) -> str:
    """Wrap the job's prompt in a tagged user message to feed the agent.

    The header names the job ("定时任务" when unnamed) and stamps the current
    wall-clock time in the job's zone (Asia/Shanghai when tz is unset).
    """
    zone = _tzinfo(snapshot.get("tz") or "Asia/Shanghai")
    stamp = datetime.now(zone).strftime("%Y-%m-%d %H:%M")
    title = snapshot.get("name") or "定时任务"
    header = f"[定时任务「{title}」自动触发 · {stamp}]"
    return header + "\n\n" + f"{snapshot['prompt']}"
# ───────────── 第 3 层确定性兜底投递(notify) ─────────────
def _newest_artifact(working_dir: Path) -> Optional[Path]:
"""工作目录里最近修改的普通文件(跳过隐藏 / .preview 缓存)。"""
if not working_dir.is_dir():
return None
best: Optional[Path] = None
best_mtime = -1.0
for p in working_dir.rglob("*"):
if not p.is_file():
continue
if any(part.startswith(".") for part in p.relative_to(working_dir).parts):
continue
try:
m = p.stat().st_mtime
except OSError:
continue
if m > best_mtime:
best, best_mtime = p, m
return best
def _notify_email(to, job_name: str, when: str, artifact: Optional[Path]) -> None:
    """Email a run notification for *job_name* to *to* via send_email_smtp.

    With an artifact the file is attached. Without one the subject is tagged
    as having no artifact and no attachment list is passed.
    """
    # Imported lazily to avoid a top-level core -> tools import cycle.
    from tools.send_email import send_email_smtp

    if artifact is None:
        send_email_smtp(
            to,
            f"[定时任务] {job_name} · {when}(无产物文件)",
            f"定时任务「{job_name}」已于 {when} 执行,本次未产生文件产物。",
        )
        return
    send_email_smtp(
        to,
        f"[定时任务] {job_name} · {when}",
        f"定时任务「{job_name}」已于 {when} 执行,产物见附件:{artifact.name}",
        [artifact],
    )
def deliver_notify(
    notify: Optional[dict[str, Any]],
    *,
    job_name: str,
    working_dir: Path,
    tz: str,
    user_id: Optional[Any] = None,
) -> None:
    """Deterministically send a job's notification, without relying on the agent.

    Channels, selected by ``notify["channel"]``:

    - ``email``: mail the newest artifact in *working_dir* to ``notify["to"]``.
      Nothing is sent when ``to`` is missing.
    - ``wechat``: push a one-line summary plus the newest artifact to the user's
      bound WeChat (§8.7). Nothing happens when *user_id* is None. If the push
      is not delivered (outside the 24h window / not bound / user never wrote)
      and ``notify["to"]`` is set and SMTP is configured, fall back to email.
      Otherwise raise RuntimeError listing the per-target reasons.

    Empty *notify* and unknown channels are no-ops. The call does blocking IO,
    so per the design the orchestration layer runs it via run_in_executor and
    logs-and-swallows any exception.
    """
    if not notify:
        return
    channel = notify.get("channel")
    when = datetime.now(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M")
    artifact = _newest_artifact(working_dir)

    if channel == "email":
        recipient = notify.get("to")
        if recipient:
            _notify_email(recipient, job_name, when, artifact)
        return

    if channel != "wechat" or user_id is None:
        return

    # Lazy imports avoid top-level import cycles.
    from core.wechat.service import send_to_user
    from tools.send_email import smtp_configured

    suffix = f",产物:{artifact.name}" if artifact else ",本次未产生文件产物。"
    text = f"定时任务「{job_name}」已于 {when} 执行" + suffix
    report = send_to_user(user_id, text, str(artifact) if artifact else None)
    if report.delivered:
        return
    fallback_to = notify.get("to")  # optional fallback mailbox
    if fallback_to and smtp_configured():
        _notify_email(fallback_to, job_name, when, artifact)
        return
    reasons = ", ".join(r.reason for r in report.results)
    raise RuntimeError("微信推送未送达: " + reasons)
# ───────────── CRUD 服务层(对话工具 + REST 端点共用,DESIGN §8.5)─────────────
#
# tools/schedule.py(对话)与 web/app.py 的 /v1/schedules(前端只读+停用/删除)都调
# 这一层,避免两条创建路径逻辑漂移。函数内自管 session、返回可序列化 dict(脱离
# session,跨 web/工具线程安全);校验失败抛 JobError → 工具转 [Error] 行 / REST 转 4xx。
# Chinese weekday labels, index 0 = Monday ... 6 = Sunday (used by describe_cron).
WEEKDAYS_CN = ["周" + day for day in "一二三四五六日"]
class JobError(ValueError):
    """A job failed validation or could not be found.

    Chat tools render it as an ``[Error]`` line; the REST layer maps it to a
    400/404 response. Being a ValueError subclass, it is also caught by
    ``except ValueError``.
    """
def describe_cron(cron: str, tz: str) -> str:
    """Render common cron shapes as short human-readable Chinese text.

    Used to echo schedules back in the UI and the chat tools. Recognised shapes
    are listed below; everything else is returned unchanged:

    - ``M H * * *``   -> "每天 HH:MM"
    - ``M H * * D``   -> "每周X HH:MM" (cron weekday 0 and 7 are both Sunday)
    - ``M H D * *``   -> "每月 D 日 HH:MM"
    - ``*/N * * * *`` -> "每 N 分钟"
    - ``M */N * * *`` -> "每 N 小时(第 M 分)"

    *tz* is part of the signature but does not affect the text.
    """

    def is_num(field: str) -> bool:
        # ASCII digits only: str.isdigit() also accepts characters such as "²"
        # that int() cannot parse.
        return field.isascii() and field.isdigit()

    parts = (cron or "").split()
    if len(parts) != 5:
        return cron
    mi, ho, dom, mon, dow = parts
    hhmm = f"{int(ho):02d}:{int(mi):02d}" if is_num(mi) and is_num(ho) else None
    any_month = mon == "*"
    if hhmm and dom == "*" and any_month and dow == "*":
        return f"每天 {hhmm}"
    if hhmm and dom == "*" and any_month and is_num(dow):
        # cron weekday: 0/7 = Sunday, 1 = Monday; WEEKDAYS_CN index 0 = Monday.
        return f"每{WEEKDAYS_CN[(int(dow) + 6) % 7]} {hhmm}"
    if hhmm and is_num(dom) and any_month and dow == "*":
        return f"每月 {int(dom)} 日 {hhmm}"
    # Step shapes are only described when day-of-month, month and weekday are
    # all "*"; otherwise e.g. "*/5 * * * 1" would be misreported.
    if dom == "*" and any_month and dow == "*":
        if ho == "*" and mi.startswith("*/") and is_num(mi[2:]):
            return f"每 {mi[2:]} 分钟"
        if is_num(mi) and ho.startswith("*/") and is_num(ho[2:]):
            return f"每 {ho[2:]} 小时(第 {int(mi)} 分)"
    return cron
def job_to_dict(job: ScheduledJob) -> dict[str, Any]:
    """Serialize a ScheduledJob row into a plain dict for the API and the tools.

    UUIDs become strings and datetimes become ISO-8601 strings; naive
    datetimes are treated as UTC. Nullable text/int columns default to "" / 0.
    """

    def as_iso(value: Optional[datetime]) -> Optional[str]:
        if value is None:
            return None
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value.isoformat()

    full_id = str(job.job_id)
    last_task = job.last_task_id
    return {
        "job_id": full_id,
        "short_id": full_id[:8],
        "name": job.name,
        "prompt": job.prompt,
        "cron": job.cron,
        "schedule_desc": describe_cron(job.cron, job.tz),
        "tz": job.tz,
        "mode": job.mode,
        "skill": job.skill or "",
        "model_profile": job.model_profile or "",
        "notify": job.notify,
        "enabled": job.enabled,
        "timeout_seconds": job.timeout_seconds or 0,
        "next_run_at": as_iso(job.next_run_at),
        "last_run_at": as_iso(job.last_run_at),
        "last_status": job.last_status,
        "last_error": job.last_error,
        "last_task_id": str(last_task) if last_task else None,
        "consecutive_failures": job.consecutive_failures or 0,
        "run_count": job.run_count or 0,
        "expires_at": as_iso(job.expires_at),
        "created_at": as_iso(job.created_at),
    }
def _validate_tz(tz: str) -> str:
tz = (tz or "Asia/Shanghai").strip() or "Asia/Shanghai"
if ZoneInfo is not None:
try:
ZoneInfo(tz)
except Exception:
raise JobError(f"未知时区: {tz!r}(用 IANA 名,如 'Asia/Shanghai')")
return tz
def list_jobs(user_id: UUID) -> list[dict[str, Any]]:
    """Return all non-deleted jobs of *user_id*, newest first, as plain dicts."""
    query = (
        select(ScheduledJob)
        .where(ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None))
        .order_by(ScheduledJob.created_at.desc())
    )
    with session_scope() as s:
        jobs = s.execute(query).scalars().all()
        return list(map(job_to_dict, jobs))
def create_job(
    user_id: UUID,
    *,
    name: str,
    prompt: str,
    cron: str,
    tz: str = "Asia/Shanghai",
    mode: str = "isolated",
    skill: str = "",
    notify: Optional[dict[str, Any]] = None,
    model_profile: str = "",
    timeout_seconds: int = 0,
) -> dict[str, Any]:
    """Validate the inputs, insert a new ScheduledJob and return it as a dict.

    Text inputs are stripped and mode is lower-cased. next_run_at is set to
    the first cron slot after now in *tz*.

    Raises:
        JobError: empty name/prompt, unknown mode, invalid cron expression,
            unknown timezone, or a timeout_seconds that is not a non-negative
            integer.
    """
    name = (name or "").strip()
    prompt = (prompt or "").strip()
    cron = (cron or "").strip()
    mode = (mode or "isolated").strip().lower()
    if not name:
        raise JobError("name 不能为空")
    if not prompt:
        raise JobError("prompt 不能为空")
    if mode not in _MODES:
        raise JobError(f"mode 必须是 {_MODES} 之一")
    try:
        validate_cron(cron)
    except ValueError as exc:
        # validate_cron raises a plain ValueError; the tool / REST layers catch
        # JobError, so re-raise as one instead of letting it surface as a 500.
        raise JobError(str(exc)) from exc
    tz = _validate_tz(tz)
    try:
        timeout = int(timeout_seconds or 0)
    except (TypeError, ValueError) as exc:
        raise JobError(f"timeout_seconds 必须是整数: {timeout_seconds!r}") from exc
    if timeout < 0:
        raise JobError("timeout_seconds 不能为负数")
    next_run = compute_next_run(cron, tz)
    job = ScheduledJob(
        job_id=uuid4(), user_id=user_id, name=name, prompt=prompt, cron=cron, tz=tz,
        mode=mode, skill=(skill or "").strip(), notify=notify,
        model_profile=(model_profile or "").strip(),
        timeout_seconds=timeout, next_run_at=next_run,
    )
    with session_scope() as s:
        s.add(job)
        s.flush()
        return job_to_dict(job)
def _resolve(s, user_id: UUID, id_str: str) -> ScheduledJob:
    """Find one of *user_id*'s non-deleted jobs by full UUID or by id prefix.

    Matching is case-insensitive. ``str(UUID)`` is lowercase, but ids may be
    pasted in upper case. A full id is just the longest possible prefix.

    Args:
        s: open session from session_scope().
        user_id: owner whose jobs are searched.
        id_str: full job UUID or a prefix of it (surrounding whitespace ignored).

    Raises:
        JobError: empty id, no matching job, or a prefix matching several jobs.
    """
    jid = (id_str or "").strip()
    if not jid:
        raise JobError("job_id 不能为空")
    prefix = jid.lower()
    rows = s.execute(
        select(ScheduledJob).where(
            ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None)
        )
    ).scalars().all()
    matches = [j for j in rows if str(j.job_id).lower().startswith(prefix)]
    if not matches:
        raise JobError(f"没找到 id 以 {jid!r} 开头的定时任务")
    if len(matches) > 1:
        ids = ", ".join(str(j.job_id)[:8] for j in matches)
        raise JobError(f"id 前缀 {jid!r} 匹配到多个({ids}),请用更长的 id")
    return matches[0]
_EDITABLE = {"name", "prompt", "cron", "tz", "mode", "skill", "notify", "model_profile",
"timeout_seconds", "enabled"}
def update_job(user_id: UUID, id_str: str, **fields: Any) -> dict[str, Any]:
    """Update editable fields of a job and return the refreshed dict.

    Keys outside _EDITABLE, and keys whose value is None, are ignored. Values
    are normalized like create_job does:
    - name/prompt are stripped and must stay non-empty.
    - mode is lower-cased; cron, tz and timeout_seconds are validated.
    - skill/model_profile are stripped.

    next_run_at is recomputed from now when cron or tz changes, or when a
    disabled job is re-enabled. Otherwise a stale, past next_run_at would make
    a re-enabled job fire on the very next tick instead of at its next slot.
    Re-enabling or changing cron also resets consecutive_failures.

    Raises:
        JobError: nothing to update, a validation failure, or job not found.
    """
    fields = {k: v for k, v in fields.items() if k in _EDITABLE and v is not None}
    if not fields:
        raise JobError("没有可更新的字段")
    for key in ("name", "prompt"):
        if key in fields:
            fields[key] = str(fields[key]).strip()
            if not fields[key]:
                raise JobError(f"{key} 不能为空")
    if "mode" in fields:
        fields["mode"] = str(fields["mode"]).strip().lower()
        if fields["mode"] not in _MODES:
            raise JobError(f"mode 必须是 {_MODES} 之一")
    if "cron" in fields:
        fields["cron"] = str(fields["cron"]).strip()
        try:
            validate_cron(fields["cron"])
        except ValueError as exc:
            # validate_cron raises a plain ValueError; callers catch JobError.
            raise JobError(str(exc)) from exc
    if "tz" in fields:
        fields["tz"] = _validate_tz(str(fields["tz"]))
    for key in ("skill", "model_profile"):
        if key in fields:
            fields[key] = str(fields[key]).strip()
    if "timeout_seconds" in fields:
        try:
            fields["timeout_seconds"] = int(fields["timeout_seconds"])
        except (TypeError, ValueError) as exc:
            raise JobError(
                f"timeout_seconds 必须是整数: {fields['timeout_seconds']!r}"
            ) from exc
        if fields["timeout_seconds"] < 0:
            raise JobError("timeout_seconds 不能为负数")
    with session_scope() as s:
        job = _resolve(s, user_id, id_str)
        was_enabled = bool(job.enabled)
        for k, v in fields.items():
            setattr(job, k, v)
        re_enabled = fields.get("enabled") is True and not was_enabled
        if "cron" in fields or "tz" in fields or re_enabled:
            job.next_run_at = compute_next_run(job.cron, job.tz)
        # Re-enabled or rescheduled: clear the failure streak for a clean restart.
        if fields.get("enabled") is True or "cron" in fields:
            job.consecutive_failures = 0
        s.flush()
        return job_to_dict(job)
def set_enabled(user_id: UUID, id_str: str, enabled: bool) -> dict[str, Any]:
    """Enable or disable a job; a thin wrapper over update_job(enabled=...)."""
    flag = True if enabled else False
    return update_job(user_id, id_str, enabled=flag)
def cancel_job(user_id: UUID, id_str: str) -> dict[str, Any]:
    """Soft-delete a job (set deleted_at and enabled=False) and return its dict.

    The row stays in the table. Queries filtering on deleted_at IS NULL,
    such as list_jobs and _resolve, stop returning it.
    """
    with session_scope() as s:
        target = _resolve(s, user_id, id_str)
        target.enabled = False
        target.deleted_at = datetime.now(timezone.utc)
        s.flush()
        return job_to_dict(target)