# Celery

Celery patterns for distributed task queues, scheduling, retries, and worker management.
You are an expert in Celery for building distributed task queues, scheduled jobs, and async processing pipelines.
## Key Points
- Set `task_acks_late=True` and `worker_prefetch_multiplier=1` so tasks are not lost if a worker crashes mid-execution.
- Use `bind=True` and `self.retry()` with exponential backoff for tasks that call external services.
- Set `expires` on tasks that become irrelevant after a time window (like sending a time-sensitive notification).
- Route CPU-heavy tasks to dedicated queues with limited concurrency to avoid starving I/O tasks.
- Use `shared_task` instead of `app.task` so tasks are portable across Django apps and test configurations.
- Store only small, serializable return values. Do not pass large objects through the result backend.
- Always pass IDs, not ORM objects, as task arguments, and re-fetch inside the task; objects may be stale or unserializable.
- Never run Beat and workers in the same process in production. Use separate processes: one for Beat, one or more for workers.
- Set `result_expires`; otherwise result keys accumulate in Redis indefinitely.
- Never enable `task_always_eager=True` in production; it runs tasks synchronously and defeats the purpose of a task queue.
- Celery workers are thread/process-based, not async. Do not call `asyncio.run()` inside Celery tasks without careful handling.
- Set `task_reject_on_worker_lost=True` when using `acks_late`; otherwise tasks can get stuck in an unacknowledged state.
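Several of these settings work together. A minimal `celeryconfig.py` fragment combining the reliability-related options above (the one-hour expiry is an illustrative value, not a recommendation):

```python
# celeryconfig.py (fragment): illustrative values, tune for your workload
task_acks_late = True                 # ack only after the task finishes
task_reject_on_worker_lost = True     # re-queue if the worker process dies
worker_prefetch_multiplier = 1        # workers take one task at a time
result_expires = 3600                 # drop result keys after one hour
```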
## Quick Example
```bash
pip install "celery[redis]" redis
```
```python
# proj/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
```
You are an expert in Celery for building distributed task queues, scheduled jobs, and async processing pipelines.
## Core Philosophy

### Overview

Celery is a distributed task queue for Python that enables offloading work from web request cycles into background workers. It supports multiple message brokers (Redis, RabbitMQ), result backends, task routing, periodic scheduling with Celery Beat, and robust retry/error handling.
## Setup & Configuration

```bash
pip install "celery[redis]" redis
```

```python
# proj/celery.py
from celery import Celery

app = Celery("proj")
app.config_from_object("proj.celeryconfig")
app.autodiscover_tasks(["proj.tasks"])
```

```python
# proj/celeryconfig.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"

task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

timezone = "UTC"
enable_utc = True

task_acks_late = True
worker_prefetch_multiplier = 1

task_routes = {
    "proj.tasks.send_email": {"queue": "email"},
    "proj.tasks.process_image": {"queue": "media"},
}
task_default_queue = "default"
task_default_rate_limit = "10/s"
```
## Django Integration

```python
# proj/celery.py
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")

app = Celery("proj")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
```

```python
# proj/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)
```
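Because `config_from_object` is called with `namespace="CELERY"`, Celery reads Django settings whose names are the upper-cased option names with a `CELERY_` prefix. A sketch of the matching `settings.py` fragment, with placeholder local-Redis URLs:

```python
# proj/settings.py (fragment): the CELERY_ prefix matches
# namespace="CELERY"; URLs are local-development placeholders
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
```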
## Core Patterns

### Defining Tasks

```python
# proj/tasks/email.py
from celery import shared_task
from celery.utils.log import get_task_logger

# email_service and RateLimitExceeded come from your email provider's SDK
logger = get_task_logger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
)
def send_email(self, to_address: str, subject: str, body: str):
    logger.info("Sending email to %s", to_address)
    try:
        result = email_service.send(to=to_address, subject=subject, body=body)
        return {"message_id": result.id}
    except RateLimitExceeded as exc:
        raise self.retry(exc=exc, countdown=120)
```
### Calling Tasks

```python
# Async dispatch (returns AsyncResult)
result = send_email.delay("user@example.com", "Welcome", "Hello!")

# With explicit options
result = send_email.apply_async(
    args=["user@example.com", "Welcome", "Hello!"],
    countdown=30,   # delay execution by 30 seconds
    expires=3600,   # expire if not picked up within 1 hour
    queue="email",
    priority=5,
)

# Check result
if result.ready():
    print(result.get(timeout=10))
```
### Task Chains, Groups, and Chords

```python
from celery import chain, group, chord

# Chain: sequential pipeline
pipeline = chain(
    download_file.s(url),
    process_file.s(),
    upload_result.s(destination),
)
result = pipeline.apply_async()

# Group: parallel execution
batch = group(
    process_image.s(image_id) for image_id in image_ids
)
group_result = batch.apply_async()

# Chord: group + callback when all complete
workflow = chord(
    [fetch_data.s(source) for source in sources],
    aggregate_results.s(),
)
workflow.apply_async()
```
### Periodic Tasks with Celery Beat

```python
# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "proj.tasks.cleanup_sessions",
        "schedule": crontab(minute=0, hour="*/6"),
    },
    "daily-report": {
        "task": "proj.tasks.generate_daily_report",
        "schedule": crontab(minute=0, hour=8),
        "kwargs": {"report_type": "summary"},
    },
    "heartbeat": {
        "task": "proj.tasks.heartbeat",
        "schedule": 30.0,  # every 30 seconds
    },
}
```

```bash
# Start beat scheduler
celery -A proj beat --loglevel=info

# Start worker
celery -A proj worker --loglevel=info --concurrency=4 -Q default,email
```
### Task Base Classes

```python
from celery import Task, shared_task

# create_session and User come from your ORM layer (not shown here)

class DatabaseTask(Task):
    _db_session = None

    @property
    def db(self):
        if self._db_session is None:
            self._db_session = create_session()
        return self._db_session

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self._db_session is not None:
            self._db_session.close()
            self._db_session = None

@shared_task(base=DatabaseTask, bind=True)
def update_user_stats(self, user_id: int):
    user = self.db.query(User).get(user_id)
    user.recalculate_stats()
    self.db.commit()
```
### Monitoring and Signals

```python
from celery.signals import task_failure, task_success, worker_ready

# logger, notify_sentry, and metrics are application-level helpers (not shown)

@task_failure.connect
def handle_task_failure(sender, task_id, exception, traceback, **kwargs):
    logger.error("Task %s failed: %s", task_id, exception)
    notify_sentry(exception, task_id=task_id)

@task_success.connect
def handle_task_success(sender, result, **kwargs):
    metrics.increment("celery.task.success", tags={"task": sender.name})

@worker_ready.connect
def on_worker_ready(**kwargs):
    logger.info("Worker is ready to accept tasks")
```
## Testing

```python
import pytest

from proj.tasks.email import send_email

@pytest.fixture
def celery_config():
    return {
        "task_always_eager": True,
        "task_eager_propagates": True,
    }

def test_send_email(celery_config):
    result = send_email.delay("test@example.com", "Test", "Body")
    assert result.successful()
    assert "message_id" in result.result
```
## Best Practices

- Set `task_acks_late=True` and `worker_prefetch_multiplier=1` so tasks are not lost if a worker crashes mid-execution.
- Use `bind=True` and `self.retry()` with exponential backoff for tasks that call external services.
- Set `expires` on tasks that become irrelevant after a time window (like sending a time-sensitive notification).
- Route CPU-heavy tasks to dedicated queues with limited concurrency to avoid starving I/O tasks.
- Use `shared_task` instead of `app.task` so tasks are portable across Django apps and test configurations.
- Store only small, serializable return values. Do not pass large objects through the result backend.

## Common Pitfalls

- Passing ORM objects as task arguments. Always pass IDs and re-fetch inside the task, because objects may be stale or unserializable.
- Running Beat and workers in the same process in production. Use separate processes: one for Beat, one or more for workers.
- Ignoring task result expiry. If `result_expires` is not set, result keys accumulate in Redis indefinitely.
- Using `task_always_eager=True` in production. This runs tasks synchronously and defeats the purpose of a task queue.
- Blocking the event loop in tasks that should be async. Celery workers are thread/process-based, not async. Do not mix in `asyncio.run()` inside Celery tasks without careful handling.
- Not setting `task_reject_on_worker_lost=True` when using `acks_late`, which can cause tasks to be stuck in an unacknowledged state.
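The first pitfall above (ORM objects as arguments) comes down to keeping task payloads JSON-serializable and re-reading state at execution time. A Celery-free sketch of the ID-passing pattern; `USERS` and `fetch_user` are hypothetical stand-ins for the database and an ORM lookup, and in real code `deactivate_user` would be decorated with `@shared_task`:

```python
# Hypothetical in-memory "database"; a real task would query the ORM.
USERS = {42: {"email": "user@example.com", "active": True}}

def fetch_user(user_id: int) -> dict:
    # Stand-in for e.g. User.objects.get(pk=user_id)
    return USERS[user_id]

def deactivate_user(user_id: int) -> dict:
    # The task receives only a primitive ID (JSON-serializable),
    # never a model instance, and re-fetches fresh state here.
    user = fetch_user(user_id)
    user["active"] = False
    return {"user_id": user_id, "active": user["active"]}
```

Because the lookup happens inside the task body, any changes made between dispatch and execution are seen, which is exactly what passing a pickled model instance would miss.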
## Anti-Patterns

- Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
- Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
- Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
- Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
- Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
## Related Skills

- **Django Admin**: Django admin customization patterns for list views, forms, inlines, actions, and permissions
- **Django ORM**: Django ORM patterns for models, querysets, migrations, and database optimization
- **Django REST Framework**: Django REST Framework patterns for building, serializing, and securing RESTful APIs
- **FastAPI**: FastAPI patterns for async APIs, dependency injection, Pydantic models, and OpenAPI integration
- **Flask**: Flask application patterns for routing, blueprints, extensions, and application factories
- **Python WebSockets**: WebSocket patterns for real-time communication using FastAPI, Django Channels, and the websockets library