factory/apps/hrm/tasks.py

138 lines
5.0 KiB
Python
Executable File

from __future__ import absolute_import, unicode_literals
import time
from datetime import datetime, timedelta
from celery import shared_task
from dateutil import tz
from django.core.cache import cache
from apps.hrm.models import Employee, Certificate
from apps.hrm.services import HrmService
from apps.third.dahua import dhClient
from apps.third.tapis import dhapis
from apps.utils.tasks import CustomTask
import os
from django.conf import settings
import shutil
@shared_task
def update_all_employee_not_atwork():
"""
将所有员工设为非在岗状态
"""
Employee.objects.all().update(is_atwork=False, last_check_time=None, not_work_remark=None)
@shared_task(base=CustomTask)
def correct_swip_task(start_time="", end_time=""):
# from apps.monitor.models import DrfRequestLog
# from apps.hrm.services import HrmService
# lgs = DrfRequestLog.objects.filter(path='/api/third/dahua/c_swip/', requested_at__gte=start_time, requested_at__lte=end_time)
# for i in lgs:
# HrmService.swipe(data=eval(i.data))
# 从大华历史记录校正打卡记录
now = datetime.now()
if start_time == "":
start_time = (now - timedelta(hours=25)).strftime("%Y-%m-%d %H:%M:%S")
if end_time == "":
end_time = now.strftime("%Y-%m-%d %H:%M:%S")
data0 = {
"pageNum":1,
"pageSize": "20",
"startSwingTime":start_time,
"endSwingTime":end_time,
"openType":"61"
}
_, count = dhClient.request(**dhapis['swipe_list_count'], json=data0)
data = {
"pageNum":1,
"pageSize": str(count),
"startSwingTime":start_time,
"endSwingTime":end_time,
"openType":"61"
}
_, res = dhClient.request(**dhapis['swipe_list'], json=data)
pageData = sorted(res['pageData'], key=lambda obj: obj['swingTime'])
for i in pageData:
if 'paperNumber' in i:
HrmService.swipe_next(i['channelCode'], i['paperNumber'], i['swingTime'], i['enterOrExit'], i)
# ClockRecord.objects.filter(type=30).delete()
@shared_task
def correct_card_time():
tzinfo = tz.gettz('Asia/Shanghai')
s_time_f = datetime.strptime("2022-11-15 14:20:20", "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzinfo)
eps = Employee.objects.filter(update_time__lte = s_time_f)
print(eps)
for ep in eps:
dh_face_card = ep.third_info.get('dh_face_card', None)
dh_face_card_end = ep.third_info.get('dh_face_card_end', None)
if dh_face_card and dh_face_card_end is None:
departmentId = 1
if ep.belong_dept:
try:
departmentId = ep.belong_dept.third_info['dh_id']
except Exception:
pass
# 获取卡片时间
_, res = dhClient.request(**dhapis['card_detail'], params={'cardNumber': dh_face_card})
time.sleep(1)
start_time_str, end_time_str = res['startDate'], res['endDate']
end_time_new = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S") + timedelta(hours=8)
end_time_new_str = end_time_new.strftime("%Y-%m-%d %H:%M:%S")
json_data = {
"cardNumber": dh_face_card,
"startDate": start_time_str,
"endDate": end_time_new_str,
"departmentId": departmentId,
}
_, res = dhClient.request(**dhapis['card_update'], json=json_data)
HrmService.save(ep, {'dh_face_card_start': start_time_str, 'dh_face_card_end': end_time_new_str})
print('已更新-' + ep.name + '-' + dh_face_card + '-' + end_time_new_str)
time.sleep(1)
@shared_task
def update_all_facedata_cache():
"""
更新人脸数据缓存
"""
facedata_queyset = Employee.objects.filter(face_data__dlib__isnull=False).values('id', 'face_data__dlib')
face_eps = []
face_datas = []
for i in facedata_queyset:
face_eps.append(i['id'])
face_datas.append(i['face_data__dlib'])
face_data_dict = {"eps": face_eps, "datas": face_datas}
cache.set('face_datas_dlib', face_data_dict)
@shared_task(base=CustomTask)
def delete_face_pkl(epId):
"""
尝试删除人脸库缓存文件
"""
if epId:
ep = Employee.objects.get(id=epId)
photo_path = settings.BASE_DIR + ep.photo
face_path = os.path.join(settings.FACE_PATH, f'{epId}.jpg')
shutil.copy(photo_path, face_path)
file_path = os.path.join(settings.BASE_DIR, 'media/face/representations_facenet.pkl')
if os.path.exists(file_path):
try:
os.remove(file_path)
except Exception as e:
delete_face_pkl.apply_async(countdown=5)
@shared_task(base=CustomTask)
def update_global_face():
facedata = Employee.objects.filter(face_data__isnull=False,
user__is_active=True).values_list('id', 'face_data')
cache.set('global_face', list(facedata), timeout=None)
@shared_task(base=CustomTask)
def check_cert_state():
cert_qs = Certificate.objects.filter(state=Certificate.CERT_OK)
for cert in cert_qs:
cert.get_state(need_update=True)