From 108351864eb41c65671d739930b04dfc8af7e1a7 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 18 Jun 2026 13:42:31 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=20v1=20=E2=80=94=20=E5=AF=B9=E8=AF=9D=E5=BB=BA/?= =?UTF-8?q?=E7=AE=A1=20+=20=E5=AE=88=E6=8A=A4=E5=BE=AA=E7=8E=AF=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=20+=20=E5=8F=AA=E8=AF=BB=E5=89=8D=E7=AB=AF=20(DESIGN?= =?UTF-8?q?=20=C2=A78.5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 到点把一句自然语言 prompt 喂进 agent 主管线,可跑 skill 出简报 / 发邮件 / 打招呼等。 job 本体 = cron+时区 + prompt + 会话模式;"发邮件"不是字段,是 agent 据 prompt 调 send_email 的动作 → 加任何能力不改 schema。 后端: - scheduled_jobs 表 + migration 0011(独立加表,公测兼容) - core/scheduler.py:croniter 算 next_run(时区+vixie OR 语义)、claim+advance 防重复触发、 失败阈值自停、notify 兜底投递、CRUD 服务层(工具与 REST 共用不漂移) - 守护循环 _scheduler_loop(lifespan,仿 _disk_scanner 的 plain-asyncio,不引 APScheduler/Celery; 复用 _run_agent_bg,抢 run 锁、超时协作 cancel、并发上限) - tools/send_email.py(host-side,SMTP_* 齐才挂) - /v1/schedules GET/PATCH/DELETE 三端点 对话端 = 完整 CRUD:schedule_create/list/update/cancel 四工具(定时 run 内不挂防自我繁殖)。 前端 = 只读 + 停用/删除:左栏 rail「定时」入口 + crons.js 只读 master-detail modal (复用 skills modal 范式);建/改故意只走对话,规避 cron 构建器 UX。 会话模式:isolated(默认,每次新建临时 task 省 token)/ persistent(绑 bound_task_id 续上下文)。 env:SMTP_* / ZCBOT_DISABLE_SCHEDULER / ZCBOT_SCHEDULER_TICK_SECONDS / ZCBOT_SCHEDULER_CONCURRENCY。 已验:migration 上库、CRUD 端到端、3 REST + 4 工具注册、crons.js 语法。 待验:起 web 进程跑一轮真实触发 + 邮件 smoke。bump 0.18.0 → 0.19.0。 Co-Authored-By: Claude Opus 4.8 (1M context) --- DESIGN.md | 53 +++ PROGRESS.md | 12 +- RUN.md | 11 + core/__init__.py | 2 +- core/agent_builder.py | 22 + core/scheduler.py | 405 ++++++++++++++++++ core/storage/models.py | 54 +++ .../20260618_1000_0011_scheduled_jobs.py | 70 +++ requirements.txt | 3 + tools/schedule.py | 202 +++++++++ tools/send_email.py | 169 ++++++++ web/app.py | 197 ++++++++- web/static/dev.html | 56 +++ web/static/js/crons.js | 163 +++++++ web/static/js/main.js | 2 + 15 files changed, 1417 insertions(+), 4 deletions(-) create mode 100644 core/scheduler.py create mode 100644 db/migrations/versions/20260618_1000_0011_scheduled_jobs.py create mode 100644 tools/schedule.py create mode 100644 tools/send_email.py create mode 100644 web/static/js/crons.js diff --git a/DESIGN.md b/DESIGN.md index d5009d1..6b71e1a 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -589,6 +589,59 @@ create index on usage_events (model_profile, created_at); **搁置(成本不抵当前收益)**:gunicorn 无感换版 / broker 外置 Redis / nginx 蓝绿双实例 —— 留到"单机线程池调到头仍不够"或"换版断流成真实投诉"再议(无感换版需先把 broker 外置共享,分析见 RUN.md §B)。 +### 8.5 定时任务 / 计划运行(Scheduled Jobs)(2026-06-18 设计,status=design) + +**缺口**:无任何定时触发机制。但有价值的活很多是**时间驱动**而非事件驱动 —— 每日简报、每周综述、定时拉数据存盘、早安提醒。当前必须有人在对话里手动发消息才跑得起来。诉求:**用对话方式创建**"每天 X 点干 Y"的任务,到点自动跑、结果送达。 + +**业界印证(四源高度收敛)**:OpenClaw `cron-jobs` / Autobot(agent-loop×cron)/ Claude Code routines / geta.team 自建调度器,关键模式一致 —— ① cron 到点**往同一条 agent 主管线注入一条带标记的消息**,不另起执行路径;② 三种会话隔离模式(isolated 默认 / persistent 续上下文 / main 系统事件);③ isolated 运行到期自动 prune;④ 退避重试(transient vs permanent);⑤ per-job 超时;⑥ 投递显式 + runner 兜底(OpenClaw `--announce`);⑦ 5 段 cron + 时区,警惕 dom/dow 同列的 vixie OR 语义坑;⑧ 持久化用 DB,管理三件套(`cron_create/list/delete`);⑨ 对话式自然语言创建即标准做法。 + +**核心洞察(把方案收口到极简)**:定时任务本体 = `什么时候(cron+时区)` + `做什么(一句自然语言 prompt)` + `跑在哪(会话模式)`。**复用现成 agent 主管线**(`web/app.py:_run_agent_bg`,§3.6 / §7.2 同一条 POST /messages 路径),守护循环只负责"到点把一条带 `[定时任务]` 标记的 prompt 喂进去",**不造第二套跑 agent 的逻辑**。 + +> **关键解耦:"发邮件"不是一等公民,是 agent 据 prompt 调工具的一个动作。** job 模型只存 prompt,"做什么 / 结果发哪"全在那句话里(发邮件→调 `send_email`;出简报→`load_skill` 落盘;打招呼→回一句话)。好处:未来加任何能力(telegram / webhook / 落盘 / 调 API)**不改 schema**,只要 agent 有对应工具、prompt 说清楚。 + +**三层投递(没人盯着看 → 结果不能丢)**: +1. **baseline(永远有,零配置)**:定时 run 就是正常 run,结果**必进对应 task 线程**;守护循环跑完给该 task 打**未读/通知标记**,用户下次登录可见。 +2. **opt-in 推送(prompt 驱动)**:要发邮件/(将来)telegram → prompt 里说,agent 调工具发。灵活、能写动态正文。 +3. **可靠兜底(可选结构化 `notify`)**:某 job 要"必达某邮箱、不靠 AI 记性" → job 带 `notify={channel,to}`,守护循环 run 完**确定性补发**最新产物。不填走第 1 层。 + +**会话模式(隔离轴,业界核心设计点)**: +- **isolated(默认)**:每次触发新建临时 task,只带 job 的 prompt + skill,**不继承对话历史**。上下文最小 → 省 token(契合 high-turn 烧 token 治理,§8.2 / [[project_high_turn_token_burn_root_causes]]);临时 task 打标签 + 到期自动归档,防 task 列表被每日任务刷屏。 +- **persistent(可选)**:job 绑定一个常驻 task(`bound_task_id`),每次往同一线程追加消息,有跨天连续性("和昨天比")。代价:线程越长重发历史越多、token 逐日涨 —— 仅在用户明确要连续性时用。 + +**数据模型(新表 `scheduled_jobs`,独立加表不碰现有 schema → 公测兼容)**: +`id, user_id, name, prompt, cron, tz(默 Asia/Shanghai), mode(isolated|persistent), bound_task_id(可空), notify(JSONB 可空), enabled, timeout_seconds, next_run_at, last_run_at, last_status, last_error, last_task_id, consecutive_failures, expires_at(可空), created_at, deleted_at`。Alembic 加表 migration;`usage_events` 复用现成记账(可加 `kind="scheduled"` 自由文本区分,无需 migration)。 + +**守护循环(仿 §8.4 `_disk_scanner`,plain-asyncio)**:lifespan 起一个后台 task,每 ~30s 扫 `enabled AND next_run_at<=now()`;命中即 `asyncio.create_task(asyncio.to_thread(_run_agent_bg, ...))` 复用现成路径,登记到 `app.state.inflight`(随关停 drain 一起收尾)。与**单活 run 锁**(§7.x `run_status` + `SELECT FOR UPDATE`)交互:isolated 每次新 task 天然无冲突;persistent 若绑定 task 正忙 → 跳过本次 + 记 warn,下一个点再来(不排队堆积)。run 完回写 `last_*` + croniter 算 `next_run_at`。 + +**croniter 选型**:存标准 5 段 cron 串 + 时区,`croniter` 算 `next_run_at`。理由:正确处理 dom/dow 同列的 vixie OR 语义和时区折算(手搓极易踩坑,四源都点名这个坑);纯 Python 小依赖。劣选:只支持"每天/每周 HH:MM"自己用 datetime 算 —— 零依赖但遇复杂周期要返工。 + +**可靠性(业界补的,纳入设计)**: +- **退避重试**:transient(限流/网络)指数退避重试(60s→120s→300s),成功重置;permanent(prompt 报错/鉴权)直接失败记 `last_error`。 +- **per-job 超时** `timeout_seconds`:超时复用现成协作式 cancel 信号(§7.x)。 +- **无补跑(no catch-up)**:守护进程宕机期间错过的点**跳到下一个**,不补 N 次(同 Claude Code 语义)。 +- **防自我繁殖**:定时 run 内**禁用 `schedule_create`**(防任务造任务);并发调度数设上限。 +- **expiry 安全界**:`expires_at` + `consecutive_failures` 阈值 → 连续失败 N 次或长期没人管自动停,防僵尸定时任务(同 Claude Code 7 天过期思路)。 + +**对话端(用户要的"对话方式创建")**:核心是 host-side 工具三件套 `schedule_create / schedule_list / schedule_cancel`(写 `scheduled_jobs`,按 `user_id` 隔离,密钥不进沙箱,沿用 §3.4 typed-tool 范式)。自然语言进、自然语言管("我有哪些定时任务""取消那个简报")—— 即 Claude Code 模式(其定时任务纯工具实现,无配套 skill,证明工具单干可跑通)。 +- **工具必须、skill 可选后置**:skill 是 markdown 不能落库,执行器只能是工具;收集字段/`ask_user` 确认这套流程,能力强的模型靠工具自描述 schema 即可走通。故 **v1 纯工具**(schema + 参数描述写好就够),契合 §5 "Less Scaffolding, More Trust" —— 先信模型,跑不好再加脚手架。 +- **skill 真正值钱处不是教填参数(schema 够),而是教写好 `job.prompt`**:job 的 prompt 决定未来**每天**那次 run 的质量,用户随口一句直接存会跑得差;好 prompt 要自包含/可重复/产物位置明确/把发哪存哪写死 —— 模型默认不会,值得一份模板+确认纪律(cron 口径翻译、回读人话确认、默认 isolated 并提示 persistent 代价)去教。**v2 按需补**:实测发现 agent 写的 `job.prompt` 质量差 / 确认流程乱再加;且因调度低频,用按需 `load_skill`(§3.5)而非 always-on prompt 块,避免每轮白烧 token(§8.2)。 +- 三件套用**三个独立工具**(schema 清晰、对齐 Claude Code `CronCreate/List/Delete`),非单工具带 `action` 参数。 + +**取舍(不选)**: +- **不引 APScheduler / Celery**:项目刻意用 plain-asyncio 后台循环(§8.4),调度需求是单机低并发,引调度框架/Redis broker 是过度工程。 +- **不学 geta 用 JSON 文件持久化**:已有 PG + SQLAlchemy + alembic,加表是自然选择(JSON 文件丢状态、无事务、无按 user 查询)。 +- **email 不做成 job 一等字段**:降通用性(见核心解耦);仅留可选 `notify` 兜底。 + +**风险 / 边界 / 待定**: +- **`send_email` 工具仍要建**(`tools/send_email.py`,host-side,仅当 `SMTP_*` env 存在才挂,沿用 §3.4 "有 key 才注册"),让第 2/3 层能用。**待定:SMTP 发信账号**(企业邮箱/QQ/163/Gmail 应用密码)—— 给真实账号走 env,或先占位走沙箱验证链路。 +- **计费归属**:定时 run 计入 job 所属 `user_id` 的 token/配额,`usage_events` 标 `kind="scheduled"` 可审计。 +- **错峰抖动**:多用户同设 8 点 → 按 job-id 加确定性偏移防同一秒打爆 LLM provider(单机低并发,列 nice-to-have 不阻塞 v1)。 +- **待定小项**:可选 `notify` 字段是否 v1 就上(倾向上,零成本兜底);`expires_at` 默认值。 + +**改动面(v1)**:1 张新表 + migration、1 守护循环(lifespan)、4 个 schedule 工具(create/list/**update**/cancel)、1 个 send_email 工具、agent_builder 注册 + 定时 run 内工具裁剪。**v2 按需**:薄 skill(教写 `job.prompt`)。**不动** loop / llm / capabilities / 现有 DB schema。 + +**前端取舍(2026-06-18 定 + 落地):对话端做完整 CRUD,前端只读展示 + 停用/删除。** 前端 SPA 调 `/v1/*` REST、不经 agent → "界面建/改定时任务"必须另开 REST + 表单 + cron 构建器(整套最重的是让科研用户填 cron 的 UX)。既然产品本就是对话式 agent,把建/改/删/查全收到对话(`schedule_*` 工具),**前端退化成只读看板**:`GET /v1/schedules` 列表 + 列表项「停用/删除」两个高频便捷动作(`PATCH`/`DELETE /v1/schedules/{id}`)。好处:cron 构建器 UX 难题直接消失(用户从不在前端填 cron,对 bot 说"每天早九点"由模型翻译);无"前端改了和对话不同步"的状态问题。代价:界面不能新建/编辑(需求低频,且对话更自然)。落地:`web/static/js/crons.js` 只读 master-detail modal(复用 skills modal 范式)+ 左栏 rail「定时」入口;工具与 REST 共用 `core.scheduler` CRUD 服务层不漂移。 + --- ## 附录:DeepSeek V4 关键事实(2026-04-24) diff --git a/PROGRESS.md b/PROGRESS.md index 0e2f1ca..45d5662 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -2,7 +2,7 @@ > 配合 `DESIGN.md`。本文件只记 phase 状态、决策偏差、文件量、下一步。每条 1-2 句:做了啥 + 关键判断;细节查 `git log` / `git diff` / `DESIGN §7.9`。 -最后更新:2026-06-18(新增 brief skill:科研方向简报,三路检索 documents/research/web + 文献计量趋势型简报) +最后更新:2026-06-18(定时任务 v1:scheduled_jobs 表 + plain-asyncio 守护循环 + 3 工具 + send_email,DESIGN §8.5) --- @@ -21,6 +21,16 @@ ## 已完成关键能力 +### 2026-06-18 / 定时任务 v1(scheduled_jobs,DESIGN §8.5) + +- 需求:对话方式建"每天 X 点干 Y"的定时任务(跑 skill 出简报 / 发邮件 / 打招呼皆可)。调研 OpenClaw/Autobot/Claude Code/geta 四源收敛,定方案见 DESIGN §8.5。 +- **核心解耦**:job 本体 = `cron+tz + 一句 prompt + 会话模式`;"发邮件"不是字段,是 agent 据 prompt 调 `send_email` 的动作 → 加任何能力不改 schema。 +- **不引调度框架**:croniter(唯一新依赖)只当 next_run 计算器(正确处理 dom/dow OR 语义 + 时区);"每 30s 醒来扫到点 job"是 plain-asyncio 守护循环,仿 §8.4 `_disk_scanner`,复用 `_run_agent_bg`,不上 APScheduler/Celery。 +- **文件**:`core/storage/models.py` 加 `ScheduledJob` + migration `0011_scheduled_jobs`(独立加表,公测兼容)/ `core/scheduler.py`(cron 数学 + claim+advance 防重复触发 + record_result 失败阈值自停 + notify 兜底投递 + CRUD 服务层 `list/create/update/set_enabled/cancel_job`,工具与 REST 共用)/ `tools/schedule.py`(create/list/**update**/cancel 四件套,薄包装服务层,user_id ctor 注入,定时 run 内不挂防自我繁殖)/ `tools/send_email.py`(host-side,SMTP_* 齐才挂)/ `web/app.py` lifespan `_scheduler_loop` + `_execute_scheduled_job`(认领→抢 run 锁→to_thread 跑→超时协作 cancel→notify→记账)+ `/v1/schedules` GET/PATCH/DELETE 三端点。 +- **对话端 = 完整 CRUD**(建/改/删/查都说着办);**前端 = 只读展示 + 停用/删除两个便捷按钮**(左栏 rail「定时」按钮 → `crons.js` 只读 master-detail modal,复用 skills modal 范式;建/改无 REST、故意只走对话,§8.5)。两条路径共用 `core.scheduler` 服务层不漂移。 +- **会话模式**:isolated(默认,每次新建临时 task `scheduled-` 目录,省 token)/ persistent(绑定 bound_task_id 续上下文)。env:`SMTP_*` / `ZCBOT_DISABLE_SCHEDULER` / `ZCBOT_SCHEDULER_TICK_SECONDS` / `ZCBOT_SCHEDULER_CONCURRENCY`(见 RUN)。已验:migration 上库 0011、CRUD 服务层端到端、3 REST 路由 + 4 工具注册、crons.js 语法。bump 0.18.0 → 0.19.0。 +- **v2 待做**:对话工具教写好 job.prompt 的薄 skill;退避重试(transient/permanent 区分)目前简化为"到下一 cron 点 + 连失败 5 次自停";真机邮件 smoke + 守护循环定时触发的端到端验证(需起 web 进程跑一轮)。 + ### 2026-06-18 / brief skill:科研方向简报 - 需求:用户要"水泥/建材方向的科研简报"。联网调研简报类做法——Anthropic 官方 digest skill(办公活动聚合)+ Paper Digest(论文影响力周报)+ 文献计量趋势报告(热点聚类/新兴方法/地理格局)。结论:现有 skill 缺"某方向近期文献 → 有判断的趋势简报"这一环(research/documents 只取文献不组织、paper-review 出可投稿综述、analyze 拆问题不查文献)。 diff --git a/RUN.md b/RUN.md index baf158e..b51571e 100644 --- a/RUN.md +++ b/RUN.md @@ -44,6 +44,17 @@ # 对话整个 run 期占 1 线程,active_runs 逼近此值即排队(看 journalctl 的 [stats] 行); # 并发不够再调大(先确认是真并发高、而非单条 run 慢)。 # ZCBOT_RUN_MAX_WORKERS=16 + # 定时任务发邮件(send_email tool + 定时任务 notify 兜底投递,DESIGN §8.5):可选。 + # 三者齐了才挂 send_email tool(没配的部署 agent 看不到这个工具);密钥只留宿主、不进 sandbox。 + # SMTP_HOST=smtp.qq.com + # SMTP_PORT=465 # 默 465;465→SSL,其余→STARTTLS(可用 SMTP_TLS=ssl|starttls|none 覆盖) + # SMTP_USER=you@qq.com + # SMTP_PASSWORD=<授权码/应用专用密码,非登录密码> + # SMTP_FROM=you@qq.com # 可选,默认取 SMTP_USER + # 定时任务守护循环(DESIGN §8.5,随 web 进程起,plain-asyncio 仿 _disk_scanner): + # ZCBOT_DISABLE_SCHEDULER=1 # 可选,整体关掉调度(对照 Claude Code CLAUDE_CODE_DISABLE_CRON) + # ZCBOT_SCHEDULER_TICK_SECONDS=30 # 可选,扫描间隔,默 30s + # ZCBOT_SCHEDULER_CONCURRENCY=4 # 可选,并发跑的定时 run 上限,默 4 ``` > litellm 在 import 时副作用加载 .env;入口走 `main.py`,`.env` 自动生效。直跑 `python -c "from core.storage import ..."` 不经 litellm 链路时记得自己 `import litellm` 触发,或手动 `export ZCBOT_DB_URL=...`。 - **依赖**:`pip install -r requirements.txt`(已在 `.venv` 里;含 `bcrypt`)。 diff --git a/core/__init__.py b/core/__init__.py index 7d9f014..9c42e8b 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,3 +1,3 @@ # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # 改版本只动这一行。 -__version__ = "0.18.0" +__version__ = "0.19.0" diff --git a/core/agent_builder.py b/core/agent_builder.py index 8e6ca3c..472289e 100644 --- a/core/agent_builder.py +++ b/core/agent_builder.py @@ -56,6 +56,10 @@ from tools.task_progress import TaskProgressTool from tools.ask_user import AskUserTool from tools.web_fetch import WebFetchTool from tools.web_search import WebSearchTool +from tools.schedule import ( + ScheduleCancelTool, ScheduleCreateTool, ScheduleListTool, ScheduleUpdateTool, +) +from tools.send_email import SendEmailTool, smtp_configured from core.ark_client import ArkConfig from core.bocha_client import BochaConfig @@ -361,6 +365,7 @@ def build_agent( image_variant: str = "", video_variant: str = "", cancel_check: Optional[Callable[[], bool]] = None, + scheduled_run: bool = False, ) -> Tuple[AgentLoop, Session, str, TaskState, Path]: """返回 (agent, session, task_id_str, task_state, working_dir_path)。 @@ -546,6 +551,23 @@ def build_agent( ): tools[t.name] = t + # 定时任务管理(DESIGN §8.5):增删查三件套。**定时 run 内不挂**(防任务造任务, + # 自我繁殖);仅交互对话里能建/管 job。user_id 由 ctor 注入,不信模型传的 id。 + if not scheduled_run: + for t in ( + ScheduleCreateTool(uid, base_dir=tool_base, user_root=ur_path), + ScheduleListTool(uid, base_dir=tool_base, user_root=ur_path), + ScheduleUpdateTool(uid, base_dir=tool_base, user_root=ur_path), + ScheduleCancelTool(uid, base_dir=tool_base, user_root=ur_path), + ): + tools[t.name] = t + + # 发邮件(§8.5 投递):仅当 SMTP_* env 齐了才挂(沿用"有 key 才注册",没配的 + # 部署里 agent 看不到一个永远报错的工具)。定时与交互 run 都可用。 + if smtp_configured(): + se = SendEmailTool(base_dir=tool_base, user_root=ur_path) + tools[se.name] = se + if caps.enable_run_python: rp = RunPythonTool(base_dir=tool_base, user_root=ur_path) tools[rp.name] = rp diff --git a/core/scheduler.py b/core/scheduler.py new file mode 100644 index 0000000..0a28c96 --- /dev/null +++ b/core/scheduler.py @@ -0,0 +1,405 @@ +"""定时任务调度核心(DESIGN §8.5)。 + +纯逻辑层:cron→next_run 计算、due 任务认领、跑完记账、确定性兜底投递。 +**不碰 asyncio / broker / _run_agent_bg**(那些 web 专属编排留在 web/app.py 的 +lifespan `_scheduler_loop`,仿 _disk_scanner 调本模块)。 + +为什么 claim 时就推进 next_run_at:守护循环每 ~30s 扫一次,若不在认领时把 job 的 +next_run_at 推到下一个 cron 点,run 还没跑完时下一 tick 会把同一 job 重复触发。 +claim+advance 一把事务做掉 → 天然防重复触发(at-most-once per slot)。 +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Optional +from uuid import UUID, uuid4 + +from croniter import croniter +from sqlalchemy import select, update + +from .storage import session_scope +from .storage.models import ScheduledJob + +_MODES = ("isolated", "persistent") + +try: + from zoneinfo import ZoneInfo +except ImportError: # pragma: no cover (py<3.9 不支持,本项目 3.11+) + ZoneInfo = None # type: ignore + +# 连续失败到这个数自动停(防僵尸定时任务,DESIGN §8.5 expiry 安全界) +FAILURE_DISABLE_THRESHOLD = 5 +# 单次 tick 最多认领多少 job(防一批同点任务一次性涌入) +CLAIM_LIMIT = 20 + + +def validate_cron(expr: str) -> None: + """非法 cron 抛 ValueError(给 schedule_create 工具做入参校验)。""" + expr = (expr or "").strip() + if not expr or not croniter.is_valid(expr): + raise ValueError(f"非法 cron 表达式: {expr!r}(需标准 5 段,如 '0 8 * * *')") + + +def _tzinfo(tz: str): + if ZoneInfo is None: + return timezone.utc + try: + return ZoneInfo(tz or "Asia/Shanghai") + except Exception: + return ZoneInfo("Asia/Shanghai") + + +def compute_next_run(cron: str, tz: str, after: Optional[datetime] = None) -> datetime: + """按墙钟时区算下一个触发点,返回 UTC-aware datetime。 + + croniter 保留 base 的 tzinfo —— 把 base 折算到 job 的本地时区再算, + '0 8 * * *' 就是该时区的早上 8 点,而非 UTC 8 点(§8.5 时区坑)。 + """ + tzinfo = _tzinfo(tz) + base = (after or datetime.now(timezone.utc)).astimezone(tzinfo) + nxt = croniter(cron, base).get_next(datetime) + return nxt.astimezone(timezone.utc) + + +def _snapshot(job: ScheduledJob) -> dict[str, Any]: + """把 ORM 行拍成普通 dict,脱离 session 给编排层用(避免跨线程 lazy-load)。""" + return { + "job_id": job.job_id, + "user_id": job.user_id, + "name": job.name, + "prompt": job.prompt, + "cron": job.cron, + "tz": job.tz, + "mode": job.mode, + "bound_task_id": job.bound_task_id, + "skill": job.skill or "", + "model_profile": job.model_profile or "", + "notify": job.notify, + "timeout_seconds": job.timeout_seconds or 0, + } + + +def claim_due_jobs(now: Optional[datetime] = None, limit: int = CLAIM_LIMIT) -> list[dict[str, Any]]: + """认领到点 job:一把事务 SELECT due + 推进 next_run_at,返回快照列表。 + + - 到点判据:enabled AND deleted_at IS NULL AND next_run_at <= now + - 过期(expires_at <= now)的:置 enabled=False 跳过,不返回(安全界) + - 其余:next_run_at 推到下一个 cron 点(防重复触发),返回快照交编排层去跑 + """ + now = now or datetime.now(timezone.utc) + claimed: list[dict[str, Any]] = [] + with session_scope() as s: + rows = s.execute( + select(ScheduledJob) + .where( + ScheduledJob.enabled.is_(True), + ScheduledJob.deleted_at.is_(None), + ScheduledJob.next_run_at <= now, + ) + .order_by(ScheduledJob.next_run_at) + .limit(limit) + .with_for_update(skip_locked=True) + ).scalars().all() + for job in rows: + if job.expires_at is not None and job.expires_at <= now: + job.enabled = False + job.last_status = "expired" + continue + claimed.append(_snapshot(job)) + try: + job.next_run_at = compute_next_run(job.cron, job.tz, after=now) + except Exception: + # cron 莫名失效(理论上 create 时已校验)→ 停掉别让它卡死循环 + job.enabled = False + job.last_status = "error" + job.last_error = "cron 计算失败,已自动停用" + return claimed + + +def record_result( + job_id: UUID, + *, + status: str, + task_id: Optional[UUID], + error: Optional[str] = None, +) -> None: + """run 跑完(或 skip)后回写 last_*。ok 重置连续失败计数;error 累加, + 到阈值自动停用。""" + now = datetime.now(timezone.utc) + with session_scope() as s: + job = s.get(ScheduledJob, job_id) + if job is None: + return + job.last_run_at = now + job.last_status = status + job.last_error = error + if task_id is not None: + job.last_task_id = task_id + if status == "ok": + job.run_count = (job.run_count or 0) + 1 + job.consecutive_failures = 0 + elif status == "error": + job.run_count = (job.run_count or 0) + 1 + job.consecutive_failures = (job.consecutive_failures or 0) + 1 + if job.consecutive_failures >= FAILURE_DISABLE_THRESHOLD: + job.enabled = False + job.last_error = ( + f"连续失败 {job.consecutive_failures} 次,已自动停用。最后错误: {error}" + ) + # status == "skipped"(persistent task 正忙)不动计数,下一 cron 点再来 + + +def build_run_message(snapshot: dict[str, Any]) -> str: + """把 job.prompt 包成一条带标记的用户消息喂进 agent。""" + when = datetime.now(_tzinfo(snapshot.get("tz") or "Asia/Shanghai")).strftime("%Y-%m-%d %H:%M") + name = snapshot.get("name") or "定时任务" + return ( + f"[定时任务「{name}」自动触发 · {when}]\n\n" + f"{snapshot['prompt']}" + ) + + +# ───────────── 第 3 层确定性兜底投递(notify) ───────────── + +def _newest_artifact(working_dir: Path) -> Optional[Path]: + """工作目录里最近修改的普通文件(跳过隐藏 / .preview 缓存)。""" + if not working_dir.is_dir(): + return None + best: Optional[Path] = None + best_mtime = -1.0 + for p in working_dir.rglob("*"): + if not p.is_file(): + continue + if any(part.startswith(".") for part in p.relative_to(working_dir).parts): + continue + try: + m = p.stat().st_mtime + except OSError: + continue + if m > best_mtime: + best, best_mtime = p, m + return best + + +def deliver_notify( + notify: Optional[dict[str, Any]], + *, + job_name: str, + working_dir: Path, + tz: str, +) -> None: + """job 配了 notify 就确定性补发(不靠 agent 记性)。目前仅 email 通道: + 把工作目录最新产物当附件,套固定模板发。无产物则发纯文本告知已执行。 + + 阻塞 IO(smtplib),由编排层放进 run_in_executor 调。失败抛异常,编排层吞掉记日志。 + """ + if not notify or notify.get("channel") != "email": + return + to = notify.get("to") + if not to: + return + from tools.send_email import send_email_smtp # 延迟导入,避免 core→tools 顶层环依赖 + + when = datetime.now(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M") + artifact = _newest_artifact(working_dir) + if artifact is not None: + subject = f"[定时任务] {job_name} · {when}" + body = f"定时任务「{job_name}」已于 {when} 执行,产物见附件:{artifact.name}。" + send_email_smtp(to, subject, body, [artifact]) + else: + subject = f"[定时任务] {job_name} · {when}(无产物文件)" + body = f"定时任务「{job_name}」已于 {when} 执行,本次未产生文件产物。" + send_email_smtp(to, subject, body) + + +# ───────────── CRUD 服务层(对话工具 + REST 端点共用,DESIGN §8.5)───────────── +# +# tools/schedule.py(对话)与 web/app.py 的 /v1/schedules(前端只读+停用/删除)都调 +# 这一层,避免两条创建路径逻辑漂移。函数内自管 session、返回可序列化 dict(脱离 +# session,跨 web/工具线程安全);校验失败抛 JobError → 工具转 [Error] 行 / REST 转 4xx。 + +WEEKDAYS_CN = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] + + +class JobError(ValueError): + """job 校验 / 找不到 —— 工具转 [Error],REST 转 400/404。""" + + +def describe_cron(cron: str, tz: str) -> str: + """常见 cron → 人话(给前端/工具回显);不认的样式退回原 cron 串。""" + parts = (cron or "").split() + if len(parts) != 5: + return cron + mi, ho, dom, mon, dow = parts + hhmm = None + if mi.isdigit() and ho.isdigit(): + hhmm = f"{int(ho):02d}:{int(mi):02d}" + if hhmm and dom == "*" and mon == "*" and dow == "*": + return f"每天 {hhmm}" + if hhmm and dom == "*" and mon == "*" and dow.isdigit(): + wd = int(dow) % 7 # cron 0/7=周日;映射到 WEEKDAYS_CN(0=周一) + idx = 6 if wd == 0 else wd - 1 + return f"每{WEEKDAYS_CN[idx]} {hhmm}" + if hhmm and dom.isdigit() and mon == "*" and dow == "*": + return f"每月 {int(dom)} 号 {hhmm}" + if mi.startswith("*/") and ho == "*" and dom == "*": + return f"每 {mi[2:]} 分钟" + if mi.isdigit() and ho.startswith("*/") and dom == "*": + return f"每 {ho[2:]} 小时(第 {int(mi)} 分)" + return cron + + +def job_to_dict(job: ScheduledJob) -> dict[str, Any]: + """ORM 行 → API/工具用的可序列化快照。""" + def _iso(dt: Optional[datetime]) -> Optional[str]: + if dt is None: + return None + return (dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)).isoformat() + + return { + "job_id": str(job.job_id), + "short_id": str(job.job_id)[:8], + "name": job.name, + "prompt": job.prompt, + "cron": job.cron, + "schedule_desc": describe_cron(job.cron, job.tz), + "tz": job.tz, + "mode": job.mode, + "skill": job.skill or "", + "model_profile": job.model_profile or "", + "notify": job.notify, + "enabled": job.enabled, + "timeout_seconds": job.timeout_seconds or 0, + "next_run_at": _iso(job.next_run_at), + "last_run_at": _iso(job.last_run_at), + "last_status": job.last_status, + "last_error": job.last_error, + "last_task_id": str(job.last_task_id) if job.last_task_id else None, + "consecutive_failures": job.consecutive_failures or 0, + "run_count": job.run_count or 0, + "expires_at": _iso(job.expires_at), + "created_at": _iso(job.created_at), + } + + +def _validate_tz(tz: str) -> str: + tz = (tz or "Asia/Shanghai").strip() or "Asia/Shanghai" + if ZoneInfo is not None: + try: + ZoneInfo(tz) + except Exception: + raise JobError(f"未知时区: {tz!r}(用 IANA 名,如 'Asia/Shanghai')") + return tz + + +def list_jobs(user_id: UUID) -> list[dict[str, Any]]: + with session_scope() as s: + rows = s.execute( + select(ScheduledJob) + .where(ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None)) + .order_by(ScheduledJob.created_at.desc()) + ).scalars().all() + return [job_to_dict(j) for j in rows] + + +def create_job( + user_id: UUID, + *, + name: str, + prompt: str, + cron: str, + tz: str = "Asia/Shanghai", + mode: str = "isolated", + skill: str = "", + notify: Optional[dict[str, Any]] = None, + model_profile: str = "", + timeout_seconds: int = 0, +) -> dict[str, Any]: + name = (name or "").strip() + prompt = (prompt or "").strip() + cron = (cron or "").strip() + mode = (mode or "isolated").strip().lower() + if not name: + raise JobError("name 不能为空") + if not prompt: + raise JobError("prompt 不能为空") + if mode not in _MODES: + raise JobError(f"mode 必须是 {_MODES} 之一") + validate_cron(cron) + tz = _validate_tz(tz) + next_run = compute_next_run(cron, tz) + job = ScheduledJob( + job_id=uuid4(), user_id=user_id, name=name, prompt=prompt, cron=cron, tz=tz, + mode=mode, skill=(skill or "").strip(), notify=notify, + model_profile=(model_profile or "").strip(), + timeout_seconds=int(timeout_seconds or 0), next_run_at=next_run, + ) + with session_scope() as s: + s.add(job) + s.flush() + return job_to_dict(job) + + +def _resolve(s, user_id: UUID, id_str: str) -> ScheduledJob: + """按完整 UUID 或短 id 前缀定位当前用户的 job;0 或多匹配抛 JobError。""" + jid = (id_str or "").strip() + if not jid: + raise JobError("job_id 不能为空") + rows = s.execute( + select(ScheduledJob).where( + ScheduledJob.user_id == user_id, ScheduledJob.deleted_at.is_(None) + ) + ).scalars().all() + matches = [j for j in rows if str(j.job_id) == jid or str(j.job_id).startswith(jid)] + if not matches: + raise JobError(f"没找到 id 以 {jid!r} 开头的定时任务") + if len(matches) > 1: + ids = ", ".join(str(j.job_id)[:8] for j in matches) + raise JobError(f"id 前缀 {jid!r} 匹配到多个({ids}),请用更长的 id") + return matches[0] + + +_EDITABLE = {"name", "prompt", "cron", "tz", "mode", "skill", "notify", "model_profile", + "timeout_seconds", "enabled"} + + +def update_job(user_id: UUID, id_str: str, **fields: Any) -> dict[str, Any]: + """改 job 的任意可编辑字段;改了 cron/tz 则重算 next_run_at。""" + fields = {k: v for k, v in fields.items() if k in _EDITABLE and v is not None} + if not fields: + raise JobError("没有可更新的字段") + if "mode" in fields: + fields["mode"] = str(fields["mode"]).strip().lower() + if fields["mode"] not in _MODES: + raise JobError(f"mode 必须是 {_MODES} 之一") + if "cron" in fields: + validate_cron(str(fields["cron"]).strip()) + fields["cron"] = str(fields["cron"]).strip() + if "tz" in fields: + fields["tz"] = _validate_tz(str(fields["tz"])) + with session_scope() as s: + job = _resolve(s, user_id, id_str) + for k, v in fields.items(): + setattr(job, k, v) + if "cron" in fields or "tz" in fields: + job.next_run_at = compute_next_run(job.cron, job.tz) + # 重新启用 / 改了排程 → 清零连续失败计数,给它干净的重新开始 + if fields.get("enabled") is True or "cron" in fields: + job.consecutive_failures = 0 + s.flush() + return job_to_dict(job) + + +def set_enabled(user_id: UUID, id_str: str, enabled: bool) -> dict[str, Any]: + return update_job(user_id, id_str, enabled=bool(enabled)) + + +def cancel_job(user_id: UUID, id_str: str) -> dict[str, Any]: + """软删(deleted_at + enabled=False)。""" + with session_scope() as s: + job = _resolve(s, user_id, id_str) + job.deleted_at = datetime.now(timezone.utc) + job.enabled = False + s.flush() + return job_to_dict(job) diff --git a/core/storage/models.py b/core/storage/models.py index 485f7e1..99fb173 100644 --- a/core/storage/models.py +++ b/core/storage/models.py @@ -21,6 +21,7 @@ from uuid import UUID, uuid4 from sqlalchemy import ( BigInteger, + Boolean, DateTime, ForeignKey, Integer, @@ -171,3 +172,56 @@ class UserDiskUsage(Base): ) +class ScheduledJob(Base): + """定时任务(0011,DESIGN §8.5)。 + + 一行 = 一个"到点把 prompt 喂进 agent 主管线"的计划。本体 = cron+tz(何时) + + prompt(做什么)+ mode(跑在哪);"发邮件"不是字段,是 agent 据 prompt 调 + send_email 的动作。仅 notify(可空 JSONB)给"必达某邮箱"留确定性兜底。 + + 守护循环(web/app.py lifespan `_scheduler_loop`,仿 _disk_scanner)每 ~30s 扫 + `enabled AND deleted_at IS NULL AND next_run_at<=now()`,命中即复用 _run_agent_bg + 起 run,跑完回写 last_* + croniter 算 next_run_at。mode: + - isolated(默认):每次新建临时 task,只带本 job 的 prompt,不继承历史 → 省 token + - persistent:绑定 bound_task_id 常驻 task,追加消息有跨天连续性 + """ + + __tablename__ = "scheduled_jobs" + + job_id: Mapped[UUID] = mapped_column(PG_UUID(as_uuid=True), primary_key=True, default=uuid4) + user_id: Mapped[UUID] = mapped_column( + PG_UUID(as_uuid=True), ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False + ) + name: Mapped[str] = mapped_column(Text, nullable=False) + prompt: Mapped[str] = mapped_column(Text, nullable=False) + cron: Mapped[str] = mapped_column(Text, nullable=False) # 标准 5 段 cron + tz: Mapped[str] = mapped_column(Text, nullable=False, server_default="Asia/Shanghai") + mode: Mapped[str] = mapped_column(Text, nullable=False, server_default="isolated") # isolated|persistent + # persistent 模式绑定的常驻 task;task 软删/物理删后 SET NULL(下次触发当 isolated 兜底) + bound_task_id: Mapped[Optional[UUID]] = mapped_column( + PG_UUID(as_uuid=True), ForeignKey("tasks.task_id", ondelete="SET NULL"), nullable=True + ) + skill: Mapped[str] = mapped_column(Text, nullable=False, server_default="") # 可选预载 skill + model_profile: Mapped[str] = mapped_column(Text, nullable=False, server_default="") # 可选模型覆盖 + # 第 3 层可靠投递:{"channel":"email","to":"a@b.com"};NULL=不兜底(走 prompt 驱动/线程未读) + notify: Mapped[Optional[dict[str, Any]]] = mapped_column(JSONB, nullable=True) + enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="true") + timeout_seconds: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0") # 0=不限 + + next_run_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + last_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True) + last_status: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # ok|error|skipped + last_error: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + last_task_id: Mapped[Optional[UUID]] = mapped_column(PG_UUID(as_uuid=True), nullable=True) + consecutive_failures: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0") + run_count: Mapped[int] = mapped_column(Integer, nullable=False, server_default="0") + expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + deleted_at: Mapped[Optional[datetime]] = mapped_column( + DateTime(timezone=True), nullable=True + ) + + diff --git a/db/migrations/versions/20260618_1000_0011_scheduled_jobs.py b/db/migrations/versions/20260618_1000_0011_scheduled_jobs.py new file mode 100644 index 0000000..181a5fd --- /dev/null +++ b/db/migrations/versions/20260618_1000_0011_scheduled_jobs.py @@ -0,0 +1,70 @@ +"""scheduled_jobs 表(定时任务,DESIGN §8.5). + +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-06-18 + +新增独立表 scheduled_jobs —— 不碰现有 schema(公测兼容)。一行 = 一个"到点把 +prompt 喂进 agent 主管线"的计划。守护循环(web/app.py lifespan)按 (enabled, +next_run_at) 索引扫到点 job 触发。详 DESIGN §8.5 / core/storage/models.py。 +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import JSONB, UUID as PG_UUID + + +revision: str = "0011" +down_revision: Union[str, None] = "0010" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "scheduled_jobs", + sa.Column("job_id", PG_UUID(as_uuid=True), primary_key=True), + sa.Column( + "user_id", PG_UUID(as_uuid=True), + sa.ForeignKey("users.user_id", ondelete="CASCADE"), nullable=False, + ), + sa.Column("name", sa.Text(), nullable=False), + sa.Column("prompt", sa.Text(), nullable=False), + sa.Column("cron", sa.Text(), nullable=False), + sa.Column("tz", sa.Text(), nullable=False, server_default="Asia/Shanghai"), + sa.Column("mode", sa.Text(), nullable=False, server_default="isolated"), + sa.Column( + "bound_task_id", PG_UUID(as_uuid=True), + sa.ForeignKey("tasks.task_id", ondelete="SET NULL"), nullable=True, + ), + sa.Column("skill", sa.Text(), nullable=False, server_default=""), + sa.Column("model_profile", sa.Text(), nullable=False, server_default=""), + sa.Column("notify", JSONB(), nullable=True), + sa.Column("enabled", sa.Boolean(), nullable=False, server_default="true"), + sa.Column("timeout_seconds", sa.Integer(), nullable=False, server_default="0"), + sa.Column("next_run_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("last_status", sa.Text(), nullable=True), + sa.Column("last_error", sa.Text(), nullable=True), + sa.Column("last_task_id", PG_UUID(as_uuid=True), nullable=True), + sa.Column("consecutive_failures", sa.Integer(), nullable=False, server_default="0"), + sa.Column("run_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("expires_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True), + ) + # 守护循环 due 扫描热路径:WHERE enabled AND deleted_at IS NULL AND next_run_at<=now() + op.create_index( + "ix_scheduled_jobs_due", "scheduled_jobs", ["enabled", "next_run_at"], + ) + # 用户列出自己的 job + op.create_index( + "ix_scheduled_jobs_user", "scheduled_jobs", ["user_id"], + ) + + +def downgrade() -> None: + op.drop_index("ix_scheduled_jobs_user", table_name="scheduled_jobs") + op.drop_index("ix_scheduled_jobs_due", table_name="scheduled_jobs") + op.drop_table("scheduled_jobs") diff --git a/requirements.txt b/requirements.txt index 3e68bd6..1b2eae1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,9 @@ markitdown[pdf,docx,pptx,xlsx]>=0.0.1 httpx>=0.27.0 html2text>=2024.0 +# 定时任务(§8.5 scheduled_jobs):cron 串 → next_run_at 计算,正确处理 dom/dow OR 语义 + 时区 +croniter>=2.0 + # §7 B 阶段: Storage 落 PG sqlalchemy>=2.0.0 psycopg[binary]>=3.1.0 diff --git a/tools/schedule.py b/tools/schedule.py new file mode 100644 index 0000000..6961125 --- /dev/null +++ b/tools/schedule.py @@ -0,0 +1,202 @@ +"""定时任务对话工具(DESIGN §8.5 对话端)。 + +create / list / update / cancel —— 对话端的完整 CRUD。host-side,按 user_id 隔离 +(ctor 注入,不信模型传的 id)。增删改查逻辑全在 core.scheduler 服务层,本文件只做 +schema + 把结果/JobError 转成给模型看的文本。前端只读视图走 /v1/schedules REST, +同样调那一层 → 两条路径不漂移。 + +定时 run 内这几个工具不注册(防任务造任务,§8.5)。 +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from core import scheduler +from core.scheduler import JobError, _tzinfo +from .base import Tool + +_MODES = ("isolated", "persistent") + + +def _fmt_local_iso(iso: Optional[str], tz: str) -> str: + if not iso: + return "—" + dt = datetime.fromisoformat(iso) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M") + + +class _UserScopedTool(Tool): + """带 user_id 的 host-side 工具基类。""" + + def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None: + super().__init__(base_dir=base_dir, user_root=user_root) + self.user_id = user_id + + +class ScheduleCreateTool(_UserScopedTool): + name = "schedule_create" + description = ( + "Create a recurring scheduled task that runs an instruction automatically on a cron " + "schedule. The instruction (`prompt`) is what an agent will be told to do at each fire " + "time — write it self-contained and repeatable (which skill to use, what to produce, " + "where to deliver). To email a result, say so IN the prompt (the run can call send_email), " + "or set `notify_email` for guaranteed delivery of the latest output. ALWAYS confirm the " + "schedule (read back the cron in plain words + the resolved next run time) with the user " + "before relying on it. Times are wall-clock in `tz`." + ) + parameters = { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Short human label, e.g. '每日水泥简报'."}, + "prompt": { + "type": "string", + "description": "Self-contained instruction to run each time (e.g. '用 brief skill 生成今日简报并存盘').", + }, + "cron": { + "type": "string", + "description": "Standard 5-field cron 'min hour dom month dow'. E.g. '0 8 * * *' = daily 08:00; '0 9 * * 1' = Mondays 09:00.", + }, + "tz": {"type": "string", "description": "IANA timezone, default 'Asia/Shanghai'."}, + "mode": { + "type": "string", + "enum": list(_MODES), + "description": "isolated (default): fresh task each run, cheap, no cross-run memory. persistent: same task thread, keeps continuity (costs more tokens).", + }, + "skill": {"type": "string", "description": "Optional skill name to preload for the run."}, + "notify_email": { + "type": "string", + "description": "Optional email for guaranteed delivery of the run's latest artifact (deterministic, not dependent on the agent remembering).", + }, + "timeout_seconds": { + "type": "integer", + "description": "Optional hard timeout for each run (0 = no limit).", + }, + }, + "required": ["name", "prompt", "cron"], + } + + def execute( + self, name: str, prompt: str, cron: str, tz: str = "Asia/Shanghai", + mode: str = "isolated", skill: str = "", notify_email: str = "", + timeout_seconds: int = 0, + ) -> str: + notify = None + if notify_email and notify_email.strip(): + notify = {"channel": "email", "to": notify_email.strip()} + try: + job = scheduler.create_job( + self.user_id, name=name, prompt=prompt, cron=cron, tz=tz, mode=mode, + skill=skill, notify=notify, timeout_seconds=timeout_seconds, + ) + except JobError as e: + return f"[Error] {e}" + nf = f";产物将发到 {notify['to']}" if notify else "" + return ( + f"[ok] 已创建定时任务「{job['name']}」(id={job['short_id']},mode={job['mode']}{nf})。\n" + f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});" + f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}。" + ) + + +class ScheduleListTool(_UserScopedTool): + name = "schedule_list" + description = ( + "List the current user's scheduled tasks with id, schedule, next run time and last " + "status. Use when the user asks what scheduled tasks they have." + ) + parameters = {"type": "object", "properties": {}} + + def execute(self) -> str: + jobs = scheduler.list_jobs(self.user_id) + if not jobs: + return "(没有定时任务)" + lines = ["定时任务列表:"] + for j in jobs: + flag = "" if j["enabled"] else " [已停用]" + nxt = _fmt_local_iso(j["next_run_at"], j["tz"]) + lines.append( + f"- {j['short_id']} 「{j['name']}」{flag} | {j['schedule_desc']} | " + f"{j['mode']} | 下次 {nxt} | 上次 {j['last_status'] or '—'}" + ) + return "\n".join(lines) + + +class ScheduleUpdateTool(_UserScopedTool): + name = "schedule_update" + description = ( + "Edit an existing scheduled task by id (8-char short id from schedule_list is accepted). " + "Pass ONLY the fields to change. Use to reschedule (cron), rewrite the instruction " + "(prompt), pause/resume (enabled), change delivery (notify_email), etc. Changing cron/tz " + "recomputes the next run time. Confirm the change (read back new schedule) with the user." + ) + parameters = { + "type": "object", + "properties": { + "job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."}, + "name": {"type": "string", "description": "New label."}, + "prompt": {"type": "string", "description": "New instruction to run."}, + "cron": {"type": "string", "description": "New 5-field cron expression."}, + "tz": {"type": "string", "description": "New IANA timezone."}, + "mode": {"type": "string", "enum": list(_MODES), "description": "New session mode."}, + "skill": {"type": "string", "description": "New preload skill (empty string to clear)."}, + "enabled": {"type": "boolean", "description": "true=resume, false=pause."}, + "notify_email": { + "type": "string", + "description": "Set guaranteed-delivery email; empty string clears it.", + }, + "timeout_seconds": {"type": "integer", "description": "New per-run timeout (0=none)."}, + }, + "required": ["job_id"], + } + + def execute( + self, job_id: str, name: Optional[str] = None, prompt: Optional[str] = None, + cron: Optional[str] = None, tz: Optional[str] = None, mode: Optional[str] = None, + skill: Optional[str] = None, enabled: Optional[bool] = None, + notify_email: Optional[str] = None, timeout_seconds: Optional[int] = None, + ) -> str: + fields: dict = {} + for k, v in (("name", name), ("prompt", prompt), ("cron", cron), ("tz", tz), + ("mode", mode), ("skill", skill), ("enabled", enabled), + ("timeout_seconds", timeout_seconds)): + if v is not None: + fields[k] = v + # notify_email 是便捷字段 → 转 notify dict;传空串 = 清除兜底投递 + if notify_email is not None: + fields["notify"] = {"channel": "email", "to": notify_email.strip()} if notify_email.strip() else {} + try: + job = scheduler.update_job(self.user_id, job_id, **fields) + except JobError as e: + return f"[Error] {e}" + state = "启用" if job["enabled"] else "已停用" + return ( + f"[ok] 已更新定时任务「{job['name']}」(id={job['short_id']},{state})。\n" + f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});" + f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}。" + ) + + +class ScheduleCancelTool(_UserScopedTool): + name = "schedule_cancel" + description = ( + "Cancel (soft-delete) a scheduled task by id (8-char short id accepted). It stops firing " + "immediately. To merely pause (keep it for later), use schedule_update enabled=false instead." + ) + parameters = { + "type": "object", + "properties": { + "job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."}, + }, + "required": ["job_id"], + } + + def execute(self, job_id: str) -> str: + try: + job = scheduler.cancel_job(self.user_id, job_id) + except JobError as e: + return f"[Error] {e}" + return f"[ok] 已取消定时任务「{job['name']}」(id={job['short_id']})" diff --git a/tools/send_email.py b/tools/send_email.py new file mode 100644 index 0000000..a9793dc --- /dev/null +++ b/tools/send_email.py @@ -0,0 +1,169 @@ +"""发邮件(host-side,DESIGN §8.5 投递第 2/3 层)。 + +- `send_email_smtp(...)`:纯函数,读 SMTP_* env 发信。SendEmailTool 与定时任务的 + 确定性兜底投递(core/scheduler.py notify)共用它。 +- `smtp_configured()`:agent_builder 据此决定挂不挂 tool(沿用"有 key 才注册"范式, + §3.4);没配 SMTP 的部署里 agent 看不到一个永远报错的工具。 +- `SendEmailTool`:agent 在(定时或交互)run 里调,附件路径强制落 user_root 内防越界。 + +密钥只在 host 进程读,绝不进沙箱 / run_python。env: + SMTP_HOST SMTP_PORT(默 465) SMTP_USER SMTP_PASSWORD + SMTP_FROM(默 SMTP_USER) SMTP_TLS(ssl|starttls|none;默按端口:465→ssl 否则 starttls) +""" +from __future__ import annotations + +import os +import smtplib +from email.message import EmailMessage +from email.utils import formataddr +from pathlib import Path +from typing import Iterable, Optional + +from .base import Tool + +_MAX_ATTACH_BYTES = 20 * 1024 * 1024 # 单封附件总上限,防把大产物塞爆 SMTP +_MAX_RECIPIENTS = 10 + + +def smtp_configured() -> bool: + """最小可发信集合是否齐全。""" + return bool( + os.getenv("SMTP_HOST", "").strip() + and os.getenv("SMTP_USER", "").strip() + and os.getenv("SMTP_PASSWORD", "").strip() + ) + + +def _tls_mode(port: int) -> str: + mode = os.getenv("SMTP_TLS", "").strip().lower() + if mode in ("ssl", "starttls", "none"): + return mode + return "ssl" if port == 465 else "starttls" + + +def send_email_smtp( + to: Iterable[str] | str, + subject: str, + body: str, + attachments: Optional[Iterable[Path]] = None, + *, + timeout: float = 30.0, +) -> None: + """同步发一封纯文本邮件(可带附件)。失败抛异常,由调用方决定如何处理。 + + host-side 调用(SendEmailTool 在 to_thread 的 run 线程里;scheduler 在 + run_in_executor 里)—— smtplib 是阻塞 IO,不要在 asyncio loop 直接 await。 + """ + if not smtp_configured(): + raise RuntimeError("SMTP 未配置(需 SMTP_HOST/SMTP_USER/SMTP_PASSWORD)") + + host = os.getenv("SMTP_HOST", "").strip() + port = int(os.getenv("SMTP_PORT", "465").strip() or "465") + user = os.getenv("SMTP_USER", "").strip() + password = os.getenv("SMTP_PASSWORD", "").strip() + sender = os.getenv("SMTP_FROM", "").strip() or user + + if isinstance(to, str): + to_list = [to] + else: + to_list = list(to) + to_list = [a.strip() for a in to_list if a and a.strip()] + if not to_list: + raise ValueError("收件人为空") + if len(to_list) > _MAX_RECIPIENTS: + raise ValueError(f"收件人过多(上限 {_MAX_RECIPIENTS})") + + msg = EmailMessage() + msg["From"] = formataddr(("zcbot", sender)) + msg["To"] = ", ".join(to_list) + msg["Subject"] = subject or "(无主题)" + msg.set_content(body or "") + + total = 0 + for p in attachments or []: + p = Path(p) + if not p.is_file(): + continue + data = p.read_bytes() + total += len(data) + if total > _MAX_ATTACH_BYTES: + raise ValueError(f"附件总大小超过 {_MAX_ATTACH_BYTES // (1024*1024)}MB") + msg.add_attachment( + data, maintype="application", subtype="octet-stream", filename=p.name + ) + + tls = _tls_mode(port) + if tls == "ssl": + with smtplib.SMTP_SSL(host, port, timeout=timeout) as smtp: + smtp.login(user, password) + smtp.send_message(msg) + else: + with smtplib.SMTP(host, port, timeout=timeout) as smtp: + if tls == "starttls": + smtp.starttls() + smtp.login(user, password) + smtp.send_message(msg) + + +class SendEmailTool(Tool): + name = "send_email" + description = ( + "Send an email (plain text, optional file attachments) via the server's configured " + "SMTP account. Use this when the user asks to email a result/report to someone, or when " + "a scheduled task's instruction says to email its output. Attachments are paths inside " + "the working directory (e.g. 'report.docx' or '/figures/x.png'). Returns a short " + "confirmation or an [Error] line." + ) + parameters = { + "type": "object", + "properties": { + "to": { + "type": "array", + "items": {"type": "string"}, + "description": "Recipient email address(es).", + }, + "subject": {"type": "string", "description": "Email subject line."}, + "body": {"type": "string", "description": "Plain-text email body."}, + "attachments": { + "type": "array", + "items": {"type": "string"}, + "description": "Optional file paths (relative to working dir) to attach.", + }, + }, + "required": ["to", "subject", "body"], + } + + def execute( + self, + to: list[str] | str, + subject: str, + body: str, + attachments: Optional[list[str]] = None, + ) -> str: + if isinstance(to, str): + to = [to] + recipients = [a.strip() for a in (to or []) if isinstance(a, str) and a.strip()] + if not recipients: + return "[Error] to 不能为空" + + resolved: list[Path] = [] + for raw in attachments or []: + if not isinstance(raw, str) or not raw.strip(): + continue + p = self._resolve(raw.strip()).resolve() + # 附件强制落 user_root 内,防 ../ 读到别人/系统文件 + if self.user_root is not None: + try: + p.relative_to(self.user_root.resolve()) + except ValueError: + return f"[Error] 附件路径越界(必须在工作目录内): {raw}" + if not p.is_file(): + return f"[Error] 附件不存在: {raw}" + resolved.append(p) + + try: + send_email_smtp(recipients, subject, body, resolved) + except Exception as e: + return f"[Error] 发送失败: {type(e).__name__}: {e}" + n = f"(含 {len(resolved)} 个附件)" if resolved else "" + return f"[ok] 已发送给 {', '.join(recipients)} {n}".strip() diff --git a/web/app.py b/web/app.py index afaf5bb..57d48ad 100644 --- a/web/app.py +++ b/web/app.py @@ -36,13 +36,13 @@ from sqlalchemy import BigInteger, cast, func, select, update from starlette.background import BackgroundTask from core import __version__ -from core.paths import to_db_path +from core.paths import from_db_path, to_db_path from core.storage import ( NoSubtaskError, check_no_subtask, session_scope, ) -from core.storage.models import Message, Task, UsageEvent +from core.storage.models import Message, ScheduledJob, Task, UsageEvent from core.storage.utils import ensure_local_task_row from .auth import ( @@ -349,6 +349,7 @@ def _validate_transfer( def _run_agent_bg( task_id: UUID, user_id: UUID, user_message: str, image_variant: str = "", video_variant: str = "", + scheduled: bool = False, ) -> None: """工作线程:`build_agent(resume=True)` → 装 WebEventSink + cancel_check → `agent.run` → 写 tasks.run_status。 @@ -372,6 +373,7 @@ def _run_agent_bg( image_variant=image_variant, video_variant=video_variant, cancel_check=cancel_check, + scheduled_run=scheduled, ) agent.sink = WebEventSink(broker, task_id) agent.run(user_message) @@ -510,6 +512,12 @@ class TaskPatchRequest(BaseModel): model_profile: Optional[str] = None # 切模型(c 模式 task 层 / A 粒度 — 下条 send 生效) +class SchedulePatchRequest(BaseModel): + # 前端只读视图仅用 enabled(停用/启用);其余字段留着供"对话改不了时"的兜底直改, + # 但前端不暴露编辑表单(建/改走对话,§8.5)。 + enabled: Optional[bool] = None + + class MessageRequest(BaseModel): content: str # 该条消息触发的生图 / 生视频模型 variant key(config/media/doubao.yaml image/video 段)。 @@ -691,6 +699,154 @@ def create_app() -> FastAPI: stats_logger_task = asyncio.create_task(_stats_logger(), name="stats-logger") + # ── 定时任务守护循环(§8.5)── 仿 _disk_scanner 的 plain-asyncio 范式,不引 + # APScheduler/Celery。每 ~30s 认领到点 job(claim+advance next_run 防重复触发), + # 复用 _run_agent_bg 起 run,跑完确定性兜底投递 + 回写 last_*。ZCBOT_DISABLE_SCHEDULER=1 + # 整体关掉(对照 Claude Code CLAUDE_CODE_DISABLE_CRON)。 + scheduler_enabled = os.getenv("ZCBOT_DISABLE_SCHEDULER", "").strip() not in ("1", "true", "yes") + sched_tick = int(os.getenv("ZCBOT_SCHEDULER_TICK_SECONDS", "30") or "30") + sched_sema = asyncio.Semaphore(int(os.getenv("ZCBOT_SCHEDULER_CONCURRENCY", "4") or "4")) + + async def _execute_scheduled_job(snap: dict) -> None: + """认领后跑一个 job:解析目标 task → 抢 run 锁 → _run_agent_bg → 投递 + 记账。""" + from core.agent_builder import ( + resolve_workspace, working_dir_from_name, validate_task_name, InvalidTaskName, + ) + from core.scheduler import build_run_message, deliver_notify, record_result + from core.storage.utils import ensure_local_task_row + + job_id = snap["job_id"] + uid = snap["user_id"] + async with sched_sema: + try: + profile, model_id = _resolve_model_profile(snap.get("model_profile") or "") + ws = resolve_workspace(None, _cfg) + # 目标 task:persistent 用绑定 task(缺则新建并回填);isolated 用稳定 per-job 目录 + tid: Optional[UUID] = None + if snap["mode"] == "persistent" and snap.get("bound_task_id"): + tid = snap["bound_task_id"] + # 绑定 task 可能已被删(SET NULL 已处理 None;这里再查实在性) + with session_scope() as s: + exists = s.execute( + select(Task.task_id).where( + Task.task_id == tid, Task.deleted_at.is_(None) + ) + ).first() + if exists is None: + tid = None + if tid is None: + tid = uuid4() + wd_name = f"scheduled-{str(job_id)[:8]}" + fs_dir = working_dir_from_name(ws, uid, wd_name) + fs_dir.mkdir(parents=True, exist_ok=True) + disp = f"{snap['name']}" + try: + disp = validate_task_name(disp) + except InvalidTaskName: + disp = wd_name # 名字含非法字符 → 退到安全名 + ensure_local_task_row( + task_id=tid, name=disp, working_dir=to_db_path(fs_dir), + skill=snap.get("skill") or "", user_id=uid, + model=model_id, model_profile=profile, + description="(定时任务自动创建)", + ) + if snap["mode"] == "persistent": + with session_scope() as s: + s.execute(update(ScheduledJob).where( + ScheduledJob.job_id == job_id + ).values(bound_task_id=tid)) + + # 抢 run 锁(同 post_message):busy → 本次跳过,下个 cron 点再来 + with session_scope() as s: + row = s.execute( + select(Task.run_status).where(Task.task_id == tid).with_for_update() + ).first() + if row is None: + record_result(job_id, status="error", task_id=tid, error="目标 task 不存在") + return + if row.run_status in ("running", "cancelling"): + record_result(job_id, status="skipped", task_id=tid, + error="目标 task 正忙,本次跳过") + print(f"[scheduler] job {str(job_id)[:8]} skipped (task busy)") + return + s.execute(update(Task).where(Task.task_id == tid).values( + run_status="running", run_error=None)) + + message = build_run_message(snap) + broker.start(tid) + runner = asyncio.create_task(asyncio.to_thread( + _run_agent_bg, tid, uid, message, "", "", True, + )) + app.state.inflight[runner] = tid + runner.add_done_callback(lambda t: app.state.inflight.pop(t, None)) + + timeout = int(snap.get("timeout_seconds") or 0) + if timeout > 0: + done, _pending = await asyncio.wait({runner}, timeout=timeout) + if not done: + broker.request_cancel(tid) # 协作式停;loop 在 chunk 间 poll 到即退 + print(f"[scheduler] job {str(job_id)[:8]} timed out ({timeout}s), cancelling") + await runner + else: + await runner + + # run 终态:_run_agent_bg 收尾把 run_status 写回 idle(ok)/error + with session_scope() as s: + st = s.execute( + select(Task.run_status, Task.run_error).where(Task.task_id == tid) + ).first() + if st is not None and st.run_status == "error": + record_result(job_id, status="error", task_id=tid, error=st.run_error) + print(f"[scheduler] job {str(job_id)[:8]} run error: {st.run_error}") + return + + # 第 3 层确定性兜底投递(notify);失败不影响 run 已成功这一事实 + if snap.get("notify"): + try: + with session_scope() as s: + wd_db = s.execute( + select(Task.working_dir).where(Task.task_id == tid) + ).scalar_one_or_none() + fs_dir = from_db_path(wd_db) if wd_db else ws + await asyncio.get_running_loop().run_in_executor( + None, lambda: deliver_notify( + snap["notify"], job_name=snap["name"], + working_dir=fs_dir, tz=snap["tz"], + ) + ) + except Exception as e: + print(f"[scheduler] job {str(job_id)[:8]} notify failed: {type(e).__name__}: {e}") + record_result(job_id, status="ok", task_id=tid) + print(f"[scheduler] job {str(job_id)[:8]} '{snap['name']}' done") + except Exception as e: + print(f"[scheduler] job {str(job_id)[:8]} crashed: {type(e).__name__}: {e}") + try: + record_result(job_id, status="error", task_id=None, error=f"{type(e).__name__}: {e}") + except Exception: + pass + + async def _scheduler_loop() -> None: + from core.scheduler import claim_due_jobs + loop = asyncio.get_running_loop() + while True: + try: + await asyncio.sleep(sched_tick) + if getattr(app.state, "draining", None) is not None and app.state.draining.is_set(): + continue # 关停 drain 期不起新 job + due = await loop.run_in_executor(None, claim_due_jobs) + for snap in due: + asyncio.create_task(_execute_scheduled_job(snap)) + if due: + print(f"[scheduler] fired {len(due)} job(s)") + except asyncio.CancelledError: + raise + except Exception as e: + print(f"[scheduler] loop error: {type(e).__name__}: {e}") + + scheduler_task = asyncio.create_task(_scheduler_loop(), name="scheduler") if scheduler_enabled else None + if scheduler_enabled: + print(f"[scheduler] enabled (tick={sched_tick}s)") + # Sandbox pool(§7.5):仅当 ZCBOT_SANDBOX_BACKEND=docker 时启用。 # 启动钩子:① init_pool(创建 docker network + pool 实例)② shutdown_all 清 # 前驱孤儿(上次进程留下的 zcbot-sandbox-* 容器,内存 _last_active 为空, @@ -782,6 +938,12 @@ def create_app() -> FastAPI: await stats_logger_task except (asyncio.CancelledError, Exception): pass + if scheduler_task is not None: + scheduler_task.cancel() + try: + await scheduler_task + except (asyncio.CancelledError, Exception): + pass if sandbox_reaper_task is not None: sandbox_reaper_task.cancel() try: @@ -1329,6 +1491,37 @@ def create_app() -> FastAPI: raise HTTPException(404, f"memory file not found: {filename!r}") return {"filename": filename, "content": content} + # ───────────── 定时任务(DESIGN §8.5)───────────── + # 前端只读展示 + 停用/删除两个便捷动作;建/改全走对话(schedule_* 工具)。 + # 与对话工具共用 core.scheduler 服务层,两条路径不漂移。 + @app.get("/v1/schedules", tags=["schedules"]) + def list_schedules(user_id: UUID = Depends(require_user)): + """列当前用户的定时任务(只读)。前端「定时」面板一次拉满。""" + from core import scheduler + return {"results": scheduler.list_jobs(user_id)} + + @app.patch("/v1/schedules/{job_id}", tags=["schedules"]) + def patch_schedule( + job_id: str, body: SchedulePatchRequest, user_id: UUID = Depends(require_user), + ): + """改定时任务 —— 前端只用来停用/启用(enabled)。其余编辑走对话。""" + from core import scheduler + if body.enabled is None: + raise HTTPException(400, "no fields to update") + try: + return scheduler.set_enabled(user_id, job_id, body.enabled) + except scheduler.JobError as e: + raise HTTPException(404, str(e)) + + @app.delete("/v1/schedules/{job_id}", status_code=204, tags=["schedules"]) + def delete_schedule(job_id: str, user_id: UUID = Depends(require_user)): + """删定时任务(软删,立即停止触发)。""" + from core import scheduler + try: + scheduler.cancel_job(user_id, job_id) + except scheduler.JobError as e: + raise HTTPException(404, str(e)) + @app.delete("/v1/tasks/{task_id}", status_code=204, tags=["tasks"]) def delete_task(task_id: str, user_id: UUID = Depends(require_user)): """软删除:置 deleted_at=now(),从任务列表隐藏。 diff --git a/web/static/dev.html b/web/static/dev.html index c07ff81..0ed293c 100644 --- a/web/static/dev.html +++ b/web/static/dev.html @@ -292,6 +292,42 @@ .sk-pane { width: auto; max-height: 26vh; border-right: none; border-bottom: 1px solid var(--border); } } + /* 定时任务 modal(只读 + 停用/删除,DESIGN §8.5)— 复用 .sk-item/.sk-badge/.sk-empty */ + #crons-modal .card { + width: 880px; max-width: 94vw; height: 78vh; max-height: 78vh; + display: flex; flex-direction: column; + } + #crons-modal h3 { + margin: 0; padding: 12px 16px; font-size: 16px; + border-bottom: 1px solid var(--border); display: flex; align-items: center; gap: 8px; + } + #crons-modal h3 .spacer { flex: 1; } + #crons-modal h3 svg { opacity: .85; } + #crons-modal .cr-hint { font-size: 11px; font-weight: 400; color: var(--muted); } + #crons-modal .sk-x { + border: none; background: transparent; font-size: 16px; + cursor: pointer; color: var(--muted); padding: 2px 6px; + } + #cr-cols { flex: 1; display: flex; min-height: 0; } + #cr-list { width: 300px; flex-shrink: 0; overflow: auto; padding: 12px; border-right: 1px solid var(--border); } + #cr-detail { flex: 1; min-width: 0; overflow: auto; padding: 16px 20px; } + .cr-sched { font-size: 12px; color: var(--accent); margin-top: 2px; } + .cr-meta { font-size: 11px; color: var(--muted); margin-top: 3px; } + .cr-st { font-size: 10px; font-weight: 500; border-radius: 8px; padding: 0 6px; white-space: nowrap; } + .cr-st.ok { color: #2a8a4a; border: 1px solid #2a8a4a; } + .cr-st.error { color: #c0392b; border: 1px solid #c0392b; } + .cr-st.paused { color: var(--muted); border: 1px solid var(--border); } + .cr-d-row { display: flex; gap: 8px; padding: 7px 0; border-bottom: 1px solid var(--border); font-size: 13px; } + .cr-d-row .k { width: 84px; flex-shrink: 0; color: var(--muted); } + .cr-d-row .v { flex: 1; min-width: 0; word-break: break-word; white-space: pre-wrap; } + .cr-acts { display: flex; gap: 8px; margin-top: 16px; flex-wrap: wrap; } + @media (max-width: 760px) { + #crons-modal .card { width: 96vw; height: 88vh; max-height: 88vh; } + #cr-cols { flex-direction: column; } + #cr-list { width: auto; max-height: 30vh; border-right: none; border-bottom: 1px solid var(--border); } + #crons-modal .cr-hint { display: none; } + } + /* ───── 记忆查看 modal(只读两栏;改走对话)───── */ #memory-modal { z-index: 112; } #memory-modal .card { @@ -1230,6 +1266,22 @@ + + diff --git a/web/static/js/crons.js b/web/static/js/crons.js new file mode 100644 index 0000000..1c35af5 --- /dev/null +++ b/web/static/js/crons.js @@ -0,0 +1,163 @@ +// 定时任务 modal:只读两栏 master-detail(左列表 / 右详情),仅「停用/启用」「删除」 +// 两个便捷动作;新建 / 修改全走对话(schedule_* 工具,DESIGN §8.5)。 +// 左侧 rail 底部「定时」按钮触发。 +// 后端:GET /v1/schedules(列表)、PATCH /v1/schedules/{id}{enabled}(停用/启用)、 +// DELETE /v1/schedules/{id}(删除)。建/改无 REST —— 故意只读。 +import { $ } from "./dom.js"; +import { api } from "./api.js"; +import { escapeHtml } from "./format.js"; +import { selectTask } from "./chat.js"; + +const PLACEHOLDER = '
← 选一个定时任务查看详情
'; +let _jobs = []; + +function openCronsModal() { + $("crons-modal").classList.add("show"); + $("cr-detail").innerHTML = PLACEHOLDER; + renderList(); +} +export function closeCronsModal() { + $("crons-modal").classList.remove("show"); +} + +// UTC ISO → 浏览器本地短时刻(用户浏览器基本就在 CST,直观)。 +function ts(iso) { + if (!iso) return "—"; + const d = new Date(iso); + if (isNaN(d)) return "—"; + return d.toLocaleString("zh-CN", { + month: "numeric", day: "numeric", hour: "2-digit", minute: "2-digit", hour12: false, + }); +} + +function statusBadge(j) { + if (!j.enabled) return '已停用'; + if (j.last_status === "error") return '上次失败'; + if (j.last_status === "ok") return '正常'; + return '待运行'; +} + +function itemHtml(j) { + return `
+
${escapeHtml(j.name)} ${statusBadge(j)}
+
${escapeHtml(j.schedule_desc || j.cron)}
+
下次 ${ts(j.next_run_at)} · 上次 ${ts(j.last_run_at)}
+
`; +} + +async function renderList() { + const list = $("cr-list"); + list.innerHTML = '
加载中…
'; + let data; + try { + data = await api("GET", "/v1/schedules"); + } catch (e) { + list.innerHTML = `
加载失败: ${escapeHtml(e.message)}
`; + return; + } + _jobs = data.results || []; + if (!_jobs.length) { + list.innerHTML = + '
还没有定时任务。' + + '对助手说「每天早上八点用 brief skill 出份简报发我邮箱」即可创建。
'; + return; + } + const active = _jobs.filter((j) => j.enabled); + const paused = _jobs.filter((j) => !j.enabled); + let html = `
活跃 (${active.length})
`; + html += active.map(itemHtml).join("") || + '
(无)
'; + if (paused.length) { + html += `
已停用 (${paused.length})
`; + html += paused.map(itemHtml).join(""); + } + list.innerHTML = html; +} + +function row(k, v) { + return `
${k}${v}
`; +} + +function selectJob(id, itemEl) { + $("cr-list").querySelectorAll(".sk-item.active").forEach((el) => el.classList.remove("active")); + if (itemEl) itemEl.classList.add("active"); + const j = _jobs.find((x) => x.job_id === id); + if (!j) return; + + const notify = j.notify && j.notify.to + ? `必达邮件 → ${escapeHtml(j.notify.to)}` : "无(结果进任务线程 / 由指令自行投递)"; + const modeDesc = j.mode === "persistent" ? "持续(同一任务线程,有连续性)" : "独立(每次新建任务,省 token)"; + let rows = + row("排程", `${escapeHtml(j.schedule_desc || "")} (${escapeHtml(j.cron)} @${escapeHtml(j.tz)})`) + + row("模式", escapeHtml(modeDesc)) + + row("指令", escapeHtml(j.prompt)) + + (j.skill ? row("技能", escapeHtml(j.skill)) : "") + + row("通知", notify) + + row("下次触发", ts(j.next_run_at)) + + row("上次执行", `${ts(j.last_run_at)} · ${escapeHtml(j.last_status || "—")}`); + if (j.last_error) rows += row("上次错误", `${escapeHtml(j.last_error)}`); + rows += row("累计运行", `${j.run_count} 次` + (j.consecutive_failures ? ` · 连续失败 ${j.consecutive_failures}` : "")); + + const openTask = j.last_task_id + ? `` : ""; + $("cr-detail").innerHTML = + `
${escapeHtml(j.name)} ${statusBadge(j)}
` + + rows + + `
+ + + ${openTask} +
`; +} + +// ───── 顶层绑定 ───── +$("hd-crons").onclick = openCronsModal; +$("cr-close").onclick = closeCronsModal; +$("crons-modal").addEventListener("click", (e) => { + if (e.target.id === "crons-modal") closeCronsModal(); // 点遮罩关闭 +}); + +$("cr-list").addEventListener("click", (e) => { + const item = e.target.closest(".sk-item"); + if (item) selectJob(item.getAttribute("data-id"), item); +}); + +$("cr-detail").addEventListener("click", async (e) => { + const toggle = e.target.closest("[data-toggle]"); + const del = e.target.closest("[data-del]"); + const openTask = e.target.closest("[data-open-task]"); + + if (openTask) { + closeCronsModal(); + selectTask(openTask.getAttribute("data-open-task")); + return; + } + if (toggle) { + const id = toggle.getAttribute("data-toggle"); + const j = _jobs.find((x) => x.job_id === id); + toggle.disabled = true; + try { + await api("PATCH", "/v1/schedules/" + encodeURIComponent(id), { enabled: !j.enabled }); + await renderList(); + selectJob(id); + } catch (err) { + alert("操作失败: " + err.message); + toggle.disabled = false; + } + return; + } + if (del) { + const id = del.getAttribute("data-del"); + const j = _jobs.find((x) => x.job_id === id); + if (!confirm(`删除定时任务「${j ? j.name : id}」?不可撤销(停用可保留改用「停用」)。`)) return; + del.disabled = true; + try { + await api("DELETE", "/v1/schedules/" + encodeURIComponent(id)); + $("cr-detail").innerHTML = PLACEHOLDER; + await renderList(); + } catch (err) { + alert("删除失败: " + err.message); + del.disabled = false; + } + } +}); diff --git a/web/static/js/main.js b/web/static/js/main.js index 928b3e5..7f7d35d 100644 --- a/web/static/js/main.js +++ b/web/static/js/main.js @@ -8,6 +8,7 @@ import { api } from "./api.js"; import { closeChpwModal } from "./auth.js"; import { closeSkillsModal } from "./skills.js"; import { closeMemoryModal } from "./memory.js"; +import { closeCronsModal } from "./crons.js"; import { closeFilePreview, closeMiniPreview } from "./preview.js"; import { closeSrcPicker, loadFiles } from "./files.js"; import { loadFolderSuggestions } from "./newtask.js"; @@ -87,6 +88,7 @@ document.addEventListener("keydown", (e) => { if ($("chpw-modal").classList.contains("show")) { closeChpwModal(); return; } if ($("skills-modal").classList.contains("show")) { closeSkillsModal(); return; } if ($("memory-modal").classList.contains("show")) { closeMemoryModal(); return; } + if ($("crons-modal").classList.contains("show")) { closeCronsModal(); return; } if ($("mini-preview-modal").classList.contains("show")) { closeMiniPreview(); return; } if ($("src-picker-modal").classList.contains("show")) { closeSrcPicker(); return; } if ($("file-preview-modal").classList.contains("show")) { closeFilePreview(); return; }