diff --git a/mqttc.py b/mqttc.py index fae7223..e664902 100644 --- a/mqttc.py +++ b/mqttc.py @@ -13,6 +13,7 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from threading import Thread import traceback +import queue CUR_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -31,6 +32,7 @@ logger = logging.getLogger(__name__) engine:Engine = None mpoint_dict = {} # {"mpoint_id": last_timex} +msg_queue: queue.Queue = queue.Queue() def send_error_email(message, subject='hfnf_mqtt', ): msg = MIMEMultipart() @@ -44,6 +46,11 @@ def send_error_email(message, subject='hfnf_mqtt', ): server.sendmail(conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER, msg.as_string()) server.quit() +def worker(): + while True: + msg = msg_queue.get() + if msg is not None: + save_items(msg) def save_items(payload): item_list = json.loads(payload) @@ -97,7 +104,7 @@ def on_connect(mqttc: mqtt.Client, userdata, flags, rc, properties): def on_message(mqttc: mqtt.Client, userdata, msg: mqtt.MQTTMessage): topic = msg.topic if topic == conf.MQTT_TOPIC: - save_items(msg.payload) + msg_queue.put(msg.payload) def on_disconnect(mqttc: mqtt.Client, userdata, disconnect_flags, reason_code, properties): logger.error(f"Disconnected from MQTT broker__:{disconnect_flags}, {reason_code}") @@ -116,6 +123,7 @@ if __name__ == '__main__': try: engine = create_engine(conf.DATABASE_URL) logger.info("Connected to database") + Thread(target=worker, daemon=True).start() start_mqtt() except Exception: logger.error("异常退出", exc_info=True)