# 控制引擎实现计划 > **适用于代理执行:** 必须使用 superpowers:subagent-driven-development(推荐)或 superpowers:executing-plans 逐任务执行。步骤使用复选框(`- [ ]`)语法跟踪进度。 **目标:** 实现投煤器 / 布料机单元的自动控制引擎,包括状态机、故障/通信保护、运行时 API 及前端控制面板。 **架构:** 引擎为每个已启用的单元各启动一个异步任务(由10秒扫描器监督)。每个任务通过 `tokio::time::sleep_until` 控制阶段计时,通过 `tokio::sync::Notify` 在外部状态变化时(自动启停、故障确认)立即唤醒。状态保存在 `ControlRuntimeStore`(内存中,不持久化)。前端通过 `WsMessage::UnitRuntimeChanged` 实时接收更新——**仅在状态转换时推送**,不做周期性推送。 **技术栈:** Rust/Axum 后端、sqlx/PostgreSQL、tokio 异步、Vanilla JS ES 模块前端。 --- ## 文件清单 | 文件 | 操作 | 职责 | |------|------|------| | `src/control/runtime.rs` | ✅ 已完成 | `UnitRuntime` 结构体 + `ControlRuntimeStore`(含 `Notify`) | | `src/control/command.rs` | ✅ 已完成 | 共享 `send_pulse_command()` 和 `simulate_run_feedback()` | | `src/control/engine.rs` | ✅ 已完成 | 监督器 + 单元异步任务 + `wait_phase` | | `src/control/validator.rs` | ✅ 已完成 | 故障/通信锁定时阻断手动指令 | | `src/control/mod.rs` | ✅ 已完成 | 导出 `command`、`engine`、`runtime`、`validator` | | `src/event.rs` | ✅ 已完成 | 7个 `AppEvent` 变体;`UnitStateChanged` 触发但**不持久化到数据库** | | `src/websocket.rs` | ✅ 已完成 | `WsMessage::UnitRuntimeChanged` | | `src/service/control.rs` | ✅ 已完成 | `get_all_enabled_units`、`get_equipment_by_unit_id` | | `src/handler/control.rs` | ✅ 已完成 | `start_auto`、`stop_auto`、`batch_start_auto`、`batch_stop_auto`、`ack_fault`、`get_unit_runtime`;每次状态变更后调用 `notify_unit` | | `src/main.rs` | ✅ 已完成 | 上述端点的路由注册 | | `web/js/state.js` | ✅ 已完成 | `runtimes: new Map()` | | `web/js/units.js` | ✅ 已完成 | 运行时状态徽章、Auto Start/Stop、Ack Fault;显示 `display_acc_sec` | | `web/js/ops.js` | ✅ 已完成 | 运维面板单元卡片显示运行时徽章与 `display_acc_sec` | | `web/js/app.js` | ✅ 已完成 | 处理 `UnitRuntimeChanged` WS 消息 | | `web/styles.css` | ✅ 已完成 | `.event-card { flex-shrink: 0 }` 防止 flex 列表中文字重叠 | --- ## UnitRuntime 结构体(当前) ```rust // src/control/runtime.rs #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct UnitRuntime { pub unit_id: Uuid, pub state: UnitRuntimeState, pub auto_enabled: bool, pub accumulated_run_sec: i64, // 内部累加器(毫秒),不直接用于显示 pub display_acc_sec: i64, // 状态转换时的快照,前端展示用此字段 pub fault_locked: bool, pub flt_active: bool, pub comm_locked: bool, pub manual_ack_required: bool, } // 注意:elapsed 字段(current_run_elapsed_sec、current_stop_elapsed_sec、 // distributor_run_elapsed_sec、last_tick_at)已在事件驱动重构中移除。 // 计时完全由单元任务内部的 tokio::time::sleep_until 管理,请勿重新添加。 ``` `ControlRuntimeStore` 额外包含: ```rust notifiers: Arc>>>, // 方法: pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc pub async fn notify_unit(&self, unit_id: Uuid) // 每次状态变更后调用 ``` --- ## 引擎架构(事件驱动,2026-03-26) ``` start() └─ supervise() —— 10秒间隔,为每个启用单元启动 unit_task unit_task(unit_id) ├─ load_equipment_maps —— 任务启动时加载一次(缓存至任务生命周期结束) ├─ fault_tick —— 500ms 间隔,在 wait_phase 内部使用 └─ loop: ├─ 重新加载单元配置(检查是否仍启用) ├─ check_fault_comm → 有变化则推送 WS ├─ 若 !auto || fault || comm → select!(fault_tick | notify),continue └─ 按状态分支: Stopped → wait_phase(stop_time_sec) → 启动给煤机 → 状态=Running → 推送 WS Running → wait_phase(run_time_sec) → 停止给煤机 → acc += run_time_sec → 若 acc >= acc_time_sec:启动布料机,状态=DistributorRunning → 否则:状态=Stopped → 推送 WS DistributorRunning → wait_phase(bl_time_sec) → 停止布料机 → acc=0 → 状态=Stopped → 推送 WS FaultLocked|CommLocked → select!(fault_tick | notify) wait_phase(secs): deadline = now + secs loop: select! { sleep_until(deadline) => 返回 true(阶段正常完成) fault_tick.tick() => 重新检查故障/通信;若中断返回 false notify.notified() => 重新检查故障/通信;若中断返回 false } ``` **关键不变量:** - `accumulated_run_sec` 每个完成周期**精确**增加 `run_time_sec * 1000`(无 delta 漂移)。 - `display_acc_sec` 仅在 Running→Stopped 或 Running→DistributorRunning 转换时从 `accumulated_run_sec` 复制快照,前端始终读取 `display_acc_sec`。 - WS **仅在状态变化时**推送,无周期性推送。 - `unit.state_changed` 事件仅用于日志记录,**不写入**数据库事件表(频率过高)。 --- ## 任务一:扩展 UnitRuntime ✅ 已完成 **文件:** `src/control/runtime.rs` 字段如上方"UnitRuntime 结构体"所示。`ControlRuntimeStore` 包含 `notifiers` 映射,提供 `get_or_create_notify` 和 `notify_unit` 方法。 --- ## 任务二:创建共享脉冲指令辅助函数 ✅ 已完成 **文件:** `src/control/command.rs`、`src/control/mod.rs`、`src/handler/control.rs` `send_pulse_command(connection_manager, point_id, value_type, pulse_ms)` 写入高→延迟→低电平序列。 `simulate_run_feedback(state, eq_id, running)` 在模拟模式下写入虚拟运行反馈值(用于无真实 OPC UA 设备时的调试)。 --- ## 任务三:在 validator.rs 添加运行时状态检查 ✅ 已完成 **文件:** `src/control/validator.rs` 在 `validate_manual_control` 的现有 REM/FLT/quality 检查之后添加: ```rust if let Some(unit_id) = equipment.unit_id { if let Some(runtime) = state.control_runtime.get(unit_id).await { if runtime.auto_enabled { return Err(ApiErr::Forbidden("自动控制已激活,请先停止自动控制", ...)); } if runtime.comm_locked { return Err(ApiErr::Forbidden("单元通信已锁定", ...)); } if runtime.fault_locked { return Err(ApiErr::Forbidden("单元处于故障锁定状态", ...)); } } } ``` --- ## 任务四:扩展 AppEvent 业务事件 ✅ 已完成 **文件:** `src/event.rs` 新增 7 个变体: ```rust AutoControlStarted { unit_id: Uuid }, AutoControlStopped { unit_id: Uuid }, FaultLocked { unit_id: Uuid, equipment_id: Uuid }, FaultAcked { unit_id: Uuid }, CommLocked { unit_id: Uuid }, CommRecovered { unit_id: Uuid }, UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String }, ``` **`persist_event_if_needed` 映射:** | 变体 | 写库? | event_type | |------|--------|-----------| | `AutoControlStarted` | ✅ | `unit.auto_control_started` | | `AutoControlStopped` | ✅ | `unit.auto_control_stopped` | | `FaultLocked` | ✅ | `unit.fault_locked`(level: error)| | `FaultAcked` | ✅ | `unit.fault_acked` | | `CommLocked` | ✅ | `unit.comm_locked`(level: warn)| | `CommRecovered` | ✅ | `unit.comm_recovered` | | `UnitStateChanged` | ❌ | —(频率过高,每周期触发)| --- ## 任务五:添加 WsMessage::UnitRuntimeChanged ✅ 已完成 **文件:** `src/websocket.rs` ```rust UnitRuntimeChanged(crate::control::runtime::UnitRuntime), ``` --- ## 任务六:添加 service 辅助函数 ✅ 已完成 **文件:** `src/service/control.rs` ```rust pub async fn get_all_enabled_units(pool: &PgPool) -> Result, sqlx::Error> pub async fn get_equipment_by_unit_id(pool: &PgPool, unit_id: Uuid) -> Result, sqlx::Error> ``` --- ## 任务七:实现 control/engine.rs ✅ 已完成(事件驱动,2026-03-26) **文件:** `src/control/engine.rs` 完整设计参见上方"引擎架构"章节。 **后续修改的关键规则:** - 除状态转换或故障/通信变化外,不得推送 `WsMessage::UnitRuntimeChanged`。 - `wait_phase` 必须使用 `sleep_until(deadline)` 而非 `sleep(duration)`——deadline 在阶段开始时固定,故障 tick 重检不会重置计时器。 - 在 `wait_phase` 内处理 `notify.notified()` 时,必须从 store 重新读取运行时(handler 可能已修改 `auto_enabled`)。 - 设备映射在每次任务调用时加载一次;若设备配置变更,监督器将在下次扫描(≤10秒)时重启该任务。 --- ## 任务八:新增 API 端点 ✅ 已完成 **文件:** `src/handler/control.rs`、`src/main.rs` | 方法 | 路径 | Handler | |------|------|---------| | POST | `/api/unit/:id/start-auto` | `start_auto_unit` | | POST | `/api/unit/:id/stop-auto` | `stop_auto_unit` | | POST | `/api/unit/:id/ack-fault` | `ack_fault_unit` | | POST | `/api/unit/batch-start-auto` | `batch_start_auto` | | POST | `/api/unit/batch-stop-auto` | `batch_stop_auto` | | GET | `/api/unit/:id/runtime` | `get_unit_runtime` | **Notify 规约:** 每个修改 `auto_enabled` 或 `fault_locked` 的 handler,在 upsert 运行时后**必须**调用 `state.control_runtime.notify_unit(unit_id).await`,以立即唤醒休眠中的单元任务。 ```rust // 每个 auto/fault handler 必须遵循此模式: state.control_runtime.upsert(runtime).await; state.control_runtime.notify_unit(unit_id).await; // ← 不可省略 let _ = state.event_manager.send(AppEvent::...); ``` --- ## 任务九:前端运行时集成 ✅ 已完成 **文件:** `web/js/state.js`、`web/js/units.js`、`web/js/ops.js`、`web/js/app.js` **app.js 中的 WS 处理器:** ```js case "UnitRuntimeChanged": state.runtimes.set(payload.data.unit_id, payload.data); renderUnits(); // 重新渲染单元卡片(更新徽章和按钮) renderOpsUnits(); // 更新运维视图单元列表 syncEquipmentButtonsForUnit(runtime.unit_id, runtime.auto_enabled); break; ``` **显示规则:** 前端始终使用 `runtime.display_acc_sec` 显示累计时间,不使用 `runtime.accumulated_run_sec`。 ```js // ✅ 正确 `Acc ${Math.floor(runtime.display_acc_sec / 1000)}s` // ❌ 错误——会显示周期中途的抖动值 `Acc ${Math.floor(runtime.accumulated_run_sec / 1000)}s` ``` **事件列表 CSS:** `.event-card` 必须设置 `flex-shrink: 0`(在 `web/styles.css` 中),防止 flex 列高度压缩导致文字重叠。 --- ## 任务十:将引擎接入 AppState ✅ 已完成 **文件:** `src/main.rs` ```rust let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new()); // ... 构建 AppState ... control::engine::start(state.clone(), control_runtime); ```