from rest_framework.mixins import ListModelMixin from rest_framework.decorators import action from rest_framework.exceptions import ParseError from rest_framework.views import APIView from rest_framework.serializers import Serializer from apps.qm.models import QuaStat, NotOkOption, Ftest, Ptest, FtestWork, FtestItem from apps.qm.models import TestItem, Defect, Qct, QctTestItem, QctMat, QctDefect from apps.qm.serializers import QuaStatSerializer, TestItemSerializer, QuaStatUpdateSerializer, FtestSerializer, PtestSerializer, \ FtestWorkCreateUpdateSerializer, FtestWorkSerializer, DefectSerializer, QctSerializer, QctTestItemSerializer, QctDefectSerializer, QctMatSerializer, \ QctDetailSerializer, UpdateRatePassTSerializer, QctGetSerializer from apps.qm.tasks import cal_quastat_sflog from rest_framework.response import Response from apps.utils.mixins import BulkUpdateModelMixin import datetime from apps.utils.viewsets import CustomGenericViewSet, CustomModelViewSet from apps.wpm.models import SfLog from apps.qm.filters import QuaStatFilter, TestItemFilter, FtestWorkFilter, QctFilter, FtestFilter from django.db import transaction from apps.qm.services import ftestwork_submit from apps.wpm.services_2 import ana_batch_thread from apps.wf.models import State # Create your views here. class DefectViewSet(CustomModelViewSet): """ 缺陷项 缺陷项 """ queryset = Defect.objects.all() serializer_class = DefectSerializer filterset_fields = ["cate", "okcate"] search_fields = ["name", "code"] def perform_destroy(self, instance): QctDefect.objects.filter(defect=instance).delete() instance.delete() class QctViewSet(CustomModelViewSet): """ 检测模板 检测模板 """ queryset = Qct.objects.all() serializer_class = QctSerializer retrieve_serializer_class = QctDetailSerializer filterset_class = QctFilter search_fields = ["name", "number"] ordering = ["number", "-update_time"] ordering_fields = ["number", "create_time", "update_time"] @action(methods=['post'], detail=False, perms_map={'post': '*'}, serializer_class=QctGetSerializer) @transaction.atomic def get_qct(self, request, *args, **kwargs): """ 获取检验模板 获取检验模板 """ sr = QctGetSerializer(data=request.data) sr.is_valid(raise_exception=True) vdata = sr.validated_data qct = Qct.get(vdata["material"], vdata["tag"], vdata.get("type", None)) if qct: return Response(QctDetailSerializer(instance=qct).data) return Response() class QctTestItemViewSet(CustomModelViewSet): """检测模板项 检测模板项 """ perms_map = {"get": "*", "post": "qct.update", "put": "qct.update", "delete": "qct.update"} queryset = QctTestItem.objects.all() serializer_class = QctTestItemSerializer select_related_fields = ["qct", "testitem"] filterset_fields = ["qct", "testitem"] ordering = ["qct", "sort"] class QctDefectViewSet(CustomModelViewSet): """检测缺陷项 检测缺陷项 """ perms_map = {"get": "*", "post": "qct.update", "put": "qct.update", "delete": "qct.update"} queryset = QctDefect.objects.all() serializer_class = QctDefectSerializer select_related_fields = ["qct", "defect"] filterset_fields = ["qct", "defect"] ordering = ["qct", "sort"] def perform_create(self, serializer): ins: QctDefect = serializer.save() if ins.is_default: QctDefect.objects.filter(qct=ins.qct).exclude(id=ins.id).update(is_default=False) def perform_update(self, serializer): ins: QctDefect = serializer.save() if ins.is_default: QctDefect.objects.filter(qct=ins.qct).exclude(id=ins.id).update(is_default=False) class QctMatViewSet(CustomModelViewSet): """检测物料 检测物料 """ perms_map = {"get": "*", "post": "qct.update", "put": "qct.update", "delete": "qct.update"} queryset = QctMat.objects.all() serializer_class = QctMatSerializer filterset_fields = ["qct", "material"] class NotOkOptionView(APIView): perms_map = {'get': '*'} def get(self, request): """ 获取不合格项 获取不合格项 """ res1 = [{'value': i.value, 'label': i.label, 'extra_info': i.get_extra_info(i.value)} for i in NotOkOption] res2 = {i.value: i.label for i in NotOkOption} return Response({"res_list": res1, "res_dict": res2}) class TestItemViewSet(CustomModelViewSet): """ list:质检项目 质检项目 """ queryset = TestItem.objects.all() serializer_class = TestItemSerializer filterset_class = TestItemFilter search_fields = ['tags', 'name', 'number', 'mcate_tags'] ordering = ['id'] def add_info_for_list(self, data): affects_list = [i['affects'] for i in data] affectIds = [] for item in affects_list: affectIds.extend(item) affects = TestItem.objects.filter(id__in=affectIds).values_list('id', 'name') affects_dict = dict(affects) for item in data: affects = item["affects"] item["affects_name"] = ";".join([affects_dict.get(x, '未知') for x in affects]) return data def perform_destroy(self, instance): QctTestItem.objects.filter(testitem=instance).delete() instance.delete() class QuaStatViewSet(ListModelMixin, BulkUpdateModelMixin, CustomGenericViewSet): """ list:质量数据统计 质量数据统计 """ perms_map = {'get': '*', 'put': 'quastat.update'} queryset = QuaStat.objects.all() serializer_class = QuaStatSerializer update_serializer_class = QuaStatUpdateSerializer filterset_class = QuaStatFilter select_related_fields = ['belong_dept', 'material', 'testitem'] ordering = ['belong_dept__sort', 'material__sort', 'testitem__sort'] def after_bulk_update(self, objs): now = datetime.datetime.now() sflogIds = [] for i in objs: sflogIds.append(i['sflog']) sflogIds = list(set(sflogIds)) SfLog.objects.filter(id__in=sflogIds).update( last_test_time=now) # 更新质检记录时间 for sflogId in sflogIds: cal_quastat_sflog.delay(sflogId) # 根据物料ID和检测项目ID rate_pass_t @action(detail=False, methods=['post'], url_path='update_rate_pass_t', perms_map={'post': 'quastat.submit'}) def update_rate_pass_t(self, request, *args, **kwargs): # material_id = request.data.get("material_id") # testitem_id = request.data.get("testitem_id") # new_rate_pass_t = request.data.get("rate_pass_t") # # update_start = datetime.datetime.strptime(request.data.get("update_start_time"), "%Y-%m-%d") # # update_end = datetime.datetime.strptime(request.data.get("update_end_time"), "%Y-%m-%d") # month_s = request.data.get("month_s") serializer = UpdateRatePassTSerializer(data=request.data) serializer.is_valid(raise_exception=True) material_id = serializer.validated_data.get("material_id") testitem_id = serializer.validated_data.get("testitem_id") new_rate_pass_t = serializer.validated_data.get("rate_pass_t") month_s = serializer.validated_data.get("month_s") if not (material_id and testitem_id and new_rate_pass_t is not None): return Response( {"detail": "Missing required fields: material_id, testitem_id, rate_pass_t"}, status=400 ) # 先根据查询物料查询 sflogids 然后再过滤 work_data ,返回在时间段内的sflogids,再更新quaStat qua_data = QuaStat.objects.filter( material_id=material_id, testitem_id=testitem_id, sflog__work_date__month=month_s, ) sflog_ids = qua_data.values_list('sflog', flat=True).distinct() update_count = qua_data.update(rate_pass_t=new_rate_pass_t) for i in sflog_ids: cal_quastat_sflog.delay(i) return Response({"rate_pass_t": new_rate_pass_t}, status=200) class FtestViewSet(CustomModelViewSet): """ list:首件/成品检验 首件/成品检验 """ queryset = Ftest.objects.all() serializer_class = FtestSerializer select_related_fields = ['test_user', 'check_user', 'ftest_work'] filterset_class = FtestFilter def count_sampling(self, ftest_work:FtestWork): qs = Ftest.objects.filter(ftest_work=ftest_work) all_count = qs.count() ok_count = qs.filter(is_ok=True).count() ftest_work.count_sampling = all_count ftest_work.count_sampling_ok = ok_count if ftest_work.type2 == FtestWork.TYPE2_ALL: # 如果是全检 ftest_work.count_ok = ok_count ftest_work.count_notok = all_count - ok_count ftest_work.save() def perform_create(self, serializer): ins: Ftest = serializer.save() if ins.ftest_work: self.count_sampling(ins.ftest_work) def perform_update(self, serializer): ins: Ftest = serializer.save() if ins.ftest_work: self.count_sampling(ins.ftest_work) def perform_destroy(self, instance): ftest_work = instance.ftest_work instance.delete() if ftest_work: self.count_sampling(ftest_work) class PtestViewSet(CustomModelViewSet): """ list:性能测试 性能测试 """ queryset = Ptest.objects.all() serializer_class = PtestSerializer select_related_fields = ['testitem'] filterset_fields = ['testitem', 'test_date', 'sample_number'] search_fields = ['testitem__name', 'sample_number'] class FtestWorkViewSet(CustomModelViewSet): """ list: 检验工作 检验工作 """ queryset = FtestWork.objects.all() serializer_class = FtestWorkSerializer create_serializer_class = FtestWorkCreateUpdateSerializer update_serializer_class = FtestWorkCreateUpdateSerializer select_related_fields = ['material', 'mb', 'mb__material'] filterset_class = FtestWorkFilter @transaction.atomic def update(self, request, *args, **kwargs): ins:FtestWork = self.get_object() partial = kwargs.pop('partial', False) if ins.submit_time is not None: raise ParseError('已提交无法修改') if ins.ticket and ins.ticket.state.type != State.STATE_TYPE_START: raise ParseError('审批单已进行,无法修改') serializer = self.get_serializer(ins, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) self.perform_update(serializer) # 触发批次统计分析 ana_batch_thread(xbatchs=[ins.batch]) return Response(serializer.data) @transaction.atomic def destroy(self, request, *args, **kwargs): ins:FtestWork = self.get_object() if ins.submit_time is not None: raise ParseError('已提交无法删除') if ins.ticket: raise ParseError('存在审批, 无法删除') self.perform_destroy(ins) # 触发批次统计分析 ana_batch_thread(xbatchs=[ins.batch]) return Response(status=204) def perform_create(self, serializer): ins = serializer.save() # 触发批次统计分析 ana_batch_thread(xbatchs=[ins.batch]) @action(methods=['post'], detail=True, perms_map={'post': 'ftestwork.submit'}, serializer_class=Serializer) @transaction.atomic def submit(self, request, *args, **kwargs): """提交检验工作 提交检验工作 """ ins:FtestWork = self.get_object() if ins.ticket: raise ParseError('该检验工作存在审批!') if ins.wm is None and ins.mb is None: raise ParseError('该检验工作未关联库存') if ins.submit_time is None: ftestwork_submit(ins, request.user) else: raise ParseError('该检验工作已提交') return Response()