mtc/mqttc.py

131 lines
4.7 KiB
Python

import os
import sys
import paho.mqtt.client as mqtt
import json
import logging
from logging.handlers import RotatingFileHandler
from sqlalchemy import create_engine, Engine, text
from sqlalchemy.exc import IntegrityError
from datetime import datetime
from dateutil import tz
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from threading import Thread
import traceback
import queue
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, CUR_DIR)
import conf
LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log')
logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout), # Log to console
RotatingFileHandler(LOG_FILE, maxBytes=5*1034*1024, backupCount=1, encoding="utf-8"), # Log to file
])
logger = logging.getLogger(__name__)
engine:Engine = None
mpoint_dict = {} # {"mpoint_id": last_timex}
msg_queue: queue.Queue = queue.Queue()
def send_error_email(message, subject='hfnf_mqtt', ):
msg = MIMEMultipart()
msg['From'] = conf.EMAIL_HOST_USER
msg['To'] = conf.EMAIL_HOST_USER
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT)
server.starttls()
server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD)
server.sendmail(conf.EMAIL_HOST_USER, [conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER2], msg.as_string())
server.quit()
def worker():
while True:
msg = msg_queue.get()
if msg is not None:
save_items(msg)
def save_items(payload):
item_list = json.loads(payload)
sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"
save_list = []
for item in item_list:
timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai'))
mpoint_id = item['name']
val_float = None
val_str = None
last_timex: datetime = mpoint_dict.get(mpoint_id, None)
if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or (
last_timex and last_timex.minute == timex.minute):
continue
else:
try:
val_float = float(item["value"])
except Exception:
val_str = item["value"]
save_list.append({"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str})
if save_list:
with engine.connect() as conn:
for item in save_list:
is_ok = False
try:
conn.execute(text(sql_str), item)
is_ok = True
except IntegrityError:
logger.error(f"{item['mpoint_id']}-主键冲突")
except Exception as e:
logger.error(f"{item['mpoint_id']}-保存数据失败", exc_info=True)
Thread(target=send_error_email, args=(f"{item['mpoint_id']}-保存数据失败-{e}",)).start()
if is_ok:
conn.commit()
mpoint_dict[item['mpoint_id']] = item['timex']
def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties):
if rc == 0:
logger.info("Connected to MQTT broker")
mqttc.subscribe(conf.MQTT_TOPIC, qos=1)
logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}")
else:
logger.error("Failed to connect to MQTT broker")
def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
topic = msg.topic
if topic == conf.MQTT_TOPIC:
msg_queue.put(msg.payload)
def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties):
logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}")
def start_mqtt():
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105')
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT)
client.loop_forever()
if __name__ == '__main__':
try:
engine = create_engine(conf.DATABASE_URL)
logger.info("Connected to database")
Thread(target=worker, daemon=True).start()
start_mqtt()
except Exception:
logger.error("异常退出", exc_info=True)
Thread(target=send_error_email, args=(f'{traceback.format_exc()}')).start()
raise