# factory/apps/qm/serializers.py
#
# 415 lines
# 17 KiB
# Python

from apps.qm.models import (QuaStat, TestItem, Ftest, FtestItem, FtestWork, Ptest,
NotOkOption, Defect, Qct, QctTestItem, QctMat,
QctDefect, FtestDefect, FtestworkDefect)
from apps.utils.constants import EXCLUDE_FIELDS, EXCLUDE_FIELDS_BASE
from apps.utils.serializers import CustomModelSerializer
from rest_framework import serializers
from rest_framework.exceptions import ValidationError, ParseError
from apps.wpm.models import SfLog, WMaterial
from django.db import transaction
from apps.inm.serializers import MaterialBatchDetailSerializer
from apps.mtm.models import Material
class DefectSerializer(CustomModelSerializer):
    """Serialize every field of the Defect model.

    The fields named in EXCLUDE_FIELDS are exposed read-only.
    """

    class Meta:
        model = Defect
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    # Disabled code, kept for reference: a uniqueness check on ``code`` when
    # creating a defect, and dropping ``code`` from update payloads.
    # def create(self, validated_data):
    #     code = validated_data["code"]
    #     if Defect.objects.get_queryset(all=True).filter(code=code).exists():
    #         raise ValidationError("缺陷标识已存在")
    #     return super().create(validated_data)
    # def update(self, instance, validated_data):
    #     validated_data.pop("code", None)
    #     return super().update(instance, validated_data)
class TestItemSerializer(CustomModelSerializer):
    """Serialize every TestItem field plus the name of its related process."""

    # Read-only convenience field resolved through the ``process`` relation.
    process_name = serializers.CharField(
        source="process.name",
        read_only=True,
    )

    class Meta:
        model = TestItem
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS
class QctSerializer(CustomModelSerializer):
    """Serialize every field of the Qct model.

    The fields named in EXCLUDE_FIELDS are exposed read-only.
    """

    class Meta:
        model = Qct
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS
class QctTestItemSerializer(CustomModelSerializer):
    """Serialize QctTestItem rows together with read-only test-item details.

    The ``testitem_*`` fields mirror attributes of the related TestItem so
    clients do not need a second request to render them.
    """

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)
    testitem_field_type = serializers.CharField(source='testitem.field_type', read_only=True)
    testitem_choices = serializers.CharField(source='testitem.choices', read_only=True)
    testitem_cd_expr = serializers.CharField(source='testitem.cd_expr', read_only=True)

    class Meta:
        model = QctTestItem
        fields = '__all__'

    def validate(self, attrs):
        """Reject test items whose type is not ``TestItem.T_TEST``.

        Args:
            attrs: validated field values for this request.

        Returns:
            ``attrs`` unchanged.

        Raises:
            ParseError: the selected test item is not of type ``T_TEST``.
        """
        testitem: TestItem = attrs.get("testitem")
        if testitem is None:
            # A partial update may omit ``testitem``; check the stored one
            # instead of crashing on ``None.type``.
            testitem = getattr(self.instance, "testitem", None)
        if testitem is not None and testitem.type != TestItem.T_TEST:
            raise ParseError("只可选择检测项")
        return attrs
class QctDefectSerializer(CustomModelSerializer):
    """Serialize QctDefect rows plus the related defect's name and okcate."""

    # Both extras are read-only values taken from the ``defect`` relation.
    defect_name = serializers.CharField(
        source='defect.name',
        read_only=True,
    )
    defect_okcate = serializers.CharField(
        source='defect.okcate',
        read_only=True,
    )

    class Meta:
        model = QctDefect
        fields = '__all__'
class QctMatSerializer(CustomModelSerializer):
    """Serialize QctMat rows plus the string form of the related material."""

    # StringRelatedField renders the related object via its ``str()``.
    material_name = serializers.StringRelatedField(
        source='material',
        read_only=True,
    )

    class Meta:
        model = QctMat
        fields = '__all__'
class QctDetailSerializer(CustomModelSerializer):
    """Detail view of a Qct with its nested children.

    The test items, defects and materials are embedded as read-only lists.
    """

    qct_testitems = QctTestItemSerializer(read_only=True, many=True)
    qct_defects = QctDefectSerializer(read_only=True, many=True)
    qct_mats = QctMatSerializer(read_only=True, many=True)

    class Meta:
        model = Qct
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS
class QuaStatSerializer(CustomModelSerializer):
    """Create-side serializer for QuaStat records.

    ``sflog`` is mandatory; ``rate_pass`` and ``belong_dept`` are derived in
    :meth:`validate` rather than supplied by the client.
    """

    sflog = serializers.PrimaryKeyRelatedField(
        label="值班记录", queryset=SfLog.objects.all(), required=True)
    belong_dept_name = serializers.CharField(
        source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(
        source='material.name', read_only=True)
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Derive ``rate_pass`` and ``belong_dept`` from the submitted data.

        Args:
            attrs: validated field values; must contain ``num_ok``,
                ``num_test`` and ``sflog``.

        Returns:
            ``attrs`` with ``rate_pass`` and ``belong_dept`` filled in.
        """
        num_test = attrs['num_test']
        # Guard the division the same way QuaStatUpdateSerializer does: with
        # nothing tested the pass rate is reported as 0 instead of raising
        # ZeroDivisionError.
        if num_test != 0:
            attrs['rate_pass'] = attrs['num_ok'] / num_test
        else:
            attrs['rate_pass'] = 0
        attrs['belong_dept'] = attrs['sflog'].mgroup.belong_dept
        return attrs
class UpdateRatePassTSerializer(serializers.Serializer):
    """Payload for updating the ``rate_pass_t`` value.

    All four fields are required. ``rate_pass_t`` must lie within 0..100.
    """

    material_id = serializers.IntegerField(required=True)
    testitem_id = serializers.IntegerField(required=True)
    rate_pass_t = serializers.FloatField(required=True)
    month_s = serializers.IntegerField(required=True)

    def validate_rate_pass_t(self, value):
        """Ensure ``value`` is a number within the closed range 0..100.

        Args:
            value: the float supplied for ``rate_pass_t``.

        Returns:
            ``value`` unchanged when it is in range.

        Raises:
            serializers.ValidationError: ``value`` is outside 0..100 or NaN.
        """
        # The chained comparison is False for NaN, so NaN is rejected too;
        # ``value < 0 or value > 100`` would let it through.
        if not 0 <= value <= 100:
            raise serializers.ValidationError("Rate pass must be between 0 and 100.")
        return value
class QuaStatUpdateSerializer(CustomModelSerializer):
    """Update-side serializer for QuaStat records.

    ``belong_dept``, ``sflog``, ``material`` and ``testitem`` are read-only
    here; ``rate_pass`` is recomputed from the submitted counts.
    """

    belong_dept_name = serializers.CharField(
        source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(
        source='material.name', read_only=True)
    testitem_name = serializers.CharField(
        source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + [
            'belong_dept', 'sflog', 'material', 'testitem']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Recompute ``rate_pass`` as ``num_ok / num_test`` (0 if none tested).

        Args:
            attrs: validated field values containing ``num_ok`` and
                ``num_test``.

        Returns:
            The result of the parent ``validate`` on the augmented ``attrs``.
        """
        tested = attrs['num_test']
        attrs['rate_pass'] = 0 if tested == 0 else attrs['num_ok'] / tested
        return super().validate(attrs)
class FtestworkdefectSerializer(CustomModelSerializer):
    """Serialize FtestworkDefect rows plus the related defect's name.

    ``ftestwork`` is read-only, along with the EXCLUDE_FIELDS_BASE fields.
    """

    defect_name = serializers.CharField(
        source="defect.name",
        read_only=True,
    )

    class Meta:
        model = FtestworkDefect
        fields = "__all__"
        read_only_fields = EXCLUDE_FIELDS_BASE + ["ftestwork"]
class FtestWorkCreateUpdateSerializer(CustomModelSerializer):
    """Create/update serializer for FtestWork with nested defect counts.

    The nested ``ftestworkdefect`` list is written by :meth:`create` and
    :meth:`update`; everything else is handled by the parent serializer.
    """

    ftestworkdefect = FtestworkdefectSerializer(many=True)

    class Meta:
        model = FtestWork
        fields = ['id', 'shift', 'wm', 'mb',
                  'type', 'type2', 'test_date', 'count', 'count_sampling',
                  'count_sampling_ok', 'count_ok', 'count_notok',
                  'count_notok_json', 'test_user', 'need_update_wm',
                  'equipment', 'note', "qct", "ftestworkdefect"]
        extra_kwargs = {'test_user': {'required': True}, 'type': {'required': True}}

    def validate(self, attrs):
        """Check the inspection target and derive dependent attributes.

        Exactly one stock source is used: ``wm`` takes precedence over
        ``mb``, and the other one is reset to ``None``. ``material``,
        ``batch`` and ``belong_dept`` are filled in from the chosen source
        and the test user.

        Args:
            attrs: validated field values for this request.

        Returns:
            ``attrs`` with the derived values added.

        Raises:
            ParseError: the material is not batch-tracked.
            ValidationError: unsupported material state, invalid
                ``count_notok_json``, inconsistent full-inspection totals,
                or neither ``wm`` nor ``mb`` supplied.
        """
        qct = attrs.get("qct", None)
        ftestworkdefect = attrs.get("ftestworkdefect", [])
        type2 = attrs.get('type2', 20)
        if type2 == 20:  # full inspection: the sample is the whole lot
            attrs['count_sampling'] = attrs['count']
        if 'wm' in attrs and attrs['wm']:
            attrs['mb'] = None
            wm: WMaterial = attrs['wm']
            if wm.material.tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
            if wm.state not in [WMaterial.WM_OK, WMaterial.WM_TEST]:
                raise ValidationError('不支持对该物料检验')
            attrs['material'] = wm.material
            attrs['batch'] = wm.batch
            count_notok_json = attrs.get('count_notok_json', None)
            # Without a qct and without defect rows, fall back to the legacy
            # logic driven by ``count_notok_json``.
            # NOTE(review): the full-inspection total check is applied on
            # this legacy path only; confirm that matches the intended
            # nesting.
            if not (qct or ftestworkdefect):
                if count_notok_json is None:
                    raise ValidationError('不合格项不能为空')
                count_notok = 0
                for k, v in count_notok_json.items():
                    k_2 = k.replace('count_n_', '')
                    if k_2 not in NotOkOption.values:
                        raise ValidationError(f'不支持的不合格项{k_2}')
                    if isinstance(v, int) and v >= 0:
                        count_notok = count_notok + v
                    else:
                        raise ValidationError(f'不合格项{k_2}必须为非负整数')
                attrs['count_notok'] = count_notok
                if type2 == 20 and attrs['count'] != attrs['count_ok'] + attrs['count_notok']:
                    raise ValidationError('全检时总数必须等于合格数+不合格数')
        elif 'mb' in attrs and attrs['mb']:
            attrs['wm'] = None
            attrs['material'] = attrs['mb'].material
            attrs['batch'] = attrs['mb'].batch
            if attrs['material'].tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
        else:
            raise ValidationError('请选择车间/仓库库存')
        if attrs['test_user']:
            attrs['belong_dept'] = attrs['test_user'].belong_dept
        return attrs

    def create(self, validated_data):
        """Create the FtestWork and its defect rows in one transaction.

        Defect rows whose ``count`` is not positive are skipped. When the
        work has a ``qct`` or any defect rows were submitted, the instance's
        ``cal_count()`` is called afterwards.

        Args:
            validated_data: output of :meth:`validate`.

        Returns:
            The newly created FtestWork.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        with transaction.atomic():
            ins: FtestWork = super().create(validated_data)
            # A distinct loop variable keeps ``defect_rows`` intact for the
            # check below (the list used to be shadowed by its own items).
            for row in defect_rows:
                if row['count'] > 0:
                    FtestworkDefect.objects.create(ftestwork=ins, **row)
            if ins.qct or defect_rows:
                ins.cal_count()
        return ins

    def update(self, instance, validated_data):
        """Update the FtestWork and synchronise its existing defect rows.

        Rows with a positive ``count`` must already exist for this work and
        get their count overwritten; every other stored row of this work is
        deleted. When the work has a ``qct`` or any defect rows were
        submitted, ``cal_count()`` is called afterwards.

        Args:
            instance: the FtestWork being updated.
            validated_data: output of :meth:`validate`.

        Returns:
            The updated FtestWork.

        Raises:
            ParseError: a submitted defect has no stored row, or has more
                than one.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        # The parent update runs inside the transaction so a failure while
        # syncing defect rows also rolls back the field changes.
        with transaction.atomic():
            ins: FtestWork = super().update(instance, validated_data)
            kept_ids = []
            for row in defect_rows:
                if row["count"] > 0:
                    try:
                        # Use a separate name: rebinding ``ins`` here used to
                        # replace the FtestWork with a FtestworkDefect and
                        # broke every statement after the first match.
                        defect_row = FtestworkDefect.objects.get(
                            ftestwork=ins, defect=row["defect"])
                    except FtestworkDefect.DoesNotExist:
                        raise ParseError("新的缺陷项!")
                    except FtestworkDefect.MultipleObjectsReturned:
                        raise ParseError("缺陷项重复!")
                    defect_row.count = row["count"]
                    defect_row.save()
                    kept_ids.append(defect_row.id)
            FtestworkDefect.objects.filter(ftestwork=ins).exclude(id__in=kept_ids).delete()
            if ins.qct or defect_rows:
                ins.cal_count()
        return ins
class FtestWorkSerializer(CustomModelSerializer):
    """Read-side serializer for FtestWork with a few denormalised extras.

    Adds the material's string form and category, the nested batch detail
    (``mb_``) and the test user's name, all read-only.
    """

    material_name = serializers.StringRelatedField(source='material', read_only=True)
    material_cate = serializers.CharField(
        source='material.cate',
        read_only=True,
    )
    mb_ = MaterialBatchDetailSerializer(read_only=True, source='mb')
    test_user_name = serializers.CharField(
        source='test_user.name',
        read_only=True,
    )

    class Meta:
        model = FtestWork
        fields = "__all__"
class FtestItemSerializer(CustomModelSerializer):
    """Serialize FtestItem rows plus the related test item's name/description.

    ``ftest`` is read-only, along with the EXCLUDE_FIELDS_BASE fields.
    """

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)

    class Meta:
        model = FtestItem
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest']
class FtestSerializer(CustomModelSerializer):
    """Serializer for Ftest with its nested inspection items.

    The nested ``ftestitems`` list is written by :meth:`create` and
    :meth:`update`.
    """

    test_user_name = serializers.CharField(
        source='test_user.name', read_only=True)
    check_user_name = serializers.CharField(
        source='check_user.name', read_only=True)
    ftestitems = FtestItemSerializer(label='检验明细', many=True)

    class Meta:
        model = Ftest
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    def validate(self, attrs):
        """Inherit ``type`` (and a missing ``test_user``) from ``ftest_work``.

        Args:
            attrs: validated field values for this request.

        Returns:
            ``attrs``, with ``type`` and possibly ``test_user`` taken from
            the linked FtestWork when one is given.
        """
        ftest_work: FtestWork = attrs.get('ftest_work', None)
        if ftest_work:
            test_user = attrs.get('test_user', None)
            attrs['type'] = ftest_work.type
            if test_user is None:
                attrs['test_user'] = ftest_work.test_user
        return attrs

    def create(self, validated_data):
        """Create the Ftest and one FtestItem per submitted item, atomically.

        Args:
            validated_data: output of :meth:`validate`.

        Returns:
            The newly created Ftest.
        """
        ftestitems = validated_data.pop('ftestitems', [])
        with transaction.atomic():
            instance = super().create(validated_data)
            for item in ftestitems:
                FtestItem.objects.create(ftest=instance, **item)
        return instance

    def update(self, instance, validated_data):
        """Update the Ftest and the values of its already existing items.

        ``ftest_work`` is dropped from the payload, so it cannot be changed
        here. Items without an ``id`` are ignored; items with one get their
        ``test_val`` and ``check_val`` overwritten.

        Args:
            instance: the Ftest being updated.
            validated_data: output of :meth:`validate`.

        Returns:
            The updated Ftest.

        Raises:
            ParseError: an item ``id`` does not belong to this Ftest.
        """
        validated_data.pop('ftest_work', None)
        ftestitems = validated_data.pop('ftestitems', [])
        with transaction.atomic():
            instance = super().update(instance, validated_data)
            for item in ftestitems:
                # NOTE(review): DRF treats a nested ModelSerializer's ``id``
                # as read-only by default, so ``id`` only arrives here if the
                # item serializer or CustomModelSerializer makes it writable;
                # confirm.
                item_id = item.get('id', None)
                if not item_id:
                    continue
                try:
                    # Scope the lookup to this Ftest so a payload cannot
                    # overwrite items that belong to another inspection.
                    ftestitem = FtestItem.objects.get(id=item_id, ftest=instance)
                except FtestItem.DoesNotExist:
                    raise ParseError("新的检测项!")
                ftestitem.test_val = item['test_val']
                ftestitem.check_val = item['check_val']
                ftestitem.save()
        return instance
class PtestSerializer(CustomModelSerializer):
    """Serializer for Ptest that refuses duplicate sample numbers on create."""

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    test_user_name = serializers.CharField(
        source='test_user',
        read_only=True,
    )

    class Meta:
        model = Ptest
        fields = '__all__'

    def create(self, validated_data):
        """Create a Ptest unless its ``sample_number`` is already in use.

        Args:
            validated_data: validated field values, including
                ``sample_number``.

        Returns:
            The newly created Ptest.

        Raises:
            serializers.ValidationError: a Ptest with the same
                ``sample_number`` already exists.
        """
        sample_number = validated_data['sample_number']
        same_number = Ptest.objects.filter(sample_number=sample_number)
        if same_number.exists():
            raise serializers.ValidationError('该样品编号已存在')
        return super().create(validated_data)
class FtestDefectSerializer(CustomModelSerializer):
    """Serialize FtestDefect rows plus the related defect's name.

    ``ftest`` and ``is_main`` are read-only, along with the
    EXCLUDE_FIELDS_BASE fields.
    """

    defect_name = serializers.CharField(
        source='defect.name',
        read_only=True,
    )

    class Meta:
        model = FtestDefect
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest', 'is_main']
class FtestItemProcessSerializer(CustomModelSerializer):
    """FtestItem serializer restricted to an explicit list of fields.

    Includes the related test item's name and description as read-only
    extras.
    """

    testitem_name = serializers.CharField(source='testitem.name', read_only=True)
    testitem_description = serializers.CharField(source='testitem.description', read_only=True)

    class Meta:
        model = FtestItem
        fields = [
            "id",
            "testitem",
            "test_user",
            "test_val_json",
            "testitem_name",
            "testitem_description",
            "addto_wpr",
            "test_equip",
        ]
class FtestProcessSerializer(CustomModelSerializer):
    """Serializer for process-type Ftest records with items and defects.

    ``qct`` is required. On every write the serializer derives the Ftest's
    ``is_ok`` and ``defect_main`` from the submitted defect rows: a row is
    "blocking" when it is marked ``has`` and its defect's ``okcate`` equals
    ``Defect.DEFECT_NOTOK``.
    """

    test_user_name = serializers.CharField(
        source='test_user.name', read_only=True)
    ftestitems = FtestItemProcessSerializer(label='检验明细', many=True)
    ftestdefects = FtestDefectSerializer(label='缺陷明细', many=True)

    class Meta:
        model = Ftest
        fields = ["id", "test_date",
                  "test_user", "note", "ftestitems", "ftestdefects", "qct", "test_user_name"]
        extra_kwargs = {"qct": {"required": True}}

    def validate(self, attrs):
        """Force the record type to ``"process"``.

        Args:
            attrs: validated field values for this request.

        Returns:
            ``attrs`` with ``type`` set to ``"process"``.
        """
        attrs["type"] = "process"
        return attrs

    def create(self, validated_data):
        """Create the Ftest with its items and defect rows in one transaction.

        The first blocking defect row becomes the main defect; every other
        row is stored with ``is_main=False``. The Ftest is OK only when no
        row is blocking.

        Args:
            validated_data: output of :meth:`validate`.

        Returns:
            The newly created Ftest.
        """
        ftestitems = validated_data.pop('ftestitems', [])
        ftestdefects = validated_data.pop('ftestdefects', [])
        with transaction.atomic():
            instance = super().create(validated_data)
            for item in ftestitems:
                FtestItem.objects.create(ftest=instance, **item)
            is_ok = True
            defect_main = None
            has_is_main = False
            for row in ftestdefects:
                defect: Defect = row["defect"]
                blocking = defect.okcate == Defect.DEFECT_NOTOK and row["has"]
                # Always store an explicit flag so exactly one row at most
                # ends up as the main defect.
                row["is_main"] = bool(blocking) and not has_is_main
                if blocking:
                    is_ok = False
                if row["is_main"]:
                    has_is_main = True
                    defect_main = defect
                FtestDefect.objects.create(ftest=instance, **row)
            instance.defect_main = defect_main
            instance.is_ok = is_ok
            instance.save()
        return instance

    def update(self, instance, validated_data):
        """Update the Ftest, its existing items and its existing defect rows.

        Items and defect rows must already exist for this Ftest; submitted
        values are copied onto them. The main defect is then re-selected
        among the submitted rows: a row that was already main stays main if
        it is still blocking, otherwise the first blocking row is chosen.
        All other submitted rows get ``is_main=False``.

        Args:
            instance: the Ftest being updated.
            validated_data: output of :meth:`validate`.

        Returns:
            The updated Ftest.

        Raises:
            ParseError: a submitted test item or defect has no stored row
                for this Ftest.
        """
        ftestitems = validated_data.pop('ftestitems', [])
        ftestdefects = validated_data.pop('ftestdefects', [])
        with transaction.atomic():
            instance = super().update(instance, validated_data)
            for item in ftestitems:
                try:
                    stored_item = FtestItem.objects.get(
                        testitem=item["testitem"], ftest=instance)
                except FtestItem.DoesNotExist:
                    raise ParseError("新的检测项!")
                for k, v in item.items():
                    setattr(stored_item, k, v)
                stored_item.save()

            # Pass 1: copy the submitted values onto the stored rows.
            touched = []
            for row in ftestdefects:
                try:
                    stored: FtestDefect = FtestDefect.objects.get(
                        ftest=instance, defect=row["defect"])
                except FtestDefect.DoesNotExist:
                    raise ParseError("新的缺陷项!")
                for k, v in row.items():
                    setattr(stored, k, v)
                touched.append(stored)

            # Pass 2: pick the main defect only among rows that are blocking
            # now. Deciding in a single pass mixed the stale stored flag with
            # the new one, so the flag and ``defect_main`` could disagree.
            blocking = [
                d for d in touched
                if d.has and d.defect.okcate == Defect.DEFECT_NOTOK
            ]
            main = next((d for d in blocking if d.is_main), None)
            if main is None and blocking:
                main = blocking[0]
            for d in touched:
                # Compare by primary key in case a defect was submitted twice.
                d.is_main = main is not None and d.pk == main.pk
                d.save()

            instance.defect_main = main.defect if main is not None else None
            instance.is_ok = not blocking
            instance.save()
        return instance