feat: 增加mlogbw start_test接口以优化批量操作

This commit is contained in:
caoqianming 2025-08-27 11:11:04 +08:00
parent 9ce5d29fa6
commit cd4b686f11
2 changed files with 148 additions and 1 deletions

View File

@ -26,6 +26,7 @@ import logging
from apps.qm.models import Defect
from apps.utils.snowflake import idWorker
from decimal import Decimal
from apps.em.models import Equipment
mylogger = logging.getLogger("log")
class OtherLogSerializer(CustomModelSerializer):
@ -895,6 +896,143 @@ class MlogbwCreateUpdateSerializer(CustomModelSerializer):
ftest.delete()
return mlogbw
class MlogbwStartTestSerializer(serializers.Serializer):
mlogbw_ids = serializers.ListField(child=serializers.CharField(), label="mlogbwId列表")
test_equip = serializers.CharField(label="测试设备", allow_null=True, required=False)
test_user = serializers.CharField(label="测试人员")
defects = serializers.ListField(child=serializers.CharField(), required=False, allow_null=True, label="缺陷项列表")
testitems = serializers.ListField(child=serializers.CharField(), required=False, allow_null=True, label="测试项列表")
test_date = serializers.DateField(label="测试日期")
qct = serializers.CharField(label="检测表id")
def save(self, **kwargs):
from apps.qm.models import Ftest, FtestDefect, FtestItem
validated_data = self.validated_data
mlogbw_ids = validated_data["mlogbw_ids"]
test_equip_id = validated_data.get("test_equip")
test_user_id = validated_data["test_user"]
defect_ids = validated_data.get("defects", [])
testitem_ids = validated_data.get("testitems", [])
test_date = validated_data["test_date"]
qct_id = validated_data["qct"]
# 预加载相关对象
test_equip = Equipment.objects.get(id=test_equip_id) if test_equip_id else None
test_user = User.objects.get(id=test_user_id)
qct = Qct.objects.get(id=qct_id)
# 批量获取所有mlogbw对象
mlogbws = Mlogbw.objects.filter(id__in=mlogbw_ids).select_related('ftest')
# 预加载缺陷和测试项
defects = Defect.objects.filter(id__in=defect_ids) if defect_ids else []
testitems = FtestItem.objects.filter(id__in=testitem_ids) if testitem_ids else []
existing_ftests = {}
new_ftests = []
mlogbws_to_update = []
# 分离已存在和需要新建的ftest
for mlogbw in mlogbws:
if mlogbw.ftest:
existing_ftests[mlogbw.ftest_id] = mlogbw.ftest
else:
ftest = Ftest(mlogbw=mlogbw, test_date=test_date, qct=qct)
new_ftests.append(ftest)
mlogbws_to_update.append(mlogbw)
# 批量创建新的ftest
if new_ftests:
Ftest.objects.bulk_create(new_ftests)
# 更新mlogbw的ftest关系
for mlogbw, ftest in zip(mlogbws_to_update, new_ftests):
mlogbw.ftest = ftest
Mlogbw.objects.bulk_update(mlogbws_to_update, ['ftest'])
# 将新创建的ftest添加到existing_ftests中
for ftest in new_ftests:
existing_ftests[ftest.id] = ftest
# 批量处理缺陷项需要更新test_user
if defects and existing_ftests:
# 获取所有现有的FtestDefect记录
existing_defect_ids = FtestDefect.objects.filter(
ftest_id__in=existing_ftests.keys(),
defect_id__in=[d.id for d in defects]
).values_list('defect_id', 'ftest_id')
existing_defect_map = {(defect_id, ftest_id) for defect_id, ftest_id in existing_defect_ids}
defects_to_create = []
defects_to_update = []
for ftest in existing_ftests.values():
for defect in defects:
if (defect.id, ftest.id) in existing_defect_map:
# 已有记录,需要更新
defects_to_update.append((ftest.id, defect.id))
else:
# 新记录,需要创建
defects_to_create.append(
FtestDefect(ftest=ftest, defect=defect, test_user=test_user)
)
# 批量创建新记录
if defects_to_create:
FtestDefect.objects.bulk_create(defects_to_create)
# 批量更新已有记录的test_user
if defects_to_update:
FtestDefect.objects.filter(
ftest_id__in=[item[0] for item in defects_to_update],
defect_id__in=[item[1] for item in defects_to_update]
).update(test_user=test_user)
# 批量处理测试项需要更新test_user和test_equip
if testitems and existing_ftests:
# 获取所有现有的FtestItem记录
existing_testitem_ids = FtestItem.objects.filter(
ftest_id__in=existing_ftests.keys(),
testitem_id__in=[t.id for t in testitems]
).values_list('testitem_id', 'ftest_id')
existing_testitem_map = {(testitem_id, ftest_id) for testitem_id, ftest_id in existing_testitem_ids}
testitems_to_create = []
testitems_to_update_condition = Q()
for ftest in existing_ftests.values():
for testitem in testitems:
if (testitem.id, ftest.id) in existing_testitem_map:
# 已有记录,添加到更新条件
testitems_to_update_condition |= Q(ftest=ftest, testitem=testitem)
else:
# 新记录,需要创建
testitems_to_create.append(
FtestItem(
ftest=ftest,
testitem=testitem,
test_user=test_user,
test_equip=test_equip
)
)
# 批量创建新记录
if testitems_to_create:
FtestItem.objects.bulk_create(testitems_to_create)
# 批量更新已有记录的test_user和test_equip
if testitems_to_update_condition:
FtestItem.objects.filter(testitems_to_update_condition).update(
test_user=test_user,
test_equip=test_equip
)
return super().save(**kwargs)
class MlogbOutUpdateSerializer(CustomModelSerializer):
mlogbdefect = MlogbDefectSerializer(many=True, required=False)
count_json = CountJsonSerializer(required=False, many=True)

View File

@ -26,7 +26,7 @@ from .serializers import (SflogExpSerializer, SfLogSerializer, StLogSerializer,
MlogbDetailSerializer, MlogbInSerializer, MlogbInUpdateSerializer,
MlogbOutUpdateSerializer, FmlogSerializer, FmlogUpdateSerializer, BatchStSerializer,
MlogbwCreateUpdateSerializer, HandoverMgroupSerializer, MlogListSerializer,
MlogbSerializer, MlogUserSerializer, BatchLogSerializer, MlogQuickSerializer)
MlogbSerializer, MlogUserSerializer, BatchLogSerializer, MlogQuickSerializer, MlogbwStartTestSerializer)
from .services import mlog_submit, handover_submit, mlog_revert, get_batch_dag, handover_revert
from apps.wpm.services import mlog_submit_validate, generate_new_batch
from apps.wf.models import State, Ticket
@ -38,6 +38,7 @@ from apps.utils.tools import convert_ordereddict, update_dict
from django.db.models import Count
from datetime import datetime, timedelta
from apps.utils.lock import lock_model_record_d_method
from apps.em.models import Equipment
# Create your views here.
@ -1006,6 +1007,14 @@ class MlogbwViewSet(CustomModelViewSet):
mlog = mlogb.mlog
mlog.cal_mlog_count_from_mlogb()
@action(methods=['post'], detail=False, perms_map={'post': 'mlog.update'}, serializer_class=MlogbwStartTestSerializer)
@transaction.atomic
def start_test(self, request, *args, **kwargs):
sr = self.get_serializer(data=request.data)
sr.is_valid(raise_exception=True)
sr.save()
return Response()
class MlogUserViewSet(BulkCreateModelMixin, ListModelMixin, DestroyModelMixin, CustomGenericViewSet):
perms_map = {"get": "*", "post": "mlog.update", "delete": "mlog.update"}
queryset = MlogUser.objects.all()