436 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			436 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
from apps.qm.models import (QuaStat, TestItem, Ftest, FtestItem, FtestWork, Ptest, 
 | 
						|
                            NotOkOption, Defect, Qct, QctTestItem, QctMat,
 | 
						|
                            QctDefect, FtestDefect, FtestworkDefect)
 | 
						|
from apps.utils.constants import EXCLUDE_FIELDS, EXCLUDE_FIELDS_BASE
 | 
						|
from apps.utils.serializers import CustomModelSerializer
 | 
						|
from rest_framework import serializers
 | 
						|
from rest_framework.exceptions import ValidationError, ParseError
 | 
						|
from apps.wpm.models import SfLog, WMaterial
 | 
						|
from django.db import transaction
 | 
						|
from apps.inm.serializers import MaterialBatchDetailSerializer
 | 
						|
from apps.mtm.models import Material
 | 
						|
import logging
 | 
						|
 | 
						|
myLogger = logging.getLogger('log')
 | 
						|
 | 
						|
class DefectSerializer(CustomModelSerializer):
 | 
						|
    class Meta:
 | 
						|
        model = Defect
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS
 | 
						|
    
 | 
						|
    def validate(self, attrs):
 | 
						|
        cate = attrs["cate"]
 | 
						|
        if cate not in Defect.cate_list:
 | 
						|
            raise ParseError("缺陷类别错误")
 | 
						|
        return attrs
 | 
						|
 | 
						|
    # def create(self, validated_data):
 | 
						|
    #     code = validated_data["code"]
 | 
						|
    #     if Defect.objects.get_queryset(all=True).filter(code=code).exists():
 | 
						|
    #         raise ValidationError("缺陷标识已存在")
 | 
						|
    #     return super().create(validated_data)
 | 
						|
 | 
						|
    # def update(self, instance, validated_data):
 | 
						|
    #     validated_data.pop("code", None)
 | 
						|
    #     return super().update(instance, validated_data)
 | 
						|
class QctGetSerializer(serializers.Serializer):
 | 
						|
    material = serializers.CharField(label="物料ID")
 | 
						|
    tag = serializers.CharField(label="标签")
 | 
						|
 | 
						|
class TestItemSerializer(CustomModelSerializer):
 | 
						|
    process_name = serializers.CharField(source="process.name", read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = TestItem
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS
 | 
						|
 | 
						|
class QctSerializer(CustomModelSerializer):
 | 
						|
    class Meta:
 | 
						|
        model = Qct
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS
 | 
						|
 | 
						|
class QctTestItemSerializer(CustomModelSerializer):
 | 
						|
    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
 | 
						|
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)
 | 
						|
    testitem_field_type = serializers.CharField(source='testitem.field_type', read_only=True)
 | 
						|
    testitem_choices = serializers.CharField(source='testitem.choices', read_only=True)
 | 
						|
    testitem_cd_expr = serializers.CharField(source='testitem.cd_expr', read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = QctTestItem
 | 
						|
        fields = '__all__'
 | 
						|
 | 
						|
    def validate(self, attrs):
 | 
						|
        testitem:TestItem = attrs.get("testitem")
 | 
						|
        if testitem.type != TestItem.T_TEST:
 | 
						|
            raise ParseError("只可选择检测项")
 | 
						|
        return attrs
 | 
						|
 | 
						|
class QctDefectSerializer(CustomModelSerializer):
 | 
						|
    defect_name = serializers.CharField(source='defect.name', read_only=True)
 | 
						|
    defect_okcate = serializers.CharField(source='defect.okcate', read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = QctDefect
 | 
						|
        fields = '__all__'
 | 
						|
 | 
						|
class QctMatSerializer(CustomModelSerializer):
 | 
						|
    material_name = serializers.StringRelatedField(source='material', read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = QctMat
 | 
						|
        fields = '__all__'
 | 
						|
 | 
						|
class QctDetailSerializer(CustomModelSerializer):
 | 
						|
    qct_testitems = QctTestItemSerializer(many=True, read_only=True)
 | 
						|
    qct_defects = QctDefectSerializer(many=True, read_only=True)
 | 
						|
    qct_mats = QctMatSerializer(many=True, read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = Qct
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS
 | 
						|
 | 
						|
class QuaStatSerializer(CustomModelSerializer):
 | 
						|
    sflog = serializers.PrimaryKeyRelatedField(
 | 
						|
        label="值班记录", queryset=SfLog.objects.all(), required=True)
 | 
						|
    belong_dept_name = serializers.CharField(
 | 
						|
        source='belong_dept.name', read_only=True)
 | 
						|
    material_name = serializers.CharField(
 | 
						|
        source='material.name', read_only=True)
 | 
						|
    testitem_name = serializers.CharField(
 | 
						|
        source='testitem.name', read_only=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = QuaStat
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept']
 | 
						|
        extra_kwargs = {'val_avg': {'required': True, 'allow_null': False}, 'num_test': {
 | 
						|
            'required': True, 'allow_null': False}, 'num_ok': {'required': True, 'allow_null': False}}
 | 
						|
 | 
						|
    def validate(self, attrs):
 | 
						|
        attrs['rate_pass'] = attrs['num_ok']/attrs['num_test']
 | 
						|
        attrs['belong_dept'] = attrs['sflog'].mgroup.belong_dept
 | 
						|
        return attrs
 | 
						|
 | 
						|
 | 
						|
class UpdateRatePassTSerializer(serializers.Serializer):
 | 
						|
    material_id = serializers.IntegerField(required=True)
 | 
						|
    testitem_id = serializers.IntegerField(required=True)
 | 
						|
    rate_pass_t = serializers.FloatField(required=True)
 | 
						|
    month_s = serializers.IntegerField(required=True)
 | 
						|
 | 
						|
    def validate_rate_pass_t(self, value):
 | 
						|
        if value < 0 or value > 100:
 | 
						|
            raise serializers.ValidationError("Rate pass must be between 0 and 100.")
 | 
						|
        return value
 | 
						|
 | 
						|
class QuaStatUpdateSerializer(CustomModelSerializer):
 | 
						|
    belong_dept_name = serializers.CharField(
 | 
						|
        source='belong_dept.name', read_only=True)
 | 
						|
    material_name = serializers.CharField(
 | 
						|
        source='material.name', read_only=True)
 | 
						|
    testitem_name = serializers.CharField(
 | 
						|
        source='testitem.name', read_only=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = QuaStat
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS + \
 | 
						|
            ['belong_dept', 'sflog', 'material', 'testitem']
 | 
						|
        extra_kwargs = {'val_avg': {'required': True, 'allow_null': False}, 'num_test': {
 | 
						|
            'required': True, 'allow_null': False}, 'num_ok': {'required': True, 'allow_null': False}}
 | 
						|
 | 
						|
    def validate(self, attrs):
 | 
						|
        if attrs['num_test'] != 0:
 | 
						|
            attrs['rate_pass'] = attrs['num_ok']/attrs['num_test']
 | 
						|
        else:
 | 
						|
            attrs['rate_pass'] = 0
 | 
						|
        return super().validate(attrs)
 | 
						|
 | 
						|
 | 
						|
class FtestworkdefectSerializer(CustomModelSerializer):
 | 
						|
    defect_name = serializers.CharField(source="defect.name", read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = FtestworkDefect
 | 
						|
        fields = "__all__"
 | 
						|
        read_only_fields = EXCLUDE_FIELDS_BASE + ["ftestwork"]
 | 
						|
 | 
						|
 | 
						|
class FtestWorkCreateUpdateSerializer(CustomModelSerializer):
 | 
						|
    ftestworkdefect = FtestworkdefectSerializer(many=True, required=False)
 | 
						|
    class Meta:
 | 
						|
        model = FtestWork
 | 
						|
        fields = ['id', 'shift', 'wm', 'mb', 
 | 
						|
                  'type', 'type2', 'test_date', 'count', 'count_sampling', 
 | 
						|
                  'count_sampling_ok',  'count_ok', 'count_notok', 
 | 
						|
                  'count_notok_json', 'test_user', 'need_update_wm', 
 | 
						|
                  'equipment', 'note', "qct", "ftestworkdefect"]
 | 
						|
        extra_kwargs = {'test_user': {'required': True}, 'type': {'required': True}}
 | 
						|
 | 
						|
    def validate(self, attrs):
 | 
						|
        qct = attrs.get("qct", None)
 | 
						|
        ftestworkdefect = attrs.get("ftestworkdefect", [])
 | 
						|
        type2 = attrs.get('type2', 20)
 | 
						|
        if type2 == 20: # 如果是全检
 | 
						|
            attrs['count_sampling'] = attrs['count']
 | 
						|
        if 'wm' in attrs and attrs['wm']:
 | 
						|
            attrs['mb'] = None
 | 
						|
            wm:WMaterial = attrs['wm']
 | 
						|
            if wm.material.tracking != Material.MA_TRACKING_BATCH:
 | 
						|
                raise ParseError('只支持追踪批次的物料检验')
 | 
						|
            if wm.state not in [WMaterial.WM_OK, WMaterial.WM_TEST]:
 | 
						|
                raise ValidationError('不支持对该物料检验')
 | 
						|
            attrs['material'] = wm.material
 | 
						|
            attrs['batch'] = wm.batch
 | 
						|
            count_notok_json = attrs.get('count_notok_json', None)
 | 
						|
            if qct or ftestworkdefect:  # 没有qct时按原有逻辑处理
 | 
						|
                pass
 | 
						|
            else:
 | 
						|
                if count_notok_json is None:
 | 
						|
                    raise ValidationError('不合格项不能为空')
 | 
						|
                count_notok = 0
 | 
						|
                for k, v in count_notok_json.items():
 | 
						|
                    k_2 = k.replace('count_n_', '')
 | 
						|
                    if k_2 not in NotOkOption.values:
 | 
						|
                        raise ValidationError(f'不支持的不合格项{k_2}')
 | 
						|
                    if isinstance(v, int) and v >= 0:
 | 
						|
                        count_notok = count_notok + v
 | 
						|
                    else:
 | 
						|
                        raise ValidationError(f'不合格项{k_2}必须为非负整数')
 | 
						|
                attrs['count_notok'] = count_notok
 | 
						|
                if type2 == 20 and attrs['count'] != attrs['count_ok'] + attrs['count_notok']:
 | 
						|
                    raise ValidationError('全检时总数必须等于合格数+不合格数')
 | 
						|
        elif 'mb' in attrs and attrs['mb']:
 | 
						|
            attrs['wm'] = None
 | 
						|
            attrs['material'] = attrs['mb'].material
 | 
						|
            attrs['batch'] = attrs['mb'].batch
 | 
						|
            if attrs['material'].tracking != Material.MA_TRACKING_BATCH:
 | 
						|
                raise ParseError('只支持追踪批次的物料检验')
 | 
						|
        else:
 | 
						|
            raise ValidationError('请选择车间/仓库库存')
 | 
						|
        if attrs['test_user']:
 | 
						|
            attrs['belong_dept'] = attrs['test_user'].belong_dept
 | 
						|
        return attrs
 | 
						|
 | 
						|
    def create(self, validated_data):
 | 
						|
        ftestworkdefect = validated_data.pop("ftestworkdefect", [])
 | 
						|
        with transaction.atomic():
 | 
						|
            ins: FtestWork = super().create(validated_data)
 | 
						|
            for ftestworkdefect in ftestworkdefect:
 | 
						|
                if ftestworkdefect['count'] > 0:
 | 
						|
                    FtestworkDefect.objects.create(ftestwork=ins, **ftestworkdefect)
 | 
						|
            if ins.qct or ftestworkdefect:
 | 
						|
                ins.cal_count()
 | 
						|
        return ins
 | 
						|
 | 
						|
    def update(self, instance, validated_data):
 | 
						|
        ftestworkdefect = validated_data.pop("ftestworkdefect", [])
 | 
						|
        ins:FtestWork = super().update(instance, validated_data)
 | 
						|
        with transaction.atomic():
 | 
						|
            fd_ids = []
 | 
						|
            for item in ftestworkdefect:
 | 
						|
                if item["count"] > 0:
 | 
						|
                    try:
 | 
						|
                        ins = FtestworkDefect.objects.get(ftestwork=ins, defect=item["defect"])
 | 
						|
                    except FtestworkDefect.DoesNotExist:
 | 
						|
                        raise ParseError("新的缺陷项!")
 | 
						|
                    except FtestworkDefect.MultipleObjectsReturned:
 | 
						|
                        raise ParseError("缺陷项重复!")
 | 
						|
                    ins.count = item["count"]
 | 
						|
                    ins.save()
 | 
						|
                    fd_ids.append(ins.id)
 | 
						|
            FtestworkDefect.objects.filter(ftestwork=ins).exclude(id__in=fd_ids).delete()
 | 
						|
            if ins.qct or ftestworkdefect:
 | 
						|
                ins.cal_count()
 | 
						|
        return ins
 | 
						|
 | 
						|
class FtestWorkSerializer(CustomModelSerializer):
 | 
						|
    material_name = serializers.StringRelatedField(
 | 
						|
        source='material', read_only=True)
 | 
						|
    material_cate = serializers.CharField(source='material.cate', read_only=True)
 | 
						|
    mb_ = MaterialBatchDetailSerializer(source='mb', read_only=True)
 | 
						|
    test_user_name = serializers.CharField(source='test_user.name', read_only=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = FtestWork
 | 
						|
        fields = "__all__"
 | 
						|
 | 
						|
 | 
						|
class FtestItemSerializer(CustomModelSerializer):
 | 
						|
    testitem_name = serializers.CharField(
 | 
						|
        source='testitem.name', read_only=True)
 | 
						|
    testitem_description = serializers.CharField(
 | 
						|
        source='testitem.description', read_only=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = FtestItem
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest']
 | 
						|
 | 
						|
 | 
						|
class FtestSerializer(CustomModelSerializer):
 | 
						|
    test_user_name = serializers.CharField(
 | 
						|
        source='test_user.name', read_only=True)
 | 
						|
    check_user_name = serializers.CharField(
 | 
						|
        source='check_user.name', read_only=True)
 | 
						|
    ftestitems = FtestItemSerializer(label='检验明细', many=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = Ftest
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS
 | 
						|
 | 
						|
    def validate(self, attrs):
 | 
						|
        ftest_work: FtestWork = attrs.get('ftest_work', None)
 | 
						|
        if ftest_work:
 | 
						|
            test_user = attrs.get('test_user', None)
 | 
						|
            attrs['type'] = ftest_work.type
 | 
						|
            if test_user is None:
 | 
						|
                attrs['test_user'] = ftest_work.test_user
 | 
						|
        return attrs
 | 
						|
 | 
						|
    def create(self, validated_data):
 | 
						|
        ftestitems = validated_data.pop('ftestitems', [])
 | 
						|
        with transaction.atomic():
 | 
						|
            instance = super().create(validated_data)
 | 
						|
            for item in ftestitems:
 | 
						|
                FtestItem.objects.create(ftest=instance, **item)
 | 
						|
        return instance
 | 
						|
 | 
						|
    def update(self, instance, validated_data):
 | 
						|
        validated_data.pop('ftest_work', None)
 | 
						|
        ftestitems = validated_data.pop('ftestitems', [])
 | 
						|
        with transaction.atomic():
 | 
						|
            instance = super().update(instance, validated_data)
 | 
						|
            for item in ftestitems:
 | 
						|
                id = item.get('id', None)
 | 
						|
                if id:
 | 
						|
                    ftestitem = FtestItem.objects.get(id=id)
 | 
						|
                    ftestitem.test_val = item['test_val']
 | 
						|
                    ftestitem.check_val = item['check_val']
 | 
						|
                    ftestitem.save()
 | 
						|
        return instance
 | 
						|
 | 
						|
 | 
						|
class PtestSerializer(CustomModelSerializer):
 | 
						|
    testitem_name = serializers.CharField(
 | 
						|
        source='testitem.name', read_only=True)
 | 
						|
    test_user_name = serializers.CharField(source='test_user', read_only=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = Ptest
 | 
						|
        fields = '__all__'
 | 
						|
 | 
						|
    def create(self, validated_data):
 | 
						|
        if Ptest.objects.filter(sample_number=validated_data['sample_number']).exists():
 | 
						|
            raise serializers.ValidationError('该样品编号已存在')
 | 
						|
        return super().create(validated_data)
 | 
						|
 | 
						|
 | 
						|
class FtestDefectSerializer(CustomModelSerializer):
 | 
						|
    defect_name = serializers.CharField(source='defect.name', read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = FtestDefect
 | 
						|
        fields = '__all__'
 | 
						|
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest', 'is_main']
 | 
						|
 | 
						|
 | 
						|
class FtestItemProcessSerializer(CustomModelSerializer):
 | 
						|
    testitem_name = serializers.CharField(
 | 
						|
        source='testitem.name', read_only=True)
 | 
						|
    testitem_description = serializers.CharField(
 | 
						|
        source='testitem.description', read_only=True)
 | 
						|
    class Meta:
 | 
						|
        model = FtestItem
 | 
						|
        fields = ["id", "testitem", "test_user", "test_val_json", "testitem_name", "testitem_description", "addto_wpr", "test_equip"]
 | 
						|
 | 
						|
class FtestProcessSerializer(CustomModelSerializer):
 | 
						|
    test_user_name = serializers.CharField(
 | 
						|
        source='test_user.name', read_only=True)
 | 
						|
    ftestitems = FtestItemProcessSerializer(label='检验明细', many=True)
 | 
						|
    ftestdefects = FtestDefectSerializer(label='缺陷明细', many=True)
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = Ftest
 | 
						|
        fields = ["id", "test_date",
 | 
						|
                  "test_user", "note", "ftestitems", "ftestdefects", "qct", "test_user_name"]
 | 
						|
        extra_kwargs = {"qct": {"required": True}}
 | 
						|
 | 
						|
    def validate(self, attrs):
 | 
						|
        attrs["type"] = "process"
 | 
						|
        return attrs
 | 
						|
 | 
						|
    def create(self, validated_data):
 | 
						|
        ftestitems = validated_data.pop('ftestitems', [])
 | 
						|
        ftestdefects = validated_data.pop('ftestdefects', [])
 | 
						|
        with transaction.atomic():
 | 
						|
            instance = super().create(validated_data)
 | 
						|
            for item in ftestitems:
 | 
						|
                FtestItem.objects.create(ftest=instance, **item)
 | 
						|
            is_ok = True
 | 
						|
            defect_main = None
 | 
						|
            has_is_main = False
 | 
						|
            for item2 in ftestdefects:
 | 
						|
                defect:Defect = item2["defect"]
 | 
						|
                if defect.okcate in [Defect.DEFECT_NOTOK] and item2["has"]:
 | 
						|
                    is_ok = False
 | 
						|
                    if not has_is_main:
 | 
						|
                        item2["is_main"] = True
 | 
						|
                        has_is_main = True
 | 
						|
                        defect_main = defect
 | 
						|
                    else:
 | 
						|
                        item2["is_main"] = False
 | 
						|
                FtestDefect.objects.create(ftest=instance, **item2)
 | 
						|
            if not is_ok:
 | 
						|
                instance.defect_main = defect_main
 | 
						|
            else:
 | 
						|
                instance.defect_main = None
 | 
						|
            instance.is_ok = is_ok
 | 
						|
            instance.save()
 | 
						|
        return instance
 | 
						|
 | 
						|
    def update(self, instance, validated_data):
 | 
						|
        ftestitems = validated_data.pop('ftestitems', [])
 | 
						|
        ftestdefects = validated_data.pop('ftestdefects', [])
 | 
						|
        with transaction.atomic():
 | 
						|
            instance = super().update(instance, validated_data)
 | 
						|
            for item in ftestitems:
 | 
						|
                try:
 | 
						|
                    ins = FtestItem.objects.get(testitem = item["testitem"], ftest=instance)
 | 
						|
                except FtestItem.DoesNotExist:
 | 
						|
                    raise ParseError("新的检测项!")
 | 
						|
                for k, v in item.items():
 | 
						|
                    setattr(ins, k, v)
 | 
						|
                ins.save()
 | 
						|
            is_ok = True
 | 
						|
            defect_main = None
 | 
						|
            has_is_main = False
 | 
						|
            for item2 in ftestdefects:
 | 
						|
                try:
 | 
						|
                    ins:FtestDefect = FtestDefect.objects.get(ftest=instance, defect=item2["defect"])
 | 
						|
                except FtestDefect.MultipleObjectsReturned:
 | 
						|
                    myLogger.error(f"缺陷项重复!-ftestid:{instance.id}-defectid:{item2['defect'].id}")
 | 
						|
                    raise ParseError("获取到重复的缺陷项!")
 | 
						|
                except FtestDefect.DoesNotExist:
 | 
						|
                    raise ParseError("新的缺陷项!")
 | 
						|
                for k, v in item2.items():
 | 
						|
                    setattr(ins, k, v)
 | 
						|
                ins.save()
 | 
						|
                if ins.is_main:
 | 
						|
                    has_is_main = True
 | 
						|
                    defect_main = ins.defect
 | 
						|
                if ins.has and ins.defect.okcate in [Defect.DEFECT_NOTOK]:
 | 
						|
                    is_ok = False
 | 
						|
                    if not has_is_main:
 | 
						|
                        ins.is_main = True
 | 
						|
                        has_is_main = True
 | 
						|
                        defect_main = ins.defect
 | 
						|
                    else:
 | 
						|
                        ins.is_main = False
 | 
						|
                    ins.save()
 | 
						|
            if not is_ok:
 | 
						|
                instance.defect_main = defect_main
 | 
						|
            else:
 | 
						|
                instance.defect_main = None
 | 
						|
            instance.is_ok = is_ok
 | 
						|
            instance.save()
 | 
						|
        return instance |