"""定时任务对话工具(DESIGN §8.5 对话端)。 create / list / update / cancel —— 对话端的完整 CRUD。host-side,按 user_id 隔离 (ctor 注入,不信模型传的 id)。增删改查逻辑全在 core.scheduler 服务层,本文件只做 schema + 把结果/JobError 转成给模型看的文本。前端只读视图走 /v1/schedules REST, 同样调那一层 → 两条路径不漂移。 定时 run 内这几个工具不注册(防任务造任务,§8.5)。 """ from __future__ import annotations from datetime import datetime, timezone from typing import Optional from uuid import UUID from core import scheduler from core.scheduler import JobError, _tzinfo from .base import Tool _MODES = ("isolated", "persistent") def _fmt_local_iso(iso: Optional[str], tz: str) -> str: if not iso: return "—" dt = datetime.fromisoformat(iso) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M") class _UserScopedTool(Tool): """带 user_id 的 host-side 工具基类。""" def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None: super().__init__(base_dir=base_dir, user_root=user_root) self.user_id = user_id class ScheduleCreateTool(_UserScopedTool): name = "schedule_create" description = ( "Create a recurring scheduled task that runs an instruction automatically on a cron " "schedule. The instruction (`prompt`) is what an agent will be told to do at each fire " "time — write it self-contained and repeatable (which skill to use, what to produce, " "where to deliver). To email a result, say so IN the prompt (the run can call send_email), " "or set `notify_email` for guaranteed delivery of the latest output. ALWAYS confirm the " "schedule (read back the cron in plain words + the resolved next run time) with the user " "before relying on it. Times are wall-clock in `tz`." ) parameters = { "type": "object", "properties": { "name": {"type": "string", "description": "Short human label, e.g. '每日水泥简报'."}, "prompt": { "type": "string", "description": "Self-contained instruction to run each time (e.g. '用 brief skill 生成今日简报并存盘').", }, "cron": { "type": "string", "description": "Standard 5-field cron 'min hour dom month dow'. E.g. '0 8 * * *' = daily 08:00; '0 9 * * 1' = Mondays 09:00.", }, "tz": {"type": "string", "description": "IANA timezone, default 'Asia/Shanghai'."}, "mode": { "type": "string", "enum": list(_MODES), "description": "isolated (default): fresh task each run, cheap, no cross-run memory. persistent: same task thread, keeps continuity (costs more tokens).", }, "skill": {"type": "string", "description": "Optional skill name to preload for the run."}, "notify_email": { "type": "string", "description": "Optional email for guaranteed delivery of the run's latest artifact (deterministic, not dependent on the agent remembering).", }, "timeout_seconds": { "type": "integer", "description": ( "Optional hard timeout for each run in seconds (default 1800 = 30min; " "0 = no limit). Raise it for heavy report jobs (multi-journal search + " "docx render) that legitimately need longer." ), }, }, "required": ["name", "prompt", "cron"], } def execute( self, name: str, prompt: str, cron: str, tz: str = "Asia/Shanghai", mode: str = "isolated", skill: str = "", notify_email: str = "", timeout_seconds: int = scheduler.DEFAULT_TIMEOUT_SECONDS, ) -> str: notify = None if notify_email and notify_email.strip(): notify = {"channel": "email", "to": notify_email.strip()} try: job = scheduler.create_job( self.user_id, name=name, prompt=prompt, cron=cron, tz=tz, mode=mode, skill=skill, notify=notify, timeout_seconds=timeout_seconds, ) except JobError as e: return f"[Error] {e}" nf = f";产物将发到 {notify['to']}" if notify else "" return ( f"[ok] 已创建定时任务「{job['name']}」(id={job['short_id']},mode={job['mode']}{nf})。\n" f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});" f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}。" ) class ScheduleListTool(_UserScopedTool): name = "schedule_list" description = ( "List the current user's scheduled tasks with id, schedule, next run time and last " "status. Use when the user asks what scheduled tasks they have." ) parameters = {"type": "object", "properties": {}} def execute(self) -> str: jobs = scheduler.list_jobs(self.user_id) if not jobs: return "(没有定时任务)" lines = ["定时任务列表:"] for j in jobs: flag = "" if j["enabled"] else " [已停用]" nxt = _fmt_local_iso(j["next_run_at"], j["tz"]) lines.append( f"- {j['short_id']} 「{j['name']}」{flag} | {j['schedule_desc']} | " f"{j['mode']} | 下次 {nxt} | 上次 {j['last_status'] or '—'}" ) return "\n".join(lines) class ScheduleUpdateTool(_UserScopedTool): name = "schedule_update" description = ( "Edit an existing scheduled task by id (8-char short id from schedule_list is accepted). " "Pass ONLY the fields to change. Use to reschedule (cron), rewrite the instruction " "(prompt), pause/resume (enabled), change delivery (notify_email), etc. Changing cron/tz " "recomputes the next run time. Confirm the change (read back new schedule) with the user." ) parameters = { "type": "object", "properties": { "job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."}, "name": {"type": "string", "description": "New label."}, "prompt": {"type": "string", "description": "New instruction to run."}, "cron": {"type": "string", "description": "New 5-field cron expression."}, "tz": {"type": "string", "description": "New IANA timezone."}, "mode": {"type": "string", "enum": list(_MODES), "description": "New session mode."}, "skill": {"type": "string", "description": "New preload skill (empty string to clear)."}, "enabled": {"type": "boolean", "description": "true=resume, false=pause."}, "notify_email": { "type": "string", "description": "Set guaranteed-delivery email; empty string clears it.", }, "timeout_seconds": {"type": "integer", "description": "New per-run timeout (0=none)."}, }, "required": ["job_id"], } def execute( self, job_id: str, name: Optional[str] = None, prompt: Optional[str] = None, cron: Optional[str] = None, tz: Optional[str] = None, mode: Optional[str] = None, skill: Optional[str] = None, enabled: Optional[bool] = None, notify_email: Optional[str] = None, timeout_seconds: Optional[int] = None, ) -> str: fields: dict = {} for k, v in (("name", name), ("prompt", prompt), ("cron", cron), ("tz", tz), ("mode", mode), ("skill", skill), ("enabled", enabled), ("timeout_seconds", timeout_seconds)): if v is not None: fields[k] = v # notify_email 是便捷字段 → 转 notify dict;传空串 = 清除兜底投递 if notify_email is not None: fields["notify"] = {"channel": "email", "to": notify_email.strip()} if notify_email.strip() else {} try: job = scheduler.update_job(self.user_id, job_id, **fields) except JobError as e: return f"[Error] {e}" state = "启用" if job["enabled"] else "已停用" return ( f"[ok] 已更新定时任务「{job['name']}」(id={job['short_id']},{state})。\n" f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});" f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}。" ) class ScheduleCancelTool(_UserScopedTool): name = "schedule_cancel" description = ( "Cancel (soft-delete) a scheduled task by id (8-char short id accepted). It stops firing " "immediately. To merely pause (keep it for later), use schedule_update enabled=false instead." ) parameters = { "type": "object", "properties": { "job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."}, }, "required": ["job_id"], } def execute(self, job_id: str) -> str: try: job = scheduler.cancel_job(self.user_id, job_id) except JobError as e: return f"[Error] {e}" return f"[ok] 已取消定时任务「{job['name']}」(id={job['short_id']})"