from django.shortcuts import render from apps.utils.viewsets import CustomModelViewSet from rest_framework.decorators import action from rest_framework.response import Response from apps.bi.models import Dataset, Report from apps.bi.serializers import DatasetSerializer, DatasetCreateUpdateSerializer, ReportCreateUpdateSerializer, ReportSerializer, ReportExecSerializer import concurrent.futures from django.core.cache import cache from apps.utils.sql import execute_raw_sql from apps.bi.services import check_sql_safe # Create your views here. class DatasetViewSet(CustomModelViewSet): queryset = Dataset.objects.all() serializer_class = DatasetSerializer create_serializer_class = DatasetCreateUpdateSerializer update_serializer_class = DatasetCreateUpdateSerializer search_fields = ['name'] class ReportViewSet(CustomModelViewSet): queryset = Report.objects.all() serializer_class = ReportSerializer create_serializer_class = ReportCreateUpdateSerializer update_serializer_class = ReportCreateUpdateSerializer search_fields = ['name', 'code'] @action(methods=['post'], detail=True, perms_map={'post': '*'}, serializer_class=ReportExecSerializer) def exec(self, request, pk=None): """执行报表查询 执行报表查询并用于返回前端渲染 """ report = self.get_object() rdata = ReportSerializer(instance=report).data query = request.data.get('query', {}) query['r_user'] = request.user.id query['r_dept'] = request.user.belong_dept.id if request.user.belong_dept else '' datasets = report.datasets.all() results = {} seconds = 10 # 缓存秒数 with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: # 多线程运行并返回字典结果 fun_ps = [] for ds in datasets: sql_query = ds.sql_query if sql_query: sql_f = check_sql_safe(sql_query.format(**query)) # 有风险先这样处理一下 res = cache.get(sql_f, None) if isinstance(res, list): results[ds.name] = res else: fun_ps.append((ds.name, execute_raw_sql, sql_f, seconds)) # 生成执行函数 futures = {executor.submit(i[1], i[2]): i for i in fun_ps} for future in concurrent.futures.as_completed(futures): name, *_, sql_f, seconds = futures[future] # 获取对应的键 try: r = future.result() results[name] = r if seconds: cache.set(sql_f, r, seconds) except Exception as e: results[name] = 'error: ' + str(e) rdata['data'] = results return Response(rdata)