Compare commits


No commits in common. "45b2317ee8694af06bacaa9376eb75db1f6aec64" and "13c4b515d7233e7551e3579da005849ba12e35cf" have entirely different histories.

30 changed files with 1370 additions and 2597 deletions

736
API.md

File diff suppressed because it is too large.


@@ -1,272 +0,0 @@
# Control Engine Implementation Plan
> **For agent execution:** Run task by task with superpowers:subagent-driven-development (recommended) or superpowers:executing-plans. Track step progress with checkbox (`- [ ]`) syntax.
**Goal:** Implement the automatic control engine for coal feeder / distributor units, including the state machine, fault/communication protection, the runtime API, and the frontend control panel.
**Architecture:** The engine spawns one async task per enabled unit, supervised by a 10-second scanner. Each task drives phase timing with `tokio::time::sleep_until` and wakes immediately on external state changes (auto start/stop, fault acknowledgement) via `tokio::sync::Notify`. State lives in `ControlRuntimeStore` (in memory, not persisted). The frontend receives live updates through `WsMessage::UnitRuntimeChanged`, pushed **only on state transitions**, never periodically.
**Tech stack:** Rust/Axum backend, sqlx/PostgreSQL, tokio async, Vanilla JS ES-module frontend.
---
## File Inventory
| File | Action | Responsibility |
|------|------|------|
| `src/control/runtime.rs` | ✅ Done | `UnitRuntime` struct + `ControlRuntimeStore` (with `Notify`) |
| `src/control/command.rs` | ✅ Done | Shared `send_pulse_command()` and `simulate_run_feedback()` |
| `src/control/engine.rs` | ✅ Done | Supervisor + per-unit async tasks + `wait_phase` |
| `src/control/validator.rs` | ✅ Done | Blocks manual commands while fault/comm locked |
| `src/control/mod.rs` | ✅ Done | Exports `command`, `engine`, `runtime`, `validator` |
| `src/event.rs` | ✅ Done | 7 `AppEvent` variants; `UnitStateChanged` fires but is **not persisted to the database** |
| `src/websocket.rs` | ✅ Done | `WsMessage::UnitRuntimeChanged` |
| `src/service/control.rs` | ✅ Done | `get_all_enabled_units`, `get_equipment_by_unit_id` |
| `src/handler/control.rs` | ✅ Done | `start_auto`, `stop_auto`, `batch_start_auto`, `batch_stop_auto`, `ack_fault`, `get_unit_runtime`; calls `notify_unit` after every state change |
| `src/main.rs` | ✅ Done | Route registration for the endpoints above |
| `web/js/state.js` | ✅ Done | `runtimes: new Map()` |
| `web/js/units.js` | ✅ Done | Runtime state badges, Auto Start/Stop, Ack Fault; displays `display_acc_sec` |
| `web/js/ops.js` | ✅ Done | Ops-panel unit cards show runtime badges and `display_acc_sec` |
| `web/js/app.js` | ✅ Done | Handles the `UnitRuntimeChanged` WS message |
| `web/styles.css` | ✅ Done | `.event-card { flex-shrink: 0 }` prevents text overlap in the flex list |
---
## UnitRuntime Struct (current)
```rust
// src/control/runtime.rs
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UnitRuntime {
pub unit_id: Uuid,
pub state: UnitRuntimeState,
pub auto_enabled: bool,
pub accumulated_run_sec: i64, // internal accumulator (milliseconds), not shown directly
pub display_acc_sec: i64, // snapshot taken at state transitions; the frontend displays this field
pub fault_locked: bool,
pub flt_active: bool,
pub comm_locked: bool,
pub manual_ack_required: bool,
}
// Note: the elapsed fields (current_run_elapsed_sec, current_stop_elapsed_sec,
// distributor_run_elapsed_sec, last_tick_at) were removed in the event-driven refactor.
// Timing is managed entirely by tokio::time::sleep_until inside the unit task; do not re-add them.
```
`ControlRuntimeStore` additionally contains:
```rust
notifiers: Arc<RwLock<HashMap<Uuid, Arc<Notify>>>>,
// Methods:
pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc<Notify>
pub async fn notify_unit(&self, unit_id: Uuid) // call after every state change
```
---
## Engine Architecture (event-driven, 2026-03-26)
```
start()
└─ supervise() -- every 10 s, spawns a unit_task for each enabled unit
unit_task(unit_id)
├─ load_equipment_maps -- loaded once at task start (cached for the task lifetime)
├─ fault_tick -- 500 ms interval, used inside wait_phase
└─ loop:
├─ reload unit config (check it is still enabled)
├─ check_fault_comm → push WS on change
├─ if !auto || fault || comm → select!(fault_tick | notify), continue
└─ branch on state:
Stopped → wait_phase(stop_time_sec) → start feeder → state=Running → push WS
Running → wait_phase(run_time_sec) → stop feeder → acc += run_time_sec
→ if acc >= acc_time_sec: start distributor, state=DistributorRunning
→ else: state=Stopped → push WS
DistributorRunning → wait_phase(bl_time_sec) → stop distributor → acc=0 → state=Stopped → push WS
FaultLocked|CommLocked → select!(fault_tick | notify)
wait_phase(secs):
deadline = now + secs
loop:
select! { sleep_until(deadline) => return true (phase completed normally)
fault_tick.tick() => re-check fault/comm; return false if interrupted
notify.notified() => re-check fault/comm; return false if interrupted }
```
**Key invariants:**
- `accumulated_run_sec` grows by **exactly** `run_time_sec * 1000` per completed cycle (no delta drift).
- `display_acc_sec` is snapshotted from `accumulated_run_sec` only on the Running→Stopped or Running→DistributorRunning transition; the frontend always reads `display_acc_sec`.
- WS pushes happen **only on state changes**; there is no periodic push.
- The `unit.state_changed` event is log-only and is **not written** to the database event table (too frequent).
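The accumulator invariants above can be sketched as a pure transition function. This is an illustrative, dependency-free model: `State`, `Cfg`, and `finish_running` are hypothetical names that mirror the documented arithmetic, not the actual engine code.

```rust
// Sketch of the end-of-Running transition (hypothetical types; the real
// logic lives in src/control/engine.rs and is driven by tokio timers).
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Stopped,
    DistributorRunning,
}

struct Cfg {
    run_time_sec: i64,
    acc_time_sec: i64,
}

/// Returns (next state, accumulated_run_sec, display_acc_sec).
/// The accumulator grows by exactly run_time_sec * 1000 ms per completed
/// cycle, and display_acc_sec snapshots the new value at the transition.
fn finish_running(cfg: &Cfg, acc_ms: i64) -> (State, i64, i64) {
    let acc = acc_ms + cfg.run_time_sec * 1000; // exact increment, no delta drift
    let display = acc; // snapshot the frontend will read
    let next = if cfg.acc_time_sec > 0 && acc >= cfg.acc_time_sec * 1000 {
        State::DistributorRunning // threshold reached: start distributor
    } else {
        State::Stopped
    };
    (next, acc, display)
}
```

Because the increment is a constant per cycle, repeated fault-tick wakeups during a phase cannot drift the accumulator.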
---
## Task 1: Extend UnitRuntime ✅ Done
**File:** `src/control/runtime.rs`
Fields are listed in the "UnitRuntime Struct" section above. `ControlRuntimeStore` holds the `notifiers` map and provides the `get_or_create_notify` and `notify_unit` methods.
---
## Task 2: Create Shared Pulse-Command Helpers ✅ Done
**Files:** `src/control/command.rs`, `src/control/mod.rs`, `src/handler/control.rs`
`send_pulse_command(connection_manager, point_id, value_type, pulse_ms)` writes a high → delay → low level sequence.
`simulate_run_feedback(state, eq_id, running)` writes a virtual run-feedback value in simulation mode (for debugging without a real OPC UA device).
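The mapping from pulse level to wire value can be sketched with a simplified type model. The `Wire` enum and this standalone `pulse_value` are assumptions for illustration; the real helper builds `serde_json` values in `src/control/command.rs`.

```rust
// Sketch: choose the value to write for a pulse level, based on the
// point's known value type (simplified stand-in types).
enum ValueType {
    Bool,
    Int,
    UInt,
}

#[derive(Debug, PartialEq)]
enum Wire {
    Bool(bool),
    Num(i64),
}

/// Numeric points get 1/0; bool or unknown types get a plain bool.
fn pulse_value(high: bool, value_type: Option<&ValueType>) -> Wire {
    match value_type {
        Some(ValueType::Int) | Some(ValueType::UInt) => Wire::Num(if high { 1 } else { 0 }),
        _ => Wire::Bool(high),
    }
}
```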
---
## Task 3: Add Runtime State Checks in validator.rs ✅ Done
**File:** `src/control/validator.rs`
After the existing REM/FLT/quality checks in `validate_manual_control`, add:
```rust
if let Some(unit_id) = equipment.unit_id {
if let Some(runtime) = state.control_runtime.get(unit_id).await {
if runtime.auto_enabled {
return Err(ApiErr::Forbidden("Auto control is active; stop auto control first", ...));
}
if runtime.comm_locked {
return Err(ApiErr::Forbidden("Unit communication is locked", ...));
}
if runtime.fault_locked {
return Err(ApiErr::Forbidden("Unit is fault-locked", ...));
}
}
}
```
---
## Task 4: Extend AppEvent with Business Events ✅ Done
**File:** `src/event.rs`
Seven new variants:
```rust
AutoControlStarted { unit_id: Uuid },
AutoControlStopped { unit_id: Uuid },
FaultLocked { unit_id: Uuid, equipment_id: Uuid },
FaultAcked { unit_id: Uuid },
CommLocked { unit_id: Uuid },
CommRecovered { unit_id: Uuid },
UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String },
```
**`persist_event_if_needed` mapping:**
| Variant | Persisted? | event_type |
|------|--------|-----------|
| `AutoControlStarted` | ✅ | `unit.auto_control_started` |
| `AutoControlStopped` | ✅ | `unit.auto_control_stopped` |
| `FaultLocked` | ✅ | `unit.fault_locked` (level: error) |
| `FaultAcked` | ✅ | `unit.fault_acked` |
| `CommLocked` | ✅ | `unit.comm_locked` (level: warn) |
| `CommRecovered` | ✅ | `unit.comm_recovered` |
| `UnitStateChanged` | ❌ | none (too frequent; fires every cycle) |
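The table reads as a single match that yields the `event_type` string to persist, or `None` for log-only variants. A simplified sketch (field-less enum; `persist_event_type` is a hypothetical helper, while the actual wiring lives in `persist_event_if_needed` in `src/event.rs`):

```rust
// Simplified, field-less mirror of the AppEvent variants above.
enum AppEvent {
    AutoControlStarted,
    AutoControlStopped,
    FaultLocked,
    FaultAcked,
    CommLocked,
    CommRecovered,
    UnitStateChanged,
}

/// Returns the event_type to write to the DB, or None for log-only events.
fn persist_event_type(ev: &AppEvent) -> Option<&'static str> {
    match ev {
        AppEvent::AutoControlStarted => Some("unit.auto_control_started"),
        AppEvent::AutoControlStopped => Some("unit.auto_control_stopped"),
        AppEvent::FaultLocked => Some("unit.fault_locked"),
        AppEvent::FaultAcked => Some("unit.fault_acked"),
        AppEvent::CommLocked => Some("unit.comm_locked"),
        AppEvent::CommRecovered => Some("unit.comm_recovered"),
        AppEvent::UnitStateChanged => None, // too frequent: log only
    }
}
```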
---
## Task 5: Add WsMessage::UnitRuntimeChanged ✅ Done
**File:** `src/websocket.rs`
```rust
UnitRuntimeChanged(crate::control::runtime::UnitRuntime),
```
---
## Task 6: Add Service Helper Functions ✅ Done
**File:** `src/service/control.rs`
```rust
pub async fn get_all_enabled_units(pool: &PgPool) -> Result<Vec<ControlUnit>, sqlx::Error>
pub async fn get_equipment_by_unit_id(pool: &PgPool, unit_id: Uuid) -> Result<Vec<Equipment>, sqlx::Error>
```
---
## Task 7: Implement control/engine.rs ✅ Done (event-driven, 2026-03-26)
**File:** `src/control/engine.rs`
See the "Engine Architecture" section above for the full design.
**Key rules for future modifications:**
- Never push `WsMessage::UnitRuntimeChanged` except on state transitions or fault/comm changes.
- `wait_phase` must use `sleep_until(deadline)`, not `sleep(duration)`: the deadline is fixed at phase start, so fault-tick re-checks do not reset the timer.
- When handling `notify.notified()` inside `wait_phase`, re-read the runtime from the store (a handler may have changed `auto_enabled`).
- Equipment maps are loaded once per task invocation; if equipment config changes, the supervisor restarts the task on its next scan (≤ 10 s).
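The `sleep_until` rule can be pictured with plain std-time arithmetic: the deadline is a fixed `Instant` computed once, so each mid-phase re-check only shrinks the remaining wait instead of restarting it. `remaining` is a hypothetical helper for illustration; the engine itself uses `tokio::time`.

```rust
use std::time::{Duration, Instant};

// The phase deadline is fixed at phase start; re-checks never extend it.
fn remaining(deadline: Instant, now: Instant) -> Duration {
    deadline.saturating_duration_since(now) // zero once the deadline passes
}
```

With `sleep(duration)` instead, each wakeup would restart a full-length timer and the phase could never complete under frequent fault ticks.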
---
## Task 8: New API Endpoints ✅ Done
**Files:** `src/handler/control.rs`, `src/main.rs`
| Method | Path | Handler |
|------|------|---------|
| POST | `/api/unit/:id/start-auto` | `start_auto_unit` |
| POST | `/api/unit/:id/stop-auto` | `stop_auto_unit` |
| POST | `/api/unit/:id/ack-fault` | `ack_fault_unit` |
| POST | `/api/unit/batch-start-auto` | `batch_start_auto` |
| POST | `/api/unit/batch-stop-auto` | `batch_stop_auto` |
| GET | `/api/unit/:id/runtime` | `get_unit_runtime` |
**Notify contract:** Every handler that modifies `auto_enabled` or `fault_locked` **must** call `state.control_runtime.notify_unit(unit_id).await` after upserting the runtime, so a sleeping unit task wakes immediately.
```rust
// Every auto/fault handler must follow this pattern:
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await; // ← must not be omitted
let _ = state.event_manager.send(AppEvent::...);
```
---
## Task 9: Frontend Runtime Integration ✅ Done
**Files:** `web/js/state.js`, `web/js/units.js`, `web/js/ops.js`, `web/js/app.js`
**WS handler in app.js:**
```js
case "UnitRuntimeChanged":
state.runtimes.set(payload.data.unit_id, payload.data);
renderUnits(); // re-render unit cards (update badges and buttons)
renderOpsUnits(); // update the ops-view unit list
syncEquipmentButtonsForUnit(payload.data.unit_id, payload.data.auto_enabled);
break;
```
**Display rule:** The frontend always uses `runtime.display_acc_sec` to display accumulated time, never `runtime.accumulated_run_sec`.
```js
// ✅ correct
`Acc ${Math.floor(runtime.display_acc_sec / 1000)}s`
// ❌ wrong: shows jittery mid-cycle values
`Acc ${Math.floor(runtime.accumulated_run_sec / 1000)}s`
```
**Event list CSS:** `.event-card` must set `flex-shrink: 0` (in `web/styles.css`) to keep the flex column from compressing and overlapping text.
---
## Task 10: Wire the Engine into AppState ✅ Done
**File:** `src/main.rs`
```rust
let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new());
// ... build AppState ...
control::engine::start(state.clone(), control_runtime);
```


@@ -1,358 +0,0 @@
# Dual-View Web UI Implementation Plan
> **For agent execution:** Run task by task with superpowers:subagent-driven-development (recommended) or superpowers:executing-plans. Track step progress with checkbox (`- [ ]`) syntax.
**Goal:** Add two tabs at the top, an **Ops view** and a **Config view**. The ops view is equipment-centric, showing live signal-point status (colored signal dots) with a system-event panel at the bottom; the config view keeps the original layout but replaces the bottom-middle panel with a live SSE log stream.
**Architecture:** The `<main>` element controls panel visibility via a CSS class (`grid-ops` / `grid-config`). A new `ops.js` module owns the ops view: it loads equipment details for all units and renders equipment cards, each with REM/RUN/FLT signal dots (small colored circles); card DOM elements are registered in `state.opsPointEls` (`Map<point_id, { dotEl }>`), and the WebSocket handler updates dot colors live via `sigDotClass()`. The SSE log stream (`/api/logs/stream`) runs only in the config view and is started/stopped on tab switch.
**Tech stack:** Vanilla JS ES modules, CSS Grid, SSE (`EventSource`), the existing WebSocket infrastructure, and the `/api/unit/{id}/detail` endpoint.
---
## Current Layout (reference)
```
grid (3 columns × 2 rows)
top-left → equipment-panel.html (col 1, row 1)
top-right → points-panel.html (cols 2-3, row 1)
bottom-left → source-panel.html (col 1, row 2) -- units + data sources
bottom-mid → logs-panel.html (col 2, row 2) -- system events
bottom-right → chart-panel.html (col 3, row 2)
```
## Target Layout
```
grid-config (same as the original layout)
top-left → equipment-panel (col 1, row 1)
top-right → points-panel (cols 2-3, row 1)
bottom-left → source-panel (col 1, row 2)
bottom-mid → log-stream-panel [new] (col 2, row 2) -- SSE logs
bottom-right → chart-panel (col 3, row 2)
grid-ops (new layout)
top → ops-panel [new] (cols 1-2, row 1) -- unit sidebar + equipment cards
bottom → logs-panel (cols 1-2, row 2) -- system events (full width)
```
## File Inventory
| File | Action | Purpose |
|---|---|---|
| `web/html/topbar.html` | Modify | Add `#tabOps` / `#tabConfig` tab buttons and the batch-auto buttons |
| `web/html/ops-panel.html` | **New** | Ops view: `#opsUnitList` sidebar + `#opsEquipmentArea` equipment-card area |
| `web/html/log-stream-panel.html` | **New** | Config-view bottom-middle: SSE log stream (`#logView`) |
| `web/index.html` | Modify | Include the new partials, bump version numbers |
| `web/js/ops.js` | **New** | Load equipment details, render equipment cards, `sigDotClass()`, `syncEquipmentButtonsForUnit()` |
| `web/js/state.js` | Modify | Add `activeView`, `opsPointEls`, `logSource`, `selectedOpsUnitId` |
| `web/js/dom.js` | Modify | Add refs: `tabOps`, `tabConfig`, `batchStartAutoBtn`, `batchStopAutoBtn`, `opsUnitList`, `opsEquipmentArea`, `logView` |
| `web/js/logs.js` | Modify | Add `startLogs` / `stopLogs`; update `opsPointEls` signal dots in the WS handler |
| `web/js/app.js` | Modify | Tab-switch logic, listen for the `units-loaded` event, start the ops view |
| `web/styles.css` | Modify | Tab styles, `grid-ops`, `grid-config`, equipment-card and signal-dot styles |
---
## Task 1: Tab Scaffolding + CSS Layout Switch ✅ Done
**Files involved:**
- Modify: `web/html/topbar.html`
- Modify: `web/index.html`
- Modify: `web/js/state.js`
- Modify: `web/js/dom.js`
- Modify: `web/js/app.js`
- Modify: `web/styles.css`
- [x] **Step 1: Add tab buttons to the top bar**
In `web/html/topbar.html`, add a `.tab-bar` (with `#tabOps` / `#tabConfig`) and the batch auto-control buttons (`#batchStartAutoBtn` / `#batchStopAutoBtn`).
- [x] **Step 2: Add tab and grid CSS to `web/styles.css`**
Add `.tab-bar`, `.tab-btn`, `.tab-btn.active` styles; replace the original `.grid` with `.grid-ops` and `.grid-config`, each defining columns, rows, and panel `grid-column/row` assignments.
- [x] **Step 3: Add new fields to `web/js/state.js`**
```js
activeView: "ops", // "ops" | "config"
opsPointEls: new Map(), // point_id -> { dotEl }
logSource: null,
selectedOpsUnitId: null,
```
- [x] **Step 4: Add DOM references in `web/js/dom.js`**
```js
tabOps: byId("tabOps"),
tabConfig: byId("tabConfig"),
batchStartAutoBtn: byId("batchStartAutoBtn"),
batchStopAutoBtn: byId("batchStopAutoBtn"),
opsUnitList: byId("opsUnitList"),
opsEquipmentArea: byId("opsEquipmentArea"),
logView: byId("logView"),
```
- [x] **Step 5: Add a `switchView` function in `web/js/app.js` and bind events**
```js
function switchView(view) {
state.activeView = view;
const main = document.querySelector("main");
main.className = view === "ops" ? "grid-ops" : "grid-config";
dom.tabOps.classList.toggle("active", view === "ops");
dom.tabConfig.classList.toggle("active", view === "config");
// show/hide config-view-only panels: top-left/top-right/bottom-left/bottom-right/bottom-mid
// show/hide ops-view-only panels: ops-main/ops-bottom
if (view === "config") startLogs(); else stopLogs();
}
```
In `bindEvents`, add:
```js
dom.tabOps.addEventListener("click", () => switchView("ops"));
dom.tabConfig.addEventListener("click", () => switchView("config"));
```
In `bootstrap`, call:
```js
switchView("ops"); // default to the ops view
```
- [x] **Step 6: Update `web/index.html`**
Include all partials (including the new ops-panel.html and log-stream-panel.html) inside `<main class="grid-ops">` and bump the CSS/JS version numbers.
---
## Task 2: Ops Panel HTML + CSS Skeleton ✅ Done
**Files involved:**
- New: `web/html/ops-panel.html`
- Modify: `web/styles.css`
- [x] **Step 1: Create `web/html/ops-panel.html`**
```html
<section class="panel ops-main">
<div class="ops-layout">
<aside class="ops-unit-sidebar">
<div class="panel-head"><h2>Control Units</h2></div>
<div class="list ops-unit-list" id="opsUnitList"></div>
</aside>
<div class="ops-equipment-area" id="opsEquipmentArea">
<div class="muted ops-placeholder">← Select a control unit</div>
</div>
</div>
</section>
```
Add the `ops-bottom` class to `web/html/logs-panel.html` so it serves as the full-width bottom panel in the ops view.
- [x] **Step 2: Add ops-view CSS to `web/styles.css`**
`.ops-layout` (horizontal flex), `.ops-unit-sidebar` (fixed width), `.ops-unit-list` (scrollable), `.ops-equipment-area` (flex-wrap card area).
Equipment-card classes: `.ops-eq-card`, `.ops-eq-card-head`, `.ops-signal-rows`, `.ops-signal-row`, `.ops-signal-label`, `.ops-eq-card-actions`.
Unit list-item classes: `.ops-unit-item`, `.ops-unit-item-name`, `.ops-unit-item-meta`, `.ops-unit-item-actions`.
Signal-dot classes: `.sig-dot` (gray default), `.sig-dot.sig-on` (green), `.sig-dot.sig-fault` (red), `.sig-dot.sig-warn` (yellow).
---
## Task 3: ops.js: Unit List + Equipment Card Rendering ✅ Done
**Files involved:**
- New: `web/js/ops.js`
- Modify: `web/js/app.js`, `web/js/units.js`
### Notes on the Actual Implementation
The ops view **loads equipment cards for all units in one pass at initial load** (`loadAllEquipmentCards`) instead of waiting for a unit click. Clicking a unit filters the view to that unit's equipment; clicking the same unit again clears the filter and restores the full view.
Signal points are rendered as **small colored dots** (the `sig-dot` class) rather than text values with quality badges.
#### Core Functions
**`sigDotClass(role, quality, valueText) → string`** (exported)
Computes the CSS class name from signal quality and value:
- `quality !== "good"` → `"sig-dot sig-warn"` (yellow)
- value is `"1"` / `"true"` / `"on"`:
  - `role === "flt"` → `"sig-dot sig-fault"` (red)
  - otherwise → `"sig-dot sig-on"` (green)
- otherwise → `"sig-dot"` (gray)
**`renderOpsUnits()`** (exported)
Iterates over `state.units` and renders a list item per unit, containing:
- Runtime state badge (`runtimeBadge`), enabled/disabled badge, accumulated time
- "Start Auto" / "Stop Auto" buttons (calling `/api/control/unit/:id/start-auto` and `stop-auto`)
- An "Ack Fault" button when `runtime.manual_ack_required` is true
**`loadAllEquipmentCards()`** (exported)
Fetches `/api/unit/{id}/detail` for all units concurrently, merges all equipment, and calls `renderOpsEquipments()`.
**`selectOpsUnit(unitId)`** (private)
Toggles `state.selectedOpsUnitId`. On deselect, calls `loadAllEquipmentCards()` to restore the full view; on select, loads that unit's detail and renders its equipment.
**`renderOpsEquipments(equipments)`** (private)
Renders one card per equipment, containing:
- Card head: equipment code + kind badge
- Signal rows: the REM / RUN / FLT roles, one `<span class="sig-dot ...">` element per row (`data-ops-dot` + `data-ops-role` attributes)
- Control buttons (only for `coal_feeder` / `distributor`): Start / Stop, disabled while `auto_enabled`
- DOM registration: `state.opsPointEls.set(pointId, { dotEl })`
- If `point.point_monitor` is cached, the dot color is initialized from the cached value immediately
**`startOps()`** (exported)
```js
export function startOps() {
renderOpsUnits();
loadAllEquipmentCards();
dom.batchStartAutoBtn?.addEventListener("click", () => {
apiFetch("/api/control/unit/batch-start-auto", { method: "POST" }).then(() => loadUnits()).catch(() => {});
});
dom.batchStopAutoBtn?.addEventListener("click", () => {
apiFetch("/api/control/unit/batch-stop-auto", { method: "POST" }).then(() => loadUnits()).catch(() => {});
});
}
```
**`syncEquipmentButtonsForUnit(unitId, autoEnabled)`** (exported)
Called when a `UnitRuntimeChanged` WS message arrives; syncs the `disabled` state of the Start/Stop buttons on equipment cards (avoiding a re-render of the whole card area):
```js
export function syncEquipmentButtonsForUnit(unitId, autoEnabled) {
dom.opsEquipmentArea
.querySelectorAll(`.ops-eq-card-actions[data-unit-id="${unitId}"]`)
.forEach((actions) => {
actions.querySelectorAll("button").forEach((btn) => {
btn.disabled = autoEnabled;
btn.title = autoEnabled ? "Auto control is running; stop auto first" : "";
});
});
}
```
#### app.js Wiring
```js
import { startOps, renderOpsUnits, loadAllEquipmentCards } from "./ops.js";
// in bootstrap:
await withStatus(loadUnits());
startOps();
// event listeners:
document.addEventListener("equipments-updated", () => {
renderUnits();
renderOpsUnits();
});
document.addEventListener("units-loaded", () => {
renderOpsUnits();
if (!state.selectedOpsUnitId) loadAllEquipmentCards();
});
```
---
## Task 4: Live Signal-Dot Updates on Ops Cards ✅ Done
**Files involved:**
- Modify: `web/js/logs.js`
In the WebSocket `PointNewValue` branch of `startPointSocket`, add the ops-view signal-dot update logic:
```js
// ops-view signal dot
const opsEntry = state.opsPointEls.get(data.point_id);
if (opsEntry) {
const { dotEl } = opsEntry;
const role = dotEl.dataset.opsRole;
import("./ops.js").then(({ sigDotClass }) => {
dotEl.className = sigDotClass(role, data.quality, data.value_text);
});
}
```
In the `UnitRuntimeChanged` branch, also update the ops unit list and equipment button state:
```js
if (payload.type === "UnitRuntimeChanged") {
const runtime = payload.data;
state.runtimes.set(runtime.unit_id, runtime);
renderUnits();
import("./ops.js").then(({ renderOpsUnits, syncEquipmentButtonsForUnit }) => {
renderOpsUnits();
syncEquipmentButtonsForUnit(runtime.unit_id, runtime.auto_enabled);
});
return;
}
```
> Note: the dynamic `import("./ops.js")` avoids a circular dependency (`ops.js` → `logs.js` → `ops.js`).
---
## Task 5: Log Stream Panel for the Config View ✅ Done
**Files involved:**
- New: `web/html/log-stream-panel.html`
- Modify: `web/js/logs.js`
- Modify: `web/js/dom.js`
- [x] **Step 1: Create `web/html/log-stream-panel.html`**
```html
<section class="panel bottom-mid">
<div class="panel-head"><h2>Live Logs</h2></div>
<div class="log" id="logView"></div>
</section>
```
- [x] **Step 2: Implement `startLogs` / `stopLogs` in `web/js/logs.js`**
```js
export function startLogs() {
if (state.logSource) return;
state.logSource = new EventSource("/api/logs/stream");
state.logSource.addEventListener("log", (event) => {
const data = JSON.parse(event.data);
(data.lines || []).forEach(appendLog);
});
state.logSource.addEventListener("error", () => appendLog("[log stream error]"));
}
export function stopLogs() {
if (state.logSource) {
state.logSource.close();
state.logSource = null;
}
}
```
`startLogs()` is idempotent (guarded by `if (state.logSource) return`) and safe to call repeatedly.
---
## Task 6: Wrap-up, Cleanup, and Style Polish ✅ Done
- [x] Add log-panel CSS: `.log`, `.log-line`, `.level-info/warn/error`
- [x] Dispatch a `units-loaded` event at the end of `loadUnits()` in `web/js/units.js`
- [x] Bump the `web/index.html` version numbers
- [x] Final verification: tab switching, live signal-dot updates, Start/Stop control buttons, SSE log stream
---
## Notes for Implementers
- `state.opsPointEls` is cleared and rebuilt on every equipment-card re-render, so stale references cannot occur.
- `syncEquipmentButtonsForUnit` only updates the buttons' `disabled` state, avoiding a full card-area re-render on every runtime update.
- The ops view shows **all equipment of all units** by default; clicking a unit filters, and deselecting restores the full view.
- The `data-unit-id` attribute on equipment cards lets `syncEquipmentButtonsForUnit` locate the buttons precisely.
- The `point.point_monitor` field in the backend's `/api/unit/{id}/detail` response carries the latest cached value, so initial signal-dot colors can be rendered without waiting for a WebSocket push.


@@ -1,6 +1,7 @@
use crate::{
connection::{BatchSetPointValueReq, ConnectionManager, SetPointValueReqItem},
telemetry::ValueType,
AppState,
};
use serde_json::json;
use std::sync::Arc;
@@ -42,6 +43,143 @@ pub async fn send_pulse_command(
Ok(())
}
/// Simulate RUN signal feedback after a command when SIMULATE_PLC=true.
///
/// Strategy:
/// 1. Try writing the desired value to the RUN point via the normal OPC UA write path.
/// If the proxy accepts the write, `write_point_values_batch` will emit a local
/// `PointNewValue` event that updates the cache and WebSocket automatically.
/// 2. If the write is rejected (proxy has no write target or returns an error),
/// fall back to directly patching the local monitor cache and broadcasting over WS.
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
let role_points =
match crate::service::get_equipment_role_points(&state.pool, equipment_id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("simulate_run_feedback: db error: {}", e);
return;
}
};
let run_point = match role_points.iter().find(|p| p.signal_role == "run") {
Some(p) => p.clone(),
None => return,
};
// Determine the write value based on the current known value_type for the point.
let write_json = {
let guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard
.get(&run_point.point_id)
.and_then(|m| m.value_type.as_ref())
{
Some(crate::telemetry::ValueType::Int) | Some(crate::telemetry::ValueType::UInt) => {
serde_json::json!(if run_on { 1 } else { 0 })
}
_ => serde_json::json!(run_on),
}
};
// Try writing to the proxy server first.
let write_ok = match state
.connection_manager
.write_point_values_batch(crate::connection::BatchSetPointValueReq {
items: vec![crate::connection::SetPointValueReqItem {
point_id: run_point.point_id,
value: write_json,
}],
})
.await
{
Ok(res) => res.success,
Err(e) => {
tracing::debug!("simulate_run_feedback: write attempt failed: {}", e);
false
}
};
if write_ok {
// write_point_values_batch already emitted PointNewValue; nothing more to do.
tracing::info!(
"simulate_run_feedback: wrote run={} for equipment={} via OPC UA",
run_on,
equipment_id
);
return;
}
// Fallback: patch the local cache and push over WebSocket.
tracing::debug!(
"simulate_run_feedback: OPC UA write rejected, falling back to cache patch for equipment={}",
equipment_id
);
let (value, value_type, value_text) = {
let guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard
.get(&run_point.point_id)
.and_then(|m| m.value_type.as_ref())
{
Some(crate::telemetry::ValueType::Int) => (
crate::telemetry::DataValue::Int(if run_on { 1 } else { 0 }),
Some(crate::telemetry::ValueType::Int),
Some(if run_on { "1" } else { "0" }.to_string()),
),
Some(crate::telemetry::ValueType::UInt) => (
crate::telemetry::DataValue::UInt(if run_on { 1 } else { 0 }),
Some(crate::telemetry::ValueType::UInt),
Some(if run_on { "1" } else { "0" }.to_string()),
),
_ => (
crate::telemetry::DataValue::Bool(run_on),
Some(crate::telemetry::ValueType::Bool),
Some(run_on.to_string()),
),
}
};
let monitor = crate::telemetry::PointMonitorInfo {
protocol: "simulation".to_string(),
source_id: uuid::Uuid::nil(),
point_id: run_point.point_id,
client_handle: 0,
scan_mode: crate::model::ScanMode::Poll,
timestamp: Some(chrono::Utc::now()),
quality: crate::telemetry::PointQuality::Good,
value: Some(value),
value_type,
value_text,
old_value: None,
old_timestamp: None,
value_changed: true,
};
if let Err(e) = state
.connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
tracing::warn!("simulate_run_feedback: cache update failed: {}", e);
return;
}
let _ = state
.ws_manager
.send_to_public(crate::websocket::WsMessage::PointNewValue(monitor))
.await;
tracing::info!(
"simulate_run_feedback: cache-patched run={} for equipment={}",
run_on,
equipment_id
);
}
fn pulse_value(high: bool, value_type: Option<&ValueType>) -> serde_json::Value {
match value_type {
Some(ValueType::Bool) => serde_json::Value::Bool(high),


@@ -1,8 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::Duration;
use chrono::Utc;
use uuid::Uuid;
use crate::{
@@ -12,416 +11,383 @@ use crate::{
},
event::AppEvent,
service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality},
telemetry::{DataValue, PointMonitorInfo, PointQuality},
websocket::WsMessage,
AppState,
};
/// Start the engine: a supervisor spawns one async task per enabled unit.
pub fn start(state: AppState, runtime_store: Arc<ControlRuntimeStore>) {
tokio::spawn(async move {
supervise(state, runtime_store).await;
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(500));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
tick_all_units(&state, &runtime_store).await;
}
});
}
/// Supervisor: scans for enabled units every 10 s and ensures each has a running task.
/// Uses JoinHandle to detect exited tasks so disabled-then-re-enabled units are restarted.
async fn supervise(state: AppState, store: Arc<ControlRuntimeStore>) {
let mut tasks: HashMap<Uuid, tokio::task::JoinHandle<()>> = HashMap::new();
let mut interval = tokio::time::interval(Duration::from_secs(10));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
match crate::service::get_all_enabled_units(&state.pool).await {
Ok(units) => {
for unit in units {
let needs_spawn = tasks
.get(&unit.id)
.map_or(true, |h| h.is_finished());
if needs_spawn {
let s = state.clone();
let st = store.clone();
let handle = tokio::spawn(async move { unit_task(s, st, unit.id).await; });
tasks.insert(unit.id, handle);
}
}
}
Err(e) => tracing::error!("Engine supervisor: failed to load units: {}", e),
async fn tick_all_units(state: &AppState, store: &ControlRuntimeStore) {
let units = match crate::service::get_all_enabled_units(&state.pool).await {
Ok(u) => u,
Err(e) => {
tracing::error!("Engine: failed to load units: {}", e);
return;
}
};
for unit in units {
tick_unit(state, store, &unit).await;
}
}
// ── Per-unit task ─────────────────────────────────────────────────────────────
async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uuid) {
let notify = store.get_or_create_notify(unit_id).await;
// Fault/comm check ticker — still need periodic polling of point monitor data.
let mut fault_tick = tokio::time::interval(Duration::from_millis(500));
fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
// Reload unit config on each iteration to detect disable/delete.
let unit = match crate::service::get_unit_by_id(&state.pool, unit_id).await {
Ok(Some(u)) if u.enabled => u,
Ok(_) => {
tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id);
return;
}
Err(e) => {
tracing::error!("Engine: unit {} config reload failed: {}", unit_id, e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};
// ── Fault / comm check ────────────────────────────────────────────────
let (kind_roles, kind_eq_ids, all_roles) = match load_equipment_maps(&state, unit_id).await {
Ok(maps) => maps,
Err(e) => {
tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let mut runtime = store.get_or_init(unit_id).await;
if check_fault_comm(&state, &mut runtime, &unit, &all_roles).await {
store.upsert(runtime.clone()).await;
push_ws(&state, &runtime).await;
}
// ── Wait when not active ──────────────────────────────────────────────
if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required {
tokio::select! {
_ = fault_tick.tick() => {}
_ = notify.notified() => {
// Push fresh runtime immediately so the frontend reflects the change
// (e.g. auto_enabled toggled) without waiting for the next state transition.
let runtime = store.get_or_init(unit_id).await;
push_ws(&state, &runtime).await;
}
}
continue;
}
// ── State machine step ────────────────────────────────────────────────
match runtime.state {
UnitRuntimeState::Stopped => {
// Wait stop_time_sec (0 = skip wait, start immediately).
if !wait_phase(&state, &store, &unit, &all_roles, &notify, &mut fault_tick).await {
continue;
}
// Send feeder start command.
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e);
continue;
}
if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await;
}
}
}
let mut runtime = store.get_or_init(unit_id).await;
runtime.state = UnitRuntimeState::Running;
store.upsert(runtime.clone()).await;
push_ws(&state, &runtime).await;
}
UnitRuntimeState::Running => {
// Wait run_time_sec. run_time_sec == 0 means run without a time limit
// (relies on acc_time_sec to eventually stop). Treat as a very long phase.
let secs = if unit.run_time_sec > 0 { unit.run_time_sec } else { i32::MAX };
let unit_for_wait = crate::model::ControlUnit { run_time_sec: secs, ..unit.clone() };
if !wait_phase(&state, &store, &unit_for_wait, &all_roles, &notify, &mut fault_tick).await {
continue;
}
// Stop feeder.
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e);
continue;
}
if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await;
}
}
}
let mut runtime = store.get_or_init(unit_id).await;
runtime.accumulated_run_sec += secs as i64 * 1000;
runtime.display_acc_sec = runtime.accumulated_run_sec;
if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 {
// Accumulated threshold reached — start distributor.
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = dist_cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e);
} else if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, true).await;
}
}
}
runtime.state = UnitRuntimeState::DistributorRunning;
} else {
runtime.state = UnitRuntimeState::Stopped;
}
store.upsert(runtime.clone()).await;
push_ws(&state, &runtime).await;
}
UnitRuntimeState::DistributorRunning => {
// Wait bl_time_sec then stop distributor.
if !wait_phase(&state, &store, &unit, &all_roles, &notify, &mut fault_tick).await {
continue;
}
let monitor = state.connection_manager.get_point_monitor_data_read_guard().await;
let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor));
drop(monitor);
if let Some((pid, vt)) = cmd {
if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await {
tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e);
continue;
}
if state.config.simulate_plc {
if let Some(eq_id) = kind_eq_ids.get("distributor").copied() {
crate::control::simulate::simulate_run_feedback(&state, eq_id, false).await;
}
}
}
let mut runtime = store.get_or_init(unit_id).await;
runtime.accumulated_run_sec = 0;
runtime.display_acc_sec = 0;
runtime.state = UnitRuntimeState::Stopped;
store.upsert(runtime.clone()).await;
push_ws(&state, &runtime).await;
}
UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => {
tokio::select! {
_ = fault_tick.tick() => {}
_ = notify.notified() => {}
}
}
}
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
/// Sleep for the duration appropriate to the *current* state, interrupting every
/// 500 ms to re-check fault/comm. Returns `true` when the full time elapsed,
/// `false` if the phase was interrupted (auto disabled, fault, or comm lock).
async fn wait_phase(
async fn tick_unit(
state: &AppState,
store: &ControlRuntimeStore,
unit: &crate::model::ControlUnit,
all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)],
notify: &Arc<Notify>,
fault_tick: &mut tokio::time::Interval,
) -> bool {
let secs = match store.get_or_init(unit.id).await.state {
UnitRuntimeState::Stopped => unit.stop_time_sec,
UnitRuntimeState::Running => unit.run_time_sec,
UnitRuntimeState::DistributorRunning => unit.bl_time_sec,
_ => return false,
};
if secs <= 0 {
return true;
}
let deadline = tokio::time::Instant::now() + Duration::from_secs(secs as u64);
loop {
let completed = tokio::select! {
_ = tokio::time::sleep_until(deadline) => true,
_ = fault_tick.tick() => false,
_ = notify.notified() => false,
};
if completed {
return true;
}
// Re-check fault/comm mid-phase.
let mut runtime = store.get_or_init(unit.id).await;
if check_fault_comm(state, &mut runtime, unit, all_roles).await {
store.upsert(runtime.clone()).await;
push_ws(state, &runtime).await;
}
if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required {
return false;
}
}
}
) {
let mut runtime = store.get_or_init(unit.id).await;
async fn push_ws(state: &AppState, runtime: &UnitRuntime) {
// ── Load equipment role-point maps by kind ───────────────
let equipment_list = match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await {
Ok(e) => e,
Err(e) => {
tracing::error!(
"Engine: equipment load failed for unit {}: {}",
unit.id,
e
);
return;
}
};
// kind -> role -> EquipmentRolePoint (first equipment per kind wins)
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
// kind -> equipment id (first equipment per kind)
let mut kind_eq_ids: HashMap<String, Uuid> = HashMap::new();
// all role maps for fault/comm scanning across all equipment
let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new();
for equip in &equipment_list {
match crate::service::get_equipment_role_points(&state.pool, equip.id).await {
Ok(role_points) => {
let role_map: HashMap<String, EquipmentRolePoint> = role_points
.into_iter()
.map(|rp| (rp.signal_role.clone(), rp))
.collect();
if let Some(kind) = &equip.kind {
if kind_roles.contains_key(kind.as_str()) {
tracing::warn!(
"Engine: unit {} has multiple {} equipment; using first",
unit.id,
kind
);
} else {
kind_roles.insert(kind.clone(), role_map.clone());
kind_eq_ids.insert(kind.clone(), equip.id);
}
}
all_roles.push((equip.id, role_map));
}
Err(e) => {
tracing::warn!(
"Engine: role points load failed for equipment {}: {}",
equip.id,
e
);
}
}
}
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
// ── Communication check ──────────────────────────────────
let any_bad_quality = all_roles.iter().flat_map(|(_, r)| r.values()).any(|rp| {
monitor_guard
.get(&rp.point_id)
.map(|m| m.quality != PointQuality::Good)
.unwrap_or(false)
});
let prev_comm = runtime.comm_locked;
runtime.comm_locked = any_bad_quality;
if !prev_comm && runtime.comm_locked {
let _ = state
.event_manager
.send(AppEvent::CommLocked { unit_id: unit.id });
} else if prev_comm && !runtime.comm_locked {
let _ = state
.event_manager
.send(AppEvent::CommRecovered { unit_id: unit.id });
}
// ── Fault check ──────────────────────────────────────────
let any_flt = all_roles.iter().any(|(_, roles)| {
roles
.get("flt")
.and_then(|rp| monitor_guard.get(&rp.point_id))
.map(|m| monitor_value_as_bool(m))
.unwrap_or(false)
});
let prev_flt = runtime.flt_active;
runtime.flt_active = any_flt;
if any_flt && !runtime.fault_locked {
// Find which equipment triggered the fault
let flt_eq_id = all_roles
.iter()
.find(|(_, roles)| {
roles
.get("flt")
.and_then(|rp| monitor_guard.get(&rp.point_id))
.map(|m| monitor_value_as_bool(m))
.unwrap_or(false)
})
.map(|(eq_id, _)| *eq_id)
.unwrap_or(Uuid::nil());
runtime.fault_locked = true;
let _ = state.event_manager.send(AppEvent::FaultLocked {
unit_id: unit.id,
equipment_id: flt_eq_id,
});
if runtime.auto_enabled {
runtime.auto_enabled = false;
let _ = state
.event_manager
.send(AppEvent::AutoControlStopped { unit_id: unit.id });
}
}
// FLT just cleared → require manual ack if unit is configured that way
if prev_flt && !any_flt && runtime.fault_locked {
if unit.require_manual_ack_after_fault {
runtime.manual_ack_required = true;
} else {
// Auto-clear fault lock
runtime.fault_locked = false;
}
}
drop(monitor_guard);
// ── State machine tick ───────────────────────────────────
if runtime.auto_enabled && !runtime.fault_locked && !runtime.comm_locked {
let now = Utc::now();
// Accumulate in milliseconds to avoid sub-second truncation
let delta_ms = runtime
.last_tick_at
.map(|t| (now - t).num_milliseconds().max(0))
.unwrap_or(0);
let prev_state = runtime.state.clone();
tick_state_machine(state, &mut runtime, unit, &kind_roles, &kind_eq_ids, delta_ms).await;
if runtime.state != prev_state {
let _ = state.event_manager.send(AppEvent::UnitStateChanged {
unit_id: unit.id,
from_state: format!("{:?}", prev_state),
to_state: format!("{:?}", runtime.state),
});
}
}
runtime.last_tick_at = Some(Utc::now());
store.upsert(runtime.clone()).await;
    push_ws(state, &runtime).await;
}
async fn push_ws(state: &AppState, runtime: &UnitRuntime) {
    if let Err(e) = state
        .ws_manager
        .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone()))
        .await
    {
        tracing::debug!("Engine: WS push skipped (no subscribers): {}", e);
    }
}
/// Check fault and comm status, mutate runtime, fire events.
/// Returns `true` if any field changed.
async fn check_fault_comm(
    state: &AppState,
    runtime: &mut UnitRuntime,
    unit: &crate::model::ControlUnit,
    all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)],
) -> bool {
    let monitor = state
        .connection_manager
        .get_point_monitor_data_read_guard()
        .await;
let any_bad = all_roles.iter().flat_map(|(_, r)| r.values()).any(|rp| {
monitor
.get(&rp.point_id)
.map(|m| m.quality != PointQuality::Good)
.unwrap_or(false)
});
let any_flt = all_roles.iter().any(|(_, roles)| {
roles
.get("flt")
.and_then(|rp| monitor.get(&rp.point_id))
.map(|m| super::monitor_value_as_bool(m))
.unwrap_or(false)
});
let flt_eq_id = if any_flt && !runtime.fault_locked {
all_roles
.iter()
.find(|(_, roles)| {
roles
.get("flt")
.and_then(|rp| monitor.get(&rp.point_id))
.map(|m| super::monitor_value_as_bool(m))
.unwrap_or(false)
})
.map(|(eq_id, _)| *eq_id)
} else {
None
};
drop(monitor);
let prev_comm = runtime.comm_locked;
let prev_flt = runtime.flt_active;
let prev_fault_locked = runtime.fault_locked;
let prev_auto = runtime.auto_enabled;
let prev_ack = runtime.manual_ack_required;
runtime.comm_locked = any_bad;
runtime.flt_active = any_flt;
if !prev_comm && runtime.comm_locked {
let _ = state.event_manager.send(AppEvent::CommLocked { unit_id: unit.id });
} else if prev_comm && !runtime.comm_locked {
let _ = state.event_manager.send(AppEvent::CommRecovered { unit_id: unit.id });
}
if let Some(eq_id) = flt_eq_id {
runtime.fault_locked = true;
let _ = state.event_manager.send(AppEvent::FaultLocked { unit_id: unit.id, equipment_id: eq_id });
if runtime.auto_enabled {
runtime.auto_enabled = false;
let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id });
}
}
if prev_flt && !any_flt && runtime.fault_locked {
if unit.require_manual_ack_after_fault {
runtime.manual_ack_required = true;
} else {
runtime.fault_locked = false;
}
}
runtime.comm_locked != prev_comm
|| runtime.flt_active != prev_flt
|| runtime.fault_locked != prev_fault_locked
|| runtime.auto_enabled != prev_auto
|| runtime.manual_ack_required != prev_ack
}
type EquipMaps = (
HashMap<String, HashMap<String, EquipmentRolePoint>>,
HashMap<String, Uuid>,
Vec<(Uuid, HashMap<String, EquipmentRolePoint>)>,
);
async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result<EquipMaps, sqlx::Error> {
let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipment_list.iter().map(|equip| equip.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?;
let mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>> = HashMap::new();
for row in role_point_rows {
role_points_by_equipment
.entry(row.equipment_id)
.or_default()
.push(EquipmentRolePoint {
point_id: row.point_id,
signal_role: row.signal_role,
});
}
Ok(build_equipment_maps(
unit_id,
&equipment_list,
role_points_by_equipment,
))
}
fn build_equipment_maps(
unit_id: Uuid,
equipment_list: &[crate::model::Equipment],
mut role_points_by_equipment: HashMap<Uuid, Vec<EquipmentRolePoint>>,
) -> EquipMaps {
let mut kind_roles: HashMap<String, HashMap<String, EquipmentRolePoint>> = HashMap::new();
let mut kind_eq_ids: HashMap<String, Uuid> = HashMap::new();
let mut all_roles: Vec<(Uuid, HashMap<String, EquipmentRolePoint>)> = Vec::new();
for equip in equipment_list {
let role_map: HashMap<String, EquipmentRolePoint> = role_points_by_equipment
.remove(&equip.id)
.unwrap_or_default()
.into_iter()
.map(|rp| (rp.signal_role.clone(), rp))
.collect();
if let Some(kind) = &equip.kind {
if !kind_roles.contains_key(kind.as_str()) {
kind_roles.insert(kind.clone(), role_map.clone());
kind_eq_ids.insert(kind.clone(), equip.id);
            } else {
                tracing::warn!(
                    "Engine: unit {} has multiple {} equipment; using first",
                    unit_id, kind
                );
            }
        }
        all_roles.push((equip.id, role_map));
    }
    (kind_roles, kind_eq_ids, all_roles)
}
/// Drive one state-machine tick for a unit.
/// All elapsed counters accumulate in **milliseconds**; comparisons use `*_time_sec * 1000`.
async fn tick_state_machine(
    state: &AppState,
    runtime: &mut UnitRuntime,
    unit: &crate::model::ControlUnit,
    kind_roles: &HashMap<String, HashMap<String, EquipmentRolePoint>>,
    kind_eq_ids: &HashMap<String, Uuid>,
    delta_ms: i64,
) {
    let feeder_roles = kind_roles.get("coal_feeder");
    let dist_roles = kind_roles.get("distributor");
    let feeder_eq_id = kind_eq_ids.get("coal_feeder").copied();
    let dist_eq_id = kind_eq_ids.get("distributor").copied();
    match runtime.state {
        UnitRuntimeState::Stopped => {
            // stop_time_sec == 0 means start immediately (no wait)
            if unit.stop_time_sec > 0 {
                runtime.current_stop_elapsed_sec += delta_ms; // field holds ms
                if runtime.current_stop_elapsed_sec < unit.stop_time_sec as i64 * 1000 {
                    return;
                }
            }
            let monitor = state
                .connection_manager
                .get_point_monitor_data_read_guard()
                .await;
            if let Some((pid, vt)) =
                feeder_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor))
            {
                drop(monitor);
                if let Err(e) =
                    send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
                {
                    tracing::warn!("Engine: auto start coal_feeder failed: {}", e);
                    return;
                }
                if state.config.simulate_plc {
                    if let Some(eq_id) = feeder_eq_id {
                        crate::control::command::simulate_run_feedback(state, eq_id, true).await;
                    }
                }
                runtime.state = UnitRuntimeState::Running;
                runtime.current_stop_elapsed_sec = 0;
                runtime.current_run_elapsed_sec = 0;
            }
        }
UnitRuntimeState::Running => {
runtime.current_run_elapsed_sec += delta_ms;
runtime.accumulated_run_sec += delta_ms;
// Check RunTime first — stop feeder before considering distributor trigger
if unit.run_time_sec > 0
&& runtime.current_run_elapsed_sec >= unit.run_time_sec as i64 * 1000
{
let monitor = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
if let Some((pid, vt)) =
feeder_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor))
{
drop(monitor);
if let Err(e) =
send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
{
tracing::warn!("Engine: auto stop coal_feeder failed: {}", e);
return;
}
if state.config.simulate_plc {
if let Some(eq_id) = feeder_eq_id {
crate::control::command::simulate_run_feedback(state, eq_id, false)
.await;
}
}
runtime.state = UnitRuntimeState::Stopped;
runtime.current_run_elapsed_sec = 0;
runtime.current_stop_elapsed_sec = 0;
}
return;
}
// Check AccTime — stop feeder then trigger distributor
if unit.acc_time_sec > 0
&& runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000
{
let monitor = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
if let Some((pid, vt)) =
feeder_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor))
{
drop(monitor);
if let Err(e) =
send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
{
tracing::warn!("Engine: stop coal_feeder before distributor failed: {}", e);
return;
}
if state.config.simulate_plc {
if let Some(eq_id) = feeder_eq_id {
crate::control::command::simulate_run_feedback(state, eq_id, false)
.await;
}
}
}
runtime.state = UnitRuntimeState::DistributorRunning;
runtime.distributor_run_elapsed_sec = 0;
}
}
UnitRuntimeState::DistributorRunning => {
// First tick in this state (distributor_run_elapsed_sec == 0): send start pulse then return.
// Time advance happens on subsequent ticks.
if runtime.distributor_run_elapsed_sec == 0 {
let monitor = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
if let Some((pid, vt)) =
dist_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor))
{
drop(monitor);
if let Err(e) =
send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
{
tracing::warn!("Engine: auto start distributor failed: {}", e);
} else if state.config.simulate_plc {
if let Some(eq_id) = dist_eq_id {
crate::control::command::simulate_run_feedback(state, eq_id, true)
.await;
}
}
}
// Mark as "started" by advancing to 1ms so this branch won't re-fire
runtime.distributor_run_elapsed_sec = 1;
return;
}
runtime.distributor_run_elapsed_sec += delta_ms;
if unit.bl_time_sec > 0
&& runtime.distributor_run_elapsed_sec >= unit.bl_time_sec as i64 * 1000
{
let monitor = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
if let Some((pid, vt)) =
dist_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor))
{
drop(monitor);
if let Err(e) =
send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await
{
tracing::warn!("Engine: auto stop distributor failed: {}", e);
return;
}
if state.config.simulate_plc {
if let Some(eq_id) = dist_eq_id {
crate::control::command::simulate_run_feedback(state, eq_id, false)
.await;
}
}
}
runtime.accumulated_run_sec = 0;
runtime.distributor_run_elapsed_sec = 0;
runtime.state = UnitRuntimeState::Stopped;
runtime.current_stop_elapsed_sec = 0;
}
}
UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => {}
}
}
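The state machine accumulates elapsed time in milliseconds and compares against `*_time_sec * 1000`. A minimal standalone illustration of why per-tick truncation to whole seconds would be wrong (`accumulate` and the tick values here are hypothetical, for demonstration only):

```rust
// Why the engine accumulates elapsed time in milliseconds rather than whole
// seconds: truncating each per-tick delta to seconds silently drops every
// sub-second contribution.
fn accumulate(deltas_ms: &[i64], in_millis: bool) -> i64 {
    deltas_ms
        .iter()
        .map(|d| if in_millis { *d } else { *d / 1000 }) // whole-second path truncates
        .sum()
}

fn main() {
    // Ten 500 ms ticks: 5 s of real elapsed time.
    let ticks = [500i64; 10];
    assert_eq!(accumulate(&ticks, false), 0); // second-granularity: all 5 s lost
    assert_eq!(accumulate(&ticks, true) / 1000, 5); // ms-granularity: 5 s preserved
    println!("ok");
}
```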
/// Find a command point by role in a single equipment's role map.
/// Returns `None` if REM==0, FLT==1, or quality is bad.
fn find_cmd(
roles: &HashMap<String, EquipmentRolePoint>,
role: &str,
@@ -432,13 +398,13 @@ fn find_cmd(
let rem_ok = roles
.get("rem")
.and_then(|rp| monitor.get(&rp.point_id))
.map(|m| monitor_value_as_bool(m) && m.quality == PointQuality::Good)
.unwrap_or(true);
let flt_ok = roles
.get("flt")
.and_then(|rp| monitor.get(&rp.point_id))
.map(|m| !monitor_value_as_bool(m) && m.quality == PointQuality::Good)
.unwrap_or(true);
if rem_ok && flt_ok {
@@ -451,64 +417,15 @@ fn find_cmd(
}
}
fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
    match monitor.value.as_ref() {
        Some(DataValue::Bool(v)) => *v,
        Some(DataValue::Int(v)) => *v != 0,
        Some(DataValue::UInt(v)) => *v != 0,
        Some(DataValue::Float(v)) => *v != 0.0,
        Some(DataValue::Text(v)) => {
            matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on")
        }
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use super::build_equipment_maps;
    use crate::model::Equipment;
    use crate::service::EquipmentRolePoint;
    use chrono::Utc;
    use std::collections::HashMap;
    use uuid::Uuid;
    fn equipment(id: Uuid, unit_id: Uuid, kind: &str) -> Equipment {
        Equipment {
            id,
            unit_id: Some(unit_id),
            code: format!("EQ-{id}"),
            name: format!("Equipment-{id}"),
            kind: Some(kind.to_string()),
            description: None,
            created_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }
    #[test]
    fn build_equipment_maps_reflects_latest_role_bindings() {
        let unit_id = Uuid::new_v4();
        let equipment_id = Uuid::new_v4();
        let first_start_point = Uuid::new_v4();
        let second_start_point = Uuid::new_v4();
        let equipment_list = vec![equipment(equipment_id, unit_id, "coal_feeder")];
        let mut first_roles = HashMap::new();
        first_roles.insert(
            equipment_id,
            vec![EquipmentRolePoint {
                point_id: first_start_point,
                signal_role: "start_cmd".to_string(),
            }],
        );
        let (first_kind_roles, _, _) = build_equipment_maps(unit_id, &equipment_list, first_roles);
        let mut second_roles = HashMap::new();
        second_roles.insert(
            equipment_id,
            vec![EquipmentRolePoint {
                point_id: second_start_point,
                signal_role: "start_cmd".to_string(),
            }],
        );
        let (second_kind_roles, _, _) =
            build_equipment_maps(unit_id, &equipment_list, second_roles);
        assert_eq!(
            first_kind_roles["coal_feeder"]["start_cmd"].point_id,
            first_start_point
        );
        assert_eq!(
            second_kind_roles["coal_feeder"]["start_cmd"].point_id,
            second_start_point
        );
    }
}
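The truthiness coercion used by the engine and validator can be exercised in isolation. A hedged sketch: `DataValue` and `as_bool` below are local stand-ins for the crate's `telemetry::DataValue` and `monitor_value_as_bool`, reduced to the variants needed to show the rule (nonzero numbers are true, text is true when it trims to "1"/"true"/"on").

```rust
// Local stand-in for the crate's telemetry value enum (UInt variant omitted).
#[derive(Debug)]
enum DataValue {
    Bool(bool),
    Int(i64),
    Float(f64),
    Text(String),
}

// Truthiness rule mirroring monitor_value_as_bool: absent values are false.
fn as_bool(v: Option<&DataValue>) -> bool {
    match v {
        Some(DataValue::Bool(b)) => *b,
        Some(DataValue::Int(i)) => *i != 0,
        Some(DataValue::Float(f)) => *f != 0.0,
        Some(DataValue::Text(s)) => {
            matches!(s.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on")
        }
        None => false,
    }
}

fn main() {
    assert!(as_bool(Some(&DataValue::Bool(true))));
    assert!(as_bool(Some(&DataValue::Text(" ON ".into())))); // trims and lowercases
    assert!(as_bool(Some(&DataValue::Float(1.5))));
    assert!(!as_bool(Some(&DataValue::Int(0))));
    assert!(!as_bool(None)); // missing monitor data is never truthy
    println!("ok");
}
```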

src/control/mod.rs

@@ -1,20 +1,4 @@
pub mod command;
pub mod engine;
pub mod runtime;
pub mod simulate;
pub mod validator;
use crate::telemetry::{DataValue, PointMonitorInfo};
pub(crate) fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
match monitor.value.as_ref() {
Some(DataValue::Bool(v)) => *v,
Some(DataValue::Int(v)) => *v != 0,
Some(DataValue::UInt(v)) => *v != 0,
Some(DataValue::Float(v)) => *v != 0.0,
Some(DataValue::Text(v)) => {
matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes")
}
_ => false,
}
}

src/control/runtime.rs

@@ -1,6 +1,7 @@
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Notify, RwLock};
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use uuid::Uuid;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
@@ -19,12 +20,14 @@ pub struct UnitRuntime {
pub state: UnitRuntimeState,
pub auto_enabled: bool,
pub accumulated_run_sec: i64,
/// Snapshot updated only on state transitions; used for display to avoid mid-tick jitter.
pub display_acc_sec: i64,
pub current_run_elapsed_sec: i64,
pub current_stop_elapsed_sec: i64,
pub distributor_run_elapsed_sec: i64,
pub fault_locked: bool,
pub flt_active: bool,
pub comm_locked: bool,
pub manual_ack_required: bool,
pub last_tick_at: Option<DateTime<Utc>>,
}
impl UnitRuntime {
@@ -34,11 +37,14 @@ impl UnitRuntime {
state: UnitRuntimeState::Stopped,
auto_enabled: false,
accumulated_run_sec: 0,
display_acc_sec: 0,
current_run_elapsed_sec: 0,
current_stop_elapsed_sec: 0,
distributor_run_elapsed_sec: 0,
fault_locked: false,
flt_active: false,
comm_locked: false,
manual_ack_required: false,
last_tick_at: None,
}
}
}
@@ -46,7 +52,6 @@ impl UnitRuntime {
#[derive(Clone, Default)]
pub struct ControlRuntimeStore {
inner: Arc<RwLock<HashMap<Uuid, UnitRuntime>>>,
notifiers: Arc<RwLock<HashMap<Uuid, Arc<Notify>>>>,
}
impl ControlRuntimeStore {
@@ -71,24 +76,4 @@ impl ControlRuntimeStore {
pub async fn upsert(&self, runtime: UnitRuntime) {
self.inner.write().await.insert(runtime.unit_id, runtime);
}
pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc<Notify> {
self.notifiers
.write()
.await
.entry(unit_id)
.or_insert_with(|| Arc::new(Notify::new()))
.clone()
}
pub async fn get_all(&self) -> HashMap<Uuid, UnitRuntime> {
self.inner.read().await.clone()
}
/// Wake the engine task for a unit (e.g., when auto_enabled or fault_locked changes).
pub async fn notify_unit(&self, unit_id: Uuid) {
if let Some(n) = self.notifiers.read().await.get(&unit_id) {
n.notify_one();
}
}
}
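The per-unit `Notify` map above wakes exactly one engine task per unit id. A std-threads analogue of that wake pattern, assuming nothing beyond the standard library (`Waker`, `notify_roundtrip`, and the key `7` are illustrative, not crate types); setting the flag before `notify_one` avoids a lost wakeup, mirroring `Notify`'s stored permit:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// One waker per unit id, like the store's notifiers map keyed by Uuid.
struct Waker {
    flag: Mutex<bool>,
    cv: Condvar,
}

// Returns true if the waiter was woken before its timeout.
fn notify_roundtrip() -> bool {
    let mut wakers: HashMap<u32, Arc<Waker>> = HashMap::new();
    let w = Arc::new(Waker { flag: Mutex::new(false), cv: Condvar::new() });
    wakers.insert(7, w.clone());

    let waiter = thread::spawn({
        let w = w.clone();
        move || {
            // Engine-task side: block until notified, or give up after 2 s.
            let guard = w.flag.lock().unwrap();
            let (_guard, res) = w
                .cv
                .wait_timeout_while(guard, Duration::from_secs(2), |woken| !*woken)
                .unwrap();
            !res.timed_out()
        }
    });

    // Handler side: after toggling auto_enabled or acking a fault, wake unit 7.
    if let Some(w) = wakers.get(&7) {
        *w.flag.lock().unwrap() = true; // set state first: no lost wakeup
        w.cv.notify_one();
    }
    waiter.join().unwrap()
}

fn main() {
    assert!(notify_roundtrip());
    println!("ok");
}
```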

src/control/simulate.rs

@@ -1,213 +0,0 @@
use tokio::time::Duration;
use uuid::Uuid;
use crate::{
connection::{BatchSetPointValueReq, SetPointValueReqItem},
telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType},
websocket::WsMessage,
AppState,
};
/// Start the chaos simulation task (only when SIMULATE_PLC=true).
/// Randomly disrupts `rem` or `flt` signals on equipment to exercise the control engine.
pub fn start(state: AppState) {
tokio::spawn(async move {
run(state).await;
});
}
async fn run(state: AppState) {
let mut rng = seed_rng();
loop {
// Wait a random 15–60 s between events.
let wait_secs = 15 + xorshift(&mut rng) % 46;
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
// Pick a random enabled unit.
let units = match crate::service::get_all_enabled_units(&state.pool).await {
Ok(u) if !u.is_empty() => u,
_ => continue,
};
let unit = &units[xorshift(&mut rng) as usize % units.len()];
// Only target units with auto control running — otherwise the event is uninteresting.
let runtime = state.control_runtime.get(unit.id).await;
if runtime.map_or(true, |r| !r.auto_enabled) {
continue;
}
// Pick a random equipment in that unit.
let equipments =
match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
let eq = &equipments[xorshift(&mut rng) as usize % equipments.len()];
// Find which of rem / flt this equipment has.
let role_points =
match crate::service::get_equipment_role_points(&state.pool, eq.id).await {
Ok(rp) if !rp.is_empty() => rp,
_ => continue,
};
let candidates: Vec<&str> = ["flt", "rem"]
.iter()
.filter(|&&r| role_points.iter().any(|p| p.signal_role == r))
.copied()
.collect();
if candidates.is_empty() {
continue;
}
let target_role = candidates[xorshift(&mut rng) as usize % candidates.len()];
let target_point = role_points
.iter()
.find(|p| p.signal_role == target_role)
.unwrap();
// rem=false → not in remote mode (blocks commands)
// flt=true → fault signal active (triggers fault lock)
let trigger_value = target_role == "flt";
// Hold duration: 5–15 s for rem, 3–10 s for flt.
let hold_secs = if target_role == "flt" {
3 + xorshift(&mut rng) % 8
} else {
5 + xorshift(&mut rng) % 11
};
tracing::info!(
"[chaos] unit={} eq={} role={} → {} (hold {}s)",
unit.code,
eq.code,
target_role,
if trigger_value { "FAULT" } else { "REM OFF" },
hold_secs
);
patch_signal(&state, target_point.point_id, trigger_value).await;
tokio::time::sleep(Duration::from_secs(hold_secs)).await;
patch_signal(&state, target_point.point_id, !trigger_value).await;
tracing::info!(
"[chaos] unit={} eq={} role={} → RESTORED",
unit.code,
eq.code,
target_role
);
}
}
/// Simulate RUN signal feedback for an equipment after a manual start/stop command.
/// Called by the engine and control handler when SIMULATE_PLC=true.
pub async fn simulate_run_feedback(state: &AppState, equipment_id: Uuid, run_on: bool) {
let role_points =
match crate::service::get_equipment_role_points(&state.pool, equipment_id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("simulate_run_feedback: db error: {}", e);
return;
}
};
let run_point = match role_points.iter().find(|p| p.signal_role == "run") {
Some(p) => p.clone(),
None => return,
};
patch_signal(state, run_point.point_id, run_on).await;
}
/// Patch a signal point value: try OPC UA write first, fall back to cache patch + WS push.
pub async fn patch_signal(state: &AppState, point_id: Uuid, value_on: bool) {
let write_json = serde_json::json!(if value_on { 1 } else { 0 });
let write_ok = match state
.connection_manager
.write_point_values_batch(BatchSetPointValueReq {
items: vec![SetPointValueReqItem {
point_id,
value: write_json,
}],
})
.await
{
Ok(res) => res.success,
Err(_) => false,
};
if write_ok {
return;
}
// Fallback: patch the monitor cache directly and broadcast over WS.
let (value, value_type, value_text) = {
let guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
match guard.get(&point_id).and_then(|m| m.value_type.as_ref()) {
Some(ValueType::Int) => (
DataValue::Int(if value_on { 1 } else { 0 }),
Some(ValueType::Int),
Some(if value_on { "1" } else { "0" }.to_string()),
),
Some(ValueType::UInt) => (
DataValue::UInt(if value_on { 1 } else { 0 }),
Some(ValueType::UInt),
Some(if value_on { "1" } else { "0" }.to_string()),
),
_ => (
DataValue::Bool(value_on),
Some(ValueType::Bool),
Some(value_on.to_string()),
),
}
};
let monitor = PointMonitorInfo {
protocol: "simulation".to_string(),
source_id: Uuid::nil(),
point_id,
client_handle: 0,
scan_mode: crate::model::ScanMode::Poll,
timestamp: Some(chrono::Utc::now()),
quality: PointQuality::Good,
value: Some(value),
value_type,
value_text,
old_value: None,
old_timestamp: None,
value_changed: true,
};
if let Err(e) = state
.connection_manager
.update_point_monitor_data(monitor.clone())
.await
{
tracing::warn!("[chaos] cache update failed for {}: {}", point_id, e);
return;
}
let _ = state
.ws_manager
.send_to_public(WsMessage::PointNewValue(monitor))
.await;
}
// ── Minimal XorShift64 PRNG (no external crate needed) ────────────────────────
fn seed_rng() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64 ^ d.as_secs().wrapping_mul(0x9e37_79b9_7f4a_7c15))
.unwrap_or(0xdeadbeef)
}
fn xorshift(s: &mut u64) -> u64 {
*s ^= *s << 13;
*s ^= *s >> 7;
*s ^= *s << 17;
*s
}
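Two properties of the XorShift64 helper above are worth checking: zero is a fixed point (which is why `seed_rng` falls back to a non-zero constant), and the chaos task maps outputs into a range with the `lo + x % span` idiom. A quick self-contained check:

```rust
// Same shift triple (13, 7, 17) as the engine's helper.
fn xorshift(s: &mut u64) -> u64 {
    *s ^= *s << 13;
    *s ^= *s >> 7;
    *s ^= *s << 17;
    *s
}

fn main() {
    // Zero state is a fixed point: every step of xorshift64 maps 0 to 0,
    // so the generator must never be seeded with 0.
    let mut z = 0u64;
    assert_eq!(xorshift(&mut z), 0);

    // Range mapping as the chaos task does it: 15 + x % 46 lands in 15..=60.
    let mut s = 0xdead_beef_u64;
    for _ in 0..100 {
        let wait = 15 + xorshift(&mut s) % 46;
        assert!((15..=60).contains(&wait));
    }
    println!("ok");
}
```

Note the modulo mapping has slight bias toward low values when the span does not divide 2^64; for jittering chaos intervals that bias is harmless.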

src/control/validator.rs

@@ -5,7 +5,7 @@ use uuid::Uuid;
use crate::{
service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality, ValueType},
telemetry::{DataValue, PointMonitorInfo, PointQuality, ValueType},
util::response::ApiErr,
AppState,
};
@@ -95,7 +95,7 @@ pub async fn validate_manual_control(
let rem_monitor = monitor_guard
.get(&rem_point.point_id)
.ok_or_else(|| missing_monitor_err("REM", equipment_id))?;
if !super::monitor_value_as_bool(rem_monitor) {
if !monitor_value_as_bool(rem_monitor) {
return Err(ApiErr::Forbidden(
"Remote control not allowed, REM is not enabled".to_string(),
Some(json!({ "equipment_id": equipment_id })),
@@ -107,7 +107,7 @@ pub async fn validate_manual_control(
let flt_monitor = monitor_guard
.get(&flt_point.point_id)
.ok_or_else(|| missing_monitor_err("FLT", equipment_id))?;
if super::monitor_value_as_bool(flt_monitor) {
if monitor_value_as_bool(flt_monitor) {
return Err(ApiErr::Forbidden(
"Equipment fault is active, command denied".to_string(),
Some(json!({ "equipment_id": equipment_id })),
@@ -199,3 +199,16 @@ fn missing_monitor_err(role: &str, equipment_id: Uuid) -> ApiErr {
)
}
fn monitor_value_as_bool(monitor: &PointMonitorInfo) -> bool {
match monitor.value.as_ref() {
Some(DataValue::Bool(value)) => *value,
Some(DataValue::Int(value)) => *value != 0,
Some(DataValue::UInt(value)) => *value != 0,
Some(DataValue::Float(value)) => *value != 0.0,
Some(DataValue::Text(value)) => matches!(
value.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "on" | "yes"
),
_ => false,
}
}

src/event.rs

@@ -15,7 +15,6 @@ pub enum AppEvent {
},
SourceDelete {
source_id: Uuid,
source_name: String,
},
PointCreateBatch {
source_id: Uuid,
@@ -160,7 +159,7 @@ async fn handle_control_event(
tracing::error!("Failed to reconnect source {}: {}", source_id, e);
}
}
AppEvent::SourceDelete { source_id, .. } => {
AppEvent::SourceDelete { source_id } => {
tracing::info!("Processing SourceDelete event for {}", source_id);
if let Err(e) = connection_manager.disconnect(source_id).await {
tracing::error!("Failed to disconnect from source {}: {}", source_id, e);
@@ -254,166 +253,133 @@
}
}
async fn fetch_source_name(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn fetch_unit_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM unit WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn fetch_equipment_code(pool: &sqlx::PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT code FROM equipment WHERE id = $1")
.bind(id)
.fetch_optional(pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| id.to_string())
}
async fn persist_event_if_needed(
event: &AppEvent,
pool: &sqlx::PgPool,
ws_manager: Option<&std::sync::Arc<crate::websocket::WebSocketManager>>,
) {
let record = match event {
AppEvent::SourceCreate { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"source.created", "info",
None, None, Some(*source_id),
format!("数据源【{}】已创建", name),
serde_json::json!({ "source_id": source_id }),
))
}
AppEvent::SourceUpdate { source_id } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"source.updated", "info",
None, None, Some(*source_id),
format!("数据源【{}】已更新", name),
serde_json::json!({ "source_id": source_id }),
))
}
AppEvent::SourceDelete { source_id, source_name } => Some((
    "source.deleted", "warn",
    None, None, None,
    format!("数据源【{}】已删除", source_name),
    serde_json::json!({ "source_id": source_id }),
)),
AppEvent::SourceCreate { source_id } => Some((
"source.created",
"info",
None,
None,
Some(*source_id),
format!("Source {} created", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::PointCreateBatch { source_id, point_ids } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"point.batch_created", "info",
None, None, Some(*source_id),
format!("批量创建 {} 个测点(数据源:{}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
AppEvent::PointDeleteBatch { source_id, point_ids } => {
let name = fetch_source_name(pool, *source_id).await;
Some((
"point.batch_deleted", "warn",
None, None, Some(*source_id),
format!("批量删除 {} 个测点(数据源:{}", point_ids.len(), name),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
))
}
AppEvent::EquipmentStartCommandSent { equipment_id, unit_id, point_id } => {
let code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"equipment.start_command_sent", "info",
*unit_id, Some(*equipment_id), None,
format!("已发送启动指令(设备:{}", code),
serde_json::json!({
"equipment_id": equipment_id,
"unit_id": unit_id,
"point_id": point_id
}),
))
}
AppEvent::EquipmentStopCommandSent { equipment_id, unit_id, point_id } => {
let code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"equipment.stop_command_sent", "info",
*unit_id, Some(*equipment_id), None,
format!("已发送停止指令(设备:{}", code),
serde_json::json!({
"equipment_id": equipment_id,
"unit_id": unit_id,
"point_id": point_id
}),
))
}
AppEvent::AutoControlStarted { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.auto_control_started", "info",
Some(*unit_id), None, None,
format!("已启动自动控制(单元:{}", code),
serde_json::json!({ "unit_id": unit_id }),
))
}
AppEvent::AutoControlStopped { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.auto_control_stopped", "info",
Some(*unit_id), None, None,
format!("已停止自动控制(单元:{}", code),
serde_json::json!({ "unit_id": unit_id }),
))
}
AppEvent::FaultLocked { unit_id, equipment_id } => {
let unit_code = fetch_unit_code(pool, *unit_id).await;
let eq_code = fetch_equipment_code(pool, *equipment_id).await;
Some((
"unit.fault_locked", "error",
Some(*unit_id), Some(*equipment_id), None,
format!("单元【{}】发生故障锁定,触发设备:{}", unit_code, eq_code),
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
))
}
AppEvent::FaultAcked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.fault_acked", "info",
Some(*unit_id), None, None,
format!("单元【{}】故障已人工确认", code),
serde_json::json!({ "unit_id": unit_id }),
))
}
AppEvent::CommLocked { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.comm_locked", "warn",
Some(*unit_id), None, None,
format!("单元【{}】通讯中断", code),
serde_json::json!({ "unit_id": unit_id }),
))
}
AppEvent::CommRecovered { unit_id } => {
let code = fetch_unit_code(pool, *unit_id).await;
Some((
"unit.comm_recovered", "info",
Some(*unit_id), None, None,
format!("单元【{}】通讯恢复", code),
serde_json::json!({ "unit_id": unit_id }),
))
}
AppEvent::UnitStateChanged { .. } => None,
AppEvent::SourceUpdate { source_id } => Some((
"source.updated",
"info",
None,
None,
Some(*source_id),
format!("Source {} updated", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::SourceDelete { source_id } => Some((
"source.deleted",
"warn",
None,
None,
None,
format!("Source {} deleted", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::PointCreateBatch { source_id, point_ids } => Some((
"point.batch_created",
"info",
None,
None,
Some(*source_id),
format!("{} points created for source {}", point_ids.len(), source_id),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)),
AppEvent::PointDeleteBatch { source_id, point_ids } => Some((
"point.batch_deleted",
"warn",
None,
None,
Some(*source_id),
format!("{} points deleted for source {}", point_ids.len(), source_id),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)),
AppEvent::EquipmentStartCommandSent {
equipment_id,
unit_id,
point_id,
} => Some((
"equipment.start_command_sent",
"info",
*unit_id,
Some(*equipment_id),
None,
format!("Start command sent to equipment {}", equipment_id),
serde_json::json!({
"equipment_id": equipment_id,
"unit_id": unit_id,
"point_id": point_id
}),
)),
AppEvent::EquipmentStopCommandSent {
equipment_id,
unit_id,
point_id,
} => Some((
"equipment.stop_command_sent",
"info",
*unit_id,
Some(*equipment_id),
None,
format!("Stop command sent to equipment {}", equipment_id),
serde_json::json!({
"equipment_id": equipment_id,
"unit_id": unit_id,
"point_id": point_id
}),
)),
AppEvent::AutoControlStarted { unit_id } => Some((
"unit.auto_control_started", "info",
Some(*unit_id), None, None,
format!("Auto control started for unit {}", unit_id),
serde_json::json!({ "unit_id": unit_id }),
)),
AppEvent::AutoControlStopped { unit_id } => Some((
"unit.auto_control_stopped", "info",
Some(*unit_id), None, None,
format!("Auto control stopped for unit {}", unit_id),
serde_json::json!({ "unit_id": unit_id }),
)),
AppEvent::FaultLocked { unit_id, equipment_id } => Some((
"unit.fault_locked", "error",
Some(*unit_id), Some(*equipment_id), None,
format!("Unit {} fault locked by equipment {}", unit_id, equipment_id),
serde_json::json!({ "unit_id": unit_id, "equipment_id": equipment_id }),
)),
AppEvent::FaultAcked { unit_id } => Some((
"unit.fault_acked", "info",
Some(*unit_id), None, None,
format!("Unit {} fault acknowledged", unit_id),
serde_json::json!({ "unit_id": unit_id }),
)),
AppEvent::CommLocked { unit_id } => Some((
"unit.comm_locked", "warn",
Some(*unit_id), None, None,
format!("Unit {} communication locked", unit_id),
serde_json::json!({ "unit_id": unit_id }),
)),
AppEvent::CommRecovered { unit_id } => Some((
"unit.comm_recovered", "info",
Some(*unit_id), None, None,
format!("Unit {} communication recovered", unit_id),
serde_json::json!({ "unit_id": unit_id }),
)),
AppEvent::UnitStateChanged { unit_id, from_state, to_state } => Some((
"unit.state_changed", "info",
Some(*unit_id), None, None,
format!("Unit {} state: {} -> {}", unit_id, from_state, to_state),
serde_json::json!({ "unit_id": unit_id, "from": from_state, "to": to_state }),
)),
AppEvent::PointNewValue(_) => None,
};
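The match above maps each `AppEvent` variant to an optional record tuple; returning `None` (as for `PointNewValue`, and for `UnitStateChanged` on one side of the diff) suppresses persistence. A minimal standalone sketch of the same pattern, using a hypothetical three-variant enum rather than the real `AppEvent`:

```rust
// Sketch of the event-to-record mapping: each variant yields
// Some((kind, severity, message)) or None to skip persistence.
// Variant names are illustrative, not the real AppEvent enum.
enum Event {
    CommLocked { unit: u32 },
    CommRecovered { unit: u32 },
    HighFrequencyTick, // never persisted, like PointNewValue
}

fn to_record(ev: &Event) -> Option<(&'static str, &'static str, String)> {
    match ev {
        Event::CommLocked { unit } => Some((
            "unit.comm_locked",
            "warn",
            format!("Unit {} communication locked", unit),
        )),
        Event::CommRecovered { unit } => Some((
            "unit.comm_recovered",
            "info",
            format!("Unit {} communication recovered", unit),
        )),
        Event::HighFrequencyTick => None,
    }
}

fn main() {
    let rec = to_record(&Event::CommLocked { unit: 7 }).expect("persisted");
    assert_eq!(rec.0, "unit.comm_locked");
    assert!(to_record(&Event::HighFrequencyTick).is_none());
}
```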


@ -18,27 +18,6 @@ use crate::{
AppState,
};
fn validate_unit_timing_order(
run_time_sec: i32,
acc_time_sec: i32,
) -> Result<(), ApiErr> {
if acc_time_sec <= run_time_sec {
return Err(ApiErr::BadRequest(
"acc_time_sec must be greater than run_time_sec".to_string(),
Some(json!({
"run_time_sec": ["must be less than acc_time_sec"],
"acc_time_sec": ["must be greater than run_time_sec"]
})),
));
}
Ok(())
}
fn auto_control_start_blocked(runtime: &crate::control::runtime::UnitRuntime) -> bool {
runtime.fault_locked || runtime.comm_locked || runtime.manual_ack_required
}
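The removed `auto_control_start_blocked` helper gates auto-start on three lock flags; any one of them blocks the transition. The predicate in isolation (simplified flag struct, not the full `UnitRuntime`):

```rust
// Simplified runtime flags; the real UnitRuntime carries more state.
#[derive(Clone, Copy)]
struct RuntimeFlags {
    fault_locked: bool,
    comm_locked: bool,
    manual_ack_required: bool,
}

// Auto control may start only when no lock flag is set.
fn auto_start_blocked(r: &RuntimeFlags) -> bool {
    r.fault_locked || r.comm_locked || r.manual_ack_required
}

fn main() {
    let clear = RuntimeFlags {
        fault_locked: false,
        comm_locked: false,
        manual_ack_required: false,
    };
    assert!(!auto_start_blocked(&clear));
    assert!(auto_start_blocked(&RuntimeFlags { comm_locked: true, ..clear }));
    assert!(auto_start_blocked(&RuntimeFlags { manual_ack_required: true, ..clear }));
}
```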
#[derive(Debug, Deserialize, Validate)]
pub struct GetUnitListQuery {
#[validate(length(min = 1, max = 100))]
@ -47,21 +26,6 @@ pub struct GetUnitListQuery {
pub pagination: PaginationParams,
}
#[derive(serde::Serialize)]
pub struct UnitEquipmentItem {
#[serde(flatten)]
pub equipment: crate::model::Equipment,
pub role_points: Vec<crate::handler::equipment::SignalRolePoint>,
}
#[derive(serde::Serialize)]
pub struct UnitWithRuntime {
#[serde(flatten)]
pub unit: crate::model::ControlUnit,
pub runtime: Option<crate::control::runtime::UnitRuntime>,
pub equipments: Vec<UnitEquipmentItem>,
}
pub async fn get_unit_list(
State(state): State<AppState>,
Query(query): Query<GetUnitListQuery>,
@ -69,7 +33,7 @@ pub async fn get_unit_list(
query.validate()?;
let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?;
let units = crate::service::get_units_paginated(
let data = crate::service::get_units_paginated(
&state.pool,
query.keyword.as_deref(),
query.pagination.page_size,
@ -77,58 +41,6 @@ pub async fn get_unit_list(
)
.await?;
let all_runtimes = state.control_runtime.get_all().await;
let unit_ids: Vec<Uuid> = units.iter().map(|u| u.id).collect();
let all_equipments =
crate::service::get_equipment_by_unit_ids(&state.pool, &unit_ids).await?;
let eq_ids: Vec<Uuid> = all_equipments.iter().map(|e| e.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?;
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let mut role_points_map: std::collections::HashMap<
Uuid,
Vec<crate::handler::equipment::SignalRolePoint>,
> = std::collections::HashMap::new();
for rp in role_point_rows {
role_points_map
.entry(rp.equipment_id)
.or_default()
.push(crate::handler::equipment::SignalRolePoint {
point_id: rp.point_id,
signal_role: rp.signal_role,
point_monitor: monitor_guard.get(&rp.point_id).cloned(),
});
}
drop(monitor_guard);
let mut equipments_by_unit: std::collections::HashMap<Uuid, Vec<UnitEquipmentItem>> =
std::collections::HashMap::new();
for eq in all_equipments {
let role_points = role_points_map.remove(&eq.id).unwrap_or_default();
if let Some(unit_id) = eq.unit_id {
equipments_by_unit
.entry(unit_id)
.or_default()
.push(UnitEquipmentItem { equipment: eq, role_points });
}
}
let data = units
.into_iter()
.map(|unit| {
let runtime = all_runtimes.get(&unit.id).cloned();
let equipments = equipments_by_unit.remove(&unit.id).unwrap_or_default();
UnitWithRuntime { unit, runtime, equipments }
})
.collect::<Vec<_>>();
Ok(Json(PaginatedResponse::new(
data,
total,
@ -170,7 +82,7 @@ async fn send_equipment_command(
.map_err(|e| ApiErr::Internal(e, None))?;
if state.config.simulate_plc {
crate::control::simulate::simulate_run_feedback(
crate::control::command::simulate_run_feedback(
&state,
equipment_id,
matches!(action, ControlAction::Start),
@ -206,45 +118,10 @@ pub async fn get_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit = crate::service::get_unit_by_id(&state.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await;
let all_equipments =
crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
let eq_ids: Vec<Uuid> = all_equipments.iter().map(|e| e.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &eq_ids).await?;
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let mut role_points_map: std::collections::HashMap<
Uuid,
Vec<crate::handler::equipment::SignalRolePoint>,
> = std::collections::HashMap::new();
for rp in role_point_rows {
role_points_map
.entry(rp.equipment_id)
.or_default()
.push(crate::handler::equipment::SignalRolePoint {
point_id: rp.point_id,
signal_role: rp.signal_role,
point_monitor: monitor_guard.get(&rp.point_id).cloned(),
});
match crate::service::get_unit_by_id(&state.pool, unit_id).await? {
Some(unit) => Ok(Json(unit)),
None => Err(ApiErr::NotFound("Unit not found".to_string(), None)),
}
drop(monitor_guard);
let equipments = all_equipments
.into_iter()
.map(|eq| {
let role_points = role_points_map.remove(&eq.id).unwrap_or_default();
UnitEquipmentItem { equipment: eq, role_points }
})
.collect();
Ok(Json(UnitWithRuntime { unit, runtime, equipments }))
}
#[derive(serde::Serialize)]
@ -265,7 +142,6 @@ pub struct EquipmentDetail {
pub struct UnitDetail {
#[serde(flatten)]
pub unit: crate::model::ControlUnit,
pub runtime: Option<crate::control::runtime::UnitRuntime>,
pub equipments: Vec<EquipmentDetail>,
}
@ -277,8 +153,6 @@ pub async fn get_unit_detail(
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await;
let equipments = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?;
let equipment_ids: Vec<Uuid> = equipments.iter().map(|e| e.id).collect();
let all_points = crate::service::get_points_by_equipment_ids(&state.pool, &equipment_ids).await?;
@ -303,7 +177,7 @@ pub async fn get_unit_detail(
})
.collect();
Ok(Json(UnitDetail { unit, runtime, equipments }))
Ok(Json(UnitDetail { unit, equipments }))
}
#[derive(Debug, Deserialize, Validate)]
@ -314,13 +188,13 @@ pub struct CreateUnitReq {
pub name: String,
pub description: Option<String>,
pub enabled: Option<bool>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub run_time_sec: Option<i32>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub stop_time_sec: Option<i32>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub acc_time_sec: Option<i32>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
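This hunk relaxes the per-field minimum from 1 to 0 and drops the cross-field rule enforced by the removed `validate_unit_timing_order`: the accumulation window must strictly exceed the run time. That rule as a plain predicate (sketch of the removed check, strict inequality as in the original):

```rust
// Removed cross-field rule: acc_time_sec must be strictly greater
// than run_time_sec; equal values are rejected.
fn timing_order_ok(run_time_sec: i32, acc_time_sec: i32) -> bool {
    acc_time_sec > run_time_sec
}

fn main() {
    assert!(timing_order_ok(10, 20));
    assert!(!timing_order_ok(10, 10)); // equal is rejected
    assert!(!timing_order_ok(20, 15));
}
```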
@ -331,33 +205,6 @@ pub async fn create_unit(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let run_time_sec = payload.run_time_sec.ok_or_else(|| {
ApiErr::BadRequest(
"run_time_sec is required".to_string(),
Some(json!({ "run_time_sec": ["is required"] })),
)
})?;
let stop_time_sec = payload.stop_time_sec.ok_or_else(|| {
ApiErr::BadRequest(
"stop_time_sec is required".to_string(),
Some(json!({ "stop_time_sec": ["is required"] })),
)
})?;
let acc_time_sec = payload.acc_time_sec.ok_or_else(|| {
ApiErr::BadRequest(
"acc_time_sec is required".to_string(),
Some(json!({ "acc_time_sec": ["is required"] })),
)
})?;
let bl_time_sec = payload.bl_time_sec.ok_or_else(|| {
ApiErr::BadRequest(
"bl_time_sec is required".to_string(),
Some(json!({ "bl_time_sec": ["is required"] })),
)
})?;
validate_unit_timing_order(run_time_sec, acc_time_sec)?;
if crate::service::get_unit_by_code(&state.pool, &payload.code)
.await?
.is_some()
@ -375,10 +222,10 @@ pub async fn create_unit(
name: &payload.name,
description: payload.description.as_deref(),
enabled: payload.enabled.unwrap_or(true),
run_time_sec,
stop_time_sec,
acc_time_sec,
bl_time_sec,
run_time_sec: payload.run_time_sec.unwrap_or(0),
stop_time_sec: payload.stop_time_sec.unwrap_or(0),
acc_time_sec: payload.acc_time_sec.unwrap_or(0),
bl_time_sec: payload.bl_time_sec.unwrap_or(0),
require_manual_ack_after_fault: payload
.require_manual_ack_after_fault
.unwrap_or(true),
@ -403,13 +250,13 @@ pub struct UpdateUnitReq {
pub name: Option<String>,
pub description: Option<String>,
pub enabled: Option<bool>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub run_time_sec: Option<i32>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub stop_time_sec: Option<i32>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub acc_time_sec: Option<i32>,
#[validate(range(min = 1, message = "must be greater than 0"))]
#[validate(range(min = 0))]
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
@ -421,14 +268,12 @@ pub async fn update_unit(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
let existing_unit = crate::service::get_unit_by_id(&state.pool, unit_id)
if crate::service::get_unit_by_id(&state.pool, unit_id)
.await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
validate_unit_timing_order(
payload.run_time_sec.unwrap_or(existing_unit.run_time_sec),
payload.acc_time_sec.unwrap_or(existing_unit.acc_time_sec),
)?;
.is_none()
{
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?;
@ -538,20 +383,10 @@ pub async fn start_auto_unit(
}
let mut runtime = state.control_runtime.get_or_init(unit_id).await;
if auto_control_start_blocked(&runtime) {
let message = if runtime.fault_locked {
"Unit is fault locked, cannot start auto control"
} else if runtime.comm_locked {
"Unit communication is locked, cannot start auto control"
} else {
"Fault acknowledgement required before starting auto control"
};
return Err(ApiErr::BadRequest(message.to_string(), None));
}
runtime.auto_enabled = true;
runtime.state = crate::control::runtime::UnitRuntimeState::Stopped;
runtime.current_stop_elapsed_sec = 0;
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await;
let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStarted { unit_id });
@ -569,7 +404,6 @@ pub async fn stop_auto_unit(
let mut runtime = state.control_runtime.get_or_init(unit_id).await;
runtime.auto_enabled = false;
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await;
let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStopped { unit_id });
@ -589,14 +423,14 @@ pub async fn batch_start_auto(
skipped.push(unit.id);
continue;
}
if auto_control_start_blocked(&runtime) {
if runtime.fault_locked || runtime.comm_locked {
skipped.push(unit.id);
continue;
}
runtime.auto_enabled = true;
runtime.state = crate::control::runtime::UnitRuntimeState::Stopped;
runtime.current_stop_elapsed_sec = 0;
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit.id).await;
let _ = state
.event_manager
.send(crate::event::AppEvent::AutoControlStarted { unit_id: unit.id });
@ -619,7 +453,6 @@ pub async fn batch_stop_auto(
}
runtime.auto_enabled = false;
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit.id).await;
let _ = state
.event_manager
.send(crate::event::AppEvent::AutoControlStopped { unit_id: unit.id });
@ -656,7 +489,6 @@ pub async fn ack_fault_unit(
runtime.manual_ack_required = false;
runtime.state = crate::control::runtime::UnitRuntimeState::Stopped;
state.control_runtime.upsert(runtime).await;
state.control_runtime.notify_unit(unit_id).await;
let _ = state.event_manager.send(crate::event::AppEvent::FaultAcked { unit_id });
@ -674,74 +506,3 @@ pub async fn get_unit_runtime(
let runtime = state.control_runtime.get_or_init(unit_id).await;
Ok(Json(runtime))
}
#[cfg(test)]
mod tests {
use super::{
auto_control_start_blocked, validate_unit_timing_order, CreateUnitReq, UpdateUnitReq,
};
use crate::control::runtime::{UnitRuntime, UnitRuntimeState};
use uuid::Uuid;
use validator::Validate;
#[test]
fn create_unit_req_rejects_zero_second_fields() {
let payload = CreateUnitReq {
code: "U-01".to_string(),
name: "Unit 01".to_string(),
description: None,
enabled: Some(true),
run_time_sec: Some(0),
stop_time_sec: Some(10),
acc_time_sec: Some(20),
bl_time_sec: Some(5),
require_manual_ack_after_fault: Some(true),
};
assert!(payload.validate().is_err());
}
#[test]
fn create_unit_req_rejects_acc_time_not_greater_than_run_time() {
assert!(validate_unit_timing_order(10, 10).is_err());
}
#[test]
fn update_unit_req_rejects_zero_second_fields() {
let payload = UpdateUnitReq {
code: None,
name: None,
description: None,
enabled: None,
run_time_sec: None,
stop_time_sec: Some(0),
acc_time_sec: Some(20),
bl_time_sec: Some(5),
require_manual_ack_after_fault: None,
};
assert!(payload.validate().is_err());
}
#[test]
fn update_unit_req_rejects_acc_time_not_greater_than_run_time_when_both_present() {
assert!(validate_unit_timing_order(20, 15).is_err());
}
#[test]
fn auto_control_start_is_blocked_by_comm_lock() {
let runtime = UnitRuntime {
unit_id: Uuid::new_v4(),
state: UnitRuntimeState::Stopped,
auto_enabled: false,
accumulated_run_sec: 0,
display_acc_sec: 0,
fault_locked: false,
flt_active: false,
comm_locked: true,
manual_ack_required: false,
};
assert!(auto_control_start_blocked(&runtime));
}
}


@ -21,20 +21,3 @@ pub async fn get_api_md() -> Result<impl IntoResponse, ApiErr> {
Ok((StatusCode::OK, headers, content))
}
pub async fn get_readme_md() -> Result<impl IntoResponse, ApiErr> {
let content = tokio::fs::read_to_string("README.md")
.await
.map_err(|err| {
tracing::error!("Failed to read README.md: {}", err);
ApiErr::NotFound("README.md not found".to_string(), None)
})?;
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("text/markdown; charset=utf-8"),
);
Ok((StatusCode::OK, headers, content))
}


@ -14,18 +14,6 @@ use crate::util::{
};
use crate::AppState;
async fn notify_units(
state: &AppState,
unit_ids: impl IntoIterator<Item = Uuid>,
) {
let mut seen = std::collections::HashSet::new();
for unit_id in unit_ids {
if seen.insert(unit_id) {
state.control_runtime.notify_unit(unit_id).await;
}
}
}
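The removed `notify_units` helper wakes each affected unit task at most once, deduplicating IDs via `HashSet::insert`, which returns `false` for values already present. The dedupe idiom in a synchronous sketch (strings standing in for `Uuid`):

```rust
use std::collections::HashSet;

// Visit each id at most once, preserving first-seen order.
// HashSet::insert returns false when the id was already seen.
fn dedup_notify(ids: &[&str]) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut notified = Vec::new();
    for id in ids {
        if seen.insert(*id) {
            notified.push((*id).to_string());
        }
    }
    notified
}

fn main() {
    assert_eq!(dedup_notify(&["a", "b", "a", "c", "b"]), vec!["a", "b", "c"]);
}
```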
#[derive(Deserialize, Validate)]
pub struct GetEquipmentListQuery {
#[validate(length(min = 1, max = 100))]
@ -34,19 +22,11 @@ pub struct GetEquipmentListQuery {
pub pagination: PaginationParams,
}
#[derive(Serialize)]
pub struct SignalRolePoint {
pub point_id: uuid::Uuid,
pub signal_role: String,
pub point_monitor: Option<crate::telemetry::PointMonitorInfo>,
}
#[derive(Serialize)]
pub struct EquipmentListItem {
#[serde(flatten)]
pub equipment: crate::model::Equipment,
pub point_count: i64,
pub role_points: Vec<SignalRolePoint>,
}
pub async fn get_equipment_list(
@ -56,7 +36,7 @@ pub async fn get_equipment_list(
query.validate()?;
let total = crate::service::get_equipment_count(&state.pool, query.keyword.as_deref()).await?;
let items = crate::service::get_equipment_paginated(
let data = crate::service::get_equipment_paginated(
&state.pool,
query.keyword.as_deref(),
query.pagination.page_size,
@ -64,38 +44,6 @@ pub async fn get_equipment_list(
)
.await?;
let equipment_ids: Vec<uuid::Uuid> = items.iter().map(|item| item.equipment.id).collect();
let role_point_rows =
crate::service::get_signal_role_points_batch(&state.pool, &equipment_ids).await?;
let monitor_guard = state
.connection_manager
.get_point_monitor_data_read_guard()
.await;
let mut role_points_map: std::collections::HashMap<uuid::Uuid, Vec<SignalRolePoint>> =
std::collections::HashMap::new();
for rp in role_point_rows {
role_points_map
.entry(rp.equipment_id)
.or_default()
.push(SignalRolePoint {
point_id: rp.point_id,
signal_role: rp.signal_role,
point_monitor: monitor_guard.get(&rp.point_id).cloned(),
});
}
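The loop above groups role-point rows into a per-equipment map with the `entry().or_default().push()` idiom. The same grouping in isolation (integer keys standing in for equipment `Uuid`s):

```rust
use std::collections::HashMap;

// Group (equipment_id, point) pairs into a Vec per equipment,
// using the same entry().or_default().push() idiom as above.
fn group_points(rows: &[(u32, &str)]) -> HashMap<u32, Vec<String>> {
    let mut map: HashMap<u32, Vec<String>> = HashMap::new();
    for (eq, point) in rows {
        map.entry(*eq).or_default().push((*point).to_string());
    }
    map
}

fn main() {
    let m = group_points(&[(1, "run"), (1, "fault"), (2, "run")]);
    assert_eq!(m[&1], vec!["run", "fault"]);
    assert_eq!(m[&2], vec!["run"]);
}
```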
let data = items
.into_iter()
.map(|item| EquipmentListItem {
role_points: role_points_map
.remove(&item.equipment.id)
.unwrap_or_default(),
..item
})
.collect::<Vec<_>>();
Ok(Json(PaginatedResponse::new(
data,
total,
@ -188,10 +136,6 @@ pub async fn create_equipment(
)
.await?;
if let Some(unit_id) = payload.unit_id {
notify_units(&state, [unit_id]).await;
}
Ok((
StatusCode::CREATED,
Json(serde_json::json!({
@ -218,11 +162,9 @@ pub async fn update_equipment(
}
let exists = crate::service::get_equipment_by_id(&state.pool, equipment_id).await?;
let existing_equipment = if let Some(equipment) = exists {
equipment
} else {
if exists.is_none() {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
};
}
if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
@ -255,19 +197,6 @@ pub async fn update_equipment(
)
.await?;
let mut unit_ids = Vec::new();
if let Some(unit_id) = existing_equipment.unit_id {
unit_ids.push(unit_id);
}
let next_unit_id = match payload.unit_id {
Some(next) => next,
None => existing_equipment.unit_id,
};
if let Some(unit_id) = next_unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids).await;
Ok(Json(serde_json::json!({
"ok_msg": "Equipment updated successfully"
})))
@ -293,9 +222,6 @@ pub async fn batch_set_equipment_unit(
}
}
let before_unit_ids =
crate::service::get_unit_ids_by_equipment_ids(&state.pool, &payload.equipment_ids).await?;
let updated_count = crate::service::batch_set_equipment_unit(
&state.pool,
&payload.equipment_ids,
@ -303,12 +229,6 @@ pub async fn batch_set_equipment_unit(
)
.await?;
let mut unit_ids = before_unit_ids;
if let Some(unit_id) = payload.unit_id {
unit_ids.push(unit_id);
}
notify_units(&state, unit_ids).await;
Ok(Json(serde_json::json!({
"ok_msg": "Equipment unit updated successfully",
"updated_count": updated_count
@ -319,13 +239,10 @@ pub async fn delete_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let unit_ids = crate::service::get_unit_ids_by_equipment_ids(&state.pool, &[equipment_id]).await?;
let deleted = crate::service::delete_equipment(&state.pool, equipment_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
notify_units(&state, unit_ids).await;
Ok(StatusCode::NO_CONTENT)
}


@ -46,13 +46,6 @@ pub struct LogChunkResponse {
pub reset: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct StreamFileState {
path: PathBuf,
file_name: String,
cursor: u64,
}
pub async fn get_logs(Query(query): Query<LogQuery>) -> Result<impl IntoResponse, ApiErr> {
let path = resolve_log_file(query.file.as_deref()).await?;
let file_name = file_name_of(&path);
@ -81,44 +74,17 @@ pub async fn stream_logs(Query(query): Query<LogQuery>) -> Result<impl IntoRespo
.max_bytes
.unwrap_or(STREAM_MAX_BYTES)
.clamp(1, MAX_MAX_BYTES);
let follow_latest = query.file.is_none();
let start_cursor = query.cursor.unwrap_or(file_len(&path).await?);
let event_stream = stream! {
let mut ticker = interval(Duration::from_millis(800));
let mut stream_file = StreamFileState {
path,
file_name,
cursor: start_cursor,
};
let mut cursor = start_cursor;
loop {
ticker.tick().await;
let switched = if follow_latest {
match latest_log_file(Path::new(LOG_DIR)).await {
Ok(latest_path) => {
let latest = StreamFileState {
file_name: file_name_of(&latest_path),
path: latest_path,
cursor: 0,
};
let (next, switched) = advance_stream_file(&stream_file, &latest);
stream_file = next;
switched
}
Err(_) => false,
}
} else {
false
};
match read_since(&stream_file.path, &stream_file.file_name, stream_file.cursor, max_bytes).await {
match read_since(&path, &file_name, cursor, max_bytes).await {
Ok(chunk) => {
stream_file.cursor = chunk.cursor;
let chunk = LogChunkResponse {
reset: chunk.reset || switched,
..chunk
};
cursor = chunk.cursor;
if chunk.reset || !chunk.lines.is_empty() {
match Event::default().event("log").json_data(&chunk) {
Ok(event) => yield Ok::<Event, Infallible>(event),
@ -301,54 +267,9 @@ fn file_name_of(path: &Path) -> String {
.to_string()
}
fn advance_stream_file(
current: &StreamFileState,
latest: &StreamFileState,
) -> (StreamFileState, bool) {
if current.path == latest.path {
return (current.clone(), false);
}
(
StreamFileState {
path: latest.path.clone(),
file_name: latest.file_name.clone(),
cursor: 0,
},
true,
)
}
fn map_open_err(err: std::io::Error) -> ApiErr {
match err.kind() {
std::io::ErrorKind::NotFound => ApiErr::NotFound("log file not found".to_string(), None),
_ => ApiErr::Internal("failed to access log file".to_string(), None),
}
}
#[cfg(test)]
mod tests {
use super::{advance_stream_file, StreamFileState};
use std::path::PathBuf;
#[test]
fn advance_stream_file_switches_to_latest_file_and_resets_cursor() {
let current = StreamFileState {
path: PathBuf::from("logs/app.log"),
file_name: "app.log".to_string(),
cursor: 128,
};
let latest = StreamFileState {
path: PathBuf::from("logs/app.log.1"),
file_name: "app.log.1".to_string(),
cursor: 42,
};
let (next, switched) = advance_stream_file(&current, &latest);
assert!(switched);
assert_eq!(next.path, latest.path);
assert_eq!(next.file_name, latest.file_name);
assert_eq!(next.cursor, 0);
}
}


@ -21,18 +21,6 @@ use crate::{
AppState,
};
async fn notify_units(
state: &AppState,
unit_ids: impl IntoIterator<Item = Uuid>,
) {
let mut seen = std::collections::HashSet::new();
for unit_id in unit_ids {
if seen.insert(unit_id) {
state.control_runtime.notify_unit(unit_id).await;
}
}
}
/// List all points.
#[derive(Deserialize, Validate)]
pub struct GetPointListQuery {
@ -173,14 +161,12 @@ pub struct UpdatePointReq {
/// Request payload for batch setting point tags.
#[derive(Deserialize, Validate)]
pub struct BatchSetPointTagsReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
pub tag_id: Option<Uuid>,
}
#[derive(Deserialize, Validate)]
pub struct BatchSetPointEquipmentReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
pub equipment_id: Option<Uuid>,
pub signal_role: Option<String>,
@ -239,7 +225,6 @@ pub async fn update_point(
if existing_point.is_none() {
return Err(ApiErr::NotFound("Point not found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new("UPDATE point SET ");
let mut wrote_field = false;
@ -295,9 +280,6 @@ pub async fn update_point(
qb.push(" WHERE id = ").push_bind(point_id);
qb.build().execute(pool).await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await;
Ok(Json(
serde_json::json!({"ok_msg": "Point updated successfully"}),
))
@ -398,8 +380,6 @@ pub async fn batch_set_point_equipment(
return Err(ApiErr::NotFound("No valid points found".to_string(), None));
}
let before_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
let result = sqlx::query(
r#"
UPDATE point
@ -415,9 +395,6 @@ pub async fn batch_set_point_equipment(
.execute(pool)
.await?;
let after_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &existing_points).await?;
notify_units(&state, before_unit_ids.into_iter().chain(after_unit_ids)).await;
Ok(Json(serde_json::json!({
"ok_msg": "Point equipment updated successfully",
"updated_count": result.rows_affected()
@ -430,7 +407,6 @@ pub async fn delete_point(
Path(point_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &[point_id]).await?;
let source_id = {
let grouped = crate::service::get_points_grouped_by_source(pool, &[point_id]).await?;
@ -464,8 +440,6 @@ pub async fn delete_point(
}
}
notify_units(&state, affected_unit_ids).await;
Ok(Json(
serde_json::json!({"ok_msg": "Point deleted successfully"}),
))
@ -474,7 +448,6 @@ pub async fn delete_point(
#[derive(Deserialize, Validate)]
/// Request payload for batch point creation from node ids.
pub struct BatchCreatePointsReq {
#[validate(length(min = 1, max = 500))]
pub node_ids: Vec<Uuid>,
}
@ -590,7 +563,6 @@ pub async fn batch_create_points(
#[derive(Deserialize, Validate)]
/// Request payload for batch point deletion.
pub struct BatchDeletePointsReq {
#[validate(length(min = 1, max = 500))]
pub point_ids: Vec<Uuid>,
}
@ -618,7 +590,6 @@ pub async fn batch_delete_points(
let point_ids = payload.point_ids;
let grouped = crate::service::get_points_grouped_by_source(pool, &point_ids).await?;
let affected_unit_ids = crate::service::get_unit_ids_by_point_ids(pool, &point_ids).await?;
let existing_point_ids: Vec<Uuid> = grouped
.values()
.flat_map(|points| points.iter().map(|p| p.point_id))
@ -646,8 +617,6 @@ pub async fn batch_delete_points(
}
}
notify_units(&state, affected_unit_ids).await;
Ok(Json(BatchDeletePointsRes {
deleted_count: result.rows_affected(),
}))


@ -171,24 +171,23 @@ fn build_node_tree(nodes: Vec<Node>) -> Vec<TreeNode> {
id: Uuid,
node_map: &mut HashMap<Uuid, TreeNode>,
children_map: &HashMap<Uuid, Vec<Uuid>>,
) -> Option<TreeNode> {
let mut node = node_map.remove(&id)?;
) -> TreeNode {
let mut node = node_map.remove(&id).unwrap();
if let Some(child_ids) = children_map.get(&id) {
for &cid in child_ids {
if let Some(child) = attach_children(cid, node_map, children_map) {
node.children.push(child);
}
let child = attach_children(cid, node_map, children_map);
node.children.push(child);
}
}
Some(node)
node
}
// ③ build the final tree
roots
.into_iter()
.filter_map(|rid| attach_children(rid, &mut node_map, &children_map))
.map(|rid| attach_children(rid, &mut node_map, &children_map))
.collect()
}
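This hunk swaps the `Option`-returning `attach_children` (paired with `filter_map`) for an `unwrap()` variant; with the original form, a child id missing from `node_map` is silently skipped instead of panicking. A toy sketch of the safe recursion (simplified node type, not the real `TreeNode`):

```rust
use std::collections::HashMap;

// Toy tree node; the real TreeNode carries full node data.
#[derive(Debug, PartialEq)]
struct Tree {
    id: u32,
    children: Vec<Tree>,
}

fn attach(
    id: u32,
    node_map: &mut HashMap<u32, Tree>,
    children_map: &HashMap<u32, Vec<u32>>,
) -> Option<Tree> {
    // `?` returns None for ids with no node, letting callers skip them.
    let mut node = node_map.remove(&id)?;
    if let Some(child_ids) = children_map.get(&id) {
        for &cid in child_ids {
            if let Some(child) = attach(cid, node_map, children_map) {
                node.children.push(child);
            }
        }
    }
    Some(node)
}

fn main() {
    let mut node_map: HashMap<u32, Tree> = vec![1u32, 2, 3]
        .into_iter()
        .map(|id| (id, Tree { id, children: Vec::new() }))
        .collect();
    // child 99 of node 2 is dangling: it has no entry in node_map
    let children_map: HashMap<u32, Vec<u32>> =
        vec![(1u32, vec![2u32, 3]), (2, vec![99])].into_iter().collect();
    let root = attach(1, &mut node_map, &children_map).expect("root exists");
    assert_eq!(root.children.len(), 2);
    assert!(root.children[0].children.is_empty()); // dangling child skipped
}
```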
@ -312,19 +311,19 @@ pub async fn delete_source(
) -> Result<impl IntoResponse, ApiErr> {
let pool = &state.pool;
let source_name = sqlx::query_scalar::<_, String>("SELECT name FROM source WHERE id = $1")
.bind(source_id)
.fetch_optional(pool)
.await?
.ok_or_else(|| ApiErr::NotFound(format!("Source with id {} not found", source_id), None))?;
sqlx::query("DELETE FROM source WHERE id = $1")
// delete the source
let result = sqlx::query("DELETE FROM source WHERE id = $1")
.bind(source_id)
.execute(pool)
.await?;
// check whether a row was actually deleted
if result.rows_affected() == 0 {
return Err(ApiErr::NotFound(format!("Source with id {} not found", source_id), None));
}
// emit the SourceDelete event
let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id, source_name });
let _ = state.event_manager.send(crate::event::AppEvent::SourceDelete { source_id });
Ok(StatusCode::NO_CONTENT)
}


@ -104,9 +104,6 @@ async fn main() {
control_runtime: control_runtime.clone(),
};
control::engine::start(state.clone(), control_runtime);
if config.simulate_plc {
control::simulate::start(state.clone());
}
let app = build_router(state.clone());
let addr = format!("{}:{}", config.server_host, config.server_port);
tracing::info!("Starting server at http://{}", addr);
@ -280,8 +277,7 @@ fn build_router(state: AppState) -> Router {
)
.route("/api/logs", get(handler::log::get_logs))
.route("/api/logs/stream", get(handler::log::stream_logs))
.route("/api/docs/api-md", get(handler::doc::get_api_md))
.route("/api/docs/readme-md", get(handler::doc::get_readme_md));
.route("/api/docs/api-md", get(handler::doc::get_api_md));
Router::new()
.merge(all_route)


@ -2,14 +2,6 @@ use crate::model::{ControlUnit, EventRecord};
use sqlx::{PgPool, QueryBuilder, Row};
use uuid::Uuid;
fn unit_order_clause() -> &'static str {
"code"
}
fn equipment_order_clause_with_unit() -> &'static str {
"unit_id, code"
}
#[derive(Debug, Clone)]
pub struct EquipmentRolePoint {
pub point_id: Uuid,
@ -43,36 +35,31 @@ pub async fn get_units_paginated(
page_size: i32,
offset: u32,
) -> Result<Vec<ControlUnit>, sqlx::Error> {
let unit_order = unit_order_clause();
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
if page_size == -1 {
let sql = format!(
sqlx::query_as::<_, ControlUnit>(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY {}
ORDER BY created_at
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
)
.bind(like)
.fetch_all(pool)
.await
} else {
let sql = format!(
sqlx::query_as::<_, ControlUnit>(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY {}
ORDER BY created_at
LIMIT $2 OFFSET $3
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
)
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
@ -82,21 +69,18 @@ pub async fn get_units_paginated(
}
None => {
if page_size == -1 {
let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order);
sqlx::query_as::<_, ControlUnit>(&sql)
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit ORDER BY created_at"#)
.fetch_all(pool)
.await
} else {
let sql = format!(
sqlx::query_as::<_, ControlUnit>(
r#"
SELECT *
FROM unit
ORDER BY {}
ORDER BY created_at
LIMIT $1 OFFSET $2
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
@ -325,28 +309,9 @@ pub async fn get_events_paginated(
}
pub async fn get_all_enabled_units(pool: &PgPool) -> Result<Vec<ControlUnit>, sqlx::Error> {
let sql = format!(
"SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}",
unit_order_clause()
);
sqlx::query_as::<_, ControlUnit>(&sql)
.fetch_all(pool)
.await
}
pub async fn get_equipment_by_unit_ids(
pool: &PgPool,
unit_ids: &[Uuid],
) -> Result<Vec<crate::model::Equipment>, sqlx::Error> {
if unit_ids.is_empty() {
return Ok(vec![]);
}
let sql = format!(
"SELECT * FROM equipment WHERE unit_id = ANY($1) ORDER BY {}",
equipment_order_clause_with_unit()
);
sqlx::query_as::<_, crate::model::Equipment>(&sql)
.bind(unit_ids)
sqlx::query_as::<_, ControlUnit>(
r#"SELECT * FROM unit WHERE enabled = TRUE ORDER BY created_at"#,
)
.fetch_all(pool)
.await
}
@ -355,11 +320,9 @@ pub async fn get_equipment_by_unit_id(
pool: &PgPool,
unit_id: Uuid,
) -> Result<Vec<crate::model::Equipment>, sqlx::Error> {
let sql = format!(
"SELECT * FROM equipment WHERE unit_id = $1 ORDER BY {}",
unit_order_clause()
);
sqlx::query_as::<_, crate::model::Equipment>(&sql)
sqlx::query_as::<_, crate::model::Equipment>(
r#"SELECT * FROM equipment WHERE unit_id = $1 ORDER BY created_at"#,
)
.bind(unit_id)
.fetch_all(pool)
.await
@ -380,105 +343,6 @@ pub async fn get_points_by_equipment_ids(
.await
}
pub async fn get_unit_ids_by_equipment_ids(
pool: &PgPool,
equipment_ids: &[Uuid],
) -> Result<Vec<Uuid>, sqlx::Error> {
if equipment_ids.is_empty() {
return Ok(vec![]);
}
let rows = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT DISTINCT unit_id
FROM equipment
WHERE id = ANY($1)
AND unit_id IS NOT NULL
"#,
)
.bind(equipment_ids)
.fetch_all(pool)
.await?;
Ok(rows)
}
pub async fn get_unit_ids_by_point_ids(
pool: &PgPool,
point_ids: &[Uuid],
) -> Result<Vec<Uuid>, sqlx::Error> {
if point_ids.is_empty() {
return Ok(vec![]);
}
let rows = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT DISTINCT e.unit_id
FROM point p
INNER JOIN equipment e ON e.id = p.equipment_id
WHERE p.id = ANY($1)
AND e.unit_id IS NOT NULL
"#,
)
.bind(point_ids)
.fetch_all(pool)
.await?;
Ok(rows)
}
pub struct EquipmentSignalRole {
pub equipment_id: Uuid,
pub point_id: Uuid,
pub signal_role: String,
}
/// Batch fetch all signal-role points for multiple equipment IDs in one query.
pub async fn get_signal_role_points_batch(
pool: &PgPool,
equipment_ids: &[Uuid],
) -> Result<Vec<EquipmentSignalRole>, sqlx::Error> {
if equipment_ids.is_empty() {
return Ok(vec![]);
}
let rows = sqlx::query(
r#"
SELECT p.equipment_id, p.id AS point_id, p.signal_role
FROM point p
WHERE p.equipment_id = ANY($1)
AND p.signal_role IS NOT NULL
ORDER BY p.equipment_id, p.created_at
"#,
)
.bind(equipment_ids)
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|row| EquipmentSignalRole {
equipment_id: row.get("equipment_id"),
point_id: row.get("point_id"),
signal_role: row.get("signal_role"),
})
.collect())
}
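Callers of the removed `get_signal_role_points_batch` would typically regroup the flat rows into a per-equipment role map. A dependency-free Rust sketch of that consumption pattern (the `SignalRole` struct and integer IDs are illustrative stand-ins for the real `EquipmentSignalRole` / `Uuid` types):

```rust
use std::collections::HashMap;

// Stand-in for the EquipmentSignalRole rows returned by the batch query.
struct SignalRole {
    equipment_id: u32,
    point_id: u32,
    signal_role: String,
}

/// Group flat (equipment_id, point_id, signal_role) rows into
/// equipment_id -> (signal_role -> point_id). Later rows for the same
/// role overwrite earlier ones, matching a "last binding wins" reading.
fn group_by_equipment(rows: &[SignalRole]) -> HashMap<u32, HashMap<String, u32>> {
    let mut out: HashMap<u32, HashMap<String, u32>> = HashMap::new();
    for row in rows {
        out.entry(row.equipment_id)
            .or_default()
            .insert(row.signal_role.clone(), row.point_id);
    }
    out
}
```

One batch query plus in-memory grouping avoids the N+1 query pattern of fetching role points per equipment.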
#[cfg(test)]
mod tests {
use super::{equipment_order_clause_with_unit, unit_order_clause};
#[test]
fn unit_ordering_defaults_to_code() {
assert_eq!(unit_order_clause(), "code");
}
#[test]
fn unit_equipment_ordering_uses_code_within_unit() {
assert_eq!(equipment_order_clause_with_unit(), "unit_id, code");
}
}
pub async fn get_equipment_role_points(
pool: &PgPool,
equipment_id: Uuid,

View File

@ -5,10 +5,6 @@ use crate::{
use sqlx::{query_as, PgPool, Row};
use uuid::Uuid;
fn equipment_order_clause() -> &'static str {
"e.code"
}
pub async fn get_points_by_equipment_id(
pool: &PgPool,
equipment_id: uuid::Uuid,
@ -53,12 +49,11 @@ pub async fn get_equipment_paginated(
page_size: i32,
offset: u32,
) -> Result<Vec<EquipmentListItem>, sqlx::Error> {
let equipment_order = equipment_order_clause();
let rows = match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
if page_size == -1 {
-                let sql = format!(
+                sqlx::query(
r#"
SELECT
e.*,
@ -67,16 +62,14 @@ pub async fn get_equipment_paginated(
LEFT JOIN point p ON p.equipment_id = e.id
WHERE e.code ILIKE $1 OR e.name ILIKE $1
GROUP BY e.id
-                    ORDER BY {}
+                    ORDER BY e.created_at
                     "#,
-                    equipment_order
-                );
-                sqlx::query(&sql)
+                )
.bind(like)
.fetch_all(pool)
.await?
} else {
-                let sql = format!(
+                sqlx::query(
r#"
SELECT
e.*,
@ -85,12 +78,10 @@ pub async fn get_equipment_paginated(
LEFT JOIN point p ON p.equipment_id = e.id
WHERE e.code ILIKE $1 OR e.name ILIKE $1
GROUP BY e.id
-                    ORDER BY {}
+                    ORDER BY e.created_at
                     LIMIT $2 OFFSET $3
                     "#,
-                    equipment_order
-                );
-                sqlx::query(&sql)
+                )
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
@ -100,7 +91,7 @@ pub async fn get_equipment_paginated(
}
None => {
if page_size == -1 {
-                let sql = format!(
+                sqlx::query(
r#"
SELECT
e.*,
@ -108,15 +99,13 @@ pub async fn get_equipment_paginated(
FROM equipment e
LEFT JOIN point p ON p.equipment_id = e.id
GROUP BY e.id
-                    ORDER BY {}
+                    ORDER BY e.created_at
                     "#,
-                    equipment_order
-                );
-                sqlx::query(&sql)
+                )
.fetch_all(pool)
.await?
} else {
-                let sql = format!(
+                sqlx::query(
r#"
SELECT
e.*,
@ -124,12 +113,10 @@ pub async fn get_equipment_paginated(
FROM equipment e
LEFT JOIN point p ON p.equipment_id = e.id
GROUP BY e.id
-                    ORDER BY {}
+                    ORDER BY e.created_at
                     LIMIT $1 OFFSET $2
                     "#,
-                    equipment_order
-                );
-                sqlx::query(&sql)
+                )
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
@ -152,7 +139,6 @@ pub async fn get_equipment_paginated(
updated_at: row.get("updated_at"),
},
point_count: row.get::<i64, _>("point_count"),
role_points: vec![],
})
.collect())
}
@ -297,13 +283,3 @@ pub async fn batch_set_equipment_unit(
Ok(result.rows_affected())
}
#[cfg(test)]
mod tests {
use super::equipment_order_clause;
#[test]
fn equipment_ordering_defaults_to_code() {
assert_eq!(equipment_order_clause(), "e.code");
}
}

View File

@ -5,7 +5,7 @@
<button type="button" class="tab-btn" id="tabConfig">配置</button>
</div>
<div class="topbar-actions">
<button type="button" class="secondary" id="openReadmeDoc">README.md</button>
<button type="button" class="secondary" id="clearEquipmentFilter">设备筛选: 全部</button>
<button type="button" class="secondary" id="openApiDoc">API.md</button>
<div class="status" id="statusText">
<span class="ws-dot" id="wsDot"></span>

View File

@ -1,10 +1,11 @@
import { withStatus } from "./api.js";
import { openChart, renderChart } from "./chart.js";
import { dom } from "./dom.js";
-import { closeApiDocDrawer, openApiDocDrawer, openReadmeDrawer } from "./docs.js";
+import { closeApiDocDrawer, openApiDocDrawer } from "./docs.js";
import { loadEvents } from "./events.js";
import {
applyBatchEquipmentUnit,
clearEquipmentFilter,
clearPointBinding,
clearSelectedEquipments,
closeEquipmentModal,
@ -34,8 +35,6 @@ import { state } from "./state.js";
import { loadSources, saveSource } from "./sources.js";
import { closeUnitModal, loadUnits, openCreateUnitModal, resetUnitForm, renderUnits, saveUnit } from "./units.js";
let _configLoaded = false;
function switchView(view) {
state.activeView = view;
const main = document.querySelector("main");
@ -61,13 +60,6 @@ function switchView(view) {
if (view === "config") {
startLogs();
if (!_configLoaded) {
_configLoaded = true;
withStatus((async () => {
await Promise.all([loadSources(), loadEquipments(), loadEvents()]);
await loadPoints();
})());
}
} else {
stopLogs();
}
@ -90,6 +82,7 @@ function bindEvents() {
dom.refreshEquipmentBtn.addEventListener("click", () => withStatus(loadEquipments()));
dom.newEquipmentBtn.addEventListener("click", openCreateEquipmentModal);
dom.closeEquipmentModalBtn.addEventListener("click", closeEquipmentModal);
dom.clearEquipmentFilterBtn.addEventListener("click", () => withStatus(clearEquipmentFilter()));
dom.applyEquipmentUnitBtn.addEventListener("click", () => withStatus(applyBatchEquipmentUnit()));
dom.clearEquipmentSelectionBtn.addEventListener("click", clearSelectedEquipments);
@ -130,7 +123,6 @@ function bindEvents() {
});
});
dom.openReadmeDocBtn.addEventListener("click", () => withStatus(openReadmeDrawer()));
dom.openApiDocBtn.addEventListener("click", () => withStatus(openApiDocDrawer()));
dom.closeApiDocBtn.addEventListener("click", closeApiDocDrawer);
dom.refreshEventBtn.addEventListener("click", () => withStatus(loadEvents()));
@ -169,8 +161,7 @@ function bindEvents() {
document.addEventListener("equipments-updated", () => {
renderUnits();
-    // Re-fetch units so embedded equipment data stays in sync with config changes.
-    loadUnits().catch(() => {});
+    renderOpsUnits();
});
document.addEventListener("units-loaded", () => {
@ -188,8 +179,12 @@ async function bootstrap() {
renderChart();
startPointSocket();
-  await withStatus(Promise.all([loadUnits(), loadEvents()]));
+  await withStatus(loadUnits());
   startOps();
+  await withStatus(loadSources());
+  await withStatus(loadEquipments());
+  await withStatus(loadEvents());
+  await withStatus(loadPoints());
}
bootstrap();

View File

@ -82,11 +82,11 @@ function parseMarkdown(text) {
return { html: blocks.join(""), headings };
}
-async function loadDoc(url, emptyMessage) {
-  const text = await apiFetch(url);
+export async function loadApiDoc() {
+  const text = await apiFetch("/api/docs/api-md");
   const { html, headings } = parseMarkdown(text || "");
-  dom.apiDocContent.innerHTML = html || `<p>${emptyMessage}</p>`;
+  dom.apiDocContent.innerHTML = html || "<p>API.md 为空</p>";
dom.apiDocToc.innerHTML = headings.length
? headings
.map(
@ -110,25 +110,14 @@ async function loadDoc(url, emptyMessage) {
}
});
});
+  state.apiDocLoaded = true;
 }
 export async function openApiDocDrawer() {
-  const title = dom.apiDocDrawer.querySelector("h3");
-  if (title) title.textContent = "API.md";
   dom.apiDocDrawer.classList.remove("hidden");
-  if (state.docDrawerSource !== "api") {
-    state.docDrawerSource = "api";
-    await loadDoc("/api/docs/api-md", "API.md 为空");
-  }
-}
-export async function openReadmeDrawer() {
-  const title = dom.apiDocDrawer.querySelector("h3");
-  if (title) title.textContent = "README.md";
-  dom.apiDocDrawer.classList.remove("hidden");
-  if (state.docDrawerSource !== "readme") {
-    state.docDrawerSource = "readme";
-    await loadDoc("/api/docs/readme-md", "README.md 为空");
+  if (!state.apiDocLoaded) {
+    await loadApiDoc();
}
}

View File

@ -20,6 +20,7 @@ export const dom = {
selectedCount: byId("selectedCount"),
selectedPointCount: byId("selectedPointCount"),
pointFilterSummary: byId("pointFilterSummary"),
clearEquipmentFilterBtn: byId("clearEquipmentFilter"),
pointSourceSelect: byId("pointSourceSelect"),
pointSourceNodeCount: byId("pointSourceNodeCount"),
openPointModalBtn: byId("openPointModal"),
@ -81,7 +82,6 @@ export const dom = {
batchBindingSignalRole: byId("batchBindingSignalRole"),
apiDocToc: byId("apiDocToc"),
apiDocContent: byId("apiDocContent"),
openReadmeDocBtn: byId("openReadmeDoc"),
openApiDocBtn: byId("openApiDoc"),
closeApiDocBtn: byId("closeApiDoc"),
refreshChartBtn: byId("refreshChart"),

View File

@ -146,6 +146,13 @@ export function renderEquipments() {
dom.equipmentList.innerHTML = "";
updateSelectedEquipmentSummary();
const activeEquipment = state.selectedEquipmentId
? state.equipmentMap.get(state.selectedEquipmentId) || null
: null;
dom.clearEquipmentFilterBtn.textContent = activeEquipment
? `设备筛选 ${activeEquipment.name}`
: "设备筛选 全部";
const items = filteredEquipments();
if (!items.length) {
dom.equipmentList.innerHTML = '<div class="list-item"><div class="muted">No equipment</div></div>';

View File

@ -14,9 +14,8 @@ function formatTime(value) {
function makeCard(item) {
const row = document.createElement("div");
-  const level = (item.level || "info").toLowerCase();
   row.className = "event-card";
-  row.innerHTML = `<span class="badge event-badge level-${level}">${level.toUpperCase()}</span><span class="muted event-time">${formatTime(item.created_at)}</span><span class="event-type">${item.event_type}</span><span class="event-message">${item.message}</span>`;
+  row.innerHTML = `<div class="event-meta"><span class="badge">${(item.level || "info").toUpperCase()}</span><span class="muted event-time">${formatTime(item.created_at)}</span><strong class="event-type">${item.event_type}</strong></div><div class="event-message">${item.message}</div>`;
return row;
}
@ -72,10 +71,6 @@ export function prependEvent(item) {
if (placeholder) placeholder.remove();
dom.eventList.insertBefore(makeCard(item), dom.eventList.firstChild);
// Keep DOM bounded to prevent unbounded growth
const cards = dom.eventList.querySelectorAll(".event-card");
if (cards.length > 100) cards[cards.length - 1].remove();
}
dom.eventList.addEventListener("scroll", () => {

View File

@ -3,8 +3,7 @@ import { dom } from "./dom.js";
import { prependEvent } from "./events.js";
import { formatValue } from "./points.js";
import { state } from "./state.js";
-import { loadUnits, renderUnits } from "./units.js";
-import { loadEquipments } from "./equipment.js";
+import { renderUnits } from "./units.js";
import { showToast } from "./api.js";
function escapeHtml(text) {
@ -17,7 +16,7 @@ function parseLogLine(line) {
try { return JSON.parse(trimmed); } catch { return null; }
}
-function appendLog(line) {
+export function appendLog(line) {
if (!dom.logView) return;
const atBottom = dom.logView.scrollTop + dom.logView.clientHeight >= dom.logView.scrollHeight - 10;
const div = document.createElement("div");
@ -40,26 +39,11 @@ function appendLog(line) {
if (atBottom) dom.logView.scrollTop = dom.logView.scrollHeight;
}
function appendLogDivider(text) {
if (!dom.logView) return;
const atBottom = dom.logView.scrollTop + dom.logView.clientHeight >= dom.logView.scrollHeight - 10;
const div = document.createElement("div");
div.className = "log-line muted";
div.textContent = text;
dom.logView.appendChild(div);
if (atBottom) dom.logView.scrollTop = dom.logView.scrollHeight;
}
export function startLogs() {
if (state.logSource) return;
let currentLogFile = null;
state.logSource = new EventSource("/api/logs/stream");
state.logSource.addEventListener("log", (event) => {
const data = JSON.parse(event.data);
if (data.reset && data.file && data.file !== currentLogFile) {
appendLogDivider(`[log switched to ${data.file}]`);
}
currentLogFile = data.file || currentLogFile;
(data.lines || []).forEach(appendLog);
});
state.logSource.addEventListener("error", () => appendLog("[log stream error]"));
@ -95,23 +79,12 @@ function setWsStatus(connected) {
}
}
-let _reconnectDelay = 1000;
-let _connectedOnce = false;
 export function startPointSocket() {
   const protocol = location.protocol === "https:" ? "wss" : "ws";
   const ws = new WebSocket(`${protocol}://${location.host}/ws/public`);
   state.pointSocket = ws;
-  ws.onopen = () => {
-    setWsStatus(true);
-    _reconnectDelay = 1000;
-    if (_connectedOnce) {
-      loadUnits().catch(() => {});
-      if (state.activeView === "config") loadEquipments().catch(() => {});
-    }
-    _connectedOnce = true;
-  };
+  ws.onopen = () => setWsStatus(true);
ws.onmessage = (event) => {
try {
@ -128,16 +101,12 @@ export function startPointSocket() {
entry.time.textContent = data.timestamp || "--";
}
-      // ops view signal pill
+      // ops view signal cell
       const opsEntry = state.opsPointEls.get(data.point_id);
       if (opsEntry) {
-        const { pillEl, syncBtns } = opsEntry;
-        state.opsSignalCache.set(data.point_id, { quality: data.quality, value_text: data.value_text });
-        const role = pillEl.dataset.opsRole;
-        import("./ops.js").then(({ sigPillClass }) => {
-          pillEl.className = sigPillClass(role, data.quality, data.value_text);
-          syncBtns?.();
-        });
+        opsEntry.valueEl.textContent = formatValue(data);
+        opsEntry.qualityEl.className = `badge quality-${(data.quality || "unknown").toLowerCase()}`;
+        opsEntry.qualityEl.textContent = (data.quality || "unknown").toUpperCase();
}
if (state.chartPointId === data.point_id) {
@ -157,7 +126,7 @@ export function startPointSocket() {
// lazy import to avoid circular dep (ops.js -> logs.js -> ops.js)
import("./ops.js").then(({ renderOpsUnits, syncEquipmentButtonsForUnit }) => {
renderOpsUnits();
-          syncEquipmentButtonsForUnit(runtime.unit_id);
+          syncEquipmentButtonsForUnit(runtime.unit_id, runtime.auto_enabled);
});
return;
}
@ -168,8 +137,7 @@ export function startPointSocket() {
ws.onclose = () => {
setWsStatus(false);
-    window.setTimeout(startPointSocket, _reconnectDelay);
-    _reconnectDelay = Math.min(_reconnectDelay * 2, 30000);
+    window.setTimeout(startPointSocket, 2000);
};
ws.onerror = () => setWsStatus(false);
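The `_reconnectDelay` logic this diff removes is capped exponential backoff: the WebSocket reconnect delay starts at 1s, doubles after each failed attempt, and saturates at 30s (resetting to 1s on a successful connection). The same progression as a minimal Rust sketch (the function name is illustrative; the real code is the frontend JS above):

```rust
/// Next reconnect delay in milliseconds: double the current delay,
/// but never exceed the 30-second cap.
fn next_reconnect_delay_ms(current_ms: u64) -> u64 {
    current_ms.saturating_mul(2).min(30_000)
}
```

The second commit replaces this with a flat 2-second retry, which is simpler but hammers the server harder during a prolonged outage.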

View File

@ -1,24 +1,12 @@
 import { apiFetch } from "./api.js";
 import { dom } from "./dom.js";
+import { formatValue } from "./points.js";
 import { state } from "./state.js";
-import { loadUnits } from "./units.js";
 const SIGNAL_ROLES = ["rem", "run", "flt"];
 const ROLE_LABELS = { rem: "REM", run: "RUN", flt: "FLT" };
-function isSignalOn(quality, valueText) {
-  if (!quality || quality.toLowerCase() !== "good") return false;
-  const v = String(valueText ?? "").trim().toLowerCase();
-  return v === "1" || v === "true" || v === "on";
-}
-export function sigPillClass(role, quality, valueText) {
-  if (!quality || quality.toLowerCase() !== "good") return "sig-pill sig-warn";
-  const on = isSignalOn(quality, valueText);
-  if (!on) return "sig-pill";
-  return role === "flt" ? "sig-pill sig-fault" : "sig-pill sig-on";
-}
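The `isSignalOn` / `sigPillClass` helpers above encode a small decision table: a signal counts as ON only when quality is "good" and the value text is 1/true/on; bad quality renders a warning pill, an active fault-role signal renders a fault pill. The same table as a Rust sketch (the real code is frontend JS; names are transliterated):

```rust
/// A signal is ON only with good quality and a truthy value text.
fn is_signal_on(quality: &str, value_text: &str) -> bool {
    if !quality.eq_ignore_ascii_case("good") {
        return false;
    }
    matches!(
        value_text.trim().to_ascii_lowercase().as_str(),
        "1" | "true" | "on"
    )
}

/// CSS class for a signal pill: bad quality -> warn, signal off -> base,
/// fault role ON -> fault, any other role ON -> on.
fn sig_pill_class(role: &str, quality: &str, value_text: &str) -> &'static str {
    if !quality.eq_ignore_ascii_case("good") {
        return "sig-pill sig-warn";
    }
    if !is_signal_on(quality, value_text) {
        return "sig-pill";
    }
    if role == "flt" {
        "sig-pill sig-fault"
    } else {
        "sig-pill sig-on"
    }
}
```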
function runtimeBadge(runtime) {
if (!runtime) return '<span class="badge offline">OFFLINE</span>';
if (runtime.comm_locked) return '<span class="badge offline">COMM ERR</span>';
@ -46,7 +34,7 @@ export function renderOpsUnits() {
<div class="ops-unit-item-meta">
${runtimeBadge(runtime)}
<span class="badge ${unit.enabled ? "" : "offline"}">${unit.enabled ? "EN" : "DIS"}</span>
-        ${runtime ? `<span class="muted">Acc ${Math.floor(runtime.display_acc_sec / 1000)}s</span>` : ""}
+        ${runtime ? `<span class="muted">Acc ${Math.floor(runtime.accumulated_run_sec / 1000)}s</span>` : ""}
</div>
<div class="ops-unit-item-actions"></div>
`;
@ -55,14 +43,10 @@ export function renderOpsUnits() {
const actions = item.querySelector(".ops-unit-item-actions");
const isAutoOn = runtime?.auto_enabled;
-    const startBlocked = !isAutoOn && (runtime?.fault_locked || runtime?.manual_ack_required);
     const autoBtn = document.createElement("button");
     autoBtn.className = isAutoOn ? "danger" : "secondary";
     autoBtn.textContent = isAutoOn ? "Stop Auto" : "Start Auto";
-    autoBtn.disabled = startBlocked;
-    autoBtn.title = startBlocked
-      ? (runtime?.fault_locked ? "设备故障中,无法启动自动控制" : "需人工确认故障后才可启动自动控制")
-      : (isAutoOn ? "停止自动控制" : "启动自动控制");
+    autoBtn.title = isAutoOn ? "停止自动控制" : "启动自动控制";
autoBtn.addEventListener("click", (e) => {
e.stopPropagation();
apiFetch(`/api/control/unit/${unit.id}/${isAutoOn ? "stop-auto" : "start-auto"}`, { method: "POST" })
@ -87,30 +71,41 @@ export function renderOpsUnits() {
});
}
-function selectOpsUnit(unitId) {
+async function selectOpsUnit(unitId) {
state.selectedOpsUnitId = unitId === state.selectedOpsUnitId ? null : unitId;
renderOpsUnits();
-  state.opsPointEls.clear();
   if (!state.selectedOpsUnitId) {
-    renderOpsEquipments(state.units.flatMap((u) => u.equipments || []));
+    await loadAllEquipmentCards();
     return;
   }
-  const unit = state.unitMap.get(unitId);
-  renderOpsEquipments(unit ? (unit.equipments || []) : []);
+  dom.opsEquipmentArea.innerHTML = '<div class="muted ops-placeholder">加载中...</div>';
+  state.opsPointEls.clear();
+  const detail = await apiFetch(`/api/unit/${state.selectedOpsUnitId}/detail`);
+  renderOpsEquipments(detail.equipments || []);
}
-export function loadAllEquipmentCards() {
+export async function loadAllEquipmentCards() {
if (!dom.opsEquipmentArea) return;
if (!state.units.length) {
dom.opsEquipmentArea.innerHTML = '<div class="muted ops-placeholder">暂无控制单元</div>';
return;
}
+  dom.opsEquipmentArea.innerHTML = '<div class="muted ops-placeholder">加载中...</div>';
   state.opsPointEls.clear();
-  renderOpsEquipments(state.units.flatMap((u) => u.equipments || []));
+  const details = await Promise.all(
+    state.units.map((u) => apiFetch(`/api/unit/${u.id}/detail`).catch(() => ({ equipments: [] })))
+  );
+  const allEquipments = details.flatMap((d) => d.equipments || []);
+  renderOpsEquipments(allEquipments);
}
function renderOpsEquipments(equipments) {
dom.opsEquipmentArea.innerHTML = "";
-  state.opsUnitSyncFns.clear();
if (!equipments.length) {
dom.opsEquipmentArea.innerHTML = '<div class="muted ops-placeholder">该单元下暂无设备</div>';
return;
@ -120,18 +115,25 @@ function renderOpsEquipments(equipments) {
const card = document.createElement("div");
card.className = "ops-eq-card";
// Build role → point map
     const roleMap = {};
-    (eq.role_points || []).forEach((p) => { roleMap[p.signal_role] = p; });
+    (eq.points || []).forEach((p) => {
+      if (p.signal_role) roleMap[p.signal_role] = p;
+    });
-    // Signal pills — one pill per bound role, text label inside
+    // Signal rows HTML (placeholders; WS will fill values)
     const signalRowsHtml = SIGNAL_ROLES.map((role) => {
       const point = roleMap[role];
       if (!point) return "";
-      return `<span class="sig-pill sig-warn" data-ops-dot="${point.point_id}" data-ops-role="${role}">${ROLE_LABELS[role] || role}</span>`;
+      return `
+        <div class="ops-signal-row">
+          <span class="ops-signal-label">${ROLE_LABELS[role] || role}</span>
+          <span class="badge quality-unknown" data-ops-quality="${point.id}">?</span>
+          <span class="ops-signal-value" data-ops-value="${point.id}">--</span>
+        </div>`;
}).join("");
const canControl = eq.kind === "coal_feeder" || eq.kind === "distributor";
-    const unitId = eq.unit_id ?? null;
card.innerHTML = `
<div class="ops-eq-card-head">
@ -139,77 +141,55 @@ function renderOpsEquipments(equipments) {
<span class="badge">${eq.kind || "--"}</span>
</div>
<div class="ops-signal-rows">${signalRowsHtml || '<span class="muted" style="font-size:11px;padding:2px 0">无绑定信号</span>'}</div>
-      ${canControl ? `<div class="ops-eq-card-actions" data-unit-id="${unitId || ""}"></div>` : ""}
+      ${canControl ? `<div class="ops-eq-card-actions" data-unit-id="${eq.unit_id || ""}"></div>` : ""}
`;
-    let syncBtns = null;
     if (canControl) {
       const actions = card.querySelector(".ops-eq-card-actions");
-      const remPointId = roleMap["rem"]?.point_id ?? null;
-      const fltPointId = roleMap["flt"]?.point_id ?? null;
+      const autoOn = !!(eq.unit_id && state.runtimes.get(eq.unit_id)?.auto_enabled);
       const startBtn = document.createElement("button");
       startBtn.className = "secondary";
       startBtn.textContent = "Start";
+      startBtn.disabled = autoOn;
+      startBtn.title = autoOn ? "自动控制运行中,请先停止自动" : "";
       startBtn.addEventListener("click", () =>
         apiFetch(`/api/control/equipment/${eq.id}/start`, { method: "POST" }).catch(() => {})
       );
       const stopBtn = document.createElement("button");
       stopBtn.className = "danger";
       stopBtn.textContent = "Stop";
+      stopBtn.disabled = autoOn;
+      stopBtn.title = autoOn ? "自动控制运行中,请先停止自动" : "";
       stopBtn.addEventListener("click", () =>
         apiFetch(`/api/control/equipment/${eq.id}/stop`, { method: "POST" }).catch(() => {})
       );
       actions.append(startBtn, stopBtn);
-      syncBtns = function () {
-        const autoOn = !!(unitId && state.runtimes.get(unitId)?.auto_enabled);
-        const remSig = remPointId ? state.opsSignalCache.get(remPointId) : null;
-        const fltSig = fltPointId ? state.opsSignalCache.get(fltPointId) : null;
-        const remOk = !remPointId || isSignalOn(remSig?.quality, remSig?.value_text);
-        const fltActive = !!(fltPointId && isSignalOn(fltSig?.quality, fltSig?.value_text));
-        const disabled = autoOn || !remOk || fltActive;
-        const title = autoOn ? "自动控制运行中,请先停止自动"
-          : !remOk ? "设备未切换至远程模式"
-          : fltActive ? "设备故障中"
-          : "";
-        startBtn.disabled = disabled;
-        stopBtn.disabled = disabled;
-        startBtn.title = title;
-        stopBtn.title = title;
-      };
     }
dom.opsEquipmentArea.appendChild(card);
-    // Register pills for WS updates; seed signal cache from initial point_monitor data
+    // Register DOM elements for WS updates, then seed from cached monitor data
     SIGNAL_ROLES.forEach((role) => {
       const point = roleMap[role];
       if (!point) return;
-      const pillEl = card.querySelector(`[data-ops-dot="${point.point_id}"]`);
-      if (!pillEl) return;
-      if (point.point_monitor) {
-        const m = point.point_monitor;
-        state.opsSignalCache.set(point.point_id, { quality: m.quality, value_text: m.value_text });
-        pillEl.className = sigPillClass(role, m.quality, m.value_text);
+      const valueEl = card.querySelector(`[data-ops-value="${point.id}"]`);
+      const qualityEl = card.querySelector(`[data-ops-quality="${point.id}"]`);
+      if (valueEl && qualityEl) {
+        state.opsPointEls.set(point.id, { valueEl, qualityEl });
+        if (point.point_monitor) {
+          const m = point.point_monitor;
+          valueEl.textContent = formatValue(m);
+          qualityEl.className = `badge quality-${(m.quality || "unknown").toLowerCase()}`;
+          qualityEl.textContent = (m.quality || "unknown").toUpperCase();
+        }
       }
-      const isSyncRole = canControl && (role === "rem" || role === "flt");
-      state.opsPointEls.set(point.point_id, { pillEl, syncBtns: isSyncRole ? syncBtns : null });
     });
-    if (canControl) {
-      syncBtns();
-      if (unitId) {
-        if (!state.opsUnitSyncFns.has(unitId)) state.opsUnitSyncFns.set(unitId, new Set());
-        state.opsUnitSyncFns.get(unitId).add(syncBtns);
-      }
-    }
});
}
export function startOps() {
renderOpsUnits();
loadAllEquipmentCards();
dom.batchStartAutoBtn?.addEventListener("click", () => {
apiFetch("/api/control/unit/batch-start-auto", { method: "POST" })
@ -224,7 +204,15 @@ export function startOps() {
});
}
-/** Called by WS handler when a unit's runtime changes — re-evaluates all equipment button states. */
-export function syncEquipmentButtonsForUnit(unitId) {
-  state.opsUnitSyncFns.get(unitId)?.forEach((fn) => fn());
+/** Called by WS handler when a unit's runtime changes — syncs manual button disabled state. */
+export function syncEquipmentButtonsForUnit(unitId, autoEnabled) {
+  if (!dom.opsEquipmentArea) return;
+  dom.opsEquipmentArea
+    .querySelectorAll(`.ops-eq-card-actions[data-unit-id="${unitId}"]`)
+    .forEach((actions) => {
+      actions.querySelectorAll("button").forEach((btn) => {
+        btn.disabled = autoEnabled;
+        btn.title = autoEnabled ? "自动控制运行中,请先停止自动" : "";
+      });
+    });
}

View File

@ -19,12 +19,10 @@ export const state = {
chartPointName: "",
chartData: [],
pointSocket: null,
-  docDrawerSource: null, // null | "api" | "readme"
+  apiDocLoaded: false,
   runtimes: new Map(), // unit_id -> UnitRuntime
   activeView: "ops", // "ops" | "config"
-  opsPointEls: new Map(), // point_id -> { pillEl, syncBtns? }
-  opsSignalCache: new Map(), // point_id -> { quality, value_text }
-  opsUnitSyncFns: new Map(), // unit_id -> Set<syncBtns fn>
+  opsPointEls: new Map(), // point_id -> { valueEl, qualityEl }
logSource: null,
selectedOpsUnitId: null,
};

View File

@ -34,10 +34,10 @@ export function resetUnitForm() {
dom.unitId.value = "";
dom.unitEnabled.checked = true;
dom.unitManualAck.checked = true;
dom.unitRunTimeSec.value = "10";
dom.unitStopTimeSec.value = "10";
dom.unitAccTimeSec.value = "20";
dom.unitBlTimeSec.value = "10";
dom.unitRunTimeSec.value = "0";
dom.unitStopTimeSec.value = "0";
dom.unitAccTimeSec.value = "0";
dom.unitBlTimeSec.value = "0";
}
function openUnitModal() {
@ -117,7 +117,7 @@ export function renderUnits() {
<span class="badge ${unit.enabled ? "" : "offline"}">${unit.enabled ? "EN" : "DIS"}</span>
</div>
<div>${unit.name}</div>
<div class="muted">设备 ${equipmentCount(unit.id)} | Acc ${runtime ? Math.floor(runtime.display_acc_sec / 1000) : 0}s</div>
<div class="muted">设备 ${equipmentCount(unit.id)} | Acc ${runtime ? Math.floor(runtime.accumulated_run_sec / 1000) : 0}s</div>
<div class="muted">Run ${unit.run_time_sec}s / Stop ${unit.stop_time_sec}s / Acc ${unit.acc_time_sec}s / BL ${unit.bl_time_sec}s</div>
<div class="row unit-card-actions"></div>
`;
@ -151,14 +151,10 @@ export function renderUnits() {
actions.append(editBtn, deleteBtn);
const isAutoOn = runtime?.auto_enabled;
-    const startBlocked = !isAutoOn && (runtime?.fault_locked || runtime?.manual_ack_required);
     const autoBtn = document.createElement("button");
     autoBtn.className = isAutoOn ? "danger" : "secondary";
     autoBtn.textContent = isAutoOn ? "Stop Auto" : "Start Auto";
-    autoBtn.disabled = startBlocked;
-    autoBtn.title = startBlocked
-      ? (runtime?.fault_locked ? "设备故障中,无法启动自动控制" : "需人工确认故障后才可启动自动控制")
-      : (isAutoOn ? "停止自动控制" : "启动自动控制");
+    autoBtn.title = isAutoOn ? "停止自动控制" : "启动自动控制";
autoBtn.addEventListener("click", (e) => {
e.stopPropagation();
const url = `/api/control/unit/${unit.id}/${isAutoOn ? "stop-auto" : "start-auto"}`;
@ -192,10 +188,6 @@ export async function loadUnits() {
state.selectedUnitId = null;
}
state.units.forEach((unit) => {
if (unit.runtime) state.runtimes.set(unit.id, unit.runtime);
});
renderUnits();
renderUnitOptions(dom.equipmentUnitId?.value || "", dom.equipmentUnitId);
renderUnitOptions(dom.equipmentBatchUnitId?.value || "", dom.equipmentBatchUnitId);

View File

@ -237,29 +237,29 @@ body {
.ops-signal-rows {
padding: 6px 10px;
display: flex;
-  flex-direction: row;
-  gap: 4px;
-  align-items: center;
+  flex-direction: column;
+  gap: 3px;
}
-.sig-pill {
-  display: inline-flex;
+.ops-signal-row {
+  display: flex;
   align-items: center;
-  justify-content: center;
-  width: 40px;
-  height: 20px;
-  border-radius: 3px;
-  font-size: 10px;
-  font-weight: 700;
-  letter-spacing: 0.5px;
-  background: var(--surface-2, #e0e0e0);
-  color: var(--text-3);
-  transition: background 0.2s, color 0.2s;
-  user-select: none;
+  gap: 6px;
+  font-size: 12px;
 }
+.ops-signal-label {
+  width: 36px;
+  color: var(--text-3);
+  font-size: 11px;
+  text-transform: uppercase;
+  flex-shrink: 0;
+}
+.ops-signal-value {
+  flex: 1;
+  font-weight: 500;
+}
-.sig-pill.sig-on { background: var(--success); color: #fff; }
-.sig-pill.sig-fault { background: var(--danger); color: #fff; }
-.sig-pill.sig-warn { background: var(--warning); color: #333; }
.ops-eq-card-actions {
padding: 6px 10px 8px;
@ -875,45 +875,38 @@ button.danger:hover { background: var(--danger-hover); }
}
.event-card {
display: flex;
-  align-items: baseline;
-  gap: 6px;
-  padding: 3px 8px;
+  padding: 4px 8px;
   font-size: 12px;
   border-bottom: 1px solid var(--border);
-  white-space: nowrap;
-  overflow: hidden;
-  flex-shrink: 0;
 }
 .event-card:hover {
   background: var(--surface-hover, var(--surface));
 }
-.event-badge {
-  flex-shrink: 0;
+.event-meta {
+  display: flex;
+  align-items: baseline;
+  gap: 6px;
 }
-.badge.level-info { background: rgba(52, 211, 153, 0.1); color: #34d399; }
-.badge.level-warn { background: rgba(251, 191, 36, 0.1); color: #fbbf24; }
-.badge.level-error { background: rgba(239, 68, 68, 0.1); color: #f87171; }
-.badge.level-critical { background: rgba(239, 68, 68, 0.15); color: #dc2626; }
+.event-type {
+  overflow: hidden;
+  text-overflow: ellipsis;
+  white-space: nowrap;
+}
 .event-time {
-  flex-shrink: 0;
   font-size: 11px;
 }
-.event-type {
-  flex-shrink: 0;
-  font-weight: 600;
-}
 .event-message {
-  color: var(--text-2);
+  color: var(--text-muted, #888);
+  font-size: 11px;
   overflow: hidden;
   text-overflow: ellipsis;
   white-space: nowrap;
}
.equipment-select-row {