"""Smoke: 3 个新加的科学计算 skill 通路验证(pymatgen / stats_ml / plot_pub)。 跑法: .venv/Scripts/python.exe scripts/smoke_scientific_skills.py 依赖:`pip install pymatgen mp-api scikit-learn statsmodels`(PyMC 可选,装了就测)。 不依赖网络默认情况下(MP_API_KEY 没配则跳过 mp_rester 联网那一段)。 不动 DB / workspace,产物落系统临时目录,跑完即丢。 按 skill 顺序 4 段: step A — pymatgen import + CEMENT_PHASES 几个查询 + mp_rester 未配 key 抛错 step B — stats_ml 三库装包 + 小 OLS / RandomForest smoke step C — plot_pub apply_pub_style + 最小 XRD-like 图出 PNG step D —(可选)MP_API_KEY 配了就联网拉一条 Materials Project 数据 任一步异常 [FAIL] 标注后继续下一步,保证整条链路看一遍。 """ from __future__ import annotations import io import os import sys import tempfile import time from pathlib import Path ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) # Windows GBK 控制台编码问题: 强制 stdout / stderr utf-8(memory 里这条已踩过) if hasattr(sys.stdout, "buffer"): sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") if hasattr(sys.stderr, "buffer"): sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace") # 读 .env 注入 MP_API_KEY 等(litellm 链路外手动加载) env_file = ROOT / ".env" if env_file.exists(): for line in env_file.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") os.environ.setdefault(k.strip(), v.strip()) def _hr(title: str) -> None: print() print("=" * 60) print(f"[{title}]") print("=" * 60) def _ok(msg: str) -> None: print(f"[OK] {msg}") def _fail(msg: str) -> None: print(f"[FAIL] {msg}") def _skip(msg: str) -> None: print(f"[SKIP] {msg}") def _info(msg: str) -> None: print(f"[INFO] {msg}") def step_a_pymatgen() -> None: _hr("step A: pymatgen skill") # A1: helper import try: from skills.pymatgen.materials import CEMENT_PHASES, lookup_phase _ok(f"import skills.pymatgen.materials (CEMENT_PHASES 条目数={len(CEMENT_PHASES)})") except Exception as e: _fail(f"import skills.pymatgen.materials: {type(e).__name__}: {e}") return # A2: 典型查询 cases = [ ("C3S", "Ca3SiO5"), ("硅酸三钙", "Ca3SiO5"), ("alite", "Ca3SiO5"), # 大小写不敏感 ("钙矾石", "Ca6Al2(SO4)3(OH)12·26H2O"), ("莫来石", "Al6Si2O13"), ("方镁石", "MgO"), ("石英", "SiO2"), ] for name, expected in cases: try: got = lookup_phase(name) if got == expected: _ok(f"lookup_phase({name!r}) -> {got}") else: _fail(f"lookup_phase({name!r}) -> {got},期望 {expected}") except Exception as e: _fail(f"lookup_phase({name!r}) raised {type(e).__name__}: {e}") # A3: 未命中抛 KeyError try: lookup_phase("根本不存在的相_xyz123") _fail("lookup_phase 未命中应抛 KeyError,没抛") except KeyError as e: _ok(f"lookup_phase 未命中正确抛 KeyError (msg 含建议: {'补到' in str(e)})") except Exception as e: _fail(f"lookup_phase 未命中应抛 KeyError,实际 {type(e).__name__}") # A4: pymatgen 本体 import try: from pymatgen.core import Structure, Lattice, Molecule from pymatgen.analysis.diffraction.xrd import XRDCalculator from pymatgen.symmetry.analyzer import SpacegroupAnalyzer _ok("pymatgen 核心类 import 全通 (Structure / XRDCalculator / SpacegroupAnalyzer)") except Exception as e: _fail(f"pymatgen 本体 import: {type(e).__name__}: {e}") return # A5: 构造一个简单的立方结构 + XRDCalculator 跑一次 try: from pymatgen.core import Structure, Lattice from pymatgen.analysis.diffraction.xrd import XRDCalculator # MgO,方镁石,典型耐火材料相 lattice = Lattice.cubic(4.21) # MgO a≈4.21Å struct = Structure(lattice, ["Mg", "O"], [[0, 0, 0], [0.5, 0.5, 0.5]]) xrd = XRDCalculator(wavelength="CuKa") pattern = xrd.get_pattern(struct, two_theta_range=(20, 80)) _ok(f"XRDCalculator on MgO 结构: {len(pattern.x)} 个峰,2θ 范围 [{pattern.x[0]:.1f}, {pattern.x[-1]:.1f}]") except Exception as e: _fail(f"XRDCalculator smoke: {type(e).__name__}: {e}") # A6: MP host tool 未配 key 返回 [Error](key 只在宿主读,不进 sandbox) has_key = bool(os.environ.get("MP_API_KEY")) if has_key: _info("MP_API_KEY 已配置,skip 缺 key 报错验证(下面 step D 测真实查询)") else: try: from tools.materials_project import MaterialsProjectSearchSummaryTool out = MaterialsProjectSearchSummaryTool().execute(formula="Ca3SiO5") if out.startswith("[Error]") and "MP_API_KEY" in out: _ok("mp_search_summary 未配 key 返回 [Error] 含 MP_API_KEY 提示") else: _fail(f"未配 key 应返回 [Error] 含 MP_API_KEY,实际: {out[:120]}") except Exception as e: _fail(f"mp_search_summary 未配 key 应返回 [Error] 而非抛异常: {type(e).__name__}: {e}") def step_b_stats_ml() -> None: _hr("step B: stats_ml skill") # B1: sklearn try: import numpy as np from sklearn.ensemble import RandomForestRegressor from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline from sklearn.model_selection import cross_val_score # 造一个 fake 配方-强度数据(50 样本 6 特征) rng = np.random.default_rng(42) X = rng.uniform(0, 1, size=(50, 6)) # 6 个掺合料比例 y = (X @ rng.uniform(20, 80, size=6)) + rng.normal(0, 5, size=50) # 强度 MPa pipe = Pipeline([ ("scaler", StandardScaler()), ("model", RandomForestRegressor(n_estimators=50, random_state=42)), ]) scores = cross_val_score(pipe, X, y, cv=5, scoring="r2") _ok(f"sklearn RandomForest 5-fold R²: mean={scores.mean():.3f} std={scores.std():.3f}") except Exception as e: _fail(f"sklearn smoke: {type(e).__name__}: {e}") # B2: statsmodels try: import numpy as np import pandas as pd import statsmodels.formula.api as smf rng = np.random.default_rng(42) df = pd.DataFrame({ "x1": rng.uniform(0, 1, 50), "x2": rng.uniform(0, 1, 50), }) df["y"] = 3 * df["x1"] - 2 * df["x2"] + rng.normal(0, 0.3, 50) model = smf.ols("y ~ x1 + x2", data=df).fit() r2 = model.rsquared p_x1 = model.pvalues["x1"] _ok(f"statsmodels OLS: R²={r2:.3f}, p(x1)={p_x1:.4f} (应 << 0.05)") except Exception as e: _fail(f"statsmodels smoke: {type(e).__name__}: {e}") # B3: PyMC(可选) try: import pymc as pm import arviz as az _ok(f"pymc import OK (version={pm.__version__})") # 不真跑采样(慢),只验 import except ImportError: _skip("PyMC / arviz 未装(可选依赖,要做贝叶斯再 pip install pymc arviz)") except Exception as e: _fail(f"PyMC import: {type(e).__name__}: {e}") def step_c_plot_pub() -> None: _hr("step C: plot_pub skill") # C1: import + apply_pub_style try: from skills.plot_pub.style import apply_pub_style, reset_style, _find_chinese_font font = _find_chinese_font() if font: _ok(f"_find_chinese_font 返 {font!r}") else: _info("系统未装中文字体候选 (SimHei/YaHei/WenQuanYi/Heiti),中文将显示方块") apply_pub_style() _ok("apply_pub_style() 调用通过") except Exception as e: _fail(f"plot_pub import / apply_pub_style: {type(e).__name__}: {e}") return # C2: 跑一个 minimal XRD-like 图 try: import numpy as np import matplotlib.pyplot as plt tmp_dir = Path(tempfile.mkdtemp(prefix="zcbot_smoke_plot_")) out_png = tmp_dir / "smoke_xrd.png" out_pdf = tmp_dir / "smoke_xrd.pdf" two_theta = np.linspace(5, 80, 1000) # 假装是 MgO 衍射(几个高斯峰) peaks = [(36.9, 1.0), (42.9, 0.7), (62.3, 0.5), (74.7, 0.3), (78.6, 0.4)] intensity = np.zeros_like(two_theta) for pos, h in peaks: intensity += h * np.exp(-((two_theta - pos) ** 2) / (2 * 0.3 ** 2)) intensity += np.random.normal(0, 0.02, len(two_theta)) # 噪声 fig, ax = plt.subplots(figsize=(6, 4)) ax.plot(two_theta, intensity, "k-", lw=1.0, label="MgO 模拟谱") ax.set_xlabel(r"$2\theta$ / °") ax.set_ylabel("强度 / a.u.") ax.set_xlim(5, 80) ax.legend(frameon=False) fig.tight_layout() fig.savefig(out_png, dpi=200) fig.savefig(out_pdf) plt.close(fig) png_size = out_png.stat().st_size pdf_size = out_pdf.stat().st_size _ok(f"出图 PNG ({png_size/1024:.1f}KB) + PDF ({pdf_size/1024:.1f}KB) -> {tmp_dir}") except Exception as e: _fail(f"plot_pub 出图: {type(e).__name__}: {e}") # C3: 还原 rcParams 防污染后续步骤 try: reset_style() _ok("reset_style() 还原 matplotlib defaults") except Exception as e: _fail(f"reset_style: {type(e).__name__}: {e}") def step_d_mp_online() -> None: _hr("step D: Materials Project 联网(可选,需 MP_API_KEY)") if not os.environ.get("MP_API_KEY"): _skip("MP_API_KEY 未配,跳过联网查询(.env 加 MP_API_KEY=... 即可,免费申请 https://materialsproject.org/api)") return try: import json from skills.pymatgen.materials import lookup_phase from tools.materials_project import MaterialsProjectSearchSummaryTool formula = lookup_phase("C3S") # Ca3SiO5 t0 = time.time() out = MaterialsProjectSearchSummaryTool().execute( formula=formula, fields=["material_id", "formula_pretty", "energy_above_hull"], limit=3, ) dt = (time.time() - t0) * 1000 if out.startswith("[Error]"): _fail(f"mp_search_summary 查 {formula}: {out[:160]}") return docs = json.loads(out) _ok(f"mp_search_summary 查 {formula}: 返回 {len(docs)} 条 in {dt:.0f}ms") for d in docs[:3]: print(f" {d.get('material_id')} {d.get('formula_pretty')} ehull={d.get('energy_above_hull')}") except Exception as e: _fail(f"mp_search_summary 联网查询: {type(e).__name__}: {e}") def main() -> int: print("=" * 60) print("zcbot scientific skills smoke (pymatgen / stats_ml / plot_pub)") print("=" * 60) for fn in (step_a_pymatgen, step_b_stats_ml, step_c_plot_pub, step_d_mp_online): try: fn() except Exception as e: _fail(f"[{fn.__name__} crashed] {type(e).__name__}: {e}") print() print("=" * 60) print("smoke done") print("=" * 60) return 0 if __name__ == "__main__": sys.exit(main())