# factory/apps/hrm/services.py
#
# 449 lines
# 19 KiB
# Python
# Executable File

import logging
import os
import time
import uuid
from datetime import datetime, timedelta
from dateutil import tz
from django.conf import settings
from django.utils import timezone
from django.core.cache import cache
from apps.system.models import User
from apps.hrm.models import ClockRecord, Employee
from apps.third.dahua import dhClient
from apps.third.models import TDevice
from apps.third.tapis import dhapis
from apps.utils.tools import rannum, ranstr
import numpy as np
# Module-level logger, obtained from the logging registry under the name 'log'.
myLogger = logging.getLogger('log')
class HrmService:
    """HRM integration helpers.

    Three concerns live in this class: pushing employee data to the Dahua
    platform (person record, face card, door access), turning door-swipe
    events into ``ClockRecord`` rows, and face-image lookups.
    """

    # ``ClockRecord.type`` codes used by the swipe handling below.
    _CLOCK_IN = 10     # entry swipe
    _CLOCK_OUT = 20    # exit swipe
    _CLOCK_OTHER = 30  # neither; also used for demoted duplicates

    # Clock-in / clock-out windows (hour, minute) applied to swipes whose
    # direction code is 3. Both bounds are exclusive.
    _IN_WINDOW = ((7, 20), (11, 0))
    _OUT_WINDOW = ((16, 0), (18, 10))

    # Door-permission request: number of attempts and the pause (seconds)
    # taken before each attempt.
    _DOOR_AUTH_ATTEMPTS = 8
    _DOOR_AUTH_DELAY = 6

    _TIME_FMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def sync_dahua_employee(cls, ep: "Employee", old_photo='', start_time=None, end_time=None):
        """Sync an employee to Dahua: person record, face card, door access.

        When ``ep.third_info`` already holds a ``dh_id`` the Dahua person is
        updated (re-uploading the photo only if it changed); otherwise a new
        person id is generated, the photo uploaded and the person added.

        Args:
            ep: Employee instance to synchronise.
            old_photo: Photo value before the current change; the photo is
                re-uploaded only when ``ep.photo`` is non-empty and differs.
            start_time: Start of the face-card validity window, forwarded to
                ``open_face_card``.
            end_time: End of the face-card validity window, forwarded to
                ``open_face_card``.

        Returns:
            ``None`` when ``settings.DAHUA_ENABLED`` is falsy, otherwise a
            dict with keys ``dh_id``, ``dh_photo``, ``dh_face_card`` and
            ``dh_dchannels``.
        """
        if not settings.DAHUA_ENABLED:
            return None

        # Fall back to department id 1 when the department carries no Dahua id.
        departmentId = 1
        if ep.belong_dept:
            try:
                departmentId = ep.belong_dept.third_info['dh_id']
            except (KeyError, TypeError, AttributeError):
                pass

        dh_id = ep.third_info.get('dh_id', None)
        is_new_person = not dh_id
        if is_new_person:
            _, res = dhClient.request(**dhapis['person_gen_id'])
            dh_id = res['id']
            dh_photo = None
            needs_upload = True
        else:
            dh_photo = ep.third_info['dh_photo']
            needs_upload = bool(ep.photo) and ep.photo != old_photo

        if needs_upload:
            _, res = dhClient.request(**dhapis['person_img_upload'], file_path_rela=ep.photo)
            dh_photo = res["fileUrl"]

        json_data = {
            "service": "ehs",
            "id": dh_id,
            "name": ep.name,
            "code": ep.number if ep.number else ranstr(6),
            "paperType": 111,
            "paperNumber": ep.id_number,
            "paperAddress": "default",
            "departmentId": departmentId,
            "phone": ep.phone,
            "email": ep.email,
            # NOTE(review): compares gender with an empty string; the literal
            # may have been lost upstream - confirm the intended value.
            "sex": 1 if ep.gender == '' else 2,
            "biosignatureTypeList": [3],
            "personBiosignatures": [{
                "type": 3,
                "index": 1,
                "path": dh_photo,
            }],
        }
        person_api = dhapis['person_add'] if is_new_person else dhapis['person_update']
        dhClient.request(**person_api, json=json_data)
        ep = cls.save(ep, data={'dh_id': dh_id, 'dh_photo': dh_photo})

        # Open (or refresh) the face card, then grant door permissions.
        dh_face_card = cls.open_face_card(
            ep=ep, dh_id=dh_id, departmentId=departmentId, start_time=start_time, end_time=end_time)
        dh_dchannels = cls.door_auth(ep=ep)
        return {'dh_id': dh_id, 'dh_photo': dh_photo, 'dh_face_card': dh_face_card, 'dh_dchannels': dh_dchannels}

    @classmethod
    def open_face_card(cls, ep, dh_id, departmentId, start_time, end_time):
        """Open a face card, or update the validity of an existing one.

        If ``ep.third_info`` already has ``dh_face_card`` the card's window is
        updated only when ``start_time`` is given. Otherwise a new card is
        created; without ``start_time`` it is valid for 60 minutes from now.

        Args:
            ep: Employee whose ``third_info`` stores the card data.
            dh_id: Dahua person id the new card is bound to.
            departmentId: Dahua department id sent with the card.
            start_time: Validity start (datetime) or ``None``.
            end_time: Validity end (datetime); read whenever ``start_time``
                is not ``None``.

        Returns:
            The card number (existing or newly generated).
        """
        existing_number = ep.third_info.get('dh_face_card', None)
        if existing_number:
            if start_time is not None:
                startDate = timezone.localtime(start_time).strftime(cls._TIME_FMT)
                endDate = timezone.localtime(end_time).strftime(cls._TIME_FMT)
                dhClient.request(**dhapis['card_update'], json={
                    "cardNumber": existing_number,
                    "startDate": startDate,
                    "endDate": endDate,
                    "departmentId": departmentId,
                })
                cls.save(ep, data={'dh_face_card_start': startDate, 'dh_face_card_end': endDate})
            return existing_number

        _, res = dhClient.request(**dhapis['card_gen_id'])
        cardId = res['id']
        cardNumber = str(ep.id)[3:8] + rannum(5)
        if start_time is None:
            # Default window: one hour from now. Use the same clock as the
            # explicit-window branch so both produce times in one timezone.
            now = timezone.localtime() if settings.USE_TZ else datetime.now()
            startDate = now.strftime(cls._TIME_FMT)
            endDate = (now + timedelta(minutes=60)).strftime(cls._TIME_FMT)
        else:
            startDate = timezone.localtime(start_time).strftime(cls._TIME_FMT)
            endDate = timezone.localtime(end_time).strftime(cls._TIME_FMT)
        dhClient.request(**dhapis['card_add'], json={
            "id": cardId,
            "cardNumber": cardNumber,
            "category": 0,
            "cardType": 0,
            "personId": dh_id,
            "departmentId": departmentId,
            "startDate": startDate,
            "endDate": endDate,
        })
        cls.save(ep, data={'dh_face_card': cardNumber,
                           'dh_face_card_start': startDate, 'dh_face_card_end': endDate})
        return cardNumber

    @classmethod
    def door_auth(cls, ep: "Employee", dh_dchannels: list = None):
        """Grant door-channel permissions to the employee's face card.

        Args:
            ep: Employee whose ``third_info['dh_face_card']`` is authorised.
            dh_dchannels: Channel codes to grant. When empty or omitted, the
                codes of all ``TDevice`` door channels whose ``access_list``
                contains ``ep.type`` are used.

        Returns:
            The list of channel codes that was used (possibly empty).

        Raises:
            Exception: whatever the last Dahua request raised, once every
                attempt has failed.
        """
        dh_face_card = ep.third_info.get('dh_face_card', None)
        if not dh_dchannels:
            # Look up the door channels this employee type may access.
            dh_dchannels = list(TDevice.objects.filter(
                type=TDevice.DEVICE_DCHANNEL,
                access_list__contains=ep.type).values_list('code', flat=True))
        if not dh_dchannels:
            return dh_dchannels

        json_data = {
            "cardNumbers": [dh_face_card],
            "timeQuantumId": 1,
            "cardPrivilegeDetails": [
                {"privilegeType": 1, "resouceCode": code} for code in dh_dchannels
            ],
        }
        for attempt in range(1, cls._DOOR_AUTH_ATTEMPTS + 1):
            # The pause precedes every attempt, including the first one.
            time.sleep(cls._DOOR_AUTH_DELAY)
            try:
                dhClient.request(**dhapis['card_door_authority'], json=json_data)
                break
            except Exception:
                if attempt == cls._DOOR_AUTH_ATTEMPTS:
                    raise
        # NOTE(review): the flattened source does not show whether this save
        # also ran for an empty channel list; it is kept on the non-empty path.
        cls.save(ep, data={'dh_dchannels': dh_dchannels})
        return dh_dchannels

    @classmethod
    def save(cls, ep: "Employee", data: dict):
        """Merge ``data`` into ``ep.third_info`` and save the employee.

        Returns:
            The same ``ep`` instance.
        """
        merged = ep.third_info
        merged.update(data)
        ep.third_info = merged
        ep.save()
        return ep

    @classmethod
    def swipe(cls, data: dict):
        """Handle a real-time swipe event (used for attendance).

        Only events whose ``info.extend`` carries a non-empty ``paperNumber``
        (ID number) are forwarded to ``swipe_next``.
        """
        extend = data['info']['extend']
        id_number = extend.get('paperNumber', None)
        if not id_number:
            return
        cls.swipe_next(extend['acsChannelCode'], id_number, extend['swingTime'],
                       extend['enterOrExit'], extend)

    @classmethod
    def swipe_offline(cls, data: dict):
        """Handle a batch of offline swipe records pushed later.

        Each entry of ``data['infoArray']`` with a non-empty ``paperNumber``
        is forwarded to ``swipe_next``.
        """
        for item in data['infoArray']:
            id_number = item.get('paperNumber', None)
            if not id_number:
                continue
            cls.swipe_next(item['acsChannelCode'], id_number, item['swingTime'],
                           item['enterOrExit'], item)

    @classmethod
    def swipe_next(cls, nodeCode: str, id_number: str, swip_time: str, e_type: int, detail: dict):
        """Process one swipe: advance visit/project state and record the clock.

        Args:
            nodeCode: Code of the door channel that produced the swipe.
            id_number: ID number of the person who swiped.
            swip_time: Swipe time formatted as ``%Y-%m-%d %H:%M:%S``.
            e_type: Direction code from the event (1, 2 or 3 are handled).
            detail: Raw event payload, stored on the clock record.
        """
        from apps.rpm.models import Rpj
        from apps.vm.models import Visit
        # Move matching visits / related-party projects from "entered" to "working".
        Visit.objects.filter(state=Visit.V_ENTER, visitors__employee__id_number=id_number).update(
            state=Visit.V_WORKING)
        Rpj.objects.filter(state=Rpj.RPJ_ENTER, remployees__employee__id_number=id_number).update(
            state=Rpj.RPJ_WORKING)

        # Attendance is recorded only for clock-enabled devices and known staff.
        device = TDevice.objects.filter(code=nodeCode).first()
        ep = Employee.objects.filter(id_number=id_number, type__in=["employee", "remployee"]).first()
        if device and device.is_clock and ep:
            cls._record_clock(ep, swip_time, e_type, detail)
        # Hook: helmet-event logic can be triggered here.

    @staticmethod
    def _classify_swipe(e_type, swipe_dt):
        """Map a direction code and swipe time to ``(type, trigger, note)``.

        Codes 1 and 2 map straight to clock-in / clock-out. Code 3 is
        classified by time of day using the two windows defined on the class.
        NOTE(review): nesting of the code-3 branch was reconstructed from a
        flattened source - confirm that trigger/note apply to code 3 only.
        """
        if e_type == 1:
            return HrmService._CLOCK_IN, 'door', ''
        if e_type == 2:
            return HrmService._CLOCK_OUT, 'door', ''
        if e_type != 3:
            return HrmService._CLOCK_OTHER, 'door', ''

        def at(hour_minute):
            hour, minute = hour_minute
            return swipe_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)

        in_open, in_close = (at(hm) for hm in HrmService._IN_WINDOW)
        out_open, out_close = (at(hm) for hm in HrmService._OUT_WINDOW)
        if in_open < swipe_dt < in_close:
            return HrmService._CLOCK_IN, 'panel', ''
        if out_open < swipe_dt < out_close:
            return HrmService._CLOCK_OUT, 'panel', ''
        return HrmService._CLOCK_OTHER, 'panel', '非打卡时间范围'

    @classmethod
    def _record_clock(cls, ep, swip_time, e_type, detail):
        """Create the clock record, de-duplicate the day and update on-duty state."""
        tzinfo = tz.gettz('Asia/Shanghai')
        swipe_dt = datetime.strptime(swip_time, cls._TIME_FMT).replace(tzinfo=tzinfo)
        day_start = swipe_dt.replace(hour=0, minute=0, second=0, microsecond=0)
        day_end = swipe_dt.replace(hour=23, minute=59, second=59, microsecond=0)
        card_type, trigger, note = cls._classify_swipe(e_type, swipe_dt)

        # Create the record unless this exact swipe was already stored
        # (e.g. an offline replay of a real-time event).
        # NOTE(review): as reconstructed, the de-duplication below also runs
        # when the record already exists - confirm.
        if not ClockRecord.objects.filter(employee=ep, create_time=swipe_dt).exists():
            record = ClockRecord()
            record.employee = ep
            record.create_time = swipe_dt
            record.type = card_type
            record.exception_type = None
            record.trigger = trigger
            record.detail = detail
            record.note = note
            record.save()

        if card_type == cls._CLOCK_IN:
            # Keep only the earliest entry of the day; demote the others.
            entries = ClockRecord.objects.filter(
                type=cls._CLOCK_IN, employee=ep, create_time__gte=day_start, create_time__lte=day_end)
            first_obj = entries.order_by('create_time').first()
            if first_obj is not None:
                entries.exclude(id=first_obj.id).update(type=cls._CLOCK_OTHER, exception_type=None)
        elif card_type == cls._CLOCK_OUT:
            # Keep only the latest exit of the day; demote the others.
            exits = ClockRecord.objects.filter(
                type=cls._CLOCK_OUT, employee=ep, create_time__gte=day_start, create_time__lte=day_end)
            last_obj = exits.order_by('-create_time').first()
            # last_obj can be None when a replayed swipe found an existing
            # record whose type is no longer "exit".
            if last_obj is not None:
                exits.exclude(id=last_obj.id).update(type=cls._CLOCK_OTHER, exception_type=None)
                cls._flag_exit_exception(ep, last_obj)

        # Record on-duty state from the most recent entry/exit record.
        latest = ClockRecord.objects.filter(
            employee=ep, type__in=[cls._CLOCK_IN, cls._CLOCK_OUT]).order_by('-create_time').first()
        if latest:
            Employee.objects.filter(id=ep.id).update(
                last_check_time=latest.create_time,
                is_atwork=(latest.type == cls._CLOCK_IN),
            )

    @classmethod
    def _flag_exit_exception(cls, ep, exit_record):
        """Set ``exception_type`` on an exit record from the preceding record.

        The preceding entry/exit record is not necessarily from the same day.
        """
        previous = ClockRecord.objects.filter(
            create_time__lt=exit_record.create_time, employee=ep,
            type__in=[cls._CLOCK_IN, cls._CLOCK_OUT]).order_by('-create_time').first()
        if previous is None:
            return
        if previous.type == cls._CLOCK_OUT:
            # Two exits in a row: no entry was recorded in between.
            exception_type = ClockRecord.E_TYPE_MISS
        elif previous.type == cls._CLOCK_IN:
            elapsed = exit_record.create_time - previous.create_time
            if elapsed < timedelta(hours=7):
                exception_type = ClockRecord.E_TYPE_LESS
            elif elapsed > timedelta(hours=14):
                exception_type = ClockRecord.E_TYPE_MORE
            elif elapsed > timedelta(hours=10):
                exception_type = ClockRecord.E_TYPE_ADD
            else:
                return  # 7-10 hours: nothing to flag
        else:
            return
        exit_record.exception_type = exception_type
        exit_record.save()

    @classmethod
    def get_facedata_from_img_x(cls, img_path):
        """Extract a face encoding from an image using ``face_recognition``.

        Returns:
            ``(encoding_as_list, '')`` on success, or ``(None, message)`` when
            loading or encoding fails (including when no face is found).
        """
        import face_recognition
        try:
            image = face_recognition.load_image_file(img_path)
            encoding = face_recognition.face_encodings(image, num_jitters=2)[0]
            return encoding.tolist(), ''
        except Exception:
            return None, '人脸数据获取失败请重新上传图片'

    @classmethod
    def face_compare_from_base64(cls, base64_data):
        """Match image bytes against the cached face library.

        Args:
            base64_data: Bytes written verbatim to a temporary ``.png`` file.
                # presumably already base64-decoded by the caller - verify

        Returns:
            ``(Employee, '')`` for the closest face within distance 0.45, or
            ``(None, message)`` when encoding fails or nothing matches.
        """
        import face_recognition
        filepath = os.path.join(settings.BASE_DIR, 'temp', 'face_' + str(uuid.uuid4()) + '.png')
        with open(filepath, 'wb') as f:
            f.write(base64_data)
        try:
            unknown_picture = face_recognition.load_image_file(filepath)
            unknown_face_encoding = face_recognition.face_encodings(unknown_picture, num_jitters=2)[0]
        except Exception:
            return None, '识别失败,请调整位置'
        finally:
            # The temporary image is removed on success and on failure alike.
            os.remove(filepath)

        # Face library is cached under the key 'face_datas_dlib'.
        face_datas = cache.get('face_datas_dlib', None)
        if face_datas is None:
            from apps.hrm.tasks import update_all_facedata_cache
            update_all_facedata_cache()
            face_datas = cache.get('face_datas_dlib')
        if not face_datas or len(face_datas['datas']) == 0:
            # Empty library: argmin over nothing would raise.
            return None, '人脸未匹配,请调整位置'

        # A face matches when its distance is within the tolerance, so one
        # distance computation covers both the ranking and the match test.
        face_distances = face_recognition.face_distance(face_datas['datas'], unknown_face_encoding)
        best_match_index = int(np.argmin(face_distances))
        if face_distances[best_match_index] <= 0.45:
            epId = face_datas['eps'][best_match_index]
            return Employee.objects.get(id=epId), ''
        return None, '人脸未匹配,请调整位置'

    @classmethod
    def face_find_from_base64(cls, base64_data):
        """Find the employee for image bytes using the project ``face_find`` helper.

        Args:
            base64_data: Bytes written verbatim to a temporary ``.jpg`` file.
                # presumably already base64-decoded by the caller - verify

        Returns:
            ``(Employee, '')`` when the first result frame is non-empty,
            otherwise ``(None, message)``.
        """
        from apps.utils.face import face_find
        # Load the cached face data first so a failing refresh cannot leave a
        # temporary file behind.
        global_face = cache.get('global_face', None)
        if global_face is None:
            from apps.hrm.tasks import update_global_face
            update_global_face()
            global_face = cache.get('global_face')

        img_path = os.path.join(settings.BASE_DIR, 'temp', 'face_' + str(uuid.uuid4()) + '.jpg')
        with open(img_path, 'wb') as f:
            f.write(base64_data)
        try:
            dfs = face_find(img_path=img_path, global_face=global_face)
        except ValueError:
            return None, '人脸未匹配,请调整位置'
        finally:
            # Always clean up, whatever face_find raises.
            os.remove(img_path)

        df = dfs[0]
        if df.empty:
            return None, '人脸未匹配,请调整位置'
        matched = df.iloc[0].identity
        return Employee.objects.get(id=matched), ''

    @classmethod
    def get_facedata_from_img(cls, img_path):
        """Extract a face embedding from an image using DeepFace (Facenet model).

        Returns:
            ``(embedding, '')`` on success, or ``(None, message)`` on any
            failure, including a failed ``deepface`` import.
        """
        try:
            from deepface import DeepFace
            embedding_objs = DeepFace.represent(img_path=img_path, model_name='Facenet')
            return embedding_objs[0]["embedding"], ''
        except Exception:
            return None, '人脸数据获取失败请重新上传图片'