From 5fb7bb47423e2e8eb0e340cc367febc06d7b1aab Mon Sep 17 00:00:00 2001 From: caoqianming Date: Fri, 28 Jun 2024 16:24:05 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=87=E7=94=A8=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E6=B6=88=E8=B4=B9=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mqttc.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mqttc.py b/mqttc.py index fae7223..e664902 100644 --- a/mqttc.py +++ b/mqttc.py @@ -13,6 +13,7 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from threading import Thread import traceback +import queue CUR_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -31,6 +32,7 @@ logger = logging.getLogger(__name__) engine:Engine = None mpoint_dict = {} # {"mpoint_id": last_timex} +msg_queue: queue.Queue = queue.Queue() def send_error_email(message, subject='hfnf_mqtt', ): msg = MIMEMultipart() @@ -44,6 +46,11 @@ def send_error_email(message, subject='hfnf_mqtt', ): server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string()) server.quit() +def worker(): + while True: + msg = msg_queue.get() + if msg is not None: + save_items(msg) def save_items(payload): item_list = json.loads(payload) @@ -97,7 +104,7 @@ def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties): def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage): topic = msg.topic if topic == conf.MQTT_TOPIC: - save_items(msg.payload) + msg_queue.put(msg.payload) def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties): logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}") @@ -116,6 +123,7 @@ if __name__ == '__main__': try: engine = create_engine(conf.DATABASE_URL) logger.info("Connected to database") + Thread(target=worker, daemon=True).start() start_mqtt() except Exception: logger.error("异常退出", exc_info=True)