Skip to main content
Autonomous AgentsPrediction660 lines

Time Series Forecasting

Quick Summary21 lines
Time series forecasting predicts future values based on historically ordered observations. From stock prices to server load to disease spread, time series data is everywhere. This skill covers the full spectrum from classical statistical methods (ARIMA, exponential smoothing) through modern approaches (Prophet, neural networks with LSTMs and transformers), including seasonal decomposition, trend analysis, and anomaly detection.

## Key Points

1. Always decompose the series first (trend, seasonal, residual) to understand what you are forecasting
2. Test for stationarity before applying ARIMA; differencing is the standard remedy for non-stationarity
3. Exponential smoothing (Holt-Winters) is a strong baseline that is often competitive with complex methods
4. Prophet excels at business forecasting with multiple seasonalities and interpretable components
5. LSTMs and transformers require more data but can capture nonlinear patterns that statistical methods miss
6. Anomaly detection should run alongside forecasting to flag when the generating process may have changed
7. Model selection depends on data size, complexity, and whether interpretability is required
8. Always evaluate with proper time-series cross-validation (expanding or sliding window), never random splits

## Quick Example

```
Observed = Trend + Seasonality + Cyclical + Residual  (additive)
Observed = Trend × Seasonality × Cyclical × Residual  (multiplicative)
```
skilldb get prediction-skills/time-series-forecastingFull skill: 660 lines
Paste into your CLAUDE.md or agent config

Time Series Forecasting

Overview

Time series forecasting predicts future values based on historically ordered observations. From stock prices to server load to disease spread, time series data is everywhere. This skill covers the full spectrum from classical statistical methods (ARIMA, exponential smoothing) through modern approaches (Prophet, neural networks with LSTMs and transformers), including seasonal decomposition, trend analysis, and anomaly detection.

Foundations

Time Series Components

Every time series can be decomposed into fundamental components:

Observed = Trend + Seasonality + Cyclical + Residual  (additive)
Observed = Trend × Seasonality × Cyclical × Residual  (multiplicative)
import numpy as np
import pandas as pd
from scipy import signal

class TimeSeriesDecomposition:
    """Decompose a time series into trend, seasonal, and residual components."""

    def __init__(self, data: np.ndarray, period: int, model: str = 'additive'):
        self.data = data
        self.period = period
        self.model = model

    def decompose(self) -> dict:
        """Classical decomposition."""
        n = len(self.data)

        # Trend: centered moving average
        trend = self._moving_average(self.data, self.period)

        # Detrend
        if self.model == 'additive':
            detrended = self.data - trend
        else:
            detrended = self.data / np.where(trend != 0, trend, 1)

        # Seasonal: average detrended values for each position in the cycle
        seasonal = np.zeros(n)
        for i in range(self.period):
            indices = range(i, n, self.period)
            valid = [detrended[j] for j in indices if not np.isnan(detrended[j])]
            if valid:
                seasonal_value = np.mean(valid)
                for j in indices:
                    seasonal[j] = seasonal_value

        # Residual
        if self.model == 'additive':
            residual = self.data - trend - seasonal
        else:
            residual = self.data / (trend * seasonal + 1e-10)

        return {
            'observed': self.data,
            'trend': trend,
            'seasonal': seasonal,
            'residual': residual
        }

    def _moving_average(self, data: np.ndarray, window: int) -> np.ndarray:
        """Centered moving average."""
        result = np.full(len(data), np.nan)
        half = window // 2
        for i in range(half, len(data) - half):
            result[i] = np.mean(data[i - half:i + half + 1])
        return result

Stationarity

Most forecasting methods require or benefit from stationary data (constant mean and variance over time).

from scipy import stats

def test_stationarity(series: np.ndarray) -> dict:
    """Test if a time series is stationary using multiple tests."""
    results = {}

    # ADF test (requires statsmodels)
    try:
        from statsmodels.tsa.stattools import adfuller
        adf_result = adfuller(series)
        results['adf'] = {
            'statistic': adf_result[0],
            'p_value': adf_result[1],
            'is_stationary': adf_result[1] < 0.05
        }
    except ImportError:
        pass

    # Simple rolling statistics check
    n = len(series)
    half = n // 2
    first_half = series[:half]
    second_half = series[half:]

    results['mean_shift'] = {
        'first_half_mean': np.mean(first_half),
        'second_half_mean': np.mean(second_half),
        'relative_change': abs(np.mean(second_half) - np.mean(first_half)) / (abs(np.mean(first_half)) + 1e-10)
    }

    results['variance_shift'] = {
        'first_half_var': np.var(first_half),
        'second_half_var': np.var(second_half),
        'ratio': np.var(second_half) / (np.var(first_half) + 1e-10)
    }

    return results


def make_stationary(series: np.ndarray) -> tuple:
    """Apply differencing to make a series stationary."""
    # First difference
    diff1 = np.diff(series)

    stationarity = test_stationarity(diff1)
    if stationarity.get('adf', {}).get('is_stationary', True):
        return diff1, 1  # d=1

    # Second difference
    diff2 = np.diff(diff1)
    return diff2, 2  # d=2

Classical Methods

ARIMA (AutoRegressive Integrated Moving Average)

class SimpleARIMA:
    """
    Simplified ARIMA implementation for understanding.
    For production use statsmodels or pmdarima.

    ARIMA(p, d, q):
      p = autoregressive order (how many past values to use)
      d = differencing order (how many times to difference)
      q = moving average order (how many past errors to use)
    """

    def __init__(self, p: int, d: int, q: int):
        self.p = p
        self.d = d
        self.q = q
        self.ar_params = None
        self.ma_params = None
        self.constant = 0

    def fit(self, series: np.ndarray):
        """Fit ARIMA model using OLS for AR component."""
        # Difference the series
        diff_series = series.copy()
        for _ in range(self.d):
            diff_series = np.diff(diff_series)

        # Fit AR component using least squares
        if self.p > 0:
            X = np.column_stack([
                diff_series[self.p - i - 1:-i-1 if i+1 < len(diff_series) else None]
                for i in range(self.p)
            ])
            y = diff_series[self.p:]
            min_len = min(len(X), len(y))
            X = X[:min_len]
            y = y[:min_len]

            # OLS
            self.ar_params = np.linalg.lstsq(X, y, rcond=None)[0]
            self.constant = np.mean(y) - X @ self.ar_params
            self._residuals = y - (X @ self.ar_params + self.constant)
        else:
            self.ar_params = np.array([])
            self._residuals = diff_series

        self._original_series = series
        self._diff_series = diff_series

    def forecast(self, steps: int) -> np.ndarray:
        """Generate forecasts for future time steps."""
        # Forecast differenced series
        diff_forecast = []
        history = list(self._diff_series)
        errors = list(self._residuals) if self._residuals is not None else []

        for _ in range(steps):
            ar_term = 0
            if self.p > 0 and len(history) >= self.p:
                for i in range(self.p):
                    ar_term += self.ar_params[i] * history[-(i+1)]

            forecast_val = self.constant + ar_term
            diff_forecast.append(forecast_val)
            history.append(forecast_val)
            errors.append(0)

        diff_forecast = np.array(diff_forecast)

        # Integrate (undo differencing)
        result = diff_forecast.copy()
        for _ in range(self.d):
            last_val = self._original_series[-1] if self.d == 1 else self._diff_series[-1]
            result = np.cumsum(np.concatenate([[last_val], result]))[1:]

        return result

Auto-ARIMA for Automatic Model Selection

def auto_arima_selection(series: np.ndarray, max_p: int = 5,
                          max_d: int = 2, max_q: int = 5) -> dict:
    """
    Select optimal ARIMA parameters using information criteria.
    In production, use pmdarima.auto_arima().
    """
    best_aic = float('inf')
    best_params = (0, 0, 0)

    # Determine d
    from statsmodels.tsa.stattools import adfuller
    d = 0
    temp = series.copy()
    for i in range(max_d + 1):
        result = adfuller(temp)
        if result[1] < 0.05:
            d = i
            break
        temp = np.diff(temp)
    else:
        d = max_d

    # Search over p, q
    for p in range(max_p + 1):
        for q in range(max_q + 1):
            try:
                from statsmodels.tsa.arima.model import ARIMA
                model = ARIMA(series, order=(p, d, q))
                fitted = model.fit()
                if fitted.aic < best_aic:
                    best_aic = fitted.aic
                    best_params = (p, d, q)
            except Exception:
                continue

    return {
        'order': best_params,
        'aic': best_aic,
        'p': best_params[0],
        'd': best_params[1],
        'q': best_params[2]
    }

Exponential Smoothing

class ExponentialSmoothing:
    """
    Holt-Winters triple exponential smoothing.
    Handles level, trend, and seasonality.
    """

    def __init__(self, alpha: float = 0.3, beta: float = 0.1,
                 gamma: float = 0.1, period: int = 12,
                 trend_type: str = 'additive',
                 seasonal_type: str = 'additive'):
        self.alpha = alpha  # Level smoothing
        self.beta = beta    # Trend smoothing
        self.gamma = gamma  # Seasonal smoothing
        self.period = period
        self.trend_type = trend_type
        self.seasonal_type = seasonal_type

    def fit(self, series: np.ndarray):
        """Fit the model to historical data."""
        n = len(series)
        self.level = np.zeros(n)
        self.trend = np.zeros(n)
        self.season = np.zeros(n)

        # Initialize
        self.level[0] = series[0]
        self.trend[0] = (series[min(self.period, n-1)] - series[0]) / self.period

        # Initialize seasonal factors
        for i in range(min(self.period, n)):
            self.season[i] = series[i] - np.mean(series[:self.period])

        # Iterative smoothing
        for t in range(1, n):
            season_idx = t - self.period if t >= self.period else t

            if self.seasonal_type == 'additive':
                self.level[t] = self.alpha * (series[t] - self.season[season_idx]) + \
                                (1 - self.alpha) * (self.level[t-1] + self.trend[t-1])
                self.trend[t] = self.beta * (self.level[t] - self.level[t-1]) + \
                                (1 - self.beta) * self.trend[t-1]
                if t >= self.period:
                    self.season[t] = self.gamma * (series[t] - self.level[t]) + \
                                     (1 - self.gamma) * self.season[season_idx]
            else:  # multiplicative
                self.level[t] = self.alpha * (series[t] / max(self.season[season_idx], 0.01)) + \
                                (1 - self.alpha) * (self.level[t-1] + self.trend[t-1])
                self.trend[t] = self.beta * (self.level[t] - self.level[t-1]) + \
                                (1 - self.beta) * self.trend[t-1]
                if t >= self.period:
                    self.season[t] = self.gamma * (series[t] / max(self.level[t], 0.01)) + \
                                     (1 - self.gamma) * self.season[season_idx]

        self._series = series
        self._n = n

    def forecast(self, steps: int) -> np.ndarray:
        """Forecast future values."""
        forecasts = np.zeros(steps)
        last_level = self.level[self._n - 1]
        last_trend = self.trend[self._n - 1]

        for h in range(1, steps + 1):
            season_idx = self._n - self.period + ((h - 1) % self.period)

            if self.seasonal_type == 'additive':
                forecasts[h-1] = last_level + h * last_trend + self.season[season_idx]
            else:
                forecasts[h-1] = (last_level + h * last_trend) * self.season[season_idx]

        return forecasts

Prophet

Facebook Prophet for Business Forecasting

def prophet_forecast(df: pd.DataFrame, periods: int = 365,
                     changepoints: list = None) -> dict:
    """
    Prophet forecasting with customization.
    Expects df with columns 'ds' (date) and 'y' (value).

    Prophet excels at:
    - Multiple seasonalities (daily, weekly, yearly)
    - Holiday effects
    - Handling missing data
    - Changepoint detection
    - Interpretable components
    """
    from prophet import Prophet

    model = Prophet(
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=False,
        changepoint_prior_scale=0.05,  # Flexibility of trend changes
        seasonality_prior_scale=10.0,   # Flexibility of seasonality
        changepoints=changepoints
    )

    # Add custom seasonalities
    model.add_seasonality(
        name='monthly',
        period=30.5,
        fourier_order=5
    )

    # Add holidays/events
    # model.add_country_holidays(country_name='US')

    model.fit(df)

    # Make future dataframe
    future = model.make_future_dataframe(periods=periods)
    forecast = model.predict(future)

    # Extract components
    return {
        'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']],
        'trend': forecast[['ds', 'trend']],
        'seasonality': {
            'weekly': forecast[['ds', 'weekly']],
            'yearly': forecast[['ds', 'yearly']],
        },
        'changepoints': model.changepoints.tolist(),
        'model': model
    }

Neural Network Approaches

LSTM for Time Series

import torch
import torch.nn as nn

class LSTMForecaster(nn.Module):
    """LSTM-based time series forecaster."""

    def __init__(self, input_size: int = 1, hidden_size: int = 64,
                 num_layers: int = 2, output_size: int = 1,
                 dropout: float = 0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # x shape: (batch_size, sequence_length, input_size)
        lstm_out, _ = self.lstm(x)
        # Take the last time step's output
        last_output = lstm_out[:, -1, :]
        prediction = self.fc(last_output)
        return prediction


def train_lstm_forecaster(series: np.ndarray, lookback: int = 30,
                          forecast_horizon: int = 7,
                          epochs: int = 100) -> dict:
    """Train an LSTM forecaster on time series data."""

    # Normalize
    mean = np.mean(series)
    std = np.std(series)
    normalized = (series - mean) / std

    # Create sequences
    X, y = [], []
    for i in range(len(normalized) - lookback - forecast_horizon):
        X.append(normalized[i:i + lookback])
        y.append(normalized[i + lookback:i + lookback + forecast_horizon])

    X = torch.FloatTensor(np.array(X)).unsqueeze(-1)
    y = torch.FloatTensor(np.array(y))

    # Split
    split = int(0.8 * len(X))
    X_train, X_val = X[:split], X[split:]
    y_train, y_val = y[:split], y[split:]

    # Model
    model = LSTMForecaster(output_size=forecast_horizon)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Train
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 20 == 0:
            model.eval()
            with torch.no_grad():
                val_output = model(X_val)
                val_loss = criterion(val_output, y_val)
            print(f"Epoch {epoch+1}: Train Loss={loss.item():.4f}, Val Loss={val_loss.item():.4f}")

    return {
        'model': model,
        'mean': mean,
        'std': std,
        'lookback': lookback,
        'forecast_horizon': forecast_horizon
    }

Transformer for Time Series

class TimeSeriesTransformer(nn.Module):
    """Transformer-based time series forecaster."""

    def __init__(self, input_size: int = 1, d_model: int = 64,
                 nhead: int = 4, num_layers: int = 2,
                 output_size: int = 1, dropout: float = 0.1):
        super().__init__()

        self.input_projection = nn.Linear(input_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.output_projection = nn.Linear(d_model, output_size)

    def forward(self, x):
        x = self.input_projection(x)
        x = self.positional_encoding(x)
        x = self.transformer(x)
        output = self.output_projection(x[:, -1, :])
        return output


class PositionalEncoding(nn.Module):
    """Positional encoding for transformer."""

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

Anomaly Detection in Time Series

class TimeSeriesAnomalyDetector:
    """Detect anomalies in time series data."""

    def __init__(self, window_size: int = 20, threshold_sigma: float = 3.0):
        self.window_size = window_size
        self.threshold_sigma = threshold_sigma

    def detect_statistical(self, series: np.ndarray) -> list:
        """Detect anomalies using rolling statistics."""
        anomalies = []

        for i in range(self.window_size, len(series)):
            window = series[i - self.window_size:i]
            mean = np.mean(window)
            std = np.std(window)

            if std > 0:
                z_score = abs(series[i] - mean) / std
                if z_score > self.threshold_sigma:
                    anomalies.append({
                        'index': i,
                        'value': series[i],
                        'expected': mean,
                        'z_score': z_score,
                        'type': 'spike' if series[i] > mean else 'dip'
                    })

        return anomalies

    def detect_seasonal(self, series: np.ndarray, period: int) -> list:
        """Detect anomalies relative to seasonal pattern."""
        anomalies = []

        # Build seasonal profile
        seasonal_means = {}
        seasonal_stds = {}
        for pos in range(period):
            values = series[pos::period]
            seasonal_means[pos] = np.mean(values)
            seasonal_stds[pos] = np.std(values)

        for i, value in enumerate(series):
            pos = i % period
            expected = seasonal_means[pos]
            std = seasonal_stds[pos]

            if std > 0:
                z_score = abs(value - expected) / std
                if z_score > self.threshold_sigma:
                    anomalies.append({
                        'index': i,
                        'value': value,
                        'seasonal_expected': expected,
                        'z_score': z_score
                    })

        return anomalies

    def detect_changepoints(self, series: np.ndarray,
                           min_segment: int = 10) -> list:
        """Detect structural changes in the time series."""
        changepoints = []

        for i in range(min_segment, len(series) - min_segment):
            before = series[i - min_segment:i]
            after = series[i:i + min_segment]

            # t-test for mean difference
            t_stat, p_value = stats.ttest_ind(before, after)

            if p_value < 0.001:
                changepoints.append({
                    'index': i,
                    'before_mean': np.mean(before),
                    'after_mean': np.mean(after),
                    'p_value': p_value,
                    'shift': np.mean(after) - np.mean(before)
                })

        # Filter to keep only most significant changepoints
        if changepoints:
            changepoints.sort(key=lambda x: x['p_value'])
            filtered = [changepoints[0]]
            for cp in changepoints[1:]:
                if all(abs(cp['index'] - f['index']) > min_segment for f in filtered):
                    filtered.append(cp)
            return filtered

        return changepoints

Model Selection Guide

Dataset Size    | Seasonality | Trend  | Best Approaches
< 2 periods    | No          | No     | Simple average, Naive
< 2 periods    | No          | Yes    | Linear regression, Holt
2-10 periods   | Yes         | Any    | Holt-Winters, SARIMA
10+ periods    | Yes         | Any    | Prophet, SARIMA, LSTM
100+ periods   | Complex     | Any    | Transformer, N-BEATS, ensemble
Any            | Unknown     | Any    | Auto-ARIMA, then validate

Key Takeaways

  1. Always decompose the series first (trend, seasonal, residual) to understand what you are forecasting
  2. Test for stationarity before applying ARIMA; differencing is the standard remedy for non-stationarity
  3. Exponential smoothing (Holt-Winters) is a strong baseline that is often competitive with complex methods
  4. Prophet excels at business forecasting with multiple seasonalities and interpretable components
  5. LSTMs and transformers require more data but can capture nonlinear patterns that statistical methods miss
  6. Anomaly detection should run alongside forecasting to flag when the generating process may have changed
  7. Model selection depends on data size, complexity, and whether interpretability is required
  8. Always evaluate with proper time-series cross-validation (expanding or sliding window), never random splits

Install this skill directly: skilldb add prediction-skills

Get CLI access →