Streaming
Streaming LLM responses with SSE, WebSockets, and backpressure handling
You are an expert in streaming LLM responses using Server-Sent Events, WebSockets, and incremental rendering in applications.
## Overview
Streaming delivers LLM-generated tokens to users as they are produced, reducing perceived latency from seconds to milliseconds for the first visible token. Most LLM APIs support streaming via Server-Sent Events (SSE). Implementing streaming correctly requires handling chunked responses, parsing SSE protocols, managing connection lifecycle, and rendering partial content in the UI.
## Core Concepts

### Server-Sent Events (SSE) Protocol

SSE is a unidirectional protocol where the server pushes events to the client over a single HTTP connection:
```
data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":" world"}}]}

data: [DONE]
```

Each event is a line prefixed with `data: `, events are separated by a blank line (a double newline), and the stream ends with `data: [DONE]`.
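Because TCP can split an event across reads, client-side parsing has to buffer the unterminated tail of each chunk. A minimal sketch of such a line-buffering parser (the `feedSSE` name and return shape are illustrative, not a standard API):

```typescript
// Feed raw text chunks in any split; returns the complete "data:" payloads
// found so far, plus the unterminated tail to carry into the next call.
function feedSSE(
  buffer: string,
  chunk: string
): { events: string[]; buffer: string } {
  const lines = (buffer + chunk).split("\n");
  // The last element may be an incomplete line; keep it for the next chunk.
  const rest = lines.pop() ?? "";
  const events: string[] = [];
  for (const line of lines) {
    if (line.startsWith("data: ")) {
      events.push(line.slice(6)); // payload after the "data: " prefix
    }
  }
  return { events, buffer: rest };
}
```

Feeding it `'data: {"a":1}\nda'` and then `'ta: [DONE]\n'` yields the payloads `{"a":1}` and `[DONE]`, even though the second event arrived split mid-line.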
### OpenAI Streaming

```typescript
import OpenAI from "openai";

const openai = new OpenAI();

async function streamChat(prompt: string): Promise<string> {
  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  let fullText = "";
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content ?? "";
    fullText += content;
    process.stdout.write(content);
  }
  return fullText;
}
```
### Anthropic Streaming

```typescript
import Anthropic from "@anthropic-ai/sdk";

const anthropic = new Anthropic();

async function streamClaude(prompt: string): Promise<string> {
  let fullText = "";
  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  stream.on("text", (text) => {
    fullText += text;
    process.stdout.write(text);
  });

  await stream.finalMessage();
  return fullText;
}
```
## Implementation Patterns

### Express SSE Endpoint

```typescript
import express from "express";
import OpenAI from "openai";

const app = express();
app.use(express.json()); // required so req.body is parsed
const openai = new OpenAI();

app.post("/api/chat", async (req, res) => {
  const { messages } = req.body;

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages,
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ content })}\n\n`);
    }
  }

  res.write("data: [DONE]\n\n");
  res.end();
});
```
### Next.js App Router with ReadableStream

```typescript
// app/api/stream/route.ts
import OpenAI from "openai";

const openai = new OpenAI();

export async function POST(req: Request) {
  const { prompt } = await req.json();
  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`));
        }
      }
      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      controller.close();
    },
  });

  return new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
    },
  });
}
```
### Browser Client: Consuming SSE with fetch

```typescript
async function streamFromAPI(prompt: string, onToken: (token: string) => void): Promise<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages: [{ role: "user", content: prompt }] }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let fullText = "";
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep any partial line for the next read
    for (const line of lines) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        if (data.content) {
          fullText += data.content;
          onToken(data.content);
        }
      }
    }
  }
  return fullText;
}
```
### React Streaming Component

```tsx
"use client";
import { useState, useCallback } from "react";

export function StreamingChat() {
  const [output, setOutput] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const handleStream = useCallback(async (prompt: string) => {
    setOutput("");
    setIsStreaming(true);
    try {
      await streamFromAPI(prompt, (token) => {
        setOutput((prev) => prev + token);
      });
    } finally {
      // Reset even on error so the UI never sticks in a loading state
      setIsStreaming(false);
    }
  }, []);

  return (
    <div>
      <button onClick={() => handleStream("Explain React hooks")} disabled={isStreaming}>
        {isStreaming ? "Streaming..." : "Ask"}
      </button>
      <div className="whitespace-pre-wrap">{output}</div>
    </div>
  );
}
```
### Abort / Cancel Streaming

```typescript
const controller = new AbortController();

// Start streaming
const response = await fetch("/api/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ prompt }),
  signal: controller.signal,
});

// Cancel from UI
function handleCancel() {
  controller.abort();
}
```
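Aborting mid-stream makes the pending `fetch` or `reader.read()` reject with a `DOMException` named `AbortError`, so the cancel path should be distinguished from real failures. A hedged sketch building on the client pattern above (`streamWithCancel` and its return shape are illustrative, not a fixed API):

```typescript
// fetch() rejects with a DOMException named "AbortError" when
// controller.abort() fires mid-request or mid-stream.
function isAbortError(err: unknown): boolean {
  return err instanceof DOMException && err.name === "AbortError";
}

async function streamWithCancel(
  prompt: string,
  onToken: (t: string) => void
): Promise<{ text: string; cancelled: boolean }> {
  const controller = new AbortController();
  // In real code, hand controller.abort to the UI's cancel button.
  let text = "";
  try {
    const response = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt }),
      signal: controller.signal,
    });
    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const chunk = decoder.decode(value, { stream: true });
      text += chunk;
      onToken(chunk);
    }
    return { text, cancelled: false };
  } catch (err) {
    // A user cancel is a normal outcome, not an error to surface.
    if (isAbortError(err)) return { text, cancelled: true };
    throw err;
  }
}
```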
### Server-Side: Detecting Client Disconnect

```typescript
app.post("/api/chat", async (req, res) => {
  let aborted = false;
  req.on("close", () => {
    aborted = true;
  });

  res.setHeader("Content-Type", "text/event-stream");

  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: req.body.messages,
    stream: true,
  });

  for await (const chunk of stream) {
    if (aborted) {
      stream.controller.abort(); // stop the upstream LLM stream
      break;
    }
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ content })}\n\n`);
    }
  }
  res.end();
});
```
### WebSocket Alternative

```typescript
import { WebSocketServer } from "ws";
import OpenAI from "openai";

const openai = new OpenAI();
const wss = new WebSocketServer({ port: 8080 });

wss.on("connection", (ws) => {
  ws.on("message", async (data) => {
    const { prompt } = JSON.parse(data.toString());
    const stream = await openai.chat.completions.create({
      model: "gpt-4o",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    });

    for await (const chunk of stream) {
      if (ws.readyState !== ws.OPEN) break; // client went away
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        ws.send(JSON.stringify({ type: "token", content }));
      }
    }
    ws.send(JSON.stringify({ type: "done" }));
  });
});
```
## Best Practices

- Always set `Content-Type: text/event-stream` and `Cache-Control: no-cache` headers for SSE endpoints.
- Use `AbortController` to let users cancel in-progress streams and avoid wasting API credits.
- Buffer partial SSE lines on the client; chunks may split across `reader.read()` calls.
- Detect client disconnects server-side and abort the upstream LLM stream to save tokens.
- Use `ReadableStream` in edge runtimes (Next.js, Cloudflare Workers) instead of `res.write()`.
- Render streamed text with `whitespace-pre-wrap` to preserve formatting during generation.
- Accumulate the full response alongside streaming for logging, caching, or database storage.
## Core Philosophy
Streaming is not an optimization -- it is a user experience requirement. When an LLM takes 3-5 seconds to generate a full response, showing nothing during that time feels like a broken application. Streaming delivers the first token in hundreds of milliseconds, turning a perceived hang into a fluid writing experience. For any user-facing LLM integration, the question is not whether to stream but how to implement it correctly.
The SSE protocol is simple but unforgiving. Each event is a line prefixed with `data: `, events are separated by double newlines, and the stream ends with `data: [DONE]`. This simplicity breaks down at the edges: TCP chunks can split mid-event, proxies can buffer responses, and network interruptions can leave the stream in an indeterminate state. Robust streaming code must buffer partial lines, handle incomplete events, detect disconnections, and implement timeouts. Treating SSE parsing as trivial is the source of the most common streaming bugs.
Resource management is the hidden complexity of streaming. Every active stream consumes a connection, server memory for buffering, and API credits for token generation. If the user navigates away or cancels the request, the server-side stream should be aborted to stop generating tokens that nobody will read. If the server does not detect client disconnection, it continues generating and billing for a response that has been abandoned. Proper cleanup -- abort controllers on the client, disconnect detection on the server -- is not a nice-to-have; it is a cost control mechanism.
## Anti-Patterns

- **Not buffering partial SSE lines on the client**: Assuming that each `reader.read()` call returns exactly one complete SSE event. TCP chunks can split an event across multiple reads, and calling `JSON.parse` on a partial line produces an error that crashes the stream handler.
- **No client disconnect detection on the server**: Continuing to stream tokens from the LLM API after the client has disconnected. This wastes API credits and server resources generating a response that nobody will receive. Listen for the request `close` event and abort the upstream stream.
- **Using `EventSource` for POST requests**: Attempting to use the browser's `EventSource` API to consume an SSE endpoint that requires a POST body. `EventSource` only supports GET requests. Use `fetch` with a `ReadableStream` reader instead.
- **Setting state with stale closures in React**: Using `setOutput(output + token)` instead of `setOutput(prev => prev + token)` inside the streaming callback. The closure captures the initial value of `output`, causing every token to overwrite the previous one instead of appending.
- **No timeout or error handling for the stream**: Assuming the stream always completes successfully. Network interruptions, proxy timeouts, and API errors can leave the stream hanging without a `[DONE]` event. Implement a read timeout and surface errors to the UI.
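The last anti-pattern can be addressed by bounding every read. One way to sketch a read timeout (the helper name and the 15-second figure are illustrative choices, not part of any SDK):

```typescript
// Race a promise against a timer; used to bound each reader.read() so a
// stalled stream rejects instead of hanging without a [DONE] event.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`stream read timed out after ${ms}ms`)),
      ms
    );
  });
  // Clear the timer whichever side settles first, so a won race does not
  // leave a pending rejection behind.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Inside the read loop:
// const { done, value } = await withTimeout(reader.read(), 15_000);
```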
## Common Pitfalls

- Not buffering partial SSE lines, causing `JSON.parse` errors when a chunk splits mid-event.
- Forgetting to check for `data: [DONE]` before parsing, which crashes on the termination signal.
- Using the `EventSource` API for POST requests; `EventSource` only supports GET. Use `fetch` with `ReadableStream` instead.
- Not flushing response data in Node.js when behind a reverse proxy (nginx requires `X-Accel-Buffering: no`).
- Appending tokens to React state with a stale closure; `setOutput(prev => prev + token)` is correct, while `setOutput(output + token)` captures a stale value.
- Not handling network errors during streaming, leaving the UI in a permanent loading state.
- Assuming the stream always ends cleanly; network drops can leave the connection hanging without a `[DONE]` event.
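The proxy-buffering pitfall above can be handled by sending the anti-buffering header alongside the standard SSE headers. A small sketch (the `sseHeaders` helper is illustrative; `X-Accel-Buffering: no` is the nginx-specific part):

```typescript
// Headers for an SSE response that should stream through nginx unbuffered.
function sseHeaders(): Record<string, string> {
  return {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    // nginx buffers proxied responses by default; this disables buffering
    // so each res.write() reaches the client immediately.
    "X-Accel-Buffering": "no",
  };
}

// Express usage:
// for (const [k, v] of Object.entries(sseHeaders())) res.setHeader(k, v);
```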
Install this skill directly: `skilldb add llm-integration-skills`
## Related Skills

- **Anthropic API**: Anthropic Claude API integration for messages, streaming, and tool use
- **Embeddings**: Text embeddings and semantic search with vector databases for LLM applications
- **Function Calling**: Function/tool calling patterns for connecting LLMs to external APIs and data sources
- **LangChain**: LangChain orchestration for chains, agents, memory, and retrieval workflows
- **OpenAI API**: OpenAI API integration patterns for chat completions, embeddings, and assistants
- **RAG Pipeline**: Building retrieval-augmented generation pipelines with document ingestion, retrieval, and synthesis