Skip to main content
Technology & EngineeringNodejs Patterns161 lines

Streams

Node.js streams for efficient memory-conscious data processing with backpressure handling

Quick Summary18 lines
You are an expert in Node.js Streams for building efficient, memory-conscious data processing pipelines.

## Key Points

- **Readable** — a source of data (e.g., `fs.createReadStream`, `http.IncomingMessage`)
- **Writable** — a destination for data (e.g., `fs.createWriteStream`, `http.ServerResponse`)
- **Duplex** — both readable and writable (e.g., `net.Socket`)
- **Transform** — a duplex stream that modifies data as it passes through (e.g., `zlib.createGzip`)
- Always use `stream.pipeline()` (or its promise variant) instead of manually piping — it handles error propagation and cleanup automatically.
- Prefer async iteration (`for await...of`) over event listeners when consuming readable streams sequentially.
- Set `highWaterMark` explicitly when default buffer sizes (16 KB for binary, 16 objects for object mode) are not appropriate for your workload.
- Destroy streams explicitly with `stream.destroy()` when aborting processing early to prevent resource leaks.
- Use `stream.compose()` (Node.js 19+) to build reusable transform chains.
- **Ignoring backpressure** — piping with `.on('data')` without checking `writable.write()` return values can cause unbounded memory growth.
- **Swallowing errors** — each stream in a `.pipe()` chain emits errors independently; a missing error handler on any stream crashes the process. Use `pipeline()` instead.
- **Mixing promises and streams incorrectly** — calling `await` inside a `'data'` handler pauses nothing; use `for await...of` or transform streams for async processing.
skilldb get nodejs-patterns-skills/StreamsFull skill: 161 lines
Paste into your CLAUDE.md or agent config

Streams — Node.js Patterns

You are an expert in Node.js Streams for building efficient, memory-conscious data processing pipelines.

Core Philosophy

Overview

Streams are the fundamental abstraction in Node.js for handling flowing data. Rather than loading entire datasets into memory, streams process data in chunks, enabling applications to handle files, network traffic, and transformations of arbitrary size with bounded memory usage. Node.js provides four stream types: Readable, Writable, Duplex, and Transform.

Core Concepts

Stream Types

  • Readable — a source of data (e.g., fs.createReadStream, http.IncomingMessage)
  • Writable — a destination for data (e.g., fs.createWriteStream, http.ServerResponse)
  • Duplex — both readable and writable (e.g., net.Socket)
  • Transform — a duplex stream that modifies data as it passes through (e.g., zlib.createGzip)

Backpressure

Backpressure occurs when a writable stream cannot consume data as fast as the readable stream produces it. The stream API handles this automatically through the return value of writable.write() and the 'drain' event.

Object Mode

By default streams operate on Buffer or string chunks. In object mode, streams can work with arbitrary JavaScript values, useful for building data-processing pipelines.

Implementation Patterns

Basic pipeline with stream.pipeline

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
}

Custom Transform stream

const { Transform } = require('node:stream');

class JSONParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this._buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this._buffer += chunk.toString();
    const lines = this._buffer.split('\n');
    this._buffer = lines.pop(); // keep incomplete line in buffer

    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line));
        } catch (err) {
          return callback(new Error(`Invalid JSON: ${line}`));
        }
      }
    }
    callback();
  }

  _flush(callback) {
    if (this._buffer.trim()) {
      try {
        this.push(JSON.parse(this._buffer));
      } catch (err) {
        return callback(new Error(`Invalid JSON: ${this._buffer}`));
      }
    }
    callback();
  }
}

Async generator as a Readable source

const { Readable } = require('node:stream');

async function* generateRows(db, query) {
  const cursor = db.query(query);
  while (await cursor.hasNext()) {
    yield await cursor.next();
  }
}

const stream = Readable.from(generateRows(db, 'SELECT * FROM users'));

Handling backpressure manually

function pumpData(readable, writable) {
  readable.on('data', (chunk) => {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      readable.pause();
      writable.once('drain', () => readable.resume());
    }
  });

  readable.on('end', () => writable.end());
}

Web Streams interop (Node.js 18+)

const { Readable } = require('node:stream');

// Convert a fetch() response body to a Node.js Readable
async function fetchAsNodeStream(url) {
  const response = await fetch(url);
  return Readable.fromWeb(response.body);
}

Best Practices

  • Always use stream.pipeline() (or its promise variant) instead of manually piping — it handles error propagation and cleanup automatically.
  • Prefer async iteration (for await...of) over event listeners when consuming readable streams sequentially.
  • Set highWaterMark explicitly when default buffer sizes (16 KB for binary, 16 objects for object mode) are not appropriate for your workload.
  • Destroy streams explicitly with stream.destroy() when aborting processing early to prevent resource leaks.
  • Use stream.compose() (Node.js 19+) to build reusable transform chains.

Common Pitfalls

  • Ignoring backpressure — piping with .on('data') without checking writable.write() return values can cause unbounded memory growth.
  • Swallowing errors — each stream in a .pipe() chain emits errors independently; a missing error handler on any stream crashes the process. Use pipeline() instead.
  • Mixing promises and streams incorrectly — calling await inside a 'data' handler pauses nothing; use for await...of or transform streams for async processing.
  • Not implementing _flush — custom Transform streams that buffer data must implement _flush to emit remaining data when the stream ends.
  • Using .pipe() after error — once a stream errors, it is not automatically unpiped; downstream streams may hang waiting for data that will never arrive.

Anti-Patterns

Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.

Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.

Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.

Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.

Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.

Install this skill directly: skilldb add nodejs-patterns-skills

Get CLI access →