factory/apps/hrm/services.py

200 lines
7.6 KiB
Python
Executable File

from os import access
from apps.hrm.models import Employee
from apps.third.models import TDevice
from apps.third.tapis import dhapis
from apps.third.clients import dhClient
from apps.utils.tools import rannum
from datetime import datetime
class HrmService:
@classmethod
def sync_dahua_employee(cls, ep: Employee, old_photo='', start_time=None, end_time=None):
"""同步大华信息(员工/卡片/门禁)
Args:
ep (Employee): 人员实例
old_photo (str, optional): 原照片地址. Defaults to ''.
start_time (_type_, optional): 开人脸卡起始时间. Defaults to None.
end_time (_type_, optional): 开人脸卡结束时间. Defaults to None.
Returns:
_type_: _description_
"""
dh_id = ep.third_info.get('dh_id', None)
dh_photo = ep.third_info.get('dh_photo', None)
dh_face_card = ep.third_info.get('dh_face_card', None)
departmentId = 1
if ep.belong_dept:
try:
departmentId = ep.belong_dept.third_info['dh_id']
except Exception:
pass
if ep.third_info.get('dh_id', None): # 如果有大华信息
dh_id = ep.third_info['dh_id']
dh_photo = ep.third_info['dh_photo']
json_data = {
"service": "ehs",
"id": dh_id,
"name": ep.name,
"code": ep.number,
"paperType": 111,
"paperNumber": ep.id_number,
"paperAddress": "default",
"departmentId": departmentId,
"phone": ep.phone,
"email": ep.email,
"sex": 1 if ep.gender == '' else 2,
"biosignatureTypeList": [3],
"personBiosignatures": [{
"type": 3,
"index": 1,
"path": dh_photo
}]
}
if ep.photo != old_photo and ep.photo:
_, res = dhClient.request(**dhapis['person_img_upload'], file_path_rela=ep.photo)
dh_photo = res["fileUrl"]
json_data.update(
{
"biosignatureTypeList": [3],
"personBiosignatures": [{
"type": 3,
"index": 1,
"path": dh_photo
}]
}
)
dhClient.request(**dhapis['person_update'], json=json_data)
ep = cls.save(ep, data={'dh_face_card': dh_face_card})
else:
_, res = dhClient.request(**dhapis['person_gen_id'])
dh_id = res['id']
json_data = {
"service": "ehs",
"id": dh_id,
"name": ep.name,
"code": ep.number,
"paperType": 111,
"paperNumber": ep.id_number,
"paperAddress": "default",
"departmentId": departmentId,
"phone": ep.phone,
"email": ep.email,
"sex": 1 if ep.gender == '' else 2
}
_, res = dhClient.request(**dhapis['person_img_upload'], file_path_rela=ep.photo)
dh_photo = res["fileUrl"]
json_data.update(
{
"biosignatureTypeList": [3],
"personBiosignatures": [{
"type": 3,
"index": 1,
"path": dh_photo
}]
}
)
_, res = dhClient.request(**dhapis['person_add'], json=json_data)
ep = cls.save(ep, data={'dh_id': dh_id, 'dh_photo': dh_photo})
# 开人脸卡
dh_face_card = cls.open_face_card(
ep=ep, dh_id=dh_id, departmentId=departmentId, start_time=start_time, end_time=end_time)
# 授予门禁权限
dh_dchannels = cls.door_auth(ep=ep, dh_face_card=dh_face_card)
return {'dh_id': dh_id, 'dh_photo': dh_photo, 'dh_face_card': dh_face_card, 'dh_dchannels': dh_dchannels}
@classmethod
def open_face_card(cls, ep, dh_id, departmentId, start_time, end_time):
"""开人脸卡/有卡就更新卡时间
"""
if ep.third_info.get('dh_face_card', None):
cardNumber = ep.third_info.get('dh_face_card')
# 如果有人脸卡就执行更新操作
if start_time is None: # 如果时间段未提供,跳过更新操作
pass
else:
startDate = start_time.strftime("%Y-%m-%d %H:%M:%S")
endDate = end_time.strftime("%Y-%m-%d %H:%M:%S")
json_data = {
"cardNumber": cardNumber,
"startDate": startDate,
"endDate": endDate,
}
_, res = dhClient.request(**dhapis['card_update'])
return cardNumber
else:
_, res = dhClient.request(**dhapis['card_gen_id'])
cardId = res['id']
cardNumber = str(ep.id)[3:8] + rannum(5)
now = datetime.now()
if start_time is None: # 如果未规定时间
startDate = now.strftime("%Y-%m-%d %H:%M:%S")
endDate = (datetime(year=now.year+50,
month=now.month, day=1)).strftime("%Y-%m-%d %H:%M:%S")
else:
startDate = start_time.strftime("%Y-%m-%d %H:%M:%S")
endDate = end_time.strftime("%Y-%m-%d %H:%M:%S")
json_data = {
"id": cardId,
"cardNumber": cardNumber,
"category": 0,
"cardType": 0,
"personId": dh_id,
"departmentId": departmentId,
"startDate": startDate,
"endDate": endDate
}
_, res = dhClient.request(**dhapis['card_add'], json=json_data)
cls.save(ep, data={'dh_face_card': cardNumber})
return cardNumber
@classmethod
def door_auth(cls, ep: Employee, dh_face_card: str):
"""授予门禁权限
"""
from apps.third.models import TDevice
# 查找可授予的门禁
dh_dchannels = list(TDevice.objects.filter(type=TDevice.DEVICE_DCHANNEL,
access_list__contains=ep.type).values_list('code', flat=True))
details = []
for i in dh_dchannels:
details.append({
"privilegeType": 1,
"resouceCode": i
})
json_data = {
"cardNumbers": [dh_face_card],
"timeQuantumId": 1,
"cardPrivilegeDetails": details
}
dhClient.request(**dhapis['card_door_authority'], json=json_data)
cls.save(ep, data={'dh_dchannels': dh_dchannels})
return dh_dchannels
@classmethod
def save(cls, ep: Employee, data: dict):
"""更新third_info
"""
ti = ep.third_info
ti.update(data)
ep.third_info = ti
ep.save()
if ep.user and ep.user.phone != ep.phone:
ep.user.phone = ep.phone
ep.save()
return ep
@classmethod
def swipe(cls, data: dict):
"""
刷卡事件/用于记录考勤
"""
deviceCode = data['infoArray']['deviceCode']
device = TDevice.objects.filter(code=deviceCode).first()
if not device:
device = TDevice.objects.create(type=TDevice.DEVICE_DCHANNEL, code=deviceCode)
if device.is_clock:
pass