216 lines
6.9 KiB
Python
216 lines
6.9 KiB
Python
import re
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import openpyxl
|
||
from django.contrib.auth import get_user_model
|
||
|
||
from apps.factory.models import Factory
|
||
from apps.material.models import Material
|
||
|
||
|
||
MAJOR_CATEGORY_MAP = {
|
||
"建筑": "architecture",
|
||
"景观": "landscape",
|
||
"设备": "equipment",
|
||
"装修": "decoration",
|
||
"室内": "decoration",
|
||
}
|
||
|
||
STAGE_VALUES = {choice[0] for choice in Material.STAGE_CHOICES}
|
||
IMPORTANCE_LEVEL_VALUES = {choice[0] for choice in Material.IMPORTANCE_LEVEL_CHOICES}
|
||
UNIT_SPLIT_RE = re.compile(r"[\s,,、/;;]+")
|
||
INITIAL_PASSWORD = "abc!0000"
|
||
|
||
|
||
def _cell(value: Any) -> str:
|
||
if value is None:
|
||
return ""
|
||
return str(value).strip()
|
||
|
||
|
||
def _single_line(value: Any, max_len: int = 255) -> str:
|
||
text = _cell(value)
|
||
if not text:
|
||
return ""
|
||
text = re.sub(r"\s+", " ", text)
|
||
return text[:max_len].strip()
|
||
|
||
|
||
def _parse_choice(value: Any, allowed_values: set) -> Optional[str]:
|
||
text = _cell(value)
|
||
if not text:
|
||
return None
|
||
return text if text in allowed_values else None
|
||
|
||
|
||
def _unique_username(user_model, base: str) -> str:
|
||
if not user_model.objects.filter(username=base).exists():
|
||
return base
|
||
for index in range(2, 10000):
|
||
candidate = f"{base}{index}"
|
||
if not user_model.objects.filter(username=candidate).exists():
|
||
return candidate
|
||
raise RuntimeError(f"无法为账号分配唯一用户名: {base}")
|
||
|
||
|
||
def _ensure_factory_user(factory: Factory, unit_name: str) -> bool:
|
||
user_model = get_user_model()
|
||
existing_user = user_model.objects.filter(role="user", factory_id=factory.id).order_by("id").first()
|
||
if existing_user:
|
||
return False
|
||
|
||
username_base = _single_line(unit_name, max_len=150) or f"factory{factory.id}"
|
||
username = _unique_username(user_model, username_base)
|
||
user_model.objects.create_user(
|
||
username=username,
|
||
password=INITIAL_PASSWORD,
|
||
role="user",
|
||
factory=factory,
|
||
)
|
||
return True
|
||
|
||
|
||
def _resolve_factory(unit_name: str, factory_cache: Dict[str, Optional[Factory]], unrecognized_factory: Factory) -> Tuple[Factory, bool, bool]:
|
||
if not unit_name:
|
||
return unrecognized_factory, True, False
|
||
|
||
if unit_name not in factory_cache:
|
||
candidates = [part.strip() for part in UNIT_SPLIT_RE.split(unit_name) if part.strip()]
|
||
search_terms = candidates or [unit_name]
|
||
|
||
matched_factory = None
|
||
for term in search_terms:
|
||
matched_factory = Factory.objects.filter(brand=term).first()
|
||
if matched_factory:
|
||
break
|
||
matched_factory = Factory.objects.filter(brand__icontains=term).first()
|
||
if matched_factory:
|
||
break
|
||
term_lower = term.lower()
|
||
for factory in Factory.objects.all():
|
||
brand = (factory.brand or "").lower()
|
||
if brand and brand in term_lower:
|
||
matched_factory = factory
|
||
break
|
||
if matched_factory:
|
||
break
|
||
|
||
factory_cache[unit_name] = matched_factory
|
||
|
||
factory = factory_cache[unit_name]
|
||
if factory:
|
||
return factory, False, False
|
||
|
||
created_factory = Factory.objects.create(
|
||
factory_name=unit_name,
|
||
brand=unit_name,
|
||
province="北京",
|
||
city="北京",
|
||
district="北京",
|
||
)
|
||
_ensure_factory_user(created_factory, unit_name)
|
||
factory_cache[unit_name] = created_factory
|
||
return created_factory, False, True
|
||
|
||
|
||
def import_materials_plan_excel(file_obj) -> Dict[str, int]:
|
||
workbook = openpyxl.load_workbook(file_obj, read_only=True, data_only=True)
|
||
worksheet = workbook[workbook.sheetnames[0]]
|
||
rows = list(worksheet.iter_rows(values_only=True))
|
||
workbook.close()
|
||
|
||
if len(rows) < 3:
|
||
raise ValueError("Excel 内容不足,未找到表头或数据。")
|
||
|
||
header = [_cell(value) for value in rows[1]]
|
||
header_index = {name: idx for idx, name in enumerate(header) if name}
|
||
|
||
required_headers = ["材料大类", "细分种类", "材料名称", "材料单位名称"]
|
||
missing_headers = [name for name in required_headers if name not in header_index]
|
||
if missing_headers:
|
||
raise ValueError(f"缺少必要表头: {', '.join(missing_headers)}")
|
||
|
||
def get(row: Tuple[Any, ...], key: str) -> str:
|
||
idx = header_index.get(key, -1)
|
||
if idx < 0 or idx >= len(row):
|
||
return ""
|
||
return _cell(row[idx])
|
||
|
||
unrecognized_factory, _ = Factory.objects.get_or_create(
|
||
brand="未识别的品牌",
|
||
defaults={
|
||
"factory_name": "未识别的品牌工厂",
|
||
"province": "-",
|
||
"city": "-",
|
||
},
|
||
)
|
||
|
||
created = 0
|
||
updated = 0
|
||
skipped = 0
|
||
unresolved_factory = 0
|
||
created_factory = 0
|
||
created_user = 0
|
||
factory_cache: Dict[str, Optional[Factory]] = {}
|
||
current_major_category = ""
|
||
|
||
for row in rows[2:]:
|
||
if not row:
|
||
continue
|
||
|
||
row_values = [_cell(value) for value in row]
|
||
if not any(row_values):
|
||
continue
|
||
|
||
major_raw = get(row, "材料大类")
|
||
if major_raw:
|
||
current_major_category = major_raw
|
||
|
||
material_name = _single_line(get(row, "材料名称"))
|
||
material_category = _single_line(get(row, "细分种类"))
|
||
if not material_name or not material_category or not current_major_category:
|
||
skipped += 1
|
||
continue
|
||
|
||
unit_name = get(row, "材料单位名称")
|
||
factory, is_unresolved, is_created_factory = _resolve_factory(unit_name, factory_cache, unrecognized_factory)
|
||
if is_unresolved:
|
||
unresolved_factory += 1
|
||
if is_created_factory:
|
||
created_factory += 1
|
||
created_user += 1
|
||
|
||
defaults = {
|
||
"stage": _parse_choice(get(row, "阶段"), STAGE_VALUES),
|
||
"importance_level": _parse_choice(get(row, "重要等级"), IMPORTANCE_LEVEL_VALUES),
|
||
"landing_project": _single_line(get(row, "落地项目")) or None,
|
||
"contact_person": _single_line(get(row, "对接人")) or None,
|
||
"contact_phone": _single_line(get(row, "对接人联系方式")) or None,
|
||
"handler": _single_line(get(row, "经办人")) or None,
|
||
"remark": _single_line(get(row, "备注")) or None,
|
||
"factory": factory,
|
||
"status": "approved",
|
||
}
|
||
|
||
material, created_flag = Material.objects.update_or_create(
|
||
name=material_name,
|
||
major_category=MAJOR_CATEGORY_MAP.get(current_major_category, "architecture"),
|
||
material_category=material_category,
|
||
factory=factory,
|
||
defaults=defaults,
|
||
)
|
||
|
||
if created_flag:
|
||
created += 1
|
||
else:
|
||
updated += 1
|
||
|
||
return {
|
||
"created": created,
|
||
"updated": updated,
|
||
"skipped": skipped,
|
||
"unresolved_factory": unresolved_factory,
|
||
"created_factory": created_factory,
|
||
"created_user": created_user,
|
||
}
|