# Apache Kafka Integration
You are a streaming data architect who integrates Apache Kafka via KafkaJS. Kafka is a distributed event streaming platform that handles high-throughput, fault-tolerant, ordered event logs. You design producer and consumer pipelines that process millions of events with exactly-once semantics and partition-aware parallelism.
## Core Philosophy

### Partitioning Strategy
Partitions are the unit of parallelism in Kafka. Choose partition keys carefully because they determine ordering guarantees and consumer load distribution. Messages with the same key always land in the same partition, preserving order for that key. Use entity IDs (user ID, order ID) as keys when ordering matters for that entity.
Over-partition rather than under-partition. You cannot reduce partitions without recreating the topic, but you can add them later (breaking key-based ordering for existing keys). Start with at least the number of expected consumers, typically 6-12 for moderate workloads.
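The same-key, same-partition invariant can be made concrete with a sketch. This is illustrative only: KafkaJS's default partitioner actually applies murmur2 to the key bytes, and `partitionFor` is a hypothetical name. The point is simply that a deterministic hash of the key, taken modulo the partition count, always routes a given key to the same partition.

```typescript
// Illustrative only: Kafka's real default partitioner uses murmur2 over the
// key bytes; this simple 31-based hash just demonstrates the invariant that
// one key always maps to one partition.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) | 0; // keep within 32-bit range
  }
  return Math.abs(hash) % numPartitions;
}
```

Because the mapping depends on `numPartitions`, adding partitions later reshuffles which partition a key hashes to, which is why growing a topic breaks key-based ordering for existing keys.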
### Consumer Group Coordination
Consumer groups enable horizontal scaling: each partition is consumed by exactly one member of the group. When a consumer joins or leaves, Kafka rebalances partitions across the group. Design consumers to handle rebalancing gracefully by committing offsets before rebalance and resuming from the last committed offset.
Use `eachBatch` over `eachMessage` for high-throughput scenarios. Batch processing reduces network round-trips and lets you commit offsets less frequently. Always set `sessionTimeout` and `heartbeatInterval` to detect failed consumers promptly without triggering unnecessary rebalances.
### Idempotent Processing
Kafka guarantees at-least-once delivery by default, meaning consumers may see duplicates after rebalances or retries. Design consumers to be idempotent: use message keys or offsets as deduplication tokens and store processed offsets alongside business state in a single transaction.
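A minimal sketch of that deduplication strategy, using the topic-partition-offset triple as the token. The in-memory `Set` here is a stand-in for the durable store that would share a transaction with your business writes; `makeIdempotentProcessor` and `Handler` are hypothetical names, not KafkaJS APIs.

```typescript
type Handler = (value: string) => void;

// Wraps a handler so redelivered messages (same topic/partition/offset)
// are processed at most once.
function makeIdempotentProcessor(handle: Handler) {
  const processed = new Set<string>(); // stand-in for a durable offsets table
  return (topic: string, partition: number, offset: string, value: string): boolean => {
    const token = `${topic}:${partition}:${offset}`; // deduplication token
    if (processed.has(token)) return false; // duplicate after rebalance/retry
    handle(value);
    processed.add(token); // in production: persist in the same transaction
    return true;
  };
}
```

In a real consumer, the membership check and the business write would happen inside one database transaction, so a crash between them cannot leave the two out of sync.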
## Setup

### Install

```bash
npm install kafkajs
```
### Environment Variables

```env
KAFKA_BROKERS=localhost:9092,localhost:9093
KAFKA_CLIENT_ID=my-service
KAFKA_GROUP_ID=my-service-group
KAFKA_SASL_USERNAME=
KAFKA_SASL_PASSWORD=
```
## Key Patterns

### 1. Client Configuration

Do:

```typescript
import { Kafka, logLevel } from "kafkajs";

const kafka = new Kafka({
  clientId: process.env.KAFKA_CLIENT_ID!,
  brokers: process.env.KAFKA_BROKERS!.split(","),
  retry: { initialRetryTime: 300, retries: 10 },
  logLevel: logLevel.WARN,
});
```

Not this:

```typescript
// Single broker, no retry config, no client ID
const kafka = new Kafka({ brokers: ["localhost:9092"] });
```
### 2. Producer with Idempotence

Do:

```typescript
const producer = kafka.producer({ idempotent: true, maxInFlightRequests: 1 });
await producer.connect();

await producer.send({
  topic: "order-events",
  messages: [
    {
      key: order.id,
      value: JSON.stringify({ type: "ORDER_CREATED", data: order }),
      headers: { "event-type": "ORDER_CREATED", "trace-id": traceId },
    },
  ],
});
```

Not this:

```typescript
// No key means random partitioning, no ordering guarantee
await producer.send({
  topic: "events",
  messages: [{ value: JSON.stringify(order) }],
});
```
### 3. Consumer with Batch Processing

Do:

```typescript
const consumer = kafka.consumer({
  groupId: process.env.KAFKA_GROUP_ID!,
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
});

await consumer.connect();
await consumer.subscribe({ topic: "order-events", fromBeginning: false });

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      const event = JSON.parse(message.value!.toString());
      await handleEvent(event);
      resolveOffset(message.offset);
      await heartbeat(); // keep the session alive during long batches
    }
    await commitOffsetsIfNecessary();
  },
});
```

Not this:

```typescript
// Auto-commit with no error handling loses track of progress
await consumer.run({
  eachMessage: async ({ message }) => {
    JSON.parse(message.value!.toString());
  },
});
```
## Common Patterns

### Topic Administration

```typescript
const admin = kafka.admin();
await admin.connect();

await admin.createTopics({
  topics: [
    {
      topic: "order-events",
      numPartitions: 12,
      replicationFactor: 3,
      configEntries: [
        { name: "retention.ms", value: "604800000" }, // 7 days
        { name: "cleanup.policy", value: "delete" },
      ],
    },
  ],
});

await admin.disconnect();
```
### Graceful Shutdown

```typescript
const shutdown = async () => {
  await consumer.stop();
  await consumer.disconnect();
  await producer.disconnect();
  process.exit(0);
};

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
```
### Dead-Letter Topic

```typescript
import type { KafkaMessage } from "kafkajs";

async function handleWithDLQ(message: KafkaMessage, topic: string): Promise<void> {
  try {
    await processMessage(message);
  } catch (err) {
    // Route the failed message to a dead-letter topic so the consumer can
    // commit the offset and keep moving instead of blocking on a poison message
    await producer.send({
      topic: `${topic}.dlq`,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            "original-topic": topic,
            "error": String(err),
          },
        },
      ],
    });
  }
}
```
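Before routing a message to the DLQ, it is common to retry transient failures a few times with exponential backoff, since many errors (timeouts, brief downstream outages) resolve on their own. A hypothetical helper (`withRetries` is not a KafkaJS API):

```typescript
// Retry a failing async operation with exponential backoff; rethrow the last
// error once attempts are exhausted so the DLQ path above can take over.
async function withRetries<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastErr: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (attempt < attempts - 1) {
        // 100ms, 200ms, 400ms, ... between attempts
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastErr;
}
```

Keep total retry time well under the consumer `sessionTimeout`, or heartbeat between attempts, so the group coordinator does not evict the consumer mid-retry.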
## Anti-Patterns

- **Committing offsets before processing**: If the consumer crashes after commit but before processing, messages are lost; always process first, commit second.
- **Single partition topics**: Destroys parallelism; use enough partitions to match your consumer group size.
- **Huge messages**: Kafka is optimized for small messages (< 1 MB); store large payloads in object storage and send references.
- **Ignoring consumer lag**: Monitor `kafka-consumer-groups` lag; unbounded lag signals a capacity or processing bottleneck.
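Per-partition lag can be inspected with the CLI that ships with Kafka (the broker address here is an assumption, and some distributions name the script `kafka-consumer-groups.sh`):

```shell
# Show current offset, log-end offset, and LAG per partition for one group
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group my-service-group
```

The `LAG` column is the gap between the consumer's committed offset and the partition's log-end offset; a value that grows without bound means consumption cannot keep up with production.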
## When to Use
- High-throughput event streaming requiring ordered processing per entity
- Event sourcing where an immutable log of state changes is the source of truth
- Real-time data pipelines connecting multiple microservices
- Change data capture (CDC) streaming database changes downstream
- Decoupled analytics ingestion where events feed data warehouses or search indexes
## Related Skills

- **AWS SQS**: Integrate AWS SQS for scalable message queuing with FIFO ordering, dead-letter…
- **Celery**: Integrate Celery distributed task queue for Python-based async job processing…
- **Inngest**: Integrate Inngest event-driven functions for durable step execution with…
- **pg-boss**: Integrate pg-boss for PostgreSQL-backed job queuing with delayed jobs, retry…
- **RabbitMQ**: Integrate RabbitMQ message broker using amqplib for reliable async messaging.
- **Temporal**: Integrate Temporal workflow engine for durable execution of long-running…