feat: 工艺路线校验

This commit is contained in:
caoqianming 2025-03-26 16:36:29 +08:00
parent bac30a2925
commit eaa5d75b99
4 changed files with 147 additions and 28 deletions

View File

@ -2,6 +2,7 @@ from django.db import models
from apps.system.models import CommonAModel, Dictionary, CommonBModel, CommonADModel, File, BaseModel
from rest_framework.exceptions import ParseError
from apps.utils.models import CommonBDModel
from collections import defaultdict, deque
class Process(CommonBModel):
"""
@ -260,6 +261,87 @@ class Route(CommonADModel):
raise ParseError('未指定统计步骤')
return rq
@classmethod
def validate_dag(cls, final_material_out:Material, rqs):
"""
校验工艺路线是否正确
- 所有 Route 必须有 material_in material_out
- 所有路径最终指向给定的 final_material_out
- 无循环依赖
- 所有物料必须能到达 final_material_out
"""
# 1. 检查字段完整性
invalid_routes = rqs.filter(
models.Q(material_in__isnull=True) |
models.Q(material_out__isnull=True)
)
if invalid_routes.exists():
raise ParseError("存在 Route 记录缺失 material_in 或 material_out")
# 2. 构建依赖图使用material_id作为键
graph = defaultdict(list) # {material_out_id: [material_in_id]}
reverse_graph = defaultdict(list) # {material_in_id: [material_out_id]}
all_material_ids = set()
for route in rqs.all():
out_id = route.material_out.id
in_id = route.material_in.id
graph[out_id].append(in_id)
reverse_graph[in_id].append(out_id)
all_material_ids.update({in_id, out_id})
# 3. 检查final_material_out是否是终点
final_id = final_material_out.id
if final_id in reverse_graph:
raise ParseError(
f"最终物料 {final_material_out.name}(ID:{final_id}) 不能作为任何Route的输入"
)
# 4. BFS检查路径可达性使用material_id操作
visited = set()
queue = deque([final_id])
while queue:
current_id = queue.popleft()
if current_id in visited:
continue
visited.add(current_id)
for in_id in graph.get(current_id, []):
queue.append(in_id)
# 5. 检查未到达的物料
unreachable_ids = all_material_ids - visited
if unreachable_ids:
unreachable_materials = Material.objects.filter(id__in=unreachable_ids).values_list('name', flat=True)
raise ParseError(
f"以下物料无法到达最终物料: {list(unreachable_materials)}"
)
# 6. DFS检查循环依赖
visited_cycle = set()
path_cycle = set()
def has_cycle(material_id):
if material_id in path_cycle:
return True
if material_id in visited_cycle:
return False
visited_cycle.add(material_id)
path_cycle.add(material_id)
for out_id in reverse_graph.get(material_id, []):
if has_cycle(out_id):
return True
path_cycle.remove(material_id)
return False
for material_id in all_material_ids:
if has_cycle(material_id):
cycle_material = Material.objects.get(id=material_id)
raise ParseError(f"循环依赖涉及物料: {cycle_material.name}(ID:{material_id})")
return True
class RouteMat(BaseModel):
route = models.ForeignKey(Route, verbose_name='关联路线', on_delete=models.CASCADE, related_name="routemat_route")
material = models.ForeignKey(Material, verbose_name='辅助物料', on_delete=models.CASCADE)

View File

@ -8,10 +8,44 @@ from django.utils import timezone
from datetime import date, timedelta
import math
from typing import List, Union
from collections import defaultdict, deque
class PmService:
@classmethod
def cal_x_task_count(cls, target_materialId:str, target_quantity, route_qs):
# 构建逆向依赖图 {输出物料: [所有能生产它的工序]}
graph = defaultdict(list)
for route in route_qs:
graph[route.material_out.id].append(route)
# 存储每个Route记录的生产数量 {route_id: 需生产数量}
step_production = defaultdict(int)
# 广度优先逆向遍历BFS
queue = deque([(target_materialId, target_quantity)])
while queue:
current_material, current_demand = queue.popleft()
for route in graph.get(current_material, []):
# 根据工序类型计算当前工序的生产量
if route.process.mtype == 20: # 拆分
production = current_demand * route.div_number / (route.out_rate / 100)
elif route.process.mtype == 30: # 合并
production = current_demand / route.div_number / (route.out_rate / 100)
else: # 正常
production = current_demand / (route.out_rate / 100)
# 记录当前工序的生产量(向上取整)
step_production[route.id] = math.ceil(production)
# 将输入物料需求加入队列
if route.material_in:
queue.append((route.material_in.id, production))
return dict(step_production)
@classmethod
def cal_real_task_count(cls, count: int, rate_list):
"""
@ -138,22 +172,18 @@ class PmService:
rqs = Route.get_routes(product)
if not rqs.exists():
raise ParseError('未配置工艺路线')
# 通过出材率校正任务数, out_rate 默认为 100
out_rate_list = [(rq.out_rate, rq.div_number, rq.process.mtype, rq) for rq in rqs]
count_task_list = cls.cal_real_task_count(count,out_rate_list)
Route.validate_dag(utask.material, rqs)
r_count_dict = cls.cal_x_task_count(utask.material.id, count, rqs)
# 创建小任务
for ind, val in enumerate(rqs):
if val.material_out:
halfgood = val.material_out
else:
raise ParseError(f'{ind+1}步-无输出物料')
if val.material_in:
material_in = val.material_in
elif ind > 0: # 默认是上一步的输出
material_in = rqs[ind-1].material_out
if val.is_autotask:
for ind, (routeId, count) in enumerate(r_count_dict.items()):
route = Route.objects.get(id=routeId)
material_in, material_out = route.material_in, route.material_out
if route.is_autotask:
# 找到工段
mgroups = Mgroup.objects.filter(process=val.process)
mgroups = Mgroup.objects.filter(process=route.process)
mgroups_count = mgroups.count()
if mgroups_count == 1:
pass
@ -165,15 +195,15 @@ class PmService:
if mgroups_count > 1:
raise ParseError(f'{ind+1}步-工段存在多个!')
mgroup = mgroups.first()
task_count_day = math.ceil(count_task_list[ind]/rela_days)
task_count_day = math.ceil(count/rela_days)
if rela_days >= 1:
for i in range(rela_days):
task_date = start_date + timedelta(days=i)
Mtask.objects.create(**{
'route': val,
'route': route,
'number': f'{number}_r{ind+1}_{i+1}',
'type': utask.type,
'material_out': halfgood,
'material_out': material_out,
'material_in': material_in,
'mgroup': mgroup,
'count': task_count_day,
@ -182,25 +212,25 @@ class PmService:
'utask': utask,
'create_by': user,
'update_by': user,
'is_count_utask': val.is_count_utask
'is_count_utask': route.is_count_utask
})
elif schedule_type == 'to_mgroup':
for indx, mgroup in enumerate(mgroups):
Mtask.objects.create(**{
'route': val,
'route': route,
'number': f'{number}_r{ind+1}_m{indx+1}',
'type': utask.type,
'material_out': halfgood,
'material_out': material_out,
'material_in': material_in,
'mgroup': mgroup,
'count': math.ceil(count_task_list[ind]/mgroups_count),
'count': math.ceil(count/mgroups_count),
'start_date': start_date,
'end_date': end_date,
'utask': utask,
'create_by': user,
'update_by': user,
'hour_work': val.hour_work,
'is_count_utask': val.is_count_utask
'hour_work': route.hour_work,
'is_count_utask': route.is_count_utask
})
else:
raise ParseError('不支持的排产类型')

View File

@ -11,6 +11,7 @@ from django.db.models import Sum, Subquery
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ParseError
from django.db.models import Count
from django.db import transaction
# Create your models here.
class SfLog(CommonADModel):
@ -610,14 +611,18 @@ class BatchSt(BaseModel):
return ins, True
@classmethod
def init_dag(cls, batch:str, force_init=False):
@transaction.atomic
def init_dag(cls, batch:str, force_update=False):
"""
更新批次数据
更新批次数据关系链
"""
pass
ins:BatchSt = cls.g_create(batch)
if ins.mio is None and ins.handover is None and ins.mlog is None:
force_update = True
if force_update:
pass
return ins
class BatchLog(BaseModel):
"""
TN: 拆合批变更记录

View File

@ -343,7 +343,9 @@ def mlog_submit(mlog: Mlog, user: User, now: Union[datetime.datetime, None]):
wpr_from = None
if item.mlogbw_from:
wpr_from = item.mlogbw_from.wpr
wpr = Wpr.change_or_new(number=item.number, wm=wm, ftest=item.ftest, wpr_from=wpr_from)
wpr = Wpr.change_or_new(number=item.number,
wm=wm, ftest=item.ftest,
wpr_from=wpr_from, batch_from=item.mlogb.batch)
item.wpr = wpr
item.save()