feat(scheduler): 定时任务 v1 — 对话建/管 + 守护循环执行 + 只读前端 (DESIGN §8.5)

到点把一句自然语言 prompt 喂进 agent 主管线,可跑 skill 出简报 / 发邮件 / 打招呼等。
job 本体 = cron+时区 + prompt + 会话模式;"发邮件"不是字段,是 agent 据 prompt 调
send_email 的动作 → 加任何能力不改 schema。

后端:
- scheduled_jobs 表 + migration 0011(独立加表,公测兼容)
- core/scheduler.py:croniter 算 next_run(时区+vixie OR 语义)、claim+advance 防重复触发、
  失败阈值自停、notify 兜底投递、CRUD 服务层(工具与 REST 共用不漂移)
- 守护循环 _scheduler_loop(lifespan,仿 _disk_scanner 的 plain-asyncio,不引 APScheduler/Celery;
  复用 _run_agent_bg,抢 run 锁、超时协作 cancel、并发上限)
- tools/send_email.py(host-side,SMTP_* 齐才挂)
- /v1/schedules GET/PATCH/DELETE 三端点

对话端 = 完整 CRUD:schedule_create/list/update/cancel 四工具(定时 run 内不挂防自我繁殖)。

前端 = 只读 + 停用/删除:左栏 rail「定时」入口 + crons.js 只读 master-detail modal
(复用 skills modal 范式);建/改故意只走对话,规避 cron 构建器 UX。

会话模式:isolated(默认,每次新建临时 task 省 token)/ persistent(绑 bound_task_id 续上下文)。
env:SMTP_* / ZCBOT_DISABLE_SCHEDULER / ZCBOT_SCHEDULER_TICK_SECONDS / ZCBOT_SCHEDULER_CONCURRENCY。

已验:migration 上库、CRUD 端到端、3 REST + 4 工具注册、crons.js 语法。
待验:起 web 进程跑一轮真实触发 + 邮件 smoke。bump 0.18.0 → 0.19.0。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-06-18 13:42:31 +08:00
parent 4f61b5fc56
commit 108351864e
15 changed files with 1417 additions and 4 deletions

View File

@ -589,6 +589,59 @@ create index on usage_events (model_profile, created_at);
**搁置(成本不抵当前收益)**:gunicorn 无感换版 / broker 外置 Redis / nginx 蓝绿双实例 —— 留到"单机线程池调到头仍不够"或"换版断流成真实投诉"再议(无感换版需先把 broker 外置共享,分析见 RUN.md §B)。
### 8.5 定时任务 / 计划运行(Scheduled Jobs)(2026-06-18 设计,status=design)
**缺口**:无任何定时触发机制。但有价值的活很多是**时间驱动**而非事件驱动 —— 每日简报、每周综述、定时拉数据存盘、早安提醒。当前必须有人在对话里手动发消息才跑得起来。诉求:**用对话方式创建**"每天 X 点干 Y"的任务,到点自动跑、结果送达。
**业界印证(四源高度收敛)**:OpenClaw `cron-jobs` / Autobot(agent-loop×cron)/ Claude Code routines / geta.team 自建调度器,关键模式一致 —— ① cron 到点**往同一条 agent 主管线注入一条带标记的消息**,不另起执行路径;② 三种会话隔离模式(isolated 默认 / persistent 续上下文 / main 系统事件);③ isolated 运行到期自动 prune;④ 退避重试(transient vs permanent);⑤ per-job 超时;⑥ 投递显式 + runner 兜底(OpenClaw `--announce`);⑦ 5 段 cron + 时区,警惕 dom/dow 同列的 vixie OR 语义坑;⑧ 持久化用 DB,管理三件套(`cron_create/list/delete`);⑨ 对话式自然语言创建即标准做法。
**核心洞察(把方案收口到极简)**:定时任务本体 = `什么时候(cron+时区)` + `做什么(一句自然语言 prompt)` + `跑在哪(会话模式)`。**复用现成 agent 主管线**(`web/app.py:_run_agent_bg`,§3.6 / §7.2 同一条 POST /messages 路径),守护循环只负责"到点把一条带 `[定时任务]` 标记的 prompt 喂进去",**不造第二套跑 agent 的逻辑**。
> **关键解耦:"发邮件"不是一等公民,是 agent 据 prompt 调工具的一个动作。** job 模型只存 prompt,"做什么 / 结果发哪"全在那句话里(发邮件→调 `send_email`;出简报→`load_skill` 落盘;打招呼→回一句话)。好处:未来加任何能力(telegram / webhook / 落盘 / 调 API)**不改 schema**,只要 agent 有对应工具、prompt 说清楚。
**三层投递(没人盯着看 → 结果不能丢)**:
1. **baseline(永远有,零配置)**:定时 run 就是正常 run,结果**必进对应 task 线程**;守护循环跑完给该 task 打**未读/通知标记**,用户下次登录可见。
2. **opt-in 推送(prompt 驱动)**:要发邮件/(将来)telegram → prompt 里说,agent 调工具发。灵活、能写动态正文。
3. **可靠兜底(可选结构化 `notify`)**:某 job 要"必达某邮箱、不靠 AI 记性" → job 带 `notify={channel,to}`,守护循环 run 完**确定性补发**最新产物。不填走第 1 层。
**会话模式(隔离轴,业界核心设计点)**:
- **isolated(默认)**:每次触发新建临时 task,只带 job 的 prompt + skill,**不继承对话历史**。上下文最小 → 省 token(契合 high-turn 烧 token 治理,§8.2 / [[project_high_turn_token_burn_root_causes]]);临时 task 打标签 + 到期自动归档,防 task 列表被每日任务刷屏。
- **persistent(可选)**:job 绑定一个常驻 task(`bound_task_id`),每次往同一线程追加消息,有跨天连续性("和昨天比")。代价:线程越长重发历史越多、token 逐日涨 —— 仅在用户明确要连续性时用。
**数据模型(新表 `scheduled_jobs`,独立加表不碰现有 schema → 公测兼容)**:
`id, user_id, name, prompt, cron, tz(默 Asia/Shanghai), mode(isolated|persistent), bound_task_id(可空), notify(JSONB 可空), enabled, timeout_seconds, next_run_at, last_run_at, last_status, last_error, last_task_id, consecutive_failures, expires_at(可空), created_at, deleted_at`。Alembic 加表 migration;`usage_events` 复用现成记账(可加 `kind="scheduled"` 自由文本区分,无需 migration)。
**守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~30s 扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at`
**croniter 选型**:存标准 5 段 cron 串 + 时区,`croniter` 算 `next_run_at`。理由:正确处理 dom/dow 同列的 vixie OR 语义和时区折算(手搓极易踩坑,四源都点名这个坑);纯 Python 小依赖。劣选:只支持"每天/每周 HH:MM"自己用 datetime 算 —— 零依赖但遇复杂周期要返工。
**可靠性(业界补的,纳入设计)**:
- **退避重试**:transient(限流/网络)指数退避重试(60s→120s→300s),成功重置;permanent(prompt 报错/鉴权)直接失败记 `last_error`
- **per-job 超时** `timeout_seconds`:超时复用现成协作式 cancel 信号(§7.x)。
- **无补跑(no catch-up)**:守护进程宕机期间错过的点**跳到下一个**,不补 N 次(同 Claude Code 语义)。
- **防自我繁殖**:定时 run 内**禁用 `schedule_create`**(防任务造任务);并发调度数设上限。
- **expiry 安全界**:`expires_at` + `consecutive_failures` 阈值 → 连续失败 N 次或长期没人管自动停,防僵尸定时任务(同 Claude Code 7 天过期思路)。
**对话端(用户要的"对话方式创建")**:核心是 host-side 工具三件套 `schedule_create / schedule_list / schedule_cancel`(写 `scheduled_jobs`,按 `user_id` 隔离,密钥不进沙箱,沿用 §3.4 typed-tool 范式)。自然语言进、自然语言管("我有哪些定时任务""取消那个简报")—— 即 Claude Code 模式(其定时任务纯工具实现,无配套 skill,证明工具单干可跑通)。
- **工具必须、skill 可选后置**:skill 是 markdown 不能落库,执行器只能是工具;收集字段/`ask_user` 确认这套流程,能力强的模型靠工具自描述 schema 即可走通。故 **v1 纯工具**(schema + 参数描述写好就够),契合 §5 "Less Scaffolding, More Trust" —— 先信模型,跑不好再加脚手架。
- **skill 真正值钱处不是教填参数(schema 够),而是教写好 `job.prompt`**:job 的 prompt 决定未来**每天**那次 run 的质量,用户随口一句直接存会跑得差;好 prompt 要自包含/可重复/产物位置明确/把发哪存哪写死 —— 模型默认不会,值得一份模板+确认纪律(cron 口径翻译、回读人话确认、默认 isolated 并提示 persistent 代价)去教。**v2 按需补**:实测发现 agent 写的 `job.prompt` 质量差 / 确认流程乱再加;且因调度低频,用按需 `load_skill`(§3.5)而非 always-on prompt 块,避免每轮白烧 token(§8.2)。
- 三件套用**三个独立工具**(schema 清晰、对齐 Claude Code `CronCreate/List/Delete`),非单工具带 `action` 参数。
**取舍(不选)**:
- **不引 APScheduler / Celery**:项目刻意用 plain-asyncio 后台循环(§8.4),调度需求是单机低并发,引调度框架/Redis broker 是过度工程。
- **不学 geta 用 JSON 文件持久化**:已有 PG + SQLAlchemy + alembic,加表是自然选择(JSON 文件丢状态、无事务、无按 user 查询)。
- **email 不做成 job 一等字段**:降通用性(见核心解耦);仅留可选 `notify` 兜底。
**风险 / 边界 / 待定**:
- **`send_email` 工具仍要建**(`tools/send_email.py`,host-side,仅当 `SMTP_*` env 存在才挂,沿用 §3.4 "有 key 才注册"),让第 2/3 层能用。**待定:SMTP 发信账号**(企业邮箱/QQ/163/Gmail 应用密码)—— 给真实账号走 env,或先占位走沙箱验证链路。
- **计费归属**:定时 run 计入 job 所属 `user_id` 的 token/配额,`usage_events` 标 `kind="scheduled"` 可审计。
- **错峰抖动**:多用户同设 8 点 → 按 job-id 加确定性偏移防同一秒打爆 LLM provider(单机低并发,列 nice-to-have 不阻塞 v1)。
- **待定小项**:可选 `notify` 字段是否 v1 就上(倾向上,零成本兜底);`expires_at` 默认值。
**改动面(v1)**:1 张新表 + migration、1 守护循环(lifespan)、4 个 schedule 工具(create/list/**update**/cancel)、1 个 send_email 工具、agent_builder 注册 + 定时 run 内工具裁剪。**v2 按需**:薄 skill(教写 `job.prompt`)。**不动** loop / llm / capabilities / 现有 DB schema。
**前端取舍(2026-06-18 定 + 落地):对话端做完整 CRUD,前端只读展示 + 停用/删除。** 前端 SPA 调 `/v1/*` REST、不经 agent → "界面建/改定时任务"必须另开 REST + 表单 + cron 构建器(整套最重的是让科研用户填 cron 的 UX)。既然产品本就是对话式 agent,把建/改/删/查全收到对话(`schedule_*` 工具),**前端退化成只读看板**:`GET /v1/schedules` 列表 + 列表项「停用/删除」两个高频便捷动作(`PATCH`/`DELETE /v1/schedules/{id}`)。好处:cron 构建器 UX 难题直接消失(用户从不在前端填 cron,对 bot 说"每天早九点"由模型翻译);无"前端改了和对话不同步"的状态问题。代价:界面不能新建/编辑(需求低频,且对话更自然)。落地:`web/static/js/crons.js` 只读 master-detail modal(复用 skills modal 范式)+ 左栏 rail「定时」入口;工具与 REST 共用 `core.scheduler` CRUD 服务层不漂移。
---
## 附录:DeepSeek V4 关键事实(2026-04-24)

View File

@ -2,7 +2,7 @@
> 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`
最后更新:2026-06-18(新增 brief skill:科研方向简报,三路检索 documents/research/web + 文献计量趋势型简报)
最后更新:2026-06-18(定时任务 v1:scheduled_jobs 表 + plain-asyncio 守护循环 + 3 工具 + send_email,DESIGN §8.5)
---
@ -21,6 +21,16 @@
## 已完成关键能力
### 2026-06-18 / 定时任务 v1(scheduled_jobs,DESIGN §8.5)
- 需求:对话方式建"每天 X 点干 Y"的定时任务(跑 skill 出简报 / 发邮件 / 打招呼皆可)。调研 OpenClaw/Autobot/Claude Code/geta 四源收敛,定方案见 DESIGN §8.5。
- **核心解耦**:job 本体 = `cron+tz + 一句 prompt + 会话模式`;"发邮件"不是字段,是 agent 据 prompt 调 `send_email` 的动作 → 加任何能力不改 schema。
- **不引调度框架**:croniter(唯一新依赖)只当 next_run 计算器(正确处理 dom/dow OR 语义 + 时区);"每 30s 醒来扫到点 job"是 plain-asyncio 守护循环,仿 §8.4 `_disk_scanner`,复用 `_run_agent_bg`,不上 APScheduler/Celery。
- **文件**:`core/storage/models.py` 加 `ScheduledJob` + migration `0011_scheduled_jobs`(独立加表,公测兼容)/ `core/scheduler.py`(cron 数学 + claim+advance 防重复触发 + record_result 失败阈值自停 + notify 兜底投递 + CRUD 服务层 `list/create/update/set_enabled/cancel_job`,工具与 REST 共用)/ `tools/schedule.py`(create/list/**update**/cancel 四件套,薄包装服务层,user_id ctor 注入,定时 run 内不挂防自我繁殖)/ `tools/send_email.py`(host-side,SMTP_* 齐才挂)/ `web/app.py` lifespan `_scheduler_loop` + `_execute_scheduled_job`(认领→抢 run 锁→to_thread 跑→超时协作 cancel→notify→记账)+ `/v1/schedules` GET/PATCH/DELETE 三端点。
- **对话端 = 完整 CRUD**(建/改/删/查都说着办);**前端 = 只读展示 + 停用/删除两个便捷按钮**(左栏 rail「定时」按钮 → `crons.js` 只读 master-detail modal,复用 skills modal 范式;建/改无 REST、故意只走对话,§8.5)。两条路径共用 `core.scheduler` 服务层不漂移。
- **会话模式**:isolated(默认,每次新建临时 task `scheduled-<id8>` 目录,省 token)/ persistent(绑定 bound_task_id 续上下文)。env:`SMTP_*` / `ZCBOT_DISABLE_SCHEDULER` / `ZCBOT_SCHEDULER_TICK_SECONDS` / `ZCBOT_SCHEDULER_CONCURRENCY`(见 RUN)。已验:migration 上库 0011、CRUD 服务层端到端、3 REST 路由 + 4 工具注册、crons.js 语法。bump 0.18.0 → 0.19.0。
- **v2 待做**:对话工具教写好 job.prompt 的薄 skill;退避重试(transient/permanent 区分)目前简化为"到下一 cron 点 + 连失败 5 次自停";真机邮件 smoke + 守护循环定时触发的端到端验证(需起 web 进程跑一轮)。
### 2026-06-18 / brief skill:科研方向简报
- 需求:用户要"水泥/建材方向的科研简报"。联网调研简报类做法——Anthropic 官方 digest skill(办公活动聚合)+ Paper Digest(论文影响力周报)+ 文献计量趋势报告(热点聚类/新兴方法/地理格局)。结论:现有 skill 缺"某方向近期文献 → 有判断的趋势简报"这一环(research/documents 只取文献不组织、paper-review 出可投稿综述、analyze 拆问题不查文献)。

11
RUN.md
View File

@ -44,6 +44,17 @@
# 对话整个 run 期占 1 线程,active_runs 逼近此值即排队(看 journalctl 的 [stats] 行);
# 并发不够再调大(先确认是真并发高、而非单条 run 慢)。
# ZCBOT_RUN_MAX_WORKERS=16
# 定时任务发邮件(send_email tool + 定时任务 notify 兜底投递,DESIGN §8.5):可选。
# 三者齐了才挂 send_email tool(没配的部署 agent 看不到这个工具);密钥只留宿主、不进 sandbox。
# SMTP_HOST=smtp.qq.com
# SMTP_PORT=465 # 默 465;465→SSL,其余→STARTTLS(可用 SMTP_TLS=ssl|starttls|none 覆盖)
# SMTP_USER=you@qq.com
# SMTP_PASSWORD=<授权码/应用专用密码,非登录密码>
# SMTP_FROM=you@qq.com # 可选,默认取 SMTP_USER
# 定时任务守护循环(DESIGN §8.5,随 web 进程起,plain-asyncio 仿 _disk_scanner):
# ZCBOT_DISABLE_SCHEDULER=1 # 可选,整体关掉调度(对照 Claude Code CLAUDE_CODE_DISABLE_CRON)
# ZCBOT_SCHEDULER_TICK_SECONDS=30 # 可选,扫描间隔,默 30s
# ZCBOT_SCHEDULER_CONCURRENCY=4 # 可选,并发跑的定时 run 上限,默 4
```
> litellm 在 import 时副作用加载 .env;入口走 `main.py`,`.env` 自动生效。直跑 `python -c "from core.storage import ..."` 不经 litellm 链路时记得自己 `import litellm` 触发,或手动 `export ZCBOT_DB_URL=...`
- **依赖**:`pip install -r requirements.txt`(已在 `.venv` 里;含 `bcrypt`)。

View File

@ -1,3 +1,3 @@
# zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。
# 改版本只动这一行。
__version__ = "0.18.0"
__version__ = "0.19.0"

View File

@ -56,6 +56,10 @@ from tools.task_progress import TaskProgressTool
from tools.ask_user import AskUserTool
from tools.web_fetch import WebFetchTool
from tools.web_search import WebSearchTool
from tools.schedule import (
ScheduleCancelTool, ScheduleCreateTool, ScheduleListTool, ScheduleUpdateTool,
)
from tools.send_email import SendEmailTool, smtp_configured
from core.ark_client import ArkConfig
from core.bocha_client import BochaConfig
@ -361,6 +365,7 @@ def build_agent(
image_variant: str = "",
video_variant: str = "",
cancel_check: Optional[Callable[[], bool]] = None,
scheduled_run: bool = False,
) -> Tuple[AgentLoop, Session, str, TaskState, Path]:
"""返回 (agent, session, task_id_str, task_state, working_dir_path)。
@ -546,6 +551,23 @@ def build_agent(
):
tools[t.name] = t
# 定时任务管理(DESIGN §8.5):增删查三件套。**定时 run 内不挂**(防任务造任务,
# 自我繁殖);仅交互对话里能建/管 job。user_id 由 ctor 注入,不信模型传的 id。
if not scheduled_run:
for t in (
ScheduleCreateTool(uid, base_dir=tool_base, user_root=ur_path),
ScheduleListTool(uid, base_dir=tool_base, user_root=ur_path),
ScheduleUpdateTool(uid, base_dir=tool_base, user_root=ur_path),
ScheduleCancelTool(uid, base_dir=tool_base, user_root=ur_path),
):
tools[t.name] = t
# 发邮件(§8.5 投递):仅当 SMTP_* env 齐了才挂(沿用"有 key 才注册",没配的
# 部署里 agent 看不到一个永远报错的工具)。定时与交互 run 都可用。
if smtp_configured():
se = SendEmailTool(base_dir=tool_base, user_root=ur_path)
tools[se.name] = se
if caps.enable_run_python:
rp = RunPythonTool(base_dir=tool_base, user_root=ur_path)
tools[rp.name] = rp

405
core/scheduler.py Normal file
View File

@ -0,0 +1,405 @@
"""定时任务调度核心(DESIGN §8.5)。
纯逻辑层:cronnext_run 计算due 任务认领跑完记账确定性兜底投递
**不碰 asyncio / broker / _run_agent_bg**(那些 web 专属编排留在 web/app.py
lifespan `_scheduler_loop`,仿 _disk_scanner 调本模块)
为什么 claim 时就推进 next_run_at:守护循环每 ~30s 扫一次,若不在认领时把 job
next_run_at 推到下一个 cron ,run 还没跑完时下一 tick 会把同一 job 重复触发
claim+advance 一把事务做掉 天然防重复触发(at-most-once per slot)
"""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import UUID, uuid4
from croniter import croniter
from sqlalchemy import select, update
from .storage import session_scope
from .storage.models import ScheduledJob
_MODES = ("isolated", "persistent")
try:
from zoneinfo import ZoneInfo
except ImportError: # pragma: no cover (py<3.9 不支持,本项目 3.11+)
ZoneInfo = None # type: ignore
# 连续失败到这个数自动停(防僵尸定时任务,DESIGN §8.5 expiry 安全界)
FAILURE_DISABLE_THRESHOLD = 5
# 单次 tick 最多认领多少 job(防一批同点任务一次性涌入)
CLAIM_LIMIT = 20
def validate_cron(expr: str) -> None:
"""非法 cron 抛 ValueError(给 schedule_create 工具做入参校验)。"""
expr = (expr or "").strip()
if not expr or not croniter.is_valid(expr):
raise ValueError(f"非法 cron 表达式: {expr!r}(需标准 5 段,如 '0 8 * * *')")
def _tzinfo(tz: str):
if ZoneInfo is None:
return timezone.utc
try:
return ZoneInfo(tz or "Asia/Shanghai")
except Exception:
return ZoneInfo("Asia/Shanghai")
def compute_next_run(cron: str, tz: str, after: Optional[datetime] = None) -> datetime:
"""按墙钟时区算下一个触发点,返回 UTC-aware datetime。
croniter 保留 base tzinfo base 折算到 job 的本地时区再算,
'0 8 * * *' 就是该时区的早上 8 ,而非 UTC 8 (§8.5 时区坑)
"""
tzinfo = _tzinfo(tz)
base = (after or datetime.now(timezone.utc)).astimezone(tzinfo)
nxt = croniter(cron, base).get_next(datetime)
return nxt.astimezone(timezone.utc)
def _snapshot(job: ScheduledJob) -> dict[str, Any]:
"""把 ORM 行拍成普通 dict,脱离 session 给编排层用(避免跨线程 lazy-load)。"""
return {
"job_id": job.job_id,
"user_id": job.user_id,
"name": job.name,
"prompt": job.prompt,
"cron": job.cron,
"tz": job.tz,
"mode": job.mode,
"bound_task_id": job.bound_task_id,
"skill": job.skill or "",
"model_profile": job.model_profile or "",
"notify": job.notify,
"timeout_seconds": job.timeout_seconds or 0,
}
def claim_due_jobs(now: Optional[datetime] = None, limit: int = CLAIM_LIMIT) -> list[dict[str, Any]]:
"""认领到点 job:一把事务 SELECT due + 推进 next_run_at,返回快照列表。
- 到点判据:enabled AND deleted_at IS NULL AND next_run_at <= now
- 过期(expires_at <= now): enabled=False 跳过,不返回(安全界)
- 其余:next_run_at 推到下一个 cron (防重复触发),返回快照交编排层去跑
"""
now = now or datetime.now(timezone.utc)
claimed: list[dict[str, Any]] = []
with session_scope() as s:
rows = s.execute(
select(ScheduledJob)
.where(
ScheduledJob.enabled.is_(True),
ScheduledJob.deleted_at.is_(None),
ScheduledJob.next_run_at <= now,
)
.order_by(ScheduledJob.next_run_at)
.limit(limit)
.with_for_update(skip_locked=True)
).scalars().all()
for job in rows:
if job.expires_at is not None and job.expires_at <= now:
job.enabled = False
job.last_status = "expired"
continue
claimed.append(_snapshot(job))
try:
job.next_run_at = compute_next_run(job.cron, job.tz, after=now)
except Exception:
# cron 莫名失效(理论上 create 时已校验)→ 停掉别让它卡死循环
job.enabled = False
job.last_status = "error"
job.last_error = "cron 计算失败,已自动停用"
return claimed
def record_result(
job_id: UUID,
*,
status: str,
task_id: Optional[UUID],
error: Optional[str] = None,
) -> None:
"""run 跑完(或 skip)后回写 last_*。ok 重置连续失败计数;error 累加,
到阈值自动停用"""
now = datetime.now(timezone.utc)
with session_scope() as s:
job = s.get(ScheduledJob, job_id)
if job is None:
return
job.last_run_at = now
job.last_status = status
job.last_error = error
if task_id is not None:
job.last_task_id = task_id
if status == "ok":
job.run_count = (job.run_count or 0) + 1
job.consecutive_failures = 0
elif status == "error":
job.run_count = (job.run_count or 0) + 1
job.consecutive_failures = (job.consecutive_failures or 0) + 1
if job.consecutive_failures >= FAILURE_DISABLE_THRESHOLD:
job.enabled = False
job.last_error = (
f"连续失败 {job.consecutive_failures} 次,已自动停用。最后错误: {error}"
)
# status == "skipped"(persistent task 正忙)不动计数,下一 cron 点再来
def build_run_message(snapshot: dict[str, Any]) -> str:
"""把 job.prompt 包成一条带标记的用户消息喂进 agent。"""
when = datetime.now(_tzinfo(snapshot.get("tz") or "Asia/Shanghai")).strftime("%Y-%m-%d %H:%M")
name = snapshot.get("name") or "定时任务"
return (
f"[定时任务「{name}」自动触发 · {when}]\n\n"
f"{snapshot['prompt']}"
)
# ───────────── 第 3 层确定性兜底投递(notify) ─────────────
def _newest_artifact(working_dir: Path) -> Optional[Path]:
"""工作目录里最近修改的普通文件(跳过隐藏 / .preview 缓存)。"""
if not working_dir.is_dir():
return None
best: Optional[Path] = None
best_mtime = -1.0
for p in working_dir.rglob("*"):
if not p.is_file():
continue
if any(part.startswith(".") for part in p.relative_to(working_dir).parts):
continue
try:
m = p.stat().st_mtime
except OSError:
continue
if m > best_mtime:
best, best_mtime = p, m
return best
def deliver_notify(
notify: Optional[dict[str, Any]],
*,
job_name: str,
working_dir: Path,
tz: str,
) -> None:
"""job 配了 notify 就确定性补发(不靠 agent 记性)。目前仅 email 通道:
把工作目录最新产物当附件,套固定模板发无产物则发纯文本告知已执行
阻塞 IO(smtplib),由编排层放进 run_in_executor 失败抛异常,编排层吞掉记日志
"""
if not notify or notify.get("channel") != "email":
return
to = notify.get("to")
if not to:
return
from tools.send_email import send_email_smtp # 延迟导入,避免 core→tools 顶层环依赖
when = datetime.now(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M")
artifact = _newest_artifact(working_dir)
if artifact is not None:
subject = f"[定时任务] {job_name} · {when}"
body = f"定时任务「{job_name}」已于 {when} 执行,产物见附件:{artifact.name}"
send_email_smtp(to, subject, body, [artifact])
else:
subject = f"[定时任务] {job_name} · {when}(无产物文件)"
body = f"定时任务「{job_name}」已于 {when} 执行,本次未产生文件产物。"
send_email_smtp(to, subject, body)
# ───────────── CRUD 服务层(对话工具 + REST 端点共用,DESIGN §8.5)─────────────
#
# tools/schedule.py(对话)与 web/app.py 的 /v1/schedules(前端只读+停用/删除)都调
# 这一层,避免两条创建路径逻辑漂移。函数内自管 session、返回可序列化 dict(脱离
# session,跨 web/工具线程安全);校验失败抛 JobError → 工具转 [Error] 行 / REST 转 4xx。
WEEKDAYS_CN = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
class JobError(ValueError):
"""job 校验 / 找不到 —— 工具转 [Error],REST 转 400/404。"""
def describe_cron(cron: str, tz: str) -> str:
"""常见 cron → 人话(给前端/工具回显);不认的样式退回原 cron 串。"""
parts = (cron or "").split()
if len(parts) != 5:
return cron
mi, ho, dom, mon, dow = parts
hhmm = None
if mi.isdigit() and ho.isdigit():
hhmm = f"{int(ho):02d}:{int(mi):02d}"
if hhmm and dom == "*" and mon == "*" and dow == "*":
return f"每天 {hhmm}"
if hhmm and dom == "*" and mon == "*" and dow.isdigit():
wd = int(dow) % 7 # cron 0/7=周日;映射到 WEEKDAYS_CN(0=周一)
idx = 6 if wd == 0 else wd - 1
return f"{WEEKDAYS_CN[idx]} {hhmm}"
if hhmm and dom.isdigit() and mon == "*" and dow == "*":
return f"每月 {int(dom)}{hhmm}"
if mi.startswith("*/") and ho == "*" and dom == "*":
return f"{mi[2:]} 分钟"
if mi.isdigit() and ho.startswith("*/") and dom == "*":
return f"{ho[2:]} 小时(第 {int(mi)} 分)"
return cron
def job_to_dict(job: ScheduledJob) -> dict[str, Any]:
"""ORM 行 → API/工具用的可序列化快照。"""
def _iso(dt: Optional[datetime]) -> Optional[str]:
if dt is None:
return None
return (dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)).isoformat()
return {
"job_id": str(job.job_id),
"short_id": str(job.job_id)[:8],
"name": job.name,
"prompt": job.prompt,
"cron": job.cron,
"schedule_desc": describe_cron(job.cron, job.tz),
"tz": job.tz,
"mode": job.mode,
"skill": job.skill or "",
"model_profile": job.model_profile or "",
"notify": job.notify,
"enabled": job.enabled,
"timeout_seconds": job.timeout_seconds or 0,
"next_run_at": _iso(job.next_run_at),
"last_run_at": _iso(job.last_run_at),
"last_status": job.last_status,
"last_error": job.last_error,
"last_task_id": str(job.last_task_id) if job.last_task_id else None,
"consecutive_failures": job.consecutive_failures or 0,
"run_count": job.run_count or 0,
"expires_at": _iso(job.expires_at),
"created_at": _iso(job.created_at),
}
def _validate_tz(tz: str) -> str:
tz = (tz or "Asia/Shanghai").strip() or "Asia/Shanghai"
if ZoneInfo is not None:
try:
ZoneInfo(tz)
except Exception:
raise JobError(f"未知时区: {tz!r}(用 IANA 名,如 'Asia/Shanghai')")
return tz
def list_jobs(user_id: UUID) -> list[dict[str, Any]]:
with session_scope() as s:
rows = s.execute(
select(ScheduledJob)
.where(ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None))
.order_by(ScheduledJob.created_at.desc())
).scalars().all()
return [job_to_dict(j) for j in rows]
def create_job(
user_id: UUID,
*,
name: str,
prompt: str,
cron: str,
tz: str = "Asia/Shanghai",
mode: str = "isolated",
skill: str = "",
notify: Optional[dict[str, Any]] = None,
model_profile: str = "",
timeout_seconds: int = 0,
) -> dict[str, Any]:
name = (name or "").strip()
prompt = (prompt or "").strip()
cron = (cron or "").strip()
mode = (mode or "isolated").strip().lower()
if not name:
raise JobError("name 不能为空")
if not prompt:
raise JobError("prompt 不能为空")
if mode not in _MODES:
raise JobError(f"mode 必须是 {_MODES} 之一")
validate_cron(cron)
tz = _validate_tz(tz)
next_run = compute_next_run(cron, tz)
job = ScheduledJob(
job_id=uuid4(), user_id=user_id, name=name, prompt=prompt, cron=cron, tz=tz,
mode=mode, skill=(skill or "").strip(), notify=notify,
model_profile=(model_profile or "").strip(),
timeout_seconds=int(timeout_seconds or 0), next_run_at=next_run,
)
with session_scope() as s:
s.add(job)
s.flush()
return job_to_dict(job)
def _resolve(s, user_id: UUID, id_str: str) -> ScheduledJob:
"""按完整 UUID 或短 id 前缀定位当前用户的 job;0 或多匹配抛 JobError。"""
jid = (id_str or "").strip()
if not jid:
raise JobError("job_id 不能为空")
rows = s.execute(
select(ScheduledJob).where(
ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None)
)
).scalars().all()
matches = [j for j in rows if str(j.job_id) == jid or str(j.job_id).startswith(jid)]
if not matches:
raise JobError(f"没找到 id 以 {jid!r} 开头的定时任务")
if len(matches) > 1:
ids = ", ".join(str(j.job_id)[:8] for j in matches)
raise JobError(f"id 前缀 {jid!r} 匹配到多个({ids}),请用更长的 id")
return matches[0]
_EDITABLE = {"name", "prompt", "cron", "tz", "mode", "skill", "notify", "model_profile",
"timeout_seconds", "enabled"}
def update_job(user_id: UUID, id_str: str, **fields: Any) -> dict[str, Any]:
"""改 job 的任意可编辑字段;改了 cron/tz 则重算 next_run_at。"""
fields = {k: v for k, v in fields.items() if k in _EDITABLE and v is not None}
if not fields:
raise JobError("没有可更新的字段")
if "mode" in fields:
fields["mode"] = str(fields["mode"]).strip().lower()
if fields["mode"] not in _MODES:
raise JobError(f"mode 必须是 {_MODES} 之一")
if "cron" in fields:
validate_cron(str(fields["cron"]).strip())
fields["cron"] = str(fields["cron"]).strip()
if "tz" in fields:
fields["tz"] = _validate_tz(str(fields["tz"]))
with session_scope() as s:
job = _resolve(s, user_id, id_str)
for k, v in fields.items():
setattr(job, k, v)
if "cron" in fields or "tz" in fields:
job.next_run_at = compute_next_run(job.cron, job.tz)
# 重新启用 / 改了排程 → 清零连续失败计数,给它干净的重新开始
if fields.get("enabled") is True or "cron" in fields:
job.consecutive_failures = 0
s.flush()
return job_to_dict(job)
def set_enabled(user_id: UUID, id_str: str, enabled: bool) -> dict[str, Any]:
return update_job(user_id, id_str, enabled=bool(enabled))
def cancel_job(user_id: UUID, id_str: str) -> dict[str, Any]:
"""软删(deleted_at + enabled=False)。"""
with session_scope() as s:
job = _resolve(s, user_id, id_str)
job.deleted_at = datetime.now(timezone.utc)
job.enabled = False
s.flush()
return job_to_dict(job)

View File

@ -21,6 +21,7 @@ from uuid import UUID, uuid4
from sqlalchemy import (
BigInteger,
Boolean,
DateTime,
ForeignKey,
Integer,
@ -171,3 +172,56 @@ class UserDiskUsage(Base):
)
class ScheduledJob(Base):
"""定时任务(0011,DESIGN §8.5)。
一行 = 一个"到点把 prompt 喂进 agent 主管线"的计划本体 = cron+tz(何时)
+ prompt(做什么)+ mode(跑在哪);"发邮件"不是字段, agent prompt
send_email 的动作 notify(可空 JSONB)"必达某邮箱"留确定性兜底
守护循环(web/app.py lifespan `_scheduler_loop`,仿 _disk_scanner) ~30s
`enabled AND deleted_at IS NULL AND next_run_at<=now()`,命中即复用 _run_agent_bg
run,跑完回写 last_* + croniter next_run_atmode:
- isolated(默认):每次新建临时 task,只带本 job prompt,不继承历史 token
- persistent:绑定 bound_task_id 常驻 task,追加消息有跨天连续性
"""
__tablename__ = "scheduled_jobs"
job_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4)
user_id: Mapped[UUID] = mapped_column(
PG_UUID(as_uuid=True), ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False
)
name: Mapped[str] = mapped_column(Text, nullable=False)
prompt: Mapped[str] = mapped_column(Text, nullable=False)
cron: Mapped[str] = mapped_column(Text, nullable=False) # 标准 5 段 cron
tz: Mapped[str] = mapped_column(Text, nullable=False, server_default="Asia/Shanghai")
mode: Mapped[str] = mapped_column(Text, nullable=False, server_default="isolated") # isolated|persistent
# persistent 模式绑定的常驻 task;task 软删/物理删后 SET NULL(下次触发当 isolated 兜底)
bound_task_id: Mapped[Optional[UUID]] = mapped_column(
PG_UUID(as_uuid=True), ForeignKey("tasks.task_id", ondelete="SET NULL"), nullable=True
)
skill: Mapped[str] = mapped_column(Text, nullable=False, server_default="") # 可选预载 skill
model_profile: Mapped[str] = mapped_column(Text, nullable=False, server_default="") # 可选模型覆盖
# 第 3 层可靠投递:{"channel":"email","to":"a@b.com"};NULL=不兜底(走 prompt 驱动/线程未读)
notify: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB, nullable=True)
enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="true")
timeout_seconds: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0") # 0=不限
next_run_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
last_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
last_status: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # ok|error|skipped
last_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
last_task_id: Mapped[Optional[UUID]] = mapped_column(PG_UUID(as_uuid=True), nullable=True)
consecutive_failures: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0")
run_count: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0")
expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True
)

View File

@ -0,0 +1,70 @@
"""scheduled_jobs 表(定时任务,DESIGN §8.5).
Revision ID: 0011
Revises: 0010
Create Date: 2026-06-18
新增独立表 scheduled_jobs 不碰现有 schema(公测兼容)一行 = 一个"到点把
prompt 喂进 agent 主管线"的计划。守护循环(web/app.py lifespan)按 (enabled,
next_run_at) 索引扫到点 job 触发 DESIGN §8.5 / core/storage/models.py
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB, UUID as PG_UUID
revision: str = "0011"
down_revision: Union[str, None] = "0010"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"scheduled_jobs",
sa.Column("job_id", PG_UUID(as_uuid=True), primary_key=True),
sa.Column(
"user_id", PG_UUID(as_uuid=True),
sa.ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False,
),
sa.Column("name", sa.Text(), nullable=False),
sa.Column("prompt", sa.Text(), nullable=False),
sa.Column("cron", sa.Text(), nullable=False),
sa.Column("tz", sa.Text(), nullable=False, server_default="Asia/Shanghai"),
sa.Column("mode", sa.Text(), nullable=False, server_default="isolated"),
sa.Column(
"bound_task_id", PG_UUID(as_uuid=True),
sa.ForeignKey("tasks.task_id", ondelete="SET NULL"), nullable=True,
),
sa.Column("skill", sa.Text(), nullable=False, server_default=""),
sa.Column("model_profile", sa.Text(), nullable=False, server_default=""),
sa.Column("notify", JSONB(), nullable=True),
sa.Column("enabled", sa.Boolean(), nullable=False, server_default="true"),
sa.Column("timeout_seconds", sa.Integer(), nullable=False, server_default="0"),
sa.Column("next_run_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_status", sa.Text(), nullable=True),
sa.Column("last_error", sa.Text(), nullable=True),
sa.Column("last_task_id", PG_UUID(as_uuid=True), nullable=True),
sa.Column("consecutive_failures", sa.Integer(), nullable=False, server_default="0"),
sa.Column("run_count", sa.Integer(), nullable=False, server_default="0"),
sa.Column("expires_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True),
)
# 守护循环 due 扫描热路径:WHERE enabled AND deleted_at IS NULL AND next_run_at<=now()
op.create_index(
"ix_scheduled_jobs_due", "scheduled_jobs", ["enabled", "next_run_at"],
)
# 用户列出自己的 job
op.create_index(
"ix_scheduled_jobs_user", "scheduled_jobs", ["user_id"],
)
def downgrade() -> None:
op.drop_index("ix_scheduled_jobs_user", table_name="scheduled_jobs")
op.drop_index("ix_scheduled_jobs_due", table_name="scheduled_jobs")
op.drop_table("scheduled_jobs")

View File

@ -15,6 +15,9 @@ markitdown[pdf,docx,pptx,xlsx]>=0.0.1
httpx>=0.27.0
html2text>=2024.0
# 定时任务(§8.5 scheduled_jobs):cron 串 → next_run_at 计算,正确处理 dom/dow OR 语义 + 时区
croniter>=2.0
# §7 B 阶段: Storage 落 PG
sqlalchemy>=2.0.0
psycopg[binary]>=3.1.0

202
tools/schedule.py Normal file
View File

@ -0,0 +1,202 @@
"""定时任务对话工具(DESIGN §8.5 对话端)。
create / list / update / cancel 对话端的完整 CRUDhost-side, user_id 隔离
(ctor 注入,不信模型传的 id)增删改查逻辑全在 core.scheduler 服务层,本文件只做
schema + 把结果/JobError 转成给模型看的文本前端只读视图走 /v1/schedules REST,
同样调那一层 两条路径不漂移
定时 run 内这几个工具不注册(防任务造任务,§8.5)
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from core import scheduler
from core.scheduler import JobError, _tzinfo
from .base import Tool
_MODES = ("isolated", "persistent")
def _fmt_local_iso(iso: Optional[str], tz: str) -> str:
if not iso:
return ""
dt = datetime.fromisoformat(iso)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M")
class _UserScopedTool(Tool):
"""带 user_id 的 host-side 工具基类。"""
def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None:
super().__init__(base_dir=base_dir, user_root=user_root)
self.user_id = user_id
class ScheduleCreateTool(_UserScopedTool):
name = "schedule_create"
description = (
"Create a recurring scheduled task that runs an instruction automatically on a cron "
"schedule. The instruction (`prompt`) is what an agent will be told to do at each fire "
"time — write it self-contained and repeatable (which skill to use, what to produce, "
"where to deliver). To email a result, say so IN the prompt (the run can call send_email), "
"or set `notify_email` for guaranteed delivery of the latest output. ALWAYS confirm the "
"schedule (read back the cron in plain words + the resolved next run time) with the user "
"before relying on it. Times are wall-clock in `tz`."
)
parameters = {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Short human label, e.g. '每日水泥简报'."},
"prompt": {
"type": "string",
"description": "Self-contained instruction to run each time (e.g. '用 brief skill 生成今日简报并存盘').",
},
"cron": {
"type": "string",
"description": "Standard 5-field cron 'min hour dom month dow'. E.g. '0 8 * * *' = daily 08:00; '0 9 * * 1' = Mondays 09:00.",
},
"tz": {"type": "string", "description": "IANA timezone, default 'Asia/Shanghai'."},
"mode": {
"type": "string",
"enum": list(_MODES),
"description": "isolated (default): fresh task each run, cheap, no cross-run memory. persistent: same task thread, keeps continuity (costs more tokens).",
},
"skill": {"type": "string", "description": "Optional skill name to preload for the run."},
"notify_email": {
"type": "string",
"description": "Optional email for guaranteed delivery of the run's latest artifact (deterministic, not dependent on the agent remembering).",
},
"timeout_seconds": {
"type": "integer",
"description": "Optional hard timeout for each run (0 = no limit).",
},
},
"required": ["name", "prompt", "cron"],
}
def execute(
self, name: str, prompt: str, cron: str, tz: str = "Asia/Shanghai",
mode: str = "isolated", skill: str = "", notify_email: str = "",
timeout_seconds: int = 0,
) -> str:
notify = None
if notify_email and notify_email.strip():
notify = {"channel": "email", "to": notify_email.strip()}
try:
job = scheduler.create_job(
self.user_id, name=name, prompt=prompt, cron=cron, tz=tz, mode=mode,
skill=skill, notify=notify, timeout_seconds=timeout_seconds,
)
except JobError as e:
return f"[Error] {e}"
nf = f";产物将发到 {notify['to']}" if notify else ""
return (
f"[ok] 已创建定时任务「{job['name']}」(id={job['short_id']},mode={job['mode']}{nf})。\n"
f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});"
f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}"
)
class ScheduleListTool(_UserScopedTool):
name = "schedule_list"
description = (
"List the current user's scheduled tasks with id, schedule, next run time and last "
"status. Use when the user asks what scheduled tasks they have."
)
parameters = {"type": "object", "properties": {}}
def execute(self) -> str:
jobs = scheduler.list_jobs(self.user_id)
if not jobs:
return "(没有定时任务)"
lines = ["定时任务列表:"]
for j in jobs:
flag = "" if j["enabled"] else " [已停用]"
nxt = _fmt_local_iso(j["next_run_at"], j["tz"])
lines.append(
f"- {j['short_id']}{j['name']}{flag} | {j['schedule_desc']} | "
f"{j['mode']} | 下次 {nxt} | 上次 {j['last_status'] or ''}"
)
return "\n".join(lines)
class ScheduleUpdateTool(_UserScopedTool):
name = "schedule_update"
description = (
"Edit an existing scheduled task by id (8-char short id from schedule_list is accepted). "
"Pass ONLY the fields to change. Use to reschedule (cron), rewrite the instruction "
"(prompt), pause/resume (enabled), change delivery (notify_email), etc. Changing cron/tz "
"recomputes the next run time. Confirm the change (read back new schedule) with the user."
)
parameters = {
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."},
"name": {"type": "string", "description": "New label."},
"prompt": {"type": "string", "description": "New instruction to run."},
"cron": {"type": "string", "description": "New 5-field cron expression."},
"tz": {"type": "string", "description": "New IANA timezone."},
"mode": {"type": "string", "enum": list(_MODES), "description": "New session mode."},
"skill": {"type": "string", "description": "New preload skill (empty string to clear)."},
"enabled": {"type": "boolean", "description": "true=resume, false=pause."},
"notify_email": {
"type": "string",
"description": "Set guaranteed-delivery email; empty string clears it.",
},
"timeout_seconds": {"type": "integer", "description": "New per-run timeout (0=none)."},
},
"required": ["job_id"],
}
def execute(
self, job_id: str, name: Optional[str] = None, prompt: Optional[str] = None,
cron: Optional[str] = None, tz: Optional[str] = None, mode: Optional[str] = None,
skill: Optional[str] = None, enabled: Optional[bool] = None,
notify_email: Optional[str] = None, timeout_seconds: Optional[int] = None,
) -> str:
fields: dict = {}
for k, v in (("name", name), ("prompt", prompt), ("cron", cron), ("tz", tz),
("mode", mode), ("skill", skill), ("enabled", enabled),
("timeout_seconds", timeout_seconds)):
if v is not None:
fields[k] = v
# notify_email 是便捷字段 → 转 notify dict;传空串 = 清除兜底投递
if notify_email is not None:
fields["notify"] = {"channel": "email", "to": notify_email.strip()} if notify_email.strip() else {}
try:
job = scheduler.update_job(self.user_id, job_id, **fields)
except JobError as e:
return f"[Error] {e}"
state = "启用" if job["enabled"] else "已停用"
return (
f"[ok] 已更新定时任务「{job['name']}」(id={job['short_id']},{state})。\n"
f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});"
f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}"
)
class ScheduleCancelTool(_UserScopedTool):
name = "schedule_cancel"
description = (
"Cancel (soft-delete) a scheduled task by id (8-char short id accepted). It stops firing "
"immediately. To merely pause (keep it for later), use schedule_update enabled=false instead."
)
parameters = {
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."},
},
"required": ["job_id"],
}
def execute(self, job_id: str) -> str:
try:
job = scheduler.cancel_job(self.user_id, job_id)
except JobError as e:
return f"[Error] {e}"
return f"[ok] 已取消定时任务「{job['name']}」(id={job['short_id']})"

169
tools/send_email.py Normal file
View File

@ -0,0 +1,169 @@
"""发邮件(host-side,DESIGN §8.5 投递第 2/3 层)。
- `send_email_smtp(...)`:纯函数, SMTP_* env 发信SendEmailTool 与定时任务的
确定性兜底投递(core/scheduler.py notify)共用它
- `smtp_configured()`:agent_builder 据此决定挂不挂 tool(沿用"有 key 才注册"范式,
§3.4);没配 SMTP 的部署里 agent 看不到一个永远报错的工具
- `SendEmailTool`:agent (定时或交互)run 里调,附件路径强制落 user_root 内防越界
密钥只在 host 进程读,绝不进沙箱 / run_pythonenv:
SMTP_HOST SMTP_PORT( 465) SMTP_USER SMTP_PASSWORD
SMTP_FROM( SMTP_USER) SMTP_TLS(ssl|starttls|none;默按端口:465ssl 否则 starttls)
"""
from __future__ import annotations
import os
import smtplib
from email.message import EmailMessage
from email.utils import formataddr
from pathlib import Path
from typing import Iterable, Optional
from .base import Tool
_MAX_ATTACH_BYTES = 20 * 1024 * 1024 # 单封附件总上限,防把大产物塞爆 SMTP
_MAX_RECIPIENTS = 10
def smtp_configured() -> bool:
"""最小可发信集合是否齐全。"""
return bool(
os.getenv("SMTP_HOST", "").strip()
and os.getenv("SMTP_USER", "").strip()
and os.getenv("SMTP_PASSWORD", "").strip()
)
def _tls_mode(port: int) -> str:
mode = os.getenv("SMTP_TLS", "").strip().lower()
if mode in ("ssl", "starttls", "none"):
return mode
return "ssl" if port == 465 else "starttls"
def send_email_smtp(
to: Iterable[str] | str,
subject: str,
body: str,
attachments: Optional[Iterable[Path]] = None,
*,
timeout: float = 30.0,
) -> None:
"""同步发一封纯文本邮件(可带附件)。失败抛异常,由调用方决定如何处理。
host-side 调用(SendEmailTool to_thread run 线程里;scheduler
run_in_executor ) smtplib 是阻塞 IO,不要在 asyncio loop 直接 await
"""
if not smtp_configured():
raise RuntimeError("SMTP 未配置(需 SMTP_HOST/SMTP_USER/SMTP_PASSWORD)")
host = os.getenv("SMTP_HOST", "").strip()
port = int(os.getenv("SMTP_PORT", "465").strip() or "465")
user = os.getenv("SMTP_USER", "").strip()
password = os.getenv("SMTP_PASSWORD", "").strip()
sender = os.getenv("SMTP_FROM", "").strip() or user
if isinstance(to, str):
to_list = [to]
else:
to_list = list(to)
to_list = [a.strip() for a in to_list if a and a.strip()]
if not to_list:
raise ValueError("收件人为空")
if len(to_list) > _MAX_RECIPIENTS:
raise ValueError(f"收件人过多(上限 {_MAX_RECIPIENTS})")
msg = EmailMessage()
msg["From"] = formataddr(("zcbot", sender))
msg["To"] = ", ".join(to_list)
msg["Subject"] = subject or "(无主题)"
msg.set_content(body or "")
total = 0
for p in attachments or []:
p = Path(p)
if not p.is_file():
continue
data = p.read_bytes()
total += len(data)
if total > _MAX_ATTACH_BYTES:
raise ValueError(f"附件总大小超过 {_MAX_ATTACH_BYTES // (1024*1024)}MB")
msg.add_attachment(
data, maintype="application", subtype="octet-stream", filename=p.name
)
tls = _tls_mode(port)
if tls == "ssl":
with smtplib.SMTP_SSL(host, port, timeout=timeout) as smtp:
smtp.login(user, password)
smtp.send_message(msg)
else:
with smtplib.SMTP(host, port, timeout=timeout) as smtp:
if tls == "starttls":
smtp.starttls()
smtp.login(user, password)
smtp.send_message(msg)
class SendEmailTool(Tool):
name = "send_email"
description = (
"Send an email (plain text, optional file attachments) via the server's configured "
"SMTP account. Use this when the user asks to email a result/report to someone, or when "
"a scheduled task's instruction says to email its output. Attachments are paths inside "
"the working directory (e.g. 'report.docx' or '<wd>/figures/x.png'). Returns a short "
"confirmation or an [Error] line."
)
parameters = {
"type": "object",
"properties": {
"to": {
"type": "array",
"items": {"type": "string"},
"description": "Recipient email address(es).",
},
"subject": {"type": "string", "description": "Email subject line."},
"body": {"type": "string", "description": "Plain-text email body."},
"attachments": {
"type": "array",
"items": {"type": "string"},
"description": "Optional file paths (relative to working dir) to attach.",
},
},
"required": ["to", "subject", "body"],
}
def execute(
self,
to: list[str] | str,
subject: str,
body: str,
attachments: Optional[list[str]] = None,
) -> str:
if isinstance(to, str):
to = [to]
recipients = [a.strip() for a in (to or []) if isinstance(a, str) and a.strip()]
if not recipients:
return "[Error] to 不能为空"
resolved: list[Path] = []
for raw in attachments or []:
if not isinstance(raw, str) or not raw.strip():
continue
p = self._resolve(raw.strip()).resolve()
# 附件强制落 user_root 内,防 ../ 读到别人/系统文件
if self.user_root is not None:
try:
p.relative_to(self.user_root.resolve())
except ValueError:
return f"[Error] 附件路径越界(必须在工作目录内): {raw}"
if not p.is_file():
return f"[Error] 附件不存在: {raw}"
resolved.append(p)
try:
send_email_smtp(recipients, subject, body, resolved)
except Exception as e:
return f"[Error] 发送失败: {type(e).__name__}: {e}"
n = f"(含 {len(resolved)} 个附件)" if resolved else ""
return f"[ok] 已发送给 {', '.join(recipients)} {n}".strip()

View File

@ -36,13 +36,13 @@ from sqlalchemy import BigInteger, cast, func, select, update
from starlette.background import BackgroundTask
from core import __version__
from core.paths import to_db_path
from core.paths import from_db_path, to_db_path
from core.storage import (
NoSubtaskError,
check_no_subtask,
session_scope,
)
from core.storage.models import Message, Task, UsageEvent
from core.storage.models import Message, ScheduledJob, Task, UsageEvent
from core.storage.utils import ensure_local_task_row
from .auth import (
@ -349,6 +349,7 @@ def _validate_transfer(
def _run_agent_bg(
task_id: UUID, user_id: UUID, user_message: str,
image_variant: str = "", video_variant: str = "",
scheduled: bool = False,
) -> None:
"""工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 tasks.run_status。
@ -372,6 +373,7 @@ def _run_agent_bg(
image_variant=image_variant,
video_variant=video_variant,
cancel_check=cancel_check,
scheduled_run=scheduled,
)
agent.sink = WebEventSink(broker, task_id)
agent.run(user_message)
@ -510,6 +512,12 @@ class TaskPatchRequest(BaseModel):
model_profile: Optional[str] = None # 切模型(c 模式 task 层 / A 粒度 — 下条 send 生效)
class SchedulePatchRequest(BaseModel):
# 前端只读视图仅用 enabled(停用/启用);其余字段留着供"对话改不了时"的兜底直改,
# 但前端不暴露编辑表单(建/改走对话,§8.5)。
enabled: Optional[bool] = None
class MessageRequest(BaseModel):
content: str
# 该条消息触发的生图 / 生视频模型 variant key(config/media/doubao.yaml image/video 段)。
@ -691,6 +699,154 @@ def create_app() -> FastAPI:
stats_logger_task = asyncio.create_task(_stats_logger(), name="stats-logger")
# ── 定时任务守护循环(§8.5)── 仿 _disk_scanner 的 plain-asyncio 范式,不引
# APScheduler/Celery。每 ~30s 认领到点 job(claim+advance next_run 防重复触发),
# 复用 _run_agent_bg 起 run,跑完确定性兜底投递 + 回写 last_*。ZCBOT_DISABLE_SCHEDULER=1
# 整体关掉(对照 Claude Code CLAUDE_CODE_DISABLE_CRON)。
scheduler_enabled = os.getenv("ZCBOT_DISABLE_SCHEDULER", "").strip() not in ("1", "true", "yes")
sched_tick = int(os.getenv("ZCBOT_SCHEDULER_TICK_SECONDS", "30") or "30")
sched_sema = asyncio.Semaphore(int(os.getenv("ZCBOT_SCHEDULER_CONCURRENCY", "4") or "4"))
async def _execute_scheduled_job(snap: dict) -> None:
"""认领后跑一个 job:解析目标 task → 抢 run 锁 → _run_agent_bg → 投递 + 记账。"""
from core.agent_builder import (
resolve_workspace, working_dir_from_name, validate_task_name, InvalidTaskName,
)
from core.scheduler import build_run_message, deliver_notify, record_result
from core.storage.utils import ensure_local_task_row
job_id = snap["job_id"]
uid = snap["user_id"]
async with sched_sema:
try:
profile, model_id = _resolve_model_profile(snap.get("model_profile") or "")
ws = resolve_workspace(None, _cfg)
# 目标 task:persistent 用绑定 task(缺则新建并回填);isolated 用稳定 per-job 目录
tid: Optional[UUID] = None
if snap["mode"] == "persistent" and snap.get("bound_task_id"):
tid = snap["bound_task_id"]
# 绑定 task 可能已被删(SET NULL 已处理 None;这里再查实在性)
with session_scope() as s:
exists = s.execute(
select(Task.task_id).where(
Task.task_id == tid, Task.deleted_at.is_(None)
)
).first()
if exists is None:
tid = None
if tid is None:
tid = uuid4()
wd_name = f"scheduled-{str(job_id)[:8]}"
fs_dir = working_dir_from_name(ws, uid, wd_name)
fs_dir.mkdir(parents=True, exist_ok=True)
disp = f"{snap['name']}"
try:
disp = validate_task_name(disp)
except InvalidTaskName:
disp = wd_name # 名字含非法字符 → 退到安全名
ensure_local_task_row(
task_id=tid, name=disp, working_dir=to_db_path(fs_dir),
skill=snap.get("skill") or "", user_id=uid,
model=model_id, model_profile=profile,
description="(定时任务自动创建)",
)
if snap["mode"] == "persistent":
with session_scope() as s:
s.execute(update(ScheduledJob).where(
ScheduledJob.job_id == job_id
).values(bound_task_id=tid))
# 抢 run 锁(同 post_message):busy → 本次跳过,下个 cron 点再来
with session_scope() as s:
row = s.execute(
select(Task.run_status).where(Task.task_id == tid).with_for_update()
).first()
if row is None:
record_result(job_id, status="error", task_id=tid, error="目标 task 不存在")
return
if row.run_status in ("running", "cancelling"):
record_result(job_id, status="skipped", task_id=tid,
error="目标 task 正忙,本次跳过")
print(f"[scheduler] job {str(job_id)[:8]} skipped (task busy)")
return
s.execute(update(Task).where(Task.task_id == tid).values(
run_status="running", run_error=None))
message = build_run_message(snap)
broker.start(tid)
runner = asyncio.create_task(asyncio.to_thread(
_run_agent_bg, tid, uid, message, "", "", True,
))
app.state.inflight[runner] = tid
runner.add_done_callback(lambda t: app.state.inflight.pop(t, None))
timeout = int(snap.get("timeout_seconds") or 0)
if timeout > 0:
done, _pending = await asyncio.wait({runner}, timeout=timeout)
if not done:
broker.request_cancel(tid) # 协作式停;loop 在 chunk 间 poll 到即退
print(f"[scheduler] job {str(job_id)[:8]} timed out ({timeout}s), cancelling")
await runner
else:
await runner
# run 终态:_run_agent_bg 收尾把 run_status 写回 idle(ok)/error
with session_scope() as s:
st = s.execute(
select(Task.run_status, Task.run_error).where(Task.task_id == tid)
).first()
if st is not None and st.run_status == "error":
record_result(job_id, status="error", task_id=tid, error=st.run_error)
print(f"[scheduler] job {str(job_id)[:8]} run error: {st.run_error}")
return
# 第 3 层确定性兜底投递(notify);失败不影响 run 已成功这一事实
if snap.get("notify"):
try:
with session_scope() as s:
wd_db = s.execute(
select(Task.working_dir).where(Task.task_id == tid)
).scalar_one_or_none()
fs_dir = from_db_path(wd_db) if wd_db else ws
await asyncio.get_running_loop().run_in_executor(
None, lambda: deliver_notify(
snap["notify"], job_name=snap["name"],
working_dir=fs_dir, tz=snap["tz"],
)
)
except Exception as e:
print(f"[scheduler] job {str(job_id)[:8]} notify failed: {type(e).__name__}: {e}")
record_result(job_id, status="ok", task_id=tid)
print(f"[scheduler] job {str(job_id)[:8]} '{snap['name']}' done")
except Exception as e:
print(f"[scheduler] job {str(job_id)[:8]} crashed: {type(e).__name__}: {e}")
try:
record_result(job_id, status="error", task_id=None, error=f"{type(e).__name__}: {e}")
except Exception:
pass
async def _scheduler_loop() -> None:
from core.scheduler import claim_due_jobs
loop = asyncio.get_running_loop()
while True:
try:
await asyncio.sleep(sched_tick)
if getattr(app.state, "draining", None) is not None and app.state.draining.is_set():
continue # 关停 drain 期不起新 job
due = await loop.run_in_executor(None, claim_due_jobs)
for snap in due:
asyncio.create_task(_execute_scheduled_job(snap))
if due:
print(f"[scheduler] fired {len(due)} job(s)")
except asyncio.CancelledError:
raise
except Exception as e:
print(f"[scheduler] loop error: {type(e).__name__}: {e}")
scheduler_task = asyncio.create_task(_scheduler_loop(), name="scheduler") if scheduler_enabled else None
if scheduler_enabled:
print(f"[scheduler] enabled (tick={sched_tick}s)")
# Sandbox pool(§7.5):仅当 ZCBOT_SANDBOX_BACKEND=docker 时启用。
# 启动钩子:① init_pool(创建 docker network + pool 实例)② shutdown_all 清
# 前驱孤儿(上次进程留下的 zcbot-sandbox-* 容器,内存 _last_active 为空,
@ -782,6 +938,12 @@ def create_app() -> FastAPI:
await stats_logger_task
except (asyncio.CancelledError, Exception):
pass
if scheduler_task is not None:
scheduler_task.cancel()
try:
await scheduler_task
except (asyncio.CancelledError, Exception):
pass
if sandbox_reaper_task is not None:
sandbox_reaper_task.cancel()
try:
@ -1329,6 +1491,37 @@ def create_app() -> FastAPI:
raise HTTPException(404, f"memory file not found: {filename!r}")
return {"filename": filename, "content": content}
# ───────────── 定时任务(DESIGN §8.5)─────────────
# 前端只读展示 + 停用/删除两个便捷动作;建/改全走对话(schedule_* 工具)。
# 与对话工具共用 core.scheduler 服务层,两条路径不漂移。
@app.get("/v1/schedules", tags=["schedules"])
def list_schedules(user_id: UUID = Depends(require_user)):
"""列当前用户的定时任务(只读)。前端「定时」面板一次拉满。"""
from core import scheduler
return {"results": scheduler.list_jobs(user_id)}
@app.patch("/v1/schedules/{job_id}", tags=["schedules"])
def patch_schedule(
job_id: str, body: SchedulePatchRequest, user_id: UUID = Depends(require_user),
):
"""改定时任务 —— 前端只用来停用/启用(enabled)。其余编辑走对话。"""
from core import scheduler
if body.enabled is None:
raise HTTPException(400, "no fields to update")
try:
return scheduler.set_enabled(user_id, job_id, body.enabled)
except scheduler.JobError as e:
raise HTTPException(404, str(e))
@app.delete("/v1/schedules/{job_id}", status_code=204, tags=["schedules"])
def delete_schedule(job_id: str, user_id: UUID = Depends(require_user)):
"""删定时任务(软删,立即停止触发)。"""
from core import scheduler
try:
scheduler.cancel_job(user_id, job_id)
except scheduler.JobError as e:
raise HTTPException(404, str(e))
@app.delete("/v1/tasks/{task_id}", status_code=204, tags=["tasks"])
def delete_task(task_id: str, user_id: UUID = Depends(require_user)):
"""软删除:置 deleted_at=now(),从任务列表隐藏。

View File

@ -292,6 +292,42 @@
.sk-pane { width: auto; max-height: 26vh; border-right: none; border-bottom: 1px solid var(--border); }
}
/* 定时任务 modal(只读 + 停用/删除,DESIGN §8.5)— 复用 .sk-item/.sk-badge/.sk-empty */
#crons-modal .card {
width: 880px; max-width: 94vw; height: 78vh; max-height: 78vh;
display: flex; flex-direction: column;
}
#crons-modal h3 {
margin: 0; padding: 12px 16px; font-size: 16px;
border-bottom: 1px solid var(--border); display: flex; align-items: center; gap: 8px;
}
#crons-modal h3 .spacer { flex: 1; }
#crons-modal h3 svg { opacity: .85; }
#crons-modal .cr-hint { font-size: 11px; font-weight: 400; color: var(--muted); }
#crons-modal .sk-x {
border: none; background: transparent; font-size: 16px;
cursor: pointer; color: var(--muted); padding: 2px 6px;
}
#cr-cols { flex: 1; display: flex; min-height: 0; }
#cr-list { width: 300px; flex-shrink: 0; overflow: auto; padding: 12px; border-right: 1px solid var(--border); }
#cr-detail { flex: 1; min-width: 0; overflow: auto; padding: 16px 20px; }
.cr-sched { font-size: 12px; color: var(--accent); margin-top: 2px; }
.cr-meta { font-size: 11px; color: var(--muted); margin-top: 3px; }
.cr-st { font-size: 10px; font-weight: 500; border-radius: 8px; padding: 0 6px; white-space: nowrap; }
.cr-st.ok { color: #2a8a4a; border: 1px solid #2a8a4a; }
.cr-st.error { color: #c0392b; border: 1px solid #c0392b; }
.cr-st.paused { color: var(--muted); border: 1px solid var(--border); }
.cr-d-row { display: flex; gap: 8px; padding: 7px 0; border-bottom: 1px solid var(--border); font-size: 13px; }
.cr-d-row .k { width: 84px; flex-shrink: 0; color: var(--muted); }
.cr-d-row .v { flex: 1; min-width: 0; word-break: break-word; white-space: pre-wrap; }
.cr-acts { display: flex; gap: 8px; margin-top: 16px; flex-wrap: wrap; }
@media (max-width: 760px) {
#crons-modal .card { width: 96vw; height: 88vh; max-height: 88vh; }
#cr-cols { flex-direction: column; }
#cr-list { width: auto; max-height: 30vh; border-right: none; border-bottom: 1px solid var(--border); }
#crons-modal .cr-hint { display: none; }
}
/* ───── 记忆查看 modal(只读两栏;改走对话)───── */
#memory-modal { z-index: 112; }
#memory-modal .card {
@ -1230,6 +1266,22 @@
</div>
</div>
<div id="crons-modal" class="modal">
<div class="card">
<h3>
<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><circle cx="12" cy="12" r="9"></circle><path d="M12 7v5l3 2"></path></svg>
<span>定时任务</span>
<span class="spacer"></span>
<span class="cr-hint">新建 / 修改请在对话里说,例如「每天早八点把水泥简报发我邮箱」</span>
<button id="cr-close" class="sk-x" title="关闭"></button>
</h3>
<div id="cr-cols">
<div id="cr-list"><div class="muted" style="padding:8px;">加载中…</div></div>
<div id="cr-detail"><div class="sk-empty">← 选一个定时任务查看详情</div></div>
</div>
</div>
</div>
<!-- ───── 记忆查看 modal(只读;改记忆走对话)───── -->
<div id="memory-modal" class="modal">
<div class="card">
@ -1324,6 +1376,10 @@
<svg viewBox="0 0 24 24" width="15" height="15" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M4 19.5A2.5 2.5 0 0 1 6.5 17H20"></path><path d="M6.5 2H20v20H6.5A2.5 2.5 0 0 1 4 19.5v-15A2.5 2.5 0 0 1 6.5 2z"></path></svg>
<span>记忆</span>
</button>
<button id="hd-crons" title="查看定时任务(建 / 改请在对话里说)">
<svg viewBox="0 0 24 24" width="15" height="15" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><circle cx="12" cy="12" r="9"></circle><path d="M12 7v5l3 2"></path></svg>
<span>定时</span>
</button>
</div>
</div>
<div id="split-left" class="splitter" role="separator" aria-orientation="vertical" title="拖拽调整任务栏宽度"></div>

163
web/static/js/crons.js Normal file
View File

@ -0,0 +1,163 @@
// 定时任务 modal:只读两栏 master-detail(左列表 / 右详情),仅「停用/启用」「删除」
// 两个便捷动作;新建 / 修改全走对话(schedule_* 工具,DESIGN §8.5)。
// 左侧 rail 底部「定时」按钮触发。
// 后端:GET /v1/schedules(列表)、PATCH /v1/schedules/{id}{enabled}(停用/启用)、
// DELETE /v1/schedules/{id}(删除)。建/改无 REST —— 故意只读。
import { $ } from "./dom.js";
import { api } from "./api.js";
import { escapeHtml } from "./format.js";
import { selectTask } from "./chat.js";
const PLACEHOLDER = '<div class="sk-empty">← 选一个定时任务查看详情</div>';
let _jobs = [];
function openCronsModal() {
$("crons-modal").classList.add("show");
$("cr-detail").innerHTML = PLACEHOLDER;
renderList();
}
export function closeCronsModal() {
$("crons-modal").classList.remove("show");
}
// UTC ISO → 浏览器本地短时刻(用户浏览器基本就在 CST,直观)。
function ts(iso) {
if (!iso) return "—";
const d = new Date(iso);
if (isNaN(d)) return "—";
return d.toLocaleString("zh-CN", {
month: "numeric", day: "numeric", hour: "2-digit", minute: "2-digit", hour12: false,
});
}
function statusBadge(j) {
if (!j.enabled) return '<span class="cr-st paused">已停用</span>';
if (j.last_status === "error") return '<span class="cr-st error">上次失败</span>';
if (j.last_status === "ok") return '<span class="cr-st ok">正常</span>';
return '<span class="cr-st paused">待运行</span>';
}
function itemHtml(j) {
return `<div class="sk-item" data-id="${escapeHtml(j.job_id)}">
<div class="sk-name">${escapeHtml(j.name)} ${statusBadge(j)}</div>
<div class="cr-sched">${escapeHtml(j.schedule_desc || j.cron)}</div>
<div class="cr-meta">下次 ${ts(j.next_run_at)} · 上次 ${ts(j.last_run_at)}</div>
</div>`;
}
async function renderList() {
const list = $("cr-list");
list.innerHTML = '<div class="muted" style="padding:8px;">加载中…</div>';
let data;
try {
data = await api("GET", "/v1/schedules");
} catch (e) {
list.innerHTML = `<div class="err" style="padding:8px;">加载失败: ${escapeHtml(e.message)}</div>`;
return;
}
_jobs = data.results || [];
if (!_jobs.length) {
list.innerHTML =
'<div class="muted" style="padding:8px;font-size:12px;">还没有定时任务。' +
'对助手说「每天早上八点用 brief skill 出份简报发我邮箱」即可创建。</div>';
return;
}
const active = _jobs.filter((j) => j.enabled);
const paused = _jobs.filter((j) => !j.enabled);
let html = `<div class="sk-group-title">活跃 (${active.length})</div>`;
html += active.map(itemHtml).join("") ||
'<div class="muted" style="padding:4px 8px;font-size:12px;">(无)</div>';
if (paused.length) {
html += `<div class="sk-group-title" style="margin-top:12px;">已停用 (${paused.length})</div>`;
html += paused.map(itemHtml).join("");
}
list.innerHTML = html;
}
function row(k, v) {
return `<div class="cr-d-row"><span class="k">${k}</span><span class="v">${v}</span></div>`;
}
function selectJob(id, itemEl) {
$("cr-list").querySelectorAll(".sk-item.active").forEach((el) => el.classList.remove("active"));
if (itemEl) itemEl.classList.add("active");
const j = _jobs.find((x) => x.job_id === id);
if (!j) return;
const notify = j.notify && j.notify.to
? `必达邮件 → ${escapeHtml(j.notify.to)}` : "无(结果进任务线程 / 由指令自行投递)";
const modeDesc = j.mode === "persistent" ? "持续(同一任务线程,有连续性)" : "独立(每次新建任务,省 token)";
let rows =
row("排程", `${escapeHtml(j.schedule_desc || "")} <span class="muted">(${escapeHtml(j.cron)} @${escapeHtml(j.tz)})</span>`) +
row("模式", escapeHtml(modeDesc)) +
row("指令", escapeHtml(j.prompt)) +
(j.skill ? row("技能", escapeHtml(j.skill)) : "") +
row("通知", notify) +
row("下次触发", ts(j.next_run_at)) +
row("上次执行", `${ts(j.last_run_at)} · ${escapeHtml(j.last_status || "—")}`);
if (j.last_error) rows += row("上次错误", `<span style="color:#c0392b">${escapeHtml(j.last_error)}</span>`);
rows += row("累计运行", `${j.run_count}` + (j.consecutive_failures ? ` · 连续失败 ${j.consecutive_failures}` : ""));
const openTask = j.last_task_id
? `<button class="small" data-open-task="${escapeHtml(j.last_task_id)}">打开它跑的任务</button>` : "";
$("cr-detail").innerHTML =
`<div class="sk-d-head"><span class="sk-d-name">${escapeHtml(j.name)}</span> ${statusBadge(j)}<span class="spacer"></span></div>` +
rows +
`<div class="cr-acts">
<button class="small" data-toggle="${escapeHtml(j.job_id)}">${j.enabled ? "停用" : "启用"}</button>
<button class="small danger" data-del="${escapeHtml(j.job_id)}">删除</button>
${openTask}
</div>`;
}
// ───── 顶层绑定 ─────
$("hd-crons").onclick = openCronsModal;
$("cr-close").onclick = closeCronsModal;
$("crons-modal").addEventListener("click", (e) => {
if (e.target.id === "crons-modal") closeCronsModal(); // 点遮罩关闭
});
$("cr-list").addEventListener("click", (e) => {
const item = e.target.closest(".sk-item");
if (item) selectJob(item.getAttribute("data-id"), item);
});
$("cr-detail").addEventListener("click", async (e) => {
const toggle = e.target.closest("[data-toggle]");
const del = e.target.closest("[data-del]");
const openTask = e.target.closest("[data-open-task]");
if (openTask) {
closeCronsModal();
selectTask(openTask.getAttribute("data-open-task"));
return;
}
if (toggle) {
const id = toggle.getAttribute("data-toggle");
const j = _jobs.find((x) => x.job_id === id);
toggle.disabled = true;
try {
await api("PATCH", "/v1/schedules/" + encodeURIComponent(id), { enabled: !j.enabled });
await renderList();
selectJob(id);
} catch (err) {
alert("操作失败: " + err.message);
toggle.disabled = false;
}
return;
}
if (del) {
const id = del.getAttribute("data-del");
const j = _jobs.find((x) => x.job_id === id);
if (!confirm(`删除定时任务「${j ? j.name : id}」?不可撤销(停用可保留改用「停用」)。`)) return;
del.disabled = true;
try {
await api("DELETE", "/v1/schedules/" + encodeURIComponent(id));
$("cr-detail").innerHTML = PLACEHOLDER;
await renderList();
} catch (err) {
alert("删除失败: " + err.message);
del.disabled = false;
}
}
});

View File

@ -8,6 +8,7 @@ import { api } from "./api.js";
import { closeChpwModal } from "./auth.js";
import { closeSkillsModal } from "./skills.js";
import { closeMemoryModal } from "./memory.js";
import { closeCronsModal } from "./crons.js";
import { closeFilePreview, closeMiniPreview } from "./preview.js";
import { closeSrcPicker, loadFiles } from "./files.js";
import { loadFolderSuggestions } from "./newtask.js";
@ -87,6 +88,7 @@ document.addEventListener("keydown", (e) => {
if ($("chpw-modal").classList.contains("show")) { closeChpwModal(); return; }
if ($("skills-modal").classList.contains("show")) { closeSkillsModal(); return; }
if ($("memory-modal").classList.contains("show")) { closeMemoryModal(); return; }
if ($("crons-modal").classList.contains("show")) { closeCronsModal(); return; }
if ($("mini-preview-modal").classList.contains("show")) { closeMiniPreview(); return; }
if ($("src-picker-modal").classList.contains("show")) { closeSrcPicker(); return; }
if ($("file-preview-modal").classList.contains("show")) { closeFilePreview(); return; }