This commit is contained in:
zty 2024-10-29 09:36:33 +08:00
commit a563566623
10 changed files with 169 additions and 137 deletions

View File

@ -1,61 +1,67 @@
from apps.cm.models import LableMat
from rest_framework.decorators import action
from apps.cm.serializers import TidSerializer, LabelMatSerializer
from apps.inm.models import MaterialBatch
from apps.inm.models import MaterialBatch, MIOItem
from apps.wpm.models import WMaterial
from rest_framework.exceptions import ParseError, NotFound
from rest_framework.response import Response
from apps.utils.viewsets import CustomGenericViewSet, RetrieveModelMixin, CustomListModelMixin
# Create your views here.
class LableMatViewSet(CustomListModelMixin, RetrieveModelMixin, CustomGenericViewSet):
perms_map = {"post": "*", "get": "*"}
serializer_class = LabelMatSerializer
queryset = LableMat.objects.all()
select_related_fields = ["material", "material_origin", "supplier"]
@action(methods=['post'], detail=False, serializer_class=TidSerializer)
@action(methods=["post"], detail=False, serializer_class=TidSerializer)
def get_from_mb(self, request, pk=None):
"""
从仓库明细获取物料标签
从仓库明细获取物料标签
"""
tid = request.data.get('tid')
tid = request.data.get("tid")
try:
mb = MaterialBatch.objects.get(id=tid)
except MaterialBatch.DoesNotExist:
raise NotFound('仓库明细不存在')
obj, _ = LableMat.objects.get_or_create(
state=10,
material=mb.material,
batch=mb.batch,
defaults={
"supplier": mb.supplier
}
)
raise NotFound("仓库明细不存在")
obj, _ = LableMat.objects.get_or_create(state=10, material=mb.material, batch=mb.batch, defaults={"supplier": mb.supplier})
rdata = LabelMatSerializer(obj).data
rdata["code_label"] = f"mat:{obj.id}"
return Response(rdata)
@action(methods=['post'], detail=False, serializer_class=TidSerializer)
@action(methods=["post"], detail=False, serializer_class=TidSerializer)
def get_from_wm(self, request, pk=None):
"""
从车间库存明细获取物料标签
从车间库存明细获取物料标签
"""
tid = request.data.get('tid')
tid = request.data.get("tid")
try:
wm = WMaterial.objects.get(id=tid)
except WMaterial.DoesNotExist:
raise NotFound('车间库存不存在')
obj, _ = LableMat.objects.get_or_create(
state=wm.state,
material=wm.material,
batch=wm.batch,
notok_sign=wm.notok_sign,
material_origin=wm.material_origin)
raise NotFound("车间库存不存在")
obj, _ = LableMat.objects.get_or_create(state=wm.state, material=wm.material, batch=wm.batch, notok_sign=wm.notok_sign, material_origin=wm.material_origin)
rdata = LabelMatSerializer(obj).data
rdata["code_label"] = f"mat:{obj.id}"
return Response(rdata)
return Response(rdata)
@action(methods=["post"], detail=False, serializer_class=TidSerializer)
def get_from_mioitem(self, request, pk=None):
"""
从出入库明细获取物料标签
从出入库明细获取物料标签
"""
tid = request.data.get("tid")
try:
mioitem: MIOItem = MIOItem.objects.get(id=tid)
except MIOItem.DoesNotExist:
raise NotFound("出入库明细不存在")
obj, _ = LableMat.objects.get_or_create(state=10, material=mioitem.material, batch=mioitem.batch, defaults={"supplier": mioitem.mio.supplier})
rdata = LabelMatSerializer(obj).data
rdata["code_label"] = f"mat:{obj.id}"
return Response(rdata)

View File

@ -10,14 +10,11 @@ def backup_database():
备份数据库
"""
import datetime
name = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
name = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
command = 'echo "{}" | sudo -S pg_dump "user={} password={} dbname={}" > {}/bak_{}.sql'.format(
SD_PWD,
DATABASES['default']['USER'],
DATABASES['default']['PASSWORD'],
DATABASES['default']['NAME'],
BACKUP_PATH + '/database',
name)
SD_PWD, DATABASES["default"]["USER"], DATABASES["default"]["PASSWORD"], DATABASES["default"]["NAME"], BACKUP_PATH + "/database", name
)
completed = subprocess.run(command, shell=True, capture_output=True, text=True)
if completed.returncode != 0:
return completed.stderr
@ -25,7 +22,7 @@ def backup_database():
@shared_task
def reload_server_git():
command = 'bash {}/git_server.sh'.format(SH_PATH)
command = "bash {}/git_server.sh".format(SH_PATH)
completed = subprocess.run(command, shell=True, capture_output=True, text=True)
if completed.returncode != 0:
return completed.stderr
@ -33,7 +30,7 @@ def reload_server_git():
@shared_task
def reload_web_git():
command = 'bash {}/git_web.sh'.format(SH_PATH)
command = "bash {}/git_web.sh".format(SH_PATH)
completed = subprocess.run(command, shell=True, capture_output=True, text=True)
if completed.returncode != 0:
return completed.stderr
@ -48,7 +45,17 @@ def reload_server_only():
@shared_task
def backup_media():
command = 'bash {}/backup_media.sh'.format(SH_PATH)
command = "bash {}/backup_media.sh".format(SH_PATH)
completed = subprocess.run(command, shell=True, capture_output=True, text=True)
if completed.returncode != 0:
return completed.stderr
@shared_task
def test_task():
import time
x = 1
while x < 300:
print(f"test_{x}")
time.sleep(1)
x += 1

View File

@ -0,0 +1,18 @@
# Generated by Django 3.2.12 on 2024-10-23 08:29
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('edu', '0003_alter_train_level'),
]
operations = [
migrations.AddField(
model_name='paper',
name='type',
field=models.PositiveSmallIntegerField(choices=[(10, '固定试卷'), (20, '随机试卷')], default=10, verbose_name='类型'),
),
]

View File

@ -4,39 +4,41 @@ from apps.system.models import User, Dept, File
# Create your models here.
class Train(CommonADModel):
class Train(CommonADModel):
T_L_POST = 10
T_L_TEAM = 20
T_L_DEPT = 30
T_L_COMPANY = 40
level = models.PositiveSmallIntegerField(verbose_name='级别', choices=((T_L_POST, '岗位'), (T_L_TEAM, '班组'), (T_L_DEPT, '部门'), (T_L_COMPANY, '公司')))
name = models.CharField(max_length=200, verbose_name='名称')
place = models.CharField(max_length=200, verbose_name='地点', default='', blank=True)
start_time = models.DateTimeField(verbose_name='开始时间')
end_time = models.DateTimeField(verbose_name='结束时间')
duration = models.PositiveIntegerField(verbose_name='时长', default=0, help_text='单位:s', editable=False)
description = models.TextField(verbose_name='内容描述', default='', blank=True)
is_public = models.BooleanField('是否公开', default=False)
files = models.ManyToManyField(File, verbose_name='附件', blank=True)
level = models.PositiveSmallIntegerField(verbose_name="级别", choices=((T_L_POST, "岗位"), (T_L_TEAM, "班组"), (T_L_DEPT, "部门"), (T_L_COMPANY, "公司")))
name = models.CharField(max_length=200, verbose_name="名称")
place = models.CharField(max_length=200, verbose_name="地点", default="", blank=True)
start_time = models.DateTimeField(verbose_name="开始时间")
end_time = models.DateTimeField(verbose_name="结束时间")
duration = models.PositiveIntegerField(verbose_name="时长", default=0, help_text="单位:s", editable=False)
description = models.TextField(verbose_name="内容描述", default="", blank=True)
is_public = models.BooleanField("是否公开", default=False)
files = models.ManyToManyField(File, verbose_name="附件", blank=True)
class Meta:
verbose_name = '线下培训'
verbose_name = "线下培训"
verbose_name_plural = verbose_name
class Questioncat(CommonAModel):
name = models.CharField(max_length=200, verbose_name='名称')
parent = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, verbose_name='父级', related_name='cate_parent')
sort = models.PositiveIntegerField(default=0, verbose_name='排序')
name = models.CharField(max_length=200, verbose_name="名称")
parent = models.ForeignKey("self", on_delete=models.SET_NULL, null=True, blank=True, verbose_name="父级", related_name="cate_parent")
sort = models.PositiveIntegerField(default=0, verbose_name="排序")
class Meta:
verbose_name = '题库分类'
verbose_name = "题库分类"
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class Question(CommonAModel):
Q_DAN = 10
Q_DUO = 20
@ -44,97 +46,106 @@ class Question(CommonAModel):
Q_EASY = 10
Q_SIMPLE = 20
Q_HARD = 30
name = models.TextField('题干')
img = models.TextField('题干图片', null=True, blank=True)
type = models.PositiveSmallIntegerField('题型', choices=((Q_DAN, '单选'), (Q_DUO, '多选'), (Q_PAN, '判断')))
level = models.PositiveSmallIntegerField('难度', default=Q_SIMPLE, choices=((Q_EASY, '简单'), (Q_SIMPLE, '一般'), (Q_HARD, '困难')))
questioncat = models.ForeignKey(Questioncat, blank=True, null=True, on_delete=models.SET_NULL, verbose_name='所属题库', related_name='questioncat')
options = models.JSONField('选项')
right = models.JSONField('正确答案')
resolution = models.TextField('解析', null=True, blank=True)
enabled = models.BooleanField('是否启用', default=False)
name = models.TextField("题干")
img = models.TextField("题干图片", null=True, blank=True)
type = models.PositiveSmallIntegerField("题型", choices=((Q_DAN, "单选"), (Q_DUO, "多选"), (Q_PAN, "判断")))
level = models.PositiveSmallIntegerField("难度", default=Q_SIMPLE, choices=((Q_EASY, "简单"), (Q_SIMPLE, "一般"), (Q_HARD, "困难")))
questioncat = models.ForeignKey(Questioncat, blank=True, null=True, on_delete=models.SET_NULL, verbose_name="所属题库", related_name="questioncat")
options = models.JSONField("选项")
right = models.JSONField("正确答案")
resolution = models.TextField("解析", null=True, blank=True)
enabled = models.BooleanField("是否启用", default=False)
class Meta:
verbose_name = '题目'
verbose_name = "题目"
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class Paper(CommonAModel):
name = models.CharField(max_length=200, verbose_name='名称')
questions = models.ManyToManyField(Question, through='PaperQuestion')
limit = models.IntegerField(default=0, verbose_name='限时(分钟)')
total_score = models.FloatField(default=0, verbose_name='满分')
pass_score = models.FloatField(default=0, verbose_name='通过分数')
danxuan_count = models.IntegerField(default=0, verbose_name='单选数量')
danxuan_score = models.FloatField(default=2, verbose_name='单选分数')
duoxuan_count = models.IntegerField(default=0, verbose_name='多选数量')
duoxuan_score = models.FloatField(default=4, verbose_name='多选分数')
panduan_count = models.IntegerField(default=0, verbose_name='判断数量')
panduan_score = models.FloatField(default=2, verbose_name='判断分数')
PAPER_FIX = 10 # 固定试卷
PAPER_RANDOM = 20 # 自动抽题
name = models.CharField(max_length=200, verbose_name="名称")
type = models.PositiveSmallIntegerField("类型", default=10, choices=((PAPER_FIX, "固定试卷"), (PAPER_RANDOM, "随机试卷")))
questions = models.ManyToManyField(Question, through="PaperQuestion")
limit = models.IntegerField(default=0, verbose_name="限时(分钟)")
total_score = models.FloatField(default=0, verbose_name="满分")
pass_score = models.FloatField(default=0, verbose_name="通过分数")
danxuan_count = models.IntegerField(default=0, verbose_name="单选数量")
danxuan_score = models.FloatField(default=2, verbose_name="单选分数")
duoxuan_count = models.IntegerField(default=0, verbose_name="多选数量")
duoxuan_score = models.FloatField(default=4, verbose_name="多选分数")
panduan_count = models.IntegerField(default=0, verbose_name="判断数量")
panduan_score = models.FloatField(default=2, verbose_name="判断分数")
class Meta:
verbose_name = '押题卷'
verbose_name = "押题卷"
verbose_name_plural = verbose_name
def __str__(self):
return self.name
@property
def detail(self):
return PaperQuestion.objects.filter(paper=self)
class PaperQuestion(BaseModel):
paper = models.ForeignKey(Paper, on_delete=models.CASCADE, verbose_name='试卷')
question = models.ForeignKey(Question, on_delete=models.CASCADE, verbose_name='试题')
total_score = models.FloatField(default=0, verbose_name='单题满分')
sort = models.IntegerField(default=0, verbose_name='排序')
paper = models.ForeignKey(Paper, on_delete=models.CASCADE, verbose_name="试卷")
question = models.ForeignKey(Question, on_delete=models.CASCADE, verbose_name="试题")
total_score = models.FloatField(default=0, verbose_name="单题满分")
sort = models.IntegerField(default=0, verbose_name="排序")
class Exam(CommonADModel):
name = models.CharField('名称', max_length=100)
open_time = models.DateTimeField('开启时间')
close_time = models.DateTimeField('关闭时间', null=True, blank=True)
chance = models.IntegerField('考试机会', default=1) # 0表示不限次数
paper = models.ForeignKey(Paper, verbose_name='使用的试卷', on_delete=models.SET_NULL, null=True, blank=True)
is_public = models.BooleanField('是否公开', default=False)
p_users = models.ManyToManyField(User, verbose_name='指定用户', blank=True)
p_depts = models.ManyToManyField(Dept, verbose_name='指定部门', blank=True)
name = models.CharField("名称", max_length=100)
open_time = models.DateTimeField("开启时间")
close_time = models.DateTimeField("关闭时间", null=True, blank=True)
chance = models.IntegerField("考试机会", default=1) # 0表示不限次数
paper = models.ForeignKey(Paper, verbose_name="使用的试卷", on_delete=models.SET_NULL, null=True, blank=True)
is_public = models.BooleanField("是否公开", default=False)
p_users = models.ManyToManyField(User, verbose_name="指定用户", blank=True)
p_depts = models.ManyToManyField(Dept, verbose_name="指定部门", blank=True)
class Meta:
verbose_name = '在线考试'
verbose_name = "在线考试"
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class ExamRecord(CommonADModel):
'''
"""
考试记录表
'''
exam = models.ForeignKey(Exam, on_delete=models.CASCADE, verbose_name='关联考试', related_name='record_exam')
score = models.FloatField(default=0, verbose_name='得分')
took = models.IntegerField(default=0, verbose_name='耗时(秒)')
start_time = models.DateTimeField(verbose_name='开始答题时间')
end_time = models.DateTimeField(verbose_name='结束答题时间', null=True, blank=True)
questions = models.ManyToManyField(Question, verbose_name='答题记录', through='AnswerDetail')
is_pass = models.BooleanField(default=True, verbose_name='是否通过')
is_submited = models.BooleanField(default=False, verbose_name='是否提交')
is_last = models.BooleanField(default=False, verbose_name='是否最后一次考试记录')
"""
exam = models.ForeignKey(Exam, on_delete=models.CASCADE, verbose_name="关联考试", related_name="record_exam")
score = models.FloatField(default=0, verbose_name="得分")
took = models.IntegerField(default=0, verbose_name="耗时(秒)")
start_time = models.DateTimeField(verbose_name="开始答题时间")
end_time = models.DateTimeField(verbose_name="结束答题时间", null=True, blank=True)
questions = models.ManyToManyField(Question, verbose_name="答题记录", through="AnswerDetail")
is_pass = models.BooleanField(default=True, verbose_name="是否通过")
is_submited = models.BooleanField(default=False, verbose_name="是否提交")
is_last = models.BooleanField(default=False, verbose_name="是否最后一次考试记录")
class Meta:
verbose_name = '考试记录'
verbose_name = "考试记录"
verbose_name_plural = verbose_name
class AnswerDetail(BaseModel):
examrecord = models.ForeignKey(ExamRecord, on_delete=models.CASCADE, related_name='detail_er')
total_score = models.FloatField(default=0, verbose_name='该题满分')
examrecord = models.ForeignKey(ExamRecord, on_delete=models.CASCADE, related_name="detail_er")
total_score = models.FloatField(default=0, verbose_name="该题满分")
question = models.ForeignKey(Question, on_delete=models.CASCADE)
user_answer = models.JSONField(null=True,blank=True)
score = models.FloatField(default=0, verbose_name='本题得分')
is_right = models.BooleanField(default=False, verbose_name='是否正确')
user_answer = models.JSONField(null=True, blank=True)
score = models.FloatField(default=0, verbose_name="本题得分")
is_right = models.BooleanField(default=False, verbose_name="是否正确")
class Meta:
verbose_name = '答题记录'
verbose_name_plural = verbose_name
verbose_name = "答题记录"
verbose_name_plural = verbose_name

View File

@ -32,7 +32,7 @@ def close_rpj_by_leave_time():
for m in Rpjmember.objects.filter(rpj__in=rpjs):
rpj_member_leave(m)
@shared_task(base=CustomTask)
def check_remployee_leave():
"""
检查相关方是否已经离开
@ -40,4 +40,9 @@ def check_remployee_leave():
now = datetime.now()
now_str = now.strftime('%Y-%m-%d %H:%M:%S')
from apps.hrm.models import Employee
Employee.objects.filter(type='remployee', third_info__dh_face_card_end__lte=now_str).update(job_state=Employee.JOB_OFF)
eobjs = Employee.objects.filter(type='remployee', third_info__dh_face_card_end__lte=now_str)
Employee.objects.filter(type='remployee', third_info__dh_face_card_end__lte=now_str).update(job_state=Employee.JOB_OFF)
for e in eobjs:
if e.user:
e.user.is_deleted = True
e.user.save()

View File

@ -58,7 +58,7 @@ class RpartyViewSet(CustomModelViewSet):
分配账号
"""
obj = self.get_object()
post = Post.objects.get(code='remployee')
post = Post.objects.get(code='remployee_m')
serializer = RpartyAssignSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
vdata = serializer.validated_data

View File

@ -12,10 +12,4 @@ class SystemConfig(AppConfig):
from server.settings import get_sysconfig
get_sysconfig(reload=True)
cache.set('cache_sysconfig_need_task', False, timeout=30)
if cache.get('cache_alldperms_task', True):
from apps.utils.permission import get_alld_perms
get_alld_perms(update_cache=True)
cache.set('cache_alldperms_task', False, timeout=30)
return super().ready()

View File

@ -1,5 +1,3 @@
import configparser
import os
import importlib
import json
from drf_yasg import openapi
@ -24,7 +22,7 @@ from apps.system.filters import DeptFilterSet, UserFilterSet
# from django_q.models import Task as QTask, Schedule as QSchedule
from apps.utils.mixins import (CustomCreateModelMixin, MyLoggingMixin)
from django.conf import settings
from apps.utils.permission import ALL_PERMS, get_user_perms_map, get_alld_perms
from apps.utils.permission import ALL_PERMS, get_user_perms_map
from apps.utils.viewsets import CustomGenericViewSet, CustomModelViewSet
from server.celery import app as celery_app
from .models import (Dept, Dictionary, DictType, File, Permission, Post, PostRole, Role, User,
@ -44,7 +42,7 @@ import locale
from drf_yasg.utils import swagger_auto_schema
from server.settings import get_sysconfig, update_sysconfig, update_dict
from apps.utils.constants import DEFAULT_PWD
from apps.utils.thread import MyThread
from django.core.cache import cache
# logger.info('请求成功! response_code:{}response_headers:{}
# response_body:{}'.format(response_code, response_headers, response_body[:251]))
@ -321,15 +319,15 @@ class PermissionViewSet(CustomModelViewSet):
def perform_create(self, serializer):
super().perform_create(serializer)
MyThread(target=get_alld_perms, kwargs={"update_cache": True}).start_p()
cache.delete('perms_alld_list')
def perform_update(self, serializer):
super().perform_update(serializer)
MyThread(target=get_alld_perms, kwargs={"update_cache": True}).start_p()
cache.delete('perms_alld_list')
def perform_destroy(self, instance):
super().perform_destroy(instance)
MyThread(target=get_alld_perms, kwargs={"update_cache": True}).start_p()
cache.delete('perms_alld_list')
class DeptViewSet(CustomModelViewSet):
"""部门-增删改查

View File

@ -13,7 +13,7 @@ ALL_PERMS = [
# 数据库里定义的权限标识
def get_alld_perms(update_cache=False) -> List[str]:
key = "perms_alld_list"
perms_alld_list = cache.get(key)
perms_alld_list = cache.get(key, None)
if perms_alld_list is None or update_cache:
nested_list = Permission.objects.all().values_list('codes', flat=True)
perms_alld_list = list(set([item for sublist in nested_list for item in sublist]))
@ -26,7 +26,8 @@ def get_user_perms_map(user, update_cache=False):
获取权限字典,可用redis存取(包括功能和数据权限)
"""
key = f'perms_{str(user.id)}'
if cache.get(key) is None or update_cache:
user_perms_map = cache.get(key, None)
if user_perms_map is None or update_cache:
user_perms_map = {}
if user.is_superuser:
codes = get_alld_perms()
@ -58,9 +59,7 @@ def has_perm(user: User, perm_codes: List[str]):
"""
返回用户是否具有给定权限列表中的权限
"""
user_perms_map = cache.get(f'perms_{user.id}', None)
if user_perms_map is None:
user_perms_map = get_user_perms_map(user)
user_perms_map = get_user_perms_map(user)
for item in perm_codes:
if item in user_perms_map:
return True
@ -81,9 +80,7 @@ class RbacPermission(BasePermission):
"""
if not hasattr(view, 'perms_map'):
return True
user_perms_map = cache.get('perms_' + request.user.id, None)
if user_perms_map is None:
user_perms_map = get_user_perms_map(request.user)
user_perms_map = get_user_perms_map(request.user)
if isinstance(user_perms_map, dict):
perms_map = view.perms_map
_method = request._request.method.lower()
@ -125,9 +122,7 @@ class RbacDataMixin:
if hasattr(queryset.model, 'belong_dept'):
user = self.request.user
user_perms_map = cache.get('perms_' + user.id, None)
if user_perms_map is None:
user_perms_map = get_user_perms_map(self.request.user)
user_perms_map = get_user_perms_map(self.request.user)
if isinstance(user_perms_map, dict):
if hasattr(self.view, 'perms_map'):
perms_map = self.view.perms_map

View File

@ -116,9 +116,7 @@ class CustomGenericViewSet(MyLoggingMixin, GenericViewSet):
user = self.request.user
if user.is_superuser:
return queryset
user_perms_map = cache.get('perms_' + str(user.id), None)
if user_perms_map is None:
user_perms_map = get_user_perms_map(self.request.user)
user_perms_map = get_user_perms_map(self.request.user)
if isinstance(user_perms_map, dict):
if hasattr(self, 'perms_map'):
perms_map = self.perms_map