hberp/hb_server/apps/wf/services.py

155 lines
6.0 KiB
Python

from apps.system.models import User
from apps.wf.models import CustomField, State, Ticket, Transition, Workflow
from rest_framework.exceptions import APIException
from django.utils import timezone
from datetime import timedelta
import random
class WfService(object):
@staticmethod
def get_worlflow_states(workflow:Workflow):
"""
获取工作流状态列表
"""
return State.objects.filter(workflow=workflow, is_deleted=False).order_by('sort')
@staticmethod
def get_workflow_transitions(workflow:Workflow):
"""
获取工作流流转列表
"""
return Transition.objects.filter(workflow=workflow, is_deleted=False)
@staticmethod
def get_workflow_start_state(workflow:Workflow):
"""
获取工作流初始状态
"""
try:
wf_state_obj = State.objects.get(workflow=workflow, type=State.STATE_TYPE_START, is_deleted=False)
return wf_state_obj
except:
raise Exception('工作流状态配置错误')
@staticmethod
def get_workflow_custom_fields(workflow:Workflow):
"""
获取工单字段
"""
return CustomField.objects.filter(is_deleted=False, workflow=workflow).order_by('sort')
@classmethod
def get_ticket_transitions(cls, ticket:Ticket):
"""
获取工单当前状态下可用的流转条件
"""
return cls.get_state_transitions(ticket.state)
@classmethod
def get_state_transitions(cls, state:State):
"""
获取状态可执行的操作
"""
return Transition.objects.filter(is_deleted=False, source_state=state).all()
@classmethod
def get_ticket_steps(cls, ticket:Ticket):
steps = cls.get_worlflow_states(ticket.workflow)
for i in steps:
if ticket.state.is_hidden and ticket.state != i:
steps.remove(i)
return steps
@classmethod
def get_ticket_transitions(cls, ticket:Ticket):
"""
获取工单可执行的操作
"""
return cls.get_state_transitions(ticket.state)
@classmethod
def get_transition_by_args(cls, kwargs:dict):
"""
查询并获取流转
"""
kwargs['is_deleted'] = False
return Transition.objects.filter(**kwargs).all()
@classmethod
def get_ticket_sn(cls, workflow:Workflow):
"""
生成工单流水号
"""
now = timezone.now()
today = str(now)[:10]+' 00:00:00'
next_day = str(now+timedelta(days=1))[:10]+' 00:00:00'
ticket_day_count_new = Ticket.objects.filter(create_time__gte=today, create_time__lte=next_day, workflow=workflow).count()+1
return '%s_%04d%02d%02d%04d' % (workflow.sn_prefix, now.year, now.month, now.day, ticket_day_count_new)
@classmethod
def get_next_state_id_by_transition_and_ticket_info(cls, ticket:Ticket, transition: Transition)->object:
"""
获取下个节点状态
"""
# if ticket: # 如果是新建工单
# source_state = ticket.state
# else:
# source_state = cls.get_workflow_start_state(workflow)
# if transition.source_state != source_state:
# raise APIException('流转错误')
source_state = ticket.state
destination_state = transition.destination_state
if transition.condition_expression:
pass
return destination_state
@classmethod
def get_ticket_state_participant_info(cls, state:State, ticket:Ticket, ticket_data:dict={}):
"""
获取工单目标状态实际的处理人, 处理人类型
"""
if state.type == State.STATE_TYPE_START:
"""
回到初始状态
"""
return dict(destination_participant_type=State.PARTICIPANT_TYPE_PERSONAL,
destination_participant=ticket.create_by.id,
multi_all_person={})
elif state.type == State.STATE_TYPE_END:
"""
到达结束状态
"""
return dict(destination_participant_type=State.PARTICIPANT_TYPE_PERSONAL,
destination_participant=None,
multi_all_person={})
multi_all_person_dict = {}
destination_participant_type, destination_participant = State.participant_type, State.participant
if destination_participant_type == State.PARTICIPANT_TYPE_FIELD:
destination_participant = ticket_data.get(destination_participant, None) if destination_participant in ticket_data else Ticket.ticket_data.get(destination_participant, None)
if destination_participant_type == State.PARTICIPANT_TYPE_DEPT:#单部门
destination_participant = list(User.objects.filter(dept=destination_participant).values_list('id', flat=True))
if destination_participant_type == State.PARTICIPANT_TYPE_ROLE:#单角色
destination_participant = list(User.objects.filter(roles=destination_participant).values_list('id', flat=True))
if type(destination_participant) == list:
destination_participant_type = State.PARTICIPANT_TYPE_MULTI
destination_participant = list(set(destination_participant))
else:
destination_participant_type = State.PARTICIPANT_TYPE_PERSONAL
if destination_participant_type == State.PARTICIPANT_TYPE_MULTI:
if state.distribute_type == State.STATE_DISTRIBUTE_TYPE_RANDOM:
destination_participant = random.choice(destination_participant)
elif state.distribute_type == State.STATE_DISTRIBUTE_TYPE_ALL:
for i in destination_participant:
multi_all_person_dict[i]={}
return dict(destination_participant_type=destination_participant_type,
destination_participant=destination_participant,
multi_all_person=multi_all_person_dict)