312 lines
11 KiB
Python
312 lines
11 KiB
Python
"""Smoke: 3 个新加的科学计算 skill 通路验证(pymatgen / stats_ml / plot_pub)。
|
|
|
|
跑法: .venv/Scripts/python.exe scripts/smoke_scientific_skills.py
|
|
|
|
依赖:`pip install pymatgen mp-api scikit-learn statsmodels`(PyMC 可选,装了就测)。
|
|
|
|
不依赖网络默认情况下(MP_API_KEY 没配则跳过 mp_rester 联网那一段)。
|
|
不动 DB / workspace,产物落系统临时目录,跑完即丢。
|
|
|
|
按 skill 顺序 4 段:
|
|
step A — pymatgen import + CEMENT_PHASES 几个查询 + mp_rester 未配 key 抛错
|
|
step B — stats_ml 三库装包 + 小 OLS / RandomForest smoke
|
|
step C — plot_pub apply_pub_style + 最小 XRD-like 图出 PNG
|
|
step D —(可选)MP_API_KEY 配了就联网拉一条 Materials Project 数据
|
|
|
|
任一步异常 [FAIL] 标注后继续下一步,保证整条链路看一遍。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Directory two levels above this script file.
ROOT = Path(__file__).resolve().parents[1]
# Prepend it to the import path; the steps below import `skills.*` and
# `tools.*`, which presumably live under this directory.
sys.path[:0] = [str(ROOT)]
|
|
|
|
# Windows consoles may use a legacy code page such as GBK, which cannot encode
# everything this script prints; force UTF-8 (with replacement) on both streams.
def _force_utf8(stream: io.TextIOBase) -> io.TextIOBase:
    """Return *stream* switched to UTF-8 output with ``errors="replace"``.

    Prefers the in-place ``reconfigure()`` call so the stream keeps its
    identity and buffering mode. Falls back to wrapping ``stream.buffer`` in a
    new ``TextIOWrapper`` when ``reconfigure`` is unavailable, and returns the
    stream unchanged when it has neither (for example ``None`` or a custom
    file-like object).
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8", errors="replace")
        return stream

    raw = getattr(stream, "buffer", None)
    if raw is None:
        return stream

    # Flush text still held by the old wrapper so it is not emitted out of
    # order after the replacement starts writing to the same buffer.
    flush = getattr(stream, "flush", None)
    if flush is not None:
        flush()
    return io.TextIOWrapper(
        raw,
        encoding="utf-8",
        errors="replace",
        # Carry over line buffering so console output still appears promptly.
        line_buffering=bool(getattr(stream, "line_buffering", False)),
    )


sys.stdout = _force_utf8(sys.stdout)
sys.stderr = _force_utf8(sys.stderr)
|
|
|
|
|
|
# Load KEY=VALUE pairs (MP_API_KEY and friends) from the project's .env file
# into the process environment. This is done by hand here; the original note
# says the script runs outside the litellm path, which would otherwise load it.
def _parse_env_line(raw: str) -> "tuple[str, str] | None":
    """Parse one ``.env`` line into ``(key, value)``.

    Returns ``None`` for blank lines, ``#`` comments, lines without ``=`` and
    lines whose key is empty. An optional leading ``export `` is dropped, and
    one pair of matching single or double quotes around the value is removed.
    """
    entry = raw.strip()
    if not entry or entry.startswith("#") or "=" not in entry:
        return None
    if entry.startswith("export "):
        entry = entry[len("export "):].lstrip()
    key, _, value = entry.partition("=")
    key = key.strip()
    value = value.strip()
    if not key:
        # An empty variable name is rejected by os.environ on some platforms.
        return None
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        value = value[1:-1]
    return key, value


env_file = ROOT / ".env"
if env_file.exists():
    # utf-8-sig reads plain UTF-8 too, and strips a BOM that would otherwise
    # become part of the first key.
    for _raw_line in env_file.read_text(encoding="utf-8-sig").splitlines():
        _pair = _parse_env_line(_raw_line)
        if _pair is not None:
            # setdefault: variables already present in the environment win.
            os.environ.setdefault(*_pair)
|
|
|
|
|
|
def _hr(title: str) -> None:
    """Print a section banner: a blank line, then ``[title]`` between two 60-character ``=`` rules."""
    rule = "=" * 60
    print("", rule, f"[{title}]", rule, sep="\n")
|
|
|
|
|
|
def _ok(msg: str) -> None:
    """Print *msg* on its own line, prefixed with the ``[OK]`` tag."""
    print("[OK] {}".format(msg))
|
|
|
|
|
|
def _fail(msg: str) -> None:
    """Print *msg* on its own line, prefixed with the ``[FAIL]`` tag."""
    print("[FAIL] {}".format(msg))
|
|
|
|
|
|
def _skip(msg: str) -> None:
    """Print *msg* on its own line, prefixed with the ``[SKIP]`` tag."""
    print("[SKIP] {}".format(msg))
|
|
|
|
|
|
def _info(msg: str) -> None:
    """Print *msg* on its own line, prefixed with the ``[INFO]`` tag."""
    print("[INFO] {}".format(msg))
|
|
|
|
|
|
def step_a_pymatgen() -> None:
    """Smoke-check the pymatgen skill (sub-steps A1-A6).

    Each sub-step reports through ``_ok`` / ``_fail`` / ``_info`` and never
    raises on its own failures. The function returns early when the skill
    helper (A1) or pymatgen itself (A4) cannot be imported, because the
    sub-steps that follow need those names.
    """
    _hr("step A: pymatgen skill")

    # A1: the skill's helper module must import.
    try:
        from skills.pymatgen.materials import CEMENT_PHASES, lookup_phase
        _ok(f"import skills.pymatgen.materials (CEMENT_PHASES 条目数={len(CEMENT_PHASES)})")
    except Exception as exc:
        _fail(f"import skills.pymatgen.materials: {type(exc).__name__}: {exc}")
        return

    # A2: representative lookups, alias -> expected formula.
    expected_by_alias = {
        "C3S": "Ca3SiO5",
        "硅酸三钙": "Ca3SiO5",
        "alite": "Ca3SiO5",  # exercises case-insensitive matching
        "钙矾石": "Ca6Al2(SO4)3(OH)12·26H2O",
        "莫来石": "Al6Si2O13",
        "方镁石": "MgO",
        "石英": "SiO2",
    }
    for alias, wanted in expected_by_alias.items():
        try:
            found = lookup_phase(alias)
            matched = found == wanted
            if not matched:
                _fail(f"lookup_phase({alias!r}) -> {found},期望 {wanted}")
            else:
                _ok(f"lookup_phase({alias!r}) -> {found}")
        except Exception as exc:
            _fail(f"lookup_phase({alias!r}) raised {type(exc).__name__}: {exc}")

    # A3: an unknown name is expected to raise KeyError.
    try:
        lookup_phase("根本不存在的相_xyz123")
        _fail("lookup_phase 未命中应抛 KeyError,没抛")
    except KeyError as exc:
        # Also report whether the error text contains the "补到" hint.
        has_hint = "补到" in str(exc)
        _ok(f"lookup_phase 未命中正确抛 KeyError (msg 含建议: {has_hint})")
    except Exception as exc:
        _fail(f"lookup_phase 未命中应抛 KeyError,实际 {type(exc).__name__}")

    # A4: pymatgen's own core classes must import.
    try:
        from pymatgen.core import Structure, Lattice, Molecule
        from pymatgen.analysis.diffraction.xrd import XRDCalculator
        from pymatgen.symmetry.analyzer import SpacegroupAnalyzer
        _ok("pymatgen 核心类 import 全通 (Structure / XRDCalculator / SpacegroupAnalyzer)")
    except Exception as exc:
        _fail(f"pymatgen 本体 import: {type(exc).__name__}: {exc}")
        return

    # A5: build a simple cubic Mg/O cell and run XRDCalculator once.
    try:
        from pymatgen.core import Structure, Lattice
        from pymatgen.analysis.diffraction.xrd import XRDCalculator

        cell = Lattice.cubic(4.21)  # MgO (periclase), a ~ 4.21 angstrom
        mgo = Structure(cell, ["Mg", "O"], [[0, 0, 0], [0.5, 0.5, 0.5]])
        calculator = XRDCalculator(wavelength="CuKa")
        pattern = calculator.get_pattern(mgo, two_theta_range=(20, 80))
        _ok(f"XRDCalculator on MgO 结构: {len(pattern.x)} 个峰,2θ 范围 [{pattern.x[0]:.1f}, {pattern.x[-1]:.1f}]")
    except Exception as exc:
        _fail(f"XRDCalculator smoke: {type(exc).__name__}: {exc}")

    # A6: without MP_API_KEY the Materials Project tool should return an
    # "[Error]" string rather than raise. Skipped when a key is configured.
    if os.environ.get("MP_API_KEY"):
        _info("MP_API_KEY 已配置,skip 缺 key 报错验证(下面 step D 测真实查询)")
        return

    try:
        from tools.materials_project import MaterialsProjectSearchSummaryTool

        reply = MaterialsProjectSearchSummaryTool().execute(formula="Ca3SiO5")
        if not (reply.startswith("[Error]") and "MP_API_KEY" in reply):
            _fail(f"未配 key 应返回 [Error] 含 MP_API_KEY,实际: {reply[:120]}")
        else:
            _ok("mp_search_summary 未配 key 返回 [Error] 含 MP_API_KEY 提示")
    except Exception as exc:
        _fail(f"mp_search_summary 未配 key 应返回 [Error] 而非抛异常: {type(exc).__name__}: {exc}")
|
|
|
|
|
|
def step_b_stats_ml() -> None:
    """Smoke-check the stats_ml skill: scikit-learn (B1), statsmodels (B2), PyMC (B3).

    The three sub-steps are independent; a failure in one is reported with
    ``_fail`` and the next one still runs. A missing PyMC / arviz install is
    reported as a skip, not a failure.
    """
    _hr("step B: stats_ml skill")

    # B1: scikit-learn pipeline, cross-validated on synthetic data.
    try:
        import numpy as np
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.preprocessing import StandardScaler
        from sklearn.pipeline import Pipeline
        from sklearn.model_selection import cross_val_score

        # Fake mix-design -> strength data: 50 samples, 6 features.
        gen = np.random.default_rng(42)
        features = gen.uniform(0, 1, size=(50, 6))  # six admixture ratios
        weights = gen.uniform(20, 80, size=6)
        noise = gen.normal(0, 5, size=50)
        strength = (features @ weights) + noise  # strength in MPa
        estimator = Pipeline(
            [
                ("scaler", StandardScaler()),
                ("model", RandomForestRegressor(n_estimators=50, random_state=42)),
            ]
        )
        scores = cross_val_score(estimator, features, strength, cv=5, scoring="r2")
        _ok(f"sklearn RandomForest 5-fold R²: mean={scores.mean():.3f} std={scores.std():.3f}")
    except Exception as exc:
        _fail(f"sklearn smoke: {type(exc).__name__}: {exc}")

    # B2: statsmodels OLS with known coefficients (3 and -2).
    try:
        import numpy as np
        import pandas as pd
        import statsmodels.formula.api as smf

        gen = np.random.default_rng(42)
        frame = pd.DataFrame(
            {
                "x1": gen.uniform(0, 1, 50),
                "x2": gen.uniform(0, 1, 50),
            }
        )
        residual = gen.normal(0, 0.3, 50)
        frame["y"] = 3 * frame["x1"] - 2 * frame["x2"] + residual
        fitted = smf.ols("y ~ x1 + x2", data=frame).fit()
        r_squared = fitted.rsquared
        p_value_x1 = fitted.pvalues["x1"]
        _ok(f"statsmodels OLS: R²={r_squared:.3f}, p(x1)={p_value_x1:.4f} (应 << 0.05)")
    except Exception as exc:
        _fail(f"statsmodels smoke: {type(exc).__name__}: {exc}")

    # B3: PyMC is optional; only the import is checked because sampling is slow.
    try:
        import pymc as pm
        import arviz as az
        _ok(f"pymc import OK (version={pm.__version__})")
    except ImportError:
        _skip("PyMC / arviz 未装(可选依赖,要做贝叶斯再 pip install pymc arviz)")
    except Exception as exc:
        _fail(f"PyMC import: {type(exc).__name__}: {exc}")
|
|
|
|
|
|
def step_c_plot_pub() -> None:
    """Smoke-check the plot_pub skill (sub-steps C1-C3).

    C1 imports the style helpers and applies the publication style; the
    function returns early if that fails. C2 draws a synthetic XRD-like curve
    and saves it as PNG and PDF into a fresh temp directory. C3 calls
    ``reset_style()`` so later steps do not inherit the style changes.

    The noise added to the curve comes from a seeded generator, so repeated
    runs draw the same data, and the figure is closed even when plotting or
    saving fails.
    """
    _hr("step C: plot_pub skill")

    # C1: import the helpers and apply the style.
    try:
        from skills.plot_pub.style import apply_pub_style, reset_style, _find_chinese_font

        font = _find_chinese_font()
        if font:
            _ok(f"_find_chinese_font 返 {font!r}")
        else:
            _info("系统未装中文字体候选 (SimHei/YaHei/WenQuanYi/Heiti),中文将显示方块")

        apply_pub_style()
        _ok("apply_pub_style() 调用通过")
    except Exception as e:
        _fail(f"plot_pub import / apply_pub_style: {type(e).__name__}: {e}")
        return

    # C2: render a minimal XRD-like figure.
    try:
        import numpy as np
        import matplotlib.pyplot as plt

        tmp_dir = Path(tempfile.mkdtemp(prefix="zcbot_smoke_plot_"))
        out_png = tmp_dir / "smoke_xrd.png"
        out_pdf = tmp_dir / "smoke_xrd.pdf"

        two_theta = np.linspace(5, 80, 1000)
        # Stand-in for an MgO pattern: a handful of Gaussian peaks given as
        # (position in degrees 2-theta, height).
        peaks = [(36.9, 1.0), (42.9, 0.7), (62.3, 0.5), (74.7, 0.3), (78.6, 0.4)]
        intensity = np.zeros_like(two_theta)
        for pos, h in peaks:
            intensity += h * np.exp(-((two_theta - pos) ** 2) / (2 * 0.3 ** 2))
        # Seeded generator (same seed as step B) keeps the noise reproducible.
        rng = np.random.default_rng(42)
        intensity += rng.normal(0, 0.02, len(two_theta))

        fig, ax = plt.subplots(figsize=(6, 4))
        try:
            ax.plot(two_theta, intensity, "k-", lw=1.0, label="MgO 模拟谱")
            ax.set_xlabel(r"$2\theta$ / °")
            ax.set_ylabel("强度 / a.u.")
            ax.set_xlim(5, 80)
            ax.legend(frameon=False)
            fig.tight_layout()
            fig.savefig(out_png, dpi=200)
            fig.savefig(out_pdf)
        finally:
            # Release the figure even if drawing or saving raised.
            plt.close(fig)

        png_size = out_png.stat().st_size
        pdf_size = out_pdf.stat().st_size
        _ok(f"出图 PNG ({png_size/1024:.1f}KB) + PDF ({pdf_size/1024:.1f}KB) -> {tmp_dir}")
    except Exception as e:
        _fail(f"plot_pub 出图: {type(e).__name__}: {e}")

    # C3: restore rcParams so the style does not leak into later steps.
    try:
        reset_style()
        _ok("reset_style() 还原 matplotlib defaults")
    except Exception as e:
        _fail(f"reset_style: {type(e).__name__}: {e}")
|
|
|
|
|
|
def step_d_mp_online() -> None:
    """Run one live Materials Project query (optional step D).

    Skipped with a ``[SKIP]`` line when ``MP_API_KEY`` is not set in the
    environment. Otherwise looks up the formula for "C3S", queries the
    summary tool for up to three documents, and prints the count, the elapsed
    time and one line per document. Any exception is reported via ``_fail``.
    """
    _hr("step D: Materials Project 联网(可选,需 MP_API_KEY)")

    if not os.environ.get("MP_API_KEY"):
        _skip("MP_API_KEY 未配,跳过联网查询(.env 加 MP_API_KEY=... 即可,免费申请 https://materialsproject.org/api)")
        return

    try:
        import json
        from skills.pymatgen.materials import lookup_phase
        from tools.materials_project import MaterialsProjectSearchSummaryTool

        formula = lookup_phase("C3S")  # Ca3SiO5
        # perf_counter is monotonic, so the measured interval is not affected
        # by system clock adjustments the way time.time() is.
        t0 = time.perf_counter()
        out = MaterialsProjectSearchSummaryTool().execute(
            formula=formula,
            fields=["material_id", "formula_pretty", "energy_above_hull"],
            limit=3,
        )
        dt = (time.perf_counter() - t0) * 1000  # milliseconds
        if out.startswith("[Error]"):
            _fail(f"mp_search_summary 查 {formula}: {out[:160]}")
            return
        # A successful reply is parsed as JSON; the loop below treats it as a
        # sequence of dict-like documents.
        docs = json.loads(out)
        _ok(f"mp_search_summary 查 {formula}: 返回 {len(docs)} 条 in {dt:.0f}ms")
        for d in docs[:3]:
            print(f" {d.get('material_id')} {d.get('formula_pretty')} ehull={d.get('energy_above_hull')}")
    except Exception as e:
        _fail(f"mp_search_summary 联网查询: {type(e).__name__}: {e}")
|
|
|
|
|
|
def main() -> int:
    """Run every smoke step in order and return the process exit code.

    An exception escaping a step is reported through ``_fail`` and the
    remaining steps still run. The return value is always 0.
    """
    rule = "=" * 60
    print(rule, "zcbot scientific skills smoke (pymatgen / stats_ml / plot_pub)", rule, sep="\n")

    steps = (step_a_pymatgen, step_b_stats_ml, step_c_plot_pub, step_d_mp_online)
    for step in steps:
        try:
            step()
        except Exception as exc:
            _fail(f"[{step.__name__} crashed] {type(exc).__name__}: {exc}")

    print("", rule, "smoke done", rule, sep="\n")
    return 0
|
|
|
|
|
|
# Script entry point: exit with whatever status main() returns.
if __name__ == "__main__":
    raise SystemExit(main())
|