import time from apps.ecm.service import check_not_in_place, snap_and_analyse from apps.ecm.models import EventCate from apps.opm.models import Operation, Opl, OplWorker, GasCheck, OplCert from apps.third.models import TDevice from apps.utils.sms import send_sms from apps.wf.models import Ticket, Transition from django_celery_results.models import TaskResult from threading import Thread import uuid from django.core.cache import cache from rest_framework.exceptions import ParseError def get_op_charger(state, ticket, new_ticket_data, handler): """_summary_ Args: state (_type_): 工作流节点实例 ticket (_type_): 工单实例 new_ticket_data (_type_): 提交的工单数据 handler (_type_): 处理人实例 """ opl = Opl.objects.filter(ticket=ticket).first() if opl: return [opl.charger.id] def get_op_workers(state, ticket, new_ticket_data, handler): opl = Opl.objects.filter(ticket=ticket).first() if opl: return list(OplWorker.objects.filter(opl=opl).values_list('worker__id', flat=True)) def get_op_monitor(state, ticket, new_ticket_data, handler): opl = Opl.objects.filter(ticket=ticket).first() if opl: return [opl.monitor.id] def bind_opl(ticket: Ticket, transition: Transition, new_ticket_data: dict): opl = Opl.objects.get(id=new_ticket_data['opl']) if '有限空间' in opl.cate.name and GasCheck.objects.filter(opl=opl).count() == 0: raise ParseError('该许可证需填写气体检测记录!') if ('高处' in opl.cate.name or '动火' in opl.cate.name) and OplCert.objects.filter(opl_worker__opl=opl).count() == 0: raise ParseError('该许可证需添加作业人员相关证书!') ticket_data = ticket.ticket_data ticket_data.update({ 'operation': opl.operation.id, # operation id值 'level': opl.level, 'power_days': opl.power_days, 'work_scope': opl.work_scope, 'monitor': opl.monitor.id, 'workers': list(OplWorker.objects.filter(opl=opl).values_list('worker__id', flat=True)), 'charger': opl.charger.id, 'dept_ter': opl.operation.dept_ter.id, 'dept_bus': opl.operation.dept_bus.id }) ticket.ticket_data = ticket_data ticket.create_by = opl.create_by ticket.belong_dept = opl.belong_dept ticket.save() if opl.ticket is None: opl.ticket = ticket opl.number = ticket.sn opl.save() op = opl.operation if op.state == Operation.OP_CREATE: op.state = Operation.OP_AUDIT op.save() def t_submit_close_mtask(ticket: Ticket, transition: Transition, new_ticket_data: dict): # 提交作业关闭时关闭作业监控 opl = Opl.objects.filter(ticket=ticket).first() if opl: close_mtask(opl) def opl_audit_end(ticket: Ticket): # 任务执行1 opl = Opl.objects.get(ticket=ticket) op = opl.operation if op.state == Operation.OP_AUDIT: op.state = Operation.OP_WAIT op.save() # 授予相关工作人员区域进入权限 # worker_ep_ids = list(OplWorker.objects.filter(opl=opl).values_list('worker__id', flat=True)) # 发送通知 phone = opl.create_by.phone if opl.create_by else None if phone: send_sms(phone=phone, template_code=1005, template_param={'name': ticket.workflow.name}) def opl_start(ticket: Ticket): # 任务执行2 开始许可证作业 opl = Opl.objects.get(ticket=ticket) op = opl.operation if op.state == Operation.OP_WAIT: op.state = Operation.OP_WORK op.save() # 检查作业人员是否就位 # check_not_in_place(opl) # 给摄像头加载循环拍照算法 start_mtask(opl) def start_mtask(opl: Opl): tv = uuid.uuid4() cache.set('opl_'+opl.id, tv, timeout=10800) Thread(target=opl_monitor, args=(opl, tv), daemon=True).start() return dict({'mtask_uid': tv}) def close_mtask(opl: Opl): """关闭监控线程 """ cache.delete('opl_' + opl.id) def opl_end(ticket: Ticket): """ 作业许可证关闭时执行 """ opl = Opl.objects.filter(ticket=ticket).first() if opl: # 关闭作业视频监控任务 close_mtask(opl) operation = opl.operation opls = Opl.objects.filter(operation=operation) opls.filter(ticket=None).delete() # 删除无用许可证 states = opls.values_list('ticket__state__type', flat=True) if 0 in states or 1 in states: # 查看工单状态 pass else: operation.state = Operation.OP_DONE operation.save() def opl_monitor(opl: Opl, tv: str): """作业视频监控 tv: 定义的线程ID """ tkey = 'opl_' + opl.id op = opl.operation # 找到作业点的摄像头, 如果指定摄像头就用指定的摄像头 vchannels = op.vchannels.all() if vchannels: # 找到作业需要加载的算法 algo_codes = list(EventCate.objects.filter(opl_cates=opl.cate).values_list('code', flat=True)) if algo_codes: while tv == cache.get(tkey, None): for i in vchannels: Thread(target=snap_and_analyse, args=(i, algo_codes, opl), daemon=True).start() time.sleep(10) cache.delete(tkey)