feat(hrm): 完善员工导入导出

导出(export_excel):
- 去掉 1000 条上限,导出筛选后的全部记录
- 由 7 列扩展为全部业务字段;select_related 避免 N+1

导入(import_excel):
- 改为按姓名 upsert:姓名唯一直接更新(身份证号可空);
  重名必须用身份证号区分,否则报错
- 已存在记录用 Excel 非空列覆盖,空单元格保持原值
- 所属部门改为必填:为空或系统不存在均报错
- 修复新增分支重复关键字参数导致的报错

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
zty 2026-06-03 14:12:17 +08:00
parent 1e920296e0
commit b395db7a28
1 changed files with 90 additions and 35 deletions

View File

@ -290,23 +290,70 @@ class EmployeeViewSet(CustomModelViewSet):
"""导出excel
导出excel
"""
field_data = ['人员类型', '人员', '手机号', '身份证号', '所属部门', '在职状态', '定位卡号']
queryset = self.filter_queryset(self.get_queryset())
if queryset.count() > 1000:
raise ParseError('数据量超过1000,请筛选后导出')
# 导出列定义: (表头, 取值函数)。覆盖人员的全部业务字段。
def _cell(v):
return '' if v is None else v
columns = [
('人员类型', lambda i: epTypeOptions.get(i['type'], i['type'])),
('姓名', lambda i: i['name']),
('性别', lambda i: i['gender']),
('人员编号', lambda i: i['number']),
('手机号', lambda i: i['phone']),
('个人邮箱', lambda i: i['email']),
('身份证号', lambda i: i['id_number']),
('所属部门', lambda i: i.get('belong_dept_name', '')),
('所属岗位', lambda i: i.get('post_name', '')),
('车间', lambda i: i['workshop']),
('职务', lambda i: i['position']),
('职务聘任日期', lambda i: i['office_date']),
('在职状态', lambda i: epStateOptions.get(i['job_state'], i['job_state'])),
('入职日期', lambda i: i['start_date']),
('转正日期', lambda i: i['regular_date']),
('合同到期日', lambda i: i['contract_end_date']),
('离职日期', lambda i: i['end_date']),
('银行卡号', lambda i: i['bank_card']),
('出生年月日', lambda i: i['birthday']),
('年龄', lambda i: i['age']),
('工龄', lambda i: i['work_years']),
('学历', lambda i: i['qualification']),
('政治面貌', lambda i: i['partisan']),
('入党时间', lambda i: i['join_partisan_date']),
('民族', lambda i: i['nation']),
('婚姻状况', lambda i: i['marriage']),
('户口性质', lambda i: i['hukou_type']),
('籍贯', lambda i: i['birthplace']),
('户籍地址', lambda i: i['hukou_address']),
('现住地址', lambda i: i['address']),
('枣庄市职称', lambda i: i['zhuanzhi']),
('获得枣庄市职称日期', lambda i: i['zhuanzhi_date']),
('总院/集团职称', lambda i: i['zyjt_zhuanzhi']),
('获得职称日期', lambda i: i['zyjt_zhuanzhi_date']),
('职业技能等级', lambda i: i['skill_rank']),
('获得技能等级证书日期', lambda i: i['skill_rank_date']),
('全日制最高学历', lambda i: i['full_edu']),
('全日制最高学历学校名称', lambda i: i['full_edu_school']),
('全日制最高学历专业', lambda i: i['full_edu_major']),
('全日制最高学历毕业时间', lambda i: i['full_edu_time']),
('非全日制最高学历', lambda i: i['part_edu']),
('非全日制最高学历学校名称', lambda i: i['part_edu_school']),
('非全日制最高学历专业', lambda i: i['part_edu_major']),
('非全日制最高学历毕业时间', lambda i: i['part_edu_time']),
('紧急联系人姓名', lambda i: i['emergency_contact']),
('紧急联系人电话', lambda i: i['emergency_phone']),
('所获荣誉', lambda i: i['honor']),
('首次缴纳社保日期', lambda i: i['first_social_security_date']),
('是否为退役军人', lambda i: '' if i['is_veteran'] else ''),
('定位卡号', lambda i: i['blt_'].get('code', '') if i.get('blt_') else ''),
('创建时间', lambda i: (i.get('create_time') or '')[:19].replace('T', ' ')),
]
field_data = [c[0] for c in columns]
queryset = self.filter_queryset(self.get_queryset()).select_related(
'belong_dept', 'post', 'blt')
odata = EmployeeSerializer(queryset, many=True).data
# 处理数据
data = []
for i in odata:
data.append(
[epTypeOptions[i['type']],
i['name'],
i['phone'],
i['id_number'],
i.get('belong_dept_name', ''),
epStateOptions[i['job_state']],
i['blt_'].get('code', '') if 'blt_' in i and i['blt_'] else '']
)
data.append([_cell(getter(i)) for _, getter in columns])
return Response({'path': export_excel(field_data, data, '人员信息')})
@action(methods=['post'], detail=False, perms_map={'post': 'employee.import_excel'},
@ -361,8 +408,8 @@ class EmployeeViewSet(CustomModelViewSet):
id_number = data.get("id_number")
name = data.get("name")
if not id_number or not name:
raise ParseError(f'{row_num}行,身份证号或姓名为空')
if not name:
raise ParseError(f'{row_num}行,姓名为空')
myLogger.info(f"处理第{row_num}行:{name} - {id_number}")
# 处理人员类型
@ -372,12 +419,13 @@ class EmployeeViewSet(CustomModelViewSet):
data['type'] = TYPE_MAPPING[excel_type]
else:
raise ParseError(f'{row_num}行,人员类型"{excel_type}"无效,有效类型:{", ".join(TYPE_MAPPING.keys())}')
# 处理部门外键
if 'belong_dept' in data and data['belong_dept']:
dept_name = data.pop('belong_dept')
if dept_name not in dept_map:
raise ParseError(f'{row_num}行,部门"{dept_name}"不存在')
data['belong_dept_id'] = dept_map[dept_name]
# 处理部门外键(所属部门必填)
dept_name = data.pop('belong_dept', None)
if not dept_name:
raise ParseError(f'{row_num}行,所属部门为空')
if dept_name not in dept_map:
raise ParseError(f'{row_num}行,部门"{dept_name}"不存在')
data['belong_dept_id'] = dept_map[dept_name]
# 数据验证
if data.get('phone'):
@ -388,34 +436,41 @@ class EmployeeViewSet(CustomModelViewSet):
if not re.match(r'^[1-9]\d{5}(18|19|20)\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{3}[0-9Xx]$', data['id_number']):
raise ParseError(f'{row_num}行,身份证号格式不正确')
# 查找或创建/补全
# 按姓名匹配并更新:姓名唯一则直接更新(身份证号可为空);
# 存在重名则必须用身份证号定位具体记录,否则报错。
try:
with transaction.atomic():
# 优先按身份证号匹配,匹配不到再按姓名匹配
existing = None
if id_number:
existing = Employee.objects.filter(id_number=id_number).first()
if not existing and name:
existing = Employee.objects.filter(name=name, id_number__isnull=True).first() or \
Employee.objects.filter(name=name, id_number='').first()
name_matches = list(Employee.objects.filter(name=name))
if len(name_matches) <= 1:
existing = name_matches[0] if name_matches else None
# 同名但身份证号不一致,视为不同人员,新建
if existing and id_number and existing.id_number \
and existing.id_number != id_number:
existing = None
else:
# 存在重名,必须用身份证号区分
if not id_number:
raise ParseError(f'{row_num}行,姓名"{name}"存在重名,必须填写身份证号')
existing = next(
(e for e in name_matches if e.id_number == id_number), None)
if existing:
# 只用 Excel 非空值填补数据库中为空的字段
# 用 Excel 中填写了值的列覆盖数据库已有数据;空单元格保持原值不变
updated_fields = []
for field_name, value in data.items():
if value in [None, '']:
continue
current_value = getattr(existing, field_name, None)
if current_value in [None, '']:
if getattr(existing, field_name, None) != value:
setattr(existing, field_name, value)
updated_fields.append(field_name)
if updated_fields:
existing.save(update_fields=updated_fields + ['update_time'])
myLogger.info(f"✅ 第{row_num}补全成功:{name},更新字段:{updated_fields}")
myLogger.info(f"✅ 第{row_num}更新成功:{name},更新字段:{updated_fields}")
else:
myLogger.info(f"⏭️ 第{row_num}行无需补全{name}")
myLogger.info(f"⏭️ 第{row_num}行无变化{name}")
created = False
else:
Employee.objects.create(id_number=id_number, name=name, **data)
Employee.objects.create(**data)
created = True
except Exception as e:
raise