Implementing Streaming LLM Responses in Node.js
Streaming LLM responses dramatically improves perceived performance by delivering tokens to the user as they are generated rather than waiting for the entire response to complete. This guide covers everything you need to build production-grade streaming pipelines in Node.js, from SDK-level streaming through server-sent events delivery to robust client-side consumption with reconnection logic and token tracking.
If you have ever watched ChatGPT type out a response word by word, you have seen streaming in action. Building that experience into your own applications is not difficult, but there are real engineering decisions around transport protocols, error recovery, backpressure, and tool use that separate a demo from production code. This article covers all of them.
Prerequisites
- Node.js 18 or later (for native fetch and ReadableStream support)
- An Anthropic API key or OpenAI API key (or both)
- Basic familiarity with Express.js
- Understanding of HTTP and event-driven programming
- npm packages: express, @anthropic-ai/sdk, openai
Install the dependencies:
npm install express @anthropic-ai/sdk openai
Why Streaming Matters for LLM UX
The single most important metric for LLM-powered interfaces is time to first token (TTFT). When a user submits a prompt, there is an unavoidable delay while the model processes the input and begins generating. With a non-streaming request, the user sees nothing until the entire response is complete. For a response that takes eight seconds to generate, that is eight seconds of staring at a spinner.
With streaming, the first token arrives in 200-500 milliseconds for most models. The user sees text appearing immediately and can begin reading while the rest of the response generates. This is not just a cosmetic improvement. Studies on perceived latency show that users judge a two-second streaming response as faster than a one-second buffered response, because the continuous feedback eliminates the anxiety of waiting.
There is also a practical benefit: if the model starts generating something unhelpful, the user can cancel early without waiting for the full response and burning unnecessary tokens.
Streaming with the Anthropic SDK
The Anthropic Node.js SDK provides a messages.stream() method that returns a MessageStream object, which works both as an event emitter and as an async iterable over stream events. Here is the basic pattern:
var Anthropic = require("@anthropic-ai/sdk");
var client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
function streamClaude(prompt, callback) {
var stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 1024,
messages: [{ role: "user", content: prompt }],
});
stream.on("text", function (text) {
callback(null, { type: "text", content: text });
});
stream.on("message", function (message) {
callback(null, {
type: "done",
usage: message.usage,
});
});
stream.on("error", function (err) {
callback(err);
});
return stream;
}
The stream.on('text') event fires for each chunk of text as it arrives. The stream.on('message') event fires once when the complete message is available, which is where you get the final usage statistics. You can also use the async iterator pattern:
function streamClaudeAsync(prompt) {
var stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 1024,
messages: [{ role: "user", content: prompt }],
});
return stream;
}
// Usage with for-await
async function consumeStream(prompt) {
var stream = streamClaudeAsync(prompt);
for await (var event of stream) {
if (event.type === "content_block_delta") {
process.stdout.write(event.delta.text || "");
}
}
var finalMessage = await stream.finalMessage();
console.log("\nTokens used:", finalMessage.usage);
}
Streaming with the OpenAI SDK
The OpenAI SDK uses a stream: true parameter that changes the response into an async iterable:
var OpenAI = require("openai");
var openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
async function streamOpenAI(prompt) {
var stream = await openai.chat.completions.create({
model: "gpt-4o",
messages: [{ role: "user", content: prompt }],
stream: true,
stream_options: { include_usage: true },
});
var fullContent = "";
for await (var chunk of stream) {
var delta = chunk.choices[0] && chunk.choices[0].delta;
if (delta && delta.content) {
fullContent += delta.content;
process.stdout.write(delta.content);
}
// Usage arrives in the final chunk
if (chunk.usage) {
console.log("\nTokens:", chunk.usage);
}
}
return fullContent;
}
Note the stream_options: { include_usage: true } parameter. Without this, you will not receive token counts during streaming. The usage data arrives in the very last chunk of the stream.
Server-Sent Events (SSE) for Browser Delivery
Server-sent events are the natural transport for streaming LLM responses to browsers. SSE is a simple, HTTP-based protocol where the server holds a connection open and pushes text events to the client. It has several advantages over alternatives for this use case:
- Built-in browser support via the EventSource API
- Automatic reconnection with configurable retry intervals
- Last-Event-ID tracking for resuming interrupted streams
- Simple text protocol that works through proxies and CDNs
- No special server infrastructure required, unlike WebSockets
The SSE protocol is straightforward. The server responds with Content-Type: text/event-stream and sends events in this format:
event: token
data: {"text": "Hello"}
event: token
data: {"text": " world"}
event: done
data: {"usage": {"input_tokens": 12, "output_tokens": 45}}
Each event is separated by a blank line. The event: field names the event type, and data: carries the payload. You can also include an id: field for reconnection tracking.
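Putting that format into code, a small serializer (the formatSSE name is my own, not part of any SDK) produces one complete frame per event:

```javascript
// Serialize one event into the SSE wire format described above.
// The trailing blank line is what terminates the event.
function formatSSE(eventType, data, id) {
  var out = "";
  if (id !== undefined) out += "id: " + id + "\n";
  out += "event: " + eventType + "\n";
  out += "data: " + JSON.stringify(data) + "\n\n";
  return out;
}

var frame = formatSSE("token", { text: "Hello" }, 1);
console.log(frame);
```

Writing the `id:` line first is a convention, not a requirement; the spec only requires that all fields of one event appear before the blank line.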
Implementing an Express.js SSE Endpoint
Here is how to build a streaming endpoint that connects the Anthropic SDK to an SSE response:
var express = require("express");
var Anthropic = require("@anthropic-ai/sdk");
var app = express();
var client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
app.use(express.json());
app.post("/api/chat/stream", function (req, res) {
var prompt = req.body.prompt;
if (!prompt) {
return res.status(400).json({ error: "prompt is required" });
}
// Set SSE headers
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no", // Disable Nginx buffering
});
var eventId = 0;
var aborted = false;
function sendEvent(eventType, data) {
if (aborted) return;
eventId++;
res.write("id: " + eventId + "\n");
res.write("event: " + eventType + "\n");
res.write("data: " + JSON.stringify(data) + "\n\n");
}
// Track client disconnect
req.on("close", function () {
aborted = true;
});
var stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 2048,
messages: [{ role: "user", content: prompt }],
});
var tokenCount = 0;
stream.on("text", function (text) {
tokenCount += estimateTokens(text);
sendEvent("token", { text: text, tokens: tokenCount });
});
stream.on("message", function (message) {
sendEvent("done", {
usage: message.usage,
stop_reason: message.stop_reason,
});
res.end();
});
stream.on("error", function (err) {
sendEvent("error", {
message: err.message || "Stream error occurred",
});
res.end();
});
});
// Rough token estimation (4 chars per token average)
function estimateTokens(text) {
return Math.ceil(text.length / 4);
}
app.listen(3000, function () {
console.log("Streaming server running on port 3000");
});
The X-Accel-Buffering: no header is critical if you are behind Nginx. Without it, Nginx will buffer the entire response before forwarding it to the client, completely defeating the purpose of streaming.
Client-Side Consumption with EventSource and Fetch
There are two approaches to consuming SSE on the client: the native EventSource API and fetch with a readable stream. Each has tradeoffs.
EventSource Approach
EventSource is the simplest option but only supports GET requests. For POST-based endpoints, you need a workaround:
// EventSource only works with GET, so use a session-based approach
var eventSource = new EventSource("/api/chat/stream?sessionId=" + sessionId);
eventSource.addEventListener("token", function (e) {
var data = JSON.parse(e.data);
document.getElementById("response").textContent += data.text;
});
eventSource.addEventListener("done", function (e) {
var data = JSON.parse(e.data);
console.log("Usage:", data.usage);
eventSource.close();
});
eventSource.addEventListener("error", function (e) {
if (eventSource.readyState === EventSource.CLOSED) {
console.log("Stream ended");
} else {
console.error("Stream error, reconnecting...");
}
});
Fetch Approach (Recommended)
The fetch API supports POST requests and gives you more control:
async function streamChat(prompt) {
var response = await fetch("/api/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt: prompt }),
});
if (!response.ok) {
throw new Error("HTTP " + response.status);
}
var reader = response.body.getReader();
var decoder = new TextDecoder();
var buffer = "";
var outputEl = document.getElementById("response");
while (true) {
var result = await reader.read();
if (result.done) break;
buffer += decoder.decode(result.value, { stream: true });
// Parse SSE events from buffer
var lines = buffer.split("\n");
buffer = lines.pop(); // Keep incomplete line in buffer
var eventType = null;
for (var i = 0; i < lines.length; i++) {
var line = lines[i].trim();
if (line.indexOf("event: ") === 0) {
eventType = line.substring(7);
} else if (line.indexOf("data: ") === 0 && eventType) {
var data = JSON.parse(line.substring(6));
handleEvent(eventType, data, outputEl);
eventType = null;
}
}
}
}
function handleEvent(eventType, data, outputEl) {
if (eventType === "token") {
outputEl.textContent += data.text;
} else if (eventType === "done") {
console.log("Complete. Tokens:", data.usage);
} else if (eventType === "error") {
console.error("Stream error:", data.message);
}
}
The fetch approach is better for most LLM applications because you typically need to send a conversation history in the request body, which requires POST.
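The buffer-and-split logic above can be factored into a small reusable parser. This is a sketch (the SSEParser name is mine); the important property is that feed() accepts arbitrary chunk boundaries, including chunks that split an event mid-line:

```javascript
// Incremental SSE parser: call feed() with raw chunks in any sizes;
// the callback fires once per complete event.
function SSEParser(onEvent) {
  this.buffer = "";
  this.eventType = null;
  this.onEvent = onEvent;
}

SSEParser.prototype.feed = function (chunk) {
  this.buffer += chunk;
  var lines = this.buffer.split("\n");
  this.buffer = lines.pop(); // keep the incomplete trailing line
  for (var i = 0; i < lines.length; i++) {
    var line = lines[i].trim();
    if (line.indexOf("event: ") === 0) {
      this.eventType = line.substring(7);
    } else if (line.indexOf("data: ") === 0 && this.eventType) {
      this.onEvent(this.eventType, JSON.parse(line.substring(6)));
      this.eventType = null;
    }
  }
};

// Chunks split mid-line still parse correctly:
var events = [];
var parser = new SSEParser(function (type, data) {
  events.push({ type: type, data: data });
});
parser.feed("event: token\nda");
parser.feed('ta: {"text": "Hi"}\n\nevent: done\n');
parser.feed('data: {"usage": {"output_tokens": 2}}\n\n');
```
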
Handling Stream Interruptions and Reconnection
Network interruptions are inevitable, especially on mobile connections. A robust streaming client needs reconnection logic with exponential backoff:
function StreamClient(url, options) {
this.url = url;
this.options = options || {};
this.retryCount = 0;
this.maxRetries = this.options.maxRetries || 5;
this.baseDelay = this.options.baseDelay || 1000;
this.lastEventId = null;
this.fullResponse = "";
this.abortController = null;
this.onToken = this.options.onToken || function () {};
this.onDone = this.options.onDone || function () {};
this.onError = this.options.onError || function () {};
}
StreamClient.prototype.start = function (body) {
this._body = body;
this._connect();
};
StreamClient.prototype._connect = function () {
var self = this;
self.abortController = new AbortController();
var headers = { "Content-Type": "application/json" };
if (self.lastEventId) {
headers["Last-Event-ID"] = self.lastEventId;
}
fetch(self.url, {
method: "POST",
headers: headers,
body: JSON.stringify(self._body),
signal: self.abortController.signal,
})
.then(function (response) {
if (!response.ok) throw new Error("HTTP " + response.status);
self.retryCount = 0; // Reset on successful connection
return self._readStream(response);
})
.catch(function (err) {
if (err.name === "AbortError") return;
self._handleDisconnect(err);
});
};
StreamClient.prototype._readStream = function (response) {
var self = this;
var reader = response.body.getReader();
var decoder = new TextDecoder();
var buffer = "";
function read() {
return reader.read().then(function (result) {
if (result.done) return;
buffer += decoder.decode(result.value, { stream: true });
var lines = buffer.split("\n");
buffer = lines.pop();
var eventType = null;
var eventId = null;
for (var i = 0; i < lines.length; i++) {
var line = lines[i].trim();
if (line.indexOf("id: ") === 0) {
eventId = line.substring(4);
} else if (line.indexOf("event: ") === 0) {
eventType = line.substring(7);
} else if (line.indexOf("data: ") === 0 && eventType) {
if (eventId) self.lastEventId = eventId;
var data = JSON.parse(line.substring(6));
self._dispatch(eventType, data);
eventType = null;
eventId = null;
}
}
return read();
});
}
return read();
};
StreamClient.prototype._dispatch = function (eventType, data) {
if (eventType === "token") {
this.fullResponse += data.text;
this.onToken(data);
} else if (eventType === "done") {
this.onDone(data);
} else if (eventType === "error") {
this.onError(new Error(data.message));
}
};
StreamClient.prototype._handleDisconnect = function (err) {
var self = this;
if (self.retryCount >= self.maxRetries) {
self.onError(new Error("Max retries exceeded: " + err.message));
return;
}
var delay = self.baseDelay * Math.pow(2, self.retryCount);
var jitter = Math.random() * delay * 0.1;
self.retryCount++;
console.log(
"Reconnecting in " + (delay + jitter) + "ms (attempt " + self.retryCount + ")"
);
setTimeout(function () {
self._connect();
}, delay + jitter);
};
StreamClient.prototype.abort = function () {
if (this.abortController) {
this.abortController.abort();
}
};
On the server side, you need to support the Last-Event-ID header. When a client reconnects, it sends this header so the server can resume from where the stream was interrupted. For LLM responses, this typically means caching the partial response and replaying missed events:
var streamCache = {};
app.post("/api/chat/stream", function (req, res) {
var sessionId = req.body.sessionId;
var lastEventId = parseInt(req.headers["last-event-id"] || "0", 10);
// Replay cached events if reconnecting
if (lastEventId > 0 && streamCache[sessionId]) {
var cached = streamCache[sessionId];
for (var i = 0; i < cached.events.length; i++) {
var evt = cached.events[i];
if (evt.id > lastEventId) {
res.write("id: " + evt.id + "\n");
res.write("event: " + evt.type + "\n");
res.write("data: " + JSON.stringify(evt.data) + "\n\n");
}
}
if (cached.complete) {
res.end();
return;
}
}
// Continue with live streaming...
});
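The replay selection in the snippet above is just a filter on event IDs; isolated (with a hypothetical eventsToReplay helper), it looks like this:

```javascript
// Given the cached events for a session and the client's Last-Event-ID,
// return only the events the client has not yet seen.
function eventsToReplay(cachedEvents, lastEventId) {
  return cachedEvents.filter(function (evt) {
    return evt.id > lastEventId;
  });
}

var cached = [
  { id: 1, type: "token", data: { text: "Hel" } },
  { id: 2, type: "token", data: { text: "lo" } },
  { id: 3, type: "done", data: { usage: { output_tokens: 2 } } },
];

// A client reconnecting with Last-Event-ID: 1 gets events 2 and 3.
var missed = eventsToReplay(cached, 1);
```
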
Token Counting During Streaming
Accurate token counting during streaming is important for displaying usage information and enforcing limits. The Anthropic SDK provides exact token counts in the final message event, but you often want a running estimate during the stream:
var Anthropic = require("@anthropic-ai/sdk");
var client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
async function streamWithTokenTracking(messages, onChunk) {
var stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 4096,
messages: messages,
});
var estimatedOutputTokens = 0;
var charCount = 0;
stream.on("text", function (text) {
charCount += text.length;
// Claude averages ~3.5 chars per token for English
estimatedOutputTokens = Math.ceil(charCount / 3.5);
onChunk({
text: text,
estimatedTokens: estimatedOutputTokens,
});
});
var finalMessage = await stream.finalMessage();
return {
estimatedTokens: estimatedOutputTokens,
actualUsage: finalMessage.usage,
accuracy:
(
(estimatedOutputTokens / finalMessage.usage.output_tokens) *
100
).toFixed(1) + "%",
};
}
The character-to-token ratio varies by language. For English text, 3.5 characters per token is a reasonable estimate for Claude models. For code, it is closer to 3.0. For CJK languages, the ratio can be as low as 1.5. If you need exact counts during streaming, you would need to use a tokenizer library, but the overhead is usually not worth it for a real-time display.
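Those ratios can be folded into a single estimator; this is a rough heuristic using the figures above, not tokenizer output:

```javascript
// Rough token estimate using characters-per-token ratios.
// The ratios (3.5 English, 3.0 code, 1.5 CJK) are heuristics only.
function estimateTokensByKind(text, kind) {
  var charsPerToken = { english: 3.5, code: 3.0, cjk: 1.5 };
  var ratio = charsPerToken[kind] || 3.5; // default to English
  return Math.ceil(text.length / ratio);
}

var englishEstimate = estimateTokensByKind("hello world", "english"); // 4
var cjkEstimate = estimateTokensByKind("\u4f60\u597d", "cjk"); // 2
```
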
Streaming with Tool Use (Partial JSON Accumulation)
When streaming responses that involve tool use (function calling), you need to accumulate partial JSON from the tool input deltas. The model sends the JSON argument string in chunks, and you cannot parse it until the complete tool call is received:
function handleToolStream(stream) {
var toolCalls = {};
stream.on("streamEvent", function (event) {
if (event.type === "content_block_start") {
var block = event.content_block;
if (block.type === "tool_use") {
toolCalls[event.index] = {
id: block.id,
name: block.name,
inputJson: "",
};
}
}
if (event.type === "content_block_delta") {
var delta = event.delta;
if (delta.type === "text_delta") {
// Regular text token
process.stdout.write(delta.text);
}
if (delta.type === "input_json_delta") {
// Accumulate partial JSON for tool call
toolCalls[event.index].inputJson += delta.partial_json;
}
}
if (event.type === "content_block_stop") {
if (toolCalls[event.index]) {
var call = toolCalls[event.index];
try {
call.parsedInput = JSON.parse(call.inputJson);
console.log(
"\nTool call ready:",
call.name,
call.parsedInput
);
} catch (err) {
console.error("Failed to parse tool input:", err.message);
}
}
}
});
return stream.finalMessage().then(function (message) {
return { message: message, toolCalls: toolCalls };
});
}
A common mistake is trying to parse the JSON on every delta. Do not do this. Accumulate the full string and parse once when the content block stops. Some developers try to use a streaming JSON parser to provide incremental UI updates for tool arguments, but the complexity is rarely worth it.
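A quick demonstration of why per-delta parsing fails: every prefix of the argument string except the last is invalid JSON, while the accumulated whole parses cleanly. The delta values here are made up for illustration:

```javascript
// Simulated input_json_delta fragments for one tool call.
var deltas = ['{"loca', 'tion": "Par', 'is", "unit": "celsius"}'];

var accumulated = "";
var failedPartialParses = 0;
for (var i = 0; i < deltas.length; i++) {
  accumulated += deltas[i];
  try {
    JSON.parse(accumulated); // parsing each prefix is the common mistake
  } catch (err) {
    failedPartialParses++; // every prefix but the last throws
  }
}

// Parse once, after content_block_stop, on the full string.
var input = JSON.parse(accumulated);
```
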
Backpressure Handling for Slow Clients
When the LLM generates tokens faster than the client can consume them, you get backpressure. This matters in production because a slow mobile connection can cause Node.js buffers to grow unboundedly if you are not careful:
app.post("/api/chat/stream", function (req, res) {
var prompt = req.body.prompt;
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
var stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 4096,
messages: [{ role: "user", content: prompt }],
});
var paused = false;
var finished = false;
var queue = [];
function drain() {
while (queue.length > 0) {
var event = queue.shift();
var canContinue = res.write(event);
if (!canContinue) {
paused = true;
return;
}
}
paused = false;
// End the response only once the final event has actually been flushed
if (finished) {
res.end();
}
}
res.on("drain", function () {
paused = false;
drain();
});
stream.on("text", function (text) {
var eventStr =
"event: token\ndata: " + JSON.stringify({ text: text }) + "\n\n";
if (paused) {
queue.push(eventStr);
} else {
var canContinue = res.write(eventStr);
if (!canContinue) {
paused = true;
}
}
});
stream.on("message", function (message) {
var eventStr =
"event: done\ndata: " +
JSON.stringify({ usage: message.usage }) +
"\n\n";
queue.push(eventStr);
finished = true;
drain();
});
});
The res.write() method returns false when the internal buffer is full. When that happens, you should stop writing and wait for the drain event. In practice, LLM token generation is slow enough that this rarely triggers, but it is critical for handling edge cases like clients on extremely slow connections.
WebSocket Alternative for Bidirectional Streaming
While SSE is the right choice for most LLM streaming scenarios, WebSockets make sense when you need bidirectional communication, such as allowing the user to send follow-up messages or cancel requests without opening new connections:
var express = require("express");
var http = require("http");
var WebSocket = require("ws");
var Anthropic = require("@anthropic-ai/sdk");
var app = express();
var server = http.createServer(app);
var wss = new WebSocket.Server({ server: server });
var client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
wss.on("connection", function (ws) {
var currentStream = null;
ws.on("message", function (raw) {
var msg = JSON.parse(raw.toString());
if (msg.type === "cancel" && currentStream) {
currentStream.abort();
currentStream = null;
ws.send(JSON.stringify({ type: "cancelled" }));
return;
}
if (msg.type === "chat") {
// Abort any existing stream
if (currentStream) {
currentStream.abort();
}
currentStream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 2048,
messages: msg.messages,
});
currentStream.on("text", function (text) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "token", text: text }));
}
});
currentStream.on("message", function (message) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(
JSON.stringify({ type: "done", usage: message.usage })
);
}
currentStream = null;
});
currentStream.on("error", function (err) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(
JSON.stringify({ type: "error", message: err.message })
);
}
currentStream = null;
});
}
});
ws.on("close", function () {
if (currentStream) {
currentStream.abort();
}
});
});
server.listen(3000, function () {
console.log("WebSocket streaming server on port 3000");
});
The main advantage here is the cancel message type. With SSE, the client can close the connection to stop receiving events, but you need additional logic to detect the disconnect and abort the upstream LLM request. With WebSockets, cancellation is explicit and immediate.
Testing Streaming Endpoints
Testing streaming endpoints requires different techniques than testing traditional request-response APIs. Here are three approaches:
Unit Testing the Stream Handler
var assert = require("assert");
// Mock the Anthropic stream
function createMockStream(chunks) {
var EventEmitter = require("events");
var emitter = new EventEmitter();
setTimeout(function () {
for (var i = 0; i < chunks.length; i++) {
emitter.emit("text", chunks[i]);
}
emitter.emit("message", {
usage: { input_tokens: 10, output_tokens: chunks.length * 5 },
stop_reason: "end_turn",
});
}, 10);
emitter.finalMessage = function () {
return new Promise(function (resolve) {
emitter.on("message", resolve);
});
};
return emitter;
}
// Test that all chunks arrive
function testStreamDelivery() {
var chunks = ["Hello", " ", "world", "!"];
var received = [];
var stream = createMockStream(chunks);
stream.on("text", function (text) {
received.push(text);
});
stream.on("message", function () {
assert.deepStrictEqual(received, chunks);
console.log("PASS: All chunks delivered");
});
}
testStreamDelivery();
Integration Testing with Supertest
var request = require("supertest");
var app = require("./app");
function testSSEEndpoint(done) {
var chunks = [];
request(app)
.post("/api/chat/stream")
.send({ prompt: "Say hello" })
.expect(200)
.expect("Content-Type", /text\/event-stream/)
.buffer(false)
.parse(function (res, callback) {
var data = "";
res.on("data", function (chunk) {
data += chunk.toString();
chunks.push(chunk.toString());
});
res.on("end", function () {
callback(null, data);
});
})
.end(function (err) {
if (err) return done(err);
var fullResponse = chunks.join("");
assert(fullResponse.indexOf("event: token") !== -1);
assert(fullResponse.indexOf("event: done") !== -1);
done();
});
}
Load Testing with Artillery
Create an artillery.yml configuration to test streaming under load:
config:
  target: "http://localhost:3000"
  phases:
    - duration: 60
      arrivalRate: 5
scenarios:
  - name: "Stream chat"
    flow:
      - post:
          url: "/api/chat/stream"
          json:
            prompt: "Write a haiku about Node.js"
Note that Artillery measures latency for the full stream duration, since the request only completes when the server ends the SSE response. If you want to assert on time to first token under load, you need a custom processor function that inspects the response as it arrives.
Complete Working Example
Here is a self-contained Express.js application that streams Claude responses via SSE to a browser client with reconnection handling, token tracking, and a clean UI. One caveat: for simplicity, a reconnect replays cached events and then starts a fresh generation. A production implementation would run generation once per session, decoupled from the HTTP response, so that a reconnect resumes the same stream rather than generating a second one.
Server (server.js)
var express = require("express");
var path = require("path");
var Anthropic = require("@anthropic-ai/sdk");
var app = express();
var client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });
var PORT = process.env.PORT || 3000;
// Session cache for reconnection support
var sessionCache = {};
app.use(express.json());
app.use(express.static(path.join(__dirname, "public")));
app.post("/api/chat/stream", function (req, res) {
var prompt = req.body.prompt;
var sessionId = req.body.sessionId || Date.now().toString(36);
var lastEventId = parseInt(req.headers["last-event-id"] || "0", 10);
if (!prompt) {
return res.status(400).json({ error: "prompt is required" });
}
// SSE headers
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
});
// Send retry interval (3 seconds)
res.write("retry: 3000\n\n");
// Initialize session cache
if (!sessionCache[sessionId]) {
sessionCache[sessionId] = { events: [], complete: false };
}
var session = sessionCache[sessionId];
// Replay missed events on reconnection
if (lastEventId > 0) {
for (var i = 0; i < session.events.length; i++) {
var cached = session.events[i];
if (cached.id > lastEventId) {
res.write("id: " + cached.id + "\n");
res.write("event: " + cached.type + "\n");
res.write("data: " + JSON.stringify(cached.data) + "\n\n");
}
}
if (session.complete) {
res.end();
return;
}
}
var eventId = session.events.length;
var aborted = false;
var charCount = 0;
function sendEvent(type, data) {
if (aborted) return;
eventId++;
var event = { id: eventId, type: type, data: data };
session.events.push(event);
res.write("id: " + eventId + "\n");
res.write("event: " + type + "\n");
res.write("data: " + JSON.stringify(data) + "\n\n");
}
req.on("close", function () {
aborted = true;
});
var stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 2048,
messages: [{ role: "user", content: prompt }],
});
stream.on("text", function (text) {
charCount += text.length;
sendEvent("token", {
text: text,
estimatedTokens: Math.ceil(charCount / 3.5),
});
});
stream.on("message", function (message) {
session.complete = true;
sendEvent("done", {
usage: message.usage,
stop_reason: message.stop_reason,
});
res.end();
// Clean up session cache after 5 minutes
setTimeout(function () {
delete sessionCache[sessionId];
}, 5 * 60 * 1000);
});
stream.on("error", function (err) {
sendEvent("error", { message: err.message });
res.end();
});
});
// Health check
app.get("/health", function (req, res) {
res.json({ status: "ok", sessions: Object.keys(sessionCache).length });
});
app.listen(PORT, function () {
console.log("Streaming server running on http://localhost:" + PORT);
});
Client (public/index.html)
<!DOCTYPE html>
<html>
<head>
<title>LLM Streaming Demo</title>
<style>
body {
font-family: -apple-system, sans-serif;
max-width: 720px;
margin: 40px auto;
padding: 0 20px;
}
#response {
white-space: pre-wrap;
background: #f5f5f5;
padding: 16px;
border-radius: 8px;
min-height: 100px;
margin-top: 16px;
font-size: 15px;
line-height: 1.6;
}
#stats {
margin-top: 8px;
color: #666;
font-size: 13px;
}
textarea {
width: 100%;
height: 80px;
font-size: 15px;
padding: 8px;
box-sizing: border-box;
}
button {
margin-top: 8px;
padding: 8px 20px;
font-size: 15px;
cursor: pointer;
}
</style>
</head>
<body>
<h1>Streaming LLM Demo</h1>
<textarea id="prompt" placeholder="Enter your prompt..."></textarea>
<br />
<button id="send" onclick="sendPrompt()">Send</button>
<button id="cancel" onclick="cancelStream()" disabled>Cancel</button>
<div id="response"></div>
<div id="stats"></div>
<script>
var abortController = null;
var startTime = null;
function sendPrompt() {
var prompt = document.getElementById("prompt").value.trim();
if (!prompt) return;
var responseEl = document.getElementById("response");
var statsEl = document.getElementById("stats");
responseEl.textContent = "";
statsEl.textContent = "Connecting...";
startTime = Date.now();
document.getElementById("send").disabled = true;
document.getElementById("cancel").disabled = false;
abortController = new AbortController();
var sessionId = Date.now().toString(36);
var firstToken = true;
fetch("/api/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: prompt,
sessionId: sessionId,
}),
signal: abortController.signal,
})
.then(function (response) {
if (!response.ok) throw new Error("HTTP " + response.status);
var reader = response.body.getReader();
var decoder = new TextDecoder();
var buffer = "";
function read() {
return reader.read().then(function (result) {
if (result.done) return;
buffer += decoder.decode(result.value, { stream: true });
var lines = buffer.split("\n");
buffer = lines.pop();
var eventType = null;
for (var i = 0; i < lines.length; i++) {
var line = lines[i].trim();
if (line.indexOf("event: ") === 0) {
eventType = line.substring(7);
} else if (line.indexOf("data: ") === 0 && eventType) {
var data = JSON.parse(line.substring(6));
if (eventType === "token") {
if (firstToken) {
var ttft = Date.now() - startTime;
statsEl.textContent =
"Time to first token: " + ttft + "ms";
firstToken = false;
}
responseEl.textContent += data.text;
statsEl.textContent =
"Streaming... ~" +
data.estimatedTokens +
" tokens";
} else if (eventType === "done") {
var elapsed = ((Date.now() - startTime) / 1000).toFixed(
1
);
statsEl.textContent =
"Done in " +
elapsed +
"s | Input: " +
data.usage.input_tokens +
" | Output: " +
data.usage.output_tokens +
" tokens";
finish();
} else if (eventType === "error") {
statsEl.textContent = "Error: " + data.message;
finish();
}
eventType = null;
}
}
return read();
});
}
return read();
})
.catch(function (err) {
if (err.name !== "AbortError") {
statsEl.textContent = "Error: " + err.message;
}
finish();
});
}
function cancelStream() {
if (abortController) {
abortController.abort();
document.getElementById("stats").textContent = "Cancelled";
finish();
}
}
function finish() {
document.getElementById("send").disabled = false;
document.getElementById("cancel").disabled = true;
abortController = null;
}
</script>
</body>
</html>
Common Issues and Troubleshooting
1. Nginx Buffering Kills Streaming
Error: The client receives the entire response at once after a long delay instead of streaming tokens.
Cause: Nginx buffers proxy responses by default. The entire SSE stream is held in Nginx memory until the response completes.
Fix: Add these headers and Nginx configuration:
# In your Express response headers:
X-Accel-Buffering: no
# In nginx.conf:
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
2. CloudFlare or CDN Buffering
Error: Streaming works locally but tokens arrive in large batches in production.
Cause: CloudFlare and similar CDNs buffer responses by default. Even with SSE headers, some CDNs will batch events.
Fix: Disable response buffering at the CDN level. In CloudFlare, you can use a Page Rule to bypass cache for your streaming endpoint, or move the streaming endpoint to a subdomain that is not proxied through the CDN.
3. CORS Errors with EventSource
Error: Access to XMLHttpRequest at 'https://api.example.com/stream' from origin 'https://example.com' has been blocked by CORS policy
Cause: EventSource does not support custom headers, so you cannot send an Authorization header. If your streaming endpoint is on a different origin, CORS preflight fails.
Fix: Either use the fetch-based approach (which supports CORS headers), put the streaming endpoint on the same origin, or use cookie-based authentication:
// Server-side CORS headers for SSE
app.use("/api/chat/stream", function (req, res, next) {
res.setHeader("Access-Control-Allow-Origin", "https://example.com");
res.setHeader("Access-Control-Allow-Credentials", "true");
res.setHeader("Access-Control-Allow-Methods", "POST, OPTIONS");
res.setHeader("Access-Control-Allow-Headers", "Content-Type, Last-Event-ID");
if (req.method === "OPTIONS") {
return res.sendStatus(204);
}
next();
});
4. Memory Leaks from Unclosed Streams
Error: Node.js process memory grows continuously under load, eventually crashing with FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory.
Cause: When clients disconnect without properly closing the connection, the LLM stream continues generating tokens that are written to a dead socket. The session cache accumulates without cleanup.
Fix: Always listen for the request close event and abort the upstream stream:
```javascript
req.on("close", function () {
  aborted = true;
  if (stream && typeof stream.abort === "function") {
    stream.abort();
  }
  // Clean up session cache
  if (sessionCache[sessionId]) {
    delete sessionCache[sessionId];
  }
});
```
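The close handler covers clean disconnects, but entries can still accumulate if a handler never fires. A periodic sweep is a safety net; here is a sketch, assuming each cache entry records a `lastSeen` timestamp (the function and field names are illustrative):

```javascript
// Hypothetical TTL sweep: evict sessions idle longer than ttlMs.
function sweepSessions(cache, now, ttlMs) {
  Object.keys(cache).forEach(function (id) {
    if (now - cache[id].lastSeen > ttlMs) {
      delete cache[id];
    }
  });
}

// Run once a minute against the shared cache:
// setInterval(function () {
//   sweepSessions(sessionCache, Date.now(), 5 * 60 * 1000);
// }, 60 * 1000);
```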
5. Rate Limit Errors During Streaming
Error: Error: 429 {"type":"error","error":{"type":"rate_limit_error","message":"Number of request tokens has exceeded your per-minute rate limit"}}
Cause: Streaming requests consume rate limit capacity for the full max_tokens value at request time, not as tokens are generated. A request with max_tokens: 4096 reserves that full capacity even if the response only uses 200 tokens.
Fix: Set max_tokens to a reasonable value for your use case rather than defaulting to the maximum. Implement a request queue with concurrency limits:
```javascript
var activeStreams = 0;
var MAX_CONCURRENT = 10;
var waitQueue = [];

function acquireSlot(callback) {
  if (activeStreams < MAX_CONCURRENT) {
    activeStreams++;
    callback();
  } else {
    waitQueue.push(callback);
  }
}

function releaseSlot() {
  activeStreams--;
  if (waitQueue.length > 0) {
    activeStreams++;
    var next = waitQueue.shift();
    next();
  }
}
```
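To see the queue in action, here is a self-contained sketch pairing those helpers with simulated streams whose completion is deferred, so the concurrency cap is observable (the demo names `fakeStream` and `finishers` are illustrative):

```javascript
var activeStreams = 0;
var MAX_CONCURRENT = 2; // small limit for the demo
var waitQueue = [];

function acquireSlot(callback) {
  if (activeStreams < MAX_CONCURRENT) {
    activeStreams++;
    callback();
  } else {
    waitQueue.push(callback);
  }
}

function releaseSlot() {
  activeStreams--;
  if (waitQueue.length > 0) {
    activeStreams++;
    waitQueue.shift()();
  }
}

var started = 0;
var peak = 0;
var finishers = []; // deferred completions, one per granted slot

function fakeStream() {
  started++;
  peak = Math.max(peak, activeStreams);
  finishers.push(releaseSlot); // "finish" later by releasing the slot
}

for (var i = 0; i < 5; i++) {
  acquireSlot(fakeStream); // only 2 start now; 3 wait in the queue
}
while (finishers.length > 0) {
  finishers.shift()(); // finishing one stream starts a queued one
}
```

All five requests eventually run, but never more than `MAX_CONCURRENT` at once.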
6. Incomplete SSE Parsing on Client
Error: Some token events are silently dropped, resulting in missing words or characters in the displayed response.
Cause: The client-side SSE parser splits on newlines but does not account for the possibility that a single `reader.read()` call may return a partial line. The incomplete line gets discarded.
Fix: Maintain a buffer that persists between `read()` calls, and only process complete lines (those followed by a newline). The incomplete trailing portion is kept in the buffer for the next read cycle. This is demonstrated in the fetch-based client code above, where `buffer = lines.pop()` retains the partial line.
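In isolation, the pattern looks like this; a minimal sketch assuming events arrive as `data: ...` lines (the factory name `createSseParser` is illustrative):

```javascript
// Chunk-boundary-safe SSE line parser: a partial trailing line is
// held in `buffer` until the rest of it arrives in a later chunk.
function createSseParser(onData) {
  var buffer = "";
  return function feed(chunk) {
    buffer += chunk;
    var lines = buffer.split("\n");
    buffer = lines.pop(); // retain the incomplete trailing line
    lines.forEach(function (line) {
      if (line.indexOf("data: ") === 0) {
        onData(line.slice(6));
      }
    });
  };
}
```

Feed it each decoded chunk from `reader.read()`: even if `"data: hel"` and `"lo\n"` arrive in separate reads, the callback receives the complete `"hello"`.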
Best Practices
- Always set `X-Accel-Buffering: no` in your SSE response headers. Even if you are not currently behind Nginx, you might be in the future, and this header is harmless when Nginx is not present.
- Use POST with fetch for SSE instead of GET with EventSource. LLM requests typically need to send conversation history in the body, which GET cannot support. The EventSource API is simpler but too limited for real applications.
- Abort the upstream LLM stream on client disconnect. If the client navigates away or cancels, you are still paying for every token the model generates. Listen for the request `close` event and call `stream.abort()` immediately.
- Set realistic `max_tokens` values. Do not default to the model's maximum context length. If your typical response is 500 tokens, set `max_tokens` to 1024. This reduces rate limit pressure and costs.
- Implement session-based reconnection for long responses. Store events by session ID and replay missed events when a client reconnects with `Last-Event-ID`. Clean up stale sessions after a timeout to prevent memory leaks.
- Send a heartbeat comment every 15-30 seconds to keep the connection alive through proxies and load balancers that have idle timeouts. Lines starting with `:` are SSE comments and are ignored by the client parser:

  ```javascript
  var heartbeat = setInterval(function () {
    if (!aborted) {
      res.write(": heartbeat\n\n");
    }
  }, 15000);
  ```

- Estimate tokens on the client for UI feedback but trust the server for billing. Character-based estimation is good enough for a progress indicator. Use the final `usage` object from the model for any token-based billing or logging.
- Test with artificial latency. Add a configurable delay between chunks during development to simulate slow model responses and verify your UI handles progressive rendering correctly.
- Monitor time to first token (TTFT) in production. This metric directly correlates with user satisfaction. Set up alerts if TTFT exceeds your threshold (typically 1-2 seconds for interactive use cases).
- Version your SSE event schema. Include a version field in your event data so clients can handle schema changes gracefully without breaking during deployments.
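Schema versioning can be as simple as tagging every event with a version number at the point where it is serialized. A sketch (the helper and field names are illustrative):

```javascript
// Hypothetical versioned event encoder: clients can branch on `v`
// if the payload shape ever changes between deployments.
function formatEvent(type, payload) {
  var event = Object.assign({ v: 1, type: type }, payload);
  return "data: " + JSON.stringify(event) + "\n\n";
}

// res.write(formatEvent("token", { text: "Hello" }));
```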