import re
from typing import Any, Dict, List, Optional, Tuple

import openpyxl

from apps.factory.models import Factory
from apps.material.models import Material

# Maps the major-category text found in the sheet to the stored category code.
MAJOR_CATEGORY_MAP = {
    "建筑": "architecture",
    "景观": "landscape",
    "设备": "equipment",
    "装修": "decoration",
    "室内": "decoration",
}
STAGE_VALUES = {choice[0] for choice in Material.STAGE_CHOICES}
IMPORTANCE_LEVEL_VALUES = {choice[0] for choice in Material.IMPORTANCE_LEVEL_CHOICES}
# Separators between several unit names in one cell: whitespace, ASCII and
# full-width comma/semicolon (written as escapes so they survive re-encoding),
# the ideographic comma and the slash.
UNIT_SPLIT_RE = re.compile(r"[\s,\uff0c、/;\uff1b]+")


def _cell(value: Any) -> str:
    """Return a cell value as stripped text; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()


def _single_line(value: Any, max_len: int = 255) -> str:
    """Return the cell text on one line, truncated to ``max_len`` characters.

    Every run of whitespace (including line breaks) is collapsed to a single
    space before truncating; the truncated text is stripped again so it never
    ends with a space.
    """
    text = _cell(value)
    if not text:
        return ""
    text = re.sub(r"\s+", " ", text)
    return text[:max_len].strip()


def _parse_choice(value: Any, allowed_values: set) -> Optional[str]:
    """Return the cell text if it is one of ``allowed_values``, else ``None``."""
    text = _cell(value)
    if not text:
        return None
    return text if text in allowed_values else None


def _find_existing_factory(search_terms: List[str]) -> Optional[Factory]:
    """Return the first existing factory matching any of ``search_terms``.

    Each term is tried in order against, successively: an exact ``brand``
    match, a case-insensitive "brand contains term" match, and finally any
    factory whose non-empty brand is contained in the term (compared in
    lower case). Returns ``None`` when no term matches.
    """
    # Lower-cased (brand, factory) pairs, loaded at most once per lookup
    # rather than once per search term.
    known_brands: Optional[List[Tuple[str, Factory]]] = None
    for term in search_terms:
        match = Factory.objects.filter(brand=term).first()
        if match:
            return match
        match = Factory.objects.filter(brand__icontains=term).first()
        if match:
            return match
        if known_brands is None:
            known_brands = [
                ((factory.brand or "").lower(), factory)
                for factory in Factory.objects.all()
            ]
        term_lower = term.lower()
        for brand, factory in known_brands:
            if brand and brand in term_lower:
                return factory
    return None


def _resolve_factory(
    unit_name: str,
    factory_cache: Dict[str, Optional[Factory]],
    unrecognized_factory: Factory,
) -> Tuple[Factory, bool, bool]:
    """Resolve the factory for a unit name, creating one when none matches.

    Args:
        unit_name: Raw unit-name text from the sheet; may be empty.
        factory_cache: Per-import cache keyed by ``unit_name``; updated in
            place with the matched or newly created factory.
        unrecognized_factory: Factory returned when ``unit_name`` is empty.

    Returns:
        ``(factory, is_unresolved, is_created)``. ``is_unresolved`` is true
        only for an empty ``unit_name``; ``is_created`` is true only on the
        call that actually creates a new factory.
    """
    if not unit_name:
        return unrecognized_factory, True, False

    if unit_name not in factory_cache:
        candidates = [
            part.strip() for part in UNIT_SPLIT_RE.split(unit_name) if part.strip()
        ]
        factory_cache[unit_name] = _find_existing_factory(candidates or [unit_name])

    factory = factory_cache[unit_name]
    if factory:
        return factory, False, False

    # Nothing matched: register the full unit name as a new factory/brand.
    created_factory = Factory.objects.create(
        factory_name=unit_name,
        brand=unit_name,
        province="北京",
        city="北京",
        district="北京",
    )
    # Cache it so later rows with the same unit name reuse it and are not
    # counted as another creation.
    factory_cache[unit_name] = created_factory
    return created_factory, False, True


def import_materials_plan_excel(file_obj) -> Dict[str, int]:
    """Import materials from the first sheet of an Excel workbook.

    The second row of the sheet is used as the header row and data starts on
    the third row. A blank major-category cell inherits the last non-blank
    value seen above it. Rows lacking a material name, a sub-category or a
    major category are skipped.

    Args:
        file_obj: Anything accepted by ``openpyxl.load_workbook``.

    Returns:
        Counters with the keys ``created``, ``updated``, ``skipped``,
        ``unresolved_factory`` and ``created_factory``.

    Raises:
        ValueError: If the sheet has fewer than three rows or a required
            header is missing.
    """
    workbook = openpyxl.load_workbook(file_obj, read_only=True, data_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        rows = list(worksheet.iter_rows(values_only=True))
    finally:
        # Read-only workbooks keep the file open until closed explicitly;
        # release it even when reading the rows fails.
        workbook.close()

    if len(rows) < 3:
        raise ValueError("Excel 内容不足,未找到表头或数据。")

    header = [_cell(value) for value in rows[1]]
    header_index = {name: idx for idx, name in enumerate(header) if name}
    required_headers = ["材料大类", "细分种类", "材料名称", "材料单位名称"]
    missing_headers = [name for name in required_headers if name not in header_index]
    if missing_headers:
        raise ValueError(f"缺少必要表头: {', '.join(missing_headers)}")

    def get(row: Tuple[Any, ...], key: str) -> str:
        """Return the stripped text under header ``key``; "" if unavailable."""
        idx = header_index.get(key, -1)
        if idx < 0 or idx >= len(row):
            return ""
        return _cell(row[idx])

    # Placeholder factory for rows whose unit-name cell is empty.
    unrecognized_factory, _ = Factory.objects.get_or_create(
        brand="未识别的品牌",
        defaults={
            "factory_name": "未识别的品牌工厂",
            "province": "-",
            "city": "-",
        },
    )

    created = 0
    updated = 0
    skipped = 0
    unresolved_factory = 0
    created_factory = 0
    factory_cache: Dict[str, Optional[Factory]] = {}
    current_major_category = ""

    for row in rows[2:]:
        if not row:
            continue
        if not any(_cell(value) for value in row):
            # Entirely blank rows are ignored and not counted as skipped.
            continue

        major_raw = get(row, "材料大类")
        if major_raw:
            current_major_category = major_raw

        material_name = _single_line(get(row, "材料名称"))
        material_category = _single_line(get(row, "细分种类"))
        if not material_name or not material_category or not current_major_category:
            skipped += 1
            continue

        unit_name = get(row, "材料单位名称")
        factory, is_unresolved, is_created_factory = _resolve_factory(
            unit_name, factory_cache, unrecognized_factory
        )
        if is_unresolved:
            unresolved_factory += 1
        if is_created_factory:
            created_factory += 1

        defaults = {
            "stage": _parse_choice(get(row, "阶段"), STAGE_VALUES),
            "importance_level": _parse_choice(
                get(row, "重要等级"), IMPORTANCE_LEVEL_VALUES
            ),
            "landing_project": _single_line(get(row, "落地项目")) or None,
            "contact_person": _single_line(get(row, "对接人")) or None,
            "contact_phone": _single_line(get(row, "对接人联系方式")) or None,
            "handler": _single_line(get(row, "经办人")) or None,
            "remark": _single_line(get(row, "备注")) or None,
            "factory": factory,
        }
        # A material is identified by name + major category + sub-category +
        # factory; category text missing from the map is stored as
        # "architecture".
        _, created_flag = Material.objects.update_or_create(
            name=material_name,
            major_category=MAJOR_CATEGORY_MAP.get(
                current_major_category, "architecture"
            ),
            material_category=material_category,
            factory=factory,
            defaults=defaults,
        )
        if created_flag:
            created += 1
        else:
            updated += 1

    return {
        "created": created,
        "updated": updated,
        "skipped": skipped,
        "unresolved_factory": unresolved_factory,
        "created_factory": created_factory,
    }