mat/backend/apps/material/importers.py

183 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from typing import Any, Dict, List, Optional, Tuple
import openpyxl
from apps.factory.models import Factory
from apps.material.models import Material
# Translates the Chinese label found in the "材料大类" column into the code
# passed to ``Material.major_category``. Labels missing from this table are
# handled by the caller, which falls back to "architecture".
MAJOR_CATEGORY_MAP: Dict[str, str] = {
    "建筑": "architecture",
    "景观": "landscape",
    "设备": "equipment",
    # Two spreadsheet labels share the same stored code.
    "装修": "decoration",
    "室内": "decoration",
}
# Sets holding the first element of every entry in the corresponding
# ``Material`` choices list; used to validate spreadsheet cells before saving.
STAGE_VALUES = {entry[0] for entry in Material.STAGE_CHOICES}
IMPORTANCE_LEVEL_VALUES = {
    entry[0] for entry in Material.IMPORTANCE_LEVEL_CHOICES
}
# Separators that may join several names inside one "材料单位名称" cell:
# whitespace, ASCII comma, full-width comma (U+FF0C), ideographic comma "、",
# slash, ASCII semicolon and full-width semicolon (U+FF1B).
# The full-width characters are written as escapes so they cannot be mistaken
# for (or silently normalised into) their ASCII look-alikes.
UNIT_SPLIT_RE = re.compile(r"[\s,\uFF0C、/;\uFF1B]+")
def _cell(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _single_line(value: Any, max_len: int = 255) -> str:
    """Normalise a cell value to one line of at most ``max_len`` characters.

    The value is first converted with ``_cell``; every run of whitespace
    (including line breaks) is then collapsed to a single space, the text is
    cut to ``max_len`` characters, and the result is stripped again so a cut
    that lands right after a space leaves no trailing blank.

    Returns ``""`` for empty input.
    """
    collapsed = re.sub(r"\s+", " ", _cell(value))
    return collapsed[:max_len].strip()
def _parse_choice(value: Any, allowed_values: set) -> Optional[str]:
    """Return the cell text if it is one of ``allowed_values``, else ``None``.

    Empty cells and text that is not a member of ``allowed_values`` both
    yield ``None``.
    """
    candidate = _cell(value)
    if candidate and candidate in allowed_values:
        return candidate
    return None
def _resolve_factory(unit_name: str, factory_cache: Dict[str, Optional[Factory]], unrecognized_factory: Factory) -> Tuple[Factory, bool, bool]:
    """Map a spreadsheet unit name to a ``Factory``, creating one if needed.

    Args:
        unit_name: Text of the unit-name cell; may contain several names
            joined by the separators in ``UNIT_SPLIT_RE``.
        factory_cache: Per-import memo keyed by the raw ``unit_name``. It is
            mutated here: the lookup result is stored on the first miss and
            replaced by the new factory when one is created.
        unrecognized_factory: Placeholder returned when ``unit_name`` is empty.

    Returns:
        ``(factory, is_unresolved, is_created)``:
        * empty ``unit_name`` -> ``(unrecognized_factory, True, False)``
        * existing or already-cached factory -> ``(factory, False, False)``
        * newly created factory -> ``(factory, False, True)``
    """

    def lookup(term: str) -> Optional[Factory]:
        # Try progressively looser brand matches for a single search term.
        exact = Factory.objects.filter(brand=term).first()
        if exact:
            return exact
        partial = Factory.objects.filter(brand__icontains=term).first()
        if partial:
            return partial
        # Last resort: a known brand that appears inside the term
        # (compared case-insensitively); blank brands are ignored.
        needle = term.lower()
        for existing in Factory.objects.all():
            known_brand = (existing.brand or "").lower()
            if known_brand and known_brand in needle:
                return existing
        return None

    if not unit_name:
        return unrecognized_factory, True, False

    if unit_name not in factory_cache:
        pieces = [piece.strip() for piece in UNIT_SPLIT_RE.split(unit_name) if piece.strip()]
        found = None
        # If splitting leaves nothing usable, search with the raw name instead.
        for term in pieces or [unit_name]:
            found = lookup(term)
            if found:
                break
        factory_cache[unit_name] = found

    cached = factory_cache[unit_name]
    if cached:
        return cached, False, False

    # Nothing matched: register a new factory named after the raw cell text.
    new_factory = Factory.objects.create(
        factory_name=unit_name,
        brand=unit_name,
        province="北京",
        city="北京",
        district="北京",
    )
    factory_cache[unit_name] = new_factory
    return new_factory, False, True
def import_materials_plan_excel(file_obj) -> Dict[str, int]:
    """Import materials from the first worksheet of an Excel workbook.

    Sheet layout as read by this function: the first row is ignored, the
    second row holds the column headers, and every following row is data.
    The headers "材料大类", "细分种类", "材料名称" and "材料单位名称" are
    required; the other columns read below are optional and treated as empty
    when absent.

    For each data row:
    * completely empty rows are ignored (not counted);
    * a blank "材料大类" cell inherits the last non-blank value above it
      (presumably to cope with merged cells — TODO confirm);
    * rows without a material name, a sub-category or any major category so
      far are counted as skipped;
    * the factory is resolved through ``_resolve_factory``;
    * the material is written with ``Material.objects.update_or_create``,
      keyed on name, major category, sub-category and factory, and always
      gets ``status="approved"``.

    Args:
        file_obj: Anything ``openpyxl.load_workbook`` accepts.

    Returns:
        Counters with the keys ``created``, ``updated``, ``skipped``,
        ``unresolved_factory`` (rows with an empty unit name) and
        ``created_factory`` (factories created during this import).

    Raises:
        ValueError: If the sheet has fewer than three rows or a required
            header is missing.
    """
    workbook = openpyxl.load_workbook(file_obj, read_only=True, data_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        # Materialise the rows so the workbook can be closed before processing.
        rows = list(worksheet.iter_rows(values_only=True))
    finally:
        # A read-only workbook must be closed explicitly; do it even when
        # reading fails so the underlying file handle is not leaked.
        workbook.close()

    if len(rows) < 3:
        raise ValueError("Excel 内容不足,未找到表头或数据。")

    header = [_cell(value) for value in rows[1]]
    header_index = {name: idx for idx, name in enumerate(header) if name}
    required_headers = ["材料大类", "细分种类", "材料名称", "材料单位名称"]
    missing_headers = [name for name in required_headers if name not in header_index]
    if missing_headers:
        raise ValueError(f"缺少必要表头: {', '.join(missing_headers)}")

    def get(row: Tuple[Any, ...], key: str) -> str:
        # Text of the column named ``key``; "" when the column does not exist
        # or the row is shorter than the header.
        idx = header_index.get(key, -1)
        if idx < 0 or idx >= len(row):
            return ""
        return _cell(row[idx])

    # Placeholder factory for rows whose unit-name cell is empty.
    unrecognized_factory, _ = Factory.objects.get_or_create(
        brand="未识别的品牌",
        defaults={
            "factory_name": "未识别的品牌工厂",
            "province": "-",
            "city": "-",
        },
    )

    created = 0
    updated = 0
    skipped = 0
    unresolved_factory = 0
    created_factory = 0
    factory_cache: Dict[str, Optional[Factory]] = {}
    current_major_category = ""

    for row in rows[2:]:
        if not row:
            continue
        if not any(_cell(value) for value in row):
            continue

        # Carry the major category forward across rows that leave it blank.
        major_raw = get(row, "材料大类")
        if major_raw:
            current_major_category = major_raw

        material_name = _single_line(get(row, "材料名称"))
        material_category = _single_line(get(row, "细分种类"))
        if not material_name or not material_category or not current_major_category:
            skipped += 1
            continue

        unit_name = get(row, "材料单位名称")
        factory, is_unresolved, is_created_factory = _resolve_factory(
            unit_name, factory_cache, unrecognized_factory
        )
        if is_unresolved:
            unresolved_factory += 1
        if is_created_factory:
            created_factory += 1

        defaults = {
            "stage": _parse_choice(get(row, "阶段"), STAGE_VALUES),
            "importance_level": _parse_choice(get(row, "重要等级"), IMPORTANCE_LEVEL_VALUES),
            "landing_project": _single_line(get(row, "落地项目")) or None,
            "contact_person": _single_line(get(row, "对接人")) or None,
            "contact_phone": _single_line(get(row, "对接人联系方式")) or None,
            "handler": _single_line(get(row, "经办人")) or None,
            "remark": _single_line(get(row, "备注")) or None,
            "factory": factory,
            "status": "approved",
        }
        _, created_flag = Material.objects.update_or_create(
            name=material_name,
            # Labels missing from the map are stored as "architecture".
            major_category=MAJOR_CATEGORY_MAP.get(current_major_category, "architecture"),
            material_category=material_category,
            factory=factory,
            defaults=defaults,
        )
        if created_flag:
            created += 1
        else:
            updated += 1

    return {
        "created": created,
        "updated": updated,
        "skipped": skipped,
        "unresolved_factory": unresolved_factory,
        "created_factory": created_factory,
    }