diff --git a/mqttc.py b/mqttc.py index 79a6e17..2c970f8 100644 --- a/mqttc.py +++ b/mqttc.py @@ -19,6 +19,7 @@ import queue CUR_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, CUR_DIR) import conf +import time LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log') @@ -30,6 +31,7 @@ logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO) ]) logger = logging.getLogger(__name__) + engine:Engine = None mpoint_dict = {} # {"mpoint_id": last_timex} msg_queue: queue.Queue = queue.Queue() @@ -46,12 +48,24 @@ def send_error_email(message, subject='hfnf_mqtt', ): server.sendmail(conf.EMAIL_HOST_USER, [conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER2], msg.as_string()) server.quit() -def worker(): - while True: - msg = msg_queue.get() - if msg is not None: - save_items(msg) +def worker(): + last_message_time = time.time() # 最后一次收到消息时间 + last_alert_time = 0 # 上一次报警时间 + while True: + try: + msg = msg_queue.get(timeout=60) + if msg is not None: + save_items(msg) + last_message_time = time.time() + last_alert_time = 0 + + except queue.Empty: + now = time.time() + if now - last_message_time > 600 and now - last_alert_time > 600: + Thread(target=send_error_email, args=(f"time-out 10min hfnf_mqtt-无数据推送",)).start() + last_alert_time = now + def save_items(payload): item_list = json.loads(payload) sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"