import os import sys import paho.mqtt.client as mqtt import json import logging from logging.handlers import RotatingFileHandler from sqlalchemy import create_engine, Engine, text from sqlalchemy.exc import IntegrityError from datetime import datetime from dateutil import tz import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from threading import Thread import traceback import queue CUR_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, CUR_DIR) import conf import time LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log') logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), # Log to console RotatingFileHandler(LOG_FILE, maxBytes=5*1034*1024, backupCount=1, encoding="utf-8"), # Log to file ]) logger = logging.getLogger(__name__) engine:Engine = None # {table_name: {mpoint_id: last_timex}} 按表分桶,避免跨表互相抑制 mpoint_dict: dict = {} # topic -> 目标表名 映射 TOPIC_TABLE_MAP = { conf.MQTT_TOPIC: 'mplogx', conf.MQTT_TOPIC_zl: 'mplogx_zlwd', conf.MQTT_TOPIC_tl: 'mplogx_tlsn', conf.MQTT_TOPIC_1: 'mplogx_hknf_l1', conf.MQTT_TOPIC_2: 'mplogx_hknf_l2', conf.MQTT_TOPIC_3: 'mplogx_hknf_l3', } # topic -> 独立队列,每个 topic 一个 worker 线程消费 msg_queues: dict = {topic: queue.Queue() for topic in TOPIC_TABLE_MAP} def send_error_email(message, subject='hfnf_mqtt', ): msg = MIMEMultipart() msg['From'] = conf.EMAIL_HOST_USER msg['To'] = conf.EMAIL_HOST_USER msg['Subject'] = subject msg.attach(MIMEText(message, 'plain')) server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT) server.starttls() server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD) server.sendmail(conf.EMAIL_HOST_USER, [conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER2], msg.as_string()) server.quit() def worker(topic: str): """每个 topic 一个 worker 线程,独立做断流告警""" table_name = TOPIC_TABLE_MAP[topic] q = msg_queues[topic] last_message_time = time.time() # 该 topic 最后一次收到消息时间 last_alert_time = 0 # 该 topic 上一次报警时间 while True: try: payload = q.get(timeout=60) if payload is not None: save_items(topic, payload) last_message_time = time.time() last_alert_time = 0 except queue.Empty: now = time.time() if now - last_message_time > 600 and now - last_alert_time > 600: Thread( target=send_error_email, args=(f"time-out 10min [{table_name}] topic={topic} 无数据推送",), ).start() last_alert_time = now def save_items(topic, payload): table_name = TOPIC_TABLE_MAP.get(topic) if not table_name: logger.error(f"未知 topic:{topic},跳过入库") return item_list = json.loads(payload) sql_str = f"INSERT INTO {table_name} (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)" table_mpoint_dict = mpoint_dict.setdefault(table_name, {}) save_list = [] for item in item_list: timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai')) mpoint_id = item['name'] val_float = None val_str = None last_timex: datetime = table_mpoint_dict.get(mpoint_id, None) if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or ( last_timex and last_timex.minute == timex.minute): continue else: try: val_float = float(item["value"]) except Exception: val_str = item["value"] save_list.append({"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str}) if save_list: with engine.connect() as conn: for item in save_list: is_ok = False try: conn.execute(text(sql_str), item) is_ok = True except IntegrityError: logger.error(f"[{table_name}] {item['mpoint_id']}-主键冲突") except Exception as e: logger.error(f"[{table_name}] {item['mpoint_id']}-保存数据失败", exc_info=True) Thread(target=send_error_email, args=(f"[{table_name}] {item['mpoint_id']}-保存数据失败-{e}",)).start() if is_ok: conn.commit() table_mpoint_dict[item['mpoint_id']] = item['timex'] def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties): if rc == 0: logger.info("Connected to MQTT broker") topics = [(t, 1) for t in TOPIC_TABLE_MAP.keys()] mqttc.subscribe(topics) logger.info(f"Subscribed to topics: {[t for t, _ in topics]}") else: logger.error("Failed to connect to MQTT broker") def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage): topic = msg.topic q = msg_queues.get(topic) if q is not None: q.put(msg.payload) def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties): logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}") def start_mqtt(): client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105') client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT) client.loop_forever() if __name__ == '__main__': try: engine = create_engine(conf.DATABASE_URL) logger.info("Connected to database") for _topic in TOPIC_TABLE_MAP: Thread(target=worker, args=(_topic,), daemon=True).start() logger.info(f"Worker thread started for topic={_topic} -> {TOPIC_TABLE_MAP[_topic]}") start_mqtt() except Exception: logger.error("异常退出", exc_info=True) Thread(target=send_error_email, args=(f'{traceback.format_exc()}')).start() raise