zcbot/tools/schedule.py

203 lines
9.0 KiB
Python

"""定时任务对话工具(DESIGN §8.5 对话端)。
create / list / update / cancel —— 对话端的完整 CRUD。host-side,按 user_id 隔离
(ctor 注入,不信模型传的 id)。增删改查逻辑全在 core.scheduler 服务层,本文件只做
schema + 把结果/JobError 转成给模型看的文本。前端只读视图走 /v1/schedules REST,
同样调那一层 → 两条路径不漂移。
定时 run 内这几个工具不注册(防任务造任务,§8.5)。
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from core import scheduler
from core.scheduler import JobError, _tzinfo
from .base import Tool
_MODES = ("isolated", "persistent")
def _fmt_local_iso(iso: Optional[str], tz: str) -> str:
if not iso:
return ""
dt = datetime.fromisoformat(iso)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M")
class _UserScopedTool(Tool):
"""带 user_id 的 host-side 工具基类。"""
def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None:
super().__init__(base_dir=base_dir, user_root=user_root)
self.user_id = user_id
class ScheduleCreateTool(_UserScopedTool):
name = "schedule_create"
description = (
"Create a recurring scheduled task that runs an instruction automatically on a cron "
"schedule. The instruction (`prompt`) is what an agent will be told to do at each fire "
"time — write it self-contained and repeatable (which skill to use, what to produce, "
"where to deliver). To email a result, say so IN the prompt (the run can call send_email), "
"or set `notify_email` for guaranteed delivery of the latest output. ALWAYS confirm the "
"schedule (read back the cron in plain words + the resolved next run time) with the user "
"before relying on it. Times are wall-clock in `tz`."
)
parameters = {
"type": "object",
"properties": {
"name": {"type": "string", "description": "Short human label, e.g. '每日水泥简报'."},
"prompt": {
"type": "string",
"description": "Self-contained instruction to run each time (e.g. '用 brief skill 生成今日简报并存盘').",
},
"cron": {
"type": "string",
"description": "Standard 5-field cron 'min hour dom month dow'. E.g. '0 8 * * *' = daily 08:00; '0 9 * * 1' = Mondays 09:00.",
},
"tz": {"type": "string", "description": "IANA timezone, default 'Asia/Shanghai'."},
"mode": {
"type": "string",
"enum": list(_MODES),
"description": "isolated (default): fresh task each run, cheap, no cross-run memory. persistent: same task thread, keeps continuity (costs more tokens).",
},
"skill": {"type": "string", "description": "Optional skill name to preload for the run."},
"notify_email": {
"type": "string",
"description": "Optional email for guaranteed delivery of the run's latest artifact (deterministic, not dependent on the agent remembering).",
},
"timeout_seconds": {
"type": "integer",
"description": "Optional hard timeout for each run (0 = no limit).",
},
},
"required": ["name", "prompt", "cron"],
}
def execute(
self, name: str, prompt: str, cron: str, tz: str = "Asia/Shanghai",
mode: str = "isolated", skill: str = "", notify_email: str = "",
timeout_seconds: int = 0,
) -> str:
notify = None
if notify_email and notify_email.strip():
notify = {"channel": "email", "to": notify_email.strip()}
try:
job = scheduler.create_job(
self.user_id, name=name, prompt=prompt, cron=cron, tz=tz, mode=mode,
skill=skill, notify=notify, timeout_seconds=timeout_seconds,
)
except JobError as e:
return f"[Error] {e}"
nf = f";产物将发到 {notify['to']}" if notify else ""
return (
f"[ok] 已创建定时任务「{job['name']}」(id={job['short_id']},mode={job['mode']}{nf})。\n"
f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});"
f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}"
)
class ScheduleListTool(_UserScopedTool):
name = "schedule_list"
description = (
"List the current user's scheduled tasks with id, schedule, next run time and last "
"status. Use when the user asks what scheduled tasks they have."
)
parameters = {"type": "object", "properties": {}}
def execute(self) -> str:
jobs = scheduler.list_jobs(self.user_id)
if not jobs:
return "(没有定时任务)"
lines = ["定时任务列表:"]
for j in jobs:
flag = "" if j["enabled"] else " [已停用]"
nxt = _fmt_local_iso(j["next_run_at"], j["tz"])
lines.append(
f"- {j['short_id']}{j['name']}{flag} | {j['schedule_desc']} | "
f"{j['mode']} | 下次 {nxt} | 上次 {j['last_status'] or ''}"
)
return "\n".join(lines)
class ScheduleUpdateTool(_UserScopedTool):
name = "schedule_update"
description = (
"Edit an existing scheduled task by id (8-char short id from schedule_list is accepted). "
"Pass ONLY the fields to change. Use to reschedule (cron), rewrite the instruction "
"(prompt), pause/resume (enabled), change delivery (notify_email), etc. Changing cron/tz "
"recomputes the next run time. Confirm the change (read back new schedule) with the user."
)
parameters = {
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."},
"name": {"type": "string", "description": "New label."},
"prompt": {"type": "string", "description": "New instruction to run."},
"cron": {"type": "string", "description": "New 5-field cron expression."},
"tz": {"type": "string", "description": "New IANA timezone."},
"mode": {"type": "string", "enum": list(_MODES), "description": "New session mode."},
"skill": {"type": "string", "description": "New preload skill (empty string to clear)."},
"enabled": {"type": "boolean", "description": "true=resume, false=pause."},
"notify_email": {
"type": "string",
"description": "Set guaranteed-delivery email; empty string clears it.",
},
"timeout_seconds": {"type": "integer", "description": "New per-run timeout (0=none)."},
},
"required": ["job_id"],
}
def execute(
self, job_id: str, name: Optional[str] = None, prompt: Optional[str] = None,
cron: Optional[str] = None, tz: Optional[str] = None, mode: Optional[str] = None,
skill: Optional[str] = None, enabled: Optional[bool] = None,
notify_email: Optional[str] = None, timeout_seconds: Optional[int] = None,
) -> str:
fields: dict = {}
for k, v in (("name", name), ("prompt", prompt), ("cron", cron), ("tz", tz),
("mode", mode), ("skill", skill), ("enabled", enabled),
("timeout_seconds", timeout_seconds)):
if v is not None:
fields[k] = v
# notify_email 是便捷字段 → 转 notify dict;传空串 = 清除兜底投递
if notify_email is not None:
fields["notify"] = {"channel": "email", "to": notify_email.strip()} if notify_email.strip() else {}
try:
job = scheduler.update_job(self.user_id, job_id, **fields)
except JobError as e:
return f"[Error] {e}"
state = "启用" if job["enabled"] else "已停用"
return (
f"[ok] 已更新定时任务「{job['name']}」(id={job['short_id']},{state})。\n"
f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});"
f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}"
)
class ScheduleCancelTool(_UserScopedTool):
name = "schedule_cancel"
description = (
"Cancel (soft-delete) a scheduled task by id (8-char short id accepted). It stops firing "
"immediately. To merely pause (keep it for later), use schedule_update enabled=false instead."
)
parameters = {
"type": "object",
"properties": {
"job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."},
},
"required": ["job_id"],
}
def execute(self, job_id: str) -> str:
try:
job = scheduler.cancel_job(self.user_id, job_id)
except JobError as e:
return f"[Error] {e}"
return f"[ok] 已取消定时任务「{job['name']}」(id={job['short_id']})"