factory/apps/wpm/scripts/batch_gxerp.py

185 lines
10 KiB
Python

from apps.wpm.models import BatchSt
import logging
from apps.qm.models import Defect, FtestWork, FtestworkDefect
from apps.wpm.models import Mlogb, MlogbDefect
from apps.mtm.models import Mgroup
import decimal
from django.db.models import Sum
from datetime import datetime
from apps.wpm.services_2 import get_f_l_date
import json
from apps.utils.tools import MyJSONEncoder
myLogger = logging.getLogger("log")
def main(batch: str, mgroup_obj):
try:
batchst = BatchSt.objects.get(batch=batch, version=1)
except BatchSt.DoesNotExist:
myLogger.error(f"Batch {batch} does not exist")
return
data = {"批次号": batch}
mgroup_qs = Mgroup.objects.all().order_by("sort")
for mgroup in mgroup_qs:
mgroup_name = mgroup.name
mlogb1_qs = Mlogb.objects.filter(mlog__submit_time__isnull=False,
material_out__isnull=False, mlog__mgroup=mgroup,
mlog__is_fix=False, batch=batch, need_inout=True)
if mlogb1_qs.exists():
data[f"{mgroup_name}_日期"] = []
data[f"{mgroup_name}_操作人"] = []
data[f"{mgroup_name}_班次"] = []
data[f"{mgroup_name}_count_use"] = 0
data[f"{mgroup_name}_count_real"] = 0
data[f"{mgroup_name}_count_ok"] = 0
data[f"{mgroup_name}_count_notok"] = 0
data[f"{mgroup_name}_count_ok_full"] = 0
data[f"{mgroup_name}_count_pn_jgqbl"] = 0
mlogb_q_ids = []
for item in mlogb1_qs:
# 找到对应的输入
mlogb_from:Mlogb = item.mlogb_from
if mlogb_from:
mlogb_q_ids.append(mlogb_from.id)
data[f"{mgroup_name}_count_use"] += mlogb_from.count_use
data[f"{mgroup_name}_count_pn_jgqbl"] += mlogb_from.count_pn_jgqbl
if item.mlog.handle_user:
data[f"{mgroup_name}_操作人"].append(item.mlog.handle_user)
if item.mlog.handle_date:
data[f"{mgroup_name}_日期"].append(item.mlog.handle_date)
if item.mlog.shift:
data[f"{mgroup_name}_班次"].append(item.mlog.shift.name)
data[f"{mgroup_name}_count_real"] += item.count_real
data[f"{mgroup_name}_count_ok"] += item.count_ok
data[f"{mgroup_name}_count_ok_full"] += item.count_ok_full if item.count_ok_full else 0
data[f"{mgroup_name}_count_notok"] += item.count_notok if item.count_notok else 0
try:
data[f"{mgroup_name}_完全合格率"] = round((data[f"{mgroup_name}_count_ok_full"] / data[f"{mgroup_name}_count_real"])*100, 2)
except decimal.InvalidOperation:
data[f"{mgroup_name}_完全合格率"] = 0
try:
data[f"{mgroup_name}_合格率"] = round((data[f"{mgroup_name}_count_ok"] / data[f"{mgroup_name}_count_real"])*100, 2)
except decimal.InvalidOperation:
data[f"{mgroup_name}_合格率"] = 0
mlogbd1_qs = MlogbDefect.objects.filter(mlogb__in=mlogb1_qs, count__gt=0).values("defect__name").annotate(total=Sum("count"))
mlogbd1_q_qs = MlogbDefect.objects.filter(mlogb__id__in=mlogb_q_ids, count__gt=0).values("defect__name").annotate(total=Sum("count"))
for item in mlogbd1_q_qs:
data[f"{mgroup_name}_加工前_缺陷_{item['defect__name']}"] = item["total"]
data[f"{mgroup_name}_加工前_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data[f"{mgroup_name}_count_use"])*100, 2)
for item in mlogbd1_qs:
data[f"{mgroup_name}_缺陷_{item['defect__name']}"] = item["total"]
data[f"{mgroup_name}_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data[f"{mgroup_name}_count_real"])*100, 2)
data[f"{mgroup_name}_日期"] = list(set(data[f"{mgroup_name}_日期"]))
data[f"{mgroup_name}_日期"].sort()
data[f"{mgroup_name}_小日期"] = max(data[f"{mgroup_name}_日期"]).strftime("%Y-%m-%d")
data[f"{mgroup_name}_大日期"] = min(data[f"{mgroup_name}_日期"]).strftime("%Y-%m-%d")
data[f"{mgroup_name}_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data[f"{mgroup_name}_日期"]])
data[f"{mgroup_name}_操作人"] = list(set(data[f"{mgroup_name}_操作人"]))
data[f"{mgroup_name}_操作人"] = ";".join([item.name for item in data[f"{mgroup_name}_操作人"]])
data[f"{mgroup_name}_班次"] = list(set(data[f"{mgroup_name}_班次"]))
data[f"{mgroup_name}_班次"] = ";".join([item for item in data[f"{mgroup_name}_班次"]])
mlogb2_qs = Mlogb.objects.filter(mlog__submit_time__isnull=False,
material_out__isnull=False,
mlog__mgroup__name="外观检验",
mlog__is_fix=True, batch=batch, need_inout=True)
if mlogb2_qs.exists():
data["外观检验_返修_日期"] = []
data["外观检验_返修_操作人"] = []
data["外观检验_返修_count_real"] = 0
data["外观检验_返修_count_ok"] = 0
data["外观检验_返修_count_ok_full"] = 0
for item in mlogb2_qs:
if item.mlog.handle_user:
data["外观检验_返修_操作人"].append(item.mlog.handle_user)
if item.mlog.handle_date:
data["外观检验_返修_日期"].append(item.mlog.handle_date)
data["外观检验_返修_count_real"] += item.count_real
data["外观检验_返修_count_ok"] += item.count_ok
data["外观检验_返修_count_ok_full"] += item.count_ok_full if item.count_ok_full else 0
data["外观检验_返修_日期"] = list(set(data["外观检验_返修_日期"]))
data["外观检验_返修_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data["外观检验_返修_日期"]])
data["外观检验_返修_操作人"] = list(set(data["外观检验_返修_操作人"]))
data["外观检验_返修_操作人"] = ";".join([item.name for item in data["外观检验_返修_操作人"]])
mlogbd2_qs = MlogbDefect.objects.filter(mlogb__in=mlogb2_qs, count__gt=0).values("defect__name").annotate(total=Sum("count"))
for item in mlogbd2_qs:
data[f"外观检验_返修_缺陷_{item['defect__name']}"] = item["total"]
data[f"外观检验_返修_缺陷_{item['defect__name']}_比例"] = round((item["total"] / data["外观检验_返修_count_real"])*100, 2)
# 车间库存抽检
ft_qs = FtestWork.objects.filter(type2=FtestWork.TYPE2_SOME, wm__mgroup__name="外观检验", batch=batch, submit_time__isnull=False)
if ft_qs.exists():
data["外观检验_车间库存抽检_日期"] = []
data["外观检验_车间库存抽检_操作人"] = []
data["外观检验_车间库存抽检_count_notok"] = 0
for item in ft_qs:
if item.test_user:
data["外观检验_车间库存抽检_操作人"].append(item.test_user)
if item.test_date:
data["外观检验_车间库存抽检_日期"].append(item.test_date)
data["外观检验_车间库存抽检_count_notok"] += item.count_notok if item.count_notok else 0
data["外观检验_车间库存抽检_日期"] = list(set(data["外观检验_车间库存抽检_日期"]))
data["外观检验_车间库存抽检_日期"] = ";".join([item.strftime("%Y-%m-%d") for item in data["外观检验_车间库存抽检_日期"]])
data["外观检验_车间库存抽检_操作人"] = list(set(data["外观检验_车间库存抽检_操作人"]))
data["外观检验_车间库存抽检_操作人"] = ";".join([item.name for item in data["外观检验_车间库存抽检_操作人"]])
# 车间库存抽检缺陷
ftd_qs = FtestworkDefect.objects.filter(ftestwork__in=ft_qs, count__gt=0).values("defect__name").annotate(total=Sum("count"))
for item in ftd_qs:
data[f"外观检验_车间库存抽检_缺陷_{item['defect__name']}"] = item["total"]
if "外观检验_count_ok" in data:
data["外观检验_总合格数"] = data["外观检验_count_ok"] + data.get("外观检验_返修_count_ok", 0)
try:
data["外观检验_总合格率"] = round((data["外观检验_总合格数"] / data["外观检验_count_real"])*100, 2)
except decimal.InvalidOperation:
data["外观检验_总合格率"] = 0
data["外观检验_完全总合格数"] = data["外观检验_count_ok_full"] + data.get("外观检验_返修_count_ok_full", 0)
try:
data["外观检验_完全总合格率"] = round((data["外观检验_完全总合格数"] / data["外观检验_count_real"])*100, 2)
except decimal.InvalidOperation:
data["外观检验_完全总合格率"] = 0
data["外观检验_直通合格数"] = data["外观检验_总合格数"] - data.get("外观检验_车间库存抽检_count_notok", 0)
if "尺寸检验_合格率" in data:
try:
data["外观检验_直通合格率"] = round((data["外观检验_总合格率"]* data["尺寸检验_合格率"])/100, 2)
except decimal.InvalidOperation:
data["外观检验_直通合格率"] = 0
try:
data["外观检验_直通合格率2"] = round((data["外观检验_直通合格数"]/data["尺寸检验_count_use"])*100, 2)
except decimal.InvalidOperation:
data["外观检验_直通合格率2"] = 0
if "尺寸检验_完全合格率" in data:
try:
data["外观检验_完全直通合格率"] = round((data["外观检验_完全总合格率"]* data["尺寸检验_完全合格率"])/100, 2)
except decimal.InvalidOperation:
data["外观检验_完全直通合格率"] = 0
res = get_f_l_date(data)
batchst.data = json.loads(json.dumps(data, cls=MyJSONEncoder))
if batchst.first_time is None or (res["first_time"] and res["first_time"] < batchst.first_time):
batchst.first_time = res["first_time"]
if batchst.last_time is None or (res["last_time"] and res["last_time"] > batchst.last_time):
batchst.last_time = res["last_time"]
batchst.save()
if __name__ == '__main__':
pass