144 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
	
| import requests
 | ||
| import os
 | ||
| import sys
 | ||
| import django
 | ||
| import json
 | ||
| import logging
 | ||
| import time
 | ||
| import threading
 | ||
| 
 | ||
# Put the parent of this script's directory on sys.path and initialise Django
# before anything else runs.
# NOTE(review): presumably the parent directory is the Django project root and
# the `apps.*` imports further down rely on this having run first - confirm.
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(CUR_DIR)
sys.path.insert(0, BASE_DIR)

# setdefault keeps an already-exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
django.setup()
 | ||
| 
 | ||
| from apps.enm.services import insert_mplogx_item
 | ||
| from django.utils import timezone
 | ||
| from apps.utils.tasks import send_mail_task
 | ||
| from datetime import datetime, timedelta
 | ||
# The sys.path / DJANGO_SETTINGS_MODULE / django.setup() bootstrap is performed
# once near the top of this module (CUR_DIR and BASE_DIR are defined there);
# it used to be repeated here verbatim, which inserted BASE_DIR into sys.path a
# second time and called django.setup() twice.

# First tag-list endpoint.
SERVER = '192.168.1.37'
PORT = '8089'
PATH = f'http://{SERVER}:{PORT}/GetTagList'

# Second tag-list endpoint.
SERVER2 = '192.168.1.43'
PORT2 = '8081'
PATH2 = f'http://{SERVER2}:{PORT2}/GetTagList'

# Values of the *seconds* field at which get_data() starts a poll (despite the
# name, it is compared against `now.second`). [0] means once per minute;
# a denser schedule would be e.g. set(range(0, 61, 5)).
MIN_LIST = [0]

myLogger = logging.getLogger("log")
 | ||
| 
 | ||
| 
 | ||
class MailController:
    """Throttle that lets at most one notification mail through per interval.

    A caller asks ``should_send_mail()`` for permission. A ``True`` answer
    reserves the "sending" slot; the caller must then call ``mark_as_sent()``,
    which releases the slot and starts the next interval. All state is read
    and written while holding ``self.lock``.
    """

    def __init__(self, interval=None):
        """Create a controller that has not sent anything yet.

        Args:
            interval: minimum ``timedelta`` between two mails. ``None``
                (the default) keeps the historical value of one day.
        """
        # Time of the most recent mark_as_sent() call; None until the first one.
        self.last_sent_time = None
        self.interval = timedelta(days=1) if interval is None else interval
        self.lock = threading.Lock()
        # True between a granted should_send_mail() and the matching
        # mark_as_sent().
        self._is_sending = False

    def should_send_mail(self):
        """Return True and reserve the sending slot if a mail may go out now.

        Returns False while another send is in flight, or while no more than
        ``interval`` has elapsed since the last ``mark_as_sent()``.
        """
        with self.lock:
            if self._is_sending:
                return False
            if self.last_sent_time is not None:
                # Read the clock while holding the lock so the comparison
                # cannot use a timestamp older than last_sent_time.
                elapsed = datetime.now() - self.last_sent_time
                if elapsed <= self.interval:
                    return False
            # First mail ever, or the interval has elapsed.
            self._is_sending = True
            return True

    def mark_as_sent(self):
        """Record the current time as the last send and release the slot."""
        with self.lock:
            self.last_sent_time = datetime.now()
            self._is_sending = False
            myLogger.info("邮件发送状态已重置")
 | ||
| 
 | ||
| 
 | ||
# Single module-wide throttle consulted by send_error_notification().
mail_controller = MailController()
 | ||
| 
 | ||
| 
 | ||
def send_error_notification(error_message):
    """Queue an error mail for ``error_message`` if the throttle allows it.

    Does nothing (no mail, no log entry) when ``mail_controller`` refuses.
    Otherwise the mail task is queued with subject ``insert_kvt_error`` and
    the error is logged. The throttle is released afterwards in every case.

    Args:
        error_message: exception or other object; its ``str()`` becomes the
            mail body.
    """
    if not mail_controller.should_send_mail():
        return

    try:
        send_mail_task.delay(subject='insert_kvt_error', message=str(error_message))
        myLogger.error(f"请求组态王失败:{str(error_message)}")
    except Exception as e:
        myLogger.exception(f"发送错误邮件失败: {e}")
    finally:
        # NOTE(review): this also runs when queuing the mail failed, so a
        # failed attempt still starts the controller's cooldown - confirm
        # that is intended.
        mail_controller.mark_as_sent()
 | ||
| 
 | ||
def _join_tag_fragments(fragments):
    """Join the stripped lines of one object into a single JSON string.

    ``fragments`` ends with the closing ``}`` line. A comma left dangling
    right before that brace is dropped; after joining with spaces it shows up
    as ``, }``, which a plain ``replace(',}', '}')`` never matches.
    """
    body = ' '.join(fragments[:-1]).rstrip(',').rstrip()
    # The inline ',}' repair is kept for fragments that contain it on one line.
    return f'{body} }}'.replace(',}', '}')


def fetch_data(timex, enp_mpoint_dict, path):
    """Fetch the tag list from ``path`` and hand every tag to insert_mplogx_item.

    The response body is read as a series of JSON objects spread over several
    lines, each one ending with a line that contains only ``}``. For every
    object, its ``strVarName`` and ``VarValue`` entries are passed to
    ``insert_mplogx_item`` together with ``timex`` and ``enp_mpoint_dict``.
    Request failures (including non-2xx status codes) and parsing failures
    are reported through ``send_error_notification`` instead of being raised.

    Args:
        timex: timestamp forwarded unchanged to ``insert_mplogx_item``.
        enp_mpoint_dict: dict forwarded unchanged to ``insert_mplogx_item``.
            NOTE(review): get_data() shares one dict between both fetch
            threads - confirm the callee tolerates concurrent use.
        path: URL of the tag-list endpoint.
    """
    try:
        response = requests.get(path, timeout=5)
        response.raise_for_status()  # raises for 4xx/5xx responses
    except requests.RequestException as e:
        send_error_notification(e)
        # Stop here: when raise_for_status() fails, `response` holds an error
        # page that must not be parsed as tag data.
        return

    try:
        fragments = []
        # Group the non-empty lines into complete JSON objects.
        for raw_line in response.text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            fragments.append(line)
            if line != '}':
                continue
            # A bare closing brace ends the current object.
            try:
                obj_dict = json.loads(_join_tag_fragments(fragments))
                insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict)
            except json.JSONDecodeError as e:
                # One malformed object must not stop the remaining ones.
                send_error_notification(e)
            fragments = []  # start collecting the next object
    except Exception as e:
        send_error_notification(e)
 | ||
| 
 | ||
def get_data():
    """Poll both tag endpoints forever, once per scheduled second.

    Every 0.5 s the current time (``timezone.now()`` truncated to whole
    seconds) is inspected. When its ``second`` field is in ``MIN_LIST`` and
    that exact second has not been handled yet, one daemon thread per
    endpoint (``PATH``, ``PATH2``) runs ``fetch_data`` with the same timestamp
    and the same freshly created dict. This function never returns.
    """
    last_triggered = None
    while True:
        tick = timezone.now().replace(microsecond=0)
        # The loop runs about twice per second, so remember the handled
        # second to avoid firing twice for it.
        if tick.second in MIN_LIST and last_triggered != tick:
            last_triggered = tick
            shared_mpoints = {}
            workers = [
                threading.Thread(target=fetch_data, args=(tick, shared_mpoints, url), daemon=True)
                for url in (PATH, PATH2)
            ]
            for worker in workers:
                worker.start()
            for worker in workers:
                # Bounded wait so a hung request cannot stall the poll loop.
                worker.join(timeout=10)

        time.sleep(0.5)
 | ||
| 
 | ||
# Start the endless polling loop only when executed as a script.
if __name__ == '__main__':
    get_data()
 |