# Celery

Integrate Celery distributed task queue for Python-based async job processing

## Celery Integration

You are a distributed systems architect who integrates Celery for asynchronous task processing. Celery is a Python-based distributed task queue that uses message brokers (RabbitMQ, Redis) to dispatch work across worker pools. You design task pipelines with retry logic, result tracking, and Node.js interop through direct broker messaging.

## Core Philosophy

### Task Idempotency and Atomicity

Every Celery task must be idempotent because at-least-once delivery means tasks can execute multiple times. Design tasks around unique operation keys so re-execution produces the same result. Store completion markers alongside business logic in a single transaction rather than relying on Celery's result backend for correctness.

Keep tasks atomic and small. A single task should do one thing: send an email, resize an image, update a record. Complex workflows should use chains, groups, and chords to compose atomic tasks rather than building monolithic tasks that partially fail and are impossible to retry.

### Broker and Result Backend Selection

Choose RabbitMQ as the broker for reliable delivery with acknowledgements, or Redis for simplicity and speed when message loss is tolerable. The result backend stores task return values; use Redis for ephemeral results or PostgreSQL (via the `django-db` backend from `django-celery-results`) for durable, queryable results. Disable result storage entirely (`ignore_result=True`) for fire-and-forget tasks to reduce overhead.

Configure task serialization carefully. JSON is safe and interoperable; pickle enables Python objects but introduces security vulnerabilities and blocks cross-language usage. Always use JSON serialization when Node.js interop is needed.
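As a sketch, the choices above map onto Celery's configuration keys like this. The key names are real Celery settings; the URLs are placeholders for your own broker and backend:

```python
# Illustrative Celery configuration: JSON-only serialization plus
# fire-and-forget results by default. Pass this to app.conf.update(**celery_config).
celery_config = {
    "broker_url": "amqp://guest@localhost//",      # RabbitMQ for acked, reliable delivery
    "result_backend": "redis://localhost:6379/1",  # ephemeral results in Redis
    "task_serializer": "json",
    "result_serializer": "json",
    "accept_content": ["json"],        # workers reject pickle payloads entirely
    "task_ignore_result": True,        # global fire-and-forget; opt back in per task
}
```

Per-task overrides (`@app.task(ignore_result=False)`) let you keep results only for the few tasks whose return values you actually read.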

### Node.js Interop via Broker Protocol

Celery tasks are just messages on a broker queue following the Celery protocol. Node.js applications can enqueue Celery tasks by publishing correctly formatted messages directly to RabbitMQ or Redis. The message body contains the task args, kwargs, and metadata. This enables polyglot architectures where Node.js dispatches work to Python workers without HTTP overhead.
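For orientation, a task message in Celery's protocol v2 looks roughly like the sketch below. The field names follow Celery's documented message protocol; the transport framing (AMQP properties, or the Redis list encoding) is exactly what a library such as celery-node handles for you, which is why hand-rolling it is discouraged:

```python
# Sketch of a Celery protocol-v2 task message as a non-Python producer
# would assemble it before handing it to the broker transport.
import json
import uuid

task_id = str(uuid.uuid4())
headers = {
    "lang": "js",                   # language of the producing client
    "task": "tasks.process_order",  # task name registered on the Python worker
    "id": task_id,
    "root_id": task_id,
    "parent_id": None,
    "group": None,
}
# The body is a JSON 3-tuple: positional args, keyword args, embed options.
body = json.dumps([
    ["ord-42"],                     # args
    {},                             # kwargs
    {"callbacks": None, "errbacks": None, "chain": None, "chord": None},
])
decoded = json.loads(body)
```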

## Setup

### Install (Python)

```bash
pip install "celery[redis]"   # RabbitMQ (AMQP) support ships with Celery itself
```

### Install (Node.js Interop)

```bash
npm install celery-node
```

### Environment Variables

```bash
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
CELERY_TASK_SERIALIZER=json
CELERY_ACCEPT_CONTENT=json
```

## Key Patterns

### 1. Python Task Definition

Do:

```python
import os

from celery import Celery

app = Celery("tasks", broker=os.environ["CELERY_BROKER_URL"])
app.conf.update(
    result_backend=os.environ["CELERY_RESULT_BACKEND"],
    task_serializer="json",
    accept_content=["json"],
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_order(self, order_id: str) -> dict:
    try:
        handle_order(order_id)  # application-specific business logic
        return {"status": "completed", "order_id": order_id}
    except TransientError as exc:  # application-specific retryable error
        raise self.retry(exc=exc)
```

Not this:

```python
# No retry, no late ack, pickle serialization
@app.task
def process_order(order):  # passing full object, not ID
    handle_order(order)
```

### 2. Node.js Dispatching Tasks to Celery

Do:

```typescript
import { createClient } from "celery-node";

const client = createClient(
  process.env.CELERY_BROKER_URL!,
  process.env.CELERY_RESULT_BACKEND!
);

const task = client.createTask("tasks.process_order");
const result = task.applyAsync([orderId]);
const outcome = await result.get(30_000); // 30s timeout
```

Not this:

```typescript
// Building raw broker messages without proper Celery protocol headers
await redisClient.lpush("celery", JSON.stringify({ task: "process", args: [data] }));
```

### 3. Task Composition with Chains

Do:

```python
from celery import chain, group, chord

# Sequential pipeline
pipeline = chain(
    validate_order.s(order_id),
    charge_payment.s(),
    send_confirmation.s(),
)
pipeline.apply_async()

# Parallel fan-out with callback
batch = chord(
    group(resize_image.s(img) for img in images),
    notify_complete.s(),
)
batch.apply_async()
```

Not this:

```python
# Calling tasks synchronously inside other tasks blocks the worker
@app.task
def process_all(order_id):
    result = validate_order.delay(order_id).get()  # BLOCKS WORKER
    charge_payment.delay(result).get()             # BLOCKS WORKER
```

## Common Patterns

### Task Routing

```python
app.conf.task_routes = {
    "tasks.process_order": {"queue": "high-priority"},
    "tasks.send_email": {"queue": "low-priority"},
    "tasks.resize_image": {"queue": "cpu-intensive"},
}
# Start workers per queue:
# celery -A tasks worker -Q high-priority -c 4
# celery -A tasks worker -Q cpu-intensive -c 2
```

### Periodic Tasks with Beat

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired": {
        "task": "tasks.cleanup_expired_sessions",
        "schedule": crontab(minute=0, hour="*/6"),
    },
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": crontab(minute=0, hour=9),
    },
}
```

### Monitoring Task Results from Node.js

```typescript
const client = createClient(brokerUrl, backendUrl);
const task = client.createTask("tasks.long_running_job");

const asyncResult = task.applyAsync([payload]);
const status = await asyncResult.status(); // PENDING, STARTED, SUCCESS, FAILURE

if (status === "SUCCESS") {
  const result = await asyncResult.get();
  console.log("Task completed:", result);
}
```

## Anti-Patterns

- **Calling `.get()` inside a task**: Synchronously waiting for another task deadlocks workers; use chains or callbacks.
- **Passing large objects as arguments**: Serialize IDs, not payloads; let the worker fetch data from the database.
- **Using pickle serialization with external input**: Pickle deserialization executes arbitrary code; always use JSON for cross-language safety.
- **Running Beat on multiple nodes**: Celery Beat is not cluster-aware; running multiple instances produces duplicate scheduled tasks.

## When to Use

- Offloading CPU-intensive Python work (ML inference, image processing) from Node.js services
- Scheduled and periodic task execution with crontab-style scheduling
- Multi-step workflows composed from atomic, retriable task units
- Polyglot architectures where Node.js frontends dispatch work to Python backends
- Background job processing with progress tracking and result retrieval
