factory/apps/mtm/models.py

523 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.db import models
from apps.system.models import CommonAModel, Dictionary, CommonBModel, CommonADModel, File, BaseModel
from rest_framework.exceptions import ParseError
from apps.utils.models import CommonBDModel
from collections import defaultdict, deque
from django.db.models import Q
class Process(CommonBModel):
"""
工序
"""
PRO_PROD = 10
PRO_TEST = 20
PRO_NORMAL = 10
PRO_DIV = 20
PRO_MERGE = 30
name = models.CharField('工序名称', max_length=100)
type = models.PositiveSmallIntegerField("工序类型", default=PRO_PROD, choices=((PRO_PROD, '生产工序'), (PRO_TEST, '检验工序')))
mtype = models.PositiveSmallIntegerField("工序生产类型", default=PRO_NORMAL, choices=((PRO_NORMAL, '常规'), (PRO_DIV, '切分'), (PRO_MERGE, '合并')))
cate = models.CharField('大类', max_length=10, default='')
sort = models.PositiveSmallIntegerField('排序', default=1)
instruction = models.ForeignKey(
File, verbose_name='指导书', on_delete=models.SET_NULL, null=True, blank=True)
instruction_content = models.TextField('指导书内容', null=True, blank=True)
into_wm_mgroup = models.BooleanField('交接到工段', default=False)
store_notok = models.BooleanField('不合格品是否入库', default=False)
batch_append_equip = models.BooleanField('批号追加设备', default=False)
mlog_need_ticket = models.BooleanField('日志提交是否需要审批', default=False)
mstate_json = models.JSONField('中间状态', default=list, blank=True)
parent = models.ForeignKey('self', verbose_name='父工序', on_delete=models.CASCADE, null=True, blank=True)
class Meta:
verbose_name = '工序'
ordering = ['sort', 'create_time']
def get_canout_mat_ids(self):
"""获取可产出的materialIds
"""
return list(Route.objects.filter(process=self).values_list("material_out__id", flat=True).distinct())
# Create your models here.
class Material(CommonAModel):
MA_TYPE_BASE = 0
MA_TYPE_GOOD = 10
MA_TYPE_HALFGOOD = 20
MA_TYPE_MAINSO = 30
MA_TYPE_HELPSO = 40
MA_TYPE_TOOL = 50
MA_TYPE_HELPTOOL = 60
MA_TYPE_OFFICE = 70
type_choices = (
(MA_TYPE_BASE, '电/水/气'),
(MA_TYPE_GOOD, '成品'),
(MA_TYPE_HALFGOOD, '半成品'),
(MA_TYPE_MAINSO, '主要原料'),
(MA_TYPE_HELPSO, '辅助材料'),
(MA_TYPE_TOOL, '加工工具'),
(MA_TYPE_HELPTOOL, '辅助工装'),
(MA_TYPE_OFFICE, '办公用品')
)
MA_TRACKING_BATCH = 10
MA_TRACKING_SINGLE = 20
name = models.CharField('名称', max_length=50)
cate = models.CharField('大类', max_length=20, default='', blank=True)
number = models.CharField('编号', max_length=100, null=True, blank=True)
model = models.CharField(
'型号', max_length=100, null=True, blank=True)
specification = models.CharField(
'规格', max_length=100, null=True, blank=True)
code = models.CharField('标识', max_length=50, null=True, blank=True)
type = models.PositiveSmallIntegerField(
'物料类型', choices=type_choices, default=1, help_text=str(type_choices))
testitems = models.JSONField('检测项目', default=list, blank=True)
sort = models.FloatField('排序', default=1)
unit = models.CharField('基准计量单位', default='', max_length=10)
tracking = models.PositiveSmallIntegerField("追踪方式", default=10,
choices=((MA_TRACKING_BATCH, '批次'),
(MA_TRACKING_SINGLE, '单件')))
count = models.DecimalField('总库存', max_digits=14, decimal_places=3, default=0)
count_mb = models.DecimalField('仓库库存', max_digits=14, decimal_places=3, default=0)
count_wm = models.DecimalField('车间库存', max_digits=14, decimal_places=3, default=0)
count_safe = models.DecimalField('安全库存数', max_digits=14, decimal_places=3, null=True, blank=True)
week_esitimate_consume = models.DecimalField('周消耗预估', max_digits=14, decimal_places=3, null=True, blank=True)
process = models.ForeignKey(
Process, verbose_name='所用工序', on_delete=models.CASCADE, null=True, blank=True)
parent = models.ForeignKey(
'self', null=True, blank=True, on_delete=models.SET_NULL, verbose_name='父物料')
is_hidden = models.BooleanField('是否隐藏', default=False)
is_assemb = models.BooleanField('是否组合件', default=False)
need_route = models.BooleanField('是否需要定义工艺路线', default=False)
components = models.JSONField('组件', default=dict, null=False, blank=True)
brothers = models.JSONField('兄弟件', default=list, null=False, blank=True)
unit_price = models.DecimalField('单价', max_digits=14, decimal_places=2, null=True, blank=True)
into_wm = models.BooleanField('是否进入车间库存', default=True)
class Meta:
verbose_name = '物料表'
ordering = ['sort', '-create_time']
def __str__(self):
return f'{self.name}|{self.specification if self.specification else ""}|{self.model if self.model else ""}|{self.process.name if self.process else ""}'
class Shift(CommonBModel):
"""班次
"""
name = models.CharField('名称', max_length=50)
rule = models.CharField('所属规则', max_length=10, default='默认')
start_time_o = models.TimeField('开始时间')
end_time_o = models.TimeField('结束时间')
sort = models.PositiveSmallIntegerField('排序', default=1)
class Meta:
verbose_name = '班次'
class Srule(CommonBDModel):
"""
班组规则
"""
rule = models.JSONField('排班规则', default=list, blank=True)
class Team(CommonBModel):
"""班组, belong_dept为所属车间
"""
name = models.CharField('名称', max_length=50)
leader = models.ForeignKey(
'system.user', verbose_name='班长', on_delete=models.CASCADE)
class Mgroup(CommonBModel):
"""测点集
"""
M_SELF = 10
M_OTHER = 20
name = models.CharField('名称', max_length=50)
code = models.CharField('标识', max_length=50, null=True, blank=True)
cate = models.CharField(
'分类', max_length=50, default='section', help_text='section/other') # section是工段
mtype = models.PositiveSmallIntegerField("工段类型", default=10, help_text='10:自产 20:外协')
shift_rule = models.CharField('班次规则', max_length=10, default='默认')
process = models.ForeignKey(
Process, verbose_name='工序', on_delete=models.SET_NULL, null=True, blank=True)
product = models.ForeignKey(
Material, verbose_name='主要产品', on_delete=models.SET_NULL, null=True, blank=True, related_name='mgroup_product')
product_cost = models.ForeignKey(
Material, verbose_name='主要产品(成本计算)', on_delete=models.SET_NULL, null=True, blank=True, related_name='mgroup_product_cost')
input_materials = models.JSONField(
'直接材料', default=list, blank=True, help_text='material的ID列表')
test_materials = models.JSONField(
'检测材料', default=list, blank=True, help_text='material的ID列表')
batch_append_code = models.BooleanField('批号追加工段标识', default=False)
sort = models.PositiveSmallIntegerField('排序', default=1)
need_enm = models.BooleanField('是否进行能源监测', default=True)
is_running = models.BooleanField('是否正常运行', default=False)
class Meta:
verbose_name = '测点集'
ordering = ['sort', '-create_time']
def __str__(self) -> str:
return self.name
class TeamMember(BaseModel):
team = models.ForeignKey(Team, verbose_name='关联班组',
on_delete=models.CASCADE)
user = models.ForeignKey(
'system.user', verbose_name='成员', on_delete=models.CASCADE)
mgroup = models.ForeignKey(
Mgroup, verbose_name='所在工段', on_delete=models.CASCADE)
post = models.ForeignKey('system.post', verbose_name='岗位',
on_delete=models.SET_NULL, null=True, blank=True)
class Goal(CommonADModel):
"""目标
"""
mgroup = models.ForeignKey(
Mgroup, verbose_name='关联工段', on_delete=models.CASCADE, null=True, blank=True)
year = models.PositiveSmallIntegerField('')
goal_cate = models.ForeignKey(
Dictionary, verbose_name='目标种类', on_delete=models.CASCADE)
goal_val = models.FloatField('全年目标值')
goal_val_1 = models.FloatField('1月份目标值')
goal_val_2 = models.FloatField('2月份目标值')
goal_val_3 = models.FloatField('3月份目标值')
goal_val_4 = models.FloatField('4月份目标值')
goal_val_5 = models.FloatField('5月份目标值')
goal_val_6 = models.FloatField('6月份目标值')
goal_val_7 = models.FloatField('7月份目标值')
goal_val_8 = models.FloatField('8月份目标值')
goal_val_9 = models.FloatField('9月份目标值')
goal_val_10 = models.FloatField('10月份目标值')
goal_val_11 = models.FloatField('11月份目标值')
goal_val_12 = models.FloatField('12月份目标值')
class Meta:
unique_together = ("mgroup", "year", "goal_cate")
class RoutePack(CommonADModel):
"""
加工工艺
"""
RP_S_CREATE = 10
RP_S_AUDIT = 20
RP_S_CONFIRM = 30
RP_STATE = (
(RP_S_CREATE, '创建中'),
(RP_S_AUDIT, '审批中'),
(RP_S_CONFIRM, '已确认'),
)
state = models.PositiveSmallIntegerField('状态', default=10, choices=RP_STATE)
name = models.CharField('名称', max_length=100)
material = models.ForeignKey(
Material, verbose_name='产品', on_delete=models.CASCADE, null=True, blank=True)
ticket = models.ForeignKey('wf.ticket', verbose_name='关联工单',
on_delete=models.SET_NULL, related_name='routepack_ticket', null=True, blank=True, db_constraint=False)
document = models.ForeignKey("system.file", verbose_name='工艺文件', on_delete=models.SET_NULL, null=True, blank=True, db_constraint=False)
gjson = models.JSONField('子图结构', default=dict, blank=True)
def validate_and_return_gjson(self, raise_exception=True):
"""
校验所有子图,返回结构化数据:
{
final_material_out_id1: {
"routes": [1, 2, ...], # 子图关联的Route ID
"meta": {
"is_valid": True,
"errors": [],
"levels": {material_id: 层级深度}
}
},
final_material_out_id2: { ... }
}
"""
all_routes = Route.objects.filter(routepack=self).order_by('sort', 'process__sort', 'create_time')
result = {}
# 1. 构建正向和反向图material_id作为节点
forward_graph = defaultdict(list) # out_id -> [in_ids]
reverse_graph = defaultdict(list) # in_id -> [out_ids]
route_edges = defaultdict(list) # (in_id, out_id) -> [Route]
for route in all_routes:
if not (route.material_in and route.material_out):
raise ParseError(f"Route(ID:{route.id}) 缺失material_in或material_out")
in_id, out_id = route.material_in.id, route.material_out.id
forward_graph[out_id].append(in_id)
reverse_graph[in_id].append(out_id)
route_edges[(in_id, out_id)].append(route.id)
# 2. 识别所有最终节点未被任何in_id引用的out_id
all_out_ids = {r.material_out.id for r in all_routes}
all_in_ids = {r.material_in.id for r in all_routes}
final_ids = all_out_ids - all_in_ids
if not final_ids:
raise ParseError("未找到最终产品节点")
# 3. 为每个最终产品构建子图
for final_id in final_ids:
# BFS逆向遍历所有上游节点
visited = set()
queue = deque([final_id])
while queue:
current_out_id = queue.popleft()
if current_out_id in visited:
continue
visited.add(current_out_id)
# 将当前节点的所有上游in_id加入队列
for in_id in forward_graph.get(current_out_id, []):
if in_id not in visited:
queue.append(in_id)
# 收集所有关联Route只要out_id在子图内
subgraph_route_ids = []
for route in all_routes:
if route.material_out.id in visited:
subgraph_route_ids.append(route.id)
result[final_id] = {"routes": subgraph_route_ids}
return result
def get_gjson(self, need_update=False, final_material_id=None):
if not self.gjson or need_update:
self.gjson = self.validate_and_return_gjson()
self.save()
return self.gjson.get(final_material_id, {}) if final_material_id else self.gjson
def get_dags(self):
gjson = self.get_gjson()
materialIdList = []
routeIdList = []
for final_material_id, data in gjson.items():
materialIdList.append(final_material_id)
routeIdList.extend(data['routes'])
# 获取所有相关路由并确保关联了material_in和material_out
route_qs = Route.objects.filter(id__in=routeIdList).select_related("material_in", "material_out", "process")
# 收集所有相关物料ID并过滤掉None值
matids1 = [mid for mid in route_qs.values_list("material_in__id", flat=True) if mid is not None]
matids2 = [mid for mid in route_qs.values_list("material_out__id", flat=True) if mid is not None]
materialIdList.extend(matids1)
materialIdList.extend(matids2)
# 构建路由字典,添加空值检查
route_dict = {}
for r in route_qs:
if r.material_in and r.material_out: # 只有两个物料都存在时才添加
route_dict[r.id] = {
"label": r.process.name if r.process else "",
"source": r.material_in.id,
"target": r.material_out.id
}
# 获取所有物料信息
mat_qs = Material.objects.filter(id__in=materialIdList).order_by("process__sort", "create_time")
mat_dict = {mat.id: {"id": mat.id, "label": str(mat), "shape": "rect"} for mat in mat_qs}
res = {}
for final_material_id, data in gjson.items():
# 确保最终物料存在于mat_dict中
if final_material_id not in mat_dict:
continue
item = {"name": mat_dict[final_material_id]["label"]}
edges = []
nodes_set = set()
for route_id in data['routes']:
# 只处理存在于route_dict中的路由
if route_id in route_dict:
edges.append(route_dict[route_id])
nodes_set.update([route_dict[route_id]['source'], route_dict[route_id]['target']])
item['edges'] = edges
# 只添加存在于mat_dict中的节点
item['nodes'] = [mat_dict[node_id] for node_id in nodes_set if node_id in mat_dict]
res[final_material_id] = item
return res
def get_final_material_ids(self):
return list(self.get_gjson().keys())
class Route(CommonADModel):
"""
加工路线
"""
routepack = models.ForeignKey(RoutePack, verbose_name='关联路线包', on_delete=models.CASCADE, null=True, blank=True)
material = models.ForeignKey(
Material, verbose_name='关联产品', on_delete=models.CASCADE, null=True, blank=True)
process = models.ForeignKey(
Process, verbose_name='工序', on_delete=models.CASCADE, null=True, blank=True, related_name="route_p")
mgroup = models.ForeignKey(Mgroup, verbose_name='指定工段', on_delete=models.CASCADE,
null=True, blank=True, related_name='route_mgroup')
sort = models.PositiveSmallIntegerField('顺序', default=1)
is_autotask = models.BooleanField('是否自动排产', default=False)
material_in = models.ForeignKey(
Material, verbose_name='主要输入物料', on_delete=models.CASCADE, related_name='route_material_in', null=True, blank=True)
material_out = models.ForeignKey(
Material, verbose_name='主要输出物料', on_delete=models.CASCADE, related_name='route_material_out', null=True, blank=True)
is_count_utask = models.BooleanField('是否主任务统计', default=False)
out_rate = models.FloatField('出材率', default=100, blank=True)
div_number = models.PositiveSmallIntegerField('切分数量', default=1, blank=True)
hour_work = models.FloatField('工时', null=True, blank=True)
batch_bind = models.BooleanField('是否绑定批次', default=True)
materials = models.ManyToManyField(Material, verbose_name='关联辅助物料', related_name="route_materials",
through="mtm.routemat", blank=True)
parent = models.ForeignKey('self', verbose_name='上级路线', on_delete=models.CASCADE, null=True, blank=True)
def __str__(self):
x = ""
if self.material_in:
x = x + str(self.material_in) + ">"
if self.material_out:
x = x + str(self.material_out)
return x
@staticmethod
def get_routes(material: Material=None, routepack:RoutePack=None, routeIds=None):
"""
返回工艺路线带车间(不关联工艺包)
"""
if material:
kwargs = {'material': material, 'routepack__isnull': True}
rqs = Route.objects.filter(
**kwargs).order_by('sort', 'process__sort', 'create_time')
# if not rq.exists():
# raise ParseError('未配置工艺路线')
# if rq.first().material_in is None or rq.last().material_out is None or rq.last().material_out != rq.last().material:
# raise ParseError('首步缺少输入/最后一步缺少输出')
# if not rq.filter(is_count_utask=True).exists():
# raise ParseError('未指定统计步骤')
elif routepack:
rqs = Route.objects.filter(routepack=routepack).order_by('sort', 'process__sort', 'create_time')
elif routeIds:
rqs = Route.objects.filter(id__in=routeIds).order_by('sort', 'process__sort', 'create_time')
return rqs
@classmethod
def validate_dag(cls, final_material_out:Material, rqs):
"""
校验工艺路线是否正确:
- 所有 Route 必须有 material_in 和 material_out
- 所有路径最终指向给定的 final_material_out
- 无循环依赖
- 所有物料必须能到达 final_material_out
"""
# 1. 检查字段完整性
invalid_routes = rqs.filter(
models.Q(material_in__isnull=True) |
models.Q(material_out__isnull=True)
)
if invalid_routes.exists():
raise ParseError("存在 Route 记录缺失 material_in 或 material_out")
# 2. 构建依赖图使用material_id作为键
graph = defaultdict(list) # {material_out_id: [material_in_id]}
reverse_graph = defaultdict(list) # {material_in_id: [material_out_id]}
all_material_ids = set()
for route in rqs.all():
out_id = route.material_out.id
in_id = route.material_in.id
graph[out_id].append(in_id)
reverse_graph[in_id].append(out_id)
all_material_ids.update({in_id, out_id})
# 3. 检查final_material_out是否是终点
final_id = final_material_out.id
if final_id in reverse_graph:
# raise ParseError(
# f"最终物料 {final_material_out.name}(ID:{final_id}) 不能作为任何Route的输入"
# )
raise ParseError("最终物料不可作为输入")
# 4. BFS检查路径可达性使用material_id操作
visited = set()
queue = deque([final_id])
while queue:
current_id = queue.popleft()
if current_id in visited:
continue
visited.add(current_id)
for in_id in graph.get(current_id, []):
queue.append(in_id)
# 5. 检查未到达的物料
unreachable_ids = all_material_ids - visited
if unreachable_ids:
# unreachable_materials = Material.objects.filter(id__in=unreachable_ids).values_list('name', flat=True)
# raise ParseError(
# f"以下物料无法到达最终物料: {list(unreachable_materials)}"
# )
raise ParseError("存在无法到达的节点")
# 6. DFS检查循环依赖
visited_cycle = set()
path_cycle = set()
def has_cycle(material_id):
if material_id in path_cycle:
return True
if material_id in visited_cycle:
return False
visited_cycle.add(material_id)
path_cycle.add(material_id)
for out_id in reverse_graph.get(material_id, []):
if has_cycle(out_id):
return True
path_cycle.remove(material_id)
return False
for material_id in all_material_ids:
if has_cycle(material_id):
# cycle_material = Material.objects.get(id=material_id)
# raise ParseError(f"循环依赖涉及物料: {cycle_material.name}(ID:{material_id})")
raise ParseError('存在循环依赖')
return True
@classmethod
def get_dag(cls, rqs):
# 收集所有相关批次和边
nodes_set = set()
edges = []
for rq in rqs:
if rq.material_in and rq.material_out:
source = rq.material_in.id
target = rq.material_out.id
nodes_set.update([source, target])
edges.append({
'source': source,
'target': target,
'label': rq.process.name,
})
# 将批次号排序
nodes_qs = Material.objects.filter(id__in=nodes_set).order_by("process__sort", "create_time")
# 构建节点数据,默认使用'rect'形状
nodes = [{
'id': item.id,
'label': str(item),
'shape': 'rect' # 可根据业务需求调整形状
} for item in nodes_qs]
return {'nodes': nodes, 'edges': edges}
class RouteMat(BaseModel):
route = models.ForeignKey(Route, verbose_name='关联路线', on_delete=models.CASCADE, related_name="routemat_route")
material = models.ForeignKey(Material, verbose_name='辅助物料', on_delete=models.CASCADE)