factory/apps/wpm/scripts/batch_bxerp.py

116 lines
5.8 KiB
Python

from apps.wpm.models import BatchSt
import logging
from apps.wpm.models import Mlogb, Mlogbw, MlogbDefect
from apps.mtm.models import Mgroup
import decimal
from django.db.models import Sum
from apps.wpm.services_2 import get_f_l_date
import json
from apps.utils.tools import MyJSONEncoder
# Module-level logger, looked up in the logging registry under the name "log".
myLogger = logging.getLogger("log")
def _percent(part, whole):
    """Return ``part / whole`` as a percentage rounded to two decimals.

    Returns 0 when the ratio cannot be computed because ``whole`` is zero.
    ``Decimal`` operands signal this with ``InvalidOperation`` (0 / 0) or
    ``DivisionByZero`` (a ``ZeroDivisionError`` subclass); plain numbers raise
    ``ZeroDivisionError``.
    """
    try:
        return round((part / whole) * 100, 2)
    except (decimal.InvalidOperation, ZeroDivisionError):
        return 0


def _mgroup_stats(batch: str, mgroup: Mgroup) -> dict:
    """Collect the statistics of one Mgroup for ``batch`` as a flat dict.

    Every key is prefixed with ``"<mgroup.name>_"``. An empty dict is returned
    when the group has no matching ``Mlogb`` output rows for the batch.
    """
    name = mgroup.name

    def key(suffix):
        return f"{name}_{suffix}"

    mlogb_qs = Mlogb.objects.filter(
        mlog__submit_time__isnull=False,
        material_out__isnull=False,
        mlog__mgroup=mgroup,
        mlog__is_fix=False,
        batch=batch,
        need_inout=True,
    )
    if not mlogb_qs.exists():
        return {}

    # Date/operator placeholders come first so the key order of the stored
    # JSON stays the same as before; they are filled in at the end.
    stats = {
        key("日期"): "",
        key("操作人"): "",
        key("count_use"): 0,
        key("count_real"): 0,
        key("count_ok"): 0,
        key("count_notok"): 0,
        key("count_ok_full"): 0,
        key("count_pn_jgqbl"): 0,
    }
    dates = []
    users = []
    source_ids = []  # ids of the input Mlogb rows, used for pre-processing defects
    for item in mlogb_qs:
        # Find the input record(s) this output row refers to.
        mlogb_from: Mlogb = item.mlogb_from
        mlogbw_from: Mlogbw = item.mlogbw_from
        if mlogb_from:
            source_ids.append(mlogb_from.id)
            stats[key("count_use")] += mlogb_from.count_use
            stats[key("count_pn_jgqbl")] += mlogb_from.count_pn_jgqbl
        if mlogbw_from:
            # Adds one used unit; count_pn_jgqbl is left unchanged here.
            stats[key("count_use")] += 1
        mlog = item.mlog
        if mlog.handle_user:
            users.append(mlog.handle_user)
        if mlog.handle_date:
            dates.append(mlog.handle_date)
        stats[key("count_real")] += item.count_real
        stats[key("count_ok")] += item.count_ok
        stats[key("count_ok_full")] += item.count_ok_full or 0
        stats[key("count_notok")] += item.count_notok or 0

    count_use = stats[key("count_use")]
    count_real = stats[key("count_real")]
    # 完全合格率 = fully-qualified rate, 合格率 = qualified rate.
    stats[key("完全合格率")] = _percent(stats[key("count_ok_full")], count_real)
    stats[key("合格率")] = _percent(stats[key("count_ok")], count_real)

    # (key label, defect rows, summed field, denominator of the ratio)
    # 加工前_缺陷 = defects before processing, 缺陷 = defects,
    # 含缺陷 = contained defects, 比例 = ratio.
    defect_reports = (
        (
            "加工前_缺陷",
            MlogbDefect.objects.filter(mlogb__id__in=source_ids, count__gt=0),
            "count",
            count_use,
        ),
        (
            "缺陷",
            MlogbDefect.objects.filter(mlogb__in=mlogb_qs, count__gt=0),
            "count",
            count_real,
        ),
        (
            "含缺陷",
            MlogbDefect.objects.filter(mlogb__in=mlogb_qs, count_has__gt=0),
            "count_has",
            count_real,
        ),
    )
    for label, defect_qs, field, base in defect_reports:
        for row in defect_qs.values("defect__name").annotate(total=Sum(field)):
            defect_key = key(f"{label}_{row['defect__name']}")
            stats[defect_key] = row["total"]
            stats[f"{defect_key}_比例"] = _percent(row["total"], base)

    date_fmt = "%Y-%m-%d"
    # Sorted so the joined string is deterministic and min/max are the ends.
    unique_dates = sorted(set(dates))
    stats[key("日期")] = ";".join(d.strftime(date_fmt) for d in unique_dates)
    if unique_dates:
        # 小日期 = smallest (earliest) date, 大日期 = largest (latest) date.
        # Both keys are omitted when no record carries a handle_date.
        stats[key("小日期")] = unique_dates[0].strftime(date_fmt)
        stats[key("大日期")] = unique_dates[-1].strftime(date_fmt)
    # De-duplicate operators while keeping first-seen order.
    stats[key("操作人")] = ";".join(user.name for user in dict.fromkeys(users))
    return stats


def main(batch: str, mgroup_obj: Mgroup = None):
    """Rebuild the statistics stored on the ``BatchSt`` row of a batch.

    Looks up ``BatchSt(batch=batch, version=1)``; if it does not exist an error
    is logged and nothing else happens. Otherwise per-Mgroup statistics are
    collected and written to ``batchst.data``, and ``first_time`` /
    ``last_time`` are widened using the values returned by
    ``get_f_l_date(data)``.

    Args:
        batch: Batch number to compute statistics for.
        mgroup_obj: When given, only this Mgroup is recomputed and only the
            stored keys starting with ``"<mgroup_obj.name>_"`` are replaced.
            When ``None``, all Mgroups (ordered by ``sort``) are computed and
            the stored data is replaced entirely.

    Returns:
        None.
    """
    try:
        batchst = BatchSt.objects.get(batch=batch, version=1)
    except BatchSt.DoesNotExist:
        myLogger.error("Batch %s does not exist", batch)
        return

    data = {"批次号": batch}  # 批次号 = batch number
    if mgroup_obj is None:
        mgroup_qs = Mgroup.objects.all().order_by("sort")
    else:
        mgroup_qs = Mgroup.objects.filter(id=mgroup_obj.id)
    for mgroup in mgroup_qs:
        data.update(_mgroup_stats(batch, mgroup))

    res = get_f_l_date(data)

    old_data: dict = batchst.data
    if old_data and mgroup_obj is not None:
        # Drop only this group's previous keys. A prefix test is required: a
        # substring test would also hit groups whose name ends with this name.
        prefix = f"{mgroup_obj.name}_"
        merged = {k: v for k, v in old_data.items() if not k.startswith(prefix)}
    else:
        merged = {}
    merged.update(data)
    # Round-trip through JSON so values the encoder converts are stored in
    # their serialized form.
    batchst.data = json.loads(json.dumps(merged, cls=MyJSONEncoder))

    # first_time may only move earlier and last_time only later.
    if batchst.first_time is None or (res["first_time"] and res["first_time"] < batchst.first_time):
        batchst.first_time = res["first_time"]
    if batchst.last_time is None or (res["last_time"] and res["last_time"] > batchst.last_time):
        batchst.last_time = res["last_time"]
    batchst.save()
# Running this file directly does nothing: main() is not invoked from here.
if __name__ == "__main__":
    pass