diff --git a/mqttc.py b/mqttc.py index 2c970f8..d1050ee 100644 --- a/mqttc.py +++ b/mqttc.py @@ -33,13 +33,26 @@ logger = logging.getLogger(__name__) engine:Engine = None -mpoint_dict = {} # {"mpoint_id": last_timex} -msg_queue: queue.Queue = queue.Queue() +# {table_name: {mpoint_id: last_timex}} 按表分桶,避免跨表互相抑制 +mpoint_dict: dict = {} + +# topic -> 目标表名 映射 +TOPIC_TABLE_MAP = { + conf.MQTT_TOPIC: 'mplogx', + conf.MQTT_TOPIC_zl: 'mplogx_zlwd', + conf.MQTT_TOPIC_tl: 'mplogx_tlsn', + conf.MQTT_TOPIC_1: 'mplogx_hknf_l1', + conf.MQTT_TOPIC_2: 'mplogx_hknf_l2', + conf.MQTT_TOPIC_3: 'mplogx_hknf_l3', +} + +# topic -> 独立队列,每个 topic 一个 worker 线程消费 +msg_queues: dict = {topic: queue.Queue() for topic in TOPIC_TABLE_MAP} def send_error_email(message, subject='hfnf_mqtt', ): msg = MIMEMultipart() msg['From'] = conf.EMAIL_HOST_USER - msg['To'] = conf.EMAIL_HOST_USER + msg['To'] = conf.EMAIL_HOST_USER msg['Subject'] = subject msg.attach(MIMEText(message, 'plain')) server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT) @@ -49,33 +62,45 @@ def send_error_email(message, subject='hfnf_mqtt', ): server.quit() -def worker(): - last_message_time = time.time() # 最后一次收到消息时间 - last_alert_time = 0 # 上一次报警时间 +def worker(topic: str): + """每个 topic 一个 worker 线程,独立做断流告警""" + table_name = TOPIC_TABLE_MAP[topic] + q = msg_queues[topic] + last_message_time = time.time() # 该 topic 最后一次收到消息时间 + last_alert_time = 0 # 该 topic 上一次报警时间 while True: try: - msg = msg_queue.get(timeout=60) - if msg is not None: - save_items(msg) + payload = q.get(timeout=60) + if payload is not None: + save_items(topic, payload) last_message_time = time.time() last_alert_time = 0 except queue.Empty: now = time.time() if now - last_message_time > 600 and now - last_alert_time > 600: - Thread(target=send_error_email, args=(f"time-out 10min hfnf_mqtt-无数据推送",)).start() + Thread( + target=send_error_email, + args=(f"time-out 10min [{table_name}] topic={topic} 无数据推送",), + ).start() last_alert_time = now -def save_items(payload): +def save_items(topic, payload): + table_name = TOPIC_TABLE_MAP.get(topic) + if not table_name: + logger.error(f"未知 topic:{topic},跳过入库") + return + item_list = json.loads(payload) - sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)" + sql_str = f"INSERT INTO {table_name} (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)" + table_mpoint_dict = mpoint_dict.setdefault(table_name, {}) save_list = [] for item in item_list: timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai')) mpoint_id = item['name'] val_float = None val_str = None - last_timex: datetime = mpoint_dict.get(mpoint_id, None) + last_timex: datetime = table_mpoint_dict.get(mpoint_id, None) if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or ( last_timex and last_timex.minute == timex.minute): continue @@ -94,14 +119,14 @@ def save_items(payload): conn.execute(text(sql_str), item) is_ok = True except IntegrityError: - logger.error(f"{item['mpoint_id']}-主键冲突") + logger.error(f"[{table_name}] {item['mpoint_id']}-主键冲突") except Exception as e: - logger.error(f"{item['mpoint_id']}-保存数据失败", exc_info=True) - Thread(target=send_error_email, args=(f"{item['mpoint_id']}-保存数据失败-{e}",)).start() - + logger.error(f"[{table_name}] {item['mpoint_id']}-保存数据失败", exc_info=True) + Thread(target=send_error_email, args=(f"[{table_name}] {item['mpoint_id']}-保存数据失败-{e}",)).start() + if is_ok: conn.commit() - mpoint_dict[item['mpoint_id']] = item['timex'] + table_mpoint_dict[item['mpoint_id']] = item['timex'] @@ -109,16 +134,18 @@ def save_items(payload): def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties): if rc == 0: logger.info("Connected to MQTT broker") - mqttc.subscribe(conf.MQTT_TOPIC, qos=1) - logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}") + topics = [(t, 1) for t in TOPIC_TABLE_MAP.keys()] + mqttc.subscribe(topics) + logger.info(f"Subscribed to topics: {[t for t, _ in topics]}") else: logger.error("Failed to connect to MQTT broker") def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage): topic = msg.topic - if topic == conf.MQTT_TOPIC: - msg_queue.put(msg.payload) + q = msg_queues.get(topic) + if q is not None: + q.put(msg.payload) def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties): logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}") @@ -137,7 +164,9 @@ if __name__ == '__main__': try: engine = create_engine(conf.DATABASE_URL) logger.info("Connected to database") - Thread(target=worker, daemon=True).start() + for _topic in TOPIC_TABLE_MAP: + Thread(target=worker, args=(_topic,), daemon=True).start() + logger.info(f"Worker thread started for topic={_topic} -> {TOPIC_TABLE_MAP[_topic]}") start_mqtt() except Exception: logger.error("异常退出", exc_info=True)