from apps.wpm.models import BatchSt import logging from apps.qm.models import Defect, FtestWork, FtestworkDefect from apps.wpm.models import Mlogb, MlogbDefect, Mlog from apps.mtm.models import Mgroup import decimal from django.db.models import Sum from datetime import datetime from apps.wpm.services_2 import get_f_l_date import json from apps.utils.tools import MyJSONEncoder myLogger = logging.getLogger("log") def main(batch: str, mgroup_obj): try: batchst = BatchSt.objects.get(batch=batch, version=1) except BatchSt.DoesNotExist: myLogger.error(f"Batch {batch} does not exist") return data = {"批次号": batch} mgroup_qs = Mgroup.objects.all().order_by("sort") for mgroup in mgroup_qs: mgroup_name = mgroup.name mlogb1_qs = Mlogb.objects.filter(mlog__submit_time__isnull=False, material_out__isnull=False, mlog__mgroup=mgroup, mlog__is_fix=False, batch=batch, need_inout=True) if mlogb1_qs.exists(): data[f"{mgroup_name}_日期"] = [] data[f"{mgroup_name}_操作人"] = [] data[f"{mgroup_name}_班次"] = [] data[f"{mgroup_name}_count_use"] = 0 data[f"{mgroup_name}_count_real"] = 0 data[f"{mgroup_name}_count_ok"] = 0 data[f"{mgroup_name}_count_notok"] = 0 data[f"{mgroup_name}_count_ok_full"] = 0 data[f"{mgroup_name}_count_pn_jgqbl"] = 0 mlogb_q_ids = [] for item in mlogb1_qs: # 找到对应的输入 mlogb_from:Mlogb = item.mlogb_from if mlogb_from: mlogb_q_ids.append(mlogb_from.id) data[f"{mgroup_name}_count_use"] += mlogb_from.count_use data[f"{mgroup_name}_count_pn_jgqbl"] += mlogb_from.count_pn_jgqbl if item.mlog.handle_user: data[f"{mgroup_name}_操作人"].append(item.mlog.handle_user) if item.mlog.handle_date: data[f"{mgroup_name}_日期"].append(item.mlog.handle_date) if item.mlog.shift: data[f"{mgroup_name}_班次"].append(item.mlog.shift.name) data[f"{mgroup_name}_count_real"] += item.count_real data[f"{mgroup_name}_count_ok"] += item.count_ok data[f"{mgroup_name}_count_ok_full"] += item.count_ok_full if item.count_ok_full else 0 data[f"{mgroup_name}_count_notok"] += item.count_notok if item.count_notok else 0 try: data[f"{mgroup_name}_完全合格率"] = round((data[f"{mgroup_name}_count_ok_full"] / data[f"{mgroup_name}_count_real"])*100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data[f"{mgroup_name}_完全合格率"] = 0 try: data[f"{mgroup_name}_合格率"] = round((data[f"{mgroup_name}_count_ok"] / data[f"{mgroup_name}_count_real"])*100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data[f"{mgroup_name}_合格率"] = 0 mlogbd1_qs = MlogbDefect.objects.filter(mlogb__in=mlogb1_qs, count__gt=0).values("defect__name").annotate(total=Sum("count")) mlogbd1_q_qs = MlogbDefect.objects.filter(mlogb__id__in=mlogb_q_ids, count__gt=0).values("defect__name").annotate(total=Sum("count")) for item in mlogbd1_q_qs: data[f"{mgroup_name}_加工前_缺陷_{item['defect__name']}"] = item["total"] data[f"{mgroup_name}_加工前_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data[f"{mgroup_name}_count_use"])*100, 2) for item in mlogbd1_qs: data[f"{mgroup_name}_缺陷_{item['defect__name']}"] = item["total"] data[f"{mgroup_name}_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data[f"{mgroup_name}_count_real"])*100, 2) data[f"{mgroup_name}_日期"] = list(set(data[f"{mgroup_name}_日期"])) data[f"{mgroup_name}_日期"].sort() data[f"{mgroup_name}_小日期"] = min(data[f"{mgroup_name}_日期"]).strftime("%Y-%m-%d") data[f"{mgroup_name}_大日期"] = max(data[f"{mgroup_name}_日期"]).strftime("%Y-%m-%d") data[f"{mgroup_name}_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data[f"{mgroup_name}_日期"]]) data[f"{mgroup_name}_操作人"] = list(set(data[f"{mgroup_name}_操作人"])) data[f"{mgroup_name}_操作人"] = ";".join([item.name for item in data[f"{mgroup_name}_操作人"]]) data[f"{mgroup_name}_班次"] = list(set(data[f"{mgroup_name}_班次"])) data[f"{mgroup_name}_班次"] = ";".join([item for item in data[f"{mgroup_name}_班次"]]) # 按 mlog__submit_time, id 排序,每条 Mlogb 记录独立为一次返修 # (同一 Mlog 下有多条同批次 Mlogb 时也能正确拆分为多次返修) _all_fix_qs = Mlogb.objects.filter( mlog__submit_time__isnull=False, material_out__isnull=False, mlog__mgroup__name="外观检验", mlog__is_fix=True, batch=batch, need_inout=True, ).order_by("mlog__submit_time", "id") _fix_prefixes = [] for fix_idx, fix_mlogb in enumerate(_all_fix_qs): suffix = "" if fix_idx == 0 else str(fix_idx + 1) prefix = f"外观检验_返修{suffix}_" _fix_prefixes.append(prefix) mlog = fix_mlogb.mlog handle_date = mlog.handle_date data[f"{prefix}count_real"] = fix_mlogb.count_real data[f"{prefix}count_ok"] = fix_mlogb.count_ok data[f"{prefix}count_ok_full"] = fix_mlogb.count_ok_full or 0 data[f"{prefix}count_notok"] = fix_mlogb.count_notok or 0 try: data[f"{prefix}合格率"] = round((fix_mlogb.count_ok / fix_mlogb.count_real) * 100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data[f"{prefix}合格率"] = 0 try: data[f"{prefix}完全合格率"] = round(((fix_mlogb.count_ok_full or 0) / fix_mlogb.count_real) * 100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data[f"{prefix}完全合格率"] = 0 data[f"{prefix}日期"] = handle_date.strftime("%Y-%m-%d") if handle_date else "" data[f"{prefix}小日期"] = handle_date.strftime("%Y-%m-%d") if handle_date else "" data[f"{prefix}大日期"] = handle_date.strftime("%Y-%m-%d") if handle_date else "" data[f"{prefix}操作人"] = mlog.handle_user.name if mlog.handle_user else "" data[f"{prefix}班次"] = mlog.shift.name if mlog.shift else "" fix_defect_qs = MlogbDefect.objects.filter(mlogb=fix_mlogb, count__gt=0).values("defect__name").annotate(total=Sum("count")) for item in fix_defect_qs: data[f"{prefix}缺陷_{item['defect__name']}"] = item["total"] data[f"{prefix}缺陷_{item['defect__name']}_比例"] = round((item["total"] / fix_mlogb.count_real) * 100, 2) # 车间库存抽检 ft_qs = FtestWork.objects.filter(type2=FtestWork.TYPE2_SOME, wm__mgroup__name="外观检验", batch=batch, submit_time__isnull=False) if ft_qs.exists(): data["外观检验_车间库存抽检_日期"] = [] data["外观检验_车间库存抽检_操作人"] = [] data["外观检验_车间库存抽检_count_notok"] = 0 for item in ft_qs: if item.test_user: data["外观检验_车间库存抽检_操作人"].append(item.test_user) if item.test_date: data["外观检验_车间库存抽检_日期"].append(item.test_date) data["外观检验_车间库存抽检_count_notok"] += item.count_notok if item.count_notok else 0 data["外观检验_车间库存抽检_日期"] = list(set(data["外观检验_车间库存抽检_日期"])) data["外观检验_车间库存抽检_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data["外观检验_车间库存抽检_日期"]]) data["外观检验_车间库存抽检_操作人"] = list(set(data["外观检验_车间库存抽检_操作人"])) data["外观检验_车间库存抽检_操作人"] = ";".join([item.name for item in data["外观检验_车间库存抽检_操作人"]]) # 车间库存抽检缺陷 ftd_qs = FtestworkDefect.objects.filter(ftestwork__in=ft_qs, count__gt=0).values("defect__name").annotate(total=Sum("count")) for item in ftd_qs: data[f"外观检验_车间库存抽检_缺陷_{item['defect__name']}"] = item["total"] if "外观检验_count_ok" in data: data["外观检验_总合格数"] = data["外观检验_count_ok"] + sum(data.get(f"{p}count_ok", 0) for p in _fix_prefixes) try: data["外观检验_总合格率"] = round((data["外观检验_总合格数"] / data["外观检验_count_real"])*100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data["外观检验_总合格率"] = 0 data["外观检验_完全总合格数"] = data["外观检验_count_ok_full"] + sum(data.get(f"{p}count_ok_full", 0) for p in _fix_prefixes) try: data["外观检验_完全总合格率"] = round((data["外观检验_完全总合格数"] / data["外观检验_count_real"])*100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data["外观检验_完全总合格率"] = 0 data["外观检验_直通合格数"] = data["外观检验_总合格数"] - data.get("外观检验_车间库存抽检_count_notok", 0) if "尺寸检验_合格率" in data: try: data["外观检验_直通合格率"] = round((data["外观检验_总合格率"]* data["尺寸检验_合格率"])/100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data["外观检验_直通合格率"] = 0 try: data["外观检验_直通合格率2"] = round((data["外观检验_直通合格数"]/data["尺寸检验_count_use"])*100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data["外观检验_直通合格率2"] = 0 if "尺寸检验_完全合格率" in data: try: data["外观检验_完全直通合格率"] = round((data["外观检验_完全总合格率"]* data["尺寸检验_完全合格率"])/100, 2) except (decimal.InvalidOperation, ZeroDivisionError): data["外观检验_完全直通合格率"] = 0 res = get_f_l_date(data) batchst.data = json.loads(json.dumps(data, cls=MyJSONEncoder)) if batchst.first_time is None or (res["first_time"] and res["first_time"] < batchst.first_time): batchst.first_time = res["first_time"] if batchst.last_time is None or (res["last_time"] and res["last_time"] > batchst.last_time): batchst.last_time = res["last_time"] batchst.save() if __name__ == '__main__': pass