factory/apps/utils/sql.py

91 lines
2.6 KiB
Python

from django.db import connection
def execute_raw_sql(sql: str, params=None):
"""执行原始sql并返回rows, columns数据
Args:
sql (str): 查询语句
params (_type_, optional): 参数列表. Defaults to None.
"""
with connection.cursor() as cursor:
cursor.execute("SET statement_timeout TO %s;", [30000])
if params:
cursor.execute(sql, params=params)
else:
cursor.execute(sql)
if cursor.description:
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return columns, rows
return [], []
def format_sqldata(columns, rows):
return [columns] + rows, [dict(zip(columns, row)) for row in rows]
def query_all_dict(sql, params=None):
'''
查询所有结果返回字典类型数据
:param sql:
:param params:
:return:
'''
with connection.cursor() as cursor:
if params:
cursor.execute(sql, params=params)
else:
cursor.execute(sql)
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def query_one_dict(sql, params=None):
"""
查询一个结果返回字典类型数据
:param sql:
:param params:
:return:
"""
with connection.cursor() as cursor:
cursor.execute(sql, params or ()) # 更简洁的参数处理
columns = [desc[0] for desc in cursor.description]
row = cursor.fetchone()
return dict(zip(columns, row)) if row else None # 安全处理None情况
import pymysql
import psycopg2
class DbConnection:
def __init__(self, host, user, password, database, dbtype='mysql'):
if dbtype not in ['mysql', 'pg']:
raise ValueError('dbtype must be mysql or pg')
self.dbtype = dbtype
self.host = host
self.user = user
self.password = password
self.database = database
self.conn = None
def _connect(self):
if self.dbtype == 'mysql':
return pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database
)
elif self.dbtype == 'pg':
return psycopg2.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
)
def __enter__(self):
self.conn = self._connect()
return self.conn.cursor()
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
self.conn.close()