Node.js Stream Processing for Large Datasets
A hands-on guide to Node.js streams for processing large datasets efficiently, covering Transform streams, backpressure, CSV processing, streaming HTTP responses, and memory-efficient patterns.
Overview
Streams are how you process data that does not fit comfortably in memory -- or data that should not have to. Instead of loading an entire file, database dump, or API response into a buffer, streams let you process data in chunks, keeping memory usage flat regardless of input size. If you have ever watched a Node.js process crash with a heap allocation failure on a 2GB CSV file, streams are the fix.
Prerequisites
- Node.js 18+ installed
- Working knowledge of JavaScript and Node.js core modules
- Familiarity with fs, http, and Buffer basics
- A terminal with at least one large file to experiment with (we will generate one)
Why Streams Matter
The difference between buffered I/O and streaming I/O is not academic. It is the difference between a process that works on your test data and one that works in production.
The Memory Problem
Here is the naive approach to reading a file:
var fs = require('fs');
fs.readFile('/data/sales-records.csv', 'utf8', function(err, data) {
if (err) throw err;
var lines = data.split('\n');
console.log('Total lines: ' + lines.length);
console.log('Memory used: ' + Math.round(process.memoryUsage().heapUsed / 1024 / 1024) + ' MB');
});
For a 1GB file, this allocates at least 1GB of memory just to hold the raw string, plus another 1-2GB for the split array. Your process will likely consume 2-3GB before it even starts processing.
Memory used: 2847 MB
Now the streaming version:
var fs = require('fs');
var readline = require('readline');
var lineCount = 0;
var rl = readline.createInterface({
input: fs.createReadStream('/data/sales-records.csv'),
crlfDelay: Infinity
});
rl.on('line', function(line) {
lineCount++;
});
rl.on('close', function() {
console.log('Total lines: ' + lineCount);
console.log('Memory used: ' + Math.round(process.memoryUsage().heapUsed / 1024 / 1024) + ' MB');
});
Memory used: 14 MB
Same file. Same result. 200x less memory. The streaming version processes one chunk at a time and discards it before reading the next. Memory stays flat whether the file is 100MB or 100GB.
The Four Stream Types
Node.js has four fundamental stream types. Every stream in the standard library and most third-party libraries is built on one of these.
Readable Streams
A source of data. You consume from it.
var fs = require('fs');
var readable = fs.createReadStream('/data/input.csv', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
readable.on('data', function(chunk) {
console.log('Received ' + chunk.length + ' characters');
});
readable.on('end', function() {
console.log('Done reading');
});
readable.on('error', function(err) {
console.error('Read error: ' + err.message);
});
Common readable streams: fs.createReadStream(), http.IncomingMessage (the incoming request on a server), process.stdin.
Writable Streams
A destination for data. You push data into it.
var fs = require('fs');
var writable = fs.createWriteStream('/data/output.csv', {
encoding: 'utf8',
highWaterMark: 64 * 1024
});
writable.write('id,name,email\n');
writable.write('1,Shane,[email protected]\n');
writable.end('2,Alex,[email protected]\n');
writable.on('finish', function() {
console.log('All data written');
});
Common writable streams: fs.createWriteStream(), http.ServerResponse (response), process.stdout.
Transform Streams
Both readable and writable. Data goes in, gets modified, comes out. This is where most of the interesting work happens.
var stream = require('stream');
var upperCase = new stream.Transform({
transform: function(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCase).pipe(process.stdout);
Common transform streams: zlib.createGzip(), crypto.createCipheriv(), crypto.createHash(), custom data processors.
Duplex Streams
Readable and writable independently. The read side and write side operate on different data -- unlike Transform where output is derived from input.
var net = require('net');
// A TCP socket is a Duplex stream
var server = net.createServer(function(socket) {
// socket is Duplex: you read what the client sends, you write what you send back
socket.on('data', function(data) {
socket.write('Echo: ' + data.toString());
});
});
Common duplex streams: TCP sockets, WebSocket connections.
Piping and Chaining
Piping connects the output of one stream to the input of another. Chaining is piping multiple transforms together.
var fs = require('fs');
var zlib = require('zlib');
// Read a file, compress it, write the compressed version
fs.createReadStream('/data/large-dataset.csv')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('/data/large-dataset.csv.gz'));
This reads the file in 64KB chunks, compresses each chunk, and writes the compressed output. Peak memory usage stays under 20MB regardless of file size.
You can chain as many transforms as you need:
var fs = require('fs');
var zlib = require('zlib');
var crypto = require('crypto');
// In real code, key and iv come from your key management; random values here
// just make the example runnable (aes-256-cbc needs a 32-byte key and a 16-byte IV)
var key = crypto.randomBytes(32);
var iv = crypto.randomBytes(16);
fs.createReadStream('/data/sensitive.csv')
.pipe(zlib.createGzip())
.pipe(crypto.createCipheriv('aes-256-cbc', key, iv))
.pipe(fs.createWriteStream('/data/sensitive.csv.gz.enc'));
Backpressure
Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. If a readable stream produces data faster than a writable stream can consume it, data backs up. Without backpressure, that backed-up data sits in memory and your process balloons.
Node.js handles backpressure automatically when you use .pipe(). Here is what happens under the hood:
- The readable stream pushes a chunk.
- The writable stream's internal buffer fills up.
- .write() returns false, signaling "stop sending."
- The readable stream pauses.
- The writable stream drains its buffer.
- The writable stream emits 'drain'.
- The readable stream resumes.
Manual Backpressure Handling
If you are writing to a stream manually (without .pipe()), you must handle backpressure yourself:
var fs = require('fs');
var writable = fs.createWriteStream('/data/output.txt');
var i = 0;
var max = 10000000;
function writeNext() {
var ok = true;
while (i < max && ok) {
i++;
var data = 'Line ' + i + ': ' + 'x'.repeat(200) + '\n';
if (i === max) {
writable.end(data);
} else {
ok = writable.write(data);
}
}
if (i < max) {
writable.once('drain', writeNext);
}
}
writeNext();
Without the drain check, this loop would buffer all 10 million lines in memory before writing any of them. With it, memory stays flat.
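If you are on Node 12 or later, an alternative sketch is to wrap a generator in stream.Readable.from() and let .pipe() apply the backpressure for you: the generator is only pulled when the write stream can accept more data. The path and line format below mirror the example above.
var fs = require('fs');
var stream = require('stream');
// Generator that lazily yields the same lines as the manual loop above
function* generateLines(max) {
  for (var i = 1; i <= max; i++) {
    yield 'Line ' + i + ': ' + 'x'.repeat(200) + '\n';
  }
}
// Readable.from() turns the generator into a Readable stream;
// .pipe() pauses the generator whenever the file stream's buffer is full
stream.Readable.from(generateLines(10000000))
  .pipe(fs.createWriteStream('/data/output.txt'))
  .on('finish', function() {
    console.log('All data written');
  });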
Building Custom Transform Streams
Custom transforms are where streams become genuinely powerful. You can build reusable processing steps and compose them.
CSV Line Parser
var stream = require('stream');
function CSVParser(options) {
var headers = null;
var delimiter = (options && options.delimiter) || ',';
return new stream.Transform({
objectMode: true,
transform: function(chunk, encoding, callback) {
var lines = chunk.toString().split('\n');
for (var i = 0; i < lines.length; i++) {
var line = lines[i].trim();
if (!line) continue;
var fields = line.split(delimiter);
if (!headers) {
headers = fields;
continue;
}
var record = {};
for (var j = 0; j < headers.length; j++) {
record[headers[j]] = fields[j] || '';
}
this.push(record);
}
callback();
}
});
}
There is a problem with this parser: chunks from the readable stream do not necessarily split on line boundaries. A chunk might end in the middle of a line. You need to buffer the partial line and prepend it to the next chunk.
Correct Line-Splitting Transform
var stream = require('stream');
function LineSplitter() {
var buffer = '';
return new stream.Transform({
objectMode: true,
transform: function(chunk, encoding, callback) {
buffer += chunk.toString();
var lines = buffer.split('\n');
buffer = lines.pop(); // last element might be incomplete
for (var i = 0; i < lines.length; i++) {
var line = lines[i].trim();
if (line) {
this.push(line);
}
}
callback();
},
flush: function(callback) {
if (buffer.trim()) {
this.push(buffer.trim());
}
callback();
}
});
}
The flush method is called once the input side has ended, just before the stream emits 'end' on its readable side. It handles the last line if the file does not end with a newline.
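A minimal usage sketch, assuming a text file at ./access.log exists: pipe it through the LineSplitter above and count non-empty lines.
var fs = require('fs');
var stream = require('stream');
var lineCount = 0;
fs.createReadStream('./access.log')
  .pipe(LineSplitter())
  .pipe(new stream.Writable({
    objectMode: true,
    write: function(line, encoding, callback) {
      lineCount++; // each chunk is now a complete, trimmed line
      callback();
    }
  }))
  .on('finish', function() {
    console.log('Non-empty lines: ' + lineCount);
  });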
Filter Transform
var stream = require('stream');
function FilterTransform(predicate) {
return new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
if (predicate(record)) {
this.push(record);
}
callback();
}
});
}
Processing CSV Files Line by Line
Putting the pieces together, here is a complete CSV processing pipeline:
var fs = require('fs');
var stream = require('stream');
function LineSplitter() {
var buffer = '';
return new stream.Transform({
objectMode: true,
transform: function(chunk, encoding, callback) {
buffer += chunk.toString();
var lines = buffer.split('\n');
buffer = lines.pop();
for (var i = 0; i < lines.length; i++) {
if (lines[i].trim()) this.push(lines[i].trim());
}
callback();
},
flush: function(callback) {
if (buffer.trim()) this.push(buffer.trim());
callback();
}
});
}
function CSVRecordParser() {
var headers = null;
return new stream.Transform({
objectMode: true,
transform: function(line, encoding, callback) {
var fields = line.split(',');
if (!headers) {
headers = fields;
callback();
return;
}
var record = {};
for (var i = 0; i < headers.length; i++) {
record[headers[i]] = fields[i] || '';
}
this.push(record);
callback();
}
});
}
var processed = 0;
var filtered = 0;
fs.createReadStream('/data/sales.csv')
.pipe(LineSplitter())
.pipe(CSVRecordParser())
.pipe(new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
processed++;
var amount = parseFloat(record.amount);
if (amount > 1000) {
filtered++;
this.push(JSON.stringify(record) + '\n');
}
callback();
}
}))
.pipe(fs.createWriteStream('/data/high-value-sales.jsonl'))
.on('finish', function() {
console.log('Processed: ' + processed + ', High-value: ' + filtered);
console.log('Memory: ' + Math.round(process.memoryUsage().heapUsed / 1024 / 1024) + ' MB');
});
Streaming JSON Parsing
Parsing a large JSON file is one of the most common stream use cases. JSON.parse() requires the entire string in memory. For a 500MB JSON array, that means at least 1GB of memory.
The JSONStream package (or the newer stream-json) parses JSON incrementally:
npm install JSONStream
var fs = require('fs');
var JSONStream = require('JSONStream');
var count = 0;
fs.createReadStream('/data/large-array.json')
.pipe(JSONStream.parse('*'))
.on('data', function(record) {
count++;
// process one record at a time
})
.on('end', function() {
console.log('Processed ' + count + ' records');
console.log('Memory: ' + Math.round(process.memoryUsage().heapUsed / 1024 / 1024) + ' MB');
});
For a 500MB JSON file with 2 million records:
Processed 2000000 records
Memory: 22 MB
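If you prefer stream-json, a roughly equivalent sketch (API as of stream-json 1.x) pipes the raw bytes through its parser and the StreamArray streamer, which emits one { key, value } pair per array element:
var fs = require('fs');
var parser = require('stream-json').parser;
var streamArray = require('stream-json/streamers/StreamArray').streamArray;
var count = 0;
fs.createReadStream('/data/large-array.json')
  .pipe(parser())
  .pipe(streamArray())
  .on('data', function(data) {
    count++;
    // data.key is the array index, data.value is the parsed element
  })
  .on('end', function() {
    console.log('Processed ' + count + ' records');
  });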
For newline-delimited JSON (JSONL/NDJSON), you do not need a special parser. Each line is a complete JSON object:
var fs = require('fs');
var readline = require('readline');
var rl = readline.createInterface({
input: fs.createReadStream('/data/events.jsonl')
});
rl.on('line', function(line) {
var event = JSON.parse(line);
// process event
});
Streaming HTTP Responses in Express
Streams work naturally with Express responses because res is a writable stream. Instead of building a complete response in memory, you can stream it directly to the client.
Streaming a File Download
var express = require('express');
var fs = require('fs');
var path = require('path');
var app = express();
app.get('/download/:filename', function(req, res) {
var filePath = path.join('/data/exports', req.params.filename);
fs.stat(filePath, function(err, stats) {
if (err) return res.status(404).json({ error: 'File not found' });
res.setHeader('Content-Type', 'application/octet-stream');
res.setHeader('Content-Disposition', 'attachment; filename="' + req.params.filename + '"');
res.setHeader('Content-Length', stats.size);
var readStream = fs.createReadStream(filePath);
readStream.pipe(res);
readStream.on('error', function(err) {
console.error('Stream error: ' + err.message);
if (!res.headersSent) {
res.status(500).json({ error: 'Download failed' });
}
});
});
});
Streaming a Large JSON API Response
When an API endpoint returns thousands of records, building the entire JSON array in memory is wasteful. Stream it:
app.get('/api/export/users', function(req, res) {
var db = getDatabase(); // your database connection
var cursor = db.collection('users').find({}).stream();
var first = true;
res.setHeader('Content-Type', 'application/json');
res.write('[');
cursor.on('data', function(doc) {
if (!first) res.write(',');
first = false;
res.write(JSON.stringify(doc));
});
cursor.on('end', function() {
res.write(']');
res.end();
});
cursor.on('error', function(err) {
console.error('Cursor error: ' + err.message);
if (!res.headersSent) {
res.status(500).json({ error: 'Export failed' });
}
});
});
Streaming CSV Export
app.get('/api/export/csv', function(req, res) {
var db = getDatabase();
var cursor = db.collection('orders').find({}).stream();
res.setHeader('Content-Type', 'text/csv');
res.setHeader('Content-Disposition', 'attachment; filename="orders.csv"');
res.write('id,customer,amount,date\n');
cursor.on('data', function(doc) {
res.write(doc._id + ',' + doc.customer + ',' + doc.amount + ',' + doc.date + '\n');
});
cursor.on('end', function() {
res.end();
});
});
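Both export handlers above ignore the return value of res.write(). For modest exports that is usually fine, but a slow client can make the response buffer grow. A sketch of one way to handle it, assuming the cursor is a standard Readable stream (as the driver's .stream() cursor is): replace the cursor.on('data', ...) handler in the CSV export with something like the following, pausing the cursor when the response reports backpressure and resuming on 'drain'.
cursor.on('data', function(doc) {
  var ok = res.write(doc._id + ',' + doc.customer + ',' + doc.amount + ',' + doc.date + '\n');
  if (!ok) {
    cursor.pause();                 // stop pulling documents while res is backed up
    res.once('drain', function() {
      cursor.resume();              // continue once the client has caught up
    });
  }
});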
Stream Error Handling Patterns
Stream errors are one of the most common sources of unhandled exceptions in Node.js. Every stream in a pipeline can emit an error, and errors do not propagate through .pipe().
The Problem with pipe()
var fs = require('fs');
var zlib = require('zlib');
// If the read stream errors, the gzip and write streams are NOT cleaned up
fs.createReadStream('/nonexistent/file.csv')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('/data/output.csv.gz'));
// This crashes with: Error: ENOENT: no such file or directory
With .pipe(), you need error handlers on every stream:
var readable = fs.createReadStream('/data/input.csv');
var gzip = zlib.createGzip();
var writable = fs.createWriteStream('/data/output.csv.gz');
readable.on('error', handleError);
gzip.on('error', handleError);
writable.on('error', handleError);
readable.pipe(gzip).pipe(writable);
function handleError(err) {
console.error('Stream error: ' + err.message);
readable.destroy();
gzip.destroy();
writable.destroy();
}
This is tedious and error-prone. Enter pipeline().
pipeline() vs pipe()
The stream.pipeline() function (added in Node.js 10) solves the error propagation problem. It properly destroys all streams if any stream in the chain errors, and calls a single callback when the pipeline completes or fails.
var fs = require('fs');
var zlib = require('zlib');
var stream = require('stream');
stream.pipeline(
fs.createReadStream('/data/input.csv'),
zlib.createGzip(),
fs.createWriteStream('/data/output.csv.gz'),
function(err) {
if (err) {
console.error('Pipeline failed: ' + err.message);
} else {
console.log('Pipeline complete');
}
}
);
There is also a promise-based version:
var fs = require('fs');
var zlib = require('zlib');
var stream = require('stream');
var pipeline = require('util').promisify(stream.pipeline);
async function compressFile(input, output) {
try {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('Compression complete');
} catch (err) {
console.error('Compression failed: ' + err.message);
}
}
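Since the prerequisites assume Node 18+, you can also skip promisify and import the promise API that ships with Node (require('stream/promises'), available since Node 15):
var fs = require('fs');
var zlib = require('zlib');
var streamPromises = require('stream/promises');
async function compressFile(input, output) {
  try {
    // same pipeline as above, but using the built-in promise-based pipeline()
    await streamPromises.pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    );
    console.log('Compression complete');
  } catch (err) {
    console.error('Compression failed: ' + err.message);
  }
}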
Use pipeline() for everything. There is no good reason to use .pipe() in new code. pipeline() handles cleanup, error propagation, and backpressure correctly. .pipe() does not propagate errors and leaks streams on failure.
Object Mode Streams
By default, streams operate on Buffer or string chunks. Object mode lets streams pass JavaScript objects between transforms. This is how you build data processing pipelines that operate on records rather than raw bytes.
var stream = require('stream');
var objectReadable = new stream.Readable({
objectMode: true,
read: function() {
// push objects instead of strings or buffers
this.push({ id: 1, name: 'Widget', price: 29.99 });
this.push({ id: 2, name: 'Gadget', price: 49.99 });
this.push(null); // end of stream
}
});
var priceMultiplier = new stream.Transform({
objectMode: true,
transform: function(product, encoding, callback) {
product.priceWithTax = Math.round(product.price * 1.08 * 100) / 100;
this.push(product);
callback();
}
});
var objectWriter = new stream.Writable({
objectMode: true,
write: function(product, encoding, callback) {
console.log(product.name + ': $' + product.priceWithTax);
callback();
}
});
objectReadable.pipe(priceMultiplier).pipe(objectWriter);
Widget: $32.39
Gadget: $53.99
Important: highWaterMark in object mode counts objects, not bytes. The default is 16 objects. If your objects are large (e.g., each containing a 1MB buffer), 16 objects means 16MB of buffering. Adjust accordingly:
var transform = new stream.Transform({
objectMode: true,
highWaterMark: 4, // only buffer 4 objects
transform: function(chunk, encoding, callback) {
// ...
}
});
Complete Working Example
This script generates a large CSV file, then processes it through a streaming pipeline that parses records, filters them, transforms the data, and writes results to a new file. It logs memory usage throughout to prove it stays flat.
var fs = require('fs');
var stream = require('stream');
var path = require('path');
var pipeline = require('util').promisify(stream.pipeline);
var INPUT_FILE = path.join(__dirname, 'sample-data.csv');
var OUTPUT_FILE = path.join(__dirname, 'processed-output.csv');
var RECORD_COUNT = 2000000;
// Step 1: Generate a large CSV file
function generateCSV(filePath, count, callback) {
var ws = fs.createWriteStream(filePath);
var categories = ['Electronics', 'Clothing', 'Food', 'Books', 'Home', 'Sports'];
var regions = ['North', 'South', 'East', 'West', 'Central'];
ws.write('id,product,category,region,quantity,unit_price,date\n');
var i = 0;
function writeNext() {
var ok = true;
while (i < count && ok) {
i++;
var category = categories[i % categories.length];
var region = regions[i % regions.length];
var quantity = Math.floor(Math.random() * 100) + 1;
var unitPrice = (Math.random() * 500 + 1).toFixed(2);
var month = String(Math.floor(Math.random() * 12) + 1).padStart(2, '0');
var day = String(Math.floor(Math.random() * 28) + 1).padStart(2, '0');
var line = i + ',' + 'Product_' + i + ',' + category + ',' + region + ',' +
quantity + ',' + unitPrice + ',2025-' + month + '-' + day + '\n';
if (i === count) {
ws.end(line);
} else {
ok = ws.write(line);
}
}
if (i < count) {
ws.once('drain', writeNext);
}
}
ws.on('finish', function() {
var stats = fs.statSync(filePath);
console.log('Generated ' + count + ' records (' + Math.round(stats.size / 1024 / 1024) + ' MB)');
callback();
});
writeNext();
}
// Step 2: Build streaming transforms
function LineSplitter() {
var buffer = '';
return new stream.Transform({
objectMode: true,
transform: function(chunk, encoding, callback) {
buffer += chunk.toString();
var lines = buffer.split('\n');
buffer = lines.pop();
for (var i = 0; i < lines.length; i++) {
if (lines[i].trim()) this.push(lines[i].trim());
}
callback();
},
flush: function(callback) {
if (buffer.trim()) this.push(buffer.trim());
callback();
}
});
}
function CSVRecordParser() {
var headers = null;
return new stream.Transform({
objectMode: true,
transform: function(line, encoding, callback) {
var fields = line.split(',');
if (!headers) {
headers = fields;
callback();
return;
}
var record = {};
for (var i = 0; i < headers.length; i++) {
record[headers[i]] = fields[i] || '';
}
this.push(record);
callback();
}
});
}
function RecordFilter(predicate) {
return new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
if (predicate(record)) {
this.push(record);
}
callback();
}
});
}
function RecordTransformer(transformFn) {
return new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
this.push(transformFn(record));
callback();
}
});
}
function CSVSerializer(headers) {
var headerWritten = false;
return new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
if (!headerWritten) {
this.push(headers.join(',') + '\n');
headerWritten = true;
}
var values = headers.map(function(h) { return record[h] || ''; });
this.push(values.join(',') + '\n');
callback();
}
});
}
// Step 3: Run the pipeline
function logMemory(label) {
var mem = process.memoryUsage();
console.log('[' + label + '] Heap: ' + Math.round(mem.heapUsed / 1024 / 1024) + ' MB, ' +
'RSS: ' + Math.round(mem.rss / 1024 / 1024) + ' MB');
}
async function processCSV() {
var totalRead = 0;
var totalFiltered = 0;
var startTime = Date.now();
logMemory('Before processing');
// Log memory every 2 seconds during processing
var memInterval = setInterval(function() {
logMemory('Processing (' + totalRead + ' records read)');
}, 2000);
var counter = new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
totalRead++;
this.push(record);
callback();
}
});
var filteredCounter = new stream.Transform({
objectMode: true,
transform: function(record, encoding, callback) {
totalFiltered++;
this.push(record);
callback();
}
});
try {
await pipeline(
fs.createReadStream(INPUT_FILE, { highWaterMark: 64 * 1024 }),
LineSplitter(),
CSVRecordParser(),
counter,
// Filter: only Electronics and Books with quantity > 50
RecordFilter(function(r) {
return (r.category === 'Electronics' || r.category === 'Books') &&
parseInt(r.quantity) > 50;
}),
// Transform: compute total and add a revenue tier
RecordTransformer(function(r) {
var total = parseInt(r.quantity) * parseFloat(r.unit_price);
r.total = total.toFixed(2);
if (total > 10000) {
r.tier = 'high';
} else if (total > 5000) {
r.tier = 'medium';
} else {
r.tier = 'low';
}
return r;
}),
filteredCounter,
CSVSerializer(['id', 'product', 'category', 'region', 'quantity', 'unit_price', 'total', 'tier']),
fs.createWriteStream(OUTPUT_FILE)
);
clearInterval(memInterval);
var elapsed = ((Date.now() - startTime) / 1000).toFixed(2);
var outputStats = fs.statSync(OUTPUT_FILE);
console.log('\n--- Results ---');
console.log('Records read: ' + totalRead);
console.log('Records output: ' + totalFiltered);
console.log('Output file size: ' + Math.round(outputStats.size / 1024) + ' KB');
console.log('Time elapsed: ' + elapsed + 's');
logMemory('After processing');
} catch (err) {
clearInterval(memInterval);
console.error('Pipeline failed: ' + err.message);
process.exit(1);
}
}
// Run
console.log('Generating test data...');
generateCSV(INPUT_FILE, RECORD_COUNT, function() {
console.log('Processing with streams...\n');
processCSV();
});
Expected Output
Generating test data...
Generated 2000000 records (142 MB)
Processing with streams...
[Before processing] Heap: 8 MB, RSS: 42 MB
[Processing (489213 records read)] Heap: 14 MB, RSS: 55 MB
[Processing (1021847 records read)] Heap: 12 MB, RSS: 55 MB
[Processing (1558392 records read)] Heap: 13 MB, RSS: 55 MB
--- Results ---
Records read: 2000000
Records output: 164283
Output file size: 9847 KB
Time elapsed: 4.72s
[After processing] Heap: 11 MB, RSS: 54 MB
Notice the heap usage: it never exceeds 15MB despite processing a 142MB file with 2 million records. The RSS (resident set size) stays flat at ~55MB. If you read the same file with fs.readFile() and split it into an array of lines, you would see 400MB+ of memory usage.
Common Issues and Troubleshooting
1. "Error: stream.push() after EOF"
Error [ERR_STREAM_PUSH_AFTER_EOF]: stream.push() after EOF
You called this.push() after already pushing null (which signals end-of-stream). This usually happens when an async operation inside transform() completes after the stream has been destroyed. Fix: check this.destroyed before pushing:
transform: function(chunk, encoding, callback) {
doAsyncWork(chunk, function(err, result) {
if (!this.destroyed) {
this.push(result);
}
callback();
}.bind(this));
}
2. "MaxListenersExceededWarning: Possible EventEmitter memory leak detected"
MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
11 error listeners added to [ReadStream].
You are creating streams inside a loop or request handler without properly destroying them. Every stream is an EventEmitter. If you attach listeners in a loop, they accumulate. Fix: destroy streams when done and remove listeners.
// Wrong: creating a read stream per request without cleanup
app.get('/data', function(req, res) {
var rs = fs.createReadStream('/data/file.csv');
rs.pipe(res);
// If the client disconnects, the read stream is never closed
// Fix: clean up on client disconnect
req.on('close', function() {
rs.destroy();
});
});
3. "Cannot pipe, not readable" or "write after end"
Error [ERR_STREAM_WRITE_AFTER_END]: write after end
You are writing to a stream after calling .end(), or piping from a stream that has already emitted 'end'. Common cause: reusing a stream across multiple requests. Streams are not reusable. Create a new stream instance per use.
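A minimal sketch of the reuse mistake and its fix, assuming an Express app and a file at /data/report.csv:
// Wrong: one shared stream -- the first request consumes it, later requests
// get "write after end" or an empty response
var shared = fs.createReadStream('/data/report.csv');
app.get('/report', function(req, res) {
  shared.pipe(res);
});
// Fix: create a fresh read stream per request
app.get('/report', function(req, res) {
  fs.createReadStream('/data/report.csv').pipe(res);
});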
4. Data loss in Transform streams: missing flush()
Your transform processes all chunks but drops the last few records. The transform() method handles complete chunks, but the final chunk may be partial. Implement flush() to handle remaining buffered data:
var transform = new stream.Transform({
objectMode: true,
transform: function(chunk, encoding, callback) {
this._buffer = (this._buffer || '') + chunk.toString();
var lines = this._buffer.split('\n');
this._buffer = lines.pop(); // keep incomplete line
for (var i = 0; i < lines.length; i++) {
this.push(lines[i]);
}
callback();
},
flush: function(callback) {
// Do not forget the last line
if (this._buffer && this._buffer.trim()) {
this.push(this._buffer.trim());
}
callback();
}
});
5. Streams hang indefinitely with no data and no error
The stream sits open doing nothing. Common causes: you forgot to call callback() in a transform function, or you forgot to push null to end a custom readable. Every code path in transform() must call callback():
// Wrong: callback not called when record is filtered out
transform: function(record, encoding, callback) {
if (record.active) {
this.push(record);
callback(); // called here
}
// but NOT called when record.active is false -- stream hangs
}
// Fix: always call callback
transform: function(record, encoding, callback) {
if (record.active) {
this.push(record);
}
callback(); // always called
}
Best Practices
- Always use pipeline() over .pipe(). It handles error propagation, stream cleanup, and backpressure correctly. .pipe() silently leaks streams on error.
- Set appropriate highWaterMark values. The default (64KB for byte streams, 16 objects for object mode) is reasonable for most cases. If you are processing large objects, lower it. If you need throughput on small records, raise it.
- Destroy streams explicitly when you are done with them, especially in HTTP handlers where clients may disconnect. Call stream.destroy() in error handlers and on client disconnect events.
- Always call callback() in transform functions. Every code path -- success, error, filtered record -- must call the callback. A missing callback causes the stream to hang indefinitely with no error message.
- Implement flush() on Transform streams that buffer data. If your transform accumulates partial data between chunks (like a line splitter), the flush() method processes whatever remains when the input ends.
- Prefer NDJSON (newline-delimited JSON) over JSON arrays for large datasets. NDJSON is trivially streamable -- each line is a complete JSON object. JSON arrays require a streaming parser.
- Monitor memory usage during development. Add process.memoryUsage() logging to your pipeline to verify memory stays flat. If heap usage grows linearly with input size, you have a buffering problem.
- Do not mix object mode and byte mode without an explicit boundary. If a transform accepts objects and needs to output bytes (or vice versa), set readableObjectMode and writableObjectMode independently instead of using the global objectMode flag (a short sketch follows at the end of this section).
- Use stream.finished() to detect when a stream is done. It handles all the edge cases (error, premature close, destroy) that 'end' and 'finish' events miss individually:
var stream = require('stream');
stream.finished(myStream, function(err) {
if (err) {
console.error('Stream failed: ' + err.message);
} else {
console.log('Stream completed cleanly');
}
});
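For the mixed-mode boundary mentioned in the list above, a minimal sketch: a Transform that accepts record objects on its writable side and emits CSV text on its readable side by setting the two mode flags independently.
var stream = require('stream');
var toCSVLine = new stream.Transform({
  writableObjectMode: true,   // input side receives one record object per chunk
  readableObjectMode: false,  // output side emits plain strings/Buffers
  transform: function(record, encoding, callback) {
    this.push(record.id + ',' + record.name + ',' + record.price + '\n');
    callback();
  }
});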
