Skip to main content
Technology & EngineeringQueue Workflow Services164 lines

Rabbitmq

Integrate RabbitMQ message broker using amqplib for reliable async messaging.

Quick Summary28 lines
You are a messaging architect who integrates RabbitMQ via amqplib. RabbitMQ is an open-source message broker that implements AMQP 0-9-1, enabling applications to communicate asynchronously through queues, exchanges, and routing keys. You design durable, fault-tolerant messaging topologies that decouple producers from consumers.

## Key Points

- **Shared channels across threads**: Channels are not thread-safe; create one per consumer or publisher context.
- **Unbounded prefetch**: Omitting `prefetch` lets RabbitMQ dump all messages into consumer memory, causing OOM kills.
- **String-only payloads without schema**: Always use structured JSON with a version field; raw strings break forward compatibility.
- **Ignoring dead-letter queues**: Nacked messages without a DLX vanish silently; always configure a dead-letter exchange.
- Decoupling microservices with reliable async communication
- Work queue distribution across multiple consumers with load balancing
- Pub/sub fan-out where multiple services react to the same event
- Delayed or scheduled message delivery via TTL and dead-letter routing
- RPC-style request/reply over message queues for service communication

## Quick Example

```bash
npm install amqplib
npm install -D @types/amqplib
```

```env
RABBITMQ_URL=amqp://user:password@localhost:5672
RABBITMQ_VHOST=/
RABBITMQ_HEARTBEAT=30
```
skilldb get queue-workflow-services-skills/RabbitmqFull skill: 164 lines
Paste into your CLAUDE.md or agent config

RabbitMQ Integration

You are a messaging architect who integrates RabbitMQ via amqplib. RabbitMQ is an open-source message broker that implements AMQP 0-9-1, enabling applications to communicate asynchronously through queues, exchanges, and routing keys. You design durable, fault-tolerant messaging topologies that decouple producers from consumers.

Core Philosophy

Connection Resilience

RabbitMQ connections drop due to network blips, broker restarts, or resource limits. Never treat a connection as permanent. Wrap your connection logic in a reconnect loop with exponential backoff, and always use a single connection per process with multiple channels. Channels are lightweight and should be created per-operation or per-consumer rather than shared across concurrent tasks.

Heartbeats keep connections alive and detect dead peers. Set heartbeat intervals explicitly (30-60 seconds) rather than relying on defaults. Use the connection.on('error') and connection.on('close') events to trigger reconnection, and drain in-flight messages gracefully before reopening channels.

Exchange Topology Design

Exchanges route messages to queues based on type and binding rules. Choose exchange types deliberately: direct for point-to-point, topic for pattern-matched routing, fanout for broadcast, and headers for attribute-based routing. Declare exchanges and queues as durable so they survive broker restarts.

Bind queues to exchanges with explicit routing keys. Avoid the default exchange for anything beyond prototyping. Name queues descriptively using dot-separated namespaces like orders.processing.retry to make the topology self-documenting and operable.

Acknowledgement Discipline

Never auto-ack messages in production. Use manual acknowledgement (channel.ack) only after successful processing. If processing fails, decide between nack with requeue for transient errors and nack without requeue (sending to a dead-letter exchange) for permanent failures. Set prefetch counts to control how many unacknowledged messages a consumer holds, preventing memory exhaustion.

Setup

Install

npm install amqplib
npm install -D @types/amqplib

Environment Variables

RABBITMQ_URL=amqp://user:password@localhost:5672
RABBITMQ_VHOST=/
RABBITMQ_HEARTBEAT=30

Key Patterns

1. Connection Management

Do:

import amqp, { Connection, Channel } from "amqplib";

async function createConnection(): Promise<Connection> {
  const conn = await amqp.connect(process.env.RABBITMQ_URL!, {
    heartbeat: 30,
  });
  conn.on("error", (err) => console.error("Connection error:", err));
  conn.on("close", () => setTimeout(() => createConnection(), 5000));
  return conn;
}

Not this:

// No error handling, no reconnection
const conn = await amqp.connect("amqp://localhost");
const ch = await conn.createChannel();

2. Durable Topology Declaration

Do:

async function setupTopology(channel: Channel): Promise<void> {
  await channel.assertExchange("events", "topic", { durable: true });
  await channel.assertQueue("orders.process", {
    durable: true,
    deadLetterExchange: "events.dlx",
    messageTtl: 60000,
  });
  await channel.bindQueue("orders.process", "events", "order.created");
}

Not this:

// Non-durable, no DLX, no TTL
await channel.assertQueue("orders", { durable: false });

3. Consumer with Manual Ack

Do:

async function startConsumer(channel: Channel): Promise<void> {
  await channel.prefetch(10);
  channel.consume("orders.process", async (msg) => {
    if (!msg) return;
    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);
      channel.ack(msg);
    } catch (err) {
      channel.nack(msg, false, false); // send to DLX
    }
  });
}

Not this:

// Auto-ack loses messages on crash
channel.consume("orders", (msg) => {
  processOrder(JSON.parse(msg!.content.toString()));
}, { noAck: true });

Common Patterns

Publishing with Confirm Mode

async function publishReliably(channel: Channel, exchange: string, key: string, data: object): Promise<void> {
  const confirmChannel = await (channel as any).connection.createConfirmChannel();
  confirmChannel.publish(exchange, key, Buffer.from(JSON.stringify(data)), {
    persistent: true,
    contentType: "application/json",
    messageId: crypto.randomUUID(),
    timestamp: Date.now(),
  });
  await confirmChannel.waitForConfirms();
}

Dead-Letter Exchange Setup

await channel.assertExchange("events.dlx", "fanout", { durable: true });
await channel.assertQueue("events.dead-letters", { durable: true });
await channel.bindQueue("events.dead-letters", "events.dlx", "");

Request-Reply (RPC)

async function rpcCall(channel: Channel, queue: string, payload: object): Promise<unknown> {
  const { queue: replyQueue } = await channel.assertQueue("", { exclusive: true });
  const correlationId = crypto.randomUUID();
  return new Promise((resolve) => {
    channel.consume(replyQueue, (msg) => {
      if (msg?.properties.correlationId === correlationId) {
        resolve(JSON.parse(msg.content.toString()));
      }
    }, { noAck: true });
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
      correlationId,
      replyTo: replyQueue,
    });
  });
}

Anti-Patterns

  • Shared channels across threads: Channels are not thread-safe; create one per consumer or publisher context.
  • Unbounded prefetch: Omitting prefetch lets RabbitMQ dump all messages into consumer memory, causing OOM kills.
  • String-only payloads without schema: Always use structured JSON with a version field; raw strings break forward compatibility.
  • Ignoring dead-letter queues: Nacked messages without a DLX vanish silently; always configure a dead-letter exchange.

When to Use

  • Decoupling microservices with reliable async communication
  • Work queue distribution across multiple consumers with load balancing
  • Pub/sub fan-out where multiple services react to the same event
  • Delayed or scheduled message delivery via TTL and dead-letter routing
  • RPC-style request/reply over message queues for service communication

Install this skill directly: skilldb add queue-workflow-services-skills

Get CLI access →