61 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			61 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
	
| from rest_framework.exceptions import ParseError
 | |
| import json
 | |
| from jinja2 import Template
 | |
| from apps.bi.models import Dataset
 | |
| import concurrent
 | |
| from apps.utils.sql import execute_raw_sql, format_sqldata
 | |
| 
 | |
| forbidden_keywords = ["UPDATE", "DELETE", "DROP", "TRUNCATE", "INSERT", "CREATE", "ALTER", "GRANT", "REVOKE", "EXEC", "EXECUTE"]
 | |
| 
 | |
| 
 | |
| def check_sql_safe(sql: str):
 | |
|     """检查sql安全性
 | |
|     """
 | |
|     sql_upper = sql.upper()
 | |
|     # 将SQL按空格和分号分割成单词
 | |
|     words = [word for word in sql_upper.replace(';', ' ').split() if word]
 | |
|     for kw in forbidden_keywords:
 | |
|         # 检查关键字是否作为独立单词出现
 | |
|         if kw in words:
 | |
|             raise ParseError(f'sql查询有风险-{kw}')
 | |
|     return sql
 | |
| 
 | |
| def format_json_with_placeholders(json_str, **kwargs):
 | |
|     formatted_json = json_str
 | |
| 
 | |
|     # 遍历关键字参数,将占位符替换为对应的值
 | |
|     for key, value in kwargs.items():
 | |
|         formatted_json = formatted_json.replace("{" + key + "}", json.dumps(value))
 | |
| 
 | |
|     # 格式化后的字符串依然是 JSON 字符串,没有使用 json.loads()
 | |
|     return formatted_json
 | |
| 
 | |
| 
 | |
| def exec_dataset(dt: Dataset, xquery: dict = {}):
 | |
|     """执行数据集
 | |
|     返回 (sql语句, { rda})
 | |
|     """
 | |
|     rdata = {}
 | |
|     results = {}
 | |
|     results2 = {}
 | |
|     query = dt.default_param
 | |
|     if dt.sql_query:
 | |
|         query.update(xquery)
 | |
|         sql_f_ = check_sql_safe(dt.sql_query.format(**query))
 | |
|         sql_f_strip = sql_f_.strip(';')
 | |
|         sql_f_l = sql_f_strip.split(';')
 | |
|         # 多线程运行并返回字典结果
 | |
|         with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
 | |
|             fun_ps = []
 | |
|             for ind, val in enumerate(sql_f_l):
 | |
|                 fun_ps.append((f'ds{ind}', execute_raw_sql, val))
 | |
|             # 生成执行函数
 | |
|             futures = {executor.submit(i[1], i[2]): i for i in fun_ps}
 | |
|             for future in concurrent.futures.as_completed(futures):
 | |
|                 name, *_, sql_f = futures[future]  # 获取对应的键
 | |
|                 res = future.result()
 | |
|                 results[name],  results2[name] = format_sqldata(
 | |
|                     res[0], res[1])
 | |
|     rdata['data'] = results
 | |
|     rdata['data2'] = results2
 | |
|     return sql_f_, rdata |