# Crowd Wisdom Aggregation
## Overview
Crowd wisdom aggregation transforms individual judgments from diverse forecasters into collective predictions that consistently outperform any individual. This goes beyond simple averaging: structured techniques like the Delphi method, extremizing aggregation, the surprisingly popular algorithm, and expert weighting can extract signal from noise, correct for cognitive biases, and identify hidden knowledge in the crowd. Getting aggregation right is often more valuable than improving any individual forecast.
## The Delphi Method

### Classical Delphi
The Delphi method uses iterative rounds of anonymous forecasting with controlled feedback to achieve convergent, calibrated group estimates:
```python
import numpy as np


class DelphiProcess:
    """
    Multi-round Delphi forecasting process.

    Key features:
    - Anonymity: prevents dominance by authority or personality
    - Iteration: multiple rounds allow learning from others
    - Controlled feedback: participants see group statistics, not individuals
    - Statistical aggregation: final answer uses median and interquartile range
    """

    def __init__(self, question: str, n_rounds: int = 3):
        self.question = question
        self.n_rounds = n_rounds
        self.rounds = []
        self.participants = {}

    def run_round(self, responses: dict) -> dict:
        """
        Process one round of Delphi responses.

        responses: {participant_id: {
            'estimate': float,
            'confidence': float,  # 0-1
            'reasoning': str
        }}
        """
        estimates = [r['estimate'] for r in responses.values()]
        confidences = [r['confidence'] for r in responses.values()]
        round_summary = {
            'round_number': len(self.rounds) + 1,
            'n_responses': len(responses),
            'median': np.median(estimates),
            'mean': np.mean(estimates),
            'q25': np.percentile(estimates, 25),
            'q75': np.percentile(estimates, 75),
            'iqr': np.percentile(estimates, 75) - np.percentile(estimates, 25),
            'std': np.std(estimates),
            'avg_confidence': np.mean(confidences),
            'responses': responses,
            'convergence': self._measure_convergence(estimates)
        }
        self.rounds.append(round_summary)
        return round_summary

    def generate_feedback(self, round_num: int) -> dict:
        """Generate anonymous feedback for the next round."""
        summary = self.rounds[round_num]
        # Identify outliers and surface their reasoning to the group
        median = summary['median']
        iqr = summary['iqr']
        outliers = {
            pid: resp for pid, resp in summary['responses'].items()
            if abs(resp['estimate'] - median) > 1.5 * iqr
        }
        return {
            'group_statistics': {
                'median': summary['median'],
                'interquartile_range': (summary['q25'], summary['q75']),
                'number_of_respondents': summary['n_responses']
            },
            'your_position': 'To be personalized per participant',
            'outlier_reasoning': [
                r['reasoning'] for r in outliers.values()
            ] if outliers else [],
            'instruction': (
                'Please reconsider your estimate in light of the group statistics. '
                'You may maintain your original estimate if you believe it is well-founded, '
                'or adjust it. Please provide updated reasoning.'
            )
        }

    def _measure_convergence(self, estimates: list) -> float:
        """Measure how much the group has converged (0=dispersed, 1=converged)."""
        if len(estimates) < 2:
            return 0.0
        mean = np.mean(estimates)
        cv = np.std(estimates) / abs(mean) if mean != 0 else 1.0
        return max(0.0, 1 - cv)

    def final_result(self) -> dict:
        """Extract the final Delphi result."""
        last_round = self.rounds[-1]
        # Converged if dispersion shrank between the first and last rounds
        convergence_trend = [r['convergence'] for r in self.rounds]
        converged = len(convergence_trend) > 1 and convergence_trend[-1] > convergence_trend[0]
        return {
            'best_estimate': last_round['median'],
            'uncertainty_range': (last_round['q25'], last_round['q75']),
            'n_rounds': len(self.rounds),
            'converged': converged,
            'convergence_trend': convergence_trend,
            'final_spread': last_round['iqr'],
            'n_participants': last_round['n_responses']
        }
```
### Modified Delphi Variants
```python
import numpy as np


class PolicyDelphi:
    """
    Policy Delphi: designed for policy analysis, not consensus.
    The goal is to explore ALL defensible positions, not to converge.
    """

    def __init__(self, policy_question: str, options: list):
        self.question = policy_question
        self.options = options
        self.rounds = []

    def collect_votes(self, responses: dict) -> dict:
        """
        Responses include desirability, feasibility, and importance ratings.
        """
        results = {}
        for option in self.options:
            option_responses = {
                pid: resp[option] for pid, resp in responses.items()
                if option in resp
            }
            if option_responses:
                desirability = [r.get('desirability', 3) for r in option_responses.values()]
                feasibility = [r.get('feasibility', 3) for r in option_responses.values()]
                importance = [r.get('importance', 3) for r in option_responses.values()]
                results[option] = {
                    'avg_desirability': np.mean(desirability),
                    'avg_feasibility': np.mean(feasibility),
                    'avg_importance': np.mean(importance),
                    'consensus': np.std(desirability) < 1.0,
                    'polarized': np.std(desirability) > 2.0,
                    'combined_score': (
                        np.mean(desirability) * 0.4 +
                        np.mean(feasibility) * 0.3 +
                        np.mean(importance) * 0.3
                    )
                }
        self.rounds.append(results)
        return results


class RealTimeDelphi:
    """
    Real-Time Delphi: a continuous process without discrete rounds.
    Participants can update their estimates at any time and see
    running group statistics.
    """

    def __init__(self, question: str):
        self.question = question
        self.estimates = {}
        self.history = []

    def update_estimate(self, participant_id: str, estimate: float,
                        confidence: float, reasoning: str = ''):
        self.estimates[participant_id] = {
            'estimate': estimate,
            'confidence': confidence,
            'reasoning': reasoning,
            'timestamp': len(self.history)
        }
        self._record_state()

    def current_state(self) -> dict:
        estimates = [e['estimate'] for e in self.estimates.values()]
        confidences = [e['confidence'] for e in self.estimates.values()]
        if not estimates:
            return {'n_participants': 0}
        # Confidence-weighted aggregation
        weighted_sum = sum(
            e['estimate'] * e['confidence'] for e in self.estimates.values()
        )
        weight_total = sum(e['confidence'] for e in self.estimates.values())
        weighted_mean = weighted_sum / weight_total if weight_total > 0 else 0
        mean = np.mean(estimates)
        return {
            'n_participants': len(self.estimates),
            'simple_median': np.median(estimates),
            'weighted_mean': weighted_mean,
            'iqr': (np.percentile(estimates, 25), np.percentile(estimates, 75)),
            'convergence': 1 - np.std(estimates) / abs(mean) if mean != 0 else 0,
            'avg_confidence': np.mean(confidences)
        }

    def _record_state(self):
        self.history.append(self.current_state())
```
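A brief sketch of the real-time variant in use; the participant IDs and values below are illustrative:

```python
rtd = RealTimeDelphi('Probability the product launch slips past June?')
rtd.update_estimate('analyst_a', 0.40, confidence=0.7)
rtd.update_estimate('analyst_b', 0.65, confidence=0.9, reasoning='Vendor delays')
rtd.update_estimate('analyst_a', 0.50, confidence=0.8)  # revises after seeing stats

state = rtd.current_state()
print(state['weighted_mean'], state['simple_median'], state['convergence'])
```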
## Extremizing Aggregation

### Why Simple Averages Are Too Moderate
Simple averaging produces aggregate forecasts that sit too close to 50%. Each forecaster's probability blends information shared by everyone with a private signal, and averaging preserves the shared component while washing out the private ones. For example, if five forecasters each hold independent evidence for the same outcome and each reports 70%, the average is still 70%, even though five independent signals pointing the same way justify a probability well above 70%. Extremizing corrects for this dilution.
```python
import numpy as np
from scipy.optimize import minimize_scalar


class ExtremizingAggregator:
    """
    Extremize aggregate forecasts to correct for information dilution:
    push the average probability away from 50%.
    """

    @staticmethod
    def extremize(probability: float, factor: float = 1.5) -> float:
        """
        Transform a probability using an extremizing factor.

        factor > 1: push away from 50% (correct for dilution)
        factor = 1: no change
        factor < 1: push toward 50% (reduce extremity)

        Uses the log-odds transformation so the correction behaves
        symmetrically around 50% and respects the [0, 1] bounds.
        """
        if probability <= 0.001 or probability >= 0.999:
            return probability
        # Convert to log-odds, scale, and convert back
        log_odds = np.log(probability / (1 - probability))
        extremized_log_odds = log_odds * factor
        result = 1 / (1 + np.exp(-extremized_log_odds))
        return float(np.clip(result, 0.01, 0.99))

    @staticmethod
    def optimal_extremizing_factor(probabilities: list, outcomes: list) -> float:
        """
        Find the optimal extremizing factor from historical data by
        minimizing the Brier score over resolved forecasts.
        """
        mean_probs = np.array(probabilities)
        outs = np.array(outcomes, dtype=float)

        def brier_at_factor(factor):
            extremized = np.array([
                ExtremizingAggregator.extremize(p, factor) for p in mean_probs
            ])
            return np.mean((extremized - outs) ** 2)

        result = minimize_scalar(brier_at_factor, bounds=(0.5, 3.0), method='bounded')
        return result.x

    @staticmethod
    def aggregate_and_extremize(forecaster_probs: list,
                                extremizing_factor: float = None,
                                n_forecasters: int = None) -> dict:
        """
        Aggregate multiple forecaster probabilities and extremize.

        If no explicit factor is given, use a heuristic based on the
        number of forecasters (more forecasters = more extremizing needed).
        """
        if not forecaster_probs:
            return {'error': 'No forecasts'}
        mean_prob = np.mean(forecaster_probs)
        if extremizing_factor is None:
            n = n_forecasters or len(forecaster_probs)
            # Heuristic from Baron et al. (2014)
            extremizing_factor = 1 + 0.15 * np.log(n)
        extremized = ExtremizingAggregator.extremize(mean_prob, extremizing_factor)
        return {
            'simple_average': mean_prob,
            'extremized': extremized,
            'factor_used': extremizing_factor,
            'shift': extremized - mean_prob,
            'n_forecasters': len(forecaster_probs)
        }
```
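A quick usage sketch of the class above; the probabilities and resolved outcomes are made up for illustration:

```python
probs = [0.70, 0.65, 0.75, 0.60, 0.72]
result = ExtremizingAggregator.aggregate_and_extremize(probs)
print(result['simple_average'])  # ~0.684
print(result['extremized'])      # pushed away from 0.5, to ~0.72

# With a track record of resolved questions, fit the factor instead
history_probs = [0.6, 0.7, 0.3, 0.8, 0.4]
history_outcomes = [1, 1, 0, 1, 0]
factor = ExtremizingAggregator.optimal_extremizing_factor(history_probs, history_outcomes)
result = ExtremizingAggregator.aggregate_and_extremize(probs, extremizing_factor=factor)
```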
## The Surprisingly Popular Algorithm

### Leveraging Meta-Knowledge
```python
import numpy as np


class SurprisinglyPopular:
    """
    The Surprisingly Popular algorithm (Prelec, Seung & McCoy, 2017).

    Key insight: ask people not just for their answer, but also what
    they think OTHER people will answer. Answers that are more popular
    than people predict are more likely to be correct. This extracts
    knowledge from minorities who know things the majority doesn't.
    """

    def __init__(self):
        self.responses = []

    def add_response(self, respondent_id: str, own_answer: str,
                     predicted_distribution: dict):
        """
        own_answer: the respondent's own answer
        predicted_distribution: {'yes': 0.6, 'no': 0.4} — what they think
            the overall distribution of answers will be
        """
        self.responses.append({
            'id': respondent_id,
            'answer': own_answer,
            'prediction': predicted_distribution
        })

    def find_surprisingly_popular(self) -> dict:
        """
        Compare the actual vote distribution with the predicted distribution.
        The answer that is MORE popular than predicted is the surprisingly
        popular answer — and more likely to be correct.
        """
        if not self.responses:
            return {'error': 'No responses'}
        # Actual distribution of answers
        answer_counts = {}
        for r in self.responses:
            answer_counts[r['answer']] = answer_counts.get(r['answer'], 0) + 1
        total = len(self.responses)
        actual_distribution = {
            answer: count / total for answer, count in answer_counts.items()
        }
        # Average predicted distribution
        all_answers = list(actual_distribution.keys())
        predicted_distribution = {}
        for answer in all_answers:
            predictions = [
                r['prediction'].get(answer, 0) for r in self.responses
            ]
            predicted_distribution[answer] = np.mean(predictions)
        # Surprise score: actual share minus predicted share
        surprise_scores = {}
        for answer in all_answers:
            surprise_scores[answer] = (
                actual_distribution.get(answer, 0) - predicted_distribution.get(answer, 0)
            )
        # The surprisingly popular answer
        sp_answer = max(surprise_scores, key=surprise_scores.get)
        majority_answer = max(actual_distribution, key=actual_distribution.get)
        return {
            'actual_distribution': actual_distribution,
            'predicted_distribution': predicted_distribution,
            'surprise_scores': surprise_scores,
            'surprisingly_popular_answer': sp_answer,
            'surprise_magnitude': surprise_scores[sp_answer],
            'majority_answer': majority_answer,
            'agrees_with_majority': sp_answer == majority_answer
        }

    @staticmethod
    def continuous_version(estimates: list, predictions_of_others: list) -> dict:
        """
        Continuous version for probability estimates. Each person gives
        their own estimate AND their prediction of the group average.
        The SP correction pushes the aggregate toward those who believe
        the crowd underestimates or overestimates.
        """
        estimates = np.array(estimates)
        predictions = np.array(predictions_of_others)
        actual_mean = np.mean(estimates)
        predicted_mean = np.mean(predictions)
        # Surprise direction: the actual mean is higher/lower than predicted
        surprise = actual_mean - predicted_mean
        # SP-adjusted estimate pushes further in the surprise direction
        sp_estimate = actual_mean + surprise
        return {
            'simple_average': actual_mean,
            'average_of_predictions': predicted_mean,
            'surprise': surprise,
            'sp_adjusted_estimate': float(np.clip(sp_estimate, 0, 1)),
            'direction': 'higher than expected' if surprise > 0 else 'lower than expected'
        }
```
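A worked sketch modeled on the classic example from Prelec et al.: "Is Philadelphia the capital of Pennsylvania?" Most people vote yes, but the minority who know the capital is Harrisburg also predict that most others will wrongly say yes, so "no" ends up more popular than predicted. The vote counts below are invented for illustration, not the paper's data:

```python
sp = SurprisinglyPopular()

# 60% vote 'yes' (wrong); even the 'no' voters expect 'yes' to dominate
for i in range(6):
    sp.add_response(f'resp_{i}', 'yes', {'yes': 0.8, 'no': 0.2})
for i in range(6, 10):
    sp.add_response(f'resp_{i}', 'no', {'yes': 0.7, 'no': 0.3})

result = sp.find_surprisingly_popular()
print(result['majority_answer'])              # 'yes'
print(result['surprisingly_popular_answer'])  # 'no': actual 40% vs ~24% predicted
```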
## Expert Weighting

### How to Weight Forecasters
```python
import numpy as np


class ExpertWeighter:
    """Methods for weighting forecasters based on track record and expertise."""

    def __init__(self):
        self.forecasters = {}

    def register_forecaster(self, forecaster_id: str, track_record: dict = None):
        self.forecasters[forecaster_id] = {
            'track_record': track_record or {},
            'weight': 1.0,
            'n_forecasts': 0,
            'cumulative_score': 0.0
        }

    def update_track_record(self, forecaster_id: str, brier_score: float):
        f = self.forecasters[forecaster_id]
        f['n_forecasts'] += 1
        f['cumulative_score'] += brier_score

    def compute_weights(self, method: str = 'performance') -> dict:
        """Compute forecaster weights using the specified method."""
        if method == 'performance':
            return self._performance_weights()
        elif method == 'expertise_declared':
            return self._expertise_weights()
        elif method == 'bayesian':
            return self._bayesian_weights()
        elif method == 'equal':
            return {fid: 1.0 / len(self.forecasters) for fid in self.forecasters}
        return {}

    def _performance_weights(self) -> dict:
        """Weight by inverse Brier score (better accuracy = higher weight)."""
        weights = {}
        for fid, f in self.forecasters.items():
            if f['n_forecasts'] > 0:
                avg_brier = f['cumulative_score'] / f['n_forecasts']
                # Inverse Brier: lower score = higher weight
                weights[fid] = 1 / max(avg_brier, 0.01)
            else:
                weights[fid] = 1.0  # Default for new forecasters
        total = sum(weights.values())
        return {fid: w / total for fid, w in weights.items()}

    def _bayesian_weights(self) -> dict:
        """
        Bayesian-style weighting: start with equal priors, then update
        weights by the likelihood of observed outcomes under each
        forecaster's predictions as questions resolve.
        """
        weights = {fid: 1.0 for fid in self.forecasters}
        for fid, f in self.forecasters.items():
            # Weight = exp(-k * average Brier): worse scores decay the weight
            if f['n_forecasts'] > 0:
                avg_brier = f['cumulative_score'] / f['n_forecasts']
                weights[fid] = np.exp(-5 * avg_brier)  # k=5 sets the sensitivity
        total = sum(weights.values())
        return {fid: w / total for fid, w in weights.items()}

    def _expertise_weights(self) -> dict:
        """Weight by self-declared domain expertise (less reliable)."""
        weights = {}
        for fid, f in self.forecasters.items():
            weights[fid] = f['track_record'].get('expertise_level', 0.5)
        total = sum(weights.values())
        return {fid: w / total for fid, w in weights.items()}

    def weighted_aggregate(self, forecasts: dict, method: str = 'performance') -> dict:
        """
        Produce a weighted aggregate forecast.

        forecasts: {forecaster_id: probability}
        """
        weights = self.compute_weights(method)
        weighted_sum = 0.0
        total_weight = 0.0
        for fid, prob in forecasts.items():
            w = weights.get(fid, 1.0 / len(forecasts))
            weighted_sum += prob * w
            total_weight += w
        result = weighted_sum / total_weight if total_weight > 0 else 0.5
        return {
            'weighted_aggregate': result,
            'simple_average': np.mean(list(forecasts.values())),
            'weights_used': weights,
            'method': method,
            'n_forecasters': len(forecasts)
        }
```
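A short sketch of performance weighting in action; the forecaster names and Brier scores fed in below are invented:

```python
ew = ExpertWeighter()
for fid in ('alice', 'bob', 'carol'):
    ew.register_forecaster(fid)

# Resolved questions accumulate Brier scores (lower is better)
for score in (0.05, 0.10, 0.08):
    ew.update_track_record('alice', score)
for score in (0.30, 0.25, 0.40):
    ew.update_track_record('bob', score)
# carol has no track record yet and keeps the default weight

agg = ew.weighted_aggregate({'alice': 0.80, 'bob': 0.55, 'carol': 0.65})
print(agg['weighted_aggregate'])  # ~0.75, pulled toward alice's 0.80
print(agg['weights_used'])
```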
## Dealing with Cognitive Biases

### Structured Debiasing During Aggregation
```python
import numpy as np


class BiasAwareAggregator:
    """Aggregation methods that correct for known cognitive biases."""

    @staticmethod
    def correct_anchoring(estimates: list, anchor_value: float = None) -> list:
        """
        If an anchor was present (a starting number or a previous
        estimate), adjust estimates away from it. Anchored judgments
        are typically adjusted insufficiently, so each estimate is
        pushed 30% further along its own direction away from the anchor.
        """
        if anchor_value is None:
            return estimates
        adjusted = []
        for est in estimates:
            distance_from_anchor = est - anchor_value
            # Estimates close to the anchor are likely biased toward it
            adjustment = distance_from_anchor * 0.3
            adjusted.append(est + adjustment)
        return adjusted

    @staticmethod
    def correct_availability_bias(probability: float,
                                  media_coverage_intensity: float) -> float:
        """
        Events with heavy media coverage tend to be overestimated.
        Adjust downward when media attention is disproportionate.

        media_coverage_intensity: 0 (none) to 1 (saturation coverage)
        """
        if media_coverage_intensity > 0.7:
            # Heavy coverage -> deflate the probability
            deflation = (media_coverage_intensity - 0.7) * 0.15
            return max(0.01, probability - deflation)
        return probability

    @staticmethod
    def correct_groupthink(individual_estimates: list,
                           discussion_estimates: list) -> list:
        """
        After group discussion, estimates often converge too much.
        Restore some of the pre-discussion variance.
        """
        individual_var = np.var(individual_estimates)
        discussion_var = np.var(discussion_estimates)
        if discussion_var < individual_var * 0.5:
            # Too much convergence; partially restore the spread
            mean = np.mean(discussion_estimates)
            target_var = individual_var * 0.7  # Aim for 70% of the original spread
            scale = np.sqrt(target_var / max(discussion_var, 0.001))
            return [mean + (est - mean) * scale for est in discussion_estimates]
        return discussion_estimates

    @staticmethod
    def pre_mortem_adjustment(probability: float, n_failure_modes: int) -> float:
        """
        Pre-mortem: imagine the forecast was wrong. The more distinct
        ways it could fail, the lower the warranted confidence. Moves
        the probability toward 0.5 without crossing it.
        """
        adjustment = min(0.15, n_failure_modes * 0.03)
        if probability > 0.5:
            return max(0.5, probability - adjustment)
        return min(0.5, probability + adjustment)
```
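A small demonstration of the groupthink correction; the pre- and post-discussion estimates are invented:

```python
pre_discussion = [0.30, 0.55, 0.70, 0.45]   # diverse individual views
post_discussion = [0.48, 0.50, 0.52, 0.50]  # over-converged after talking

restored = BiasAwareAggregator.correct_groupthink(pre_discussion, post_discussion)
print(restored)  # same mean, but the spread widens back toward the original
```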
## Forecast Elicitation Best Practices

### How to Ask for Forecasts
```python
class ElicitationProtocol:
    """Best practices for eliciting forecasts from individuals."""

    @staticmethod
    def structured_elicitation(question: str) -> dict:
        """Generate a structured elicitation form."""
        return {
            'question': question,
            'steps': [
                {
                    'step': 1,
                    'instruction': 'Think of the BASE RATE. How often does this type of event happen?',
                    'field': 'base_rate_estimate'
                },
                {
                    'step': 2,
                    'instruction': 'List 3 factors that make this case MORE likely than the base rate.',
                    'field': 'factors_increasing'
                },
                {
                    'step': 3,
                    'instruction': 'List 3 factors that make this case LESS likely than the base rate.',
                    'field': 'factors_decreasing'
                },
                {
                    'step': 4,
                    'instruction': 'Provide your INITIAL probability estimate (0-100%).',
                    'field': 'initial_estimate'
                },
                {
                    'step': 5,
                    'instruction': 'Now imagine you are WRONG. What would the strongest argument against your estimate be?',
                    'field': 'devils_advocate'
                },
                {
                    'step': 6,
                    'instruction': 'Provide your FINAL probability estimate (0-100%).',
                    'field': 'final_estimate'
                },
                {
                    'step': 7,
                    'instruction': 'Rate your confidence in this estimate (1-5).',
                    'field': 'confidence'
                }
            ]
        }

    @staticmethod
    def probability_scale_guide() -> dict:
        """Help non-probabilists express uncertainty in words."""
        return {
            '99%': 'Virtually certain — as sure as you can be',
            '90%': 'Very likely — would be surprised if wrong',
            '75%': 'Likely — considerably more probable than not',
            '60%': 'Lean yes — somewhat more likely than not',
            '50%': 'Coin flip — genuinely uncertain',
            '40%': 'Lean no — somewhat less likely than not',
            '25%': 'Unlikely — considerably more probable not to happen',
            '10%': 'Very unlikely — would be surprised if it happened',
            '1%': 'Virtually impossible — nearly certain it will not happen',
            'note': 'Avoid using exactly 50% unless truly indifferent. '
                    'Even slight leanings (51% or 49%) carry information.'
        }
```
## Key Takeaways
- The Delphi method produces convergent group estimates through iterative anonymous feedback; 2-3 rounds typically suffice
- Simple averages are systematically too moderate; extremizing (pushing away from 50%) corrects for shared-information dilution
- The Surprisingly Popular algorithm extracts hidden knowledge by comparing actual answers to predicted answers; minority knowledge surfaces
- Performance-based weighting dominates expertise-declared weighting; past accuracy is the best predictor of future accuracy
- Bayesian weighting naturally upweights accurate forecasters and downweights inaccurate ones as data accumulates
- Cognitive bias correction (anchoring, availability, groupthink) should be applied during aggregation, not expected from individual forecasters
- Structured elicitation (base rate first, then adjustments, then devil's advocate) produces more calibrated individual forecasts before they even enter aggregation
- The optimal aggregation method depends on the forecaster pool: equal weighting for diverse novices, performance weighting for tracked experts, extremizing for all