"""MQTT subscriber that samples measurement-point readings into a database.

Payloads received on ``conf.MQTT_TOPIC`` are pushed onto an in-process queue
by the MQTT callback and consumed by a single background worker thread, which
parses them as JSON and inserts the selected readings into the ``mplogx``
table. Unexpected failures are logged and reported by e-mail.
"""
import json
import logging
import os
import queue
import smtplib
import sys
import traceback
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from logging.handlers import RotatingFileHandler
from threading import Thread
from typing import Optional

import paho.mqtt.client as mqtt
from dateutil import tz
from sqlalchemy import Engine, create_engine, text
from sqlalchemy.exc import IntegrityError

CUR_DIR = os.path.dirname(os.path.abspath(__file__))
# Make the sibling ``conf`` module importable regardless of the working dir.
sys.path.insert(0, CUR_DIR)
import conf  # noqa: E402

LOG_FILE = os.path.join(CUR_DIR, 'mqttc.log')
logging.basicConfig(
    level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),  # Log to console
        RotatingFileHandler(  # Log to file, rotated at 5 MiB
            LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=1, encoding="utf-8"
        ),
    ],
)
logger = logging.getLogger(__name__)

# Minutes of the hour that are stored when conf does not define SAVE_MINUTES.
DEFAULT_SAVE_MINUTES = (2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57)
# Incoming timestamps carry no zone information; they are tagged with this one.
LOCAL_TZ = tz.gettz('Asia/Shanghai')
INSERT_SQL = (
    "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) "
    "VALUES (:timex, :mpoint_id, :val_float, :val_str)"
)

# Created in the ``__main__`` block before the worker thread is started.
engine: Optional[Engine] = None
mpoint_dict: dict = {}  # {"mpoint_id": last_timex}
msg_queue: queue.Queue = queue.Queue()


def send_error_email(message, subject='hfnf_mqtt', ):
    """Send a plain-text alert mail from ``conf.EMAIL_HOST_USER`` to itself.

    The function is used as a thread target, so delivery problems are logged
    rather than raised: an uncaught exception there would only reach stderr.

    Args:
        message: Body of the mail.
        subject: Subject line of the mail.
    """
    msg = MIMEMultipart()
    msg['From'] = conf.EMAIL_HOST_USER
    msg['To'] = conf.EMAIL_HOST_USER
    msg['Subject'] = subject
    msg.attach(MIMEText(message, 'plain'))
    try:
        # The context manager issues QUIT and closes the socket even when
        # starttls/login/sendmail raises.
        with smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT) as server:
            server.starttls()
            server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD)
            server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string())
    except (smtplib.SMTPException, OSError):
        logger.error("Failed to send error email", exc_info=True)


def worker():
    """Consume payloads from ``msg_queue`` forever and persist them.

    Any exception raised while handling one payload is logged and the loop
    continues; otherwise a single malformed message would end the only
    consumer thread and the queue would grow without bound.
    """
    while True:
        payload = msg_queue.get()
        if payload is None:
            continue
        try:
            save_items(payload)
        except Exception:
            logger.error("Failed to process MQTT payload", exc_info=True)


def save_items(payload):
    """Parse one MQTT payload and insert the readings that are due.

    The payload is decoded as a JSON list of objects with the keys ``time``
    (``%Y%m%d%H%M%S``), ``name`` and ``value``. A reading is stored only when
    its minute-of-hour is listed in ``conf.SAVE_MINUTES`` (falling back to
    ``DEFAULT_SAVE_MINUTES``) and no reading for the same point and the same
    wall-clock minute has been stored already.

    Args:
        payload: Raw JSON document (``str`` or ``bytes``).

    Raises:
        ValueError: The payload is not valid JSON or a timestamp is malformed.
        KeyError: An item lacks one of the expected keys.
    """
    item_list = json.loads(payload)
    save_minutes = getattr(conf, "SAVE_MINUTES", DEFAULT_SAVE_MINUTES)

    save_list = []
    for item in item_list:
        timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=LOCAL_TZ)
        mpoint_id = item['name']
        if timex.minute not in save_minutes:
            continue

        # Compare whole timestamps truncated to the minute. Looking only at
        # ``.minute`` would treat 13:02 as a duplicate of 12:02.
        last_timex: Optional[datetime] = mpoint_dict.get(mpoint_id)
        if last_timex is not None and (
            last_timex.replace(second=0, microsecond=0)
            == timex.replace(second=0, microsecond=0)
        ):
            continue

        # Numeric readings go to val_float, everything else to val_str.
        val_float = None
        val_str = None
        try:
            val_float = float(item["value"])
        except Exception:
            val_str = item["value"]
        save_list.append(
            {"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str}
        )

    if not save_list:
        return

    statement = text(INSERT_SQL)
    with engine.connect() as conn:
        for row in save_list:
            try:
                conn.execute(statement, row)
                conn.commit()
            except IntegrityError:
                # Reset the failed transaction so the remaining rows of the
                # batch can still be written on this connection.
                conn.rollback()
                logger.error(f"{row['mpoint_id']}-主键冲突")
            except Exception as e:
                conn.rollback()
                logger.error(f"{row['mpoint_id']}-保存数据失败", exc_info=True)
                Thread(
                    target=send_error_email,
                    args=(f"{row['mpoint_id']}-保存数据失败-{e}",),
                ).start()
            else:
                # Remember the stored timestamp for the duplicate check above.
                mpoint_dict[row['mpoint_id']] = row['timex']


def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties):
    """Subscribe to ``conf.MQTT_TOPIC`` once the broker accepts the connection."""
    if rc == 0:
        logger.info("Connected to MQTT broker")
        mqttc.subscribe(conf.MQTT_TOPIC, qos=1)
        logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}")
    else:
        logger.error("Failed to connect to MQTT broker")


def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
    """Queue the payload of messages published on ``conf.MQTT_TOPIC``.

    The callback only enqueues; parsing and database work happen in
    ``worker`` on a separate thread.
    """
    if msg.topic == conf.MQTT_TOPIC:
        msg_queue.put(msg.payload)


def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties):
    """Log every disconnect together with its flags and reason code."""
    logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}")


def start_mqtt():
    """Create the MQTT client, connect to the broker and block in its loop."""
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id='hfnf_105',
    )
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT)
    client.loop_forever()


if __name__ == '__main__':
    try:
        engine = create_engine(conf.DATABASE_URL)
        logger.info("Connected to database")
        Thread(target=worker, daemon=True).start()
        start_mqtt()
    except Exception:
        logger.error("异常退出", exc_info=True)
        # ``args`` must be a tuple: a bare string would be unpacked character
        # by character into positional arguments. The thread is non-daemon so
        # the mail is still sent while the exception propagates.
        Thread(target=send_error_email, args=(traceback.format_exc(),)).start()
        raise