# Create your tasks here from __future__ import absolute_import, unicode_literals import importlib from apps.utils.task import CustomTask from celery import shared_task from apps.wf.models import State, Ticket, TicketFlow, Transition @shared_task(base=CustomTask) def send_ticket_notice(ticket_id): """ 发送通知 """ pass @shared_task(base=CustomTask) def run_task(ticket_id: str): ticket = Ticket.objects.get(id=ticket_id) script_result = True script_result_msg = '' script_str = ticket.participant try: module, func = script_str.rsplit(".", 1) m = importlib.import_module(module) f = getattr(m, func) f(ticket) except Exception as e: script_result = False script_result_msg = e.__str__() ticket = Ticket.objects.filter(id=ticket_id).first() if not script_result: ticket.script_run_last_result = False ticket.save() # 记录日志 transition_obj = Transition.objects.filter(source_state=ticket.state, is_deleted=False).first() TicketFlow.objects.create(ticket=ticket, state=ticket.state, participant_type=State.PARTICIPANT_TYPE_ROBOT, participant_str='func:{}'.format(script_str), transition=transition_obj, suggestion=script_result_msg) # 自动流转 from apps.wf.services import WfService WfService.handle_ticket(ticket=ticket, transition=transition_obj, new_ticket_data=ticket.ticket_data, by_task=True)