203 lines
9.0 KiB
Python
203 lines
9.0 KiB
Python
"""定时任务对话工具(DESIGN §8.5 对话端)。
|
|
|
|
create / list / update / cancel —— 对话端的完整 CRUD。host-side,按 user_id 隔离
|
|
(ctor 注入,不信模型传的 id)。增删改查逻辑全在 core.scheduler 服务层,本文件只做
|
|
schema + 把结果/JobError 转成给模型看的文本。前端只读视图走 /v1/schedules REST,
|
|
同样调那一层 → 两条路径不漂移。
|
|
|
|
定时 run 内这几个工具不注册(防任务造任务,§8.5)。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from core import scheduler
|
|
from core.scheduler import JobError, _tzinfo
|
|
from .base import Tool
|
|
|
|
_MODES = ("isolated", "persistent")
|
|
|
|
|
|
def _fmt_local_iso(iso: Optional[str], tz: str) -> str:
|
|
if not iso:
|
|
return "—"
|
|
dt = datetime.fromisoformat(iso)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(_tzinfo(tz)).strftime("%Y-%m-%d %H:%M")
|
|
|
|
|
|
class _UserScopedTool(Tool):
|
|
"""带 user_id 的 host-side 工具基类。"""
|
|
|
|
def __init__(self, user_id: UUID, base_dir=None, user_root=None) -> None:
|
|
super().__init__(base_dir=base_dir, user_root=user_root)
|
|
self.user_id = user_id
|
|
|
|
|
|
class ScheduleCreateTool(_UserScopedTool):
|
|
name = "schedule_create"
|
|
description = (
|
|
"Create a recurring scheduled task that runs an instruction automatically on a cron "
|
|
"schedule. The instruction (`prompt`) is what an agent will be told to do at each fire "
|
|
"time — write it self-contained and repeatable (which skill to use, what to produce, "
|
|
"where to deliver). To email a result, say so IN the prompt (the run can call send_email), "
|
|
"or set `notify_email` for guaranteed delivery of the latest output. ALWAYS confirm the "
|
|
"schedule (read back the cron in plain words + the resolved next run time) with the user "
|
|
"before relying on it. Times are wall-clock in `tz`."
|
|
)
|
|
parameters = {
|
|
"type": "object",
|
|
"properties": {
|
|
"name": {"type": "string", "description": "Short human label, e.g. '每日水泥简报'."},
|
|
"prompt": {
|
|
"type": "string",
|
|
"description": "Self-contained instruction to run each time (e.g. '用 brief skill 生成今日简报并存盘').",
|
|
},
|
|
"cron": {
|
|
"type": "string",
|
|
"description": "Standard 5-field cron 'min hour dom month dow'. E.g. '0 8 * * *' = daily 08:00; '0 9 * * 1' = Mondays 09:00.",
|
|
},
|
|
"tz": {"type": "string", "description": "IANA timezone, default 'Asia/Shanghai'."},
|
|
"mode": {
|
|
"type": "string",
|
|
"enum": list(_MODES),
|
|
"description": "isolated (default): fresh task each run, cheap, no cross-run memory. persistent: same task thread, keeps continuity (costs more tokens).",
|
|
},
|
|
"skill": {"type": "string", "description": "Optional skill name to preload for the run."},
|
|
"notify_email": {
|
|
"type": "string",
|
|
"description": "Optional email for guaranteed delivery of the run's latest artifact (deterministic, not dependent on the agent remembering).",
|
|
},
|
|
"timeout_seconds": {
|
|
"type": "integer",
|
|
"description": "Optional hard timeout for each run (0 = no limit).",
|
|
},
|
|
},
|
|
"required": ["name", "prompt", "cron"],
|
|
}
|
|
|
|
def execute(
|
|
self, name: str, prompt: str, cron: str, tz: str = "Asia/Shanghai",
|
|
mode: str = "isolated", skill: str = "", notify_email: str = "",
|
|
timeout_seconds: int = 0,
|
|
) -> str:
|
|
notify = None
|
|
if notify_email and notify_email.strip():
|
|
notify = {"channel": "email", "to": notify_email.strip()}
|
|
try:
|
|
job = scheduler.create_job(
|
|
self.user_id, name=name, prompt=prompt, cron=cron, tz=tz, mode=mode,
|
|
skill=skill, notify=notify, timeout_seconds=timeout_seconds,
|
|
)
|
|
except JobError as e:
|
|
return f"[Error] {e}"
|
|
nf = f";产物将发到 {notify['to']}" if notify else ""
|
|
return (
|
|
f"[ok] 已创建定时任务「{job['name']}」(id={job['short_id']},mode={job['mode']}{nf})。\n"
|
|
f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});"
|
|
f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}。"
|
|
)
|
|
|
|
|
|
class ScheduleListTool(_UserScopedTool):
|
|
name = "schedule_list"
|
|
description = (
|
|
"List the current user's scheduled tasks with id, schedule, next run time and last "
|
|
"status. Use when the user asks what scheduled tasks they have."
|
|
)
|
|
parameters = {"type": "object", "properties": {}}
|
|
|
|
def execute(self) -> str:
|
|
jobs = scheduler.list_jobs(self.user_id)
|
|
if not jobs:
|
|
return "(没有定时任务)"
|
|
lines = ["定时任务列表:"]
|
|
for j in jobs:
|
|
flag = "" if j["enabled"] else " [已停用]"
|
|
nxt = _fmt_local_iso(j["next_run_at"], j["tz"])
|
|
lines.append(
|
|
f"- {j['short_id']} 「{j['name']}」{flag} | {j['schedule_desc']} | "
|
|
f"{j['mode']} | 下次 {nxt} | 上次 {j['last_status'] or '—'}"
|
|
)
|
|
return "\n".join(lines)
|
|
|
|
|
|
class ScheduleUpdateTool(_UserScopedTool):
|
|
name = "schedule_update"
|
|
description = (
|
|
"Edit an existing scheduled task by id (8-char short id from schedule_list is accepted). "
|
|
"Pass ONLY the fields to change. Use to reschedule (cron), rewrite the instruction "
|
|
"(prompt), pause/resume (enabled), change delivery (notify_email), etc. Changing cron/tz "
|
|
"recomputes the next run time. Confirm the change (read back new schedule) with the user."
|
|
)
|
|
parameters = {
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."},
|
|
"name": {"type": "string", "description": "New label."},
|
|
"prompt": {"type": "string", "description": "New instruction to run."},
|
|
"cron": {"type": "string", "description": "New 5-field cron expression."},
|
|
"tz": {"type": "string", "description": "New IANA timezone."},
|
|
"mode": {"type": "string", "enum": list(_MODES), "description": "New session mode."},
|
|
"skill": {"type": "string", "description": "New preload skill (empty string to clear)."},
|
|
"enabled": {"type": "boolean", "description": "true=resume, false=pause."},
|
|
"notify_email": {
|
|
"type": "string",
|
|
"description": "Set guaranteed-delivery email; empty string clears it.",
|
|
},
|
|
"timeout_seconds": {"type": "integer", "description": "New per-run timeout (0=none)."},
|
|
},
|
|
"required": ["job_id"],
|
|
}
|
|
|
|
def execute(
|
|
self, job_id: str, name: Optional[str] = None, prompt: Optional[str] = None,
|
|
cron: Optional[str] = None, tz: Optional[str] = None, mode: Optional[str] = None,
|
|
skill: Optional[str] = None, enabled: Optional[bool] = None,
|
|
notify_email: Optional[str] = None, timeout_seconds: Optional[int] = None,
|
|
) -> str:
|
|
fields: dict = {}
|
|
for k, v in (("name", name), ("prompt", prompt), ("cron", cron), ("tz", tz),
|
|
("mode", mode), ("skill", skill), ("enabled", enabled),
|
|
("timeout_seconds", timeout_seconds)):
|
|
if v is not None:
|
|
fields[k] = v
|
|
# notify_email 是便捷字段 → 转 notify dict;传空串 = 清除兜底投递
|
|
if notify_email is not None:
|
|
fields["notify"] = {"channel": "email", "to": notify_email.strip()} if notify_email.strip() else {}
|
|
try:
|
|
job = scheduler.update_job(self.user_id, job_id, **fields)
|
|
except JobError as e:
|
|
return f"[Error] {e}"
|
|
state = "启用" if job["enabled"] else "已停用"
|
|
return (
|
|
f"[ok] 已更新定时任务「{job['name']}」(id={job['short_id']},{state})。\n"
|
|
f"{job['schedule_desc']}(cron={job['cron']} @{job['tz']});"
|
|
f"下次触发 {_fmt_local_iso(job['next_run_at'], job['tz'])}。"
|
|
)
|
|
|
|
|
|
class ScheduleCancelTool(_UserScopedTool):
|
|
name = "schedule_cancel"
|
|
description = (
|
|
"Cancel (soft-delete) a scheduled task by id (8-char short id accepted). It stops firing "
|
|
"immediately. To merely pause (keep it for later), use schedule_update enabled=false instead."
|
|
)
|
|
parameters = {
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {"type": "string", "description": "Job id (full UUID or 8-char short id)."},
|
|
},
|
|
"required": ["job_id"],
|
|
}
|
|
|
|
def execute(self, job_id: str) -> str:
|
|
try:
|
|
job = scheduler.cancel_job(self.user_id, job_id)
|
|
except JobError as e:
|
|
return f"[Error] {e}"
|
|
return f"[ok] 已取消定时任务「{job['name']}」(id={job['short_id']})"
|