"""Poll two HTTP ``GetTagList`` endpoints and forward every tag to ``insert_mplogx_item``.

The script bootstraps Django itself so it can be started directly. Failures
are reported through ``send_mail_task``, throttled by ``MailController``.
"""
import requests
import os
import sys
import django
import json
import logging
import time
import threading
from datetime import datetime, timedelta

# Make the project root importable and initialise Django before any project
# module (apps.*) is imported.
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(CUR_DIR)
sys.path.insert(0, BASE_DIR)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
django.setup()

from apps.enm.services import insert_mplogx_item  # noqa: E402
from django.utils import timezone  # noqa: E402
from apps.utils.tasks import send_mail_task  # noqa: E402

SERVER = '192.168.1.37'
PORT = '8089'
PATH = f'http://{SERVER}:{PORT}/GetTagList'
SERVER2 = '192.168.1.43'
PORT2 = '8081'
PATH2 = f'http://{SERVER2}:{PORT2}/GetTagList'

# Values of ``now.second`` on which get_data() starts a polling round.
# NOTE(review): despite the name, the list is compared against seconds, so
# [0] means "once per minute, at second 0".
# Alternative kept from the original: MIN_LIST = set(range(0,61,5))
MIN_LIST = [0]
myLogger = logging.getLogger("log")


class MailController:
    """Throttle for error mails: at most one per ``interval``, one in flight at a time."""

    def __init__(self):
        # Time of the last successful send; None until the first one.
        self.last_sent_time = None
        # Minimum gap between two mails.
        self.interval = timedelta(days=1)
        # Guards last_sent_time and _is_sending across the fetch threads.
        self.lock = threading.Lock()
        # True between should_send_mail() returning True and mark_as_sent().
        self._is_sending = False

    def should_send_mail(self):
        """Return True and claim the send slot if a mail may be sent now.

        Returns False while another send is in flight or while the last
        successful send is not older than ``interval``. A caller that gets
        True must call ``mark_as_sent`` afterwards to release the slot.
        """
        now = datetime.now()
        with self.lock:
            if self._is_sending:
                return False
            still_cooling_down = (
                self.last_sent_time is not None
                and now - self.last_sent_time <= self.interval
            )
            if still_cooling_down:
                return False
            self._is_sending = True
            return True

    def mark_as_sent(self, success=True):
        """Release the send slot claimed by ``should_send_mail``.

        Args:
            success: When True (default, the original behaviour) the cooldown
                starts now. When False only the in-flight flag is cleared, so
                the next error may try to send again.
        """
        with self.lock:
            if success:
                self.last_sent_time = datetime.now()
            self._is_sending = False
        myLogger.info("邮件发送状态已重置")


mail_controller = MailController()


def send_error_notification(error_message):
    """Log ``error_message`` and enqueue an error mail, subject to throttling.

    Nothing is logged or sent when ``mail_controller`` declines the send.
    Exceptions raised while enqueueing the mail are logged, not propagated.

    Args:
        error_message: Exception or text; converted with ``str()`` for the
            log line and the mail body.
    """
    if not mail_controller.should_send_mail():
        return

    enqueued = False
    try:
        # Log before enqueueing so the original failure is recorded even if
        # the mail task cannot be dispatched.
        myLogger.error(f"请求组态王失败:{str(error_message)}")
        send_mail_task.delay(subject='insert_kvt_error', message=str(error_message))
        enqueued = True
    except Exception as e:
        myLogger.exception(f"发送错误邮件失败: {e}")
    finally:
        # Always release the slot, but start the cooldown only after a
        # successful enqueue; otherwise one failure would mute alerts for a
        # full interval.
        mail_controller.mark_as_sent(success=enqueued)


def fetch_data(timex, enp_mpoint_dict, path):
    """Fetch the tag list from ``path`` and pass every tag to ``insert_mplogx_item``.

    The response text is read line by line; lines are accumulated until a
    line consisting only of ``}`` closes the current object, which is then
    decoded as JSON. All errors are reported via ``send_error_notification``;
    nothing is raised to the caller.

    Args:
        timex: Timestamp forwarded unchanged to ``insert_mplogx_item``.
        enp_mpoint_dict: Dict forwarded unchanged to ``insert_mplogx_item``.
            NOTE(review): both threads of a round receive the same dict
            object; confirm that the callee tolerates concurrent use.
        path: URL to request.
    """
    try:
        response = requests.get(path, timeout=5)
        response.raise_for_status()  # non-2xx status raises HTTPError
    except requests.RequestException as e:
        send_error_notification(e)
        # Stop here: after an HTTP error status ``response`` is bound, and
        # its body must not be parsed as tag data.
        return

    try:
        stripped_lines = (raw.strip() for raw in response.text.split('\n'))
        fragments = [fragment for fragment in stripped_lines if fragment]

        current_object = []
        for fragment in fragments:
            current_object.append(fragment)
            if fragment != '}':
                continue

            # A lone closing brace ends the object collected so far.
            try:
                # NOTE(review): fragments are joined with a space, so a
                # trailing comma on its own line becomes ", }" and is not
                # matched by this replace; confirm against the real payload.
                obj_str = ' '.join(current_object).replace(',}', '}')
                obj_dict = json.loads(obj_str)
                insert_mplogx_item(
                    obj_dict.get('strVarName'),
                    obj_dict.get('VarValue'),
                    timex,
                    enp_mpoint_dict,
                )
            except json.JSONDecodeError as e:
                send_error_notification(e)
            current_object = []  # start collecting the next object
    except Exception as e:
        # Anything else (e.g. raised by insert_mplogx_item) aborts the rest
        # of this response.
        send_error_notification(e)


def get_data():
    """Run forever, starting one polling round per matching second.

    Every 0.5 s the current time (truncated to whole seconds) is checked. If
    its ``second`` is in ``MIN_LIST`` and that instant has not been handled
    yet, ``PATH`` and ``PATH2`` are fetched in two daemon threads.
    """
    last_triggered = None
    while True:
        now = timezone.now().replace(microsecond=0)
        if now.second in MIN_LIST and last_triggered != now:
            # The loop runs twice per second; remember the instant so the
            # same second is not handled twice.
            last_triggered = now
            enp_mpoint_dict = {}
            threads = [
                threading.Thread(
                    target=fetch_data,
                    args=(now, enp_mpoint_dict, url),
                    daemon=True,
                )
                for url in (PATH, PATH2)
            ]
            for worker in threads:
                worker.start()
            for worker in threads:
                worker.join(timeout=10)  # bounded wait so a hung thread cannot stall the loop
        time.sleep(0.5)


if __name__ == '__main__':
    get_data()