Streaming

Streaming LLM responses with SSE, WebSockets, and backpressure handling

Quick Summary
You are an expert in streaming LLM responses using Server-Sent Events, WebSockets, and incremental rendering in applications.

## Key Points

- Always set `Content-Type: text/event-stream` and `Cache-Control: no-cache` headers for SSE endpoints.
- Use `AbortController` to let users cancel in-progress streams and avoid wasting API credits.
- Buffer partial SSE lines on the client; chunks may split across `reader.read()` calls.
- Detect client disconnects server-side and abort the upstream LLM stream to save tokens.
- Use `ReadableStream` in edge runtimes (Next.js, Cloudflare Workers) instead of `res.write()`.
- Render streamed text with `whitespace-pre-wrap` to preserve formatting during generation.
- Accumulate the full response alongside streaming for logging, caching, or database storage.
- Parse only complete lines; a chunk that splits mid-event makes `JSON.parse` throw.
- Check for `data: [DONE]` before parsing; the termination signal is not JSON.
- Do not use the `EventSource` API for POST requests; it only supports GET. Use `fetch` with a `ReadableStream` reader instead.
- Disable response buffering when Node.js sits behind a reverse proxy (for nginx, send the `X-Accel-Buffering: no` response header).
- Append tokens to React state with `setOutput(prev => prev + token)`; `setOutput(output + token)` reads a stale closure value and drops earlier tokens.

## Quick Example

```
data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":" world"}}]}

data: [DONE]
```

Streaming — LLM Integration

You are an expert in streaming LLM responses using Server-Sent Events, WebSockets, and incremental rendering in applications.

Overview

Streaming delivers LLM-generated tokens to users as they are produced, cutting the wait for the first visible token from seconds to a few hundred milliseconds. Most LLM APIs support streaming via Server-Sent Events (SSE). Implementing streaming correctly requires handling chunked responses, parsing the SSE wire format, managing the connection lifecycle, and rendering partial content in the UI.

Core Concepts

Server-Sent Events (SSE) Protocol

SSE is a unidirectional protocol where the server pushes events to the client over a single HTTP connection:

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":" world"}}]}

data: [DONE]

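Each event is a line prefixed with data: and terminated by a blank line. OpenAI-style streams end with a final data: [DONE] event. That sentinel is a provider convention rather than part of SSE itself, and it is not JSON, so check for it before parsing.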
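To make the wire format concrete, here is a minimal sketch of encoding and decoding a single event. The helper names `formatSSE` and `parseSSEEvent` are hypothetical and not part of any SDK; the parser assumes it receives one complete event block (the text between two blank lines).

```typescript
// Hypothetical helpers for illustration; not part of any SDK.

/** Serialize a JSON payload as one SSE event: a data line plus a blank line. */
function formatSSE(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

type ParsedEvent =
  | { kind: "data"; payload: unknown }
  | { kind: "done" }
  | { kind: "ignored" };

/** Parse one complete event block (the text between two blank lines). */
function parseSSEEvent(block: string): ParsedEvent {
  // Collect every data line; SSE joins multiple data lines with "\n".
  const dataLines: string[] = [];
  for (const rawLine of block.split("\n")) {
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (line.startsWith("data:")) {
      // A single space after the colon is optional and is stripped.
      const value = line.slice(5);
      dataLines.push(value.startsWith(" ") ? value.slice(1) : value);
    }
  }
  if (dataLines.length === 0) return { kind: "ignored" }; // comments, other fields
  const data = dataLines.join("\n");
  if (data === "[DONE]") return { kind: "done" }; // sentinel, not JSON
  return { kind: "data", payload: JSON.parse(data) };
}
```

Checking for the sentinel before calling `JSON.parse` is the important ordering here; lines that start with a colon (SSE comments, often used as keep-alives) fall through as ignored.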

OpenAI Streaming

import OpenAI from "openai";

const openai = new OpenAI();

async function streamChat(prompt: string): Promise<string> {
  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  let fullText = "";
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content ?? "";
    fullText += content;
    process.stdout.write(content);
  }
  return fullText;
}

Anthropic Streaming

import Anthropic from "@anthropic-ai/sdk";

const anthropic = new Anthropic();

async function streamClaude(prompt: string): Promise<string> {
  let fullText = "";

  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  stream.on("text", (text) => {
    fullText += text;
    process.stdout.write(text);
  });

  await stream.finalMessage();
  return fullText;
}

Implementation Patterns

Express SSE Endpoint

import express from "express";
import OpenAI from "openai";

const app = express();
app.use(express.json()); // without this, req.body is undefined
const openai = new OpenAI();

app.post("/api/chat", async (req, res) => {
  const { messages } = req.body;

  // Set SSE headers and send them right away
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders();

  try {
    const stream = await openai.chat.completions.create({
      model: "gpt-4o",
      messages,
      stream: true,
    });

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }

    res.write("data: [DONE]\n\n");
  } catch (err) {
    // Headers are already sent, so report the failure as an event
    res.write(`data: ${JSON.stringify({ error: "stream failed" })}\n\n`);
  } finally {
    res.end();
  }
});

Next.js App Router with ReadableStream

// app/api/stream/route.ts
import OpenAI from "openai";

const openai = new OpenAI();

export async function POST(req: Request) {
  const { prompt } = await req.json();

  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  const encoder = new TextEncoder();

  const readable = new ReadableStream({
    async start(controller) {
      try {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content;
          if (content) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`));
          }
        }
        controller.enqueue(encoder.encode("data: [DONE]\n\n"));
        controller.close();
      } catch (err) {
        // Propagate upstream failures to the reader instead of hanging
        controller.error(err);
      }
    },
    cancel() {
      // The client stopped reading: abort the upstream request
      stream.controller.abort();
    },
  });

  return new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
    },
  });
}

Browser Client: Consuming SSE with fetch

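async function streamFromAPI(prompt: string, onToken: (token: string) => void): Promise<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages: [{ role: "user", content: prompt }] }),
  });

  // Surface HTTP errors instead of trying to parse an error page as SSE
  if (!response.ok || !response.body) {
    throw new Error(`Request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let fullText = "";
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(value, { stream: true });

    // Keep the last (possibly incomplete) line in the buffer for the next read
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const rawLine of lines) {
      const line = rawLine.replace(/\r$/, ""); // tolerate CRLF line endings
      if (!line.startsWith("data: ")) continue;

      const payload = line.slice(6);
      if (payload === "[DONE]") return fullText; // termination signal, not JSON

      const data = JSON.parse(payload);
      if (data.content) {
        fullText += data.content;
        onToken(data.content);
      }
    }
  }

  return fullText;
}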
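
The client above passes `{ stream: true }` to `decoder.decode`. The following sketch illustrates why that option matters: a multi-byte UTF-8 character can be cut in half by a chunk boundary. The sample string and the split position are chosen for illustration only.

```typescript
const encoder = new TextEncoder();
// "é" occupies two bytes in UTF-8, so this line is 12 bytes long.
const bytes = encoder.encode("data: café\n");

// Split in the middle of "é": the first chunk ends with its lead byte.
const first = bytes.slice(0, 10);
const second = bytes.slice(10);

// Decoding each chunk independently treats every chunk as complete, so the
// two halves of "é" each become a U+FFFD replacement character.
const naive = new TextDecoder().decode(first) + new TextDecoder().decode(second);

// A single decoder in streaming mode holds the dangling byte until the
// next call and reassembles the character.
const streaming = new TextDecoder();
const correct =
  streaming.decode(first, { stream: true }) + streaming.decode(second, { stream: true });

console.log(naive === "data: café\n"); // false
console.log(correct === "data: café\n"); // true
```

Reusing one decoder for the lifetime of the response is what makes this work; creating a new `TextDecoder` per chunk reintroduces the problem.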

React Streaming Component

"use client";

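import { useState, useCallback } from "react";
// streamFromAPI is the fetch-based helper from the previous section.

export function StreamingChat() {
  const [output, setOutput] = useState("");
  const [error, setError] = useState<string | null>(null);
  const [isStreaming, setIsStreaming] = useState(false);

  const handleStream = useCallback(async (prompt: string) => {
    setOutput("");
    setError(null);
    setIsStreaming(true);

    try {
      await streamFromAPI(prompt, (token) => {
        // Functional update: always appends to the latest state
        setOutput((prev) => prev + token);
      });
    } catch (err) {
      setError(err instanceof Error ? err.message : "Stream failed");
    } finally {
      // Runs on success and on failure, so the button never stays disabled
      setIsStreaming(false);
    }
  }, []);

  return (
    <div>
      <button onClick={() => handleStream("Explain React hooks")} disabled={isStreaming}>
        {isStreaming ? "Streaming..." : "Ask"}
      </button>
      <div className="whitespace-pre-wrap">{output}</div>
      {error && <p role="alert">{error}</p>}
    </div>
  );
}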
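
The component relies on the functional form of the state setter. The sketch below models why, using a hypothetical `createState` cell that is not React and only imitates how a setter treats plain values versus updater functions.

```typescript
// Hypothetical stand-in for a React state cell; this is NOT React, only a
// model of how a setter handles plain values versus updater functions.
function createState<T>(initial: T) {
  let current = initial;
  return {
    get: () => current,
    set: (next: T | ((prev: T) => T)) => {
      current = typeof next === "function" ? (next as (prev: T) => T)(current) : next;
    },
  };
}

const tokens = ["Hello", ",", " world"];

// Stale closure: `captured` is read once, like `output` inside a callback
// that was created during an earlier render.
const stale = createState("");
const captured = stale.get();
for (const token of tokens) stale.set(captured + token);

// Functional update: each call receives the latest value.
const fresh = createState("");
for (const token of tokens) fresh.set((prev) => prev + token);

console.log(JSON.stringify(stale.get())); // " world"
console.log(JSON.stringify(fresh.get())); // "Hello, world"
```

In the stale case every write starts from the same empty string, so only the last token survives.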

Abort / Cancel Streaming

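const controller = new AbortController();

// Start streaming; abort() rejects the fetch and any pending read with an AbortError
try {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages: [{ role: "user", content: prompt }] }),
    signal: controller.signal,
  });
  // ... read response.body as shown in the browser client ...
} catch (err) {
  // A user-initiated cancel is expected; rethrow anything else
  if ((err as Error).name !== "AbortError") throw err;
}

// Cancel from UI
function handleCancel() {
  controller.abort();
}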

Server-Side: Detecting Client Disconnect

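app.post("/api/chat", async (req, res) => {
  // Listen on the response, not the request: in Node.js 16 and later the
  // request's "close" event fires once the request has been read, which
  // happens before the client actually disconnects.
  let aborted = false;
  res.on("close", () => {
    aborted = true;
  });

  res.setHeader("Content-Type", "text/event-stream");

  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: req.body.messages,
    stream: true,
  });

  for await (const chunk of stream) {
    if (aborted) {
      // Stop the upstream request so no further tokens are generated
      stream.controller.abort();
      break;
    }
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ content })}\n\n`);
    }
  }

  res.end();
});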

WebSocket Alternative

import OpenAI from "openai";
import { WebSocketServer } from "ws";

const openai = new OpenAI();
const wss = new WebSocketServer({ port: 8080 });

wss.on("connection", (ws) => {
  ws.on("message", async (data) => {
    const { prompt } = JSON.parse(data.toString());

    const stream = await openai.chat.completions.create({
      model: "gpt-4o",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    });

    for await (const chunk of stream) {
      if (ws.readyState !== ws.OPEN) {
        // Client is gone: stop generating and skip the "done" message
        stream.controller.abort();
        return;
      }
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        ws.send(JSON.stringify({ type: "token", content }));
      }
    }

    ws.send(JSON.stringify({ type: "done" }));
  });
});
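
On the receiving side, the two message types sent by the server above can be folded into UI state with a small pure function. This is a sketch only: the `ChatState` shape and the `applyMessage` name are assumptions, not part of the original server.

```typescript
// Message shapes mirror the server above; the state shape is an assumption.
type ServerMessage = { type: "token"; content: string } | { type: "done" };

interface ChatState {
  text: string;
  done: boolean;
}

/** Fold one raw WebSocket frame into the chat state without mutating it. */
function applyMessage(state: ChatState, raw: string): ChatState {
  const message = JSON.parse(raw) as ServerMessage;
  switch (message.type) {
    case "token":
      return { ...state, text: state.text + message.content };
    case "done":
      return { ...state, done: true };
    default:
      return state; // unknown message types are ignored
  }
}

// In a browser this would be wired up roughly as:
//   socket.onmessage = (event) => { chat = applyMessage(chat, event.data); };
```

Unlike SSE, WebSocket frames arrive whole, so no line buffering is needed; each frame is one JSON document.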

Best Practices

  • Always set Content-Type: text/event-stream and Cache-Control: no-cache headers for SSE endpoints.
  • Use AbortController to let users cancel in-progress streams and avoid wasting API credits.
  • Buffer partial SSE lines on the client; chunks may split across reader.read() calls.
  • Detect client disconnects server-side and abort the upstream LLM stream to save tokens.
  • Use ReadableStream in edge runtimes (Next.js, Cloudflare Workers) instead of res.write().
  • Render streamed text with whitespace-pre-wrap to preserve formatting during generation.
  • Accumulate the full response alongside streaming for logging, caching, or database storage.

Core Philosophy

Streaming is not an optimization -- it is a user experience requirement. When an LLM takes 3-5 seconds to generate a full response, showing nothing during that time feels like a broken application. Streaming delivers the first token in hundreds of milliseconds, turning a perceived hang into a fluid writing experience. For any user-facing LLM integration, the question is not whether to stream but how to implement it correctly.

The SSE protocol is simple but unforgiving. Each event is a line prefixed with data: , events are separated by blank lines, and OpenAI-style streams end with a data: [DONE] sentinel. This simplicity breaks down at the edges: network chunks can split mid-event, proxies can buffer responses, and network interruptions can leave the stream in an indeterminate state. Robust streaming code must buffer partial lines, handle incomplete events, detect disconnections, and implement timeouts. Treating SSE parsing as trivial is the source of the most common streaming bugs.
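
The buffering requirement can be sketched as a small incremental parser that accepts text in arbitrary pieces and only emits events once their terminating blank line has arrived. The `SSEBuffer` class is hypothetical, and the 7-character chunking is just a way to simulate awkward network splits.

```typescript
// Hypothetical incremental parser; names are illustrative only.
class SSEBuffer {
  private buffer = "";

  /** Feed a decoded text chunk; returns the data payload of every event completed by it. */
  push(chunk: string): string[] {
    // Normalize CRLF so that an event boundary is always "\n\n".
    this.buffer = (this.buffer + chunk).replace(/\r\n/g, "\n");

    const events: string[] = [];
    let boundary = this.buffer.indexOf("\n\n");
    while (boundary !== -1) {
      const block = this.buffer.slice(0, boundary);
      this.buffer = this.buffer.slice(boundary + 2); // keep the unfinished tail
      const data = block
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .map((line) => line.slice(5).replace(/^ /, ""))
        .join("\n");
      if (data !== "") events.push(data);
      boundary = this.buffer.indexOf("\n\n");
    }
    return events;
  }
}

const parser = new SSEBuffer();
const wire = 'data: {"content":"Hel"}\n\ndata: {"content":"lo"}\n\ndata: [DONE]\n\n';

// Deliver the stream in 7-character pieces to mimic arbitrary chunk boundaries.
const received: string[] = [];
for (let i = 0; i < wire.length; i += 7) {
  received.push(...parser.push(wire.slice(i, i + 7)));
}
console.log(received); // [ '{"content":"Hel"}', '{"content":"lo"}', '[DONE]' ]
```

The result is the same for any chunk size, which is the property a stream handler needs before it can safely call `JSON.parse` on a payload.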

Resource management is the hidden complexity of streaming. Every active stream consumes a connection, server memory for buffering, and API credits for token generation. If the user navigates away or cancels the request, the server-side stream should be aborted to stop generating tokens that nobody will read. If the server does not detect client disconnection, it continues generating and billing for a response that has been abandoned. Proper cleanup -- abort controllers on the client, disconnect detection on the server -- is not a nice-to-have; it is a cost control mechanism.
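
As a rough illustration of the cost argument, the sketch below relays tokens from a fake generator until an abort signal fires and counts how many tokens the producer had to generate. `relayTokens`, `fakeModel`, and `AbortLike` are hypothetical names; a real LLM stream is an async iterable, so the loop would be `for await` instead.

```typescript
// Structural stand-in that a real AbortSignal satisfies.
interface AbortLike {
  readonly aborted: boolean;
}

/** Forward tokens to `write` until the source ends or the signal is aborted. */
function relayTokens(
  source: Iterable<string>,
  signal: AbortLike,
  write: (token: string) => void,
): number {
  let forwarded = 0;
  for (const token of source) {
    if (signal.aborted) break; // leaving the loop also stops the generator
    write(token);
    forwarded++;
  }
  return forwarded;
}

/** Fake token producer that records how many tokens it had to generate. */
function* fakeModel(produced: { count: number }): Generator<string> {
  for (const token of ["a", "b", "c", "d", "e"]) {
    produced.count++;
    yield token;
  }
}

const produced = { count: 0 };
const controller = new AbortController();
const sent: string[] = [];

const forwarded = relayTokens(fakeModel(produced), controller.signal, (token) => {
  sent.push(token);
  if (sent.length === 2) controller.abort(); // simulate the client leaving
});

console.log(forwarded); // 2
console.log(produced.count); // 3 (one token was already in flight when the abort was seen)
```

Without the abort check, all five tokens would be generated and paid for even though only two were delivered.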

Anti-Patterns

  • Not buffering partial SSE lines on the client: Assuming that each reader.read() call returns exactly one complete SSE event. TCP chunks can split an event across multiple reads, and calling JSON.parse on a partial line produces an error that crashes the stream handler.

  • No client disconnect detection on the server: Continuing to stream tokens from the LLM API after the client has disconnected. This wastes API credits and server resources generating a response that nobody will receive. Listen for the request close event and abort the upstream stream.

  • Using EventSource for POST requests: Attempting to use the browser's EventSource API to consume an SSE endpoint that requires a POST body. EventSource only supports GET requests. Use fetch with a ReadableStream reader instead.

  • Setting state with stale closures in React: Using setOutput(output + token) instead of setOutput(prev => prev + token) inside the streaming callback. The closure captures the initial value of output, causing every token to overwrite the previous one instead of appending.

  • No timeout or error handling for the stream: Assuming the stream always completes successfully. Network interruptions, proxy timeouts, and API errors can leave the stream hanging without a [DONE] event. Implement a read timeout and surface errors to the UI.
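
A read timeout can be sketched as a wrapper around any promise, including the one returned by `reader.read()`. The `withTimeout` helper is hypothetical and the 15-second figure in the comment is an arbitrary example value. Note that timing out does not cancel the underlying read; the caller would still need to call `reader.cancel()` or abort the fetch.

```typescript
/** Reject if `promise` does not settle within `ms` milliseconds. */
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`No data for ${ms} ms`)), ms);
    promise.then(
      (value) => {
        clearTimeout(timer); // settled in time: cancel the pending timeout
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// Intended use inside a read loop (reader is a ReadableStreamDefaultReader):
//   const { done, value } = await withTimeout(reader.read(), 15_000);
```

Applying the timeout per read, rather than to the whole response, lets long generations continue as long as tokens keep arriving.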

Common Pitfalls

  • Not buffering partial SSE lines, causing JSON.parse errors when a chunk splits mid-event.
  • Forgetting to check for data: [DONE] before parsing, which crashes on the termination signal.
  • Using EventSource API for POST requests; EventSource only supports GET. Use fetch with ReadableStream instead.
  • Leaving response buffering enabled when Node.js sits behind a reverse proxy; nginx holds tokens back unless buffering is disabled, for example with the X-Accel-Buffering: no response header.
  • Appending tokens to React state through a stale closure; setOutput(prev => prev + token) is correct, while setOutput(output + token) drops earlier tokens.
  • Not handling network errors during streaming, leaving the UI in a permanent loading state.
  • Assuming the stream always ends cleanly; network drops can leave the connection hanging without a [DONE] event.
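
The header-related pitfalls can be avoided by collecting the headers in one place. This is a minimal sketch; the `sseHeaders` helper name is hypothetical.

```typescript
/** Response headers for an SSE endpoint; the helper name is hypothetical. */
function sseHeaders(): Record<string, string> {
  return {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    // Asks nginx not to buffer this response so tokens reach the client promptly.
    "X-Accel-Buffering": "no",
  };
}

// Express:  for (const [key, value] of Object.entries(sseHeaders())) res.setHeader(key, value);
// Fetch API: new Response(readable, { headers: sseHeaders() });
```

Other proxies and CDNs have their own buffering controls, so this header should be treated as the nginx-specific part of the fix.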
