diff --git a/mqttc_test.py b/mqttc_test.py new file mode 100644 index 0000000..a248b14 --- /dev/null +++ b/mqttc_test.py @@ -0,0 +1,123 @@ +import os +import sys +import paho.mqtt.client as mqtt +import json +import logging +from logging.handlers import RotatingFileHandler +from sqlalchemy import create_engine, Engine, text +from sqlalchemy.exc import IntegrityError +from datetime import datetime +from dateutil import tz +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from threading import Thread +import traceback + + +CUR_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, CUR_DIR) +import conf + + +LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log') +logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout), # Log to console + RotatingFileHandler(LOG_FILE, maxBytes=5*1034*1024, backupCount=1, encoding="utf-8"), # Log to file + ]) +logger = logging.getLogger(__name__) + +engine:Engine = None +mpoint_dict = {} # {"mpoint_id": last_timex} + +def send_error_email(message, subject='hfnf_mqtt', ): + msg = MIMEMultipart() + msg['From'] = conf.EMAIL_HOST_USER + msg['To'] = conf.EMAIL_HOST_USER + msg['Subject'] = subject + msg.attach(MIMEText(message, 'plain')) + server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT) + server.starttls() + server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD) + server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string()) + server.quit() + + +def save_items(payload): + item_list = json.loads(payload) + sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)" + save_list = [] + for item in item_list: + timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai')) + mpoint_id = item['name'] + val_float = None + val_str = None + last_timex: datetime = mpoint_dict.get(mpoint_id, None) + if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or ( + last_timex and last_timex.minute == timex.minute): + continue + else: + try: + val_float = float(item["value"]) + except Exception: + val_str = item["value"] + save_list.append({"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str}) + + if save_list: + with engine.connect() as conn: + for item in save_list: + is_ok = False + try: + conn.execute(text(sql_str), item) + is_ok = True + except IntegrityError: + logger.error(f"{item['mpoint_id']}-主键冲突") + except Exception as e: + logger.error(f"{item['mpoint_id']}-保存数据失败", exc_info=True) + Thread(target=send_error_email, args=(f"{item['mpoint_id']}-保存数据失败-{e}",)).start() + + if is_ok: + conn.commit() + mpoint_dict[item['mpoint_id']] = item['timex'] + + + + +def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties): + if rc == 0: + logger.info("Connected to MQTT broker") + mqttc.subscribe(conf.MQTT_TOPIC, qos=1) + logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}") + else: + logger.error("Failed to connect to MQTT broker") + + +def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage): + topic = msg.topic + if topic == conf.MQTT_TOPIC: + pass + +def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties): + logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}") + + +def start_mqtt(): + client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105_test') + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT) + client.loop_forever() + + +if __name__ == '__main__': + try: + engine = create_engine(conf.DATABASE_URL) + logger.info("Connected to database") + start_mqtt() + except Exception: + logger.error("异常退出", exc_info=True) + Thread(target=send_error_email, args=(f'{traceback.format_exc()}')).start() + raise \ No newline at end of file