From 4e3d325437eaeb4b4fabc44f866a74a60e296745 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 24 Mar 2026 10:20:23 +0800 Subject: [PATCH] feat(control): add unit and event foundation --- docs/投煤器布料机功能实现方案.md | 561 ++++++++++++++++++ .../20260324090000_add_unit_and_event.sql | 38 ++ src/event.rs | 76 +++ src/handler.rs | 1 + src/handler/control.rs | 248 ++++++++ src/handler/equipment.rs | 60 +- src/main.rs | 18 + src/model.rs | 33 ++ src/service.rs | 2 + src/service/control.rs | 303 ++++++++++ src/service/equipment.rs | 36 +- src/util/pagination.rs | 2 +- 12 files changed, 1374 insertions(+), 4 deletions(-) create mode 100644 docs/投煤器布料机功能实现方案.md create mode 100644 migrations/20260324090000_add_unit_and_event.sql create mode 100644 src/handler/control.rs create mode 100644 src/service/control.rs diff --git a/docs/投煤器布料机功能实现方案.md b/docs/投煤器布料机功能实现方案.md new file mode 100644 index 0000000..1d38719 --- /dev/null +++ b/docs/投煤器布料机功能实现方案.md @@ -0,0 +1,561 @@ +# 投煤器布料机远程监控与控制功能实现方案 + +## 1. 目标 + +基于当前 `plc_control` 项目,扩展出面向投煤器与布料机的业务化远程监控与控制能力,满足以下目标: + +- 实时监控投煤器、布料机运行状态 +- 支持远程手动启停 +- 支持投煤器自动定时运行 +- 支持投煤累计触发布料机自动运行 +- 支持故障锁定、人工复位、通讯异常冻结 +- 支持通过配置适配不同现场,不改代码完成项目复用 + +## 2. 现有系统能力盘点 + +当前项目已经具备较好的通用工业采集平台基础: + +- OPC UA 数据源接入与自动重连 +- 节点浏览、批量建点、点位实时订阅 +- 点位批量写入能力 +- 设备 `equipment` 模型 +- 点位到设备绑定 `equipment_id` +- 点位信号角色字段 `signal_role` +- WebSocket 实时推送 +- 前端设备、点位、日志、趋势图基础界面 +- 页面配置 `page` 能力 +- 进程内事件总线 `event.rs` + +现状更接近“通用点位监控平台”,还不是“投煤器/布料机业务控制系统”。 + +## 3. 与需求的差距 + +需求文档要求的软件能力,当前尚未落地的核心部分如下: + +- 缺少“控制单元”概念,无法表达一组投煤器对应一组布料机 +- 缺少业务配置模型,无法配置 `RunTime`、`StopTime`、`AccTime`、`BLTime` +- 缺少设备业务类型约束,尚未明确区分投煤器、布料机 +- 缺少设备归属单元字段,尚未形成 `unit -> equipment -> point` 的业务链路 +- 缺少业务信号角色规范,尚未标准化 `REM/RUN/FLT/STA/STP` +- 缺少自动控制状态机 +- 缺少故障锁定与人工确认恢复流程 +- 缺少通讯异常冻结与恢复后重同步机制 +- 缺少脉冲写入封装,当前只有通用批量写点 +- 缺少单元总览、设备详情、控制面板、报警面板等业务界面 +- 缺少统一事件持久化与后续报警模型 + +## 4. 推荐实现思路 + +推荐在现有平台上“增量扩展”,而不是重写: + +- 保留现有 `source/node/point/equipment` 通用底座 +- 新增面向业务控制的配置表和运行态管理 +- 将控制逻辑放在 Rust 服务端,复用当前 OPC UA 连接和写点能力 +- 前端增加业务页面,不破坏现有通用点位管理页面 +- 基于现有 `event.rs` 扩展事件体系,而不是再造一套事件机制 + +这样做的好处是: + +- 现有 OPC UA、点位、设备、WebSocket 基础都能继续复用 +- 后续不同现场只需要换设备映射和参数 +- 手动调试仍然可以通过现有点位/设备页面完成 +- 未来做报警、审计、统计时可以直接复用统一事件体系 + +## 5. 命名与模型设计 + +### 5.1 命名原则 + +建议区分“代码命名”和“表命名”: + +- 代码模型名保留语义完整性,如 `ControlUnit` +- 数据库表名尽量简洁,如 `unit` + +不建议使用 `group` 作为表名,原因是语义过泛,后续容易与权限分组、界面分组、标签分组等概念冲突。 + +### 5.2 设备分类 + +继续复用 `equipment` 表中的 `kind` 字段,约定: + +- `coal_feeder`:投煤器 +- `distributor`:布料机 + +### 5.3 点位角色规范 + +继续复用 `point.signal_role`,建议统一枚举值: + +- 状态点:`rem` `run` `flt` `ii` `q` +- 控制点:`start_cmd` `stop_cmd` +- 可选扩展:`estop` `mode_auto` `mode_manual` `reset_cmd` + +这样每台设备都可以通过“设备 + 点位角色”完成映射,而不是在代码里写死点名。 + +### 5.4 新增 `unit` 表 + +建议新增 `unit` 表,对应代码模型 `ControlUnit`,表示一个业务控制单元。 + +建议字段: + +- `id` +- `code` +- `name` +- `description` +- `enabled` +- `run_time_sec` +- `stop_time_sec` +- `acc_time_sec` +- `bl_time_sec` +- `require_manual_ack_after_fault` +- `created_at` +- `updated_at` + +### 5.5 设备直接归属 Unit + +当前业务前提下,一台设备只属于一个控制单元,不会跨单元复用,因此不建议单独建立关系表。 + +更合适的方式是直接在 `equipment` 表新增: + +- `unit_id` + +这样模型会更简单: + +- 一个 `unit` 对多台 `equipment` +- 一台 `equipment` 只属于一个 `unit` +- `equipment.kind` 用于区分 `coal_feeder` 和 `distributor` + +如果后续现场出现“一台设备可挂多个单元”或“单元内设备编排顺序复杂”的需求,再演进成关系表会更合适。第一阶段不建议设计过重。 + +### 5.6 Unit 运行态放内存 + +`unit` 运行态不建议优先落数据库,建议由控制引擎保存在内存中。 + +建议维护的内存运行态字段: + +- `state` +- `accumulated_run_sec` +- `current_run_elapsed_sec` +- `current_stop_elapsed_sec` +- `distributor_run_elapsed_sec` +- `fault_locked` +- `comm_locked` +- `manual_ack_required` +- `last_tick_at` + +状态值建议: + +- `stopped` +- `running` +- `distributor_running` +- `fault_locked` +- `comm_locked` + +原因: + +- 这些字段变化频率高,不适合高频写库 +- 服务重启后直接恢复旧控制态并不安全 +- 更合理的方式是重启后重新读取 `REM/RUN/FLT/Q` 并重建运行态 +- 通讯恢复或服务重启后不自动补发控制命令,更符合工业控制安全原则 + +如后续确实需要“断电恢复上下文”或运行分析,再补充轻量级快照能力即可,但不是第一阶段必须项。 + +### 5.7 统一 `event` 表,预留 `alarm` 表 + +建议不要命名为 `control_event`,而是使用统一的 `event` 表。 + +原因: + +- 当前不仅有控制事件,后续还会有配置事件、通讯事件、数据源事件 +- `event` 更适合作为统一审计与业务时间线 +- 现有 `event.rs` 已经是进程内事件总线,命名保持一致更自然 + +建议 `event` 表记录: + +- 手动启动/停止 +- 自动启动/停止 +- 故障锁定 +- 人工解除故障锁定 +- 通讯异常/恢复 +- 数据源创建、更新、删除 +- 控制参数变更 +- 关键状态切换 + +关键字段建议: + +- `id` +- `event_type` +- `level` +- `unit_id` +- `equipment_id` +- `source_id` +- `message` +- `payload` +- `created_at` + +同时建议未来单独设计 `alarm` 表,而不是把报警状态硬塞进 `event` 表。 + +原因: + +- 报警通常有独立生命周期:触发、确认、恢复、清除 +- 报警需要独立字段,如 `severity`、`active`、`acked`、`acked_by`、`acked_at`、`cleared_at` +- 把报警硬塞到 `event` 中会让通用事件表越来越臃肿 + +因此推荐边界是: + +- `event`:记录“发生了什么” +- `alarm`:记录“需要被告警管理的异常” + +第一阶段可以先只落 `event` 表,`alarm` 表先在方案中预留,不急着实现。 + +## 6. 控制逻辑设计 + +### 6.1 手动控制 + +手动控制前置校验: + +- `REM == 1` +- `FLT == 0` +- 通讯正常 +- 未处于故障锁定 +- 如有急停点,`ESTOP == 0` + +控制命令统一走脉冲写入: + +1. 写入 `1` +2. 延时 `200-500ms` +3. 写回 `0` + +服务端需要封装 `pulse_write(point_id, high_ms)`,前端不能直接拼两次写点。 + +### 6.2 自动控制状态机 + +每个 `unit` 独立维护状态机。 + +#### `STOPPED` + +- 累计停止时间 +- 若 `stop_elapsed >= StopTime`,尝试启动投煤器 +- 成功后切换到 `RUNNING` + +#### `RUNNING` + +- 累计运行时间 +- `accumulated_run_sec += delta` +- 若 `run_elapsed >= RunTime`,尝试停止投煤器,切换回 `STOPPED` +- 若 `accumulated_run_sec >= AccTime`,进入布料机触发流程 + +#### `DISTRIBUTOR_RUNNING` + +- 校验布料机 `REM/FLT/通讯` +- 启动布料机 +- 等待 `RUN == 1` 反馈 +- 计时 `BLTime` +- 停止布料机 +- 累计时间清零 +- 回到 `STOPPED` 或回到自动节拍起点 + +### 6.3 故障机制 + +任意设备检测到 `FLT == 1`: + +- 停止该单元自动控制 +- 标记 `fault_locked = true` +- 禁止再次自动发命令 +- 发送并持久化关键事件 + +当 `FLT` 从 `1 -> 0` 恢复: + +- 不自动解锁 +- 标记 `manual_ack_required = true` +- 等待人工在界面点击“解除故障锁定” + +### 6.4 通讯异常机制 + +当质量位异常或 OPC 连接中断: + +- 标记 `comm_locked = true` +- 冻结全部控制动作 +- 前端按钮灰化 +- 不允许任何自动/手动写入 + +通讯恢复后: + +- 重新读取 `REM/RUN/FLT` +- 重同步运行态 +- 不自动补发控制命令 +- 发送并持久化恢复事件 +- 等待人工操作或下一次自动触发 + +## 7. 事件体系设计 + +### 7.1 继续复用 `src/event.rs` + +建议不要另起一套业务事件中心,而是在现有 [src/event.rs](D:/projects/plc_control/src/event.rs) 上扩展。 + +当前它已经承担两类职责: + +- 控制类内部事件分发 +- 遥测类高频事件分发 + +推荐继续保留这个结构: + +- `AppEvent` 作为统一进程内事件枚举 +- 高频遥测事件继续走内存和 WebSocket +- 低频且有审计价值的事件选择性落库到 `event` 表 + +### 7.2 哪些事件适合落库 + +适合落库的: + +- `SourceCreate` +- `SourceUpdate` +- `SourceDelete` +- 自动控制启动/停止 +- 手动启动/停止命令发送 +- 故障锁定 +- 人工确认恢复 +- 通讯异常/恢复 +- 参数配置变更 +- 单元状态切换 + +不适合直接落库的: + +- `PointNewValue` +- 高频实时遥测 +- 细碎的内部轮询过程 + +### 7.3 推荐扩展方向 + +建议在 `AppEvent` 中逐步增加业务事件,例如: + +- `AutoControlStarted` +- `AutoControlStopped` +- `EquipmentStartCommandSent` +- `EquipmentStopCommandSent` +- `FaultLocked` +- `FaultAcked` +- `CommLocked` +- `CommRecovered` +- `UnitStateChanged` + +这样后续无论是写日志、落库、推送 WebSocket、做报警触发,都可以基于同一个事件入口。 + +## 8. 后端改造方案 + +### 8.1 新增模块 + +建议新增: + +- `src/handler/control.rs` +- `src/service/control.rs` +- `src/control/engine.rs` +- `src/control/runtime.rs` +- `src/control/validator.rs` + +职责划分: + +- `handler`:HTTP 接口 +- `service`:数据库读写 +- `control/engine`:状态机与调度 +- `control/runtime`:内存运行态缓存与同步 +- `control/validator`:控制前置校验 + +### 8.2 新增接口 + +建议新增接口: + +- `GET /api/control/unit` +- `POST /api/control/unit` +- `PUT /api/control/unit/{id}` +- `GET /api/control/unit/{id}` +- `POST /api/control/unit/{id}/start-auto` +- `POST /api/control/unit/{id}/stop-auto` +- `POST /api/control/unit/{id}/ack-fault` +- `POST /api/control/equipment/{id}/start` +- `POST /api/control/equipment/{id}/stop` +- `GET /api/events` + +说明: + +- 设备手动控制必须走业务接口,不建议继续直接暴露给页面做原始点位写入 +- 原 `/api/point/value/batch` 保留给调试或底层工具能力 +- 事件查询接口可以直接面向统一 `event` 表 + +### 8.3 控制引擎运行方式 + +建议服务启动后增加一个后台任务: + +- 每 `500ms` 或 `1s` 扫描所有启用的 `unit` +- 从内存读取运行态缓存 +- 从当前点位监控数据中取 `REM/RUN/FLT/Q` +- 驱动状态机执行 + +控制引擎不要直接查 OPC,应复用当前 `connection_manager` 已维护的实时点值。 + +### 8.4 关键复用点 + +可直接复用当前已有能力: + +- `connection_manager` 的点位实时缓存 +- `get_point_monitor_data_read_guard` +- 批量写点能力 +- WebSocket 实时推送 +- `event.rs` 的统一事件入口 +- `unit_id + equipment.kind + point.signal_role` 的业务映射关系 + +## 9. 前端改造方案 + +建议在现有通用页面之外新增业务页面,避免混杂。 + +### 9.1 新增页面 + +- 单元总览页 +- 单元详情页 +- 设备控制面板 +- 事件记录页 +- 报警页 +- 参数配置页 + +### 9.2 单元总览页内容 + +每个 `unit` 展示: + +- 单元名称 +- 自动/手动状态 +- 当前状态机状态 +- 投煤器运行状态 +- 布料机运行状态 +- 当前累计运行时间 +- 故障锁定状态 +- 通讯状态 + +并提供按钮: + +- 启动自动 +- 停止自动 +- 故障确认/解除锁定 + +### 9.3 设备控制页内容 + +针对单台投煤器/布料机提供: + +- REM/RUN/FLT/Q/II 实时显示 +- 启动按钮 +- 停止按钮 +- 通讯异常、故障锁定提示 +- 最近事件 + +### 9.4 趋势、事件与报警 + +复用已有趋势图能力: + +- 电流 `II` 趋势 +- 运行状态变化曲线 +- 事件时间线 + +后续报警页面基于独立 `alarm` 表实现: + +- 当前活动报警 +- 已确认报警 +- 已恢复报警 +- 报警确认操作 + +## 10. 分阶段实施建议 + +### 第一阶段:最小可用版 + +目标:先让系统具备业务闭环,但不追求复杂页面。 + +内容: + +- 新增 `unit` 表 +- 为 `equipment` 增加 `unit_id` +- 约定设备 `kind` 和点位 `signal_role` +- 新增手动控制接口 +- 实现脉冲写入 +- 实现故障锁定与通讯冻结 +- 实现自动控制状态机 +- 基于 `event.rs` 落统一 `event` 表 +- 前端增加一个“控制单元”面板和事件列表 + +交付后即可验证: + +- 单台投煤器启停 +- 单元级自动启停 +- 累计触发布料机运行 +- 故障恢复后人工确认 +- 关键操作和状态切换可追溯 + +### 第二阶段:增强版 + +内容: + +- 单元总览页 +- 单元详情页 +- 参数在线编辑 +- 更丰富的趋势图 +- WebSocket 业务事件推送 +- 报警规则与 `alarm` 表 +- 报警确认与恢复流程 + +### 第三阶段:现场适配版 + +内容: + +- 导入导出配置 +- 项目模板 +- 配置校验工具 +- 启停联锁自检 +- 操作权限控制 + +## 11. 建议优先落地顺序 + +从当前代码基础出发,建议按下面顺序开发: + +1. 补齐业务数据模型和数据库迁移 +2. 新增 `unit` 表并为 `equipment` 增加 `unit_id` +3. 规范 `equipment.kind` 与 `point.signal_role` +4. 实现服务端脉冲写入能力 +5. 实现手动控制接口 +6. 实现 `unit` 自动控制状态机 +7. 扩展 `event.rs` 并实现统一 `event` 表持久化 +8. 实现故障锁定、通讯冻结、人工确认 +9. 增加前端业务页面 +10. 第二阶段再引入独立 `alarm` 表 + +## 12. 对当前代码的具体落点 + +基于现有代码,建议主要改动点如下: + +- [src/model.rs](D:/projects/plc_control/src/model.rs) + - 增加 `ControlUnit` 模型 + - 为 `Equipment` 增加 `unit_id` + - 后续增加 `EventRecord`、`AlarmRecord` 模型 +- [src/event.rs](D:/projects/plc_control/src/event.rs) + - 扩展 `AppEvent` 业务事件类型 + - 增加低频关键事件的持久化逻辑 +- [src/main.rs](D:/projects/plc_control/src/main.rs) + - 注册控制相关路由 + - 启动控制引擎后台任务 +- [src/handler/point.rs](D:/projects/plc_control/src/handler/point.rs) + - 保留底层写点接口,不直接承载业务控制 +- [src/handler/equipment.rs](D:/projects/plc_control/src/handler/equipment.rs) + - 继续作为设备基础资料管理 +- [web/index.html](D:/projects/plc_control/web/index.html) + - 增加业务控制页面入口或独立面板 +- [web/js/app.js](D:/projects/plc_control/web/js/app.js) + - 增加控制单元页面事件绑定 + +## 13. 本次结论 + +当前项目不需要推倒重来,可以直接演进成投煤器与布料机远程监控控制系统。 + +最合理的路径是: + +- 以现有 OPC UA 与点位平台为底座 +- 数据库使用简洁表名 `unit`、`event` +- 代码层保留语义化命名,如 `ControlUnit`、`AppEvent` +- 在 `equipment` 上直接增加 `unit_id` +- 在服务端以内存运行态实现状态机和脉冲控制 +- 在现有 [src/event.rs](D:/projects/plc_control/src/event.rs) 上扩展统一事件体系 +- 第一阶段先做统一 `event`,第二阶段再拆分独立 `alarm` + +如果进入下一步开发,建议先做“第一阶段最小可用版”。 diff --git a/migrations/20260324090000_add_unit_and_event.sql b/migrations/20260324090000_add_unit_and_event.sql new file mode 100644 index 0000000..b501455 --- /dev/null +++ b/migrations/20260324090000_add_unit_and_event.sql @@ -0,0 +1,38 @@ +CREATE TABLE unit ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + code TEXT NOT NULL, + name TEXT NOT NULL, + description TEXT, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + run_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (run_time_sec >= 0), + stop_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (stop_time_sec >= 0), + acc_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (acc_time_sec >= 0), + bl_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (bl_time_sec >= 0), + require_manual_ack_after_fault BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE (code) +); + +ALTER TABLE equipment + ADD COLUMN unit_id UUID REFERENCES unit(id) ON DELETE SET NULL; + +CREATE INDEX idx_equipment_unit_id ON equipment(unit_id); + +CREATE TABLE event ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_type TEXT NOT NULL, + level TEXT NOT NULL DEFAULT 'info', + unit_id UUID REFERENCES unit(id) ON DELETE SET NULL, + equipment_id UUID REFERENCES equipment(id) ON DELETE SET NULL, + source_id UUID REFERENCES source(id) ON DELETE SET NULL, + message TEXT NOT NULL, + payload JSONB, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX idx_event_created_at ON event(created_at DESC); +CREATE INDEX idx_event_type ON event(event_type); +CREATE INDEX idx_event_unit_id ON event(unit_id); +CREATE INDEX idx_event_equipment_id ON event(equipment_id); +CREATE INDEX idx_event_source_id ON event(source_id); diff --git a/src/event.rs b/src/event.rs index 7811af2..c6b0004 100644 --- a/src/event.rs +++ b/src/event.rs @@ -124,6 +124,8 @@ async fn handle_control_event( pool: &sqlx::PgPool, connection_manager: &std::sync::Arc, ) { + persist_event_if_needed(&event, pool).await; + match event { AppEvent::SourceCreate { source_id } => { tracing::info!("Processing SourceCreate event for {}", source_id); @@ -186,6 +188,80 @@ async fn handle_control_event( } } +async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) { + let record = match event { + AppEvent::SourceCreate { source_id } => Some(( + "source.created", + "info", + None, + None, + Some(*source_id), + format!("Source {} created", source_id), + serde_json::json!({ "source_id": source_id }), + )), + AppEvent::SourceUpdate { source_id } => Some(( + "source.updated", + "info", + None, + None, + Some(*source_id), + format!("Source {} updated", source_id), + serde_json::json!({ "source_id": source_id }), + )), + AppEvent::SourceDelete { source_id } => Some(( + "source.deleted", + "warn", + None, + None, + Some(*source_id), + format!("Source {} deleted", source_id), + serde_json::json!({ "source_id": source_id }), + )), + AppEvent::PointCreateBatch { source_id, point_ids } => Some(( + "point.batch_created", + "info", + None, + None, + Some(*source_id), + format!("{} points created for source {}", point_ids.len(), source_id), + serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + )), + AppEvent::PointDeleteBatch { source_id, point_ids } => Some(( + "point.batch_deleted", + "warn", + None, + None, + Some(*source_id), + format!("{} points deleted for source {}", point_ids.len(), source_id), + serde_json::json!({ "source_id": source_id, "point_ids": point_ids }), + )), + AppEvent::PointNewValue(_) => None, + }; + + let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else { + return; + }; + + if let Err(err) = sqlx::query( + r#" + INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + ) + .bind(event_type) + .bind(level) + .bind(unit_id as Option) + .bind(equipment_id as Option) + .bind(source_id) + .bind(message) + .bind(sqlx::types::Json(payload)) + .execute(pool) + .await + { + tracing::warn!("Failed to persist event: {}", err); + } +} + async fn process_point_new_value( payload: crate::telemetry::PointNewValue, connection_manager: &std::sync::Arc, diff --git a/src/handler.rs b/src/handler.rs index a0382b6..c9befdf 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,3 +1,4 @@ +pub mod control; pub mod doc; pub mod equipment; pub mod log; diff --git a/src/handler/control.rs b/src/handler/control.rs new file mode 100644 index 0000000..2dea52b --- /dev/null +++ b/src/handler/control.rs @@ -0,0 +1,248 @@ +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use serde::Deserialize; +use uuid::Uuid; +use validator::Validate; + +use crate::{ + util::{ + pagination::{PaginatedResponse, PaginationParams}, + response::ApiErr, + }, + AppState, +}; + +#[derive(Debug, Deserialize, Validate)] +pub struct GetUnitListQuery { + #[validate(length(min = 1, max = 100))] + pub keyword: Option, + #[serde(flatten)] + pub pagination: PaginationParams, +} + +pub async fn get_unit_list( + State(state): State, + Query(query): Query, +) -> Result { + query.validate()?; + + let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?; + let data = crate::service::get_units_paginated( + &state.pool, + query.keyword.as_deref(), + query.pagination.page_size, + query.pagination.offset(), + ) + .await?; + + Ok(Json(PaginatedResponse::new( + data, + total, + query.pagination.page, + query.pagination.page_size, + ))) +} + +pub async fn get_unit( + State(state): State, + Path(unit_id): Path, +) -> Result { + match crate::service::get_unit_by_id(&state.pool, unit_id).await? { + Some(unit) => Ok(Json(unit)), + None => Err(ApiErr::NotFound("Unit not found".to_string(), None)), + } +} + +#[derive(Debug, Deserialize, Validate)] +pub struct CreateUnitReq { + #[validate(length(min = 1, max = 100))] + pub code: String, + #[validate(length(min = 1, max = 100))] + pub name: String, + pub description: Option, + pub enabled: Option, + #[validate(range(min = 0))] + pub run_time_sec: Option, + #[validate(range(min = 0))] + pub stop_time_sec: Option, + #[validate(range(min = 0))] + pub acc_time_sec: Option, + #[validate(range(min = 0))] + pub bl_time_sec: Option, + pub require_manual_ack_after_fault: Option, +} + +pub async fn create_unit( + State(state): State, + Json(payload): Json, +) -> Result { + payload.validate()?; + + if crate::service::get_unit_by_code(&state.pool, &payload.code) + .await? + .is_some() + { + return Err(ApiErr::BadRequest( + "Unit code already exists".to_string(), + None, + )); + } + + let unit_id = crate::service::create_unit( + &state.pool, + crate::service::CreateUnitParams { + code: &payload.code, + name: &payload.name, + description: payload.description.as_deref(), + enabled: payload.enabled.unwrap_or(true), + run_time_sec: payload.run_time_sec.unwrap_or(0), + stop_time_sec: payload.stop_time_sec.unwrap_or(0), + acc_time_sec: payload.acc_time_sec.unwrap_or(0), + bl_time_sec: payload.bl_time_sec.unwrap_or(0), + require_manual_ack_after_fault: payload + .require_manual_ack_after_fault + .unwrap_or(true), + }, + ) + .await?; + + Ok(( + StatusCode::CREATED, + Json(serde_json::json!({ + "id": unit_id, + "ok_msg": "Unit created successfully" + })), + )) +} + +#[derive(Debug, Deserialize, Validate)] +pub struct UpdateUnitReq { + #[validate(length(min = 1, max = 100))] + pub code: Option, + #[validate(length(min = 1, max = 100))] + pub name: Option, + pub description: Option, + pub enabled: Option, + #[validate(range(min = 0))] + pub run_time_sec: Option, + #[validate(range(min = 0))] + pub stop_time_sec: Option, + #[validate(range(min = 0))] + pub acc_time_sec: Option, + #[validate(range(min = 0))] + pub bl_time_sec: Option, + pub require_manual_ack_after_fault: Option, +} + +pub async fn update_unit( + State(state): State, + Path(unit_id): Path, + Json(payload): Json, +) -> Result { + payload.validate()?; + + if crate::service::get_unit_by_id(&state.pool, unit_id) + .await? + .is_none() + { + return Err(ApiErr::NotFound("Unit not found".to_string(), None)); + } + + if let Some(code) = payload.code.as_deref() { + let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?; + if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { + return Err(ApiErr::BadRequest( + "Unit code already exists".to_string(), + None, + )); + } + } + + if payload.code.is_none() + && payload.name.is_none() + && payload.description.is_none() + && payload.enabled.is_none() + && payload.run_time_sec.is_none() + && payload.stop_time_sec.is_none() + && payload.acc_time_sec.is_none() + && payload.bl_time_sec.is_none() + && payload.require_manual_ack_after_fault.is_none() + { + return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); + } + + crate::service::update_unit( + &state.pool, + unit_id, + crate::service::UpdateUnitParams { + code: payload.code.as_deref(), + name: payload.name.as_deref(), + description: payload.description.as_deref(), + enabled: payload.enabled, + run_time_sec: payload.run_time_sec, + stop_time_sec: payload.stop_time_sec, + acc_time_sec: payload.acc_time_sec, + bl_time_sec: payload.bl_time_sec, + require_manual_ack_after_fault: payload.require_manual_ack_after_fault, + }, + ) + .await?; + + Ok(Json(serde_json::json!({ + "ok_msg": "Unit updated successfully" + }))) +} + +pub async fn delete_unit( + State(state): State, + Path(unit_id): Path, +) -> Result { + let deleted = crate::service::delete_unit(&state.pool, unit_id).await?; + if !deleted { + return Err(ApiErr::NotFound("Unit not found".to_string(), None)); + } + + Ok(StatusCode::NO_CONTENT) +} + +#[derive(Debug, Deserialize, Validate)] +pub struct GetEventListQuery { + pub unit_id: Option, + #[validate(length(min = 1, max = 100))] + pub event_type: Option, + #[serde(flatten)] + pub pagination: PaginationParams, +} + +pub async fn get_event_list( + State(state): State, + Query(query): Query, +) -> Result { + query.validate()?; + + let total = crate::service::get_events_count( + &state.pool, + query.unit_id, + query.event_type.as_deref(), + ) + .await?; + let data = crate::service::get_events_paginated( + &state.pool, + query.unit_id, + query.event_type.as_deref(), + query.pagination.page_size, + query.pagination.offset(), + ) + .await?; + + Ok(Json(PaginatedResponse::new( + data, + total, + query.pagination.page, + query.pagination.page_size, + ))) +} diff --git a/src/handler/equipment.rs b/src/handler/equipment.rs index 3557076..c657a43 100644 --- a/src/handler/equipment.rs +++ b/src/handler/equipment.rs @@ -79,6 +79,7 @@ pub async fn get_equipment_points( #[derive(Debug, Deserialize, Validate)] pub struct CreateEquipmentReq { + pub unit_id: Option, #[validate(length(min = 1, max = 100))] pub code: String, #[validate(length(min = 1, max = 100))] @@ -89,6 +90,7 @@ pub struct CreateEquipmentReq { #[derive(Debug, Deserialize, Validate)] pub struct UpdateEquipmentReq { + pub unit_id: Option>, #[validate(length(min = 1, max = 100))] pub code: Option, #[validate(length(min = 1, max = 100))] @@ -97,6 +99,12 @@ pub struct UpdateEquipmentReq { pub description: Option, } +#[derive(Debug, Deserialize, Validate)] +pub struct BatchSetEquipmentUnitReq { + pub equipment_ids: Vec, + pub unit_id: Option, +} + pub async fn create_equipment( State(state): State, Json(payload): Json, @@ -111,8 +119,16 @@ pub async fn create_equipment( )); } + if let Some(unit_id) = payload.unit_id { + let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; + if unit_exists.is_none() { + return Err(ApiErr::NotFound("Unit not found".to_string(), None)); + } + } + let equipment_id = crate::service::create_equipment( &state.pool, + payload.unit_id, &payload.code, &payload.name, payload.kind.as_deref(), @@ -136,7 +152,8 @@ pub async fn update_equipment( ) -> Result { payload.validate()?; - if payload.code.is_none() + if payload.unit_id.is_none() + && payload.code.is_none() && payload.name.is_none() && payload.kind.is_none() && payload.description.is_none() @@ -149,6 +166,13 @@ pub async fn update_equipment( return Err(ApiErr::NotFound("Equipment not found".to_string(), None)); } + if let Some(Some(unit_id)) = payload.unit_id { + let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; + if unit_exists.is_none() { + return Err(ApiErr::NotFound("Unit not found".to_string(), None)); + } + } + if let Some(code) = payload.code.as_deref() { let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?; if duplicate @@ -165,6 +189,7 @@ pub async fn update_equipment( crate::service::update_equipment( &state.pool, equipment_id, + payload.unit_id, payload.code.as_deref(), payload.name.as_deref(), payload.kind.as_deref(), @@ -177,6 +202,39 @@ pub async fn update_equipment( }))) } +pub async fn batch_set_equipment_unit( + State(state): State, + Json(payload): Json, +) -> Result { + payload.validate()?; + + if payload.equipment_ids.is_empty() { + return Err(ApiErr::BadRequest( + "equipment_ids cannot be empty".to_string(), + None, + )); + } + + if let Some(unit_id) = payload.unit_id { + let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; + if unit_exists.is_none() { + return Err(ApiErr::NotFound("Unit not found".to_string(), None)); + } + } + + let updated_count = crate::service::batch_set_equipment_unit( + &state.pool, + &payload.equipment_ids, + payload.unit_id, + ) + .await?; + + Ok(Json(serde_json::json!({ + "ok_msg": "Equipment unit updated successfully", + "updated_count": updated_count + }))) +} + pub async fn delete_equipment( State(state): State, Path(equipment_id): Path, diff --git a/src/main.rs b/src/main.rs index 344558a..5aadce9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -182,10 +182,28 @@ fn build_router(state: AppState) -> Router { .put(handler::equipment::update_equipment) .delete(handler::equipment::delete_equipment), ) + .route( + "/api/equipment/batch/set-unit", + put(handler::equipment::batch_set_equipment_unit), + ) .route( "/api/equipment/{equipment_id}/points", get(handler::equipment::get_equipment_points), ) + .route( + "/api/unit", + get(handler::control::get_unit_list).post(handler::control::create_unit), + ) + .route( + "/api/unit/{unit_id}", + get(handler::control::get_unit) + .put(handler::control::update_unit) + .delete(handler::control::delete_unit), + ) + .route( + "/api/event", + get(handler::control::get_event_list), + ) .route( "/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag), diff --git a/src/model.rs b/src/model.rs index 176ff5d..f33fccc 100644 --- a/src/model.rs +++ b/src/model.rs @@ -123,6 +123,7 @@ pub struct Tag { #[derive(Debug, Serialize, Deserialize, FromRow, Clone)] pub struct Equipment { pub id: Uuid, + pub unit_id: Option, pub code: String, pub name: String, pub kind: Option, @@ -133,6 +134,38 @@ pub struct Equipment { pub updated_at: DateTime, } +#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] +pub struct ControlUnit { + pub id: Uuid, + pub code: String, + pub name: String, + pub description: Option, + pub enabled: bool, + pub run_time_sec: i32, + pub stop_time_sec: i32, + pub acc_time_sec: i32, + pub bl_time_sec: i32, + pub require_manual_ack_after_fault: bool, + #[serde(serialize_with = "utc_to_local_str")] + pub created_at: DateTime, + #[serde(serialize_with = "utc_to_local_str")] + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] +pub struct EventRecord { + pub id: Uuid, + pub event_type: String, + pub level: String, + pub unit_id: Option, + pub equipment_id: Option, + pub source_id: Option, + pub message: String, + pub payload: Option>, + #[serde(serialize_with = "utc_to_local_str")] + pub created_at: DateTime, +} + #[derive(Debug, Serialize, Deserialize, FromRow, Clone)] pub struct Page { pub id: Uuid, diff --git a/src/service.rs b/src/service.rs index 33251cd..f19bfc6 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,8 +1,10 @@ +mod control; mod equipment; mod point; mod source; mod tag; +pub use control::*; pub use equipment::*; pub use point::*; pub use source::*; diff --git a/src/service/control.rs b/src/service/control.rs new file mode 100644 index 0000000..498259b --- /dev/null +++ b/src/service/control.rs @@ -0,0 +1,303 @@ +use crate::model::{ControlUnit, EventRecord}; +use sqlx::{PgPool, QueryBuilder}; +use uuid::Uuid; + +pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result { + match keyword { + Some(keyword) => { + let like = format!("%{}%", keyword); + sqlx::query_scalar::<_, i64>( + r#" + SELECT COUNT(*) + FROM unit + WHERE code ILIKE $1 OR name ILIKE $1 + "#, + ) + .bind(like) + .fetch_one(pool) + .await + } + None => sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#) + .fetch_one(pool) + .await, + } +} + +pub async fn get_units_paginated( + pool: &PgPool, + keyword: Option<&str>, + page_size: i32, + offset: u32, +) -> Result, sqlx::Error> { + match keyword { + Some(keyword) => { + let like = format!("%{}%", keyword); + if page_size == -1 { + sqlx::query_as::<_, ControlUnit>( + r#" + SELECT * + FROM unit + WHERE code ILIKE $1 OR name ILIKE $1 + ORDER BY created_at + "#, + ) + .bind(like) + .fetch_all(pool) + .await + } else { + sqlx::query_as::<_, ControlUnit>( + r#" + SELECT * + FROM unit + WHERE code ILIKE $1 OR name ILIKE $1 + ORDER BY created_at + LIMIT $2 OFFSET $3 + "#, + ) + .bind(like) + .bind(page_size as i64) + .bind(offset as i64) + .fetch_all(pool) + .await + } + } + None => { + if page_size == -1 { + sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit ORDER BY created_at"#) + .fetch_all(pool) + .await + } else { + sqlx::query_as::<_, ControlUnit>( + r#" + SELECT * + FROM unit + ORDER BY created_at + LIMIT $1 OFFSET $2 + "#, + ) + .bind(page_size as i64) + .bind(offset as i64) + .fetch_all(pool) + .await + } + } + } +} + +pub async fn get_unit_by_id( + pool: &PgPool, + unit_id: Uuid, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE id = $1"#) + .bind(unit_id) + .fetch_optional(pool) + .await +} + +pub async fn get_unit_by_code( + pool: &PgPool, + code: &str, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE code = $1"#) + .bind(code) + .fetch_optional(pool) + .await +} + +pub struct CreateUnitParams<'a> { + pub code: &'a str, + pub name: &'a str, + pub description: Option<&'a str>, + pub enabled: bool, + pub run_time_sec: i32, + pub stop_time_sec: i32, + pub acc_time_sec: i32, + pub bl_time_sec: i32, + pub require_manual_ack_after_fault: bool, +} + +pub async fn create_unit( + pool: &PgPool, + params: CreateUnitParams<'_>, +) -> Result { + let unit_id = Uuid::new_v4(); + sqlx::query( + r#" + INSERT INTO unit ( + id, code, name, description, enabled, + run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec, + require_manual_ack_after_fault + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + "#, + ) + .bind(unit_id) + .bind(params.code) + .bind(params.name) + .bind(params.description) + .bind(params.enabled) + .bind(params.run_time_sec) + .bind(params.stop_time_sec) + .bind(params.acc_time_sec) + .bind(params.bl_time_sec) + .bind(params.require_manual_ack_after_fault) + .execute(pool) + .await?; + + Ok(unit_id) +} + +pub struct UpdateUnitParams<'a> { + pub code: Option<&'a str>, + pub name: Option<&'a str>, + pub description: Option<&'a str>, + pub enabled: Option, + pub run_time_sec: Option, + pub stop_time_sec: Option, + pub acc_time_sec: Option, + pub bl_time_sec: Option, + pub require_manual_ack_after_fault: Option, +} + +pub async fn update_unit( + pool: &PgPool, + unit_id: Uuid, + params: UpdateUnitParams<'_>, +) -> Result<(), sqlx::Error> { + let mut updates = Vec::new(); + let mut param_count = 1; + + if params.code.is_some() { + updates.push(format!("code = ${}", param_count)); + param_count += 1; + } + if params.name.is_some() { + updates.push(format!("name = ${}", param_count)); + param_count += 1; + } + if params.description.is_some() { + updates.push(format!("description = ${}", param_count)); + param_count += 1; + } + if params.enabled.is_some() { + updates.push(format!("enabled = ${}", param_count)); + param_count += 1; + } + if params.run_time_sec.is_some() { + updates.push(format!("run_time_sec = ${}", param_count)); + param_count += 1; + } + if params.stop_time_sec.is_some() { + updates.push(format!("stop_time_sec = ${}", param_count)); + param_count += 1; + } + if params.acc_time_sec.is_some() { + updates.push(format!("acc_time_sec = ${}", param_count)); + param_count += 1; + } + if params.bl_time_sec.is_some() { + updates.push(format!("bl_time_sec = ${}", param_count)); + param_count += 1; + } + if params.require_manual_ack_after_fault.is_some() { + updates.push(format!( + "require_manual_ack_after_fault = ${}", + param_count + )); + param_count += 1; + } + + updates.push("updated_at = NOW()".to_string()); + + let sql = format!( + r#"UPDATE unit SET {} WHERE id = ${}"#, + updates.join(", "), + param_count + ); + + let mut query = sqlx::query(&sql); + + if let Some(code) = params.code { + query = query.bind(code); + } + if let Some(name) = params.name { + query = query.bind(name); + } + if let Some(description) = params.description { + query = query.bind(description); + } + if let Some(enabled) = params.enabled { + query = query.bind(enabled); + } + if let Some(run_time_sec) = params.run_time_sec { + query = query.bind(run_time_sec); + } + if let Some(stop_time_sec) = params.stop_time_sec { + query = query.bind(stop_time_sec); + } + if let Some(acc_time_sec) = params.acc_time_sec { + query = query.bind(acc_time_sec); + } + if let Some(bl_time_sec) = params.bl_time_sec { + query = query.bind(bl_time_sec); + } + if let Some(require_manual_ack_after_fault) = params.require_manual_ack_after_fault { + query = query.bind(require_manual_ack_after_fault); + } + + query.bind(unit_id).execute(pool).await?; + Ok(()) +} + +pub async fn delete_unit(pool: &PgPool, unit_id: Uuid) -> Result { + let result = sqlx::query(r#"DELETE FROM unit WHERE id = $1"#) + .bind(unit_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +pub async fn get_events_count( + pool: &PgPool, + unit_id: Option, + event_type: Option<&str>, +) -> Result { + let mut qb = + QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1"); + + if let Some(unit_id) = unit_id { + qb.push(" AND unit_id = ").push_bind(unit_id); + } + if let Some(event_type) = event_type { + qb.push(" AND event_type = ").push_bind(event_type); + } + + qb.build_query_scalar().fetch_one(pool).await +} + +pub async fn get_events_paginated( + pool: &PgPool, + unit_id: Option, + event_type: Option<&str>, + page_size: i32, + offset: u32, +) -> Result, sqlx::Error> { + let mut qb = QueryBuilder::new("SELECT * FROM event WHERE 1 = 1"); + + if let Some(unit_id) = unit_id { + qb.push(" AND unit_id = ").push_bind(unit_id); + } + if let Some(event_type) = event_type { + qb.push(" AND event_type = ").push_bind(event_type); + } + + qb.push(" ORDER BY created_at DESC"); + + if page_size != -1 { + qb.push(" LIMIT ").push_bind(page_size as i64); + qb.push(" OFFSET ").push_bind(offset as i64); + } + + qb.build_query_as::().fetch_all(pool).await +} diff --git a/src/service/equipment.rs b/src/service/equipment.rs index 63b7c77..c46d694 100644 --- a/src/service/equipment.rs +++ b/src/service/equipment.rs @@ -3,6 +3,7 @@ use crate::{ model::{Equipment, Point}, }; use sqlx::{query_as, PgPool, Row}; +use uuid::Uuid; pub async fn get_points_by_equipment_id( pool: &PgPool, @@ -129,6 +130,7 @@ pub async fn get_equipment_paginated( .map(|row| EquipmentListItem { equipment: Equipment { id: row.get("id"), + unit_id: row.get("unit_id"), code: row.get("code"), name: row.get("name"), kind: row.get("kind"), @@ -163,6 +165,7 @@ pub async fn get_equipment_by_code( pub async fn create_equipment( pool: &PgPool, + unit_id: Option, code: &str, name: &str, kind: Option<&str>, @@ -171,11 +174,12 @@ pub async fn create_equipment( let equipment_id = uuid::Uuid::new_v4(); sqlx::query( r#" - INSERT INTO equipment (id, code, name, kind, description) - VALUES ($1, $2, $3, $4, $5) + INSERT INTO equipment (id, unit_id, code, name, kind, description) + VALUES ($1, $2, $3, $4, $5, $6) "#, ) .bind(equipment_id) + .bind(unit_id) .bind(code) .bind(name) .bind(kind) @@ -189,6 +193,7 @@ pub async fn create_equipment( pub async fn update_equipment( pool: &PgPool, equipment_id: uuid::Uuid, + unit_id: Option>, code: Option<&str>, name: Option<&str>, kind: Option<&str>, @@ -197,6 +202,10 @@ pub async fn update_equipment( let mut updates = Vec::new(); let mut param_count = 1; + if unit_id.is_some() { + updates.push(format!("unit_id = ${}", param_count)); + param_count += 1; + } if code.is_some() { updates.push(format!("code = ${}", param_count)); param_count += 1; @@ -222,6 +231,9 @@ pub async fn update_equipment( ); let mut query = sqlx::query(&sql); + if let Some(unit_id) = unit_id { + query = query.bind(unit_id); + } if let Some(code) = code { query = query.bind(code); } @@ -251,3 +263,23 @@ pub async fn delete_equipment( Ok(result.rows_affected() > 0) } + +pub async fn batch_set_equipment_unit( + pool: &PgPool, + equipment_ids: &[Uuid], + unit_id: Option, +) -> Result { + let result = sqlx::query( + r#" + UPDATE equipment + SET unit_id = $1, updated_at = NOW() + WHERE id = ANY($2) + "#, + ) + .bind(unit_id) + .bind(equipment_ids) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} diff --git a/src/util/pagination.rs b/src/util/pagination.rs index 254ef30..16d884a 100644 --- a/src/util/pagination.rs +++ b/src/util/pagination.rs @@ -33,7 +33,7 @@ impl PaginatedResponse { /// 分页查询参数 #[serde_as] -#[derive(Deserialize, Validate)] +#[derive(Debug, Deserialize, Validate)] pub struct PaginationParams { #[validate(range(min = 1))] #[serde_as(as = "serde_with::DisplayFromStr")]