from apps.system.models import Dept, User from apps.sam.models import OrderItem from apps.mtm.models import Route, Material, Mgroup, Process from rest_framework.exceptions import ParseError from apps.pm.models import Mtask, Utask from django.db.models.query import QuerySet from django.utils import timezone from datetime import date, timedelta import math from typing import List, Union from collections import defaultdict, deque class PmService: @classmethod def cal_x_task_count(cls, target_materialId: str, target_quantity, route_qs): # 构建逆向依赖图 {输出物料ID: [生产该物料的Route]} graph = defaultdict(list) graphin = defaultdict(list) for route in route_qs: if route.material_out: # 确保输出物料存在 graph[route.material_out.id].append(route) if route.material_in: # 确保输入物料存在 graphin[route.material_in.id].append((route, False)) # 存储每个物料的需求量 demands = defaultdict(float) demands[target_materialId] = target_quantity step_production = defaultdict(int) # 逆向遍历队列,从目标物料开始 queue = deque([target_materialId]) visited = set() # 防止重复处理 while queue: current_out_id = queue.popleft() if current_out_id in visited: continue visited.add(current_out_id) current_demand = demands.get(current_out_id, 0) if current_demand <= 0: continue # 获取生产该物料的所有Route routes = graph.get(current_out_id, []) if not routes: continue # 无生产路线,无法继续逆向 # 平均分配需求到各Route(根据业务需求可能需要调整分配策略) routes_count = len(routes) base_demand = current_demand // routes_count remainder = current_demand % routes_count for i, route in enumerate(routes): # 分配任务数 task_count = base_demand + 1 if i < remainder else base_demand step_production[route.id] = task_count # 累加任务数 # 根据工序类型计算所需输入物料 if route.process.mtype == 20: # 拆分工序 input_needed = math.ceil(task_count / route.div_number / (route.out_rate / 100)) elif route.process.mtype == 30: # 合并工序 input_needed = math.ceil(task_count * route.div_number / (route.out_rate / 100)) else: # 普通工序 input_needed = math.ceil(task_count / (route.out_rate / 100)) # 更新输入物料需求并加入队列 if route.material_in: input_material_id = route.material_in.id demands[input_material_id] += input_needed for index, (route_in, _) in enumerate(graphin[input_material_id]): if route_in == route: graphin[input_material_id][index] = (route_in, True) can_append = True for route_in, caled in graphin[input_material_id]: if caled is False: can_append = False break if can_append: queue.append(input_material_id) # 返回各Route的任务数量 return {route.id: step_production.get(route.id, 0) for route in route_qs} @classmethod def cal_real_task_count(cls, count: int, rate_list): """ 废弃 计算实际任务数 """ r_list = [] rate_len = len(rate_list) for ind, val in enumerate(rate_list): indx = ind*1 xcount = count*1 while indx < rate_len: if indx + 1 == rate_len: # 循环到最后一步 break div_number0 = 1 rate, div_number, mtype, rq = rate_list[indx+1] if rq.parent is not None: xcount = 0 else: if mtype == Process.PRO_DIV: div_number0 = div_number elif mtype == Process.PRO_MERGE: div_number0 = 1/div_number xcount = xcount/(rate/100)/div_number0 indx = indx + 1 r_list.append(math.ceil(xcount)) return r_list @classmethod def make_utasks_from_orderitems(cls, user, orderitemIds: List[str], start_date: date, end_date: date, belong_dept: Union[Dept, None]): start_date_str = start_date.strftime('%Y%m%d') if start_date >= end_date: raise ParseError('开始时间不可大于结束时间') orderitems = OrderItem.objects.filter(pk__in=orderitemIds) if orderitems.exclude(utask=None).exists(): raise ParseError('存在订单项已排任务!') rdict = {} for item in orderitems: productId = item.material.id if productId not in rdict: rdict[productId] = [item.material, item.count, [item.id]] else: rdict[productId][1] = rdict[productId][1] + item.count rdict[productId][2].append(item.id) # 生成大任务 make_list = [] for k, v in rdict.items(): xproduct, xcount, orderitemsId = v if xproduct.is_assemb: for key in xproduct.components: make_list.append([Material.objects.get( id=key), xcount*xproduct.components[key], orderitemsId]) else: make_list = [[xproduct, xcount, orderitemsId]] for i in make_list: material, count, orderitemsId = i utask = Utask.objects.create( number=f'{material.number}_{start_date_str}', material=material, count=count, start_date=start_date, end_date=end_date, create_by=user, update_by=user, belong_dept=belong_dept ) OrderItem.objects.filter(id__in=orderitemIds).update(utask=utask) @classmethod def schedue_mtasks(cls, user, utask: Utask, schedule_type: str = "to_day"): """ param schedule_type: to_day/to_mgroup 从大任务自动排产出小任务(按工段/按天分配) """ # from apps.wpm.services import make_sflogs if utask.state != Utask.UTASK_CREATED: raise ParseError('任务状态异常') utask.state = Utask.UTASK_DECOMPOSE utask.save() number, product, count, start_date, end_date = utask.number, utask.material, utask.count, utask.start_date, utask.end_date # 计算相差天数 rela_days = (end_date - start_date).days + 1 if utask.mgroup: # 如果存在指定的mgroup则直接排产 if schedule_type == 'to_day': if rela_days >= 1: task_count_day = math.ceil(count/rela_days) for i in range(rela_days): task_date = start_date + timedelta(days=i) Mtask.objects.create(**{ 'type': utask.type, 'number': f'{number}_{i+1}', 'material_out': utask.material, 'material_in': utask.material_in, 'mgroup': utask.mgroup, 'count': task_count_day, 'start_date': task_date, 'end_date': task_date, 'utask': utask, 'create_by': user, 'update_by': user, 'is_count_utask': True }) elif schedule_type == 'to_mgroup': Mtask.objects.create(**{ 'type': utask.type, 'number': f'{number}_r1', 'material_out': product, 'material_in': utask.material_in, 'mgroup': utask.mgroup, 'count': count, 'start_date': start_date, 'end_date': end_date, 'utask': utask, 'create_by': user, 'update_by': user, 'is_count_utask': True }) else: raise ParseError('不支持的排产类型') else: # 获取产品的加工路线 if utask.routepack: # 指定工艺路线 gjson_item = utask.routepack.get_gjson(final_material_id=product.id) if not gjson_item: raise ParseError("缺少该产品的生产子图") rqs = Route.get_routes(routeIds=gjson_item["routes"]) else: rqs = Route.get_routes(material=product) if not rqs.exists(): raise ParseError('未配置工艺路线') # 通过出材率校正任务数, out_rate 默认为 100 Route.validate_dag(product, rqs) r_count_dict = cls.cal_x_task_count(product.id, count, rqs) # 创建小任务 for ind, (routeId, count) in enumerate(r_count_dict.items()): route = Route.objects.get(id=routeId) material_in, material_out = route.material_in, route.material_out if route.is_autotask: # 找到工段 mgroups = Mgroup.objects.filter(process=route.process) mgroups_count = mgroups.count() if mgroups_count == 1: pass elif mgroups_count == 0: raise ParseError(f'第{ind+1}步-工段不存在!') else: # 存在同一工序的多个工段,先平均分配 pass if schedule_type == 'to_day': if mgroups_count > 1: raise ParseError(f'第{ind+1}步-工段存在多个!') mgroup = mgroups.first() task_count_day = math.ceil(count/rela_days) if rela_days >= 1: for i in range(rela_days): task_date = start_date + timedelta(days=i) Mtask.objects.create(**{ 'route': route, 'number': f'{number}_r{ind+1}_{i+1}', 'type': utask.type, 'material_out': material_out, 'material_in': material_in, 'mgroup': mgroup, 'count': task_count_day, 'start_date': task_date, 'end_date': task_date, 'utask': utask, 'create_by': user, 'update_by': user, 'is_count_utask': route.is_count_utask }) elif schedule_type == 'to_mgroup': base_demand = count // mgroups_count remainder = count % mgroups_count for indx, mgroup in enumerate(mgroups): Mtask.objects.create(**{ 'route': route, 'number': f'{number}_r{ind+1}_m{indx+1}', 'type': utask.type, 'material_out': material_out, 'material_in': material_in, 'mgroup': mgroup, 'count': base_demand + 1 if indx < remainder else base_demand, 'start_date': start_date, 'end_date': end_date, 'utask': utask, 'create_by': user, 'update_by': user, 'hour_work': route.hour_work, 'is_count_utask': route.is_count_utask }) else: raise ParseError('不支持的排产类型') @classmethod def check_orderitems(cls, orderitems: QuerySet[OrderItem]): """ 废弃 校验orderitems并返回整合后的字典以productId为key, [product, count, end_date, orderitems] 为value """ rdict = {} if orderitems.exclude(mtask=None).exists(): raise ParseError('存在订单项已排任务!') for item in orderitems: productId = item.material.id if productId not in rdict: rdict[productId] = [item.material, item.count, item.order.delivery_date, [item.id]] else: order_date = item.order.delivery_date if rdict[productId][2] > order_date: rdict[productId][2] = order_date rdict[productId][1] = rdict[productId][1] + item.count rdict[productId][3].append(item.id) return rdict @classmethod def schedue_from_orderitems(cls, user, orderitemIds: List[str], start_date: date, end_date: date = None): """ 废弃 从多个订单明细自动排产 """ orderitems = OrderItem.objects.filter(pk__in=orderitemIds) rdict = cls.check_orderitems(orderitems) start_date_str = start_date.strftime('%Y%m%d') for k, v in rdict.items(): xproduct, xcount, end_date_cal, orderitemId_list = v if end_date is None: end_date = end_date_cal if start_date >= end_date: raise ParseError('开始时间不可大于结束时间') # 计算相差天数 rela_days = (end_date - start_date).days + 1 make_list = [] if xproduct.is_assemb: for key in xproduct.components: make_list.append([Material.objects.get( id=key), xcount*xproduct.components[key], end_date, orderitemId_list]) else: make_list = [[xproduct, xcount, end_date, orderitemId_list]] for i in make_list: product, count, end_date, orderitemId_list = i # 获取每个产品的加工路线 rqs = Route.get_routes(product) lasttask = None # 创建父任务 for ind, val in enumerate(rqs): if val.material_out: halfgood = val.material_out else: raise ParseError(f'第{ind+1}步-无输出物料') if val.material_in: material_in = val.material_in elif ind > 0: material_in = rqs[ind-1].material_out if val.is_autotask: # 找到工段 mgroups = Mgroup.objects.filter(process=val.process) mgroups_count = mgroups.count() if mgroups_count == 1: mgroup = mgroups.first() elif mgroups_count == 0: raise ParseError(f'第{ind+1}步-工段不存在!') else: # 后面可能会指定车间 raise ParseError(f'第{ind+1}步-工段存在多个!') task_count = count if val.out_rate: if val.out_rate > 1: task_count = math.ceil( count / (val.out_rate/100)) else: task_count = math.ceil(count / val.out_rate) fmtask = Mtask.objects.create(**{ 'number': f'{product.number}_{start_date_str}_r{ind+1}', 'material_out': halfgood, 'material_in': material_in, 'mgroup': mgroup, 'count': task_count, 'start_date': start_date, 'end_date': end_date, 'create_by': user, 'update_by': user, }) task_count_day = math.ceil(task_count/rela_days) if rela_days > 1: for i in range(rela_days): task_date = start_date + timedelta(days=i) Mtask.objects.create(**{ 'number': f'{fmtask.number}_{i+1}', 'material_out': halfgood, 'material_in': material_in, 'mgroup': mgroup, 'count': task_count_day, 'start_date': task_date, 'end_date': task_date, 'parent': fmtask, 'create_by': user, 'update_by': user }) lasttask = fmtask OrderItem.objects.filter( id__in=orderitemId_list).update(mtask=lasttask) from apps.sam.tasks import change_order_state_when_schedue change_order_state_when_schedue(orderitemIds) @classmethod def mtask_submit(cls, mtask: Mtask, user: User): """ 生产任务提交 """ from apps.wpm.models import Mlog now = timezone.now() if mtask.state == Mtask.MTASK_ASSGINED: mlogs = Mlog.objects.filter(mtask=mtask)|Mlog.objects.filter(b_mlog__mtask=mtask) if not mlogs.exists(): raise ParseError('该任务没有日志') if mlogs.filter(submit_time__isnull=True).exists(): raise ParseError('存在未提交的日志') mtask.state = Mtask.MTASK_SUBMIT mtask.submit_time = now mtask.submit_user = user mtask.save() utask = mtask.utask if utask: cls.utask_submit(utask, raise_e=False) else: raise ParseError('该任务状态不可提交') @classmethod def utask_submit(cls, utask: Utask, raise_e=True): """ 生产大任务提交 """ if utask.state == Utask.UTASK_WORKING and Mtask.objects.filter(utask=utask).exclude(state=Mtask.MTASK_SUBMIT).count() == 0: utask.state = Utask.UTASK_SUBMIT utask.save() else: if raise_e: raise ParseError('存在子任务未提交')