161 lines
7.3 KiB
Python
161 lines
7.3 KiB
Python
from rest_framework.exceptions import ParseError
|
||
from apps.mtm.models import Process, Material
|
||
from apps.inm.models import WareHouse, MaterialBatch, MIOItem, MIOItemw, MIO
|
||
from apps.utils.tools import ranstr
|
||
from apps.mtm.services_2 import cal_material_count
|
||
|
||
def daoru_mb(path: str):
    """
    Import material batches from an Excel workbook, creating materials on demand.

    Every worksheet of the workbook is read from row 3 downwards until a row
    whose column A is empty. Columns used per row:

    - A: material type name (must be a key of ``type_dict`` below)
    - B: material name
    - C: specification
    - D: optional suffix appended to the specification after a ``|``
      (presumably the grade -- confirm against the import template)
    - E: model
    - F: process name (must match an existing ``Process.name``)
    - G: batch
    - H: count (converted with ``int``)
    - I: warehouse name (must match an existing ``WareHouse.name``)

    Spaces are stripped from every text cell before use.

    :param path: path of the workbook to load
    :raises ParseError: when a row references an unknown type / process /
        warehouse, has a missing or non-text cell where text is required, or
        has a count that cannot be converted to an integer
    """
    # NOTE: on the very first import a one-off data correction was applied to
    # existing Material.specification values (spaces removed, '×' replaced by
    # '*', and a '|' inserted before the '优级' / '一级' marker). That code was
    # kept here commented out and is intentionally not executed.
    from apps.utils.snowflake import idWorker
    from openpyxl import load_workbook

    type_dict = {"主要原料": 30, "半成品": 20, "成品": 10, "辅助材料": 40, "加工工具": 50, "辅助工装": 60, "办公用品": 70}

    def cell_text(sheet, column: str, row: int) -> str:
        # Text of the cell with all spaces removed. Raises AttributeError for
        # empty / non-text cells; the caller turns that into a ParseError.
        return sheet[f"{column}{row}"].value.replace(" ", "")

    wb = load_workbook(path)
    # Resolve processes and warehouses by name once instead of once per row.
    process_d = {p.name: p for p in Process.objects.all()}
    warehouse_d = {w.name: w for w in WareHouse.objects.all()}
    for sheet in wb.worksheets:
        i = 3
        while sheet[f"a{i}"].value:
            try:
                material_type = type_dict[cell_text(sheet, "a", i)]
                name = cell_text(sheet, "b", i)
                specification = cell_text(sheet, "c", i)
                suffix = sheet[f"d{i}"].value
                if suffix and suffix.replace(" ", ""):
                    specification = specification + "|" + suffix.replace(" ", "")
                model = cell_text(sheet, "e", i)
                process = process_d[cell_text(sheet, "f", i)]
                batch = cell_text(sheet, "g", i)
                count = int(sheet[f"h{i}"].value)
                warehouse = warehouse_d[cell_text(sheet, "i", i)]
            except (KeyError, AttributeError, TypeError, ValueError) as e:
                # KeyError: unknown type / process / warehouse name.
                # AttributeError: empty or non-text cell (no ``.replace``).
                # TypeError / ValueError: count is empty or not an integer.
                raise ParseError(f"第{i}行数据有误:{str(e)}") from e
            # The lookup fields are reused by get_or_create when it has to
            # create the row, so ``defaults`` only carries the extra fields.
            material, _ = Material.objects.get_or_create(
                type=material_type,
                name=name,
                specification=specification,
                model=model,
                process=process,
                defaults={"number": ranstr(6), "id": idWorker.get_id()},
            )
            # An already existing batch is left untouched (its count is not updated).
            MaterialBatch.objects.get_or_create(
                material=material,
                batch=batch,
                warehouse=warehouse,
                defaults={"count": count, "id": idWorker.get_id()},
            )
            cal_material_count([material.id])
            i = i + 1
|
||
|
||
def daoru_mioitem_test(path:str, mioitem:MIOItem):
    """
    Import per-piece inspection values for ``mioitem`` from an Excel workbook.

    The ``MIOItemw`` rows of ``mioitem`` (ordered by ``number``) are paired
    with worksheet rows of ``Sheet1`` starting at row 4: the first piece reads
    row 4, the second row 5, and so on. For each piece its ``Ftest`` is created
    (or, if one already exists, its items and defects are deleted and it is
    reset to passing) and seven ``FtestItem`` rows are created from columns
    B, C, E, F, G, J and K. Processing stops at the first row whose column C
    is empty; note that the ``Ftest`` of that piece has already been
    created/reset by then.

    :param path: path of the workbook to load
    :param mioitem: the stock in/out line whose pieces are being inspected
    :raises ParseError: when no inspection table is found for the material,
        a required test item is missing or duplicated, or the workbook has
        no ``Sheet1``
    """
    from apps.utils.snowflake import idWorker
    from openpyxl import load_workbook
    from apps.qm.models import TestItem, Ftest, Qct, FtestItem, FtestDefect

    qct = Qct.get(mioitem.material, tag="inm", type="in")
    if qct is None:
        raise ParseError("未找到检验表")

    # (test item name, worksheet column holding its value)
    item_columns = (
        ("配套序号", "b"),
        ("棒编号", "c"),
        ("棒最大外径/mm", "e"),
        ("锥度/mm", "f"),
        ("管编号", "g"),
        ("管最大内径/mm", "j"),
        ("配合间隙", "k"),
    )
    # Resolve every test item up front so a bad configuration fails before
    # anything is written.
    resolved = []
    for item_name, column in item_columns:
        try:
            testitem = TestItem.objects.get(name=item_name)
        except TestItem.DoesNotExist:
            raise ParseError(f"未找到检验项:{item_name}")
        except TestItem.MultipleObjectsReturned:
            raise ParseError(f"检验项重复:{item_name}")
        resolved.append((testitem, column))

    inspector = mioitem.mio.mio_user
    inspected_on = mioitem.mio.inout_date

    workbook = load_workbook(path, data_only=True)
    # The data is expected on a worksheet literally named "Sheet1".
    if "Sheet1" not in workbook.sheetnames:
        raise ParseError("未找到Sheet1")
    sheet = workbook["Sheet1"]

    pieces = MIOItemw.objects.filter(mioitem=mioitem).order_by("number")

    # Piece 0 maps to worksheet row 4, piece 1 to row 5, ...
    for row, piece in enumerate(pieces, start=4):
        ftest = piece.ftest
        if ftest is not None:
            # Re-import: drop previous results and reset the verdict.
            FtestItem.objects.filter(ftest=ftest).delete()
            FtestDefect.objects.filter(ftest=ftest).delete()
            ftest.is_ok = True
            ftest.defect_main = None
            ftest.save()
        else:
            ftest = Ftest.objects.create(
                type="purin",
                test_numer=piece.number,
                qct=qct,
                test_user=inspector,
                is_ok=True,
                test_date=inspected_on,
            )
            piece.ftest = ftest
            piece.save()

        # An empty column C marks the end of the data.
        if not sheet[f"c{row}"].value:
            break

        FtestItem.objects.bulk_create(
            [
                FtestItem(
                    ftest=ftest,
                    testitem=testitem,
                    test_val_json=sheet[f"{column}{row}"].value,
                    test_user=inspector,
                    id=idWorker.get_id(),
                )
                for testitem, column in resolved
            ]
        )
|
||
|
||
|
||
def daoru_mioitems(path:str, mio:MIO):
    """
    Import stock in/out lines for ``mio`` from an Excel workbook.

    ``Sheet1`` is read from row 2 downwards until a row whose column A is
    empty. Columns used per row:

    - A: material number (looked up via ``Material.number``)
    - B: batch; when empty, the material's ``bin_number_main`` is used instead
    - C: count
    - D: warehouse name (looked up via ``WareHouse.name``)

    All rows are validated first and the ``MIOItem`` rows are then inserted
    with a single ``bulk_create``, so nothing is written if any row fails.

    :param path: path of the workbook to load
    :param mio: the stock in/out record the created lines belong to
    :raises ParseError: when the workbook has no ``Sheet1``, a material or
        warehouse cannot be resolved, or a row has no batch and the material
        provides no ``bin_number_main`` fallback
    """
    from apps.utils.snowflake import idWorker
    from openpyxl import load_workbook

    wb = load_workbook(path, data_only=True)
    # The data is expected on a worksheet literally named "Sheet1".
    if "Sheet1" not in wb.sheetnames:
        raise ParseError("未找到Sheet1")
    sheet = wb["Sheet1"]

    mioitems = []
    ind = 2
    while sheet[f"a{ind}"].value:
        batch = sheet[f"b{ind}"].value
        material_number = sheet[f"a{ind}"].value
        try:
            material = Material.objects.get(number=material_number)
        except Exception as e:
            raise ParseError(f"未找到物料:{material_number} {e}") from e
        # Only a *missing* batch needs the fallback. The previous condition
        # (`if not batch and material.bin_number_main ... else raise`) also
        # rejected every row that did supply a batch.
        if not batch:
            if not material.bin_number_main:
                raise ParseError(f"第{ind}行批次为空")
            batch = material.bin_number_main
        count = sheet[f"c{ind}"].value
        warehouse_name = sheet[f"d{ind}"].value
        try:
            warehouse = WareHouse.objects.get(name=warehouse_name)
        except Exception as e:
            raise ParseError(f"未找到仓库:{warehouse_name} {e}") from e
        mioitems.append(MIOItem(mio=mio, warehouse=warehouse, material=material, batch=batch, count=count, id=idWorker.get_id()))
        ind = ind + 1

    MIOItem.objects.bulk_create(mioitems)