From 775322b20a888c6c19d1d834c39493834002f786 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Wed, 29 May 2024 08:43:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=89=80=E6=9C=89?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 ++ mqttc.py | 106 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 ++ ruff.toml | 2 + 4 files changed, 116 insertions(+) create mode 100644 .gitignore create mode 100644 mqttc.py create mode 100644 requirements.txt create mode 100644 ruff.toml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d1cd86c --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.venv/ +__pycache__/ +conf*.py +*.log \ No newline at end of file diff --git a/mqttc.py b/mqttc.py new file mode 100644 index 0000000..149efbd --- /dev/null +++ b/mqttc.py @@ -0,0 +1,106 @@ +import os +import sys +import paho.mqtt.client as mqtt +import json +import logging +from logging.handlers import RotatingFileHandler +from sqlalchemy import create_engine, Engine, text +from sqlalchemy.exc import IntegrityError +from datetime import datetime +from dateutil import tz +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from threading import Thread +import traceback + + +CUR_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, CUR_DIR) +import conf + + +LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log') +logging.basicConfig(level=logging.WARNING, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout), # Log to console + RotatingFileHandler(LOG_FILE, maxBytes=5*1034*1024, backupCount=1, encoding="utf-8"), # Log to file + ]) +logger = logging.getLogger(__name__) + +engine:Engine = None + +def send_error_email(message, subject='hfnf_mqtt', ): + msg = MIMEMultipart() + msg['From'] = conf.EMAIL_HOST_USER + msg['To'] = conf.EMAIL_HOST_USER + msg['Subject'] = subject + msg.attach(MIMEText(message, 'plain')) + server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT) + server.starttls() + server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD) + server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string()) + server.quit() + + +def save_items(payload): + item_list = json.loads(payload) + sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)" + with engine.connect() as conn: + data = {} + for item in item_list: + data['timex'] = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai')) + data['mpoint_id'] = item['name'] + try: + val_float = float(item["value"]) + data["val_float"] = val_float + data["val_str"] = None + except Exception: + data["val_float"] = None + data["val_str"] = item["value"] + + try: + conn.execute(text(sql_str), data) + except IntegrityError: + logger.error(f"{data['mpoint_id']}-主键冲突") + except Exception as e: + logger.error(f"{data['mpoint_id']}-保存数据失败", exc_info=True) + Thread(target=send_error_email, args=(f"{data['mpoint_id']}-保存数据失败-{e}",)).start() + conn.commit() + + + + +def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties): + if rc == 0: + logger.info("Connected to MQTT broker") + mqttc.subscribe(conf.MQTT_TOPIC, qos=1) + logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}") + else: + logger.error("Failed to connect to MQTT broker") + + +def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage): + topic = msg.topic + if topic == conf.MQTT_TOPIC: + save_items(msg.payload) + + +def start_mqtt(): + client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105') + client.on_connect = on_connect + client.on_message = on_message + client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT) + client.loop_forever() + + +if __name__ == '__main__': + try: + engine = create_engine(conf.DATABASE_URL) + logger.info("Connected to database") + start_mqtt() + except Exception: + logger.error("异常退出", exc_info=True) + Thread(target=send_error_email, args=(f'{traceback.format_exc()}')).start() + raise \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..69fd6c1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +paho-mqtt==2.1.0 +sqlalchemy==2.0.30 +psycopg2==2.9.9 +python-dateutil==2.9.0 \ No newline at end of file diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..997d320 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,2 @@ +line-length = 200 +fix = true \ No newline at end of file