diff --git a/DESIGN.md b/DESIGN.md index 6b71e1a..596da64 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -611,7 +611,7 @@ create index on usage_events (model_profile, created_at); **数据模型(新表 `scheduled_jobs`,独立加表不碰现有 schema → 公测兼容)**: `id, user_id, name, prompt, cron, tz(默 Asia/Shanghai), mode(isolated|persistent), bound_task_id(可空), notify(JSONB 可空), enabled, timeout_seconds, next_run_at, last_run_at, last_status, last_error, last_task_id, consecutive_failures, expires_at(可空), created_at, deleted_at`。Alembic 加表 migration;`usage_events` 复用现成记账(可加 `kind="scheduled"` 自由文本区分,无需 migration)。 -**守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~30s 扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at`。 +**守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~10s(`ZCBOT_SCHEDULER_TICK_SECONDS`,只决定最坏延迟≤1tick、不决定会否漏 —— claim 取 `next_run<=now` 的全部)扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at`。 **croniter 选型**:存标准 5 段 cron 串 + 时区,`croniter` 算 `next_run_at`。理由:正确处理 dom/dow 同列的 vixie OR 语义和时区折算(手搓极易踩坑,四源都点名这个坑);纯 Python 小依赖。劣选:只支持"每天/每周 HH:MM"自己用 datetime 算 —— 零依赖但遇复杂周期要返工。 diff --git a/RUN.md b/RUN.md index b51571e..4effd6e 100644 --- a/RUN.md +++ b/RUN.md @@ -53,7 +53,7 @@ # SMTP_FROM=you@qq.com # 可选,默认取 SMTP_USER # 定时任务守护循环(DESIGN §8.5,随 web 进程起,plain-asyncio 仿 _disk_scanner): # ZCBOT_DISABLE_SCHEDULER=1 # 可选,整体关掉调度(对照 Claude Code CLAUDE_CODE_DISABLE_CRON) - # ZCBOT_SCHEDULER_TICK_SECONDS=30 # 可选,扫描间隔,默 30s + # ZCBOT_SCHEDULER_TICK_SECONDS=10 # 可选,扫描间隔,默 10s(只决定最坏延迟≤1tick,不影响会否漏) # ZCBOT_SCHEDULER_CONCURRENCY=4 # 可选,并发跑的定时 run 上限,默 4 ``` > litellm 在 import 时副作用加载 .env;入口走 `main.py`,`.env` 自动生效。直跑 `python -c "from core.storage import ..."` 不经 litellm 链路时记得自己 `import litellm` 触发,或手动 `export ZCBOT_DB_URL=...`。 diff --git a/core/__init__.py b/core/__init__.py index a0f7bb1..76f08ca 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,3 +1,3 @@ # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # 改版本只动这一行。 -__version__ = "0.20.0" +__version__ = "0.20.1" diff --git a/scripts/smoke_scheduler.py b/scripts/smoke_scheduler.py new file mode 100644 index 0000000..e194f50 --- /dev/null +++ b/scripts/smoke_scheduler.py @@ -0,0 +1,156 @@ +"""Smoke: 定时任务守护循环端到端(DESIGN §8.5)。 + +跑法(**需先在另一个终端起 web 服务** `.venv/Scripts/python.exe main.py web`): + .venv/Scripts/python.exe scripts/smoke_scheduler.py [--email a@b.com] + +干什么: + 1. 给某用户(默认 DB 第一个 / --email 指定)插一条 isolated 定时任务, + prompt 是"回一句早安、不调工具",并把 next_run_at 改成现在 → 让守护循环下一 tick 就认领。 + 2. 轮询 scheduled_jobs.last_status 直到翻成 ok/error/skipped(超时 180s)。 + 3. ok 则打印它新建的 task_id + agent 实际回复片段,证明全链路(认领→建 task→ + _run_agent_bg→LLM→回写 run_status→record_result)走通。 + 4. 收尾软删该 job(留下 task 供查看)。 + +**会真的发起一次 LLM 调用**(一句短回复,费用可忽略)。不测邮件 —— notify 投递另行验。 +""" +from __future__ import annotations + +import os +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from uuid import UUID + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +# Windows 控制台默认 GBK,打印中文会乱码 → 强制 stdout UTF-8(只影响本脚本打印) +try: + sys.stdout.reconfigure(encoding="utf-8", errors="replace") # type: ignore[attr-defined] +except Exception: + pass + +# 读 .env +env_file = ROOT / ".env" +if env_file.exists(): + for line in env_file.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, _, v = line.partition("=") + os.environ.setdefault(k.strip(), v.strip()) + +from sqlalchemy import select, update + +from core import scheduler +from core.storage import session_scope +from core.storage.models import Message, ScheduledJob, Task, User + +POLL_TIMEOUT = 180 # 秒 +POLL_INTERVAL = 3 +PROMPT = "请直接回复一句『早安,今天也加油!』。不要调用任何工具,不要创建文件,不要做其它事。" + + +def _pick_user(email: str | None) -> UUID | None: + with session_scope() as s: + if email: + return s.execute(select(User.user_id).where(User.email == email)).scalar_one_or_none() + return s.execute(select(User.user_id).order_by(User.created_at).limit(1)).scalar_one_or_none() + + +def _last_assistant_text(task_id: UUID) -> str: + """取该 task 最后一条 assistant 文本(payload JSONB)。""" + with session_scope() as s: + rows = s.execute( + select(Message.payload).where(Message.task_id == task_id) + .order_by(Message.idx.desc()).limit(20) + ).scalars().all() + for payload in rows: + if not isinstance(payload, dict): + continue + if payload.get("role") != "assistant": + continue + c = payload.get("content") + if isinstance(c, str) and c.strip(): + return c.strip() + if isinstance(c, list): # 富内容块 + for blk in c: + if isinstance(blk, dict) and isinstance(blk.get("text"), str) and blk["text"].strip(): + return blk["text"].strip() + return "(未找到 assistant 文本)" + + +def main() -> int: + email = None + if "--email" in sys.argv: + i = sys.argv.index("--email") + email = sys.argv[i + 1] if i + 1 < len(sys.argv) else None + + uid = _pick_user(email) + if uid is None: + print("[FAIL] DB 里没有用户(或 --email 未匹配)。先 main.py user add。") + return 1 + print(f"[..] 用户 {str(uid)[:8]} prompt={PROMPT[:24]}...") + + # 1) 建 job(cron 随便给个合法值,马上覆盖 next_run_at 为现在) + job = scheduler.create_job( + uid, name="[smoke] 早安测试", prompt=PROMPT, cron="*/5 * * * *", mode="isolated", + ) + jid = UUID(job["job_id"]) + with session_scope() as s: + s.execute(update(ScheduledJob).where(ScheduledJob.job_id == jid) + .values(next_run_at=datetime.now(timezone.utc))) + print(f"[ok] 已插入 job {job['short_id']},next_run 置为现在,等守护循环认领...") + print(" (若卡住不动 → 确认 web 服务在跑、ZCBOT_DISABLE_SCHEDULER 未设、tick 已过)") + + # 2) 轮询 last_status + deadline = time.time() + POLL_TIMEOUT + status = None + last_task_id = None + last_error = None + while time.time() < deadline: + time.sleep(POLL_INTERVAL) + with session_scope() as s: + row = s.execute( + select(ScheduledJob.last_status, ScheduledJob.last_task_id, + ScheduledJob.last_error, ScheduledJob.next_run_at) + .where(ScheduledJob.job_id == jid) + ).first() + if row is None: + print("[FAIL] job 不见了(被并发删?)") + return 1 + status, last_task_id, last_error = row.last_status, row.last_task_id, row.last_error + waited = int(time.time() - (deadline - POLL_TIMEOUT)) + print(f" [{waited:>3}s] last_status={status or '(待触发)'}") + if status in ("ok", "error", "skipped"): + break + + # 3) 结果 + print("-" * 50) + if status == "ok": + print(f"[PASS] 守护循环已触发并成功。task={str(last_task_id)[:8] if last_task_id else '?'}") + if last_task_id: + print(f" agent 回复: {_last_assistant_text(last_task_id)[:120]}") + rc = 0 + elif status == "error": + print(f"[FAIL] 触发了但 run 报错: {last_error}") + rc = 1 + elif status == "skipped": + print(f"[WARN] 被跳过(目标 task 正忙?): {last_error}") + rc = 1 + else: + print(f"[FAIL] {POLL_TIMEOUT}s 内未触发(last_status 仍为空)。web 服务/调度是否在跑?") + rc = 1 + + # 4) 收尾:软删 job(留 task) + try: + scheduler.cancel_job(uid, str(jid)) + print(f"[..] 已清理 smoke job {job['short_id']}(task 保留可查看)") + except Exception as e: + print(f"[..] 清理 job 失败(可手动删): {e}") + return rc + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/web/app.py b/web/app.py index 57d48ad..51f004a 100644 --- a/web/app.py +++ b/web/app.py @@ -700,11 +700,12 @@ def create_app() -> FastAPI: stats_logger_task = asyncio.create_task(_stats_logger(), name="stats-logger") # ── 定时任务守护循环(§8.5)── 仿 _disk_scanner 的 plain-asyncio 范式,不引 - # APScheduler/Celery。每 ~30s 认领到点 job(claim+advance next_run 防重复触发), - # 复用 _run_agent_bg 起 run,跑完确定性兜底投递 + 回写 last_*。ZCBOT_DISABLE_SCHEDULER=1 + # APScheduler/Celery。每 ~10s 认领到点 job(claim+advance next_run 防重复触发), + # 复用 _run_agent_bg 起 run,跑完确定性兜底投递 + 回写 last_*。间隔只决定最坏延迟 + # (≤1 tick),不决定会不会漏(claim 取 next_run<=now 的全部)。ZCBOT_DISABLE_SCHEDULER=1 # 整体关掉(对照 Claude Code CLAUDE_CODE_DISABLE_CRON)。 scheduler_enabled = os.getenv("ZCBOT_DISABLE_SCHEDULER", "").strip() not in ("1", "true", "yes") - sched_tick = int(os.getenv("ZCBOT_SCHEDULER_TICK_SECONDS", "30") or "30") + sched_tick = int(os.getenv("ZCBOT_SCHEDULER_TICK_SECONDS", "10") or "10") sched_sema = asyncio.Semaphore(int(os.getenv("ZCBOT_SCHEDULER_CONCURRENCY", "4") or "4")) async def _execute_scheduled_job(snap: dict) -> None: