# factory/apps/qm/views.py
#
# 329 lines
# 12 KiB
# Python

from rest_framework.mixins import ListModelMixin
from rest_framework.decorators import action
from rest_framework.exceptions import ParseError
from rest_framework.views import APIView
from rest_framework.serializers import Serializer
from apps.qm.models import QuaStat, NotOkOption, Ftest, Ptest, FtestWork, FtestItem
from apps.qm.models import TestItem, Defect, Qct, QctTestItem, QctMat, QctDefect
from apps.qm.serializers import QuaStatSerializer, TestItemSerializer, QuaStatUpdateSerializer, FtestSerializer, PtestSerializer, \
FtestWorkCreateUpdateSerializer, FtestWorkSerializer, DefectSerializer, QctSerializer, QctTestItemSerializer, QctDefectSerializer, QctMatSerializer, \
QctDetailSerializer, UpdateRatePassTSerializer, QctGetSerializer
from apps.qm.tasks import cal_quastat_sflog
from rest_framework.response import Response
from apps.utils.mixins import BulkUpdateModelMixin
import datetime
from apps.utils.viewsets import CustomGenericViewSet, CustomModelViewSet
from apps.wpm.models import SfLog
from apps.qm.filters import QuaStatFilter, TestItemFilter, FtestWorkFilter, QctFilter, FtestFilter
from django.db import transaction
from apps.qm.services import ftestwork_submit
from apps.utils.thread import MyThread
from apps.wpm.services_2 import get_alldata_with_batch_and_store
from apps.wf.models import State
# Create your views here.
class DefectViewSet(CustomModelViewSet):
    """
    Defect items

    Viewset for defect items. Deleting a defect also deletes the
    QctDefect rows that reference it.
    """
    queryset = Defect.objects.all()
    serializer_class = DefectSerializer
    filterset_fields = ["cate", "okcate"]
    search_fields = ["name", "code"]

    @transaction.atomic
    def perform_destroy(self, instance):
        """Delete ``instance`` together with the QctDefect rows pointing at it.

        Both deletes run inside one transaction so a failure leaves
        neither half applied.
        """
        linked_rows = QctDefect.objects.filter(defect=instance)
        linked_rows.delete()
        instance.delete()
class QctViewSet(CustomModelViewSet):
    """
    Inspection templates

    Viewset for inspection templates (Qct).
    """
    queryset = Qct.objects.all()
    serializer_class = QctSerializer
    retrieve_serializer_class = QctDetailSerializer
    filterset_class = QctFilter
    search_fields = ["name", "number"]

    @action(methods=['post'], detail=False, perms_map={'post': '*'}, serializer_class=QctGetSerializer)
    @transaction.atomic
    def get_qct(self, request, *args, **kwargs):
        """
        Get an inspection template

        Reads ``material`` and ``tag`` from the request body, resolves the
        template through ``Qct.get`` and returns it serialized with
        ``QctSerializer``.

        Raises:
            ParseError: when ``material`` or ``tag`` is missing from the body.
        """
        data = request.data
        try:
            material = data["material"]
            tag = data["tag"]
        except (KeyError, TypeError):
            # A missing key (or a non-mapping body) is a client error, not a 500.
            raise ParseError('缺少参数 material 或 tag')
        qct = Qct.get(material, tag)
        # DRF views must return a Response object, not the raw serializer data.
        return Response(QctSerializer(instance=qct).data)
class QctTestItemViewSet(CustomModelViewSet):
    """Inspection template test items

    Viewset for the test items attached to an inspection template.
    """
    # Reads are open to everyone; every write requires the "qct.update" permission.
    perms_map = {
        "get": "*",
        "post": "qct.update",
        "put": "qct.update",
        "delete": "qct.update",
    }
    queryset = QctTestItem.objects.all()
    serializer_class = QctTestItemSerializer
    select_related_fields = ["qct", "testitem"]
    filterset_fields = ["qct", "testitem"]
    ordering = ["qct", "sort"]
class QctDefectViewSet(CustomModelViewSet):
    """Inspection template defect items

    Viewset for the defect items attached to an inspection template.
    Saving a row with ``is_default`` set clears the flag on every other
    row of the same template.
    """
    # Reads are open to everyone; every write requires the "qct.update" permission.
    perms_map = {"get": "*", "post": "qct.update", "put": "qct.update", "delete": "qct.update"}
    queryset = QctDefect.objects.all()
    serializer_class = QctDefectSerializer
    select_related_fields = ["qct", "defect"]
    filterset_fields = ["qct", "defect"]
    ordering = ["qct", "sort"]

    @staticmethod
    def _clear_other_defaults(ins: QctDefect) -> None:
        """If ``ins`` is the default, unset ``is_default`` on its siblings in the same qct."""
        if ins.is_default:
            QctDefect.objects.filter(qct=ins.qct).exclude(id=ins.id).update(is_default=False)

    @transaction.atomic
    def perform_create(self, serializer):
        """Save the new row and keep at most one default per template."""
        ins: QctDefect = serializer.save()
        self._clear_other_defaults(ins)

    # Atomic like perform_create: the save and the sibling update must
    # succeed or fail together, otherwise two defaults could remain.
    @transaction.atomic
    def perform_update(self, serializer):
        """Save the changed row and keep at most one default per template."""
        ins: QctDefect = serializer.save()
        self._clear_other_defaults(ins)
class QctMatViewSet(CustomModelViewSet):
    """Inspection template materials

    Viewset for the materials attached to an inspection template.
    """
    # Reads are open to everyone; every write requires the "qct.update" permission.
    perms_map = {
        "get": "*",
        "post": "qct.update",
        "put": "qct.update",
        "delete": "qct.update",
    }
    queryset = QctMat.objects.all()
    serializer_class = QctMatSerializer
    filterset_fields = ["qct", "material"]
class NotOkOptionView(APIView):
    perms_map = {'get': '*'}

    def get(self, request):
        """
        Get the not-OK options

        Returns every member of ``NotOkOption`` twice: as a list of
        value/label/extra_info entries (``res_list``) and as a
        value -> label mapping (``res_dict``).
        """
        options = list(NotOkOption)
        res_list = [
            {
                'value': option.value,
                'label': option.label,
                'extra_info': option.get_extra_info(option.value),
            }
            for option in options
        ]
        res_dict = {}
        for option in options:
            res_dict[option.value] = option.label
        payload = {"res_list": res_list, "res_dict": res_dict}
        return Response(payload)
class TestItemViewSet(CustomModelViewSet):
    """
    list: Quality inspection items

    Quality inspection items
    """
    queryset = TestItem.objects.all()
    serializer_class = TestItemSerializer
    filterset_class = TestItemFilter
    search_fields = ['tags', 'name', 'number', 'mcate_tags']
    ordering = ['id']

    def add_info_for_list(self, data):
        """Add an ``affects_name`` entry to every row of ``data``.

        Each row's ``affects`` holds TestItem ids; the names are looked up
        in a single query and joined with ``;``. Ids without a matching
        TestItem are rendered with the placeholder string. ``data`` is
        modified in place and returned.
        """
        # Flatten all referenced ids so the names are resolved with one query.
        referenced_ids = [pk for row in data for pk in row['affects']]
        name_by_id = dict(
            TestItem.objects.filter(id__in=referenced_ids).values_list('id', 'name')
        )
        for row in data:
            names = [name_by_id.get(pk, '未知') for pk in row["affects"]]
            row["affects_name"] = ";".join(names)
        return data

    @transaction.atomic
    def perform_destroy(self, instance):
        """Delete ``instance`` together with the QctTestItem rows pointing at it."""
        linked_rows = QctTestItem.objects.filter(testitem=instance)
        linked_rows.delete()
        instance.delete()
class QuaStatViewSet(ListModelMixin, BulkUpdateModelMixin, CustomGenericViewSet):
    """
    list: Quality statistics

    Quality statistics
    """
    perms_map = {'get': '*', 'put': 'quastat.update'}
    queryset = QuaStat.objects.all()
    serializer_class = QuaStatSerializer
    update_serializer_class = QuaStatUpdateSerializer
    filterset_class = QuaStatFilter
    select_related_fields = ['belong_dept', 'material', 'testitem']
    ordering = ['belong_dept__sort', 'material__sort', 'testitem__sort']

    def after_bulk_update(self, objs):
        """Stamp the affected SfLog rows and queue their recalculation.

        ``objs`` is iterated as a sequence of mappings, each carrying an
        ``sflog`` entry. Every distinct sflog gets ``last_test_time`` set
        to the current time and one ``cal_quastat_sflog`` task.
        """
        now = datetime.datetime.now()
        # De-duplicate so each sflog is updated and recalculated only once.
        sflog_ids = list({obj['sflog'] for obj in objs})
        # Record when the inspection data was last changed.
        SfLog.objects.filter(id__in=sflog_ids).update(last_test_time=now)
        for sflog_id in sflog_ids:
            cal_quastat_sflog.delay(sflog_id)

    # Update rate_pass_t for a given material id and test item id.
    @action(detail=False, methods=['post'], url_path='update_rate_pass_t', perms_map={'post': 'quastat.submit'})
    def update_rate_pass_t(self, request, *args, **kwargs):
        """Set ``rate_pass_t`` on the QuaStat rows of one material/test item.

        The body is validated with ``UpdateRatePassTSerializer``. Rows are
        matched on ``material_id``, ``testitem_id`` and the month of the
        related sflog's ``work_date``. After the update one
        ``cal_quastat_sflog`` task is queued per affected sflog.

        Returns:
            400 with a ``detail`` message when a required field is missing,
            otherwise 200 with the new ``rate_pass_t``.
        """
        serializer = UpdateRatePassTSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        validated = serializer.validated_data
        material_id = validated.get("material_id")
        testitem_id = validated.get("testitem_id")
        new_rate_pass_t = validated.get("rate_pass_t")
        month_s = validated.get("month_s")

        # rate_pass_t may legitimately be 0, so it is compared against None only.
        if not (material_id and testitem_id and new_rate_pass_t is not None):
            return Response(
                {"detail": "Missing required fields: material_id, testitem_id, rate_pass_t"},
                status=400
            )

        # NOTE(review): the ``__month`` lookup matches that month in every
        # year - confirm that updating all years is intended.
        qua_data = QuaStat.objects.filter(
            material_id=material_id,
            testitem_id=testitem_id,
            sflog__work_date__month=month_s,
        )
        # Collect the affected sflog ids before the update runs. order_by()
        # drops any default ordering so distinct() really de-duplicates.
        sflog_ids = list(
            qua_data.order_by().values_list('sflog', flat=True).distinct()
        )
        qua_data.update(rate_pass_t=new_rate_pass_t)
        for sflog_id in sflog_ids:
            cal_quastat_sflog.delay(sflog_id)
        return Response({"rate_pass_t": new_rate_pass_t}, status=200)
class FtestViewSet(CustomModelViewSet):
    """
    list: First-article / finished-product inspection

    First-article / finished-product inspection
    """
    queryset = Ftest.objects.all()
    serializer_class = FtestSerializer
    select_related_fields = ['test_user', 'check_user', 'ftest_work']
    filterset_class = FtestFilter

    def count_sampling(self, ftest_work:FtestWork):
        """Recompute the sampling counters of ``ftest_work`` and save it.

        ``count_sampling`` becomes the number of Ftest rows linked to the
        work and ``count_sampling_ok`` the number of those with
        ``is_ok=True``. For a full inspection (``TYPE2_ALL``) the
        ok/not-ok totals are derived from the same numbers.
        """
        records = Ftest.objects.filter(ftest_work=ftest_work)
        total = records.count()
        passed = records.filter(is_ok=True).count()

        ftest_work.count_sampling = total
        ftest_work.count_sampling_ok = passed
        # Full inspection: every item was sampled, so the samples are the totals.
        if ftest_work.type2 == FtestWork.TYPE2_ALL:
            ftest_work.count_ok = passed
            ftest_work.count_notok = total - passed
        ftest_work.save()

    def _recount(self, ftest_work):
        """Refresh the counters when the record belongs to an inspection work."""
        if ftest_work:
            self.count_sampling(ftest_work)

    @transaction.atomic
    def perform_create(self, serializer):
        """Save the new record, then refresh its work's counters."""
        created: Ftest = serializer.save()
        self._recount(created.ftest_work)

    @transaction.atomic
    def perform_update(self, serializer):
        """Save the changed record, then refresh its work's counters."""
        updated: Ftest = serializer.save()
        self._recount(updated.ftest_work)

    @transaction.atomic
    def perform_destroy(self, instance):
        """Delete the record, then refresh its work's counters."""
        # Read the parent work before the record is deleted.
        parent_work = instance.ftest_work
        instance.delete()
        self._recount(parent_work)
class PtestViewSet(CustomModelViewSet):
    """
    list: Performance tests

    Performance tests
    """
    queryset = Ptest.objects.all()
    serializer_class = PtestSerializer
    select_related_fields = ['testitem']
    filterset_fields = [
        'testitem',
        'test_date',
        'sample_number',
    ]
    search_fields = [
        'testitem__name',
        'sample_number',
    ]
class FtestWorkViewSet(CustomModelViewSet):
    """
    list: Inspection work

    Inspection work
    """
    queryset = FtestWork.objects.all()
    serializer_class = FtestWorkSerializer
    create_serializer_class = FtestWorkCreateUpdateSerializer
    update_serializer_class = FtestWorkCreateUpdateSerializer
    select_related_fields = ['material', 'mb', 'mb__material']
    filterset_class = FtestWorkFilter

    @staticmethod
    def _start_batch_stats(batch):
        """Start the batch statistics job in a background thread when a batch is set."""
        if batch:
            MyThread(target=get_alldata_with_batch_and_store, args=(batch,)).start()

    def update(self, request, *args, **kwargs):
        """Update an inspection work that is neither submitted nor under approval.

        Raises:
            ParseError: when the work is already submitted, or when its
                ticket has left the start state.
        """
        work: FtestWork = self.get_object()
        if work.submit_time is not None:
            raise ParseError('已提交无法修改')
        ticket = work.ticket
        if ticket and ticket.state.type != State.STATE_TYPE_START:
            raise ParseError('审批单已进行,无法修改')
        response = super().update(request, *args, **kwargs)
        # Trigger the batch statistics analysis.
        # NOTE(review): ``work`` was loaded before the update; if the parent
        # update works on a separate instance, a changed batch is not
        # reflected here - confirm.
        self._start_batch_stats(work.batch)
        return response

    def destroy(self, request, *args, **kwargs):
        """Delete an inspection work that is neither submitted nor tied to a ticket.

        Raises:
            ParseError: when the work is already submitted or has a ticket.
        """
        work: FtestWork = self.get_object()
        if work.submit_time is not None:
            raise ParseError('已提交无法删除')
        if work.ticket:
            raise ParseError('存在审批, 无法删除')
        response = super().destroy(request, *args, **kwargs)
        # Trigger the batch statistics analysis.
        self._start_batch_stats(work.batch)
        return response

    def perform_create(self, serializer):
        """Save the new inspection work and trigger the batch statistics analysis."""
        created = serializer.save()
        self._start_batch_stats(created.batch)

    @action(methods=['post'], detail=True, perms_map={'post': 'ftestwork.submit'}, serializer_class=Serializer)
    @transaction.atomic
    def submit(self, request, *args, **kwargs):
        """Submit an inspection work

        Hands the work to ``ftestwork_submit`` together with the requesting
        user and returns an empty response.

        Raises:
            ParseError: when the work has a ticket, has no ``wm``, or is
                already submitted (checked in that order).
        """
        work: FtestWork = self.get_object()
        if work.ticket:
            raise ParseError('该检验工作存在审批!')
        if work.wm is None:
            raise ParseError('该检验工作未关联车间库存')
        if work.submit_time is not None:
            raise ParseError('该检验工作已提交')
        ftestwork_submit(work, request.user)
        return Response()