import logging
import os
import time
import uuid
from datetime import datetime, timedelta
from dateutil import tz
from django.conf import settings
from django.utils import timezone
from django.core.cache import cache
from apps.system.models import User
from apps.hrm.models import ClockRecord, Employee
from apps.third.dahua import dhClient
from apps.third.models import TDevice
from apps.third.tapis import dhapis
from apps.utils.tools import rannum, ranstr
import numpy as np

myLogger = logging.getLogger('log')


class HrmService:
    """Employee-related services: Dahua sync, clock records and face lookup."""

    # Retry policy for the Dahua door-authority request in ``door_auth``.
    DOOR_AUTH_MAX_TRIES = 8
    DOOR_AUTH_RETRY_DELAY = 6  # seconds slept before every attempt

    # Format used for every date string sent to Dahua / parsed from swipes.
    _DT_FMT = "%Y-%m-%d %H:%M:%S"

    @staticmethod
    def _dh_person_payload(ep, dh_id, departmentId):
        """Build the person fields shared by the Dahua add/update requests."""
        return {
            "service": "ehs",
            "id": dh_id,
            "name": ep.name,
            # A random 6-character code is used when the employee has no number.
            "code": ep.number if ep.number else ranstr(6),
            "paperType": 111,
            "paperNumber": ep.id_number,
            "paperAddress": "default",
            "departmentId": departmentId,
            "phone": ep.phone,
            "email": ep.email,
            "sex": 1 if ep.gender == '男' else 2,
        }

    @staticmethod
    def _dh_face_signature(path):
        """Build the face-photo part of a Dahua person payload."""
        return {
            "biosignatureTypeList": [3],
            "personBiosignatures": [{
                "type": 3,
                "index": 1,
                "path": path
            }]
        }

    @classmethod
    def sync_dahua_employee(cls, ep: Employee, old_photo='', start_time=None, end_time=None):
        """Sync an employee to Dahua (person / face card / door access).

        Args:
            ep (Employee): employee instance.
            old_photo (str, optional): previous photo path; the photo is only
                re-uploaded for an existing person when it differs from
                ``ep.photo``. Defaults to ''.
            start_time (optional): face-card validity start. Defaults to None.
            end_time (optional): face-card validity end. Defaults to None.

        Returns:
            dict | None: ``dh_id``, ``dh_photo``, ``dh_face_card`` and
            ``dh_dchannels``; None when ``settings.DAHUA_ENABLED`` is falsy.
        """
        if not settings.DAHUA_ENABLED:
            # Dahua integration is disabled: nothing to do.
            return

        departmentId = 1
        if ep.belong_dept:
            try:
                departmentId = ep.belong_dept.third_info['dh_id']
            except Exception:
                # Best effort: keep the default department id.
                pass

        dh_id = ep.third_info.get('dh_id', None)
        if dh_id:
            # The person already exists in Dahua: update it.
            dh_photo = ep.third_info['dh_photo']
            if ep.photo != old_photo and ep.photo:
                _, res = dhClient.request(
                    **dhapis['person_img_upload'], file_path_rela=ep.photo)
                dh_photo = res["fileUrl"]
            json_data = cls._dh_person_payload(ep, dh_id, departmentId)
            json_data.update(cls._dh_face_signature(dh_photo))
            dhClient.request(**dhapis['person_update'], json=json_data)
        else:
            # No Dahua record yet: allocate an id, upload the photo, add.
            _, res = dhClient.request(**dhapis['person_gen_id'])
            dh_id = res['id']
            json_data = cls._dh_person_payload(ep, dh_id, departmentId)
            _, res = dhClient.request(
                **dhapis['person_img_upload'], file_path_rela=ep.photo)
            dh_photo = res["fileUrl"]
            json_data.update(cls._dh_face_signature(dh_photo))
            dhClient.request(**dhapis['person_add'], json=json_data)
        ep = cls.save(ep, data={'dh_id': dh_id, 'dh_photo': dh_photo})

        # Open (or refresh) the face card.
        dh_face_card = cls.open_face_card(
            ep=ep, dh_id=dh_id, departmentId=departmentId,
            start_time=start_time, end_time=end_time)
        # Grant door access.
        dh_dchannels = cls.door_auth(ep=ep)
        return {'dh_id': dh_id, 'dh_photo': dh_photo,
                'dh_face_card': dh_face_card, 'dh_dchannels': dh_dchannels}

    @classmethod
    def open_face_card(cls, ep, dh_id, departmentId, start_time, end_time):
        """Open a face card, or update the validity of an existing one.

        When the employee already has a card and ``start_time`` is None the
        card is left untouched. When a new card is opened without
        ``start_time`` it is valid for one hour starting now.

        Returns:
            str: the card number.
        """
        cardNumber = ep.third_info.get('dh_face_card', None)
        if cardNumber:
            # Existing card: only refresh its validity when a range is given.
            if start_time is not None:
                startDate = timezone.localtime(
                    start_time).strftime(cls._DT_FMT)
                endDate = timezone.localtime(end_time).strftime(cls._DT_FMT)
                json_data = {
                    "cardNumber": cardNumber,
                    "startDate": startDate,
                    "endDate": endDate,
                    "departmentId": departmentId,
                }
                dhClient.request(**dhapis['card_update'], json=json_data)
                cls.save(ep, data={'dh_face_card_start': startDate,
                                   'dh_face_card_end': endDate})
            return cardNumber

        _, res = dhClient.request(**dhapis['card_gen_id'])
        cardId = res['id']
        cardNumber = str(ep.id)[3:8] + rannum(5)
        if start_time is None:
            # No range supplied: default to one hour from now.
            # NOTE(review): datetime.now() is naive server-local time while
            # the other branch uses Django's current time zone - confirm
            # both agree in deployment.
            now = datetime.now()
            startDate = now.strftime(cls._DT_FMT)
            endDate = (now + timedelta(minutes=60)).strftime(cls._DT_FMT)
        else:
            startDate = timezone.localtime(start_time).strftime(cls._DT_FMT)
            endDate = timezone.localtime(end_time).strftime(cls._DT_FMT)
        json_data = {
            "id": cardId,
            "cardNumber": cardNumber,
            "category": 0,
            "cardType": 0,
            "personId": dh_id,
            "departmentId": departmentId,
            "startDate": startDate,
            "endDate": endDate
        }
        dhClient.request(**dhapis['card_add'], json=json_data)
        cls.save(ep, data={'dh_face_card': cardNumber,
                           'dh_face_card_start': startDate,
                           'dh_face_card_end': endDate})
        return cardNumber

    @classmethod
    def door_auth(cls, ep: Employee, dh_dchannels: list = None):
        """Grant door access to the employee's face card.

        Args:
            ep (Employee): employee instance.
            dh_dchannels (list, optional): door channel codes to grant. Codes
                unknown to ``TDevice`` are dropped. When empty or None, the
                door channels whose ``access_list`` contains ``ep.type`` are
                used.

        Returns:
            list: the door channel codes that were selected.

        Raises:
            Exception: re-raises the last error of the Dahua request once
                ``DOOR_AUTH_MAX_TRIES`` attempts have failed.
        """
        dh_face_card = ep.third_info.get('dh_face_card', None)
        if dh_dchannels:
            # Only grant channels that exist in the system.
            dh_dchannels = list(TDevice.objects.filter(
                code__in=dh_dchannels).values_list('code', flat=True))
        else:
            # Look up the door channels this employee type may access.
            dh_dchannels = list(TDevice.objects.filter(
                type=TDevice.DEVICE_DCHANNEL,
                access_list__contains=ep.type).values_list('code', flat=True))
        if not dh_dchannels:
            return dh_dchannels

        json_data = {
            "cardNumbers": [dh_face_card],
            "timeQuantumId": 1,
            "cardPrivilegeDetails": [
                {"privilegeType": 1, "resouceCode": code}
                for code in dh_dchannels
            ]
        }
        for attempt in range(1, cls.DOOR_AUTH_MAX_TRIES + 1):
            # The delay also applies before the first attempt.
            time.sleep(cls.DOOR_AUTH_RETRY_DELAY)
            try:
                dhClient.request(
                    **dhapis['card_door_authority'], json=json_data)
                break
            except Exception:
                if attempt >= cls.DOOR_AUTH_MAX_TRIES:
                    raise

        cls.save(ep, data={'dh_dchannels': dh_dchannels})
        from apps.third.models import DoorAuth
        # Rebuild the local DoorAuth rows to mirror what was granted.
        DoorAuth.objects.filter(employee=ep).delete()
        DoorAuth.objects.bulk_create([
            DoorAuth(employee=ep, dchannel=TDevice.objects.get(code=code))
            for code in dh_dchannels
        ])
        return dh_dchannels

    @classmethod
    def save(cls, ep: Employee, data: dict):
        """Merge ``data`` into ``ep.third_info``, save and return ``ep``."""
        ti = ep.third_info
        ti.update(data)
        ep.third_info = ti
        ep.save()
        return ep

    @classmethod
    def swipe(cls, data: dict):
        """Handle a real-time swipe event (used for attendance).

        Events without an id number (``paperNumber``) are ignored.
        """
        extend = data['info']['extend']
        id_number = extend.get('paperNumber', None)
        if id_number:
            cls.swipe_next(extend['acsChannelCode'], id_number,
                           extend['swingTime'], extend['enterOrExit'], extend)

    @classmethod
    def swipe_offline(cls, data: dict):
        """Handle a pushed batch of offline swipe records.

        Records without an id number (``paperNumber``) are ignored.
        """
        for record in data['infoArray']:
            id_number = record.get('paperNumber', None)
            if id_number:
                cls.swipe_next(record['acsChannelCode'], id_number,
                               record['swingTime'], record['enterOrExit'],
                               record)

    @staticmethod
    def _classify_swipe(s_time_f, e_type):
        """Return ``(card_type, trigger, note)`` for one swipe.

        ``e_type`` 1 maps to card type 10 and 2 to 20. For ``e_type`` 3 the
        type is derived from the time of day: 07:20-11:00 gives 10,
        16:00-18:10 gives 20 (bounds exclusive), anything else 30 with a
        note. Any other ``e_type`` gives 30.
        """
        if e_type == 1:
            return 10, 'door', ''
        if e_type == 2:
            return 20, 'door', ''
        if e_type != 3:
            return 30, 'door', ''

        def at(hour, minute):
            # Same calendar day and tzinfo as the swipe itself.
            return s_time_f.replace(
                hour=hour, minute=minute, second=0, microsecond=0)

        if at(7, 20) < s_time_f < at(11, 0):
            return 10, 'panel', ''
        if at(16, 0) < s_time_f < at(18, 10):
            return 20, 'panel', ''
        return 30, 'panel', '非打卡时间范围'

    @staticmethod
    def _flag_exit_exception(ep, last_obj):
        """Set ``exception_type`` on an exit record from the preceding record.

        The preceding type-10/20 record is not necessarily from the same day.
        """
        cr_e = ClockRecord.objects.filter(
            create_time__lt=last_obj.create_time, employee=ep,
            type__in=[10, 20]).order_by('-create_time').first()
        if cr_e is None:
            return
        time_d = last_obj.create_time - cr_e.create_time
        exception_type = None
        if cr_e.type == 10:
            if time_d < timedelta(hours=7):
                exception_type = ClockRecord.E_TYPE_LESS
            elif time_d > timedelta(hours=14):
                exception_type = ClockRecord.E_TYPE_MORE
            elif time_d > timedelta(hours=10):
                exception_type = ClockRecord.E_TYPE_ADD
        elif cr_e.type == 20:
            # Two exits in a row.
            exception_type = ClockRecord.E_TYPE_MISS
        if exception_type is not None:
            last_obj.exception_type = exception_type
            last_obj.save()

    @classmethod
    def swipe_next(cls, nodeCode: str, id_number: str, swip_time: str, e_type: int, detail: dict):
        """Process one swipe: advance visit/project state and record attendance.

        Args:
            nodeCode (str): code of the door channel that was swiped.
            id_number (str): id number of the person who swiped.
            swip_time (str): swipe time, ``%Y-%m-%d %H:%M:%S``, interpreted
                as Asia/Shanghai.
            e_type (int): ``enterOrExit`` value reported by the device.
            detail (dict): raw record, stored on the clock record.
        """
        from apps.rpm.models import Rpj
        from apps.vm.models import Visit
        # Move matching visits / related-party projects to "working".
        Visit.objects.filter(
            state=Visit.V_ENTER,
            visitors__employee__id_number=id_number).update(
            state=Visit.V_WORKING)
        Rpj.objects.filter(
            state=Rpj.RPJ_ENTER,
            remployees__employee__id_number=id_number).update(
            state=Rpj.RPJ_WORKING)

        # Attendance is only recorded for clock devices and known employees.
        device = TDevice.objects.filter(code=nodeCode).first()
        ep = Employee.objects.filter(
            id_number=id_number,
            type__in=["employee", "remployee"]).first()
        if not (device and device.is_clock and ep):
            return

        tzinfo = tz.gettz('Asia/Shanghai')
        s_time_f = datetime.strptime(
            swip_time, cls._DT_FMT).replace(tzinfo=tzinfo)
        first_time = s_time_f.replace(hour=0, minute=0, second=0, microsecond=0)
        end_time = s_time_f.replace(
            hour=23, minute=59, second=59, microsecond=0)
        card_type, trigger, note = cls._classify_swipe(s_time_f, e_type)

        # Create the record unless one already exists for this exact time.
        cr = ClockRecord.objects.filter(
            employee=ep, create_time=s_time_f).first()
        if not cr:
            cr = ClockRecord()
            cr.employee = ep
            cr.create_time = s_time_f
            cr.type = card_type
            cr.exception_type = None
            cr.trigger = trigger
            cr.detail = detail
            cr.note = note
            cr.save()

        if card_type == 10:
            # Entry records of the day: keep only the earliest as type 10.
            cr_10_q = ClockRecord.objects.filter(
                type=10, employee=ep,
                create_time__gte=first_time, create_time__lte=end_time)
            first_obj = cr_10_q.order_by('create_time').first()
            # The queryset can be empty when a record with this timestamp
            # already existed under another type.
            if first_obj is not None:
                cr_10_q.exclude(id=first_obj.id).update(
                    type=30, exception_type=None)
        elif card_type == 20:
            # Exit records of the day: keep only the latest as type 20.
            cr_20_q = ClockRecord.objects.filter(
                type=20, employee=ep,
                create_time__gte=first_time, create_time__lte=end_time)
            last_obj = cr_20_q.order_by('-create_time').first()
            if last_obj is not None:
                cr_20_q.exclude(id=last_obj.id).update(
                    type=30, exception_type=None)
                cls._flag_exit_exception(ep, last_obj)

        # Record whether the employee is currently at work.
        last_obj_t = ClockRecord.objects.filter(
            employee=ep, type__in=[10, 20]).order_by('-create_time').first()
        if last_obj_t:
            Employee.objects.filter(id=ep.id).update(
                last_check_time=last_obj_t.create_time,
                is_atwork=last_obj_t.type == 10)
        # Safety-helmet event logic could be triggered here.

    @classmethod
    def get_facedata_from_img_x(cls, img_path):
        """Return ``(encoding_list, '')`` for the first face in the image.

        Uses ``face_recognition``; on any failure returns
        ``(None, error_message)``.
        """
        import face_recognition
        try:
            picture_of_me = face_recognition.load_image_file(img_path)
            my_face_encoding = face_recognition.face_encodings(
                picture_of_me, num_jitters=2)[0]
            return my_face_encoding.tolist(), ''
        except Exception:
            return None, '人脸数据获取失败请重新上传图片'

    @classmethod
    def face_compare_from_base64(cls, base64_data):
        """Find the employee whose cached face encoding best matches the image.

        Args:
            base64_data: image bytes; written unchanged to a temporary file.

        Returns:
            tuple: ``(Employee, '')`` on a match, else ``(None, message)``.
        """
        import face_recognition
        filename = str(uuid.uuid4())
        filepath = f'{settings.BASE_DIR}/temp/face_{filename}.png'
        with open(filepath, 'wb') as f:
            f.write(base64_data)
        try:
            unknown_picture = face_recognition.load_image_file(filepath)
            unknown_face_encoding = face_recognition.face_encodings(
                unknown_picture, num_jitters=2)[0]
        except Exception:
            return None, '识别失败,请调整位置'
        finally:
            # The temporary file is never needed past this point.
            os.remove(filepath)

        # Match against the face library cached under 'face_datas_dlib'.
        face_datas = cache.get('face_datas_dlib', None)
        if face_datas is None:
            from apps.hrm.tasks import update_all_facedata_cache
            update_all_facedata_cache()
            face_datas = cache.get('face_datas_dlib')
        if not face_datas or len(face_datas['datas']) == 0:
            # Nothing to compare against; np.argmin would raise on empty input.
            return None, '人脸未匹配,请调整位置'

        results = face_recognition.compare_faces(
            face_datas['datas'], unknown_face_encoding, tolerance=0.45)
        face_distances = face_recognition.face_distance(
            face_datas['datas'], unknown_face_encoding)
        best_match_index = np.argmin(face_distances)
        if results[best_match_index]:
            epId = face_datas['eps'][best_match_index]
            return Employee.objects.get(id=epId), ''
        return None, '人脸未匹配,请调整位置'

    @classmethod
    def face_find_from_base64(cls, base64_data):
        """Find the employee matching the image via ``apps.utils.face.face_find``.

        Args:
            base64_data: image bytes; written unchanged to a temporary file.

        Returns:
            tuple: ``(Employee, '')`` on a match, else ``(None, message)``.
        """
        from apps.utils.face import face_find
        img_name = str(uuid.uuid4())
        img_path = f'{settings.BASE_DIR}/temp/face_{img_name}.jpg'
        with open(img_path, 'wb') as f:
            f.write(base64_data)
        try:
            global_face = cache.get('global_face', None)
            if global_face is None:
                from apps.hrm.tasks import update_global_face
                update_global_face()
                global_face = cache.get('global_face')
            try:
                dfs = face_find(img_path=img_path, global_face=global_face)
            except ValueError:
                return None, '人脸未匹配,请调整位置'
        finally:
            # Always remove the temporary image, whatever happened above.
            os.remove(img_path)

        df = dfs[0]
        if df.empty:
            return None, '人脸未匹配,请调整位置'
        matched = df.iloc[0].identity
        return Employee.objects.get(id=matched), ''

    @classmethod
    def get_facedata_from_img(cls, img_path):
        """Return ``(embedding, '')`` for the image using DeepFace (Facenet).

        On any failure returns ``(None, error_message)``.
        """
        try:
            from deepface import DeepFace
            embedding_objs = DeepFace.represent(
                img_path=img_path, model_name='Facenet')
            return embedding_objs[0]["embedding"], ''
        except Exception:
            return None, '人脸数据获取失败请重新上传图片'

    @classmethod
    def init_doorauth(cls):
        """Rebuild all ``DoorAuth`` rows from each employee's ``dh_dchannels``."""
        from apps.third.models import DoorAuth
        from apps.utils.snowflake import idWorker
        DoorAuth.objects.all().delete()
        for ind, ep in enumerate(Employee.objects.all().order_by('create_time')):
            third_info = ep.third_info
            if third_info.get('dh_dchannels', None):
                drs = []
                for code in third_info['dh_dchannels']:
                    door_auth_id = idWorker.get_id()
                    drs.append(
                        DoorAuth(id=door_auth_id, employee=ep,
                                 dchannel=TDevice.objects.get(code=code)))
                DoorAuth.objects.bulk_create(drs)
                print(f'{ind+1}-{ep.name}-doorauth已创建')