feat: 添加所有文件
This commit is contained in:
commit
775322b20a
|
@ -0,0 +1,4 @@
|
||||||
|
.venv/
|
||||||
|
__pycache__/
|
||||||
|
conf*.py
|
||||||
|
*.log
|
|
@ -0,0 +1,106 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from logging.handlers import RotatingFileHandler
|
||||||
|
from sqlalchemy import create_engine, Engine, text
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
from datetime import datetime
|
||||||
|
from dateutil import tz
|
||||||
|
import smtplib
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
from threading import Thread
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
|
||||||
|
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
sys.path.insert(0, CUR_DIR)
|
||||||
|
import conf
|
||||||
|
|
||||||
|
|
||||||
|
LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log')
|
||||||
|
logging.basicConfig(level=logging.WARNING,
|
||||||
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(sys.stdout), # Log to console
|
||||||
|
RotatingFileHandler(LOG_FILE, maxBytes=5*1034*1024, backupCount=1, encoding="utf-8"), # Log to file
|
||||||
|
])
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
engine:Engine = None
|
||||||
|
|
||||||
|
def send_error_email(message, subject='hfnf_mqtt', ):
|
||||||
|
msg = MIMEMultipart()
|
||||||
|
msg['From'] = conf.EMAIL_HOST_USER
|
||||||
|
msg['To'] = conf.EMAIL_HOST_USER
|
||||||
|
msg['Subject'] = subject
|
||||||
|
msg.attach(MIMEText(message, 'plain'))
|
||||||
|
server = smtplib.SMTP(conf.EMAIL_HOST, conf.EMAIL_PORT)
|
||||||
|
server.starttls()
|
||||||
|
server.login(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_PASSWORD)
|
||||||
|
server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string())
|
||||||
|
server.quit()
|
||||||
|
|
||||||
|
|
||||||
|
def save_items(payload):
|
||||||
|
item_list = json.loads(payload)
|
||||||
|
sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"
|
||||||
|
with engine.connect() as conn:
|
||||||
|
data = {}
|
||||||
|
for item in item_list:
|
||||||
|
data['timex'] = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai'))
|
||||||
|
data['mpoint_id'] = item['name']
|
||||||
|
try:
|
||||||
|
val_float = float(item["value"])
|
||||||
|
data["val_float"] = val_float
|
||||||
|
data["val_str"] = None
|
||||||
|
except Exception:
|
||||||
|
data["val_float"] = None
|
||||||
|
data["val_str"] = item["value"]
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn.execute(text(sql_str), data)
|
||||||
|
except IntegrityError:
|
||||||
|
logger.error(f"{data['mpoint_id']}-主键冲突")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"{data['mpoint_id']}-保存数据失败", exc_info=True)
|
||||||
|
Thread(target=send_error_email, args=(f"{data['mpoint_id']}-保存数据失败-{e}",)).start()
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties):
|
||||||
|
if rc == 0:
|
||||||
|
logger.info("Connected to MQTT broker")
|
||||||
|
mqttc.subscribe(conf.MQTT_TOPIC, qos=1)
|
||||||
|
logger.info(f"Subscribed to topic: {conf.MQTT_TOPIC}")
|
||||||
|
else:
|
||||||
|
logger.error("Failed to connect to MQTT broker")
|
||||||
|
|
||||||
|
|
||||||
|
def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
|
||||||
|
topic = msg.topic
|
||||||
|
if topic == conf.MQTT_TOPIC:
|
||||||
|
save_items(msg.payload)
|
||||||
|
|
||||||
|
|
||||||
|
def start_mqtt():
|
||||||
|
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id='hfnf_105')
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT)
|
||||||
|
client.loop_forever()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
try:
|
||||||
|
engine = create_engine(conf.DATABASE_URL)
|
||||||
|
logger.info("Connected to database")
|
||||||
|
start_mqtt()
|
||||||
|
except Exception:
|
||||||
|
logger.error("异常退出", exc_info=True)
|
||||||
|
Thread(target=send_error_email, args=(f'{traceback.format_exc()}')).start()
|
||||||
|
raise
|
|
@ -0,0 +1,4 @@
|
||||||
|
paho-mqtt==2.1.0
|
||||||
|
sqlalchemy==2.0.30
|
||||||
|
psycopg2==2.9.9
|
||||||
|
python-dateutil==2.9.0
|
Loading…
Reference in New Issue