Async Patterns
Asyncio patterns for concurrent I/O-bound programming in Python
You are an expert in Python asyncio patterns for writing efficient, correct concurrent code.
Overview
Python's asyncio module provides an event loop, coroutines, tasks, and synchronization primitives for cooperative multitasking. It excels at I/O-bound workloads — network requests, database queries, file operations — where threads would waste resources waiting. Understanding the event loop model and structured concurrency patterns is key to writing reliable async code.
Core Philosophy
Asynchronous programming in Python is fundamentally about cooperative multitasking — your code voluntarily yields control at await points so other tasks can progress. This is not parallelism in the traditional sense; it is concurrency achieved through an event loop that multiplexes I/O-bound operations on a single thread. The key mental model is that your program is a collection of interleaved conversations with external systems (networks, databases, file systems), and asyncio lets you manage all those conversations simultaneously without dedicating a thread to each one.
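The "interleaved conversations" model can be seen in a minimal sketch: two coroutines whose sleeps overlap, so total runtime is roughly one coroutine's runtime, not the sum (names here are illustrative):

```python
import asyncio

async def talk(name: str, delay: float) -> None:
    # Each await asyncio.sleep() yields control to the event loop,
    # letting the other "conversation" make progress.
    for i in range(3):
        print(f"{name}: step {i}")
        await asyncio.sleep(delay)

async def main() -> None:
    # Both coroutines run interleaved on one thread; total runtime is
    # about 0.3s, not 0.6s, because the sleeps overlap.
    await asyncio.gather(talk("a", 0.1), talk("b", 0.1))

asyncio.run(main())
```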
Structured concurrency is the modern discipline that should guide your async design. Rather than spawning tasks into the void and hoping they complete, you scope task lifetimes to well-defined blocks using TaskGroup or async with constructs. This ensures that errors propagate predictably, resources are cleaned up, and you never lose track of background work. Think of it as the async equivalent of structured programming — no goto, no fire-and-forget.
Async code should be written with back-pressure and resource bounds in mind from the start. An unbounded gather of ten thousand HTTP requests will overwhelm both your process and the remote server. Semaphores, queues with max sizes, and connection pooling are not afterthoughts — they are essential structural elements. The best async code looks almost boring: clear entry points, bounded concurrency, explicit timeouts, and straightforward error handling.
Core Concepts
- **Coroutines** are defined with `async def` and paused/resumed at `await` expressions.
- **Tasks** (`asyncio.create_task`) schedule coroutines for concurrent execution on the event loop.
- **`asyncio.gather`** runs multiple awaitables concurrently and collects results.
- **`asyncio.TaskGroup`** (Python 3.11+) provides structured concurrency with automatic cancellation on failure.
- **`asyncio.Semaphore`** limits concurrency to avoid overwhelming resources.
- **The event loop** is single-threaded; blocking calls freeze everything unless offloaded with `run_in_executor`.
Implementation Patterns
Basic concurrent execution
```python
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, u) for u in urls))
    return results

asyncio.run(main())
```
TaskGroup for structured concurrency (3.11+)
```python
import asyncio

async def process_item(item: str) -> str:
    await asyncio.sleep(1)
    return item.upper()

async def main():
    results = []
    async with asyncio.TaskGroup() as tg:
        for item in ["alpha", "beta", "gamma"]:
            task = tg.create_task(process_item(item))
            results.append(task)
    # All tasks complete here or the group raises ExceptionGroup
    return [t.result() for t in results]
```
Semaphore to limit concurrency
```python
import asyncio
import aiohttp

async def rate_limited_fetch(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:
        # At most N concurrent requests
        # (in practice, share one ClientSession rather than creating one per call)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()

async def main():
    sem = asyncio.Semaphore(10)
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    tasks = [rate_limited_fetch(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
```
Producer-consumer with asyncio.Queue
```python
import asyncio

async def producer(queue: asyncio.Queue, items: list[str]):
    for item in items:
        await queue.put(item)
    await queue.put(None)  # sentinel

async def consumer(queue: asyncio.Queue, worker_id: int):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            await queue.put(None)  # propagate sentinel to the next consumer
            break
        print(f"Worker {worker_id} processing {item}")
        await asyncio.sleep(0.5)
        queue.task_done()

async def main():
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=20)
    items = [f"task-{i}" for i in range(50)]
    producer_task = asyncio.create_task(producer(queue, items))
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(5)]
    await producer_task
    # Wait for the consumers themselves; queue.join() would deadlock here
    # because the last re-queued sentinel is never marked done.
    await asyncio.gather(*consumers)
```
Running blocking code in executor
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io_operation(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

async def main():
    loop = asyncio.get_running_loop()
    # Offload blocking call to the default thread pool
    data = await loop.run_in_executor(None, blocking_io_operation, "/tmp/large_file.bin")
    # Or with a custom executor
    with ThreadPoolExecutor(max_workers=4) as pool:
        data = await loop.run_in_executor(pool, blocking_io_operation, "/tmp/large_file.bin")
```
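On Python 3.9+, `asyncio.to_thread` wraps the default-executor case behind a simpler API (and forwards keyword arguments). A sketch with an illustrative blocking function standing in for real file or network work:

```python
import asyncio
import hashlib

def blocking_digest(data: bytes) -> str:
    # Stand-in for blocking work (real code might read a file here).
    return hashlib.sha256(data).hexdigest()

async def main() -> str:
    # Equivalent to loop.run_in_executor(None, ...), but no loop handle needed.
    return await asyncio.to_thread(blocking_digest, b"payload")

print(asyncio.run(main()))
```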
Timeout and cancellation
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(30)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Or with asyncio.timeout (3.11+)
    try:
        async with asyncio.timeout(5.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")
```
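Timeouts work by cancelling the awaited task, so tasks that hold resources should catch `asyncio.CancelledError`, clean up, and re-raise. A minimal sketch (the worker and its "resource" are illustrative):

```python
import asyncio

async def worker(cleaned: list[str]) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Run cleanup, then re-raise so the caller still sees the cancellation.
        cleaned.append("released resource")
        raise

async def main() -> list[str]:
    cleaned: list[str] = []
    task = asyncio.create_task(worker(cleaned))
    await asyncio.sleep(0.01)  # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return cleaned

print(asyncio.run(main()))
```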
Async generator
```python
import asyncio
from typing import AsyncIterator

async def paginated_fetch(url: str) -> AsyncIterator[dict]:
    page = 1
    while True:
        data = await fetch_page(url, page)  # fetch_page: your HTTP helper
        if not data["items"]:
            break
        for item in data["items"]:
            yield item
        page += 1

async def main():
    async for item in paginated_fetch("https://api.example.com/items"):
        process(item)  # process: application-defined
```
Best Practices
- Use `asyncio.run()` as the single entry point; avoid manually managing the event loop.
- Prefer `TaskGroup` (3.11+) over bare `gather` for structured concurrency and cleaner error handling.
- Always set timeouts on network operations to prevent indefinite hangs.
- Use `asyncio.Semaphore` to bound concurrency when hitting rate-limited APIs or constrained resources.
- Offload CPU-bound or blocking I/O work to `run_in_executor` — never block the event loop.
- Use `async with` and `async for` for resource management and iteration in async contexts.
Common Pitfalls
- Blocking the event loop with synchronous calls (e.g., `time.sleep`, `requests.get`, CPU work) freezes all concurrent tasks.
- Forgetting to `await` a coroutine returns a coroutine object instead of executing it, with no error at the call site.
- Fire-and-forget tasks created with `create_task` can be garbage collected; keep a reference to avoid silent cancellation.
- Mixing `asyncio.gather` with tasks that raise — by default one exception propagates and others are cancelled; use `return_exceptions=True` if you want all results.
- Nested `asyncio.run()` raises `RuntimeError` — use `await` within an already-running loop, or use `asyncio.run_coroutine_threadsafe` from another thread.
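The task-reference pitfall has a standard fix: keep tasks in a set and let a done callback discard them. A minimal sketch (the background coroutine is illustrative):

```python
import asyncio

# Holding a reference prevents the task from being garbage-collected
# mid-flight; the done callback discards it once finished.
background_tasks: set[asyncio.Task] = set()

async def ping(results: list[str]) -> None:
    await asyncio.sleep(0.01)
    results.append("pong")

async def main() -> list[str]:
    results: list[str] = []
    task = asyncio.create_task(ping(results))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    await asyncio.sleep(0.05)  # meanwhile, the rest of the app keeps running
    return results

print(asyncio.run(main()))
```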
Anti-Patterns
- **Unbounded fan-out** — calling `asyncio.gather(*[fetch(url) for url in thousands_of_urls])` without a semaphore or connection pool hammers the target, exhausts file descriptors, and often produces worse throughput than a bounded approach due to contention and retries.
- **Sync-in-async contamination** — sneaking a `requests.get()`, `time.sleep()`, or CPU-heavy computation into an async function blocks the entire event loop, silently degrading performance for every concurrent task. Use `run_in_executor` or dedicated worker processes.
- **Fire-and-forget tasks without references** — calling `asyncio.create_task(coro())` without storing the returned task object risks silent garbage collection and lost exceptions. Always keep a reference and handle the result or exception.
- **Catching too broadly in gather** — using `return_exceptions=True` and then ignoring the returned exception objects means failures go unnoticed. Either handle each result individually or let exceptions propagate with `TaskGroup`.
- **Async for the sake of async** — wrapping purely CPU-bound or trivially fast operations in `async def` adds overhead and complexity without any concurrency benefit. Reserve asyncio for genuinely I/O-bound workloads where you have multiple operations to overlap.
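The bounded alternative to unbounded fan-out can be packaged as a small helper (the helper name and jobs below are illustrative, not a stdlib API):

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")

async def bounded_gather(limit: int, coros: list[Awaitable[T]]) -> list[T]:
    # Wrap each awaitable so at most `limit` run concurrently.
    sem = asyncio.Semaphore(limit)

    async def run(coro: Awaitable[T]) -> T:
        async with sem:
            return await coro

    # gather preserves input order in its results.
    return await asyncio.gather(*(run(c) for c in coros))

async def main() -> list[int]:
    async def job(i: int) -> int:
        await asyncio.sleep(0.01)
        return i * 2

    return await bounded_gather(3, [job(i) for i in range(10)])

print(asyncio.run(main()))
```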
Related Skills
- **Context Managers**: context manager patterns using `with` statements for reliable resource management in Python
- **Dataclasses**: dataclass and Pydantic model patterns for structured data in Python
- **Decorators**: decorator patterns for wrapping, extending, and composing Python functions and classes
- **Dependency Injection**: dependency injection patterns for loosely coupled, testable Python applications
- **Generators**: generator and itertools patterns for memory-efficient data processing in Python
- **Metaclasses**: metaclass and descriptor patterns for advanced class customization in Python