From 164620536481b71f7dfe6bd407f24fdad990ad03 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Mon, 29 Jun 2026 14:35:40 +0800 Subject: [PATCH] =?UTF-8?q?fix(scheduler):=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E8=B6=85=E6=97=B6=E8=A2=AB=E6=8E=90=E6=96=AD=E6=97=B6?= =?UTF-8?q?=E8=AE=B0=20error=20=E8=80=8C=E9=9D=9E=E8=AF=AF=E5=90=9E?= =?UTF-8?q?=E6=88=90=20ok(bump=200.32.2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实测 bug:isolated 定时 job 跑满 timeout_seconds 被协作式 cancel 后, _run_agent_bg 对 ok/cancelled 都把 run_status 收回 idle(DB 不可区分), 而 _execute_scheduled_job 收尾只判 run_status=="error",于是超时中断被落成 last_status="ok" —— 掩盖"跑到一半没写 sections/没推送",且不计连续失败、 不触发兜底。复盘 job e621c8a6「每日水泥科研简报」:timeout=600s,task 创建→last_run_at 正好 600.0s,agent 停在"按期刊打印 38 篇摘要"(还在取数)。 修:超时分支置 timed_out 标志,run 收尾后若 timed_out → record_result( status="error", 半成品不投递 notify)并直接返回。复用既有 error 语义(计入 consecutive_failures、到阈值自动停用、前端 crons 显示「上次失败」)。不动 _run_agent_bg 的 idle-on-cancel 共享语义(HTTP cancel/drain 也依赖)。 配套:PROGRESS/RUN 故障兜底各加一条;诊断脚本 scripts/diag_sched_e621.py (dump 输出 scripts/_*.txt 入 gitignore)。 Co-Authored-By: Claude Opus 4.8 (1M context) --- .gitignore | 3 ++ PROGRESS.md | 6 +++ RUN.md | 1 + core/__init__.py | 2 +- scripts/diag_sched_e621.py | 93 ++++++++++++++++++++++++++++++++++++++ web/app.py | 12 +++++ 6 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 scripts/diag_sched_e621.py diff --git a/.gitignore b/.gitignore index 7385731..fe850e8 100644 --- a/.gitignore +++ b/.gitignore @@ -57,3 +57,6 @@ col.ps1 # 探测脚本 scripts/probe_clawbot*.py 保留作参考与复测) scripts/clawbot_qr*.png scripts/zcbot_filetest.txt + +# 诊断脚本的使用即弃 dump 输出(diag_*.py 写本地,不入库) +scripts/_*.txt diff --git a/PROGRESS.md b/PROGRESS.md index b7b0b42..6e9c15a 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -21,6 +21,12 @@ ## 已完成关键能力 +### 2026-06-29 / 修定时任务超时被误记成 ok(bump 0.32.2) + +- 实测 bug:定时 job(isolated)跑满 `timeout_seconds` 被调度器协作式 cancel 后,`_run_agent_bg` 对 
ok/cancelled 都把 `run_status` 收回 `idle`(二者 DB 不可区分),而 `_execute_scheduled_job` 收尾只判 `run_status=="error"`,于是超时中断被落成 `last_status="ok"` —— 掩盖"跑到一半没写 sections / 没推送",且不计连续失败、不触发兜底。复盘 job `e621c8a6`「每日水泥科研简报」:`timeout_seconds=600`,task 创建→`last_run_at` 正好 600.0s,最后一条 agent 消息停在"按期刊分组打印 38 篇摘要"(还在取数阶段),`last_status` 却是 ok。 +- 修:`web/app.py` `_execute_scheduled_job` 在超时分支置 `timed_out` 标志,run 收尾后若 `timed_out` → `record_result(status="error", ...)` 并直接返回(不投递半成品 notify)。复用既有 error 语义:计入 `consecutive_failures`、到阈值自动停用、前端 crons.js 显示「上次失败」。不动 `_run_agent_bg` 的 idle-on-cancel 共享语义(HTTP cancel/drain 也用)。 +- 配套:该 job 真正的诱因是 600s 超时对"7 刊 38 篇带中文摘要重写 + 渲 docx"太短,需用户把 `timeout_seconds` 调大(或 0=不限)。诊断脚本 `scripts/diag_sched_e621.py`。 + ### 2026-06-29 / channel 长会话上下文软重置(Phase 1,bump 0.32.0) - 问题:微信/企业微信复用同一常驻 chat_task,`Session.load` 全量喂模型 → 越用越贵/慢,终撞 context window。业界(OpenClaw/Hermes)做法:阈值摘要 + 会话分段 + 持久记忆;IM 场景独有的「会话分段」最高杠杆且零信息损失。 diff --git a/RUN.md b/RUN.md index e94783e..7aeef86 100644 --- a/RUN.md +++ b/RUN.md @@ -756,6 +756,7 @@ sudo xfs_quota -x -c "limit -p bhard=10g zcbot_" /opt | `kill -HUP ` 后 `/openapi.json` 没新接口 | uvicorn **不响应 SIGHUP**(没装 handler,落 Python 默认终止;Windows 上信号本身无效)。Ubuntu 上用 `systemctl restart zcbot`,或 unit 加 `--reload` 让 uvicorn 监听文件自动重起(见"部署"段)。验证:`curl -s http://127.0.0.1:8765/openapi.json \| python3 -c 'import sys,json;print([p for p in json.load(sys.stdin)["paths"] if "auth" in p])'` | | `systemctl restart zcbot` 要等几十秒才退 | 正常 —— 优雅 drain 在等在跑的 run 收尾(`shutdown.drain_timeout` 默 30s),没在跑 run 时秒退。journal 出现 `[shutdown] draining N in-flight run(s)` 即正常。真急(不在乎杀掉在跑 run):`systemctl kill -s KILL zcbot` | | 部署后在跑的对话被标 `error: server restarted before run finished` | 该 run 在 drain 期内没收尾、cancel 也没在 `cancel_grace` 内退,被 SIGKILL 后下次启动 reaper 标的。多半是 run 卡在不 poll cancel 的长动作(如单次超长 docker exec)或 `TimeoutStopSec` 配得比 drain 预算还小被提前 SIGKILL。先核对 unit `TimeoutStopSec > drain_timeout + cancel_grace`;真有超长 run 把 `drain_timeout` 调大 | +| 定时任务「跑到一半没推送」/ crons 页显示「上次失败: 运行超过超时上限 Ns 
未完成」 | job 跑满 `timeout_seconds` 被协作式中断(还没写完 / 没推送)。**0.32.2 起超时记 error**(此前误记 ok 看不出来),计入连续失败、到阈值自动停用。处置:该 job 调大 `timeout_seconds`(报告类重活如多刊检索+渲 docx 建议 ≥1800,或 0=不限),被自动停用的重新 enable。诊断单个 job 用 `scripts/diag_sched_e621.py <job_id_prefix>` | | `POST /v1/files/rename` 返 409 `folder has active run(s)` | 顶层目录被某 running/cancelling 的 task 占用;先 cancel 等流式 done 再 rename | | `POST /v1/files/rename` 返 409 `... 前缀嵌套` | 改名后会与其他 task 的 working_dir 形成嵌套;换不冲突的 new_name | | `POST /v1/files/upload` 返 413 `已达磁盘配额上限` | per-user 5GB(yaml `quotas.disk_bytes_per_user`)。让用户在 dev SPA 右侧文件栏删旧产物 / 大文件,或改 yaml 升配重启 web | diff --git a/core/__init__.py b/core/__init__.py index 71afdd5..e0783d0 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,3 +1,3 @@ # zcbot 版本号单一事实源:web/app.py 的 FastAPI version、/healthz 返回、前端展示都引这里。 # 改版本只动这一行。 -__version__ = "0.32.1" +__version__ = "0.32.2" diff --git a/scripts/diag_sched_e621.py b/scripts/diag_sched_e621.py new file mode 100644 index 0000000..6d8068b --- /dev/null +++ b/scripts/diag_sched_e621.py @@ -0,0 +1,93 @@ +"""diag: 查 scheduled-e621c8a6 这个 job 为何执行到一半没推送(ASCII only, GBK safe).""" +import os +import sys +from pathlib import Path + +env = Path(__file__).resolve().parent.parent / ".env" +for line in env.read_text(encoding="utf-8").splitlines(): + if line.strip().startswith("ZCBOT_DB_URL="): + os.environ["ZCBOT_DB_URL"] = line.split("=", 1)[1].strip() +from sqlalchemy import create_engine, text # noqa: E402 +import builtins # noqa: E402 + +_out = open(Path(__file__).resolve().parent / "_sched_e621.txt", "w", encoding="utf-8") + + +def print(*a, **k): # noqa: A001 + builtins.print(*a, **k, file=_out) + + +PREFIX = sys.argv[1] if len(sys.argv) > 1 else "e621c8a6" +engine = create_engine(os.environ["ZCBOT_DB_URL"]) + + +def s(x, n=2000): + t = str(x if x is not None else "") + return t if len(t) <= n else t[:n] + f"...[+{len(t)-n}]" + + +with engine.connect() as conn: + job = conn.execute(text( + "select 
job_id,user_id,name,mode,cron,tz,enabled,notify,timeout_seconds," + "next_run_at,last_run_at,last_status,last_error,last_task_id," + "consecutive_failures,run_count,bound_task_id,created_at,deleted_at " + "from scheduled_jobs where cast(job_id as text) like :p"), + {"p": PREFIX + "%"}).fetchall() + print(f"[JOBS matched '{PREFIX}'] {len(job)}") + for j in job: + print("-" * 60) + print(f"job_id={j[0]} name={j[2]!r}") + print(f" mode={j[3]} cron={j[4]!r} tz={j[5]} enabled={j[6]} timeout={j[8]}") + print(f" notify={j[7]}") + print(f" next_run_at={j[9]} last_run_at={j[10]}") + print(f" last_status={j[11]} consecutive_failures={j[14]} run_count={j[15]}") + print(f" last_task_id={j[13]} bound_task_id={j[16]}") + print(f" deleted_at={j[18]} created_at={j[17]}") + if j[12]: + print(f" last_error: {s(j[12], 1500)}") + + if not job: + sys.exit(0) + + j = job[0] + uid = j[1] + last_tid = j[13] + + # 找该 job 关联的所有 task(scheduled_job_id 回填 + last_task_id) + tasks = conn.execute(text( + "select task_id,name,status,run_status,run_error,tokens_prompt,tokens_completion," + "created_at,updated_at,scheduled_job_id from tasks " + "where scheduled_job_id = :jid order by created_at"), + {"jid": str(j[0])}).fetchall() + print("\n" + "=" * 60) + print(f"[TASKS with scheduled_job_id={str(j[0])[:8]}] {len(tasks)}") + for t in tasks: + print(f" task={t[0]} name={t[1]!r} status={t[2]} run={t[3]} " + f"tok={t[5]}/{t[6]} created={t[7]} updated={t[8]}") + if t[4]: + print(f" run_error: {s(t[4], 1500)}") + + # dump last_task_id 的消息(执行到哪一步) + tid = last_tid or (tasks[-1][0] if tasks else None) + if tid is None: + print("\n[no task to dump]") + sys.exit(0) + print("\n" + "=" * 60) + print(f"[DUMP messages of task {tid}]") + msgs = conn.execute(text( + "select idx,payload,tokens_in,tokens_out,created_at from messages " + "where task_id=:t order by idx"), {"t": str(tid)}).fetchall() + print(f"messages: {len(msgs)}\n") + for idx, p, ti, to, cat in msgs: + role = p.get("role") + head = f"[{idx}] {role} 
tok={ti}/{to} at={cat}" + print(head) + content = p.get("content") + if content: + print(" content:", s(content, 1500)) + for tc in p.get("tool_calls") or []: + fn = tc.get("function") or {} + print(f" CALL {fn.get('name')}({s(fn.get('arguments'), 800)})") + if role == "tool": + print(f" TOOL[{p.get('name')}]:", s(content, 1200)) + print() diff --git a/web/app.py b/web/app.py index a43ef9c..ad58e7d 100644 --- a/web/app.py +++ b/web/app.py @@ -921,15 +921,27 @@ def create_app() -> FastAPI: runner.add_done_callback(lambda t: app.state.inflight.pop(t, None)) timeout = int(snap.get("timeout_seconds") or 0) + timed_out = False if timeout > 0: done, _pending = await asyncio.wait({runner}, timeout=timeout) if not done: + timed_out = True broker.request_cancel(tid) # 协作式停;loop 在 chunk 间 poll 到即退 print(f"[scheduler] job {str(job_id)[:8]} timed out ({timeout}s), cancelling") await runner else: await runner + # 超时被掐断:_run_agent_bg 对 ok/cancelled 都把 run_status 收回 idle + # (二者在 DB 里不可区分),只有这里知道本次是 timeout 中断的。必须记为 + # error —— 否则会误判成 ok(掩盖"跑到一半没推送"),且不计入连续失败/不触发 + # 兜底。半成品不投递 notify,直接收尾返回。 + if timed_out: + record_result(job_id, status="error", task_id=tid, + error=f"运行超过超时上限 {timeout}s 未完成,已中断(本次未推送)") + print(f"[scheduler] job {str(job_id)[:8]} recorded as timeout-error") + return + # run 终态:_run_agent_bg 收尾把 run_status 写回 idle(ok)/error with session_scope() as s: st = s.execute(