"""Change-point model fitting algorithms for building energy analysis.
This module contains pure change-point modeling functions for statistical
analysis of energy consumption patterns with respect to temperature.
"""
from __future__ import annotations
import logging
from math import isclose
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.lines import Line2D
from pydantic import BaseModel, Field
from scipy import optimize, stats
# ChangePointModelResult defined at end of file to avoid circular imports
from better_lbnl_os.constants import (
DEFAULT_CVRMSE_THRESHOLD,
DEFAULT_R2_THRESHOLD,
DEFAULT_SIGNIFICANT_PVAL,
)
logger = logging.getLogger(__name__)
# Default thresholds now sourced from data.constants
[docs]
def fit_changepoint_model(
x: np.ndarray,
y: np.ndarray,
min_r_squared: float = DEFAULT_R2_THRESHOLD,
max_cv_rmse: float = DEFAULT_CVRMSE_THRESHOLD,
) -> ChangePointModelResult:
"""Fit a change-point model to any x,y data relationship.
This is the main entry point for change-point model fitting. It automatically
determines the best model type (1P, 3P, or 5P) based on statistical significance
and model quality metrics.
Common usage examples:
- Energy analysis: x=temperature, y=energy_use
- Price analysis: x=price, y=demand
- Time series: x=time, y=usage
Args:
x: Array of independent variable values (e.g., temperature, price, time)
y: Array of dependent variable values (e.g., energy_use, demand, usage)
min_r_squared: Minimum R² threshold for model acceptance
max_cv_rmse: Maximum CV-RMSE threshold for model acceptance
Returns:
ChangePointModelResult with fitted coefficients and quality metrics
Raises:
ValueError: If input arrays are invalid or empty
Exception: If model fitting fails
"""
# Input validation
_validate_model_inputs(x, y)
# Check for constant or near-constant data (early 1P detection)
# If data has no variance, skip changepoint fitting and use 1P model directly
y_std = np.std(y)
y_mean = np.mean(y)
if y_std < 1e-6 or (y_mean != 0 and y_std / abs(y_mean) < 0.01):
# Data is essentially constant, use 1P model (baseload only)
return _fit_1p_model(x, y, max_cv_rmse)
# Set up bounds for model fitting
bounds = _create_model_bounds(x, y)
# Try fitting with different change-point bounds
search_bounds = _create_changepoint_search_bounds(x, n_bins=8)
fit_results = []
for cp_bounds in search_bounds:
try:
# Update bounds for this iteration
iteration_bounds = bounds.copy()
iteration_bounds[0][1] = cp_bounds[0][0] # left changepoint lower
iteration_bounds[1][1] = cp_bounds[0][1] # left changepoint upper
iteration_bounds[0][3] = cp_bounds[1][0] # right changepoint lower
iteration_bounds[1][3] = cp_bounds[1][1] # right changepoint upper
result = _fit_model_once(x, y, iteration_bounds)
fit_results.append(result)
except Exception:
# Some bounds combinations may fail - this is expected
continue
if not fit_results:
raise Exception("Could not fit any change-point model with given data")
# Select best model and determine type
optimal_model = _select_optimal_model(fit_results, x, y, min_r_squared, max_cv_rmse)
return optimal_model
def _validate_model_inputs(x: np.ndarray, y: np.ndarray) -> None:
"""Validate inputs for change-point model fitting."""
if not np.any(x):
raise ValueError("x must have at least one element")
if not np.any(y):
raise ValueError("y must have at least one element")
if np.size(y) != np.size(x):
raise ValueError("x and y arrays must have the same length")
if np.size(x) < 3:
raise ValueError("Need at least 3 data points for change-point modeling")
if all(np.isnan(x)):
raise ValueError("x data cannot be all NaN")
def _create_model_bounds(x: np.ndarray, y: np.ndarray) -> list:
"""Create bounds for model coefficient optimization."""
left_slope_bounds = [-np.inf, 0] # left slope (negative for energy/temperature)
left_cp_bounds = [np.min(x), np.max(x)] # left changepoint
baseline_bounds = [np.min(y), np.max(y)] # baseline value
right_cp_bounds = [np.min(x), np.max(x)] # right changepoint
right_slope_bounds = [0, np.inf] # right slope (positive for energy/temperature)
return [
[
left_slope_bounds[0],
left_cp_bounds[0],
baseline_bounds[0],
right_cp_bounds[0],
right_slope_bounds[0],
],
[
left_slope_bounds[1],
left_cp_bounds[1],
baseline_bounds[1],
right_cp_bounds[1],
right_slope_bounds[1],
],
]
def _create_changepoint_search_bounds(x: np.ndarray, n_bins: int = 4) -> list:
"""Create search bounds for left and right changepoints."""
bin_width = np.ptp(x) / n_bins
marks = [np.min(x) + i * bin_width for i in range(n_bins + 1)]
bounds_list = []
for i in range(len(marks) - 1):
for j in range(i + 1, len(marks) - 1):
bounds_list.append(
[
(marks[i], marks[i + 1]), # left changepoint bounds
(marks[j], marks[j + 1]), # right changepoint bounds
]
)
return bounds_list
def _fit_model_once(x: np.ndarray, y: np.ndarray, bounds: list) -> dict:
"""Fit the piecewise linear model once with given bounds."""
# Perform curve fitting
popt, pcov = optimize.curve_fit(
f=piecewise_linear_5p, xdata=x, ydata=y, bounds=bounds, method="dogbox"
)
# Calculate model quality metrics
y_predicted = piecewise_linear_5p(x, *popt)
r2 = calculate_r_squared(y, y_predicted)
cvrmse = calculate_cvrmse(y, y_predicted)
# Check slope significance
pval_left, valid_left = _check_slope_significance(popt[0], x, y, popt, is_left_slope=True)
pval_right, valid_right = _check_slope_significance(popt[4], x, y, popt, is_left_slope=False)
return {
"coefficients": popt,
"covariance": pcov,
"r_squared": r2,
"cvrmse": cvrmse,
"heating_pvalue": pval_left, # Keep legacy key names for compatibility
"cooling_pvalue": pval_right,
"heating_significant": valid_left,
"cooling_significant": valid_right,
}
def _check_slope_significance(
slope: float, x: np.ndarray, y: np.ndarray, coefficients: np.ndarray, is_left_slope: bool
) -> tuple[float | None, bool]:
"""Check if a left or right slope is statistically significant."""
if isclose(slope, 0, abs_tol=1e-5):
return None, False
if is_left_slope:
# Check left slope significance
changepoint = coefficients[1]
mask = x <= changepoint
else:
# Check right slope significance
changepoint = coefficients[3]
mask = x >= changepoint
x_subset = x[mask]
y_subset = y[mask]
if len(x_subset) <= 2:
return np.inf, False
y_predicted = piecewise_linear_5p(x_subset, *coefficients)
pvalue = _calculate_slope_pvalue(slope, x_subset, y_subset, y_predicted)
return pvalue, pvalue < DEFAULT_SIGNIFICANT_PVAL
def _calculate_slope_pvalue(
slope: float, x_data: np.ndarray, y_data: np.ndarray, y_predicted: np.ndarray
) -> float:
"""Calculate p-value for regression slope significance."""
if len(x_data) <= 2:
return np.inf
# Calculate standard error of slope
residuals = y_data - y_predicted
sample_variance = np.sum(residuals**2) / (len(x_data) - 2)
sum_squares_x = np.sum((x_data - np.mean(x_data)) ** 2)
standard_error = np.sqrt(sample_variance / sum_squares_x)
# Calculate t-statistic and p-value
t_statistic = slope / standard_error
pvalue = stats.t.sf(np.abs(t_statistic), len(x_data) - 1) * 2 # two-tailed test
return pvalue
def _select_optimal_model(
fit_results: list, x: np.ndarray, y: np.ndarray, min_r_squared: float, max_cv_rmse: float
) -> ChangePointModelResult:
"""Select the optimal model from fit results and determine model type."""
# Convert results to DataFrame for easier analysis
rows = []
for result in fit_results:
coeff = result["coefficients"]
row = [
coeff[0],
coeff[1],
coeff[2],
coeff[3],
coeff[4], # coefficients
result["r_squared"],
result["cvrmse"],
result["heating_pvalue"],
result["cooling_pvalue"],
result["heating_significant"],
result["cooling_significant"],
]
rows.append(row)
df_fits = pd.DataFrame(
rows,
columns=[
"heating_slope",
"heating_changepoint",
"baseload",
"cooling_changepoint",
"cooling_slope",
"r_squared",
"cvrmse",
"heating_pvalue",
"cooling_pvalue",
"heating_significant",
"cooling_significant",
],
)
# Filter for models with at least one significant slope
df_significant = df_fits[(df_fits["heating_significant"]) | (df_fits["cooling_significant"])]
if len(df_significant) > 0:
# Select model with highest R²
best_idx = df_significant["r_squared"].idxmax()
best_model = df_significant.loc[best_idx]
# Determine model type and validate
model_type, coefficients = _determine_model_type(best_model, x, y, min_r_squared)
if model_type != "No-fit":
return ChangePointModelResult(
model_type=model_type,
heating_slope=coefficients.get("heating_slope"),
heating_change_point=coefficients.get("heating_changepoint"),
baseload=coefficients["baseload"],
cooling_change_point=coefficients.get("cooling_changepoint"),
cooling_slope=coefficients.get("cooling_slope"),
r_squared=best_model["r_squared"],
cvrmse=best_model["cvrmse"],
heating_pvalue=best_model["heating_pvalue"],
cooling_pvalue=best_model["cooling_pvalue"],
)
# Try 1P model as fallback
return _fit_1p_model(x, y, max_cv_rmse)
def _determine_model_type(
model_row: pd.Series, x: np.ndarray, y: np.ndarray, min_r_squared: float
) -> tuple[str, dict]:
"""Determine model type (5P, 3P, etc.) and extract coefficients."""
heating_significant = model_row["heating_significant"]
cooling_significant = model_row["cooling_significant"]
if heating_significant and cooling_significant:
# 5P model
coefficients = {
"heating_slope": model_row["heating_slope"],
"heating_changepoint": model_row["heating_changepoint"],
"baseload": model_row["baseload"],
"cooling_changepoint": model_row["cooling_changepoint"],
"cooling_slope": model_row["cooling_slope"],
}
# Validate R² threshold
test_coeffs = [
coefficients["heating_slope"],
coefficients["heating_changepoint"],
coefficients["baseload"],
coefficients["cooling_changepoint"],
coefficients["cooling_slope"],
]
if _check_r2_threshold(x, y, test_coeffs, min_r_squared):
return "5P", coefficients
elif cooling_significant and not heating_significant:
# 3P cooling model
coefficients = {
"heating_slope": None,
"heating_changepoint": None,
"baseload": model_row["baseload"],
"cooling_changepoint": model_row["cooling_changepoint"],
"cooling_slope": model_row["cooling_slope"],
}
test_coeffs = [
None,
None,
coefficients["baseload"],
coefficients["cooling_changepoint"],
coefficients["cooling_slope"],
]
if _check_r2_threshold(x, y, test_coeffs, min_r_squared):
return "3P Cooling", coefficients
elif heating_significant and not cooling_significant:
# 3P heating model
coefficients = {
"heating_slope": model_row["heating_slope"],
"heating_changepoint": model_row["heating_changepoint"],
"baseload": model_row["baseload"],
"cooling_changepoint": None,
"cooling_slope": None,
}
test_coeffs = [
coefficients["heating_slope"],
coefficients["heating_changepoint"],
coefficients["baseload"],
None,
None,
]
if _check_r2_threshold(x, y, test_coeffs, min_r_squared):
return "3P Heating", coefficients
return "No-fit", {}
def _check_r2_threshold(
x: np.ndarray, y: np.ndarray, coefficients: list, min_r_squared: float
) -> bool:
"""Check if model meets R² threshold."""
predicted = piecewise_linear_5p(x, *coefficients)
r2 = calculate_r_squared(y, predicted)
return r2 >= min_r_squared
def _fit_1p_model(x: np.ndarray, y: np.ndarray, max_cv_rmse: float) -> ChangePointModelResult:
"""Fit a 1P (constant) model as fallback."""
baseload = np.mean(y)
predicted = np.full_like(y, baseload)
r2 = calculate_r_squared(y, predicted)
cvrmse = calculate_cvrmse(y, predicted)
if cvrmse <= max_cv_rmse:
model_type = "1P"
else:
model_type = "No-fit"
return ChangePointModelResult(
model_type=model_type,
heating_slope=None,
heating_change_point=None,
baseload=baseload,
cooling_change_point=None,
cooling_slope=None,
r_squared=r2,
cvrmse=cvrmse,
heating_pvalue=None,
cooling_pvalue=None,
)
[docs]
def piecewise_linear_5p(
x: np.ndarray,
heating_slope: float | None,
heating_changepoint: float | None,
baseload: float,
cooling_changepoint: float | None,
cooling_slope: float | None,
) -> np.ndarray:
r"""Five-parameter piecewise linear function for change-point modeling.
This function implements the classic change-point model:
- Heating slope (negative) below heating changepoint
- Flat baseload between changepoints
- Cooling slope (positive) above cooling changepoint
Visual representation:
k1 \ / k2
\ /
y0 \__________/
cpL cpR
Where:
k1 = heating_slope (typically negative)
k2 = cooling_slope (typically positive)
y0 = baseload (constant energy use)
cpL = heating_changepoint
cpR = cooling_changepoint
Args:
x: Temperature values
heating_slope: Slope for heating regime (typically negative)
heating_changepoint: Temperature where heating turns on
baseload: Constant energy use in neutral zone
cooling_changepoint: Temperature where cooling turns on
cooling_slope: Slope for cooling regime (typically positive)
Returns:
Predicted energy use values
"""
if baseload is None:
return np.full_like(x, np.nan)
# Handle 1P model (baseload only)
if all(
param is None or np.isnan(param)
for param in [heating_slope, heating_changepoint, cooling_changepoint, cooling_slope]
):
return np.full_like(x, baseload)
# Handle 3P models by setting missing parameters
if (
heating_changepoint is None
or heating_slope is None
or np.isnan(heating_changepoint)
or np.isnan(heating_slope)
):
heating_changepoint = cooling_changepoint
heating_slope = 0
if (
cooling_changepoint is None
or cooling_slope is None
or np.isnan(cooling_changepoint)
or np.isnan(cooling_slope)
):
cooling_changepoint = heating_changepoint
cooling_slope = 0
# Define conditions and functions for piecewise model
conditions = [
x < heating_changepoint,
(x >= heating_changepoint) & (x <= cooling_changepoint),
x > cooling_changepoint,
]
functions = [
lambda x: heating_slope * x + baseload - heating_slope * heating_changepoint,
lambda x: baseload,
lambda x: cooling_slope * x + baseload - cooling_slope * cooling_changepoint,
]
return np.piecewise(x, conditions, functions)
[docs]
def calculate_r_squared(y_actual: np.ndarray, y_predicted: np.ndarray | float) -> float:
"""Calculate R-squared (coefficient of determination).
Args:
y_actual: Actual values
y_predicted: Predicted values
Returns:
R-squared value between 0 and 1
Raises:
ValueError: If inputs are invalid
Exception: If there's no variance in actual values
"""
if not isinstance(y_actual, np.ndarray):
raise ValueError("y_actual must be a numpy array")
if y_actual.size == 0:
raise ValueError("y_actual cannot be empty")
if not isinstance(y_predicted, (np.ndarray, float)):
raise ValueError("y_predicted must be numpy array or float")
if isinstance(y_predicted, np.ndarray) and y_predicted.size == 0:
raise ValueError("y_predicted cannot be empty array")
residuals = y_actual - y_predicted
ss_residuals = np.sum(residuals**2)
ss_total = np.sum((y_actual - np.mean(y_actual)) ** 2)
# For constant data (no variance), R² is undefined but we return 0
# This occurs when fitting 1P model to constant data
if ss_total == 0:
# If residuals are also 0 (perfect prediction of constant), return 1
# Otherwise return 0 (model predicts mean, which is the best we can do)
return 1.0 if ss_residuals == 0 else 0.0
return 1 - (ss_residuals / ss_total)
[docs]
def calculate_cvrmse(y_actual: np.ndarray, y_predicted: np.ndarray) -> float:
"""Calculate Coefficient of Variation of Root Mean Squared Error.
Args:
y_actual: Actual values
y_predicted: Predicted values
Returns:
CV-RMSE value
"""
rmse = np.sqrt(np.mean((y_actual - y_predicted) ** 2))
mean_actual = np.mean(y_actual)
return rmse / mean_actual if mean_actual != 0 else np.inf
[docs]
def plot_changepoint_model(
x: np.ndarray,
y: np.ndarray,
model_result: ChangePointModelResult,
x_label: str = "X",
y_label: str = "Y",
title: str | None = None,
figsize: tuple[int, int] = (12, 6),
save_path: str | None = None,
) -> tuple[plt.Figure, plt.Axes]:
"""Plot change-point model results with data points and fitted line.
Args:
x: Independent variable data (e.g., temperature)
y: Dependent variable data (e.g., energy use)
model_result: Fitted change-point model result
x_label: Label for x-axis
y_label: Label for y-axis
title: Plot title (auto-generated if None)
figsize: Figure size tuple
save_path: Path to save figure (optional)
Returns:
Figure and axes objects
"""
fig, ax = plt.subplots(1, 1, figsize=figsize)
fig.subplots_adjust(right=0.75)
# Always work with numpy arrays to avoid pandas indexing surprises
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
# Plot data points
scatter = ax.scatter(x, y, alpha=0.6, s=30, c="k", label="Data")
if model_result.model_type == "No-fit":
ax.set_title(f"No valid model fit ({len(x)} data points)")
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
return fig, ax
# Create a smooth x-range for plotting the model curve
x_min, x_max = np.min(x), np.max(x)
# Guard against a single-point input which would collapse np.ptp
span = np.ptp(x) or max(abs(x_min), 1.0)
x_range = np.linspace(x_min - 0.05 * span, x_max + 0.05 * span, 300)
y_range = piecewise_linear_5p(
x_range,
model_result.heating_slope,
model_result.heating_change_point,
model_result.baseload,
model_result.cooling_change_point,
model_result.cooling_slope,
)
# Adjust scale so plotted curve matches the units of provided y-values when they differ
# (e.g., model fit on kWh/sqft/day but caller plots monthly EUI).
pred_at_x = piecewise_linear_5p(
x,
model_result.heating_slope,
model_result.heating_change_point,
model_result.baseload,
model_result.cooling_change_point,
model_result.cooling_slope,
)
valid_mask = np.isfinite(pred_at_x) & np.isfinite(y)
with np.errstate(invalid="ignore", divide="ignore"):
denom = (
float(np.dot(pred_at_x[valid_mask], pred_at_x[valid_mask])) if pred_at_x.size else 0.0
)
if denom > 0:
scale = float(np.dot(y[valid_mask], pred_at_x[valid_mask]) / denom)
if not np.isnan(scale) and not isclose(scale, 1.0, rel_tol=0.05, abs_tol=1e-3):
y_range = y_range * scale
# Plot individual segments so the baseline and active slopes are easier to read
baseline_color = "#6b6b6b"
heating_color = "#d62728"
cooling_color = "#1f77b4"
def _is_zero(value: float | None) -> bool:
return value is None or isclose(value, 0.0, abs_tol=1e-6)
def _plot_segment(
x_vals: np.ndarray,
y_vals: np.ndarray,
*,
color: str,
label: str | None,
) -> Line2D | None:
if x_vals.size == 0:
return None
(line,) = ax.plot(x_vals, y_vals, color=color, linewidth=2, label=label)
return line
# Heating segment (left of heating change point)
heating_cp = model_result.heating_change_point
heating_line: Line2D | None = None
if heating_cp is not None and not _is_zero(model_result.heating_slope):
mask = x_range <= heating_cp
heating_line = _plot_segment(
x_range[mask],
y_range[mask],
color=heating_color,
label="Heating Slope",
)
else:
heating_cp = None
# Baseline / neutral segment
neutral_start = heating_cp if heating_cp is not None else x_range[0]
cooling_cp = model_result.cooling_change_point
neutral_end = cooling_cp if cooling_cp is not None else x_range[-1]
baseline_mask = (x_range >= neutral_start) & (x_range <= neutral_end)
baseline_line = _plot_segment(
x_range[baseline_mask],
y_range[baseline_mask],
color=baseline_color,
label="Baseload",
)
# Cooling segment (right of cooling change point)
cooling_line: Line2D | None = None
if cooling_cp is not None and not _is_zero(model_result.cooling_slope):
mask = x_range >= cooling_cp
cooling_line = _plot_segment(
x_range[mask],
y_range[mask],
color=cooling_color,
label="Cooling Slope",
)
# Add changepoint markers
# No explicit changepoint markers; values remain in annotation box
# Create info text
model_type_map = {
"1P": "1-Parameter",
"3P Heating": "3-Parameter Heating",
"3P Cooling": "3-Parameter Cooling",
"5P": "5-Parameter",
}
model_label_long = model_type_map.get(model_result.model_type, model_result.model_type)
info_text = f"Model: {model_label_long}\n"
info_text += f"R² = {model_result.r_squared:.3f}\n"
info_text += f"CV-RMSE = {model_result.cvrmse:.3f}\n"
info_text += f"Baseload = {model_result.baseload:.2f}"
if model_result.heating_slope is not None:
info_text += f"\nHeating Slope = {model_result.heating_slope:.3f}"
if model_result.heating_change_point is not None:
info_text += f"\nHeating Change-point = {model_result.heating_change_point:.1f}"
if model_result.cooling_slope is not None:
info_text += f"\nCooling Slope = {model_result.cooling_slope:.3f}"
if model_result.cooling_change_point is not None:
info_text += f"\nCooling Change-point = {model_result.cooling_change_point:.1f}"
# Add info box outside the plotting area, top-right
ax.text(
1.02,
0.98,
info_text,
transform=ax.transAxes,
fontsize=10,
verticalalignment="top",
horizontalalignment="left",
bbox={"boxstyle": "round", "facecolor": "white", "alpha": 0.8},
)
# Set labels and title
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
if title is None:
title = f"Change-Point Model ({len(x)} data points)"
ax.set_title(title)
legend_handles = [scatter]
legend_labels = ["Data"]
for line, label in (
(heating_line, "Heating Slope"),
(cooling_line, "Cooling Slope"),
(baseline_line, "Baseload"),
):
if line is not None and label not in legend_labels:
legend_handles.append(line)
legend_labels.append(label)
ax.legend(
legend_handles,
legend_labels,
loc="upper left",
bbox_to_anchor=(1.02, 0.7),
borderaxespad=0.0,
frameon=True,
)
ax.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=300, bbox_inches="tight")
return fig, ax
[docs]
class ChangePointModelResult(BaseModel):
"""Result of change-point model fitting.
Contains all coefficients, goodness-of-fit metrics, and metadata
from fitting a change-point model to energy usage data.
"""
heating_slope: float | None = Field(None, description="Heating slope coefficient")
heating_change_point: float | None = Field(None, description="Heating change point temperature")
baseload: float = Field(..., description="Baseload consumption")
cooling_change_point: float | None = Field(None, description="Cooling change point temperature")
cooling_slope: float | None = Field(None, description="Cooling slope coefficient")
r_squared: float = Field(..., ge=0, le=1, description="R-squared value")
cvrmse: float = Field(..., ge=0, description="CV(RMSE) value")
model_type: str = Field(..., description="Model type (1P, 3P Heating, 3P Cooling, 5P)")
heating_pvalue: float | None = Field(None, description="P-value for heating slope significance")
cooling_pvalue: float | None = Field(None, description="P-value for cooling slope significance")
[docs]
def is_valid(self, min_r_squared: float = 0.6, max_cvrmse: float = 0.5) -> bool:
"""Check if model meets quality thresholds."""
return self.r_squared >= min_r_squared and self.cvrmse <= max_cvrmse
[docs]
def get_model_complexity(self) -> int:
"""Get number of parameters in the model."""
model_params = {"1P": 1, "3P Heating": 3, "3P Cooling": 3, "5P": 5}
return model_params.get(self.model_type, 1)
[docs]
def estimate_annual_consumption(self, annual_hdd: float, annual_cdd: float) -> float:
"""Estimate annual energy consumption using heating/cooling degree days."""
annual = self.baseload * 365
if self.heating_slope:
annual += self.heating_slope * annual_hdd
if self.cooling_slope:
annual += self.cooling_slope * annual_cdd
return annual