feat: 优化save_items

This commit is contained in:
caoqianming 2024-06-13 13:18:40 +08:00
parent 113660ca99
commit 7ea7c83fd8
1 changed files with 30 additions and 23 deletions

View File

@ -48,32 +48,39 @@ def send_error_email(message, subject='hfnf_mqtt', ):
def save_items(payload):
item_list = json.loads(payload)
sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"
with engine.connect() as conn:
data = {}
for item in item_list:
data['timex'] = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai'))
data['mpoint_id'] = item['name']
last_timex: datetime = mpoint_dict.get(data['mpoint_id'], None)
if data["timex"].minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or (
last_timex and last_timex.minute == data["timex"].minute):
continue
save_list = []
for item in item_list:
timex = datetime.strptime(item['time'], "%Y%m%d%H%M%S").replace(tzinfo=tz.gettz('Asia/Shanghai'))
mpoint_id = item['name']
val_float = None
val_str = None
last_timex: datetime = mpoint_dict.get(mpoint_id, None)
if timex.minute not in getattr(conf, "SAVE_MINUTES", [2, 7, 12, 17, 22, 27, 32, 37, 42, 47, 52, 57]) or (
last_timex and last_timex.minute == timex.minute):
continue
else:
try:
val_float = float(item["value"])
data["val_float"] = val_float
data["val_str"] = None
except Exception:
data["val_float"] = None
data["val_str"] = item["value"]
val_str = item["value"]
save_list.append({"timex": timex, "mpoint_id": mpoint_id, "val_float": val_float, "val_str": val_str})
try:
conn.execute(text(sql_str), data)
except IntegrityError:
logger.error(f"{data['mpoint_id']}-主键冲突")
except Exception as e:
logger.error(f"{data['mpoint_id']}-保存数据失败", exc_info=True)
Thread(target=send_error_email, args=(f"{data['mpoint_id']}-保存数据失败-{e}",)).start()
conn.commit()
mpoint_dict[data['mpoint_id']] = data['timex']
if save_list:
with engine.connect() as conn:
for item in save_list:
is_ok = False
try:
conn.execute(text(sql_str), item)
is_ok = True
except IntegrityError:
logger.error(f"{item['mpoint_id']}-主键冲突")
except Exception as e:
logger.error(f"{item['mpoint_id']}-保存数据失败", exc_info=True)
Thread(target=send_error_email, args=(f"{item['mpoint_id']}-保存数据失败-{e}",)).start()
if is_ok:
conn.commit()
mpoint_dict[item['mpoint_id']] = item['timex']
@ -101,7 +108,7 @@ def start_mqtt():
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT, keepalive=60*5)
client.connect(host=conf.MQTT_HOST, port=conf.MQTT_PORT)
client.loop_forever()