# Concurrency
Elixir processes and message passing for concurrent and parallel programming
You are an expert in Elixir concurrency primitives for building concurrent and parallel systems.
## Key Points
- **Process**: A lightweight, isolated unit of execution with its own memory. Created with `spawn/1` or higher-level abstractions.
- **Message Passing**: Processes communicate by sending messages to each other's mailbox. Messages are copies, not shared references.
- **Mailbox**: Each process has a FIFO queue of incoming messages. `receive` selectively pattern-matches on the mailbox.
- **Links and Monitors**: Links create a bidirectional failure coupling; monitors provide unidirectional failure notifications.
- **Task**: A higher-level abstraction for running a single computation asynchronously and retrieving its result.
- **Agent**: A simple wrapper around state that is accessed and updated via functions.
- Prefer `Task` and `Task.Supervisor` over raw `spawn` for most use cases. They provide better error handling, supervision, and result retrieval.
- Use `Task.async_stream/3` with `max_concurrency` for bounded parallel processing. It provides natural backpressure.
- Use `Process.monitor/1` rather than `Process.link/1` when you need to react to failures without crashing the monitoring process.
- Always set timeouts on `Task.await/2` and `GenServer.call/3`. The default of 5 seconds may be too short for real workloads.
- Use `System.schedulers_online/0` as a baseline for concurrency limits — it returns the number of CPU cores available.
- Keep messages small. Since messages are copied between process heaps, sending large binaries repeatedly is expensive (except for refc binaries over 64 bytes, which are reference-counted).
## Quick Example
```elixir
{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)
Agent.update(MyApp.Cache, &Map.put(&1, :key, "value"))
value = Agent.get(MyApp.Cache, &Map.get(&1, :key))
```
## Overview
Elixir runs on the BEAM virtual machine, which provides lightweight processes (not OS threads), preemptive scheduling, and message passing as the sole mechanism for inter-process communication. Each process has its own heap, communicates via asynchronous messages, and can be created cheaply — a typical BEAM node can run millions of processes concurrently.
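As a quick illustration of how cheap BEAM processes are, a node can spawn a hundred thousand of them, typically in well under a second:

```elixir
# Spawn 100_000 processes that each idle until told to stop.
pids =
  for _ <- 1..100_000 do
    spawn(fn ->
      receive do
        :stop -> :ok
      end
    end)
  end

IO.puts("Spawned #{length(pids)} processes")

# Clean up
Enum.each(pids, &send(&1, :stop))
```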
## Core Philosophy
Elixir's concurrency model is not an add-on — it is the foundation the entire language is built on. Every request in a Phoenix application already runs in its own process. Every GenServer, every Channel, every Task is a process. Understanding processes and message passing is not an advanced topic; it is the baseline for writing Elixir at all. The language gives you the primitives, but the discipline of using them correctly determines whether your system is robust or fragile.
The shared-nothing architecture of BEAM processes is a feature, not a limitation. Because processes communicate only through message passing and never share memory, you eliminate entire categories of concurrency bugs: no data races, no mutex deadlocks, no corrupted shared state. The tradeoff is that data must be explicitly sent between processes, which means you think carefully about what crosses process boundaries and how much copying is acceptable.
Supervision and fault tolerance go hand-in-hand with concurrency. A spawned process that crashes silently is worse than one that never existed, because its work is lost with no recovery path. The BEAM philosophy is "let it crash" — but only within a supervision tree that knows how to restart the failed process cleanly. Unsupervised concurrency is not concurrency; it is hope.
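A minimal sketch of "let it crash" under supervision, using a hypothetical `Crasher` GenServer (the module name and message are illustrative):

```elixir
defmodule Crasher do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast(:boom, _state), do: raise("boom")
end

{:ok, _sup} = Supervisor.start_link([Crasher], strategy: :one_for_one)

GenServer.cast(Crasher, :boom)  # the child crashes...
Process.sleep(100)
# ...and the supervisor has already restarted it under the same name
true = is_pid(Process.whereis(Crasher))
```

Because the child is registered by name, callers never need to know it crashed; the supervisor replaces it with a fresh process holding clean state.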
## Anti-Patterns

- **Unsupervised Spawns**: Using bare `spawn/1` for work that matters. If the spawned process crashes, no one is notified and no recovery happens. Use `Task.Supervisor` for fire-and-forget work and `Task.async` under a supervisor for work whose result you need.
- **Unlimited Concurrency**: Spawning a Task for every item in a large collection without bounding parallelism. This can overwhelm databases, exhaust connection pools, and flood external APIs. Always use `Task.async_stream/3` with a `max_concurrency` option or a pool library.
- **Synchronous Call Cycles**: Two GenServers calling each other with `GenServer.call/2`, creating a deadlock where each waits for the other to respond. Break the cycle by using `cast` for one direction, spawning an intermediary process, or restructuring the communication flow.
- **Large Messages Between Processes**: Sending multi-megabyte data structures between processes on every message. Since messages are copied (except large refc binaries), this creates significant garbage collection pressure. Send references (PIDs, ETS table references, or database IDs) instead of full data when possible.
- **Ignoring Mailbox Growth**: Building a process that receives messages faster than it can process them, without backpressure or monitoring. The mailbox grows unboundedly, consuming memory until the node crashes. Use `GenServer.call` for backpressure, or implement explicit flow control.
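For the last anti-pattern, mailbox growth is observable at runtime with `Process.info/2`; a small sketch of detecting a process that is falling behind:

```elixir
# A process that never consumes its mailbox
pid = spawn(fn -> Process.sleep(:infinity) end)

# Send it 100 messages it will never read
Enum.each(1..100, fn i -> send(pid, {:work, i}) end)

# The messages sit in the mailbox, and the depth is visible from outside
{:message_queue_len, depth} = Process.info(pid, :message_queue_len)
IO.puts("Mailbox depth: #{depth}")

Process.exit(pid, :kill)
```

Polling `:message_queue_len` on critical processes is a cheap way to alert on backpressure problems before the node runs out of memory.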
## Implementation Patterns

### Basic Process and Message Passing

```elixir
# Spawn a process and send it a message
pid =
  spawn(fn ->
    receive do
      {:greet, name} ->
        IO.puts("Hello, #{name}!")

      :stop ->
        IO.puts("Stopping")
    end
  end)

send(pid, {:greet, "Alice"})
```
### Recursive Process with State

```elixir
defmodule Counter do
  def start(initial \\ 0) do
    spawn(fn -> loop(initial) end)
  end

  defp loop(count) do
    receive do
      {:increment, amount} ->
        loop(count + amount)

      {:get, caller} ->
        send(caller, {:count, count})
        loop(count)

      :stop ->
        :ok
    end
  end
end

pid = Counter.start(0)
send(pid, {:increment, 5})
send(pid, {:get, self()})

receive do
  {:count, value} -> IO.puts("Count: #{value}")
end
```
### Links and Monitors

```elixir
# Links are bidirectional: if the linked process crashes, the current
# process is killed too (unless it traps exits).
# spawn_link(fn -> raise "boom" end)

# Monitors are unidirectional: the monitoring process receives a :DOWN
# message instead of crashing. Note the plain spawn here: a link would
# kill this process before the :DOWN message could be received.
pid = spawn(fn -> Process.sleep(1000); raise("boom") end)
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process died: #{inspect(reason)}")
end
```
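A third option, closely related to links, is trapping exits: with the `trap_exit` flag set, a linked crash is delivered as an `{:EXIT, pid, reason}` message instead of killing the current process. A minimal sketch:

```elixir
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> raise "boom" end)

reason =
  receive do
    {:EXIT, ^pid, reason} -> reason
  after
    1_000 -> :no_exit_received
  end

IO.puts("Linked process exited: #{inspect(reason)}")
```

This is how supervisors observe their children; in application code, a monitor is usually the simpler choice.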
### Task for Async/Await

```elixir
# Simple async/await
task =
  Task.async(fn ->
    # expensive computation
    Enum.sum(1..1_000_000)
  end)

result = Task.await(task, 10_000)

# Multiple tasks in parallel
urls = ["https://a.com", "https://b.com", "https://c.com"]

tasks =
  Enum.map(urls, fn url ->
    Task.async(fn -> HTTPClient.get(url) end)
  end)

results = Task.await_many(tasks, 15_000)
```
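One subtlety with `Task.await/2` is that on timeout it exits the caller process. When a slow task should be treated as optional rather than fatal, the `Task.yield/2` plus `Task.shutdown/1` pair lets the caller continue:

```elixir
task = Task.async(fn -> Process.sleep(10_000); :done end)

# Wait up to 100 ms; if no reply arrives, shut the task down
# instead of crashing the caller.
result =
  case Task.yield(task, 100) || Task.shutdown(task) do
    {:ok, value} -> value
    {:exit, reason} -> {:error, reason}
    nil -> :timed_out
  end

IO.inspect(result)
```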
### Task.Supervisor for Fault-Tolerant Concurrency

```elixir
# In your supervision tree
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

# Fire-and-forget
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  MyApp.Emails.send_welcome(user)
end)

# Async with result, supervised
task =
  Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
    MyApp.Reports.generate(params)
  end)

report = Task.await(task)

# Async stream with concurrency limit. Use the nolink variant so a
# crashed task shows up as {:exit, reason} instead of taking down the caller.
MyApp.TaskSupervisor
|> Task.Supervisor.async_stream_nolink(items, &process_item/1,
  max_concurrency: 10,
  ordered: false,
  timeout: 30_000
)
|> Enum.reduce([], fn
  {:ok, result}, acc ->
    [result | acc]

  {:exit, reason}, acc ->
    Logger.warning("Task failed: #{inspect(reason)}")
    acc
end)
```
### Agent for Simple State

```elixir
{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)
Agent.update(MyApp.Cache, &Map.put(&1, :key, "value"))
value = Agent.get(MyApp.Cache, &Map.get(&1, :key))
```
### Process-Based Rate Limiter

```elixir
defmodule MyApp.RateLimiter do
  use GenServer

  def start_link(opts) do
    max_per_second = Keyword.get(opts, :max_per_second, 10)
    GenServer.start_link(__MODULE__, max_per_second, name: __MODULE__)
  end

  def allow?(key) do
    GenServer.call(__MODULE__, {:check, key})
  end

  @impl true
  def init(max_per_second) do
    schedule_reset()
    {:ok, %{max: max_per_second, counts: %{}}}
  end

  @impl true
  def handle_call({:check, key}, _from, state) do
    current = Map.get(state.counts, key, 0)

    if current < state.max do
      {:reply, true, put_in(state, [:counts, key], current + 1)}
    else
      {:reply, false, state}
    end
  end

  @impl true
  def handle_info(:reset, state) do
    schedule_reset()
    {:noreply, %{state | counts: %{}}}
  end

  defp schedule_reset do
    Process.send_after(self(), :reset, 1_000)
  end
end
```
### Parallel Map with Backpressure

```elixir
defmodule MyApp.Parallel do
  @doc "Maps over a collection with bounded concurrency."
  def map(collection, fun, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online())
    timeout = Keyword.get(opts, :timeout, 30_000)

    collection
    |> Task.async_stream(fun,
      max_concurrency: max_concurrency,
      timeout: timeout,
      ordered: true
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Usage
results = MyApp.Parallel.map(user_ids, &MyApp.Accounts.fetch_profile/1, max_concurrency: 20)
```
## Common Pitfalls

- **Unhandled messages filling the mailbox**: A process that never reads certain message types accumulates them forever, leading to memory growth. Always include a catch-all clause or use selective receive carefully.
- **Deadlocks with synchronous calls**: Two processes calling each other with `GenServer.call` will deadlock. Break cycles by using `cast` or spawning an intermediary.
- **Spawning unsupervised processes**: A `spawn` that crashes vanishes silently. Always supervise long-lived processes. For short tasks, use `Task.Supervisor`.
- **Shared-nothing surprise**: Data passed between processes is copied. Modifying a large data structure in one process does not affect the copy in another.
- **Blocking the scheduler**: Pure Elixir code is preempted by the BEAM via reduction counting, but a long-running NIF is not, and can starve every other process on its scheduler. Run heavy native work on a dirty scheduler or chunk it into short calls.
- **Too many concurrent tasks**: Spawning millions of tasks that hit a database or external API at once will overwhelm the target. Always use `max_concurrency` or a pool.
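For the first pitfall, a catch-all clause in the receive loop keeps unknown messages from accumulating; a minimal sketch with a hypothetical `Drainer` process:

```elixir
defmodule Drainer do
  def loop(count) do
    receive do
      {:work, _item} ->
        loop(count + 1)

      {:report, caller} ->
        send(caller, {:handled, count})
        loop(count)

      other ->
        # Catch-all: log and drop anything unexpected instead of letting it pile up
        IO.puts("Dropping unexpected message: #{inspect(other)}")
        loop(count)
    end
  end
end

pid = spawn(fn -> Drainer.loop(0) end)
send(pid, {:work, 1})
send(pid, :garbage)
send(pid, {:report, self()})

handled =
  receive do
    {:handled, n} -> n
  end

IO.puts("Handled #{handled} work message(s)")
```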
## Related Skills

- **Channels**: Phoenix Channels and PubSub for real-time bidirectional communication
- **Deployment**: Deploying Elixir/Phoenix applications with Mix releases, Docker, and Fly.io
- **Ecto**: Ecto patterns for database schemas, queries, changesets, and migrations in Elixir
- **GenServer**: GenServer patterns for stateful processes in Elixir OTP applications
- **OTP Supervision**: OTP supervision tree design for building fault-tolerant Elixir applications
- **Phoenix LiveView**: Phoenix LiveView patterns for building real-time, server-rendered interactive UIs