feat: 添加一个测试的mqtt

This commit is contained in:
caoqianming 2024-06-13 13:33:57 +08:00
parent 7ea7c83fd8
commit 576b4c2dfa
1 changed files with 123 additions and 0 deletions

123
mqttc_test.py Normal file
View File

@ -0,0 +1,123 @@
import os
import sys
import paho.mqtt.client as mqtt
import json
import logging
from logging.handlers import RotatingFileHandler
from sqlalchemy import create_engine, Engine, text
from sqlalchemy.exc import IntegrityError
from datetime import datetime
from dateutil import tz
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from threading import Thread
import traceback
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, CUR_DIR)
import conf
LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log')
logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout), # Log to console
RotatingFileHandler(LOG_FILE, maxBytes=5*1034*1024, backupCount=1, encoding="utf-8"), # Log to file
])
logger = logging.getLogger(__name__)
engine:Engine = None
mpoint_dict = {} # {"mpoint_id": last_timex}
def send_error_email(message, subject='hfnf_mqtt', ):
msg = MIMEMultipart()
msg['From'] = conf.EMAIL_HOST_USER
msg['To'] = conf.EMAIL_HOST_USER
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT)
server.starttls()
server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD)
server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string())
server.quit()
def save_items(payload):
item_list = json.loads(payload)
sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"
save_list = []
for item in item_list:
timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai'))
mpoint_id = item['name']
val_float = None
val_str = None
last_timex: datetime = mpoint_dict.get(mpoint_id, None)
if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or (
last_timex and last_timex.minute == timex.minute):
continue
else:
try:
val_float = float(item["value"])
except Exception:
val_str = item["value"]
save_list.append({"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str})
if save_list:
with engine.connect() as conn:
for item in save_list:
is_ok = False
try:
conn.execute(text(sql_str), item)
is_ok = True
except IntegrityError:
logger.error(f"{item['mpoint_id']}-主键冲突")
except Exception as e:
logger.error(f"{item['mpoint_id']}-保存数据失败", exc_info=True)
Thread(target=send_error_email, args=(f"{item['mpoint_id']}-保存数据失败-{e}",)).start()
if is_ok:
conn.commit()
mpoint_dict[item['mpoint_id']] = item['timex']
def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties):
if rc == 0:
logger.info("Connected to MQTT broker")
mqttc.subscribe(conf.MQTT_TOPIC, qos=1)
logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}")
else:
logger.error("Failed to connect to MQTT broker")
def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
topic = msg.topic
if topic == conf.MQTT_TOPIC:
pass
def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties):
logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}")
def start_mqtt():
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105_test')
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT)
client.loop_forever()
if __name__ == '__main__':
try:
engine = create_engine(conf.DATABASE_URL)
logger.info("Connected to database")
start_mqtt()
except Exception:
logger.error("异常退出", exc_info=True)
Thread(target=send_error_email, args=(f'{traceback.format_exc()}')).start()
raise