
Celery

Celery patterns for distributed task queues, scheduling, retries, and worker management

Quick Summary
You are an expert in Celery for building distributed task queues, scheduled jobs, and async processing pipelines.

## Key Points

- Set `task_acks_late=True` and `worker_prefetch_multiplier=1` so tasks are not lost if a worker crashes mid-execution.
- Use `bind=True` and `self.retry()` with exponential backoff for tasks that call external services.
- Set `expires` on tasks that become irrelevant after a time window (like sending a time-sensitive notification).
- Route CPU-heavy tasks to dedicated queues with limited concurrency to avoid starving I/O tasks.
- Use `shared_task` instead of `app.task` so tasks are portable across Django apps and test configurations.
- Store only small, serializable return values. Do not pass large objects through the result backend.
- Do not pass ORM objects as task arguments. Pass IDs and re-fetch inside the task, because objects may be stale or unserializable.
- Run Beat and workers as separate processes in production: one for Beat, one or more for workers.
- Set `result_expires`; otherwise result keys accumulate in Redis indefinitely.
- Never enable `task_always_eager=True` in production. It runs tasks synchronously and defeats the purpose of a task queue.
- Avoid `asyncio.run()` inside Celery tasks without careful handling; workers are thread/process-based, not async.
- Set `task_reject_on_worker_lost=True` when using `acks_late`, so tasks whose worker process dies are requeued instead of marked failed.
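The reliability settings above can be combined in a minimal `celeryconfig.py` fragment (values are illustrative; tune them for your deployment):

```python
# celeryconfig.py -- reliability-focused settings (illustrative values)
task_acks_late = True                # ack only after the task body finishes
task_reject_on_worker_lost = True    # requeue tasks whose worker process died
worker_prefetch_multiplier = 1       # each worker process reserves one task at a time
result_expires = 3600                # drop result keys from the backend after 1 hour
```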

## Quick Example

```bash
pip install "celery[redis]" redis
```

```python
# proj/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)
```

# Celery — Python Web Development

You are an expert in Celery for building distributed task queues, scheduled jobs, and async processing pipelines.

## Core Philosophy

### Overview

Celery is a distributed task queue for Python that enables offloading work from web request cycles into background workers. It supports multiple message brokers (Redis, RabbitMQ), result backends, task routing, periodic scheduling with Celery Beat, and robust retry/error handling.

## Setup & Configuration

```bash
pip install "celery[redis]" redis
```

```python
# proj/celery.py
from celery import Celery

app = Celery("proj")
app.config_from_object("proj.celeryconfig")
app.autodiscover_tasks(["proj.tasks"])
```

```python
# proj/celeryconfig.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"

task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "UTC"
enable_utc = True

task_acks_late = True
worker_prefetch_multiplier = 1

task_routes = {
    "proj.tasks.send_email": {"queue": "email"},
    "proj.tasks.process_image": {"queue": "media"},
}

task_default_queue = "default"
task_default_rate_limit = "10/s"
```

## Django Integration

```python
# proj/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")

app = Celery("proj")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
```

```python
# proj/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)
```

## Core Patterns

### Defining Tasks

```python
# proj/tasks/email.py
from celery import shared_task
from celery.utils.log import get_task_logger

# email_service and RateLimitExceeded are placeholders for your email
# provider's client library.
logger = get_task_logger(__name__)


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,
)
def send_email(self, to_address: str, subject: str, body: str):
    logger.info("Sending email to %s", to_address)
    try:
        result = email_service.send(to=to_address, subject=subject, body=body)
        return {"message_id": result.id}
    except RateLimitExceeded as exc:
        raise self.retry(exc=exc, countdown=120)
```

### Calling Tasks

```python
# Async dispatch (returns AsyncResult)
result = send_email.delay("user@example.com", "Welcome", "Hello!")

# With explicit options
result = send_email.apply_async(
    args=["user@example.com", "Welcome", "Hello!"],
    countdown=30,           # delay execution by 30 seconds
    expires=3600,           # expire if not picked up within 1 hour
    queue="email",
    priority=5,
)

# Check result
if result.ready():
    print(result.get(timeout=10))
```

### Task Chains, Groups, and Chords

```python
from celery import chain, group, chord

# Chain: sequential pipeline
pipeline = chain(
    download_file.s(url),
    process_file.s(),
    upload_result.s(destination),
)
result = pipeline.apply_async()

# Group: parallel execution
batch = group(
    process_image.s(image_id) for image_id in image_ids
)
group_result = batch.apply_async()

# Chord: group + callback when all complete
workflow = chord(
    [fetch_data.s(source) for source in sources],
    aggregate_results.s(),
)
workflow.apply_async()
```

### Periodic Tasks with Celery Beat

```python
# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "proj.tasks.cleanup_sessions",
        "schedule": crontab(minute=0, hour="*/6"),
    },
    "daily-report": {
        "task": "proj.tasks.generate_daily_report",
        "schedule": crontab(minute=0, hour=8),
        "kwargs": {"report_type": "summary"},
    },
    "heartbeat": {
        "task": "proj.tasks.heartbeat",
        "schedule": 30.0,  # every 30 seconds
    },
}
```

```bash
# Start the beat scheduler
celery -A proj beat --loglevel=info

# Start a worker
celery -A proj worker --loglevel=info --concurrency=4 -Q default,email
```

## Task Base Classes

```python
from celery import Task, shared_task

# create_session and User are placeholders for your database layer.


class DatabaseTask(Task):
    _db_session = None

    @property
    def db(self):
        if self._db_session is None:
            self._db_session = create_session()
        return self._db_session

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Runs after every execution; close the session so connections
        # are not leaked across tasks.
        if self._db_session is not None:
            self._db_session.close()
            self._db_session = None


@shared_task(base=DatabaseTask, bind=True)
def update_user_stats(self, user_id: int):
    user = self.db.query(User).get(user_id)
    user.recalculate_stats()
    self.db.commit()
```

## Monitoring and Signals

```python
from celery.signals import task_failure, task_success, worker_ready

# logger, notify_sentry, and metrics are app-defined helpers (placeholders).


@task_failure.connect
def handle_task_failure(sender, task_id, exception, traceback, **kwargs):
    logger.error("Task %s failed: %s", task_id, exception)
    notify_sentry(exception, task_id=task_id)


@task_success.connect
def handle_task_success(sender, result, **kwargs):
    metrics.increment("celery.task.success", tags={"task": sender.name})


@worker_ready.connect
def on_worker_ready(**kwargs):
    logger.info("Worker is ready to accept tasks")
```

## Testing

```python
import pytest
from proj.tasks.email import send_email

# Requires Celery's pytest plugin, e.g. in conftest.py:
# pytest_plugins = ("celery.contrib.pytest",)


@pytest.fixture
def celery_config():
    return {
        "task_always_eager": True,
        "task_eager_propagates": True,
    }


def test_send_email(celery_app):
    # The plugin's celery_app fixture applies the celery_config above,
    # so .delay() runs the task eagerly, in-process.
    result = send_email.delay("test@example.com", "Test", "Body")
    assert result.successful()
    assert "message_id" in result.result
```

## Best Practices

- Set `task_acks_late=True` and `worker_prefetch_multiplier=1` so tasks are not lost if a worker crashes mid-execution.
- Use `bind=True` and `self.retry()` with exponential backoff for tasks that call external services.
- Set `expires` on tasks that become irrelevant after a time window (like sending a time-sensitive notification).
- Route CPU-heavy tasks to dedicated queues with limited concurrency to avoid starving I/O tasks.
- Use `shared_task` instead of `app.task` so tasks are portable across Django apps and test configurations.
- Store only small, serializable return values. Do not pass large objects through the result backend.
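As a sketch of the queue-routing advice above (the task names are hypothetical placeholders):

```python
# celeryconfig.py fragment: give CPU-bound work its own queue so it
# cannot starve fast I/O-bound tasks (task names are placeholders).
task_routes = {
    "proj.tasks.render_video": {"queue": "cpu"},  # CPU-heavy
    "proj.tasks.send_email": {"queue": "io"},     # I/O-bound
}
task_default_queue = "io"
```

Then run a small dedicated pool for the CPU queue, e.g. `celery -A proj worker -Q cpu --concurrency=2`, alongside a larger pool for I/O work: `celery -A proj worker -Q io --concurrency=16`.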

## Common Pitfalls

- Passing ORM objects as task arguments. Always pass IDs and re-fetch inside the task, because objects may be stale or unserializable.
- Running Beat and workers in the same process in production. Use separate processes: one for Beat, one or more for workers.
- Ignoring task result expiry. If `result_expires` is not set, result keys accumulate in Redis indefinitely.
- Using `task_always_eager=True` in production. This runs tasks synchronously and defeats the purpose of a task queue.
- Calling `asyncio.run()` inside tasks without careful handling. Celery workers are thread/process-based, not async.
- Not setting `task_reject_on_worker_lost=True` when using `acks_late`; without it, a task whose worker process dies is marked failed instead of being requeued.
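A minimal sketch of the first pitfall's fix, passing an ID instead of an ORM object (plain functions and an in-memory dict stand in here for `@shared_task` and your database):

```python
# USERS stands in for the database; in real code this would be the ORM.
USERS = {1: {"email": "a@example.com", "plan": "free"}}


def upgrade_user(user_id: int) -> str:
    # Re-fetch inside the task so it acts on current state, not a stale
    # snapshot serialized at enqueue time (e.g. User.objects.get(pk=user_id)).
    user = USERS[user_id]
    user["plan"] = "pro"
    return user["email"]


# Enqueue with a small JSON-serializable argument:
# upgrade_user.delay(1)      # good: pass the ID
# upgrade_user.delay(user)   # bad: ORM objects may be stale or unserializable
```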

## Anti-Patterns

Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.

Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.

Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.

Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.

Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
