feat(control): add unit and event foundation

This commit is contained in:
caoqianming 2026-03-24 10:20:23 +08:00
parent 4d53ee0337
commit 4e3d325437
12 changed files with 1374 additions and 4 deletions

View File

@ -0,0 +1,561 @@
# Implementation Plan: Remote Monitoring and Control for Coal Feeders and Distributors
## 1. Goals
Building on the current `plc_control` project, add business-level remote monitoring and control for coal feeders and distributors, meeting the following goals:
- Monitor the running state of coal feeders and distributors in real time
- Support remote manual start/stop
- Support automatic timed operation of coal feeders
- Support distributor runs triggered automatically by accumulated coal-feeding time
- Support fault lockout, manual reset, and freezing on communication failure
- Support adapting to different sites through configuration, reusing the project without code changes
## 2. Inventory of Existing Capabilities
The project already provides a solid general-purpose industrial data-acquisition platform:
- OPC UA data-source access with automatic reconnection
- Node browsing, batch point creation, real-time point subscription
- Batch point writing
- An `equipment` model
- Point-to-equipment binding via `equipment_id`
- A point signal-role field `signal_role`
- Real-time WebSocket push
- Basic front-end views for equipment, points, logs, and trend charts
- Page configuration via `page`
- An in-process event bus in `event.rs`
The current state is closer to a "generic point-monitoring platform" than a "coal feeder/distributor business control system".
## 3. Gaps Against the Requirements
The core capabilities required by the requirements document that are not yet implemented:
- No "control unit" concept, so a group of coal feeders cannot be associated with a group of distributors
- No business configuration model for `RunTime`, `StopTime`, `AccTime`, `BLTime`
- No equipment business-type constraint distinguishing coal feeders from distributors
- No unit-membership field on equipment, so there is no `unit -> equipment -> point` business chain
- No standardized business signal roles; `REM/RUN/FLT/STA/STP` are not yet normalized
- No automatic-control state machine
- No fault lockout with a manual-acknowledgement recovery flow
- No communication-failure freeze or post-recovery resynchronization
- No pulse-write wrapper; only generic batch point writes exist
- No business views such as unit overview, equipment detail, control panel, or alarm panel
- No unified event persistence or a future alarm model
## 4. Recommended Approach
Extend the existing platform incrementally rather than rewriting it:
- Keep the existing generic `source/node/point/equipment` foundation
- Add configuration tables and runtime-state management for business control
- Put control logic in the Rust server, reusing the current OPC UA connection and point-write capabilities
- Add business pages on the front end without disturbing the existing generic point-management pages
- Extend the event system on top of the existing `event.rs` instead of building a second event mechanism
Benefits of this approach:
- The existing OPC UA, point, equipment, and WebSocket foundations remain fully reusable
- Adapting to a new site only requires swapping equipment mappings and parameters
- Manual debugging can still be done through the existing point/equipment pages
- Future alarms, auditing, and statistics can build directly on the unified event system
## 5. Naming and Model Design
### 5.1 Naming Principles
Distinguish "code names" from "table names":
- Code model names keep full semantics, e.g. `ControlUnit`
- Database table names stay short, e.g. `unit`
Avoid `group` as a table name: it is too generic and will later clash with permission groups, UI groups, tag groups, and similar concepts.
### 5.2 Equipment Classification
Reuse the `kind` field on the `equipment` table, with the convention:
- `coal_feeder`: coal feeder
- `distributor`: distributor
### 5.3 Point Role Conventions
Reuse `point.signal_role` with a unified set of enum values:
- Status points: `rem` `run` `flt` `ii` `q`
- Control points: `start_cmd` `stop_cmd`
- Optional extensions: `estop` `mode_auto` `mode_manual` `reset_cmd`
Every device can then be mapped via "equipment + point role" instead of hard-coding point names.
### 5.4 New `unit` Table
Add a `unit` table, backed by the code model `ControlUnit`, representing one business control unit.
Suggested fields:
- `id`
- `code`
- `name`
- `description`
- `enabled`
- `run_time_sec`
- `stop_time_sec`
- `acc_time_sec`
- `bl_time_sec`
- `require_manual_ack_after_fault`
- `created_at`
- `updated_at`
### 5.5 Equipment Belongs Directly to a Unit
Under the current business assumptions a device belongs to exactly one control unit and is never shared across units, so a separate join table is unnecessary.
The simpler approach is to add one column to `equipment`:
- `unit_id`
The resulting model:
- One `unit` has many `equipment`
- Each `equipment` belongs to exactly one `unit`
- `equipment.kind` distinguishes `coal_feeder` from `distributor`
If a site later needs "one device in multiple units" or complex ordering of devices within a unit, evolving into a join table at that point is the better move. Do not over-design in phase one.
### 5.6 Keep Unit Runtime State in Memory
Do not persist `unit` runtime state to the database at first; let the control engine hold it in memory.
Suggested in-memory runtime fields:
- `state`
- `accumulated_run_sec`
- `current_run_elapsed_sec`
- `current_stop_elapsed_sec`
- `distributor_run_elapsed_sec`
- `fault_locked`
- `comm_locked`
- `manual_ack_required`
- `last_tick_at`
Suggested state values:
- `stopped`
- `running`
- `distributor_running`
- `fault_locked`
- `comm_locked`
Rationale:
- These fields change frequently and are a poor fit for high-frequency database writes
- Restoring the old control state directly after a service restart is unsafe
- It is safer to re-read `REM/RUN/FLT/Q` after a restart and rebuild the runtime state
- Not automatically re-issuing control commands after communication recovery or a restart matches industrial-control safety principles
If "context recovery after power loss" or run analytics are needed later, a lightweight snapshot can be added then; it is not a phase-one requirement.
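The runtime-state fields and state values above can be sketched as plain Rust types. This is a sketch only; the concrete field types (`u64` seconds, `Instant` for the last tick) are assumptions, not something the plan fixes:

```rust
use std::time::Instant;

/// Suggested unit states; names mirror the values listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnitState {
    Stopped,
    Running,
    DistributorRunning,
    FaultLocked,
    CommLocked,
}

/// In-memory runtime state held by the control engine, never persisted.
pub struct UnitRuntime {
    pub state: UnitState,
    pub accumulated_run_sec: u64,
    pub current_run_elapsed_sec: u64,
    pub current_stop_elapsed_sec: u64,
    pub distributor_run_elapsed_sec: u64,
    pub fault_locked: bool,
    pub comm_locked: bool,
    pub manual_ack_required: bool,
    pub last_tick_at: Option<Instant>,
}

impl UnitRuntime {
    /// Fresh state after a service restart: rebuilt from live REM/RUN/FLT/Q
    /// reads, never restored from a database snapshot.
    pub fn new() -> Self {
        Self {
            state: UnitState::Stopped,
            accumulated_run_sec: 0,
            current_run_elapsed_sec: 0,
            current_stop_elapsed_sec: 0,
            distributor_run_elapsed_sec: 0,
            fault_locked: false,
            comm_locked: false,
            manual_ack_required: false,
            last_tick_at: None,
        }
    }
}
```

Starting every unit in `Stopped` with zeroed counters is deliberate: it matches the "no automatic command replay after restart" principle above.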
### 5.7 A Unified `event` Table, with `alarm` Reserved
Use a unified `event` table rather than naming it `control_event`.
Rationale:
- Beyond control events, there will be configuration events, communication events, and data-source events
- `event` works better as a unified audit trail and business timeline
- The existing `event.rs` is already the in-process event bus, so the naming stays consistent
The `event` table should record:
- Manual start/stop
- Automatic start/stop
- Fault lockout
- Manual release of fault lockout
- Communication failure/recovery
- Data-source creation, update, deletion
- Control parameter changes
- Key state transitions
Suggested key fields:
- `id`
- `event_type`
- `level`
- `unit_id`
- `equipment_id`
- `source_id`
- `message`
- `payload`
- `created_at`
A separate `alarm` table should be designed later rather than cramming alarm state into `event`.
Rationale:
- Alarms have their own lifecycle: trigger, acknowledge, recover, clear
- Alarms need dedicated fields such as `severity`, `active`, `acked`, `acked_by`, `acked_at`, `cleared_at`
- Forcing alarms into `event` would bloat the generic event table over time
The recommended boundary:
- `event`: records "what happened"
- `alarm`: records "abnormal conditions that need alarm management"
Phase one can persist only the `event` table; reserve `alarm` in the design and implement it later.
## 6. Control Logic Design
### 6.1 Manual Control
Preconditions for manual control:
- `REM == 1`
- `FLT == 0`
- Communication healthy
- Not in fault lockout
- If an e-stop point exists, `ESTOP == 0`
All control commands go through a pulse write:
1. Write `1`
2. Wait `200-500ms`
3. Write `0` back
The server must wrap this as `pulse_write(point_id, high_ms)`; the front end must not stitch together two raw point writes.
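A minimal sketch of the precondition check and the `pulse_write` wrapper. The `ControlPrecheck`, `PointWriter`, and `VecWriter` names and the `u64` point ids are illustrative assumptions; the real service would use `Uuid` ids and the async OPC UA writer, and an async delay instead of the blocking `thread::sleep` shown here:

```rust
use std::{thread, time::Duration};

/// Snapshot of the status points needed for the manual-control precondition check.
pub struct ControlPrecheck {
    pub rem: bool,           // REM == 1
    pub flt: bool,           // FLT == 1 means faulted
    pub comm_ok: bool,       // quality bit / connection healthy
    pub fault_locked: bool,  // unit-level lockout flag
    pub estop: Option<bool>, // None when the site has no e-stop point
}

impl ControlPrecheck {
    /// Returns Err with a reason when any precondition from the list above fails.
    pub fn validate(&self) -> Result<(), &'static str> {
        if !self.comm_ok { return Err("communication abnormal"); }
        if !self.rem { return Err("device not in remote mode (REM != 1)"); }
        if self.flt { return Err("device faulted (FLT == 1)"); }
        if self.fault_locked { return Err("unit is fault-locked"); }
        if self.estop == Some(true) { return Err("e-stop active"); }
        Ok(())
    }
}

/// Abstraction over the real OPC UA writer so the pulse logic stays testable.
pub trait PointWriter {
    fn write_bool(&mut self, point_id: u64, value: bool) -> Result<(), String>;
}

/// Server-side pulse write: 1, hold for `high_ms`, then back to 0.
/// The falling edge is written by the server, never by the front end.
pub fn pulse_write<W: PointWriter>(w: &mut W, point_id: u64, high_ms: u64) -> Result<(), String> {
    w.write_bool(point_id, true)?;
    thread::sleep(Duration::from_millis(high_ms));
    w.write_bool(point_id, false)
}

/// Simple in-memory writer, useful for tests and dry runs.
pub struct VecWriter(pub Vec<(u64, bool)>);
impl PointWriter for VecWriter {
    fn write_bool(&mut self, point_id: u64, value: bool) -> Result<(), String> {
        self.0.push((point_id, value));
        Ok(())
    }
}
```

Keeping the writer behind a trait lets the pulse and validation logic be unit-tested without a live PLC connection.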
### 6.2 Automatic-Control State Machine
Each `unit` maintains its own state machine.
#### `STOPPED`
- Accumulate stop time
- When `stop_elapsed >= StopTime`, try to start the coal feeder
- On success, transition to `RUNNING`
#### `RUNNING`
- Accumulate run time
- `accumulated_run_sec += delta`
- When `run_elapsed >= RunTime`, try to stop the coal feeder and return to `STOPPED`
- When `accumulated_run_sec >= AccTime`, enter the distributor trigger flow
#### `DISTRIBUTOR_RUNNING`
- Check the distributor's `REM/FLT/communication`
- Start the distributor
- Wait for the `RUN == 1` feedback
- Time `BLTime`
- Stop the distributor
- Reset the accumulated time
- Return to `STOPPED` or to the start of the automatic cycle
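The three states above can be sketched as a pure tick function that returns the commands to issue. This is a sketch under stated assumptions: the start/stop success checks and the `RUN == 1` feedback wait from the list above are omitted, and stopping the feeder when `AccTime` is reached is an illustrative choice, not something the plan fixes:

```rust
/// Unit parameters from the `unit` table (all in seconds).
#[derive(Clone, Copy)]
pub struct UnitParams {
    pub run_time_sec: u64,
    pub stop_time_sec: u64,
    pub acc_time_sec: u64,
    pub bl_time_sec: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State { Stopped, Running, DistributorRunning }

/// Commands the engine would issue; the caller turns these into pulse writes.
#[derive(Debug, PartialEq, Eq)]
pub enum Command { StartFeeder, StopFeeder, StartDistributor, StopDistributor }

pub struct Machine {
    pub state: State,
    pub stop_elapsed: u64,
    pub run_elapsed: u64,
    pub accumulated_run: u64,
    pub bl_elapsed: u64,
}

impl Machine {
    /// Advance the state machine by `delta` seconds and collect commands.
    /// Fault/comm lockout is checked by the caller before ticking.
    pub fn tick(&mut self, p: UnitParams, delta: u64) -> Vec<Command> {
        let mut cmds = Vec::new();
        match self.state {
            State::Stopped => {
                self.stop_elapsed += delta;
                if self.stop_elapsed >= p.stop_time_sec {
                    cmds.push(Command::StartFeeder);
                    self.state = State::Running;
                    self.run_elapsed = 0;
                }
            }
            State::Running => {
                self.run_elapsed += delta;
                self.accumulated_run += delta;
                if self.accumulated_run >= p.acc_time_sec {
                    cmds.push(Command::StopFeeder);
                    cmds.push(Command::StartDistributor);
                    self.state = State::DistributorRunning;
                    self.bl_elapsed = 0;
                } else if self.run_elapsed >= p.run_time_sec {
                    cmds.push(Command::StopFeeder);
                    self.state = State::Stopped;
                    self.stop_elapsed = 0;
                }
            }
            State::DistributorRunning => {
                self.bl_elapsed += delta;
                if self.bl_elapsed >= p.bl_time_sec {
                    cmds.push(Command::StopDistributor);
                    self.accumulated_run = 0; // reset accumulation after a BL run
                    self.state = State::Stopped;
                    self.stop_elapsed = 0;
                }
            }
        }
        cmds
    }
}
```

Keeping the transition logic a pure function of (state, params, delta) makes the RunTime/StopTime/AccTime/BLTime cycle testable without any I/O.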
### 6.3 Fault Handling
When any device reports `FLT == 1`:
- Stop automatic control for that unit
- Set `fault_locked = true`
- Forbid any further automatic commands
- Emit and persist a critical event
When `FLT` recovers from `1 -> 0`:
- Do not unlock automatically
- Set `manual_ack_required = true`
- Wait for an operator to click "release fault lockout" in the UI
### 6.4 Communication-Failure Handling
When the quality bit is bad or the OPC connection drops:
- Set `comm_locked = true`
- Freeze all control actions
- Gray out front-end buttons
- Disallow all automatic and manual writes
After communication recovers:
- Re-read `REM/RUN/FLT`
- Resynchronize the runtime state
- Do not automatically re-issue control commands
- Emit and persist a recovery event
- Wait for operator action or the next automatic trigger
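The lockout flags and transitions in §6.3 and §6.4 reduce to a small amount of state; a minimal sketch follows, with the `Locks` type and its method names being hypothetical:

```rust
/// Per-unit lockout flags for fault and communication handling.
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Locks {
    pub fault_locked: bool,
    pub comm_locked: bool,
    pub manual_ack_required: bool,
}

impl Locks {
    /// Any device in the unit reports FLT == 1: lock out automatic control.
    pub fn on_fault(&mut self) {
        self.fault_locked = true;
    }

    /// FLT falls back 1 -> 0: stay locked, flag that an operator must ack.
    pub fn on_fault_cleared(&mut self) {
        self.manual_ack_required = true;
    }

    /// Operator clicks "release fault lockout" in the UI.
    pub fn ack_fault(&mut self) {
        self.fault_locked = false;
        self.manual_ack_required = false;
    }

    /// Quality bit bad or OPC connection dropped: freeze all control.
    pub fn on_comm_lost(&mut self) {
        self.comm_locked = true;
    }

    /// Communication back: unfreeze, but issue no commands automatically.
    pub fn on_comm_recovered(&mut self) {
        self.comm_locked = false;
    }

    /// Gate used by both manual and automatic paths before any write.
    pub fn control_allowed(&self) -> bool {
        !self.fault_locked && !self.comm_locked
    }
}
```

The key property is that `on_fault_cleared` never unlocks by itself: only `ack_fault`, driven by a human, re-enables control.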
## 7. Event System Design
### 7.1 Keep Reusing `src/event.rs`
Do not build a second business event hub; extend the existing [src/event.rs](D:/projects/plc_control/src/event.rs).
It already carries two responsibilities:
- Dispatching internal control events
- Dispatching high-frequency telemetry events
Keep this structure:
- `AppEvent` as the unified in-process event enum
- High-frequency telemetry continues through memory and WebSocket
- Low-frequency, audit-worthy events are selectively persisted to `event`
### 7.2 Which Events to Persist
Worth persisting:
- `SourceCreate`
- `SourceUpdate`
- `SourceDelete`
- Automatic start/stop
- Manual start/stop commands sent
- Fault lockout
- Manual acknowledgement and recovery
- Communication failure/recovery
- Parameter configuration changes
- Unit state transitions
Not worth persisting directly:
- `PointNewValue`
- High-frequency real-time telemetry
- Fine-grained internal polling steps
### 7.3 Recommended Extensions
Gradually add business events to `AppEvent`, for example:
- `AutoControlStarted`
- `AutoControlStopped`
- `EquipmentStartCommandSent`
- `EquipmentStopCommandSent`
- `FaultLocked`
- `FaultAcked`
- `CommLocked`
- `CommRecovered`
- `UnitStateChanged`
With this in place, logging, persistence, WebSocket pushes, and alarm triggering can all hang off the same event entry point.
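A sketch of how the extended enum and the persistence filter of §7.2 might fit together. The variant payloads, the `u64` ids (the real enum would use `Uuid`), and the `event_type` strings are all assumptions:

```rust
/// Business variants to add to the existing `AppEvent`
/// (existing source/point/telemetry variants elided; ids shown as u64).
#[derive(Debug)]
pub enum AppEvent {
    AutoControlStarted { unit_id: u64 },
    AutoControlStopped { unit_id: u64 },
    EquipmentStartCommandSent { equipment_id: u64 },
    EquipmentStopCommandSent { equipment_id: u64 },
    FaultLocked { unit_id: u64 },
    FaultAcked { unit_id: u64 },
    CommLocked { unit_id: u64 },
    CommRecovered { unit_id: u64 },
    UnitStateChanged { unit_id: u64, from: &'static str, to: &'static str },
    PointNewValue { point_id: u64 }, // high-frequency; never persisted
}

impl AppEvent {
    /// Low-frequency, audit-worthy events map to an `event_type` string for
    /// the `event` table; high-frequency telemetry returns None and stays
    /// in memory / WebSocket only.
    pub fn persisted_type(&self) -> Option<&'static str> {
        use AppEvent::*;
        match self {
            AutoControlStarted { .. } => Some("auto_control.started"),
            AutoControlStopped { .. } => Some("auto_control.stopped"),
            EquipmentStartCommandSent { .. } => Some("equipment.start_sent"),
            EquipmentStopCommandSent { .. } => Some("equipment.stop_sent"),
            FaultLocked { .. } => Some("fault.locked"),
            FaultAcked { .. } => Some("fault.acked"),
            CommLocked { .. } => Some("comm.locked"),
            CommRecovered { .. } => Some("comm.recovered"),
            UnitStateChanged { .. } => Some("unit.state_changed"),
            PointNewValue { .. } => None,
        }
    }
}
```

Centralizing the persist/skip decision on the enum keeps the subscriber code free of per-variant special cases.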
## 8. Backend Changes
### 8.1 New Modules
Add:
- `src/handler/control.rs`
- `src/service/control.rs`
- `src/control/engine.rs`
- `src/control/runtime.rs`
- `src/control/validator.rs`
Responsibilities:
- `handler`: HTTP endpoints
- `service`: database reads/writes
- `control/engine`: state machine and scheduling
- `control/runtime`: in-memory runtime-state cache and synchronization
- `control/validator`: control precondition checks
### 8.2 New Endpoints
Suggested endpoints:
- `GET /api/control/unit`
- `POST /api/control/unit`
- `PUT /api/control/unit/{id}`
- `GET /api/control/unit/{id}`
- `POST /api/control/unit/{id}/start-auto`
- `POST /api/control/unit/{id}/stop-auto`
- `POST /api/control/unit/{id}/ack-fault`
- `POST /api/control/equipment/{id}/start`
- `POST /api/control/equipment/{id}/stop`
- `GET /api/events`
Notes:
- Manual equipment control must go through these business endpoints; do not keep exposing raw point writes to the pages
- The existing `/api/point/value/batch` stays as a debugging/low-level tool
- The event query endpoint can serve the unified `event` table directly
### 8.3 How the Control Engine Runs
Add a background task at service startup:
- Scan all enabled `unit`s every `500ms` to `1s`
- Read the runtime-state cache from memory
- Read `REM/RUN/FLT/Q` from the current point-monitoring data
- Drive the state machine
The control engine must not query OPC directly; it should reuse the real-time point values already maintained by `connection_manager`.
### 8.4 Key Reuse Points
Capabilities that can be reused directly:
- The real-time point cache in `connection_manager`
- `get_point_monitor_data_read_guard`
- Batch point writing
- Real-time WebSocket push
- The unified event entry point in `event.rs`
- The `unit_id + equipment.kind + point.signal_role` business mapping
## 9. Frontend Changes
Add business pages alongside the existing generic pages rather than mixing the two.
### 9.1 New Pages
- Unit overview
- Unit detail
- Equipment control panel
- Event log
- Alarm page
- Parameter configuration
### 9.2 Unit Overview Contents
For each `unit`, show:
- Unit name
- Auto/manual mode
- Current state-machine state
- Coal feeder running state
- Distributor running state
- Current accumulated run time
- Fault lockout state
- Communication state
With buttons for:
- Start auto
- Stop auto
- Acknowledge fault / release lockout
### 9.3 Equipment Control Page Contents
For a single coal feeder or distributor:
- Live display of REM/RUN/FLT/Q/II
- Start button
- Stop button
- Communication-failure and fault-lockout indicators
- Recent events
### 9.4 Trends, Events, and Alarms
Reuse the existing trend-chart capability:
- Current `II` trend
- Running-state change curve
- Event timeline
A later alarm page builds on the separate `alarm` table:
- Active alarms
- Acknowledged alarms
- Recovered alarms
- Alarm acknowledgement
## 10. Phased Implementation
### Phase 1: Minimum Viable Version
Goal: close the business loop first, without elaborate pages.
Scope:
- Add `unit`
- Add `unit_id` to `equipment`
- Fix the conventions for equipment `kind` and point `signal_role`
- Add manual-control endpoints
- Implement pulse writes
- Implement fault lockout and communication freeze
- Implement the automatic-control state machine
- Persist the unified `event` table via `event.rs`
- Add a "control unit" panel and an event list on the front end
After delivery this phase verifies:
- Start/stop of a single coal feeder
- Unit-level automatic start/stop
- Accumulation-triggered distributor runs
- Manual acknowledgement after fault recovery
- Traceability of key operations and state transitions
### Phase 2: Enhanced Version
Scope:
- Unit overview page
- Unit detail page
- Online parameter editing
- Richer trend charts
- WebSocket business-event push
- Alarm rules and the `alarm` table
- Alarm acknowledgement and recovery flow
### Phase 3: Site Adaptation Version
Scope:
- Configuration import/export
- Project templates
- Configuration validation tooling
- Start/stop interlock self-checks
- Operation permission control
## 11. Suggested Order of Work
Starting from the current codebase, develop in this order:
1. Complete the business data model and database migrations
2. Add the `unit` table and `unit_id` on `equipment`
3. Standardize `equipment.kind` and `point.signal_role`
4. Implement server-side pulse writing
5. Implement manual-control endpoints
6. Implement the `unit` automatic-control state machine
7. Extend `event.rs` and persist the unified `event` table
8. Implement fault lockout, communication freeze, and manual acknowledgement
9. Add the front-end business pages
10. Introduce the separate `alarm` table in phase 2
## 12. Concrete Touch Points in the Current Code
Main changes against the existing code:
- [src/model.rs](D:/projects/plc_control/src/model.rs)
  - Add the `ControlUnit` model
  - Add `unit_id` to `Equipment`
  - Later add `EventRecord` and `AlarmRecord` models
- [src/event.rs](D:/projects/plc_control/src/event.rs)
  - Extend `AppEvent` with business event types
  - Add persistence for low-frequency key events
- [src/main.rs](D:/projects/plc_control/src/main.rs)
  - Register the control routes
  - Start the control-engine background task
- [src/handler/point.rs](D:/projects/plc_control/src/handler/point.rs)
  - Keep the low-level point-write endpoint, but do not route business control through it
- [src/handler/equipment.rs](D:/projects/plc_control/src/handler/equipment.rs)
  - Remains basic equipment master-data management
- [web/index.html](D:/projects/plc_control/web/index.html)
  - Add an entry or panel for the business control pages
- [web/js/app.js](D:/projects/plc_control/web/js/app.js)
  - Add event bindings for the control-unit pages
## 13. Conclusion
The project does not need a rewrite; it can evolve directly into the coal feeder and distributor remote monitoring and control system.
The most reasonable path:
- Use the existing OPC UA and point platform as the foundation
- Use short database table names `unit` and `event`
- Keep semantic names in code, such as `ControlUnit` and `AppEvent`
- Add `unit_id` directly on `equipment`
- Implement the state machine and pulse control server-side with in-memory runtime state
- Extend the unified event system on the existing [src/event.rs](D:/projects/plc_control/src/event.rs)
- Persist the unified `event` table in phase one, and split out `alarm` in phase two
The next development step should be the "phase one minimum viable version".

View File

@ -0,0 +1,38 @@
CREATE TABLE unit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
run_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (run_time_sec >= 0),
stop_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (stop_time_sec >= 0),
acc_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (acc_time_sec >= 0),
bl_time_sec INTEGER NOT NULL DEFAULT 0 CHECK (bl_time_sec >= 0),
require_manual_ack_after_fault BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (code)
);
ALTER TABLE equipment
ADD COLUMN unit_id UUID REFERENCES unit(id) ON DELETE SET NULL;
CREATE INDEX idx_equipment_unit_id ON equipment(unit_id);
CREATE TABLE event (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
level TEXT NOT NULL DEFAULT 'info',
unit_id UUID REFERENCES unit(id) ON DELETE SET NULL,
equipment_id UUID REFERENCES equipment(id) ON DELETE SET NULL,
source_id UUID REFERENCES source(id) ON DELETE SET NULL,
message TEXT NOT NULL,
payload JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_event_created_at ON event(created_at DESC);
CREATE INDEX idx_event_type ON event(event_type);
CREATE INDEX idx_event_unit_id ON event(unit_id);
CREATE INDEX idx_event_equipment_id ON event(equipment_id);
CREATE INDEX idx_event_source_id ON event(source_id);

View File

@ -124,6 +124,8 @@ async fn handle_control_event(
pool: &sqlx::PgPool,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,
) {
persist_event_if_needed(&event, pool).await;
match event {
AppEvent::SourceCreate { source_id } => {
tracing::info!("Processing SourceCreate event for {}", source_id);
@ -186,6 +188,80 @@ async fn handle_control_event(
}
}
async fn persist_event_if_needed(event: &AppEvent, pool: &sqlx::PgPool) {
let record: Option<(&str, &str, Option<Uuid>, Option<Uuid>, Option<Uuid>, String, serde_json::Value)> = match event {
AppEvent::SourceCreate { source_id } => Some((
"source.created",
"info",
None,
None,
Some(*source_id),
format!("Source {} created", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::SourceUpdate { source_id } => Some((
"source.updated",
"info",
None,
None,
Some(*source_id),
format!("Source {} updated", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::SourceDelete { source_id } => Some((
"source.deleted",
"warn",
None,
None,
Some(*source_id),
format!("Source {} deleted", source_id),
serde_json::json!({ "source_id": source_id }),
)),
AppEvent::PointCreateBatch { source_id, point_ids } => Some((
"point.batch_created",
"info",
None,
None,
Some(*source_id),
format!("{} points created for source {}", point_ids.len(), source_id),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)),
AppEvent::PointDeleteBatch { source_id, point_ids } => Some((
"point.batch_deleted",
"warn",
None,
None,
Some(*source_id),
format!("{} points deleted for source {}", point_ids.len(), source_id),
serde_json::json!({ "source_id": source_id, "point_ids": point_ids }),
)),
AppEvent::PointNewValue(_) => None,
};
let Some((event_type, level, unit_id, equipment_id, source_id, message, payload)) = record else {
return;
};
if let Err(err) = sqlx::query(
r#"
INSERT INTO event (event_type, level, unit_id, equipment_id, source_id, message, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
)
.bind(event_type)
.bind(level)
.bind(unit_id)
.bind(equipment_id)
.bind(source_id)
.bind(message)
.bind(sqlx::types::Json(payload))
.execute(pool)
.await
{
tracing::warn!("Failed to persist event: {}", err);
}
}
async fn process_point_new_value(
payload: crate::telemetry::PointNewValue,
connection_manager: &std::sync::Arc<crate::connection::ConnectionManager>,

View File

@ -1,3 +1,4 @@
pub mod control;
pub mod doc;
pub mod equipment;
pub mod log;

src/handler/control.rs (new file, 248 lines)
View File

@ -0,0 +1,248 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use serde::Deserialize;
use uuid::Uuid;
use validator::Validate;
use crate::{
util::{
pagination::{PaginatedResponse, PaginationParams},
response::ApiErr,
},
AppState,
};
#[derive(Debug, Deserialize, Validate)]
pub struct GetUnitListQuery {
#[validate(length(min = 1, max = 100))]
pub keyword: Option<String>,
#[serde(flatten)]
pub pagination: PaginationParams,
}
pub async fn get_unit_list(
State(state): State<AppState>,
Query(query): Query<GetUnitListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_units_count(&state.pool, query.keyword.as_deref()).await?;
let data = crate::service::get_units_paginated(
&state.pool,
query.keyword.as_deref(),
query.pagination.page_size,
query.pagination.offset(),
)
.await?;
Ok(Json(PaginatedResponse::new(
data,
total,
query.pagination.page,
query.pagination.page_size,
)))
}
pub async fn get_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
match crate::service::get_unit_by_id(&state.pool, unit_id).await? {
Some(unit) => Ok(Json(unit)),
None => Err(ApiErr::NotFound("Unit not found".to_string(), None)),
}
}
#[derive(Debug, Deserialize, Validate)]
pub struct CreateUnitReq {
#[validate(length(min = 1, max = 100))]
pub code: String,
#[validate(length(min = 1, max = 100))]
pub name: String,
pub description: Option<String>,
pub enabled: Option<bool>,
#[validate(range(min = 0))]
pub run_time_sec: Option<i32>,
#[validate(range(min = 0))]
pub stop_time_sec: Option<i32>,
#[validate(range(min = 0))]
pub acc_time_sec: Option<i32>,
#[validate(range(min = 0))]
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
pub async fn create_unit(
State(state): State<AppState>,
Json(payload): Json<CreateUnitReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if crate::service::get_unit_by_code(&state.pool, &payload.code)
.await?
.is_some()
{
return Err(ApiErr::BadRequest(
"Unit code already exists".to_string(),
None,
));
}
let unit_id = crate::service::create_unit(
&state.pool,
crate::service::CreateUnitParams {
code: &payload.code,
name: &payload.name,
description: payload.description.as_deref(),
enabled: payload.enabled.unwrap_or(true),
run_time_sec: payload.run_time_sec.unwrap_or(0),
stop_time_sec: payload.stop_time_sec.unwrap_or(0),
acc_time_sec: payload.acc_time_sec.unwrap_or(0),
bl_time_sec: payload.bl_time_sec.unwrap_or(0),
require_manual_ack_after_fault: payload
.require_manual_ack_after_fault
.unwrap_or(true),
},
)
.await?;
Ok((
StatusCode::CREATED,
Json(serde_json::json!({
"id": unit_id,
"ok_msg": "Unit created successfully"
})),
))
}
#[derive(Debug, Deserialize, Validate)]
pub struct UpdateUnitReq {
#[validate(length(min = 1, max = 100))]
pub code: Option<String>,
#[validate(length(min = 1, max = 100))]
pub name: Option<String>,
pub description: Option<String>,
pub enabled: Option<bool>,
#[validate(range(min = 0))]
pub run_time_sec: Option<i32>,
#[validate(range(min = 0))]
pub stop_time_sec: Option<i32>,
#[validate(range(min = 0))]
pub acc_time_sec: Option<i32>,
#[validate(range(min = 0))]
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
pub async fn update_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
Json(payload): Json<UpdateUnitReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if crate::service::get_unit_by_id(&state.pool, unit_id)
.await?
.is_none()
{
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_unit_by_code(&state.pool, code).await?;
if duplicate.as_ref().is_some_and(|item| item.id != unit_id) {
return Err(ApiErr::BadRequest(
"Unit code already exists".to_string(),
None,
));
}
}
if payload.code.is_none()
&& payload.name.is_none()
&& payload.description.is_none()
&& payload.enabled.is_none()
&& payload.run_time_sec.is_none()
&& payload.stop_time_sec.is_none()
&& payload.acc_time_sec.is_none()
&& payload.bl_time_sec.is_none()
&& payload.require_manual_ack_after_fault.is_none()
{
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
}
crate::service::update_unit(
&state.pool,
unit_id,
crate::service::UpdateUnitParams {
code: payload.code.as_deref(),
name: payload.name.as_deref(),
description: payload.description.as_deref(),
enabled: payload.enabled,
run_time_sec: payload.run_time_sec,
stop_time_sec: payload.stop_time_sec,
acc_time_sec: payload.acc_time_sec,
bl_time_sec: payload.bl_time_sec,
require_manual_ack_after_fault: payload.require_manual_ack_after_fault,
},
)
.await?;
Ok(Json(serde_json::json!({
"ok_msg": "Unit updated successfully"
})))
}
pub async fn delete_unit(
State(state): State<AppState>,
Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> {
let deleted = crate::service::delete_unit(&state.pool, unit_id).await?;
if !deleted {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
Ok(StatusCode::NO_CONTENT)
}
#[derive(Debug, Deserialize, Validate)]
pub struct GetEventListQuery {
pub unit_id: Option<Uuid>,
#[validate(length(min = 1, max = 100))]
pub event_type: Option<String>,
#[serde(flatten)]
pub pagination: PaginationParams,
}
pub async fn get_event_list(
State(state): State<AppState>,
Query(query): Query<GetEventListQuery>,
) -> Result<impl IntoResponse, ApiErr> {
query.validate()?;
let total = crate::service::get_events_count(
&state.pool,
query.unit_id,
query.event_type.as_deref(),
)
.await?;
let data = crate::service::get_events_paginated(
&state.pool,
query.unit_id,
query.event_type.as_deref(),
query.pagination.page_size,
query.pagination.offset(),
)
.await?;
Ok(Json(PaginatedResponse::new(
data,
total,
query.pagination.page,
query.pagination.page_size,
)))
}

View File

@ -79,6 +79,7 @@ pub async fn get_equipment_points(
#[derive(Debug, Deserialize, Validate)]
pub struct CreateEquipmentReq {
pub unit_id: Option<Uuid>,
#[validate(length(min = 1, max = 100))]
pub code: String,
#[validate(length(min = 1, max = 100))]
@ -89,6 +90,7 @@ pub struct CreateEquipmentReq {
#[derive(Debug, Deserialize, Validate)]
pub struct UpdateEquipmentReq {
pub unit_id: Option<Option<Uuid>>,
#[validate(length(min = 1, max = 100))]
pub code: Option<String>,
#[validate(length(min = 1, max = 100))]
@ -97,6 +99,12 @@ pub struct UpdateEquipmentReq {
pub description: Option<String>,
}
#[derive(Debug, Deserialize, Validate)]
pub struct BatchSetEquipmentUnitReq {
pub equipment_ids: Vec<Uuid>,
pub unit_id: Option<Uuid>,
}
pub async fn create_equipment(
State(state): State<AppState>,
Json(payload): Json<CreateEquipmentReq>,
@ -111,8 +119,16 @@ pub async fn create_equipment(
));
}
if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
let equipment_id = crate::service::create_equipment(
&state.pool,
payload.unit_id,
&payload.code,
&payload.name,
payload.kind.as_deref(),
@ -136,7 +152,8 @@ pub async fn update_equipment(
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.code.is_none()
if payload.unit_id.is_none()
&& payload.code.is_none()
&& payload.name.is_none()
&& payload.kind.is_none()
&& payload.description.is_none()
@ -149,6 +166,13 @@ pub async fn update_equipment(
return Err(ApiErr::NotFound("Equipment not found".to_string(), None));
}
if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
if let Some(code) = payload.code.as_deref() {
let duplicate = crate::service::get_equipment_by_code(&state.pool, code).await?;
if duplicate
@ -165,6 +189,7 @@ pub async fn update_equipment(
crate::service::update_equipment(
&state.pool,
equipment_id,
payload.unit_id,
payload.code.as_deref(),
payload.name.as_deref(),
payload.kind.as_deref(),
@ -177,6 +202,39 @@ pub async fn update_equipment(
})))
}
pub async fn batch_set_equipment_unit(
State(state): State<AppState>,
Json(payload): Json<BatchSetEquipmentUnitReq>,
) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?;
if payload.equipment_ids.is_empty() {
return Err(ApiErr::BadRequest(
"equipment_ids cannot be empty".to_string(),
None,
));
}
if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?;
if unit_exists.is_none() {
return Err(ApiErr::NotFound("Unit not found".to_string(), None));
}
}
let updated_count = crate::service::batch_set_equipment_unit(
&state.pool,
&payload.equipment_ids,
payload.unit_id,
)
.await?;
Ok(Json(serde_json::json!({
"ok_msg": "Equipment unit updated successfully",
"updated_count": updated_count
})))
}
pub async fn delete_equipment(
State(state): State<AppState>,
Path(equipment_id): Path<Uuid>,

View File

@ -182,10 +182,28 @@ fn build_router(state: AppState) -> Router {
.put(handler::equipment::update_equipment)
.delete(handler::equipment::delete_equipment),
)
.route(
"/api/equipment/batch/set-unit",
put(handler::equipment::batch_set_equipment_unit),
)
.route(
"/api/equipment/{equipment_id}/points",
get(handler::equipment::get_equipment_points),
)
.route(
"/api/unit",
get(handler::control::get_unit_list).post(handler::control::create_unit),
)
.route(
"/api/unit/{unit_id}",
get(handler::control::get_unit)
.put(handler::control::update_unit)
.delete(handler::control::delete_unit),
)
.route(
"/api/event",
get(handler::control::get_event_list),
)
.route(
"/api/tag",
get(handler::tag::get_tag_list).post(handler::tag::create_tag),

View File

@ -123,6 +123,7 @@ pub struct Tag {
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct Equipment {
pub id: Uuid,
pub unit_id: Option<Uuid>,
pub code: String,
pub name: String,
pub kind: Option<String>,
@ -133,6 +134,38 @@ pub struct Equipment {
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct ControlUnit {
pub id: Uuid,
pub code: String,
pub name: String,
pub description: Option<String>,
pub enabled: bool,
pub run_time_sec: i32,
pub stop_time_sec: i32,
pub acc_time_sec: i32,
pub bl_time_sec: i32,
pub require_manual_ack_after_fault: bool,
#[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct EventRecord {
pub id: Uuid,
pub event_type: String,
pub level: String,
pub unit_id: Option<Uuid>,
pub equipment_id: Option<Uuid>,
pub source_id: Option<Uuid>,
pub message: String,
pub payload: Option<Json<serde_json::Value>>,
#[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct Page {
pub id: Uuid,

View File

@ -1,8 +1,10 @@
mod control;
mod equipment;
mod point;
mod source;
mod tag;
pub use control::*;
pub use equipment::*;
pub use point::*;
pub use source::*;

src/service/control.rs (new file, 303 lines)
View File

@ -0,0 +1,303 @@
use crate::model::{ControlUnit, EventRecord};
use sqlx::{PgPool, QueryBuilder};
use uuid::Uuid;
pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result<i64, sqlx::Error> {
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*)
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
"#,
)
.bind(like)
.fetch_one(pool)
.await
}
None => sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#)
.fetch_one(pool)
.await,
}
}
pub async fn get_units_paginated(
pool: &PgPool,
keyword: Option<&str>,
page_size: i32,
offset: u32,
) -> Result<Vec<ControlUnit>, sqlx::Error> {
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
if page_size == -1 {
sqlx::query_as::<_, ControlUnit>(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY created_at
"#,
)
.bind(like)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, ControlUnit>(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY created_at
LIMIT $2 OFFSET $3
"#,
)
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
None => {
if page_size == -1 {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit ORDER BY created_at"#)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, ControlUnit>(
r#"
SELECT *
FROM unit
ORDER BY created_at
LIMIT $1 OFFSET $2
"#,
)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
}
}
pub async fn get_unit_by_id(
pool: &PgPool,
unit_id: Uuid,
) -> Result<Option<ControlUnit>, sqlx::Error> {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE id = $1"#)
.bind(unit_id)
.fetch_optional(pool)
.await
}
pub async fn get_unit_by_code(
pool: &PgPool,
code: &str,
) -> Result<Option<ControlUnit>, sqlx::Error> {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE code = $1"#)
.bind(code)
.fetch_optional(pool)
.await
}
pub struct CreateUnitParams<'a> {
pub code: &'a str,
pub name: &'a str,
pub description: Option<&'a str>,
pub enabled: bool,
pub run_time_sec: i32,
pub stop_time_sec: i32,
pub acc_time_sec: i32,
pub bl_time_sec: i32,
pub require_manual_ack_after_fault: bool,
}
pub async fn create_unit(
pool: &PgPool,
params: CreateUnitParams<'_>,
) -> Result<Uuid, sqlx::Error> {
let unit_id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO unit (
id, code, name, description, enabled,
run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec,
require_manual_ack_after_fault
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(unit_id)
.bind(params.code)
.bind(params.name)
.bind(params.description)
.bind(params.enabled)
.bind(params.run_time_sec)
.bind(params.stop_time_sec)
.bind(params.acc_time_sec)
.bind(params.bl_time_sec)
.bind(params.require_manual_ack_after_fault)
.execute(pool)
.await?;
Ok(unit_id)
}
pub struct UpdateUnitParams<'a> {
pub code: Option<&'a str>,
pub name: Option<&'a str>,
pub description: Option<&'a str>,
pub enabled: Option<bool>,
pub run_time_sec: Option<i32>,
pub stop_time_sec: Option<i32>,
pub acc_time_sec: Option<i32>,
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
pub async fn update_unit(
pool: &PgPool,
unit_id: Uuid,
params: UpdateUnitParams<'_>,
) -> Result<(), sqlx::Error> {
let mut updates = Vec::new();
let mut param_count = 1;
if params.code.is_some() {
updates.push(format!("code = ${}", param_count));
param_count += 1;
}
if params.name.is_some() {
updates.push(format!("name = ${}", param_count));
param_count += 1;
}
if params.description.is_some() {
updates.push(format!("description = ${}", param_count));
param_count += 1;
}
if params.enabled.is_some() {
updates.push(format!("enabled = ${}", param_count));
param_count += 1;
}
if params.run_time_sec.is_some() {
updates.push(format!("run_time_sec = ${}", param_count));
param_count += 1;
}
if params.stop_time_sec.is_some() {
updates.push(format!("stop_time_sec = ${}", param_count));
param_count += 1;
}
if params.acc_time_sec.is_some() {
updates.push(format!("acc_time_sec = ${}", param_count));
param_count += 1;
}
if params.bl_time_sec.is_some() {
updates.push(format!("bl_time_sec = ${}", param_count));
param_count += 1;
}
if params.require_manual_ack_after_fault.is_some() {
updates.push(format!(
"require_manual_ack_after_fault = ${}",
param_count
));
param_count += 1;
}
updates.push("updated_at = NOW()".to_string());
let sql = format!(
r#"UPDATE unit SET {} WHERE id = ${}"#,
updates.join(", "),
param_count
);
let mut query = sqlx::query(&sql);
if let Some(code) = params.code {
query = query.bind(code);
}
if let Some(name) = params.name {
query = query.bind(name);
}
if let Some(description) = params.description {
query = query.bind(description);
}
if let Some(enabled) = params.enabled {
query = query.bind(enabled);
}
if let Some(run_time_sec) = params.run_time_sec {
query = query.bind(run_time_sec);
}
if let Some(stop_time_sec) = params.stop_time_sec {
query = query.bind(stop_time_sec);
}
if let Some(acc_time_sec) = params.acc_time_sec {
query = query.bind(acc_time_sec);
}
if let Some(bl_time_sec) = params.bl_time_sec {
query = query.bind(bl_time_sec);
}
if let Some(require_manual_ack_after_fault) = params.require_manual_ack_after_fault {
query = query.bind(require_manual_ack_after_fault);
}
query.bind(unit_id).execute(pool).await?;
Ok(())
}
pub async fn delete_unit(pool: &PgPool, unit_id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(r#"DELETE FROM unit WHERE id = $1"#)
.bind(unit_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn get_events_count(
pool: &PgPool,
unit_id: Option<Uuid>,
event_type: Option<&str>,
) -> Result<i64, sqlx::Error> {
let mut qb =
QueryBuilder::new("SELECT COUNT(*)::BIGINT FROM event WHERE 1 = 1");
if let Some(unit_id) = unit_id {
qb.push(" AND unit_id = ").push_bind(unit_id);
}
if let Some(event_type) = event_type {
qb.push(" AND event_type = ").push_bind(event_type);
}
qb.build_query_scalar().fetch_one(pool).await
}
pub async fn get_events_paginated(
pool: &PgPool,
unit_id: Option<Uuid>,
event_type: Option<&str>,
page_size: i32,
offset: u32,
) -> Result<Vec<EventRecord>, sqlx::Error> {
let mut qb = QueryBuilder::new("SELECT * FROM event WHERE 1 = 1");
if let Some(unit_id) = unit_id {
qb.push(" AND unit_id = ").push_bind(unit_id);
}
if let Some(event_type) = event_type {
qb.push(" AND event_type = ").push_bind(event_type);
}
qb.push(" ORDER BY created_at DESC");
if page_size != -1 {
qb.push(" LIMIT ").push_bind(page_size as i64);
qb.push(" OFFSET ").push_bind(offset as i64);
}
qb.build_query_as::<EventRecord>().fetch_all(pool).await
}

View File

@ -3,6 +3,7 @@ use crate::{
model::{Equipment, Point},
};
use sqlx::{query_as, PgPool, Row};
use uuid::Uuid;
pub async fn get_points_by_equipment_id(
pool: &PgPool,
@ -129,6 +130,7 @@ pub async fn get_equipment_paginated(
.map(|row| EquipmentListItem {
equipment: Equipment {
id: row.get("id"),
unit_id: row.get("unit_id"),
code: row.get("code"),
name: row.get("name"),
kind: row.get("kind"),
@ -163,6 +165,7 @@ pub async fn get_equipment_by_code(
pub async fn create_equipment(
pool: &PgPool,
unit_id: Option<uuid::Uuid>,
code: &str,
name: &str,
kind: Option<&str>,
@ -171,11 +174,12 @@ pub async fn create_equipment(
let equipment_id = uuid::Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO equipment (id, code, name, kind, description)
VALUES ($1, $2, $3, $4, $5)
INSERT INTO equipment (id, unit_id, code, name, kind, description)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
)
.bind(equipment_id)
.bind(unit_id)
.bind(code)
.bind(name)
.bind(kind)
@ -189,6 +193,7 @@ pub async fn create_equipment(
pub async fn update_equipment(
pool: &PgPool,
equipment_id: uuid::Uuid,
unit_id: Option<Option<uuid::Uuid>>,
code: Option<&str>,
name: Option<&str>,
kind: Option<&str>,
@ -197,6 +202,10 @@ pub async fn update_equipment(
let mut updates = Vec::new();
let mut param_count = 1;
if unit_id.is_some() {
updates.push(format!("unit_id = ${}", param_count));
param_count += 1;
}
if code.is_some() {
updates.push(format!("code = ${}", param_count));
param_count += 1;
@ -222,6 +231,9 @@ pub async fn update_equipment(
);
let mut query = sqlx::query(&sql);
if let Some(unit_id) = unit_id {
query = query.bind(unit_id);
}
if let Some(code) = code {
query = query.bind(code);
}
@ -251,3 +263,23 @@ pub async fn delete_equipment(
Ok(result.rows_affected() > 0)
}
pub async fn batch_set_equipment_unit(
pool: &PgPool,
equipment_ids: &[Uuid],
unit_id: Option<Uuid>,
) -> Result<u64, sqlx::Error> {
let result = sqlx::query(
r#"
UPDATE equipment
SET unit_id = $1, updated_at = NOW()
WHERE id = ANY($2)
"#,
)
.bind(unit_id)
.bind(equipment_ids)
.execute(pool)
.await?;
Ok(result.rows_affected())
}

View File

@ -33,7 +33,7 @@ impl<T> PaginatedResponse<T> {
/// 分页查询参数
#[serde_as]
#[derive(Deserialize, Validate)]
#[derive(Debug, Deserialize, Validate)]
pub struct PaginationParams {
#[validate(range(min = 1))]
#[serde_as(as = "serde_with::DisplayFromStr")]