mat/backend/apps/material/importers.py

417 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from typing import Any, Dict, List, Optional, Tuple
import openpyxl
from django.contrib.auth import get_user_model
from apps.factory.models import Factory
from apps.material.models import Material
# Spreadsheet labels for the major category, grouped by the internal code
# each label resolves to (two labels share the "decoration" code).
MAJOR_CATEGORY_MAP = {
    label: code
    for code, labels in (
        ("architecture", ("建筑",)),
        ("landscape", ("景观",)),
        ("equipment", ("设备",)),
        ("decoration", ("装修", "室内")),
    )
    for label in labels
}
# Separators between several unit/brand names in one cell: any whitespace,
# ASCII and full-width commas/semicolons, the ideographic comma and "/".
# The full-width forms are spelled as \uXXXX escapes so they cannot be
# confused with (or silently normalised into) their ASCII look-alikes.
UNIT_SPLIT_RE = re.compile(r"[\s,\uff0c、/;\uff1b]+")
# Same separators for multi-choice cells, except that only a line break
# (not every whitespace character) splits values.
MULTI_VALUE_SPLIT_RE = re.compile(r"[\n,\uff0c、/;\uff1b]+")
# NOTE(security): hard-coded initial password handed to create_user() for
# every auto-created factory account in this module. Review whether a
# per-account secret or a forced reset on first login is required.
INITIAL_PASSWORD = "abc!0000"
# Accepted spreadsheet column headers for each importer field key. The
# first alias of a field is the preferred spelling (it is the one quoted
# in the "missing header" error); aliases are tried in the listed order.
HEADER_ALIASES: Dict[str, Tuple[str, ...]] = dict(
    major_category=("材料大类", "专业类别"),
    material_category=("细分种类", "材料分类"),
    material_subcategory=("材料子类", "材料子分类"),
    material_name=("材料名称",),
    unit_name=("材料单位名称", "所属工厂", "品牌"),
    factory_name=("工厂全称", "生产工厂全称"),
    stage=("阶段",),
    importance_level=("重要等级",),
    landing_project=("落地项目",),
    contact_person=("对接人",),
    contact_phone=("对接人联系方式", "联系方式", "联系电话"),
    handler=("经办人",),
    remark=("备注",),
    spec=("规格型号",),
    standard=("符合标准",),
    application_scene=("应用场景",),
    application_desc=("应用说明", "应用场景说明"),
    replace_type=("替代材料类型",),
    advantage=("竞争优势",),
    advantage_desc=("优势说明",),
    cost_compare=("成本对比(%)", "成本对比"),
    cost_desc=("成本说明",),
    cases=("案例",),
    quality_level=("质量等级",),
    durability_level=("耐久等级",),
    eco_level=("环保等级",),
    carbon_level=("低碳等级",),
    score_level=("总评分",),
    connection_method=("连接方式",),
    construction_method=("施工工艺",),
    limit_condition=("限制条件",),
    status=("状态",),
)
def _choice_lookup(choices):
    """Map both ``str(stored value)`` and display label to the stored value.

    Labels are inserted after the stringified values, so a label that
    collides with a stringified value takes precedence.
    """
    lookup = {}
    for stored, _label in choices:
        lookup[str(stored)] = stored
    for stored, label in choices:
        lookup[label] = stored
    return lookup


# Cell text -> stored value, one table per choice field on Material.
MAJOR_CATEGORY_VALUE_MAP = _choice_lookup(Material.MAJOR_CATEGORY_CHOICES)
# "室内" is accepted as an additional spelling of the decoration category.
MAJOR_CATEGORY_VALUE_MAP["室内"] = "decoration"
STAGE_VALUE_MAP = _choice_lookup(Material.STAGE_CHOICES)
IMPORTANCE_LEVEL_VALUE_MAP = _choice_lookup(Material.IMPORTANCE_LEVEL_CHOICES)
REPLACE_TYPE_VALUE_MAP = _choice_lookup(Material.REPLACE_TYPE_CHOICES)
APPLICATION_SCENE_VALUE_MAP = _choice_lookup(Material.APPLICATION_SCENE_CHOICES)
ADVANTAGE_VALUE_MAP = _choice_lookup(Material.ADVANTAGE_CHOICES)
STAR_LEVEL_VALUE_MAP = _choice_lookup(Material.STAR_LEVEL_CHOICES)
STATUS_VALUE_MAP = _choice_lookup(Material.STATUS_CHOICES)
# Quantisation step used to round parsed decimals to two places.
DECIMAL_2_PLACES = Decimal("0.01")
# Largest absolute cost-comparison value the importer accepts.
MAX_COST_COMPARE = Decimal("999.99")
def _cell(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _normalize_header(value: Any) -> str:
    """Return the header text with every whitespace character removed."""
    header_text = _cell(value)
    # Splitting on whitespace and re-joining with nothing drops all of it.
    return "".join(header_text.split())
def _single_line(value: Any, max_len: int = 255) -> str:
    """Flatten a cell to a single line of at most ``max_len`` characters.

    Every run of whitespace (including line breaks) becomes one space. The
    result is then cut to ``max_len`` and stripped again, because the cut
    may leave a trailing space.
    """
    collapsed = " ".join(_cell(value).split())
    return collapsed[:max_len].strip()
def _parse_mapped_choice(value: Any, mapping: Dict[str, Any]) -> Optional[Any]:
    """Look the cell text up in ``mapping``.

    Returns ``None`` for a blank cell or for text that is not a key of
    ``mapping``.
    """
    key = _cell(value)
    return mapping.get(key) if key else None
def _parse_multi_choice(value: Any, mapping: Dict[str, Any]) -> List[Any]:
    """Split a multi-value cell and map each piece through ``mapping``.

    Pieces are separated by ``MULTI_VALUE_SPLIT_RE``. Unknown pieces and
    repeated results are dropped; first-seen order is kept.
    """
    ordered: List[Any] = []
    already_added = set()
    text = _cell(value)
    if text:
        tokens = [piece.strip() for piece in MULTI_VALUE_SPLIT_RE.split(text)]
        for token in filter(None, tokens):
            choice = mapping.get(token)
            if choice is not None and choice not in already_added:
                already_added.add(choice)
                ordered.append(choice)
    return ordered
def _parse_decimal(value: Any) -> Optional[Decimal]:
    """Parse a cost-comparison cell into a ``Decimal`` with two places.

    Percent signs and thousands separators, in both their ASCII and
    full-width forms, are removed before parsing.

    Returns:
        The value rounded half-up to two decimal places, or ``None`` when
        the cell is blank, is not a finite number, or its magnitude exceeds
        ``MAX_COST_COMPARE``.
    """
    text = _cell(value)
    if not text:
        return None
    normalized = text
    # "\uff05" and "\uff0c" are the full-width "%" and ",".
    for noise in ("%", "\uff05", ",", "\uff0c"):
        normalized = normalized.replace(noise, "")
    normalized = normalized.strip()
    try:
        number = Decimal(normalized)
        # NaN has to be rejected explicitly: quantize() passes it through
        # and the ordering comparison below would raise InvalidOperation.
        if not number.is_finite():
            return None
        parsed = number.quantize(DECIMAL_2_PLACES, rounding=ROUND_HALF_UP)
    except (InvalidOperation, ValueError):
        return None
    if parsed.copy_abs() > MAX_COST_COMPARE:
        return None
    return parsed
def _optional_text(value: Any) -> Optional[str]:
    """Return the stripped cell text, or ``None`` when the cell is blank."""
    return _cell(value) or None
def _optional_single_line(value: Any, max_len: int = 255) -> Optional[str]:
    """Return the ``_single_line`` form of the cell, or ``None`` if empty."""
    return _single_line(value, max_len=max_len) or None
def _find_header_row(rows: List[Tuple[Any, ...]]) -> Tuple[int, Dict[str, int]]:
    """Locate the header row among the first ten rows.

    A row qualifies when, for every required field, at least one of that
    field's aliases appears among its whitespace-stripped cell texts.

    Returns:
        ``(row_index, header_index)`` where ``header_index`` maps each
        normalised header text of that row to its column index (for a
        repeated header the last column wins).

    Raises:
        ValueError: If none of the inspected rows carries all required
            headers.
    """
    required_fields = ("major_category", "material_category", "material_name", "unit_name")
    for row_number, cells in enumerate(rows[:10]):
        columns: Dict[str, int] = {}
        for column, raw_header in enumerate(cells):
            key = _normalize_header(raw_header)
            if key:
                columns[key] = column
        for field in required_fields:
            aliases = {_normalize_header(alias) for alias in HEADER_ALIASES[field]}
            if aliases.isdisjoint(columns):
                # A required field has no matching column; try the next row.
                break
        else:
            return row_number, columns
    wanted = ", ".join(HEADER_ALIASES[field][0] for field in required_fields)
    raise ValueError(f"缺少必要表头: {wanted}")
def _unique_username(user_model, base: str) -> str:
if not user_model.objects.filter(username=base).exists():
return base
for index in range(2, 10000):
candidate = f"{base}{index}"
if not user_model.objects.filter(username=candidate).exists():
return candidate
raise RuntimeError(f"无法为账号分配唯一用户名: {base}")
def _ensure_factory_user(factory: Factory, unit_name: str) -> bool:
    """Create a login account for ``factory`` unless it already has one.

    An account counts as existing when a user with ``role="user"`` is
    linked to the factory. A new account is named after ``unit_name``
    (flattened to one line, at most 150 characters), falling back to
    ``factory<id>``, made unique via ``_unique_username``, and receives
    ``INITIAL_PASSWORD``.

    Returns:
        ``True`` if an account was created, ``False`` if one already existed.
    """
    user_model = get_user_model()
    factory_users = user_model.objects.filter(role="user", factory_id=factory.id)
    if factory_users.order_by("id").first():
        return False
    base_name = _single_line(unit_name, max_len=150) or f"factory{factory.id}"
    account_name = _unique_username(user_model, base_name)
    user_model.objects.create_user(
        username=account_name,
        password=INITIAL_PASSWORD,
        role="user",
        factory=factory,
    )
    return True
def _factory_search_terms(unit_name: str, factory_name: str) -> List[str]:
    """Return the ordered, de-duplicated lookup terms for a name pair.

    Each non-empty name contributes itself first, then its parts as split
    by ``UNIT_SPLIT_RE``.
    """
    terms: List[str] = []
    for source in (unit_name, factory_name):
        if not source:
            continue
        terms.append(source)
        terms.extend(part.strip() for part in UNIT_SPLIT_RE.split(source) if part.strip())
    # dict.fromkeys drops repeats while keeping first-seen order.
    return list(dict.fromkeys(terms))


def _match_existing_factory(terms: List[str]) -> Optional[Factory]:
    """Return the first existing Factory that matches any term, else ``None``.

    For each term, in order: exact brand, exact full name, brand containing
    the term, full name containing the term, and finally any factory whose
    brand or full name occurs inside the term (case-insensitive).
    """
    lookups = ("brand", "factory_name", "brand__icontains", "factory_name__icontains")
    # Loaded only if the direct lookups miss, and then at most once per call.
    all_factories: Optional[List[Factory]] = None
    for term in terms:
        for lookup in lookups:
            match = Factory.objects.filter(**{lookup: term}).first()
            if match is not None:
                return match
        if all_factories is None:
            all_factories = list(Factory.objects.all())
        term_lower = term.lower()
        for factory in all_factories:
            brand = (factory.brand or "").lower()
            full_name = (factory.factory_name or "").lower()
            if (brand and brand in term_lower) or (full_name and full_name in term_lower):
                return factory
    return None


def _resolve_factory(
    unit_name: str,
    factory_name: str,
    factory_cache: Dict[Tuple[str, str], Optional[Factory]],
    unrecognized_factory: Factory,
) -> Tuple[Factory, bool, bool]:
    """Find, or create, the Factory for a row's unit and factory names.

    Args:
        unit_name: Text of the row's unit/brand column (may be empty).
        factory_name: Text of the row's full-factory-name column (may be empty).
        factory_cache: Per-import cache keyed by ``(unit_name, factory_name)``;
            it is filled in by this function.
        unrecognized_factory: Placeholder returned when both names are empty.

    Returns:
        ``(factory, is_unresolved, is_created)``. ``is_unresolved`` is true
        only when the placeholder was returned; ``is_created`` is true only
        on the call that created a new Factory.

    When nothing matches, a Factory is created with province, city and
    district all set to "北京", and a login account is ensured for it.
    """
    if not unit_name and not factory_name:
        return unrecognized_factory, True, False
    cache_key = (unit_name, factory_name)
    if cache_key not in factory_cache:
        factory_cache[cache_key] = _match_existing_factory(
            _factory_search_terms(unit_name, factory_name)
        )
    factory = factory_cache[cache_key]
    if factory is not None:
        return factory, False, False
    brand = _single_line(unit_name or factory_name, max_len=100)
    full_name = _single_line(factory_name or unit_name, max_len=255) or brand
    created_factory = Factory.objects.create(
        factory_name=full_name,
        brand=brand,
        province="北京",
        city="北京",
        district="北京",
    )
    _ensure_factory_user(created_factory, brand or full_name)
    # Later rows with the same names reuse the new factory instead of
    # creating another one.
    factory_cache[cache_key] = created_factory
    return created_factory, False, True
def _collect_material_defaults(read, factory, has_status_column):
    """Gather the optional Material field values found in one data row.

    Args:
        read: Callable taking a ``HEADER_ALIASES`` field key and returning
            the row's stripped cell text for it (``""`` when blank/absent).
        factory: Factory the material is attached to.
        has_status_column: Whether the sheet has a status column; when it
            does not, the status is set to ``"approved"``.

    Returns:
        Dict of model field name -> parsed value. Blank or unrecognised
        cells are left out, so an update never overwrites stored data
        with an empty value.
    """
    defaults: Dict[str, Any] = {"factory": factory}

    subcategory = _optional_single_line(read("material_subcategory"))
    if subcategory is not None:
        defaults["material_subcategory"] = subcategory

    for field_name, mapping in (
        ("stage", STAGE_VALUE_MAP),
        ("importance_level", IMPORTANCE_LEVEL_VALUE_MAP),
    ):
        choice = _parse_mapped_choice(read(field_name), mapping)
        if choice is not None:
            defaults[field_name] = choice

    # Short text fields: flattened to one line and length-capped.
    for field_name in (
        "landing_project",
        "contact_person",
        "contact_phone",
        "handler",
        "remark",
        "spec",
        "standard",
        "connection_method",
        "construction_method",
    ):
        text = _optional_single_line(read(field_name))
        if text is not None:
            defaults[field_name] = text

    # Long text fields: kept as entered (only stripped).
    for field_name in ("application_desc", "advantage_desc", "cost_desc", "cases", "limit_condition"):
        text = _optional_text(read(field_name))
        if text is not None:
            defaults[field_name] = text

    application_scene = _parse_multi_choice(read("application_scene"), APPLICATION_SCENE_VALUE_MAP)
    if application_scene:
        defaults["application_scene"] = application_scene

    replace_type = _parse_mapped_choice(read("replace_type"), REPLACE_TYPE_VALUE_MAP)
    if replace_type is not None:
        defaults["replace_type"] = replace_type

    advantage = _parse_multi_choice(read("advantage"), ADVANTAGE_VALUE_MAP)
    if advantage:
        defaults["advantage"] = advantage

    cost_compare = _parse_decimal(read("cost_compare"))
    if cost_compare is not None:
        defaults["cost_compare"] = cost_compare

    for field_name in ("quality_level", "durability_level", "eco_level", "carbon_level", "score_level"):
        star_level = _parse_mapped_choice(read(field_name), STAR_LEVEL_VALUE_MAP)
        if star_level is not None:
            defaults[field_name] = star_level

    if has_status_column:
        status_value = _parse_mapped_choice(read("status"), STATUS_VALUE_MAP)
        if status_value is not None:
            defaults["status"] = status_value
    else:
        defaults["status"] = "approved"
    return defaults


def import_materials_plan_excel(file_obj) -> Dict[str, int]:
    """Import materials from the first sheet of an Excel workbook.

    The header row is searched for in the first ten rows. For every data
    row after it:

    * a blank major-category cell inherits the last non-blank one above it;
    * rows without a material name, a material category or any major
      category so far are counted as skipped;
    * a major-category label found in neither lookup table falls back to
      ``"architecture"``;
    * the factory is resolved (or created) via ``_resolve_factory``;
    * the material is matched on name + major category + material
      category; the most recently updated match is updated, otherwise a
      new Material is created.

    Args:
        file_obj: Anything ``openpyxl.load_workbook`` accepts.

    Returns:
        Counters keyed ``created``, ``updated``, ``skipped``,
        ``unresolved_factory``, ``created_factory`` and ``created_user``.

    Raises:
        ValueError: If the sheet has fewer than two rows or lacks the
            required headers.

    NOTE(review): rows are written one at a time and no transaction is
    opened here, so a failure part-way leaves earlier rows imported.
    Confirm whether the caller wraps this call in a transaction.
    """
    workbook = openpyxl.load_workbook(file_obj, read_only=True, data_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        rows = list(worksheet.iter_rows(values_only=True))
    finally:
        # Close even when reading fails; read-only workbooks are expected
        # to be closed explicitly.
        workbook.close()
    if len(rows) < 2:
        raise ValueError("Excel 内容不足,未找到表头或数据。")
    header_row_index, header_index = _find_header_row(rows)

    # Resolve each field's alias columns once, in alias order, instead of
    # re-normalising every alias for every cell lookup.
    field_columns = {
        field: [
            header_index[key]
            for key in (_normalize_header(alias) for alias in aliases)
            if key in header_index
        ]
        for field, aliases in HEADER_ALIASES.items()
    }
    has_status_column = bool(field_columns["status"])

    def get(row: Tuple[Any, ...], field: str) -> str:
        """Return the first non-empty cell text among the field's columns."""
        for idx in field_columns.get(field, ()):
            if idx < len(row):
                value = _cell(row[idx])
                if value:
                    return value
        return ""

    unrecognized_factory, _ = Factory.objects.get_or_create(
        brand="未识别的品牌",
        defaults={
            "factory_name": "未识别的品牌工厂",
            "province": "-",
            "city": "-",
        },
    )
    summary = dict.fromkeys(
        ("created", "updated", "skipped", "unresolved_factory", "created_factory", "created_user"),
        0,
    )
    factory_cache: Dict[Tuple[str, str], Optional[Factory]] = {}
    current_major_category = ""
    for row in rows[header_row_index + 1:]:
        if not row or not any(_cell(value) for value in row):
            continue
        major_raw = get(row, "major_category")
        if major_raw:
            current_major_category = major_raw
        material_name = _single_line(get(row, "material_name"))
        material_category = _single_line(get(row, "material_category"))
        if not (material_name and material_category and current_major_category):
            summary["skipped"] += 1
            continue
        major_category_value = MAJOR_CATEGORY_VALUE_MAP.get(
            current_major_category,
            MAJOR_CATEGORY_MAP.get(current_major_category, "architecture"),
        )
        factory, is_unresolved, is_created_factory = _resolve_factory(
            get(row, "unit_name"),
            get(row, "factory_name"),
            factory_cache,
            unrecognized_factory,
        )
        if is_unresolved:
            summary["unresolved_factory"] += 1
        if is_created_factory:
            # _resolve_factory ensures a login account for each factory it creates.
            summary["created_factory"] += 1
            summary["created_user"] += 1
        defaults = _collect_material_defaults(
            lambda field, _row=row: get(_row, field),
            factory,
            has_status_column,
        )
        existing_material = Material.objects.filter(
            name=material_name,
            major_category=major_category_value,
            material_category=material_category,
        ).order_by("-updated_at", "-id").first()
        if existing_material:
            for field_name, value in defaults.items():
                setattr(existing_material, field_name, value)
            existing_material.save()
            summary["updated"] += 1
        else:
            Material.objects.create(
                name=material_name,
                major_category=major_category_value,
                material_category=material_category,
                **defaults,
            )
            summary["created"] += 1
    return summary