factory/apps/qm/views.py

203 lines
6.9 KiB
Python

from rest_framework.mixins import ListModelMixin
from rest_framework.decorators import action
from rest_framework.exceptions import ParseError
from rest_framework.views import APIView
from rest_framework.serializers import Serializer
from apps.qm.models import QuaStat, TestItem, Ftest, Ptest, FtestWork
from apps.qm.serializers import QuaStatSerializer, TestItemSerializer, QuaStatUpdateSerializer, FtestSerializer, PtestSerializer, \
FtestWorkCreateUpdateSerializer, FtestWorkSerializer, DefectSerializer
from apps.qm.tasks import cal_quastat_sflog
from rest_framework.response import Response
from apps.utils.mixins import BulkUpdateModelMixin
import datetime
from apps.utils.viewsets import CustomGenericViewSet, CustomModelViewSet
from apps.wpm.models import SfLog
from apps.qm.filters import QuaStatFilter, TestItemFilter, FtestWorkFilter
from django.db import transaction
from apps.qm.models import NotOkOption, Defect
from apps.qm.services import ftestwork_submit
from apps.utils.thread import MyThread
from apps.wpm.services_2 import get_alldata_with_batch_and_store
from apps.wf.models import State
# Create your views here.
class DefectViewSet(CustomModelViewSet):
"""
list:缺陷项
缺陷项
"""
queryset = Defect.objects.all()
serializer_class = DefectSerializer
filterset_fields = ["cate", "okcate"]
search_fields = ["name", "code"]
class NotOkOptionView(APIView):
perms_map = {'get': '*'}
def get(self, request):
"""
获取不合格项
获取不合格项
"""
res1 = [{'value': i.value, 'label': i.label, 'extra_info': i.get_extra_info(i.value)} for i in NotOkOption]
res2 = {i.value: i.label for i in NotOkOption}
return Response({"res_list": res1, "res_dict": res2})
class TestItemViewSet(CustomModelViewSet):
"""
list:质检项目
质检项目
"""
queryset = TestItem.objects.all()
serializer_class = TestItemSerializer
filterset_class = TestItemFilter
search_fields = ['tags', 'name', 'number', 'mcate_tags']
ordering = ['id']
class QuaStatViewSet(ListModelMixin, BulkUpdateModelMixin, CustomGenericViewSet):
"""
list:质量数据统计
质量数据统计
"""
perms_map = {'get': '*', 'put': 'quastat.update'}
queryset = QuaStat.objects.all()
serializer_class = QuaStatSerializer
update_serializer_class = QuaStatUpdateSerializer
filterset_class = QuaStatFilter
select_related_fields = ['belong_dept', 'material', 'testitem']
ordering = ['belong_dept__sort', 'material__sort', 'testitem__sort']
def after_bulk_update(self, objs):
now = datetime.datetime.now()
sflogIds = []
for i in objs:
sflogIds.append(i['sflog'])
sflogIds = list(set(sflogIds))
SfLog.objects.filter(id__in=sflogIds).update(
last_test_time=now) # 更新质检记录时间
for sflogId in sflogIds:
cal_quastat_sflog.delay(sflogId)
class FtestViewSet(CustomModelViewSet):
"""
list:首件/成品检验
首件/成品检验
"""
queryset = Ftest.objects.all()
serializer_class = FtestSerializer
select_related_fields = ['test_user', 'check_user', 'ftest_work']
filterset_fields = ['type', 'ftest_work']
def count_sampling(self, ftest_work:FtestWork):
qs = Ftest.objects.filter(ftest_work=ftest_work)
all_count = qs.count()
ok_count = qs.filter(is_ok=True).count()
ftest_work.count_sampling = all_count
ftest_work.count_sampling_ok = ok_count
if ftest_work.type2 == FtestWork.TYPE2_ALL: # 如果是全检
ftest_work.count_ok = ok_count
ftest_work.count_notok = all_count - ok_count
ftest_work.save()
@transaction.atomic
def perform_create(self, serializer):
ins: Ftest = serializer.save()
if ins.ftest_work:
self.count_sampling(ins.ftest_work)
@transaction.atomic
def perform_update(self, serializer):
ins: Ftest = serializer.save()
if ins.ftest_work:
self.count_sampling(ins.ftest_work)
@transaction.atomic
def perform_destroy(self, instance):
ftest_work = instance.ftest_work
instance.delete()
if ftest_work:
self.count_sampling(ftest_work)
class PtestViewSet(CustomModelViewSet):
"""
list:性能测试
性能测试
"""
queryset = Ptest.objects.all()
serializer_class = PtestSerializer
select_related_fields = ['testitem']
filterset_fields = ['testitem', 'test_date', 'sample_number']
search_fields = ['testitem__name', 'sample_number']
class FtestWorkViewSet(CustomModelViewSet):
"""
list: 检验工作
检验工作
"""
queryset = FtestWork.objects.all()
serializer_class = FtestWorkSerializer
create_serializer_class = FtestWorkCreateUpdateSerializer
update_serializer_class = FtestWorkCreateUpdateSerializer
select_related_fields = ['material', 'mb', 'mb__material']
filterset_class = FtestWorkFilter
def update(self, request, *args, **kwargs):
ins:FtestWork = self.get_object()
if ins.submit_time is not None:
raise ParseError('已提交无法修改')
if ins.ticket and ins.ticket.state.type != State.STATE_TYPE_START:
raise ParseError('审批单已进行,无法修改')
x = super().update(request, *args, **kwargs)
# 触发批次统计分析
if ins.batch:
MyThread(target=get_alldata_with_batch_and_store, args=(ins.batch,)).start()
return x
def destroy(self, request, *args, **kwargs):
ins:FtestWork = self.get_object()
if ins.submit_time is not None:
raise ParseError('已提交无法删除')
if ins.ticket:
raise ParseError('存在审批, 无法删除')
x = super().destroy(request, *args, **kwargs)
# 触发批次统计分析
if ins.batch:
MyThread(target=get_alldata_with_batch_and_store, args=(ins.batch,)).start()
return x
def perform_create(self, serializer):
ins = serializer.save()
# 触发批次统计分析
if ins.batch:
MyThread(target=get_alldata_with_batch_and_store, args=(ins.batch,)).start()
@action(methods=['post'], detail=True, perms_map={'post': 'ftestwork.submit'}, serializer_class=Serializer)
@transaction.atomic
def submit(self, request, *args, **kwargs):
"""提交检验工作
提交检验工作
"""
ins:FtestWork = self.get_object()
if ins.ticket:
raise ParseError('该检验工作存在审批!')
if ins.wm is None:
raise ParseError('该检验工作未关联车间库存')
if ins.submit_time is None:
ftestwork_submit(ins, request.user)
else:
raise ParseError('该检验工作已提交')
return Response()