126 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			126 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
| from rest_framework.exceptions import ParseError
 | ||
| from apps.mtm.models import Process, Material
 | ||
| from apps.inm.models import WareHouse, MaterialBatch, MIOItem, MIOItemw
 | ||
| from apps.utils.tools import ranstr
 | ||
| from apps.mtm.services_2 import cal_material_count
 | ||
| 
 | ||
| def daoru_mb(path: str):
 | ||
|     """
 | ||
|     导入物料批次(如没有物料自动创建)
 | ||
|     """
 | ||
|     # 注释的是初次导入时做的数据矫正
 | ||
|     # objs1 = Material.objects.filter(specification__contains=' ')
 | ||
|     # for i in objs1:
 | ||
|     #     i.specification = i.specification.replace(' ', '')
 | ||
|     #     i.save()
 | ||
|     # objs2 = Material.objects.filter(specification__contains='×')
 | ||
|     # for i in objs2:
 | ||
|     #     i.specification = i.specification.replace('×', '*')
 | ||
|     #     i.save()
 | ||
|     # objs3 = (Material.objects.filter(
 | ||
|     #     specification__contains='优级') | Material.objects.filter(specification__contains='一级')).exclude(specification__contains='|')
 | ||
|     # for i in objs3:
 | ||
|     #     i.specification = i.specification.replace(
 | ||
|     #         '优级', '|优级').replace('一级', '|一级')
 | ||
|     #     i.save()
 | ||
|     type_dict = {"主要原料": 30, "半成品": 20, "成品": 10, "辅助材料": 40, "加工工具": 50, "辅助工装": 60, "办公用品": 70}
 | ||
|     from apps.utils.snowflake import idWorker
 | ||
|     from openpyxl import load_workbook
 | ||
| 
 | ||
|     wb = load_workbook(path)
 | ||
|     process_l = Process.objects.all()
 | ||
|     process_d = {p.name: p for p in process_l}
 | ||
|     warehouse_l = WareHouse.objects.all()
 | ||
|     warehouse_d = {w.name: w for w in warehouse_l}
 | ||
|     for sheet in wb.worksheets:
 | ||
|         i = 3
 | ||
|         while sheet[f"a{i}"].value:
 | ||
|             try:
 | ||
|                 type = type_dict[sheet[f"a{i}"].value.replace(" ", "")]
 | ||
|                 name = sheet[f"b{i}"].value.replace(" ", "")
 | ||
|                 specification = sheet[f"c{i}"].value.replace(" ", "")
 | ||
|                 if sheet[f"d{i}"].value and sheet[f"d{i}"].value.replace(" ", ""):
 | ||
|                     specification = specification + "|" + sheet[f"d{i}"].value.replace(" ", "")
 | ||
|                 model = sheet[f"e{i}"].value.replace(" ", "")
 | ||
|                 process = process_d[sheet[f"f{i}"].value.replace(" ", "")]
 | ||
|                 batch = sheet[f"g{i}"].value.replace(" ", "")
 | ||
|                 count = int(sheet[f"h{i}"].value)
 | ||
|                 warehouse = warehouse_d[sheet[f"i{i}"].value.replace(" ", "")]
 | ||
|             except KeyError as e:
 | ||
|                 raise ParseError(f"第{i}行数据有误:{str(e)}")
 | ||
|             material, _ = Material.objects.get_or_create(
 | ||
|                 type=type,
 | ||
|                 name=name,
 | ||
|                 specification=specification,
 | ||
|                 model=model,
 | ||
|                 process=process,
 | ||
|                 defaults={"type": type, "name": name, "specification": specification, "model": model, "process": process, "number": ranstr(6), "id": idWorker.get_id()},
 | ||
|             )
 | ||
|             MaterialBatch.objects.get_or_create(
 | ||
|                 material=material, batch=batch, warehouse=warehouse, defaults={"material": material, "batch": batch, "warehouse": warehouse, "count": count, "id": idWorker.get_id()}
 | ||
|             )
 | ||
|             cal_material_count([material.id])
 | ||
|             i = i + 1
 | ||
| 
 | ||
| def daoru_mioitem_test(path:str, mioitem:MIOItem):
 | ||
|     from apps.utils.snowflake import idWorker
 | ||
|     from openpyxl import load_workbook
 | ||
|     from apps.qm.models import TestItem, Ftest, Qct, FtestItem, FtestDefect
 | ||
| 
 | ||
|     qct = Qct.get(mioitem.material, tag="inm", type="in")
 | ||
|     if qct is None:
 | ||
|         raise ParseError("未找到检验表")
 | ||
|     
 | ||
|     t_name_list = ["配套序号", "棒编号", "棒最大外径/mm", "锥度/mm", "管编号", "管最大内径/mm", "配合间隙"]
 | ||
|     t_list = []
 | ||
|     for name in t_name_list:
 | ||
|         try:
 | ||
|             t_list.append(TestItem.objects.get(name=name))
 | ||
|         except TestItem.DoesNotExist:
 | ||
|             raise ParseError(f"未找到检验项:{name}")
 | ||
|         except TestItem.MultipleObjectsReturned:
 | ||
|             raise ParseError(f"检验项重复:{name}")
 | ||
| 
 | ||
|     test_user = mioitem.mio.mio_user
 | ||
|     test_date = mioitem.mio.inout_date
 | ||
| 
 | ||
|     wb = load_workbook(path, data_only=True)
 | ||
|     if "Sheet1" in wb.sheetnames:  # 检查是否存在
 | ||
|         sheet = wb["Sheet1"]  # 获取工作表
 | ||
|     else:
 | ||
|         raise ParseError("未找到Sheet1")
 | ||
| 
 | ||
|     mioitemws = MIOItemw.objects.filter(mioitem=mioitem).order_by("number")
 | ||
| 
 | ||
|     for ind, item in enumerate(mioitemws):
 | ||
|         ftest:Ftest = item.ftest
 | ||
|         if ftest is None:
 | ||
|             ftest = Ftest.objects.create(
 | ||
|                 type="purin", 
 | ||
|                 test_numer=item.number, 
 | ||
|                 qct=qct,
 | ||
|                 test_user=test_user,
 | ||
|                 is_ok=True,
 | ||
|                 test_date=test_date)
 | ||
|             item.ftest = ftest
 | ||
|             item.save()
 | ||
|         else:
 | ||
|             FtestItem.objects.filter(ftest=ftest).delete()
 | ||
|             FtestDefect.objects.filter(ftest=ftest).delete()
 | ||
|             ftest.is_ok = True
 | ||
|             ftest.defect_main = None
 | ||
|             ftest.save()
 | ||
| 
 | ||
|         i = ind + 4
 | ||
|         if sheet[f"c{i}"].value:
 | ||
|             ftestitems = []
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[0], test_val_json=sheet[f"b{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[1], test_val_json=sheet[f"c{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[2], test_val_json=sheet[f"e{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[3], test_val_json=sheet[f"f{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[4], test_val_json=sheet[f"g{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[5], test_val_json=sheet[f"j{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             ftestitems.append(FtestItem(ftest=ftest, testitem=t_list[6], test_val_json=sheet[f"k{i}"].value, test_user=test_user, id=idWorker.get_id()))
 | ||
|             FtestItem.objects.bulk_create(ftestitems)
 | ||
|         else:
 | ||
|             break |