feat: 添加定时执行bi的task和记录表

This commit is contained in:
caoqianming 2024-07-26 17:18:42 +08:00
parent 9c27ffd7dd
commit 2a225bdc86
5 changed files with 93 additions and 2 deletions

View File

@ -0,0 +1,31 @@
# Generated by Django 3.2.12 on 2024-07-26 09:14
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('bi', '0004_dataset_test_param'),
]
operations = [
migrations.CreateModel(
name='DatasetRecord',
fields=[
('id', models.CharField(editable=False, help_text='主键ID', max_length=20, primary_key=True, serialize=False, verbose_name='主键ID')),
('create_time', models.DateTimeField(default=django.utils.timezone.now, help_text='创建时间', verbose_name='创建时间')),
('update_time', models.DateTimeField(auto_now=True, help_text='修改时间', verbose_name='修改时间')),
('is_deleted', models.BooleanField(default=False, help_text='删除标记', verbose_name='删除标记')),
('timex', models.DateTimeField(auto_now_add=True, verbose_name='执行时间')),
('full_sql', models.TextField(blank=True, null=True, verbose_name='完整SQL')),
('result', models.JSONField(blank=True, default=dict, verbose_name='执行返回')),
('dataset', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='bi.dataset', verbose_name='数据集')),
],
options={
'abstract': False,
},
),
]

View File

@ -19,3 +19,11 @@ class Dataset(CommonBDModel):
# code = models.CharField('标识', max_length=100, default='', blank=True)
# js_function = models.TextField('数据转化函数', default='', blank=True)
# datasets = models.ManyToManyField(Dataset, verbose_name='关联数据集', blank=True)
class DatasetRecord(BaseModel):
"""
数据执行记录
"""
timex = models.DateTimeField('执行时间', auto_now_add=True)
dataset = models.ForeignKey(Dataset, on_delete=models.CASCADE, verbose_name='数据集')
full_sql = models.TextField('完整SQL', null=True, blank=True)
result = models.JSONField('执行返回', default=dict, blank=True)

View File

@ -1,6 +1,9 @@
from rest_framework.exceptions import ParseError
import json
from jinja2 import Template
from apps.bi.models import Dataset
import concurrent
from apps.utils.sql import execute_raw_sql, format_sqldata
forbidden_keywords = ["UPDATE", "DELETE", "DROP", "TRUNCATE"]
@ -22,4 +25,34 @@ def format_json_with_placeholders(json_str, **kwargs):
formatted_json = formatted_json.replace("{" + key + "}", json.dumps(value))
# 格式化后的字符串依然是 JSON 字符串,没有使用 json.loads()
return formatted_json
return formatted_json
def exec_dataset(dt: Dataset, xquery: dict = {}):
"""执行数据集
返回 (sql语句, { rda})
"""
rdata = {}
results = {}
results2 = {}
query = dt.default_param
if dt.sql_query:
query.update(xquery)
sql_f_ = check_sql_safe(dt.sql_query.format(**query))
sql_f_strip = sql_f_.strip(';')
sql_f_l = sql_f_strip.split(';')
# 多线程运行并返回字典结果
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
fun_ps = []
for ind, val in enumerate(sql_f_l):
fun_ps.append((f'ds{ind}', execute_raw_sql, val))
# 生成执行函数
futures = {executor.submit(i[1], i[2]): i for i in fun_ps}
for future in concurrent.futures.as_completed(futures):
name, *_, sql_f = futures[future] # 获取对应的键
res = future.result()
results[name], results2[name] = format_sqldata(
res[0], res[1])
rdata['data'] = results
rdata['data2'] = results2
return sql_f_, rdata

19
apps/bi/tasks.py Normal file
View File

@ -0,0 +1,19 @@
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from django.utils import timezone
from apps.bi.models import Dataset, DatasetRecord
from apps.bi.services import exec_dataset
import json
@shared_task()
def exec_dataset_and_store(code: str, query: str = ''):
dt = Dataset.objects.get(code=code)
dtr = DatasetRecord()
dtr.timex = timezone.now()
dtr.dataset = dt
squery = {}
if query:
squery = json.loads(query)
dtr.full_sql, dtr.result = exec_dataset(dt, squery)
dtr.save()

View File

@ -68,9 +68,9 @@ class DatasetViewSet(CustomModelViewSet):
raise_exception = request.data.get('raise_exception', True)
xquery['r_user'] = request.user.id
xquery['r_dept'] = request.user.belong_dept.id if request.user.belong_dept else ''
can_cache = True
results = {}
results2 = {}
can_cache = True
query = dt.default_param
if dt.sql_query:
if is_test: