增加手机号校验

This commit is contained in:
曹前明 2022-08-30 12:09:52 +08:00
parent 75d006328a
commit 244c220cc7
12 changed files with 135 additions and 48 deletions

View File

@ -115,7 +115,7 @@ class RemindSerializer(CustomModelSerializer):
class AlgoChannelCreateSerializer(CustomModelSerializer):
class Meta:
model = AlgoChannel
fields = ['algo', 'vchannel', 'always_on']
fields = ['algo', 'vchannel', 'snap_on']
class AlgoChannelCreatesSerializer(CustomModelSerializer):

View File

@ -248,7 +248,7 @@ def dispatch_dahua_event(data: dict):
'cate': i,
'event': event
})
elif alarm_type in [302, 303] and vchannel:
elif alarm_type in [302, 303] and vchannel: # 拌线入侵/区域入侵
ec = EventCate.objects.filter(code=str(alarm_type)).first()
event = None
global_img_o = dhClient.get_full_pic(data['info']['alarmPicture'])

View File

@ -110,8 +110,8 @@ def snap_and_analyse(code: str, algo_codes: list, opl_id: str):
'cate': i,
'event': event
})
if event:
notify_event(event)
if event:
notify_event(event)
@shared_task
@ -119,10 +119,9 @@ def opl_task(vc_codes: list, opl_id: str):
"""作业监控任务
"""
opl_cate = Opl.objects.get(id=opl_id).cate
# 找到加载的算法
# 找到作业需要加载的算法
algo_codes = list(EventCate.objects.filter(opl_cates=opl_cate).values_list('code', flat=True))
while True:
for i in vc_codes:
snap_and_analyse.delay(i, algo_codes, opl_id)
time.sleep(2)
time.sleep(4)
time.sleep(2)

View File

@ -12,9 +12,10 @@ from apps.hrm.models import Certificate, ClockRecord, Employee, NotWorkRemark
from apps.system.serializers import DeptSimpleSerializer, UserSimpleSerializer
from django.db import transaction
from apps.third.clients import dhClient
import re
from django.conf import settings
from apps.utils.tools import check_phone_e
class EmployeeSimpleSerializer(CustomModelSerializer):
belong_dept_name = serializers.CharField(source='belong_dept.name', read_only=True)
@ -34,14 +35,8 @@ class EmployeeSimpleSerializer(CustomModelSerializer):
# return super().save(**kwargs)
def phone_check(phone):
re_phone = r'^1[358]\d{9}$|^147\d{8}$|^176\d{8}$'
if not re.match(re_phone, phone):
raise serializers.ValidationError(**PHONE_F_WRONG)
return phone
class EmployeeCreateUpdateSerializer(CustomModelSerializer):
phone = serializers.CharField(label="手机号", validators=[check_phone_e])
class Meta:
model = Employee

View File

@ -97,7 +97,7 @@ class RpjListSerializer(CustomModelSerializer):
# class RemployeeCreateSerializer(CustomModelSerializer):
# phone = serializers.CharField(label="手机号", validators=[phone_check])
# phone = serializers.CharField(label="手机号", validators=[])
# rparty = serializers.PrimaryKeyRelatedField(queryset=Rparty.objects.all(), label='相关方ID', required=False)
# class Meta:

View File

@ -8,6 +8,7 @@ from apps.system.services import sync_dahua_dept
from apps.utils.fields import MyFilePathField
from apps.utils.serializers import CustomModelSerializer
from apps.utils.constants import EXCLUDE_FIELDS, EXCLUDE_FIELDS_BASE
from apps.utils.tools import check_phone_e
from .models import (Dictionary, DictType, File, Dept, Permission, Post, PostRole,
Role, User, UserPost)
from rest_framework.exceptions import ParseError, ValidationError
@ -291,12 +292,6 @@ class UserListSerializer(CustomModelSerializer):
model = User
exclude = ['password', 'secret']
# def phone_check(phone):
# re_phone = '^1[358]\d{9}$|^147\d{8}$|^176\d{8}$'
# if not re.match(re_phone, phone):
# raise serializers.ValidationError(**PHONE_F_WRONG)
# return phone
def phone_exist(phone):
if User.objects.filter(phone=phone).exists():
@ -313,6 +308,8 @@ class UserUpdateSerializer(CustomModelSerializer):
"""
用户编辑序列化
"""
phone = serializers.CharField(required=False, validators=[check_phone_e])
class Meta:
model = User
fields = ['username', 'name', 'avatar', 'is_active', 'phone']
@ -329,7 +326,7 @@ class UserCreateSerializer(CustomModelSerializer):
创建用户序列化
"""
username = serializers.CharField(required=True, validators=[user_exist])
phone = serializers.CharField(required=False, validators=[phone_exist])
phone = serializers.CharField(required=False, validators=[phone_exist, check_phone_e])
class Meta:
model = User

View File

@ -1,3 +1,4 @@
import logging
import json
import time
from threading import Thread
@ -16,6 +17,7 @@ from django.utils.timezone import now
from apps.third.tapis import dhapis
requests.packages.urllib3.disable_warnings()
myLogger = logging.getLogger('log')
class DhClient:
@ -43,14 +45,17 @@ class DhClient:
'client_id': self.client_id,
'client_secret': self.client_secret
}
r = requests.post(params=params,
url=settings.DAHUA_BASE_URL + '/evo-apigw/evo-oauth/oauth/token', verify=False)
ret = r.json()
if ret['success']:
self.token = ret['data']['access_token']
self.headers['Authorization'] = 'bearer ' + ret['data']['access_token']
self.headers['User-Id'] = '1'
time.sleep(3600)
try:
r = requests.post(params=params,
url=settings.DAHUA_BASE_URL + '/evo-apigw/evo-oauth/oauth/token', verify=False)
ret = r.json()
if ret['success']:
self.token = ret['data']['access_token']
self.headers['Authorization'] = 'bearer ' + ret['data']['access_token']
self.headers['User-Id'] = '1'
time.sleep(1200)
except Exception:
myLogger.error('大华服务连接失败', exc_info=True)
def setup(self):
t = Thread(target=self._get_token_loop, args=(), daemon=True)
@ -169,7 +174,7 @@ class DhClient:
Args:
path (str): 图片地址
返回第一个识别结果
返回识别结果
"""
base64img = convert_to_base64(path)
json_data = {

View File

@ -2,6 +2,7 @@ import time
from threading import Thread
import traceback
import uuid
import logging
import requests
from rest_framework.exceptions import APIException, ParseError
@ -14,6 +15,7 @@ from apps.third.tapis import spapis
from django.utils.timezone import now
requests.packages.urllib3.disable_warnings()
myLogger = logging.getLogger('log')
class SpClient:
@ -42,12 +44,15 @@ class SpClient:
"password": self.password
}
}
r = requests.post(json=params,
url=settings.SP_BASE_URL + '/api/users/login', verify=False)
if r.status_code == 200:
ret = r.json()
self.headers['Authorization'] = 'Bearer ' + ret['user']['token']
time.sleep(3600)
try:
r = requests.post(json=params,
url=settings.SP_BASE_URL + '/api/users/login', verify=False)
if r.status_code == 200:
ret = r.json()
self.headers['Authorization'] = 'Bearer ' + ret['user']['token']
time.sleep(1200)
except Exception:
myLogger.error('喇叭服务连接失败', exc_info=True)
def get_token(self):
self.isGetingToken = True

View File

@ -1,3 +1,4 @@
import logging
import time
from threading import Thread
import traceback
@ -12,6 +13,7 @@ from apps.utils.tools import print_roundtrip
from django.utils.timezone import now
requests.packages.urllib3.disable_warnings()
myLogger = logging.getLogger('log')
class XxClient:
@ -35,11 +37,14 @@ class XxClient:
json = {
'licence': self.licence
}
r = requests.post(json=json, url=settings.XX_BASE_URL + '/getAccessTokenV2', verify=False, timeout=20)
ret = r.json()
if ret.get('errorCode', 1) == 0:
self.token = ret['data']['token']
time.sleep(3600)
try:
r = requests.post(json=json, url=settings.XX_BASE_URL + '/getAccessTokenV2', verify=False, timeout=20)
ret = r.json()
if ret.get('errorCode', 1) == 0:
self.token = ret['data']['token']
time.sleep(1200)
except Exception:
myLogger.error('寻息服务连接失败', exc_info=True)
def get_token(self):
self.isGetingToken = True

View File

@ -1,3 +1,4 @@
import re
import textwrap
import random
import string
@ -6,6 +7,7 @@ from django.conf import settings
import base64
import requests
from io import BytesIO
from rest_framework.serializers import ValidationError
def print_roundtrip(response, *args, **kwargs):
@ -91,3 +93,85 @@ def p_in_poly(p, poly):
# 射线穿过多边形边界的次数为奇数时点在多边形内
return (px, py) if flag else 'out'
def check_id_number_e(val):
is_ok, msg = check_id_number(val)
if is_ok:
return val
raise ValidationError(detail=msg)
def check_id_number(idcard):
"""校验身份证号
Args:
id_number (_type_): 身份证号
"""
Errors = ['身份证号码位数不对!', '身份证号码出生日期超出范围或含有非法字符!', '身份证号码校验错误!', '身份证地区非法!']
area = {"11": "北京", "12": "天津", "13": "河北", "14": "山西", "15": "内蒙古", "21": "辽宁", "22": "吉林", "23": "黑龙江",
"31": "上海", "32": "江苏", "33": "浙江", "34": "安徽", "35": "福建", "36": "江西", "37": "山东", "41": "河南", "42": "湖北",
"43": "湖南", "44": "广东", "45": "广西", "46": "海南", "50": "重庆", "51": "四川", "52": "贵州", "53": "云南", "54": "西藏",
"61": "陕西", "62": "甘肃", "63": "青海", "64": "宁夏", "65": "新疆", "71": "台湾", "81": "香港", "82": "澳门", "91": "国外"}
idcard = str(idcard)
idcard = idcard.strip()
idcard_list = list(idcard)
# 地区校验
if not area[(idcard)[0:2]]:
return False, Errors[4]
# 15位身份号码检测
if len(idcard) == 15:
if ((int(idcard[6:8]) + 1900) % 4 == 0 or (
(int(idcard[6:8]) + 1900) % 100 == 0 and (int(idcard[6:8]) + 1900) % 4 == 0)):
ereg = re.compile(
'[1-9][0-9]{5}[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|[1-2][0-9]))[0-9]{3}$') # //测试出生日期的合法性
else:
ereg = re.compile(
'[1-9][0-9]{5}[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|1[0-9]|2[0-8]))[0-9]{3}$') # //测试出生日期的合法性
if re.match(ereg, idcard):
return True, ''
else:
return False, Errors[2]
# 18位身份号码检测
elif len(idcard) == 18:
# 出生日期的合法性检查
# 闰年月日:((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|[1-2][0-9]))
# 平年月日:((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|1[0-9]|2[0-8]))
if (int(idcard[6:10]) % 4 == 0 or (int(idcard[6:10]) % 100 == 0 and int(idcard[6:10]) % 4 == 0)):
# 闰年出生日期的合法性正则表达式
ereg = re.compile(
'[1-9][0-9]{5}19[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|[1-2][0-9]))[0-9]{3}[0-9Xx]$')
else:
# 平年出生日期的合法性正则表达式
ereg = re.compile(
'[1-9][0-9]{5}19[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|1[0-9]|2[0-8]))[0-9]{3}[0-9Xx]$')
# 测试出生日期的合法性
if re.match(ereg, idcard):
# 计算校验位
S = (int(idcard_list[0]) + int(idcard_list[10])) * 7 + (int(idcard_list[1]) + int(idcard_list[11])) * 9 + (
int(idcard_list[2]) + int(idcard_list[12])) * 10 + (
int(idcard_list[3]) + int(idcard_list[13])) * 5 + (
int(idcard_list[4]) + int(idcard_list[14])) * 8 + (
int(idcard_list[5]) + int(idcard_list[15])) * 4 + (
int(idcard_list[6]) + int(idcard_list[16])) * 2 + int(idcard_list[7]) * 1 + int(
idcard_list[8]) * 6 + int(idcard_list[9]) * 3
Y = S % 11
M = "F"
JYM = "10X98765432"
M = JYM[Y] # 判断校验位
if M == idcard_list[17]: # 检测ID的校验位
return True, ''
else:
return False, Errors[3]
else:
return False, Errors[2]
else:
return False, Errors[1]
def check_phone_e(phone):
re_phone = r'/^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$/'
if not re.match(re_phone, phone):
raise ValidationError('手机号格式错误')
return phone

View File

@ -1,7 +1,3 @@
from distutils import errors
import json
import time
from threading import Thread
import uuid
import requests

View File

@ -1,6 +1,7 @@
from apps.utils.fields import MyFilePathField
from apps.utils.serializers import CustomModelSerializer
from apps.utils.tools import check_phone_e
from apps.vm.models import Visit, Visitor, Vpeople
from rest_framework import serializers
from rest_framework.exceptions import ParseError
@ -28,7 +29,7 @@ class VisitSerializer(CustomModelSerializer):
class VisitorCreateSerializer(CustomModelSerializer):
# phone = serializers.CharField(label="手机号", validators=[phone_check])
phone = serializers.CharField(label="手机号", validators=[check_phone_e])
class Meta:
model = Visitor