110 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			110 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
from django.db import connection
 | 
						|
from django.utils import timezone
 | 
						|
from datetime import datetime
 | 
						|
 | 
						|
def execute_raw_sql(sql: str, params=None):
 | 
						|
    """执行原始sql并返回rows, columns数据
 | 
						|
 | 
						|
    Args:
 | 
						|
        sql (str): 查询语句
 | 
						|
        params (_type_, optional): 参数列表. Defaults to None.
 | 
						|
    """
 | 
						|
    with connection.cursor() as cursor:
 | 
						|
        cursor.execute("SET statement_timeout TO %s;", [30000])
 | 
						|
        if params:
 | 
						|
            cursor.execute(sql, params=params)
 | 
						|
        else:
 | 
						|
            cursor.execute(sql)
 | 
						|
        if cursor.description:
 | 
						|
            columns  = [desc[0] for desc in cursor.description]
 | 
						|
            rows = cursor.fetchall()
 | 
						|
            return columns, rows
 | 
						|
        return [], []
 | 
						|
    
 | 
						|
def format_sqldata(columns, rows):
 | 
						|
    return [columns] + rows, [dict(zip(columns, row)) for row in rows]
 | 
						|
 | 
						|
 | 
						|
def query_all_dict(sql, params=None, with_time_format=False):
 | 
						|
    '''
 | 
						|
    查询所有结果返回字典类型数据
 | 
						|
    :param sql:
 | 
						|
    :param params:
 | 
						|
    :return:
 | 
						|
    '''
 | 
						|
    with connection.cursor() as cursor:
 | 
						|
        if params:
 | 
						|
            cursor.execute(sql, params=params)
 | 
						|
        else:
 | 
						|
            cursor.execute(sql)
 | 
						|
        columns  = [desc[0] for desc in cursor.description]
 | 
						|
        if with_time_format:
 | 
						|
            results = []
 | 
						|
            for row in cursor.fetchall():
 | 
						|
                row_dict = {}
 | 
						|
                for col, val in zip(columns, row):
 | 
						|
                    if isinstance(val, datetime):
 | 
						|
                        val = timezone.make_naive(val).strftime("%Y-%m-%d %H:%M:%S")
 | 
						|
                    row_dict[col] = val
 | 
						|
                results.append(row_dict)
 | 
						|
            return results
 | 
						|
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
 | 
						|
 | 
						|
def query_one_dict(sql, params=None, with_time_format=False):
 | 
						|
    """
 | 
						|
    查询一个结果返回字典类型数据
 | 
						|
    :param sql:
 | 
						|
    :param params:
 | 
						|
    :return:
 | 
						|
    """
 | 
						|
    with connection.cursor() as cursor:
 | 
						|
        cursor.execute(sql, params or ())  # 更简洁的参数处理
 | 
						|
        columns = [desc[0] for desc in cursor.description]
 | 
						|
        row = cursor.fetchone()
 | 
						|
        if with_time_format:
 | 
						|
            row_dict = {}
 | 
						|
            for col, val in zip(columns, row):
 | 
						|
                if isinstance(val, datetime):
 | 
						|
                    val = timezone.make_naive(val).strftime("%Y-%m-%d %H:%M:%S")
 | 
						|
                row_dict[col] = val
 | 
						|
            return row_dict
 | 
						|
        return dict(zip(columns, row)) if row else None  # 安全处理None情况
 | 
						|
    
 | 
						|
import pymysql
 | 
						|
import psycopg2
 | 
						|
 | 
						|
class DbConnection:
 | 
						|
    def __init__(self, host, user, password, database, dbtype='mysql'):
 | 
						|
        if dbtype not in ['mysql', 'pg']:
 | 
						|
            raise ValueError('dbtype must be mysql or pg')
 | 
						|
        self.dbtype = dbtype
 | 
						|
        self.host = host
 | 
						|
        self.user = user
 | 
						|
        self.password = password
 | 
						|
        self.database = database
 | 
						|
        self.conn = None
 | 
						|
 | 
						|
    def _connect(self):
 | 
						|
        if self.dbtype == 'mysql':
 | 
						|
            return pymysql.connect(
 | 
						|
                host=self.host,
 | 
						|
                user=self.user,
 | 
						|
                password=self.password,
 | 
						|
                database=self.database
 | 
						|
            )
 | 
						|
        elif self.dbtype == 'pg':
 | 
						|
            return psycopg2.connect(
 | 
						|
                host=self.host,
 | 
						|
                user=self.user,
 | 
						|
                password=self.password,
 | 
						|
                database=self.database,
 | 
						|
            )
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        self.conn = self._connect()
 | 
						|
        return self.conn.cursor()
 | 
						|
 | 
						|
    def __exit__(self, exc_type, exc_val, exc_tb):
 | 
						|
        if self.conn:
 | 
						|
            self.conn.close()
 |