Async Patterns

Asyncio patterns for concurrent I/O-bound programming in Python

You are an expert in Python asyncio patterns for writing efficient, correct concurrent code.

Overview

Python's asyncio module provides an event loop, coroutines, tasks, and synchronization primitives for cooperative multitasking. It excels at I/O-bound workloads such as network requests and database queries, where a thread per operation would spend most of its time waiting. File operations are the exception: asyncio has no native async file API, so they are usually offloaded to a thread. Understanding the event loop model and structured concurrency patterns is key to writing reliable async code.

Core Philosophy

Asynchronous programming in Python is fundamentally about cooperative multitasking — your code voluntarily yields control at await points so other tasks can progress. This is not parallelism in the traditional sense; it is concurrency achieved through an event loop that multiplexes I/O-bound operations on a single thread. The key mental model is that your program is a collection of interleaved conversations with external systems (networks, databases, file systems), and asyncio lets you manage all those conversations simultaneously without dedicating a thread to each one.
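
The interleaving described above can be made visible with a small sketch. The names worker and interleave_demo are illustrative only, and asyncio.sleep(0) stands in for a real I/O wait; each await hands control back to the event loop so the other coroutine can take a turn.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}:{i}")
        # Yield control to the event loop so other tasks can run.
        await asyncio.sleep(0)


async def interleave_demo() -> list[str]:
    log: list[str] = []
    # Both workers run on the same thread and take turns at each await.
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(interleave_demo()))
```

Neither worker runs in parallel with the other; the log alternates between them only because each one yields voluntarily.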

Structured concurrency is the modern discipline that should guide your async design. Rather than spawning tasks into the void and hoping they complete, you scope task lifetimes to well-defined blocks using TaskGroup or async with constructs. This ensures that errors propagate predictably, resources are cleaned up, and you never lose track of background work. Think of it as the async equivalent of structured programming — no goto, no fire-and-forget.

Async code should be written with back-pressure and resource bounds in mind from the start. An unbounded gather of ten thousand HTTP requests will overwhelm both your process and the remote server. Semaphores, queues with max sizes, and connection pooling are not afterthoughts — they are essential structural elements. The best async code looks almost boring: clear entry points, bounded concurrency, explicit timeouts, and straightforward error handling.
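
As a rough illustration of back-pressure, the sketch below uses a bounded asyncio.Queue so that a fast producer is suspended whenever a slow consumer falls behind. The function names, the queue size, and the simulated delay are all assumptions chosen for the demo.

```python
import asyncio


async def produce(queue: asyncio.Queue, n: int, depths: list[int]) -> None:
    for i in range(n):
        # put() suspends while the queue is full; that is the back-pressure.
        await queue.put(i)
        depths.append(queue.qsize())


async def consume(queue: asyncio.Queue, out: list[int]) -> None:
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)  # simulate slow downstream work
        out.append(item)
        queue.task_done()


async def backpressure_demo(n: int = 10, maxsize: int = 3) -> tuple[list[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    out: list[int] = []
    depths: list[int] = []
    consumer = asyncio.create_task(consume(queue, out))
    await produce(queue, n, depths)
    await queue.join()  # wait until every item has been processed
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return out, max(depths)


if __name__ == "__main__":
    processed, peak_depth = asyncio.run(backpressure_demo())
    print(processed, peak_depth)
```

The queue depth never exceeds maxsize, so memory use stays bounded no matter how many items the producer wants to emit.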

Core Concepts

  • Coroutines are defined with async def and paused/resumed at await expressions.
  • Tasks (asyncio.create_task) schedule coroutines for concurrent execution on the event loop.
  • asyncio.gather runs multiple awaitables concurrently and collects results.
  • asyncio.TaskGroup (Python 3.11+) provides structured concurrency with automatic cancellation on failure.
  • asyncio.Semaphore limits concurrency to avoid overwhelming resources.
  • The event loop is single-threaded; blocking calls freeze everything unless offloaded with run_in_executor.

Implementation Patterns

Basic concurrent execution

import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, u) for u in urls))
    return results

asyncio.run(main())

TaskGroup for structured concurrency (3.11+)

import asyncio

async def process_item(item: str) -> str:
    await asyncio.sleep(1)
    return item.upper()

async def main():
    results = []

    async with asyncio.TaskGroup() as tg:
        for item in ["alpha", "beta", "gamma"]:
            task = tg.create_task(process_item(item))
            results.append(task)

    # All tasks complete here or the group raises ExceptionGroup
    return [t.result() for t in results]

Semaphore to limit concurrency

import asyncio
import aiohttp

async def rate_limited_fetch(
    sem: asyncio.Semaphore, session: aiohttp.ClientSession, url: str
) -> str:
    async with sem:
        # At most N requests are in flight at once
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    sem = asyncio.Semaphore(10)
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    # Share one session so connections are pooled and reused
    async with aiohttp.ClientSession() as session:
        tasks = [rate_limited_fetch(sem, session, url) for url in urls]
        return await asyncio.gather(*tasks)
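
To check that a semaphore really caps concurrency without touching the network, one option is to count in-flight calls. This is a minimal sketch: bounded_map and fake_request are hypothetical helpers, and the sleep stands in for a request.

```python
import asyncio


async def bounded_map(func, items, limit: int) -> list:
    """Run func over items with at most `limit` calls in flight."""
    sem = asyncio.Semaphore(limit)

    async def run(item):
        async with sem:
            return await func(item)

    return await asyncio.gather(*(run(i) for i in items))


async def peak_concurrency_demo(n: int = 20, limit: int = 5) -> tuple[list[int], int]:
    active = 0
    peak = 0

    async def fake_request(i: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for network latency
        active -= 1
        return i * 2

    results = await bounded_map(fake_request, range(n), limit)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency_demo()))
```

gather still preserves input order in its results, so bounding concurrency does not change what the caller sees, only how many calls overlap.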

Producer-consumer with asyncio.Queue

import asyncio

async def producer(queue: asyncio.Queue, items: list[str], n_consumers: int):
    for item in items:
        await queue.put(item)
    # One sentinel per consumer, so every worker sees exactly one
    for _ in range(n_consumers):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, worker_id: int):
    while True:
        item = await queue.get()
        try:
            if item is None:
                break  # sentinel: no more work
            print(f"Worker {worker_id} processing {item}")
            await asyncio.sleep(0.5)
        finally:
            # Mark every get() as done, including the sentinel
            queue.task_done()

async def main():
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=20)
    items = [f"task-{i}" for i in range(50)]
    n_consumers = 5

    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(n_consumers)]
    await producer(queue, items, n_consumers)

    await queue.join()
    # Consumers exit on their sentinel; gather surfaces any exception they raised
    await asyncio.gather(*consumers)

Running blocking code in executor

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io_operation(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

async def main():
    loop = asyncio.get_running_loop()
    # Offload blocking call to thread pool
    data = await loop.run_in_executor(None, blocking_io_operation, "/tmp/large_file.bin")
    # Or with a custom executor
    with ThreadPoolExecutor(max_workers=4) as pool:
        data = await loop.run_in_executor(pool, blocking_io_operation, "/tmp/large_file.bin")
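
On Python 3.9+, asyncio.to_thread is a shorter way to reach the default thread pool. The sketch below is illustrative only (blocking_work and heartbeat are made-up names): it runs a blocking call in a thread while a heartbeat task keeps ticking, which suggests the event loop stays responsive.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    time.sleep(seconds)  # stands in for any blocking call
    return "finished"


async def heartbeat(ticks: list[int], interval: float, count: int) -> None:
    for i in range(count):
        await asyncio.sleep(interval)
        ticks.append(i)


async def to_thread_demo() -> tuple[str, int]:
    ticks: list[int] = []
    hb = asyncio.create_task(heartbeat(ticks, 0.02, 3))
    # The blocking call runs in a worker thread; the loop keeps scheduling hb.
    result = await asyncio.to_thread(blocking_work, 0.3)
    ticks_during_blocking_call = len(ticks)
    await hb
    return result, ticks_during_blocking_call


if __name__ == "__main__":
    print(asyncio.run(to_thread_demo()))
```

Had blocking_work been called directly inside the coroutine, the heartbeat would not have ticked at all until it returned.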

Timeout and cancellation

import asyncio

async def slow_operation():
    await asyncio.sleep(30)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Or with asyncio.timeout (3.11+)
    try:
        async with asyncio.timeout(5.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")
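
A timeout works by cancelling the awaited operation, so cleanup belongs in a finally block (or an async with). A small sketch with a hypothetical guarded_operation shows the order of events:

```python
import asyncio


async def guarded_operation(log: list[str]) -> str:
    log.append("acquired")
    try:
        await asyncio.sleep(10)
        return "done"
    finally:
        # Runs on success, on error, and on cancellation alike.
        log.append("released")


async def timeout_cleanup_demo() -> list[str]:
    log: list[str] = []
    try:
        await asyncio.wait_for(guarded_operation(log), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("timed out")
    return log


if __name__ == "__main__":
    print(asyncio.run(timeout_cleanup_demo()))
```

wait_for waits for the cancelled operation to finish unwinding before it raises, so the cleanup is logged before the timeout is reported.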

Async generator

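import asyncio
from collections.abc import AsyncIterator

# Hypothetical stand-in for a real HTTP call: serves three pages, then an empty one
async def fetch_page(url: str, page: int) -> dict:
    await asyncio.sleep(0.1)  # simulate network latency
    items = [{"id": (page - 1) * 2 + i} for i in range(2)] if page <= 3 else []
    return {"items": items}

async def paginated_fetch(url: str) -> AsyncIterator[dict]:
    page = 1
    while True:
        data = await fetch_page(url, page)
        if not data["items"]:
            break  # an empty page marks the end of the collection
        for item in data["items"]:
            yield item
        page += 1

async def main():
    async for item in paginated_fetch("https://api.example.com/items"):
        print(item)  # replace with real per-item processing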

Best Practices

  • Use asyncio.run() as the single entry point; avoid manually managing the event loop.
  • Prefer TaskGroup (3.11+) over bare gather for structured concurrency and cleaner error handling.
  • Always set timeouts on network operations to prevent indefinite hangs.
  • Use asyncio.Semaphore to bound concurrency when hitting rate-limited APIs or constrained resources.
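  • Offload blocking I/O to a thread with run_in_executor (or asyncio.to_thread), and CPU-bound work to a ProcessPoolExecutor, since threads still contend for the GIL in standard CPython; never block the event loop.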
  • Use async with and async for for resource management and iteration in async contexts.
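
For the async with recommendation, a custom resource can be wrapped with contextlib.asynccontextmanager. This is only a sketch: open_connection is a hypothetical helper, and asyncio.sleep(0) stands in for real async setup and teardown.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_connection(name: str, log: list[str]) -> AsyncIterator[str]:
    log.append(f"open {name}")
    await asyncio.sleep(0)  # stands in for an async handshake
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # stands in for an async close
        log.append(f"close {name}")


async def context_demo() -> list[str]:
    log: list[str] = []
    try:
        async with open_connection("db", log) as conn:
            log.append(f"use {conn}")
            raise RuntimeError("query failed")
    except RuntimeError:
        log.append("handled")
    return log


if __name__ == "__main__":
    print(asyncio.run(context_demo()))
```

The close step runs even though the body raised, which is the guarantee that makes async with preferable to manual open and close calls.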

Common Pitfalls

  • Blocking the event loop with synchronous calls (e.g., time.sleep, requests.get, CPU work) freezes all concurrent tasks.
  • Forgetting to await a coroutine returns a coroutine object instead of executing it, with no error at the call site.
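  • Fire-and-forget tasks created with create_task can be garbage collected before they finish, because the event loop keeps only weak references to tasks; hold a reference (for example in a set) until the task is done.
  • Assuming asyncio.gather cancels the remaining awaitables when one raises: by default the first exception propagates to the caller, but the other awaitables are not cancelled and keep running in the background. Use return_exceptions=True to collect every result and exception, or TaskGroup (3.11+) if siblings should be cancelled on failure.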
  • Nested asyncio.run() raises RuntimeError — use await within an already-running loop, or use asyncio.run_coroutine_threadsafe from another thread.
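
The gather behaviour is easy to misread, so here is a minimal sketch of both modes. The coroutine names are illustrative. In the default mode the first exception reaches the caller while the other awaitable keeps running; with return_exceptions=True the exceptions come back as ordinary results.

```python
import asyncio


async def fail_fast() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("bad input")


async def slow_ok(log: list[str]) -> str:
    await asyncio.sleep(0.05)
    log.append("slow_ok finished")
    return "ok"


async def gather_default_demo() -> list[str]:
    log: list[str] = []
    slow = asyncio.create_task(slow_ok(log))
    try:
        await asyncio.gather(fail_fast(), slow)
    except ValueError:
        log.append("gather raised")
    # gather did not cancel the other task; it is still running here.
    log.append(f"slow cancelled: {slow.cancelled()}")
    await slow
    return log


async def gather_collect_demo() -> list[object]:
    log: list[str] = []
    # Exceptions are returned in place of results, in input order.
    return await asyncio.gather(fail_fast(), slow_ok(log), return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(gather_default_demo()))
    print(asyncio.run(gather_collect_demo()))
```

With return_exceptions=True each element has to be checked with isinstance before use; otherwise a failure passes silently as data.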

Anti-Patterns

  • Unbounded fan-out — calling asyncio.gather(*[fetch(url) for url in thousands_of_urls]) without a semaphore or connection pool hammers the target, exhausts file descriptors, and often produces worse throughput than a bounded approach due to contention and retries.

  • Sync-in-async contamination — sneaking a requests.get(), time.sleep(), or CPU-heavy computation into an async function blocks the entire event loop, silently degrading performance for every concurrent task. Use run_in_executor or dedicated worker processes.

  • Fire-and-forget tasks without references — calling asyncio.create_task(coro()) without storing the returned task object risks silent garbage collection and lost exceptions. Always keep a reference and handle the result or exception.

  • Catching too broadly in gather — using return_exceptions=True and then ignoring the returned exception objects means failures go unnoticed. Either handle each result individually or let exceptions propagate with TaskGroup.

  • Async for the sake of async — wrapping purely CPU-bound or trivially fast operations in async def adds overhead and complexity without any concurrency benefit. Reserve asyncio for genuinely I/O-bound workloads where you have multiple operations to overlap.
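
When background tasks are genuinely needed, one common way to avoid the unreferenced-task problem is to keep tasks in a set and attach a done callback. The sketch below assumes a module-level set and a hypothetical spawn helper; where possible, a TaskGroup is still the simpler choice.

```python
import asyncio

# Strong references keep background tasks alive until they complete.
background_tasks: set[asyncio.Task] = set()


def spawn(coro, errors: list[BaseException]) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)

    def _on_done(t: asyncio.Task) -> None:
        background_tasks.discard(t)
        # Retrieve the exception so a failure is recorded, not lost.
        if not t.cancelled() and t.exception() is not None:
            errors.append(t.exception())

    task.add_done_callback(_on_done)
    return task


async def flaky() -> None:
    await asyncio.sleep(0)
    raise RuntimeError("background failure")


async def spawn_demo() -> tuple[list[str], int]:
    errors: list[BaseException] = []
    spawn(flaky(), errors)
    spawn(asyncio.sleep(0), errors)
    await asyncio.sleep(0.05)  # give the background work time to finish
    return [repr(e) for e in errors], len(background_tasks)


if __name__ == "__main__":
    print(asyncio.run(spawn_demo()))
```

The callback both releases the reference and surfaces the failure, which addresses the two risks named in the fire-and-forget item above.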
