128 lines
5.8 KiB
Python
128 lines
5.8 KiB
Python
from django.shortcuts import render
|
|
from apps.utils.viewsets import CustomModelViewSet
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from apps.bi.models import Dataset
|
|
from apps.bi.serializers import DatasetSerializer, DatasetCreateUpdateSerializer, DataExecSerializer
|
|
from django.apps import apps
|
|
from rest_framework import serializers
|
|
import concurrent.futures
|
|
from django.core.cache import cache
|
|
from apps.utils.sql import execute_raw_sql, format_sqldata
|
|
from apps.bi.services import check_sql_safe, format_json_with_placeholders
|
|
from rest_framework.exceptions import ParseError
|
|
# Create your views here.
|
|
|
|
class DatasetViewSet(CustomModelViewSet):
    """CRUD endpoints for ``Dataset`` plus two extra actions.

    Extra actions:
        exec: run the dataset's stored SQL and return the formatted results.
        base: list the table name and fields of every installed model.
    """

    queryset = Dataset.objects.all()
    serializer_class = DatasetSerializer
    create_serializer_class = DatasetCreateUpdateSerializer
    update_serializer_class = DatasetCreateUpdateSerializer
    search_fields = ['name', 'code']

    # Tunables for ``exec``; subclasses may override them.
    exec_cache_seconds = 10  # lifetime of a cached raw SQL result; 0 disables caching
    exec_max_workers = 6  # upper bound on concurrently executed statements

    @action(methods=['post'], detail=True, perms_map={'post': 'dataset.exec'},
            serializer_class=DataExecSerializer, cache_seconds=0)
    def exec(self, request, pk=None):
        """Execute the dataset's SQL query.

        Request body:
            query: optional mapping of placeholder values substituted into
                the stored SQL with ``str.format``. ``r_user`` and ``r_dept``
                are always overwritten with the requesting user's id and
                department id (``''`` when the user has no department).
            return_type: forwarded unchanged to ``format_sqldata``
                (defaults to ``2``).

        The stored SQL is split on ``;`` and the statements are run
        concurrently. The result of statement *i* is stored under the key
        ``ds{i}`` of the response's ``data`` mapping; a statement that fails
        yields the string ``'error: <message>'`` instead.

        Raises:
            ParseError: if ``query`` is not a mapping, if a placeholder of
                the stored SQL cannot be filled from ``query``, or if the
                dataset has ``echart_options`` and any statement failed.
        """
        dt = self.get_object()
        rdata = DatasetSerializer(instance=dt).data

        # A missing or null ``query`` means "no client-supplied placeholders".
        query = request.data.get('query') or {}
        if not isinstance(query, dict):
            raise ParseError('query must be an object of placeholder values')
        return_type = request.data.get('return_type', 2)

        user = request.user
        # Server-side values come last so the client cannot spoof them.
        query = {
            **query,
            'r_user': user.id,
            'r_dept': user.belong_dept.id if user.belong_dept else '',
        }

        results = {}
        seconds = self.exec_cache_seconds
        if dt.sql_query:
            # NOTE(review): client-supplied values are interpolated straight
            # into the SQL text; check_sql_safe() is the only guard visible
            # here. Confirm it is sufficient or move to bound parameters.
            try:
                sql_text = dt.sql_query.format(**query)
            except (KeyError, IndexError, ValueError) as e:
                # An unfilled or malformed placeholder is a request problem,
                # not a server error.
                raise ParseError(f'invalid query parameters: {e!r}') from e
            sql_text = check_sql_safe(sql_text)

            # NOTE(review): a plain split also breaks on ';' inside string
            # literals of the stored SQL.
            pending = []
            for ind, statement in enumerate(sql_text.strip(';').split(';')):
                if not statement.strip():
                    # Blank pieces come from ';;' or trailing whitespace after
                    # the last ';'. Skip them, but keep the original index
                    # numbering for the remaining statements.
                    continue
                name = f'ds{ind}'
                # NOTE(review): the raw SQL text is used as the cache key;
                # some cache backends restrict key length and characters.
                cached = cache.get(statement, None)
                if isinstance(cached, tuple):
                    results[name] = format_sqldata(cached[0], cached[1], return_type)
                else:
                    pending.append((name, statement))

            if pending:  # only spin up threads when something must be run
                with concurrent.futures.ThreadPoolExecutor(
                        max_workers=self.exec_max_workers) as executor:
                    futures = {
                        executor.submit(execute_raw_sql, statement): (name, statement)
                        for name, statement in pending
                    }
                    for future in concurrent.futures.as_completed(futures):
                        name, statement = futures[future]
                        try:
                            res = future.result()
                            results[name] = format_sqldata(res[0], res[1], return_type)
                            if seconds:
                                cache.set(statement, res, seconds)
                        except Exception as e:
                            # Best effort per statement: report the failure
                            # in place of the data instead of aborting.
                            results[name] = 'error: ' + str(e)

        rdata['data'] = results
        if rdata['echart_options']:
            # The chart options are filled from the results, so any failed
            # statement (stored as a string) aborts the request.
            for value in results.values():
                if isinstance(value, str):
                    raise ParseError(value)
            rdata['echart_options'] = format_json_with_placeholders(
                rdata['echart_options'], **results)
        return Response(rdata)

    @action(methods=['get'], detail=False, perms_map={'get': '*'})
    def base(self, request, pk=None):
        """Return the fields of every installed model, keyed by table name.

        The response maps each model's ``db_table`` to a list of
        ``{'name': <field name>, 'type': <internal type>}`` entries built
        from ``model._meta.get_fields()``.
        """
        rdict = {}
        for model in apps.get_models():
            columns = []
            for field in model._meta.get_fields():
                # Some entries returned by get_fields() (e.g. generic
                # relations on older Django versions) may not implement
                # get_internal_type(); fall back to the class name.
                get_type = getattr(field, 'get_internal_type', None)
                field_type = get_type() if callable(get_type) else type(field).__name__
                columns.append({'name': field.name, 'type': field_type})
            rdict[model._meta.db_table] = columns
        return Response(rdict)
|
|
|
|
|
|
# class ReportViewSet(CustomModelViewSet):  # not used for now
|
|
# queryset = Report.objects.all()
|
|
# serializer_class = ReportSerializer
|
|
# search_fields = ['name', 'code']
|
|
|
|
# @action(methods=['post'], detail=True, perms_map={'post': 'report.exec'}, serializer_class=DataExecSerializer, cache_seconds=0)
|
|
# def exec(self, request, pk=None):
|
|
# """执行报表查询
|
|
|
|
# 执行报表查询并用于返回前端渲染
|
|
# """
|
|
# report = self.get_object()
|
|
# rdata = ReportSerializer(instance=report).data
|
|
# query = request.data.get('query', {})
|
|
# return_type = request.data.get('return_type', 2)
|
|
# query['r_user'] = request.user.id
|
|
# query['r_dept'] = request.user.belong_dept.id if request.user.belong_dept else ''
|
|
# datasets = report.datasets.all()
|
|
# results = {}
|
|
# seconds = 10 # 缓存秒数
|
|
|
|
# with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: # 多线程运行并返回字典结果
|
|
# fun_ps = []
|
|
# for ds in datasets:
|
|
# sql_query = ds.sql_query
|
|
# if sql_query:
|
|
# sql_f = check_sql_safe(sql_query.format(**query)) # 有风险先这样处理一下
|
|
# res = cache.get(sql_f, None)
|
|
# if isinstance(res, tuple):
|
|
# results[ds.name] = format_sqldata(res[0], res[1], return_type)
|
|
# else:
|
|
# fun_ps.append((ds.name, execute_raw_sql, sql_f))
|
|
# # 生成执行函数
|
|
# futures = {executor.submit(i[1], i[2]): i for i in fun_ps}
|
|
# for future in concurrent.futures.as_completed(futures):
|
|
# name, *_, sql_f = futures[future] # 获取对应的键
|
|
# try:
|
|
# res = future.result()
|
|
# results[name] = format_sqldata(res[0], res[1], return_type)
|
|
# if seconds:
|
|
# cache.set(sql_f, res, seconds)
|
|
# except Exception as e:
|
|
# results[name] = 'error: ' + str(e)
|
|
|
|
# rdata['data'] = results
|
|
# return Response(rdata)
|
|
|