from django.db import models from apps.qm.models import FtestDefect, FtestItem from apps.utils.models import BaseModel from apps.mtm.models import Material from rest_framework.exceptions import ParseError from apps.wpm.models import WmStateOption, Mlogbw, Handoverbw from apps.utils.tools import update_dict from apps.inm.models import MIOItemw from django.db.models import F, Value from django.db import IntegrityError # Create your models here. class Wpr(BaseModel): """ TN: 动态产品 """ number = models.CharField("编号", max_length=50, null=True, blank=True) number_out = models.CharField("对外编号", max_length=50, null=True, blank=True, unique=True) version = models.PositiveIntegerField("版本号", default=9999) state = models.PositiveSmallIntegerField('状态', default=10, choices=WmStateOption.choices) material = models.ForeignKey(Material, verbose_name="当前物料形态", on_delete=models.CASCADE, related_name="material_wpr") material_start = models.ForeignKey(Material, verbose_name="初始物料", on_delete=models.CASCADE, null=True, blank=True, related_name="material_start_wpr") defects = models.ManyToManyField("qm.defect", through="wpmw.wprdefect", verbose_name="缺陷项") mb = models.ForeignKey("inm.materialbatch", verbose_name="仓库物料", on_delete=models.CASCADE, null=True, blank=True) wm = models.ForeignKey("wpm.wmaterial", verbose_name="车间物料", on_delete=models.CASCADE, null=True, blank=True) oinfo = models.JSONField(verbose_name="其他信息", default=dict, blank=True) wpr_from = models.ForeignKey("self", verbose_name="来源于", on_delete=models.CASCADE, null=True, blank=True) data = models.JSONField(verbose_name="数据", default=dict, blank=True) @classmethod def change_or_new(cls, wpr=None, number=None, mb=None, wm=None, old_mb=None, old_wm=None, ftest=None, wpr_from=None, add_version=True, number_out=None): is_create = False if wpr is None and number is None: raise ParseError("id和number不能同时为空") if mb and wm: raise ParseError("所属仓库批次和车间批次不可同时存在") if wpr: ins:Wpr = wpr # if wpr and mb is None and wm is None and ins.version <= 0: # 相当于撤回操作 # wpr.delete() # return elif number: ins_x = cls.objects.filter(number=number).order_by("-update_time").first() if ins_x: if ins_x.wm is None and ins_x.mb is None: # 此时可以直接复用 ins = ins_x # if ins_x.version > 1: # 说明被复用了 # if wpr_from is None: # wpr_from = ins_x # # 创建新的wpr # ins = cls(number=number) # ins.version = -1 # ins.oinfo = {} # is_create = True # else: # raise ParseError(f"该物料编号{number}-已存在不可使用") else: raise ParseError(f"该物料编号{number}-已存在不可使用") else: ins = cls(number=number) ins.version = -1 ins.oinfo = {} is_create = True # if batch_from: # 尝试从批号追踪来源 # ins_from = cls.objects.filter(number=batch_from).order_by("-version").first() # if ins_from: # if ins_from.wm is None and ins_from.mb is None: # if ins_from.version > 1: # 说明被复用了 # wpr_from = ins_from # else: # raise ParseError(f"该物料编号{number}-已存在不可使用") # elif wpr_from is None: # raise ParseError(f"该物料编号{number}-尝试从批号追踪来源失败") # elif number_from: # 尝试从编号追踪来源 # ins_from = cls.objects.filter(number=number_from).order_by("-version").first() # if ins_from: # if ins_from.wm is None and ins_from.mb is None: # if ins_from.version > 1: # 说明被复用了 # wpr_from = ins_from # else: # raise ParseError(f"该物料编号{number}-已存在不可使用") # elif wpr_from is None: # raise ParseError(f"该物料编号{number}-尝试历史编号追踪来源失败") if old_mb and ins.mb != old_mb: raise ParseError(f"请检查-{ins.number}-所属仓库批次") if old_wm and ins.wm != old_wm: raise ParseError(f"请检查-{ins.number}-所属车间批次") ins.wpr_from = wpr_from ins.mb = mb if mb: ins.material = mb.material ins.state = mb.state if is_create: ins.material_start = mb.material ins.wm = wm if wm: ins.material = wm.material ins.state = wm.state if is_create: ins.material_start = wm.material WprDefect.objects.filter(wpr=ins).delete() if add_version: ins.version = ins.version + 1 ins.save() if wm: if wm.defect: WprDefect.objects.create(wpr=ins, defect=wm.defect, is_main=True) if ftest: # 通过检验变更其缺陷项 WprDefect.objects.filter(wpr=ins).delete() for ftestdefect in FtestDefect.objects.filter(ftest=ftest, has=True): WprDefect.objects.create(wpr=ins, defect=ftestdefect.defect, is_main=ftestdefect.is_main) # 携带某些检测信息 oinfo = {} for ftestitem in FtestItem.objects.filter(ftest=ftest, addto_wpr=True): if ftestitem.test_val_json: oinfo[ftestitem.id] = {"name": ftestitem.testitem.name, "val": ftestitem.test_val_json} ins.oinfo = update_dict(ins.oinfo, oinfo) ins.save() # if ins.mb and ins.wm: # raise ParseError("所属仓库批次和车间批次不可同时存在2") if number_out: try: ins.number_out = number_out ins.save() except IntegrityError: raise ParseError(f"{ins.number}-出库编号-{number_out}-已存在") return ins @classmethod def clear(cls, number_list): cls.objects.filter(number__in=number_list).update(mb=None, wm=None) @classmethod def get_qs_by_mb(cls, mb): return cls.objects.filter(mb=mb, wm=None) @classmethod def get_qs_by_wm(cls, wm): return cls.objects.filter(wm=wm, mb=None) @property def wprdefect(self): return WprDefect.objects.filter(wpr=self) def timeline(self): # 获取完整时间线 mioitemw_qs = MIOItemw.objects.filter(wpr=self, mioitem__mio__submit_time__isnull=False).values( "id", timex=F("mioitem__mio__submit_time"), model=Value('mioitemw', output_field=models.CharField())) mlogbw_qs = Mlogbw.objects.filter(wpr=self, mlogb__mlog__submit_time__isnull=False, mlogb__material_out__isnull=False).values( "id", timex=F("mlogb__mlog__submit_time"), model=Value('mlogbw', output_field=models.CharField())) h_qs = Handoverbw.objects.filter(wpr=self, handoverb__handover__submit_time__isnull=False).values( "id", timex=F("handoverb__handover__submit_time"), model=Value('handoverbw', output_field=models.CharField())) combined_qs = mioitemw_qs.union(mlogbw_qs, h_qs).order_by('timex') return list(combined_qs) def get_material_start(self): timeline = self.timeline() if timeline: x = timeline[0] if x['model'] == 'mioitemw': return MIOItemw.objects.get(id=x['id']).mioitem.material elif x['model'] == 'mlogbw': return Mlogbw.objects.get(id=x['id']).mlogb.material_out return None class WprDefect(BaseModel): """TN:产物缺陷项关联表""" wpr = models.ForeignKey(Wpr, verbose_name="关联产物", on_delete=models.CASCADE) defect = models.ForeignKey("qm.defect", verbose_name="关联缺陷项", on_delete=models.CASCADE) is_main = models.BooleanField("是否主要缺陷", default=False)