Skip to main content
Autonomous AgentsPrediction663 lines

Demand Forecasting

Quick Summary14 lines
Demand forecasting predicts future customer demand for products and services, enabling optimized inventory management, production planning, workforce scheduling, and financial planning. This skill covers seasonal pattern detection, promotional lift modeling, new product forecasting, hierarchical forecasting across product/geography hierarchies, intermittent demand handling (Croston's method), and forecast reconciliation techniques.

## Key Points

1. Demand decomposition into base, trend, seasonal, and promotional components is the essential first step
2. Seasonal patterns should be detected empirically (autocorrelation, FFT) not assumed; the period may not be what you expect
3. Promotional lift should account for forward buying (post-promotion dip) to avoid overestimating incremental value
4. New product forecasting requires analogous products or diffusion models since there is no history to extrapolate
5. Croston's method (or SBA) is essential for intermittent demand; standard methods overpredict by ignoring zero-demand periods
6. Hierarchical reconciliation ensures that SKU forecasts sum to category forecasts; MinT optimal reconciliation minimizes total variance
7. Use Weighted MAPE (WMAPE) over MAPE for demand with zeros; track bias separately to detect systematic over/under-forecasting
8. The tracking signal is the practical early warning system: when it exceeds +/-4, the model needs retraining
skilldb get prediction-skills/demand-forecastingFull skill: 663 lines
Paste into your CLAUDE.md or agent config

Demand Forecasting

Overview

Demand forecasting predicts future customer demand for products and services, enabling optimized inventory management, production planning, workforce scheduling, and financial planning. This skill covers seasonal pattern detection, promotional lift modeling, new product forecasting, hierarchical forecasting across product/geography hierarchies, intermittent demand handling (Croston's method), and forecast reconciliation techniques.

Foundations of Demand Forecasting

The Forecasting Hierarchy

Enterprise Level
  └── Business Unit
       └── Product Category
            └── Product Family
                 └── SKU Level
                      └── SKU × Location
                           └── SKU × Location × Channel

Forecasting at different levels involves tradeoffs:

class ForecastGranularity:
    """Guide for choosing forecast granularity."""

    LEVELS = {
        'aggregate': {
            'accuracy': 'highest',
            'actionability': 'lowest',
            'data_volume': 'most',
            'use_case': 'Financial planning, capacity planning'
        },
        'category': {
            'accuracy': 'high',
            'actionability': 'medium',
            'data_volume': 'moderate',
            'use_case': 'Assortment planning, category management'
        },
        'sku': {
            'accuracy': 'medium',
            'actionability': 'high',
            'data_volume': 'moderate per SKU',
            'use_case': 'Inventory management, replenishment'
        },
        'sku_location': {
            'accuracy': 'lower',
            'actionability': 'highest',
            'data_volume': 'sparse per combination',
            'use_case': 'Store-level replenishment, allocation'
        }
    }

Demand Decomposition

import numpy as np
import pandas as pd

class DemandDecomposer:
    """Decompose demand into components for forecasting."""

    def __init__(self, demand_series: pd.Series, frequency: str = 'W'):
        self.demand = demand_series
        self.frequency = frequency

    def decompose(self) -> dict:
        """Break demand into base, trend, seasonal, and promotional components."""

        # Base demand: median after removing outliers
        q25, q75 = np.percentile(self.demand.dropna(), [25, 75])
        iqr = q75 - q25
        base_demand = self.demand[
            (self.demand > q25 - 1.5 * iqr) & (self.demand < q75 + 1.5 * iqr)
        ].median()

        # Trend: linear regression on time
        t = np.arange(len(self.demand))
        valid = self.demand.dropna()
        valid_t = t[:len(valid)]
        if len(valid) > 2:
            coeffs = np.polyfit(valid_t, valid.values, 1)
            trend = coeffs[0]  # Units per period
        else:
            trend = 0

        # Seasonal indices
        if self.frequency == 'W':
            period = 52
        elif self.frequency == 'M':
            period = 12
        elif self.frequency == 'D':
            period = 7
        else:
            period = 12

        seasonal_indices = self._compute_seasonal_indices(period)

        # Promotional spikes: values > 2 standard deviations above deseasonalized
        deseasonalized = self._deseasonalize(seasonal_indices, period)
        std = deseasonalized.std()
        promotional_periods = deseasonalized[deseasonalized > deseasonalized.mean() + 2 * std]

        return {
            'base_demand': base_demand,
            'trend_per_period': trend,
            'seasonal_indices': seasonal_indices,
            'n_promotional_periods': len(promotional_periods),
            'avg_promotional_lift': (promotional_periods.mean() / base_demand - 1) if len(promotional_periods) > 0 else 0,
            'coefficient_of_variation': self.demand.std() / self.demand.mean() if self.demand.mean() > 0 else float('inf')
        }

    def _compute_seasonal_indices(self, period: int) -> np.ndarray:
        """Compute seasonal indices using ratio-to-moving-average method."""
        values = self.demand.values
        indices = np.ones(period)

        if len(values) < period * 2:
            return indices

        # Moving average
        ma = pd.Series(values).rolling(period, center=True).mean()

        # Ratio to MA
        ratios = values / ma.values

        # Average ratio for each season position
        for i in range(period):
            season_ratios = ratios[i::period]
            valid = season_ratios[~np.isnan(season_ratios)]
            if len(valid) > 0:
                indices[i] = np.median(valid)

        # Normalize so they average to 1
        indices = indices / np.mean(indices)
        return indices

    def _deseasonalize(self, indices: np.ndarray, period: int) -> pd.Series:
        values = self.demand.values.copy()
        for i in range(len(values)):
            values[i] = values[i] / indices[i % period]
        return pd.Series(values)

Seasonal Pattern Detection

class SeasonalityDetector:
    """Detect and characterize seasonal patterns in demand data."""

    def __init__(self, demand: pd.Series):
        self.demand = demand

    def detect_periods(self, max_period: int = 365) -> list:
        """Use autocorrelation to detect seasonal periods."""
        values = self.demand.dropna().values
        n = len(values)

        if n < max_period * 2:
            max_period = n // 3

        # Compute autocorrelation
        mean = np.mean(values)
        variance = np.var(values)
        autocorrelations = []

        for lag in range(1, max_period + 1):
            if variance == 0:
                autocorrelations.append(0)
                continue
            acf = np.mean((values[:-lag] - mean) * (values[lag:] - mean)) / variance
            autocorrelations.append(acf)

        # Find peaks in autocorrelation
        peaks = []
        for i in range(1, len(autocorrelations) - 1):
            if (autocorrelations[i] > autocorrelations[i-1] and
                autocorrelations[i] > autocorrelations[i+1] and
                autocorrelations[i] > 0.3):
                peaks.append({
                    'period': i + 1,
                    'autocorrelation': autocorrelations[i],
                    'strength': 'strong' if autocorrelations[i] > 0.6 else 'moderate'
                })

        return sorted(peaks, key=lambda x: -x['autocorrelation'])

    def fourier_analysis(self, top_n: int = 5) -> list:
        """Use FFT to identify dominant frequencies."""
        values = self.demand.dropna().values
        n = len(values)

        # Remove trend
        detrended = values - np.linspace(values[0], values[-1], n)

        # FFT
        fft_values = np.fft.rfft(detrended)
        frequencies = np.fft.rfftfreq(n)
        magnitudes = np.abs(fft_values)

        # Find top frequencies (skip DC component)
        top_indices = np.argsort(magnitudes[1:])[-top_n:] + 1

        results = []
        for idx in top_indices:
            period = 1 / frequencies[idx] if frequencies[idx] > 0 else float('inf')
            results.append({
                'period': period,
                'magnitude': magnitudes[idx],
                'relative_power': magnitudes[idx] / np.sum(magnitudes[1:])
            })

        return sorted(results, key=lambda x: -x['magnitude'])

Promotional Lift Modeling

class PromotionalLiftModel:
    """Model the impact of promotions on demand."""

    def __init__(self):
        self.promotion_history = []
        self.baseline_model = None

    def fit(self, demand: pd.Series, promotions: pd.DataFrame):
        """
        Fit a promotional lift model.

        demand: time series of actual demand
        promotions: DataFrame with columns
            ['start_date', 'end_date', 'type', 'discount_pct', 'feature_type']
        """
        # Compute baseline (demand without promotions)
        promo_periods = set()
        for _, promo in promotions.iterrows():
            dates = pd.date_range(promo['start_date'], promo['end_date'])
            promo_periods.update(dates)

        non_promo_demand = demand[~demand.index.isin(promo_periods)]
        self.baseline = non_promo_demand.mean()

        # Compute lift for each promotion type
        lift_by_type = {}
        for _, promo in promotions.iterrows():
            dates = pd.date_range(promo['start_date'], promo['end_date'])
            promo_demand = demand[demand.index.isin(dates)]

            if len(promo_demand) > 0:
                lift = promo_demand.mean() / self.baseline - 1
                ptype = promo['type']
                if ptype not in lift_by_type:
                    lift_by_type[ptype] = []
                lift_by_type[ptype].append({
                    'lift': lift,
                    'discount_pct': promo.get('discount_pct', 0),
                    'duration': len(dates)
                })

        self.lift_models = {}
        for ptype, records in lift_by_type.items():
            lifts = [r['lift'] for r in records]
            discounts = [r['discount_pct'] for r in records]

            self.lift_models[ptype] = {
                'avg_lift': np.mean(lifts),
                'median_lift': np.median(lifts),
                'lift_per_discount_pct': np.mean(lifts) / max(np.mean(discounts), 1),
                'n_observations': len(records)
            }

    def forecast_with_promotion(self, baseline_forecast: float,
                                 promo_type: str, discount_pct: float = 0) -> dict:
        """Forecast demand during a promotion period."""
        if promo_type not in self.lift_models:
            return {
                'forecast': baseline_forecast,
                'lift': 0,
                'warning': f'No data for promotion type: {promo_type}'
            }

        model = self.lift_models[promo_type]
        expected_lift = model['avg_lift']

        # Scale by discount depth if we have the relationship
        if discount_pct > 0 and model['lift_per_discount_pct'] > 0:
            expected_lift = model['lift_per_discount_pct'] * discount_pct

        promoted_forecast = baseline_forecast * (1 + expected_lift)

        # Post-promotion dip (cannibalization)
        post_promo_dip = -expected_lift * 0.3  # 30% of lift comes from forward buying

        return {
            'baseline_forecast': baseline_forecast,
            'promotional_forecast': promoted_forecast,
            'expected_lift_pct': expected_lift * 100,
            'post_promo_dip_pct': post_promo_dip * 100,
            'net_incremental': promoted_forecast - baseline_forecast + (baseline_forecast * post_promo_dip),
            'roi_note': 'Subtract cost of promotion to calculate ROI'
        }

New Product Forecasting

class NewProductForecaster:
    """Forecast demand for products with no sales history."""

    def __init__(self):
        self.analogous_products = []

    def analogy_method(self, new_product: dict,
                       analogous_histories: list) -> dict:
        """
        Forecast new product using analogous product history.

        new_product: {'name': str, 'category': str, 'price': float,
                      'features': dict, 'launch_support': float}
        analogous_histories: [{'product': dict, 'history': pd.Series,
                               'similarity': float}]
        """
        weighted_forecast = None
        total_weight = 0

        for analog in analogous_histories:
            weight = analog['similarity']
            history = analog['history'].values

            # Adjust for differences
            price_ratio = new_product.get('price', 100) / analog['product'].get('price', 100)
            price_adjustment = 1 / price_ratio  # Lower price = higher demand

            support_ratio = new_product.get('launch_support', 1) / analog['product'].get('launch_support', 1)

            adjusted_history = history * price_adjustment * support_ratio

            if weighted_forecast is None:
                weighted_forecast = adjusted_history * weight
            else:
                min_len = min(len(weighted_forecast), len(adjusted_history))
                weighted_forecast = weighted_forecast[:min_len] + adjusted_history[:min_len] * weight

            total_weight += weight

        if weighted_forecast is not None and total_weight > 0:
            weighted_forecast /= total_weight

        return {
            'forecast': weighted_forecast.tolist() if weighted_forecast is not None else [],
            'n_analogs_used': len(analogous_histories),
            'confidence': 'low' if len(analogous_histories) < 3 else 'medium',
            'note': 'New product forecasts are inherently uncertain. Plan for wide range.'
        }

    def diffusion_method(self, market_size: int, p: float = 0.03,
                         q: float = 0.38, periods: int = 24) -> dict:
        """Use Bass diffusion model for new product category."""
        cumulative = 0
        forecasts = []

        for t in range(periods):
            new_adopters = (p + q * cumulative / market_size) * (market_size - cumulative)
            new_adopters = max(0, new_adopters)
            cumulative += new_adopters
            forecasts.append({
                'period': t,
                'new_adopters': int(new_adopters),
                'cumulative': int(cumulative),
                'penetration': cumulative / market_size
            })

        return {
            'forecast': forecasts,
            'peak_period': max(forecasts, key=lambda x: x['new_adopters'])['period'],
            'time_to_50_pct': next(
                (f['period'] for f in forecasts if f['penetration'] >= 0.5), None
            )
        }

Intermittent Demand (Croston's Method)

class CrostonForecaster:
    """
    Croston's method for intermittent demand.
    Many SKUs have sporadic demand with lots of zeros.
    Standard methods fail; Croston separates the problem into:
    1. Forecasting demand SIZE (when it occurs)
    2. Forecasting demand INTERVAL (how often it occurs)
    """

    def __init__(self, alpha: float = 0.1):
        self.alpha = alpha

    def classify_demand(self, demand: np.ndarray) -> str:
        """Classify demand pattern to select appropriate method."""
        non_zero = demand[demand > 0]
        zero_fraction = 1 - len(non_zero) / len(demand)

        if zero_fraction < 0.3:
            return 'smooth'  # Use standard methods
        elif zero_fraction < 0.7:
            if np.std(non_zero) / np.mean(non_zero) < 0.5:
                return 'intermittent'  # Croston
            else:
                return 'lumpy'  # SBA or TSB
        else:
            return 'very_sparse'  # SBA or aggregate

    def fit_predict(self, demand: np.ndarray) -> dict:
        """
        Croston's method:
        - Track two series: demand size and inter-demand interval
        - Forecast each with exponential smoothing
        - Combine: forecast = size / interval
        """
        n = len(demand)

        # Initialize
        non_zero_indices = np.where(demand > 0)[0]
        if len(non_zero_indices) < 2:
            return {
                'forecast': np.mean(demand),
                'method': 'simple_mean',
                'warning': 'Too few non-zero demands for Croston'
            }

        # Initial estimates
        z = demand[non_zero_indices[0]]  # First non-zero demand size
        p = non_zero_indices[1] - non_zero_indices[0]  # First interval

        # Track forecasts
        z_forecasts = [z]
        p_forecasts = [p]
        q = 0  # Periods since last demand

        for t in range(1, n):
            q += 1
            if demand[t] > 0:
                # Update demand size estimate
                z = self.alpha * demand[t] + (1 - self.alpha) * z
                # Update interval estimate
                p = self.alpha * q + (1 - self.alpha) * p
                q = 0  # Reset counter

            z_forecasts.append(z)
            p_forecasts.append(p)

        # Per-period forecast = size / interval
        forecast = z / p if p > 0 else 0

        return {
            'forecast_per_period': forecast,
            'demand_size_estimate': z,
            'inter_demand_interval': p,
            'method': 'croston',
            'demand_classification': self.classify_demand(demand),
            'z_history': z_forecasts,
            'p_history': p_forecasts
        }

    def sba_method(self, demand: np.ndarray) -> dict:
        """
        Syntetos-Boylan Approximation: bias-corrected Croston.
        Croston tends to overestimate; SBA corrects this.
        """
        croston_result = self.fit_predict(demand)
        correction = 1 - self.alpha / 2

        sba_forecast = croston_result['forecast_per_period'] * correction

        return {
            'forecast_per_period': sba_forecast,
            'correction_factor': correction,
            'croston_forecast': croston_result['forecast_per_period'],
            'method': 'sba'
        }

Hierarchical Forecasting and Reconciliation

class HierarchicalForecaster:
    """
    Forecast at multiple levels of a hierarchy and reconcile.
    Ensures that detailed forecasts sum to aggregate forecasts.
    """

    def __init__(self, hierarchy: dict):
        """
        hierarchy: {'total': ['category_A', 'category_B'],
                     'category_A': ['sku_1', 'sku_2'],
                     'category_B': ['sku_3', 'sku_4']}
        """
        self.hierarchy = hierarchy
        self.forecasts = {}

    def add_forecast(self, node: str, forecast: float):
        self.forecasts[node] = forecast

    def bottom_up(self) -> dict:
        """
        Bottom-up: forecast at lowest level, aggregate up.
        Best when low-level data is rich and patterns are distinct.
        """
        reconciled = {}
        leaf_nodes = self._get_leaves()

        for leaf in leaf_nodes:
            reconciled[leaf] = self.forecasts.get(leaf, 0)

        # Aggregate up
        for parent, children in self.hierarchy.items():
            reconciled[parent] = sum(reconciled.get(c, 0) for c in children)

        return reconciled

    def top_down(self, allocation_method: str = 'proportional') -> dict:
        """
        Top-down: forecast at top level, allocate down using proportions.
        Best when low-level data is sparse or noisy.
        """
        reconciled = {}
        total_forecast = self.forecasts.get('total', 0)

        if allocation_method == 'proportional':
            # Allocate based on historical proportions
            for parent, children in self.hierarchy.items():
                parent_forecast = self.forecasts.get(parent, total_forecast)
                child_forecasts = {c: self.forecasts.get(c, 0) for c in children}
                child_total = sum(child_forecasts.values())

                if child_total > 0:
                    for child in children:
                        proportion = child_forecasts[child] / child_total
                        reconciled[child] = parent_forecast * proportion
                else:
                    for child in children:
                        reconciled[child] = parent_forecast / len(children)

            reconciled['total'] = total_forecast

        return reconciled

    def optimal_reconciliation(self, base_forecasts: dict,
                                residual_covariance: np.ndarray = None) -> dict:
        """
        Optimal reconciliation (Wickramasuriya et al., 2019).
        Uses MinT (Minimum Trace) to find reconciled forecasts
        that minimize total forecast variance.
        """
        # Build summing matrix S
        all_nodes = list(base_forecasts.keys())
        leaves = self._get_leaves()
        n_total = len(all_nodes)
        n_leaves = len(leaves)

        S = np.zeros((n_total, n_leaves))
        for i, node in enumerate(all_nodes):
            if node in leaves:
                j = leaves.index(node)
                S[i, j] = 1
            else:
                # Sum of children
                for child in self._get_descendants(node):
                    if child in leaves:
                        j = leaves.index(child)
                        S[i, j] = 1

        # Base forecast vector
        y_hat = np.array([base_forecasts[node] for node in all_nodes])

        if residual_covariance is None:
            # Use OLS reconciliation (identity covariance)
            W = np.eye(n_total)
        else:
            W = residual_covariance

        # MinT reconciliation
        W_inv = np.linalg.inv(W)
        P = S @ np.linalg.inv(S.T @ W_inv @ S) @ S.T @ W_inv

        y_reconciled = P @ y_hat

        return {node: y_reconciled[i] for i, node in enumerate(all_nodes)}

    def _get_leaves(self) -> list:
        parents = set(self.hierarchy.keys())
        all_children = set()
        for children in self.hierarchy.values():
            all_children.update(children)
        return [c for c in all_children if c not in parents]

    def _get_descendants(self, node: str) -> list:
        descendants = []
        children = self.hierarchy.get(node, [])
        for child in children:
            descendants.append(child)
            descendants.extend(self._get_descendants(child))
        return descendants

Forecast Accuracy Metrics for Demand

class DemandForecastMetrics:
    """Metrics specific to demand forecasting evaluation."""

    @staticmethod
    def mape(actual: np.ndarray, forecast: np.ndarray) -> float:
        """Mean Absolute Percentage Error. Avoid when actuals contain zeros."""
        mask = actual != 0
        return np.mean(np.abs((actual[mask] - forecast[mask]) / actual[mask])) * 100

    @staticmethod
    def wmape(actual: np.ndarray, forecast: np.ndarray) -> float:
        """Weighted MAPE: handles zeros better than MAPE."""
        return np.sum(np.abs(actual - forecast)) / np.sum(np.abs(actual)) * 100

    @staticmethod
    def bias(actual: np.ndarray, forecast: np.ndarray) -> float:
        """Forecast bias: positive = over-forecasting, negative = under-forecasting."""
        return np.mean(forecast - actual) / np.mean(actual) * 100

    @staticmethod
    def tracking_signal(actual: np.ndarray, forecast: np.ndarray,
                         periods: int = 12) -> float:
        """
        Tracking signal: cumulative error / MAD.
        Should stay between -4 and 4. Outside indicates systematic bias.
        """
        errors = forecast - actual
        cumulative_error = np.sum(errors[-periods:])
        mad = np.mean(np.abs(errors[-periods:]))
        return cumulative_error / mad if mad > 0 else 0

    @staticmethod
    def forecast_value_added(naive_error: float, model_error: float) -> float:
        """Does the model add value over a naive forecast?"""
        return (naive_error - model_error) / naive_error * 100

Key Takeaways

  1. Demand decomposition into base, trend, seasonal, and promotional components is the essential first step
  2. Seasonal patterns should be detected empirically (autocorrelation, FFT) not assumed; the period may not be what you expect
  3. Promotional lift should account for forward buying (post-promotion dip) to avoid overestimating incremental value
  4. New product forecasting requires analogous products or diffusion models since there is no history to extrapolate
  5. Croston's method (or SBA) is essential for intermittent demand; standard methods overpredict by ignoring zero-demand periods
  6. Hierarchical reconciliation ensures that SKU forecasts sum to category forecasts; MinT optimal reconciliation minimizes total variance
  7. Use Weighted MAPE (WMAPE) over MAPE for demand with zeros; track bias separately to detect systematic over/under-forecasting
  8. The tracking signal is the practical early warning system: when it exceeds +/-4, the model needs retraining

Install this skill directly: skilldb add prediction-skills

Get CLI access →