factory/apps/wpm/scripts/batch_gxerp.py

194 lines
11 KiB
Python

from apps.wpm.models import BatchSt
import logging
from apps.qm.models import Defect, FtestWork, FtestworkDefect
from apps.wpm.models import Mlogb, MlogbDefect, Mlog
from apps.mtm.models import Mgroup
import decimal
from django.db.models import Sum
from datetime import datetime
from apps.wpm.services_2 import get_f_l_date
import json
from apps.utils.tools import MyJSONEncoder
myLogger = logging.getLogger("log")
def main(batch: str, mgroup_obj):
try:
batchst = BatchSt.objects.get(batch=batch, version=1)
except BatchSt.DoesNotExist:
myLogger.error(f"Batch {batch} does not exist")
return
data = {"批次号": batch}
mgroup_qs = Mgroup.objects.all().order_by("sort")
for mgroup in mgroup_qs:
mgroup_name = mgroup.name
mlogb1_qs = Mlogb.objects.filter(mlog__submit_time__isnull=False,
material_out__isnull=False, mlog__mgroup=mgroup,
mlog__is_fix=False, batch=batch, need_inout=True)
if mlogb1_qs.exists():
data[f"{mgroup_name}_日期"] = []
data[f"{mgroup_name}_操作人"] = []
data[f"{mgroup_name}_班次"] = []
data[f"{mgroup_name}_count_use"] = 0
data[f"{mgroup_name}_count_real"] = 0
data[f"{mgroup_name}_count_ok"] = 0
data[f"{mgroup_name}_count_notok"] = 0
data[f"{mgroup_name}_count_ok_full"] = 0
data[f"{mgroup_name}_count_pn_jgqbl"] = 0
mlogb_q_ids = []
for item in mlogb1_qs:
# 找到对应的输入
mlogb_from:Mlogb = item.mlogb_from
if mlogb_from:
mlogb_q_ids.append(mlogb_from.id)
data[f"{mgroup_name}_count_use"] += mlogb_from.count_use
data[f"{mgroup_name}_count_pn_jgqbl"] += mlogb_from.count_pn_jgqbl
if item.mlog.handle_user:
data[f"{mgroup_name}_操作人"].append(item.mlog.handle_user)
if item.mlog.handle_date:
data[f"{mgroup_name}_日期"].append(item.mlog.handle_date)
if item.mlog.shift:
data[f"{mgroup_name}_班次"].append(item.mlog.shift.name)
data[f"{mgroup_name}_count_real"] += item.count_real
data[f"{mgroup_name}_count_ok"] += item.count_ok
data[f"{mgroup_name}_count_ok_full"] += item.count_ok_full if item.count_ok_full else 0
data[f"{mgroup_name}_count_notok"] += item.count_notok if item.count_notok else 0
try:
data[f"{mgroup_name}_完全合格率"] = round((data[f"{mgroup_name}_count_ok_full"] / data[f"{mgroup_name}_count_real"])*100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data[f"{mgroup_name}_完全合格率"] = 0
try:
data[f"{mgroup_name}_合格率"] = round((data[f"{mgroup_name}_count_ok"] / data[f"{mgroup_name}_count_real"])*100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data[f"{mgroup_name}_合格率"] = 0
mlogbd1_qs = MlogbDefect.objects.filter(mlogb__in=mlogb1_qs, count__gt=0).values("defect__name").annotate(total=Sum("count"))
mlogbd1_q_qs = MlogbDefect.objects.filter(mlogb__id__in=mlogb_q_ids, count__gt=0).values("defect__name").annotate(total=Sum("count"))
for item in mlogbd1_q_qs:
data[f"{mgroup_name}_加工前_缺陷_{item['defect__name']}"] = item["total"]
data[f"{mgroup_name}_加工前_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data[f"{mgroup_name}_count_use"])*100, 2)
for item in mlogbd1_qs:
data[f"{mgroup_name}_缺陷_{item['defect__name']}"] = item["total"]
data[f"{mgroup_name}_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data[f"{mgroup_name}_count_real"])*100, 2)
data[f"{mgroup_name}_日期"] = list(set(data[f"{mgroup_name}_日期"]))
data[f"{mgroup_name}_日期"].sort()
data[f"{mgroup_name}_小日期"] = min(data[f"{mgroup_name}_日期"]).strftime("%Y-%m-%d")
data[f"{mgroup_name}_大日期"] = max(data[f"{mgroup_name}_日期"]).strftime("%Y-%m-%d")
data[f"{mgroup_name}_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data[f"{mgroup_name}_日期"]])
data[f"{mgroup_name}_操作人"] = list(set(data[f"{mgroup_name}_操作人"]))
data[f"{mgroup_name}_操作人"] = ";".join([item.name for item in data[f"{mgroup_name}_操作人"]])
data[f"{mgroup_name}_班次"] = list(set(data[f"{mgroup_name}_班次"]))
data[f"{mgroup_name}_班次"] = ";".join([item for item in data[f"{mgroup_name}_班次"]])
# 按 mlog__submit_time, id 排序,每条 Mlogb 记录独立为一次返修
# (同一 Mlog 下有多条同批次 Mlogb 时也能正确拆分为多次返修)
_all_fix_qs = Mlogb.objects.filter(
mlog__submit_time__isnull=False,
material_out__isnull=False,
mlog__mgroup__name="外观检验",
mlog__is_fix=True,
batch=batch,
need_inout=True,
).order_by("mlog__submit_time", "id")
_fix_prefixes = []
for fix_idx, fix_mlogb in enumerate(_all_fix_qs):
suffix = "" if fix_idx == 0 else str(fix_idx + 1)
prefix = f"外观检验_返修{suffix}_"
_fix_prefixes.append(prefix)
mlog = fix_mlogb.mlog
handle_date = mlog.handle_date
data[f"{prefix}count_real"] = fix_mlogb.count_real
data[f"{prefix}count_ok"] = fix_mlogb.count_ok
data[f"{prefix}count_ok_full"] = fix_mlogb.count_ok_full or 0
data[f"{prefix}count_notok"] = fix_mlogb.count_notok or 0
try:
data[f"{prefix}合格率"] = round((fix_mlogb.count_ok / fix_mlogb.count_real) * 100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data[f"{prefix}合格率"] = 0
try:
data[f"{prefix}完全合格率"] = round(((fix_mlogb.count_ok_full or 0) / fix_mlogb.count_real) * 100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data[f"{prefix}完全合格率"] = 0
data[f"{prefix}日期"] = handle_date.strftime("%Y-%m-%d") if handle_date else ""
data[f"{prefix}小日期"] = handle_date.strftime("%Y-%m-%d") if handle_date else ""
data[f"{prefix}大日期"] = handle_date.strftime("%Y-%m-%d") if handle_date else ""
data[f"{prefix}操作人"] = mlog.handle_user.name if mlog.handle_user else ""
data[f"{prefix}班次"] = mlog.shift.name if mlog.shift else ""
fix_defect_qs = MlogbDefect.objects.filter(mlogb=fix_mlogb, count__gt=0).values("defect__name").annotate(total=Sum("count"))
for item in fix_defect_qs:
data[f"{prefix}缺陷_{item['defect__name']}"] = item["total"]
data[f"{prefix}缺陷_{item['defect__name']}_比例"] = round((item["total"] / fix_mlogb.count_real) * 100, 2)
# 车间库存抽检
ft_qs = FtestWork.objects.filter(type2=FtestWork.TYPE2_SOME, wm__mgroup__name="外观检验", batch=batch, submit_time__isnull=False)
if ft_qs.exists():
data["外观检验_车间库存抽检_日期"] = []
data["外观检验_车间库存抽检_操作人"] = []
data["外观检验_车间库存抽检_count_notok"] = 0
for item in ft_qs:
if item.test_user:
data["外观检验_车间库存抽检_操作人"].append(item.test_user)
if item.test_date:
data["外观检验_车间库存抽检_日期"].append(item.test_date)
data["外观检验_车间库存抽检_count_notok"] += item.count_notok if item.count_notok else 0
data["外观检验_车间库存抽检_日期"] = list(set(data["外观检验_车间库存抽检_日期"]))
data["外观检验_车间库存抽检_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data["外观检验_车间库存抽检_日期"]])
data["外观检验_车间库存抽检_操作人"] = list(set(data["外观检验_车间库存抽检_操作人"]))
data["外观检验_车间库存抽检_操作人"] = ";".join([item.name for item in data["外观检验_车间库存抽检_操作人"]])
# 车间库存抽检缺陷
ftd_qs = FtestworkDefect.objects.filter(ftestwork__in=ft_qs, count__gt=0).values("defect__name").annotate(total=Sum("count"))
for item in ftd_qs:
data[f"外观检验_车间库存抽检_缺陷_{item['defect__name']}"] = item["total"]
if "外观检验_count_ok" in data:
data["外观检验_总合格数"] = data["外观检验_count_ok"] + sum(data.get(f"{p}count_ok", 0) for p in _fix_prefixes)
try:
data["外观检验_总合格率"] = round((data["外观检验_总合格数"] / data["外观检验_count_real"])*100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data["外观检验_总合格率"] = 0
data["外观检验_完全总合格数"] = data["外观检验_count_ok_full"] + sum(data.get(f"{p}count_ok_full", 0) for p in _fix_prefixes)
try:
data["外观检验_完全总合格率"] = round((data["外观检验_完全总合格数"] / data["外观检验_count_real"])*100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data["外观检验_完全总合格率"] = 0
data["外观检验_直通合格数"] = data["外观检验_总合格数"] - data.get("外观检验_车间库存抽检_count_notok", 0)
if "尺寸检验_合格率" in data:
try:
data["外观检验_直通合格率"] = round((data["外观检验_总合格率"]* data["尺寸检验_合格率"])/100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data["外观检验_直通合格率"] = 0
try:
data["外观检验_直通合格率2"] = round((data["外观检验_直通合格数"]/data["尺寸检验_count_use"])*100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data["外观检验_直通合格率2"] = 0
if "尺寸检验_完全合格率" in data:
try:
data["外观检验_完全直通合格率"] = round((data["外观检验_完全总合格率"]* data["尺寸检验_完全合格率"])/100, 2)
except (decimal.InvalidOperation, ZeroDivisionError):
data["外观检验_完全直通合格率"] = 0
res = get_f_l_date(data)
batchst.data = json.loads(json.dumps(data, cls=MyJSONEncoder))
if batchst.first_time is None or (res["first_time"] and res["first_time"] < batchst.first_time):
batchst.first_time = res["first_time"]
if batchst.last_time is None or (res["last_time"] and res["last_time"] > batchst.last_time):
batchst.last_time = res["last_time"]
batchst.save()
if __name__ == '__main__':
pass