diff --git a/apps/ichat/utils.py b/apps/ichat/utils.py index 1ce4c822..cc2f8cc5 100644 --- a/apps/ichat/utils.py +++ b/apps/ichat/utils.py @@ -1,7 +1,7 @@ import re import psycopg2 import threading -from django.db import transaction +from django.db import transaction, connections from .models import Message # 数据库连接 @@ -190,6 +190,10 @@ def strip_sql_markdown(content: str) -> str: # ORM 写入包装函数 def save_message_thread_safe(**kwargs): def _save(): - with transaction.atomic(): - Message.objects.create(**kwargs) + try: + with transaction.atomic(): + Message.objects.create(**kwargs) + finally: + # 子线程退出前关闭本线程的 Django DB 连接,避免 PG 连接泄漏 + connections.close_all() threading.Thread(target=_save).start() diff --git a/apps/utils/thread.py b/apps/utils/thread.py index 419dbcea..b913c89d 100644 --- a/apps/utils/thread.py +++ b/apps/utils/thread.py @@ -1,6 +1,7 @@ import threading from apps.utils.decorators import auto_log from concurrent.futures import ThreadPoolExecutor +from django.db import connections # 创建全局线程池 global_executor = ThreadPoolExecutor(max_workers=20) @@ -8,7 +9,12 @@ class MyThread(threading.Thread): @auto_log('MyThread', raise_exception=True, send_mail=True) def run(self) -> None: - return super().run() + # 子线程退出 / 池内 worker 跑完一次任务后必须关闭本线程的 Django DB 连接, + # 否则 psycopg2 连接会一直驻留在线程的 thread-local,导致 PG "too many clients" + try: + return super().run() + finally: + connections.close_all() def start_p(self): """ diff --git a/out_service/insert_kvt.py b/out_service/insert_kvt.py index 0fb3f09f..569cdda8 100644 --- a/out_service/insert_kvt.py +++ b/out_service/insert_kvt.py @@ -16,6 +16,7 @@ django.setup() from apps.enm.services import insert_mplogx_item from django.utils import timezone +from django.db import connections from apps.utils.tasks import send_mail_task from datetime import datetime, timedelta CUR_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -92,32 +93,36 @@ def fetch_data(timex, enp_mpoint_dict, path): """ 从数据库转存到超表 """ - response = None try: - response = requests.get(path, timeout=5) - response.raise_for_status() # 如果响应码不是 200,将触发异常 - except requests.RequestException as e: - send_error_notification(e) + response = None + try: + response = requests.get(path, timeout=5) + response.raise_for_status() # 如果响应码不是 200,将触发异常 + except requests.RequestException as e: + send_error_notification(e) - if response is None: - return - try: - lines = response.text - json_line = [line.strip() for line in lines.split('\n') if line.strip() ] - current_object = [] - # 将碎片分组为完整的 JSON 对象 - for line in json_line: - current_object.append(line.strip()) # 将当前行加入对象 - if line.strip() == '}': # 遇到结束大括号时 - try: - obj_str = ' '.join(current_object).replace(',}', '}') - obj_dict = json.loads(obj_str) - insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict) - except json.JSONDecodeError as e: - send_error_notification(e) - current_object = [] # 重置,准备处理下一个对象 - except Exception as e: - send_error_notification(e) + if response is None: + return + try: + lines = response.text + json_line = [line.strip() for line in lines.split('\n') if line.strip() ] + current_object = [] + # 将碎片分组为完整的 JSON 对象 + for line in json_line: + current_object.append(line.strip()) # 将当前行加入对象 + if line.strip() == '}': # 遇到结束大括号时 + try: + obj_str = ' '.join(current_object).replace(',}', '}') + obj_dict = json.loads(obj_str) + insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict) + except json.JSONDecodeError as e: + send_error_notification(e) + current_object = [] # 重置,准备处理下一个对象 + except Exception as e: + send_error_notification(e) + finally: + # 子线程必须主动关闭 Django DB 连接,否则每分钟泄漏 2 条 PG 连接 + connections.close_all() def get_data(): last_triggered = None