Autonomous AgentsPrediction663 lines
Demand Forecasting
Quick Summary14 lines
Demand forecasting predicts future customer demand for products and services, enabling optimized inventory management, production planning, workforce scheduling, and financial planning. This skill covers seasonal pattern detection, promotional lift modeling, new product forecasting, hierarchical forecasting across product/geography hierarchies, intermittent demand handling (Croston's method), and forecast reconciliation techniques. ## Key Points 1. Demand decomposition into base, trend, seasonal, and promotional components is the essential first step 2. Seasonal patterns should be detected empirically (autocorrelation, FFT) not assumed; the period may not be what you expect 3. Promotional lift should account for forward buying (post-promotion dip) to avoid overestimating incremental value 4. New product forecasting requires analogous products or diffusion models since there is no history to extrapolate 5. Croston's method (or SBA) is essential for intermittent demand; standard methods overpredict by ignoring zero-demand periods 6. Hierarchical reconciliation ensures that SKU forecasts sum to category forecasts; MinT optimal reconciliation minimizes total variance 7. Use Weighted MAPE (WMAPE) over MAPE for demand with zeros; track bias separately to detect systematic over/under-forecasting 8. The tracking signal is the practical early warning system: when it exceeds +/-4, the model needs retraining
skilldb get prediction-skills/demand-forecastingFull skill: 663 linesPaste into your CLAUDE.md or agent config
Demand Forecasting
Overview
Demand forecasting predicts future customer demand for products and services, enabling optimized inventory management, production planning, workforce scheduling, and financial planning. This skill covers seasonal pattern detection, promotional lift modeling, new product forecasting, hierarchical forecasting across product/geography hierarchies, intermittent demand handling (Croston's method), and forecast reconciliation techniques.
Foundations of Demand Forecasting
The Forecasting Hierarchy
Enterprise Level
└── Business Unit
└── Product Category
└── Product Family
└── SKU Level
└── SKU × Location
└── SKU × Location × Channel
Forecasting at different levels involves tradeoffs:
class ForecastGranularity:
"""Guide for choosing forecast granularity."""
LEVELS = {
'aggregate': {
'accuracy': 'highest',
'actionability': 'lowest',
'data_volume': 'most',
'use_case': 'Financial planning, capacity planning'
},
'category': {
'accuracy': 'high',
'actionability': 'medium',
'data_volume': 'moderate',
'use_case': 'Assortment planning, category management'
},
'sku': {
'accuracy': 'medium',
'actionability': 'high',
'data_volume': 'moderate per SKU',
'use_case': 'Inventory management, replenishment'
},
'sku_location': {
'accuracy': 'lower',
'actionability': 'highest',
'data_volume': 'sparse per combination',
'use_case': 'Store-level replenishment, allocation'
}
}
Demand Decomposition
import numpy as np
import pandas as pd
class DemandDecomposer:
"""Decompose demand into components for forecasting."""
def __init__(self, demand_series: pd.Series, frequency: str = 'W'):
self.demand = demand_series
self.frequency = frequency
def decompose(self) -> dict:
"""Break demand into base, trend, seasonal, and promotional components."""
# Base demand: median after removing outliers
q25, q75 = np.percentile(self.demand.dropna(), [25, 75])
iqr = q75 - q25
base_demand = self.demand[
(self.demand > q25 - 1.5 * iqr) & (self.demand < q75 + 1.5 * iqr)
].median()
# Trend: linear regression on time
t = np.arange(len(self.demand))
valid = self.demand.dropna()
valid_t = t[:len(valid)]
if len(valid) > 2:
coeffs = np.polyfit(valid_t, valid.values, 1)
trend = coeffs[0] # Units per period
else:
trend = 0
# Seasonal indices
if self.frequency == 'W':
period = 52
elif self.frequency == 'M':
period = 12
elif self.frequency == 'D':
period = 7
else:
period = 12
seasonal_indices = self._compute_seasonal_indices(period)
# Promotional spikes: values > 2 standard deviations above deseasonalized
deseasonalized = self._deseasonalize(seasonal_indices, period)
std = deseasonalized.std()
promotional_periods = deseasonalized[deseasonalized > deseasonalized.mean() + 2 * std]
return {
'base_demand': base_demand,
'trend_per_period': trend,
'seasonal_indices': seasonal_indices,
'n_promotional_periods': len(promotional_periods),
'avg_promotional_lift': (promotional_periods.mean() / base_demand - 1) if len(promotional_periods) > 0 else 0,
'coefficient_of_variation': self.demand.std() / self.demand.mean() if self.demand.mean() > 0 else float('inf')
}
def _compute_seasonal_indices(self, period: int) -> np.ndarray:
"""Compute seasonal indices using ratio-to-moving-average method."""
values = self.demand.values
indices = np.ones(period)
if len(values) < period * 2:
return indices
# Moving average
ma = pd.Series(values).rolling(period, center=True).mean()
# Ratio to MA
ratios = values / ma.values
# Average ratio for each season position
for i in range(period):
season_ratios = ratios[i::period]
valid = season_ratios[~np.isnan(season_ratios)]
if len(valid) > 0:
indices[i] = np.median(valid)
# Normalize so they average to 1
indices = indices / np.mean(indices)
return indices
def _deseasonalize(self, indices: np.ndarray, period: int) -> pd.Series:
values = self.demand.values.copy()
for i in range(len(values)):
values[i] = values[i] / indices[i % period]
return pd.Series(values)
Seasonal Pattern Detection
class SeasonalityDetector:
"""Detect and characterize seasonal patterns in demand data."""
def __init__(self, demand: pd.Series):
self.demand = demand
def detect_periods(self, max_period: int = 365) -> list:
"""Use autocorrelation to detect seasonal periods."""
values = self.demand.dropna().values
n = len(values)
if n < max_period * 2:
max_period = n // 3
# Compute autocorrelation
mean = np.mean(values)
variance = np.var(values)
autocorrelations = []
for lag in range(1, max_period + 1):
if variance == 0:
autocorrelations.append(0)
continue
acf = np.mean((values[:-lag] - mean) * (values[lag:] - mean)) / variance
autocorrelations.append(acf)
# Find peaks in autocorrelation
peaks = []
for i in range(1, len(autocorrelations) - 1):
if (autocorrelations[i] > autocorrelations[i-1] and
autocorrelations[i] > autocorrelations[i+1] and
autocorrelations[i] > 0.3):
peaks.append({
'period': i + 1,
'autocorrelation': autocorrelations[i],
'strength': 'strong' if autocorrelations[i] > 0.6 else 'moderate'
})
return sorted(peaks, key=lambda x: -x['autocorrelation'])
def fourier_analysis(self, top_n: int = 5) -> list:
"""Use FFT to identify dominant frequencies."""
values = self.demand.dropna().values
n = len(values)
# Remove trend
detrended = values - np.linspace(values[0], values[-1], n)
# FFT
fft_values = np.fft.rfft(detrended)
frequencies = np.fft.rfftfreq(n)
magnitudes = np.abs(fft_values)
# Find top frequencies (skip DC component)
top_indices = np.argsort(magnitudes[1:])[-top_n:] + 1
results = []
for idx in top_indices:
period = 1 / frequencies[idx] if frequencies[idx] > 0 else float('inf')
results.append({
'period': period,
'magnitude': magnitudes[idx],
'relative_power': magnitudes[idx] / np.sum(magnitudes[1:])
})
return sorted(results, key=lambda x: -x['magnitude'])
Promotional Lift Modeling
class PromotionalLiftModel:
"""Model the impact of promotions on demand."""
def __init__(self):
self.promotion_history = []
self.baseline_model = None
def fit(self, demand: pd.Series, promotions: pd.DataFrame):
"""
Fit a promotional lift model.
demand: time series of actual demand
promotions: DataFrame with columns
['start_date', 'end_date', 'type', 'discount_pct', 'feature_type']
"""
# Compute baseline (demand without promotions)
promo_periods = set()
for _, promo in promotions.iterrows():
dates = pd.date_range(promo['start_date'], promo['end_date'])
promo_periods.update(dates)
non_promo_demand = demand[~demand.index.isin(promo_periods)]
self.baseline = non_promo_demand.mean()
# Compute lift for each promotion type
lift_by_type = {}
for _, promo in promotions.iterrows():
dates = pd.date_range(promo['start_date'], promo['end_date'])
promo_demand = demand[demand.index.isin(dates)]
if len(promo_demand) > 0:
lift = promo_demand.mean() / self.baseline - 1
ptype = promo['type']
if ptype not in lift_by_type:
lift_by_type[ptype] = []
lift_by_type[ptype].append({
'lift': lift,
'discount_pct': promo.get('discount_pct', 0),
'duration': len(dates)
})
self.lift_models = {}
for ptype, records in lift_by_type.items():
lifts = [r['lift'] for r in records]
discounts = [r['discount_pct'] for r in records]
self.lift_models[ptype] = {
'avg_lift': np.mean(lifts),
'median_lift': np.median(lifts),
'lift_per_discount_pct': np.mean(lifts) / max(np.mean(discounts), 1),
'n_observations': len(records)
}
def forecast_with_promotion(self, baseline_forecast: float,
promo_type: str, discount_pct: float = 0) -> dict:
"""Forecast demand during a promotion period."""
if promo_type not in self.lift_models:
return {
'forecast': baseline_forecast,
'lift': 0,
'warning': f'No data for promotion type: {promo_type}'
}
model = self.lift_models[promo_type]
expected_lift = model['avg_lift']
# Scale by discount depth if we have the relationship
if discount_pct > 0 and model['lift_per_discount_pct'] > 0:
expected_lift = model['lift_per_discount_pct'] * discount_pct
promoted_forecast = baseline_forecast * (1 + expected_lift)
# Post-promotion dip (cannibalization)
post_promo_dip = -expected_lift * 0.3 # 30% of lift comes from forward buying
return {
'baseline_forecast': baseline_forecast,
'promotional_forecast': promoted_forecast,
'expected_lift_pct': expected_lift * 100,
'post_promo_dip_pct': post_promo_dip * 100,
'net_incremental': promoted_forecast - baseline_forecast + (baseline_forecast * post_promo_dip),
'roi_note': 'Subtract cost of promotion to calculate ROI'
}
New Product Forecasting
class NewProductForecaster:
"""Forecast demand for products with no sales history."""
def __init__(self):
self.analogous_products = []
def analogy_method(self, new_product: dict,
analogous_histories: list) -> dict:
"""
Forecast new product using analogous product history.
new_product: {'name': str, 'category': str, 'price': float,
'features': dict, 'launch_support': float}
analogous_histories: [{'product': dict, 'history': pd.Series,
'similarity': float}]
"""
weighted_forecast = None
total_weight = 0
for analog in analogous_histories:
weight = analog['similarity']
history = analog['history'].values
# Adjust for differences
price_ratio = new_product.get('price', 100) / analog['product'].get('price', 100)
price_adjustment = 1 / price_ratio # Lower price = higher demand
support_ratio = new_product.get('launch_support', 1) / analog['product'].get('launch_support', 1)
adjusted_history = history * price_adjustment * support_ratio
if weighted_forecast is None:
weighted_forecast = adjusted_history * weight
else:
min_len = min(len(weighted_forecast), len(adjusted_history))
weighted_forecast = weighted_forecast[:min_len] + adjusted_history[:min_len] * weight
total_weight += weight
if weighted_forecast is not None and total_weight > 0:
weighted_forecast /= total_weight
return {
'forecast': weighted_forecast.tolist() if weighted_forecast is not None else [],
'n_analogs_used': len(analogous_histories),
'confidence': 'low' if len(analogous_histories) < 3 else 'medium',
'note': 'New product forecasts are inherently uncertain. Plan for wide range.'
}
def diffusion_method(self, market_size: int, p: float = 0.03,
q: float = 0.38, periods: int = 24) -> dict:
"""Use Bass diffusion model for new product category."""
cumulative = 0
forecasts = []
for t in range(periods):
new_adopters = (p + q * cumulative / market_size) * (market_size - cumulative)
new_adopters = max(0, new_adopters)
cumulative += new_adopters
forecasts.append({
'period': t,
'new_adopters': int(new_adopters),
'cumulative': int(cumulative),
'penetration': cumulative / market_size
})
return {
'forecast': forecasts,
'peak_period': max(forecasts, key=lambda x: x['new_adopters'])['period'],
'time_to_50_pct': next(
(f['period'] for f in forecasts if f['penetration'] >= 0.5), None
)
}
Intermittent Demand (Croston's Method)
class CrostonForecaster:
"""
Croston's method for intermittent demand.
Many SKUs have sporadic demand with lots of zeros.
Standard methods fail; Croston separates the problem into:
1. Forecasting demand SIZE (when it occurs)
2. Forecasting demand INTERVAL (how often it occurs)
"""
def __init__(self, alpha: float = 0.1):
self.alpha = alpha
def classify_demand(self, demand: np.ndarray) -> str:
"""Classify demand pattern to select appropriate method."""
non_zero = demand[demand > 0]
zero_fraction = 1 - len(non_zero) / len(demand)
if zero_fraction < 0.3:
return 'smooth' # Use standard methods
elif zero_fraction < 0.7:
if np.std(non_zero) / np.mean(non_zero) < 0.5:
return 'intermittent' # Croston
else:
return 'lumpy' # SBA or TSB
else:
return 'very_sparse' # SBA or aggregate
def fit_predict(self, demand: np.ndarray) -> dict:
"""
Croston's method:
- Track two series: demand size and inter-demand interval
- Forecast each with exponential smoothing
- Combine: forecast = size / interval
"""
n = len(demand)
# Initialize
non_zero_indices = np.where(demand > 0)[0]
if len(non_zero_indices) < 2:
return {
'forecast': np.mean(demand),
'method': 'simple_mean',
'warning': 'Too few non-zero demands for Croston'
}
# Initial estimates
z = demand[non_zero_indices[0]] # First non-zero demand size
p = non_zero_indices[1] - non_zero_indices[0] # First interval
# Track forecasts
z_forecasts = [z]
p_forecasts = [p]
q = 0 # Periods since last demand
for t in range(1, n):
q += 1
if demand[t] > 0:
# Update demand size estimate
z = self.alpha * demand[t] + (1 - self.alpha) * z
# Update interval estimate
p = self.alpha * q + (1 - self.alpha) * p
q = 0 # Reset counter
z_forecasts.append(z)
p_forecasts.append(p)
# Per-period forecast = size / interval
forecast = z / p if p > 0 else 0
return {
'forecast_per_period': forecast,
'demand_size_estimate': z,
'inter_demand_interval': p,
'method': 'croston',
'demand_classification': self.classify_demand(demand),
'z_history': z_forecasts,
'p_history': p_forecasts
}
def sba_method(self, demand: np.ndarray) -> dict:
"""
Syntetos-Boylan Approximation: bias-corrected Croston.
Croston tends to overestimate; SBA corrects this.
"""
croston_result = self.fit_predict(demand)
correction = 1 - self.alpha / 2
sba_forecast = croston_result['forecast_per_period'] * correction
return {
'forecast_per_period': sba_forecast,
'correction_factor': correction,
'croston_forecast': croston_result['forecast_per_period'],
'method': 'sba'
}
Hierarchical Forecasting and Reconciliation
class HierarchicalForecaster:
"""
Forecast at multiple levels of a hierarchy and reconcile.
Ensures that detailed forecasts sum to aggregate forecasts.
"""
def __init__(self, hierarchy: dict):
"""
hierarchy: {'total': ['category_A', 'category_B'],
'category_A': ['sku_1', 'sku_2'],
'category_B': ['sku_3', 'sku_4']}
"""
self.hierarchy = hierarchy
self.forecasts = {}
def add_forecast(self, node: str, forecast: float):
self.forecasts[node] = forecast
def bottom_up(self) -> dict:
"""
Bottom-up: forecast at lowest level, aggregate up.
Best when low-level data is rich and patterns are distinct.
"""
reconciled = {}
leaf_nodes = self._get_leaves()
for leaf in leaf_nodes:
reconciled[leaf] = self.forecasts.get(leaf, 0)
# Aggregate up
for parent, children in self.hierarchy.items():
reconciled[parent] = sum(reconciled.get(c, 0) for c in children)
return reconciled
def top_down(self, allocation_method: str = 'proportional') -> dict:
"""
Top-down: forecast at top level, allocate down using proportions.
Best when low-level data is sparse or noisy.
"""
reconciled = {}
total_forecast = self.forecasts.get('total', 0)
if allocation_method == 'proportional':
# Allocate based on historical proportions
for parent, children in self.hierarchy.items():
parent_forecast = self.forecasts.get(parent, total_forecast)
child_forecasts = {c: self.forecasts.get(c, 0) for c in children}
child_total = sum(child_forecasts.values())
if child_total > 0:
for child in children:
proportion = child_forecasts[child] / child_total
reconciled[child] = parent_forecast * proportion
else:
for child in children:
reconciled[child] = parent_forecast / len(children)
reconciled['total'] = total_forecast
return reconciled
def optimal_reconciliation(self, base_forecasts: dict,
residual_covariance: np.ndarray = None) -> dict:
"""
Optimal reconciliation (Wickramasuriya et al., 2019).
Uses MinT (Minimum Trace) to find reconciled forecasts
that minimize total forecast variance.
"""
# Build summing matrix S
all_nodes = list(base_forecasts.keys())
leaves = self._get_leaves()
n_total = len(all_nodes)
n_leaves = len(leaves)
S = np.zeros((n_total, n_leaves))
for i, node in enumerate(all_nodes):
if node in leaves:
j = leaves.index(node)
S[i, j] = 1
else:
# Sum of children
for child in self._get_descendants(node):
if child in leaves:
j = leaves.index(child)
S[i, j] = 1
# Base forecast vector
y_hat = np.array([base_forecasts[node] for node in all_nodes])
if residual_covariance is None:
# Use OLS reconciliation (identity covariance)
W = np.eye(n_total)
else:
W = residual_covariance
# MinT reconciliation
W_inv = np.linalg.inv(W)
P = S @ np.linalg.inv(S.T @ W_inv @ S) @ S.T @ W_inv
y_reconciled = P @ y_hat
return {node: y_reconciled[i] for i, node in enumerate(all_nodes)}
def _get_leaves(self) -> list:
parents = set(self.hierarchy.keys())
all_children = set()
for children in self.hierarchy.values():
all_children.update(children)
return [c for c in all_children if c not in parents]
def _get_descendants(self, node: str) -> list:
descendants = []
children = self.hierarchy.get(node, [])
for child in children:
descendants.append(child)
descendants.extend(self._get_descendants(child))
return descendants
Forecast Accuracy Metrics for Demand
class DemandForecastMetrics:
"""Metrics specific to demand forecasting evaluation."""
@staticmethod
def mape(actual: np.ndarray, forecast: np.ndarray) -> float:
"""Mean Absolute Percentage Error. Avoid when actuals contain zeros."""
mask = actual != 0
return np.mean(np.abs((actual[mask] - forecast[mask]) / actual[mask])) * 100
@staticmethod
def wmape(actual: np.ndarray, forecast: np.ndarray) -> float:
"""Weighted MAPE: handles zeros better than MAPE."""
return np.sum(np.abs(actual - forecast)) / np.sum(np.abs(actual)) * 100
@staticmethod
def bias(actual: np.ndarray, forecast: np.ndarray) -> float:
"""Forecast bias: positive = over-forecasting, negative = under-forecasting."""
return np.mean(forecast - actual) / np.mean(actual) * 100
@staticmethod
def tracking_signal(actual: np.ndarray, forecast: np.ndarray,
periods: int = 12) -> float:
"""
Tracking signal: cumulative error / MAD.
Should stay between -4 and 4. Outside indicates systematic bias.
"""
errors = forecast - actual
cumulative_error = np.sum(errors[-periods:])
mad = np.mean(np.abs(errors[-periods:]))
return cumulative_error / mad if mad > 0 else 0
@staticmethod
def forecast_value_added(naive_error: float, model_error: float) -> float:
"""Does the model add value over a naive forecast?"""
return (naive_error - model_error) / naive_error * 100
Key Takeaways
- Demand decomposition into base, trend, seasonal, and promotional components is the essential first step
- Seasonal patterns should be detected empirically (autocorrelation, FFT) not assumed; the period may not be what you expect
- Promotional lift should account for forward buying (post-promotion dip) to avoid overestimating incremental value
- New product forecasting requires analogous products or diffusion models since there is no history to extrapolate
- Croston's method (or SBA) is essential for intermittent demand; standard methods overpredict by ignoring zero-demand periods
- Hierarchical reconciliation ensures that SKU forecasts sum to category forecasts; MinT optimal reconciliation minimizes total variance
- Use Weighted MAPE (WMAPE) over MAPE for demand with zeros; track bias separately to detect systematic over/under-forecasting
- The tracking signal is the practical early warning system: when it exceeds +/-4, the model needs retraining
Install this skill directly: skilldb add prediction-skills