436 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			436 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
| from apps.qm.models import (QuaStat, TestItem, Ftest, FtestItem, FtestWork, Ptest, 
 | |
|                             NotOkOption, Defect, Qct, QctTestItem, QctMat,
 | |
|                             QctDefect, FtestDefect, FtestworkDefect)
 | |
| from apps.utils.constants import EXCLUDE_FIELDS, EXCLUDE_FIELDS_BASE
 | |
| from apps.utils.serializers import CustomModelSerializer
 | |
| from rest_framework import serializers
 | |
| from rest_framework.exceptions import ValidationError, ParseError
 | |
| from apps.wpm.models import SfLog, WMaterial
 | |
| from django.db import transaction
 | |
| from apps.inm.serializers import MaterialBatchDetailSerializer
 | |
| from apps.mtm.models import Material
 | |
| import logging
 | |
| 
 | |
| myLogger = logging.getLogger('log')
 | |
| 
 | |
class DefectSerializer(CustomModelSerializer):
    """Serializer for ``Defect`` exposing every model field.

    Fields listed in ``EXCLUDE_FIELDS`` are read-only.
    """

    class Meta:
        model = Defect
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    def validate(self, attrs):
        """Reject a ``cate`` value that is not listed in ``Defect.cate_list``.

        The check only runs when ``cate`` is present in ``attrs``, so a
        partial update that omits the field no longer fails with ``KeyError``.

        Raises:
            ParseError: if the supplied ``cate`` is not an allowed category.
        """
        if "cate" in attrs and attrs["cate"] not in Defect.cate_list:
            raise ParseError("缺陷类别错误")
        return attrs
 | |
| 
 | |
|     # def create(self, validated_data):
 | |
|     #     code = validated_data["code"]
 | |
|     #     if Defect.objects.get_queryset(all=True).filter(code=code).exists():
 | |
|     #         raise ValidationError("缺陷标识已存在")
 | |
|     #     return super().create(validated_data)
 | |
| 
 | |
|     # def update(self, instance, validated_data):
 | |
|     #     validated_data.pop("code", None)
 | |
|     #     return super().update(instance, validated_data)
 | |
class QctGetSerializer(serializers.Serializer):
    """Plain (non-model) serializer taking two string inputs: ``material`` and ``tag``."""

    material = serializers.CharField(
        label="物料ID",
    )
    tag = serializers.CharField(
        label="标签",
    )
 | |
| 
 | |
class TestItemSerializer(CustomModelSerializer):
    """Serializer for ``TestItem`` with all model fields plus the process name."""

    # Read-only convenience field resolved through the ``process`` relation.
    process_name = serializers.CharField(
        source="process.name",
        read_only=True,
    )

    class Meta:
        model = TestItem
        read_only_fields = EXCLUDE_FIELDS
        fields = '__all__'
 | |
| 
 | |
class QctSerializer(CustomModelSerializer):
    """Serializer for ``Qct`` exposing all model fields.

    Fields listed in ``EXCLUDE_FIELDS`` are read-only.
    """

    class Meta:
        model = Qct
        read_only_fields = EXCLUDE_FIELDS
        fields = '__all__'
 | |
| 
 | |
class QctTestItemSerializer(CustomModelSerializer):
    """Serializer for ``QctTestItem`` with read-only details of the linked test item."""

    # Read-only mirrors of attributes on the related ``testitem``.
    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)
    testitem_field_type = serializers.CharField(source='testitem.field_type', read_only=True)
    testitem_choices = serializers.CharField(source='testitem.choices', read_only=True)
    testitem_cd_expr = serializers.CharField(source='testitem.cd_expr', read_only=True)

    class Meta:
        model = QctTestItem
        fields = '__all__'

    def validate(self, attrs):
        """Ensure the selected ``testitem`` is of type ``TestItem.T_TEST``.

        The check is skipped when ``testitem`` is absent from ``attrs`` (for
        example a partial update), instead of failing with ``AttributeError``
        on ``None``.

        Raises:
            ParseError: if a test item of any other type is supplied.
        """
        testitem: TestItem = attrs.get("testitem")
        if testitem is not None and testitem.type != TestItem.T_TEST:
            raise ParseError("只可选择检测项")
        return attrs
 | |
| 
 | |
class QctDefectSerializer(CustomModelSerializer):
    """Serializer for ``QctDefect`` with the related defect's name and okcate."""

    # Read-only mirrors of attributes on the related ``defect``.
    defect_name = serializers.CharField(
        source='defect.name',
        read_only=True,
    )
    defect_okcate = serializers.CharField(
        source='defect.okcate',
        read_only=True,
    )

    class Meta:
        fields = '__all__'
        model = QctDefect
 | |
| 
 | |
class QctMatSerializer(CustomModelSerializer):
    """Serializer for ``QctMat`` plus a string rendering of its material."""

    # String representation of the related ``material`` object.
    material_name = serializers.StringRelatedField(
        source='material',
        read_only=True,
    )

    class Meta:
        fields = '__all__'
        model = QctMat
 | |
| 
 | |
class QctDetailSerializer(CustomModelSerializer):
    """Detail serializer for ``Qct`` that embeds its related rows read-only.

    Nested output covers the test items, defects and materials reachable via
    ``qct_testitems``, ``qct_defects`` and ``qct_mats``.
    """

    qct_testitems = QctTestItemSerializer(
        many=True,
        read_only=True,
    )
    qct_defects = QctDefectSerializer(
        many=True,
        read_only=True,
    )
    qct_mats = QctMatSerializer(
        many=True,
        read_only=True,
    )

    class Meta:
        model = Qct
        read_only_fields = EXCLUDE_FIELDS
        fields = '__all__'
 | |
| 
 | |
class QuaStatSerializer(CustomModelSerializer):
    """Create serializer for ``QuaStat``.

    ``sflog`` is mandatory; ``rate_pass`` and ``belong_dept`` are derived in
    :meth:`validate` rather than taken from the client.
    """

    sflog = serializers.PrimaryKeyRelatedField(
        label="值班记录", queryset=SfLog.objects.all(), required=True)
    belong_dept_name = serializers.CharField(
        source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(
        source='material.name', read_only=True)
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Derive ``rate_pass`` and ``belong_dept`` from the submitted data.

        ``rate_pass`` is ``num_ok / num_test``. When ``num_test`` is zero it
        is set to 0, matching ``QuaStatUpdateSerializer``, instead of raising
        ``ZeroDivisionError``. ``belong_dept`` is taken from the shift log's
        ``mgroup``.
        """
        num_test = attrs['num_test']
        if num_test != 0:
            attrs['rate_pass'] = attrs['num_ok'] / num_test
        else:
            attrs['rate_pass'] = 0
        attrs['belong_dept'] = attrs['sflog'].mgroup.belong_dept
        return attrs
 | |
| 
 | |
| 
 | |
class UpdateRatePassTSerializer(serializers.Serializer):
    """Input serializer carrying a target pass rate for a material/test item/month."""

    material_id = serializers.IntegerField(required=True)
    testitem_id = serializers.IntegerField(required=True)
    rate_pass_t = serializers.FloatField(required=True)
    month_s = serializers.IntegerField(required=True)

    def validate_rate_pass_t(self, value):
        """Return ``value`` unchanged unless it is above 100 or below 0.

        Raises:
            serializers.ValidationError: if ``value`` is outside that range.
        """
        # Two separate comparisons are kept on purpose: a chained form would
        # treat NaN differently from the pair of tests used here.
        too_high = value > 100
        too_low = value < 0
        if too_low or too_high:
            raise serializers.ValidationError("Rate pass must be between 0 and 100.")
        return value
 | |
| 
 | |
class QuaStatUpdateSerializer(CustomModelSerializer):
    """Update serializer for ``QuaStat``.

    ``belong_dept``, ``sflog``, ``material`` and ``testitem`` are read-only
    here; ``rate_pass`` is recomputed in :meth:`validate`.
    """

    belong_dept_name = serializers.CharField(source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(source='material.name', read_only=True)
    testitem_name = serializers.CharField(source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = [
            *EXCLUDE_FIELDS,
            'belong_dept',
            'sflog',
            'material',
            'testitem',
        ]
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Set ``rate_pass`` to ``num_ok / num_test``, or 0 when nothing was tested."""
        tested = attrs['num_test']
        attrs['rate_pass'] = attrs['num_ok'] / tested if tested != 0 else 0
        return super().validate(attrs)
 | |
| 
 | |
| 
 | |
class FtestworkdefectSerializer(CustomModelSerializer):
    """Serializer for ``FtestworkDefect`` rows.

    ``ftestwork`` is read-only; it is not accepted from the client.
    """

    # Read-only name of the related ``defect``.
    defect_name = serializers.CharField(
        source="defect.name",
        read_only=True,
    )

    class Meta:
        model = FtestworkDefect
        read_only_fields = [*EXCLUDE_FIELDS_BASE, "ftestwork"]
        fields = "__all__"
 | |
| 
 | |
| 
 | |
class FtestWorkCreateUpdateSerializer(CustomModelSerializer):
    """Write serializer for ``FtestWork`` with optional nested defect counts.

    Exactly one stock source must be supplied: ``wm`` or ``mb``. Nested
    ``ftestworkdefect`` rows are written to ``FtestworkDefect`` in
    :meth:`create` and :meth:`update`.
    """

    ftestworkdefect = FtestworkdefectSerializer(many=True, required=False)

    class Meta:
        model = FtestWork
        fields = ['id', 'shift', 'wm', 'mb',
                  'type', 'type2', 'test_date', 'count', 'count_sampling',
                  'count_sampling_ok',  'count_ok', 'count_notok',
                  'count_notok_json', 'test_user', 'need_update_wm',
                  'equipment', 'note', "qct", "ftestworkdefect"]
        extra_kwargs = {'test_user': {'required': True}, 'type': {'required': True}}

    def validate(self, attrs):
        """Check the stock source and derive material, batch, counts and department.

        Raises:
            ParseError: if the material is not batch-tracked.
            ValidationError: if the ``wm`` state is not inspectable, the
                not-ok breakdown is missing or malformed, the full-inspection
                totals do not add up, or neither ``wm`` nor ``mb`` is given.
        """
        qct = attrs.get("qct", None)
        ftestworkdefect = attrs.get("ftestworkdefect", [])
        type2 = attrs.get('type2', 20)
        if type2 == 20:  # full inspection: the sampled count equals the total
            attrs['count_sampling'] = attrs['count']
        if 'wm' in attrs and attrs['wm']:
            attrs['mb'] = None
            wm: WMaterial = attrs['wm']
            if wm.material.tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
            if wm.state not in [WMaterial.WM_OK, WMaterial.WM_TEST]:
                raise ValidationError('不支持对该物料检验')
            attrs['material'] = wm.material
            attrs['batch'] = wm.batch
            count_notok_json = attrs.get('count_notok_json', None)
            # Without a qct or defect rows, fall back to the original logic
            # and total the not-ok count from ``count_notok_json``.
            if not (qct or ftestworkdefect):
                if count_notok_json is None:
                    raise ValidationError('不合格项不能为空')
                count_notok = 0
                for k, v in count_notok_json.items():
                    k_2 = k.replace('count_n_', '')
                    if k_2 not in NotOkOption.values:
                        raise ValidationError(f'不支持的不合格项{k_2}')
                    if isinstance(v, int) and v >= 0:
                        count_notok = count_notok + v
                    else:
                        raise ValidationError(f'不合格项{k_2}必须为非负整数')
                attrs['count_notok'] = count_notok
                if type2 == 20 and attrs['count'] != attrs['count_ok'] + attrs['count_notok']:
                    raise ValidationError('全检时总数必须等于合格数+不合格数')
        elif 'mb' in attrs and attrs['mb']:
            attrs['wm'] = None
            attrs['material'] = attrs['mb'].material
            attrs['batch'] = attrs['mb'].batch
            if attrs['material'].tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
        else:
            raise ValidationError('请选择车间/仓库库存')
        if attrs['test_user']:
            attrs['belong_dept'] = attrs['test_user'].belong_dept
        return attrs

    def create(self, validated_data):
        """Create the ``FtestWork`` and its positive-count defect rows atomically.

        ``cal_count()`` is called on the new record when it has a ``qct`` or
        any defect rows were submitted.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        with transaction.atomic():
            ins: FtestWork = super().create(validated_data)
            # A distinct loop variable keeps the submitted list intact for
            # the truthiness check below.
            for row in defect_rows:
                if row['count'] > 0:
                    FtestworkDefect.objects.create(ftestwork=ins, **row)
            if ins.qct or defect_rows:
                ins.cal_count()
        return ins

    def update(self, instance, validated_data):
        """Update the ``FtestWork`` and synchronise its defect rows atomically.

        Submitted rows with a positive count must already exist for this
        record; their ``count`` is overwritten. Every other defect row of the
        record is deleted. The parent update runs inside the same transaction
        so a rejected defect row rolls everything back.

        Raises:
            ParseError: if a submitted defect has no existing row, or matches
                more than one row.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        with transaction.atomic():
            ins: FtestWork = super().update(instance, validated_data)
            kept_ids = []
            for item in defect_rows:
                if item["count"] > 0:
                    try:
                        # Bound to its own name: reusing ``ins`` here would
                        # replace the FtestWork with a defect row.
                        defect_row = FtestworkDefect.objects.get(
                            ftestwork=ins, defect=item["defect"])
                    except FtestworkDefect.DoesNotExist:
                        raise ParseError("新的缺陷项!")
                    except FtestworkDefect.MultipleObjectsReturned:
                        raise ParseError("缺陷项重复!")
                    defect_row.count = item["count"]
                    defect_row.save()
                    kept_ids.append(defect_row.id)
            FtestworkDefect.objects.filter(ftestwork=ins).exclude(id__in=kept_ids).delete()
            if ins.qct or defect_rows:
                ins.cal_count()
        return ins
 | |
| 
 | |
class FtestWorkSerializer(CustomModelSerializer):
    """Read serializer for ``FtestWork`` with material, batch and tester details."""

    # String representation of the related ``material`` object.
    material_name = serializers.StringRelatedField(source='material', read_only=True)
    material_cate = serializers.CharField(
        source='material.cate',
        read_only=True,
    )
    # Nested detail of the related ``mb`` record.
    mb_ = MaterialBatchDetailSerializer(
        source='mb',
        read_only=True,
    )
    test_user_name = serializers.CharField(
        source='test_user.name',
        read_only=True,
    )

    class Meta:
        fields = "__all__"
        model = FtestWork
 | |
| 
 | |
| 
 | |
class FtestItemSerializer(CustomModelSerializer):
    """Serializer for ``FtestItem`` rows.

    ``ftest`` is read-only; the linked test item's name and description are
    exposed as extra read-only fields.
    """

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)

    class Meta:
        model = FtestItem
        read_only_fields = [*EXCLUDE_FIELDS_BASE, 'ftest']
        fields = '__all__'
 | |
| 
 | |
| 
 | |
class FtestSerializer(CustomModelSerializer):
    """Serializer for ``Ftest`` with writable nested ``ftestitems`` rows."""

    test_user_name = serializers.CharField(
        source='test_user.name', read_only=True)
    check_user_name = serializers.CharField(
        source='check_user.name', read_only=True)
    ftestitems = FtestItemSerializer(label='检验明细', many=True)

    class Meta:
        model = Ftest
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    def validate(self, attrs):
        """Inherit ``type`` (and a missing ``test_user``) from ``ftest_work`` when given."""
        ftest_work: FtestWork = attrs.get('ftest_work', None)
        if ftest_work:
            test_user = attrs.get('test_user', None)
            attrs['type'] = ftest_work.type
            if test_user is None:
                attrs['test_user'] = ftest_work.test_user
        return attrs

    def create(self, validated_data):
        """Create the ``Ftest`` and one ``FtestItem`` per nested row, atomically."""
        ftestitems = validated_data.pop('ftestitems', [])
        with transaction.atomic():
            instance = super().create(validated_data)
            for item in ftestitems:
                FtestItem.objects.create(ftest=instance, **item)
        return instance

    def update(self, instance, validated_data):
        """Update the ``Ftest`` and the values of nested rows that carry an ``id``.

        ``ftest_work`` is discarded and never changed on update. Nested rows
        without an ``id`` are ignored. Rows are looked up within this
        ``Ftest`` only, so a payload cannot modify items owned by another
        record.

        Raises:
            ParseError: if a supplied item ``id`` does not belong to this record.
        """
        # NOTE(review): ``id`` only reaches validated data if the nested
        # serializer exposes it as writable - confirm against
        # CustomModelSerializer.
        validated_data.pop('ftest_work', None)
        ftestitems = validated_data.pop('ftestitems', [])
        with transaction.atomic():
            instance = super().update(instance, validated_data)
            for item in ftestitems:
                item_id = item.get('id', None)
                if item_id:
                    try:
                        ftestitem = FtestItem.objects.get(id=item_id, ftest=instance)
                    except FtestItem.DoesNotExist:
                        raise ParseError("检验明细不存在!")
                    ftestitem.test_val = item['test_val']
                    ftestitem.check_val = item['check_val']
                    ftestitem.save()
        return instance
 | |
| 
 | |
| 
 | |
class PtestSerializer(CustomModelSerializer):
    """Serializer for ``Ptest`` that refuses duplicate sample numbers on create."""

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    test_user_name = serializers.CharField(
        source='test_user',
        read_only=True,
    )

    class Meta:
        fields = '__all__'
        model = Ptest

    def create(self, validated_data):
        """Create a ``Ptest`` unless one with the same ``sample_number`` exists.

        Raises:
            serializers.ValidationError: if the sample number is already used.
        """
        sample_number = validated_data['sample_number']
        already_used = Ptest.objects.filter(sample_number=sample_number).exists()
        if already_used:
            raise serializers.ValidationError('该样品编号已存在')
        return super().create(validated_data)
 | |
| 
 | |
| 
 | |
class FtestDefectSerializer(CustomModelSerializer):
    """Serializer for ``FtestDefect`` rows.

    ``ftest`` and ``is_main`` are read-only; they are not accepted from the
    client.
    """

    # Read-only name of the related ``defect``.
    defect_name = serializers.CharField(
        source='defect.name',
        read_only=True,
    )

    class Meta:
        model = FtestDefect
        read_only_fields = [*EXCLUDE_FIELDS_BASE, 'ftest', 'is_main']
        fields = '__all__'
 | |
| 
 | |
| 
 | |
class FtestItemProcessSerializer(CustomModelSerializer):
    """Serializer exposing a fixed subset of ``FtestItem`` fields."""

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)

    class Meta:
        model = FtestItem
        fields = [
            "id",
            "testitem",
            "test_user",
            "test_val_json",
            "testitem_name",
            "testitem_description",
            "addto_wpr",
            "test_equip",
        ]
 | |
| 
 | |
class FtestProcessSerializer(CustomModelSerializer):
    """Serializer for ``Ftest`` records of type ``"process"``.

    Nested ``ftestitems`` and ``ftestdefects`` are written alongside the
    record, and ``is_ok`` / ``defect_main`` are derived from the defects.
    """

    test_user_name = serializers.CharField(source='test_user.name', read_only=True)
    ftestitems = FtestItemProcessSerializer(label='检验明细', many=True)
    ftestdefects = FtestDefectSerializer(label='缺陷明细', many=True)

    class Meta:
        model = Ftest
        fields = [
            "id",
            "test_date",
            "test_user",
            "note",
            "ftestitems",
            "ftestdefects",
            "qct",
            "test_user_name",
        ]
        extra_kwargs = {"qct": {"required": True}}

    def validate(self, attrs):
        """Force the record type to ``"process"``."""
        attrs["type"] = "process"
        return attrs

    def create(self, validated_data):
        """Create the ``Ftest`` with its item and defect rows in one transaction.

        The first present defect whose ``okcate`` is ``Defect.DEFECT_NOTOK``
        is flagged ``is_main`` and becomes ``defect_main``. Any such defect
        also marks the record as not ok.
        """
        item_rows = validated_data.pop('ftestitems', [])
        defect_rows = validated_data.pop('ftestdefects', [])
        with transaction.atomic():
            instance = super().create(validated_data)
            for item_row in item_rows:
                FtestItem.objects.create(ftest=instance, **item_row)

            passed = True
            main_defect = None
            main_chosen = False
            for defect_row in defect_rows:
                defect: Defect = defect_row["defect"]
                if defect.okcate in (Defect.DEFECT_NOTOK,) and defect_row["has"]:
                    passed = False
                    # Only the first failing defect is flagged as main.
                    defect_row["is_main"] = not main_chosen
                    if not main_chosen:
                        main_chosen = True
                        main_defect = defect
                FtestDefect.objects.create(ftest=instance, **defect_row)

            instance.defect_main = None if passed else main_defect
            instance.is_ok = passed
            instance.save()
        return instance

    def update(self, instance, validated_data):
        """Update the ``Ftest`` and its existing item and defect rows atomically.

        Every submitted item and defect must already exist for this record;
        their submitted attributes are copied onto the stored rows.
        ``is_ok`` and ``defect_main`` are then recomputed.

        Raises:
            ParseError: if a submitted item or defect has no stored row, or a
                defect matches more than one stored row.
        """
        item_rows = validated_data.pop('ftestitems', [])
        defect_rows = validated_data.pop('ftestdefects', [])
        with transaction.atomic():
            instance = super().update(instance, validated_data)

            for item_row in item_rows:
                try:
                    stored_item = FtestItem.objects.get(testitem=item_row["testitem"], ftest=instance)
                except FtestItem.DoesNotExist:
                    raise ParseError("新的检测项!")
                for attr_name, attr_value in item_row.items():
                    setattr(stored_item, attr_name, attr_value)
                stored_item.save()

            passed = True
            main_defect = None
            main_chosen = False
            # NOTE(review): a row already flagged is_main is honoured even if
            # its submitted ``has`` is false, and the result depends on the
            # order of the submitted rows - confirm this is intended.
            for item2 in defect_rows:
                try:
                    stored_defect: FtestDefect = FtestDefect.objects.get(ftest=instance, defect=item2["defect"])
                except FtestDefect.MultipleObjectsReturned:
                    myLogger.error(f"缺陷项重复!-ftestid:{instance.id}-defectid:{item2['defect'].id}")
                    raise ParseError("获取到重复的缺陷项!")
                except FtestDefect.DoesNotExist:
                    raise ParseError("新的缺陷项!")
                for attr_name, attr_value in item2.items():
                    setattr(stored_defect, attr_name, attr_value)
                stored_defect.save()

                if stored_defect.is_main:
                    main_chosen = True
                    main_defect = stored_defect.defect
                if stored_defect.has and stored_defect.defect.okcate in (Defect.DEFECT_NOTOK,):
                    passed = False
                    if main_chosen:
                        stored_defect.is_main = False
                    else:
                        stored_defect.is_main = True
                        main_chosen = True
                        main_defect = stored_defect.defect
                    stored_defect.save()

            instance.defect_main = None if passed else main_defect
            instance.is_ok = passed
            instance.save()
        return instance