From 2a225bdc86a6fd371c92dc84ad926a81670c6d61 Mon Sep 17 00:00:00 2001 From: caoqianming Date: Fri, 26 Jul 2024 17:18:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E6=89=A7=E8=A1=8Cbi=E7=9A=84task=E5=92=8C=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/bi/migrations/0005_datasetrecord.py | 31 +++++++++++++++++++++ apps/bi/models.py | 8 ++++++ apps/bi/services.py | 35 +++++++++++++++++++++++- apps/bi/tasks.py | 19 +++++++++++++ apps/bi/views.py | 2 +- 5 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 apps/bi/migrations/0005_datasetrecord.py create mode 100644 apps/bi/tasks.py diff --git a/apps/bi/migrations/0005_datasetrecord.py b/apps/bi/migrations/0005_datasetrecord.py new file mode 100644 index 00000000..19caf38f --- /dev/null +++ b/apps/bi/migrations/0005_datasetrecord.py @@ -0,0 +1,31 @@ +# Generated by Django 3.2.12 on 2024-07-26 09:14 + +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ('bi', '0004_dataset_test_param'), + ] + + operations = [ + migrations.CreateModel( + name='DatasetRecord', + fields=[ + ('id', models.CharField(editable=False, help_text='主键ID', max_length=20, primary_key=True, serialize=False, verbose_name='主键ID')), + ('create_time', models.DateTimeField(default=django.utils.timezone.now, help_text='创建时间', verbose_name='创建时间')), + ('update_time', models.DateTimeField(auto_now=True, help_text='修改时间', verbose_name='修改时间')), + ('is_deleted', models.BooleanField(default=False, help_text='删除标记', verbose_name='删除标记')), + ('timex', models.DateTimeField(auto_now_add=True, verbose_name='执行时间')), + ('full_sql', models.TextField(blank=True, null=True, verbose_name='完整SQL')), + ('result', models.JSONField(blank=True, default=dict, verbose_name='执行返回')), + ('dataset', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='bi.dataset', verbose_name='数据集')), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/apps/bi/models.py b/apps/bi/models.py index b300b234..b62354fb 100644 --- a/apps/bi/models.py +++ b/apps/bi/models.py @@ -19,3 +19,11 @@ class Dataset(CommonBDModel): # code = models.CharField('标识', max_length=100, default='', blank=True) # js_function = models.TextField('数据转化函数', default='', blank=True) # datasets = models.ManyToManyField(Dataset, verbose_name='关联数据集', blank=True) +class DatasetRecord(BaseModel): + """ + 数据执行记录 + """ + timex = models.DateTimeField('执行时间', auto_now_add=True) + dataset = models.ForeignKey(Dataset, on_delete=models.CASCADE, verbose_name='数据集') + full_sql = models.TextField('完整SQL', null=True, blank=True) + result = models.JSONField('执行返回', default=dict, blank=True) \ No newline at end of file diff --git a/apps/bi/services.py b/apps/bi/services.py index b6fa0e46..4c44ccfa 100644 --- a/apps/bi/services.py +++ b/apps/bi/services.py @@ -1,6 +1,9 @@ from rest_framework.exceptions import ParseError import json from jinja2 import Template +from apps.bi.models import Dataset +import concurrent +from apps.utils.sql import execute_raw_sql, format_sqldata forbidden_keywords = ["UPDATE", "DELETE", "DROP", "TRUNCATE"] @@ -22,4 +25,34 @@ def format_json_with_placeholders(json_str, **kwargs): formatted_json = formatted_json.replace("{" + key + "}", json.dumps(value)) # 格式化后的字符串依然是 JSON 字符串,没有使用 json.loads() - return formatted_json \ No newline at end of file + return formatted_json + + +def exec_dataset(dt: Dataset, xquery: dict = {}): + """执行数据集 + 返回 (sql语句, { rda}) + """ + rdata = {} + results = {} + results2 = {} + query = dt.default_param + if dt.sql_query: + query.update(xquery) + sql_f_ = check_sql_safe(dt.sql_query.format(**query)) + sql_f_strip = sql_f_.strip(';') + sql_f_l = sql_f_strip.split(';') + # 多线程运行并返回字典结果 + with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: + fun_ps = [] + for ind, val in enumerate(sql_f_l): + fun_ps.append((f'ds{ind}', execute_raw_sql, val)) + # 生成执行函数 + futures = {executor.submit(i[1], i[2]): i for i in fun_ps} + for future in concurrent.futures.as_completed(futures): + name, *_, sql_f = futures[future] # 获取对应的键 + res = future.result() + results[name], results2[name] = format_sqldata( + res[0], res[1]) + rdata['data'] = results + rdata['data2'] = results2 + return sql_f_, rdata \ No newline at end of file diff --git a/apps/bi/tasks.py b/apps/bi/tasks.py new file mode 100644 index 00000000..6e71ec5b --- /dev/null +++ b/apps/bi/tasks.py @@ -0,0 +1,19 @@ +from __future__ import absolute_import, unicode_literals +from celery import shared_task +from django.utils import timezone +from apps.bi.models import Dataset, DatasetRecord +from apps.bi.services import exec_dataset +import json + + +@shared_task() +def exec_dataset_and_store(code: str, query: str = ''): + dt = Dataset.objects.get(code=code) + dtr = DatasetRecord() + dtr.timex = timezone.now() + dtr.dataset = dt + squery = {} + if query: + squery = json.loads(query) + dtr.full_sql, dtr.result = exec_dataset(dt, squery) + dtr.save() diff --git a/apps/bi/views.py b/apps/bi/views.py index 20ca691d..a3ba6054 100644 --- a/apps/bi/views.py +++ b/apps/bi/views.py @@ -68,9 +68,9 @@ class DatasetViewSet(CustomModelViewSet): raise_exception = request.data.get('raise_exception', True) xquery['r_user'] = request.user.id xquery['r_dept'] = request.user.belong_dept.id if request.user.belong_dept else '' + can_cache = True results = {} results2 = {} - can_cache = True query = dt.default_param if dt.sql_query: if is_test: