mod model; mod config; mod util; mod db; mod handler; mod middleware; mod connection; mod event; mod service; mod websocket; mod telemetry; use config::AppConfig; use tower_http::cors::{Any, CorsLayer}; use tower_http::services::ServeDir; use db::init_database; use middleware::simple_logger; use connection::ConnectionManager; use event::EventManager; use std::sync::Arc; use axum::{ routing::{get, put}, Router, }; use tokio::sync::mpsc; #[derive(Clone)] pub struct AppState { pub config: AppConfig, pub pool: sqlx::PgPool, pub connection_manager: Arc, pub event_manager: Arc, pub ws_manager: Arc, } #[tokio::main] async fn main() { dotenv::dotenv().ok(); util::log::init_logger(); let config = AppConfig::from_env().expect("Failed to load configuration"); let pool = init_database(&config.database_url).await.expect("Failed to initialize database"); let mut connection_manager = ConnectionManager::new(); let ws_manager = Arc::new(websocket::WebSocketManager::new()); let event_manager = Arc::new(EventManager::new( pool.clone(), Arc::new(connection_manager.clone()), Some(ws_manager.clone()), )); connection_manager.set_event_manager(event_manager.clone()); connection_manager.set_pool_and_start_reconnect_task(Arc::new(pool.clone())); let connection_manager = Arc::new(connection_manager); // Connect to all enabled sources concurrently let sources = service::get_all_enabled_sources(&pool) .await .expect("Failed to fetch sources"); // Spawn a task for each source to connect and subscribe concurrently let mut tasks = Vec::new(); for source in sources { let cm = connection_manager.clone(); let p = pool.clone(); let source_name = source.name.clone(); let source_id = source.id; let task = tokio::spawn(async move { if let Err(e) = cm.connect_from_source(&p, source_id).await { tracing::error!("Failed to connect to source {}: {}", source_name, e); } }); tasks.push(task); } // Wait for all connection tasks to complete for task in tasks { if let Err(e) = task.await { tracing::error!("Source connection task failed: {:?}", e); } } let state = AppState { config: config.clone(), pool, connection_manager: connection_manager.clone(), event_manager, ws_manager, }; let app = build_router(state.clone()); let addr = format!("{}:{}", config.server_host, config.server_port); tracing::info!("Starting server at http://{}", addr); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); let ui_url = format!("http://{}:{}/ui", "localhost", config.server_port); let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); let shutdown_tx_ctrl = shutdown_tx.clone(); let rt_handle = tokio::runtime::Handle::current(); init_tray(ui_url, shutdown_tx.clone(), rt_handle); let connection_manager_for_shutdown = connection_manager.clone(); tokio::spawn(async move { tokio::signal::ctrl_c() .await .expect("Failed to install Ctrl+C handler"); let _ = shutdown_tx_ctrl.send(()).await; }); let shutdown_signal = async move{ let _ = shutdown_rx.recv().await; tracing::info!("Received shutdown signal, closing all connections..."); connection_manager_for_shutdown.disconnect_all().await; tracing::info!("All connections closed"); }; axum::serve(listener, app) .with_graceful_shutdown(shutdown_signal) .await .unwrap(); } fn build_router(state: AppState) -> Router { let all_route = Router::new() .route("/api/source", get(handler::source::get_source_list).post(handler::source::create_source)) .route("/api/source/{source_id}", axum::routing::delete(handler::source::delete_source).put(handler::source::update_source)) .route("/api/source/{source_id}/browse", axum::routing::post(handler::source::browse_and_save_nodes)) .route("/api/source/{source_id}/node-tree", get(handler::source::get_node_tree)) .route("/api/point", get(handler::point::get_point_list)) .route( "/api/point/value/batch", axum::routing::post(handler::point::batch_set_point_value), ) .route( "/api/point/batch", axum::routing::post(handler::point::batch_create_points) .delete(handler::point::batch_delete_points), ) .route("/api/point/{point_id}", get(handler::point::get_point).put(handler::point::update_point).delete(handler::point::delete_point)) .route("/api/point/batch/set-tags", put(handler::point::batch_set_point_tags)) .route("/api/tag", get(handler::tag::get_tag_list).post(handler::tag::create_tag)) .route("/api/tag/{tag_id}", get(handler::tag::get_tag_points).put(handler::tag::update_tag).delete(handler::tag::delete_tag)) .route("/api/page", get(handler::page::get_page_list).post(handler::page::create_page)) .route("/api/page/{page_id}", get(handler::page::get_page).put(handler::page::update_page).delete(handler::page::delete_page)) .route("/api/logs", get(handler::log::get_logs)) .route("/api/logs/stream", get(handler::log::stream_logs)); Router::new() .merge(all_route) .nest_service("/ui", ServeDir::new("web").append_index_html_on_directories(true)) .route("/ws/public", get(websocket::public_websocket_handler)) .route("/ws/client/{client_id}", get(websocket::client_websocket_handler)) .layer(axum::middleware::from_fn(simple_logger)) .layer( CorsLayer::new() .allow_origin(Any) .allow_methods(Any) .allow_headers(Any), ) .with_state(state) } #[cfg(windows)] fn init_tray(ui_url: String, shutdown_tx: mpsc::Sender<()>, rt_handle: tokio::runtime::Handle) { std::thread::spawn(move || { if let Err(e) = tray::run_tray(ui_url, shutdown_tx, rt_handle) { tracing::warn!("Tray init failed: {}", e); } }); } #[cfg(not(windows))] fn init_tray(_ui_url: String, _shutdown_tx: mpsc::Sender<()>, _rt_handle: tokio::runtime::Handle) {} #[cfg(windows)] mod tray { use std::error::Error; use tokio::sync::mpsc; use tray_icon::{ menu::{Menu, MenuEvent, MenuItem}, Icon, TrayIconBuilder, }; use winit::application::ApplicationHandler; use winit::event_loop::{ActiveEventLoop, ControlFlow, EventLoop}; use winit::platform::windows::EventLoopBuilderExtWindows; pub fn run_tray( ui_url: String, shutdown_tx: mpsc::Sender<()>, rt_handle: tokio::runtime::Handle, ) -> Result<(), Box> { let mut builder = EventLoop::builder(); builder.with_any_thread(true); let event_loop = builder.build()?; let menu = Menu::new(); let open_item = MenuItem::new("Open UI", true, None); let exit_item = MenuItem::new("Exit", true, None); menu.append(&open_item)?; menu.append(&exit_item)?; let icon = Icon::from_rgba(vec![0, 120, 212, 255], 1, 1)?; let _tray = TrayIconBuilder::new() .with_tooltip("PLC Control") .with_menu(Box::new(menu)) .with_icon(icon) .build()?; let menu_rx = MenuEvent::receiver(); let mut app = TrayApp { menu_rx, open_id: open_item.id().clone(), exit_id: exit_item.id().clone(), ui_url, shutdown_tx, rt_handle, }; event_loop.run_app(&mut app).map_err(|e| e.into()) } struct TrayApp { menu_rx: &'static tray_icon::menu::MenuEventReceiver, open_id: tray_icon::menu::MenuId, exit_id: tray_icon::menu::MenuId, ui_url: String, shutdown_tx: mpsc::Sender<()>, rt_handle: tokio::runtime::Handle, } impl ApplicationHandler for TrayApp { fn resumed(&mut self, _event_loop: &ActiveEventLoop) {} fn window_event( &mut self, _event_loop: &ActiveEventLoop, _window_id: winit::window::WindowId, _event: winit::event::WindowEvent, ) {} fn about_to_wait(&mut self, event_loop: &ActiveEventLoop) { event_loop.set_control_flow(ControlFlow::Wait); while let Ok(menu_event) = self.menu_rx.try_recv() { if menu_event.id == self.open_id { let _ = webbrowser::open(&self.ui_url); } if menu_event.id == self.exit_id { let _ = self.rt_handle.block_on(self.shutdown_tx.send(())); event_loop.exit(); } } } } }