from apps.qm.models import (QuaStat, TestItem, Ftest, FtestItem, FtestWork, Ptest, NotOkOption, Defect,
                            Qct, QctTestItem, QctMat, QctDefect, FtestDefect, FtestworkDefect)
from apps.utils.constants import EXCLUDE_FIELDS, EXCLUDE_FIELDS_BASE
from apps.utils.serializers import CustomModelSerializer
from rest_framework import serializers
from rest_framework.exceptions import ValidationError, ParseError
from apps.wpm.models import SfLog, WMaterial
from django.db import transaction
from apps.inm.serializers import MaterialBatchDetailSerializer
from apps.mtm.models import Material
import logging

myLogger = logging.getLogger('log')


# Serializer for Defect; rejects categories that are not in Defect.cate_list.
class DefectSerializer(CustomModelSerializer):
    class Meta:
        model = Defect
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    def validate(self, attrs):
        """Ensure ``cate`` is one of ``Defect.cate_list``.

        Raises:
            ParseError: if the category is not in ``Defect.cate_list``.
        """
        cate = attrs["cate"]
        if cate not in Defect.cate_list:
            raise ParseError("缺陷类别错误")
        return attrs

    # Disabled code kept from the original file (uniqueness check on ``code``
    # at create time, and ignoring ``code`` on update):
    # def create(self, validated_data):
    #     code = validated_data["code"]
    #     if Defect.objects.get_queryset(all=True).filter(code=code).exists():
    #         raise ValidationError("缺陷标识已存在")
    #     return super().create(validated_data)

    # def update(self, instance, validated_data):
    #     validated_data.pop("code", None)
    #     return super().update(instance, validated_data)


# Plain (non-model) serializer describing the parameters used to look up a Qct.
class QctGetSerializer(serializers.Serializer):
    material = serializers.CharField(label="物料ID")
    tag = serializers.CharField(label="标签")
    type = serializers.CharField(label="类型(in/out)", required=False)


# Serializer for TestItem, exposing the related process name read-only.
class TestItemSerializer(CustomModelSerializer):
    process_name = serializers.CharField(source="process.name", read_only=True)

    class Meta:
        model = TestItem
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS


# Flat serializer for Qct.
class QctSerializer(CustomModelSerializer):
    class Meta:
        model = Qct
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS


# Serializer for QctTestItem with read-only attributes copied from the related TestItem.
class QctTestItemSerializer(CustomModelSerializer):
    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_type = serializers.CharField(source='testitem.type', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)
    testitem_field_type = serializers.CharField(source='testitem.field_type', read_only=True)
    testitem_choices = serializers.CharField(source='testitem.choices', read_only=True)
    testitem_cd_expr = serializers.CharField(source='testitem.cd_expr', read_only=True)

    class Meta:
        model = QctTestItem
        fields = '__all__'

    # Disabled code kept from the original file (restricting the selection to
    # items of type TestItem.T_TEST):
    # def validate(self, attrs):
    #     testitem:TestItem = attrs.get("testitem")
    #     if testitem.type != TestItem.T_TEST:
    #         raise ParseError("只可选择检测项")
    #     return attrs


# Serializer for QctDefect with read-only name/okcate of the related Defect.
class QctDefectSerializer(CustomModelSerializer):
    defect_name = serializers.CharField(source='defect.name', read_only=True)
    defect_okcate = serializers.CharField(source='defect.okcate', read_only=True)

    class Meta:
        model = QctDefect
        fields = '__all__'


# Serializer for QctMat; material_name is the string representation of the material.
class QctMatSerializer(CustomModelSerializer):
    material_name = serializers.StringRelatedField(source='material', read_only=True)

    class Meta:
        model = QctMat
        fields = '__all__'


# Read-oriented Qct serializer that nests its test items, defects and materials.
class QctDetailSerializer(CustomModelSerializer):
    qct_testitems = QctTestItemSerializer(many=True, read_only=True)
    qct_defects = QctDefectSerializer(many=True, read_only=True)
    qct_mats = QctMatSerializer(many=True, read_only=True)

    class Meta:
        model = Qct
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS


# Create serializer for QuaStat: derives rate_pass and belong_dept from the input.
class QuaStatSerializer(CustomModelSerializer):
    sflog = serializers.PrimaryKeyRelatedField(
        label="值班记录", queryset=SfLog.objects.all(), required=True)
    belong_dept_name = serializers.CharField(
        source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(
        source='material.name', read_only=True)
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Fill in ``rate_pass`` (num_ok / num_test) and ``belong_dept``.

        ``rate_pass`` is stored as 0 when ``num_test`` is 0, mirroring
        ``QuaStatUpdateSerializer.validate``; previously this case raised
        ``ZeroDivisionError``.
        """
        num_test = attrs['num_test']
        attrs['rate_pass'] = attrs['num_ok'] / num_test if num_test != 0 else 0
        attrs['belong_dept'] = attrs['sflog'].mgroup.belong_dept
        return attrs


# Plain serializer carrying a target pass rate for a material/test item/month.
class UpdateRatePassTSerializer(serializers.Serializer):
    material_id = serializers.IntegerField(required=True)
    testitem_id = serializers.IntegerField(required=True)
    rate_pass_t = serializers.FloatField(required=True)
    month_s = serializers.IntegerField(required=True)

    def validate_rate_pass_t(self, value):
        """Accept only values within the closed range [0, 100]."""
        if value < 0 or value > 100:
            raise serializers.ValidationError("Rate pass must be between 0 and 100.")
        return value


# Update serializer for QuaStat: the relations are read-only, rate_pass is recomputed.
class QuaStatUpdateSerializer(CustomModelSerializer):
    belong_dept_name = serializers.CharField(
        source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(
        source='material.name', read_only=True)
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept', 'sflog', 'material', 'testitem']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Recompute ``rate_pass``; it is 0 when ``num_test`` is 0."""
        if attrs['num_test'] != 0:
            attrs['rate_pass'] = attrs['num_ok'] / attrs['num_test']
        else:
            attrs['rate_pass'] = 0
        return super().validate(attrs)


# Serializer for the defect rows nested under a FtestWork.
class FtestworkdefectSerializer(CustomModelSerializer):
    defect_name = serializers.CharField(source="defect.name", read_only=True)

    class Meta:
        model = FtestworkDefect
        fields = "__all__"
        read_only_fields = EXCLUDE_FIELDS_BASE + ["ftestwork"]


# Write serializer for FtestWork, with optional nested defect rows ("ftestworkdefect").
class FtestWorkCreateUpdateSerializer(CustomModelSerializer):
    ftestworkdefect = FtestworkdefectSerializer(many=True, required=False)

    class Meta:
        model = FtestWork
        fields = ['id', 'shift', 'wm', 'mb', 'type', 'type2', 'test_date', 'count', 'count_sampling',
                  'count_sampling_ok', 'count_ok', 'count_notok', 'count_notok_json', 'test_user',
                  'need_update_wm', 'equipment', 'note', "qct", "ftestworkdefect"]
        extra_kwargs = {'test_user': {'required': True}, 'type': {'required': True}}

    def validate(self, attrs):
        """Validate the inspected stock and derive material/batch/department.

        Exactly one stock source is used: ``wm`` takes precedence over ``mb``;
        the other one is reset to ``None``. ``material`` and ``batch`` are
        copied from the chosen source and ``belong_dept`` from ``test_user``.

        Raises:
            ParseError: the material is not tracked by batch.
            ValidationError: unsupported ``wm`` state, missing/invalid
                ``count_notok_json``, inconsistent totals, or no stock source.
        """
        qct = attrs.get("qct", None)
        ftestworkdefect = attrs.get("ftestworkdefect", [])
        type2 = attrs.get('type2', 20)
        if type2 == 20:
            # Full inspection: every unit counts as sampled.
            attrs['count_sampling'] = attrs['count']

        if 'wm' in attrs and attrs['wm']:
            attrs['mb'] = None
            wm: WMaterial = attrs['wm']
            if wm.material.tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
            if wm.state not in [WMaterial.WM_OK, WMaterial.WM_TEST]:
                raise ValidationError('不支持对该物料检验')
            attrs['material'] = wm.material
            attrs['batch'] = wm.batch
            count_notok_json = attrs.get('count_notok_json', None)
            # With a qct or explicit defect rows nothing more is checked here;
            # otherwise the legacy ``count_notok_json`` logic applies.
            if not (qct or ftestworkdefect):
                if count_notok_json is None:
                    raise ValidationError('不合格项不能为空')
                count_notok = 0
                for k, v in count_notok_json.items():
                    k_2 = k.replace('count_n_', '')
                    if k_2 not in NotOkOption.values:
                        raise ValidationError(f'不支持的不合格项{k_2}')
                    if isinstance(v, int) and v >= 0:
                        count_notok = count_notok + v
                    else:
                        raise ValidationError(f'不合格项{k_2}必须为非负整数')
                attrs['count_notok'] = count_notok
                # NOTE(review): nesting reconstructed from collapsed source; this
                # check is assumed to belong to the legacy branch, where
                # ``count_notok`` has just been computed - confirm.
                if type2 == 20 and attrs['count'] != attrs['count_ok'] + attrs['count_notok']:
                    raise ValidationError('全检时总数必须等于合格数+不合格数')
        elif 'mb' in attrs and attrs['mb']:
            attrs['wm'] = None
            attrs['material'] = attrs['mb'].material
            attrs['batch'] = attrs['mb'].batch
            if attrs['material'].tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
        else:
            raise ValidationError('请选择车间/仓库库存')

        if attrs['test_user']:
            attrs['belong_dept'] = attrs['test_user'].belong_dept
        return attrs

    def create(self, validated_data):
        """Create the FtestWork and one FtestworkDefect per row with count > 0.

        ``cal_count()`` is invoked on the new record when it has a qct or when
        any defect rows were submitted.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        ins: FtestWork = super().create(validated_data)
        # Distinct loop variable: the original reused the list's own name here.
        for row in defect_rows:
            if row['count'] > 0:
                FtestworkDefect.objects.create(ftestwork=ins, **row)
        if ins.qct or defect_rows:
            ins.cal_count()
        return ins

    def update(self, instance, validated_data):
        """Update the FtestWork and synchronise its existing defect rows.

        Rows with count > 0 must already exist for this FtestWork; their count
        is updated. Every other FtestworkDefect of this FtestWork is deleted.

        Raises:
            ParseError: a submitted defect has no existing row, or has several.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        ins: FtestWork = super().update(instance, validated_data)
        fd_ids = []
        for item in defect_rows:
            if item["count"] > 0:
                # Use a separate name for the defect row. The original rebound
                # ``ins`` here, so later lookups, the cleanup query, cal_count()
                # and the return value operated on a FtestworkDefect instead of
                # the FtestWork.
                try:
                    fd = FtestworkDefect.objects.get(ftestwork=ins, defect=item["defect"])
                except FtestworkDefect.DoesNotExist:
                    raise ParseError("新的缺陷项!")
                except FtestworkDefect.MultipleObjectsReturned:
                    raise ParseError("缺陷项重复!")
                fd.count = item["count"]
                fd.save()
                fd_ids.append(fd.id)
        FtestworkDefect.objects.filter(ftestwork=ins).exclude(id__in=fd_ids).delete()
        if ins.qct or defect_rows:
            ins.cal_count()
        return ins


# Read serializer for FtestWork with display fields for material, batch and tester.
class FtestWorkSerializer(CustomModelSerializer):
    material_name = serializers.StringRelatedField(
        source='material', read_only=True)
    material_cate = serializers.CharField(source='material.cate', read_only=True)
    mb_ = MaterialBatchDetailSerializer(source='mb', read_only=True)
    test_user_name = serializers.CharField(source='test_user.name', read_only=True)

    class Meta:
        model = FtestWork
        fields = "__all__"


# Serializer for the detail rows (FtestItem) nested under a Ftest.
class FtestItemSerializer(CustomModelSerializer):
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(
        source='testitem.description', read_only=True)

    class Meta:
        model = FtestItem
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest']


# Serializer for Ftest with writable nested ``ftestitems``.
class FtestSerializer(CustomModelSerializer):
    test_user_name = serializers.CharField(
        source='test_user.name', read_only=True)
    check_user_name = serializers.CharField(
        source='check_user.name', read_only=True)
    ftestitems = FtestItemSerializer(label='检验明细', many=True)

    class Meta:
        model = Ftest
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    def validate(self, attrs):
        """Inherit ``type`` (and ``test_user`` when absent) from ``ftest_work``."""
        ftest_work: FtestWork = attrs.get('ftest_work', None)
        if ftest_work:
            test_user = attrs.get('test_user', None)
            attrs['type'] = ftest_work.type
            if test_user is None:
                attrs['test_user'] = ftest_work.test_user
        return attrs

    def create(self, validated_data):
        """Create the Ftest, then one FtestItem per nested item."""
        ftestitems = validated_data.pop('ftestitems', [])
        instance = super().create(validated_data)
        for item in ftestitems:
            FtestItem.objects.create(ftest=instance, **item)
        return instance

    def update(self, instance, validated_data):
        """Update the Ftest (``ftest_work`` is ignored) and its nested items.

        Only nested items carrying an ``id`` are touched; their ``test_val``
        and ``check_val`` are overwritten.
        """
        validated_data.pop('ftest_work', None)
        ftestitems = validated_data.pop('ftestitems', [])
        instance = super().update(instance, validated_data)
        for item in ftestitems:
            item_id = item.get('id', None)  # renamed: ``id`` shadowed the builtin
            if item_id:
                ftestitem = FtestItem.objects.get(id=item_id)
                ftestitem.test_val = item['test_val']
                ftestitem.check_val = item['check_val']
                ftestitem.save()
        return instance


# Serializer for Ptest; creation rejects a sample_number that already exists.
class PtestSerializer(CustomModelSerializer):
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)
    test_user_name = serializers.CharField(source='test_user', read_only=True)

    class Meta:
        model = Ptest
        fields = '__all__'

    def create(self, validated_data):
        """Create a Ptest unless one with the same ``sample_number`` exists."""
        if Ptest.objects.filter(sample_number=validated_data['sample_number']).exists():
            raise serializers.ValidationError('该样品编号已存在')
        return super().create(validated_data)


# Serializer for the defect rows of a Ftest; ``ftest`` and ``is_main`` are read-only.
class FtestDefectSerializer(CustomModelSerializer):
    defect_name = serializers.CharField(source='defect.name', read_only=True)

    class Meta:
        model = FtestDefect
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest', 'is_main']


# Reduced FtestItem serializer used by FtestProcessSerializer.
class FtestItemProcessSerializer(CustomModelSerializer):
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(
        source='testitem.description', read_only=True)

    class Meta:
        model = FtestItem
        fields = ["id", "testitem", "test_user", "test_val_json", "testitem_name",
                  "testitem_description", "addto_wpr", "test_equip"]


# Serializer for Ftest records of type "process", with nested items and defects.
class FtestProcessSerializer(CustomModelSerializer):
    test_user_name = serializers.CharField(
        source='test_user.name', read_only=True)
    ftestitems = FtestItemProcessSerializer(label='检验明细', many=True)
    ftestdefects = FtestDefectSerializer(label='缺陷明细', many=True)

    class Meta:
        model = Ftest
        fields = ["id", "test_date", "test_user", "note", "ftestitems", "ftestdefects", "qct", "test_user_name"]
        extra_kwargs = {"qct": {"required": True}}

    def validate(self, attrs):
        """Force ``type`` to ``"process"``."""
        attrs["type"] = "process"
        return attrs

    def create(self, validated_data):
        """Create the Ftest with its items and defects, then set the verdict.

        The record is not OK as soon as one submitted defect is present
        (``has``) and its ``okcate`` is ``Defect.DEFECT_NOTOK``; the first such
        defect becomes ``defect_main``.
        """
        ftestitems = validated_data.pop('ftestitems', [])
        ftestdefects = validated_data.pop('ftestdefects', [])
        instance = super().create(validated_data)
        for item in ftestitems:
            FtestItem.objects.create(ftest=instance, **item)

        is_ok = True
        defect_main = None
        has_is_main = False
        for item2 in ftestdefects:
            defect: Defect = item2["defect"]
            # NOTE(review): nesting reconstructed from collapsed source; the
            # ``else`` is assumed to pair with the outer condition - confirm.
            if defect.okcate in [Defect.DEFECT_NOTOK] and item2["has"]:
                is_ok = False
                if not has_is_main:
                    item2["is_main"] = True
                    has_is_main = True
                    defect_main = defect
            else:
                item2["is_main"] = False
            FtestDefect.objects.create(ftest=instance, **item2)

        instance.defect_main = defect_main if not is_ok else None
        instance.is_ok = is_ok
        instance.save()
        return instance

    def update(self, instance, validated_data):
        """Update the Ftest, upsert its items and defects, and reset the verdict.

        Items are matched by ``testitem`` and defects by ``defect`` within this
        Ftest; missing rows are created, existing ones get the submitted
        values.

        Raises:
            ParseError: more than one FtestDefect exists for a submitted defect.
        """
        ftestitems = validated_data.pop('ftestitems', [])
        ftestdefects = validated_data.pop('ftestdefects', [])
        instance = super().update(instance, validated_data)

        for item in ftestitems:
            try:
                ft_item = FtestItem.objects.get(testitem=item["testitem"], ftest=instance)
            except FtestItem.DoesNotExist:
                ft_item = FtestItem.objects.create(ftest=instance, **item)
            for k, v in item.items():
                setattr(ft_item, k, v)
            ft_item.save()

        is_ok = True
        defect_main = None
        has_is_main = False
        for item2 in ftestdefects:
            try:
                fd: FtestDefect = FtestDefect.objects.get(ftest=instance, defect=item2["defect"])
            except FtestDefect.MultipleObjectsReturned:
                myLogger.error(f"缺陷项重复!-ftestid:{instance.id}-defectid:{item2['defect'].id}")
                raise ParseError("获取到重复的缺陷项!")
            except FtestDefect.DoesNotExist:
                fd = FtestDefect.objects.create(ftest=instance, **item2)
            for k, v in item2.items():
                setattr(fd, k, v)
            fd.save()

            # A row already flagged as main keeps that role.
            if fd.is_main:
                has_is_main = True
                defect_main = fd.defect
            # NOTE(review): nesting reconstructed from collapsed source; the
            # ``else`` is assumed to pair with the outer condition. With that
            # reading, a former main defect that is no longer present still
            # blocks the choice of a new main one - confirm intended behavior.
            if fd.has and fd.defect.okcate in [Defect.DEFECT_NOTOK]:
                is_ok = False
                if not has_is_main:
                    fd.is_main = True
                    has_is_main = True
                    defect_main = fd.defect
            else:
                fd.is_main = False
            fd.save()

        instance.defect_main = defect_main if not is_ok else None
        instance.is_ok = is_ok
        instance.save()
        return instance