"""Material views: CRUD, review workflow actions and Excel import/export."""
import logging
from io import BytesIO
from pathlib import Path

from django.http import FileResponse, Http404
from django.utils import timezone
from openpyxl import Workbook
from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from rest_framework import generics, status
from rest_framework.decorators import api_view, action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from rest_framework.exceptions import PermissionDenied
from rest_framework.parsers import MultiPartParser

from .models import Material, MaterialCategory, MaterialSubcategory
from .serializers import (
    MaterialSerializer,
    MaterialListSerializer,
    MaterialCategorySerializer,
    MaterialSubcategorySerializer,
)
from .importers import import_materials_plan_excel

logger = logging.getLogger(__name__)

_XLSX_CONTENT_TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

# (query parameter, ORM lookup, wrap value in a list).
# The last two target list-valued fields (the original code notes them as
# JSONField "contains" lookups), so the value is wrapped in a list.
_MATERIAL_QUERY_FILTERS = (
    ('status', 'status', False),
    ('name', 'name__icontains', False),
    ('factory_id', 'factory_id', False),
    ('brand', 'brand_id', False),
    ('major_category', 'major_category', False),
    ('material_category', 'material_category', False),
    ('material_subcategory', 'material_subcategory', False),
    ('application_scene', 'application_scene__contains', True),
    ('advantage', 'advantage__contains', True),
)


def _is_admin(user):
    """Return True when *user* has the ``admin`` role."""
    return user.role == 'admin'


def _join_choice_values(values, choices):
    """Map stored choice values to their display labels and join them with "、".

    Args:
        values: A single stored value (str) or an iterable of stored values.
            Falsy input yields an empty string.
        choices: Iterable of ``(value, label)`` pairs.

    Returns:
        The labels joined by "、"; values without a matching choice are
        emitted as-is.
    """
    if not values:
        return ""
    mapping = dict(choices)
    if isinstance(values, str):
        values = [values]
    # str() keeps join() from failing on lazy-translated labels or on
    # non-string stored values.
    return "、".join(str(mapping.get(value, value)) for value in values)


def _apply_template_header_style(worksheet, row_index, total_columns):
    """Style the column-header row: bold white text on a tan fill, thin border.

    Applies to columns 1..total_columns of ``row_index`` and fixes the row
    height. The border is set on the left, right and top sides only.
    """
    header_font = Font(name='SimSun', size=18, bold=True, color='FFFFFFFF')
    header_fill = PatternFill(fill_type='solid', fgColor='FFC39E61')
    header_alignment = Alignment(horizontal='center', vertical='center')
    header_side = Side(style='thin', color='FF000000')
    header_border = Border(left=header_side, right=header_side, top=header_side)

    worksheet.row_dimensions[row_index].height = 23.25
    for column_index in range(1, total_columns + 1):
        cell = worksheet.cell(row=row_index, column=column_index)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment
        cell.border = header_border


def _apply_template_title_style(worksheet, row_index, total_columns, title):
    """Write *title* into a merged, centered, green-filled banner row.

    Merges columns 1..total_columns of ``row_index`` and writes the title into
    the first cell of the merged range.
    """
    title_font = Font(name='SimSun', size=18, bold=True, color='FFFFFFFF')
    title_fill = PatternFill(fill_type='solid', fgColor='FF039843')
    title_alignment = Alignment(horizontal='center', vertical='center')

    worksheet.merge_cells(
        start_row=row_index,
        start_column=1,
        end_row=row_index,
        end_column=total_columns,
    )
    cell = worksheet.cell(row=row_index, column=1)
    cell.value = title
    cell.font = title_font
    cell.fill = title_fill
    cell.alignment = title_alignment
    worksheet.row_dimensions[row_index].height = 23.25


def _text_display_width(text):
    """Return a display width for *text*: non-ASCII characters count as 2."""
    return sum(2 if ord(char) > 127 else 1 for char in str(text or ""))


def _apply_header_column_widths(worksheet, headers):
    """Size each column from its header text, with a minimum width of 16."""
    for column_index, header in enumerate(headers, start=1):
        display_width = _text_display_width(header)
        column_letter = get_column_letter(column_index)
        worksheet.column_dimensions[column_letter].width = max(16, display_width + 4)


def _apply_export_dropdowns(worksheet, headers, data_row_count):
    """Attach list validations (dropdowns) to the choice columns of the sheet.

    For each dropdown config, the options are written into a hidden helper
    column to the right of the data (starting two columns past the last
    header), and every header named in the config gets a list validation that
    references that helper column.

    Args:
        worksheet: Target openpyxl worksheet; data rows start at row 3.
        headers: Header labels in column order.
        data_row_count: Number of data rows written; the validated range
            extends 200 rows past the last data row.
    """
    dropdown_configs = [
        {
            "headers": {"质量等级", "耐久等级", "环保等级", "低碳等级", "总评分"},
            "options": ["1星", "2星", "3星"],
            "prompt": "请选择星级",
            "error": "仅支持 1星、2星、3星",
        },
        {
            "headers": {"材料子类"},
            "options": [
                "节能材料",
                "涂料",
                "界面剂",
                "地坪",
                "水磨石",
                "S垫疏水垫",
                "商用地垫",
                "方块毯",
                "保温板(无机材料)",
                "预制墙板(无机材料)",
            ],
            "prompt": "请选择材料子类",
            "error": "仅支持下拉列表中的材料子类",
        },
        {
            "headers": {"替代材料类型"},
            "options": ["平替", "新研发"],
            "prompt": "请选择替代材料类型",
            "error": "仅支持 平替、新研发",
        },
        {
            "headers": {"竞争优势"},
            "options": ["品质", "成本"],
            "prompt": "可从下拉选择;如需多填,请手动输入并用、分隔",
            "error": "仅支持 品质、成本;多值请用、分隔",
        },
        {
            "headers": {"应用场景"},
            "options": ["府系", "境系", "城系", "住系", "保障房"],
            "prompt": "Excel 单元格下拉不支持原生多选;如需多填,请手动输入并用、分隔",
            "error": "仅支持 府系、境系、城系、住系、保障房;多值请用、分隔",
        },
    ]

    start_row = 3
    end_row = max(start_row, data_row_count + 202)
    source_column_index = len(headers) + 2

    for config in dropdown_configs:
        options = config["options"]
        source_column_letter = get_column_letter(source_column_index)

        # Write the option list into its own hidden helper column.
        for row_index, option in enumerate(options, start=1):
            worksheet.cell(row=row_index, column=source_column_index, value=option)
        worksheet.column_dimensions[source_column_letter].hidden = True

        option_formula = (
            f"=${source_column_letter}$1:${source_column_letter}${len(options)}"
        )

        for column_index, header in enumerate(headers, start=1):
            if header not in config["headers"]:
                continue
            validation = DataValidation(
                type="list",
                formula1=option_formula,
                allow_blank=True,
            )
            validation.prompt = config["prompt"]
            validation.error = config["error"]
            worksheet.add_data_validation(validation)
            target_letter = get_column_letter(column_index)
            validation.add(f"{target_letter}{start_row}:{target_letter}{end_row}")

        source_column_index += 1


def _material_export_row(material):
    """Build the export row for *material*, in export header order."""
    factory = material.factory
    return [
        material.name or "",
        material.get_major_category_display() or "",
        material.material_category or "",
        material.material_subcategory or "",
        material.get_stage_display() or "",
        material.get_importance_level_display() or "",
        material.landing_project or "",
        material.contact_person or "",
        material.contact_phone or "",
        material.handler or "",
        factory.short_name if factory else "",
        factory.factory_name if factory else "",
        material.spec or "",
        material.standard or "",
        _join_choice_values(material.application_scene, Material.APPLICATION_SCENE_CHOICES),
        material.application_desc or "",
        material.get_replace_type_display() or "",
        _join_choice_values(material.advantage, Material.ADVANTAGE_CHOICES),
        material.advantage_desc or "",
        "" if material.cost_compare is None else str(material.cost_compare),
        material.cost_desc or "",
        material.cases or "",
        material.get_quality_level_display() or "",
        material.get_durability_level_display() or "",
        material.get_eco_level_display() or "",
        material.get_carbon_level_display() or "",
        material.get_score_level_display() or "",
        material.connection_method or "",
        material.construction_method or "",
        material.limit_condition or "",
        material.remark or "",
    ]


class MaterialViewSet(ModelViewSet):
    """Material view set."""

    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return materials visible to the user, narrowed by query parameters.

        Non-admin users only see materials of their own factory. Each
        non-empty query parameter listed in ``_MATERIAL_QUERY_FILTERS`` adds
        one more filter.
        """
        queryset = Material.objects.all().order_by('-created_at', '-id')

        if not _is_admin(self.request.user):
            queryset = queryset.filter(factory=self.request.user.factory)

        params = self.request.query_params
        for param, lookup, as_list in _MATERIAL_QUERY_FILTERS:
            value = params.get(param)
            if value:
                queryset = queryset.filter(**{lookup: [value] if as_list else value})
        return queryset

    def get_serializer_class(self):
        """Use the list serializer for ``list`` and the full one otherwise."""
        if self.action == 'list':
            return MaterialListSerializer
        return MaterialSerializer

    def perform_create(self, serializer):
        """Save a new material; non-admins are pinned to their own factory."""
        if not _is_admin(self.request.user):
            serializer.save(factory=self.request.user.factory)
        else:
            serializer.save()

    def perform_update(self, serializer):
        """Save an update after enforcing ownership and draft-only rules.

        Raises:
            PermissionDenied: A non-admin edits another factory's material or
                a material that is not in ``draft`` status.
        """
        user = self.request.user
        if not _is_admin(user):
            # The serializer already holds the object loaded for this update,
            # so there is no need to look it up again.
            material = serializer.instance
            if user.factory_id != material.factory_id:
                raise PermissionDenied("无权修改其他工厂的材料")
            if material.status != 'draft':
                raise PermissionDenied("只有创建中的材料可以编辑")
        serializer.save()

    def perform_destroy(self, instance):
        """Delete a material after enforcing ownership and draft-only rules.

        Raises:
            PermissionDenied: A non-admin deletes another factory's material
                or a material that is not in ``draft`` status.
        """
        user = self.request.user
        if not _is_admin(user):
            if user.factory_id != instance.factory_id:
                raise PermissionDenied("无权删除其他工厂的材料")
            if instance.status != 'draft':
                raise PermissionDenied("只有创建中的材料可以删除")
        instance.delete()

    @action(detail=True, methods=['post'])
    def submit(self, request, pk=None):
        """Submit a draft material for review (draft -> pending)."""
        material = self.get_object()

        # Non-admins may only submit materials of their own factory.
        if not _is_admin(request.user) and request.user.factory_id != material.factory_id:
            return Response(
                {"detail": "无权提交其他工厂的材料"},
                status=status.HTTP_403_FORBIDDEN
            )

        if material.status != 'draft':
            return Response(
                {"detail": "只有创建中的材料才能提交审核"},
                status=status.HTTP_400_BAD_REQUEST
            )

        material.status = 'pending'
        material.save()
        return Response({"status": "已提交审核"})

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a pending material (pending -> approved); admin only."""
        if not _is_admin(request.user):
            return Response(
                {"detail": "只有管理员可以审核材料"},
                status=status.HTTP_403_FORBIDDEN
            )

        material = self.get_object()
        if material.status != 'pending':
            return Response(
                {"detail": "只有待审核的材料才能审核"},
                status=status.HTTP_400_BAD_REQUEST
            )

        material.status = 'approved'
        material.save()
        return Response({"status": "审核通过"})

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """Reject a pending material (pending -> draft); admin only."""
        if not _is_admin(request.user):
            return Response(
                {"detail": "只有管理员可以审核材料"},
                status=status.HTTP_403_FORBIDDEN
            )

        material = self.get_object()
        if material.status != 'pending':
            return Response(
                {"detail": "只有待审核的材料才能审核"},
                status=status.HTTP_400_BAD_REQUEST
            )

        material.status = 'draft'
        material.save()
        return Response({"status": "审核拒绝"})

    @action(detail=False, methods=['get'])
    def choices(self, request):
        """Return the choice enumerations of the material fields."""
        return Response({
            'major_category': Material.MAJOR_CATEGORY_CHOICES,
            'stage': Material.STAGE_CHOICES,
            'importance_level': Material.IMPORTANCE_LEVEL_CHOICES,
            'replace_type': Material.REPLACE_TYPE_CHOICES,
            'advantage': Material.ADVANTAGE_CHOICES,
            'application_scene': Material.APPLICATION_SCENE_CHOICES,
            'star_level': Material.STAR_LEVEL_CHOICES,
            'status': Material.STATUS_CHOICES,
        })

    @action(detail=False, methods=['get'], url_path='template')
    def template(self, request):
        """Download the import template shipped under ``frontend/public``.

        Raises:
            Http404: The template file does not exist.
        """
        template_path = (
            Path(__file__).resolve().parents[3]
            / 'frontend'
            / 'public'
            / 'material_import_template.xlsx'
        )
        if not template_path.exists():
            raise Http404("模板文件不存在")

        # The open handle is passed to FileResponse, which streams from it.
        return FileResponse(
            template_path.open('rb'),
            as_attachment=True,
            filename='material_import_template.xlsx',
            content_type=_XLSX_CONTENT_TYPE,
        )

    @action(detail=False, methods=['get'], url_path='export-excel')
    def export_excel(self, request):
        """Export the filtered materials as an .xlsx attachment.

        Layout: row 1 is a merged title banner, row 2 holds the headers and
        data starts at row 3. Dropdown validations are added afterwards.
        """
        workbook = Workbook()
        worksheet = workbook.active
        worksheet.title = "材料"

        headers = [
            "材料名称", "材料大类", "细分种类", "材料子类", "阶段", "重要等级",
            "落地项目", "对接人", "对接人联系方式", "经办人", "供应商", "工厂全称",
            "规格型号", "符合标准", "应用场景", "应用说明", "替代材料类型",
            "竞争优势", "优势说明", "成本对比(%)", "成本说明", "案例",
            "质量等级", "耐久等级", "环保等级", "低碳等级", "总评分",
            "连接方式", "施工工艺", "限制条件", "备注",
        ]
        _apply_template_title_style(worksheet, 1, len(headers), "新材料工作进展目录")
        worksheet.append(headers)
        _apply_template_header_style(worksheet, 2, len(headers))
        _apply_header_column_widths(worksheet, headers)

        # Count rows as they are written so the validation range always
        # matches what is actually in the sheet.
        row_count = 0
        for material in self.get_queryset().select_related('factory'):
            worksheet.append(_material_export_row(material))
            row_count += 1

        _apply_export_dropdowns(worksheet, headers, row_count)

        output = BytesIO()
        workbook.save(output)
        output.seek(0)

        filename = f"materials_export_{timezone.localtime().strftime('%Y%m%d_%H%M%S')}.xlsx"
        return FileResponse(
            output,
            as_attachment=True,
            filename=filename,
            content_type=_XLSX_CONTENT_TYPE,
        )

    @action(detail=False, methods=['post'], parser_classes=[MultiPartParser], url_path='import-excel')
    def import_excel(self, request):
        """Import materials from an uploaded .xlsx file; admin only.

        Returns the importer's result on success, or a 400 response with a
        ``detail`` message when the file is missing, has the wrong extension
        or the import fails.

        Raises:
            PermissionDenied: The caller is not an admin.
        """
        if not _is_admin(request.user):
            raise PermissionDenied("只有管理员可以导入材料")

        excel_file = request.FILES.get('file')
        if not excel_file:
            return Response({"detail": "未提供 Excel 文件"}, status=status.HTTP_400_BAD_REQUEST)

        if not excel_file.name.lower().endswith('.xlsx'):
            return Response({"detail": "仅支持 .xlsx 格式的 Excel 文件"}, status=status.HTTP_400_BAD_REQUEST)

        try:
            result = import_materials_plan_excel(excel_file)
        except ValueError as exc:
            return Response({"detail": str(exc)}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as exc:
            # Deliberate catch-all at the API boundary: the client still gets
            # a 400, but the traceback is logged instead of being lost.
            logger.exception("Material Excel import failed")
            return Response({"detail": f"导入失败: {exc}"}, status=status.HTTP_400_BAD_REQUEST)

        return Response(result)


class MaterialCategoryViewSet(ModelViewSet):
    """Material category view set; writes are restricted to admins."""

    queryset = MaterialCategory.objects.all()
    serializer_class = MaterialCategorySerializer
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Create a category; raises PermissionDenied for non-admins."""
        if not _is_admin(self.request.user):
            raise PermissionDenied("只有管理员可以创建材料分类")
        serializer.save()

    def perform_update(self, serializer):
        """Update a category; raises PermissionDenied for non-admins."""
        if not _is_admin(self.request.user):
            raise PermissionDenied("只有管理员可以更新材料分类")
        serializer.save()

    def perform_destroy(self, instance):
        """Delete a category; raises PermissionDenied for non-admins."""
        if not _is_admin(self.request.user):
            raise PermissionDenied("只有管理员可以删除材料分类")
        instance.delete()


class MaterialSubcategoryViewSet(ModelViewSet):
    """Material subcategory view set; writes are restricted to admins."""

    queryset = MaterialSubcategory.objects.select_related('category').all()
    serializer_class = MaterialSubcategorySerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return subcategories, optionally filtered by ``category_id``."""
        queryset = super().get_queryset()
        category_id = self.request.query_params.get('category_id')
        if category_id:
            queryset = queryset.filter(category_id=category_id)
        return queryset

    def perform_create(self, serializer):
        """Create a subcategory; raises PermissionDenied for non-admins."""
        if not _is_admin(self.request.user):
            raise PermissionDenied("只有管理员可以创建材料子分类")
        serializer.save()

    def perform_update(self, serializer):
        """Update a subcategory; raises PermissionDenied for non-admins."""
        if not _is_admin(self.request.user):
            raise PermissionDenied("只有管理员可以更新材料子分类")
        serializer.save()

    def perform_destroy(self, instance):
        """Delete a subcategory; raises PermissionDenied for non-admins."""
        if not _is_admin(self.request.user):
            raise PermissionDenied("只有管理员可以删除材料子分类")
        instance.delete()