109 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			109 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
| from django.shortcuts import render
 | |
| from rest_framework.response import Response
 | |
| from rest_framework.viewsets import ModelViewSet, GenericViewSet
 | |
| from rest_framework.mixins import UpdateModelMixin, RetrieveModelMixin
 | |
| from apps.system.mixins import CreateUpdateModelAMixin, OptimizationMixin
 | |
| from apps.hrm.models import Employee
 | |
| from apps.hrm.serializers import EmployeeSerializer, FaceLoginSerializer
 | |
| import face_recognition
 | |
| from django.conf import settings
 | |
| from django.core.cache import cache
 | |
| import logging
 | |
| from rest_framework.generics import CreateAPIView
 | |
| from rest_framework import status
 | |
| from rest_framework_simplejwt.tokens import RefreshToken
 | |
| 
 | |
| from apps.system.models import User
 | |
| logger = logging.getLogger('log')
 | |
| 
 | |
| 
 | |
def load_face_data(username: int, path: str):
    """Compute a user's face encoding from a photo and store it in the cache.

    The cache entry ``'face_datas'`` holds a dict keyed by ``username``. It is
    fetched (or created empty), updated with this user's encoding, and written
    back with no expiry.

    Args:
        username: Key under which the encoding is stored in the cached dict.
        path: Photo location, appended verbatim to ``settings.BASE_DIR``
            (no path separator is inserted between the two).

    Returns:
        The encoding of the first face detected in the photo.

    Raises:
        IndexError: If no face is detected in the photo.
    """
    face_datas = cache.get_or_set('face_datas', {}, timeout=None)

    # str() keeps the concatenation valid when BASE_DIR is a pathlib.Path.
    photo_path = str(settings.BASE_DIR) + path
    image = face_recognition.load_image_file(photo_path)
    encodings = face_recognition.face_encodings(image)
    if not encodings:
        # Same exception type as the implicit ``[0]`` lookup would raise, but
        # with a message that says which photo was the problem.
        raise IndexError(f'no face detected in {photo_path}')
    encoding = encodings[0]

    face_datas[username] = encoding
    cache.set('face_datas', face_datas, timeout=None)
    return encoding
 | |
| 
 | |
| # Create your views here.
 | |
class EmployeeViewSet(CreateUpdateModelAMixin, OptimizationMixin, UpdateModelMixin, RetrieveModelMixin, GenericViewSet):
    """
    Detailed employee information.

    Only the retrieve and update mixins are included, so this viewset does not
    provide list, create or delete actions by itself.
    """
    # Permission codes per HTTP method; presumably consumed by the project's
    # permission class -- confirm against apps.system.
    perms_map = {'get': '*', 'put': 'employee_update'}
    queryset = Employee.objects.all()
    serializer_class = EmployeeSerializer
    ordering = ['-pk']

    def perform_update(self, serializer):
        """Save the employee, then refresh the stored face encoding.

        The encoding is recomputed from ``instance.photo`` on every update.
        This step is best-effort: any failure (missing photo, unreadable
        image, no face detected, save error) is logged and the update itself
        still succeeds.

        Args:
            serializer: The validated serializer for the employee being
                updated.
        """
        instance = serializer.save(update_by=self.request.user)
        try:
            # str() keeps the concatenation valid when BASE_DIR is a Path.
            photo_path = str(settings.BASE_DIR) + instance.photo
            image = face_recognition.load_image_file(photo_path)
            encoding = face_recognition.face_encodings(image)[0]
            # Stored as a plain list rather than the array-like encoding.
            instance.face_data = encoding.tolist()
            instance.save()
        except Exception:
            # Exception (not a bare except) lets SystemExit/KeyboardInterrupt
            # propagate; logger.exception records the traceback so the cause
            # of the failure is visible in the logs.
            logger.exception('人脸识别出错')
 | |
| 
 | |
| import uuid
 | |
| import base64
 | |
| import os
 | |
| 
 | |
def tran64(s):
    """Right-pad *s* with ``=`` so its length becomes a multiple of four.

    Input whose length is already a multiple of four is returned unchanged.
    """
    pad = -len(s) % 4
    return s + '=' * pad if pad else s
 | |
| 
 | |
class FaceLogin(CreateAPIView):
    """Face-recognition login endpoint.

    No authentication or permission classes are configured, so the view is
    reachable by anonymous clients.
    """
    authentication_classes = []
    permission_classes = []
    serializer_class = FaceLoginSerializer

    def create(self, request, *args, **kwargs):
        """
        Log a user in by matching a submitted face image.

        Reads the ``base64`` field of the request body, decodes it to an image,
        computes its face encoding and compares it against the stored
        ``face_data`` of every employee whose user is active.

        Returns:
            200 with ``refresh``, ``access`` and ``username`` for the first
            matching employee; 400 when the image cannot be decoded, contains
            no detectable face, or matches nobody.
        """
        raw = request.data.get('base64')
        if not raw or not isinstance(raw, str):
            # A missing or non-string payload used to crash with
            # AttributeError (HTTP 500).
            return Response('头像解码失败', status=status.HTTP_400_BAD_REQUEST)

        # Decode before touching the filesystem so that invalid input never
        # leaves an empty temp file behind. Spaces are mapped back to '+'
        # (presumably '+' mangled by form/URL decoding -- confirm with client).
        try:
            image_bytes = base64.urlsafe_b64decode(tran64(raw.replace(' ', '+')))
        except ValueError:  # binascii.Error is a ValueError subclass
            return Response('头像解码失败', status=status.HTTP_400_BAD_REQUEST)

        filepath = os.path.join(str(settings.BASE_DIR), 'temp', f'{uuid.uuid4()}.png')
        with open(filepath, 'wb') as f:
            f.write(image_bytes)

        try:
            unknown_picture = face_recognition.load_image_file(filepath)
            unknown_face_encoding = face_recognition.face_encodings(unknown_picture)[0]
        except Exception:
            # Unreadable image, or no face found (IndexError on ``[0]``).
            logger.warning('face login: could not extract a face encoding', exc_info=True)
            return Response('头像解码失败', status=status.HTTP_400_BAD_REQUEST)
        finally:
            # Always delete the temp file; previously it leaked whenever
            # loading or encoding failed.
            try:
                os.remove(filepath)
            except OSError:
                logger.warning('face login: could not remove temp file %s', filepath)

        # Match against the stored face library.
        candidates = list(
            Employee.objects
            .filter(face_data__isnull=False, user__is_active=True)
            .values('user', 'face_data')
        )
        known_encodings = [row['face_data'] for row in candidates]
        results = face_recognition.compare_faces(known_encodings, unknown_face_encoding, tolerance=0.5)

        # NOTE(review): the first employee within tolerance wins, not the
        # closest one -- consider face_distance + min() if false matches occur.
        for row, matched in zip(candidates, results):
            if matched:
                # Recognised: issue JWT tokens for the matching user.
                user = User.objects.get(id=row['user'])
                refresh = RefreshToken.for_user(user)
                return Response({
                    'refresh': str(refresh),
                    'access': str(refresh.access_token),
                    'username': user.username
                })
        return Response('未找到对应用户', status=status.HTTP_400_BAD_REQUEST)