Swarm Intelligence Forecasting

Overview

Swarm intelligence forecasting harnesses the collective decision-making behavior observed in biological swarms (bees, fish, birds) and applies it to human and AI groups to produce forecasts that consistently outperform individual experts and traditional polls. Unlike simple averaging (wisdom of crowds), swarm intelligence systems create real-time feedback loops where participants influence each other dynamically, converging on answers that reflect the group's collective knowledge.

Biological Foundations

How Natural Swarms Decide

Honeybee swarms choose new nest sites through a process that neuroscientists have compared to how the human brain makes decisions:

  1. Scout bees explore potential sites independently
  2. Waggle dances communicate site quality to the swarm
  3. Recruitment — better sites attract more scouts through more vigorous dances
  4. Quorum sensing — once enough scouts commit to a site, the swarm moves
  5. Cross-inhibition — scouts for competing sites gradually stop dancing

This process reliably selects the best option 80%+ of the time, even when individual scouts only visit 1-2 sites.
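The five steps above can be sketched as a toy simulation. The site qualities, exploration and recruitment rates, and the quorum threshold below are illustrative assumptions, not measured values; dance vigor is modeled as dancer count scaled by site quality, and quality also acts as the probability that a dancer keeps advertising a site each round:

```python
import random

def bee_swarm_decision(site_qualities: dict, n_scouts: int = 100,
                       quorum: int = 50, max_rounds: int = 500,
                       seed: int = 1) -> str:
    """Toy model of nest-site choice: dance, recruit, quorum, cross-inhibit."""
    rng = random.Random(seed)
    scouts = [None] * n_scouts  # None = uncommitted, else a site name

    best = None
    for _ in range(max_rounds):
        dancers = {s: scouts.count(s) for s in site_qualities}
        # Dance vigor: number of dancers scaled by site quality
        vigor = {s: dancers[s] * q for s, q in site_qualities.items()}
        for i, site in enumerate(scouts):
            r = rng.random()
            if site is None:
                if r < 0.05:  # independent exploration of a random site
                    scouts[i] = rng.choice(list(site_qualities))
                elif r < 0.5 and sum(vigor.values()) > 0:
                    # Recruitment: follow dances in proportion to vigor
                    scouts[i] = rng.choices(
                        list(vigor), weights=list(vigor.values()))[0]
            elif r > site_qualities[site]:
                # Dancers for poorer sites give up sooner (cross-inhibition)
                scouts[i] = None
        counts = {s: scouts.count(s) for s in site_qualities}
        best = max(counts, key=counts.get)
        if counts[best] >= quorum:  # quorum reached: the swarm commits
            return best
    return best

print(bee_swarm_decision({'A': 0.6, 'B': 0.9, 'C': 0.5}))
```

With these numbers the highest-quality site wins even though no individual scout compares all three sites directly: positive feedback amplifies the early lead of vigorously advertised sites, while cross-inhibition drains support from the rest.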

Key Principles from Biology

  • Diversity of exploration: Many agents independently sample the solution space
  • Local interaction: Agents influence nearby agents, not the whole group at once
  • Positive feedback: Good solutions attract more attention
  • Negative feedback: Poor solutions are gradually abandoned
  • Quorum threshold: Decisions crystallize when confidence reaches a threshold

Artificial Swarm Intelligence (ASI)

The Unanimous AI Approach

Unanimous AI (now Thinkscape) pioneered Artificial Swarm Intelligence with Swarm AI, a platform that connects human groups into real-time swarms. Unlike in polls or votes, participants simultaneously negotiate an answer by pulling a virtual "puck" toward their preferred option, creating a dynamic system with continuous feedback.

How Swarm AI Works

Step 1: Question posed to the swarm
        "What will the S&P 500 do this quarter?"
        Options: [Strong Up, Up, Flat, Down, Strong Down]

Step 2: Participants connect via web interface
        Each person controls a magnetic "pull"
        All pulls act on a central puck simultaneously

Step 3: Real-time negotiation
        - Puck moves toward consensus
        - Participants see puck movement and adjust
        - Confident participants pull harder
        - Uncertain participants ease off
        - System converges in 30-60 seconds

Step 4: Answer and confidence
        - Final puck position = group answer
        - Convergence speed = confidence level
        - Oscillation patterns = disagreement indicators

Why Swarms Beat Polls

Feature           | Traditional Poll                | Swarm Intelligence
------------------+---------------------------------+------------------------------
Interaction       | None (independent votes)        | Real-time mutual influence
Weighting         | Equal or predetermined          | Emergent from conviction
Convergence       | Aggregation after collection    | Dynamic real-time convergence
Information flow  | One-way (individual to system)  | Bidirectional feedback loops
Confidence        | Self-reported (unreliable)      | Revealed through behavior
Result            | Distribution of opinions        | Negotiated consensus

Conviction Weighting

The critical innovation: participants reveal their conviction through their behavior, not self-reporting:

import math

class SwarmParticipant:
    """Model of a single swarm participant's behavior."""

    def __init__(self, participant_id: str):
        self.id = participant_id
        self.pull_vector = (0.0, 0.0)  # Direction and magnitude
        self.consistency = 0.0  # How stable their pull is over time
        self.reaction_time = 0.0  # How quickly they engage

    def compute_effective_influence(self) -> float:
        """
        Influence is determined by behavioral signals, not self-report.
        Confident participants: pull hard, consistently, quickly.
        Uncertain participants: pull weakly, vacillate, delay.
        """
        magnitude = math.sqrt(self.pull_vector[0]**2 + self.pull_vector[1]**2)
        return magnitude * self.consistency * (1 / max(self.reaction_time, 0.1))


class SwarmEngine:
    """Simplified model of a Swarm AI negotiation engine."""

    def __init__(self, options: list[str]):
        self.options = options
        self.puck_position = [0.0, 0.0]  # Center of option space
        self.participants = []
        self.option_positions = self._layout_options()
        self.history = []

    def _layout_options(self) -> dict:
        """Place options in a circle around the center."""
        n = len(self.options)
        positions = {}
        for i, option in enumerate(self.options):
            angle = 2 * math.pi * i / n
            positions[option] = (math.cos(angle), math.sin(angle))
        return positions

    def step(self, dt: float = 0.1):
        """Advance the simulation by one timestep."""
        total_force = [0.0, 0.0]
        total_weight = 0.0

        for participant in self.participants:
            influence = participant.compute_effective_influence()
            total_force[0] += participant.pull_vector[0] * influence
            total_force[1] += participant.pull_vector[1] * influence
            total_weight += influence

        if total_weight > 0:
            # Normalize and apply with damping
            damping = 0.8
            self.puck_position[0] += (total_force[0] / total_weight) * dt * damping
            self.puck_position[1] += (total_force[1] / total_weight) * dt * damping

        self.history.append(tuple(self.puck_position))

    def get_result(self) -> dict:
        """Determine which option the puck is closest to."""
        distances = {}
        for option, pos in self.option_positions.items():
            dx = self.puck_position[0] - pos[0]
            dy = self.puck_position[1] - pos[1]
            distances[option] = math.sqrt(dx**2 + dy**2)

        closest = min(distances, key=distances.get)
        convergence = max(0.0, 1.0 - min(distances.values()))  # How close to an option, clamped to [0, 1]

        return {
            'answer': closest,
            'confidence': convergence,
            'oscillation': self._measure_oscillation()
        }

    def _measure_oscillation(self) -> float:
        """High oscillation = high disagreement."""
        if len(self.history) < 10:
            return 0.0
        recent = self.history[-10:]
        direction_changes = 0
        for i in range(2, len(recent)):
            dx1 = recent[i][0] - recent[i-1][0]
            dx2 = recent[i-1][0] - recent[i-2][0]
            if dx1 * dx2 < 0:  # Direction reversal
                direction_changes += 1
        return direction_changes / (len(recent) - 2)
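The negotiation loop above can be condensed into a self-contained numerical sketch. The four participants below are synthetic stand-ins for live human input, with pull vectors held fixed for the whole run:

```python
import math

# Option layout for three options on the unit circle (as in _layout_options)
options = {'Up': (1.0, 0.0),
           'Flat': (math.cos(2 * math.pi / 3), math.sin(2 * math.pi / 3)),
           'Down': (math.cos(4 * math.pi / 3), math.sin(4 * math.pi / 3))}

# (target option, pull strength, consistency, reaction time) per participant
participants = [('Up', 1.0, 1.0, 0.2), ('Up', 0.9, 0.9, 0.3),
                ('Up', 0.8, 0.8, 0.3), ('Down', 0.2, 0.3, 1.0)]

puck = [0.0, 0.0]
for _ in range(14):  # a few simulated timesteps
    fx = fy = weight = 0.0
    for target, strength, consistency, rt in participants:
        tx, ty = options[target]
        # Behavioral influence, as in compute_effective_influence()
        influence = strength * consistency * (1 / max(rt, 0.1))
        fx += tx * strength * influence
        fy += ty * strength * influence
        weight += influence
    puck[0] += (fx / weight) * 0.1 * 0.8  # dt * damping, as in step()
    puck[1] += (fy / weight) * 0.1 * 0.8

answer = min(options, key=lambda o: math.dist(puck, options[o]))
print(answer)
```

The three confident pulls toward "Up" dominate the weighted average, so the puck settles near that option despite the dissenting pull.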

Consensus Formation Algorithms

Particle Swarm Optimization (PSO) for Forecasting

PSO can optimize forecast parameters by treating each possible forecast as a particle:

import numpy as np

class ForecastPSO:
    """Use Particle Swarm Optimization to find consensus forecast."""

    def __init__(self, n_particles: int, n_dimensions: int):
        self.n_particles = n_particles
        self.positions = np.random.uniform(0, 1, (n_particles, n_dimensions))
        self.velocities = np.random.uniform(-0.1, 0.1, (n_particles, n_dimensions))
        self.personal_best_positions = self.positions.copy()
        self.personal_best_scores = np.full(n_particles, float('inf'))
        self.global_best_position = None
        self.global_best_score = float('inf')

        # PSO parameters
        self.w = 0.7    # Inertia weight
        self.c1 = 1.5   # Cognitive (personal best) attraction
        self.c2 = 1.5   # Social (global best) attraction

    def evaluate(self, fitness_fn):
        """Evaluate all particles against the fitness function."""
        for i in range(self.n_particles):
            score = fitness_fn(self.positions[i])
            if score < self.personal_best_scores[i]:
                self.personal_best_scores[i] = score
                self.personal_best_positions[i] = self.positions[i].copy()
            if score < self.global_best_score:
                self.global_best_score = score
                self.global_best_position = self.positions[i].copy()

    def update(self):
        """Update velocities and positions."""
        r1 = np.random.random(self.positions.shape)
        r2 = np.random.random(self.positions.shape)

        cognitive = self.c1 * r1 * (self.personal_best_positions - self.positions)
        social = self.c2 * r2 * (self.global_best_position - self.positions)

        self.velocities = self.w * self.velocities + cognitive + social
        self.positions += self.velocities
        self.positions = np.clip(self.positions, 0, 1)  # Keep in bounds

    def optimize(self, fitness_fn, n_iterations: int = 100):
        for _ in range(n_iterations):
            self.evaluate(fitness_fn)
            self.update()
        return self.global_best_position, self.global_best_score
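For a concrete, self-contained run of the same update rules, the loop below searches for blend weights over three synthetic "expert" forecast series, minimizing mean squared error against realized outcomes. All data is simulated, and the compact loop mirrors ForecastPSO rather than importing it:

```python
import numpy as np

rng = np.random.default_rng(0)
outcomes = rng.normal(0, 1, 50)          # realized values
experts = np.stack([
    outcomes + rng.normal(0, 0.2, 50),   # accurate expert
    outcomes + rng.normal(0, 1.0, 50),   # noisy expert
    rng.normal(0, 1, 50),                # uninformative expert
])

def fitness(weights):
    w = weights / max(weights.sum(), 1e-9)  # normalize to a convex blend
    return float(((w @ experts - outcomes) ** 2).mean())

# Compact version of the ForecastPSO loop above
pos = rng.uniform(0, 1, (30, 3))
vel = rng.uniform(-0.1, 0.1, (30, 3))
pbest, pbest_score = pos.copy(), np.full(30, np.inf)
gbest_score = np.inf
for _ in range(100):
    scores = np.array([fitness(p) for p in pos])
    better = scores < pbest_score
    pbest[better], pbest_score[better] = pos[better], scores[better]
    if scores.min() < gbest_score:
        gbest_score, gbest = scores.min(), pos[scores.argmin()].copy()
    r1, r2 = rng.random(pos.shape), rng.random(pos.shape)
    vel = 0.7 * vel + 1.5 * r1 * (pbest - pos) + 1.5 * r2 * (gbest - pos)
    pos = np.clip(pos + vel, 0, 1)

print(gbest / gbest.sum(), gbest_score)  # most weight lands on expert 0
```

The swarm concentrates weight on the accurate expert because any blend leaning on the noisy or uninformative series raises the error.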

Ant Colony Optimization for Path-Based Forecasting

import numpy as np

class AntColonyForecaster:
    """
    Model forecast scenarios as paths through a decision graph.
    Ants explore paths; pheromone accumulates on likely scenarios.
    """

    def __init__(self, decision_nodes: dict, n_ants: int = 50):
        self.nodes = decision_nodes  # {node: [possible_next_nodes]}
        self.n_ants = n_ants
        self.pheromone = {}  # (from_node, to_node) -> strength
        self.evaporation_rate = 0.1
        self.alpha = 1.0  # Pheromone importance
        self.beta = 2.0   # Heuristic importance

        # Initialize pheromone
        for node, neighbors in decision_nodes.items():
            for neighbor in neighbors:
                self.pheromone[(node, neighbor)] = 1.0

    def run_ant(self, start_node: str, heuristic_fn) -> list:
        """Single ant traverses the decision graph."""
        path = [start_node]
        current = start_node

        while current in self.nodes and self.nodes[current]:
            neighbors = self.nodes[current]
            probabilities = []
            for neighbor in neighbors:
                tau = self.pheromone.get((current, neighbor), 0.01) ** self.alpha
                eta = heuristic_fn(current, neighbor) ** self.beta
                probabilities.append(tau * eta)

            total = sum(probabilities)
            if total == 0:
                chosen = np.random.choice(neighbors)  # no signal: uniform choice
            else:
                probabilities = [p / total for p in probabilities]
                chosen = np.random.choice(neighbors, p=probabilities)
            path.append(chosen)
            current = chosen

        return path

    def update_pheromone(self, paths: list, scores: list):
        """Evaporate and deposit pheromone based on path quality."""
        # Evaporation
        for key in self.pheromone:
            self.pheromone[key] *= (1 - self.evaporation_rate)

        # Deposit
        for path, score in zip(paths, scores):
            deposit = 1.0 / max(score, 0.01)
            for i in range(len(path) - 1):
                edge = (path[i], path[i+1])
                self.pheromone[edge] = self.pheromone.get(edge, 0) + deposit

    def forecast(self, start: str, heuristic_fn, n_iterations: int = 50) -> dict:
        """Run the colony and return path frequency distribution."""
        path_counts = {}

        for _ in range(n_iterations):
            paths = []
            scores = []
            for _ in range(self.n_ants):
                path = self.run_ant(start, heuristic_fn)
                path_key = "->".join(path)
                paths.append(path)
                scores.append(heuristic_fn(path[0], path[-1]))
                path_counts[path_key] = path_counts.get(path_key, 0) + 1

            self.update_pheromone(paths, scores)

        total_runs = sum(path_counts.values())
        return {
            path: count / total_runs
            for path, count in sorted(
                path_counts.items(), key=lambda x: -x[1]
            )[:10]
        }
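The pheromone update can be checked by hand on a two-edge graph; the scenario names and scores below are invented for illustration. Each edge first loses 10% of its pheromone to evaporation, then gains 1/score from every ant that traversed it:

```python
evaporation_rate = 0.1
pheromone = {('start', 'growth'): 1.0, ('start', 'recession'): 1.0}

# Three ants walked these paths; lower score = more plausible scenario
paths = [['start', 'growth'], ['start', 'growth'], ['start', 'recession']]
scores = [0.5, 0.5, 2.0]

for key in pheromone:                      # evaporation
    pheromone[key] *= (1 - evaporation_rate)
for path, score in zip(paths, scores):     # deposit
    deposit = 1.0 / max(score, 0.01)
    for i in range(len(path) - 1):
        edge = (path[i], path[i + 1])
        pheromone[edge] = pheromone.get(edge, 0) + deposit

print(pheromone)
# growth edge: 0.9 + 2.0 + 2.0 = 4.9; recession edge: 0.9 + 0.5 = 1.4
```

The favored scenario ends the round with roughly 3.5x the pheromone, so later ants sample it more often, which is the positive-feedback loop the forecast distribution is built on.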

Real-Time Synchronization of Distributed Inputs

Architecture for Distributed Swarm Forecasting

import asyncio
import websockets
import json

class DistributedSwarmServer:
    """WebSocket server for real-time swarm forecasting sessions."""

    def __init__(self, question: str, options: list[str], duration: int = 60):
        self.question = question
        self.engine = SwarmEngine(options)
        self.duration = duration
        self.clients = {}
        self.running = False

    async def register(self, websocket, participant_id: str):
        """Register a new swarm participant."""
        participant = SwarmParticipant(participant_id)
        self.engine.participants.append(participant)
        self.clients[participant_id] = {
            'websocket': websocket,
            'participant': participant
        }
        await websocket.send(json.dumps({
            'type': 'registered',
            'question': self.question,
            'options': self.engine.options,
            'option_positions': {
                k: list(v) for k, v in self.engine.option_positions.items()
            }
        }))

    async def handle_input(self, participant_id: str, data: dict):
        """Process a participant's pull vector update."""
        participant = self.clients[participant_id]['participant']
        participant.pull_vector = (data['dx'], data['dy'])
        participant.consistency = data.get('consistency', 0.5)
        participant.reaction_time = data.get('reaction_time', 1.0)

    async def broadcast_state(self):
        """Send current puck position to all participants."""
        state = {
            'type': 'state_update',
            'puck': list(self.engine.puck_position),
            'n_participants': len(self.clients),
            'time_remaining': self.duration
        }
        message = json.dumps(state)
        await asyncio.gather(*[
            client['websocket'].send(message)
            for client in self.clients.values()
        ])

    async def run_session(self):
        """Run a complete swarm session."""
        self.running = True
        steps = 0
        while self.duration > 0 and self.running:
            self.engine.step(dt=0.05)
            if steps % 3 == 0:  # Broadcast every 150ms
                await self.broadcast_state()
            await asyncio.sleep(0.05)
            self.duration -= 0.05
            steps += 1

        result = self.engine.get_result()
        await self._broadcast_result(result)
        return result

    async def _broadcast_result(self, result: dict):
        """Send the final answer to all participants."""
        message = json.dumps({'type': 'result', **result})
        await asyncio.gather(*[
            client['websocket'].send(message)
            for client in self.clients.values()
        ])

Latency Compensation

Distributed swarms must handle network latency. Key strategies:

class LatencyCompensator:
    """Compensate for network delays in distributed swarm inputs."""

    def __init__(self, max_latency_ms: int = 500):
        self.max_latency = max_latency_ms
        self.input_buffer = []

    def add_input(self, participant_id: str, vector: tuple,
                  client_timestamp: float, server_timestamp: float):
        """Buffer and timestamp-adjust incoming inputs."""
        latency = (server_timestamp - client_timestamp) * 1000

        if latency > self.max_latency:
            # Input too stale, apply with decay
            decay = max(0, 1 - (latency - self.max_latency) / 1000)
            vector = (vector[0] * decay, vector[1] * decay)

        self.input_buffer.append({
            'participant': participant_id,
            'vector': vector,
            'latency_ms': latency,
            'weight': 1.0 / max(1, latency / 100)  # Down-weight laggy inputs
        })

    def get_adjusted_forces(self) -> list:
        """Return latency-weighted input vectors."""
        return self.input_buffer
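The two weighting rules in add_input can be exercised in isolation (the latencies below are arbitrary examples):

```python
MAX_LATENCY_MS = 500

def latency_weight(latency_ms: float) -> float:
    """Down-weight laggy inputs; anything under 100 ms keeps full weight."""
    return 1.0 / max(1, latency_ms / 100)

def stale_decay(latency_ms: float) -> float:
    """Inputs beyond the latency cap fade out linearly over one second."""
    if latency_ms <= MAX_LATENCY_MS:
        return 1.0
    return max(0, 1 - (latency_ms - MAX_LATENCY_MS) / 1000)

print(latency_weight(50), latency_weight(300), stale_decay(800))
# 1.0  0.333...  0.7
```

A 50 ms input keeps full weight, a 300 ms input counts one third, and an 800 ms input is both down-weighted and scaled to 70% of its magnitude before entering the force sum.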

Documented Performance Results

Unanimous AI Research Findings

Studies have reported swarm intelligence forecasting outperforming established benchmarks:

  • NFL game predictions: Swarms of football fans outperformed ESPN experts and Vegas spreads in multiple seasons
  • Oscar predictions: A swarm of 50 movie fans correctly predicted 76% of Oscar categories vs 64% for the NYT critic
  • Financial forecasting: Swarms of amateur traders outperformed individual analysts in weekly stock direction predictions
  • Medical diagnosis: Groups of radiologists in swarm configuration showed 33% improvement in diagnostic accuracy over independent votes

Why Amplification Occurs

The amplification effect happens because swarms:

  1. Surface tacit knowledge: Participants act on gut feelings they cannot articulate
  2. Filter noise: Random errors cancel out while signal reinforces
  3. Enable conviction signaling: Those who know more pull harder, naturally weighting expertise
  4. Create emergent reasoning: The real-time negotiation process generates insights no individual had
  5. Avoid herding: Unlike sequential polls, simultaneous participation prevents anchoring bias
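Points 2 and 3 can be illustrated with a small Monte Carlo sketch: when conviction correlates with accuracy, conviction-weighted aggregation beats the unweighted mean most of the time. The population mix below is invented for illustration:

```python
import random

def one_trial(rng, truth=100.0):
    """5 accurate, confident forecasters vs 35 noisy, tentative ones."""
    estimates, convictions = [], []
    for _ in range(5):
        estimates.append(truth + rng.gauss(0, 2))
        convictions.append(1.0)    # experts pull hard
    for _ in range(35):
        estimates.append(truth + rng.gauss(0, 40))
        convictions.append(0.1)    # novices ease off
    simple = sum(estimates) / len(estimates)
    weighted = (sum(e * c for e, c in zip(estimates, convictions))
                / sum(convictions))
    return abs(simple - truth), abs(weighted - truth)

rng = random.Random(0)
results = [one_trial(rng) for _ in range(200)]
wins = sum(1 for simple_err, weighted_err in results
           if weighted_err < simple_err)
print(f"conviction weighting closer in {wins}/200 trials")
```

The weighted estimate wins in a large majority of trials because the behavioral weights suppress exactly the inputs that contribute the most noise, which is what the swarm's conviction signaling does implicitly.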

Hybrid AI-Human Swarms

Combining LLM Agents with Human Participants

import math

class HybridSwarm:
    """Combine human participants with AI agents in a swarm."""

    def __init__(self, question: str, options: list[str]):
        self.question = question
        self.engine = SwarmEngine(options)
        self.human_participants = []
        self.ai_agents = []

    def add_ai_agent(self, model_name: str, specialty: str, weight: float = 0.5):
        """Add an AI agent to the swarm with capped influence."""
        agent = SwarmParticipant(f"ai_{model_name}_{specialty}")
        # AI agents get reduced max influence to prevent domination;
        # the engine is expected to clamp effective influence to this cap
        agent._max_influence = weight
        self.ai_agents.append({
            'participant': agent,
            'model': model_name,
            'specialty': specialty
        })
        self.engine.participants.append(agent)

    async def get_ai_pull(self, agent_info: dict, current_state: dict) -> tuple:
        """Query an LLM for its pull vector given the current swarm state."""
        prompt = f"""You are participating in a swarm forecast.
Question: {current_state['question']}
Options: {current_state['options']}
Current puck position suggests: {current_state['current_leading']}
Your specialty: {agent_info['specialty']}

Rate your confidence in your preferred option (0-10).
Respond only as JSON: {{"preferred_option": "...", "confidence": 0-10}}"""

        # call_llm is assumed to be an async helper that queries the model
        # and returns the parsed JSON response as a dict
        response = await call_llm(agent_info['model'], prompt)
        preferred = response['preferred_option']
        confidence = response['confidence'] / 10.0

        # Convert to pull vector toward preferred option
        option_pos = self.engine.option_positions[preferred]
        dx = option_pos[0] - self.engine.puck_position[0]
        dy = option_pos[1] - self.engine.puck_position[1]
        magnitude = math.sqrt(dx**2 + dy**2)

        if magnitude > 0:
            return (dx/magnitude * confidence, dy/magnitude * confidence)
        return (0, 0)

Design Principles for Effective Swarms

  1. Optimal group size: 20-50 participants for most forecasting tasks
  2. Diversity matters more than expertise: Include diverse viewpoints, not just domain experts
  3. Session duration: 45-90 seconds per question optimal for human swarms
  4. Question framing: Use concrete, specific questions with clear options
  5. Independence before swarming: Collect initial independent estimates, then swarm for the final answer
  6. Repeated sessions: Run multiple swarm sessions and average for higher accuracy
  7. Real-time feedback: Participants must see the puck moving; delayed feedback destroys swarm dynamics
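
Principle 6 might be implemented with a small helper like the one below (an illustrative sketch, not part of any published API); it assumes each session returns the {'answer', 'confidence'} shape produced by get_result earlier in this skill:

```python
from collections import defaultdict

def aggregate_sessions(session_results: list) -> dict:
    """Combine repeated swarm sessions into a final forecast, weighting
    each session's vote by how decisively it converged."""
    votes = defaultdict(float)
    for result in session_results:
        votes[result['answer']] += max(result['confidence'], 0.0)
    total = sum(votes.values()) or 1.0
    answer, support = max(votes.items(), key=lambda kv: kv[1])
    return {'answer': answer, 'support': support / total}

sessions = [
    {'answer': 'Up', 'confidence': 0.8},
    {'answer': 'Up', 'confidence': 0.6},
    {'answer': 'Flat', 'confidence': 0.3},
]
print(aggregate_sessions(sessions))
# "Up" wins with support 1.4 / 1.7, roughly 0.82
```

Weighting by convergence confidence means a session that oscillated to a weak answer counts less than one that snapped to consensus quickly.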

Key Takeaways

  1. Swarm intelligence differs fundamentally from polling or voting: it creates a real-time dynamical system where conviction naturally weights contributions
  2. The biological inspiration (honeybees, fish schools) provides proven algorithms for robust group decision-making
  3. Swarm forecasting consistently amplifies group accuracy by 20-30% beyond simple averaging
  4. Hybrid human-AI swarms can combine the strengths of both: human intuition and tacit knowledge with AI's data processing and consistency
  5. Latency management is critical for distributed swarms; the real-time feedback loop is what makes the method work
  6. Group diversity and proper session design matter more than individual participant expertise
