feat : 修改无推送报警事件
This commit is contained in:
parent
21cb720d2e
commit
712b26be98
24
mqttc.py
24
mqttc.py
|
|
@ -19,6 +19,7 @@ import queue
|
|||
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.insert(0, CUR_DIR)
|
||||
import conf
|
||||
import time
|
||||
|
||||
|
||||
LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mqttc.log')
|
||||
|
|
@ -30,6 +31,7 @@ logging.basicConfig(level=getattr(logging, conf.LOG_LEVEL.upper(), logging.INFO)
|
|||
])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
engine:Engine = None
|
||||
mpoint_dict = {} # {"mpoint_id": last_timex}
|
||||
msg_queue: queue.Queue = queue.Queue()
|
||||
|
|
@ -46,12 +48,24 @@ def send_error_email(message, subject='hfnf_mqtt', ):
|
|||
server.sendmail(conf.EMAIL_HOST_USER, [conf.EMAIL_HOST_USER, conf.EMAIL_HOST_USER2], msg.as_string())
|
||||
server.quit()
|
||||
|
||||
def worker():
|
||||
while True:
|
||||
msg = msg_queue.get()
|
||||
if msg is not None:
|
||||
save_items(msg)
|
||||
|
||||
def worker():
|
||||
last_message_time = time.time() # 最后一次收到消息时间
|
||||
last_alert_time = 0 # 上一次报警时间
|
||||
while True:
|
||||
try:
|
||||
msg = msg_queue.get(timeout=60)
|
||||
if msg is not None:
|
||||
save_items(msg)
|
||||
last_message_time = time.time()
|
||||
last_alert_time = 0
|
||||
|
||||
except queue.Empty:
|
||||
now = time.time()
|
||||
if now - last_message_time > 600 and now - last_alert_time > 600:
|
||||
Thread(target=send_error_email, args=(f"time-out 10min hfnf_mqtt-无数据推送",)).start()
|
||||
last_alert_time = now
|
||||
|
||||
def save_items(payload):
|
||||
item_list = json.loads(payload)
|
||||
sql_str = "INSERT INTO mplogx (timex, mpoint_id, val_float, val_str) VALUES (:timex, :mpoint_id, :val_float, :val_str)"
|
||||
|
|
|
|||
Loading…
Reference in New Issue