fix: 处理PG 连接泄漏隐患

This commit is contained in:
shijing 2026-06-18 10:22:21 +08:00
parent e77c1ac820
commit e5bf9d58c0
3 changed files with 43 additions and 28 deletions

View File

@ -1,7 +1,7 @@
import re import re
import psycopg2 import psycopg2
import threading import threading
from django.db import transaction from django.db import transaction, connections
from .models import Message from .models import Message
# 数据库连接 # 数据库连接
@ -190,6 +190,10 @@ def strip_sql_markdown(content: str) -> str:
# ORM 写入包装函数 # ORM 写入包装函数
def save_message_thread_safe(**kwargs): def save_message_thread_safe(**kwargs):
def _save(): def _save():
with transaction.atomic(): try:
Message.objects.create(**kwargs) with transaction.atomic():
Message.objects.create(**kwargs)
finally:
# 子线程退出前关闭本线程的 Django DB 连接,避免 PG 连接泄漏
connections.close_all()
threading.Thread(target=_save).start() threading.Thread(target=_save).start()

View File

@ -1,6 +1,7 @@
import threading import threading
from apps.utils.decorators import auto_log from apps.utils.decorators import auto_log
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from django.db import connections
# 创建全局线程池 # 创建全局线程池
global_executor = ThreadPoolExecutor(max_workers=20) global_executor = ThreadPoolExecutor(max_workers=20)
@ -8,7 +9,12 @@ class MyThread(threading.Thread):
@auto_log('MyThread', raise_exception=True, send_mail=True) @auto_log('MyThread', raise_exception=True, send_mail=True)
def run(self) -> None: def run(self) -> None:
return super().run() # 子线程退出 / 池内 worker 跑完一次任务后必须关闭本线程的 Django DB 连接,
# 否则 psycopg2 连接会一直驻留在线程的 thread-local导致 PG "too many clients"
try:
return super().run()
finally:
connections.close_all()
def start_p(self): def start_p(self):
""" """

View File

@ -16,6 +16,7 @@ django.setup()
from apps.enm.services import insert_mplogx_item from apps.enm.services import insert_mplogx_item
from django.utils import timezone from django.utils import timezone
from django.db import connections
from apps.utils.tasks import send_mail_task from apps.utils.tasks import send_mail_task
from datetime import datetime, timedelta from datetime import datetime, timedelta
CUR_DIR = os.path.dirname(os.path.abspath(__file__)) CUR_DIR = os.path.dirname(os.path.abspath(__file__))
@ -92,32 +93,36 @@ def fetch_data(timex, enp_mpoint_dict, path):
""" """
从数据库转存到超表 从数据库转存到超表
""" """
response = None
try: try:
response = requests.get(path, timeout=5) response = None
response.raise_for_status() # 如果响应码不是 200将触发异常 try:
except requests.RequestException as e: response = requests.get(path, timeout=5)
send_error_notification(e) response.raise_for_status() # 如果响应码不是 200将触发异常
except requests.RequestException as e:
send_error_notification(e)
if response is None: if response is None:
return return
try: try:
lines = response.text lines = response.text
json_line = [line.strip() for line in lines.split('\n') if line.strip() ] json_line = [line.strip() for line in lines.split('\n') if line.strip() ]
current_object = [] current_object = []
# 将碎片分组为完整的 JSON 对象 # 将碎片分组为完整的 JSON 对象
for line in json_line: for line in json_line:
current_object.append(line.strip()) # 将当前行加入对象 current_object.append(line.strip()) # 将当前行加入对象
if line.strip() == '}': # 遇到结束大括号时 if line.strip() == '}': # 遇到结束大括号时
try: try:
obj_str = ' '.join(current_object).replace(',}', '}') obj_str = ' '.join(current_object).replace(',}', '}')
obj_dict = json.loads(obj_str) obj_dict = json.loads(obj_str)
insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict) insert_mplogx_item(obj_dict.get('strVarName'), obj_dict.get('VarValue'), timex, enp_mpoint_dict)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
send_error_notification(e) send_error_notification(e)
current_object = [] # 重置,准备处理下一个对象 current_object = [] # 重置,准备处理下一个对象
except Exception as e: except Exception as e:
send_error_notification(e) send_error_notification(e)
finally:
# 子线程必须主动关闭 Django DB 连接,否则每分钟泄漏 2 条 PG 连接
connections.close_all()
def get_data(): def get_data():
last_triggered = None last_triggered = None