MongoDB Change Streams for Real-Time Applications

A comprehensive guide to MongoDB change streams covering real-time event processing, WebSocket integration, resume tokens, and building reactive Node.js applications.

Every application eventually needs to react to data changes in real time. Whether it is sending a notification when an order ships, invalidating a cache when a product price changes, or syncing data to a search index, the fundamental problem is the same: you need to know when something changed, what changed, and you need to know immediately.

For years, developers solved this with polling — hammering the database every few seconds asking "anything new?" — or by tailing the MongoDB oplog directly, which was fragile and undocumented. MongoDB 3.6 introduced change streams, a proper, supported API for watching changes as they happen. After using change streams in production systems processing millions of events per day, I can say they are one of the most underappreciated features in the MongoDB ecosystem.

What Change Streams Actually Are

Under the hood, MongoDB replication works through the oplog (operations log), a capped collection that records every write operation. Change streams provide a safe, abstracted interface for tailing this oplog without touching it directly. The critical distinction is that change streams use the aggregation framework to filter and transform events before delivering them to your application, and they guarantee ordering through a logical clock.

This matters because tailing the oplog directly meant parsing internal formats that could change between versions, handling the oplog rolling over, and dealing with sharded clusters where you had multiple oplogs. Change streams handle all of that for you.

The one prerequisite you cannot skip: change streams require a replica set. A standalone mongod instance will not work. If you are running locally for development, you need at minimum a single-node replica set. This trips up nearly everyone on their first attempt.

// Initialize a single-node replica set for local development
// Run this in the mongo shell after starting mongod with --replSet rs0
rs.initiate({
  _id: "rs0",
  members: [{ _id: 0, host: "localhost:27017" }]
});

Opening a Change Stream

Change streams can be opened at three levels, each with different scope:

var MongoClient = require("mongodb").MongoClient;

var uri = "mongodb://localhost:27017/?replicaSet=rs0";

MongoClient.connect(uri, function (err, client) {
  if (err) throw err;

  var db = client.db("myapp");

  // Collection level — watch a single collection
  var orderStream = db.collection("orders").watch();

  // Database level — watch all collections in a database (requires MongoDB 4.0+)
  var dbStream = db.watch();

  // Deployment level — watch all databases (requires MongoDB 4.0+)
  var clusterStream = client.watch();

  orderStream.on("change", function (change) {
    console.log("Order changed:", change.operationType);
  });
});

Collection-level streams are the most common and the most performant because MongoDB only needs to filter events for a single namespace. Database-level streams are useful when you need to coordinate changes across multiple collections. Deployment-level streams are rare in practice but invaluable for building replication or audit systems that need to see everything.

Change Event Structure

Every change event follows a consistent structure. Understanding this shape is essential for building reliable handlers:

{
  _id: { _data: "826..." },           // Resume token
  operationType: "insert",             // What happened
  clusterTime: Timestamp(1707840000, 1), // When it happened
  ns: {                                 // Where it happened
    db: "myapp",
    coll: "orders"
  },
  documentKey: { _id: ObjectId("...") }, // Which document
  fullDocument: { ... }                  // The document itself (for inserts)
}

The operationType field tells you what happened. The main types are:

  Operation    When It Fires
  insert       New document created
  update       Document modified via update operators
  replace      Document replaced entirely
  delete       Document removed
  invalidate   Collection dropped or renamed
  drop         Collection dropped (DB/cluster level)
  rename       Collection renamed (DB/cluster level)

For update operations, you get an updateDescription field that tells you exactly which fields changed:

{
  operationType: "update",
  updateDescription: {
    updatedFields: { status: "shipped", updatedAt: ISODate("...") },
    removedFields: [],
    truncatedArrays: []
  }
}

This granularity is powerful. You do not need to diff the entire document to figure out what changed — MongoDB tells you explicitly.
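
For example, a handler can inspect updatedFields to skip writes that only touch bookkeeping fields. A minimal sketch, where the ignored field list and the downstream handleRelevantChange function are illustrative:

// Ignore updates that only touched bookkeeping fields (illustrative list)
var IGNORED_FIELDS = ["updatedAt", "lastSeenAt"];

var stream = db.collection("orders").watch();

stream.on("change", function (change) {
  if (change.operationType === "update") {
    var changed = Object.keys(change.updateDescription.updatedFields || {});
    var relevant = changed.filter(function (field) {
      return IGNORED_FIELDS.indexOf(field) === -1;
    });
    if (relevant.length === 0) return; // nothing we care about changed
  }
  handleRelevantChange(change); // hypothetical downstream handler
});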

Filtering with Aggregation Pipelines

Opening a raw change stream on a busy collection is like drinking from a fire hose. The aggregation pipeline parameter lets you filter events server-side, so only relevant changes reach your application:

// Only watch for order status changes to "shipped"
var pipeline = [
  {
    $match: {
      operationType: "update",
      "updateDescription.updatedFields.status": "shipped"
    }
  }
];

var stream = db.collection("orders").watch(pipeline);

stream.on("change", function (change) {
  console.log("Order shipped:", change.documentKey._id);
  sendShipmentNotification(change.documentKey._id);
});

You can use $match, $project, $addFields, $replaceRoot, $redact, and $unset in the pipeline. This runs on the server before events are sent to your application, which reduces network traffic and processing load significantly.

// Watch for high-value inserts and project only needed fields
var pipeline = [
  {
    $match: {
      operationType: "insert",
      "fullDocument.total": { $gte: 1000 }
    }
  },
  {
    $project: {
      operationType: 1,
      "fullDocument.customerId": 1,
      "fullDocument.total": 1,
      "fullDocument.items": 1
    }
  }
];

var stream = db.collection("orders").watch(pipeline);

The key insight here is that these pipelines execute in the database, not in your Node.js process. If you have a collection receiving 10,000 writes per second but you only care about 50 of them, that filtering happens at the source.

The fullDocument Option

For update and replace operations, the change event does not include the full document by default. You get the updateDescription (which fields changed) but not the complete current state. The fullDocument option changes this:

// updateLookup: performs a find() to get the current document
var stream = db.collection("orders").watch([], {
  fullDocument: "updateLookup"
});

stream.on("change", function (change) {
  // change.fullDocument contains the complete document at lookup time
  console.log("Full order:", change.fullDocument);
});

There is a subtlety here that matters in production: updateLookup does a point-in-time read when the event is delivered, not when the change occurred. If the document was updated again between the original change and the lookup, you get the newer version. In high-write scenarios this can lead to stale or skipped intermediate states.

MongoDB 6.0 introduced pre-image and post-image support, which solves this problem by capturing the document state at the exact moment of the change:

// First, enable pre/post images on the collection
db.command({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
});

// Then request them when opening the stream
var stream = db.collection("orders").watch([], {
  fullDocument: "whenAvailable",
  fullDocumentBeforeChange: "whenAvailable"
});

stream.on("change", function (change) {
  console.log("Before:", change.fullDocumentBeforeChange);
  console.log("After:", change.fullDocument);
});

Pre-images and post-images are stored in a system collection and have a configurable expiry. This is a game-changer for audit logs and data synchronization where you need to know both the old and new state.
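
That expiry can be tuned. A hedged sketch, assuming the MongoDB 6.0+ setClusterParameter command shape documented for changeStreamOptions — verify it against your server version before relying on it:

// Retain pre/post images for 7 days (604800 seconds); run against the admin database
client.db("admin").command({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 604800 }
    }
  }
}, function (err) {
  if (err) console.error("Failed to set pre/post image expiry:", err.message);
});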

Resume Tokens and Fault Tolerance

This is where change streams separate themselves from every other approach. Every change event includes a resume token in the _id field. If your application crashes, restarts, or loses its database connection, you can resume the stream from exactly where you left off:

var lastResumeToken = null;

function startWatching(resumeAfter) {
  var options = {};
  if (resumeAfter) {
    options.resumeAfter = resumeAfter;
  }

  var stream = db.collection("orders").watch([], options);

  stream.on("change", function (change) {
    lastResumeToken = change._id;
    processChange(change);
    saveResumeToken(lastResumeToken);
  });

  stream.on("error", function (err) {
    console.error("Stream error:", err.message);
    setTimeout(function () {
      startWatching(lastResumeToken);
    }, 5000);
  });

  stream.on("close", function () {
    console.log("Stream closed, reconnecting...");
    setTimeout(function () {
      startWatching(lastResumeToken);
    }, 5000);
  });
}

The resume token is opaque — do not try to parse or construct one. Store it as-is in a durable location (a MongoDB collection, Redis, or a file on disk). When you resume, MongoDB replays all events that occurred after that token, in order, without duplicates.

There are two resume options:

  • resumeAfter — resume after the given token (you already processed that event)
  • startAfter — similar to resumeAfter but works after an invalidate event, allowing you to restart the stream even if the collection was dropped and recreated
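
An invalidate closes the stream, but its event still carries a token you can pass to startAfter. A minimal sketch of that recovery path, with processChange standing in for your application handler:

var invalidateToken = null;

function openStream(startAfterToken) {
  var options = startAfterToken ? { startAfter: startAfterToken } : {};
  var stream = db.collection("orders").watch([], options);

  stream.on("change", function (change) {
    if (change.operationType === "invalidate") {
      // The stream closes after this event; keep its token to restart past it
      invalidateToken = change._id;
      return;
    }
    processChange(change); // application-specific handler (assumed)
  });

  stream.on("close", function () {
    // Reopen past the invalidate so a dropped-and-recreated collection can be watched again
    openStream(invalidateToken);
  });
}

openStream(null);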

You can also use startAtOperationTime to resume from a specific cluster time, which is useful for initial bootstrapping:

var Timestamp = require("mongodb").Timestamp;

var stream = db.collection("orders").watch([], {
  startAtOperationTime: new Timestamp(1707840000, 1)
});

Change Streams vs Polling vs Oplog Tailing

  Approach         Latency        Reliability   Complexity   Ordering
  Polling          Seconds        Low           Low          None
  Oplog tailing    Milliseconds   Fragile       Very High    Manual
  Change streams   Milliseconds   High          Low          Guaranteed

Polling wastes resources on empty queries and introduces latency equal to your poll interval. Oplog tailing is fast but requires parsing internal formats, manually tracking position, and breaks across MongoDB versions. Change streams give you sub-second latency with built-in fault tolerance and guaranteed ordering. There is no reason to use the other approaches in any modern MongoDB deployment.

Complete Working Example: Real-Time Order Notification System

Here is a production-grade system that streams order changes to connected browser clients via Socket.io, with resume token persistence and automatic reconnection.

// server.js
var express = require("express");
var http = require("http");
var socketIo = require("socket.io");
var MongoClient = require("mongodb").MongoClient;

var app = express();
var server = http.createServer(app);
var io = socketIo(server);

var MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
var DB_NAME = "orderapp";
var RESUME_COLLECTION = "changeStreamResumeTokens";
var STREAM_NAME = "orderNotifications";

var db = null;
var changeStream = null;

function connectToMongo(callback) {
  MongoClient.connect(MONGO_URI, function (err, client) {
    if (err) return callback(err);
    db = client.db(DB_NAME);
    console.log("Connected to MongoDB");
    callback(null);
  });
}

function loadResumeToken(callback) {
  db.collection(RESUME_COLLECTION).findOne(
    { streamName: STREAM_NAME },
    function (err, doc) {
      if (err) return callback(err, null);
      var token = doc ? doc.resumeToken : null;
      callback(null, token);
    }
  );
}

function saveResumeToken(token, callback) {
  db.collection(RESUME_COLLECTION).updateOne(
    { streamName: STREAM_NAME },
    {
      $set: {
        resumeToken: token,
        updatedAt: new Date()
      }
    },
    { upsert: true },
    function (err) {
      if (err) console.error("Failed to save resume token:", err.message);
      if (callback) callback(err);
    }
  );
}

function handleChange(change) {
  var event = {
    type: change.operationType,
    orderId: change.documentKey._id.toString(),
    timestamp: new Date().toISOString()
  };

  if (change.operationType === "insert" && change.fullDocument) {
    event.order = {
      customerId: change.fullDocument.customerId,
      status: change.fullDocument.status,
      total: change.fullDocument.total,
      items: change.fullDocument.items
    };
    event.message = "New order placed: $" + change.fullDocument.total;
  }

  if (change.operationType === "update" && change.updateDescription) {
    event.updatedFields = change.updateDescription.updatedFields;
    if (change.updateDescription.updatedFields.status) {
      event.message = "Order status changed to: " +
        change.updateDescription.updatedFields.status;
    }
  }

  if (change.operationType === "delete") {
    event.message = "Order cancelled";
  }

  io.emit("orderChange", event);
  saveResumeToken(change._id);
}

function startChangeStream(resumeToken) {
  var pipeline = [
    {
      $match: {
        operationType: { $in: ["insert", "update", "delete"] }
      }
    }
  ];

  var options = {
    fullDocument: "updateLookup"
  };

  if (resumeToken) {
    options.resumeAfter = resumeToken;
    console.log("Resuming change stream from saved token");
  }

  changeStream = db.collection("orders").watch(pipeline, options);

  changeStream.on("change", function (change) {
    try {
      handleChange(change);
    } catch (err) {
      console.error("Error processing change:", err.message);
    }
  });

  changeStream.on("error", function (err) {
    console.error("Change stream error:", err.message);
    changeStream = null;
    scheduleReconnect();
  });

  changeStream.on("close", function () {
    console.log("Change stream closed");
    changeStream = null;
    scheduleReconnect();
  });

  console.log("Change stream started on orders collection");
}

var reconnectAttempts = 0;
var MAX_RECONNECT_DELAY = 30000;

function scheduleReconnect() {
  var delay = Math.min(
    1000 * Math.pow(2, reconnectAttempts),
    MAX_RECONNECT_DELAY
  );
  reconnectAttempts++;

  console.log("Reconnecting in " + delay + "ms (attempt " + reconnectAttempts + ")");

  setTimeout(function () {
    loadResumeToken(function (err, token) {
      if (err) {
        console.error("Failed to load resume token:", err.message);
        scheduleReconnect();
        return;
      }
      startChangeStream(token);
      reconnectAttempts = 0;
    });
  }, delay);
}

// Socket.io connection handling
io.on("connection", function (socket) {
  console.log("Client connected:", socket.id);

  socket.on("disconnect", function () {
    console.log("Client disconnected:", socket.id);
  });
});

// REST endpoint to create test orders
app.use(express.json());

app.post("/api/orders", function (req, res) {
  var order = {
    customerId: req.body.customerId || "cust_" + Date.now(),
    items: req.body.items || [],
    total: req.body.total || 0,
    status: "pending",
    createdAt: new Date()
  };

  db.collection("orders").insertOne(order, function (err, result) {
    if (err) return res.status(500).json({ error: err.message });
    order._id = result.insertedId;
    res.status(201).json(order);
  });
});

app.put("/api/orders/:id/status", function (req, res) {
  var ObjectId = require("mongodb").ObjectId;
  var id = new ObjectId(req.params.id);

  db.collection("orders").updateOne(
    { _id: id },
    {
      $set: {
        status: req.body.status,
        updatedAt: new Date()
      }
    },
    function (err, result) {
      if (err) return res.status(500).json({ error: err.message });
      if (result.matchedCount === 0) return res.status(404).json({ error: "Order not found" });
      res.json({ updated: true, status: req.body.status });
    }
  );
});

// Serve a simple client page
app.get("/", function (req, res) {
  res.send([
    "<!DOCTYPE html><html><head><title>Order Monitor</title>",
    '<script src="/socket.io/socket.io.js"></script></head>',
    "<body><h1>Live Order Feed</h1><div id=\"feed\"></div>",
    "<script>",
    "var socket = io();",
    "socket.on('orderChange', function(data) {",
    "  var div = document.createElement('div');",
    "  div.textContent = data.timestamp + ' - ' + data.message;",
    "  document.getElementById('feed').prepend(div);",
    "});",
    "</script></body></html>"
  ].join(""));
});

// Start everything
connectToMongo(function (err) {
  if (err) {
    console.error("Failed to connect to MongoDB:", err.message);
    process.exit(1);
  }

  loadResumeToken(function (err, token) {
    if (err) console.error("Could not load resume token:", err.message);
    startChangeStream(token);
  });

  server.listen(3000, function () {
    console.log("Server running on http://localhost:3000");
  });
});

This example handles the three things that matter most in production: pipeline filtering to reduce noise, resume token persistence so you never miss an event across restarts, and exponential backoff reconnection so a transient database hiccup does not crash your application.

Scaling Change Streams

Each change stream cursor is a single connection to a replica set member. In a sharded cluster, the mongos router opens one cursor per shard internally. This means:

  • One stream per process is fine. Do not open multiple streams on the same collection in the same process thinking it will be faster. It will not.
  • Distribute across secondaries. Use read preference secondary for change streams that do not need the absolute latest write:
var stream = db.collection("orders").watch([], {
  readPreference: "secondary"
});
  • Partition by pipeline. If you need different processing for different event types, use pipeline filtering rather than multiple streams. One stream with $match is cheaper than three separate streams.
  • Horizontal scaling requires coordination. Running the same change stream consumer on several application instances means every instance receives every event. Instead, run a single consumer that publishes to a message queue (Redis Pub/Sub, RabbitMQ, or Kafka) and let multiple workers consume from that queue, as sketched below.
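
A sketch of that fan-out, assuming the redis npm package with its v3-style callback client and an illustrative channel name:

var redis = require("redis");
var publisher = redis.createClient(); // assumes a local Redis instance

var stream = db.collection("orders").watch([
  { $match: { operationType: { $in: ["insert", "update", "delete"] } } }
]);

stream.on("change", function (change) {
  // Publish a compact event; subscribed workers do the heavy processing
  var payload = JSON.stringify({
    type: change.operationType,
    id: change.documentKey._id,
    token: change._id
  });
  publisher.publish("order-changes", payload);
});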

Performance Considerations

Change streams add minimal overhead to write operations. The oplog is written regardless of whether you have a change stream open. The cost is in the filtering and delivery:

  • Pipeline complexity matters. A simple $match on operationType is nearly free, while heavy $project or $addFields expressions over large documents add measurable per-event work. Note that $lookup is not among the stages allowed in a change stream pipeline.
  • fullDocument: "updateLookup" adds a read per event. On high-throughput collections, this means your change stream consumer is generating substantial read load. Use it only when you actually need the full document.
  • Pre-images and post-images consume storage. They are stored in a system collection with a configurable expiry (expireAfterSeconds). Size this based on your write volume and retention needs.
  • Batch processing. For high-throughput streams, accumulate events and process them in batches rather than one-at-a-time:
var buffer = [];
var BATCH_SIZE = 100;
var FLUSH_INTERVAL = 1000;

stream.on("change", function (change) {
  buffer.push(change);
  if (buffer.length >= BATCH_SIZE) {
    flushBuffer();
  }
});

setInterval(function () {
  if (buffer.length > 0) {
    flushBuffer();
  }
}, FLUSH_INTERVAL);

function flushBuffer() {
  var batch = buffer.splice(0, buffer.length);
  var lastToken = batch[batch.length - 1]._id;
  processBatch(batch, function (err) {
    if (!err) {
      saveResumeToken(lastToken);
    }
  });
}

Real-World Use Cases

Cache invalidation. Watch for updates on collections that back your cache layer. When a document changes, invalidate the corresponding cache key. This is far more reliable than TTL-based expiry because the invalidation is immediate and targeted.
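
A minimal sketch of that pattern, again assuming a v3-style redis client and a cache key format derived from the document _id (the format is illustrative):

var redis = require("redis");
var cache = redis.createClient();

var productStream = db.collection("products").watch([
  { $match: { operationType: { $in: ["update", "replace", "delete"] } } }
]);

productStream.on("change", function (change) {
  // Evict exactly the key backed by the changed document
  var key = "product:" + change.documentKey._id.toString();
  cache.del(key, function (err) {
    if (err) console.error("Cache invalidation failed for", key, err.message);
  });
});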

Search index synchronization. Stream inserts, updates, and deletes to an Elasticsearch or Typesense index. The resume token guarantees you never miss a change, so your search index stays in perfect sync without full reindexing.
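
The consumer boils down to mapping operation types onto index operations. In this sketch, indexDocument and removeDocument are hypothetical stand-ins for whatever your search client exposes:

var searchStream = db.collection("orders").watch([], {
  fullDocument: "updateLookup" // the full document is needed to (re)index it
});

searchStream.on("change", function (change) {
  var id = change.documentKey._id.toString();

  if (change.operationType === "delete") {
    removeDocument("orders", id); // hypothetical search-client call
  } else if (change.fullDocument) {
    indexDocument("orders", id, change.fullDocument); // hypothetical search-client call
  }

  saveResumeToken(change._id); // persist progress, as in the earlier examples
});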

Notifications. The order notification example above is the most common use case. Trigger emails, push notifications, or SMS messages based on document state transitions.

Audit logging. With pre-image and post-image support, you can capture a complete before/after snapshot of every change and write it to an immutable audit collection. This satisfies compliance requirements without cluttering your application logic.
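
With pre/post images enabled as shown earlier, the audit writer itself is short — a sketch that appends every change to an audits collection (the collection name is illustrative):

var auditStream = db.collection("orders").watch([], {
  fullDocument: "whenAvailable",
  fullDocumentBeforeChange: "whenAvailable"
});

auditStream.on("change", function (change) {
  db.collection("audits").insertOne({
    ns: change.ns,
    operationType: change.operationType,
    documentKey: change.documentKey,
    before: change.fullDocumentBeforeChange || null,
    after: change.fullDocument || null,
    clusterTime: change.clusterTime,
    recordedAt: new Date()
  }, function (err) {
    if (err) console.error("Audit write failed:", err.message);
  });
});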

Data synchronization. Stream changes from one MongoDB deployment to another, or to a data warehouse. The ordering guarantees and resume tokens make this reliable enough for production data pipelines.
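
A sketch of the replication direction, assuming targetDb is a second, already-connected Db handle pointing at the destination deployment:

var syncStream = db.collection("orders").watch([], { fullDocument: "updateLookup" });

syncStream.on("change", function (change) {
  var target = targetDb.collection("orders"); // targetDb: assumed second connection

  function afterWrite(err) {
    if (err) return console.error("Sync failed:", err.message);
    saveResumeToken(change._id); // persist progress, as in the earlier examples
  }

  if (change.operationType === "delete") {
    target.deleteOne({ _id: change.documentKey._id }, afterWrite);
  } else if (change.fullDocument) {
    // Upsert the current state so inserts, updates, and replaces all converge
    target.replaceOne(
      { _id: change.documentKey._id },
      change.fullDocument,
      { upsert: true },
      afterWrite
    );
  }
});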

Materialized views. Maintain denormalized or aggregated views of your data that update in real time. When an order line item changes, automatically recalculate the order total in a summary collection.
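
A sketch of that recalculation step, assuming orderItems documents carry orderId and lineTotal fields (both names illustrative):

var itemStream = db.collection("orderItems").watch([], { fullDocument: "updateLookup" });

itemStream.on("change", function (change) {
  var orderId = change.fullDocument ? change.fullDocument.orderId : null;
  if (!orderId) return; // deletes without a pre-image would need another lookup strategy

  // Recompute the order total and upsert it into a summary collection
  db.collection("orderItems").aggregate([
    { $match: { orderId: orderId } },
    { $group: { _id: "$orderId", total: { $sum: "$lineTotal" } } }
  ]).toArray(function (err, results) {
    if (err) return console.error("Recalculation failed:", err.message);
    var total = results.length ? results[0].total : 0;
    db.collection("orderSummaries").updateOne(
      { orderId: orderId },
      { $set: { total: total, recalculatedAt: new Date() } },
      { upsert: true },
      function (err) {
        if (err) console.error("Summary update failed:", err.message);
      }
    );
  });
});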

Common Issues and Troubleshooting

"The $changeStream stage is only supported on replica sets" — You are running a standalone mongod. Even for local development, you must initialize a replica set. Start mongod with --replSet rs0 and run rs.initiate() in the shell.

Change stream misses events after restart — You are not persisting the resume token, or you are persisting it before processing the event. Always save the token after successfully processing the change. If your process crashes mid-processing, you will reprocess that event on restart, which is correct — design your handlers to be idempotent.
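
One practical way to get idempotency is to make the side effect an upsert keyed by the document, so replaying the same event converges to the same state. A sketch with an illustrative shipmentNotifications collection, for a handler invoked when an update sets the status field:

function handleStatusChange(change) {
  // Upserting by orderId means a replayed event rewrites the same record instead of duplicating it
  db.collection("shipmentNotifications").updateOne(
    { orderId: change.documentKey._id },
    {
      $set: {
        status: change.updateDescription.updatedFields.status,
        notifiedAt: new Date()
      }
    },
    { upsert: true },
    function (err) {
      if (err) console.error("Notification write failed:", err.message);
    }
  );
}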

"Resume of change stream was not possible" error — The oplog has rolled past your resume token. This happens when your application was down for too long relative to your oplog size. Increase the oplog size or reduce downtime. As a fallback, catch this error and start a fresh stream without a resume token, then perform a full sync to reconcile.

fullDocument is null on update events — Without the fullDocument option, update events only include updateDescription. If you set fullDocument: "updateLookup" and still get null, the document was deleted between the change and the lookup. Handle this case explicitly in your code.

High memory usage on the change stream consumer — You are likely buffering too many events without processing them. Implement backpressure by using the cursor's next() method in a loop rather than the event emitter pattern, so you control the pace of consumption:

// processChange and saveResumeToken are assumed to return promises in this sketch
function processStream(stream) {
  stream.next().then(function (change) {
    return processChange(change).then(function () {
      return saveResumeToken(change._id);
    });
  }).then(function () {
    processStream(stream);
  }).catch(function (err) {
    console.error("Stream error:", err.message);
    scheduleReconnect();
  });
}

Best Practices

  1. Always persist resume tokens. Store them in MongoDB itself (a separate collection) or another durable store. Without resume tokens, a process restart means either missing events or reprocessing everything from the beginning.

  2. Make change handlers idempotent. Because resume tokens let you replay events, and because network issues can cause duplicate delivery, your handlers must produce the same result when processing the same event twice.

  3. Filter server-side with pipelines. Never open an unfiltered stream and discard events in application code. Let MongoDB filter events before they cross the network.

  4. Use exponential backoff for reconnection. A tight retry loop during a database outage generates useless connection attempts and log noise. Back off exponentially with a reasonable maximum delay.

  5. Monitor your oplog size. If your consumers can go offline for extended periods, ensure your oplog is large enough to retain events for that duration. Use db.getReplicationInfo() to check your oplog window.

  6. Separate concerns across streams. Do not overload a single change event handler with notification logic, cache invalidation, and audit logging. Either use a fan-out pattern through a message queue, or run separate consumers with separate resume tokens for each concern.

  7. Test with invalidate events. Collection drops and renames produce invalidate events that close the stream. Your reconnection logic must handle this gracefully, potentially using startAfter instead of resumeAfter.

  8. Set maxAwaitTimeMS for responsiveness. This controls how long the cursor waits on the server for new events before returning an empty batch. The default is fine for most cases, but latency-sensitive applications may want to tune it:

var stream = db.collection("orders").watch([], {
  maxAwaitTimeMS: 500
});
