"""Smoke: POST /v1/files/rename + 收紧的 POST /v1/files/delete。 跑法: .venv/Scripts/python.exe scripts/smoke_files_rename.py 依赖 .env 里 PLATFORM_KEY / JWT_SECRET / ZCBOT_DB_URL。 随机 user_id,run 完留 trace 自查;不清 DB(开发期约定)。 """ from __future__ import annotations import os import sys import uuid from pathlib import Path ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) # 读 .env(简单 KEY=VAL 解析) env_file = ROOT / ".env" if env_file.exists(): for line in env_file.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") os.environ.setdefault(k.strip(), v.strip()) from fastapi.testclient import TestClient from sqlalchemy import update from core.storage import session_scope from core.storage.models import Task from web.app import create_app def main() -> int: app = create_app() client = TestClient(app) uid = uuid.uuid4() plat_key = os.environ["PLATFORM_KEY"] # login r = client.post("/v1/auth/login", json={"user_id": str(uid), "platform_key": plat_key}) assert r.status_code == 200, r.text token = r.json()["token"] H = {"Authorization": f"Bearer {token}"} ws = ROOT / "workspace" / "users" / str(uid) def case(name: str, fn): try: fn() print(f"[OK] {name}") except AssertionError as e: print(f"[FAIL] {name}: {e}") raise # case 1: 顶层目录 rename + DB UPDATE def t1(): # 建 task 创出 working_dir r = client.post("/v1/tasks", json={"name": "proj_a"}, headers=H) assert r.status_code == 201, r.text tid = r.json()["task_id"] assert (ws / "proj_a").is_dir() # rename r = client.post("/v1/files/rename", json={"path": "proj_a", "new_name": "proj_a2"}, headers=H) assert r.status_code == 200, r.text body = r.json() assert body["tasks_updated"] == 1, body assert body["new"] == "proj_a2" # FS 真的改了 assert not (ws / "proj_a").exists() assert (ws / "proj_a2").is_dir() # DB working_dir 跟着变 r = client.get(f"/v1/tasks/{tid}", headers=H) assert r.status_code == 200 wd = r.json()["working_dir"] assert wd.endswith("/proj_a2"), wd case("顶层目录 rename → tasks_updated + FS + DB 同步", t1) # case 2: 子级 rename 不动 DB def t2(): sub = ws / "proj_a2" / "sub_old" sub.mkdir(parents=True, exist_ok=True) r = client.post( "/v1/files/rename", json={"path": "proj_a2/sub_old", "new_name": "sub_new"}, headers=H, ) assert r.status_code == 200, r.text assert r.json()["tasks_updated"] == 0 assert not sub.exists() assert (ws / "proj_a2" / "sub_new").is_dir() case("子级 rename → 纯 FS,tasks_updated=0", t2) # case 3: rename 顶层时有 running task → 409 def t3(): # 拿当前 proj_a2 的 task,mock 标 running r = client.get("/v1/tasks", headers=H) rows = r.json()["results"] tid = uuid.UUID(rows[0]["task_id"]) with session_scope() as s: s.execute(update(Task).where(Task.task_id == tid).values(run_status="running")) try: r = client.post("/v1/files/rename", json={"path": "proj_a2", "new_name": "blocked"}, headers=H) assert r.status_code == 409, r.text assert "active run" in r.text finally: with session_scope() as s: s.execute(update(Task).where(Task.task_id == tid).values(run_status="idle")) case("顶层 rename 有 running task → 409", t3) # case 4: 删顶层(有 task 引用)→ 200,task.working_dir 字段不动(可重生) def t4(): r = client.post("/v1/tasks", json={"name": "to_be_deleted"}, headers=H) assert r.status_code == 201, r.text tid = r.json()["task_id"] wd_before = r.json()["working_dir"] assert (ws / "to_be_deleted").is_dir() r = client.post("/v1/files/delete", json={"path": "to_be_deleted"}, headers=H) assert r.status_code == 200, r.text assert not (ws / "to_be_deleted").exists() r2 = client.get(f"/v1/tasks/{tid}", headers=H) assert r2.status_code == 200 assert r2.json()["working_dir"] == wd_before, r2.json() case("delete 顶层 + 有 task 引用 → 200,task.working_dir 不变", t4) # case 5: rename target sibling 已存在 → 409 def t5(): (ws / "occupied").mkdir(exist_ok=True) r = client.post( "/v1/files/rename", json={"path": "proj_a2", "new_name": "occupied"}, headers=H, ) assert r.status_code == 409, r.text assert "already exists" in r.text case("rename target sibling 存在 → 409", t5) # case 6: 删空子目录(非顶层)→ 正常 def t6(): r = client.post("/v1/files/delete", json={"path": "proj_a2/sub_new"}, headers=H) assert r.status_code == 200, r.text assert not (ws / "proj_a2" / "sub_new").exists() case("delete 子级空目录 → 200", t6) # case 7: 新 user,顶层目录无 task 引用时可删 def t7(): uid2 = uuid.uuid4() r = client.post("/v1/auth/login", json={"user_id": str(uid2), "platform_key": plat_key}) tok2 = r.json()["token"] H2 = {"Authorization": f"Bearer {tok2}"} # 手建顶层(模拟用户上传到不存在路径,API 会 mkdir) ws2 = ROOT / "workspace" / "users" / str(uid2) (ws2 / "orphan").mkdir(parents=True, exist_ok=True) r = client.post("/v1/files/delete", json={"path": "orphan"}, headers=H2) assert r.status_code == 200, r.text assert not (ws2 / "orphan").exists() case("delete 顶层目录无 task 引用 → 200", t7) # case 8: DELETE /v1/tasks 时若 working_dir 空且无其他引用 → 顺手 rmdir def t8(): r = client.post("/v1/tasks", json={"name": "auto_clean"}, headers=H) assert r.status_code == 201, r.text tid = r.json()["task_id"] assert (ws / "auto_clean").is_dir() r = client.delete(f"/v1/tasks/{tid}", headers=H) assert r.status_code == 204, r.text assert not (ws / "auto_clean").exists() case("DELETE task 空 working_dir 顺手清", t8) # case 9: DELETE /v1/tasks 时 working_dir 非空 → 目录保留(best-effort) def t9(): r = client.post("/v1/tasks", json={"name": "keep_dir"}, headers=H) assert r.status_code == 201, r.text tid = r.json()["task_id"] (ws / "keep_dir" / "user_file.txt").write_text("hello", encoding="utf-8") r = client.delete(f"/v1/tasks/{tid}", headers=H) assert r.status_code == 204, r.text assert (ws / "keep_dir" / "user_file.txt").is_file() case("DELETE task 非空 working_dir 保留", t9) print("\n[ALL PASS]") return 0 if __name__ == "__main__": sys.exit(main())