
BullMQ

"BullMQ: Redis-based job queue, workers, delayed jobs, rate limiting, job priorities, repeatable jobs, concurrency, dashboard"


Core Philosophy

BullMQ is a high-performance, Redis-based job queue for Node.js. It provides a robust foundation for background job processing with fine-grained control over execution semantics: priorities, delays, rate limiting, retries, concurrency, and repeatable schedules. Unlike managed services, BullMQ runs on your own infrastructure with Redis as the sole dependency, giving you full control over data locality, cost, and scaling.

The architecture separates producers (code that adds jobs) from consumers (workers that process them). This decoupling lets you scale workers independently, deploy them on different machines, and process different queues with different concurrency settings. BullMQ keeps job state in Redis data structures such as lists, sorted sets, and hashes, updated atomically by Lua scripts, and uses Redis Streams for its event feed. Delivery is at-least-once: a job whose worker stalls or crashes can be picked up and run again, so processors should be idempotent.
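Because delivery is at-least-once, the same job can reach a processor more than once. The following is a minimal sketch of an idempotency guard, not part of BullMQ itself. `ProcessedStore`, `ChargeJob`, and `processCharge` are hypothetical names, and the in-memory `Set` stands in for a durable store such as a database table with a unique key.

```typescript
// In-memory stand-in for a durable store (hypothetical; use a database in practice).
class ProcessedStore {
  private readonly seen = new Set<string>();

  /** Returns true the first time a key is claimed and false on every repeat. */
  claim(key: string): boolean {
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}

// Hypothetical job shape, reduced to the fields this sketch needs.
interface ChargeJob {
  id: string;
  data: { orderId: string; amountCents: number };
}

const store = new ProcessedStore();
const charges: string[] = []; // records the side effects that actually ran

function processCharge(job: ChargeJob): "charged" | "skipped" {
  // Key on the business identifier, not the job id, so a re-enqueued
  // duplicate of the same order is also recognised.
  const key = `charge:${job.data.orderId}`;
  if (!store.claim(key)) return "skipped";
  charges.push(job.data.orderId);
  return "charged";
}
```

In a real system the claim and the side effect would normally share one transaction, so that a failure after the claim does not cause the retry to be skipped.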

Setup

Installation and Queue Configuration

// npm install bullmq ioredis

// lib/queue/connection.ts
import { ConnectionOptions } from "bullmq";

export const redisConnection: ConnectionOptions = {
  host: process.env.REDIS_HOST ?? "localhost",
  port: parseInt(process.env.REDIS_PORT ?? "6379", 10),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: null, // Required for BullMQ workers
};

Defining Queues

// lib/queue/queues.ts
import { Queue } from "bullmq";
import { redisConnection } from "./connection";

export const emailQueue = new Queue("email", {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { age: 86400, count: 1000 }, // Keep at most 1000 completed jobs, none older than 24 h (age is in seconds)
    removeOnFail: { age: 604800 },                 // Keep failed jobs for 7 days
  },
});

export const processingQueue = new Queue("processing", {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: "exponential", delay: 5000 },
  },
});
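To make the retry settings above concrete, here is a small worked sketch of the delays they produce. It assumes the built-in exponential strategy doubles the base delay on each retry (base × 2^(retry − 1)) and that `attempts` counts the first run too. Both helper functions are illustrative and not part of BullMQ.

```typescript
/** Delay in ms before retry number `retry` (1-based), doubling each time. */
function exponentialDelay(baseDelayMs: number, retry: number): number {
  return Math.round(baseDelayMs * Math.pow(2, retry - 1));
}

/** All retry delays for a job that is allowed `attempts` runs in total. */
function retrySchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt is not a retry, so there are attempts - 1 delays.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(exponentialDelay(baseDelayMs, retry));
  }
  return delays;
}

const emailRetries = retrySchedule(3, 2000);      // [2000, 4000]
const processingRetries = retrySchedule(5, 5000); // [5000, 10000, 20000, 40000]
```

Under these assumptions an email job gives up about 6 seconds after its first failure, while a processing job keeps retrying for about 75 seconds.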

Worker Setup

// workers/email-worker.ts
import { Worker, Job } from "bullmq";
import { redisConnection } from "../lib/queue/connection";

interface EmailJobData {
  to: string;
  subject: string;
  templateId: string;
  vars: Record<string, string>;
}

// renderTemplate and sendEmail are application helpers assumed to be defined elsewhere.
const worker = new Worker<EmailJobData>(
  "email",
  async (job: Job<EmailJobData>) => {
    const { to, subject, templateId, vars } = job.data;

    await job.updateProgress(10);
    const html = await renderTemplate(templateId, vars);

    await job.updateProgress(50);
    const result = await sendEmail({ to, subject, html });

    await job.updateProgress(100);
    return { messageId: result.messageId };
  },
  {
    connection: redisConnection,
    concurrency: 10,           // Process 10 jobs simultaneously
    limiter: {
      max: 100,                // Max 100 jobs
      duration: 60_000,        // per 60 seconds
    },
  }
);

worker.on("completed", (job) => {
  console.log(`Job ${job.id} completed: ${job.returnvalue?.messageId}`);
});

worker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed: ${err.message}`);
});

Key Techniques

Adding Jobs with Options

// app/api/notifications/route.ts
import { emailQueue, processingQueue } from "@/lib/queue/queues";

export async function POST(req: Request) {
  const { userId } = await req.json();

  // Basic job
  await emailQueue.add("welcome", {
    to: "user@example.com",
    subject: "Welcome!",
    templateId: "welcome",
    vars: { name: "Alice" },
  });

  // Delayed job — sends in 30 minutes
  await emailQueue.add(
    "followup",
    { to: "user@example.com", subject: "How's it going?", templateId: "followup", vars: {} },
    { delay: 30 * 60 * 1000 }
  );

  // Priority job — lower number = higher priority
  await emailQueue.add(
    "password-reset",
    { to: "user@example.com", subject: "Reset password", templateId: "reset", vars: {} },
    { priority: 1 }
  );

  // Job with custom ID for deduplication
  await processingQueue.add(
    "sync-user",
    { userId },
    { jobId: `sync-user-${userId}` } // Same ID = job is not added if it already exists
  );

  return Response.json({ queued: true });
}
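The `delay` option above takes a duration in milliseconds, but callers often have a target time instead. A minimal sketch of the conversion follows; `delayUntil` is a hypothetical helper, not a BullMQ function.

```typescript
/**
 * Milliseconds from `now` until `target`, clamped at 0 so that a target
 * in the past yields a job that runs as soon as a worker is free.
 */
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

const now = new Date("2024-01-01T00:00:00Z");
const inThirtyMinutes = delayUntil(new Date("2024-01-01T00:30:00Z"), now);
const alreadyPast = delayUntil(new Date("2023-12-31T23:00:00Z"), now);
```

The result could then be passed as `{ delay: delayUntil(sendAt) }` when adding the job.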

Repeatable (Cron) Jobs

// lib/queue/setup-repeatable.ts
import { emailQueue, processingQueue } from "./queues";

export async function setupRepeatableJobs() {
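  // Daily digest at 8 AM UTC (tz pins the schedule; without it the pattern follows the server's time zone)
  await emailQueue.add(
    "daily-digest",
    { templateId: "digest" },
    {
      repeat: { pattern: "0 8 * * *", tz: "UTC" },
      jobId: "daily-digest", // Becomes part of the repeat key; re-adding with identical options does not create a duplicate
    }
  );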

  // Process stale records every 15 minutes
  await processingQueue.add(
    "cleanup-stale",
    {},
    {
      repeat: { every: 15 * 60 * 1000 },
      jobId: "cleanup-stale",
    }
  );

  // Remove outdated repeatable jobs
  const repeatableJobs = await processingQueue.getRepeatableJobs();
  for (const job of repeatableJobs) {
    if (job.name === "old-deprecated-job") {
      await processingQueue.removeRepeatableByKey(job.key);
    }
  }
}

Flow Producer for Job Dependencies

// lib/queue/flows.ts
import { FlowProducer } from "bullmq";
import { redisConnection } from "./connection";

const flowProducer = new FlowProducer({ connection: redisConnection });

export async function createOrderFlow(orderId: string) {
  // Parent job waits for all children to complete before running
  const flow = await flowProducer.add({
    name: "finalize-order",
    queueName: "processing",
    data: { orderId },
    children: [
      {
        name: "charge-payment",
        queueName: "processing",
        data: { orderId },
      },
      {
        name: "reserve-inventory",
        queueName: "processing",
        data: { orderId },
      },
      {
        name: "send-confirmation",
        queueName: "email",
        data: {
          to: "customer@example.com",
          subject: "Order confirmed",
          templateId: "order-confirm",
          vars: { orderId },
        },
      },
    ],
  });

  return flow;
}
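The dependency rule in the flow above (a parent runs only after all of its children complete) can be modelled without Redis. The sketch below is an illustration of that rule, not BullMQ code. `FlowNode` mirrors only the `name`, `queueName`, and `children` fields of the flow definition, and `executionWaves` is a hypothetical helper.

```typescript
interface FlowNode {
  name: string;
  queueName: string;
  children?: FlowNode[];
}

/**
 * Groups job names by the earliest "wave" in which they can run:
 * leaves are wave 0, and a parent is one wave after its deepest child.
 */
function executionWaves(root: FlowNode): string[][] {
  const waves: string[][] = [];
  const visit = (node: FlowNode): number => {
    const childHeights = (node.children ?? []).map(visit);
    const height = childHeights.length === 0 ? 0 : Math.max(...childHeights) + 1;
    if (waves[height] === undefined) waves[height] = [];
    waves[height].push(node.name);
    return height;
  };
  visit(root);
  return waves;
}

const orderFlow: FlowNode = {
  name: "finalize-order",
  queueName: "processing",
  children: [
    { name: "charge-payment", queueName: "processing" },
    { name: "reserve-inventory", queueName: "processing" },
    { name: "send-confirmation", queueName: "email" },
  ],
};

const waves = executionWaves(orderFlow);
```

For the order flow this yields two waves: the three children, which may run in parallel and in any order, and then `finalize-order`. If the confirmation email must not go out before the payment succeeds, that dependency would need its own level in the tree.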

Worker with Named Processors

// workers/processing-worker.ts
import { Worker, Job } from "bullmq";
import { redisConnection } from "../lib/queue/connection";

type ProcessingJobData =
  | { type: "sync-user"; userId: string }
  | { type: "generate-report"; orgId: string; month: string }
  | { type: "cleanup-stale" };

const worker = new Worker<ProcessingJobData>(
  "processing",
  async (job: Job<ProcessingJobData>) => {
    switch (job.name) {
      case "sync-user":
        return await syncUserToCRM(job.data as { type: "sync-user"; userId: string });

      case "generate-report":
        return await generateMonthlyReport(job.data as { type: "generate-report"; orgId: string; month: string });

      case "cleanup-stale":
        return await cleanupStaleRecords();

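      case "charge-payment":
      case "reserve-inventory":
        // Children of the order flow land on this queue too and need a handler,
        // otherwise they fail in the default branch and the parent never runs.
        // handleOrderStep is a placeholder for application code defined elsewhere.
        return await handleOrderStep(job.name, job.data);

      case "finalize-order": {
        // Access children results via getChildrenValues
        const childResults = await job.getChildrenValues();
        return await finalizeOrder(job.data, childResults);
      }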

      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  },
  {
    connection: redisConnection,
    concurrency: 5,
  }
);
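As the number of job names grows, the switch above can be swapped for a lookup table. This is one possible sketch of that alternative, not something BullMQ requires. `NamedJob`, `Handler`, `handlers`, and `dispatch` are hypothetical names, and the handler bodies are placeholders for real application code.

```typescript
// Only the two fields of a job that the dispatcher needs.
interface NamedJob {
  name: string;
  data: unknown;
}

type Handler = (data: unknown) => Promise<unknown>;

// A Map avoids accidental hits on inherited object keys such as "toString".
const handlers = new Map<string, Handler>([
  ["sync-user", async (data) => ({ synced: (data as { userId: string }).userId })],
  ["cleanup-stale", async () => ({ removed: 0 })],
]);

async function dispatch(job: NamedJob): Promise<unknown> {
  const handler = handlers.get(job.name);
  if (!handler) {
    // Failing loudly keeps unknown job names visible in the failed set.
    throw new Error(`Unknown job name: ${job.name}`);
  }
  return handler(job.data);
}
```

A worker's processor function could then simply be `(job) => dispatch(job)`, with new job names registered in one place.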

Job Progress and Events

// Monitoring job progress from the API
import { Queue, QueueEvents } from "bullmq";
import { redisConnection } from "@/lib/queue/connection";

export async function getJobStatus(queueName: string, jobId: string) {
  // Each Queue instance opens its own Redis connection, so close it when done
  // (or look up a long-lived Queue instance instead of creating one per call).
  const queue = new Queue(queueName, { connection: redisConnection });
  try {
    const job = await queue.getJob(jobId);

    if (!job) return null;

    return {
      id: job.id,
      name: job.name,
      state: await job.getState(),
      progress: job.progress,
      returnValue: job.returnvalue,
      failedReason: job.failedReason,
      attemptsMade: job.attemptsMade,
      timestamp: job.timestamp,
    };
  } finally {
    await queue.close();
  }
}

// Real-time events via QueueEvents
const queueEvents = new QueueEvents("processing", { connection: redisConnection });

queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed`, returnvalue);
});

queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});

queueEvents.on("progress", ({ jobId, data }) => {
  console.log(`Job ${jobId} progress: ${data}%`);
});
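The event handlers above only log. For alerting, the same events can feed a counter. The sketch below is a hypothetical `FailureRateTracker` class, not part of BullMQ. It is kept free of Redis so it can be exercised on its own.

```typescript
class FailureRateTracker {
  private completed = 0;
  private failed = 0;

  recordCompleted(): void {
    this.completed++;
  }

  recordFailed(): void {
    this.failed++;
  }

  /** Fraction of finished jobs that failed; 0 when nothing has finished yet. */
  get failureRate(): number {
    const total = this.completed + this.failed;
    return total === 0 ? 0 : this.failed / total;
  }
}

const tracker = new FailureRateTracker();
tracker.recordCompleted();
tracker.recordCompleted();
tracker.recordCompleted();
tracker.recordFailed();
const rate = tracker.failureRate; // 1 failure out of 4 finished jobs
```

In practice `recordCompleted` and `recordFailed` would be called from the `completed` and `failed` handlers, and the rate compared against a threshold on a timer.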

Bull Board Dashboard

// server/admin-queues.ts (Bull Board served from an Express app)
import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";
import { emailQueue, processingQueue } from "@/lib/queue/queues";

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/api/admin/queues");

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(processingQueue),
  ],
  serverAdapter,
});

// ExpressAdapter produces an Express router, so it has to be mounted on an
// Express app; a Next.js route handler cannot serve it directly.
const app = express();
app.use("/api/admin/queues", serverAdapter.getRouter());

Best Practices

  • Set maxRetriesPerRequest: null on the Redis connection for workers. BullMQ requires this to block-wait for jobs correctly.
  • Use removeOnComplete and removeOnFail to prevent Redis memory from growing unbounded with completed job data.
  • Assign job IDs for deduplication when the same logical job might be enqueued multiple times (e.g., user sync after multiple profile edits).
  • Use FlowProducer for dependent jobs rather than manually triggering child jobs inside a worker. Flows give you built-in dependency resolution.
  • Monitor with QueueEvents or Bull Board. Production queues need visibility into backlogs, failure rates, and processing times.
  • Scale workers horizontally by running the same worker code on multiple processes or machines. Redis coordinates job distribution automatically.
  • Use job.updateProgress for long-running jobs so the dashboard and API consumers can track completion percentage.
  • Close queues and workers gracefully on process shutdown to avoid stuck jobs.
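
The last point above can be sketched as follows. This is a minimal illustration that assumes every resource exposes an async `close()`, as BullMQ's `Worker`, `Queue`, and `QueueEvents` do. The `Closable` interface and the `shutdown` function are hypothetical names.

```typescript
/** Anything with an async close(); stands in for Worker, Queue, and QueueEvents. */
interface Closable {
  close(): Promise<void>;
}

/**
 * Closes workers first so in-flight jobs can finish, then everything else.
 * Errors are collected rather than thrown, so one failing close() does not
 * stop the remaining resources from being closed.
 */
async function shutdown(workers: Closable[], others: Closable[]): Promise<unknown[]> {
  const errors: unknown[] = [];
  for (const resource of [...workers, ...others]) {
    try {
      await resource.close();
    } catch (err) {
      errors.push(err);
    }
  }
  return errors;
}
```

In a Node.js service this would typically be called from `SIGTERM` and `SIGINT` handlers, with the process exiting once the returned promise settles.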

Anti-Patterns

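  • Sharing one ioredis instance between Queue and Worker. Workers need maxRetriesPerRequest: null; other Redis clients, such as producers that should fail fast when Redis is unreachable, may not. Use separate connection configs. Passing the same plain options object to several queues and workers is a different matter, because each one opens its own connection from it.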
  • Storing large payloads in job data. Redis holds all job data in memory. Store large data in S3/database and pass a reference in the job.
  • Not setting removeOnComplete. Without cleanup, Redis accumulates completed job records indefinitely, leading to memory exhaustion.
  • Using BullMQ for real-time pub/sub. BullMQ is a job queue, not a message broker. Use Redis pub/sub or WebSockets for real-time communication.
  • Ignoring the failed event. Unhandled job failures silently accumulate. Always log failures and set up alerts.
  • Running workers inside serverless functions. Workers need persistent processes. Deploy workers as long-running services, not in Lambda or Vercel Functions.
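
The large-payload point above can be enforced at enqueue time. The sketch below is one possible guard, not a BullMQ feature. `BlobRef`, `MAX_JOB_DATA_CHARS`, and `assertSmallJobData` are hypothetical, and the threshold is arbitrary.

```typescript
/** Reference to data stored outside Redis; the field names are illustrative. */
interface BlobRef {
  bucket: string;
  key: string;
}

const MAX_JOB_DATA_CHARS = 10_000; // arbitrary limit chosen for this sketch

/** Throws if the serialized job data exceeds the limit. */
function assertSmallJobData(data: unknown, limit: number = MAX_JOB_DATA_CHARS): void {
  // String length counts UTF-16 code units, which only approximates bytes.
  const json = JSON.stringify(data) ?? "";
  if (json.length > limit) {
    throw new Error(
      `Job data is ${json.length} characters; store it externally and enqueue a reference`
    );
  }
}

const reportRef: BlobRef = { bucket: "reports", key: "2024/01/report.csv" };
assertSmallJobData({ report: reportRef }); // passes: only the reference is enqueued
```

Calling the guard just before adding a job turns an eventual Redis memory problem into an immediate, visible error at the producer.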
