417 lines
15 KiB
Python
417 lines
15 KiB
Python
import re
|
||
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import openpyxl
|
||
from django.contrib.auth import get_user_model
|
||
|
||
from apps.factory.models import Factory
|
||
from apps.material.models import Material
|
||
|
||
|
||
# Fallback mapping from the Chinese major-category labels found in
# spreadsheets to the stored category codes. "装修" and "室内" both resolve to
# the decoration code.
MAJOR_CATEGORY_MAP = {
    "建筑": "architecture",
    "景观": "landscape",
    "设备": "equipment",
    "装修": "decoration",
    "室内": "decoration",
}

# Separators between several names packed into one unit/factory cell.
# The full-width comma (U+FF0C) and semicolon (U+FF1B) are written as escapes:
# the previous pattern listed the ASCII "," and ";" twice, which means the
# full-width variants had been collapsed into their ASCII look-alikes and were
# no longer treated as separators.
UNIT_SPLIT_RE = re.compile(r"[\s,\uff0c、/;\uff1b]+")

# Separators for multi-value choice cells. Only a newline (not arbitrary
# whitespace) splits values here, so a single label may contain spaces.
MULTI_VALUE_SPLIT_RE = re.compile(r"[\n,\uff0c、/;\uff1b]+")

# NOTE(review): every account auto-created by `_ensure_factory_user` receives
# this same hard-coded password. Consider forcing a password change on first
# login or generating a per-account secret; the value is kept unchanged here.
INITIAL_PASSWORD = "abc!0000"
|
||
# Accepted spreadsheet column titles for every importable field.
# Keys are internal field names; values are the header texts recognised for
# that field. The first alias of a required field is the one quoted in the
# "missing header" error raised by `_find_header_row`.
HEADER_ALIASES: Dict[str, Tuple[str, ...]] = {
    # --- classification / identity ---
    "major_category": ("材料大类", "专业类别"),
    "material_category": ("细分种类", "材料分类"),
    "material_subcategory": ("材料子类", "材料子分类"),
    "material_name": ("材料名称",),
    # --- supplier ---
    "unit_name": ("材料单位名称", "所属工厂", "品牌"),
    "factory_name": ("工厂全称", "生产工厂全称"),
    # --- planning / contacts ---
    "stage": ("阶段",),
    "importance_level": ("重要等级",),
    "landing_project": ("落地项目",),
    "contact_person": ("对接人",),
    "contact_phone": ("对接人联系方式", "联系方式", "联系电话"),
    "handler": ("经办人",),
    "remark": ("备注",),
    # --- technical description ---
    "spec": ("规格型号",),
    "standard": ("符合标准",),
    "application_scene": ("应用场景",),
    "application_desc": ("应用说明", "应用场景说明"),
    "replace_type": ("替代材料类型",),
    "advantage": ("竞争优势",),
    "advantage_desc": ("优势说明",),
    "cost_compare": ("成本对比(%)", "成本对比"),
    "cost_desc": ("成本说明",),
    "cases": ("案例",),
    # --- ratings ---
    "quality_level": ("质量等级",),
    "durability_level": ("耐久等级",),
    "eco_level": ("环保等级",),
    "carbon_level": ("低碳等级",),
    "score_level": ("总评分",),
    # --- construction ---
    "connection_method": ("连接方式",),
    "construction_method": ("施工工艺",),
    "limit_condition": ("限制条件",),
    # --- workflow ---
    "status": ("状态",),
}
|
||
def _build_choice_value_map(choices: Any) -> Dict[Any, Any]:
    """Build a lookup accepting either the stored value or its label.

    ``choices`` is an iterable of ``(value, label)`` pairs. The result maps
    ``str(value)`` and ``label`` to ``value``; labels are inserted last, so a
    label wins if it collides with another entry's stringified value.
    """
    lookup = {str(value): value for value, _ in choices}
    lookup.update({label: value for value, label in choices})
    return lookup


# Cell text -> stored value, one table per choice field on Material.
MAJOR_CATEGORY_VALUE_MAP = _build_choice_value_map(Material.MAJOR_CATEGORY_CHOICES)
# Extra spreadsheet label that is treated as the decoration category.
MAJOR_CATEGORY_VALUE_MAP["室内"] = "decoration"
STAGE_VALUE_MAP = _build_choice_value_map(Material.STAGE_CHOICES)
IMPORTANCE_LEVEL_VALUE_MAP = _build_choice_value_map(Material.IMPORTANCE_LEVEL_CHOICES)
REPLACE_TYPE_VALUE_MAP = _build_choice_value_map(Material.REPLACE_TYPE_CHOICES)
APPLICATION_SCENE_VALUE_MAP = _build_choice_value_map(Material.APPLICATION_SCENE_CHOICES)
ADVANTAGE_VALUE_MAP = _build_choice_value_map(Material.ADVANTAGE_CHOICES)
STAR_LEVEL_VALUE_MAP = _build_choice_value_map(Material.STAR_LEVEL_CHOICES)
STATUS_VALUE_MAP = _build_choice_value_map(Material.STATUS_CHOICES)

# Quantum used to round parsed decimals to two places.
DECIMAL_2_PLACES = Decimal("0.01")
# Largest absolute cost-comparison value accepted by `_parse_decimal`.
MAX_COST_COMPARE = Decimal("999.99")
|
||
|
||
|
||
def _cell(value: Any) -> str:
|
||
if value is None:
|
||
return ""
|
||
return str(value).strip()
|
||
|
||
|
||
def _normalize_header(value: Any) -> str:
    """Return the header text of a cell with every whitespace run removed."""
    header_text = _cell(value)
    return re.sub(r"\s+", "", header_text)
|
||
|
||
|
||
def _single_line(value: Any, max_len: int = 255) -> str:
    """Flatten a cell to one line of at most ``max_len`` characters.

    Whitespace runs (including line breaks) collapse to a single space, the
    text is cut to ``max_len`` characters, and any space left dangling by the
    cut is stripped. Empty or ``None`` cells yield ``""``.
    """
    # An empty cell flows through unchanged: collapsing, slicing and stripping
    # "" all give "", so no separate guard is needed.
    collapsed = re.sub(r"\s+", " ", _cell(value))
    return collapsed[:max_len].strip()
|
||
|
||
|
||
def _parse_mapped_choice(value: Any, mapping: Dict[str, Any]) -> Optional[Any]:
    """Translate a cell into a choice value via ``mapping``.

    Returns ``None`` when the cell is empty or its text is not a key of
    ``mapping``.
    """
    key = _cell(value)
    return mapping.get(key) if key else None
|
||
|
||
|
||
def _parse_multi_choice(value: Any, mapping: Dict[str, Any]) -> List[Any]:
    """Translate a multi-value cell into a list of distinct choice values.

    The cell text is split on ``MULTI_VALUE_SPLIT_RE``; each non-empty piece
    is looked up in ``mapping``. Unknown pieces are dropped and duplicates are
    removed while keeping first-seen order. An empty cell yields ``[]``.
    """
    # A dict serves as an insertion-ordered set of the mapped values.
    ordered: Dict[Any, None] = {}
    for piece in MULTI_VALUE_SPLIT_RE.split(_cell(value)):
        label = piece.strip()
        if not label:
            continue
        mapped = mapping.get(label)
        if mapped is not None:
            ordered.setdefault(mapped, None)
    return list(ordered)
|
||
|
||
|
||
def _parse_decimal(value: Any) -> Optional[Decimal]:
    """Parse a percentage-like cell into a two-place ``Decimal``.

    Percent signs (ASCII and full-width) and ASCII thousands separators are
    removed before parsing, and the number is rounded half-up to two places.

    Returns:
        The parsed value, or ``None`` when the cell is empty, is not a finite
        number, or its absolute value exceeds ``MAX_COST_COMPARE``.
    """
    text = _cell(value)
    if not text:
        return None

    normalized = text
    # "\uff05" is the full-width percent sign; the earlier code removed the
    # ASCII "%" twice, so the full-width form was never stripped.
    for noise in ("%", "\uff05", ","):
        normalized = normalized.replace(noise, "")
    normalized = normalized.strip()

    try:
        number = Decimal(normalized)
        # Decimal accepts "NaN"/"Infinity". A NaN would survive quantize() and
        # then raise InvalidOperation in the range comparison below, outside
        # this try block, so non-finite values are rejected here.
        if not number.is_finite():
            return None
        parsed = number.quantize(DECIMAL_2_PLACES, rounding=ROUND_HALF_UP)
    except (InvalidOperation, ValueError):
        return None

    if parsed.copy_abs() > MAX_COST_COMPARE:
        return None
    return parsed
|
||
|
||
|
||
def _optional_text(value: Any) -> Optional[str]:
    """Return the stripped cell text, or ``None`` when the cell is empty."""
    text = _cell(value)
    if text:
        return text
    return None
|
||
|
||
|
||
def _optional_single_line(value: Any, max_len: int = 255) -> Optional[str]:
    """Return the cell flattened by `_single_line`, or ``None`` if that is empty."""
    flattened = _single_line(value, max_len=max_len)
    if flattened:
        return flattened
    return None
|
||
|
||
|
||
def _find_header_row(rows: List[Tuple[Any, ...]]) -> Tuple[int, Dict[str, int]]:
    """Locate the header row among the first ten rows of the sheet.

    A row qualifies when, for each required field, at least one of its
    aliases from ``HEADER_ALIASES`` appears among the row's normalized cells.

    Returns:
        ``(row_index, header_index)`` where ``header_index`` maps each
        normalized, non-empty header text to its column position. If a header
        text repeats, the right-most column wins.

    Raises:
        ValueError: if none of the inspected rows contains all required
            headers.
    """
    required_fields = ("major_category", "material_category", "material_name", "unit_name")

    for row_index, row in enumerate(rows[:10]):
        header_index: Dict[str, int] = {}
        for column, raw_name in enumerate(row):
            normalized = _normalize_header(raw_name)
            if normalized:
                header_index[normalized] = column

        def has_field(field: str) -> bool:
            # True when any accepted title for ``field`` is present in this row.
            return any(_normalize_header(alias) in header_index for alias in HEADER_ALIASES[field])

        if all(has_field(field) for field in required_fields):
            return row_index, header_index

    preferred_headers = [HEADER_ALIASES[field][0] for field in required_fields]
    raise ValueError(f"缺少必要表头: {', '.join(preferred_headers)}")
|
||
|
||
|
||
def _unique_username(user_model, base: str) -> str:
|
||
if not user_model.objects.filter(username=base).exists():
|
||
return base
|
||
for index in range(2, 10000):
|
||
candidate = f"{base}{index}"
|
||
if not user_model.objects.filter(username=candidate).exists():
|
||
return candidate
|
||
raise RuntimeError(f"无法为账号分配唯一用户名: {base}")
|
||
|
||
|
||
def _ensure_factory_user(factory: Factory, unit_name: str) -> bool:
    """Create a login account for ``factory`` unless it already has one.

    An account counts as existing when a user with ``role="user"`` is linked
    to the factory. A new account takes its username from ``unit_name``
    (flattened, at most 150 characters), falling back to ``factory<id>``,
    made unique via `_unique_username`, with ``INITIAL_PASSWORD`` as password.

    Returns:
        ``True`` if an account was created, ``False`` if one already existed.
    """
    User = get_user_model()

    linked_user = User.objects.filter(role="user", factory_id=factory.id).order_by("id").first()
    if linked_user is not None:
        return False

    preferred_name = _single_line(unit_name, max_len=150)
    if not preferred_name:
        preferred_name = f"factory{factory.id}"

    User.objects.create_user(
        username=_unique_username(User, preferred_name),
        password=INITIAL_PASSWORD,
        role="user",
        factory=factory,
    )
    return True
|
||
|
||
|
||
def _factory_search_terms(unit_name: str, factory_name: str) -> List[str]:
    """Return the ordered, de-duplicated names to try when matching a factory."""
    candidates: List[str] = []
    for source in (unit_name, factory_name):
        if not source:
            continue
        # Try the whole cell first, then each separately listed name.
        candidates.append(source)
        candidates.extend(part.strip() for part in UNIT_SPLIT_RE.split(source) if part.strip())
    # dict.fromkeys keeps the first occurrence of each term, in order.
    return list(dict.fromkeys(candidates))


def _match_existing_factory(terms: List[str]) -> Optional[Factory]:
    """Return the first existing Factory matching any of ``terms``, else ``None``."""
    lookups = ("brand", "factory_name", "brand__icontains", "factory_name__icontains")
    # Loaded lazily: the full table is only needed when the targeted queries
    # for a term all miss, and it is then reused for the remaining terms.
    all_factories: Optional[List[Factory]] = None

    for term in terms:
        # Exact matches are preferred over substring matches, brand over name.
        for lookup in lookups:
            match = Factory.objects.filter(**{lookup: term}).first()
            if match:
                return match

        if all_factories is None:
            all_factories = list(Factory.objects.all())

        # Reverse containment: the cell text contains a known factory's name.
        # NOTE(review): this checks `short_name` while the queries above use
        # `brand` — confirm both fields are intended.
        term_lower = term.lower()
        for factory in all_factories:
            short_name = (factory.short_name or "").lower()
            full_name = (factory.factory_name or "").lower()
            if (short_name and short_name in term_lower) or (full_name and full_name in term_lower):
                return factory

    return None


def _resolve_factory(
    unit_name: str,
    factory_name: str,
    factory_cache: Dict[Tuple[str, str], Optional[Factory]],
    unrecognized_factory: Factory,
) -> Tuple[Factory, bool, bool]:
    """Find, or create, the Factory a spreadsheet row refers to.

    Args:
        unit_name: text of the unit/brand cell (may be empty).
        factory_name: text of the factory full-name cell (may be empty).
        factory_cache: per-import memo keyed by ``(unit_name, factory_name)``;
            updated in place.
        unrecognized_factory: placeholder returned when both names are empty.

    Returns:
        ``(factory, is_unresolved, is_created)``. ``is_unresolved`` is true
        only when the placeholder was returned; ``is_created`` is true only
        when this call inserted a new Factory (and its login account).
    """
    if not unit_name and not factory_name:
        return unrecognized_factory, True, False

    cache_key = (unit_name, factory_name)
    if cache_key not in factory_cache:
        search_terms = _factory_search_terms(unit_name, factory_name)
        factory_cache[cache_key] = _match_existing_factory(search_terms)

    factory = factory_cache[cache_key]
    if factory:
        return factory, False, False

    # Nothing matched: register a new factory from the names in the row.
    brand = _single_line(unit_name or factory_name, max_len=100)
    full_name = _single_line(factory_name or unit_name, max_len=255) or brand
    # NOTE(review): the location fields are fixed values, presumably
    # placeholders to be edited later — confirm.
    created_factory = Factory.objects.create(
        factory_name=full_name,
        brand=brand,
        province="北京",
        city="北京",
        district="北京",
    )
    _ensure_factory_user(created_factory, brand or full_name)
    factory_cache[cache_key] = created_factory
    return created_factory, False, True
|
||
|
||
|
||
def _load_first_sheet_rows(file_obj) -> List[Tuple[Any, ...]]:
    """Read every row of the first worksheet, always closing the workbook."""
    workbook = openpyxl.load_workbook(file_obj, read_only=True, data_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        return list(worksheet.iter_rows(values_only=True))
    finally:
        # Close even when the sheet lookup or row iteration raises.
        workbook.close()


def _build_material_defaults(row, get, factory: Factory, has_status_column: bool) -> Dict[str, Any]:
    """Translate one row into Material field values, skipping blank/invalid cells."""
    defaults: Dict[str, Any] = {"factory": factory}

    # Only cells that yield a usable value are recorded, so a blank cell never
    # overwrites what an existing Material already holds.
    material_subcategory = _optional_single_line(get(row, "material_subcategory"))
    if material_subcategory is not None:
        defaults["material_subcategory"] = material_subcategory

    stage = _parse_mapped_choice(get(row, "stage"), STAGE_VALUE_MAP)
    if stage is not None:
        defaults["stage"] = stage

    importance_level = _parse_mapped_choice(get(row, "importance_level"), IMPORTANCE_LEVEL_VALUE_MAP)
    if importance_level is not None:
        defaults["importance_level"] = importance_level

    # Short fields are flattened to a single line (at most 255 characters).
    for field_name in (
        "landing_project",
        "contact_person",
        "contact_phone",
        "handler",
        "remark",
        "spec",
        "standard",
        "connection_method",
        "construction_method",
    ):
        value = _optional_single_line(get(row, field_name))
        if value is not None:
            defaults[field_name] = value

    # Free-text fields keep their line breaks and full length.
    for field_name in ("application_desc", "advantage_desc", "cost_desc", "cases", "limit_condition"):
        value = _optional_text(get(row, field_name))
        if value is not None:
            defaults[field_name] = value

    application_scene = _parse_multi_choice(get(row, "application_scene"), APPLICATION_SCENE_VALUE_MAP)
    if application_scene:
        defaults["application_scene"] = application_scene

    replace_type = _parse_mapped_choice(get(row, "replace_type"), REPLACE_TYPE_VALUE_MAP)
    if replace_type is not None:
        defaults["replace_type"] = replace_type

    advantage = _parse_multi_choice(get(row, "advantage"), ADVANTAGE_VALUE_MAP)
    if advantage:
        defaults["advantage"] = advantage

    cost_compare = _parse_decimal(get(row, "cost_compare"))
    if cost_compare is not None:
        defaults["cost_compare"] = cost_compare

    for field_name in ("quality_level", "durability_level", "eco_level", "carbon_level", "score_level"):
        star_level = _parse_mapped_choice(get(row, field_name), STAR_LEVEL_VALUE_MAP)
        if star_level is not None:
            defaults[field_name] = star_level

    if has_status_column:
        # With a status column, an empty or unknown cell leaves status unset.
        status_value = _parse_mapped_choice(get(row, "status"), STATUS_VALUE_MAP)
        if status_value is not None:
            defaults["status"] = status_value
    else:
        # Sheets without a status column import every row as approved.
        defaults["status"] = "approved"

    return defaults


def import_materials_plan_excel(file_obj) -> Dict[str, int]:
    """Import materials from the first sheet of an Excel workbook.

    The header row is auto-detected. Each data row creates a Material or
    updates the most recently updated one with the same name, major category
    and material category. Rows without a material name, material category or
    (carried-forward) major category are skipped. Factories are matched, or
    created together with a login account, by `_resolve_factory`.

    Args:
        file_obj: anything ``openpyxl.load_workbook`` accepts.

    Returns:
        Counters keyed ``created``, ``updated``, ``skipped``,
        ``unresolved_factory``, ``created_factory`` and ``created_user``.

    Raises:
        ValueError: if the sheet has fewer than two rows or lacks the
            required headers.

    NOTE(review): rows are written one at a time and no transaction is opened
    here, so a failure midway leaves earlier rows committed — confirm whether
    the caller wraps this call in ``transaction.atomic()``.
    """
    rows = _load_first_sheet_rows(file_obj)
    if len(rows) < 2:
        raise ValueError("Excel 内容不足,未找到表头或数据。")

    header_row_index, header_index = _find_header_row(rows)
    has_status_column = any(_normalize_header(alias) in header_index for alias in HEADER_ALIASES["status"])

    def get(row: Tuple[Any, ...], field: str) -> str:
        # First non-empty cell among the columns whose header is an alias of
        # ``field``; "" when the field has no column or the cells are blank.
        for alias in HEADER_ALIASES.get(field, ()):
            idx = header_index.get(_normalize_header(alias), -1)
            if 0 <= idx < len(row):
                value = _cell(row[idx])
                if value:
                    return value
        return ""

    # Placeholder factory for rows that name neither a unit nor a factory.
    unrecognized_factory, _ = Factory.objects.get_or_create(
        brand="未识别的品牌",
        defaults={
            "factory_name": "未识别的品牌工厂",
            "province": "-",
            "city": "-",
        },
    )

    stats = {
        "created": 0,
        "updated": 0,
        "skipped": 0,
        "unresolved_factory": 0,
        "created_factory": 0,
        "created_user": 0,
    }
    factory_cache: Dict[Tuple[str, str], Optional[Factory]] = {}
    # The major category is carried down from the last row that stated one.
    current_major_category = ""

    for row in rows[header_row_index + 1:]:
        if not row or not any(_cell(value) for value in row):
            continue

        major_raw = get(row, "major_category")
        if major_raw:
            current_major_category = major_raw

        material_name = _single_line(get(row, "material_name"))
        material_category = _single_line(get(row, "material_category"))
        if not material_name or not material_category or not current_major_category:
            stats["skipped"] += 1
            continue

        # Unknown category labels fall back to "architecture".
        major_category_value = MAJOR_CATEGORY_VALUE_MAP.get(
            current_major_category,
            MAJOR_CATEGORY_MAP.get(current_major_category, "architecture"),
        )

        factory, is_unresolved, is_created_factory = _resolve_factory(
            get(row, "unit_name"),
            get(row, "factory_name"),
            factory_cache,
            unrecognized_factory,
        )
        if is_unresolved:
            stats["unresolved_factory"] += 1
        if is_created_factory:
            # `_resolve_factory` also creates a login for each new factory.
            stats["created_factory"] += 1
            stats["created_user"] += 1

        defaults = _build_material_defaults(row, get, factory, has_status_column)

        existing_material = Material.objects.filter(
            name=material_name,
            major_category=major_category_value,
            material_category=material_category,
        ).order_by("-updated_at", "-id").first()

        if existing_material:
            for field_name, value in defaults.items():
                setattr(existing_material, field_name, value)
            existing_material.save()
            stats["updated"] += 1
        else:
            Material.objects.create(
                name=material_name,
                major_category=major_category_value,
                material_category=material_category,
                **defaults,
            )
            stats["created"] += 1

    return stats
|