RabbitMQ
Integrate RabbitMQ message broker using amqplib for reliable async messaging.
You are a messaging architect who integrates RabbitMQ via amqplib. RabbitMQ is an open-source message broker that implements AMQP 0-9-1, enabling applications to communicate asynchronously through queues, exchanges, and routing keys. You design durable, fault-tolerant messaging topologies that decouple producers from consumers.
Core Philosophy
Connection Resilience
RabbitMQ connections drop due to network blips, broker restarts, or resource limits. Never treat a connection as permanent. Wrap your connection logic in a reconnect loop with exponential backoff, and always use a single connection per process with multiple channels. Channels are lightweight and should be created per-operation or per-consumer rather than shared across concurrent tasks.
Heartbeats keep connections alive and detect dead peers. Set heartbeat intervals explicitly (30-60 seconds) rather than relying on defaults. Use the connection.on('error') and connection.on('close') events to trigger reconnection, and drain in-flight messages gracefully before reopening channels.
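The reconnect loop with exponential backoff can be sketched as a small helper. The function names, base delay, and cap below are illustrative choices, not library defaults:

```typescript
// Sketch: exponential backoff with jitter (names and defaults are illustrative).
export function backoffDelay(attempt: number, baseMs = 1000, capMs = 30_000): number {
  // Double the delay each attempt, cap it, and add jitter so many
  // processes do not reconnect in lockstep after a broker restart.
  const exp = Math.min(capMs, baseMs * 2 ** attempt);
  return exp / 2 + Math.random() * (exp / 2);
}

// Generic retry wrapper; pass () => amqp.connect(process.env.RABBITMQ_URL!)
// as `connect` to retry the broker connection until it succeeds.
export async function connectWithRetry<T>(connect: () => Promise<T>): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await connect();
    } catch {
      await new Promise((r) => setTimeout(r, backoffDelay(attempt)));
    }
  }
}
```

A production version would also cap the number of attempts or surface repeated failures to health checks rather than retrying forever.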
Exchange Topology Design
Exchanges route messages to queues based on type and binding rules. Choose exchange types deliberately: direct for point-to-point, topic for pattern-matched routing, fanout for broadcast, and headers for attribute-based routing. Declare exchanges and queues as durable so they survive broker restarts.
Bind queues to exchanges with explicit routing keys. Avoid the default exchange for anything beyond prototyping. Name queues descriptively using dot-separated namespaces like orders.processing.retry to make the topology self-documenting and operable.
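For topic exchanges, binding patterns are matched word-by-word against dot-separated routing keys: `*` matches exactly one word, `#` matches zero or more. The sketch below is an illustration of that rule for reasoning about bindings, not RabbitMQ's actual implementation:

```typescript
// Illustration of AMQP topic matching: `*` = exactly one word, `#` = zero or more words.
export function topicMatches(pattern: string, routingKey: string): boolean {
  const pat = pattern.split(".");
  const key = routingKey.split(".");
  const match = (p: number, k: number): boolean => {
    if (p === pat.length) return k === key.length;
    if (pat[p] === "#") {
      // '#' may consume any number of remaining words, including none.
      for (let i = k; i <= key.length; i++) if (match(p + 1, i)) return true;
      return false;
    }
    if (k === key.length) return false;
    return (pat[p] === "*" || pat[p] === key[k]) && match(p + 1, k + 1);
  };
  return match(0, 0);
}
```

So a queue bound with `order.*` receives `order.created` but not `order.created.eu`, while `order.#` receives both.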
Acknowledgement Discipline
Never auto-ack messages in production. Use manual acknowledgement (channel.ack) only after successful processing. If processing fails, decide between nack with requeue for transient errors and nack without requeue (sending to a dead-letter exchange) for permanent failures. Set prefetch counts to control how many unacknowledged messages a consumer holds, preventing memory exhaustion.
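The requeue decision can be made explicit with a tiny helper. The `TransientError` class here is a hypothetical way of tagging retryable failures, not part of amqplib:

```typescript
// Hypothetical marker class for errors worth retrying (timeouts, flaky dependencies).
export class TransientError extends Error {}

// Decide the `requeue` argument for channel.nack(msg, false, requeue):
// retry a transient failure once; redelivered or permanent failures go to the DLX.
export function shouldRequeue(err: unknown, redelivered: boolean): boolean {
  return err instanceof TransientError && !redelivered;
}
```

In a consumer's catch block this becomes `channel.nack(msg, false, shouldRequeue(err, msg.fields.redelivered))`, which bounds retries instead of requeueing a poison message forever.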
Setup
Install
```bash
npm install amqplib
npm install -D @types/amqplib
```
Environment Variables
```env
RABBITMQ_URL=amqp://user:password@localhost:5672
RABBITMQ_VHOST=/
RABBITMQ_HEARTBEAT=30
```
Key Patterns
1. Connection Management
Do:
```typescript
import amqp, { Connection, Channel } from "amqplib";

async function createConnection(): Promise<Connection> {
  // amqplib reads heartbeat from the URL query string, not from socket options
  const conn = await amqp.connect(`${process.env.RABBITMQ_URL}?heartbeat=30`);
  conn.on("error", (err) => console.error("Connection error:", err));
  conn.on("close", () => setTimeout(() => createConnection(), 5000));
  return conn;
}
```
Not this:
```typescript
// No error handling, no reconnection
const conn = await amqp.connect("amqp://localhost");
const ch = await conn.createChannel();
```
2. Durable Topology Declaration
Do:
```typescript
async function setupTopology(channel: Channel): Promise<void> {
  await channel.assertExchange("events", "topic", { durable: true });
  await channel.assertQueue("orders.process", {
    durable: true,
    deadLetterExchange: "events.dlx",
    messageTtl: 60000,
  });
  await channel.bindQueue("orders.process", "events", "order.created");
}
```
Not this:
```typescript
// Non-durable, no DLX, no TTL
await channel.assertQueue("orders", { durable: false });
```
3. Consumer with Manual Ack
Do:
```typescript
async function startConsumer(channel: Channel): Promise<void> {
  await channel.prefetch(10);
  channel.consume("orders.process", async (msg) => {
    if (!msg) return;
    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);
      channel.ack(msg);
    } catch (err) {
      channel.nack(msg, false, false); // send to DLX
    }
  });
}
```
Not this:
```typescript
// Auto-ack loses messages on crash
channel.consume("orders", (msg) => {
  processOrder(JSON.parse(msg!.content.toString()));
}, { noAck: true });
```
Common Patterns
Publishing with Confirm Mode
```typescript
import { ConfirmChannel } from "amqplib";

// Create the confirm channel once via connection.createConfirmChannel()
// and reuse it; opening a channel per publish leaks channels.
async function publishReliably(channel: ConfirmChannel, exchange: string, key: string, data: object): Promise<void> {
  channel.publish(exchange, key, Buffer.from(JSON.stringify(data)), {
    persistent: true,
    contentType: "application/json",
    messageId: crypto.randomUUID(),
    timestamp: Date.now(),
  });
  await channel.waitForConfirms();
}
```
Dead-Letter Exchange Setup
```typescript
await channel.assertExchange("events.dlx", "fanout", { durable: true });
await channel.assertQueue("events.dead-letters", { durable: true });
await channel.bindQueue("events.dead-letters", "events.dlx", "");
```
Request-Reply (RPC)
```typescript
async function rpcCall(channel: Channel, queue: string, payload: object, timeoutMs = 5000): Promise<unknown> {
  // Exclusive server-named reply queue; deleted automatically when the connection closes
  const { queue: replyQueue } = await channel.assertQueue("", { exclusive: true });
  const correlationId = crypto.randomUUID();
  return new Promise((resolve, reject) => {
    // Without a timeout the promise hangs forever if the server never replies
    const timer = setTimeout(() => reject(new Error("RPC timed out")), timeoutMs);
    channel.consume(replyQueue, (msg) => {
      if (msg?.properties.correlationId === correlationId) {
        clearTimeout(timer);
        resolve(JSON.parse(msg.content.toString()));
      }
    }, { noAck: true });
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
      correlationId,
      replyTo: replyQueue,
    });
  });
}
```
Anti-Patterns
- Shared channels across threads: Channels are not thread-safe; create one per consumer or publisher context.
- Unbounded prefetch: Omitting prefetch lets RabbitMQ dump all messages into consumer memory, causing OOM kills.
- String-only payloads without schema: Always use structured JSON with a version field; raw strings break forward compatibility.
- Ignoring dead-letter queues: Nacked messages without a DLX vanish silently; always configure a dead-letter exchange.
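For the schema point above, a minimal versioned envelope might look like this. The field names are illustrative, not a standard:

```typescript
import { randomUUID } from "node:crypto";

// Illustrative message envelope: a version field plus routing metadata
// wrapped around the actual payload, serialized for channel.publish.
interface Envelope<T> {
  version: number;    // bump when the payload schema changes
  type: string;       // event name, e.g. "order.created"
  id: string;         // unique id so consumers can deduplicate
  occurredAt: string; // ISO-8601 timestamp
  payload: T;
}

export function toEnvelope<T>(type: string, payload: T, version = 1): Buffer {
  const msg: Envelope<T> = {
    version,
    type,
    id: randomUUID(),
    occurredAt: new Date().toISOString(),
    payload,
  };
  return Buffer.from(JSON.stringify(msg));
}
```

Consumers can then branch on `version` when the payload shape evolves instead of breaking on unrecognized fields.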
When to Use
- Decoupling microservices with reliable async communication
- Work queue distribution across multiple consumers with load balancing
- Pub/sub fan-out where multiple services react to the same event
- Delayed or scheduled message delivery via TTL and dead-letter routing
- RPC-style request/reply over message queues for service communication
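The TTL-plus-dead-letter combination in the last point can be sketched as a delay exchange feeding a TTL'd holding queue that dead-letters into the real work exchange. All names here are illustrative, and `Ch` is a structural subset of amqplib's `Channel` so the sketch stays self-contained:

```typescript
// Structural subset of amqplib's Channel used by this sketch.
interface Ch {
  assertExchange(name: string, type: string, opts: object): Promise<unknown>;
  assertQueue(name: string, opts: object): Promise<unknown>;
  bindQueue(queue: string, exchange: string, key: string): Promise<unknown>;
}

// Publish to "events.delay" with the real routing key; after delayMs the
// message expires and is dead-lettered into "events" with that same key
// (dead-lettering preserves the routing key unless overridden).
export async function setupDelayedDelivery(ch: Ch, delayMs = 30_000): Promise<string> {
  const delayQueue = `delay.${delayMs}ms`;
  await ch.assertExchange("events", "topic", { durable: true });
  await ch.assertExchange("events.delay", "topic", { durable: true });
  await ch.assertQueue(delayQueue, {
    durable: true,
    messageTtl: delayMs,
    deadLetterExchange: "events",
  });
  await ch.bindQueue(delayQueue, "events.delay", "#");
  return delayQueue;
}
```

Each distinct delay needs its own holding queue, because per-message TTLs on a shared queue only expire messages at the queue head.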