factory/apps/bi/services.py

61 lines
2.2 KiB
Python

from rest_framework.exceptions import ParseError
import json
from jinja2 import Template
from apps.bi.models import Dataset
import concurrent
from apps.utils.sql import execute_raw_sql, format_sqldata
forbidden_keywords = ["UPDATE", "DELETE", "DROP", "TRUNCATE", "INSERT", "CREATE", "ALTER", "GRANT", "REVOKE", "EXEC", "EXECUTE"]
def check_sql_safe(sql: str):
"""检查sql安全性
"""
sql_upper = sql.upper()
# 将SQL按空格和分号分割成单词
words = [word for word in sql_upper.replace(';', ' ').split() if word]
for kw in forbidden_keywords:
# 检查关键字是否作为独立单词出现
if kw in words:
raise ParseError(f'sql查询有风险-{kw}')
return sql
def format_json_with_placeholders(json_str, **kwargs):
formatted_json = json_str
# 遍历关键字参数,将占位符替换为对应的值
for key, value in kwargs.items():
formatted_json = formatted_json.replace("{" + key + "}", json.dumps(value))
# 格式化后的字符串依然是 JSON 字符串,没有使用 json.loads()
return formatted_json
def exec_dataset(dt: Dataset, xquery: dict = {}):
"""执行数据集
返回 (sql语句, { rda})
"""
rdata = {}
results = {}
results2 = {}
query = dt.default_param
if dt.sql_query:
query.update(xquery)
sql_f_ = check_sql_safe(dt.sql_query.format(**query))
sql_f_strip = sql_f_.strip(';')
sql_f_l = sql_f_strip.split(';')
# 多线程运行并返回字典结果
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
fun_ps = []
for ind, val in enumerate(sql_f_l):
fun_ps.append((f'ds{ind}', execute_raw_sql, val))
# 生成执行函数
futures = {executor.submit(i[1], i[2]): i for i in fun_ps}
for future in concurrent.futures.as_completed(futures):
name, *_, sql_f = futures[future] # 获取对应的键
res = future.result()
results[name], results2[name] = format_sqldata(
res[0], res[1])
rdata['data'] = results
rdata['data2'] = results2
return sql_f_, rdata