from collections import defaultdict, deque
from datetime import datetime, timedelta

from django.db import models
from django.db.models import Q
from django.utils import timezone
from rest_framework.exceptions import ParseError

from apps.system.models import CommonAModel, Dictionary, CommonBModel, CommonADModel, File, BaseModel
from apps.utils.models import CommonBDModel

# NOTE(review): several docstrings below start with a `TN:` marker line. Those
# lines are kept verbatim because they look like they may be consumed by
# tooling -- confirm before rewording them. Field verbose names and error
# messages are runtime strings and are intentionally left untouched.


class Process(CommonBModel):
    """
    TN:工序

    A process step. `type` distinguishes production from inspection steps and
    `mtype` distinguishes normal / split / merge steps (see the field choices).
    """
    # Values for `type`.
    PRO_PROD = 10
    PRO_TEST = 20

    # Values for `mtype`.
    PRO_NORMAL = 10
    PRO_DIV = 20
    PRO_MERGE = 30

    name = models.CharField('工序名称', max_length=100)
    type = models.PositiveSmallIntegerField("工序类型", default=PRO_PROD, choices=((PRO_PROD, '生产工序'), (PRO_TEST, '检验工序')))
    mtype = models.PositiveSmallIntegerField("工序生产类型", default=PRO_NORMAL, choices=((PRO_NORMAL, '常规'), (PRO_DIV, '切分'), (PRO_MERGE, '合并')))
    cate = models.CharField('大类', max_length=10, default='')
    sort = models.PositiveSmallIntegerField('排序', default=1)
    instruction = models.ForeignKey(
        File, verbose_name='指导书', on_delete=models.SET_NULL, null=True, blank=True)
    instruction_content = models.TextField('指导书内容', null=True, blank=True)
    into_wm_mgroup = models.BooleanField('交接到工段', default=False)
    store_notok = models.BooleanField('不合格品是否入库', default=False)
    batch_append_equip = models.BooleanField('批号追加设备', default=False)
    mlog_need_ticket = models.BooleanField('日志提交是否需要审批', default=False)
    mstate_json = models.JSONField('中间状态', default=list, blank=True)
    parent = models.ForeignKey('self', verbose_name='父工序', on_delete=models.CASCADE, null=True, blank=True)
    number_to_batch = models.BooleanField('个号转批号', default=False)
    wpr_number_rule = models.TextField("单个编号规则", null=True, blank=True)

    class Meta:
        verbose_name = '工序'
        ordering = ['sort', 'create_time']

    def get_canout_mat_ids(self):
        """Return the distinct `material_out` ids of every Route using this process.

        The list may contain ``None`` because `Route.material_out` is nullable.
        """
        out_ids = Route.objects.filter(process=self).values_list("material_out__id", flat=True).distinct()
        return list(out_ids)

    def get_canin_mat_ids(self):
        """Return the material ids this process can take as input.

        The result is the auxiliary materials (RouteMat) of this process's
        routes followed by the routes' `material_in` ids. Each part is distinct
        on its own, but the concatenation is not de-duplicated and the second
        part may contain ``None`` (`Route.material_in` is nullable).
        """
        aux_ids = RouteMat.objects.filter(route__process=self).values_list("material__id", flat=True).distinct()
        main_ids = Route.objects.filter(process=self).values_list("material_in__id", flat=True).distinct()
        return list(aux_ids) + list(main_ids)


class Material(CommonAModel):
    """TN:物料

    A material record (see `type_choices` for the kinds of material covered),
    together with its stock counters.
    """
    MA_TYPE_BASE = 0
    MA_TYPE_GOOD = 10
    MA_TYPE_HALFGOOD = 20
    MA_TYPE_MAINSO = 30
    MA_TYPE_HELPSO = 40
    MA_TYPE_TOOL = 50
    MA_TYPE_HELPTOOL = 60
    MA_TYPE_OFFICE = 70
    type_choices = (
        (MA_TYPE_BASE, '电/水/气'),
        (MA_TYPE_GOOD, '成品'),
        (MA_TYPE_HALFGOOD, '半成品'),
        (MA_TYPE_MAINSO, '主要原料'),
        (MA_TYPE_HELPSO, '辅助材料'),
        (MA_TYPE_TOOL, '加工工具'),
        (MA_TYPE_HELPTOOL, '辅助工装'),
        (MA_TYPE_OFFICE, '办公用品')
    )

    # Values for `tracking`.
    MA_TRACKING_BATCH = 10
    MA_TRACKING_SINGLE = 20

    name = models.CharField('名称', max_length=50)
    cate = models.CharField('大类', max_length=20, default='', blank=True)
    number = models.CharField('编号', max_length=100, null=True, blank=True)
    model = models.CharField(
        '型号', max_length=100, null=True, blank=True)
    specification = models.CharField(
        '规格', max_length=100, null=True, blank=True)
    code = models.CharField('标识', max_length=50, null=True, blank=True)
    # NOTE(review): default=1 is not one of `type_choices`; left as-is because
    # changing it would alter the schema/migrations -- confirm the intent.
    type = models.PositiveSmallIntegerField(
        '物料类型', choices=type_choices, default=1, help_text=str(type_choices))
    testitems = models.JSONField('检测项目', default=list, blank=True)
    sort = models.FloatField('排序', default=1)
    unit = models.CharField('基准计量单位', default='个', max_length=10)
    tracking = models.PositiveSmallIntegerField("追踪方式", default=10,
                                                choices=((MA_TRACKING_BATCH, '批次'),
                                                         (MA_TRACKING_SINGLE, '单件')))
    count = models.DecimalField('总库存', max_digits=14, decimal_places=3, default=0)
    count_mb = models.DecimalField('仓库库存', max_digits=14, decimal_places=3, default=0)
    count_wm = models.DecimalField('车间库存', max_digits=14, decimal_places=3, default=0)
    count_safe = models.DecimalField('安全库存数', max_digits=14, decimal_places=3, null=True, blank=True)
    week_esitimate_consume = models.DecimalField('周消耗预估', max_digits=14, decimal_places=3, null=True, blank=True)
    process = models.ForeignKey(
        Process, verbose_name='所用工序', on_delete=models.CASCADE, null=True, blank=True)
    parent = models.ForeignKey(
        'self', null=True, blank=True, on_delete=models.SET_NULL, verbose_name='父物料')
    is_hidden = models.BooleanField('是否隐藏', default=False)
    is_assemb = models.BooleanField('是否组合件', default=False)
    need_route = models.BooleanField('是否需要定义工艺路线', default=False)
    components = models.JSONField('组件', default=dict, null=False, blank=True)
    brothers = models.JSONField('兄弟件', default=list, null=False, blank=True)
    unit_price = models.DecimalField('单价', max_digits=14, decimal_places=2, null=True, blank=True)
    into_wm = models.BooleanField('是否进入车间库存', default=True)
    bin_number_main = models.CharField('主库位号', max_length=50, null=True, blank=True)
    img = models.TextField('图片', null=True, blank=True)

    class Meta:
        verbose_name = '物料表'
        ordering = ['sort', '-create_time']

    def __str__(self):
        """Return ``name|specification|model|process name``; missing parts are empty."""
        return f'{self.name}|{self.specification if self.specification else ""}|{self.model if self.model else ""}|{self.process.name if self.process else ""}'


class Shift(CommonBModel):
    """TN:班次

    A work shift with a start and an end time of day, grouped by `rule`.
    """
    name = models.CharField('名称', max_length=50)
    rule = models.CharField('所属规则', max_length=10, default='默认')
    start_time_o = models.TimeField('开始时间')
    end_time_o = models.TimeField('结束时间')
    sort = models.PositiveSmallIntegerField('排序', default=1)

    class Meta:
        verbose_name = '班次'


class Srule(CommonBDModel):
    """
    TN:班组规则

    Stores a scheduling rule as JSON.
    """
    rule = models.JSONField('排班规则', default=list, blank=True)


class Team(CommonBModel):
    """TN:班组, belong_dept为所属车间

    A team with a leader; per the marker line above, `belong_dept` is the
    workshop the team belongs to.
    """
    name = models.CharField('名称', max_length=50)
    leader = models.ForeignKey(
        'system.user', verbose_name='班长', on_delete=models.CASCADE)


class Mgroup(CommonBModel):
    """TN:测点集

    A measuring-point set; with ``cate='section'`` it represents a work section.
    """
    # Values for `mtype` (10: in-house, 20: outsourced, per the field help text).
    M_SELF = 10
    M_OTHER = 20

    name = models.CharField('名称', max_length=50)
    code = models.CharField('标识', max_length=50, null=True, blank=True)
    cate = models.CharField(
        '分类', max_length=50, default='section', help_text='section/other')  # 'section' means a work section
    mtype = models.PositiveSmallIntegerField("工段类型", default=10, help_text='10:自产 20:外协')
    shift_rule = models.CharField('班次规则', max_length=10, default='默认')
    process = models.ForeignKey(
        Process, verbose_name='工序', on_delete=models.SET_NULL, null=True, blank=True)
    product = models.ForeignKey(
        Material, verbose_name='主要产品', on_delete=models.SET_NULL, null=True, blank=True, related_name='mgroup_product')
    product_cost = models.ForeignKey(
        Material, verbose_name='主要产品(成本计算)', on_delete=models.SET_NULL, null=True, blank=True, related_name='mgroup_product_cost')
    input_materials = models.JSONField(
        '直接材料', default=list, blank=True, help_text='material的ID列表')
    test_materials = models.JSONField(
        '检测材料', default=list, blank=True, help_text='material的ID列表')
    batch_append_code = models.BooleanField('批号追加工段标识', default=False)
    sort = models.PositiveSmallIntegerField('排序', default=1)
    need_enm = models.BooleanField('是否进行能源监测', default=True)
    is_running = models.BooleanField('是否正常运行', default=False)

    class Meta:
        verbose_name = '测点集'
        ordering = ['sort', '-create_time']

    def __str__(self) -> str:
        return self.name

    def get_shift(self, w_s_time: datetime):
        """Find the shift (under this group's `shift_rule`) that contains `w_s_time`.

        Args:
            w_s_time: The moment to classify. A naive value is made aware with
                the current time zone; an aware value is converted to local time.

        Returns:
            A ``(date, Shift)`` tuple, where `date` is the day the shift started
            on (for a shift that crosses midnight, times after midnight belong
            to the previous day). Returns ``None`` when no configured shift
            covers the time.

        Raises:
            ParseError: If no Shift exists for this group's `shift_rule`.
        """
        if not timezone.is_aware(w_s_time):
            # No tzinfo: interpret the value in the current (default) time zone.
            w_s_time = timezone.make_aware(w_s_time)
        else:
            w_s_time = timezone.localtime(w_s_time)

        shifts = Shift.objects.filter(rule=self.shift_rule).order_by('sort')
        if not shifts:
            raise ParseError(f"工段{self.name}未配置班次")

        now_t = w_s_time.time()
        for shift in shifts:
            if shift.start_time_o < shift.end_time_o:
                # Shift starts and ends on the same day.
                if shift.start_time_o <= now_t < shift.end_time_o:
                    return w_s_time.date(), shift
            else:
                # Shift crosses midnight (e.g. a night shift).
                if now_t >= shift.start_time_o:
                    # Still the evening of the starting day.
                    return w_s_time.date(), shift
                if now_t < shift.end_time_o:
                    # Past midnight: the shift started the day before.
                    return (w_s_time - timedelta(days=1)).date(), shift

        # NOTE(review): no shift matched. The original fell through and returned
        # None (a `return date, None` was commented out); kept for compatibility,
        # so callers that unpack the result must handle None.
        return None


class TeamMember(BaseModel):
    """Links a user to a team and the work section (Mgroup) they work in."""
    team = models.ForeignKey(Team, verbose_name='关联班组', on_delete=models.CASCADE)
    user = models.ForeignKey(
        'system.user', verbose_name='成员', on_delete=models.CASCADE)
    mgroup = models.ForeignKey(
        Mgroup, verbose_name='所在工段', on_delete=models.CASCADE)
    post = models.ForeignKey('system.post', verbose_name='岗位', on_delete=models.SET_NULL, null=True, blank=True)


class Goal(CommonADModel):
    """TN:目标

    Yearly and per-month goal values for one goal category, optionally tied to
    an Mgroup. Unique per (mgroup, year, goal_cate).
    """
    mgroup = models.ForeignKey(
        Mgroup, verbose_name='关联工段', on_delete=models.CASCADE, null=True, blank=True)
    year = models.PositiveSmallIntegerField('年')
    goal_cate = models.ForeignKey(
        Dictionary, verbose_name='目标种类', on_delete=models.CASCADE)
    goal_val = models.FloatField('全年目标值')
    goal_val_1 = models.FloatField('1月份目标值')
    goal_val_2 = models.FloatField('2月份目标值')
    goal_val_3 = models.FloatField('3月份目标值')
    goal_val_4 = models.FloatField('4月份目标值')
    goal_val_5 = models.FloatField('5月份目标值')
    goal_val_6 = models.FloatField('6月份目标值')
    goal_val_7 = models.FloatField('7月份目标值')
    goal_val_8 = models.FloatField('8月份目标值')
    goal_val_9 = models.FloatField('9月份目标值')
    goal_val_10 = models.FloatField('10月份目标值')
    goal_val_11 = models.FloatField('11月份目标值')
    goal_val_12 = models.FloatField('12月份目标值')

    class Meta:
        unique_together = ("mgroup", "year", "goal_cate")


class RoutePack(CommonADModel):
    """
    TN:加工工艺

    A package of Routes. `gjson` caches, per final output material, the ids of
    the routes that make up that material's sub-graph.
    """
    RP_S_CREATE = 10
    RP_S_AUDIT = 20
    RP_S_CONFIRM = 30
    RP_STATE = (
        (RP_S_CREATE, '创建中'),
        (RP_S_AUDIT, '审批中'),
        (RP_S_CONFIRM, '已确认'),
    )
    state = models.PositiveSmallIntegerField('状态', default=10, choices=RP_STATE)
    name = models.CharField('名称', max_length=100)
    material = models.ForeignKey(
        Material, verbose_name='产品', on_delete=models.CASCADE, null=True, blank=True)
    ticket = models.ForeignKey('wf.ticket', verbose_name='关联工单',
                               on_delete=models.SET_NULL, related_name='routepack_ticket', null=True, blank=True, db_constraint=False)
    document = models.ForeignKey("system.file", verbose_name='工艺文件', on_delete=models.SET_NULL, null=True, blank=True, db_constraint=False)
    gjson = models.JSONField('子图结构', default=dict, blank=True)

    def validate_and_return_gjson(self, raise_exception=True):
        """Split this pack's routes into one sub-graph per final output material.

        A final material is a `material_out` that is never used as a
        `material_in` within the pack. For each one, every material reachable
        upstream is collected, and every route whose `material_out` lies in
        that set is assigned to the sub-graph.

        Args:
            raise_exception: Currently unused; validation errors are always
                raised. Kept for interface compatibility.

        Returns:
            ``{final_material_out_id: {"routes": [route_id, ...]}}`` with route
            ids ordered by ``sort, process__sort, create_time``.

        Raises:
            ParseError: If a route lacks `material_in` or `material_out`, or if
                no final material exists.
        """
        # select_related avoids one extra query per route for each material FK.
        all_routes = (
            Route.objects.filter(routepack=self)
            .select_related("material_in", "material_out")
            .order_by('sort', 'process__sort', 'create_time')
        )

        # 1. Build the upstream adjacency: out_id -> [in_ids].
        upstream = defaultdict(list)
        all_in_ids = set()
        all_out_ids = set()
        for route in all_routes:
            if not (route.material_in and route.material_out):
                raise ParseError(f"Route(ID:{route.id}) 缺失material_in或material_out")
            in_id, out_id = route.material_in.id, route.material_out.id
            upstream[out_id].append(in_id)
            all_in_ids.add(in_id)
            all_out_ids.add(out_id)

        # 2. Final nodes are outputs that no route consumes.
        final_ids = all_out_ids - all_in_ids
        if not final_ids:
            raise ParseError("未找到最终产品节点")

        # 3. Build one sub-graph per final product.
        result = {}
        for final_id in final_ids:
            # Breadth-first walk over everything upstream of the final node.
            visited = set()
            queue = deque([final_id])
            while queue:
                current_out_id = queue.popleft()
                if current_out_id in visited:
                    continue
                visited.add(current_out_id)
                queue.extend(in_id for in_id in upstream.get(current_out_id, []) if in_id not in visited)

            # A route belongs to the sub-graph when its output is in it.
            result[final_id] = {
                "routes": [route.id for route in all_routes if route.material_out.id in visited]
            }
        return result

    def get_gjson(self, need_update=False, final_material_id=None):
        """Return the cached sub-graph structure, computing and saving it if needed.

        Args:
            need_update: Recompute even when `gjson` is already populated.
            final_material_id: When truthy, return only that material's entry
                (``{}`` if absent) instead of the whole mapping.

        NOTE(review): after a save/reload round-trip the JSON keys are strings;
        lookups here assume material ids compare equal to those keys -- confirm
        against the id type of the base model.
        """
        if not self.gjson or need_update:
            self.gjson = self.validate_and_return_gjson()
            self.save()
        return self.gjson.get(final_material_id, {}) if final_material_id else self.gjson

    def get_dags(self):
        """Build a node/edge description of each final material's sub-graph.

        Returns:
            ``{final_material_id: {"name": str, "edges": [...], "nodes": [...]}}``.
            Each edge is ``{"label", "source", "target", "id"}`` and each node is
            ``{"id", "label", "shape"}``. Final materials that no longer exist
            are skipped, as are routes missing either material.
        """
        gjson = self.get_gjson()
        route_ids = []
        for data in gjson.values():
            route_ids.extend(data['routes'])

        route_qs = Route.objects.filter(id__in=route_ids).select_related("material_in", "material_out", "process")

        # Gather material ids and edge dicts from the already-loaded routes.
        material_ids = set(gjson)
        route_dict = {}
        for r in route_qs:
            if r.material_in:
                material_ids.add(r.material_in.id)
            if r.material_out:
                material_ids.add(r.material_out.id)
            if r.material_in and r.material_out:
                # Only routes with both ends can be drawn as an edge.
                route_dict[r.id] = {
                    "label": r.process.name if r.process else "",
                    "source": r.material_in.id,
                    "target": r.material_out.id,
                    "id": r.id
                }

        mat_qs = Material.objects.filter(id__in=material_ids).order_by("process__sort", "create_time")
        mat_dict = {mat.id: {"id": mat.id, "label": str(mat), "shape": "rect"} for mat in mat_qs}

        res = {}
        for final_material_id, data in gjson.items():
            if final_material_id not in mat_dict:
                # The final material is missing from the database: skip it.
                continue
            edges = [route_dict[route_id] for route_id in data['routes'] if route_id in route_dict]
            nodes_set = set()
            for edge in edges:
                nodes_set.update([edge['source'], edge['target']])
            res[final_material_id] = {
                "name": mat_dict[final_material_id]["label"],
                "edges": edges,
                # Only nodes that still exist as materials are emitted.
                "nodes": [mat_dict[node_id] for node_id in nodes_set if node_id in mat_dict],
            }
        return res

    def get_final_material_ids(self):
        """Return the final material ids, i.e. the keys of the cached `gjson`."""
        return list(self.get_gjson().keys())


class Route(CommonADModel):
    """
    TN:加工路线

    One step of a processing route: a process turning `material_in` into
    `material_out`, optionally belonging to a RoutePack.
    """
    routepack = models.ForeignKey(RoutePack, verbose_name='关联路线包', on_delete=models.CASCADE, null=True, blank=True)
    material = models.ForeignKey(
        Material, verbose_name='关联产品', on_delete=models.CASCADE, null=True, blank=True)
    process = models.ForeignKey(
        Process, verbose_name='工序', on_delete=models.CASCADE, null=True, blank=True, related_name="route_p")
    mgroup = models.ForeignKey(Mgroup, verbose_name='指定工段', on_delete=models.CASCADE, null=True, blank=True, related_name='route_mgroup')
    sort = models.PositiveSmallIntegerField('顺序', default=1)
    is_autotask = models.BooleanField('是否自动排产', default=False)
    material_in = models.ForeignKey(
        Material, verbose_name='主要输入物料', on_delete=models.CASCADE, related_name='route_material_in', null=True, blank=True)
    material_out = models.ForeignKey(
        Material, verbose_name='主要输出物料', on_delete=models.CASCADE, related_name='route_material_out', null=True, blank=True)
    is_count_utask = models.BooleanField('是否主任务统计', default=False)
    out_rate = models.FloatField('出材率', default=100, blank=True)
    div_number = models.PositiveSmallIntegerField('切分数量', default=1, blank=True)
    hour_work = models.FloatField('工时', null=True, blank=True)
    batch_bind = models.BooleanField('是否绑定批次', default=True)
    materials = models.ManyToManyField(Material, verbose_name='关联辅助物料', related_name="route_materials",
                                       through="mtm.routemat", blank=True)
    parent = models.ForeignKey('self', verbose_name='上级路线', on_delete=models.CASCADE, null=True, blank=True, related_name="route_parent")
    params_json = models.JSONField('工艺参数', default=dict, blank=True)
    from_route = models.ForeignKey('self', verbose_name='来源路线', on_delete=models.SET_NULL, null=True, blank=True, related_name="route_f")

    def __str__(self):
        """Return ``<material_in>-><material_out>``, omitting whichever is unset."""
        x = ""
        if self.material_in:
            x = x + str(self.material_in) + "->"
        if self.material_out:
            x = x + str(self.material_out)
        return x

    @staticmethod
    def get_routes(material: Material = None, routepack: RoutePack = None, routeIds=None):
        """
        TN:返回工艺路线带车间(不关联工艺包)

        Return routes ordered by ``sort, process__sort, create_time``.

        Exactly one selector is honoured, in this priority order:

        Args:
            material: Routes for this material that are NOT attached to any
                RoutePack.
            routepack: Routes attached to this RoutePack.
            routeIds: Routes whose id is in this collection.

        Returns:
            A Route queryset. When every selector is falsy (including an empty
            `routeIds`), an empty queryset is returned; previously this case
            raised UnboundLocalError.
        """
        if material:
            rqs = Route.objects.filter(material=material, routepack__isnull=True)
        elif routepack:
            rqs = Route.objects.filter(routepack=routepack)
        elif routeIds:
            rqs = Route.objects.filter(id__in=routeIds)
        else:
            rqs = Route.objects.none()
        return rqs.order_by('sort', 'process__sort', 'create_time')

    @classmethod
    def validate_dag(cls, final_material_out: Material, rqs):
        """
        TN:校验工艺路线是否正确:

        Validate that the routes in `rqs` form a proper graph ending at
        `final_material_out`:

        - every route has both `material_in` and `material_out`;
        - `final_material_out` is never used as an input;
        - every material can reach `final_material_out`;
        - there is no circular dependency.

        Args:
            final_material_out: The material all paths must end at.
            rqs: A Route queryset to validate.

        Returns:
            True when all checks pass.

        Raises:
            ParseError: On the first failed check.
        """
        # 1. Field completeness.
        invalid_routes = rqs.filter(
            Q(material_in__isnull=True) | Q(material_out__isnull=True)
        )
        if invalid_routes.exists():
            raise ParseError("存在 Route 记录缺失 material_in 或 material_out")

        # 2. Build both adjacency maps, keyed by material id.
        graph = defaultdict(list)          # material_out_id -> [material_in_id]
        reverse_graph = defaultdict(list)  # material_in_id -> [material_out_id]
        all_material_ids = set()
        # select_related avoids one query per route for each material FK.
        for route in rqs.select_related("material_in", "material_out"):
            out_id = route.material_out.id
            in_id = route.material_in.id
            graph[out_id].append(in_id)
            reverse_graph[in_id].append(out_id)
            all_material_ids.update({in_id, out_id})

        # 3. The final material must be a sink: no route may consume it.
        final_id = final_material_out.id
        if final_id in reverse_graph:
            raise ParseError("最终物料不可作为输入")

        # 4. Breadth-first walk upstream from the final material.
        visited = set()
        queue = deque([final_id])
        while queue:
            current_id = queue.popleft()
            if current_id in visited:
                continue
            visited.add(current_id)
            queue.extend(graph.get(current_id, []))

        # 5. Anything not visited cannot reach the final material.
        if all_material_ids - visited:
            raise ParseError("存在无法到达的节点")

        # 6. Depth-first search for cycles, following input -> output edges.
        visited_cycle = set()
        path_cycle = set()  # materials on the current DFS path

        def has_cycle(material_id):
            if material_id in path_cycle:
                return True
            if material_id in visited_cycle:
                return False
            visited_cycle.add(material_id)
            path_cycle.add(material_id)
            for out_id in reverse_graph.get(material_id, []):
                if has_cycle(out_id):
                    return True
            path_cycle.remove(material_id)
            return False

        for material_id in all_material_ids:
            if has_cycle(material_id):
                raise ParseError('存在循环依赖')

        return True

    @classmethod
    def get_dag(cls, rqs):
        """Describe `rqs` as a graph of material nodes and route edges.

        Routes missing either material are ignored. A route without a process
        gets an empty edge label (previously this raised AttributeError).

        Returns:
            ``{'nodes': [{'id', 'label', 'shape'}], 'edges': [{'source',
            'target', 'label', 'id'}]}`` with nodes ordered by
            ``process__sort, create_time``.
        """
        nodes_set = set()
        edges = []
        for rq in rqs:
            if rq.material_in and rq.material_out:
                source = rq.material_in.id
                target = rq.material_out.id
                nodes_set.update([source, target])
                edges.append({
                    'source': source,
                    'target': target,
                    # `process` is nullable; fall back to an empty label.
                    'label': rq.process.name if rq.process else "",
                    'id': rq.id
                })

        nodes_qs = Material.objects.filter(id__in=nodes_set).order_by("process__sort", "create_time")
        # Every node uses the 'rect' shape by default.
        nodes = [{
            'id': item.id,
            'label': str(item),
            'shape': 'rect'
        } for item in nodes_qs]
        return {'nodes': nodes, 'edges': edges}


class RouteMat(BaseModel):
    """TN:工艺路线辅助物料

    Through model linking a Route to one of its auxiliary materials.
    """
    route = models.ForeignKey(Route, verbose_name='关联路线', on_delete=models.CASCADE, related_name="routemat_route")
    material = models.ForeignKey(Material, verbose_name='辅助物料', on_delete=models.CASCADE)