zcbot/scripts/smoke_files_rename.py

256 lines
9.8 KiB
Python

"""Smoke: POST /v1/files/rename + 收紧的 POST /v1/files/delete。
跑法: .venv/Scripts/python.exe scripts/smoke_files_rename.py
依赖 .env 里 PLATFORM_KEY / JWT_SECRET / ZCBOT_DB_URL。
随机 user_id,run 完留 trace 自查;不清 DB(开发期约定)。
"""
from __future__ import annotations
import os
import sys
import uuid
from pathlib import Path
# Repository root: this script is expected to live one directory below it.
ROOT = Path(__file__).resolve().parent.parent
# Make the project's top-level packages importable when run as a plain script.
sys.path.insert(0, str(ROOT))


def load_env_file(path: Path) -> None:
    """Load simple ``KEY=VAL`` lines from *path* into ``os.environ``.

    Blank lines, ``#`` comment lines, lines without ``=`` and lines with an
    empty key are skipped. Variables already present in the environment are
    left untouched (``setdefault``), so the real environment wins over the
    file.

    Args:
        path: Location of the ``.env``-style file to read.
    """
    # "utf-8-sig" drops a leading BOM if present; otherwise the first key
    # would be read as "\ufeffKEY" and never match.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        key = key.strip()
        # No "=" at all, or nothing before it: an empty variable name is
        # rejected by os.environ, so skip instead of crashing.
        if not sep or not key:
            continue
        value = value.strip()
        # Drop one pair of matching surrounding quotes ("..." or '...').
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)


# Read .env (simple KEY=VAL parsing) before the imports further down the file.
env_file = ROOT / ".env"
if env_file.is_file():
    load_env_file(env_file)
from fastapi.testclient import TestClient
from sqlalchemy import update
from core.storage import session_scope
from core.storage.models import Task
from web.app import create_app
def main() -> int:
    """Run the files rename/delete smoke cases against an in-process app.

    Logs in as a freshly generated random user, then runs the cases in order
    through ``case``. The cases are order-dependent: later ones reuse
    directories and tasks left behind by earlier ones (for example ``proj_a2``
    created in case 1 is used by cases 2, 3, 5, 6 and 13).

    Returns:
        0 once every case has passed.

    Raises:
        KeyError: if ``PLATFORM_KEY`` is not set in the environment.
        AssertionError: if the initial login fails, or on the first failed
            expectation inside a case (re-raised after a ``[FAIL]`` line).
        Exception: any other error raised inside a case is also reported
            with a ``[FAIL]`` line and re-raised unchanged.
    """
    app = create_app()
    client = TestClient(app)
    uid = uuid.uuid4()
    plat_key = os.environ["PLATFORM_KEY"]

    # login
    r = client.post("/v1/auth/login", json={"user_id": str(uid), "platform_key": plat_key})
    assert r.status_code == 200, r.text
    token = r.json()["token"]
    H = {"Authorization": f"Bearer {token}"}
    # On-disk directory the cases inspect for this user.
    ws = ROOT / "workspace" / "users" / str(uid)

    def case(name: str, fn):
        """Run one case, print its [OK]/[FAIL] line, and re-raise on failure."""
        try:
            fn()
        except AssertionError as e:
            print(f"[FAIL] {name}: {e}")
            raise
        except Exception as e:
            # Non-assertion errors (e.g. a missing JSON key) would otherwise
            # abort the run without saying which case was executing.
            print(f"[FAIL] {name}: {type(e).__name__}: {e}")
            raise
        else:
            print(f"[OK] {name}")

    # case 1: top-level directory rename + DB UPDATE
    def t1():
        # Creating a task is expected to create its working_dir.
        r = client.post("/v1/tasks", json={"name": "proj_a"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        assert (ws / "proj_a").is_dir()
        # rename
        r = client.post("/v1/files/rename", json={"path": "proj_a", "new_name": "proj_a2"}, headers=H)
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["tasks_updated"] == 1, body
        assert body["new"] == "proj_a2"
        # The filesystem really changed.
        assert not (ws / "proj_a").exists()
        assert (ws / "proj_a2").is_dir()
        # The task's working_dir follows the rename.
        r = client.get(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 200
        wd = r.json()["working_dir"]
        assert wd.endswith("/proj_a2"), wd

    case("顶层目录 rename → tasks_updated + FS + DB 同步", t1)

    # case 2: renaming a nested directory does not touch the DB
    def t2():
        sub = ws / "proj_a2" / "sub_old"
        sub.mkdir(parents=True, exist_ok=True)
        r = client.post(
            "/v1/files/rename",
            json={"path": "proj_a2/sub_old", "new_name": "sub_new"},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert r.json()["tasks_updated"] == 0
        assert not sub.exists()
        assert (ws / "proj_a2" / "sub_new").is_dir()

    case("子级 rename → 纯 FS,tasks_updated=0", t2)

    # case 3: renaming a top-level directory while a task is running -> 409
    def t3():
        # Take the current task (the one on proj_a2) and mark it as running
        # directly in the DB.
        r = client.get("/v1/tasks", headers=H)
        assert r.status_code == 200, r.text
        rows = r.json()["results"]
        assert rows, "expected at least one task for this user"
        tid = uuid.UUID(rows[0]["task_id"])
        with session_scope() as s:
            s.execute(update(Task).where(Task.task_id == tid).values(run_status="running"))
        try:
            r = client.post("/v1/files/rename", json={"path": "proj_a2", "new_name": "blocked"}, headers=H)
            assert r.status_code == 409, r.text
            assert "active run" in r.text
        finally:
            # Always put the task back to idle so later cases are not blocked.
            with session_scope() as s:
                s.execute(update(Task).where(Task.task_id == tid).values(run_status="idle"))

    case("顶层 rename 有 running task → 409", t3)

    # case 4: delete a top-level directory referenced by a task -> 200; the
    # task's working_dir field is left as is (the directory can be recreated)
    def t4():
        r = client.post("/v1/tasks", json={"name": "to_be_deleted"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        wd_before = r.json()["working_dir"]
        assert (ws / "to_be_deleted").is_dir()
        r = client.post("/v1/files/delete", json={"path": "to_be_deleted"}, headers=H)
        assert r.status_code == 200, r.text
        assert not (ws / "to_be_deleted").exists()
        r2 = client.get(f"/v1/tasks/{tid}", headers=H)
        assert r2.status_code == 200
        assert r2.json()["working_dir"] == wd_before, r2.json()

    case("delete 顶层 + 有 task 引用 → 200,task.working_dir 不变", t4)

    # case 5: rename target already exists as a sibling -> 409
    def t5():
        (ws / "occupied").mkdir(exist_ok=True)
        r = client.post(
            "/v1/files/rename",
            json={"path": "proj_a2", "new_name": "occupied"},
            headers=H,
        )
        assert r.status_code == 409, r.text
        assert "already exists" in r.text

    case("rename target sibling 存在 → 409", t5)

    # case 6: delete an empty nested (non-top-level) directory -> OK
    def t6():
        r = client.post("/v1/files/delete", json={"path": "proj_a2/sub_new"}, headers=H)
        assert r.status_code == 200, r.text
        assert not (ws / "proj_a2" / "sub_new").exists()

    case("delete 子级空目录 → 200", t6)

    # case 7: new user; a top-level directory with no task reference can be deleted
    def t7():
        uid2 = uuid.uuid4()
        r = client.post("/v1/auth/login", json={"user_id": str(uid2), "platform_key": plat_key})
        # Same check as the first login: fail here rather than on ["token"].
        assert r.status_code == 200, r.text
        tok2 = r.json()["token"]
        H2 = {"Authorization": f"Bearer {tok2}"}
        # Create the top-level directory by hand (stands in for a user upload
        # to a not-yet-existing path, where the API would mkdir).
        ws2 = ROOT / "workspace" / "users" / str(uid2)
        (ws2 / "orphan").mkdir(parents=True, exist_ok=True)
        r = client.post("/v1/files/delete", json={"path": "orphan"}, headers=H2)
        assert r.status_code == 200, r.text
        assert not (ws2 / "orphan").exists()

    case("delete 顶层目录无 task 引用 → 200", t7)

    # case 8: DELETE /v1/tasks with an empty working_dir and no other
    # references -> the directory is removed as well
    def t8():
        r = client.post("/v1/tasks", json={"name": "auto_clean"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        assert (ws / "auto_clean").is_dir()
        r = client.delete(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 204, r.text
        assert not (ws / "auto_clean").exists()

    case("DELETE task 空 working_dir 顺手清", t8)

    # case 9: DELETE /v1/tasks with a non-empty working_dir -> directory is
    # kept (best-effort cleanup)
    def t9():
        r = client.post("/v1/tasks", json={"name": "keep_dir"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        (ws / "keep_dir" / "user_file.txt").write_text("hello", encoding="utf-8")
        r = client.delete(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 204, r.text
        assert (ws / "keep_dir" / "user_file.txt").is_file()

    case("DELETE task 非空 working_dir 保留", t9)

    # case 10: deleting a non-empty directory without recursive -> 400
    def t10():
        (ws / "to_rm_shallow" / "f.txt").parent.mkdir(parents=True, exist_ok=True)
        (ws / "to_rm_shallow" / "f.txt").write_text("x", encoding="utf-8")
        # "recursive" is omitted on purpose; the request relies on the default.
        r = client.post(
            "/v1/files/delete",
            json={"path": "to_rm_shallow"},
            headers=H,
        )
        assert r.status_code == 400, r.text
        # The directory and its content must be untouched.
        assert (ws / "to_rm_shallow" / "f.txt").is_file()

    case("delete 非空目录 recursive=False → 400", t10)

    # case 11: recursive=True on a non-empty top-level directory with no task
    # reference -> 200, whole tree removed
    def t11():
        # Reuses to_rm_shallow left behind by t10.
        r = client.post(
            "/v1/files/delete",
            json={"path": "to_rm_shallow", "recursive": True},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert not (ws / "to_rm_shallow").exists()

    case("delete 非空顶层目录 recursive=True 无 task 引用 → 200", t11)

    # case 12: recursive=True on a top-level directory referenced by a task -> 409
    def t12():
        r = client.post("/v1/tasks", json={"name": "occupied_top"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        (ws / "occupied_top" / "art.md").write_text("artifact", encoding="utf-8")
        r = client.post(
            "/v1/files/delete",
            json={"path": "occupied_top", "recursive": True},
            headers=H,
        )
        assert r.status_code == 409, r.text
        assert "task 引用" in r.text
        # The content must still be there.
        assert (ws / "occupied_top" / "art.md").is_file()
        # After deleting the task, the recursive delete goes through -> 200.
        r = client.delete(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 204, r.text
        r = client.post(
            "/v1/files/delete",
            json={"path": "occupied_top", "recursive": True},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert not (ws / "occupied_top").exists()

    case("delete 顶层目录 recursive=True 有 task 引用 → 409;删 task 后放行", t12)

    # case 13: recursive=True on a non-empty nested directory -> 200
    # (cannot collide with a task: working_dir is always top-level)
    def t13():
        (ws / "proj_a2" / "deep" / "a" / "b.txt").parent.mkdir(parents=True, exist_ok=True)
        (ws / "proj_a2" / "deep" / "a" / "b.txt").write_text("y", encoding="utf-8")
        r = client.post(
            "/v1/files/delete",
            json={"path": "proj_a2/deep", "recursive": True},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert not (ws / "proj_a2" / "deep").exists()
        # The parent top-level directory is unaffected.
        assert (ws / "proj_a2").is_dir()

    case("delete 非空子级 recursive=True → 200", t13)

    print("\n[ALL PASS]")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())