From 712b26be9895c3cc3043153c5a3e8b0a97eb8844 Mon Sep 17 00:00:00 2001 From: zty Date: Thu, 18 Sep 2025 13:45:18 +0800 Subject: [PATCH] =?UTF-8?q?feat=20:=20=E4=BF=AE=E6=94=B9=E6=97=A0=E6=8E=A8?= =?UTF-8?q?=E9=80=81=E6=8A=A5=E8=AD=A6=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mqttc.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/mqttc.py b/mqttc.py index 79a6e17..2c970f8 100644 --- a/mqttc.py +++ b/mqttc.py @@ -19,6 +19,7 @@ import queue CUR_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, CUR_DIR) import conf +import time LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log') @@ -30,6 +31,7 @@ logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO) ]) logger = logging.getLogger(__name__) + engine:Engine = None mpoint_dict = {} # {"mpoint_id": last_timex} msg_queue: queue.Queue = queue.Queue() @@ -46,12 +48,24 @@ def send_error_email(message, subject='hfnf_mqtt', ): server.sendmail(conf.EMAIL_HOST_USER, [conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER2], msg.as_string()) server.quit() -def worker(): - while True: - msg = msg_queue.get() - if msg is not None: - save_items(msg) +def worker(): + last_message_time = time.time() # 最后一次收到消息时间 + last_alert_time = 0 # 上一次报警时间 + while True: + try: + msg = msg_queue.get(timeout=60) + if msg is not None: + save_items(msg) + last_message_time = time.time() + last_alert_time = 0 + + except queue.Empty: + now = time.time() + if now - last_message_time > 600 and now - last_alert_time > 600: + Thread(target=send_error_email, args=(f"time-out 10min hfnf_mqtt-无数据推送",)).start() + last_alert_time = now + def save_items(payload): item_list = json.loads(payload) sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"