factory/apps/bi/views.py

68 lines
2.8 KiB
Python

from django.shortcuts import render
from apps.utils.viewsets import CustomModelViewSet
from rest_framework.decorators import action
from rest_framework.response import Response
from apps.bi.models import Dataset, Report
from apps.bi.serializers import DatasetSerializer, DatasetCreateUpdateSerializer, ReportCreateUpdateSerializer, ReportSerializer, ReportExecSerializer
import concurrent.futures
from django.core.cache import cache
from apps.utils.sql import execute_raw_sql
from apps.bi.services import check_sql_safe
# Create your views here.
class DatasetViewSet(CustomModelViewSet):
queryset = Dataset.objects.all()
serializer_class = DatasetSerializer
create_serializer_class = DatasetCreateUpdateSerializer
update_serializer_class = DatasetCreateUpdateSerializer
search_fields = ['name']
class ReportViewSet(CustomModelViewSet):
queryset = Report.objects.all()
serializer_class = ReportSerializer
create_serializer_class = ReportCreateUpdateSerializer
update_serializer_class = ReportCreateUpdateSerializer
search_fields = ['name', 'code']
@action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ReportExecSerializer)
def exec(self, request, pk=None):
"""执行报表查询
执行报表查询并用于返回前端渲染
"""
report = self.get_object()
rdata = ReportSerializer(instance=report).data
query = request.data.get('query', {})
query['r_user'] = request.user.id
query['r_dept'] = request.user.belong_dept.id if request.user.belong_dept else ''
datasets = report.datasets.all()
results = {}
seconds = 10 # 缓存秒数
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: # 多线程运行并返回字典结果
fun_ps = []
for ds in datasets:
sql_query = ds.sql_query
if sql_query:
sql_f = check_sql_safe(sql_query.format(**query)) # 有风险先这样处理一下
res = cache.get(sql_f, None)
if isinstance(res, list):
results[ds.name] = res
else:
fun_ps.append((ds.name, execute_raw_sql, sql_f, seconds))
# 生成执行函数
futures = {executor.submit(i[1], i[2]): i for i in fun_ps}
for future in concurrent.futures.as_completed(futures):
name, *_, sql_f, seconds = futures[future] # 获取对应的键
try:
r = future.result()
results[name] = r
if seconds:
cache.set(sql_f, r, seconds)
except Exception as e:
results[name] = 'error: ' + str(e)
rdata['data'] = results
return Response(rdata)