157 lines
5.9 KiB
Python
157 lines
5.9 KiB
Python
"""Smoke: 定时任务守护循环端到端(DESIGN §8.5)。
|
|
|
|
跑法(**需先在另一个终端起 web 服务** `.venv/Scripts/python.exe main.py web`):
|
|
.venv/Scripts/python.exe scripts/smoke_scheduler.py [--email a@b.com]
|
|
|
|
干什么:
|
|
1. 给某用户(默认 DB 第一个 / --email 指定)插一条 isolated 定时任务,
|
|
prompt 是"回一句早安、不调工具",并把 next_run_at 改成现在 → 让守护循环下一 tick 就认领。
|
|
2. 轮询 scheduled_jobs.last_status 直到翻成 ok/error/skipped(超时 180s)。
|
|
3. ok 则打印它新建的 task_id + agent 实际回复片段,证明全链路(认领→建 task→
|
|
_run_agent_bg→LLM→回写 run_status→record_result)走通。
|
|
4. 收尾软删该 job(留下 task 供查看)。
|
|
|
|
**会真的发起一次 LLM 调用**(一句短回复,费用可忽略)。不测邮件 —— notify 投递另行验。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from uuid import UUID
|
|
|
|
# Project root: two directory levels above this file. It is pushed to the front
# of sys.path before the project-local imports further down (presumably so that
# `core` resolves when this file is run directly as a script).
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

# Windows consoles default to GBK, which garbles the Chinese text this script
# prints, so force stdout to UTF-8 (affects only this script's output).
try:
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")  # type: ignore[attr-defined]
except Exception:
    # Deliberate best-effort: stdout may be a replaced stream without
    # reconfigure(); keep whatever encoding it already has.
    pass


def _load_env_file(path: Path) -> None:
    """Copy ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comment lines, and lines without ``=`` or with an empty
    key are skipped. Variables already present in the environment are never
    overridden. A missing file is a no-op.

    Args:
        path: Location of the dotenv-style file.
    """
    if not path.exists():
        return
    # utf-8-sig decodes plain UTF-8 too, but also drops a leading BOM, which
    # would otherwise become part of the first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        key = key.strip()
        if not key:
            # "=value" has no variable name; an empty name is rejected by the OS.
            continue
        value = value.strip()
        # Drop one pair of matching surrounding quotes: KEY="v" -> v.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)


# Load <ROOT>/.env into the process environment before the project imports below.
env_file = ROOT / ".env"
_load_env_file(env_file)
|
|
|
|
from sqlalchemy import select, update
|
|
|
|
from core import scheduler
|
|
from core.storage import session_scope
|
|
from core.storage.models import Message, ScheduledJob, Task, User
|
|
|
|
# Upper bound, in seconds, on how long main() polls for the job's last_status.
POLL_TIMEOUT = 3 * 60
# Pause between two consecutive polls (passed to time.sleep, so seconds).
POLL_INTERVAL = 3
# Instruction given to the agent: answer with one fixed greeting, use no tools,
# create no files, do nothing else.
PROMPT = "请直接回复一句『早安,今天也加油!』。不要调用任何工具,不要创建文件,不要做其它事。"
|
|
|
|
|
|
def _pick_user(email: str | None) -> UUID | None:
    """Choose the user the smoke job will be created for.

    Args:
        email: When non-empty, select the user whose ``User.email`` equals it;
            otherwise take the first user ordered by ``User.created_at``.

    Returns:
        The selected ``user_id``, or ``None`` when the query yields no row.
    """
    if email:
        stmt = select(User.user_id).where(User.email == email)
    else:
        stmt = select(User.user_id).order_by(User.created_at).limit(1)
    with session_scope() as session:
        return session.execute(stmt).scalar_one_or_none()
|
|
|
|
|
|
def _last_assistant_text(task_id: UUID) -> str:
    """Return the most recent non-empty assistant text stored for a task.

    Looks at the 20 messages of the task with the highest ``idx``, newest
    first. A payload counts when it is a dict whose ``role`` is
    ``"assistant"``; its ``content`` may be a plain string or a list of blocks,
    in which case the first block carrying a non-empty string ``text`` wins.

    Args:
        task_id: Task whose messages are inspected.

    Returns:
        The stripped text, or a fixed placeholder string when none is found.
    """
    stmt = (
        select(Message.payload)
        .where(Message.task_id == task_id)
        .order_by(Message.idx.desc())
        .limit(20)
    )
    with session_scope() as session:
        payloads = session.execute(stmt).scalars().all()

    for payload in payloads:
        if not (isinstance(payload, dict) and payload.get("role") == "assistant"):
            continue
        content = payload.get("content")
        if isinstance(content, str):
            text = content.strip()
            if text:
                return text
        elif isinstance(content, list):
            # Rich content: a list of blocks, some of which carry a "text" field.
            for block in content:
                if not isinstance(block, dict):
                    continue
                text = block.get("text")
                if isinstance(text, str) and text.strip():
                    return text.strip()
    return "(未找到 assistant 文本)"
|
|
|
|
|
|
def main() -> int:
    """Run the scheduler smoke test end to end and return a process exit code.

    Steps:
      1. Pick a user (``--email`` from ``sys.argv``, else the default chosen by
         ``_pick_user``) and create an ``isolated`` scheduled job for them, then
         set its ``next_run_at`` to the current UTC time.
      2. Poll ``ScheduledJob.last_status`` every ``POLL_INTERVAL`` seconds until
         it becomes ``ok`` / ``error`` / ``skipped`` or ``POLL_TIMEOUT`` elapses.
      3. Print the outcome; on ``ok`` also print a snippet of the assistant reply.
      4. Cancel the smoke job. This runs in a ``finally`` so the job is also
         removed when polling is interrupted (Ctrl-C) or raises; otherwise the
         ``*/5 * * * *`` job would stay scheduled.

    Returns:
        0 when the job ran with status ``ok``, 1 in every other case.
    """
    email = None
    if "--email" in sys.argv:
        pos = sys.argv.index("--email")
        # NOTE: a trailing "--email" without a value falls back to the default user.
        email = sys.argv[pos + 1] if pos + 1 < len(sys.argv) else None

    uid = _pick_user(email)
    if uid is None:
        print("[FAIL] DB 里没有用户(或 --email 未匹配)。先 main.py user add。")
        return 1
    print(f"[..] 用户 {str(uid)[:8]} prompt={PROMPT[:24]}...")

    # 1) Create the job. The cron value only has to be valid: next_run_at is
    #    overwritten with "now" right below.
    job = scheduler.create_job(
        uid, name="[smoke] 早安测试", prompt=PROMPT, cron="*/5 * * * *", mode="isolated",
    )
    jid = UUID(job["job_id"])
    job_present = True  # flipped when the row has vanished, so cleanup is skipped

    try:
        with session_scope() as s:
            s.execute(
                update(ScheduledJob)
                .where(ScheduledJob.job_id == jid)
                .values(next_run_at=datetime.now(timezone.utc))
            )
        print(f"[ok] 已插入 job {job['short_id']},next_run 置为现在,等守护循环认领...")
        print(" (若卡住不动 → 确认 web 服务在跑、ZCBOT_DISABLE_SCHEDULER 未设、tick 已过)")

        # 2) Poll last_status. A monotonic clock keeps the timeout immune to
        #    wall-clock adjustments.
        started = time.monotonic()
        deadline = started + POLL_TIMEOUT
        status = None
        last_task_id = None
        last_error = None
        while time.monotonic() < deadline:
            time.sleep(POLL_INTERVAL)
            with session_scope() as s:
                row = s.execute(
                    select(ScheduledJob.last_status, ScheduledJob.last_task_id,
                           ScheduledJob.last_error, ScheduledJob.next_run_at)
                    .where(ScheduledJob.job_id == jid)
                ).first()
            if row is None:
                job_present = False
                print("[FAIL] job 不见了(被并发删?)")
                return 1
            status, last_task_id, last_error = row.last_status, row.last_task_id, row.last_error
            waited = int(time.monotonic() - started)
            print(f" [{waited:>3}s] last_status={status or '(待触发)'}")
            if status in ("ok", "error", "skipped"):
                break

        # 3) Report the result.
        print("-" * 50)
        if status == "ok":
            print(f"[PASS] 守护循环已触发并成功。task={str(last_task_id)[:8] if last_task_id else '?'}")
            if last_task_id:
                print(f" agent 回复: {_last_assistant_text(last_task_id)[:120]}")
            rc = 0
        elif status == "error":
            print(f"[FAIL] 触发了但 run 报错: {last_error}")
            rc = 1
        elif status == "skipped":
            print(f"[WARN] 被跳过(目标 task 正忙?): {last_error}")
            rc = 1
        else:
            print(f"[FAIL] {POLL_TIMEOUT}s 内未触发(last_status 仍为空)。web 服务/调度是否在跑?")
            rc = 1
        return rc
    finally:
        # 4) Cleanup: cancel the job; the task it created is left for inspection.
        if job_present:
            try:
                scheduler.cancel_job(uid, str(jid))
                print(f"[..] 已清理 smoke job {job['short_id']}(task 保留可查看)")
            except Exception as e:
                # Best-effort: a failed cleanup must not mask the test outcome.
                print(f"[..] 清理 job 失败(可手动删): {e}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    sys.exit(main())
|