From f6720ad7ce5fedc2a48065531571f59823a3fe20 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Thu, 18 Sep 2025 13:20:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=8E=BB=E7=BA=A4=E6=8B=89=E4=B8=9D?= =?UTF-8?q?=E9=87=87=E9=9B=86=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- out_service/insert_kvt.py | 143 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 out_service/insert_kvt.py diff --git a/out_service/insert_kvt.py b/out_service/insert_kvt.py new file mode 100644 index 00000000..0fb3f09f --- /dev/null +++ b/out_service/insert_kvt.py @@ -0,0 +1,143 @@ +import requests +import os +import sys +import django +import json +import logging +import time +import threading + +CUR_DIR = os.path.dirname(os.path.abspath(__file__)) +BASE_DIR = os.path.dirname(CUR_DIR) +sys.path.insert(0, BASE_DIR) + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") +django.setup() + +from apps.enm.services import insert_mplogx_item +from django.utils import timezone +from apps.utils.tasks import send_mail_task +from datetime import datetime, timedelta +CUR_DIR = os.path.dirname(os.path.abspath(__file__)) +BASE_DIR = os.path.dirname(CUR_DIR) +sys.path.insert(0, BASE_DIR) + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") +django.setup() + +SERVER = '192.168.1.37' +PORT = '8089' +PATH = f'http://{SERVER}:{PORT}/GetTagList' + +SERVER2 = '192.168.1.43' +PORT2 = '8081' +PATH2 = f'http://{SERVER2}:{PORT2}/GetTagList' + +#MIN_LIST = set(range(0,61,5)) +MIN_LIST = [0] +myLogger = logging.getLogger("log") + + +class MailController: + def __init__(self): + self.last_sent_time = None + self.interval = timedelta(days=1) + self.lock = threading.Lock() + self._is_sending = False + + def should_send_mail(self): + now = datetime.now() + # thread_name = threading.current_thread().name + with self.lock: + if self._is_sending: + # myLogger.info(f"线程 {thread_name}: 邮件正在发送中,跳过") + return False + if self.last_sent_time is None: + # myLogger.info(f"线程 {thread_name}: 首次发送邮件") + self._is_sending = True + return True + time_since_last = now - self.last_sent_time + if time_since_last > self.interval: + # myLogger.info(f"线程 {thread_name}: 距离上次发送已超过间隔,允许发送") + self._is_sending = True + return True + else: + # myLogger.info(f"线程 {thread_name}: 距离上次发送不足间隔,跳过") + return False + + def mark_as_sent(self): + with self.lock: + self.last_sent_time = datetime.now() + self._is_sending = False + myLogger.info("邮件发送状态已重置") + + +mail_controller = MailController() + + +def send_error_notification(error_message): + """ + 发送错误通知 + """ + if mail_controller.should_send_mail(): + try: + send_mail_task.delay(subject='insert_kvt_error', message=str(error_message)) + myLogger.error(f"请求组态王失败:{str(error_message)}") + except Exception as e: + myLogger.exception(f"发送错误邮件失败: {e}") + finally: + mail_controller.mark_as_sent() + +def fetch_data(timex, enp_mpoint_dict, path): + """ + 从数据库转存到超表 + """ + response = None + try: + response = requests.get(path, timeout=5) + response.raise_for_status() # 如果响应码不是 200,将触发异常 + except requests.RequestException as e: + send_error_notification(e) + + if response is None: + return + try: + lines = response.text + json_line = [line.strip() for line in lines.split('\n') if line.strip() ] + current_object = [] + # 将碎片分组为完整的 JSON 对象 + for line in json_line: + current_object.append(line.strip()) # 将当前行加入对象 + if line.strip() == '}': # 遇到结束大括号时 + try: + obj_str = ' '.join(current_object).replace(',}', '}') + obj_dict = json.loads(obj_str) + insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict) + except json.JSONDecodeError as e: + send_error_notification(e) + current_object = [] # 重置,准备处理下一个对象 + except Exception as e: + send_error_notification(e) + +def get_data(): + last_triggered = None + while True: + now = timezone.now() + now = now.replace(microsecond=0) + if now.second in MIN_LIST: + if last_triggered != now: + last_triggered = now + enp_mpoint_dict= {} + threads = [ + threading.Thread(target=fetch_data, args=(now, enp_mpoint_dict, PATH), daemon=True), + threading.Thread(target=fetch_data, args=(now, enp_mpoint_dict, PATH2), daemon=True) + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) # 设置超时防止线程挂起 + + time.sleep(0.5) + +if __name__ == '__main__': + get_data()