feat: 玻纤拉丝采集问题
This commit is contained in:
parent
e5a9f77f3d
commit
f6720ad7ce
|
@ -0,0 +1,143 @@
|
|||
import requests
|
||||
import os
|
||||
import sys
|
||||
import django
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import threading
|
||||
|
||||
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
BASE_DIR = os.path.dirname(CUR_DIR)
|
||||
sys.path.insert(0, BASE_DIR)
|
||||
|
||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
|
||||
django.setup()
|
||||
|
||||
from apps.enm.services import insert_mplogx_item
|
||||
from django.utils import timezone
|
||||
from apps.utils.tasks import send_mail_task
|
||||
from datetime import datetime, timedelta
|
||||
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
BASE_DIR = os.path.dirname(CUR_DIR)
|
||||
sys.path.insert(0, BASE_DIR)
|
||||
|
||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
|
||||
django.setup()
|
||||
|
||||
SERVER = '192.168.1.37'
|
||||
PORT = '8089'
|
||||
PATH = f'http://{SERVER}:{PORT}/GetTagList'
|
||||
|
||||
SERVER2 = '192.168.1.43'
|
||||
PORT2 = '8081'
|
||||
PATH2 = f'http://{SERVER2}:{PORT2}/GetTagList'
|
||||
|
||||
#MIN_LIST = set(range(0,61,5))
|
||||
MIN_LIST = [0]
|
||||
myLogger = logging.getLogger("log")
|
||||
|
||||
|
||||
class MailController:
|
||||
def __init__(self):
|
||||
self.last_sent_time = None
|
||||
self.interval = timedelta(days=1)
|
||||
self.lock = threading.Lock()
|
||||
self._is_sending = False
|
||||
|
||||
def should_send_mail(self):
|
||||
now = datetime.now()
|
||||
# thread_name = threading.current_thread().name
|
||||
with self.lock:
|
||||
if self._is_sending:
|
||||
# myLogger.info(f"线程 {thread_name}: 邮件正在发送中,跳过")
|
||||
return False
|
||||
if self.last_sent_time is None:
|
||||
# myLogger.info(f"线程 {thread_name}: 首次发送邮件")
|
||||
self._is_sending = True
|
||||
return True
|
||||
time_since_last = now - self.last_sent_time
|
||||
if time_since_last > self.interval:
|
||||
# myLogger.info(f"线程 {thread_name}: 距离上次发送已超过间隔,允许发送")
|
||||
self._is_sending = True
|
||||
return True
|
||||
else:
|
||||
# myLogger.info(f"线程 {thread_name}: 距离上次发送不足间隔,跳过")
|
||||
return False
|
||||
|
||||
def mark_as_sent(self):
|
||||
with self.lock:
|
||||
self.last_sent_time = datetime.now()
|
||||
self._is_sending = False
|
||||
myLogger.info("邮件发送状态已重置")
|
||||
|
||||
|
||||
mail_controller = MailController()
|
||||
|
||||
|
||||
def send_error_notification(error_message):
|
||||
"""
|
||||
发送错误通知
|
||||
"""
|
||||
if mail_controller.should_send_mail():
|
||||
try:
|
||||
send_mail_task.delay(subject='insert_kvt_error', message=str(error_message))
|
||||
myLogger.error(f"请求组态王失败:{str(error_message)}")
|
||||
except Exception as e:
|
||||
myLogger.exception(f"发送错误邮件失败: {e}")
|
||||
finally:
|
||||
mail_controller.mark_as_sent()
|
||||
|
||||
def fetch_data(timex, enp_mpoint_dict, path):
|
||||
"""
|
||||
从数据库转存到超表
|
||||
"""
|
||||
response = None
|
||||
try:
|
||||
response = requests.get(path, timeout=5)
|
||||
response.raise_for_status() # 如果响应码不是 200,将触发异常
|
||||
except requests.RequestException as e:
|
||||
send_error_notification(e)
|
||||
|
||||
if response is None:
|
||||
return
|
||||
try:
|
||||
lines = response.text
|
||||
json_line = [line.strip() for line in lines.split('\n') if line.strip() ]
|
||||
current_object = []
|
||||
# 将碎片分组为完整的 JSON 对象
|
||||
for line in json_line:
|
||||
current_object.append(line.strip()) # 将当前行加入对象
|
||||
if line.strip() == '}': # 遇到结束大括号时
|
||||
try:
|
||||
obj_str = ' '.join(current_object).replace(',}', '}')
|
||||
obj_dict = json.loads(obj_str)
|
||||
insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict)
|
||||
except json.JSONDecodeError as e:
|
||||
send_error_notification(e)
|
||||
current_object = [] # 重置,准备处理下一个对象
|
||||
except Exception as e:
|
||||
send_error_notification(e)
|
||||
|
||||
def get_data():
|
||||
last_triggered = None
|
||||
while True:
|
||||
now = timezone.now()
|
||||
now = now.replace(microsecond=0)
|
||||
if now.second in MIN_LIST:
|
||||
if last_triggered != now:
|
||||
last_triggered = now
|
||||
enp_mpoint_dict= {}
|
||||
threads = [
|
||||
threading.Thread(target=fetch_data, args=(now, enp_mpoint_dict, PATH), daemon=True),
|
||||
threading.Thread(target=fetch_data, args=(now, enp_mpoint_dict, PATH2), daemon=True)
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join(timeout=10) # 设置超时防止线程挂起
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
if __name__ == '__main__':
|
||||
get_data()
|
Loading…
Reference in New Issue