139 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Python
		
	
	
	
from rest_framework.viewsets import ModelViewSet
 | 
						||
from apps.system.mixins import CreateUpdateCustomMixin
 | 
						||
from apps.edu.serializers import CertificateSerializer, CourseSerializer
 | 
						||
from apps.edu.models import Certificate, Course
 | 
						||
from rest_framework.decorators import action
 | 
						||
from django.conf import settings
 | 
						||
from rest_framework.exceptions import ParseError
 | 
						||
from rest_framework.response import Response
 | 
						||
from openpyxl import load_workbook
 | 
						||
from rest_framework.serializers import Serializer
 | 
						||
from rest_framework.permissions import AllowAny
 | 
						||
from apps.edu.services import make_img_x
 | 
						||
from django.db import transaction
 | 
						||
import datetime
 | 
						||
from django.db.models import Q
 | 
						||
 | 
						||
 | 
						||
# Create your views here.
 | 
						||
class CourseViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """Model viewset exposing ``Course`` records.

    All behaviour comes from the base classes; this class only supplies
    configuration.
    """

    # Data source and representation.
    queryset = Course.objects.all()
    serializer_class = CourseSerializer

    # Permission code per HTTP method. How these codes are enforced is
    # decided by project code that is not visible in this module.
    perms_map = dict(get="*", post="course", put="course", delete="course")

    # List-endpoint search and default ordering.
    search_fields = ["name"]
    ordering = ["create_time"]
 | 
						||
 | 
						||
 | 
						||
class CertificateViewSet(CreateUpdateCustomMixin, ModelViewSet):
    """CRUD, spreadsheet-import and image-generation endpoints for ``Certificate``.

    GET requests bypass authentication and are allowed for anyone (see
    ``get_authenticators`` and ``get_permissions``). Other methods fall back
    to the base-class behaviour; ``perms_map`` is consumed by project code
    that is not visible in this module.
    """

    perms_map = {"get": "*", "post": "certificate", "put": "certificate", "delete": "certificate"}
    queryset = Certificate.objects.all()
    serializer_class = CertificateSerializer
    search_fields = ["姓名", "证书编号", "所属单位"]
    filterset_fields = ["是否内审员", "是否授权签字人", "是否质量负责人", "是否最高管理者", "姓名", "证书编号", "所属单位", "单位名称", "课程列表", "证书方案"]
    ordering = ["-create_time", "证书编号"]

    # ``regen_img`` query-string values that mean "do not regenerate".
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})

    # Spreadsheet column -> model field for ``imp``: values copied verbatim.
    _IMP_VALUE_COLUMNS = (
        ("b", "证书编号"),
        ("c", "所属单位"),
        ("d", "单位名称"),
        ("e", "姓名"),
        ("f", "性别"),
        ("g", "职务"),
        ("h", "手机号"),
    )
    # Spreadsheet column -> model field for ``imp``: any non-empty cell is True.
    _IMP_FLAG_COLUMNS = (
        ("i", "是否内审员"),
        ("j", "是否授权签字人"),
        ("k", "是否质量负责人"),
        ("l", "是否最高管理者"),
        ("m", "是否需要集团证书"),
        ("n", "是否需要北京标研培训合格"),
    )

    def get_authenticators(self):
        """Return no authenticators for GET; defer to the base class otherwise."""
        if self.request.method == "GET":
            return []
        return super().get_authenticators()

    def get_permissions(self):
        """Allow any caller on GET; defer to the base class otherwise."""
        if self.request.method == "GET":
            return [AllowAny()]
        return super().get_permissions()

    def retrieve(self, request, *args, **kwargs):
        """Return one certificate, generating its image first when needed.

        ``make_img_x(instance)`` is called when the instance has no
        ``证书地址`` or when the ``regen_img`` query parameter is truthy.
        Values such as ``0`` / ``false`` / ``no`` / ``off`` (case-insensitive)
        and the empty string count as false, so ``?regen_img=false`` no longer
        forces a regeneration.
        """
        raw_flag = request.query_params.get("regen_img", "")
        regen_img = str(raw_flag).strip().lower() not in self._FALSE_STRINGS
        instance = self.get_object()
        if not instance.证书地址 or regen_img:
            # NOTE(review): make_img_x presumably renders and stores the
            # certificate image on the instance - confirm in apps.edu.services.
            make_img_x(instance)
        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    def make_data(self, data, sheet, i):
        """Fill ``data`` from row ``i`` of ``sheet`` and return it.

        Columns b-h are copied as-is; columns i-n are converted to booleans
        (``True`` for any truthy cell value). ``data`` is mutated in place.
        """
        for column, field in self._IMP_VALUE_COLUMNS:
            data[field] = sheet[f"{column}{i}"].value
        for column, field in self._IMP_FLAG_COLUMNS:
            data[field] = bool(sheet[f"{column}{i}"].value)
        return data

    @staticmethod
    def _resolve_xlsx_path(path):
        """Validate a client-supplied ``path`` and return the absolute file path.

        The path is interpreted relative to ``settings.BASE_DIR`` (a leading
        slash is tolerated, matching the previous ``BASE_DIR + path``
        behaviour).

        Raises:
            ParseError: if ``path`` is not a string ending in ``.xlsx``,
                resolves outside ``BASE_DIR``, or does not point to a file.
        """
        import os  # function-scope import keeps this block self-contained

        if not isinstance(path, str) or not path.endswith(".xlsx"):
            raise ParseError("请提供xlsx格式文件")
        # str() so this works whether BASE_DIR is a str or a pathlib.Path.
        base_dir = os.path.abspath(str(settings.BASE_DIR))
        full_path = os.path.abspath(os.path.join(base_dir, path.lstrip("/\\")))
        try:
            inside_base = os.path.commonpath([base_dir, full_path]) == base_dir
        except ValueError:
            # Raised e.g. when the two paths are on different Windows drives.
            inside_base = False
        if not inside_base:
            # The path comes from the request body: refuse "../" escapes.
            raise ParseError("文件路径不合法")
        if not os.path.isfile(full_path):
            raise ParseError("文件不存在")
        return full_path

    @staticmethod
    def _cell_text(value):
        """Return a cell value as text with newlines and spaces removed.

        ``None`` (an empty cell) becomes ``""``; non-string values such as
        numbers are converted with ``str`` first.
        """
        if value is None:
            return ""
        return str(value).replace("\n", "").replace(" ", "")

    @action(methods=["post"], detail=False, perms_map={"post": "certificate"}, serializer_class=Serializer)
    def imp(self, request, *args, **kwargs):
        """Import certificates from a spreadsheet.

        Reads the first worksheet of the ``.xlsx`` file named by
        ``request.data["path"]``, starting at row 3 and stopping at the first
        row whose column B is empty. Each row is upserted by ``证书编号``.
        The rows are written in one transaction so a failure part-way leaves
        no partial import.
        """
        full_path = self._resolve_xlsx_path(request.data.get("path", ""))
        wb = load_workbook(full_path, data_only=True)
        sheet = wb.worksheets[0]
        row = 3
        with transaction.atomic():
            while sheet[f"b{row}"].value:
                data = self.make_data({}, sheet, row)
                Certificate.objects.update_or_create(defaults=data, 证书编号=data["证书编号"])
                row += 1
        return Response()

    @action(methods=["post"], detail=False, perms_map={"post": "certificate"}, serializer_class=Serializer)
    @transaction.atomic
    def imp_202312(self, request, *args, **kwargs):
        """Import certificates from a spreadsheet in the 202312 layout.

        Layout read from the first worksheet:
        B1 holds ``;``-separated course names, B2/B3 the training start/end
        dates, B4 the issue date; data rows start at row 6 with columns
        A-D = certificate number, name, full unit name, unit. Reading stops
        at the first row whose column A is empty.

        Raises:
            ParseError: if the path is invalid, B1 is empty, or a listed
                course does not exist.
        """
        full_path = self._resolve_xlsx_path(request.data.get("path", ""))
        wb = load_workbook(full_path, data_only=True)
        sheet = wb.worksheets[0]

        courses_str = sheet["b1"].value
        if not courses_str:
            raise ParseError("课程不存在")
        # Tolerate surrounding whitespace and a trailing ";".
        course_names = [name.strip() for name in str(courses_str).split(";")]
        course_ids = []
        for course_name in course_names:
            if not course_name:
                continue
            try:
                course_ids.append(Course.objects.get(name=course_name).id)
            except Course.DoesNotExist:
                raise ParseError("课程不存在")
        # Loop-invariant: resolve the course set once instead of once per row.
        course_objs = list(Course.objects.filter(id__in=course_ids))

        att_date = sheet["b2"].value
        att_date2 = sheet["b3"].value
        issue_date = sheet["b4"].value

        row = 6
        while sheet[f"a{row}"].value:
            number = self._cell_text(sheet[f"a{row}"].value)
            name = self._cell_text(sheet[f"b{row}"].value)
            dept_full = self._cell_text(sheet[f"c{row}"].value)
            dept = self._cell_text(sheet[f"d{row}"].value)
            obj, _created = Certificate.objects.update_or_create(
                defaults={
                    "证书方案": "202312",
                    "证书编号": number,
                    "姓名": name,
                    "培训日期": att_date,
                    "培训结束日期": att_date2,
                    "发证日期": issue_date,
                    "单位名称": dept_full,
                    "所属单位": dept,
                },
                证书编号=number,
            )
            obj.课程列表.set(course_objs)
            row += 1
        return Response()

    @action(methods=["get"], detail=False, perms_map={"get": "*"})
    def gen_img(self, request, *args, **kwargs):
        """Generate images for every certificate that does not have one yet.

        Selects certificates whose ``证书地址`` is NULL or empty and calls
        ``make_img_x`` on each.
        """
        # NOTE(review): this is a GET action, and get_authenticators /
        # get_permissions open every GET to unauthenticated callers, so anyone
        # can trigger this bulk job - confirm that is intended.
        queryset = Certificate.objects.filter(Q(证书地址=None) | Q(证书地址=""))
        for obj in queryset:
            make_img_x(obj)
            print(f"生成---{obj.姓名}---的证书")
        return Response()
 |