From a104191341addaad7afb7c6a9dc57cba5564f9dd Mon Sep 17 00:00:00 2001 From: caoqianming Date: Tue, 2 Apr 2024 10:18:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BB=8Ekioserver=E4=B8=AD=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../enm/migrations/0024_auto_20240326_1617.py | 47 +++++++ apps/enm/models.py | 28 ++++- apps/enm/services.py | 117 +++++++++++++++++- apps/enm/views.py | 13 ++ 4 files changed, 200 insertions(+), 5 deletions(-) create mode 100644 apps/enm/migrations/0024_auto_20240326_1617.py diff --git a/apps/enm/migrations/0024_auto_20240326_1617.py b/apps/enm/migrations/0024_auto_20240326_1617.py new file mode 100644 index 00000000..249aee12 --- /dev/null +++ b/apps/enm/migrations/0024_auto_20240326_1617.py @@ -0,0 +1,47 @@ +# Generated by Django 3.2.12 on 2024-03-26 08:17 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('enm', '0023_mpoint_interval'), + ] + + operations = [ + migrations.CreateModel( + name='MpLogx', + fields=[ + ('timex', models.DateTimeField(primary_key=True, serialize=False, verbose_name='采集时间')), + ('val_float', models.FloatField(blank=True, null=True, verbose_name='数值')), + ('val_int', models.IntegerField(blank=True, null=True, verbose_name='数值')), + ('val_bool', models.BooleanField(blank=True, null=True, verbose_name='数值')), + ('val_str', models.CharField(blank=True, max_length=100, null=True, verbose_name='数值')), + ], + options={ + 'db_table': 'enm_mplog', + 'managed': False, + }, + ), + migrations.AddField( + model_name='mpoint', + name='enabled', + field=models.BooleanField(default=False, verbose_name='是否启用'), + ), + migrations.AddField( + model_name='mpoint', + name='val_type', + field=models.CharField(default='float', help_text='float, int, str, bool', max_length=50, verbose_name='值类型'), + ), + migrations.AlterField( + model_name='mpoint', + name='cate', + field=models.CharField(blank=True, max_length=50, null=True, verbose_name='分类'), + ), + migrations.AlterField( + model_name='mpoint', + name='unit', + field=models.CharField(blank=True, max_length=50, null=True, verbose_name='单位'), + ), + ] diff --git a/apps/enm/models.py b/apps/enm/models.py index ef92cd05..341a5d3a 100644 --- a/apps/enm/models.py +++ b/apps/enm/models.py @@ -4,13 +4,13 @@ from apps.wpm.models import SfLog from apps.mtm.models import Material, Mgroup, Team -class Mpoint(CommonBDModel): +class Mpoint(CommonBModel): """测点 """ name = models.CharField('测点名称', max_length=50) code = models.CharField('测点编号', max_length=50, unique=True) - unit = models.CharField('单位', max_length=50) - cate = models.CharField('分类', max_length=50, default='material') + unit = models.CharField('单位', max_length=50, null=True, blank=True) + cate = models.CharField('分类', max_length=50, null=True, blank=True) material = models.ForeignKey(Material, verbose_name='计量某种物料', on_delete=models.CASCADE, null=True, blank=True) ep_monitored = models.ForeignKey('em.equipment', verbose_name='监测哪个设备', related_name='mp_ep_monitored', on_delete=models.SET_NULL, null=True, blank=True) ep_belong = models.ForeignKey('em.equipment', verbose_name='属于哪个设备', related_name='mp_ep_belong', on_delete=models.SET_NULL, null=True, blank=True) @@ -21,10 +21,30 @@ class Mpoint(CommonBDModel): formula = models.TextField('计算公式', default='') func_on_change = models.CharField('数据变动时执行方法', max_length=100, default='') interval = models.PositiveSmallIntegerField('采集间隔(秒)', default=10) + val_type = models.CharField('值类型', default='float', max_length=50, help_text='float, int, str, bool') + enabled = models.BooleanField('是否启用', default=False) + third_info = models.JSONField('第三方信息', default=dict, blank=True) + # {"from": "king", "n": "某名称","d": "某描述或备注","g": "某组", "t": "某类型", "id": 5001, "o": "其他信息"} +class MpLogx(models.Model): + """ + 测点记录超表 + """ + timex = models.DateTimeField('采集时间', primary_key=True) + mpoint = models.ForeignKey( + Mpoint, verbose_name='关联测点', on_delete=models.CASCADE) + val_float = models.FloatField('数值', null=True, blank=True) + val_int = models.IntegerField('数值', null=True, blank=True) + val_bool = models.BooleanField('数值', null=True, blank=True) + val_str = models.CharField('数值', max_length=100, null=True, blank=True) + + class Meta: + db_table = 'enm_mplog' + managed = False + unique_together = (('mpoint', 'timex'), ) class MpLog(BaseModel): - """测点原始记录 + """旧表(已废弃) """ mpoint = models.ForeignKey(Mpoint, verbose_name='关联测点', on_delete=models.SET_NULL, null=True, blank=True) tag_id = models.BigIntegerField('记录ID', db_index=True) diff --git a/apps/enm/services.py b/apps/enm/services.py index 8904505e..ec487316 100644 --- a/apps/enm/services.py +++ b/apps/enm/services.py @@ -1,10 +1,17 @@ -from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog +from apps.enm.models import Mpoint, MpointStat, EnStat, MpLog, MpLogx import re import traceback from apps.mtm.services import get_mgroup_goals from django.db.models import Q import datetime from django.utils import timezone +from django.core.cache import cache +import concurrent.futures +from django.db import connection +from django.utils import timezone +from datetime import datetime, timedelta +import time +from apps.utils.decorators import auto_log def translate_eval_formula(exp_str: str, year: int, month: int, day: int, hour: int): @@ -117,3 +124,111 @@ def shutdown_or_startup(mplog: MpLog): sflog=get_sflog(mgroup, mplog.tag_update)) mgroup.is_runing = False mgroup.save() + + + +def king_sync(projectName: str): + """ + 同步亚控测点 + """ + from apps.third.king.k import kingClient + from apps.third.king import king_api + _, res = kingClient.request(**king_api["read_variables"], params={"projectInstanceName": projectName}) + t_dict = {1: "bool", 2: "int", 3: "float", 4: "float", 5: "str"} + for item in res['objectList']: + if 't' in item and item['t']: + code = f'king_{item["n"]}' + item['from'] = 'king' + Mpoint.objects.get_or_create(code=code, defaults={ + "name": item["n"], + "code": code, + "enabled": False, + "is_auto": True, + "val_type": t_dict[item["t"]], + "third_info": item + }) + +def cache_mpoints(mpointId: str|None = None): + """ + 缓存所有可用的测点 + """ + if mpointId: + mpoints_data = Mpoint.objects.filter(id=mpointId).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") + else: + mpoints_data = Mpoint.objects.filter(is_auto=True, enabled=True).values("id", "code", "name", "val_type", "enabled", "is_auto", "interval", "func_on_change", "formula", "third_info") + for item in list(mpoints_data): + key = f'mpoint_{item["code"]}' + cache_mpoint = cache.get(key, {}) + cache_mpoint.update(item) + cache.set(key, cache_mpoint, timeout=None) + + +def insert_mplogx_from_king_realdata(data: dict, is_offset=True): + """ + 从king mqtt数据插入超表 + """ + objs = data['Objs'] + len_objs = len(objs) + pvs = data['PVs'] + + chunk_size = 200 + num_chunks = (len(objs) + chunk_size - 1) // chunk_size + + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + threads = [] + for i in range(num_chunks): + start = i * chunk_size + end = min(start + chunk_size, len_objs) + chunk = objs[start:end] + threads.append(executor.submit(insert_mplogx_from_king_realdata_chunk, chunk, pvs, is_offset)) + + concurrent.futures.wait(threads) + +@auto_log +def insert_mplogx_from_king_realdata_chunk(objs: list, pvs: dict, is_offset=True): + """ + 分批存库 + """ + oval = pvs['1'] + otime_str = pvs['2'] + otime_obj = timezone.make_aware(datetime.strptime(otime_str, '%Y-%m-%d %H:%M:%S.%f')) + insert_data = [] + for obj in objs: + n = obj['N'] + val = obj.get('1', None) + timex = obj.get("2", None) + cache_key = f'mpoint_king_{n}' + mpoint_data = cache.get(cache_key, None) + if mpoint_data is None: + return + val_type = mpoint_data['val_type'] + if is_offset: + if val is None: + val = oval + else: + val = oval + val + if timex is None: + timex = otime_obj + else: + timex = otime_obj + timedelta(milliseconds=timex) + else: + timex = timezone.make_aware(datetime.strptime(timex, '%Y-%m-%d %H:%M:%S.%f')) + mpoint_interval = mpoint_data['interval'] + mpoint_last_timex = mpoint_data.get('last_timex', None) + # 控制采集间隔 + can_save = False + if mpoint_last_timex: + if (timex - mpoint_last_timex).total_seconds() > mpoint_interval: + can_save = True + if can_save: + save_dict = { + "timex": timex, + "mpoint": Mpoint.objects.get(id=mpoint_data['id']), + } + # 更新对应缓存 + save_dict[f'val_{val_type}'] = val + mpoint_data.update({'last_val': val, 'last_timex': timex}) + cache.set(cache_key, mpoint_data) + insert_data.append(MpLogx(**save_dict)) + MpLogx.objects.bulk_create(insert_data) + diff --git a/apps/enm/views.py b/apps/enm/views.py index 77a6b807..fe3d4465 100644 --- a/apps/enm/views.py +++ b/apps/enm/views.py @@ -1,4 +1,5 @@ from django.shortcuts import render +from django.conf import settings from apps.enm.models import Mpoint, MpLog, MpointStat, EnStat, EnStat2 from apps.utils.viewsets import CustomModelViewSet, CustomGenericViewSet from rest_framework.mixins import ListModelMixin @@ -7,8 +8,10 @@ from apps.enm.serializers import (MpointSerializer, MpLogSerializer, MpointStatS from apps.enm.filters import MpointStatFilter, EnStatFilter, EnStat2Filter from apps.enm.tasks import cal_mpointstat_manual from rest_framework.response import Response +from rest_framework.serializers import Serializer from rest_framework.decorators import action from apps.enm.tasks import cal_mpointstats_duration +from apps.enm.services import king_sync class MpointViewSet(CustomModelViewSet): """ @@ -23,6 +26,16 @@ class MpointViewSet(CustomModelViewSet): search_fields = ['number', 'code'] + @action(methods=['post'], detail=False, perms_map={'post': 'mpoint.sync'}, serializer_class=Serializer) + def king_sync(self, request, *args, **kwargs): + """同步亚控采集点 + + 同步亚控采集点 + """ + king_sync(getattr(settings, 'KING_PROJECTNAME', "")) + return Response() + + class MpLogViewSet(ListModelMixin, CustomGenericViewSet): """ list:测点原始记录