Streams
Redis Streams for durable event processing with consumer groups
You are an expert in Redis Streams for building durable, ordered event processing pipelines with consumer groups.
Overview
Redis Streams is an append-only log data structure introduced in Redis 5.0. Unlike Pub/Sub, Streams persist messages, support consumer groups for load-balanced processing, and provide message acknowledgment. This makes Streams suitable for event sourcing, task queues, and any scenario requiring at-least-once delivery semantics.
Core Concepts
Stream Entries
Each entry has an auto-generated ID (timestamp-sequence, e.g., 1679012345678-0) and a set of field-value pairs. Entries are immutable once written and ordered by ID.
Consumer Groups
A consumer group tracks which messages have been delivered to its members. Multiple consumers in a group share the workload; each message is delivered to exactly one consumer in the group. Multiple groups on the same stream each receive all messages independently.
Pending Entries List (PEL)
When a consumer reads a message via XREADGROUP, it enters the PEL. The message stays pending until the consumer acknowledges it with XACK. This enables retry logic for failed processing.
Message ID
The special * ID tells Redis to generate a timestamp-based ID automatically. Custom IDs are allowed but must be monotonically increasing. The special ID $ means "only new messages from now on" and 0 means "from the beginning."
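Because IDs are a millisecond timestamp plus a sequence number, they can be parsed and ordered client-side without asking Redis. A small sketch, assuming well-formed IDs whose parts fit in JavaScript's safe integer range:

```typescript
// Parse a stream entry ID of the form "<milliseconds>-<sequence>".
function parseStreamId(id: string): { ms: number; seq: number } {
  const [ms, seq] = id.split("-");
  return { ms: Number(ms), seq: Number(seq) };
}

// Negative if a < b, zero if equal, positive if a > b —
// the same ordering Redis uses for stream entries.
function compareStreamIds(a: string, b: string): number {
  const pa = parseStreamId(a);
  const pb = parseStreamId(b);
  return pa.ms - pb.ms || pa.seq - pb.seq;
}
```

This is handy for lag checks, e.g. comparing a group's last-delivered-id against the stream's oldest entry.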
Implementation Patterns
Producing events
import Redis from "ioredis";
const redis = new Redis();
// Add an event to a stream
const id = await redis.xadd("stream:orders", "*",
"orderId", "abc-123",
"userId", "user:42",
"total", "59.99",
"status", "created"
);
console.log(`Event added with ID: ${id}`);
// Cap the stream to approximately 10,000 entries
await redis.xadd("stream:orders", "MAXLEN", "~", "10000", "*",
"orderId", "abc-124",
"userId", "user:43",
"total", "120.00",
"status", "created"
);
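For producers it can be convenient to go the opposite way from the consumer-side `fieldsToObject` helper shown later: flatten an object into the alternating field/value list that XADD expects. A small sketch (`objectToFields` is not part of ioredis):

```typescript
// Flatten an object into the alternating field/value array XADD expects.
function objectToFields(obj: Record<string, string>): string[] {
  const fields: string[] = [];
  for (const [key, value] of Object.entries(obj)) {
    fields.push(key, value);
  }
  return fields;
}

// Usage:
// await redis.xadd("stream:orders", "*", ...objectToFields({
//   orderId: "abc-125", userId: "user:44", total: "10.00", status: "created",
// }));
```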
Creating a consumer group
// Create a group starting from the latest message ("$"); use "0" instead
// to process the existing backlog (see Common Pitfalls)
try {
await redis.xgroup("CREATE", "stream:orders", "order-processors", "$", "MKSTREAM");
} catch (err: any) {
if (!err.message.includes("BUSYGROUP")) throw err;
// Group already exists — safe to ignore
}
Consuming with a group
const CONSUMER_NAME = `worker-${process.pid}`;
const GROUP = "order-processors";
const STREAM = "stream:orders";
async function consumeLoop() {
while (true) {
// Read up to 10 new messages, block for 5 seconds if none available
const results = await redis.xreadgroup(
"GROUP", GROUP, CONSUMER_NAME,
"COUNT", "10",
"BLOCK", "5000",
"STREAMS", STREAM, ">"
);
if (!results) continue; // Timeout, no new messages
for (const [streamName, entries] of results) {
for (const [id, fields] of entries) {
const event = fieldsToObject(fields);
try {
await processOrder(event);
await redis.xack(STREAM, GROUP, id);
} catch (err) {
console.error(`Failed to process ${id}, will retry`, err);
// Message stays in PEL for later retry
}
}
}
}
}
function fieldsToObject(fields: string[]): Record<string, string> {
const obj: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
obj[fields[i]] = fields[i + 1];
}
return obj;
}
Claiming and retrying stale messages
// Claim messages that have been pending for more than 60 seconds
async function reclaimStale() {
const staleMessages = await redis.xautoclaim(
STREAM, GROUP, CONSUMER_NAME,
60000, // min idle time in ms
"0-0", // start scanning from beginning of PEL
"COUNT", "20"
);
// staleMessages = [nextStartId, [[id, fields], ...], deletedIds]
const [nextId, entries] = staleMessages;
for (const [id, fields] of entries) {
if (!fields) continue; // entry was deleted/trimmed from the stream; Redis may return nil fields
const event = fieldsToObject(fields);
try {
await processOrder(event);
await redis.xack(STREAM, GROUP, id);
} catch (err) {
console.error(`Retry also failed for ${id}`, err);
}
}
}
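Entries that fail on every retry ("poison messages") should not be reclaimed forever. XPENDING exposes a per-entry delivery count; once it crosses a threshold, one common approach is to park the entry in a dead-letter stream and acknowledge it. The threshold, client type, and stream naming below are illustrative, not ioredis API:

```typescript
const MAX_DELIVERIES = 5; // illustrative threshold

// Pure decision: park instead of retrying once the count is exceeded.
function shouldDeadLetter(deliveryCount: number): boolean {
  return deliveryCount >= MAX_DELIVERIES;
}

// Minimal shape of the client commands this sketch needs.
type StreamClient = {
  xadd: (...args: string[]) => Promise<string | null>;
  xack: (key: string, group: string, id: string) => Promise<number>;
};

// Sketch: copy the entry to "<stream>:dead", then XACK it out of the PEL.
async function deadLetter(
  client: StreamClient,
  stream: string,
  group: string,
  id: string,
  fields: string[]
): Promise<void> {
  await client.xadd(`${stream}:dead`, "*", ...fields, "originalId", id);
  await client.xack(stream, group, id);
}
```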
Monitoring stream and group health
// Stream info
const info = await redis.xinfo("STREAM", STREAM);
// Returns: length, first-entry, last-entry, etc.
// Consumer group info
const groups = await redis.xinfo("GROUPS", STREAM);
// Returns per group: name, consumers, pending, last-delivered-id
// Per-consumer pending counts
const consumers = await redis.xinfo("CONSUMERS", STREAM, GROUP);
// Returns per consumer: name, pending, idle time
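ioredis returns XINFO replies as flat arrays of alternating keys and values; a small parser (a sketch, assuming that reply shape) makes the fields above easier to work with:

```typescript
// Convert one flat reply like ["name", "order-processors", "pending", 3, ...]
// into a keyed object for easier inspection.
function parseInfoReply(flat: (string | number)[]): Record<string, string | number> {
  const out: Record<string, string | number> = {};
  for (let i = 0; i < flat.length; i += 2) {
    out[String(flat[i])] = flat[i + 1];
  }
  return out;
}

// Usage:
// const groups = (await redis.xinfo("GROUPS", STREAM)) as (string | number)[][];
// for (const g of groups.map(parseInfoReply)) {
//   console.log(g["name"], "pending:", g["pending"]);
// }
```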
Best Practices
- **Use `MAXLEN ~` to cap stream size.** The `~` allows Redis to trim efficiently in blocks rather than to the exact count. Without trimming, streams grow unbounded.
- **Acknowledge messages promptly.** Unacknowledged messages accumulate in the PEL, consuming memory and complicating recovery.
- **Run a reclaimer process.** Use `XAUTOCLAIM` or `XPENDING` + `XCLAIM` periodically to pick up messages from crashed consumers.
- **Use meaningful consumer names** (e.g., hostname or PID) so `XINFO CONSUMERS` output is actionable during debugging.
- **Prefer consumer groups over raw `XREAD`** unless you specifically need each consumer to see every message.
Common Pitfalls
- **Forgetting `MKSTREAM` on group creation.** Without it, `XGROUP CREATE` fails if the stream does not exist yet.
- **Using `$` as the group start ID in production.** This skips all messages produced before the group was created. Use `0` to process the full backlog, or a specific ID to start from a known point.
- **Not handling the `BUSYGROUP` error.** Creating a group that already exists throws an error. Catch and ignore it for idempotent deployments.
- **Reading with `>` and never retrying the PEL.** The `>` ID only delivers new messages. Pending messages from crashed consumers require explicit `XAUTOCLAIM` or `XCLAIM`.
- **Trimming too aggressively.** If consumers lag behind the trim point, they lose messages. Monitor `last-delivered-id` against the stream's oldest entry.
Anti-Patterns
Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
Related Skills
Caching Patterns
Cache-aside, write-through, and write-behind caching strategies with Redis
Data Structures
Redis core data structures including strings, hashes, sets, sorted sets, and lists
Lua Scripting
Lua scripting in Redis for atomic multi-step operations
Pub Sub
Redis Pub/Sub messaging patterns for real-time event broadcasting
Sentinel Cluster
Redis Sentinel and Cluster configurations for high availability and horizontal scaling
Adversarial Code Review
Adversarial implementation review methodology that validates code completeness against requirements with fresh objectivity. Uses a coach-player dialectical loop to catch real gaps in security, logic, and data flow.