BullMQ
"BullMQ: Redis-based job queue, workers, delayed jobs, rate limiting, job priorities, repeatable jobs, concurrency, dashboard"
Core Philosophy
BullMQ is a high-performance, Redis-based job queue for Node.js. It provides a robust foundation for background job processing with fine-grained control over execution semantics: priorities, delays, rate limiting, retries, concurrency, and repeatable schedules. Unlike managed services, BullMQ runs on your own infrastructure with Redis as the sole dependency, giving you full control over data locality, cost, and scaling.
The architecture separates producers (code that adds jobs) from consumers (workers that process them). This decoupling lets you scale workers independently, deploy them on different machines, and process different queues with different concurrency settings. BullMQ uses Redis Streams under the hood for reliable, ordered message delivery with at-least-once processing guarantees.
Setup
Installation and Queue Configuration
```typescript
// npm install bullmq ioredis
// lib/queue/connection.ts
import { ConnectionOptions } from "bullmq";

export const redisConnection: ConnectionOptions = {
  host: process.env.REDIS_HOST ?? "localhost",
  port: parseInt(process.env.REDIS_PORT ?? "6379", 10),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: null, // Required for BullMQ workers
};
```
Defining Queues
```typescript
// lib/queue/queues.ts
import { Queue } from "bullmq";
import { redisConnection } from "./connection";

export const emailQueue = new Queue("email", {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { age: 86400, count: 1000 }, // Keep last 1000 or 24h
    removeOnFail: { age: 604800 }, // Keep failed jobs for 7 days
  },
});

export const processingQueue = new Queue("processing", {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: "exponential", delay: 5000 },
  },
});
```
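With `type: "exponential"`, BullMQ computes the wait before a retry as `delay * 2^(attemptsMade - 1)`, so the email queue above (`delay: 2000`, `attempts: 3`) retries roughly 2 s after the first failure and 4 s after the second before the job lands in the failed set. A sketch of the formula (the helper name is ours, not a BullMQ API):

```typescript
// Sketch of BullMQ's built-in exponential backoff:
// wait before the nth retry = baseDelayMs * 2^(n - 1)
export function exponentialDelay(baseDelayMs: number, attemptsMade: number): number {
  return baseDelayMs * 2 ** (attemptsMade - 1);
}

// emailQueue (delay: 2000): 2000 ms, 4000 ms, 8000 ms, ...
// processingQueue (delay: 5000): 5000 ms, 10000 ms, 20000 ms, ...
```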
Worker Setup
```typescript
// workers/email-worker.ts
import { Worker, Job } from "bullmq";
import { redisConnection } from "../lib/queue/connection";
// renderTemplate and sendEmail are application-specific helpers (not shown)

interface EmailJobData {
  to: string;
  subject: string;
  templateId: string;
  vars: Record<string, string>;
}

const worker = new Worker<EmailJobData>(
  "email",
  async (job: Job<EmailJobData>) => {
    const { to, subject, templateId, vars } = job.data;
    await job.updateProgress(10);
    const html = await renderTemplate(templateId, vars);
    await job.updateProgress(50);
    const result = await sendEmail({ to, subject, html });
    await job.updateProgress(100);
    return { messageId: result.messageId };
  },
  {
    connection: redisConnection,
    concurrency: 10, // Process 10 jobs simultaneously
    limiter: {
      max: 100, // Max 100 jobs
      duration: 60_000, // per 60 seconds
    },
  }
);

worker.on("completed", (job) => {
  console.log(`Job ${job.id} completed: ${job.returnvalue?.messageId}`);
});

worker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed: ${err.message}`);
});
```
Key Techniques
Adding Jobs with Options
```typescript
// app/api/notifications/route.ts
import { emailQueue, processingQueue } from "@/lib/queue/queues";

export async function POST(req: Request) {
  const { userId, type } = await req.json();

  // Basic job
  await emailQueue.add("welcome", {
    to: "user@example.com",
    subject: "Welcome!",
    templateId: "welcome",
    vars: { name: "Alice" },
  });

  // Delayed job — sends in 30 minutes
  await emailQueue.add(
    "followup",
    { to: "user@example.com", subject: "How's it going?", templateId: "followup", vars: {} },
    { delay: 30 * 60 * 1000 }
  );

  // Priority job — lower number = higher priority
  await emailQueue.add(
    "password-reset",
    { to: "user@example.com", subject: "Reset password", templateId: "reset", vars: {} },
    { priority: 1 }
  );

  // Job with custom ID for deduplication
  await processingQueue.add(
    "sync-user",
    { userId },
    { jobId: `sync-user-${userId}` } // Same ID = job is not added if it already exists
  );

  return Response.json({ queued: true });
}
```
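Deduplication via `jobId` works best when the ID is derived deterministically from the job's logical identity, as in the `sync-user-${userId}` template above. Note the dedupe only holds while a job with that ID still exists in the queue; once it is removed, the same ID can be enqueued again. A tiny sketch (the `dedupJobId` helper is hypothetical, not a BullMQ API):

```typescript
// Hypothetical helper: build a stable job ID from a job's logical key.
// BullMQ skips adding a job when one with the same jobId already exists,
// so repeated enqueues of the same logical work collapse into one job.
export function dedupJobId(name: string, ...keys: Array<string | number>): string {
  return [name, ...keys].join("-");
}

// usage:
// await processingQueue.add("sync-user", { userId }, { jobId: dedupJobId("sync-user", userId) });
```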
Repeatable (Cron) Jobs
```typescript
// lib/queue/setup-repeatable.ts
import { emailQueue, processingQueue } from "./queues";

export async function setupRepeatableJobs() {
  // Daily digest at 8 AM UTC
  await emailQueue.add(
    "daily-digest",
    { templateId: "digest" },
    {
      repeat: { pattern: "0 8 * * *" },
      jobId: "daily-digest", // Ensures only one repeatable job with this config
    }
  );

  // Process stale records every 15 minutes
  await processingQueue.add(
    "cleanup-stale",
    {},
    {
      repeat: { every: 15 * 60 * 1000 },
      jobId: "cleanup-stale",
    }
  );

  // Remove outdated repeatable jobs
  const repeatableJobs = await processingQueue.getRepeatableJobs();
  for (const job of repeatableJobs) {
    if (job.name === "old-deprecated-job") {
      await processingQueue.removeRepeatableByKey(job.key);
    }
  }
}
```
Flow Producer for Job Dependencies
```typescript
// lib/queue/flows.ts
import { FlowProducer } from "bullmq";
import { redisConnection } from "./connection";

const flowProducer = new FlowProducer({ connection: redisConnection });

export async function createOrderFlow(orderId: string) {
  // Parent job waits for all children to complete before running
  const flow = await flowProducer.add({
    name: "finalize-order",
    queueName: "processing",
    data: { orderId },
    children: [
      { name: "charge-payment", queueName: "processing", data: { orderId } },
      { name: "reserve-inventory", queueName: "processing", data: { orderId } },
      {
        name: "send-confirmation",
        queueName: "email",
        data: {
          to: "customer@example.com",
          subject: "Order confirmed",
          templateId: "order-confirm",
          vars: { orderId },
        },
      },
    ],
  });
  return flow;
}
```
Worker with Named Processors
```typescript
// workers/processing-worker.ts
import { Worker, Job } from "bullmq";
import { redisConnection } from "../lib/queue/connection";
// syncUserToCRM, generateMonthlyReport, cleanupStaleRecords, and
// finalizeOrder are application-specific helpers (not shown)

type ProcessingJobData =
  | { type: "sync-user"; userId: string }
  | { type: "generate-report"; orgId: string; month: string }
  | { type: "cleanup-stale" };

const worker = new Worker<ProcessingJobData>(
  "processing",
  async (job: Job<ProcessingJobData>) => {
    switch (job.name) {
      case "sync-user":
        return await syncUserToCRM(job.data as { type: "sync-user"; userId: string });
      case "generate-report":
        return await generateMonthlyReport(
          job.data as { type: "generate-report"; orgId: string; month: string }
        );
      case "cleanup-stale":
        return await cleanupStaleRecords();
      case "finalize-order": {
        // Parent job of a flow — access children results via getChildrenValues
        const childResults = await job.getChildrenValues();
        return await finalizeOrder(job.data, childResults);
      }
      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  },
  {
    connection: redisConnection,
    concurrency: 5,
  }
);
```
Job Progress and Events
```typescript
// Monitoring job progress from the API
import { Queue, QueueEvents } from "bullmq";
import { redisConnection } from "@/lib/queue/connection";

export async function getJobStatus(queueName: string, jobId: string) {
  // In production, reuse a shared Queue instance — each Queue opens its
  // own Redis connections
  const queue = new Queue(queueName, { connection: redisConnection });
  const job = await queue.getJob(jobId);
  if (!job) return null;
  return {
    id: job.id,
    name: job.name,
    state: await job.getState(),
    progress: job.progress,
    returnValue: job.returnvalue,
    failedReason: job.failedReason,
    attemptsMade: job.attemptsMade,
    timestamp: job.timestamp,
  };
}

// Real-time events via QueueEvents
const queueEvents = new QueueEvents("processing", { connection: redisConnection });

queueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed`, returnvalue);
});

queueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed: ${failedReason}`);
});

queueEvents.on("progress", ({ jobId, data }) => {
  console.log(`Job ${jobId} progress: ${data}%`);
});
```
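`job.updateProgress` accepts either a number or a JSON-serializable object, so consumers of the `progress` event should handle both shapes. A normalizing sketch — the `{ percent }` field is this example's own convention, not a BullMQ one:

```typescript
// Normalize a BullMQ progress value (number | object) to a 0–100 percent.
// The { percent } object shape is an assumption of this sketch; BullMQ
// lets jobs report any JSON-serializable object as progress.
export function progressPercent(progress: number | object): number {
  const raw =
    typeof progress === "number"
      ? progress
      : (progress as { percent?: number }).percent ?? 0;
  return Math.min(100, Math.max(0, raw));
}
```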
Bull Board Dashboard
```typescript
// app/api/admin/queues/route.ts (Next.js integration with Bull Board)
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";
import { emailQueue, processingQueue } from "@/lib/queue/queues";

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/api/admin/queues");

createBullBoard({
  queues: [new BullMQAdapter(emailQueue), new BullMQAdapter(processingQueue)],
  serverAdapter,
});

// Mount the adapter with your framework's routing
```
Best Practices
- Set `maxRetriesPerRequest: null` on the Redis connection for workers. BullMQ requires this to block-wait for jobs correctly.
- Use `removeOnComplete` and `removeOnFail` to prevent Redis memory from growing unbounded with completed job data.
- Assign job IDs for deduplication when the same logical job might be enqueued multiple times (e.g., user sync after multiple profile edits).
- Use FlowProducer for dependent jobs rather than manually triggering child jobs inside a worker. Flows give you built-in dependency resolution.
- Monitor with QueueEvents or Bull Board. Production queues need visibility into backlogs, failure rates, and processing times.
- Scale workers horizontally by running the same worker code on multiple processes or machines. Redis coordinates job distribution automatically.
- Use `job.updateProgress` for long-running jobs so the dashboard and API consumers can track completion percentage.
- Close queues and workers gracefully on process shutdown to avoid stuck jobs.
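Graceful shutdown deserves code: on SIGTERM/SIGINT, close workers (and queues) before exiting so active jobs finish instead of being left stuck. `Worker.close()` waits for in-flight jobs by default. A minimal sketch — the `Closeable` shape and the injectable `exit` are this example's own conveniences, not BullMQ APIs:

```typescript
// Graceful shutdown sketch: close all BullMQ resources on process signals.
type Closeable = { close(): Promise<void> };

export function registerGracefulShutdown(
  resources: Closeable[],
  exit: (code: number) => void = process.exit
): (signal: string) => Promise<void> {
  const shutdown = async (signal: string) => {
    console.log(`${signal} received, closing workers and queues...`);
    // Worker.close() waits for active jobs to complete before resolving
    await Promise.all(resources.map((r) => r.close()));
    exit(0);
  };
  process.once("SIGTERM", () => void shutdown("SIGTERM"));
  process.once("SIGINT", () => void shutdown("SIGINT"));
  return shutdown;
}

// usage: registerGracefulShutdown([worker, emailQueue, processingQueue]);
```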
Anti-Patterns
- Sharing a Redis connection between Queue and Worker. Workers need `maxRetriesPerRequest: null`; other Redis clients may not. Use separate connection configs.
- Storing large payloads in job data. Redis holds all job data in memory. Store large data in S3/database and pass a reference in the job.
- Not setting `removeOnComplete`. Without cleanup, Redis accumulates completed job records indefinitely, leading to memory exhaustion.
- Using BullMQ for real-time pub/sub. BullMQ is a job queue, not a message broker. Use Redis pub/sub or WebSockets for real-time communication.
- Ignoring the `failed` event. Unhandled job failures silently accumulate. Always log failures and set up alerts.
- Running workers inside serverless functions. Workers need persistent processes. Deploy workers as long-running services, not in Lambda or Vercel Functions.
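The large-payload anti-pattern has a simple remedy: enqueue a reference, not the bytes. A sketch — the bucket/key shape, the `s3` client in the comment, and the 16 KB guardrail are illustrative assumptions:

```typescript
// Keep job data small: enqueue a pointer to the payload, not the payload.
export interface VideoJobRef {
  bucket: string; // e.g. S3 bucket the producer uploaded to
  key: string;    // object key written before enqueueing
}

// Illustrative guardrail: reject oversized job data before it hits Redis.
export function isSmallPayload(data: unknown, maxBytes = 16 * 1024): boolean {
  return Buffer.byteLength(JSON.stringify(data)) <= maxBytes;
}

// producer sketch (hypothetical s3 client):
// await s3.putObject({ Bucket: ref.bucket, Key: ref.key, Body: bytes });
// await processingQueue.add("transcode", ref satisfies VideoJobRef);
```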
Install this skill directly: skilldb add background-jobs-services-skills
Related Skills
Faktory
"Faktory: polyglot background job server, language-agnostic workers, job priorities, retries, scheduled jobs, batches, middleware, Web UI"
Graphile Worker
"Graphile Worker: PostgreSQL-backed job queue, no Redis needed, cron jobs, batch jobs, task isolation, migrations, LISTEN/NOTIFY, Node.js"
Inngest
"Inngest: event-driven functions, durable workflows, step functions, retries, cron, fan-out, sleep, Next.js integration"
Quirrel
"Quirrel: job queue for serverless/edge, cron jobs, delayed jobs, repeat scheduling, Next.js/Remix/SvelteKit integration, type-safe queues"
Temporal
"Temporal: durable execution, workflows, activities, signals, queries, retries, timers, TypeScript SDK"
Trigger.dev
"Trigger.dev: background jobs for Next.js/Node, long-running tasks, integrations, retry, cron, dashboard, v3 SDK"