feat: 工厂简称改为品牌(唯一)、材料/工厂/分类导入命令、Excel材料导入(模糊匹配+未识别品牌工厂)

Made-with: Cursor
This commit is contained in:
caoqianming 2026-03-13 11:25:44 +08:00
parent b1c0e94ab1
commit b88510ca0a
19 changed files with 926 additions and 21 deletions

View File

@ -118,7 +118,7 @@ mat/
- dealer_name: 经销商名称
- product_category: 产品分类
- factory_name: 生产工厂全称
- factory_short_name: 工厂简称
- brand: 品牌(唯一)
- province: 省
- city: 市
- district: 区

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,284 @@
import re
import secrets
import sys
from dataclasses import dataclass
from typing import List, Optional, Tuple
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
from django.db import transaction
from apps.factory.models import Factory
_DEFAULT_INPUT = """生产工厂全称\t品牌\t省市\t详细地址\t官网链接
立邦涂料中国有限公司\t立邦中国\t上海市\t上海市浦东新区金桥出口加工区南区创业路287号\thttps://www.nipponpaint.com.cn/
广州立邦涂料有限公司\t广州立邦\t广东省广州市\t广州经济技术开发区风华二路1号\thttp://www.nipponpaint.com.cn
黑龙江嘉达建材科技发展有限公司\t黑龙江嘉达\t黑龙江省齐齐哈尔市\t黑龙江省齐齐哈尔市铁锋区北疆中小企业园孵化基地B区\t
吉林市嘉达慧宇建筑节能材料有限责任公司\t吉林嘉达\t吉林省吉林市\t吉林市吉林经济技术开发区政达街139号\t
深圳市荣耀达新材料科技有限公司 (隶属北京荣耀达科技控股有限公司)\t深圳荣耀达\t广东省深圳市\t深圳市福田区深业泰然大厦\t
上海典跃建材科技有限公司\t上海典跃\t上海市\t上海市闵行区万康路328号环普云创4号楼\twww.chinasrs.com.cn
深圳市百欧森环保科技股份有限公司\t百欧森\t广东省深圳市\t广东省深圳市南山区桃源街道塘朗工业区B区44栋厂房第二层B区\twww.baiousen.com
广东马可波罗陶瓷有限公司\t马可波罗\t广东省东莞市\t广东省东莞市沙田镇立沙东路66号502室\thttp://www.marcopolo.com.cn/
佛山市东鹏陶瓷发展有限公司\t东鹏发展\t广东省佛山市\t佛山市禅城区季华西路127号首层\thttp://www.dongpeng.net
广东新明珠陶瓷集团有限公司\t新明珠集团\t广东省佛山市\t佛山市禅城区南庄镇华夏陶瓷博览城\thttp://slab.newpearl.com/
蒙娜丽莎集团股份有限公司\t蒙娜丽莎\t广东省佛山市\t佛山市 (总部)\thttp://www.monalisa.com.cn/
佛山新中源陶瓷建材有限公司\t新中源\t广东省佛山市\t佛山市禅城区南庄镇石南大道190号中源企业大厦\thttps://www.newzhongyuan.com/
上海斯米克健康环境科技有限公司\t斯米克\t上海市\t未找到公开信息\thttp://www.cimictiles.com/
普纳尼亚集团陶瓷股份有限公司\t帕纳尼亚\t意大利\t意大利,蒙蒂那,41034芬那艾米丽,帕纳瑞巴萨街22/A\t
阿鲁克邦复合材料(江苏)有限公司\t阿鲁克邦\t江苏省常州市\t中国江苏常州钟楼区合欢南路10号\thttps://www.alucobond.com.cn
张家港飞腾复合新材料股份有限公司 (曾用名张家港飞腾铝塑板股份有限公司)\t张家港飞腾\t江苏省苏州市\t张家港保税区环保新材料产业园华达路77号\twww.feiteng.cn
江阴天虹板业有限公司 (虹铠甲为其产品系列名称)\t江阴天虹\t江苏省无锡市\t未找到公开信息\twww.sky-rainbow.com
佛山市铸辉金属装饰材料有限公司\t佛山铸辉\t广东省佛山市\t未找到公开信息\thttp://zhuhui.cn.makepolo.com
王力安防科技股份有限公司\t王力\t浙江省金华市\t未找到公开信息\thttp://www.wanglianfang.com
温州麦克辛石业有限公司\t温州麦克辛\t浙江省温州市\t公开信息未找到详细地址\t
青岛英博建筑装饰有限公司 (曾用名青岛英博石业有限公司)\t青岛英博\t山东省青岛市\t未找到公开信息\twww.yingbozhuangshi.com
广西碳歌环保新材料股份有限公司\t碳歌\t广西壮族自治区梧州市\t广西壮族自治区藤县中和陶瓷园区B01-02-01地块\twww.toco.cc
大巨龙集团 品牌归属典跃集团上海梦想空间文化科技有限公司\t大巨龙集团\t上海市\t上海市金山工业区金腾璐1106\twww.da-ju-long.com
深圳洛赛声学技术有限公司\t洛赛声学\t广东省深圳市\t未找到公开信息\thttp://www.clocell.com
甘肃世宁新型材料有限公司\t世宁新材\t甘肃省庆阳市\t甘肃省庆阳市宁县和盛镇工业集中区\t
罗姆上海有限公司 Röhm GmbH 中国全资子公司\t罗姆\t上海市\t上海市闵行区春东路55号1号楼\twww.roehm.com
韩国奥姆勒自动机械有限公司北京代表处\t奥姆勒\t北京市\t仅为代表处无生产工厂\t
北京博联建材机械厂\t博联\t北京市\t北京市昌平区南口镇南大街21号\t
广东新星联合科技有限公司\t广东新星联合\t广东省深圳市\t深圳市宝安区西乡街道共乐社区铁仔路50号凤凰智谷A栋1406\t
"""
_WS_RE = re.compile(r"\s+")
_TAB_SPLIT_RE = re.compile(r"\t+")
# Close to Django's allowed username chars: letters/digits/underscore and @.+-.
_USERNAME_ALLOWED_RE = re.compile(r"[^\w@.+-]+", flags=re.UNICODE)
@dataclass(frozen=True)
class Row:
factory_name: str
brand: str
province_raw: str
address: str
website_raw: str
def _norm(s: str) -> str:
return _WS_RE.sub(" ", (s or "").strip())
def _parse_province_city(s: str) -> Tuple[str, str]:
s = _norm(s)
if not s:
return "", ""
# e.g. "广东省广州市" -> ("广东省", "广州市")
if "" in s and s.endswith(""):
idx = s.find("")
if idx != -1 and idx + 1 < len(s):
return s[: idx + 1], s[idx + 1 :]
# e.g. "广西壮族自治区梧州市"
if "自治区" in s and s.endswith(""):
idx = s.find("自治区")
if idx != -1 and idx + 3 < len(s):
return s[: idx + 3], s[idx + 3 :]
# e.g. "北京市" / "上海市"
if s.endswith(""):
return s, s
# Other (e.g. "意大利")
return s, s
def _normalize_website(s: str) -> Optional[str]:
s = _norm(s)
if not s:
return None
if s.startswith(("http://", "https://")):
return s
return f"http://{s}"
def _parse_line(line: str) -> Optional[Row]:
raw = line.strip()
if not raw:
return None
if raw.startswith("生产工厂全称"):
return None
parts = [p for p in _TAB_SPLIT_RE.split(raw) if p is not None]
# tolerate missing trailing columns
while len(parts) < 5:
parts.append("")
factory_name = _norm(parts[0])
short_name = _norm(parts[1])
province_raw = _norm(parts[2])
address = _norm(parts[3])
website_raw = _norm(parts[4])
if not factory_name or not short_name:
return None
return Row(
factory_name=factory_name,
brand=short_name,
province_raw=province_raw,
address=address,
website_raw=website_raw,
)
def parse_text(text: str) -> List[Row]:
rows: List[Row] = []
for line in text.splitlines():
row = _parse_line(line)
if row:
rows.append(row)
return rows
def _make_username_base(brand: str) -> str:
base = _norm(brand)
base = base.replace(" ", "")
base = _USERNAME_ALLOWED_RE.sub("", base)
base = base.strip("._-@+")
if not base:
base = "user"
return base.lower()
def _unique_username(UserModel, base: str) -> str:
if not UserModel.objects.filter(username=base).exists():
return base
for i in range(2, 10000):
candidate = f"{base}{i}"
if not UserModel.objects.filter(username=candidate).exists():
return candidate
raise RuntimeError(f"Unable to allocate unique username for base={base!r}")
class Command(BaseCommand):
help = "Import factories and create one normal user per factory."
def add_arguments(self, parser):
parser.add_argument("--file", dest="file", default=None, help="Input TSV text file path.")
parser.add_argument("--stdin", action="store_true", help="Read TSV from stdin.")
parser.add_argument("--dry-run", action="store_true", help="Parse and report only; no DB writes.")
parser.add_argument(
"--password",
dest="password",
default=None,
help="Set the same initial password for all created users. If omitted, random passwords are generated.",
)
parser.add_argument(
"--print-credentials",
action="store_true",
help="Print created/ensured usernames and passwords as TSV to stdout.",
)
@transaction.atomic
def handle(self, *args, **options):
file_path = options.get("file")
use_stdin: bool = bool(options.get("stdin"))
dry_run: bool = bool(options.get("dry_run"))
fixed_password: Optional[str] = options.get("password") or None
print_credentials: bool = bool(options.get("print_credentials"))
if file_path:
with open(file_path, "r", encoding="utf-8") as f:
text = f.read()
elif use_stdin:
text = sys.stdin.read()
else:
text = _DEFAULT_INPUT
rows = parse_text(text)
if not rows:
self.stdout.write(self.style.WARNING("No valid rows parsed; nothing to import."))
return
unique_factories = {(r.factory_name, r.brand) for r in rows}
self.stdout.write(f"Parsed rows: {len(rows)}")
self.stdout.write(f"Unique factories: {len(unique_factories)}")
if dry_run:
self.stdout.write(self.style.SUCCESS("Dry run complete (no DB changes)."))
return
UserModel = get_user_model()
created_factories = 0
created_users = 0
ensured_users = 0
# header for credential output
creds: List[Tuple[str, str, str]] = [] # (brand, username, password_or_blank)
for r in rows:
province, city = _parse_province_city(r.province_raw)
website = _normalize_website(r.website_raw)
factory, f_created = Factory.objects.get_or_create(
brand=r.brand,
defaults={
"factory_name": r.factory_name,
"province": province or "未知",
"city": city or "未知",
"address": r.address or None,
"website": website,
"dealer_name": None,
"product_category": None,
},
)
if f_created:
created_factories += 1
else:
changed = False
if factory.province != (province or factory.province):
factory.province = province or factory.province
changed = True
if factory.city != (city or factory.city):
factory.city = city or factory.city
changed = True
if r.address and factory.address != r.address:
factory.address = r.address
changed = True
if website and factory.website != website:
factory.website = website
changed = True
if changed:
factory.save()
# One normal user per factory. If exists, keep it; otherwise create a new one.
existing_user = (
UserModel.objects.filter(role="user", factory_id=factory.id).order_by("id").first()
)
if existing_user:
ensured_users += 1
creds.append((factory.brand, existing_user.username, ""))
continue
base = _make_username_base(factory.brand)
username = _unique_username(UserModel, base)
password = fixed_password or secrets.token_urlsafe(12)
user = UserModel.objects.create_user(
username=username,
password=password,
role="user",
factory=factory,
)
created_users += 1
creds.append((factory.brand, user.username, password))
self.stdout.write(
self.style.SUCCESS(
f"Import complete. Created factories: {created_factories}, created users: {created_users}, ensured existing users: {ensured_users}."
)
)
if print_credentials:
self.stdout.write("品牌\t用户名\t初始密码(仅新建时输出)")
for short_name, username, password in creds:
self.stdout.write(f"{short_name}\t{username}\t{password}")

View File

@ -0,0 +1,23 @@
# Generated by Django 4.2.7
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('factory', '0002_make_dealer_name_optional'),
]
operations = [
migrations.RenameField(
model_name='factory',
old_name='factory_short_name',
new_name='brand',
),
migrations.AlterField(
model_name='factory',
name='brand',
field=models.CharField(max_length=100, unique=True, verbose_name='品牌'),
),
]

View File

@ -7,7 +7,7 @@ class Factory(models.Model):
dealer_name = models.CharField(max_length=255, blank=True, null=True, verbose_name='经销商名称')
product_category = models.CharField(max_length=255, blank=True, null=True, verbose_name='产品分类')
factory_name = models.CharField(max_length=255, verbose_name='生产工厂全称')
factory_short_name = models.CharField(max_length=100, verbose_name='工厂简称')
brand = models.CharField(max_length=100, unique=True, verbose_name='品牌')
province = models.CharField(max_length=50, verbose_name='')
city = models.CharField(max_length=50, verbose_name='')
district = models.CharField(max_length=50, blank=True, null=True, verbose_name='')

View File

@ -11,7 +11,7 @@ class FactorySerializer(serializers.ModelSerializer):
class Meta:
model = Factory
fields = ['id', 'dealer_name', 'product_category', 'factory_name',
'factory_short_name', 'province', 'city', 'district',
'brand', 'province', 'city', 'district',
'address', 'website', 'created_at', 'updated_at', 'material_count']
read_only_fields = ['id', 'created_at', 'updated_at', 'material_count']
@ -28,4 +28,4 @@ class FactoryListSerializer(serializers.ModelSerializer):
"""
class Meta:
model = Factory
fields = ['id', 'factory_name', 'factory_short_name', 'province', 'city', 'dealer_name']
fields = ['id', 'factory_name', 'brand', 'province', 'city', 'dealer_name']

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,233 @@
import re
import sys
from dataclasses import dataclass
from typing import List, Optional, Tuple
from django.core.management.base import BaseCommand
from django.db import transaction
from apps.material.models import MaterialCategory, MaterialSubcategory
_DEFAULT_INPUT = """
门窗\t智能门窗智能升降护栏
机电产品\t机电产品
外墙\t涂料
外墙\t涂料
外墙\t涂料
外墙\t涂料
外墙\t涂料
外墙\t涂料
外墙\t涂料
外墙\t涂料
内墙\t涂料
外墙\t涂料
外墙\t涂料
地坪\t地坪
地坪\t地坪
地坪\t水磨石
墙材\t墙板
电工\t电工类
瓷砖\t岩板
瓷砖\t大理石
瓷砖\t玻化砖
瓷砖\t云石代
石膏板\t脱硫石膏板
石膏板\t脱硫石膏板
石膏板\t脱硫石膏板
石膏板\t脱硫石膏板
石膏板\t石膏基纤维板
石膏板\t脱硫石膏板
龙骨\t轻钢龙骨
腻子\t腻子粉
腻子\t腻子粉
石膏板\t抹灰石膏
石膏板\t抹灰石膏
石膏板\t嵌缝石膏
石膏板\t石膏基自流平
石膏板\t石膏基自流平
\t瓷砖胶
\t瓷砖胶
\t瓷砖胶
固剂\t界面剂
固剂\t界面剂
腻子\t腻子
石膏板\t找平石膏
石膏板\t嵌缝石膏
涂料\t节能材料
涂料\t平面金属漆
涂料\t质感金属漆
门窗\t入户门
墙体\t保温板无机材料
墙体\t预制墙板无机材料
涂料\t涂料
板材\t人造板
装配式外墙 高分子材料保温类
板材\t人造板
板材\t人造板
涂料\t地下工程\t防水涂料
\t瓷砖胶
地毯\t商用地垫S垫疏水垫方块毯
"""
_LINE_SPLIT_RE = re.compile(r"\t+| {2,}")
_SUB_SPLIT_RE = re.compile(r"[、,,;]+")
_WS_RE = re.compile(r"\s+")
@dataclass(frozen=True)
class Row:
category: str
subcategories: Tuple[str, ...]
def _normalize_cell(s: str) -> str:
s = s.strip()
s = _WS_RE.sub(" ", s)
return s
def _parse_line(line: str) -> Optional[Row]:
raw = line.strip()
if not raw:
return None
if raw.startswith("材料分类"):
return None
parts = [p for p in _LINE_SPLIT_RE.split(raw) if p.strip()]
if len(parts) >= 2:
category = _normalize_cell(parts[0])
sub_raw = _normalize_cell(" ".join(parts[1:]))
else:
# Fallback: split by whitespace into 2 chunks
chunks = [c for c in _WS_RE.split(raw) if c.strip()]
if len(chunks) < 2:
return None
category = _normalize_cell(chunks[0])
sub_raw = _normalize_cell(" ".join(chunks[1:]))
subs: List[str] = []
for token in _SUB_SPLIT_RE.split(sub_raw):
token = _normalize_cell(token)
if token:
subs.append(token)
if not category or not subs:
return None
return Row(category=category, subcategories=tuple(subs))
def parse_text(text: str) -> List[Row]:
rows: List[Row] = []
for line in text.splitlines():
row = _parse_line(line)
if row:
rows.append(row)
return rows
def _category_value(category_name: str) -> str:
# Keep it human-readable; unique constraint is enforced at DB level.
return category_name
def _subcategory_value(category_name: str, subcategory_name: str) -> str:
# MaterialSubcategory.value is globally unique, so include category to avoid collisions.
value = f"{category_name}/{subcategory_name}"
return value[:255]
class Command(BaseCommand):
help = "Import MaterialCategory & MaterialSubcategory from text/tsv."
def add_arguments(self, parser):
parser.add_argument(
"--file",
dest="file",
default=None,
help="Input text file path. If omitted, uses built-in default dataset (unless --stdin is specified).",
)
parser.add_argument(
"--stdin",
action="store_true",
help="Read import text from stdin (e.g. pipe a file or paste).",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Parse and report counts only; do not write to DB.",
)
@transaction.atomic
def handle(self, *args, **options):
dry_run: bool = bool(options["dry_run"])
file_path = options.get("file")
use_stdin: bool = bool(options.get("stdin"))
if file_path:
with open(file_path, "r", encoding="utf-8") as f:
text = f.read()
elif use_stdin:
text = sys.stdin.read()
else:
text = _DEFAULT_INPUT
rows = parse_text(text)
if not rows:
self.stdout.write(self.style.WARNING("No valid rows parsed; nothing to import."))
return
unique_categories = {r.category for r in rows}
unique_subs = {(r.category, s) for r in rows for s in r.subcategories}
self.stdout.write(f"Parsed rows: {len(rows)}")
self.stdout.write(f"Unique categories: {len(unique_categories)}")
self.stdout.write(f"Unique subcategories (category-scoped): {len(unique_subs)}")
if dry_run:
self.stdout.write(self.style.SUCCESS("Dry run complete (no DB changes)."))
return
created_categories = 0
created_subcategories = 0
category_by_value = {}
for category_name in sorted(unique_categories):
value = _category_value(category_name)
obj, created = MaterialCategory.objects.get_or_create(
value=value, defaults={"name": category_name}
)
if not created and obj.name != category_name:
obj.name = category_name
obj.save(update_fields=["name"])
if created:
created_categories += 1
category_by_value[value] = obj
for category_name, sub_name in sorted(unique_subs):
cat_value = _category_value(category_name)
category = category_by_value[cat_value]
sub_value = _subcategory_value(category_name, sub_name)
obj, created = MaterialSubcategory.objects.get_or_create(
value=sub_value,
defaults={"category": category, "name": sub_name},
)
changed = False
if obj.category_id != category.id:
obj.category = category
changed = True
if obj.name != sub_name:
obj.name = sub_name
changed = True
if changed:
obj.save(update_fields=["category", "name"])
if created:
created_subcategories += 1
self.stdout.write(
self.style.SUCCESS(
f"Import complete. Created categories: {created_categories}, created subcategories: {created_subcategories}."
)
)

View File

@ -0,0 +1,305 @@
# -*- coding: utf-8 -*-
"""
Excel材料库sheet 导入材料数据
表头行材料ID, 材料名称, 专业类别, 材料分类, 材料子类, 品牌, 规格型号, 符合标准, ...
工厂解析依据品牌列与 Factory.brand 模糊匹配取首行未匹配时关联到未识别的品牌工厂不存在则自动创建
"""
import re
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List, Optional, Tuple
from django.core.management.base import BaseCommand
from django.db import transaction
from apps.factory.models import Factory
from apps.material.models import Material
# 专业类别:中文 -> 模型 choice value
MAJOR_CATEGORY_MAP = {
"建筑": "architecture",
"景观": "landscape",
"设备": "equipment",
"装修": "decoration",
}
# 应用场景:中文 -> 模型 choice value
APPLICATION_SCENE_MAP = {
"府系": "fu",
"境系": "jing",
"城系": "cheng",
"住系": "zhu",
"保障房": "affordable",
}
# 替代类型
REPLACE_TYPE_MAP = {"平替": "alternative", "新研发": "new_development"}
# 竞争优势
ADVANTAGE_MAP = {"品质": "quality", "成本": "cost"}
# 星级:中文 -> 1/2/3
STAR_LEVEL_MAP = {"": 1, "": 2, "": 3, "1": 1, "2": 2, "3": 3}
def _cell(v: Any) -> str:
if v is None:
return ""
s = str(v).strip()
return s
def _norm_header(h: Any) -> str:
return _cell(h).replace("\n", " ").strip()
def _parse_application_scene(s: str) -> List[str]:
if not s:
return []
out = []
for part in re.split(r"[\s,,、;]+", s):
key = part.strip()
if key in APPLICATION_SCENE_MAP:
out.append(APPLICATION_SCENE_MAP[key])
return list(dict.fromkeys(out)) # preserve order, dedup
def _parse_advantage(s: str) -> List[str]:
if not s:
return []
out = []
for part in re.split(r"[\s,,、;]+", s):
key = part.strip()
if key in ADVANTAGE_MAP:
out.append(ADVANTAGE_MAP[key])
return list(dict.fromkeys(out))
def _parse_star(s: str) -> Optional[int]:
if not s:
return None
s = _cell(s)
return STAR_LEVEL_MAP.get(s)
def _parse_cost_compare(s: str) -> Optional[Decimal]:
if not s:
return None
s = _cell(s)
try:
return Decimal(s)
except (InvalidOperation, ValueError):
return None
def _first_line(s: str) -> str:
if not s:
return ""
return (s.split("\n")[0] or "").strip()
def _single_line(s: str, max_len: int = 255) -> str:
if not s:
return ""
s = re.sub(r"\s+", " ", _cell(s))[:max_len]
return s.strip() or ""
class Command(BaseCommand):
help = "Import materials from Excel file, sheet name '材料库'."
def add_arguments(self, parser):
parser.add_argument("excel_path", nargs="?", default=None, help="Path to the .xlsx file.")
parser.add_argument(
"--sheet",
default="材料库",
help="Sheet name to read (default: 材料库).",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Only parse and report row count; do not write to DB.",
)
@transaction.atomic
def handle(self, *args, **options):
import openpyxl
excel_path = options.get("excel_path")
if not excel_path:
self.stdout.write(self.style.ERROR("Please provide excel_path (path to .xlsx)."))
return
sheet_name = options.get("sheet") or "材料库"
dry_run = bool(options.get("dry_run"))
try:
wb = openpyxl.load_workbook(excel_path, read_only=True, data_only=True)
except Exception as e:
self.stdout.write(self.style.ERROR("Failed to open Excel: %s" % e))
return
if sheet_name not in wb.sheetnames:
self.stdout.write(self.style.ERROR("Sheet '%s' not found. Available: %s" % (sheet_name, wb.sheetnames)))
wb.close()
return
ws = wb[sheet_name]
rows = list(ws.iter_rows(values_only=True))
wb.close()
if len(rows) < 2:
self.stdout.write(self.style.WARNING("No header or data rows."))
return
# Build column index by header (first row that looks like header)
header_row = None
header_idx = {}
for i, row in enumerate(rows):
if not row:
continue
headers = [_norm_header(c) for c in row]
if "材料名称" in headers:
header_row = row
for j, h in enumerate(headers):
if h:
header_idx[h] = j
break
if not header_idx or "材料名称" not in header_idx:
self.stdout.write(self.style.ERROR("Header row with '材料名称' not found."))
return
data_start = i + 1 # first data row after header
def get(row: Tuple, key: str, default: str = "") -> str:
j = header_idx.get(key, -1)
if j < 0 or j >= len(row):
return default
return _cell(row[j])
# Resolve factory by 品牌:精确匹配 -> 模糊匹配(品牌包含 Excel 值 或 Excel 值包含品牌)
factory_cache: Dict[str, Optional[Factory]] = {}
def get_factory(brand_name: str) -> Optional[Factory]:
if not brand_name:
return None
if brand_name not in factory_cache:
# 1) 精确匹配
f = Factory.objects.filter(brand=brand_name).first()
if f:
factory_cache[brand_name] = f
return f
# 2) 模糊:工厂品牌包含 Excel 值(如 Excel「立邦」匹配「立邦中国」
f = Factory.objects.filter(brand__icontains=brand_name).first()
if f:
factory_cache[brand_name] = f
return f
# 3) 模糊Excel 值包含某工厂品牌(如 Excel「立邦中国\n广州立邦」取首行后仍可匹配
brand_lower = brand_name.lower()
for factory in Factory.objects.all():
if factory.brand and factory.brand.lower() in brand_lower:
factory_cache[brand_name] = factory
return factory
factory_cache[brand_name] = None
return factory_cache[brand_name]
# 未识别品牌工厂:品牌列无法匹配到任一工厂时,统一关联到此工厂(不存在则创建)
UNRECOGNIZED_BRAND = "未识别的品牌"
unrecognized_factory, _ = Factory.objects.get_or_create(
brand=UNRECOGNIZED_BRAND,
defaults={
"factory_name": "未识别的品牌工厂",
"province": "-",
"city": "-",
},
)
created = 0
updated = 0
skipped = 0
errors: List[str] = []
for i in range(data_start, len(rows)):
row = rows[i]
if not row:
continue
name = get(row, "材料名称")
if not name:
continue
# Factory: 依据「品牌」列模糊匹配 Factory.brand取首行未匹配则关联到「未识别的品牌」工厂
brand_val = _first_line(get(row, "品牌"))
factory = get_factory(brand_val) if brand_val else None
if not factory:
factory = unrecognized_factory
major_cn = get(row, "专业类别")
major_category = MAJOR_CATEGORY_MAP.get(major_cn, "architecture")
material_category = _single_line(get(row, "材料分类"))
material_subcategory = _single_line(get(row, "材料子类"))
if not material_category:
material_category = "-"
if not material_subcategory:
material_subcategory = "-"
application_scene = _parse_application_scene(get(row, "应用场景"))
replace_type = REPLACE_TYPE_MAP.get(_cell(get(row, "替代材料")))
advantage = _parse_advantage(get(row, "竞争优势"))
data = {
"name": name[:255],
"major_category": major_category,
"material_category": material_category,
"material_subcategory": material_subcategory,
"spec": _single_line(get(row, "规格型号")) or None,
"standard": _single_line(get(row, "符合标准")) or None,
"application_scene": application_scene or None,
"application_desc": _cell(get(row, "应用场景说明")) or None,
"replace_type": replace_type,
"advantage_desc": _cell(get(row, "竞争优势说明")) or None,
"advantage": advantage or None,
"cost_compare": _parse_cost_compare(get(row, "成本")),
"cost_desc": _cell(get(row, "成本")) or None,
"cases": _cell(get(row, "应用案例")) or None,
"quality_level": _parse_star(get(row, "质量提升")),
"durability_level": _parse_star(get(row, "耐久可靠")),
"eco_level": _parse_star(get(row, "环保健康")),
"carbon_level": _parse_star(get(row, "循环低碳")),
"score_level": _parse_star(get(row, "评分等级")),
"connection_method": _single_line(get(row, "连接方式")) or None,
"construction_method": _single_line(get(row, "施工工艺")) or None,
"limit_condition": _cell(get(row, "限制条件")) or None,
"factory": factory,
"status": "draft",
}
if dry_run:
created += 1
continue
material_id = None
if header_idx.get("材料ID") is not None and row[header_idx["材料ID"]] is not None:
try:
material_id = int(row[header_idx["材料ID"]])
except (TypeError, ValueError):
pass
if material_id and Material.objects.filter(id=material_id).exists():
m = Material.objects.get(id=material_id)
for k, v in data.items():
setattr(m, k, v)
m.save()
updated += 1
else:
Material.objects.create(**data)
created += 1
if dry_run:
self.stdout.write(self.style.SUCCESS("Dry run: would process %s rows." % created))
return
self.stdout.write(
self.style.SUCCESS("Import complete. Created: %s, Updated: %s, Skipped: %s." % (created, updated, skipped))
)
if errors:
for msg in errors[:20]:
self.stdout.write(self.style.WARNING(msg))
if len(errors) > 20:
self.stdout.write(self.style.WARNING("... and %s more." % (len(errors) - 20)))

View File

@ -18,7 +18,7 @@ class MaterialSerializer(serializers.ModelSerializer):
材料序列化器
"""
factory_name = serializers.CharField(source='factory.factory_name', read_only=True)
factory_short_name = serializers.CharField(source='factory.factory_short_name', read_only=True)
brand = serializers.CharField(source='factory.brand', read_only=True)
major_category_display = serializers.CharField(source='get_major_category_display', read_only=True)
replace_type_display = serializers.CharField(source='get_replace_type_display', read_only=True)
application_scene = JSONListField(
@ -33,6 +33,9 @@ class MaterialSerializer(serializers.ModelSerializer):
)
advantage_display = serializers.SerializerMethodField()
application_scene_display = serializers.SerializerMethodField()
brochure = serializers.CharField(
required=False, allow_blank=True, allow_null=True, default='',
)
brochure_url = serializers.SerializerMethodField()
class Meta:
@ -44,14 +47,11 @@ class MaterialSerializer(serializers.ModelSerializer):
'advantage_desc', 'cost_compare', 'cost_desc', 'cases', 'brochure',
'brochure_url', 'quality_level', 'durability_level', 'eco_level',
'carbon_level', 'score_level', 'connection_method', 'construction_method',
'limit_condition', 'factory', 'factory_name', 'factory_short_name',
'limit_condition', 'factory', 'factory_name', 'brand',
'status', 'created_at', 'updated_at']
read_only_fields = ['id', 'created_at', 'updated_at']
def get_brochure_url(self, obj):
"""
获取宣传页图片URL
"""
if obj.brochure:
request = self.context.get('request')
if request:
@ -78,7 +78,7 @@ class MaterialListSerializer(serializers.ModelSerializer):
材料列表序列化器简化版
"""
factory_name = serializers.CharField(source='factory.factory_name', read_only=True)
factory_short_name = serializers.CharField(source='factory.factory_short_name', read_only=True)
brand = serializers.CharField(source='factory.brand', read_only=True)
major_category_display = serializers.CharField(source='get_major_category_display', read_only=True)
status_display = serializers.CharField(source='get_status_display', read_only=True)
@ -86,7 +86,7 @@ class MaterialListSerializer(serializers.ModelSerializer):
model = Material
fields = ['id', 'name', 'major_category', 'major_category_display',
'material_category', 'material_subcategory', 'factory',
'factory_name', 'factory_short_name', 'status', 'status_display']
'factory_name', 'brand', 'status', 'status_display']
class MaterialCategorySerializer(serializers.ModelSerializer):

View File

@ -188,7 +188,7 @@ def factory_statistics(request):
})
factories_list = list(Factory.objects.values(
'id', 'factory_name', 'factory_short_name', 'province', 'city', 'website'
'id', 'factory_name', 'brand', 'province', 'city', 'website'
))
return Response({

View File

@ -7,6 +7,8 @@ from django.conf import settings
from django.conf.urls.static import static
from django.views.generic import TemplateView
from .views import upload_image
urlpatterns = [
path('admin/', admin.site.urls),
path('api/auth/', include('apps.authentication.urls')),
@ -14,6 +16,7 @@ urlpatterns = [
path('api/material/', include('apps.material.urls')),
path('api/dictionary/', include('apps.dictionary.urls')),
path('api/statistics/', include('apps.statistics.urls')),
path('api/upload/', upload_image, name='upload-image'),
]
# 开发环境下提供媒体文件服务

52
backend/config/views.py Normal file
View File

@ -0,0 +1,52 @@
import os
import uuid
from django.conf import settings
from rest_framework.decorators import api_view, permission_classes, parser_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.parsers import MultiPartParser
from rest_framework.response import Response
from rest_framework import status
ALLOWED_IMAGE_TYPES = {
'image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/bmp',
}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
@api_view(['POST'])
@permission_classes([IsAuthenticated])
@parser_classes([MultiPartParser])
def upload_image(request):
file = request.FILES.get('file')
if not file:
return Response({'detail': '未提供文件'}, status=status.HTTP_400_BAD_REQUEST)
if file.content_type not in ALLOWED_IMAGE_TYPES:
return Response(
{'detail': '仅支持 JPG/PNG/GIF/WEBP/BMP 格式的图片'},
status=status.HTTP_400_BAD_REQUEST,
)
if file.size > MAX_FILE_SIZE:
return Response(
{'detail': '文件大小不能超过 10MB'},
status=status.HTTP_400_BAD_REQUEST,
)
ext = os.path.splitext(file.name)[1].lower()
filename = f"{uuid.uuid4().hex}{ext}"
relative_path = f"uploads/{filename}"
full_path = os.path.join(settings.MEDIA_ROOT, relative_path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, 'wb+') as dest:
for chunk in file.chunks():
dest.write(chunk)
url = request.build_absolute_uri(f"/{settings.MEDIA_URL}{relative_path}")
return Response({'url': url, 'path': relative_path})

View File

@ -5,3 +5,4 @@ psycopg2-binary==2.9.9
django-cors-headers==4.3.0
Pillow==10.1.0
python-decouple==3.8
openpyxl==3.1.2

View File

@ -1,4 +1,4 @@
<template>
<template>
<div class="page">
<div class="page-title">
工厂详情
@ -7,7 +7,7 @@
<div class="card" v-if="factory">
<el-descriptions :column="2" border>
<el-descriptions-item label="工厂全称">{{ factory.factory_name }}</el-descriptions-item>
<el-descriptions-item label="工厂简称">{{ factory.factory_short_name }}</el-descriptions-item>
<el-descriptions-item label="品牌">{{ factory.brand }}</el-descriptions-item>
<el-descriptions-item label="经销商">{{ factory.dealer_name }}</el-descriptions-item>
<el-descriptions-item label="产品分类">{{ factory.product_category }}</el-descriptions-item>
<el-descriptions-item label="地区">{{ formatRegion(factory.province, factory.city, factory.district) }}</el-descriptions-item>

View File

@ -6,7 +6,7 @@
</div>
<el-table v-loading="tableLoading" :data="factories" border :max-height="560">
<el-table-column prop="factory_name" label="工厂全称" />
<el-table-column prop="factory_short_name" label="工厂简称" />
<el-table-column prop="brand" label="品牌" />
<el-table-column prop="dealer_name" label="经销商" />
<el-table-column label="地区">
<template #default="scope">
@ -47,8 +47,8 @@
<el-form-item label="工厂全称" required>
<el-input v-model="form.factory_name" />
</el-form-item>
<el-form-item label="工厂简称" required>
<el-input v-model="form.factory_short_name" />
<el-form-item label="品牌" required>
<el-input v-model="form.brand" />
</el-form-item>
<el-form-item label="省市区" required>
<el-cascader
@ -103,7 +103,7 @@ const form = reactive({
dealer_name: '',
product_category: '',
factory_name: '',
factory_short_name: '',
brand: '',
province: '',
city: '',
district: '',
@ -126,7 +126,7 @@ const resetForm = () => {
form.dealer_name = ''
form.product_category = ''
form.factory_name = ''
form.factory_short_name = ''
form.brand = ''
form.province = ''
form.city = ''
form.district = ''

View File

@ -18,7 +18,7 @@
<el-table-column prop="major_category_display" label="专业类别" />
<el-table-column prop="material_category" label="材料分类" />
<el-table-column prop="material_subcategory" label="材料子类" />
<el-table-column prop="factory_short_name" label="所属工厂" />
<el-table-column prop="brand" label="所属工厂" />
<el-table-column prop="status_display" label="状态" width="120" />
<el-table-column label="操作" width="320">
<template #default="scope">
@ -152,7 +152,7 @@
</el-form-item>
<el-form-item label="所属工厂" v-if="isAdmin">
<el-select v-model="form.factory">
<el-option v-for="item in factories" :key="item.id" :label="item.factory_name" :value="item.id" />
<el-option v-for="item in factories" :key="item.id" :label="item.brand" :value="item.id" />
</el-select>
</el-form-item>
</el-form>