from rest_framework.exceptions import ParseError
from apps.mtm.models import Process, Material
from apps.inm.models import WareHouse, MaterialBatch, MIOItem, MIOItemw, MIO
from apps.utils.tools import ranstr
from apps.mtm.services_2 import cal_material_count


def _strip_spaces(value):
    """Return the text *value* with every space character removed.

    An empty cell (``None``) or a non-text cell has no ``replace`` method, so
    this raises ``AttributeError``; callers translate that into a row error.
    """
    return value.replace(" ", "")


def _open_sheet1(path):
    """Open the workbook at *path* (cached formula values) and return "Sheet1".

    Raises:
        ParseError: if the workbook has no worksheet named "Sheet1".
    """
    from openpyxl import load_workbook

    wb = load_workbook(path, data_only=True)
    if "Sheet1" not in wb.sheetnames:
        raise ParseError("未找到Sheet1")
    return wb["Sheet1"]


def daoru_mb(path: str):
    """Import material batches from an Excel workbook.

    Materials that do not exist yet are created automatically.

    Every worksheet is read from row 3 downwards until column A is empty.
    Columns: A material type name, B name, C specification, D optional text
    appended to the specification after a "|", E model, F process name,
    G batch, H count, I warehouse name. Spaces are stripped from all text
    cells before use.

    For each row the matching ``Material`` and ``MaterialBatch`` are fetched
    or created, then ``cal_material_count`` is called for that material.

    Args:
        path: Path of the workbook to import.

    Raises:
        ParseError: if a row references an unknown type / process / warehouse,
            has an empty or non-text cell where text is required, or has a
            count that cannot be converted to ``int``.
    """
    # NOTE: at the very first import the existing Material rows were corrected
    # once by hand (code no longer kept here): spaces were removed from
    # `specification`, '×' was replaced with '*', and '优级' / '一级' were
    # prefixed with '|' on rows whose specification did not contain '|' yet.
    from apps.utils.snowflake import idWorker
    from openpyxl import load_workbook

    # Display name used in the sheet -> value stored in Material.type.
    type_dict = {"主要原料": 30, "半成品": 20, "成品": 10, "辅助材料": 40, "加工工具": 50, "辅助工装": 60, "办公用品": 70}

    wb = load_workbook(path)
    # Load both lookup tables once instead of querying per row.
    process_d = {p.name: p for p in Process.objects.all()}
    warehouse_d = {w.name: w for w in WareHouse.objects.all()}

    for sheet in wb.worksheets:
        i = 3
        while sheet[f"a{i}"].value:
            try:
                material_type = type_dict[_strip_spaces(sheet[f"a{i}"].value)]
                name = _strip_spaces(sheet[f"b{i}"].value)
                specification = _strip_spaces(sheet[f"c{i}"].value)
                spec_suffix = sheet[f"d{i}"].value
                if spec_suffix and _strip_spaces(spec_suffix):
                    specification = specification + "|" + _strip_spaces(spec_suffix)
                model = _strip_spaces(sheet[f"e{i}"].value)
                process = process_d[_strip_spaces(sheet[f"f{i}"].value)]
                batch = _strip_spaces(sheet[f"g{i}"].value)
                count = int(sheet[f"h{i}"].value)
                warehouse = warehouse_d[_strip_spaces(sheet[f"i{i}"].value)]
            except (KeyError, AttributeError, TypeError, ValueError) as e:
                # KeyError: unknown type/process/warehouse name.
                # AttributeError: empty or non-text cell (no .replace()).
                # TypeError / ValueError: count cell is empty or not a number.
                raise ParseError(f"第{i}行数据有误:{str(e)}") from e

            # Lookup kwargs are reused by get_or_create when it has to create
            # the row, so `defaults` only carries the extra fields.
            material, _ = Material.objects.get_or_create(
                type=material_type,
                name=name,
                specification=specification,
                model=model,
                process=process,
                defaults={"number": ranstr(6), "id": idWorker.get_id()},
            )
            MaterialBatch.objects.get_or_create(
                material=material,
                batch=batch,
                warehouse=warehouse,
                defaults={"count": count, "id": idWorker.get_id()},
            )
            cal_material_count([material.id])
            i = i + 1


def daoru_mioitem_test(path: str, mioitem: MIOItem):
    """Import inspection values from "Sheet1" for the items of *mioitem*.

    The ``MIOItemw`` rows of *mioitem*, ordered by ``number``, are matched
    one-to-one with sheet rows starting at row 4. For each of them the linked
    ``Ftest`` is created (or, if it already exists, its ``FtestItem`` and
    ``FtestDefect`` rows are deleted and it is reset to ``is_ok=True`` with no
    main defect), and seven ``FtestItem`` rows are written from fixed columns.
    Processing stops at the first sheet row whose column C is empty.

    Args:
        path: Path of the workbook to import.
        mioitem: The in/out item whose ``MIOItemw`` rows receive the results;
            its ``mio`` supplies the test user and test date.

    Raises:
        ParseError: if no inspection table is found for the material, if a
            required test item is missing or duplicated, or if the workbook
            has no "Sheet1".
    """
    from apps.utils.snowflake import idWorker
    from apps.qm.models import TestItem, Ftest, Qct, FtestItem, FtestDefect

    qct = Qct.get(mioitem.material, tag="inm", type="in")
    if qct is None:
        raise ParseError("未找到检验表")

    # (test item name, sheet column holding its value), in output order.
    item_columns = (
        ("配套序号", "b"),
        ("棒编号", "c"),
        ("棒最大外径/mm", "e"),
        ("锥度/mm", "f"),
        ("管编号", "g"),
        ("管最大内径/mm", "j"),
        ("配合间隙", "k"),
    )
    # Resolve every test item up front so a bad configuration fails before
    # anything is written.
    testitem_columns = []
    for name, column in item_columns:
        try:
            testitem = TestItem.objects.get(name=name)
        except TestItem.DoesNotExist:
            raise ParseError(f"未找到检验项:{name}")
        except TestItem.MultipleObjectsReturned:
            raise ParseError(f"检验项重复:{name}")
        testitem_columns.append((testitem, column))

    test_user = mioitem.mio.mio_user
    test_date = mioitem.mio.inout_date
    sheet = _open_sheet1(path)

    mioitemws = MIOItemw.objects.filter(mioitem=mioitem).order_by("number")
    for row, item in enumerate(mioitemws, start=4):
        ftest = item.ftest
        if ftest is None:
            ftest = Ftest.objects.create(
                type="purin",
                test_numer=item.number,
                qct=qct,
                test_user=test_user,
                is_ok=True,
                test_date=test_date)
            item.ftest = ftest
            item.save()
        else:
            # Re-import: discard previous results and reset the verdict.
            FtestItem.objects.filter(ftest=ftest).delete()
            FtestDefect.objects.filter(ftest=ftest).delete()
            ftest.is_ok = True
            ftest.defect_main = None
            ftest.save()

        # NOTE: the record above is created/reset before this check, so the
        # first item without sheet data is still created/reset before the
        # loop stops.
        if not sheet[f"c{row}"].value:
            break

        FtestItem.objects.bulk_create([
            FtestItem(
                ftest=ftest,
                testitem=testitem,
                test_val_json=sheet[f"{column}{row}"].value,
                test_user=test_user,
                id=idWorker.get_id())
            for testitem, column in testitem_columns
        ])


def daoru_mioitems(path: str, mio: MIO):
    """Import the line items of *mio* from "Sheet1" of an Excel workbook.

    Rows are read from row 2 downwards until column B is empty.
    Columns: A material number, B batch, C count, D warehouse name.
    All rows are validated first; the ``MIOItem`` rows are then inserted with
    a single ``bulk_create``.

    Args:
        path: Path of the workbook to import.
        mio: The in/out record the created items belong to.

    Raises:
        ParseError: if the workbook has no "Sheet1", or if the material or
            warehouse of a row cannot be fetched.
    """
    from apps.utils.snowflake import idWorker

    sheet = _open_sheet1(path)

    mioitems = []
    ind = 2
    while sheet[f"b{ind}"].value:
        batch = sheet[f"b{ind}"].value
        material_number = sheet[f"a{ind}"].value
        try:
            material = Material.objects.get(number=material_number)
        except Exception as e:
            # Any lookup failure (missing, duplicated, invalid value) is
            # reported to the client as a parse error for this material.
            raise ParseError(f"未找到物料:{material_number} {e}") from e
        count = sheet[f"c{ind}"].value
        warehouse_name = sheet[f"d{ind}"].value
        try:
            warehouse = WareHouse.objects.get(name=warehouse_name)
        except Exception as e:
            raise ParseError(f"未找到仓库:{warehouse_name} {e}") from e
        mioitems.append(MIOItem(
            mio=mio,
            warehouse=warehouse,
            material=material,
            batch=batch,
            count=count,
            id=idWorker.get_id()))
        ind = ind + 1
    MIOItem.objects.bulk_create(mioitems)