424 lines
17 KiB
Python
424 lines
17 KiB
Python
from apps.qm.models import (QuaStat, TestItem, Ftest, FtestItem, FtestWork, Ptest,
|
|
NotOkOption, Defect, Qct, QctTestItem, QctMat,
|
|
QctDefect, FtestDefect, FtestworkDefect)
|
|
from apps.utils.constants import EXCLUDE_FIELDS, EXCLUDE_FIELDS_BASE
|
|
from apps.utils.serializers import CustomModelSerializer
|
|
from rest_framework import serializers
|
|
from rest_framework.exceptions import ValidationError, ParseError
|
|
from apps.wpm.models import SfLog, WMaterial
|
|
from django.db import transaction
|
|
from apps.inm.serializers import MaterialBatchDetailSerializer
|
|
from apps.mtm.models import Material
|
|
|
|
class DefectSerializer(CustomModelSerializer):
    """Model serializer exposing every field of ``Defect``.

    The fields named in ``EXCLUDE_FIELDS`` are read-only.
    """

    class Meta:
        model = Defect
        read_only_fields = EXCLUDE_FIELDS
        fields = "__all__"

    # Disabled overrides, kept for reference. ``create`` rejected a ``code``
    # that already exists (the error message reads "defect code already
    # exists"); ``update`` dropped ``code`` from the incoming data.
    #
    # def create(self, validated_data):
    #     code = validated_data["code"]
    #     if Defect.objects.get_queryset(all=True).filter(code=code).exists():
    #         raise ValidationError("缺陷标识已存在")
    #     return super().create(validated_data)
    #
    # def update(self, instance, validated_data):
    #     validated_data.pop("code", None)
    #     return super().update(instance, validated_data)
|
|
class QctGetSerializer(serializers.Serializer):
    """Plain (non-model) serializer with two character fields: ``material`` and ``tag``."""

    # Label text means "material ID".
    material = serializers.CharField(label="物料ID")
    # Label text means "tag".
    tag = serializers.CharField(label="标签")
|
|
|
|
class TestItemSerializer(CustomModelSerializer):
    """Model serializer for ``TestItem`` plus the name of its related process.

    The fields named in ``EXCLUDE_FIELDS`` are read-only.
    """

    # Read-only convenience field resolved through the ``process`` relation.
    process_name = serializers.CharField(read_only=True, source="process.name")

    class Meta:
        model = TestItem
        read_only_fields = EXCLUDE_FIELDS
        fields = "__all__"
|
|
|
|
class QctSerializer(CustomModelSerializer):
    """Model serializer exposing every field of ``Qct``.

    The fields named in ``EXCLUDE_FIELDS`` are read-only.
    """

    class Meta:
        model = Qct
        read_only_fields = EXCLUDE_FIELDS
        fields = "__all__"
|
|
|
|
class QctTestItemSerializer(CustomModelSerializer):
    """Model serializer for ``QctTestItem``.

    Besides the model fields it exposes several read-only attributes of the
    linked ``testitem`` (name, description, field type, choices, cd_expr).
    """

    testitem_name = serializers.CharField(source="testitem.name", read_only=True)
    testitem_description = serializers.CharField(source="testitem.description", read_only=True)
    testitem_field_type = serializers.CharField(source="testitem.field_type", read_only=True)
    testitem_choices = serializers.CharField(source="testitem.choices", read_only=True)
    testitem_cd_expr = serializers.CharField(source="testitem.cd_expr", read_only=True)

    class Meta:
        model = QctTestItem
        fields = "__all__"

    def validate(self, attrs):
        """Reject a ``testitem`` whose type is not ``TestItem.T_TEST``.

        Args:
            attrs: The validated field values.

        Returns:
            ``attrs`` unchanged.

        Raises:
            ParseError: If a ``testitem`` is supplied and its ``type`` is not
                ``TestItem.T_TEST``.
        """
        testitem: TestItem = attrs.get("testitem")
        # ``attrs.get`` yields None when the payload carries no ``testitem``
        # (e.g. a partial update); there is nothing to check in that case and
        # dereferencing ``.type`` would raise AttributeError.
        if testitem is not None and testitem.type != TestItem.T_TEST:
            raise ParseError("只可选择检测项")
        return attrs
|
|
|
|
class QctDefectSerializer(CustomModelSerializer):
    """Model serializer for ``QctDefect`` plus read-only details of the linked defect."""

    # Both values are read through the ``defect`` relation.
    defect_name = serializers.CharField(read_only=True, source="defect.name")
    defect_okcate = serializers.CharField(read_only=True, source="defect.okcate")

    class Meta:
        model = QctDefect
        fields = "__all__"
|
|
|
|
class QctMatSerializer(CustomModelSerializer):
    """Model serializer for ``QctMat`` plus the string form of its material."""

    # String representation of the related ``material`` object.
    material_name = serializers.StringRelatedField(read_only=True, source="material")

    class Meta:
        model = QctMat
        fields = "__all__"
|
|
|
|
class QctDetailSerializer(CustomModelSerializer):
    """Detail serializer for ``Qct`` with its nested child collections.

    The nested test items, defects and materials are all read-only, as are
    the fields named in ``EXCLUDE_FIELDS``.
    """

    qct_testitems = QctTestItemSerializer(read_only=True, many=True)
    qct_defects = QctDefectSerializer(read_only=True, many=True)
    qct_mats = QctMatSerializer(read_only=True, many=True)

    class Meta:
        model = Qct
        read_only_fields = EXCLUDE_FIELDS
        fields = "__all__"
|
|
|
|
class QuaStatSerializer(CustomModelSerializer):
    """Model serializer used to create ``QuaStat`` records.

    ``sflog`` is mandatory, ``val_avg`` / ``num_test`` / ``num_ok`` are
    required and non-null, and ``belong_dept`` is read-only because it is
    derived from the shift log during validation.
    """

    # Label text means "duty record".
    sflog = serializers.PrimaryKeyRelatedField(
        label="值班记录", queryset=SfLog.objects.all(), required=True)
    belong_dept_name = serializers.CharField(source='belong_dept.name', read_only=True)
    material_name = serializers.CharField(source='material.name', read_only=True)
    testitem_name = serializers.CharField(source='testitem.name', read_only=True)

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Derive ``rate_pass`` and ``belong_dept`` from the submitted values.

        Args:
            attrs: The validated field values; must contain ``num_ok``,
                ``num_test`` and ``sflog``.

        Returns:
            ``attrs`` with ``rate_pass`` and ``belong_dept`` filled in.
        """
        num_test = attrs['num_test']
        # A zero test count would raise ZeroDivisionError; report a pass rate
        # of 0 instead, matching QuaStatUpdateSerializer.validate.
        if num_test != 0:
            attrs['rate_pass'] = attrs['num_ok'] / num_test
        else:
            attrs['rate_pass'] = 0
        attrs['belong_dept'] = attrs['sflog'].mgroup.belong_dept
        return attrs
|
|
|
|
|
|
class UpdateRatePassTSerializer(serializers.Serializer):
    """Input serializer carrying ``material_id``, ``testitem_id``, ``rate_pass_t`` and ``month_s``.

    All four fields are required.
    """

    material_id = serializers.IntegerField(required=True)
    testitem_id = serializers.IntegerField(required=True)
    rate_pass_t = serializers.FloatField(required=True)
    month_s = serializers.IntegerField(required=True)

    def validate_rate_pass_t(self, value):
        """Return ``value`` unless it lies below 0 or above 100.

        Raises:
            serializers.ValidationError: If ``value`` is less than 0 or
                greater than 100.
        """
        below_range = value < 0
        above_range = value > 100
        if below_range or above_range:
            raise serializers.ValidationError("Rate pass must be between 0 and 100.")
        return value
|
|
|
|
class QuaStatUpdateSerializer(CustomModelSerializer):
    """Model serializer used to update ``QuaStat`` records.

    ``belong_dept``, ``sflog``, ``material`` and ``testitem`` are read-only
    here, in addition to the fields named in ``EXCLUDE_FIELDS``.
    """

    belong_dept_name = serializers.CharField(read_only=True, source='belong_dept.name')
    material_name = serializers.CharField(read_only=True, source='material.name')
    testitem_name = serializers.CharField(read_only=True, source='testitem.name')

    class Meta:
        model = QuaStat
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS + ['belong_dept', 'sflog', 'material', 'testitem']
        extra_kwargs = {
            'val_avg': {'required': True, 'allow_null': False},
            'num_test': {'required': True, 'allow_null': False},
            'num_ok': {'required': True, 'allow_null': False},
        }

    def validate(self, attrs):
        """Compute ``rate_pass`` as ``num_ok / num_test`` (0 when nothing was tested).

        Returns:
            The result of the parent ``validate`` on the augmented ``attrs``.
        """
        tested = attrs['num_test']
        if tested == 0:
            # Avoid dividing by zero: no tests means a pass rate of 0.
            attrs['rate_pass'] = 0
        else:
            attrs['rate_pass'] = attrs['num_ok'] / tested
        return super().validate(attrs)
|
|
|
|
|
|
class FtestworkdefectSerializer(CustomModelSerializer):
    """Model serializer for ``FtestworkDefect`` plus the linked defect's name.

    ``ftestwork`` and the fields in ``EXCLUDE_FIELDS_BASE`` are read-only.
    """

    defect_name = serializers.CharField(read_only=True, source="defect.name")

    class Meta:
        model = FtestworkDefect
        read_only_fields = EXCLUDE_FIELDS_BASE + ["ftestwork"]
        fields = "__all__"
|
|
|
|
|
|
class FtestWorkCreateUpdateSerializer(CustomModelSerializer):
    """Write serializer for ``FtestWork`` with optional nested defect rows.

    ``ftestworkdefect`` is an optional list of ``FtestworkDefect`` payloads
    that is persisted together with the parent record in ``create`` and
    ``update``.
    """

    ftestworkdefect = FtestworkdefectSerializer(many=True, required=False)

    class Meta:
        model = FtestWork
        fields = ['id', 'shift', 'wm', 'mb',
                  'type', 'type2', 'test_date', 'count', 'count_sampling',
                  'count_sampling_ok', 'count_ok', 'count_notok',
                  'count_notok_json', 'test_user', 'need_update_wm',
                  'equipment', 'note', "qct", "ftestworkdefect"]
        extra_kwargs = {'test_user': {'required': True}, 'type': {'required': True}}

    def validate(self, attrs):
        """Cross-field validation and derivation of dependent values.

        * ``type2 == 20`` (full inspection): ``count_sampling`` is set to
          ``count``.
        * With ``wm`` given: ``mb`` is cleared, the material must be
          batch-tracked and the ``wm`` state must be ``WM_OK`` or ``WM_TEST``;
          ``material`` and ``batch`` are copied from ``wm``. When neither
          ``qct`` nor defect rows are supplied, ``count_notok`` is summed from
          ``count_notok_json`` and, for full inspection, ``count`` must equal
          ``count_ok + count_notok``.
        * With ``mb`` given instead: ``wm`` is cleared and ``material`` /
          ``batch`` are copied from ``mb``; the material must be
          batch-tracked.
        * ``belong_dept`` is copied from ``test_user`` when one is set.

        Raises:
            ParseError: If the material is not batch-tracked.
            ValidationError: For an unsupported ``wm`` state, missing or
                invalid ``count_notok_json``, a count mismatch, or when
                neither ``wm`` nor ``mb`` is supplied.
        """
        qct = attrs.get("qct", None)
        ftestworkdefect = attrs.get("ftestworkdefect", [])
        type2 = attrs.get('type2', 20)
        if type2 == 20:  # full inspection
            attrs['count_sampling'] = attrs['count']

        if 'wm' in attrs and attrs['wm']:
            attrs['mb'] = None
            wm: WMaterial = attrs['wm']
            if wm.material.tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
            if wm.state not in [WMaterial.WM_OK, WMaterial.WM_TEST]:
                raise ValidationError('不支持对该物料检验')
            attrs['material'] = wm.material
            attrs['batch'] = wm.batch
            count_notok_json = attrs.get('count_notok_json', None)
            # Without a qct and without defect rows, fall back to the original
            # logic that tallies the not-ok count from ``count_notok_json``.
            if not (qct or ftestworkdefect):
                if count_notok_json is None:
                    raise ValidationError('不合格项不能为空')
                count_notok = 0
                for k, v in count_notok_json.items():
                    k_2 = k.replace('count_n_', '')
                    if k_2 not in NotOkOption.values:
                        raise ValidationError(f'不支持的不合格项{k_2}')
                    if isinstance(v, int) and v >= 0:
                        count_notok = count_notok + v
                    else:
                        raise ValidationError(f'不合格项{k_2}必须为非负整数')
                attrs['count_notok'] = count_notok
                if type2 == 20 and attrs['count'] != attrs['count_ok'] + attrs['count_notok']:
                    raise ValidationError('全检时总数必须等于合格数+不合格数')
        elif 'mb' in attrs and attrs['mb']:
            attrs['wm'] = None
            attrs['material'] = attrs['mb'].material
            attrs['batch'] = attrs['mb'].batch
            if attrs['material'].tracking != Material.MA_TRACKING_BATCH:
                raise ParseError('只支持追踪批次的物料检验')
        else:
            raise ValidationError('请选择车间/仓库库存')

        if attrs['test_user']:
            attrs['belong_dept'] = attrs['test_user'].belong_dept
        return attrs

    def create(self, validated_data):
        """Create the ``FtestWork`` and its defect rows in one transaction.

        Only defect rows whose ``count`` is greater than 0 are stored. When
        the record has a ``qct`` or any defect rows were submitted,
        ``cal_count()`` is invoked on the new record.

        Returns:
            The created ``FtestWork`` instance.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        with transaction.atomic():
            ins: FtestWork = super().create(validated_data)
            # Use a dedicated loop variable so the submitted list is still
            # available for the check below.
            for row in defect_rows:
                if row['count'] > 0:
                    FtestworkDefect.objects.create(ftestwork=ins, **row)
            if ins.qct or defect_rows:
                ins.cal_count()
        return ins

    def update(self, instance, validated_data):
        """Update the ``FtestWork`` and synchronise its defect rows atomically.

        For each submitted row with ``count > 0`` the matching existing
        ``FtestworkDefect`` (same ``defect``) gets the new count. Existing
        rows that were not matched are deleted. When the record has a ``qct``
        or any defect rows were submitted, ``cal_count()`` is invoked.

        Returns:
            The updated ``FtestWork`` instance.

        Raises:
            ParseError: If a submitted defect has no existing row, or matches
                more than one row.
        """
        defect_rows = validated_data.pop("ftestworkdefect", [])
        with transaction.atomic():
            # The parent update is inside the transaction so that a failure
            # while syncing defect rows rolls it back as well.
            ins: FtestWork = super().update(instance, validated_data)
            kept_ids = []
            for item in defect_rows:
                if item["count"] > 0:
                    try:
                        # Keep the defect row in its own variable: ``ins``
                        # must stay bound to the FtestWork being updated.
                        defect_row = FtestworkDefect.objects.get(
                            ftestwork=ins, defect=item["defect"])
                    except FtestworkDefect.DoesNotExist:
                        raise ParseError("新的缺陷项!")
                    except FtestworkDefect.MultipleObjectsReturned:
                        raise ParseError("缺陷项重复!")
                    defect_row.count = item["count"]
                    defect_row.save()
                    kept_ids.append(defect_row.id)
            FtestworkDefect.objects.filter(ftestwork=ins).exclude(id__in=kept_ids).delete()
            if ins.qct or defect_rows:
                ins.cal_count()
        return ins
|
|
|
|
class FtestWorkSerializer(CustomModelSerializer):
    """Read serializer for ``FtestWork`` with expanded related information.

    Adds the material's string form and category, a nested representation of
    ``mb`` (exposed as ``mb_``) and the test user's name; all are read-only.
    """

    material_name = serializers.StringRelatedField(read_only=True, source='material')
    material_cate = serializers.CharField(read_only=True, source='material.cate')
    mb_ = MaterialBatchDetailSerializer(read_only=True, source='mb')
    test_user_name = serializers.CharField(read_only=True, source='test_user.name')

    class Meta:
        model = FtestWork
        fields = "__all__"
|
|
|
|
|
|
class FtestItemSerializer(CustomModelSerializer):
    """Model serializer for ``FtestItem`` plus the linked test item's name and description.

    ``ftest`` and the fields in ``EXCLUDE_FIELDS_BASE`` are read-only.
    """

    testitem_name = serializers.CharField(read_only=True, source='testitem.name')
    testitem_description = serializers.CharField(read_only=True, source='testitem.description')

    class Meta:
        model = FtestItem
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest']
        fields = '__all__'
|
|
|
|
|
|
class FtestSerializer(CustomModelSerializer):
    """Model serializer for ``Ftest`` with a writable nested ``ftestitems`` list.

    The fields named in ``EXCLUDE_FIELDS`` are read-only.
    """

    test_user_name = serializers.CharField(source='test_user.name', read_only=True)
    check_user_name = serializers.CharField(source='check_user.name', read_only=True)
    # Label text means "inspection details".
    ftestitems = FtestItemSerializer(label='检验明细', many=True)

    class Meta:
        model = Ftest
        fields = '__all__'
        read_only_fields = EXCLUDE_FIELDS

    def validate(self, attrs):
        """Inherit ``type`` (and ``test_user`` if missing) from ``ftest_work``.

        When ``ftest_work`` is supplied, ``type`` is always overwritten with
        the work record's type, and ``test_user`` is taken from it only when
        the payload does not provide one.

        Returns:
            ``attrs``, possibly with ``type`` / ``test_user`` filled in.
        """
        ftest_work: FtestWork = attrs.get('ftest_work', None)
        if ftest_work:
            test_user = attrs.get('test_user', None)
            attrs['type'] = ftest_work.type
            if test_user is None:
                attrs['test_user'] = ftest_work.test_user
        return attrs

    def create(self, validated_data):
        """Create the ``Ftest`` and one ``FtestItem`` per nested entry, atomically.

        Returns:
            The created ``Ftest`` instance.
        """
        ftestitems = validated_data.pop('ftestitems', [])
        with transaction.atomic():
            instance = super().create(validated_data)
            for item in ftestitems:
                FtestItem.objects.create(ftest=instance, **item)
        return instance

    def update(self, instance, validated_data):
        """Update the ``Ftest`` and the values of its existing items, atomically.

        ``ftest_work`` is discarded from the payload. Nested entries without
        an ``id`` are ignored; for the others ``test_val`` and ``check_val``
        are written to the matching item of this ``Ftest``.

        Returns:
            The updated ``Ftest`` instance.

        Raises:
            ParseError: If an entry's ``id`` does not match an item that
                belongs to this ``Ftest``.
        """
        validated_data.pop('ftest_work', None)
        ftestitems = validated_data.pop('ftestitems', [])
        with transaction.atomic():
            instance = super().update(instance, validated_data)
            for item in ftestitems:
                item_id = item.get('id', None)
                if not item_id:
                    continue
                try:
                    # Restrict the lookup to this record's own items so that
                    # an id from another Ftest cannot be modified through it.
                    ftestitem = FtestItem.objects.get(id=item_id, ftest=instance)
                except FtestItem.DoesNotExist:
                    raise ParseError("检验明细不存在!")
                ftestitem.test_val = item['test_val']
                ftestitem.check_val = item['check_val']
                ftestitem.save()
        return instance
|
|
|
|
|
|
class PtestSerializer(CustomModelSerializer):
    """Model serializer for ``Ptest`` that refuses duplicate sample numbers on create."""

    testitem_name = serializers.CharField(read_only=True, source='testitem.name')
    test_user_name = serializers.CharField(read_only=True, source='test_user')

    class Meta:
        model = Ptest
        fields = '__all__'

    def create(self, validated_data):
        """Create a ``Ptest`` unless one with the same ``sample_number`` exists.

        Raises:
            serializers.ValidationError: If a ``Ptest`` with the submitted
                ``sample_number`` is already stored.
        """
        sample_number = validated_data['sample_number']
        duplicates = Ptest.objects.filter(sample_number=sample_number)
        if duplicates.exists():
            raise serializers.ValidationError('该样品编号已存在')
        return super().create(validated_data)
|
|
|
|
|
|
class FtestDefectSerializer(CustomModelSerializer):
    """Model serializer for ``FtestDefect`` plus the linked defect's name.

    ``ftest``, ``is_main`` and the fields in ``EXCLUDE_FIELDS_BASE`` are
    read-only.
    """

    defect_name = serializers.CharField(read_only=True, source='defect.name')

    class Meta:
        model = FtestDefect
        read_only_fields = EXCLUDE_FIELDS_BASE + ['ftest', 'is_main']
        fields = '__all__'
|
|
|
|
|
|
class FtestItemProcessSerializer(CustomModelSerializer):
    """Serializer for ``FtestItem`` limited to an explicit list of fields.

    Adds the linked test item's name and description as read-only values.
    """

    testitem_name = serializers.CharField(read_only=True, source='testitem.name')
    testitem_description = serializers.CharField(read_only=True, source='testitem.description')

    class Meta:
        model = FtestItem
        fields = [
            "id",
            "testitem",
            "test_user",
            "test_val_json",
            "testitem_name",
            "testitem_description",
            "addto_wpr",
            "test_equip",
        ]
|
|
|
|
class FtestProcessSerializer(CustomModelSerializer):
    """Serializer for ``Ftest`` records of type ``"process"``.

    Carries writable nested ``ftestitems`` and ``ftestdefects`` lists and
    requires ``qct``. On save it derives the record's ``is_ok`` flag and
    ``defect_main`` from the submitted defects.
    """

    test_user_name = serializers.CharField(read_only=True, source='test_user.name')
    # Label text means "inspection details".
    ftestitems = FtestItemProcessSerializer(label='检验明细', many=True)
    # Label text means "defect details".
    ftestdefects = FtestDefectSerializer(label='缺陷明细', many=True)

    class Meta:
        model = Ftest
        fields = ["id", "test_date", "test_user", "note",
                  "ftestitems", "ftestdefects", "qct", "test_user_name"]
        extra_kwargs = {"qct": {"required": True}}

    def validate(self, attrs):
        """Force ``type`` to ``"process"`` and return ``attrs``."""
        attrs["type"] = "process"
        return attrs

    def create(self, validated_data):
        """Create the ``Ftest`` with its items and defects in one transaction.

        A defect counts as failing when its ``okcate`` is
        ``Defect.DEFECT_NOTOK`` and the entry's ``has`` is truthy. Any failing
        defect makes the record not ok; the first failing defect is flagged
        ``is_main`` and stored as ``defect_main``. Non-failing entries are
        created with ``is_main`` set to False.

        Returns:
            The created ``Ftest`` instance.
        """
        item_rows = validated_data.pop('ftestitems', [])
        defect_rows = validated_data.pop('ftestdefects', [])
        with transaction.atomic():
            instance = super().create(validated_data)
            for row in item_rows:
                FtestItem.objects.create(ftest=instance, **row)

            is_ok = True
            defect_main = None
            has_is_main = False
            for row in defect_rows:
                defect: Defect = row["defect"]
                failing = defect.okcate in [Defect.DEFECT_NOTOK] and row["has"]
                if failing:
                    is_ok = False
                    if not has_is_main:
                        # First failing defect becomes the main defect.
                        row["is_main"] = True
                        has_is_main = True
                        defect_main = defect
                else:
                    row["is_main"] = False
                FtestDefect.objects.create(ftest=instance, **row)

            instance.defect_main = None if is_ok else defect_main
            instance.is_ok = is_ok
            instance.save()
        return instance

    def update(self, instance, validated_data):
        """Update the ``Ftest`` and its existing items and defects atomically.

        Every nested entry must match an existing row of this record (items
        by ``testitem``, defects by ``defect``); the entry's values are copied
        onto that row. ``is_ok`` and ``defect_main`` are then recomputed from
        the defect rows.

        Returns:
            The updated ``Ftest`` instance.

        Raises:
            ParseError: If an entry refers to a test item or defect that has
                no existing row on this record.
        """
        item_rows = validated_data.pop('ftestitems', [])
        defect_rows = validated_data.pop('ftestdefects', [])
        with transaction.atomic():
            instance = super().update(instance, validated_data)

            for row in item_rows:
                try:
                    ftestitem = FtestItem.objects.get(testitem=row["testitem"], ftest=instance)
                except FtestItem.DoesNotExist:
                    raise ParseError("新的检测项!")
                for field_name, field_value in row.items():
                    setattr(ftestitem, field_name, field_value)
                ftestitem.save()

            is_ok = True
            defect_main = None
            has_is_main = False
            for row in defect_rows:
                try:
                    ftestdefect: FtestDefect = FtestDefect.objects.get(
                        ftest=instance, defect=row["defect"])
                except FtestDefect.DoesNotExist:
                    raise ParseError("新的缺陷项!")
                for field_name, field_value in row.items():
                    setattr(ftestdefect, field_name, field_value)
                ftestdefect.save()

                # NOTE(review): a row already flagged is_main is recorded as
                # the main defect before its current ``has`` state is checked
                # below - confirm this is intended.
                if ftestdefect.is_main:
                    has_is_main = True
                    defect_main = ftestdefect.defect

                if ftestdefect.has and ftestdefect.defect.okcate in [Defect.DEFECT_NOTOK]:
                    is_ok = False
                    if not has_is_main:
                        ftestdefect.is_main = True
                        has_is_main = True
                        defect_main = ftestdefect.defect
                else:
                    ftestdefect.is_main = False
                ftestdefect.save()

            instance.defect_main = None if is_ok else defect_main
            instance.is_ok = is_ok
            instance.save()
        return instance