"""MQTT subscriber that stores selected measurement samples in ``mplogx``.

Subscribes to ``conf.MQTT_TOPIC``, decodes each payload as a JSON list of
items and inserts the ones whose timestamp falls on a configured second.
Failures are logged and reported by e-mail.
"""
import os
import sys
import paho.mqtt.client as mqtt
import json
import logging
from logging.handlers import RotatingFileHandler
from sqlalchemy import create_engine, Engine, text
from sqlalchemy.exc import IntegrityError
from datetime import datetime
from dateutil import tz
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from threading import Thread
import traceback

CUR_DIR = os.path.dirname(os.path.abspath(__file__))
# Make the sibling ``conf`` module importable regardless of the working dir.
sys.path.insert(0, CUR_DIR)
import conf

LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log')
logging.basicConfig(
    level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),  # Log to console
        # Log to file; rotate at 5 MiB and keep one backup.
        RotatingFileHandler(LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=1, encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

# Created in the ``__main__`` guard before the MQTT loop starts.
engine: Engine = None
mpoint_dict = {}  # {"mpoint_id": last_timex}

# Seconds-of-minute on which a sample is stored when conf.SAVE_SECONDS is absent.
DEFAULT_SAVE_SECONDS = [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]


def send_error_email(message, subject='hfnf_mqtt', ):
    """Send a plain-text notification from ``conf.EMAIL_HOST_USER`` to itself.

    Args:
        message: Body text of the e-mail.
        subject: Subject line.

    Raises:
        smtplib.SMTPException / OSError: if connecting, authenticating or
            sending fails; nothing is caught here.
    """
    msg = MIMEMultipart()
    msg['From'] = conf.EMAIL_HOST_USER
    msg['To'] = conf.EMAIL_HOST_USER
    msg['Subject'] = subject
    msg.attach(MIMEText(message, 'plain'))

    # The context manager issues QUIT and closes the socket even when
    # starttls/login/sendmail raises.
    with smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT) as server:
        server.starttls()
        server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD)
        server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string())


def save_items(payload):
    """Insert the items of one MQTT payload into ``mplogx``.

    Args:
        payload: JSON text (``str`` or ``bytes``) encoding a list of objects
            that each provide ``time`` (``%Y%m%d%H%M%S``), ``name`` and
            ``value``.

    An item is skipped when the seconds field of its timestamp is not in
    ``conf.SAVE_SECONDS`` (default ``DEFAULT_SAVE_SECONDS``) or equals the
    seconds field of the last timestamp seen for the same ``name``.

    Raises:
        ValueError / KeyError: if the payload is not valid JSON or an item
            lacks an expected key or has an unparsable ``time``.
    """
    item_list = json.loads(payload)
    # Built once per payload instead of once per item.
    insert_stmt = text(
        "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"
    )
    save_seconds = getattr(conf, 'SAVE_SECONDS', DEFAULT_SAVE_SECONDS)
    shanghai = tz.gettz('Asia/Shanghai')

    with engine.connect() as conn:
        for item in item_list:
            timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=shanghai)
            mpoint_id = item['name']
            last_timex = mpoint_dict.get(mpoint_id)

            if timex.second not in save_seconds:
                continue
            # NOTE(review): only the seconds field is compared, so a sample
            # exactly N minutes after the previous one is also skipped.
            # Confirm whether a full-timestamp comparison is intended.
            if last_timex and last_timex.second == timex.second:
                continue

            # Numeric values go to val_float; anything else is kept verbatim
            # in val_str.
            try:
                val_float = float(item["value"])
                val_str = None
            except (TypeError, ValueError, OverflowError):
                val_float = None
                val_str = item["value"]

            data = {
                'timex': timex,
                'mpoint_id': mpoint_id,
                'val_float': val_float,
                'val_str': val_str,
            }
            try:
                conn.execute(insert_stmt, data)
            except IntegrityError:
                logger.error(f"{data['mpoint_id']}-主键冲突")
                # Clear the failed transaction so later items can be inserted.
                conn.rollback()
            except Exception as e:
                logger.error(f"{data['mpoint_id']}-保存数据失败", exc_info=True)
                Thread(target=send_error_email, args=(f"{data['mpoint_id']}-保存数据失败-{e}",)).start()
                conn.rollback()
            else:
                # One commit per item keeps a later failure from undoing it.
                conn.commit()
            # Remember the sample time whether or not the insert succeeded.
            mpoint_dict[mpoint_id] = timex


def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties):
    """paho-mqtt connect callback: subscribe to ``conf.MQTT_TOPIC`` on success."""
    if rc == 0:
        logger.info("Connected to MQTT broker")
        mqttc.subscribe(conf.MQTT_TOPIC, qos=1)
        logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}")
    else:
        logger.error("Failed to connect to MQTT broker")


def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
    """paho-mqtt message callback: store payloads published on ``conf.MQTT_TOPIC``."""
    topic = msg.topic
    if topic == conf.MQTT_TOPIC:
        save_items(msg.payload)


def start_mqtt():
    """Connect to the broker from ``conf`` and run the network loop (blocks)."""
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105')
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT)
    client.loop_forever()


if __name__ == '__main__':
    try:
        engine = create_engine(conf.DATABASE_URL)
        logger.info("Connected to database")
        start_mqtt()
    except Exception:
        logger.error("异常退出", exc_info=True)
        # args must be a tuple; a bare string would be unpacked character by
        # character into positional arguments.
        Thread(target=send_error_email, args=(traceback.format_exc(),)).start()
        raise