
Crowd Wisdom Aggregation

Crowd wisdom aggregation combines individual judgments from diverse forecasters into collective predictions that typically outperform most of the individuals who contribute to them. This goes beyond simple averaging: structured techniques like the Delphi method, extremizing aggregation, the Surprisingly Popular algorithm, and expert weighting can extract signal from noise, correct for cognitive biases, and surface hidden knowledge in the crowd. Getting aggregation right is often more valuable than improving any single forecast.

## Key Points

1. The Delphi method produces convergent group estimates through iterative anonymous feedback; 2-3 rounds typically suffice
2. Simple averages are systematically too moderate; extremizing (pushing away from 50%) corrects for shared-information dilution
3. The Surprisingly Popular algorithm extracts hidden knowledge by comparing actual answers to predicted answers; minority knowledge surfaces
4. Performance-based weighting dominates expertise-declared weighting; past accuracy is the best predictor of future accuracy
5. Bayesian weighting naturally upweights accurate forecasters and downweights inaccurate ones as data accumulates
6. Cognitive bias correction (anchoring, availability, groupthink) should be applied during aggregation, not expected from individual forecasters
7. Structured elicitation (base rate first, then adjustments, then devil's advocate) produces more calibrated individual forecasts before they even enter aggregation
8. The optimal aggregation method depends on the forecaster pool: equal weighting for diverse novices, performance weighting for tracked experts, extremizing for all


The Delphi Method

Classical Delphi

The Delphi method uses iterative rounds of anonymous forecasting with controlled feedback to achieve convergent, calibrated group estimates:

import numpy as np


class DelphiProcess:
    """
    Multi-round Delphi forecasting process.

    Key features:
    - Anonymity: prevents dominance by authority or personality
    - Iteration: multiple rounds allow learning from others
    - Controlled feedback: participants see group statistics, not individuals
    - Statistical aggregation: final answer uses median and interquartile range
    """

    def __init__(self, question: str, n_rounds: int = 3):
        self.question = question
        self.n_rounds = n_rounds
        self.rounds = []
        self.participants = {}

    def run_round(self, responses: dict) -> dict:
        """
        Process one round of Delphi responses.

        responses: {participant_id: {
            'estimate': float,
            'confidence': float,  # 0-1
            'reasoning': str
        }}
        """
        estimates = [r['estimate'] for r in responses.values()]
        confidences = [r['confidence'] for r in responses.values()]

        round_summary = {
            'round_number': len(self.rounds) + 1,
            'n_responses': len(responses),
            'median': np.median(estimates),
            'mean': np.mean(estimates),
            'q25': np.percentile(estimates, 25),
            'q75': np.percentile(estimates, 75),
            'iqr': np.percentile(estimates, 75) - np.percentile(estimates, 25),
            'std': np.std(estimates),
            'avg_confidence': np.mean(confidences),
            'responses': responses,
            'convergence': self._measure_convergence(estimates)
        }

        self.rounds.append(round_summary)
        return round_summary

    def generate_feedback(self, round_num: int) -> dict:
        """Generate anonymous feedback for the next round.

        round_num is a 0-based index into self.rounds (0 = first completed round).
        """
        summary = self.rounds[round_num]

        # Identify outliers to ask for reasoning
        median = summary['median']
        iqr = summary['iqr']
        outliers = {
            pid: resp for pid, resp in summary['responses'].items()
            if abs(resp['estimate'] - median) > 1.5 * iqr
        }

        return {
            'group_statistics': {
                'median': summary['median'],
                'interquartile_range': (summary['q25'], summary['q75']),
                'number_of_respondents': summary['n_responses']
            },
            'your_position': 'To be personalized per participant',
            'outlier_reasoning': [
                r['reasoning'] for r in outliers.values()
            ] if outliers else [],
            'instruction': (
                'Please reconsider your estimate in light of the group statistics. '
                'You may maintain your original estimate if you believe it is well-founded, '
                'or adjust it. Please provide updated reasoning.'
            )
        }

    def _measure_convergence(self, estimates: list) -> float:
        """Measure how much the group has converged (0=dispersed, 1=converged)."""
        if len(estimates) < 2:
            return 0.0
        mean = np.mean(estimates)
        if mean == 0:
            # Coefficient of variation is undefined at a zero mean; report no convergence
            return 0.0
        cv = np.std(estimates) / abs(mean)
        return float(max(0.0, 1 - cv))

    def final_result(self) -> dict:
        """Extract the final Delphi result."""
        last_round = self.rounds[-1]

        # Check convergence across rounds
        convergence_trend = [r['convergence'] for r in self.rounds]
        converged = len(convergence_trend) > 1 and convergence_trend[-1] > convergence_trend[0]

        return {
            'best_estimate': last_round['median'],
            'uncertainty_range': (last_round['q25'], last_round['q75']),
            'n_rounds': len(self.rounds),
            'converged': converged,
            'convergence_trend': convergence_trend,
            'final_spread': last_round['iqr'],
            'n_participants': last_round['n_responses']
        }
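
The round statistics and outlier rule above can be tried on a toy panel. The following is a minimal standalone sketch using only the standard library; `summarize_round`, `flag_outliers`, and the estimates are hypothetical and stand in for `run_round` and `generate_feedback`.

```python
import statistics


def summarize_round(estimates):
    """Median and interquartile range for one round of estimates."""
    # method="inclusive" uses linear interpolation between data points,
    # the same convention as np.percentile's default
    q25, _, q75 = statistics.quantiles(estimates, n=4, method="inclusive")
    return {
        "median": statistics.median(estimates),
        "q25": q25,
        "q75": q75,
        "iqr": q75 - q25,
    }


def flag_outliers(estimates, summary, k=1.5):
    """Estimates further than k * IQR from the median (the rule used above)."""
    return [e for e in estimates if abs(e - summary["median"]) > k * summary["iqr"]]


# Hypothetical panel: one extreme estimate in round 1, revised in round 2
round_1 = [10, 20, 30, 40, 100]
round_2 = [20, 25, 30, 35, 40]

s1 = summarize_round(round_1)
s2 = summarize_round(round_2)
outliers_1 = flag_outliers(round_1, s1)

print(s1, s2, outliers_1)
```

In this toy data the median stays put while the interquartile range halves between rounds, which is the pattern a converging Delphi panel is expected to show. The participant who gave 100 would be asked for reasoning before round 2.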

Modified Delphi Variants

class PolicyDelphi:
    """
    Policy Delphi: designed for policy analysis, not consensus.
    Goal is to explore ALL defensible positions, not converge.
    """

    def __init__(self, policy_question: str, options: list):
        self.question = policy_question
        self.options = options
        self.rounds = []

    def collect_votes(self, responses: dict) -> dict:
        """
        Responses include: desirability, feasibility, and importance ratings.
        """
        results = {}
        for option in self.options:
            option_responses = {
                pid: resp[option] for pid, resp in responses.items()
                if option in resp
            }
            if option_responses:
                desirability = [r.get('desirability', 3) for r in option_responses.values()]
                feasibility = [r.get('feasibility', 3) for r in option_responses.values()]
                importance = [r.get('importance', 3) for r in option_responses.values()]

                results[option] = {
                    'avg_desirability': np.mean(desirability),
                    'avg_feasibility': np.mean(feasibility),
                    'avg_importance': np.mean(importance),
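                    # Assumes ratings on a 1-5 scale (the default of 3 is the midpoint).
                    # On that scale the population std cannot exceed 2.0, so a
                    # "> 2.0" test would never fire; 1.5 is an illustrative threshold.
                    'consensus': np.std(desirability) < 1.0,
                    'polarized': np.std(desirability) > 1.5,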
                    'combined_score': (
                        np.mean(desirability) * 0.4 +
                        np.mean(feasibility) * 0.3 +
                        np.mean(importance) * 0.3
                    )
                }

        return results


class RealTimeDelphi:
    """
    Real-Time Delphi: continuous process without discrete rounds.
    Participants can update their estimates at any time and see
    running group statistics.
    """

    def __init__(self, question: str):
        self.question = question
        self.estimates = {}
        self.history = []

    def update_estimate(self, participant_id: str, estimate: float,
                         confidence: float, reasoning: str = ''):
        self.estimates[participant_id] = {
            'estimate': estimate,
            'confidence': confidence,
            'reasoning': reasoning,
            'timestamp': len(self.history)
        }
        self._record_state()

    def current_state(self) -> dict:
        estimates = [e['estimate'] for e in self.estimates.values()]
        confidences = [e['confidence'] for e in self.estimates.values()]

        if not estimates:
            return {'n_participants': 0}

        # Confidence-weighted aggregation
        weighted_sum = sum(
            e['estimate'] * e['confidence'] for e in self.estimates.values()
        )
        weight_total = sum(e['confidence'] for e in self.estimates.values())
        weighted_mean = weighted_sum / weight_total if weight_total > 0 else 0

        return {
            'n_participants': len(self.estimates),
            'simple_median': np.median(estimates),
            'weighted_mean': weighted_mean,
            'iqr': (np.percentile(estimates, 25), np.percentile(estimates, 75)),
            # Clamp at 0 so the score stays in [0, 1], matching DelphiProcess._measure_convergence
            'convergence': max(0.0, 1 - np.std(estimates) / abs(np.mean(estimates))) if np.mean(estimates) != 0 else 0.0,
            'avg_confidence': np.mean(confidences)
        }

    def _record_state(self):
        self.history.append(self.current_state())

Extremizing Aggregation

Why Simple Averages Are Too Moderate

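Simple averaging tends to produce forecasts that are too close to 50%. Each forecaster holds some information that is shared with the rest of the crowd and some that is private. An average behaves as if everyone had seen the same evidence, so the independent private signals are diluted instead of being pooled, and the aggregate ends up less confident than the combined evidence warrants.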

class ExtremizingAggregator:
    """
    Extremize aggregate forecasts to correct for information dilution.
    Push the average probability away from 50%.
    """

    @staticmethod
    def extremize(probability: float, factor: float = 1.5) -> float:
        """
        Transform probability using extremizing factor.
        factor > 1: push away from 50% (correct for dilution)
        factor = 1: no change
        factor < 1: push toward 50% (reduce extremity)

        Uses log-odds transformation for proper behavior.
        """
        if probability <= 0.001 or probability >= 0.999:
            return probability

        # Convert to log-odds
        log_odds = np.log(probability / (1 - probability))

        # Extremize
        extremized_log_odds = log_odds * factor

        # Convert back
        result = 1 / (1 + np.exp(-extremized_log_odds))
        # Use the same bounds as the early return above, so factor = 1 is a true no-op
        return float(np.clip(result, 0.001, 0.999))

    @staticmethod
    def optimal_extremizing_factor(probabilities: list, outcomes: list) -> float:
        """
        Find the optimal extremizing factor from historical data.
        Minimize Brier score over historical forecasts.
        """
        from scipy.optimize import minimize_scalar

        mean_probs = np.array(probabilities)
        outs = np.array(outcomes, dtype=float)

        def brier_at_factor(factor):
            extremized = np.array([
                ExtremizingAggregator.extremize(p, factor) for p in mean_probs
            ])
            return np.mean((extremized - outs) ** 2)

        result = minimize_scalar(brier_at_factor, bounds=(0.5, 3.0), method='bounded')
        return result.x

    @staticmethod
    def aggregate_and_extremize(forecaster_probs: list,
                                 extremizing_factor: float = None,
                                 n_forecasters: int = None) -> dict:
        """
        Aggregate multiple forecaster probabilities and extremize.

        If no explicit factor is given, use a heuristic based on
        the number of forecasters (more forecasters = more extremizing needed).
        """
        if not forecaster_probs:
            return {'error': 'No forecasts'}

        mean_prob = np.mean(forecaster_probs)

        if extremizing_factor is None:
            n = n_forecasters or len(forecaster_probs)
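            # Rule of thumb: grow the factor slowly with crowd size.
            # Baron et al. (2014) motivate extremizing itself; the log scaling and
            # the 0.15 coefficient are an untuned assumption, so prefer
            # optimal_extremizing_factor() when resolved forecasts are available.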
            extremizing_factor = 1 + 0.15 * np.log(n)

        extremized = ExtremizingAggregator.extremize(mean_prob, extremizing_factor)

        return {
            'simple_average': mean_prob,
            'extremized': extremized,
            'factor_used': extremizing_factor,
            'shift': extremized - mean_prob,
            'n_forecasters': len(forecaster_probs)
        }
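
A worked number makes the log-odds transform concrete. This is a minimal standalone sketch using only `math`; the three forecasts and the factor of 1.5 are illustrative, not recommended settings.

```python
import math


def extremize(p, factor):
    """Scale the log-odds of p by `factor` and map back to a probability."""
    log_odds = math.log(p / (1 - p))
    return 1 / (1 + math.exp(-factor * log_odds))


forecasts = [0.6, 0.7, 0.8]  # hypothetical individual forecasts
mean_p = sum(forecasts) / len(forecasts)

extremized = extremize(mean_p, 1.5)   # pushed away from 50%
mirrored = extremize(1 - mean_p, 1.5)  # the transform is symmetric around 50%
unchanged = extremize(0.5, 2.0)        # 50% carries no direction to amplify

print(round(mean_p, 3), round(extremized, 3), round(mirrored, 3), unchanged)
```

An average of 0.70 becomes roughly 0.78 at factor 1.5, and the mirrored 0.30 becomes roughly 0.22. A forecast of exactly 50% is never moved, whatever the factor.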

The Surprisingly Popular Algorithm

Leveraging Meta-Knowledge

class SurprisinglyPopular:
    """
    The Surprisingly Popular algorithm (Prelec, Seung & McCoy, 2017).

    Key insight: ask people not just for their answer, but also what
    they think OTHER people will answer. Answers that are more popular
    than people predict are more likely to be correct.

    This extracts knowledge from minorities who know things the
    majority doesn't.
    """

    def __init__(self):
        self.responses = []

    def add_response(self, respondent_id: str, own_answer: str,
                     predicted_distribution: dict):
        """
        own_answer: the respondent's own answer
        predicted_distribution: {'yes': 0.6, 'no': 0.4} — what they think
                                the overall distribution of answers will be
        """
        self.responses.append({
            'id': respondent_id,
            'answer': own_answer,
            'prediction': predicted_distribution
        })

    def find_surprisingly_popular(self) -> dict:
        """
        Compare actual vote distribution with predicted distribution.
        The answer that is MORE popular than predicted is the
        surprisingly popular answer — and more likely correct.
        """
        if not self.responses:
            return {'error': 'No responses'}

        # Actual distribution
        answer_counts = {}
        for r in self.responses:
            answer_counts[r['answer']] = answer_counts.get(r['answer'], 0) + 1

        total = len(self.responses)
        actual_distribution = {
            answer: count / total for answer, count in answer_counts.items()
        }

        # Average predicted distribution
        all_answers = list(actual_distribution.keys())
        predicted_distribution = {}
        for answer in all_answers:
            predictions = [
                r['prediction'].get(answer, 0) for r in self.responses
            ]
            predicted_distribution[answer] = np.mean(predictions)

        # Surprisingly popular: actual - predicted
        surprise_scores = {}
        for answer in all_answers:
            surprise = actual_distribution.get(answer, 0) - predicted_distribution.get(answer, 0)
            surprise_scores[answer] = surprise

        # The surprisingly popular answer
        sp_answer = max(surprise_scores, key=surprise_scores.get)

        return {
            'actual_distribution': actual_distribution,
            'predicted_distribution': predicted_distribution,
            'surprise_scores': surprise_scores,
            'surprisingly_popular_answer': sp_answer,
            'surprise_magnitude': surprise_scores[sp_answer],
            'majority_answer': max(actual_distribution, key=actual_distribution.get),
            'agrees_with_majority': sp_answer == max(actual_distribution, key=actual_distribution.get)
        }

    def continuous_version(self, estimates: list, predictions_of_others: list) -> dict:
        """
        Heuristic continuous analogue for probability estimates (the original
        algorithm is defined for discrete answers).
        Each person gives their estimate AND their prediction of the group average.

        If the actual average differs from what the crowd expected, the
        aggregate is pushed further in that direction.
        """
        estimates = np.array(estimates)
        predictions = np.array(predictions_of_others)

        actual_mean = np.mean(estimates)
        predicted_mean = np.mean(predictions)

        # Surprise direction: actual mean is higher/lower than predicted
        surprise = actual_mean - predicted_mean

        # SP-adjusted estimate pushes further in the surprise direction
        sp_estimate = actual_mean + surprise

        return {
            'simple_average': actual_mean,
            'average_of_predictions': predicted_mean,
            'surprise': surprise,
            'sp_adjusted_estimate': np.clip(sp_estimate, 0, 1),
            'direction': (
                'higher than expected' if surprise > 0
                else 'lower than expected' if surprise < 0
                else 'as expected'
            )
        }
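
A toy question shows how the surprisingly popular answer can differ from the majority answer. The sketch below is standalone and uses made-up responses: six people answer "yes" and expect 90% of others to agree, while four answer "no" but still expect 70% of others to say "yes".

```python
from collections import Counter


def surprisingly_popular(responses):
    """responses: list of (own_answer, predicted_distribution) pairs."""
    n = len(responses)
    votes = Counter(answer for answer, _ in responses)
    scores = {}
    for option in votes:
        actual_share = votes[option] / n
        predicted_share = sum(pred.get(option, 0.0) for _, pred in responses) / n
        scores[option] = actual_share - predicted_share
    # The option whose actual share most exceeds its predicted share wins
    return max(scores, key=scores.get), scores


responses = (
    [("yes", {"yes": 0.9, "no": 0.1})] * 6
    + [("no", {"yes": 0.7, "no": 0.3})] * 4
)

sp_answer, scores = surprisingly_popular(responses)
majority = Counter(answer for answer, _ in responses).most_common(1)[0][0]

print(majority, sp_answer, scores)
```

"Yes" wins the vote with 60%, but the crowd expected 82%, so it underperforms its prediction. "No" gets 40% against an expected 18%, making it the surprisingly popular answer.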

Expert Weighting

How to Weight Forecasters

class ExpertWeighter:
    """Methods for weighting forecasters based on track record and expertise."""

    def __init__(self):
        self.forecasters = {}

    def register_forecaster(self, forecaster_id: str, track_record: dict = None):
        self.forecasters[forecaster_id] = {
            'track_record': track_record or {},
            'weight': 1.0,
            'n_forecasts': 0,
            'cumulative_score': 0
        }

    def update_track_record(self, forecaster_id: str, brier_score: float):
        f = self.forecasters[forecaster_id]
        f['n_forecasts'] += 1
        f['cumulative_score'] += brier_score

    def compute_weights(self, method: str = 'performance') -> dict:
        """Compute forecaster weights using specified method."""

        if method == 'performance':
            return self._performance_weights()
        elif method == 'expertise_declared':
            return self._expertise_weights()
        elif method == 'bayesian':
            return self._bayesian_weights()
        elif method == 'equal':
            return {fid: 1.0 / len(self.forecasters) for fid in self.forecasters}

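        # Fail loudly instead of silently falling back to equal weights downstream
        raise ValueError(f"Unknown weighting method: {method!r}")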

    def _performance_weights(self) -> dict:
        """Weight by inverse Brier score (better accuracy = higher weight)."""
        weights = {}
        for fid, f in self.forecasters.items():
            if f['n_forecasts'] > 0:
                avg_brier = f['cumulative_score'] / f['n_forecasts']
                # Inverse Brier: lower score = higher weight
                weights[fid] = 1 / max(avg_brier, 0.01)
            else:
                weights[fid] = 1.0  # Default for new forecasters

        # Normalize
        total = sum(weights.values())
        return {fid: w / total for fid, w in weights.items()}

    def _bayesian_weights(self) -> dict:
        """
        Bayesian-style weighting: shrink each forecaster's weight
        exponentially in their mean Brier score.
        This is a proxy for a full Bayesian update, which would use the
        likelihood of the observed outcomes under each forecaster's predictions.
        """
        # Untracked forecasters get the weight of an uninformative 50% forecaster
        # (Brier 0.25) rather than the weight of a perfect one (assumed default)
        weights = {fid: np.exp(-5 * 0.25) for fid in self.forecasters}

        for fid, f in self.forecasters.items():
            # Weight = exp(-5 * mean Brier)
            # Higher mean Brier (worse) = lower weight
            if f['n_forecasts'] > 0:
                avg_brier = f['cumulative_score'] / f['n_forecasts']
                weights[fid] = np.exp(-5 * avg_brier)  # 5 is a sensitivity scaling factor

        total = sum(weights.values())
        return {fid: w / total for fid, w in weights.items()}

    def _expertise_weights(self) -> dict:
        """Weight by self-declared domain expertise (less reliable)."""
        weights = {}
        for fid, f in self.forecasters.items():
            expertise = f.get('track_record', {}).get('expertise_level', 0.5)
            weights[fid] = expertise

        total = sum(weights.values())
        return {fid: w / total for fid, w in weights.items()}

    def weighted_aggregate(self, forecasts: dict, method: str = 'performance') -> dict:
        """
        Produce a weighted aggregate forecast.
        forecasts: {forecaster_id: probability}
        """
        weights = self.compute_weights(method)

        weighted_sum = 0
        total_weight = 0

        for fid, prob in forecasts.items():
            w = weights.get(fid, 1.0 / len(forecasts))
            weighted_sum += prob * w
            total_weight += w

        result = weighted_sum / total_weight if total_weight > 0 else 0.5

        return {
            'weighted_aggregate': result,
            'simple_average': np.mean(list(forecasts.values())),
            'weights_used': weights,
            'method': method,
            'n_forecasters': len(forecasts)
        }
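
The weighting code above takes Brier scores as given. As a minimal sketch of the full path from resolved forecasts to a weighted aggregate, the example below computes mean Brier scores for two hypothetical forecasters (`ana` and `ben`, with invented track records) and applies the same inverse-Brier rule as `_performance_weights`.

```python
def brier(prob, outcome):
    """Squared error between a probability and a 0/1 outcome (lower is better)."""
    return (prob - outcome) ** 2


def mean_brier(history):
    """history: list of (forecast_probability, outcome) pairs."""
    return sum(brier(p, o) for p, o in history) / len(history)


def inverse_brier_weights(avg_briers, floor=0.01):
    """Normalized 1 / Brier weights, floored to avoid division by zero."""
    raw = {fid: 1.0 / max(score, floor) for fid, score in avg_briers.items()}
    total = sum(raw.values())
    return {fid: w / total for fid, w in raw.items()}


# Hypothetical resolved forecasts
track_records = {
    "ana": [(0.9, 1), (0.2, 0), (0.7, 1)],
    "ben": [(0.6, 1), (0.5, 0), (0.4, 1)],
}
avg_briers = {fid: mean_brier(h) for fid, h in track_records.items()}
weights = inverse_brier_weights(avg_briers)

# New question: weight today's forecasts by past accuracy
today = {"ana": 0.8, "ben": 0.4}
weighted = sum(today[fid] * weights[fid] for fid in today)
simple = sum(today.values()) / len(today)

print(avg_briers, weights, round(weighted, 3), simple)
```

Because `ana` has the much lower mean Brier score, the weighted aggregate lands closer to her forecast than the simple average does. With only three resolved questions each, these weights would be very noisy in practice.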

Dealing with Cognitive Biases

Structured Debiasing During Aggregation

class BiasAwareAggregator:
    """Aggregation methods that correct for known cognitive biases."""

    @staticmethod
    def correct_anchoring(estimates: list, anchor_value: float = None) -> list:
        """
        If an anchor was present (a starting number or previous estimate),
        adjust estimates away from it.
        """
        if anchor_value is None:
            return estimates

        adjusted = []
        for est in estimates:
            distance_from_anchor = est - anchor_value
            # Estimates close to the anchor are likely biased; pull them further away
            adjustment = distance_from_anchor * 0.3
            adjusted.append(est + adjustment)
        return adjusted

    @staticmethod
    def correct_availability_bias(probability: float,
                                   media_coverage_intensity: float) -> float:
        """
        Events with heavy media coverage are overestimated.
        Adjust downward when media attention is disproportionate.
        """
        # media_coverage_intensity: 0 (none) to 1 (saturating coverage)
        if media_coverage_intensity > 0.7:
            # Heavy coverage -> deflate probability
            deflation = (media_coverage_intensity - 0.7) * 0.15
            return max(0.01, probability - deflation)
        return probability

    @staticmethod
    def correct_groupthink(individual_estimates: list,
                           discussion_estimates: list) -> list:
        """
        After group discussion, estimates often converge too much.
        Restore some of the pre-discussion variance.
        """
        individual_var = np.var(individual_estimates)
        discussion_var = np.var(discussion_estimates)

        if discussion_var < individual_var * 0.5:
            # Too much convergence; partially restore spread
            mean = np.mean(discussion_estimates)
            target_var = individual_var * 0.7  # Aim for 70% of the pre-discussion variance
            scale = np.sqrt(target_var / max(discussion_var, 0.001))

            restored = [mean + (est - mean) * scale for est in discussion_estimates]
            return restored

        return discussion_estimates

    @staticmethod
    def pre_mortem_adjustment(probability: float, n_failure_modes: int) -> float:
        """
        Pre-mortem: imagine the forecast was wrong.
        How many ways could it fail? More failure modes = lower confidence.
        """
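        adjustment = min(0.15, n_failure_modes * 0.03)

        # Shrink toward 50% without crossing it; a 50% forecast is left unchanged
        if probability > 0.5:
            return max(0.5, probability - adjustment)
        elif probability < 0.5:
            return min(0.5, probability + adjustment)
        return probability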
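
The groupthink correction is the easiest of these to check numerically. The sketch below is a standalone version under the same assumptions as `correct_groupthink` (trigger at half the original variance, restore to 70% of it). The function name and data are hypothetical, and it skips the correction when post-discussion variance is exactly zero, where the class above instead floors the variance at 0.001.

```python
import math
import statistics


def restore_spread(pre, post, trigger=0.5, target=0.7):
    """Rescale post-discussion estimates around their mean if they collapsed."""
    pre_var = statistics.pvariance(pre)
    post_var = statistics.pvariance(post)
    if post_var == 0 or post_var >= trigger * pre_var:
        return list(post)  # no over-convergence detected, or nothing to rescale
    center = statistics.fmean(post)
    scale = math.sqrt(target * pre_var / post_var)
    return [center + (x - center) * scale for x in post]


pre_discussion = [0.2, 0.5, 0.8]
post_discussion = [0.45, 0.5, 0.55]  # the group has collapsed toward 0.5

restored = restore_spread(pre_discussion, post_discussion)

print([round(x, 3) for x in restored])
```

The rescaling keeps the post-discussion mean and sets the variance to 70% of the pre-discussion variance. Since nothing bounds the rescaled values, probabilities near 0 or 1 should be clipped afterwards.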

Forecast Elicitation Best Practices

How to Ask for Forecasts

class ElicitationProtocol:
    """Best practices for eliciting forecasts from individuals."""

    @staticmethod
    def structured_elicitation(question: str) -> dict:
        """Generate a structured elicitation form."""
        return {
            'question': question,
            'steps': [
                {
                    'step': 1,
                    'instruction': 'Think of the BASE RATE. How often does this type of event happen?',
                    'field': 'base_rate_estimate'
                },
                {
                    'step': 2,
                    'instruction': 'List 3 factors that make this case MORE likely than the base rate.',
                    'field': 'factors_increasing'
                },
                {
                    'step': 3,
                    'instruction': 'List 3 factors that make this case LESS likely than the base rate.',
                    'field': 'factors_decreasing'
                },
                {
                    'step': 4,
                    'instruction': 'Provide your INITIAL probability estimate (0-100%).',
                    'field': 'initial_estimate'
                },
                {
                    'step': 5,
                    'instruction': 'Now imagine you are WRONG. What would the strongest argument against your estimate be?',
                    'field': 'devils_advocate'
                },
                {
                    'step': 6,
                    'instruction': 'Provide your FINAL probability estimate (0-100%).',
                    'field': 'final_estimate'
                },
                {
                    'step': 7,
                    'instruction': 'Rate your confidence in this estimate (1-5).',
                    'field': 'confidence'
                }
            ]
        }

    @staticmethod
    def probability_scale_guide() -> dict:
        """Help non-probabilists express uncertainty."""
        return {
            '99%': 'Virtually certain — as sure as you can be',
            '90%': 'Very likely — would be surprised if wrong',
            '75%': 'Likely — more probable than not, considerably',
            '60%': 'Lean yes — somewhat more likely than not',
            '50%': 'Coin flip — genuinely uncertain',
            '40%': 'Lean no — somewhat less likely than not',
            '25%': 'Unlikely — considerably more likely to not happen',
            '10%': 'Very unlikely — would be surprised if it happened',
            '1%':  'Virtually impossible — nearly certain it will not happen',
            'note': 'Avoid using exactly 50% unless truly indifferent. '
                    'Even slight leanings (51% or 49%) carry information.'
        }
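
Once a participant has filled in the seven-step form, the answers have to be turned into something an aggregator can consume. The sketch below is one possible way to do that, using the field names from `structured_elicitation`. The function name, validation rules, and rescaling of the 1-5 confidence rating to 0-1 are assumptions, not part of the protocol above.

```python
REQUIRED_FIELDS = (
    "base_rate_estimate", "factors_increasing", "factors_decreasing",
    "initial_estimate", "devils_advocate", "final_estimate", "confidence",
)


def parse_elicitation(form):
    """Validate a completed form and convert percentages to probabilities."""
    missing = [name for name in REQUIRED_FIELDS if name not in form]
    if missing:
        raise ValueError(f"Missing fields: {missing}")
    for name in ("base_rate_estimate", "initial_estimate", "final_estimate"):
        if not 0 <= form[name] <= 100:
            raise ValueError(f"{name} must be a percentage between 0 and 100")
    if not 1 <= form["confidence"] <= 5:
        raise ValueError("confidence must be between 1 and 5")

    base_rate = form["base_rate_estimate"] / 100
    initial = form["initial_estimate"] / 100
    final = form["final_estimate"] / 100
    return {
        "probability": final,
        "shift_from_base_rate": final - base_rate,
        "revision_after_devils_advocate": final - initial,
        "confidence": (form["confidence"] - 1) / 4,  # rescaled to 0-1 (assumption)
    }


# Hypothetical completed form
form = {
    "base_rate_estimate": 20,
    "factors_increasing": ["new funding", "experienced team", "regulatory tailwind"],
    "factors_decreasing": ["tight deadline", "key dependency", "past delays"],
    "initial_estimate": 35,
    "devils_advocate": "Similar projects slipped even with funding in place.",
    "final_estimate": 30,
    "confidence": 4,
}
parsed = parse_elicitation(form)
print(parsed)
```

Keeping the shift from the base rate and the revision after the devil's advocate step makes it possible to audit later whether participants are anchoring on the base rate or ignoring the counterargument.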

