Recruitment_site/offer_backend/apps/applications/views.py

67 lines
2.7 KiB
Python

from rest_framework import generics, viewsets
from rest_framework.response import Response
from rest_framework.decorators import action
from django.http import FileResponse
from .models import Application
from .serializers import ApplicationCreateSerializer, ApplicationSerializer, ApplicationStatusSerializer
from .emails import notify_status_change
from apps.accounts.permissions import IsSeeker, IsAdminOrSuperAdmin
class ApplyView(generics.CreateAPIView):
serializer_class = ApplicationCreateSerializer
permission_classes = [IsSeeker]
class MyApplicationsView(generics.ListAPIView):
serializer_class = ApplicationSerializer
permission_classes = [IsSeeker]
def get_queryset(self):
return Application.objects.filter(applicant=self.request.user).select_related('job__organization')
class ApplicationManageViewSet(viewsets.ReadOnlyModelViewSet):
"""HR 查看本公司投递"""
serializer_class = ApplicationSerializer
permission_classes = [IsAdminOrSuperAdmin]
def get_queryset(self):
user = self.request.user
if user.is_superadmin:
return Application.objects.all().select_related('job__organization', 'applicant')
return Application.objects.filter(
job__organization=user.organization
).select_related('job__organization', 'applicant')
@action(detail=True, methods=['get'])
def download_resume(self, request, pk=None):
"""下载求职者的简历附件"""
application = self.get_object()
attachment_url = application.resume_snapshot.get('attachment_url')
if not attachment_url:
return Response({'detail': '附件不可用'}, status=404)
try:
from django.core.files.storage import default_storage
# 获取文件
file = default_storage.open(attachment_url.lstrip('/media/'), 'rb')
response = FileResponse(file)
response['Content-Type'] = 'application/octet-stream'
response['Content-Disposition'] = f'attachment; filename="{application.resume_snapshot.get("name", "resume")}.pdf"'
return response
except Exception as e:
return Response({'detail': f'下载失败: {str(e)}'}, status=400)
class ApplicationStatusUpdateView(generics.UpdateAPIView):
serializer_class = ApplicationStatusSerializer
permission_classes = [IsAdminOrSuperAdmin]
def get_queryset(self):
user = self.request.user
if user.is_superadmin:
return Application.objects.all()
return Application.objects.filter(job__organization=user.organization)
def perform_update(self, serializer):
instance = serializer.save()
notify_status_change(instance)