Move feeder unit model out of platform core

Lifts ControlUnit, UnitRuntime/State, ControlRuntimeStore and unit CRUD
into app_feeder_distributor; their feeder-specific semantics
(DistributorRunning state, run_time/stop_time/acc_time/bl_time pacing)
violated the shared-core invariant.

WsMessage drops the UnitRuntimeChanged variant in favor of a generic
AppEvent(AppWsEvent) carrying {app, event_type, data}. Feeder emits
"feeder"/"unit_runtime_changed"; the web client dispatches by app
namespace, leaving core free of business types. The equipment handler
keeps its friendly unit-exists check by issuing an inline EXISTS query
instead of pulling the unit service.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
caoqianming 2026-05-18 21:38:52 +08:00
parent 3467f203ca
commit 19ace9c2be
16 changed files with 406 additions and 356 deletions

View File

@ -11,12 +11,14 @@ use crate::{
runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState}, runtime::{ControlRuntimeStore, UnitRuntime, UnitRuntimeState},
}, },
event::AppEvent, event::AppEvent,
model::ControlUnit,
service,
AppState, AppState,
}; };
use plc_platform_core::{ use plc_platform_core::{
service::EquipmentRolePoint, service::EquipmentRolePoint,
telemetry::{PointMonitorInfo, PointQuality}, telemetry::{PointMonitorInfo, PointQuality},
websocket::WsMessage, websocket::{AppWsEvent, WsMessage},
}; };
/// Start the engine: a supervisor spawns one async task per enabled unit. /// Start the engine: a supervisor spawns one async task per enabled unit.
@ -35,7 +37,7 @@ async fn supervise(state: AppState, store: Arc<ControlRuntimeStore>) {
loop { loop {
interval.tick().await; interval.tick().await;
match plc_platform_core::service::get_all_enabled_units(&state.platform.pool).await { match service::get_all_enabled_units(&state.platform.pool).await {
Ok(units) => { Ok(units) => {
for unit in units { for unit in units {
let needs_spawn = tasks.get(&unit.id).is_none_or(|h| h.is_finished()); let needs_spawn = tasks.get(&unit.id).is_none_or(|h| h.is_finished());
@ -66,7 +68,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
loop { loop {
// Reload unit config on each iteration to detect disable/delete. // Reload unit config on each iteration to detect disable/delete.
let unit = let unit =
match plc_platform_core::service::get_unit_by_id(&state.platform.pool, unit_id).await { match service::get_unit_by_id(&state.platform.pool, unit_id).await {
Ok(Some(u)) if u.enabled => u, Ok(Some(u)) if u.enabled => u,
Ok(_) => { Ok(_) => {
tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id); tracing::info!("Engine: unit {} disabled or deleted, task exiting", unit_id);
@ -164,7 +166,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
} else { } else {
i32::MAX i32::MAX
}; };
let unit_for_wait = plc_platform_core::model::ControlUnit { let unit_for_wait = ControlUnit {
run_time_sec: secs, run_time_sec: secs,
..unit.clone() ..unit.clone()
}; };
@ -320,7 +322,7 @@ async fn unit_task(state: AppState, store: Arc<ControlRuntimeStore>, unit_id: Uu
async fn wait_phase( async fn wait_phase(
state: &AppState, state: &AppState,
store: &ControlRuntimeStore, store: &ControlRuntimeStore,
unit: &plc_platform_core::model::ControlUnit, unit: &ControlUnit,
all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)], all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)],
notify: &Arc<Notify>, notify: &Arc<Notify>,
fault_tick: &mut tokio::time::Interval, fault_tick: &mut tokio::time::Interval,
@ -361,12 +363,19 @@ async fn wait_phase(
} }
async fn push_ws(state: &AppState, runtime: &UnitRuntime) { async fn push_ws(state: &AppState, runtime: &UnitRuntime) {
if let Err(e) = state let payload = match serde_json::to_value(runtime) {
.platform Ok(value) => value,
.ws_manager Err(err) => {
.send_to_public(WsMessage::UnitRuntimeChanged(runtime.clone())) tracing::warn!("Engine: failed to serialize runtime for WS push: {}", err);
.await return;
{ }
};
let message = WsMessage::AppEvent(AppWsEvent {
app: "feeder".to_string(),
event_type: "unit_runtime_changed".to_string(),
data: payload,
});
if let Err(e) = state.platform.ws_manager.send_to_public(message).await {
tracing::debug!("Engine: WS push skipped (no subscribers): {}", e); tracing::debug!("Engine: WS push skipped (no subscribers): {}", e);
} }
} }
@ -376,7 +385,7 @@ async fn push_ws(state: &AppState, runtime: &UnitRuntime) {
async fn check_fault_comm( async fn check_fault_comm(
state: &AppState, state: &AppState,
runtime: &mut UnitRuntime, runtime: &mut UnitRuntime,
unit: &plc_platform_core::model::ControlUnit, unit: &ControlUnit,
all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)], all_roles: &[(Uuid, HashMap<String, EquipmentRolePoint>)],
) -> bool { ) -> bool {
let monitor = state let monitor = state

View File

@ -1,6 +1,7 @@
pub use plc_platform_core::control::{command, runtime}; pub use plc_platform_core::control::command;
pub mod engine; pub mod engine;
pub mod runtime;
pub mod simulate; pub mod simulate;
pub mod validator; pub mod validator;

View File

@ -8,7 +8,7 @@ use plc_platform_core::{
websocket::WsMessage, websocket::WsMessage,
}; };
use crate::AppState; use crate::{service as feeder_service, AppState};
/// Whether SIMULATE_PLC mode is enabled via environment variable. /// Whether SIMULATE_PLC mode is enabled via environment variable.
pub fn enabled() -> bool { pub fn enabled() -> bool {
@ -35,7 +35,7 @@ async fn run(state: AppState) {
tokio::time::sleep(Duration::from_secs(wait_secs)).await; tokio::time::sleep(Duration::from_secs(wait_secs)).await;
// Pick a random enabled unit. // Pick a random enabled unit.
let units = match service::get_all_enabled_units(&state.platform.pool).await { let units = match feeder_service::get_all_enabled_units(&state.platform.pool).await {
Ok(u) if !u.is_empty() => u, Ok(u) if !u.is_empty() => u,
_ => continue, _ => continue,
}; };

View File

@ -15,11 +15,13 @@ use crate::{
control::runtime::{UnitRuntime, UnitRuntimeState}, control::runtime::{UnitRuntime, UnitRuntimeState},
control::validator::{validate_manual_control, ControlAction}, control::validator::{validate_manual_control, ControlAction},
event::AppEvent, event::AppEvent,
model::ControlUnit,
service as feeder_service,
AppState, AppState,
}; };
use plc_platform_core::{ use plc_platform_core::{
handler::equipment::SignalRolePoint, handler::equipment::SignalRolePoint,
model::{ControlUnit, Equipment, Point}, model::{Equipment, Point},
service, service,
telemetry::PointMonitorInfo, telemetry::PointMonitorInfo,
util::{ util::{
@ -76,9 +78,9 @@ pub async fn get_unit_list(
query.validate()?; query.validate()?;
let total = let total =
service::get_units_count(&state.platform.pool, query.keyword.as_deref()) feeder_service::get_units_count(&state.platform.pool, query.keyword.as_deref())
.await?; .await?;
let units = service::get_units_paginated( let units = feeder_service::get_units_paginated(
&state.platform.pool, &state.platform.pool,
query.keyword.as_deref(), query.keyword.as_deref(),
query.pagination.page_size, query.pagination.page_size,
@ -223,7 +225,7 @@ pub async fn get_unit(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let unit = service::get_unit_by_id(&state.platform.pool, unit_id) let unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
let runtime = state.control_runtime.get(unit_id).await; let runtime = state.control_runtime.get(unit_id).await;
@ -298,7 +300,7 @@ pub async fn get_unit_detail(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let unit = service::get_unit_by_id(&state.platform.pool, unit_id) let unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -396,7 +398,7 @@ pub async fn create_unit(
validate_unit_timing_order(run_time_sec, acc_time_sec)?; validate_unit_timing_order(run_time_sec, acc_time_sec)?;
if service::get_unit_by_code(&state.platform.pool, &payload.code) if feeder_service::get_unit_by_code(&state.platform.pool, &payload.code)
.await? .await?
.is_some() .is_some()
{ {
@ -406,9 +408,9 @@ pub async fn create_unit(
)); ));
} }
let unit_id = service::create_unit( let unit_id = feeder_service::create_unit(
&state.platform.pool, &state.platform.pool,
service::CreateUnitParams { feeder_service::CreateUnitParams {
code: &payload.code, code: &payload.code,
name: &payload.name, name: &payload.name,
description: payload.description.as_deref(), description: payload.description.as_deref(),
@ -457,7 +459,7 @@ pub async fn update_unit(
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
payload.validate()?; payload.validate()?;
let existing_unit = service::get_unit_by_id(&state.platform.pool, unit_id) let existing_unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -468,7 +470,7 @@ pub async fn update_unit(
if let Some(code) = payload.code.as_deref() { if let Some(code) = payload.code.as_deref() {
let duplicate = let duplicate =
service::get_unit_by_code(&state.platform.pool, code).await?; feeder_service::get_unit_by_code(&state.platform.pool, code).await?;
if duplicate.as_ref().is_some_and(|item| item.id != unit_id) { if duplicate.as_ref().is_some_and(|item| item.id != unit_id) {
return Err(ApiErr::BadRequest( return Err(ApiErr::BadRequest(
"Unit code already exists".to_string(), "Unit code already exists".to_string(),
@ -490,10 +492,10 @@ pub async fn update_unit(
return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"}))); return Ok(Json(serde_json::json!({"ok_msg": "No fields to update"})));
} }
service::update_unit( feeder_service::update_unit(
&state.platform.pool, &state.platform.pool,
unit_id, unit_id,
service::UpdateUnitParams { feeder_service::UpdateUnitParams {
code: payload.code.as_deref(), code: payload.code.as_deref(),
name: payload.name.as_deref(), name: payload.name.as_deref(),
description: payload.description.as_deref(), description: payload.description.as_deref(),
@ -518,7 +520,7 @@ pub async fn delete_unit(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let deleted = service::delete_unit(&state.platform.pool, unit_id).await?; let deleted = feeder_service::delete_unit(&state.platform.pool, unit_id).await?;
if !deleted { if !deleted {
return Err(ApiErr::NotFound("Unit not found".to_string(), None)); return Err(ApiErr::NotFound("Unit not found".to_string(), None));
} }
@ -570,7 +572,7 @@ pub async fn start_auto_unit(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
let unit = service::get_unit_by_id(&state.platform.pool, unit_id) let unit = feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -609,7 +611,7 @@ pub async fn stop_auto_unit(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
service::get_unit_by_id(&state.platform.pool, unit_id) feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -628,7 +630,7 @@ pub async fn stop_auto_unit(
} }
pub async fn batch_start_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> { pub async fn batch_start_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let units = service::get_all_enabled_units(&state.platform.pool).await?; let units = feeder_service::get_all_enabled_units(&state.platform.pool).await?;
let mut started = Vec::new(); let mut started = Vec::new();
let mut skipped = Vec::new(); let mut skipped = Vec::new();
@ -656,7 +658,7 @@ pub async fn batch_start_auto(State(state): State<AppState>) -> Result<impl Into
} }
pub async fn batch_stop_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> { pub async fn batch_stop_auto(State(state): State<AppState>) -> Result<impl IntoResponse, ApiErr> {
let units = service::get_all_enabled_units(&state.platform.pool).await?; let units = feeder_service::get_all_enabled_units(&state.platform.pool).await?;
let mut stopped = Vec::new(); let mut stopped = Vec::new();
for unit in units { for unit in units {
@ -680,7 +682,7 @@ pub async fn ack_fault_unit(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
service::get_unit_by_id(&state.platform.pool, unit_id) feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;
@ -718,7 +720,7 @@ pub async fn get_unit_runtime(
State(state): State<AppState>, State(state): State<AppState>,
Path(unit_id): Path<Uuid>, Path(unit_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiErr> { ) -> Result<impl IntoResponse, ApiErr> {
service::get_unit_by_id(&state.platform.pool, unit_id) feeder_service::get_unit_by_id(&state.platform.pool, unit_id)
.await? .await?
.ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?; .ok_or_else(|| ApiErr::NotFound("Unit not found".to_string(), None))?;

View File

@ -2,7 +2,9 @@ pub mod app;
pub mod control; pub mod control;
pub mod event; pub mod event;
pub mod handler; pub mod handler;
pub mod model;
pub mod router; pub mod router;
pub mod service;
pub use app::{run, test_state, AppState}; pub use app::{run, test_state, AppState};
pub use router::build_router; pub use router::build_router;

View File

@ -0,0 +1,23 @@
use chrono::{DateTime, Utc};
use plc_platform_core::util::datetime::utc_to_local_str;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct ControlUnit {
pub id: Uuid,
pub code: String,
pub name: String,
pub description: Option<String>,
pub enabled: bool,
pub run_time_sec: i32,
pub stop_time_sec: i32,
pub acc_time_sec: i32,
pub bl_time_sec: i32,
pub require_manual_ack_after_fault: bool,
#[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}

View File

@ -0,0 +1,3 @@
pub mod unit;
pub use unit::*;

View File

@ -0,0 +1,284 @@
use sqlx::PgPool;
use uuid::Uuid;
use crate::model::ControlUnit;
fn unit_order_clause() -> &'static str {
"code"
}
pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result<i64, sqlx::Error> {
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*)
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
"#,
)
.bind(like)
.fetch_one(pool)
.await
}
None => {
sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#)
.fetch_one(pool)
.await
}
}
}
pub async fn get_units_paginated(
pool: &PgPool,
keyword: Option<&str>,
page_size: i32,
offset: u32,
) -> Result<Vec<ControlUnit>, sqlx::Error> {
let unit_order = unit_order_clause();
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
if page_size == -1 {
let sql = format!(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY {}
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(like)
.fetch_all(pool)
.await
} else {
let sql = format!(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY {}
LIMIT $2 OFFSET $3
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
None => {
if page_size == -1 {
let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order);
sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await
} else {
let sql = format!(
r#"
SELECT *
FROM unit
ORDER BY {}
LIMIT $1 OFFSET $2
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
}
}
pub async fn get_unit_by_id(
pool: &PgPool,
unit_id: Uuid,
) -> Result<Option<ControlUnit>, sqlx::Error> {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE id = $1"#)
.bind(unit_id)
.fetch_optional(pool)
.await
}
pub async fn get_unit_by_code(
pool: &PgPool,
code: &str,
) -> Result<Option<ControlUnit>, sqlx::Error> {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE code = $1"#)
.bind(code)
.fetch_optional(pool)
.await
}
pub struct CreateUnitParams<'a> {
pub code: &'a str,
pub name: &'a str,
pub description: Option<&'a str>,
pub enabled: bool,
pub run_time_sec: i32,
pub stop_time_sec: i32,
pub acc_time_sec: i32,
pub bl_time_sec: i32,
pub require_manual_ack_after_fault: bool,
}
pub async fn create_unit(pool: &PgPool, params: CreateUnitParams<'_>) -> Result<Uuid, sqlx::Error> {
let unit_id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO unit (
id, code, name, description, enabled,
run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec,
require_manual_ack_after_fault
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(unit_id)
.bind(params.code)
.bind(params.name)
.bind(params.description)
.bind(params.enabled)
.bind(params.run_time_sec)
.bind(params.stop_time_sec)
.bind(params.acc_time_sec)
.bind(params.bl_time_sec)
.bind(params.require_manual_ack_after_fault)
.execute(pool)
.await?;
Ok(unit_id)
}
pub struct UpdateUnitParams<'a> {
pub code: Option<&'a str>,
pub name: Option<&'a str>,
pub description: Option<&'a str>,
pub enabled: Option<bool>,
pub run_time_sec: Option<i32>,
pub stop_time_sec: Option<i32>,
pub acc_time_sec: Option<i32>,
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
pub async fn update_unit(
pool: &PgPool,
unit_id: Uuid,
params: UpdateUnitParams<'_>,
) -> Result<(), sqlx::Error> {
let mut updates = Vec::new();
let mut param_count = 1;
if params.code.is_some() {
updates.push(format!("code = ${}", param_count));
param_count += 1;
}
if params.name.is_some() {
updates.push(format!("name = ${}", param_count));
param_count += 1;
}
if params.description.is_some() {
updates.push(format!("description = ${}", param_count));
param_count += 1;
}
if params.enabled.is_some() {
updates.push(format!("enabled = ${}", param_count));
param_count += 1;
}
if params.run_time_sec.is_some() {
updates.push(format!("run_time_sec = ${}", param_count));
param_count += 1;
}
if params.stop_time_sec.is_some() {
updates.push(format!("stop_time_sec = ${}", param_count));
param_count += 1;
}
if params.acc_time_sec.is_some() {
updates.push(format!("acc_time_sec = ${}", param_count));
param_count += 1;
}
if params.bl_time_sec.is_some() {
updates.push(format!("bl_time_sec = ${}", param_count));
param_count += 1;
}
if params.require_manual_ack_after_fault.is_some() {
updates.push(format!("require_manual_ack_after_fault = ${}", param_count));
param_count += 1;
}
updates.push("updated_at = NOW()".to_string());
let sql = format!(
r#"UPDATE unit SET {} WHERE id = ${}"#,
updates.join(", "),
param_count
);
let mut query = sqlx::query(&sql);
if let Some(code) = params.code {
query = query.bind(code);
}
if let Some(name) = params.name {
query = query.bind(name);
}
if let Some(description) = params.description {
query = query.bind(description);
}
if let Some(enabled) = params.enabled {
query = query.bind(enabled);
}
if let Some(run_time_sec) = params.run_time_sec {
query = query.bind(run_time_sec);
}
if let Some(stop_time_sec) = params.stop_time_sec {
query = query.bind(stop_time_sec);
}
if let Some(acc_time_sec) = params.acc_time_sec {
query = query.bind(acc_time_sec);
}
if let Some(bl_time_sec) = params.bl_time_sec {
query = query.bind(bl_time_sec);
}
if let Some(require_manual_ack_after_fault) = params.require_manual_ack_after_fault {
query = query.bind(require_manual_ack_after_fault);
}
query.bind(unit_id).execute(pool).await?;
Ok(())
}
pub async fn delete_unit(pool: &PgPool, unit_id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(r#"DELETE FROM unit WHERE id = $1"#)
.bind(unit_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn get_all_enabled_units(pool: &PgPool) -> Result<Vec<ControlUnit>, sqlx::Error> {
let sql = format!(
"SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}",
unit_order_clause()
);
sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await
}
#[cfg(test)]
mod tests {
use super::unit_order_clause;
#[test]
fn unit_ordering_defaults_to_code() {
assert_eq!(unit_order_clause(), "code");
}
}

View File

@ -1,6 +1,6 @@
use std::time::Duration; use std::time::Duration;
use plc_platform_core::control::runtime::{ControlRuntimeStore, UnitRuntimeState}; use app_feeder_distributor::control::runtime::{ControlRuntimeStore, UnitRuntimeState};
use uuid::Uuid; use uuid::Uuid;
#[tokio::test] #[tokio::test]

View File

@ -1,2 +1 @@
pub mod command; pub mod command;
pub mod runtime;

View File

@ -14,6 +14,13 @@ use crate::util::{
response::ApiErr, response::ApiErr,
}; };
async fn unit_row_exists(pool: &sqlx::PgPool, unit_id: Uuid) -> Result<bool, sqlx::Error> {
sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM unit WHERE id = $1)")
.bind(unit_id)
.fetch_one(pool)
.await
}
#[derive(Deserialize, Validate)] #[derive(Deserialize, Validate)]
pub struct GetEquipmentListQuery { pub struct GetEquipmentListQuery {
#[validate(length(min = 1, max = 100))] #[validate(length(min = 1, max = 100))]
@ -162,8 +169,8 @@ pub async fn create_equipment(
} }
if let Some(unit_id) = payload.unit_id { if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; let unit_exists = unit_row_exists(&state.pool, unit_id).await?;
if unit_exists.is_none() { if !unit_exists {
return Err(ApiErr::NotFound("Unit not found".to_string(), None)); return Err(ApiErr::NotFound("Unit not found".to_string(), None));
} }
} }
@ -211,8 +218,8 @@ pub async fn update_equipment(
} }
if let Some(Some(unit_id)) = payload.unit_id { if let Some(Some(unit_id)) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; let unit_exists = unit_row_exists(&state.pool, unit_id).await?;
if unit_exists.is_none() { if !unit_exists {
return Err(ApiErr::NotFound("Unit not found".to_string(), None)); return Err(ApiErr::NotFound("Unit not found".to_string(), None));
} }
} }
@ -262,8 +269,8 @@ pub async fn batch_set_equipment_unit(
} }
if let Some(unit_id) = payload.unit_id { if let Some(unit_id) = payload.unit_id {
let unit_exists = crate::service::get_unit_by_id(&state.pool, unit_id).await?; let unit_exists = unit_row_exists(&state.pool, unit_id).await?;
if unit_exists.is_none() { if !unit_exists {
return Err(ApiErr::NotFound("Unit not found".to_string(), None)); return Err(ApiErr::NotFound("Unit not found".to_string(), None));
} }
} }

View File

@ -134,24 +134,6 @@ pub struct Equipment {
pub updated_at: DateTime<Utc>, pub updated_at: DateTime<Utc>,
} }
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct ControlUnit {
pub id: Uuid,
pub code: String,
pub name: String,
pub description: Option<String>,
pub enabled: bool,
pub run_time_sec: i32,
pub stop_time_sec: i32,
pub acc_time_sec: i32,
pub bl_time_sec: i32,
pub require_manual_ack_after_fault: bool,
#[serde(serialize_with = "utc_to_local_str")]
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "utc_to_local_str")]
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] #[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct EventRecord { pub struct EventRecord {
pub id: Uuid, pub id: Uuid,

View File

@ -1,11 +1,7 @@
use crate::model::{ControlUnit, EventRecord}; use crate::model::EventRecord;
use sqlx::{PgPool, QueryBuilder, Row}; use sqlx::{PgPool, QueryBuilder, Row};
use uuid::Uuid; use uuid::Uuid;
fn unit_order_clause() -> &'static str {
"code"
}
fn equipment_order_clause_with_unit() -> &'static str { fn equipment_order_clause_with_unit() -> &'static str {
"unit_id, code" "unit_id, code"
} }
@ -16,264 +12,6 @@ pub struct EquipmentRolePoint {
pub signal_role: String, pub signal_role: String,
} }
pub async fn get_units_count(pool: &PgPool, keyword: Option<&str>) -> Result<i64, sqlx::Error> {
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*)
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
"#,
)
.bind(like)
.fetch_one(pool)
.await
}
None => {
sqlx::query_scalar::<_, i64>(r#"SELECT COUNT(*) FROM unit"#)
.fetch_one(pool)
.await
}
}
}
pub async fn get_units_paginated(
pool: &PgPool,
keyword: Option<&str>,
page_size: i32,
offset: u32,
) -> Result<Vec<ControlUnit>, sqlx::Error> {
let unit_order = unit_order_clause();
match keyword {
Some(keyword) => {
let like = format!("%{}%", keyword);
if page_size == -1 {
let sql = format!(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY {}
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(like)
.fetch_all(pool)
.await
} else {
let sql = format!(
r#"
SELECT *
FROM unit
WHERE code ILIKE $1 OR name ILIKE $1
ORDER BY {}
LIMIT $2 OFFSET $3
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(like)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
None => {
if page_size == -1 {
let sql = format!("SELECT * FROM unit ORDER BY {}", unit_order);
sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await
} else {
let sql = format!(
r#"
SELECT *
FROM unit
ORDER BY {}
LIMIT $1 OFFSET $2
"#,
unit_order
);
sqlx::query_as::<_, ControlUnit>(&sql)
.bind(page_size as i64)
.bind(offset as i64)
.fetch_all(pool)
.await
}
}
}
}
pub async fn get_unit_by_id(
pool: &PgPool,
unit_id: Uuid,
) -> Result<Option<ControlUnit>, sqlx::Error> {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE id = $1"#)
.bind(unit_id)
.fetch_optional(pool)
.await
}
pub async fn get_unit_by_code(
pool: &PgPool,
code: &str,
) -> Result<Option<ControlUnit>, sqlx::Error> {
sqlx::query_as::<_, ControlUnit>(r#"SELECT * FROM unit WHERE code = $1"#)
.bind(code)
.fetch_optional(pool)
.await
}
pub struct CreateUnitParams<'a> {
pub code: &'a str,
pub name: &'a str,
pub description: Option<&'a str>,
pub enabled: bool,
pub run_time_sec: i32,
pub stop_time_sec: i32,
pub acc_time_sec: i32,
pub bl_time_sec: i32,
pub require_manual_ack_after_fault: bool,
}
pub async fn create_unit(pool: &PgPool, params: CreateUnitParams<'_>) -> Result<Uuid, sqlx::Error> {
let unit_id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO unit (
id, code, name, description, enabled,
run_time_sec, stop_time_sec, acc_time_sec, bl_time_sec,
require_manual_ack_after_fault
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#,
)
.bind(unit_id)
.bind(params.code)
.bind(params.name)
.bind(params.description)
.bind(params.enabled)
.bind(params.run_time_sec)
.bind(params.stop_time_sec)
.bind(params.acc_time_sec)
.bind(params.bl_time_sec)
.bind(params.require_manual_ack_after_fault)
.execute(pool)
.await?;
Ok(unit_id)
}
pub struct UpdateUnitParams<'a> {
pub code: Option<&'a str>,
pub name: Option<&'a str>,
pub description: Option<&'a str>,
pub enabled: Option<bool>,
pub run_time_sec: Option<i32>,
pub stop_time_sec: Option<i32>,
pub acc_time_sec: Option<i32>,
pub bl_time_sec: Option<i32>,
pub require_manual_ack_after_fault: Option<bool>,
}
pub async fn update_unit(
pool: &PgPool,
unit_id: Uuid,
params: UpdateUnitParams<'_>,
) -> Result<(), sqlx::Error> {
let mut updates = Vec::new();
let mut param_count = 1;
if params.code.is_some() {
updates.push(format!("code = ${}", param_count));
param_count += 1;
}
if params.name.is_some() {
updates.push(format!("name = ${}", param_count));
param_count += 1;
}
if params.description.is_some() {
updates.push(format!("description = ${}", param_count));
param_count += 1;
}
if params.enabled.is_some() {
updates.push(format!("enabled = ${}", param_count));
param_count += 1;
}
if params.run_time_sec.is_some() {
updates.push(format!("run_time_sec = ${}", param_count));
param_count += 1;
}
if params.stop_time_sec.is_some() {
updates.push(format!("stop_time_sec = ${}", param_count));
param_count += 1;
}
if params.acc_time_sec.is_some() {
updates.push(format!("acc_time_sec = ${}", param_count));
param_count += 1;
}
if params.bl_time_sec.is_some() {
updates.push(format!("bl_time_sec = ${}", param_count));
param_count += 1;
}
if params.require_manual_ack_after_fault.is_some() {
updates.push(format!("require_manual_ack_after_fault = ${}", param_count));
param_count += 1;
}
updates.push("updated_at = NOW()".to_string());
let sql = format!(
r#"UPDATE unit SET {} WHERE id = ${}"#,
updates.join(", "),
param_count
);
let mut query = sqlx::query(&sql);
if let Some(code) = params.code {
query = query.bind(code);
}
if let Some(name) = params.name {
query = query.bind(name);
}
if let Some(description) = params.description {
query = query.bind(description);
}
if let Some(enabled) = params.enabled {
query = query.bind(enabled);
}
if let Some(run_time_sec) = params.run_time_sec {
query = query.bind(run_time_sec);
}
if let Some(stop_time_sec) = params.stop_time_sec {
query = query.bind(stop_time_sec);
}
if let Some(acc_time_sec) = params.acc_time_sec {
query = query.bind(acc_time_sec);
}
if let Some(bl_time_sec) = params.bl_time_sec {
query = query.bind(bl_time_sec);
}
if let Some(require_manual_ack_after_fault) = params.require_manual_ack_after_fault {
query = query.bind(require_manual_ack_after_fault);
}
query.bind(unit_id).execute(pool).await?;
Ok(())
}
pub async fn delete_unit(pool: &PgPool, unit_id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query(r#"DELETE FROM unit WHERE id = $1"#)
.bind(unit_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn get_events_count( pub async fn get_events_count(
pool: &PgPool, pool: &PgPool,
unit_id: Option<Uuid>, unit_id: Option<Uuid>,
@ -317,14 +55,6 @@ pub async fn get_events_paginated(
qb.build_query_as::<EventRecord>().fetch_all(pool).await qb.build_query_as::<EventRecord>().fetch_all(pool).await
} }
pub async fn get_all_enabled_units(pool: &PgPool) -> Result<Vec<ControlUnit>, sqlx::Error> {
let sql = format!(
"SELECT * FROM unit WHERE enabled = TRUE ORDER BY {}",
unit_order_clause()
);
sqlx::query_as::<_, ControlUnit>(&sql).fetch_all(pool).await
}
pub async fn get_equipment_by_unit_ids( pub async fn get_equipment_by_unit_ids(
pool: &PgPool, pool: &PgPool,
unit_ids: &[Uuid], unit_ids: &[Uuid],
@ -346,14 +76,12 @@ pub async fn get_equipment_by_unit_id(
pool: &PgPool, pool: &PgPool,
unit_id: Uuid, unit_id: Uuid,
) -> Result<Vec<crate::model::Equipment>, sqlx::Error> { ) -> Result<Vec<crate::model::Equipment>, sqlx::Error> {
let sql = format!( sqlx::query_as::<_, crate::model::Equipment>(
"SELECT * FROM equipment WHERE unit_id = $1 ORDER BY {}", "SELECT * FROM equipment WHERE unit_id = $1 ORDER BY code",
unit_order_clause() )
); .bind(unit_id)
sqlx::query_as::<_, crate::model::Equipment>(&sql) .fetch_all(pool)
.bind(unit_id) .await
.fetch_all(pool)
.await
} }
pub async fn get_points_by_equipment_ids( pub async fn get_points_by_equipment_ids(
@ -462,12 +190,7 @@ pub async fn get_equipment_role_points(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{equipment_order_clause_with_unit, unit_order_clause}; use super::equipment_order_clause_with_unit;
#[test]
fn unit_ordering_defaults_to_code() {
assert_eq!(unit_order_clause(), "code");
}
#[test] #[test]
fn unit_equipment_ordering_uses_code_within_unit() { fn unit_equipment_ordering_uses_code_within_unit() {

View File

@ -14,7 +14,6 @@ use uuid::Uuid;
use crate::platform_context::PlatformContext; use crate::platform_context::PlatformContext;
use crate::{ use crate::{
connection::{BatchSetPointValueReq, BatchSetPointValueRes}, connection::{BatchSetPointValueReq, BatchSetPointValueRes},
control::runtime::UnitRuntime,
model::EventRecord, model::EventRecord,
telemetry::PointMonitorInfo, telemetry::PointMonitorInfo,
}; };
@ -25,7 +24,19 @@ pub enum WsMessage {
PointNewValue(PointMonitorInfo), PointNewValue(PointMonitorInfo),
PointSetValueBatchResult(BatchSetPointValueRes), PointSetValueBatchResult(BatchSetPointValueRes),
EventCreated(EventRecord), EventCreated(EventRecord),
UnitRuntimeChanged(UnitRuntime), AppEvent(AppWsEvent),
}
/// Business-event payload carried by `WsMessage::AppEvent`.
///
/// Apps construct this so core stays free of business types. Frontend dispatches
/// by `app` first, then `event_type`. `data` is opaque to core; each app
/// documents its schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppWsEvent {
pub app: String,
pub event_type: String,
pub data: serde_json::Value,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -150,15 +150,19 @@ export function startPointSocket() {
prependEvent(payload.data); prependEvent(payload.data);
} }
if (payload.type === "UnitRuntimeChanged") { if (payload.type === "AppEvent" || payload.type === "app_event") {
const runtime = payload.data; const envelope = payload.data || {};
state.runtimes.set(runtime.unit_id, runtime); if (envelope.app !== "feeder") return;
renderUnits(); if (envelope.event_type === "unit_runtime_changed") {
// lazy import to avoid circular dep (ops.js -> logs.js -> ops.js) const runtime = envelope.data;
import("./ops.js").then(({ renderOpsUnits, syncEquipmentButtonsForUnit }) => { state.runtimes.set(runtime.unit_id, runtime);
renderOpsUnits(); renderUnits();
syncEquipmentButtonsForUnit(runtime.unit_id); // lazy import to avoid circular dep (ops.js -> logs.js -> ops.js)
}); import("./ops.js").then(({ renderOpsUnits, syncEquipmentButtonsForUnit }) => {
renderOpsUnits();
syncEquipmentButtonsForUnit(runtime.unit_id);
});
}
return; return;
} }
} catch { } catch {