factory/apps/pm/services.py

381 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from apps.system.models import Dept, User
from apps.sam.models import OrderItem
from apps.mtm.models import Route, Material, Mgroup, Process
from rest_framework.exceptions import ParseError
from apps.pm.models import Mtask, Utask
from django.db.models.query import QuerySet
from django.utils import timezone
from datetime import date, timedelta
import math
from typing import List, Union
from collections import defaultdict, deque
class PmService:
    """Production-management services.

    Covers rolling product demand back through the route graph, creating
    big tasks (``Utask``) from order items, decomposing them into small
    tasks (``Mtask``) and the submit transitions of both task kinds.
    """

    @classmethod
    def cal_x_task_count(cls, target_materialId: str, target_quantity, route_qs):
        """Roll the demand for a target material back through its routes.

        Walks the route graph breadth-first from the target material towards
        the raw inputs and works out how much every route has to produce.

        Args:
            target_materialId: id of the material that is finally required.
            target_quantity: required quantity of that material.
            route_qs: iterable of routes; each one is read for ``id``,
                ``material_in``, ``material_out``, ``process.mtype``,
                ``out_rate`` (a percentage) and ``div_number``.

        Returns:
            dict mapping route id to the quantity to produce (rounded up),
            in the order the routes were first reached.

        Raises:
            ParseError: if a route has a missing/non-positive ``out_rate`` or
                a merge route has an empty ``div_number``.

        Note:
            The walk does not detect cycles; the caller is expected to have
            validated the route graph beforehand.
        """
        # Reverse dependency graph: {output material id: [routes producing it]}
        producers = defaultdict(list)
        for route in route_qs:
            producers[route.material_out.id].append(route)

        # Un-rounded production per route. A route reached along several
        # downstream paths receives the SUM of the demands; every formula
        # below is linear in the demand, so adding per-visit contributions
        # equals propagating the summed demand once.
        required = {}
        pending = deque([(target_materialId, target_quantity)])
        while pending:
            material_id, demand = pending.popleft()
            for route in producers.get(material_id, ()):
                if not route.out_rate or route.out_rate <= 0:
                    raise ParseError('出材率配置错误')
                yield_ratio = route.out_rate / 100
                mtype = route.process.mtype
                if mtype == 20:  # split process
                    production = demand * route.div_number / yield_ratio
                elif mtype == 30:  # merge process
                    if not route.div_number:
                        raise ParseError('合并数量配置错误')
                    production = demand / route.div_number / yield_ratio
                else:  # ordinary process
                    production = demand / yield_ratio
                if route.id in required:
                    required[route.id] += production
                else:
                    required[route.id] = production
                # The un-rounded quantity is what the input material must cover.
                if route.material_in:
                    pending.append((route.material_in.id, production))
        # Round up only once, after all contributions have been collected.
        return {route_id: math.ceil(total) for route_id, total in required.items()}

    @classmethod
    def cal_real_task_count(cls, count: int, rate_list):
        """Deprecated. Compute the task count of every step of a linear route.

        Args:
            count: quantity required from the last step.
            rate_list: one ``(rate, div_number, mtype, rq)`` tuple per step,
                where ``rate`` is a percentage and ``rq`` exposes ``parent``.

        Returns:
            list with one rounded-up count per step; the last entry is
            ``count`` itself.
        """
        step_total = len(rate_list)
        results = []
        for position in range(step_total):
            quantity = count
            # Apply the correction of every step after this one.
            for later in range(position + 1, step_total):
                rate, div_number, mtype, rq = rate_list[later]
                if rq.parent is not None:
                    quantity = 0
                    continue
                divisor = 1
                if mtype == Process.PRO_DIV:
                    divisor = div_number
                elif mtype == Process.PRO_MERGE:
                    divisor = 1 / div_number
                quantity = quantity / (rate / 100) / divisor
            results.append(math.ceil(quantity))
        return results

    @classmethod
    def make_utasks_from_orderitems(cls, user, orderitemIds: List[str], start_date: date, end_date: date, belong_dept: Union[Dept, None]):
        """Create big tasks (``Utask``) for the given order items.

        Order items are grouped by product. An assembled product yields one
        utask per component (quantity multiplied by the per-unit amount found
        in ``components``); any other product yields a single utask. The
        order items of a product are then linked to the utask created for it.

        Args:
            user: stored as creator/updater of the new utasks.
            orderitemIds: primary keys of the order items to schedule.
            start_date: planned start date.
            end_date: planned end date; must be later than ``start_date``.
            belong_dept: department the utasks belong to, or ``None``.

        Raises:
            ParseError: if the dates are not ordered or an order item already
                has a utask.
        """
        start_date_str = start_date.strftime('%Y%m%d')
        if start_date >= end_date:
            raise ParseError('开始时间不可大于结束时间')
        orderitems = OrderItem.objects.filter(pk__in=orderitemIds)
        if orderitems.exclude(utask=None).exists():
            raise ParseError('存在订单项已排任务!')

        # {product id: [product, total quantity, [order item ids]]}
        grouped = {}
        for item in orderitems:
            product_id = item.material.id
            if product_id not in grouped:
                grouped[product_id] = [item.material, item.count, [item.id]]
            else:
                grouped[product_id][1] = grouped[product_id][1] + item.count
                grouped[product_id][2].append(item.id)

        for product, total, item_ids in grouped.values():
            # The work list is rebuilt for every product so that neither
            # earlier products are dropped nor their utasks created twice.
            if product.is_assemb:
                targets = [
                    (Material.objects.get(id=component_id), total * product.components[component_id])
                    for component_id in product.components
                ]
            else:
                targets = [(product, total)]
            for material, count in targets:
                utask = Utask.objects.create(
                    number=f'{material.number}_{start_date_str}',
                    material=material,
                    count=count,
                    start_date=start_date,
                    end_date=end_date,
                    create_by=user,
                    update_by=user,
                    belong_dept=belong_dept
                )
                # Link only this product's order items, not every requested one.
                OrderItem.objects.filter(id__in=item_ids).update(utask=utask)

    @classmethod
    def _create_daily_mtasks(cls, common: dict, number_prefix: str, total, start_date: date, rela_days: int):
        """Create one single-day Mtask per day, splitting ``total`` evenly (rounded up)."""
        # Rounding up per day means the daily counts can add up to more than ``total``.
        per_day = math.ceil(total / rela_days)
        for offset in range(rela_days):
            task_date = start_date + timedelta(days=offset)
            Mtask.objects.create(**{
                **common,
                'number': f'{number_prefix}_{offset+1}',
                'count': per_day,
                'start_date': task_date,
                'end_date': task_date,
            })

    @classmethod
    def schedue_mtasks(cls, user, utask: Utask, schedule_type: str = "to_day"):
        """Decompose a big task into small tasks (``Mtask``).

        Args:
            user: stored as creator/updater of the new mtasks.
            utask: the big task; must be in state ``UTASK_CREATED``.
            schedule_type: ``'to_day'`` creates one mtask per day of the
                utask period, ``'to_mgroup'`` creates one mtask per work
                group covering the whole period.

        Raises:
            ParseError: on a wrong utask state, an empty period in
                ``'to_day'`` mode, an unsupported ``schedule_type``, a
                missing route configuration or a missing/ambiguous work group.

        The state change and all created mtasks are written in a single
        transaction, so a failure leaves the utask untouched.
        """
        from django.db import transaction

        if utask.state != Utask.UTASK_CREATED:
            raise ParseError('任务状态异常')
        number, product, total = utask.number, utask.material, utask.count
        start_date, end_date = utask.start_date, utask.end_date
        # Number of days in the period, both ends included.
        rela_days = (end_date - start_date).days + 1
        if schedule_type == 'to_day' and rela_days < 1:
            # Nothing can be spread over an empty period (and it would divide by zero).
            raise ParseError('开始时间不可大于结束时间')

        previous_state = utask.state
        try:
            with transaction.atomic():
                utask.state = Utask.UTASK_DECOMPOSE
                utask.save()
                if utask.mgroup:
                    # A work group is fixed on the utask: schedule on it directly.
                    common = {
                        'type': utask.type,
                        'material_out': product,
                        'material_in': utask.material_in,
                        'mgroup': utask.mgroup,
                        'utask': utask,
                        'create_by': user,
                        'update_by': user,
                        'is_count_utask': True
                    }
                    if schedule_type == 'to_day':
                        cls._create_daily_mtasks(common, number, total, start_date, rela_days)
                    elif schedule_type == 'to_mgroup':
                        Mtask.objects.create(**{
                            **common,
                            'number': f'{number}_r1',
                            'count': total,
                            'start_date': start_date,
                            'end_date': end_date,
                        })
                    else:
                        raise ParseError('不支持的排产类型')
                else:
                    # Resolve the routes of the product.
                    if utask.routepack:  # an explicit route pack was chosen
                        gjson_item = utask.routepack.get_gjson(final_material_id=product.id)
                        if not gjson_item:
                            raise ParseError("缺少该产品的生产子图")
                        rqs = Route.get_routes(gjson_item["routes"])
                    else:
                        rqs = Route.get_routes(material=product)
                    if not rqs.exists():
                        raise ParseError('未配置工艺路线')
                    Route.validate_dag(product, rqs)
                    # Correct the quantities by the yield (out_rate) of each route.
                    r_count_dict = cls.cal_x_task_count(product.id, total, rqs)
                    # The routes are already loaded; avoid one query per route.
                    routes_by_id = {route.id: route for route in rqs}
                    for step, (route_id, route_count) in enumerate(r_count_dict.items(), start=1):
                        route = routes_by_id[route_id]
                        if not route.is_autotask:
                            continue
                        # Work groups able to run this route's process.
                        mgroups = Mgroup.objects.filter(process=route.process)
                        mgroups_count = mgroups.count()
                        if mgroups_count == 0:
                            raise ParseError(f'{step}步-工段不存在!')
                        common = {
                            'route': route,
                            'type': utask.type,
                            'material_out': route.material_out,
                            'material_in': route.material_in,
                            'utask': utask,
                            'create_by': user,
                            'update_by': user,
                            'is_count_utask': route.is_count_utask
                        }
                        if schedule_type == 'to_day':
                            if mgroups_count > 1:
                                raise ParseError(f'{step}步-工段存在多个!')
                            common['mgroup'] = mgroups.first()
                            cls._create_daily_mtasks(
                                common, f'{number}_r{step}', route_count, start_date, rela_days)
                        elif schedule_type == 'to_mgroup':
                            # Several work groups share the quantity evenly.
                            for indx, mgroup in enumerate(mgroups):
                                Mtask.objects.create(**{
                                    **common,
                                    'number': f'{number}_r{step}_m{indx+1}',
                                    'mgroup': mgroup,
                                    'count': math.ceil(route_count/mgroups_count),
                                    'start_date': start_date,
                                    'end_date': end_date,
                                    'hour_work': route.hour_work,
                                })
                        else:
                            raise ParseError('不支持的排产类型')
        except Exception:
            # The database was rolled back; keep the in-memory object in sync.
            utask.state = previous_state
            raise

    @classmethod
    def check_orderitems(cls, orderitems: QuerySet[OrderItem]):
        """Deprecated. Validate order items and merge them per product.

        Returns:
            dict keyed by product id whose values are
            ``[product, total count, earliest delivery date, [order item ids]]``.

        Raises:
            ParseError: if any order item already has an mtask.
        """
        if orderitems.exclude(mtask=None).exists():
            raise ParseError('存在订单项已排任务!')
        merged = {}
        for item in orderitems:
            product_id = item.material.id
            delivery_date = item.order.delivery_date
            if product_id not in merged:
                merged[product_id] = [item.material, item.count, delivery_date, [item.id]]
                continue
            entry = merged[product_id]
            # Keep the earliest delivery date among the merged items.
            if entry[2] > delivery_date:
                entry[2] = delivery_date
            entry[1] = entry[1] + item.count
            entry[3].append(item.id)
        return merged

    @classmethod
    def schedue_from_orderitems(cls, user, orderitemIds: List[str], start_date: date, end_date: date = None):
        """Deprecated. Schedule mtasks directly from several order items.

        Args:
            user: stored as creator/updater of the new mtasks.
            orderitemIds: primary keys of the order items to schedule.
            start_date: planned start date.
            end_date: planned end date; when ``None`` each product uses the
                earliest delivery date of its own order items.

        Raises:
            ParseError: if an order item is already scheduled, the dates are
                not ordered, a route has no output material, or the work
                group of a step is missing or ambiguous.
        """
        orderitems = OrderItem.objects.filter(pk__in=orderitemIds)
        rdict = cls.check_orderitems(orderitems)
        start_date_str = start_date.strftime('%Y%m%d')
        for xproduct, xcount, end_date_cal, orderitemId_list in rdict.values():
            # Resolve the end date per product; rebinding ``end_date`` here
            # would leak the first product's delivery date into later ones.
            plan_end = end_date if end_date is not None else end_date_cal
            if start_date >= plan_end:
                raise ParseError('开始时间不可大于结束时间')
            # Number of days in the period, both ends included.
            rela_days = (plan_end - start_date).days + 1
            if xproduct.is_assemb:
                make_list = [
                    (Material.objects.get(id=key), xcount*xproduct.components[key])
                    for key in xproduct.components
                ]
            else:
                make_list = [(xproduct, xcount)]
            for product, count in make_list:
                # Routes of this product.
                rqs = Route.get_routes(product)
                lasttask = None
                for ind, val in enumerate(rqs):
                    if not val.material_out:
                        raise ParseError(f'{ind+1}步-无输出物料')
                    halfgood = val.material_out
                    # Default to no input so the first step never reads an
                    # unbound or stale value from a previous iteration.
                    material_in = None
                    if val.material_in:
                        material_in = val.material_in
                    elif ind > 0:
                        material_in = rqs[ind-1].material_out
                    if not val.is_autotask:
                        continue
                    # Exactly one work group must exist for the process.
                    mgroups = Mgroup.objects.filter(process=val.process)
                    mgroups_count = mgroups.count()
                    if mgroups_count == 0:
                        raise ParseError(f'{ind+1}步-工段不存在!')
                    if mgroups_count != 1:  # a workshop may be selectable later
                        raise ParseError(f'{ind+1}步-工段存在多个!')
                    mgroup = mgroups.first()
                    task_count = count
                    if val.out_rate:
                        # Values above 1 are read as a percentage, others as a ratio.
                        if val.out_rate > 1:
                            task_count = math.ceil(count / (val.out_rate/100))
                        else:
                            task_count = math.ceil(count / val.out_rate)
                    # Parent task covering the whole period.
                    fmtask = Mtask.objects.create(**{
                        'number': f'{product.number}_{start_date_str}_r{ind+1}',
                        'material_out': halfgood,
                        'material_in': material_in,
                        'mgroup': mgroup,
                        'count': task_count,
                        'start_date': start_date,
                        'end_date': plan_end,
                        'create_by': user,
                        'update_by': user,
                    })
                    task_count_day = math.ceil(task_count/rela_days)
                    if rela_days > 1:
                        # One child task per day.
                        for i in range(rela_days):
                            task_date = start_date + timedelta(days=i)
                            Mtask.objects.create(**{
                                'number': f'{fmtask.number}_{i+1}',
                                'material_out': halfgood,
                                'material_in': material_in,
                                'mgroup': mgroup,
                                'count': task_count_day,
                                'start_date': task_date,
                                'end_date': task_date,
                                'parent': fmtask,
                                'create_by': user,
                                'update_by': user
                            })
                    lasttask = fmtask
                OrderItem.objects.filter(
                    id__in=orderitemId_list).update(mtask=lasttask)
        from apps.sam.tasks import change_order_state_when_schedue
        change_order_state_when_schedue(orderitemIds)

    @classmethod
    def mtask_submit(cls, mtask: Mtask, user: User):
        """Submit a production task.

        The task must be in state ``MTASK_ASSGINED`` and have at least one
        log, all of which must already be submitted.

        Raises:
            ParseError: if the state does not allow submitting, there is no
                log, or a log is still unsubmitted.
        """
        from apps.wpm.models import Mlog
        now = timezone.now()
        if mtask.state != Mtask.MTASK_ASSGINED:
            raise ParseError('该任务状态不可提交')
        # Logs attached to the task directly or through ``b_mlog``.
        mlogs = Mlog.objects.filter(mtask=mtask) | Mlog.objects.filter(b_mlog__mtask=mtask)
        if not mlogs.exists():
            raise ParseError('该任务没有日志')
        if mlogs.filter(submit_time__isnull=True).exists():
            raise ParseError('存在未提交的日志')
        mtask.state = Mtask.MTASK_SUBMIT
        mtask.submit_time = now
        mtask.submit_user = user
        mtask.save()

    @classmethod
    def utask_submit(cls, utask: Utask):
        """Submit a big production task.

        Raises:
            ParseError: if the utask is not in state ``UTASK_WORKING`` or one
                of its mtasks has not been submitted yet.
        """
        # Report a wrong state as such instead of blaming the sub-tasks.
        if utask.state != Utask.UTASK_WORKING:
            raise ParseError('任务状态异常')
        if Mtask.objects.filter(utask=utask).exclude(state=Mtask.MTASK_SUBMIT).exists():
            raise ParseError('存在子任务未提交')
        utask.state = Utask.UTASK_SUBMIT
        utask.save()