from __future__ import absolute_import, unicode_literals import time from datetime import datetime, timedelta from celery import shared_task from dateutil import tz from django.core.cache import cache from apps.hrm.models import Employee, Certificate from apps.hrm.services import HrmService from apps.third.dahua import dhClient from apps.third.tapis import dhapis from apps.utils.tasks import CustomTask import os from django.conf import settings import shutil @shared_task def update_all_employee_not_atwork(): """ 将所有员工设为非在岗状态 """ Employee.objects.all().update(is_atwork=False, last_check_time=None, not_work_remark=None) @shared_task(base=CustomTask) def correct_swip_task(start_time="", end_time=""): # from apps.monitor.models import DrfRequestLog # from apps.hrm.services import HrmService # lgs = DrfRequestLog.objects.filter(path='/api/third/dahua/c_swip/', requested_at__gte=start_time, requested_at__lte=end_time) # for i in lgs: # HrmService.swipe(data=eval(i.data)) # 从大华历史记录校正打卡记录 now = datetime.now() if start_time == "": start_time = (now - timedelta(hours=25)).strftime("%Y-%m-%d %H:%M:%S") if end_time == "": end_time = now.strftime("%Y-%m-%d %H:%M:%S") data0 = { "pageNum":1, "pageSize": "20", "startSwingTime":start_time, "endSwingTime":end_time, "openType":"61" } _, count = dhClient.request(**dhapis['swipe_list_count'], json=data0) data = { "pageNum":1, "pageSize": str(count), "startSwingTime":start_time, "endSwingTime":end_time, "openType":"61" } _, res = dhClient.request(**dhapis['swipe_list'], json=data) pageData = sorted(res['pageData'], key=lambda obj: obj['swingTime']) for i in pageData: if 'paperNumber' in i: HrmService.swipe_next(i['channelCode'], i['paperNumber'], i['swingTime'], i['enterOrExit'], i) # ClockRecord.objects.filter(type=30).delete() @shared_task def correct_card_time(): tzinfo = tz.gettz('Asia/Shanghai') s_time_f = datetime.strptime("2022-11-15 14:20:20", "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzinfo) eps = Employee.objects.filter(update_time__lte = s_time_f) print(eps) for ep in eps: dh_face_card = ep.third_info.get('dh_face_card', None) dh_face_card_end = ep.third_info.get('dh_face_card_end', None) if dh_face_card and dh_face_card_end is None: departmentId = 1 if ep.belong_dept: try: departmentId = ep.belong_dept.third_info['dh_id'] except Exception: pass # 获取卡片时间 _, res = dhClient.request(**dhapis['card_detail'], params={'cardNumber': dh_face_card}) time.sleep(1) start_time_str, end_time_str = res['startDate'], res['endDate'] end_time_new = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S") + timedelta(hours=8) end_time_new_str = end_time_new.strftime("%Y-%m-%d %H:%M:%S") json_data = { "cardNumber": dh_face_card, "startDate": start_time_str, "endDate": end_time_new_str, "departmentId": departmentId, } _, res = dhClient.request(**dhapis['card_update'], json=json_data) HrmService.save(ep, {'dh_face_card_start': start_time_str, 'dh_face_card_end': end_time_new_str}) print('已更新-' + ep.name + '-' + dh_face_card + '-' + end_time_new_str) time.sleep(1) @shared_task def update_all_facedata_cache(): """ 更新人脸数据缓存 """ facedata_queyset = Employee.objects.filter(face_data__dlib__isnull=False).values('id', 'face_data__dlib') face_eps = [] face_datas = [] for i in facedata_queyset: face_eps.append(i['id']) face_datas.append(i['face_data__dlib']) face_data_dict = {"eps": face_eps, "datas": face_datas} cache.set('face_datas_dlib', face_data_dict) @shared_task(base=CustomTask) def delete_face_pkl(epId): """ 尝试删除人脸库缓存文件 """ if epId: ep = Employee.objects.get(id=epId) photo_path = settings.BASE_DIR + ep.photo face_path = os.path.join(settings.FACE_PATH, f'{epId}.jpg') shutil.copy(photo_path, face_path) file_path = os.path.join(settings.BASE_DIR, 'media/face/representations_facenet.pkl') if os.path.exists(file_path): try: os.remove(file_path) except Exception as e: delete_face_pkl.apply_async(countdown=5) @shared_task(base=CustomTask) def update_global_face(): facedata = Employee.objects.filter(face_data__isnull=False, user__is_active=True).values_list('id', 'face_data') cache.set('global_face', list(facedata), timeout=None) @shared_task(base=CustomTask) def check_cert_state(): cert_qs = Certificate.objects.filter(state=Certificate.CERT_OK) for cert in cert_qs: cert.get_state(need_update=True)