import random
from datetime import timedelta
from typing import Optional, Tuple

from django.utils import timezone
from rest_framework.exceptions import APIException

from apps.wf.serializers import TicketSerializer, TicketSimpleSerializer
from apps.system.models import User
from apps.wf.models import CustomField, State, Ticket, Transition, Workflow


class WfService:
    """Query and decision helpers for workflows, states, transitions and tickets."""

    @staticmethod
    def get_workflow_states(workflow: Workflow):
        """Return the non-deleted states of ``workflow`` ordered by ``sort``."""
        return State.objects.filter(workflow=workflow, is_deleted=False).order_by('sort')

    # Backward-compatible alias: the original (misspelled) name is kept so
    # existing callers keep working.
    get_worlflow_states = get_workflow_states

    @staticmethod
    def get_workflow_transitions(workflow: Workflow):
        """Return the non-deleted transitions of ``workflow``."""
        return Transition.objects.filter(workflow=workflow, is_deleted=False)

    @staticmethod
    def get_workflow_start_state(workflow: Workflow):
        """Return the single non-deleted start state of ``workflow``.

        Raises:
            Exception: if the workflow has no start state or more than one.
        """
        try:
            return State.objects.get(
                workflow=workflow, type=State.STATE_TYPE_START, is_deleted=False)
        except (State.DoesNotExist, State.MultipleObjectsReturned) as err:
            # Only misconfiguration is translated; unrelated errors (e.g.
            # database failures) propagate unchanged instead of being masked.
            raise Exception('工作流状态配置错误') from err

    @staticmethod
    def get_workflow_custom_fields(workflow: Workflow):
        """Return the non-deleted custom fields of ``workflow`` ordered by ``sort``."""
        return CustomField.objects.filter(is_deleted=False, workflow=workflow).order_by('sort')

    @classmethod
    def get_state_transitions(cls, state: State):
        """Return the non-deleted transitions whose source state is ``state``."""
        return Transition.objects.filter(is_deleted=False, source_state=state)

    @classmethod
    def get_ticket_transitions(cls, ticket: Ticket):
        """Return the transitions available from the ticket's current state."""
        return cls.get_state_transitions(ticket.state)

    @classmethod
    def get_ticket_steps(cls, ticket: Ticket) -> list:
        """Return the workflow states to show as steps for ``ticket``.

        A state flagged ``is_hidden`` is left out unless the ticket is
        currently in that state.

        NOTE(review): the original called ``.remove()`` on the QuerySet while
        iterating it (QuerySets have no ``remove``) and tested
        ``ticket.state.is_hidden`` instead of the candidate state's flag.
        The result is now a list - confirm no caller relies on a QuerySet.
        """
        current_state = ticket.state
        return [
            state
            for state in cls.get_workflow_states(ticket.workflow)
            if state == current_state or not state.is_hidden
        ]

    @classmethod
    def get_transition_by_args(cls, kwargs: dict):
        """Return the non-deleted transitions matching the filters in ``kwargs``.

        ``kwargs`` is not modified; ``is_deleted=False`` is applied on a copy.
        """
        filters = dict(kwargs)
        filters['is_deleted'] = False
        return Transition.objects.filter(**filters)

    @classmethod
    def get_ticket_sn(cls, workflow: Workflow) -> str:
        """Generate a ticket serial number for ``workflow``.

        Format: ``<sn_prefix>_<YYYY><MM><DD><NNNN>`` where ``NNNN`` is one more
        than the number of tickets of this workflow created on the same day.

        NOTE(review): count-then-increment is not atomic; concurrent creation
        can yield the same number unless the caller serialises it.
        """
        now = timezone.now()
        # Derive the day window from ``now`` itself so it always matches the
        # date written into the serial number (the original built it from
        # naive date strings, which can disagree with ``now``'s timezone).
        day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        day_end = day_start + timedelta(days=1)
        # Half-open range: a ticket created exactly at the next midnight
        # belongs to the next day only.
        ticket_day_count_new = Ticket.objects.filter(
            create_time__gte=day_start,
            create_time__lt=day_end,
            workflow=workflow,
        ).count() + 1
        return '%s_%04d%02d%02d%04d' % (
            workflow.sn_prefix, now.year, now.month, now.day, ticket_day_count_new)

    @classmethod
    def get_next_state_by_transition_and_ticket_info(cls, ticket: Ticket, transition: Transition) -> object:
        """Return the state the ticket moves to when ``transition`` is applied.

        Currently this is always ``transition.destination_state``.
        """
        # TODO: validate that transition.source_state matches the ticket's
        # current state, and evaluate transition.condition_expression (it is
        # presently ignored).
        return transition.destination_state

    @classmethod
    def get_ticket_state_participant_info(cls, state: State, ticket: Ticket,
                                          ticket_data: Optional[dict] = None):
        """Resolve the actual handler(s) and handler type for a target state.

        Args:
            state: the state the ticket is moving to.
            ticket: the ticket being moved.
            ticket_data: field values submitted with the current action; they
                take precedence over the values stored on the ticket.

        Returns:
            dict with keys ``destination_participant_type``,
            ``destination_participant`` and ``multi_all_person``.
        """
        if ticket_data is None:
            ticket_data = {}

        if state.type == State.STATE_TYPE_START:
            # Back to the start state: the ticket creator handles it.
            return dict(destination_participant_type=State.PARTICIPANT_TYPE_PERSONAL,
                        destination_participant=ticket.create_by.id,
                        multi_all_person={})
        if state.type == State.STATE_TYPE_END:
            # End state reached: nobody handles it.
            return dict(destination_participant_type=State.PARTICIPANT_TYPE_PERSONAL,
                        destination_participant=0,
                        multi_all_person={})

        multi_all_person_dict = {}
        destination_participant_type = state.participant_type
        destination_participant = state.participant

        if destination_participant_type == State.PARTICIPANT_TYPE_FIELD:
            # The configured participant is a field key; look the real value
            # up in the submitted data first, then in the ticket instance
            # (the original read the ``Ticket`` class attribute by mistake).
            field_key = destination_participant
            if field_key in ticket_data:
                destination_participant = ticket_data[field_key]
            else:
                destination_participant = ticket.ticket_data.get(field_key, None)
        elif destination_participant_type == State.PARTICIPANT_TYPE_DEPT:
            # Single department: every user in it.
            destination_participant = list(
                User.objects.filter(dept=destination_participant).values_list('id', flat=True))
        elif destination_participant_type == State.PARTICIPANT_TYPE_ROLE:
            # Single role: every user holding it.
            destination_participant = list(
                User.objects.filter(roles=destination_participant).values_list('id', flat=True))

        # Whatever the configured type was, the resolved value decides whether
        # this is a multi-person or a single-person assignment.
        if isinstance(destination_participant, list):
            destination_participant_type = State.PARTICIPANT_TYPE_MULTI
            destination_participant = list(set(destination_participant))
        else:
            destination_participant_type = State.PARTICIPANT_TYPE_PERSONAL

        if destination_participant_type == State.PARTICIPANT_TYPE_MULTI:
            if state.distribute_type == State.STATE_DISTRIBUTE_TYPE_RANDOM:
                destination_participant = random.choice(destination_participant)
                # A single user was picked, so report a personal assignment;
                # keeping MULTI with a scalar breaks the membership test in
                # ticket_handle_permission_check.
                destination_participant_type = State.PARTICIPANT_TYPE_PERSONAL
            elif state.distribute_type == State.STATE_DISTRIBUTE_TYPE_ALL:
                # Everyone must handle it: one (initially empty) record each.
                multi_all_person_dict = {person: {} for person in destination_participant}

        return dict(destination_participant_type=destination_participant_type,
                    destination_participant=destination_participant,
                    multi_all_person=multi_all_person_dict)

    @classmethod
    def ticket_handle_permission_check(cls, ticket: Ticket, user: User) -> dict:
        """Check whether ``user`` may handle ``ticket`` in its current state.

        Returns:
            dict with ``permission`` (bool) and ``msg`` (reason, possibly empty).
        """
        transitions = cls.get_state_transitions(ticket.state)
        if not transitions:
            return dict(permission=True, msg="工单当前状态无需操作")

        current_participant_count = 1
        participant_type = ticket.participant_type
        participant = ticket.participant
        state = ticket.state

        if participant_type == State.PARTICIPANT_TYPE_PERSONAL:
            if user.id != participant:
                return dict(permission=False, msg="非当前处理人")
        elif participant_type in [State.PARTICIPANT_TYPE_MULTI,
                                  State.PARTICIPANT_TYPE_DEPT,
                                  State.PARTICIPANT_TYPE_ROLE]:
            if user.id not in participant:
                return dict(permission=False, msg="非当前处理人")
            current_participant_count = len(participant)

        # With several candidates and "active" distribution, one of them has
        # to accept the ticket before it can be handled.
        if current_participant_count > 1 and state.distribute_type == State.STATE_DISTRIBUTE_TYPE_ACTIVE:
            return dict(permission=False, msg="需要先接单再处理")
        if ticket.in_add_node:
            return dict(permission=False, msg="工单当前处于加签中,请加签完成后操作")
        return dict(permission=True, msg="")

    @classmethod
    def check_dict_has_all_same_value(cls, dict_obj: dict) -> bool:
        """Return True when every value in ``dict_obj`` compares equal.

        An empty dict returns True (the original raised IndexError).
        """
        values = list(dict_obj.values())
        if not values:
            return True
        first = values[0]
        return not any(first != value for value in values)

    @classmethod
    def get_ticket_all_field_value(cls, ticket: Ticket) -> dict:
        """Return all field values of ``ticket``.

        Starts from the ``TicketSimpleSerializer`` output and adds one entry
        per custom field of the ticket's workflow, taken from
        ``ticket.ticket_data`` (``None`` when the key is absent).
        """
        # Fields from the base ticket table.
        field_info_dict = TicketSimpleSerializer(instance=ticket).data
        # Values of the workflow's custom fields.
        for custom_field in cls.get_workflow_custom_fields(ticket.workflow):
            field_info_dict[custom_field.field_key] = ticket.ticket_data.get(
                custom_field.field_key, None)
        return field_info_dict