import concurrent.futures
import json

from jinja2 import Template
from rest_framework.exceptions import ParseError

from apps.bi.models import Dataset
from apps.utils.sql import execute_raw_sql, format_sqldata

# Keywords that must not appear as a standalone word in a dataset query.
forbidden_keywords = ["UPDATE", "DELETE", "DROP", "TRUNCATE", "INSERT",
                      "CREATE", "ALTER", "GRANT", "REVOKE", "EXEC", "EXECUTE"]


def check_sql_safe(sql: str):
    """Reject SQL text that contains a forbidden keyword as a standalone word.

    The text is upper-cased, semicolons are treated as whitespace, and the
    result is split on whitespace; a keyword is only detected when it equals
    one of the resulting tokens exactly.

    Args:
        sql: The SQL text to inspect.

    Returns:
        The unchanged ``sql`` string when no forbidden keyword was found.

    Raises:
        ParseError: If any entry of ``forbidden_keywords`` is present as a
            token. The first matching keyword, in list order, is reported.
    """
    # NOTE(review): tokens glued to punctuation (e.g. "(DELETE" or "'DROP'")
    # are not equal to the bare keyword and therefore pass this check.
    words = set(sql.upper().replace(';', ' ').split())
    for kw in forbidden_keywords:
        if kw in words:
            raise ParseError(f'sql查询有风险-{kw}')
    return sql


def format_json_with_placeholders(json_str, **kwargs):
    """Substitute ``{name}`` placeholders in a JSON string.

    Each keyword argument ``name=value`` replaces every occurrence of the
    literal text ``{name}`` with ``json.dumps(value)``.

    Args:
        json_str: Template text containing ``{name}`` placeholders.
        **kwargs: Placeholder names mapped to JSON-serialisable values.

    Returns:
        The substituted text. It is returned as a string and is not parsed
        with ``json.loads``.
    """
    formatted_json = json_str
    for key, value in kwargs.items():
        formatted_json = formatted_json.replace(
            "{" + key + "}", json.dumps(value))
    return formatted_json


def exec_dataset(dt: Dataset, xquery: dict = None):
    """Execute the SQL statement(s) of a dataset and collect the results.

    ``dt.sql_query`` is formatted with ``dt.default_param`` overridden by
    ``xquery``, validated with ``check_sql_safe``, split on ``;`` and each
    non-blank statement is run through ``execute_raw_sql`` on a thread pool.
    The first two items of each result are passed to ``format_sqldata``, whose
    two return values are stored under the key ``ds<index>`` (index = position
    of the statement in the query) in ``data`` and ``data2`` respectively.

    Args:
        dt: The dataset to execute; ``sql_query`` and ``default_param`` are
            read from it. Neither attribute is modified.
        xquery: Optional parameters that override ``dt.default_param``.

    Returns:
        A tuple ``(sql, rdata)`` where ``sql`` is the formatted SQL text (or
        ``None`` when the dataset has no ``sql_query``) and ``rdata`` is
        ``{'data': {...}, 'data2': {...}}``.

    Raises:
        ParseError: If the formatted SQL contains a forbidden keyword.
        KeyError: If ``sql_query`` references a placeholder that is missing
            from the merged parameters.
    """
    results = {}
    results2 = {}
    sql_f_ = None
    if dt.sql_query:
        # Merge into a fresh dict so the dataset's stored defaults are not
        # polluted by per-call parameters.
        query = {**(dt.default_param or {}), **(xquery or {})}
        # NOTE(review): values are interpolated into the SQL text with
        # str.format, not bound as parameters; check_sql_safe is the only
        # guard against injection here. Consider parameterised execution.
        sql_f_ = check_sql_safe(dt.sql_query.format(**query))
        # Drop blank fragments (trailing ';', ';;', whitespace-only tails) so
        # empty statements are never sent to the database.
        statements = [stmt for stmt in sql_f_.split(';') if stmt.strip()]
        # Run the statements concurrently and gather results by name.
        with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
            futures = {
                executor.submit(execute_raw_sql, stmt): f'ds{ind}'
                for ind, stmt in enumerate(statements)
            }
            for future in concurrent.futures.as_completed(futures):
                name = futures[future]
                res = future.result()
                results[name], results2[name] = format_sqldata(
                    res[0], res[1])
    rdata = {'data': results, 'data2': results2}
    return sql_f_, rdata