Autonomous AgentsPrediction660 lines
Time Series Forecasting
Quick Summary21 lines
Time series forecasting predicts future values based on historically ordered observations. From stock prices to server load to disease spread, time series data is everywhere. This skill covers the full spectrum from classical statistical methods (ARIMA, exponential smoothing) through modern approaches (Prophet, neural networks with LSTMs and transformers), including seasonal decomposition, trend analysis, and anomaly detection. ## Key Points 1. Always decompose the series first (trend, seasonal, residual) to understand what you are forecasting 2. Test for stationarity before applying ARIMA; differencing is the standard remedy for non-stationarity 3. Exponential smoothing (Holt-Winters) is a strong baseline that is often competitive with complex methods 4. Prophet excels at business forecasting with multiple seasonalities and interpretable components 5. LSTMs and transformers require more data but can capture nonlinear patterns that statistical methods miss 6. Anomaly detection should run alongside forecasting to flag when the generating process may have changed 7. Model selection depends on data size, complexity, and whether interpretability is required 8. Always evaluate with proper time-series cross-validation (expanding or sliding window), never random splits ## Quick Example ``` Observed = Trend + Seasonality + Cyclical + Residual (additive) Observed = Trend × Seasonality × Cyclical × Residual (multiplicative) ```
skilldb get prediction-skills/time-series-forecastingFull skill: 660 linesPaste into your CLAUDE.md or agent config
Time Series Forecasting
Overview
Time series forecasting predicts future values based on historically ordered observations. From stock prices to server load to disease spread, time series data is everywhere. This skill covers the full spectrum from classical statistical methods (ARIMA, exponential smoothing) through modern approaches (Prophet, neural networks with LSTMs and transformers), including seasonal decomposition, trend analysis, and anomaly detection.
Foundations
Time Series Components
Every time series can be decomposed into fundamental components:
Observed = Trend + Seasonality + Cyclical + Residual (additive)
Observed = Trend × Seasonality × Cyclical × Residual (multiplicative)
import numpy as np
import pandas as pd
from scipy import signal
class TimeSeriesDecomposition:
"""Decompose a time series into trend, seasonal, and residual components."""
def __init__(self, data: np.ndarray, period: int, model: str = 'additive'):
self.data = data
self.period = period
self.model = model
def decompose(self) -> dict:
"""Classical decomposition."""
n = len(self.data)
# Trend: centered moving average
trend = self._moving_average(self.data, self.period)
# Detrend
if self.model == 'additive':
detrended = self.data - trend
else:
detrended = self.data / np.where(trend != 0, trend, 1)
# Seasonal: average detrended values for each position in the cycle
seasonal = np.zeros(n)
for i in range(self.period):
indices = range(i, n, self.period)
valid = [detrended[j] for j in indices if not np.isnan(detrended[j])]
if valid:
seasonal_value = np.mean(valid)
for j in indices:
seasonal[j] = seasonal_value
# Residual
if self.model == 'additive':
residual = self.data - trend - seasonal
else:
residual = self.data / (trend * seasonal + 1e-10)
return {
'observed': self.data,
'trend': trend,
'seasonal': seasonal,
'residual': residual
}
def _moving_average(self, data: np.ndarray, window: int) -> np.ndarray:
"""Centered moving average."""
result = np.full(len(data), np.nan)
half = window // 2
for i in range(half, len(data) - half):
result[i] = np.mean(data[i - half:i + half + 1])
return result
Stationarity
Most forecasting methods require or benefit from stationary data (constant mean and variance over time).
from scipy import stats
def test_stationarity(series: np.ndarray) -> dict:
"""Test if a time series is stationary using multiple tests."""
results = {}
# ADF test (requires statsmodels)
try:
from statsmodels.tsa.stattools import adfuller
adf_result = adfuller(series)
results['adf'] = {
'statistic': adf_result[0],
'p_value': adf_result[1],
'is_stationary': adf_result[1] < 0.05
}
except ImportError:
pass
# Simple rolling statistics check
n = len(series)
half = n // 2
first_half = series[:half]
second_half = series[half:]
results['mean_shift'] = {
'first_half_mean': np.mean(first_half),
'second_half_mean': np.mean(second_half),
'relative_change': abs(np.mean(second_half) - np.mean(first_half)) / (abs(np.mean(first_half)) + 1e-10)
}
results['variance_shift'] = {
'first_half_var': np.var(first_half),
'second_half_var': np.var(second_half),
'ratio': np.var(second_half) / (np.var(first_half) + 1e-10)
}
return results
def make_stationary(series: np.ndarray) -> tuple:
"""Apply differencing to make a series stationary."""
# First difference
diff1 = np.diff(series)
stationarity = test_stationarity(diff1)
if stationarity.get('adf', {}).get('is_stationary', True):
return diff1, 1 # d=1
# Second difference
diff2 = np.diff(diff1)
return diff2, 2 # d=2
Classical Methods
ARIMA (AutoRegressive Integrated Moving Average)
class SimpleARIMA:
"""
Simplified ARIMA implementation for understanding.
For production use statsmodels or pmdarima.
ARIMA(p, d, q):
p = autoregressive order (how many past values to use)
d = differencing order (how many times to difference)
q = moving average order (how many past errors to use)
"""
def __init__(self, p: int, d: int, q: int):
self.p = p
self.d = d
self.q = q
self.ar_params = None
self.ma_params = None
self.constant = 0
def fit(self, series: np.ndarray):
"""Fit ARIMA model using OLS for AR component."""
# Difference the series
diff_series = series.copy()
for _ in range(self.d):
diff_series = np.diff(diff_series)
# Fit AR component using least squares
if self.p > 0:
X = np.column_stack([
diff_series[self.p - i - 1:-i-1 if i+1 < len(diff_series) else None]
for i in range(self.p)
])
y = diff_series[self.p:]
min_len = min(len(X), len(y))
X = X[:min_len]
y = y[:min_len]
# OLS
self.ar_params = np.linalg.lstsq(X, y, rcond=None)[0]
self.constant = np.mean(y) - X @ self.ar_params
self._residuals = y - (X @ self.ar_params + self.constant)
else:
self.ar_params = np.array([])
self._residuals = diff_series
self._original_series = series
self._diff_series = diff_series
def forecast(self, steps: int) -> np.ndarray:
"""Generate forecasts for future time steps."""
# Forecast differenced series
diff_forecast = []
history = list(self._diff_series)
errors = list(self._residuals) if self._residuals is not None else []
for _ in range(steps):
ar_term = 0
if self.p > 0 and len(history) >= self.p:
for i in range(self.p):
ar_term += self.ar_params[i] * history[-(i+1)]
forecast_val = self.constant + ar_term
diff_forecast.append(forecast_val)
history.append(forecast_val)
errors.append(0)
diff_forecast = np.array(diff_forecast)
# Integrate (undo differencing)
result = diff_forecast.copy()
for _ in range(self.d):
last_val = self._original_series[-1] if self.d == 1 else self._diff_series[-1]
result = np.cumsum(np.concatenate([[last_val], result]))[1:]
return result
Auto-ARIMA for Automatic Model Selection
def auto_arima_selection(series: np.ndarray, max_p: int = 5,
max_d: int = 2, max_q: int = 5) -> dict:
"""
Select optimal ARIMA parameters using information criteria.
In production, use pmdarima.auto_arima().
"""
best_aic = float('inf')
best_params = (0, 0, 0)
# Determine d
from statsmodels.tsa.stattools import adfuller
d = 0
temp = series.copy()
for i in range(max_d + 1):
result = adfuller(temp)
if result[1] < 0.05:
d = i
break
temp = np.diff(temp)
else:
d = max_d
# Search over p, q
for p in range(max_p + 1):
for q in range(max_q + 1):
try:
from statsmodels.tsa.arima.model import ARIMA
model = ARIMA(series, order=(p, d, q))
fitted = model.fit()
if fitted.aic < best_aic:
best_aic = fitted.aic
best_params = (p, d, q)
except Exception:
continue
return {
'order': best_params,
'aic': best_aic,
'p': best_params[0],
'd': best_params[1],
'q': best_params[2]
}
Exponential Smoothing
class ExponentialSmoothing:
"""
Holt-Winters triple exponential smoothing.
Handles level, trend, and seasonality.
"""
def __init__(self, alpha: float = 0.3, beta: float = 0.1,
gamma: float = 0.1, period: int = 12,
trend_type: str = 'additive',
seasonal_type: str = 'additive'):
self.alpha = alpha # Level smoothing
self.beta = beta # Trend smoothing
self.gamma = gamma # Seasonal smoothing
self.period = period
self.trend_type = trend_type
self.seasonal_type = seasonal_type
def fit(self, series: np.ndarray):
"""Fit the model to historical data."""
n = len(series)
self.level = np.zeros(n)
self.trend = np.zeros(n)
self.season = np.zeros(n)
# Initialize
self.level[0] = series[0]
self.trend[0] = (series[min(self.period, n-1)] - series[0]) / self.period
# Initialize seasonal factors
for i in range(min(self.period, n)):
self.season[i] = series[i] - np.mean(series[:self.period])
# Iterative smoothing
for t in range(1, n):
season_idx = t - self.period if t >= self.period else t
if self.seasonal_type == 'additive':
self.level[t] = self.alpha * (series[t] - self.season[season_idx]) + \
(1 - self.alpha) * (self.level[t-1] + self.trend[t-1])
self.trend[t] = self.beta * (self.level[t] - self.level[t-1]) + \
(1 - self.beta) * self.trend[t-1]
if t >= self.period:
self.season[t] = self.gamma * (series[t] - self.level[t]) + \
(1 - self.gamma) * self.season[season_idx]
else: # multiplicative
self.level[t] = self.alpha * (series[t] / max(self.season[season_idx], 0.01)) + \
(1 - self.alpha) * (self.level[t-1] + self.trend[t-1])
self.trend[t] = self.beta * (self.level[t] - self.level[t-1]) + \
(1 - self.beta) * self.trend[t-1]
if t >= self.period:
self.season[t] = self.gamma * (series[t] / max(self.level[t], 0.01)) + \
(1 - self.gamma) * self.season[season_idx]
self._series = series
self._n = n
def forecast(self, steps: int) -> np.ndarray:
"""Forecast future values."""
forecasts = np.zeros(steps)
last_level = self.level[self._n - 1]
last_trend = self.trend[self._n - 1]
for h in range(1, steps + 1):
season_idx = self._n - self.period + ((h - 1) % self.period)
if self.seasonal_type == 'additive':
forecasts[h-1] = last_level + h * last_trend + self.season[season_idx]
else:
forecasts[h-1] = (last_level + h * last_trend) * self.season[season_idx]
return forecasts
Prophet
Facebook Prophet for Business Forecasting
def prophet_forecast(df: pd.DataFrame, periods: int = 365,
changepoints: list = None) -> dict:
"""
Prophet forecasting with customization.
Expects df with columns 'ds' (date) and 'y' (value).
Prophet excels at:
- Multiple seasonalities (daily, weekly, yearly)
- Holiday effects
- Handling missing data
- Changepoint detection
- Interpretable components
"""
from prophet import Prophet
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False,
changepoint_prior_scale=0.05, # Flexibility of trend changes
seasonality_prior_scale=10.0, # Flexibility of seasonality
changepoints=changepoints
)
# Add custom seasonalities
model.add_seasonality(
name='monthly',
period=30.5,
fourier_order=5
)
# Add holidays/events
# model.add_country_holidays(country_name='US')
model.fit(df)
# Make future dataframe
future = model.make_future_dataframe(periods=periods)
forecast = model.predict(future)
# Extract components
return {
'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']],
'trend': forecast[['ds', 'trend']],
'seasonality': {
'weekly': forecast[['ds', 'weekly']],
'yearly': forecast[['ds', 'yearly']],
},
'changepoints': model.changepoints.tolist(),
'model': model
}
Neural Network Approaches
LSTM for Time Series
import torch
import torch.nn as nn
class LSTMForecaster(nn.Module):
"""LSTM-based time series forecaster."""
def __init__(self, input_size: int = 1, hidden_size: int = 64,
num_layers: int = 2, output_size: int = 1,
dropout: float = 0.2):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
# x shape: (batch_size, sequence_length, input_size)
lstm_out, _ = self.lstm(x)
# Take the last time step's output
last_output = lstm_out[:, -1, :]
prediction = self.fc(last_output)
return prediction
def train_lstm_forecaster(series: np.ndarray, lookback: int = 30,
forecast_horizon: int = 7,
epochs: int = 100) -> dict:
"""Train an LSTM forecaster on time series data."""
# Normalize
mean = np.mean(series)
std = np.std(series)
normalized = (series - mean) / std
# Create sequences
X, y = [], []
for i in range(len(normalized) - lookback - forecast_horizon):
X.append(normalized[i:i + lookback])
y.append(normalized[i + lookback:i + lookback + forecast_horizon])
X = torch.FloatTensor(np.array(X)).unsqueeze(-1)
y = torch.FloatTensor(np.array(y))
# Split
split = int(0.8 * len(X))
X_train, X_val = X[:split], X[split:]
y_train, y_val = y[:split], y[split:]
# Model
model = LSTMForecaster(output_size=forecast_horizon)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
# Train
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
output = model(X_train)
loss = criterion(output, y_train)
loss.backward()
optimizer.step()
if (epoch + 1) % 20 == 0:
model.eval()
with torch.no_grad():
val_output = model(X_val)
val_loss = criterion(val_output, y_val)
print(f"Epoch {epoch+1}: Train Loss={loss.item():.4f}, Val Loss={val_loss.item():.4f}")
return {
'model': model,
'mean': mean,
'std': std,
'lookback': lookback,
'forecast_horizon': forecast_horizon
}
Transformer for Time Series
class TimeSeriesTransformer(nn.Module):
"""Transformer-based time series forecaster."""
def __init__(self, input_size: int = 1, d_model: int = 64,
nhead: int = 4, num_layers: int = 2,
output_size: int = 1, dropout: float = 0.1):
super().__init__()
self.input_projection = nn.Linear(input_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, dropout)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=nhead,
dim_feedforward=d_model * 4,
dropout=dropout, batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
self.output_projection = nn.Linear(d_model, output_size)
def forward(self, x):
x = self.input_projection(x)
x = self.positional_encoding(x)
x = self.transformer(x)
output = self.output_projection(x[:, -1, :])
return output
class PositionalEncoding(nn.Module):
"""Positional encoding for transformer."""
def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
super().__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:, :x.size(1)]
return self.dropout(x)
Anomaly Detection in Time Series
class TimeSeriesAnomalyDetector:
"""Detect anomalies in time series data."""
def __init__(self, window_size: int = 20, threshold_sigma: float = 3.0):
self.window_size = window_size
self.threshold_sigma = threshold_sigma
def detect_statistical(self, series: np.ndarray) -> list:
"""Detect anomalies using rolling statistics."""
anomalies = []
for i in range(self.window_size, len(series)):
window = series[i - self.window_size:i]
mean = np.mean(window)
std = np.std(window)
if std > 0:
z_score = abs(series[i] - mean) / std
if z_score > self.threshold_sigma:
anomalies.append({
'index': i,
'value': series[i],
'expected': mean,
'z_score': z_score,
'type': 'spike' if series[i] > mean else 'dip'
})
return anomalies
def detect_seasonal(self, series: np.ndarray, period: int) -> list:
"""Detect anomalies relative to seasonal pattern."""
anomalies = []
# Build seasonal profile
seasonal_means = {}
seasonal_stds = {}
for pos in range(period):
values = series[pos::period]
seasonal_means[pos] = np.mean(values)
seasonal_stds[pos] = np.std(values)
for i, value in enumerate(series):
pos = i % period
expected = seasonal_means[pos]
std = seasonal_stds[pos]
if std > 0:
z_score = abs(value - expected) / std
if z_score > self.threshold_sigma:
anomalies.append({
'index': i,
'value': value,
'seasonal_expected': expected,
'z_score': z_score
})
return anomalies
def detect_changepoints(self, series: np.ndarray,
min_segment: int = 10) -> list:
"""Detect structural changes in the time series."""
changepoints = []
for i in range(min_segment, len(series) - min_segment):
before = series[i - min_segment:i]
after = series[i:i + min_segment]
# t-test for mean difference
t_stat, p_value = stats.ttest_ind(before, after)
if p_value < 0.001:
changepoints.append({
'index': i,
'before_mean': np.mean(before),
'after_mean': np.mean(after),
'p_value': p_value,
'shift': np.mean(after) - np.mean(before)
})
# Filter to keep only most significant changepoints
if changepoints:
changepoints.sort(key=lambda x: x['p_value'])
filtered = [changepoints[0]]
for cp in changepoints[1:]:
if all(abs(cp['index'] - f['index']) > min_segment for f in filtered):
filtered.append(cp)
return filtered
return changepoints
Model Selection Guide
Dataset Size | Seasonality | Trend | Best Approaches
< 2 periods | No | No | Simple average, Naive
< 2 periods | No | Yes | Linear regression, Holt
2-10 periods | Yes | Any | Holt-Winters, SARIMA
10+ periods | Yes | Any | Prophet, SARIMA, LSTM
100+ periods | Complex | Any | Transformer, N-BEATS, ensemble
Any | Unknown | Any | Auto-ARIMA, then validate
Key Takeaways
- Always decompose the series first (trend, seasonal, residual) to understand what you are forecasting
- Test for stationarity before applying ARIMA; differencing is the standard remedy for non-stationarity
- Exponential smoothing (Holt-Winters) is a strong baseline that is often competitive with complex methods
- Prophet excels at business forecasting with multiple seasonalities and interpretable components
- LSTMs and transformers require more data but can capture nonlinear patterns that statistical methods miss
- Anomaly detection should run alongside forecasting to flag when the generating process may have changed
- Model selection depends on data size, complexity, and whether interpretability is required
- Always evaluate with proper time-series cross-validation (expanding or sliding window), never random splits
Install this skill directly: skilldb add prediction-skills