256 lines
9.8 KiB
Python
256 lines
9.8 KiB
Python
"""Smoke: POST /v1/files/rename + 收紧的 POST /v1/files/delete。
|
|
|
|
跑法: .venv/Scripts/python.exe scripts/smoke_files_rename.py
|
|
依赖 .env 里 PLATFORM_KEY / JWT_SECRET / ZCBOT_DB_URL。
|
|
随机 user_id,run 完留 trace 自查;不清 DB(开发期约定)。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
# Repository root: this script lives one directory below it (scripts/).
ROOT = Path(__file__).resolve().parent.parent
# Make the project packages (core, web) importable when run as a plain script.
sys.path.insert(0, str(ROOT))


def load_env_file(path: Path) -> None:
    """Populate ``os.environ`` from a simple ``KEY=VAL`` file.

    Blank lines, ``#`` comment lines, lines without ``=`` and lines with an
    empty key are skipped. Variables that are already set in the environment
    are left untouched (``setdefault``), so the real environment wins over
    the file.

    Args:
        path: Location of the env file; it must exist.
    """
    # utf-8-sig drops a leading BOM (common for files saved on Windows), which
    # would otherwise become part of the first key.
    for raw in path.read_text(encoding="utf-8-sig").splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        key = key.strip()
        value = value.strip()
        if not key:
            # "=value" has no usable name; an empty env var name is invalid.
            continue
        # KEY="value" / KEY='value': drop one pair of matching outer quotes.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key, value)


# Read .env (simple KEY=VAL parsing) before the project imports below run.
env_file = ROOT / ".env"
if env_file.exists():
    load_env_file(env_file)
|
|
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import update
|
|
|
|
from core.storage import session_scope
|
|
from core.storage.models import Task
|
|
from web.app import create_app
|
|
|
|
|
|
def main() -> int:
    """Run the file rename/delete smoke cases against an in-process app.

    Logs in as a freshly generated random user, then runs cases 1-13 in
    order. The cases are NOT independent: later ones reuse directories and
    tasks left behind by earlier ones (e.g. ``proj_a2`` from case 1,
    ``sub_new`` from case 2, ``to_rm_shallow`` from case 10), so the order
    must be kept.

    Returns:
        0 once every case has passed.

    Raises:
        KeyError: if ``PLATFORM_KEY`` is not set in the environment.
        AssertionError: re-raised from the first failing case, after its
            ``[FAIL]`` line has been printed.
    """
    app = create_app()
    client = TestClient(app)
    uid = uuid.uuid4()
    plat_key = os.environ["PLATFORM_KEY"]

    # login
    r = client.post("/v1/auth/login", json={"user_id": str(uid), "platform_key": plat_key})
    assert r.status_code == 200, r.text
    token = r.json()["token"]
    H = {"Authorization": f"Bearer {token}"}

    # Directory the cases inspect on disk for this user.
    ws = ROOT / "workspace" / "users" / str(uid)

    def case(name: str, fn):
        """Run one case, print [OK]/[FAIL], and re-raise assertion failures."""
        try:
            fn()
            print(f"[OK] {name}")
        except AssertionError as e:
            print(f"[FAIL] {name}: {e}")
            raise

    # case 1: top-level directory rename + DB UPDATE
    def t1():
        # Create a task; this is expected to create its working_dir.
        r = client.post("/v1/tasks", json={"name": "proj_a"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        assert (ws / "proj_a").is_dir()
        # rename
        r = client.post("/v1/files/rename", json={"path": "proj_a", "new_name": "proj_a2"}, headers=H)
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["tasks_updated"] == 1, body
        assert body["new"] == "proj_a2"
        # The filesystem really changed.
        assert not (ws / "proj_a").exists()
        assert (ws / "proj_a2").is_dir()
        # The task's working_dir follows the rename.
        r = client.get(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 200
        wd = r.json()["working_dir"]
        assert wd.endswith("/proj_a2"), wd

    case("顶层目录 rename → tasks_updated + FS + DB 同步", t1)

    # case 2: renaming a nested entry does not touch the DB
    def t2():
        sub = ws / "proj_a2" / "sub_old"
        sub.mkdir(parents=True, exist_ok=True)
        r = client.post(
            "/v1/files/rename",
            json={"path": "proj_a2/sub_old", "new_name": "sub_new"},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert r.json()["tasks_updated"] == 0
        assert not sub.exists()
        assert (ws / "proj_a2" / "sub_new").is_dir()

    case("子级 rename → 纯 FS,tasks_updated=0", t2)

    # case 3: top-level rename while a task is running -> 409
    def t3():
        # Take the current proj_a2 task (the only task this user has at this
        # point) and mark it as running directly in the DB.
        r = client.get("/v1/tasks", headers=H)
        assert r.status_code == 200, r.text
        rows = r.json()["results"]
        assert rows, r.text
        tid = uuid.UUID(rows[0]["task_id"])
        with session_scope() as s:
            s.execute(update(Task).where(Task.task_id == tid).values(run_status="running"))
        try:
            r = client.post("/v1/files/rename", json={"path": "proj_a2", "new_name": "blocked"}, headers=H)
            assert r.status_code == 409, r.text
            assert "active run" in r.text
        finally:
            # Always put the task back to idle so later cases are not blocked.
            with session_scope() as s:
                s.execute(update(Task).where(Task.task_id == tid).values(run_status="idle"))

    case("顶层 rename 有 running task → 409", t3)

    # case 4: delete a top-level dir referenced by a task -> 200; the task's
    # working_dir field is left untouched (the dir can be re-created)
    def t4():
        r = client.post("/v1/tasks", json={"name": "to_be_deleted"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        wd_before = r.json()["working_dir"]
        assert (ws / "to_be_deleted").is_dir()
        r = client.post("/v1/files/delete", json={"path": "to_be_deleted"}, headers=H)
        assert r.status_code == 200, r.text
        assert not (ws / "to_be_deleted").exists()
        r2 = client.get(f"/v1/tasks/{tid}", headers=H)
        assert r2.status_code == 200
        assert r2.json()["working_dir"] == wd_before, r2.json()

    case("delete 顶层 + 有 task 引用 → 200,task.working_dir 不变", t4)

    # case 5: rename target sibling already exists -> 409
    def t5():
        (ws / "occupied").mkdir(exist_ok=True)
        r = client.post(
            "/v1/files/rename",
            json={"path": "proj_a2", "new_name": "occupied"},
            headers=H,
        )
        assert r.status_code == 409, r.text
        assert "already exists" in r.text

    case("rename target sibling 存在 → 409", t5)

    # case 6: delete an empty nested directory (not top-level) -> OK
    def t6():
        r = client.post("/v1/files/delete", json={"path": "proj_a2/sub_new"}, headers=H)
        assert r.status_code == 200, r.text
        assert not (ws / "proj_a2" / "sub_new").exists()

    case("delete 子级空目录 → 200", t6)

    # case 7: new user; a top-level dir with no task reference can be deleted
    def t7():
        uid2 = uuid.uuid4()
        r = client.post("/v1/auth/login", json={"user_id": str(uid2), "platform_key": plat_key})
        # Same check as the primary login, so an auth failure is reported as
        # a [FAIL] line instead of a KeyError on "token".
        assert r.status_code == 200, r.text
        tok2 = r.json()["token"]
        H2 = {"Authorization": f"Bearer {tok2}"}
        # Create the top-level dir by hand (simulates a user uploading to a
        # path that does not exist yet, where the API would mkdir).
        ws2 = ROOT / "workspace" / "users" / str(uid2)
        (ws2 / "orphan").mkdir(parents=True, exist_ok=True)
        r = client.post("/v1/files/delete", json={"path": "orphan"}, headers=H2)
        assert r.status_code == 200, r.text
        assert not (ws2 / "orphan").exists()

    case("delete 顶层目录无 task 引用 → 200", t7)

    # case 8: DELETE /v1/tasks with an empty working_dir and no other
    # references -> the directory is removed along the way
    def t8():
        r = client.post("/v1/tasks", json={"name": "auto_clean"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        assert (ws / "auto_clean").is_dir()
        r = client.delete(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 204, r.text
        assert not (ws / "auto_clean").exists()

    case("DELETE task 空 working_dir 顺手清", t8)

    # case 9: DELETE /v1/tasks with a non-empty working_dir -> directory is
    # kept (best-effort cleanup)
    def t9():
        r = client.post("/v1/tasks", json={"name": "keep_dir"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        (ws / "keep_dir" / "user_file.txt").write_text("hello", encoding="utf-8")
        r = client.delete(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 204, r.text
        assert (ws / "keep_dir" / "user_file.txt").is_file()

    case("DELETE task 非空 working_dir 保留", t9)

    # case 10: recursive=False on a non-empty directory -> 400
    def t10():
        (ws / "to_rm_shallow" / "f.txt").parent.mkdir(parents=True, exist_ok=True)
        (ws / "to_rm_shallow" / "f.txt").write_text("x", encoding="utf-8")
        r = client.post(
            "/v1/files/delete",
            json={"path": "to_rm_shallow"},
            headers=H,
        )
        assert r.status_code == 400, r.text
        # The directory and its content must be left exactly as they were.
        assert (ws / "to_rm_shallow" / "f.txt").is_file()

    case("delete 非空目录 recursive=False → 400", t10)

    # case 11: recursive=True on a non-empty top-level dir with no task
    # reference -> 200, whole tree removed
    def t11():
        # Reuses the to_rm_shallow directory left behind by t10.
        r = client.post(
            "/v1/files/delete",
            json={"path": "to_rm_shallow", "recursive": True},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert not (ws / "to_rm_shallow").exists()

    case("delete 非空顶层目录 recursive=True 无 task 引用 → 200", t11)

    # case 12: recursive=True on a top-level dir referenced by a task -> 409
    def t12():
        r = client.post("/v1/tasks", json={"name": "occupied_top"}, headers=H)
        assert r.status_code == 201, r.text
        tid = r.json()["task_id"]
        (ws / "occupied_top" / "art.md").write_text("artifact", encoding="utf-8")
        r = client.post(
            "/v1/files/delete",
            json={"path": "occupied_top", "recursive": True},
            headers=H,
        )
        assert r.status_code == 409, r.text
        assert "task 引用" in r.text
        # The content must be preserved.
        assert (ws / "occupied_top" / "art.md").is_file()
        # After deleting the task, the recursive delete goes through -> 200.
        r = client.delete(f"/v1/tasks/{tid}", headers=H)
        assert r.status_code == 204, r.text
        r = client.post(
            "/v1/files/delete",
            json={"path": "occupied_top", "recursive": True},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert not (ws / "occupied_top").exists()

    case("delete 顶层目录 recursive=True 有 task 引用 → 409;删 task 后放行", t12)

    # case 13: recursive=True on a non-empty nested dir (cannot collide with
    # a task, since working_dir is always top-level) -> 200
    def t13():
        (ws / "proj_a2" / "deep" / "a" / "b.txt").parent.mkdir(parents=True, exist_ok=True)
        (ws / "proj_a2" / "deep" / "a" / "b.txt").write_text("y", encoding="utf-8")
        r = client.post(
            "/v1/files/delete",
            json={"path": "proj_a2/deep", "recursive": True},
            headers=H,
        )
        assert r.status_code == 200, r.text
        assert not (ws / "proj_a2" / "deep").exists()
        # The parent top-level directory is unaffected.
        assert (ws / "proj_a2").is_dir()

    case("delete 非空子级 recursive=True → 200", t13)

    print("\n[ALL PASS]")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit code.
    raise SystemExit(main())
|