# Streams

Node.js streams for efficient, memory-conscious data processing with backpressure handling.
You are an expert in Node.js Streams for building efficient, memory-conscious data processing pipelines.
## Overview
Streams are the fundamental abstraction in Node.js for handling flowing data. Rather than loading entire datasets into memory, streams process data in chunks, enabling applications to handle files, network traffic, and transformations of arbitrary size with bounded memory usage. Node.js provides four stream types: Readable, Writable, Duplex, and Transform.
## Core Concepts

### Stream Types

- **Readable** — a source of data (e.g., `fs.createReadStream`, `http.IncomingMessage`)
- **Writable** — a destination for data (e.g., `fs.createWriteStream`, `http.ServerResponse`)
- **Duplex** — both readable and writable (e.g., `net.Socket`)
- **Transform** — a duplex stream that modifies data as it passes through (e.g., `zlib.createGzip`)
### Backpressure

Backpressure occurs when a writable stream cannot consume data as fast as the readable stream produces it. The stream API handles this automatically through the return value of `writable.write()` and the `'drain'` event.
### Object Mode

By default, streams operate on `Buffer` or string chunks. In object mode, streams can work with arbitrary JavaScript values, which is useful for building data-processing pipelines.
## Implementation Patterns

### Basic pipeline with `stream.pipeline`
```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
}
```
### Custom Transform stream
```js
const { Transform } = require('node:stream');

class JSONParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this._buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this._buffer += chunk.toString();
    const lines = this._buffer.split('\n');
    this._buffer = lines.pop(); // keep incomplete line in buffer
    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (err) {
          return callback(new Error(`Invalid JSON: ${line}`));
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this._buffer.trim()) {
      try {
        this.push(JSON.parse(this._buffer));
      } catch (err) {
        return callback(new Error(`Invalid JSON: ${this._buffer}`));
      }
    }
    callback();
  }
}
```
### Async generator as a Readable source
```js
const { Readable } = require('node:stream');

async function* generateRows(db, query) {
  const cursor = db.query(query);
  while (await cursor.hasNext()) {
    yield await cursor.next();
  }
}

const stream = Readable.from(generateRows(db, 'SELECT * FROM users'));
```
### Handling backpressure manually
```js
function pumpData(readable, writable) {
  readable.on('data', (chunk) => {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      readable.pause(); // stop the flow until the write buffer empties
      writable.once('drain', () => readable.resume());
    }
  });
  readable.on('end', () => writable.end());
}
```
### Web Streams interop (Node.js 18+)
```js
const { Readable } = require('node:stream');

// Convert a fetch() response body to a Node.js Readable
async function fetchAsNodeStream(url) {
  const response = await fetch(url);
  return Readable.fromWeb(response.body);
}
```
## Best Practices

- Always use `stream.pipeline()` (or its promise variant) instead of manually piping — it handles error propagation and cleanup automatically.
- Prefer async iteration (`for await...of`) over event listeners when consuming readable streams sequentially.
- Set `highWaterMark` explicitly when default buffer sizes (16 KB for binary, 16 objects for object mode) are not appropriate for your workload.
- Destroy streams explicitly with `stream.destroy()` when aborting processing early to prevent resource leaks.
- Use `stream.compose()` (Node.js 19+) to build reusable transform chains.
## Common Pitfalls

- **Ignoring backpressure** — piping with `.on('data')` without checking `writable.write()` return values can cause unbounded memory growth.
- **Swallowing errors** — each stream in a `.pipe()` chain emits errors independently; a missing error handler on any stream crashes the process. Use `pipeline()` instead.
- **Mixing promises and streams incorrectly** — calling `await` inside a `'data'` handler pauses nothing; use `for await...of` or transform streams for async processing.
- **Not implementing `_flush`** — custom Transform streams that buffer data must implement `_flush` to emit remaining data when the stream ends.
- **Using `.pipe()` after error** — once a stream errors, it is not automatically unpiped; downstream streams may hang waiting for data that will never arrive.
## Anti-Patterns

- **Over-engineering for hypothetical scale.** Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
- **Ignoring the existing ecosystem.** Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
- **Premature abstraction.** Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
- **Neglecting error handling at boundaries.** Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
- **Skipping documentation for obvious code.** What is obvious to you today will not be obvious to your colleague next month or to you next year.
## Related Skills

- **Child Processes** — child process management patterns for spawning, communicating with, and controlling external processes
- **Clustering** — cluster module patterns for scaling Node.js applications across multiple CPU cores
- **Error Handling** — comprehensive error handling strategies for robust and debuggable Node.js applications
- **Event Emitter** — EventEmitter patterns for building decoupled, event-driven architectures in Node.js
- **File System** — modern fs/promises patterns for safe, efficient file system operations in Node.js
- **Native Modules** — N-API and native addon patterns for extending Node.js with high-performance C/C++ and Rust modules