From 7ea7c83fd8959a91bb454ee82fda2c5ed3475596 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 13 Jun 2024 13:18:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96save=5Fitems?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mqttc.py | 53 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/mqttc.py b/mqttc.py index 86b9a73..fae7223 100644 --- a/mqttc.py +++ b/mqttc.py @@ -48,32 +48,39 @@ def send_error_email(message, subject='hfnf_mqtt', ): def save_items(payload): item_list = json.loads(payload) sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)" - with engine.connect() as conn: - data = {} - for item in item_list: - data['timex'] = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai')) - data['mpoint_id'] = item['name'] - last_timex: datetime = mpoint_dict.get(data['mpoint_id'], None) - if data["timex"].minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or ( - last_timex and last_timex.minute == data["timex"].minute): - continue + save_list = [] + for item in item_list: + timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai')) + mpoint_id = item['name'] + val_float = None + val_str = None + last_timex: datetime = mpoint_dict.get(mpoint_id, None) + if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or ( + last_timex and last_timex.minute == timex.minute): + continue + else: try: val_float = float(item["value"]) - data["val_float"] = val_float - data["val_str"] = None except Exception: - data["val_float"] = None - data["val_str"] = item["value"] + val_str = item["value"] + save_list.append({"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str}) - try: - conn.execute(text(sql_str), data) - except IntegrityError: - logger.error(f"{data['mpoint_id']}-主键冲突") - except Exception as e: - logger.error(f"{data['mpoint_id']}-保存数据失败", exc_info=True) - Thread(target=send_error_email, args=(f"{data['mpoint_id']}-保存数据失败-{e}",)).start() - conn.commit() - mpoint_dict[data['mpoint_id']] = data['timex'] + if save_list: + with engine.connect() as conn: + for item in save_list: + is_ok = False + try: + conn.execute(text(sql_str), item) + is_ok = True + except IntegrityError: + logger.error(f"{item['mpoint_id']}-主键冲突") + except Exception as e: + logger.error(f"{item['mpoint_id']}-保存数据失败", exc_info=True) + Thread(target=send_error_email, args=(f"{item['mpoint_id']}-保存数据失败-{e}",)).start() + + if is_ok: + conn.commit() + mpoint_dict[item['mpoint_id']] = item['timex'] @@ -101,7 +108,7 @@ def start_mqtt(): client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect - client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT, keepalive=60*5) + client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT) client.loop_forever()