Skip to main content
Technology & EngineeringElixir Phoenix264 lines

Concurrency

Elixir processes and message passing for concurrent and parallel programming

Quick Summary28 lines
You are an expert in Elixir concurrency primitives for building concurrent and parallel systems.

## Key Points

- **Process**: A lightweight, isolated unit of execution with its own memory. Created with `spawn/1` or higher-level abstractions.
- **Message Passing**: Processes communicate by sending messages to each other's mailbox. Messages are copies, not shared references.
- **Mailbox**: Each process has a FIFO queue of incoming messages. `receive` selectively pattern-matches on the mailbox.
- **Links and Monitors**: Links create a bidirectional failure coupling; monitors provide unidirectional failure notifications.
- **Task**: A higher-level abstraction for running a single computation asynchronously and retrieving its result.
- **Agent**: A simple wrapper around state that is accessed and updated via functions.
- Prefer `Task` and `Task.Supervisor` over raw `spawn` for most use cases. They provide better error handling, supervision, and result retrieval.
- Use `Task.async_stream/3` with `max_concurrency` for bounded parallel processing. It provides natural backpressure.
- Use `Process.monitor/1` rather than `Process.link/1` when you need to react to failures without crashing the monitoring process.
- Always set timeouts on `Task.await/2` and `GenServer.call/3`. The default of 5 seconds may be too short for real workloads.
- Use `System.schedulers_online/0` as a baseline for concurrency limits — it returns the number of CPU cores available.
- Keep messages small. Since messages are copied between process heaps, sending large binaries repeatedly is expensive (except for refc binaries over 64 bytes, which are reference-counted).

## Quick Example

```elixir
{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)

Agent.update(MyApp.Cache, &Map.put(&1, :key, "value"))

value = Agent.get(MyApp.Cache, &Map.get(&1, :key))
```
skilldb get elixir-phoenix-skills/ConcurrencyFull skill: 264 lines
Paste into your CLAUDE.md or agent config

Processes and Message Passing — Elixir/Phoenix

You are an expert in Elixir concurrency primitives for building concurrent and parallel systems.

Overview

Elixir runs on the BEAM virtual machine, which provides lightweight processes (not OS threads), preemptive scheduling, and message passing as the sole mechanism for inter-process communication. Each process has its own heap, communicates via asynchronous messages, and can be created cheaply — a typical BEAM node can run millions of processes concurrently.

Core Philosophy

Elixir's concurrency model is not an add-on — it is the foundation the entire language is built on. Every request in a Phoenix application already runs in its own process. Every GenServer, every Channel, every Task is a process. Understanding processes and message passing is not an advanced topic; it is the baseline for writing Elixir at all. The language gives you the primitives, but the discipline of using them correctly determines whether your system is robust or fragile.

The shared-nothing architecture of BEAM processes is a feature, not a limitation. Because processes communicate only through message passing and never share memory, you eliminate entire categories of concurrency bugs: no data races, no mutex deadlocks, no corrupted shared state. The tradeoff is that data must be explicitly sent between processes, which means you think carefully about what crosses process boundaries and how much copying is acceptable.

Supervision and fault tolerance go hand-in-hand with concurrency. A spawned process that crashes silently is worse than one that never existed, because its work is lost with no recovery path. The BEAM philosophy is "let it crash" — but only within a supervision tree that knows how to restart the failed process cleanly. Unsupervised concurrency is not concurrency; it is hope.

Anti-Patterns

  • Unsupervised Spawns: Using bare spawn/1 for work that matters. If the spawned process crashes, no one is notified and no recovery happens. Use Task.Supervisor for fire-and-forget work and Task.async under a supervisor for work whose result you need.

  • Unlimited Concurrency: Spawning a Task for every item in a large collection without bounding parallelism. This can overwhelm databases, exhaust connection pools, and flood external APIs. Always use Task.async_stream/3 with a max_concurrency option or a pool library.

  • Synchronous Call Cycles: Two GenServers calling each other with GenServer.call/2, creating a deadlock where each waits for the other to respond. Break the cycle by using cast for one direction, spawning an intermediary process, or restructuring the communication flow.

  • Large Messages Between Processes: Sending multi-megabyte data structures between processes on every message. Since messages are copied (except large refc binaries), this creates significant garbage collection pressure. Send references (PIDs, ETS table references, or database IDs) instead of full data when possible.

  • Ignoring Mailbox Growth: Building a process that receives messages faster than it can process them, without backpressure or monitoring. The mailbox grows unboundedly, consuming memory until the node crashes. Use GenServer.call for backpressure, or implement explicit flow control.

Core Concepts

  • Process: A lightweight, isolated unit of execution with its own memory. Created with spawn/1 or higher-level abstractions.
  • Message Passing: Processes communicate by sending messages to each other's mailbox. Messages are copies, not shared references.
  • Mailbox: Each process has a FIFO queue of incoming messages. receive selectively pattern-matches on the mailbox.
  • Links and Monitors: Links create a bidirectional failure coupling; monitors provide unidirectional failure notifications.
  • Task: A higher-level abstraction for running a single computation asynchronously and retrieving its result.
  • Agent: A simple wrapper around state that is accessed and updated via functions.

Implementation Patterns

Basic Process and Message Passing

# Spawn a process and send it a message
pid = spawn(fn ->
  receive do
    {:greet, name} ->
      IO.puts("Hello, #{name}!")

    :stop ->
      IO.puts("Stopping")
  end
end)

send(pid, {:greet, "Alice"})

Recursive Process with State

defmodule Counter do
  def start(initial \\ 0) do
    spawn(fn -> loop(initial) end)
  end

  defp loop(count) do
    receive do
      {:increment, amount} ->
        loop(count + amount)

      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)

      :stop ->
        :ok
    end
  end
end

pid = Counter.start(0)
send(pid, {:increment, 5})
send(pid, {:get, self()})

receive do
  {:count, value} -> IO.puts("Count: #{value}")
end

Links and Monitors

# Links: bidirectional — if one crashes, the other is killed
pid = spawn_link(fn ->
  Process.sleep(1000)
  raise "boom"
end)

# Monitor: unidirectional — receive a :DOWN message instead of crashing
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process died: #{inspect(reason)}")
end

Task for Async/Await

# Simple async/await
task = Task.async(fn ->
  # expensive computation
  Enum.sum(1..1_000_000)
end)

result = Task.await(task, 10_000)

# Multiple tasks in parallel
urls = ["https://a.com", "https://b.com", "https://c.com"]

tasks =
  Enum.map(urls, fn url ->
    Task.async(fn -> HTTPClient.get(url) end)
  end)

results = Task.await_many(tasks, 15_000)

Task.Supervisor for Fault-Tolerant Concurrency

# In your supervision tree
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

# Fire-and-forget
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  MyApp.Emails.send_welcome(user)
end)

# Async with result, supervised
task =
  Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
    MyApp.Reports.generate(params)
  end)

report = Task.await(task)

# Async stream with concurrency limit
MyApp.TaskSupervisor
|> Task.Supervisor.async_stream(items, &process_item/1,
  max_concurrency: 10,
  ordered: false,
  timeout: 30_000
)
|> Enum.reduce([], fn
  {:ok, result}, acc -> [result | acc]
  {:exit, reason}, acc ->
    Logger.warning("Task failed: #{inspect(reason)}")
    acc
end)

Agent for Simple State

{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)

Agent.update(MyApp.Cache, &Map.put(&1, :key, "value"))

value = Agent.get(MyApp.Cache, &Map.get(&1, :key))

Process-Based Rate Limiter

defmodule MyApp.RateLimiter do
  use GenServer

  def start_link(opts) do
    max_per_second = Keyword.get(opts, :max_per_second, 10)
    GenServer.start_link(__MODULE__, max_per_second, name: __MODULE__)
  end

  def allow?(key) do
    GenServer.call(__MODULE__, {:check, key})
  end

  @impl true
  def init(max_per_second) do
    schedule_reset()
    {:ok, %{max: max_per_second, counts: %{}}}
  end

  @impl true
  def handle_call({:check, key}, _from, state) do
    current = Map.get(state.counts, key, 0)

    if current < state.max do
      {:reply, true, put_in(state, [:counts, key], current + 1)}
    else
      {:reply, false, state}
    end
  end

  @impl true
  def handle_info(:reset, state) do
    schedule_reset()
    {:noreply, %{state | counts: %{}}}
  end

  defp schedule_reset do
    Process.send_after(self(), :reset, 1_000)
  end
end

Parallel Map with Backpressure

defmodule MyApp.Parallel do
  @doc "Maps over a collection with bounded concurrency."
  def map(collection, fun, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online())
    timeout = Keyword.get(opts, :timeout, 30_000)

    collection
    |> Task.async_stream(fun,
      max_concurrency: max_concurrency,
      timeout: timeout,
      ordered: true
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Usage
results = MyApp.Parallel.map(user_ids, &MyApp.Accounts.fetch_profile/1, max_concurrency: 20)

Best Practices

  • Prefer Task and Task.Supervisor over raw spawn for most use cases. They provide better error handling, supervision, and result retrieval.
  • Use Task.async_stream/3 with max_concurrency for bounded parallel processing. It provides natural backpressure.
  • Use Process.monitor/1 rather than Process.link/1 when you need to react to failures without crashing the monitoring process.
  • Always set timeouts on Task.await/2 and GenServer.call/3. The default of 5 seconds may be too short for real workloads.
  • Use System.schedulers_online/0 as a baseline for concurrency limits — it returns the number of CPU cores available.
  • Keep messages small. Since messages are copied between process heaps, sending large binaries repeatedly is expensive (except for refc binaries over 64 bytes, which are reference-counted).

Common Pitfalls

  • Unhandled messages filling the mailbox: A process that never reads certain message types accumulates them forever, leading to memory growth. Always include a catch-all clause or use selective receive carefully.
  • Deadlocks with synchronous calls: Two processes calling each other with GenServer.call will deadlock. Break cycles by using cast or spawning an intermediary.
  • Spawning unsupervised processes: A spawn that crashes vanishes silently. Always supervise long-lived processes. For short tasks, use Task.Supervisor.
  • Shared-nothing surprise: Data passed between processes is copied. Modifying a large data structure in one process does not affect the copy in another.
  • Blocking the scheduler: A single process running a long CPU-bound computation (tight loop, NIF) can starve other processes on the same scheduler. Use Process.sleep(0) to yield, or run heavy work in a dirty scheduler.
  • Too many concurrent tasks: Spawning millions of tasks hitting a database or external API at once will overwhelm the target. Always use max_concurrency or a pool.

Install this skill directly: skilldb add elixir-phoenix-skills

Get CLI access →