From 8c1b7b636d0f0572278e884f74d9e12bc7254c8d Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 26 Mar 2026 08:33:00 +0800 Subject: [PATCH] refactor(engine): replace 500ms ticker with per-unit event-driven tasks - Engine now spawns one async task per enabled unit (supervised every 10s) - wait_phase uses sleep_until + select! for precise timing; 500ms fault-tick runs inside each phase so fault/comm is still checked promptly - WS UnitRuntimeChanged pushed only on state transitions, not every tick - ControlRuntimeStore gains notify_unit/get_or_create_notify for instant wake-up when handlers change auto_enabled or fault_locked - UnitRuntime: remove last_tick_at, current_run/stop/distributor_elapsed_sec; add display_acc_sec (snapshot at transition, avoids mid-cycle jitter) - accumulated_run_sec now increments by exact run_time_sec*1000 per cycle - unit.state_changed events no longer written to DB (too frequent) - Frontend: show display_acc_sec instead of accumulated_run_sec - styles: event-card flex-shrink:0 fixes text overlap under flex column Co-Authored-By: Claude Sonnet 4.6 --- .../plans/2026-03-24-control-engine.md | 271 ++++++ .../plans/2026-03-25-dual-view-web.md | 837 ++++++++++++++++++ src/control/engine.rs | 675 +++++++------- src/control/runtime.rs | 33 +- src/event.rs | 7 +- src/handler/control.rs | 7 +- web/js/ops.js | 2 +- web/js/units.js | 2 +- web/styles.css | 1 + 9 files changed, 1478 insertions(+), 357 deletions(-) create mode 100644 docs/superpowers/plans/2026-03-24-control-engine.md create mode 100644 docs/superpowers/plans/2026-03-25-dual-view-web.md diff --git a/docs/superpowers/plans/2026-03-24-control-engine.md b/docs/superpowers/plans/2026-03-24-control-engine.md new file mode 100644 index 0000000..1fcb975 --- /dev/null +++ b/docs/superpowers/plans/2026-03-24-control-engine.md @@ -0,0 +1,271 @@ +# Control Engine Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Implement the automated control engine for coal feeder / distributor units, including state machine, fault/comm protection, runtime API and frontend control panels. + +**Architecture:** The engine spawns one async task per enabled unit (supervised by a 10s scanner). Each task drives the unit's state machine using `tokio::time::sleep_until` for phase timing and `tokio::sync::Notify` for instant wake-up when external state changes (auto enable/disable, fault ack). A 500ms fault-poll ticker runs inside each task's `wait_phase` helper so fault/comm status is still checked promptly during long phases. State is kept in `ControlRuntimeStore` (in-memory, never persisted). Frontend receives real-time updates via `WsMessage::UnitRuntimeChanged` — pushed **only on state transitions**, not every tick. + +**Tech Stack:** Rust/Axum backend, sqlx/PostgreSQL, tokio async, vanilla JS ES modules frontend. + +--- + +## File Map + +| File | Action | Responsibility | +|------|--------|---------------| +| `src/control/runtime.rs` | ✅ Done | `UnitRuntime` struct + `ControlRuntimeStore` with `Notify` per unit | +| `src/control/command.rs` | ✅ Done | Shared `send_pulse_command()` helper | +| `src/control/engine.rs` | ✅ Done | Supervisor + per-unit async tasks + `wait_phase` | +| `src/control/validator.rs` | ✅ Done | Block manual commands when unit is fault/comm locked | +| `src/control/mod.rs` | ✅ Done | Exposes `command`, `engine`, `runtime`, `validator` | +| `src/event.rs` | ✅ Done | 7 `AppEvent` variants; `UnitStateChanged` fires but is **not** persisted to DB | +| `src/websocket.rs` | ✅ Done | `WsMessage::UnitRuntimeChanged` | +| `src/service/control.rs` | ✅ Done | `get_all_enabled_units`, `get_equipment_by_unit_id` | +| `src/handler/control.rs` | ✅ Done | `start_auto`, `stop_auto`, `batch_start_auto`, `batch_stop_auto`, `ack_fault`, `get_unit_runtime`; calls `notify_unit` after every state change | +| `src/main.rs` | ✅ Done | Routes for above endpoints | +| `web/js/state.js` | ✅ Done | `runtimes: new Map()` | +| `web/js/units.js` | ✅ Done | Runtime state badge, Auto Start/Stop, Ack Fault; shows `display_acc_sec` | +| `web/js/ops.js` | ✅ Done | Ops panel unit cards show runtime badge + `display_acc_sec` | +| `web/js/app.js` | ✅ Done | Handles `UnitRuntimeChanged` WS message | +| `web/styles.css` | ✅ Done | `.event-card { flex-shrink: 0 }` prevents text overlap under flex column | + +--- + +## Current UnitRuntime Shape + +```rust +// src/control/runtime.rs +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct UnitRuntime { + pub unit_id: Uuid, + pub state: UnitRuntimeState, + pub auto_enabled: bool, + pub accumulated_run_sec: i64, // internal accumulator (ms); do NOT display directly + pub display_acc_sec: i64, // snapshot at state-transition; use this for display + pub fault_locked: bool, + pub flt_active: bool, + pub comm_locked: bool, + pub manual_ack_required: bool, +} +// NOTE: elapsed-time fields (current_run_elapsed_sec, current_stop_elapsed_sec, +// distributor_run_elapsed_sec, last_tick_at) were removed in the event-driven +// refactor. Timing is now managed entirely by tokio::time::sleep_until inside +// the per-unit task. Do not re-add them. +``` + +`ControlRuntimeStore` adds: + +```rust +notifiers: Arc>>>, + +// Methods: +pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc +pub async fn notify_unit(&self, unit_id: Uuid) // call from handlers after state changes +``` + +--- + +## Engine Architecture (event-driven, 2026-03-26) + +``` +start() + └─ supervise() — interval 10s, spawns unit_task per enabled unit + +unit_task(unit_id) + ├─ load_equipment_maps — once at task start (cached for task lifetime) + ├─ fault_tick — interval 500ms, used inside wait_phase + └─ loop: + ├─ reload unit config (check still enabled) + ├─ check_fault_comm → push WS if changed + ├─ if !auto || fault || comm → select!(fault_tick | notify), continue + └─ match state: + Stopped → wait_phase(stop_time_sec) → start feeder → state=Running → push WS + Running → wait_phase(run_time_sec) → stop feeder → acc += run_time_sec + → if acc >= acc_time_sec: start distributor, state=DistributorRunning + → else: state=Stopped → push WS + DistributorRunning → wait_phase(bl_time_sec) → stop distributor → acc=0 → state=Stopped → push WS + FaultLocked|CommLocked → select!(fault_tick | notify) + +wait_phase(secs): + deadline = now + secs + loop: + select! { sleep_until(deadline) => return true + fault_tick.tick() => re-check fault/comm; if interrupted return false + notify.notified() => re-check fault/comm; if interrupted return false } +``` + +**Key invariants:** +- `accumulated_run_sec` is updated by **exactly** `run_time_sec * 1000` per completed cycle (no delta drift). +- `display_acc_sec` is a snapshot copied from `accumulated_run_sec` only at Running→Stopped or Running→DistributorRunning transitions. Frontend always reads `display_acc_sec`. +- WS is pushed **only** when something changes. No periodic push. +- `unit.state_changed` events are fired (for logging) but **not** written to the DB event table (too frequent). + +--- + +## Task 1: Extend UnitRuntime — ✅ DONE + +**Files:** `src/control/runtime.rs` + +Fields as shown in "Current UnitRuntime Shape" above. `ControlRuntimeStore` includes the `notifiers` map with `get_or_create_notify` and `notify_unit` methods. + +--- + +## Task 2: Create shared pulse-command helper — ✅ DONE + +**Files:** `src/control/command.rs`, `src/control/mod.rs`, `src/handler/control.rs` + +`send_pulse_command(connection_manager, point_id, value_type, pulse_ms)` writes high→delay→low. +`simulate_run_feedback(state, eq_id, running)` writes a fake run-feedback value in simulate mode. + +--- + +## Task 3: Add runtime-state checks to validator.rs — ✅ DONE + +**Files:** `src/control/validator.rs` + +After existing REM/FLT/quality checks in `validate_manual_control`: + +```rust +if let Some(unit_id) = equipment.unit_id { + if let Some(runtime) = state.control_runtime.get(unit_id).await { + if runtime.auto_enabled { + return Err(ApiErr::Forbidden("Auto control is active; disable auto first", ...)); + } + if runtime.comm_locked { + return Err(ApiErr::Forbidden("Unit communication is locked", ...)); + } + if runtime.fault_locked { + return Err(ApiErr::Forbidden("Unit is fault locked", ...)); + } + } +} +``` + +--- + +## Task 4: Extend AppEvent with business events — ✅ DONE + +**Files:** `src/event.rs` + +7 variants added: + +```rust +AutoControlStarted { unit_id: Uuid }, +AutoControlStopped { unit_id: Uuid }, +FaultLocked { unit_id: Uuid, equipment_id: Uuid }, +FaultAcked { unit_id: Uuid }, +CommLocked { unit_id: Uuid }, +CommRecovered { unit_id: Uuid }, +UnitStateChanged { unit_id: Uuid, from_state: String, to_state: String }, +``` + +**`persist_event_if_needed` mapping:** + +| Variant | DB? | event_type | +|---------|-----|-----------| +| `AutoControlStarted` | ✅ | `unit.auto_control_started` | +| `AutoControlStopped` | ✅ | `unit.auto_control_stopped` | +| `FaultLocked` | ✅ | `unit.fault_locked` (level: error) | +| `FaultAcked` | ✅ | `unit.fault_acked` | +| `CommLocked` | ✅ | `unit.comm_locked` (level: warn) | +| `CommRecovered` | ✅ | `unit.comm_recovered` | +| `UnitStateChanged` | ❌ | — (too frequent; fires every cycle) | + +--- + +## Task 5: Add WsMessage::UnitRuntimeChanged — ✅ DONE + +**Files:** `src/websocket.rs` + +```rust +UnitRuntimeChanged(crate::control::runtime::UnitRuntime), +``` + +--- + +## Task 6: Add service helpers — ✅ DONE + +**Files:** `src/service/control.rs` + +```rust +pub async fn get_all_enabled_units(pool: &PgPool) -> Result, sqlx::Error> +pub async fn get_equipment_by_unit_id(pool: &PgPool, unit_id: Uuid) -> Result, sqlx::Error> +``` + +--- + +## Task 7: Implement control/engine.rs — ✅ DONE (event-driven, 2026-03-26) + +**Files:** `src/control/engine.rs` + +See "Engine Architecture" section above for the full design. + +**Critical rules for future modifications:** +- Never push `WsMessage::UnitRuntimeChanged` except at state transitions or fault/comm changes. +- `wait_phase` must use `sleep_until(deadline)` not `sleep(duration)` — the deadline is fixed when the phase starts so that fault-tick re-checks don't restart the timer. +- When handling `notify.notified()` inside `wait_phase`, always re-read runtime from store (the handler may have changed `auto_enabled`). +- Equipment maps are loaded once per task invocation; if equipment config changes, the supervisor will restart the task on its next scan (≤10s delay). + +--- + +## Task 8: New API endpoints — ✅ DONE + +**Files:** `src/handler/control.rs`, `src/main.rs` + +| Method | Path | Handler | +|--------|------|---------| +| POST | `/api/unit/:id/start-auto` | `start_auto_unit` | +| POST | `/api/unit/:id/stop-auto` | `stop_auto_unit` | +| POST | `/api/unit/:id/ack-fault` | `ack_fault_unit` | +| POST | `/api/unit/batch-start-auto` | `batch_start_auto` | +| POST | `/api/unit/batch-stop-auto` | `batch_stop_auto` | +| GET | `/api/unit/:id/runtime` | `get_unit_runtime` | + +**Notify contract:** every handler that modifies `auto_enabled` or `fault_locked` MUST call `state.control_runtime.notify_unit(unit_id).await` after upserting the runtime. This wakes the sleeping unit task immediately. + +```rust +// Pattern to follow in every auto/fault handler: +state.control_runtime.upsert(runtime).await; +state.control_runtime.notify_unit(unit_id).await; // ← must not be omitted +let _ = state.event_manager.send(AppEvent::...); +``` + +--- + +## Task 9: Frontend runtime integration — ✅ DONE + +**Files:** `web/js/state.js`, `web/js/units.js`, `web/js/ops.js`, `web/js/app.js` + +**WS handler in app.js:** +```js +case "UnitRuntimeChanged": + state.runtimes.set(payload.data.unit_id, payload.data); + renderUnits(); // re-renders unit cards with new badge/buttons + renderOpsUnits(); + break; +``` + +**Display rule:** Always use `runtime.display_acc_sec` for Acc display, never `runtime.accumulated_run_sec`. + +```js +// ✅ Correct +`Acc ${Math.floor(runtime.display_acc_sec / 1000)}s` + +// ❌ Wrong — shows mid-cycle jitter values +`Acc ${Math.floor(runtime.accumulated_run_sec / 1000)}s` +``` + +**Event list CSS:** `.event-card` must have `flex-shrink: 0` (in `web/styles.css`) to prevent card height compression and text overlap when the flex-column list grows. + +--- + +## Task 10: Connect engine to AppState — ✅ DONE + +**Files:** `src/main.rs` + +```rust +let control_runtime = Arc::new(control::runtime::ControlRuntimeStore::new()); +// ... build AppState ... +control::engine::start(state.clone(), control_runtime); +``` diff --git a/docs/superpowers/plans/2026-03-25-dual-view-web.md b/docs/superpowers/plans/2026-03-25-dual-view-web.md new file mode 100644 index 0000000..ae2bc72 --- /dev/null +++ b/docs/superpowers/plans/2026-03-25-dual-view-web.md @@ -0,0 +1,837 @@ +# Dual-View Web UI Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a top-level Tab switch between an **运维视图** (operational, equipment-first) and the existing **配置视图** (configuration, point-first), with the ops view showing realtime signal values on equipment cards and a system-events panel at the bottom, while the config view replaces the events panel with a realtime SSE log stream. + +**Architecture:** Two CSS grid classes (`grid-ops` / `grid-config`) on `
` control which panels are visible. A new `ops.js` module drives the ops view: it calls `GET /api/unit/{id}/detail` on unit selection, renders equipment cards with per-role signal cells, and registers point DOM elements in `state.opsPointEls` so the existing WebSocket handler can push live updates. The SSE log stream (`/api/logs/stream`) is revived as a separate panel shown only in config view, started/stopped on tab switch. + +**Tech Stack:** Vanilla JS ES modules, CSS Grid, SSE (`EventSource`), existing WebSocket infrastructure, existing `/api/unit/{id}/detail` endpoint. + +--- + +## Current layout (reference) + +``` +grid (3 cols × 2 rows): + top-left → equipment-panel.html (col 1, row 1) + top-right → points-panel.html (col 2-3, row 1) + bottom-left → source-panel.html (col 1, row 2) — units + sources stacked + bottom-mid → logs-panel.html (col 2, row 2) — system events + bottom-right→ chart-panel.html (col 3, row 2) +``` + +## Target layouts + +``` +grid-config (same as current): + top-left → equipment-panel (col 1, row 1) + top-right → points-panel (col 2-3, row 1) + bottom-left → source-panel (col 1, row 2) + bottom-mid → log-stream-panel (NEW) (col 2, row 2) — SSE logs + bottom-right→ chart-panel (col 3, row 2) + +grid-ops (new): + top → ops-panel (NEW) (col 1-3, row 1) — unit sidebar + equipment cards + bottom → logs-panel (col 1-3, row 2) — system events (full width) +``` + +## File Map + +| File | Action | Purpose | +|---|---|---| +| `web/html/topbar.html` | Modify | Add `#tabOps` / `#tabConfig` tab buttons | +| `web/html/ops-panel.html` | **Create** | Ops view: `#opsUnitList` sidebar + `#opsEquipmentArea` card grid | +| `web/html/log-stream-panel.html` | **Create** | Config view bottom-mid: SSE log stream (`#logView`) | +| `web/index.html` | Modify | Add new partials, version bump | +| `web/js/ops.js` | **Create** | Load unit detail, render equipment cards, expose `updateOpsPoint()` | +| `web/js/state.js` | Modify | Add `activeView`, `opsPointEls`, `logSource` | +| `web/js/dom.js` | Modify | Add refs: `tabOps`, `tabConfig`, `opsUnitList`, `opsEquipmentArea`, `logView` | +| `web/js/logs.js` | Modify | Restore `startLogs` / `stopLogs`; call `updateOpsPoint` in WS handler | +| `web/js/app.js` | Modify | Tab switch logic, bind ops unit-click, start/stop log stream on switch | +| `web/styles.css` | Modify | Tab styles, `grid-ops`, `grid-config`, ops card + signal row styles | + +--- + +## Task 1: Tab scaffold + CSS layout switching + +**Files:** +- Modify: `web/html/topbar.html` +- Modify: `web/index.html` +- Modify: `web/js/state.js` +- Modify: `web/js/dom.js` +- Modify: `web/js/app.js` +- Modify: `web/styles.css` + +- [ ] **Step 1: Add tab buttons to topbar** + +Replace `web/html/topbar.html` with: + +```html +
+
PLC Control
+
+ + +
+
+ + +
Ready
+
+
+``` + +- [ ] **Step 2: Add tab + grid CSS to `web/styles.css`** + +After the existing `.topbar-actions` block, add: + +```css +/* ── Tabs ───────────────────────────────────────── */ + +.tab-bar { + display: flex; + gap: 2px; +} + +.tab-btn { + padding: 0 16px; + height: 28px; + font-size: 13px; + font-weight: 500; + background: transparent; + border: 1px solid var(--border); + color: var(--text-2); + cursor: pointer; +} + +.tab-btn.active { + background: var(--accent); + border-color: var(--accent); + color: #fff; +} +``` + +Replace the existing `.grid` block (lines 82–94) with: + +```css +.grid-ops, +.grid-config { + display: grid; + gap: 1px; + height: calc(100vh - var(--topbar-h)); +} + +.grid-config { + grid-template-columns: 320px minmax(0, 2fr) minmax(0, 1.3fr); + grid-template-rows: 1fr 380px; +} + +.grid-ops { + grid-template-columns: 260px minmax(0, 1fr); + grid-template-rows: 1fr 260px; +} + +/* config view slot assignments */ +.grid-config .panel.top-left { grid-column: 1; grid-row: 1; } +.grid-config .panel.top-right { grid-column: 2 / 4; grid-row: 1; } +.grid-config .panel.bottom-left { grid-column: 1; grid-row: 2; } +.grid-config .panel.bottom-mid { grid-column: 2; grid-row: 2; } +.grid-config .panel.bottom-right{ grid-column: 3; grid-row: 2; } + +/* ops view slot assignments */ +.grid-ops .panel.ops-main { grid-column: 1 / 3; grid-row: 1; } +.grid-ops .panel.ops-bottom { grid-column: 1 / 3; grid-row: 2; } +``` + +- [ ] **Step 3: Add `activeView` and `logSource` to `web/js/state.js`** + +```js +export const state = { + // ... existing fields ... + activeView: "ops", // "ops" | "config" + opsPointEls: new Map(), // point_id -> { valueEl, qualityEl } + logSource: null, +}; +``` + +- [ ] **Step 4: Add DOM refs in `web/js/dom.js`** + +```js +tabOps: byId("tabOps"), +tabConfig: byId("tabConfig"), +opsUnitList: byId("opsUnitList"), +opsEquipmentArea: byId("opsEquipmentArea"), +logView: byId("logView"), +``` + +- [ ] **Step 5: Add `switchView` function + wiring in `web/js/app.js`** + +Add at top of `app.js`: +```js +import { startOps, handleOpsUnitClick } from "./ops.js"; +import { startLogs, stopLogs } from "./logs.js"; +``` + +Add `switchView` function before `bindEvents`: +```js +function switchView(view) { + state.activeView = view; + const main = document.querySelector("main"); + main.className = view === "ops" ? "grid-ops" : "grid-config"; + + dom.tabOps.classList.toggle("active", view === "ops"); + dom.tabConfig.classList.toggle("active", view === "config"); + + // config-only panels + ["top-left", "top-right", "bottom-left", "bottom-right"].forEach((cls) => { + const el = main.querySelector(`.panel.${cls}`); + if (el) el.classList.toggle("hidden", view === "ops"); + }); + // bottom-mid is log-stream in config, hidden in ops + const logStreamPanel = main.querySelector(".panel.bottom-mid"); + if (logStreamPanel) logStreamPanel.classList.toggle("hidden", view === "ops"); + + // ops-only panels + const opsMain = main.querySelector(".panel.ops-main"); + const opsBottom = main.querySelector(".panel.ops-bottom"); + if (opsMain) opsMain.classList.toggle("hidden", view === "config"); + if (opsBottom) opsBottom.classList.toggle("hidden", view === "config"); + + if (view === "config") { + startLogs(); + } else { + stopLogs(); + } +} +``` + +In `bindEvents`, add: +```js +dom.tabOps.addEventListener("click", () => switchView("ops")); +dom.tabConfig.addEventListener("click", () => switchView("config")); +``` + +In `bootstrap`, call after `bindEvents()`: +```js +switchView("ops"); // default to ops view +``` + +- [ ] **Step 6: Update `web/index.html` — add new partials, default grid class, version bump** + +```html +
+
+
+
+
+
+
+
+
+``` + +Bump version: `?v=20260325a` on both CSS and JS. + +- [ ] **Step 7: Verify panels show/hide correctly** + +Open browser, click 运维 / 配置 tabs — panels should swap. Layout may be unstyled; that's fine for now. + +- [ ] **Step 8: Commit** + +```bash +git add web/html/topbar.html web/index.html web/js/state.js web/js/dom.js web/js/app.js web/styles.css +git commit -m "feat(web): add tab scaffold for ops/config dual-view layout" +``` + +--- + +## Task 2: Ops panel HTML + CSS skeleton + +**Files:** +- Create: `web/html/ops-panel.html` +- Modify: `web/styles.css` + +- [ ] **Step 1: Create `web/html/ops-panel.html`** + +```html +
+
+ +
+
← 选择控制单元
+
+
+
+``` + +Note: `logs-panel.html` already has `id="eventList"` and class structure. Add `ops-bottom` class to it in HTML: + +In `web/html/logs-panel.html`, change: +```html +
+``` + +- [ ] **Step 2: Add ops layout CSS to `web/styles.css`** + +```css +/* ── Ops View ───────────────────────────────────── */ + +.ops-layout { + display: flex; + min-height: 0; + flex: 1 1 auto; + overflow: hidden; +} + +.ops-unit-sidebar { + width: 260px; + flex-shrink: 0; + border-right: 1px solid var(--border); + display: flex; + flex-direction: column; + min-height: 0; + overflow: hidden; +} + +.ops-unit-list { + flex: 1 1 auto; + overflow-y: auto; +} + +.ops-equipment-area { + flex: 1 1 auto; + overflow: auto; + padding: 12px; + display: flex; + flex-wrap: wrap; + align-content: flex-start; + gap: 12px; +} + +.ops-placeholder { + padding: 20px; +} + +/* Equipment ops card */ +.ops-eq-card { + width: 220px; + border: 1px solid var(--border); + background: var(--surface); + display: flex; + flex-direction: column; + gap: 0; +} + +.ops-eq-card-head { + padding: 8px 10px 6px; + border-bottom: 1px solid var(--border-light); + display: flex; + align-items: center; + gap: 6px; +} + +.ops-eq-card-head strong { + flex: 1; + font-size: 13px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.ops-signal-rows { + padding: 6px 10px; + display: flex; + flex-direction: column; + gap: 3px; +} + +.ops-signal-row { + display: flex; + align-items: center; + gap: 6px; + font-size: 12px; +} + +.ops-signal-label { + width: 36px; + color: var(--text-3); + font-size: 11px; + text-transform: uppercase; + flex-shrink: 0; +} + +.ops-signal-value { + flex: 1; + font-weight: 500; +} + +.ops-eq-card-actions { + padding: 6px 10px 8px; + display: flex; + gap: 6px; + border-top: 1px solid var(--border-light); +} + +.ops-eq-card-actions button { + flex: 1; + padding: 3px 0; + font-size: 12px; +} + +/* ops unit list item */ +.ops-unit-item { + padding: 8px 10px; + cursor: pointer; + border-bottom: 1px solid var(--border-light); + display: flex; + flex-direction: column; + gap: 3px; +} + +.ops-unit-item:hover { background: var(--accent-bg); } +.ops-unit-item.selected { + background: var(--accent-bg); + border-left: 3px solid var(--accent); +} + +.ops-unit-item-name { + font-size: 13px; + font-weight: 600; +} + +.ops-unit-item-meta { + font-size: 11px; + color: var(--text-3); + display: flex; + gap: 6px; +} +``` + +- [ ] **Step 3: Verify layout renders correctly (empty, no JS yet)** + +Refresh browser in ops tab — sidebar and card area should be visible with placeholder text. + +- [ ] **Step 4: Commit** + +```bash +git add web/html/ops-panel.html web/html/logs-panel.html web/styles.css +git commit -m "feat(web): add ops panel HTML skeleton and layout CSS" +``` + +--- + +## Task 3: ops.js — unit list + equipment card rendering + +**Files:** +- Create: `web/js/ops.js` +- Modify: `web/js/app.js` + +The ops view unit list is separate from the config view's `#unitList`. When a unit is clicked, `GET /api/unit/{id}/detail` returns the nested structure and we render equipment cards. + +Equipment card signal roles to display (in order): `rem`, `run`, `flt`. Show label + quality dot + value. Start/Stop buttons only for `coal_feeder` / `distributor` kind. + +- [ ] **Step 1: Create `web/js/ops.js`** + +```js +import { apiFetch } from "./api.js"; +import { dom } from "./dom.js"; +import { state } from "./state.js"; + +const SIGNAL_ROLES = ["rem", "run", "flt"]; +const ROLE_LABELS = { rem: "REM", run: "RUN", flt: "FLT" }; + +export function renderOpsUnits() { + if (!dom.opsUnitList) return; + dom.opsUnitList.innerHTML = ""; + + if (!state.units.length) { + dom.opsUnitList.innerHTML = '
暂无控制单元
'; + return; + } + + state.units.forEach((unit) => { + const runtime = state.runtimes.get(unit.id); + const item = document.createElement("div"); + item.className = `ops-unit-item${state.selectedOpsUnitId === unit.id ? " selected" : ""}`; + item.innerHTML = ` +
${unit.code} / ${unit.name}
+
+ ${unit.enabled ? "EN" : "DIS"} + ${runtime ? `Acc ${Math.floor(runtime.accumulated_run_sec / 1000)}s` : ""} +
+ `; + item.addEventListener("click", () => selectOpsUnit(unit.id)); + dom.opsUnitList.appendChild(item); + }); +} + +async function selectOpsUnit(unitId) { + state.selectedOpsUnitId = unitId === state.selectedOpsUnitId ? null : unitId; + renderOpsUnits(); + + if (!state.selectedOpsUnitId) { + dom.opsEquipmentArea.innerHTML = '
← 选择控制单元
'; + state.opsPointEls.clear(); + return; + } + + dom.opsEquipmentArea.innerHTML = '
加载中...
'; + state.opsPointEls.clear(); + + const detail = await apiFetch(`/api/unit/${state.selectedOpsUnitId}/detail`); + renderOpsEquipments(detail.equipments || []); +} + +function renderOpsEquipments(equipments) { + dom.opsEquipmentArea.innerHTML = ""; + if (!equipments.length) { + dom.opsEquipmentArea.innerHTML = '
该单元下暂无设备
'; + return; + } + + equipments.forEach((eq) => { + const runtime = state.runtimes.get(state.selectedOpsUnitId); + const card = document.createElement("div"); + card.className = "ops-eq-card"; + + // Build role → point_id map + const roleMap = {}; + (eq.points || []).forEach((p) => { + if (p.signal_role) roleMap[p.signal_role] = p; + }); + + // Signal rows HTML (placeholders; WS will fill values) + const signalRowsHtml = SIGNAL_ROLES.map((role) => { + const point = roleMap[role]; + if (!point) return ""; + return ` +
+ ${ROLE_LABELS[role] || role} + ? + -- +
`; + }).join(""); + + const canControl = eq.kind === "coal_feeder" || eq.kind === "distributor"; + + card.innerHTML = ` +
+ ${eq.code} + ${eq.kind || "--"} +
+
${signalRowsHtml || '无绑定信号'}
+ ${canControl ? '
' : ""} + `; + + if (canControl) { + const actions = card.querySelector(".ops-eq-card-actions"); + const startBtn = document.createElement("button"); + startBtn.className = "secondary"; + startBtn.textContent = "Start"; + startBtn.addEventListener("click", () => + apiFetch(`/api/control/equipment/${eq.id}/start`, { method: "POST" }).catch(() => {}) + ); + const stopBtn = document.createElement("button"); + stopBtn.className = "danger"; + stopBtn.textContent = "Stop"; + stopBtn.addEventListener("click", () => + apiFetch(`/api/control/equipment/${eq.id}/stop`, { method: "POST" }).catch(() => {}) + ); + actions.append(startBtn, stopBtn); + } + + dom.opsEquipmentArea.appendChild(card); + + // Register DOM elements for WS updates + SIGNAL_ROLES.forEach((role) => { + const point = roleMap[role]; + if (!point) return; + const valueEl = card.querySelector(`[data-ops-value="${point.id}"]`); + const qualityEl = card.querySelector(`[data-ops-quality="${point.id}"]`); + if (valueEl && qualityEl) { + state.opsPointEls.set(point.id, { valueEl, qualityEl }); + } + }); + }); +} + +export function startOps() { + renderOpsUnits(); +} +``` + +- [ ] **Step 2: Add `selectedOpsUnitId` to `web/js/state.js`** + +```js +selectedOpsUnitId: null, +``` + +- [ ] **Step 3: Wire ops into `web/js/app.js`** + +Add import: +```js +import { startOps, renderOpsUnits } from "./ops.js"; +``` + +In `bootstrap`, add after `loadUnits`: +```js +await withStatus(loadUnits()); // already exists +startOps(); // initialise ops unit list +``` + +Also update the `equipments-updated` listener to also call `renderOpsUnits`: +```js +document.addEventListener("equipments-updated", () => { + renderUnits(); + renderOpsUnits(); +}); +``` + +After `loadUnits()` is called anywhere (e.g., `refreshUnitBtn`), `renderOpsUnits()` should also be triggered. Simplest: call `renderOpsUnits()` inside `loadUnits()` in `units.js` — add at end of that function: + +In `web/js/units.js`, at end of `loadUnits()`: +```js +// notify ops view +document.dispatchEvent(new Event("units-loaded")); +``` + +In `web/js/app.js`: +```js +document.addEventListener("units-loaded", renderOpsUnits); +``` + +- [ ] **Step 4: Verify unit list renders and card area populates on click** + +Click 运维 tab → unit list shows → click a unit → equipment cards appear with signal row placeholders. + +- [ ] **Step 5: Commit** + +```bash +git add web/js/ops.js web/js/state.js web/js/app.js web/js/units.js +git commit -m "feat(web): ops view unit list and equipment card rendering" +``` + +--- + +## Task 4: Realtime signal values in ops cards + +**Files:** +- Modify: `web/js/logs.js` + +The WebSocket `PointNewValue` handler already updates `state.pointEls`. Add a second lookup for `state.opsPointEls`. + +- [ ] **Step 1: Update WebSocket handler in `web/js/logs.js`** + +In the `PointNewValue` branch, after the existing `state.pointEls` block: + +```js +if (payload.type === "PointNewValue" || payload.type === "point_new_value") { + const data = payload.data; + + // config view point table + const entry = state.pointEls.get(data.point_id); + if (entry) { + entry.value.textContent = formatValue(data); + entry.quality.className = `badge quality-${(data.quality || "unknown").toLowerCase()}`; + entry.quality.textContent = (data.quality || "unknown").toUpperCase(); + entry.time.textContent = data.timestamp || "--"; + } + + // ops view signal cell + const opsEntry = state.opsPointEls.get(data.point_id); + if (opsEntry) { + opsEntry.valueEl.textContent = formatValue(data); + opsEntry.qualityEl.className = `badge quality-${(data.quality || "unknown").toLowerCase()}`; + opsEntry.qualityEl.textContent = (data.quality || "unknown").toUpperCase(); + } + + if (state.chartPointId === data.point_id) { + appendChartPoint(data); + } + return; +} +``` + +Also update `UnitRuntimeChanged` to re-render ops unit list: + +```js +if (payload.type === "UnitRuntimeChanged") { + const runtime = payload.data; + state.runtimes.set(runtime.unit_id, runtime); + renderUnits(); + // lazy import to avoid circular dep + import("./ops.js").then(({ renderOpsUnits }) => renderOpsUnits()); + return; +} +``` + +- [ ] **Step 2: Verify realtime updates** + +With a live OPC UA source connected, open ops view, select a unit — signal cells should show live quality badges and values updating in real time. + +- [ ] **Step 3: Commit** + +```bash +git add web/js/logs.js +git commit -m "feat(web): ops card signal cells update from WebSocket PointNewValue" +``` + +--- + +## Task 5: Log stream panel for config view + +**Files:** +- Create: `web/html/log-stream-panel.html` +- Modify: `web/js/logs.js` +- Modify: `web/js/dom.js` +- Modify: `web/js/app.js` + +Restore the SSE `EventSource` log stream, but only active when in config view. The `startLogs` / `stopLogs` functions are called by `switchView` in `app.js` (already wired in Task 1 Step 5). + +- [ ] **Step 1: Create `web/html/log-stream-panel.html`** + +```html +
+
+

实时日志

+
+
+
+``` + +- [ ] **Step 2: Restore `startLogs` / `stopLogs` in `web/js/logs.js`** + +Add before `startPointSocket`: + +```js +function escapeHtml(text) { + return text.replaceAll("&", "&").replaceAll("<", "<").replaceAll(">", ">"); +} + +function parseLogLine(line) { + const trimmed = line.trim(); + if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return null; + try { return JSON.parse(trimmed); } catch { return null; } +} + +export function appendLog(line) { + if (!dom.logView) return; + const atBottom = dom.logView.scrollTop + dom.logView.clientHeight >= dom.logView.scrollHeight - 10; + const div = document.createElement("div"); + const parsed = parseLogLine(line); + if (!parsed) { + div.className = "log-line"; + div.textContent = line; + } else { + const levelRaw = (parsed.level || "").toString(); + const level = levelRaw.toLowerCase(); + div.className = `log-line${level ? ` level-${level}` : ""}`; + div.innerHTML = [ + `${escapeHtml(levelRaw || "LOG")}`, + parsed.timestamp ? ` ${escapeHtml(parsed.timestamp)}` : "", + parsed.target ? ` ${escapeHtml(parsed.target)}` : "", + `${escapeHtml(parsed.fields?.message || parsed.message || parsed.msg || line)}`, + ].join(""); + } + dom.logView.appendChild(div); + if (atBottom) dom.logView.scrollTop = dom.logView.scrollHeight; +} + +export function startLogs() { + if (state.logSource) return; + state.logSource = new EventSource("/api/logs/stream"); + state.logSource.addEventListener("log", (event) => { + const data = JSON.parse(event.data); + (data.lines || []).forEach(appendLog); + }); + state.logSource.addEventListener("error", () => appendLog("[log stream error]")); +} + +export function stopLogs() { + if (state.logSource) { + state.logSource.close(); + state.logSource = null; + } +} +``` + +- [ ] **Step 3: Add `logView` to `web/js/dom.js`** (already added in Task 1 Step 4 — verify it's present) + +- [ ] **Step 4: Verify config view shows SSE log stream** + +Click 配置 tab → bottom-middle panel should show "实时日志" with live log lines streaming. Click 运维 tab → SSE connection closes. + +- [ ] **Step 5: Commit** + +```bash +git add web/html/log-stream-panel.html web/js/logs.js web/js/dom.js +git commit -m "feat(web): restore SSE log stream panel in config view" +``` + +--- + +## Task 6: Final wiring, cleanup and polish + +**Files:** +- Modify: `web/styles.css` (log panel, minor tweaks) +- Modify: `web/js/units.js` (dispatch units-loaded) +- Modify: `web/index.html` (version bump) + +- [ ] **Step 1: Add log panel CSS** (if not already in styles.css from previous work) + +Verify `.log`, `.log-line`, `.level-info`, `.level-warn`, `.level-error` styles exist. If not, add: + +```css +.log { + flex: 1 1 auto; + overflow-y: auto; + font-family: monospace; + font-size: 12px; + padding: 4px 8px; +} + +.log-line { padding: 1px 0; border-bottom: 1px solid var(--border-light); } +.log-line .level { font-weight: 700; margin-right: 6px; } +.log-line.level-error { color: var(--danger); } +.log-line.level-warn { color: var(--warning); } +.log-line.level-info { color: var(--text-2); } +.log-line .message { color: var(--text); } +``` + +- [ ] **Step 2: Bump version in `web/index.html`** + +Change `?v=20260325a` → `?v=20260325b` on both CSS and JS links. + +- [ ] **Step 3: Final verification checklist** + +- [ ] 运维 tab: unit list renders, click unit → equipment cards appear +- [ ] Equipment cards show REM / RUN / FLT rows with live values +- [ ] Start/Stop buttons work (coal_feeder / distributor only) +- [ ] `UnitRuntimeChanged` WS message updates ops unit list badges +- [ ] 配置 tab: all existing panels visible (equipment, points, sources, chart) +- [ ] 配置 tab bottom-mid shows SSE log stream, lines append in real time +- [ ] Switching tabs starts/stops SSE correctly (no duplicate connections) +- [ ] 配置 tab events/chart/points work as before + +- [ ] **Step 4: Final commit** + +```bash +git add web/styles.css web/js/units.js web/index.html +git commit -m "feat(web): dual-view UI complete — ops cards + config log stream" +``` + +--- + +## Notes for implementer + +- `state.opsPointEls` is cleared and rebuilt every time a different unit is selected in ops view — no stale references. +- The lazy `import("./ops.js")` in `logs.js` for `UnitRuntimeChanged` avoids a circular dependency (`ops.js` → `logs.js` → `ops.js`). Alternatively, expose a `document.dispatchEvent(new Event("unit-runtime-changed"))` and listen in `ops.js`. +- The ops view does **not** reload `state.equipments` separately — it uses the `/api/unit/{id}/detail` response which is self-contained. +- `startLogs()` is idempotent (guards with `if (state.logSource) return`), so double-calling is safe. +- Backend log CSS classes: the existing styles from before the log removal commit should still be in `styles.css`. If they were removed, add them back per Task 6 Step 1. diff --git a/src/control/engine.rs b/src/control/engine.rs index 6be50d2..4a77193 100644 --- a/src/control/engine.rs +++ b/src/control/engine.rs @@ -1,7 +1,8 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use chrono::Utc; +use tokio::sync::Notify; +use tokio::time::Duration; use uuid::Uuid; use crate::{ @@ -16,378 +17,378 @@ use crate::{ AppState, }; +/// Start the engine: a supervisor spawns one async task per enabled unit. pub fn start(state: AppState, runtime_store: Arc) { tokio::spawn(async move { - let mut ticker = tokio::time::interval(std::time::Duration::from_millis(500)); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - ticker.tick().await; - tick_all_units(&state, &runtime_store).await; - } + supervise(state, runtime_store).await; }); } -async fn tick_all_units(state: &AppState, store: &ControlRuntimeStore) { - let units = match crate::service::get_all_enabled_units(&state.pool).await { - Ok(u) => u, - Err(e) => { - tracing::error!("Engine: failed to load units: {}", e); - return; +/// Supervisor: scans for enabled units every 10 s and ensures each has a running task. +async fn supervise(state: AppState, store: Arc) { + let mut spawned: HashSet = HashSet::new(); + let mut interval = tokio::time::interval(Duration::from_secs(10)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + match crate::service::get_all_enabled_units(&state.pool).await { + Ok(units) => { + for unit in units { + if spawned.insert(unit.id) { + let s = state.clone(); + let st = store.clone(); + tokio::spawn(async move { unit_task(s, st, unit.id).await; }); + } + } + } + Err(e) => tracing::error!("Engine supervisor: failed to load units: {}", e), } - }; - for unit in units { - tick_unit(state, store, &unit).await; } } -async fn tick_unit( +// ── Per-unit task ───────────────────────────────────────────────────────────── + +async fn unit_task(state: AppState, store: Arc, unit_id: Uuid) { + let notify = store.get_or_create_notify(unit_id).await; + + // Load equipment maps once at task start. + // If equipment config changes, the supervisor's next scan will restart the task. + let (kind_roles, kind_eq_ids, all_roles) = loop { + match load_equipment_maps(&state, unit_id).await { + Ok(maps) => break maps, + Err(e) => { + tracing::error!("Engine: unit {} equipment load failed: {}", unit_id, e); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + }; + + // Fault/comm check ticker — still need periodic polling of point monitor data. + let mut fault_tick = tokio::time::interval(Duration::from_millis(500)); + fault_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + // Reload unit config on each iteration to detect disable/delete. + let unit = match crate::service::get_unit_by_id(&state.pool, unit_id).await { + Ok(Some(u)) if u.enabled => u, + Ok(_) => { + tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); + return; + } + Err(e) => { + tracing::error!("Engine: unit {} config reload failed: {}", unit_id, e); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + // ── Fault / comm check ──────────────────────────────────────────────── + let mut runtime = store.get_or_init(unit_id).await; + if check_fault_comm(&state, &mut runtime, &unit, &all_roles).await { + store.upsert(runtime.clone()).await; + push_ws(&state, &runtime).await; + } + + // ── Wait when not active ────────────────────────────────────────────── + if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked { + tokio::select! { + _ = fault_tick.tick() => {} + _ = notify.notified() => {} + } + continue; + } + + // ── State machine step ──────────────────────────────────────────────── + match runtime.state { + UnitRuntimeState::Stopped => { + // Wait stop_time_sec (0 = skip wait, start immediately). + if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { + continue; + } + // Send feeder start command. + let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "start_cmd", &monitor)); + drop(monitor); + if let Some((pid, vt)) = cmd { + if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + tracing::warn!("Engine: start feeder failed for unit {}: {}", unit_id, e); + continue; + } + if state.config.simulate_plc { + if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { + crate::control::command::simulate_run_feedback(&state, eq_id, true).await; + } + } + } + let mut runtime = store.get_or_init(unit_id).await; + runtime.state = UnitRuntimeState::Running; + store.upsert(runtime.clone()).await; + push_ws(&state, &runtime).await; + } + + UnitRuntimeState::Running => { + // Wait run_time_sec. run_time_sec == 0 means run without a time limit + // (relies on acc_time_sec to eventually stop). Treat as a very long phase. + let secs = if unit.run_time_sec > 0 { unit.run_time_sec } else { i32::MAX }; + let unit_for_wait = crate::model::ControlUnit { run_time_sec: secs, ..unit.clone() }; + if !wait_phase(&state, &store, &unit_for_wait, &all_roles, ¬ify, &mut fault_tick).await { + continue; + } + // Stop feeder. + let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let cmd = kind_roles.get("coal_feeder").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); + drop(monitor); + if let Some((pid, vt)) = cmd { + if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + tracing::warn!("Engine: stop feeder failed for unit {}: {}", unit_id, e); + continue; + } + if state.config.simulate_plc { + if let Some(eq_id) = kind_eq_ids.get("coal_feeder").copied() { + crate::control::command::simulate_run_feedback(&state, eq_id, false).await; + } + } + } + let mut runtime = store.get_or_init(unit_id).await; + runtime.accumulated_run_sec += secs as i64 * 1000; + runtime.display_acc_sec = runtime.accumulated_run_sec; + + if unit.acc_time_sec > 0 && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 { + // Accumulated threshold reached — start distributor. + let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let dist_cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "start_cmd", &monitor)); + drop(monitor); + if let Some((pid, vt)) = dist_cmd { + if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + tracing::warn!("Engine: start distributor failed for unit {}: {}", unit_id, e); + } else if state.config.simulate_plc { + if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { + crate::control::command::simulate_run_feedback(&state, eq_id, true).await; + } + } + } + runtime.state = UnitRuntimeState::DistributorRunning; + } else { + runtime.state = UnitRuntimeState::Stopped; + } + store.upsert(runtime.clone()).await; + push_ws(&state, &runtime).await; + } + + UnitRuntimeState::DistributorRunning => { + // Wait bl_time_sec then stop distributor. + if !wait_phase(&state, &store, &unit, &all_roles, ¬ify, &mut fault_tick).await { + continue; + } + let monitor = state.connection_manager.get_point_monitor_data_read_guard().await; + let cmd = kind_roles.get("distributor").and_then(|r| find_cmd(r, "stop_cmd", &monitor)); + drop(monitor); + if let Some((pid, vt)) = cmd { + if let Err(e) = send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await { + tracing::warn!("Engine: stop distributor failed for unit {}: {}", unit_id, e); + continue; + } + if state.config.simulate_plc { + if let Some(eq_id) = kind_eq_ids.get("distributor").copied() { + crate::control::command::simulate_run_feedback(&state, eq_id, false).await; + } + } + } + let mut runtime = store.get_or_init(unit_id).await; + runtime.accumulated_run_sec = 0; + runtime.display_acc_sec = 0; + runtime.state = UnitRuntimeState::Stopped; + store.upsert(runtime.clone()).await; + push_ws(&state, &runtime).await; + } + + UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => { + tokio::select! { + _ = fault_tick.tick() => {} + _ = notify.notified() => {} + } + } + } + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +/// Sleep for the duration appropriate to the *current* state, interrupting every +/// 500 ms to re-check fault/comm. Returns `true` when the full time elapsed, +/// `false` if the phase was interrupted (auto disabled, fault, or comm lock). +async fn wait_phase( state: &AppState, store: &ControlRuntimeStore, unit: &crate::model::ControlUnit, -) { - let mut runtime = store.get_or_init(unit.id).await; - - // ── Load equipment role-point maps by kind ─────────────── - let equipment_list = match crate::service::get_equipment_by_unit_id(&state.pool, unit.id).await { - Ok(e) => e, - Err(e) => { - tracing::error!( - "Engine: equipment load failed for unit {}: {}", - unit.id, - e - ); - return; - } + all_roles: &[(Uuid, HashMap)], + notify: &Arc, + fault_tick: &mut tokio::time::Interval, +) -> bool { + let secs = match store.get_or_init(unit.id).await.state { + UnitRuntimeState::Stopped => unit.stop_time_sec, + UnitRuntimeState::Running => unit.run_time_sec, + UnitRuntimeState::DistributorRunning => unit.bl_time_sec, + _ => return false, }; - - // kind -> role -> EquipmentRolePoint (first equipment per kind wins) - let mut kind_roles: HashMap> = HashMap::new(); - // kind -> equipment id (first equipment per kind) - let mut kind_eq_ids: HashMap = HashMap::new(); - // all role maps for fault/comm scanning across all equipment - let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); - - for equip in &equipment_list { - match crate::service::get_equipment_role_points(&state.pool, equip.id).await { - Ok(role_points) => { - let role_map: HashMap = role_points - .into_iter() - .map(|rp| (rp.signal_role.clone(), rp)) - .collect(); - - if let Some(kind) = &equip.kind { - if kind_roles.contains_key(kind.as_str()) { - tracing::warn!( - "Engine: unit {} has multiple {} equipment; using first", - unit.id, - kind - ); - } else { - kind_roles.insert(kind.clone(), role_map.clone()); - kind_eq_ids.insert(kind.clone(), equip.id); - } - } - all_roles.push((equip.id, role_map)); - } - Err(e) => { - tracing::warn!( - "Engine: role points load failed for equipment {}: {}", - equip.id, - e - ); - } + if secs <= 0 { + return true; + } + let deadline = tokio::time::Instant::now() + Duration::from_secs(secs as u64); + loop { + let completed = tokio::select! { + _ = tokio::time::sleep_until(deadline) => true, + _ = fault_tick.tick() => false, + _ = notify.notified() => false, + }; + if completed { + return true; + } + // Re-check fault/comm mid-phase. + let mut runtime = store.get_or_init(unit.id).await; + if check_fault_comm(state, &mut runtime, unit, all_roles).await { + store.upsert(runtime.clone()).await; + push_ws(state, &runtime).await; + } + if !runtime.auto_enabled || runtime.fault_locked || runtime.comm_locked { + return false; } } +} - let monitor_guard = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - - // ── Communication check ────────────────────────────────── - let any_bad_quality = all_roles.iter().flat_map(|(_, r)| r.values()).any(|rp| { - monitor_guard - .get(&rp.point_id) - .map(|m| m.quality != PointQuality::Good) - .unwrap_or(false) - }); - - let prev_comm = runtime.comm_locked; - runtime.comm_locked = any_bad_quality; - if !prev_comm && runtime.comm_locked { - let _ = state - .event_manager - .send(AppEvent::CommLocked { unit_id: unit.id }); - } else if prev_comm && !runtime.comm_locked { - let _ = state - .event_manager - .send(AppEvent::CommRecovered { unit_id: unit.id }); - } - - // ── Fault check ────────────────────────────────────────── - let any_flt = all_roles.iter().any(|(_, roles)| { - roles - .get("flt") - .and_then(|rp| monitor_guard.get(&rp.point_id)) - .map(|m| monitor_value_as_bool(m)) - .unwrap_or(false) - }); - - let prev_flt = runtime.flt_active; - runtime.flt_active = any_flt; - - if any_flt && !runtime.fault_locked { - // Find which equipment triggered the fault - let flt_eq_id = all_roles - .iter() - .find(|(_, roles)| { - roles - .get("flt") - .and_then(|rp| monitor_guard.get(&rp.point_id)) - .map(|m| monitor_value_as_bool(m)) - .unwrap_or(false) - }) - .map(|(eq_id, _)| *eq_id) - .unwrap_or(Uuid::nil()); - - runtime.fault_locked = true; - let _ = state.event_manager.send(AppEvent::FaultLocked { - unit_id: unit.id, - equipment_id: flt_eq_id, - }); - if runtime.auto_enabled { - runtime.auto_enabled = false; - let _ = state - .event_manager - .send(AppEvent::AutoControlStopped { unit_id: unit.id }); - } - } - - // FLT just cleared → require manual ack if unit is configured that way - if prev_flt && !any_flt && runtime.fault_locked { - if unit.require_manual_ack_after_fault { - runtime.manual_ack_required = true; - } else { - // Auto-clear fault lock - runtime.fault_locked = false; - } - } - - drop(monitor_guard); - - // ── State machine tick ─────────────────────────────────── - if runtime.auto_enabled && !runtime.fault_locked && !runtime.comm_locked { - let now = Utc::now(); - // Accumulate in milliseconds to avoid sub-second truncation - let delta_ms = runtime - .last_tick_at - .map(|t| (now - t).num_milliseconds().max(0)) - .unwrap_or(0); - - let prev_state = runtime.state.clone(); - tick_state_machine(state, &mut runtime, unit, &kind_roles, &kind_eq_ids, delta_ms).await; - if runtime.state != prev_state { - let _ = state.event_manager.send(AppEvent::UnitStateChanged { - unit_id: unit.id, - from_state: format!("{:?}", prev_state), - to_state: format!("{:?}", runtime.state), - }); - } - } - - runtime.last_tick_at = Some(Utc::now()); - - store.upsert(runtime.clone()).await; +async fn push_ws(state: &AppState, runtime: &UnitRuntime) { if let Err(e) = state .ws_manager - .send_to_public(WsMessage::UnitRuntimeChanged(runtime)) + .send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) .await { tracing::debug!("Engine: WS push skipped (no subscribers): {}", e); } } -/// Drive one state-machine tick for a unit. -/// All elapsed counters accumulate in **milliseconds**; comparisons use `*_time_sec * 1000`. -async fn tick_state_machine( +/// Check fault and comm status, mutate runtime, fire events. +/// Returns `true` if any field changed. +async fn check_fault_comm( state: &AppState, runtime: &mut UnitRuntime, unit: &crate::model::ControlUnit, - kind_roles: &HashMap>, - kind_eq_ids: &HashMap, - delta_ms: i64, -) { - let feeder_roles = kind_roles.get("coal_feeder"); - let dist_roles = kind_roles.get("distributor"); - let feeder_eq_id = kind_eq_ids.get("coal_feeder").copied(); - let dist_eq_id = kind_eq_ids.get("distributor").copied(); + all_roles: &[(Uuid, HashMap)], +) -> bool { + let monitor = state + .connection_manager + .get_point_monitor_data_read_guard() + .await; - match runtime.state { - UnitRuntimeState::Stopped => { - // stop_time_sec == 0 means start immediately (no wait) - if unit.stop_time_sec > 0 { - runtime.current_stop_elapsed_sec += delta_ms; // field holds ms - if runtime.current_stop_elapsed_sec < unit.stop_time_sec as i64 * 1000 { - return; - } - } - let monitor = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - if let Some((pid, vt)) = - feeder_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor)) - { - drop(monitor); - if let Err(e) = - send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await - { - tracing::warn!("Engine: auto start coal_feeder failed: {}", e); - return; - } - if state.config.simulate_plc { - if let Some(eq_id) = feeder_eq_id { - crate::control::command::simulate_run_feedback(state, eq_id, true).await; - } - } - runtime.state = UnitRuntimeState::Running; - runtime.current_stop_elapsed_sec = 0; - runtime.current_run_elapsed_sec = 0; - } - } + let any_bad = all_roles.iter().flat_map(|(_, r)| r.values()).any(|rp| { + monitor + .get(&rp.point_id) + .map(|m| m.quality != PointQuality::Good) + .unwrap_or(false) + }); - UnitRuntimeState::Running => { - runtime.current_run_elapsed_sec += delta_ms; - runtime.accumulated_run_sec += delta_ms; + let any_flt = all_roles.iter().any(|(_, roles)| { + roles + .get("flt") + .and_then(|rp| monitor.get(&rp.point_id)) + .map(|m| monitor_value_as_bool(m)) + .unwrap_or(false) + }); - // Check RunTime first — stop feeder before considering distributor trigger - if unit.run_time_sec > 0 - && runtime.current_run_elapsed_sec >= unit.run_time_sec as i64 * 1000 - { - let monitor = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - if let Some((pid, vt)) = - feeder_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor)) - { - drop(monitor); - if let Err(e) = - send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await - { - tracing::warn!("Engine: auto stop coal_feeder failed: {}", e); - return; - } - if state.config.simulate_plc { - if let Some(eq_id) = feeder_eq_id { - crate::control::command::simulate_run_feedback(state, eq_id, false) - .await; - } - } - runtime.state = UnitRuntimeState::Stopped; - runtime.current_run_elapsed_sec = 0; - runtime.current_stop_elapsed_sec = 0; - } - return; - } + let flt_eq_id = if any_flt && !runtime.fault_locked { + all_roles + .iter() + .find(|(_, roles)| { + roles + .get("flt") + .and_then(|rp| monitor.get(&rp.point_id)) + .map(|m| monitor_value_as_bool(m)) + .unwrap_or(false) + }) + .map(|(eq_id, _)| *eq_id) + } else { + None + }; - // Check AccTime — stop feeder then trigger distributor - if unit.acc_time_sec > 0 - && runtime.accumulated_run_sec >= unit.acc_time_sec as i64 * 1000 - { - let monitor = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - if let Some((pid, vt)) = - feeder_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor)) - { - drop(monitor); - if let Err(e) = - send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await - { - tracing::warn!("Engine: stop coal_feeder before distributor failed: {}", e); - return; - } - if state.config.simulate_plc { - if let Some(eq_id) = feeder_eq_id { - crate::control::command::simulate_run_feedback(state, eq_id, false) - .await; - } - } - } - runtime.state = UnitRuntimeState::DistributorRunning; - runtime.distributor_run_elapsed_sec = 0; - } - } + drop(monitor); - UnitRuntimeState::DistributorRunning => { - // First tick in this state (distributor_run_elapsed_sec == 0): send start pulse then return. - // Time advance happens on subsequent ticks. - if runtime.distributor_run_elapsed_sec == 0 { - let monitor = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - if let Some((pid, vt)) = - dist_roles.and_then(|r| find_cmd(r, "start_cmd", &monitor)) - { - drop(monitor); - if let Err(e) = - send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await - { - tracing::warn!("Engine: auto start distributor failed: {}", e); - } else if state.config.simulate_plc { - if let Some(eq_id) = dist_eq_id { - crate::control::command::simulate_run_feedback(state, eq_id, true) - .await; - } - } - } - // Mark as "started" by advancing to 1ms so this branch won't re-fire - runtime.distributor_run_elapsed_sec = 1; - return; - } + let prev_comm = runtime.comm_locked; + let prev_flt = runtime.flt_active; + let prev_fault_locked = runtime.fault_locked; + let prev_auto = runtime.auto_enabled; + let prev_ack = runtime.manual_ack_required; - runtime.distributor_run_elapsed_sec += delta_ms; + runtime.comm_locked = any_bad; + runtime.flt_active = any_flt; - if unit.bl_time_sec > 0 - && runtime.distributor_run_elapsed_sec >= unit.bl_time_sec as i64 * 1000 - { - let monitor = state - .connection_manager - .get_point_monitor_data_read_guard() - .await; - if let Some((pid, vt)) = - dist_roles.and_then(|r| find_cmd(r, "stop_cmd", &monitor)) - { - drop(monitor); - if let Err(e) = - send_pulse_command(&state.connection_manager, pid, vt.as_ref(), 300).await - { - tracing::warn!("Engine: auto stop distributor failed: {}", e); - return; - } - if state.config.simulate_plc { - if let Some(eq_id) = dist_eq_id { - crate::control::command::simulate_run_feedback(state, eq_id, false) - .await; - } - } - } - runtime.accumulated_run_sec = 0; - runtime.distributor_run_elapsed_sec = 0; - runtime.state = UnitRuntimeState::Stopped; - runtime.current_stop_elapsed_sec = 0; - } - } - - UnitRuntimeState::FaultLocked | UnitRuntimeState::CommLocked => {} + if !prev_comm && runtime.comm_locked { + let _ = state.event_manager.send(AppEvent::CommLocked { unit_id: unit.id }); + } else if prev_comm && !runtime.comm_locked { + let _ = state.event_manager.send(AppEvent::CommRecovered { unit_id: unit.id }); } + + if let Some(eq_id) = flt_eq_id { + runtime.fault_locked = true; + let _ = state.event_manager.send(AppEvent::FaultLocked { unit_id: unit.id, equipment_id: eq_id }); + if runtime.auto_enabled { + runtime.auto_enabled = false; + let _ = state.event_manager.send(AppEvent::AutoControlStopped { unit_id: unit.id }); + } + } + + if prev_flt && !any_flt && runtime.fault_locked { + if unit.require_manual_ack_after_fault { + runtime.manual_ack_required = true; + } else { + runtime.fault_locked = false; + } + } + + runtime.comm_locked != prev_comm + || runtime.flt_active != prev_flt + || runtime.fault_locked != prev_fault_locked + || runtime.auto_enabled != prev_auto + || runtime.manual_ack_required != prev_ack } -/// Find a command point by role in a single equipment's role map. -/// Returns `None` if REM==0 or FLT==1 or quality is bad. +type EquipMaps = ( + HashMap>, + HashMap, + Vec<(Uuid, HashMap)>, +); + +async fn load_equipment_maps(state: &AppState, unit_id: Uuid) -> Result { + let equipment_list = crate::service::get_equipment_by_unit_id(&state.pool, unit_id).await?; + + let mut kind_roles: HashMap> = HashMap::new(); + let mut kind_eq_ids: HashMap = HashMap::new(); + let mut all_roles: Vec<(Uuid, HashMap)> = Vec::new(); + + for equip in &equipment_list { + let role_points = crate::service::get_equipment_role_points(&state.pool, equip.id).await?; + let role_map: HashMap = role_points + .into_iter() + .map(|rp| (rp.signal_role.clone(), rp)) + .collect(); + + if let Some(kind) = &equip.kind { + if !kind_roles.contains_key(kind.as_str()) { + kind_roles.insert(kind.clone(), role_map.clone()); + kind_eq_ids.insert(kind.clone(), equip.id); + } else { + tracing::warn!( + "Engine: unit {} has multiple {} equipment; using first", + unit_id, kind + ); + } + } + all_roles.push((equip.id, role_map)); + } + + Ok((kind_roles, kind_eq_ids, all_roles)) +} + +/// Find a command point by role. Returns `None` if REM==0, FLT==1, or quality is bad. fn find_cmd( roles: &HashMap, role: &str, diff --git a/src/control/runtime.rs b/src/control/runtime.rs index 12e9e46..959b9b5 100644 --- a/src/control/runtime.rs +++ b/src/control/runtime.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use chrono::{DateTime, Utc}; -use tokio::sync::RwLock; +use tokio::sync::{Notify, RwLock}; use uuid::Uuid; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -20,14 +19,12 @@ pub struct UnitRuntime { pub state: UnitRuntimeState, pub auto_enabled: bool, pub accumulated_run_sec: i64, - pub current_run_elapsed_sec: i64, - pub current_stop_elapsed_sec: i64, - pub distributor_run_elapsed_sec: i64, + /// Snapshot updated only on state transitions; used for display to avoid mid-tick jitter. + pub display_acc_sec: i64, pub fault_locked: bool, pub flt_active: bool, pub comm_locked: bool, pub manual_ack_required: bool, - pub last_tick_at: Option>, } impl UnitRuntime { @@ -37,14 +34,11 @@ impl UnitRuntime { state: UnitRuntimeState::Stopped, auto_enabled: false, accumulated_run_sec: 0, - current_run_elapsed_sec: 0, - current_stop_elapsed_sec: 0, - distributor_run_elapsed_sec: 0, + display_acc_sec: 0, fault_locked: false, flt_active: false, comm_locked: false, manual_ack_required: false, - last_tick_at: None, } } } @@ -52,6 +46,7 @@ impl UnitRuntime { #[derive(Clone, Default)] pub struct ControlRuntimeStore { inner: Arc>>, + notifiers: Arc>>>, } impl ControlRuntimeStore { @@ -76,4 +71,22 @@ impl ControlRuntimeStore { pub async fn upsert(&self, runtime: UnitRuntime) { self.inner.write().await.insert(runtime.unit_id, runtime); } + + pub async fn get_or_create_notify(&self, unit_id: Uuid) -> Arc { + let read = self.notifiers.read().await; + if let Some(n) = read.get(&unit_id) { + return n.clone(); + } + drop(read); + let n = Arc::new(Notify::new()); + self.notifiers.write().await.insert(unit_id, n.clone()); + n + } + + /// Wake the engine task for a unit (e.g., when auto_enabled or fault_locked changes). + pub async fn notify_unit(&self, unit_id: Uuid) { + if let Some(n) = self.notifiers.read().await.get(&unit_id) { + n.notify_one(); + } + } } diff --git a/src/event.rs b/src/event.rs index f3bbb91..c155761 100644 --- a/src/event.rs +++ b/src/event.rs @@ -374,12 +374,7 @@ async fn persist_event_if_needed( format!("Unit {} communication recovered", unit_id), serde_json::json!({ "unit_id": unit_id }), )), - AppEvent::UnitStateChanged { unit_id, from_state, to_state } => Some(( - "unit.state_changed", "info", - Some(*unit_id), None, None, - format!("Unit {} state: {} → {}", unit_id, from_state, to_state), - serde_json::json!({ "unit_id": unit_id, "from": from_state, "to": to_state }), - )), + AppEvent::UnitStateChanged { .. } => None, AppEvent::PointNewValue(_) => None, }; diff --git a/src/handler/control.rs b/src/handler/control.rs index 1da822c..0bac201 100644 --- a/src/handler/control.rs +++ b/src/handler/control.rs @@ -385,8 +385,8 @@ pub async fn start_auto_unit( let mut runtime = state.control_runtime.get_or_init(unit_id).await; runtime.auto_enabled = true; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; - runtime.current_stop_elapsed_sec = 0; state.control_runtime.upsert(runtime).await; + state.control_runtime.notify_unit(unit_id).await; let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStarted { unit_id }); @@ -404,6 +404,7 @@ pub async fn stop_auto_unit( let mut runtime = state.control_runtime.get_or_init(unit_id).await; runtime.auto_enabled = false; state.control_runtime.upsert(runtime).await; + state.control_runtime.notify_unit(unit_id).await; let _ = state.event_manager.send(crate::event::AppEvent::AutoControlStopped { unit_id }); @@ -429,8 +430,8 @@ pub async fn batch_start_auto( } runtime.auto_enabled = true; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; - runtime.current_stop_elapsed_sec = 0; state.control_runtime.upsert(runtime).await; + state.control_runtime.notify_unit(unit.id).await; let _ = state .event_manager .send(crate::event::AppEvent::AutoControlStarted { unit_id: unit.id }); @@ -453,6 +454,7 @@ pub async fn batch_stop_auto( } runtime.auto_enabled = false; state.control_runtime.upsert(runtime).await; + state.control_runtime.notify_unit(unit.id).await; let _ = state .event_manager .send(crate::event::AppEvent::AutoControlStopped { unit_id: unit.id }); @@ -489,6 +491,7 @@ pub async fn ack_fault_unit( runtime.manual_ack_required = false; runtime.state = crate::control::runtime::UnitRuntimeState::Stopped; state.control_runtime.upsert(runtime).await; + state.control_runtime.notify_unit(unit_id).await; let _ = state.event_manager.send(crate::event::AppEvent::FaultAcked { unit_id }); diff --git a/web/js/ops.js b/web/js/ops.js index 410c1ac..2627ee9 100644 --- a/web/js/ops.js +++ b/web/js/ops.js @@ -42,7 +42,7 @@ export function renderOpsUnits() {
${runtimeBadge(runtime)} ${unit.enabled ? "EN" : "DIS"} - ${runtime ? `Acc ${Math.floor(runtime.accumulated_run_sec / 1000)}s` : ""} + ${runtime ? `Acc ${Math.floor(runtime.display_acc_sec / 1000)}s` : ""}
`; diff --git a/web/js/units.js b/web/js/units.js index ef6f686..c654ddc 100644 --- a/web/js/units.js +++ b/web/js/units.js @@ -117,7 +117,7 @@ export function renderUnits() { ${unit.enabled ? "EN" : "DIS"}
${unit.name}
-
设备 ${equipmentCount(unit.id)} 台 | Acc ${runtime ? Math.floor(runtime.accumulated_run_sec / 1000) : 0}s
+
设备 ${equipmentCount(unit.id)} 台 | Acc ${runtime ? Math.floor(runtime.display_acc_sec / 1000) : 0}s
Run ${unit.run_time_sec}s / Stop ${unit.stop_time_sec}s / Acc ${unit.acc_time_sec}s / BL ${unit.bl_time_sec}s
`; diff --git a/web/styles.css b/web/styles.css index 6848e1d..a4e749e 100644 --- a/web/styles.css +++ b/web/styles.css @@ -895,6 +895,7 @@ button.danger:hover { background: var(--danger-hover); } border-bottom: 1px solid var(--border); white-space: nowrap; overflow: hidden; + flex-shrink: 0; } .event-card:hover {