Source code for biochar.temperature_model

"""
Data-driven temperature × feedstock property model.

Maps pyrolysis temperature (and optionally feedstock) to biochar composition
targets (H/C, O/C, aromaticity) and a set of reference properties, fit from
public characterization data.

Two stages:

* :func:`build_model` (offline / dev) parses the source CSVs, computes molar
  H/C and O/C from elemental mass %, robustly smooths each property versus
  temperature, fits ``aromaticity = f(H/C)``, and writes a compact JSON
  artifact ``biochar/data/temperature_model.json``.
* :class:`TemperatureModel` (runtime) loads that JSON and answers queries with
  ``numpy.interp`` only — no scikit-learn/scipy/pandas needed.

Primary data source (definitive): **UC Davis Biochar Database**
(https://biochar.ucdavis.edu/), ``davis-biochar-db.csv``.
Aromaticity calibration only: ``biochar_data.csv`` (NMR aromaticity vs H/C).
Methodological parent: Wood, Mašek & Erastova, *Cell Reports Physical Science*
5(7), 2024, DOI 10.1016/j.xcrp.2024.102036.
"""

from __future__ import annotations

import csv
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)

_DATA_DIR = Path(__file__).parent / "data"
_MODEL_PATH = _DATA_DIR / "temperature_model.json"
_DAVIS_CSV = _DATA_DIR / "davis-biochar-db.csv"
_AROM_CSV = _DATA_DIR / "biochar_data.csv"

# Molar masses (g/mol) for converting elemental mass % to molar ratios.
_M_H, _M_C, _M_O = 1.008, 12.011, 15.999

# Smoothing grid and kernel.
_GRID = list(range(100, 1001, 25))     # °C
_BANDWIDTH = 50.0                       # °C, Gaussian kernel
_MAD_K = 3.0                            # windowed outlier trim threshold

# Feedstock taxonomy: davis ``Feedstock Composition`` label → normalized name.
# Exact (case-insensitive) match — "wood" is its OWN category, not soft/hard.
_FEEDSTOCK_MAP = {
    "soft wood": "softwood",
    "hard wood": "hardwood",
    "grass": "grass",
    "manure": "manure",
    "corn stover": "corn_stover",
    "wood": "wood",
}
#: First-class feedstocks eligible for per-group H/C and O/C overrides.
VALID_FEEDSTOCKS = tuple(sorted(set(_FEEDSTOCK_MAP.values())))

# Per-feedstock override is emitted only for these properties and only if a
# group passes the sufficiency gate below.
_OVERRIDE_PROPS = ("H_C_ratio", "O_C_ratio")
_GATE_MIN_N = 30
_GATE_MIN_TSPAN = 300.0

# Davis column indices (0-based). See module docstring / source header.
_DAVIS = {
    "feedstock": 1,
    "temp": 10,
    "ash_pct": 11,
    "C_pct": 13,        # "Total C (%) Used in Plot" — per project decision
    "N_pct": 14,
    "H_pct": 15,
    "O_pct": 16,
    "pH": 17,
    "ec_dsm": 18,
    "VM_pct": 41,
    "surface_area_m2g": 42,
}
# Reference properties read directly from davis columns (besides computed ratios).
_DAVIS_DIRECT = ("C_pct", "H_pct", "O_pct", "N_pct", "ash_pct", "pH",
                 "ec_dsm", "VM_pct", "surface_area_m2g")


# ---------------------------------------------------------------------------
# Small numeric helpers (numpy only)
# ---------------------------------------------------------------------------

def _to_float(x: Optional[str]) -> Optional[float]:
    if x is None:
        return None
    x = x.strip().rstrip("%").strip()
    if x == "":
        return None
    try:
        return float(x)
    except ValueError:
        return None


def _weighted_median(values: np.ndarray, weights: np.ndarray) -> float:
    order = np.argsort(values)
    v, w = values[order], weights[order]
    cw = np.cumsum(w)
    if cw[-1] <= 0:
        return float(np.median(values))
    return float(v[np.searchsorted(cw, 0.5 * cw[-1])])


def _mad_trim(t: np.ndarray, y: np.ndarray,
              window: float = 75.0, k: float = _MAD_K) -> np.ndarray:
    """Boolean mask keeping points within k·MAD of the local (windowed) median."""
    keep = np.ones(len(y), dtype=bool)
    for i in range(len(y)):
        local = np.abs(t - t[i]) <= window
        yl = y[local]
        if yl.size < 4:
            continue
        med = np.median(yl)
        mad = np.median(np.abs(yl - med))
        if mad > 0 and abs(y[i] - med) > k * mad:
            keep[i] = False
    return keep


def _smooth(t: np.ndarray, y: np.ndarray) -> dict:
    """Gaussian-kernel weighted-median smoothing of (t, y) onto :data:`_GRID`."""
    keep = _mad_trim(t, y)
    t, y = t[keep], y[keep]
    grid = np.array(_GRID, dtype=float)
    mean, spread, n = [], [], []
    for tc in grid:
        w = np.exp(-0.5 * ((t - tc) / _BANDWIDTH) ** 2)
        wsum = w.sum()
        if wsum < 1e-6 or t.size == 0:
            mean.append(float("nan")); spread.append(float("nan")); n.append(0)
            continue
        m = _weighted_median(y, w)
        sp = _weighted_median(np.abs(y - m), w)  # weighted MAD
        mean.append(m); spread.append(sp)
        n.append(int(np.count_nonzero(np.abs(t - tc) <= 2 * _BANDWIDTH)))
    mean = _fill_and_clamp(np.array(mean))
    return {
        "mean": [round(float(v), 5) for v in mean],
        "spread": [None if np.isnan(s) else round(float(s), 5) for s in spread],
        "n": n,
        "t_min": float(t.min()) if t.size else None,
        "t_max": float(t.max()) if t.size else None,
    }


def _fill_and_clamp(arr: np.ndarray) -> np.ndarray:
    """Replace NaN grid points by nearest finite neighbour (endpoint clamp)."""
    arr = arr.copy()
    finite = np.where(~np.isnan(arr))[0]
    if finite.size == 0:
        return np.zeros_like(arr)
    for i in range(len(arr)):
        if np.isnan(arr[i]):
            j = finite[np.argmin(np.abs(finite - i))]
            arr[i] = arr[j]
    return arr


# ---------------------------------------------------------------------------
# Offline build
# ---------------------------------------------------------------------------

def _read_csv(path: Path) -> List[List[str]]:
    with open(path, newline="", encoding="utf-8", errors="replace") as fh:
        return list(csv.reader(fh))[1:]  # drop header


def _davis_records(rows: List[List[str]]) -> Dict[str, List[Tuple[float, float, Optional[str]]]]:
    """Return {property: [(temp, value, feedstock_norm), ...]} from davis rows."""
    out: Dict[str, list] = {p: [] for p in
                            ("H_C_ratio", "O_C_ratio", *_DAVIS_DIRECT)}
    ci = _DAVIS
    for r in rows:
        if len(r) <= ci["surface_area_m2g"]:
            continue
        temp = _to_float(r[ci["temp"]])
        if temp is None:
            continue
        feed = _classify_feedstock(r[ci["feedstock"]])
        c = _to_float(r[ci["C_pct"]])
        h = _to_float(r[ci["H_pct"]])
        o = _to_float(r[ci["O_pct"]])
        if c and h and c > 0:
            out["H_C_ratio"].append((temp, (h / _M_H) / (c / _M_C), feed))
        if c and o and c > 0:
            out["O_C_ratio"].append((temp, (o / _M_O) / (c / _M_C), feed))
        for prop in _DAVIS_DIRECT:
            v = _to_float(r[ci[prop]])
            if v is not None:
                out[prop].append((temp, v, feed))
    return out


def _fit_aromaticity(arom_rows: List[List[str]]) -> dict:
    """Linear (clamped) fit aromaticity% = a + b·(H/C) from biochar_data.csv."""
    HC, AR = 4, 16
    pts = []
    for r in arom_rows:
        if len(r) <= AR:
            continue
        hc, ar = _to_float(r[HC]), _to_float(r[AR])
        if hc is not None and ar is not None:
            pts.append((hc, ar))
    x = np.array([p[0] for p in pts]); y = np.array([p[1] for p in pts])
    b, a = np.polyfit(x, y, 1)  # slope, intercept
    pred = a + b * x
    ss_res = float(np.sum((y - pred) ** 2))
    ss_tot = float(np.sum((y - y.mean()) ** 2))
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
    return {"a": round(float(a), 5), "b": round(float(b), 5),
            "n": len(pts), "r2": round(r2, 4),
            "hc_min": round(float(x.min()), 4), "hc_max": round(float(x.max()), 4)}


def _dedup_report(davis_rows, arom_rows) -> int:
    """Count biochar_data rows whose (Temp±10, H/C±0.02) match a davis row."""
    ci = _DAVIS
    keys = set()
    for r in davis_rows:
        if len(r) <= ci["H_pct"]:
            continue
        t = _to_float(r[ci["temp"]]); c = _to_float(r[ci["C_pct"]]); h = _to_float(r[ci["H_pct"]])
        if t and c and h and c > 0:
            keys.add((round(t / 10) * 10, round((h / _M_H) / (c / _M_C), 2)))
    dups = 0
    for r in arom_rows:
        if len(r) <= 5:
            continue
        t, hc = _to_float(r[5]), _to_float(r[4])
        if t is not None and hc is not None and (round(t / 10) * 10, round(hc, 2)) in keys:
            dups += 1
    return dups


[docs] def build_model(output_path: Optional[Path] = None) -> Path: """ Build the temperature-model JSON artifact from the bundled CSVs. Dev/offline entry point:: python -c "from biochar.temperature_model import build_model; build_model()" Returns the path written. Prints a short QC report (dedup count, per-property point counts, aromaticity fit R²). """ davis_rows = _read_csv(_DAVIS_CSV) arom_rows = _read_csv(_AROM_CSV) dups = _dedup_report(davis_rows, arom_rows) print(f"[build] davis rows={len(davis_rows)} aromaticity rows={len(arom_rows)}") print(f"[build] davis is definitive; {dups} biochar_data rows duplicate davis " f"(Temp±10, H/C±0.02) — not merged (aromaticity sourced separately)") recs = _davis_records(davis_rows) properties: Dict[str, dict] = {} for prop, triples in recs.items(): if not triples: continue t = np.array([x[0] for x in triples], dtype=float) y = np.array([x[1] for x in triples], dtype=float) properties[prop] = _smooth(t, y) print(f"[build] {prop:18s} n={len(triples):4d}") # Per-feedstock overrides (H/C, O/C only) for gate-passing first-class groups. overrides: Dict[str, dict] = {} for fs in VALID_FEEDSTOCKS: for prop in _OVERRIDE_PROPS: triples = [x for x in recs[prop] if x[2] == fs] if len(triples) < _GATE_MIN_N: continue t = np.array([x[0] for x in triples], dtype=float) if float(t.max() - t.min()) < _GATE_MIN_TSPAN: continue y = np.array([x[1] for x in triples], dtype=float) overrides.setdefault(fs, {})[prop] = _smooth(t, y) if fs in overrides: print(f"[build] override[{fs}] props={list(overrides[fs])}") arom = _fit_aromaticity(arom_rows) print(f"[build] aromaticity = {arom['a']} + {arom['b']}·(H/C) " f"(n={arom['n']}, R²={arom['r2']})") artifact = { "schema": 1, "grid_celsius": _GRID, "bandwidth_celsius": _BANDWIDTH, "properties": properties, "feedstock_overrides": overrides, "aromaticity_fit": arom, "provenance": { "primary_source": "UC Davis Biochar Database", "primary_url": "https://biochar.ucdavis.edu/", "aromaticity_source": "biochar_data.csv (NMR aromaticity vs H/C)", "method_ref": "Wood, Mašek & Erastova, Cell Rep. Phys. Sci. 5(7) 2024, " "DOI 10.1016/j.xcrp.2024.102036", "davis_rows": len(davis_rows), "duplicate_rows_vs_biochar_data": dups, "feedstock_note": "woody/lignocellulosic-dominated; unmapped labels pooled", }, } path = output_path or _MODEL_PATH path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as fh: json.dump(artifact, fh, indent=2) print(f"[build] wrote {path}") return path
def _classify_feedstock(raw: str) -> Optional[str]: """Normalize a davis feedstock label; return None if not a first-class group.""" return _FEEDSTOCK_MAP.get(raw.strip().lower()) # --------------------------------------------------------------------------- # Runtime # ---------------------------------------------------------------------------
[docs] class TemperatureModel: """ Query the bundled temperature × feedstock property model. Args: model_path: Path to the JSON artifact (defaults to the bundled file). """ def __init__(self, model_path: Optional[Path] = None): path = Path(model_path) if model_path else _MODEL_PATH with open(path, encoding="utf-8") as fh: self._m = json.load(fh) self._grid = np.array(self._m["grid_celsius"], dtype=float) # -- property access ---------------------------------------------------- def _series(self, prop: str, feedstock: Optional[str]): """Return (mean_array, t_min, t_max) honouring a feedstock override.""" if (feedstock and prop in _OVERRIDE_PROPS and feedstock in self._m["feedstock_overrides"] and prop in self._m["feedstock_overrides"][feedstock]): s = self._m["feedstock_overrides"][feedstock][prop] return np.array(s["mean"], dtype=float), s.get("t_min"), s.get("t_max") s = self._m["properties"].get(prop) if s is None: return None, None, None return np.array(s["mean"], dtype=float), s.get("t_min"), s.get("t_max")
[docs] def predict(self, temperature: float, prop: str, feedstock: Optional[str] = None) -> float: """ Predict *prop* at *temperature* (°C). Uses the feedstock-specific curve only for H/C and O/C when that group has one and *temperature* is within its support; otherwise the pooled curve. Clamped to the grid ends. """ use_fs = feedstock # Fall back to pooled if the override doesn't cover this temperature. if feedstock and prop in _OVERRIDE_PROPS: ov = self._m["feedstock_overrides"].get(feedstock, {}).get(prop) if ov is None or not (_in_range(temperature, ov.get("t_min"), ov.get("t_max"))): use_fs = None mean, _, _ = self._series(prop, use_fs) if mean is None: raise KeyError(f"Unknown property: {prop!r}") return float(np.interp(temperature, self._grid, mean))
[docs] def aromaticity_from_hc(self, hc: float) -> float: f = self._m["aromaticity_fit"] return float(min(100.0, max(0.0, f["a"] + f["b"] * hc)))
[docs] def composition(self, temperature: float, feedstock: Optional[str] = None) -> Dict[str, float]: """The three generator targets at *temperature* (and *feedstock*).""" hc = self.predict(temperature, "H_C_ratio", feedstock) oc = self.predict(temperature, "O_C_ratio", feedstock) return { "H_C_ratio": hc, "O_C_ratio": oc, "aromaticity_percent": self.aromaticity_from_hc(hc), }
[docs] def predict_all(self, temperature: float, feedstock: Optional[str] = None) -> Dict[str, float]: """Every modeled property at *temperature* (reference query).""" out = self.composition(temperature, feedstock) for prop in self._m["properties"]: if prop not in out: out[prop] = self.predict(temperature, prop, feedstock) return out
@property def feedstocks(self) -> Tuple[str, ...]: return VALID_FEEDSTOCKS @property def provenance(self) -> dict: return dict(self._m.get("provenance", {}))
def _in_range(t: float, lo, hi) -> bool: return lo is not None and hi is not None and lo <= t <= hi # Cached default model (loaded once; runtime needs only numpy + json). _DEFAULT_MODEL: Optional[TemperatureModel] = None def get_default_model() -> TemperatureModel: """Return a process-wide cached :class:`TemperatureModel` for the bundled artifact.""" global _DEFAULT_MODEL if _DEFAULT_MODEL is None: _DEFAULT_MODEL = TemperatureModel() return _DEFAULT_MODEL
[docs] def properties(temperature: float, feedstock: Optional[str] = None) -> Dict[str, float]: """ Convenience: all modeled biochar properties at a pyrolysis *temperature* (°C), optionally for a *feedstock* (one of :data:`VALID_FEEDSTOCKS`). Example:: import biochar biochar.properties(600, feedstock="softwood") """ if feedstock is not None and feedstock not in VALID_FEEDSTOCKS: raise ValueError( f"feedstock must be one of {VALID_FEEDSTOCKS} or None, got {feedstock!r}" ) return get_default_model().predict_all(temperature, feedstock)
# --------------------------------------------------------------------------- # Refit-and-compare helper (for when Charchive DB arrives) # ---------------------------------------------------------------------------
[docs] def compare_models(path_a: Path, path_b: Path) -> Dict[str, dict]: """ Compare two model artifacts on their shared grid; report per-property mean absolute and max prediction delta. Used to evaluate a refit (e.g. davis-only vs davis+Charchive). """ a = json.load(open(path_a)); b = json.load(open(path_b)) out = {} shared = set(a["properties"]) & set(b["properties"]) for prop in sorted(shared): ma = np.array(a["properties"][prop]["mean"], dtype=float) mb = np.array(b["properties"][prop]["mean"], dtype=float) n = min(len(ma), len(mb)) d = np.abs(ma[:n] - mb[:n]) out[prop] = {"mean_abs_delta": round(float(d.mean()), 5), "max_abs_delta": round(float(d.max()), 5)} return out