zcbot/scripts/smoke_scientific_skills.py

308 lines
11 KiB
Python

"""Smoke: 3 个新加的科学计算 skill 通路验证(pymatgen / stats_ml / plot_pub)。
跑法: .venv/Scripts/python.exe scripts/smoke_scientific_skills.py
依赖:`pip install pymatgen mp-api scikit-learn statsmodels`(PyMC 可选,装了就测)。
不依赖网络默认情况下(MP_API_KEY 没配则跳过 mp_rester 联网那一段)。
不动 DB / workspace,产物落系统临时目录,跑完即丢。
按 skill 顺序 4 段:
step A — pymatgen import + CEMENT_PHASES 几个查询 + mp_rester 未配 key 抛错
step B — stats_ml 三库装包 + 小 OLS / RandomForest smoke
step C — plot_pub apply_pub_style + 最小 XRD-like 图出 PNG
step D —(可选)MP_API_KEY 配了就联网拉一条 Materials Project 数据
任一步异常 [FAIL] 标注后继续下一步,保证整条链路看一遍。
"""
from __future__ import annotations
import io
import os
import sys
import tempfile
import time
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
# Windows GBK 控制台编码问题: 强制 stdout / stderr utf-8(memory 里这条已踩过)
if hasattr(sys.stdout, "buffer"):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "buffer"):
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
# 读 .env 注入 MP_API_KEY 等(litellm 链路外手动加载)
env_file = ROOT / ".env"
if env_file.exists():
for line in env_file.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
os.environ.setdefault(k.strip(), v.strip())
def _hr(title: str) -> None:
print()
print("=" * 60)
print(f"[{title}]")
print("=" * 60)
def _ok(msg: str) -> None:
print(f"[OK] {msg}")
def _fail(msg: str) -> None:
print(f"[FAIL] {msg}")
def _skip(msg: str) -> None:
print(f"[SKIP] {msg}")
def _info(msg: str) -> None:
print(f"[INFO] {msg}")
def step_a_pymatgen() -> None:
_hr("step A: pymatgen skill")
# A1: helper import
try:
from skills.pymatgen.materials import CEMENT_PHASES, lookup_phase, mp_rester
_ok(f"import skills.pymatgen.materials (CEMENT_PHASES 条目数={len(CEMENT_PHASES)})")
except Exception as e:
_fail(f"import skills.pymatgen.materials: {type(e).__name__}: {e}")
return
# A2: 典型查询
cases = [
("C3S", "Ca3SiO5"),
("硅酸三钙", "Ca3SiO5"),
("alite", "Ca3SiO5"), # 大小写不敏感
("钙矾石", "Ca6Al2(SO4)3(OH)12·26H2O"),
("莫来石", "Al6Si2O13"),
("方镁石", "MgO"),
("石英", "SiO2"),
]
for name, expected in cases:
try:
got = lookup_phase(name)
if got == expected:
_ok(f"lookup_phase({name!r}) -> {got}")
else:
_fail(f"lookup_phase({name!r}) -> {got},期望 {expected}")
except Exception as e:
_fail(f"lookup_phase({name!r}) raised {type(e).__name__}: {e}")
# A3: 未命中抛 KeyError
try:
lookup_phase("根本不存在的相_xyz123")
_fail("lookup_phase 未命中应抛 KeyError,没抛")
except KeyError as e:
_ok(f"lookup_phase 未命中正确抛 KeyError (msg 含建议: {'补到' in str(e)})")
except Exception as e:
_fail(f"lookup_phase 未命中应抛 KeyError,实际 {type(e).__name__}")
# A4: pymatgen 本体 import
try:
from pymatgen.core import Structure, Lattice, Molecule
from pymatgen.analysis.diffraction.xrd import XRDCalculator
from pymatgen.symmetry.analyzer import SpacegroupAnalyzer
_ok("pymatgen 核心类 import 全通 (Structure / XRDCalculator / SpacegroupAnalyzer)")
except Exception as e:
_fail(f"pymatgen 本体 import: {type(e).__name__}: {e}")
return
# A5: 构造一个简单的立方结构 + XRDCalculator 跑一次
try:
from pymatgen.core import Structure, Lattice
from pymatgen.analysis.diffraction.xrd import XRDCalculator
# MgO,方镁石,典型耐火材料相
lattice = Lattice.cubic(4.21) # MgO a≈4.21Å
struct = Structure(lattice, ["Mg", "O"], [[0, 0, 0], [0.5, 0.5, 0.5]])
xrd = XRDCalculator(wavelength="CuKa")
pattern = xrd.get_pattern(struct, two_theta_range=(20, 80))
_ok(f"XRDCalculator on MgO 结构: {len(pattern.x)} 个峰,2θ 范围 [{pattern.x[0]:.1f}, {pattern.x[-1]:.1f}]")
except Exception as e:
_fail(f"XRDCalculator smoke: {type(e).__name__}: {e}")
# A6: mp_rester 未配 key 抛 RuntimeError
has_key = bool(os.environ.get("MP_API_KEY"))
if has_key:
_info("MP_API_KEY 已配置,skip 缺 key 抛错验证(下面 step D 测真实查询)")
else:
# 显式清掉 env 再测
try:
with mp_rester() as mpr:
_fail("MP_API_KEY 未配置时 mp_rester 应抛 RuntimeError,没抛")
except RuntimeError as e:
if "MP_API_KEY" in str(e) and "materialsproject" in str(e):
_ok("mp_rester 未配 key 正确抛 RuntimeError 含申请链接")
else:
_fail(f"RuntimeError 抛了但 msg 不对: {e}")
except Exception as e:
_fail(f"应抛 RuntimeError 实际 {type(e).__name__}: {e}")
def step_b_stats_ml() -> None:
_hr("step B: stats_ml skill")
# B1: sklearn
try:
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
# 造一个 fake 配方-强度数据(50 样本 6 特征)
rng = np.random.default_rng(42)
X = rng.uniform(0, 1, size=(50, 6)) # 6 个掺合料比例
y = (X @ rng.uniform(20, 80, size=6)) + rng.normal(0, 5, size=50) # 强度 MPa
pipe = Pipeline([
("scaler", StandardScaler()),
("model", RandomForestRegressor(n_estimators=50, random_state=42)),
])
scores = cross_val_score(pipe, X, y, cv=5, scoring="r2")
_ok(f"sklearn RandomForest 5-fold R²: mean={scores.mean():.3f} std={scores.std():.3f}")
except Exception as e:
_fail(f"sklearn smoke: {type(e).__name__}: {e}")
# B2: statsmodels
try:
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf
rng = np.random.default_rng(42)
df = pd.DataFrame({
"x1": rng.uniform(0, 1, 50),
"x2": rng.uniform(0, 1, 50),
})
df["y"] = 3 * df["x1"] - 2 * df["x2"] + rng.normal(0, 0.3, 50)
model = smf.ols("y ~ x1 + x2", data=df).fit()
r2 = model.rsquared
p_x1 = model.pvalues["x1"]
_ok(f"statsmodels OLS: R²={r2:.3f}, p(x1)={p_x1:.4f} (应 << 0.05)")
except Exception as e:
_fail(f"statsmodels smoke: {type(e).__name__}: {e}")
# B3: PyMC(可选)
try:
import pymc as pm
import arviz as az
_ok(f"pymc import OK (version={pm.__version__})")
# 不真跑采样(慢),只验 import
except ImportError:
_skip("PyMC / arviz 未装(可选依赖,要做贝叶斯再 pip install pymc arviz)")
except Exception as e:
_fail(f"PyMC import: {type(e).__name__}: {e}")
def step_c_plot_pub() -> None:
_hr("step C: plot_pub skill")
# C1: import + apply_pub_style
try:
from skills.plot_pub.style import apply_pub_style, reset_style, _find_chinese_font
font = _find_chinese_font()
if font:
_ok(f"_find_chinese_font 返 {font!r}")
else:
_info("系统未装中文字体候选 (SimHei/YaHei/WenQuanYi/Heiti),中文将显示方块")
apply_pub_style()
_ok("apply_pub_style() 调用通过")
except Exception as e:
_fail(f"plot_pub import / apply_pub_style: {type(e).__name__}: {e}")
return
# C2: 跑一个 minimal XRD-like 图
try:
import numpy as np
import matplotlib.pyplot as plt
tmp_dir = Path(tempfile.mkdtemp(prefix="zcbot_smoke_plot_"))
out_png = tmp_dir / "smoke_xrd.png"
out_pdf = tmp_dir / "smoke_xrd.pdf"
two_theta = np.linspace(5, 80, 1000)
# 假装是 MgO 衍射(几个高斯峰)
peaks = [(36.9, 1.0), (42.9, 0.7), (62.3, 0.5), (74.7, 0.3), (78.6, 0.4)]
intensity = np.zeros_like(two_theta)
for pos, h in peaks:
intensity += h * np.exp(-((two_theta - pos) ** 2) / (2 * 0.3 ** 2))
intensity += np.random.normal(0, 0.02, len(two_theta)) # 噪声
fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(two_theta, intensity, "k-", lw=1.0, label="MgO 模拟谱")
ax.set_xlabel(r"$2\theta$ / °")
ax.set_ylabel("强度 / a.u.")
ax.set_xlim(5, 80)
ax.legend(frameon=False)
fig.tight_layout()
fig.savefig(out_png, dpi=200)
fig.savefig(out_pdf)
plt.close(fig)
png_size = out_png.stat().st_size
pdf_size = out_pdf.stat().st_size
_ok(f"出图 PNG ({png_size/1024:.1f}KB) + PDF ({pdf_size/1024:.1f}KB) -> {tmp_dir}")
except Exception as e:
_fail(f"plot_pub 出图: {type(e).__name__}: {e}")
# C3: 还原 rcParams 防污染后续步骤
try:
reset_style()
_ok("reset_style() 还原 matplotlib defaults")
except Exception as e:
_fail(f"reset_style: {type(e).__name__}: {e}")
def step_d_mp_online() -> None:
_hr("step D: Materials Project 联网(可选,需 MP_API_KEY)")
if not os.environ.get("MP_API_KEY"):
_skip("MP_API_KEY 未配,跳过联网查询(.env 加 MP_API_KEY=... 即可,免费申请 https://materialsproject.org/api)")
return
try:
from skills.pymatgen.materials import mp_rester, lookup_phase
formula = lookup_phase("C3S") # Ca3SiO5
t0 = time.time()
with mp_rester() as mpr:
docs = mpr.materials.summary.search(
formula=formula,
fields=["material_id", "formula_pretty", "energy_above_hull"],
)
dt = (time.time() - t0) * 1000
_ok(f"mp_rester 查 {formula}: 返回 {len(docs)} 条 in {dt:.0f}ms")
for d in docs[:3]:
print(f" {d.material_id} {d.formula_pretty} ehull={d.energy_above_hull:.3f}")
except Exception as e:
_fail(f"mp_rester 联网查询: {type(e).__name__}: {e}")
def main() -> int:
print("=" * 60)
print("zcbot scientific skills smoke (pymatgen / stats_ml / plot_pub)")
print("=" * 60)
for fn in (step_a_pymatgen, step_b_stats_ml, step_c_plot_pub, step_d_mp_online):
try:
fn()
except Exception as e:
_fail(f"[{fn.__name__} crashed] {type(e).__name__}: {e}")
print()
print("=" * 60)
print("smoke done")
print("=" * 60)
return 0
if __name__ == "__main__":
sys.exit(main())