Event-Driven Serverless Patterns

Build event-driven serverless architectures with EventBridge, DynamoDB Streams, and saga patterns for distributed Node.js applications

Overview

Event-driven serverless architecture is the backbone of modern distributed systems that need to scale independently, remain loosely coupled, and react to state changes in real time. Instead of services calling each other directly through synchronous HTTP requests, they emit events and let downstream consumers react asynchronously. This article walks through the core patterns you need to build production-grade event-driven systems on AWS serverless infrastructure using Node.js.

Prerequisites

  • Node.js 18+ installed locally
  • AWS account with IAM permissions for Lambda, EventBridge, DynamoDB, S3, API Gateway, and SQS
  • AWS CLI configured with credentials
  • Basic understanding of AWS Lambda and serverless concepts
  • Familiarity with the Serverless Framework or AWS SAM

Event-Driven Architecture Fundamentals

The core idea is simple: producers emit events, and consumers subscribe to them. There is no direct coupling between the two. An event represents a fact — something that happened. It is immutable. Once an order is placed, that event exists forever.

There are three roles in any event-driven system:

  1. Producers — services that generate events when state changes occur
  2. Event Bus — the routing infrastructure that delivers events to the right consumers
  3. Consumers — services that react to events they care about

// A simple event structure
var event = {
  source: "order-service",
  detailType: "OrderPlaced",
  time: new Date().toISOString(),
  detail: {
    orderId: "ord-12345",
    customerId: "cust-789",
    items: [
      { productId: "prod-001", quantity: 2, price: 29.99 },
      { productId: "prod-002", quantity: 1, price: 49.99 }
    ],
    total: 109.97,
    currency: "USD"
  }
};

The key distinction from request-response is that the producer does not wait for anything. It fires the event and moves on. This decoupling means the inventory service can go down for maintenance without breaking the order flow: EventBridge retries delivery to the failed target (for up to 24 hours by default), and events that exhaust their retries can be captured in a dead letter queue for later replay.

EventBridge as an Event Bus

Amazon EventBridge is the managed event bus that sits at the center of your event-driven architecture. It accepts events from any source, matches them against rules you define, and routes them to target services. Think of it as a sophisticated message router with built-in schema management.

var AWS = require("aws-sdk");
var eventbridge = new AWS.EventBridge({ region: "us-east-1" });

function publishEvent(source, detailType, detail) {
  var params = {
    Entries: [
      {
        Source: source,
        DetailType: detailType,
        Detail: JSON.stringify(detail),
        EventBusName: "ecommerce-events",
        Time: new Date()
      }
    ]
  };

  return eventbridge.putEvents(params).promise()
    .then(function(result) {
      // putEvents can partially fail while still returning HTTP 200,
      // so check FailedEntryCount instead of trusting the status code
      if (result.FailedEntryCount > 0) {
        throw new Error("putEvents failed entries: " + JSON.stringify(result.Entries));
      }
      return result;
    });
}

// Usage in a Lambda handler
exports.handler = function(event, context, callback) {
  var orderData = JSON.parse(event.body);

  publishEvent("order-service", "OrderPlaced", {
    orderId: orderData.orderId,
    customerId: orderData.customerId,
    items: orderData.items,
    total: orderData.total,
    timestamp: Date.now()
  })
  .then(function(result) {
    console.log("Event published:", JSON.stringify(result));
    callback(null, {
      statusCode: 200,
      body: JSON.stringify({ message: "Order placed", orderId: orderData.orderId })
    });
  })
  .catch(function(err) {
    console.error("Failed to publish event:", err);
    callback(null, { statusCode: 500, body: JSON.stringify({ error: "Internal error" }) });
  });
};

Creating a custom event bus keeps your events isolated from the default bus and gives you fine-grained control over access policies:

var createBusParams = {
  Name: "ecommerce-events",
  Tags: [
    { Key: "environment", Value: "production" },
    { Key: "team", Value: "platform" }
  ]
};

eventbridge.createEventBus(createBusParams).promise()
  .then(function(result) {
    console.log("Event bus created:", result.EventBusArn);
  })
  .catch(function(err) {
    console.error("Failed to create event bus:", err);
  });

Custom Event Schemas

Defining schemas up front prevents chaos. When ten services produce events with no contract, you end up with inconsistent payloads that break consumers at runtime. EventBridge has a schema registry, but I prefer defining schemas in code and validating them before publishing.

var Ajv = require("ajv");
var ajv = new Ajv();

var orderPlacedSchema = {
  type: "object",
  required: ["orderId", "customerId", "items", "total", "timestamp"],
  properties: {
    orderId: { type: "string", pattern: "^ord-[a-z0-9]+$" },
    customerId: { type: "string", pattern: "^cust-[a-z0-9]+$" },
    items: {
      type: "array",
      minItems: 1,
      items: {
        type: "object",
        required: ["productId", "quantity", "price"],
        properties: {
          productId: { type: "string" },
          quantity: { type: "integer", minimum: 1 },
          price: { type: "number", minimum: 0 }
        }
      }
    },
    total: { type: "number", minimum: 0 },
    timestamp: { type: "number" }
  },
  additionalProperties: false
};

var validate = ajv.compile(orderPlacedSchema); // compile once, at module load

function validateAndPublish(detailType, detail) {
  var valid = validate(detail);
  if (!valid) {
    var errorMsg = "Schema validation failed: " + JSON.stringify(validate.errors);
    console.error(errorMsg);
    return Promise.reject(new Error(errorMsg));
  }
  return publishEvent("order-service", detailType, detail);
}

Register your schemas in the EventBridge Schema Registry so other teams can discover them:

var schemas = new AWS.Schemas({ region: "us-east-1" });

var schemaParams = {
  Content: JSON.stringify({
    openapi: "3.0.0",
    info: { title: "OrderPlaced", version: "1" },
    paths: {},
    components: {
      schemas: {
        OrderPlaced: orderPlacedSchema
      }
    }
  }),
  RegistryName: "ecommerce-events",
  SchemaName: "order-service@OrderPlaced",
  Type: "OpenApi3"
};

schemas.createSchema(schemaParams).promise()
  .catch(function(err) {
    console.error("Failed to register schema:", err);
  });

Event Routing Rules

EventBridge rules match incoming events using JSON-based content filtering. You can filter on any field in the event envelope or the detail payload. This is where the real power lives — each consumer only receives the events it cares about.

var ruleParams = {
  Name: "route-order-to-inventory",
  EventBusName: "ecommerce-events",
  EventPattern: JSON.stringify({
    source: ["order-service"],
    "detail-type": ["OrderPlaced"],
    detail: {
      total: [{ numeric: [">", 0] }]
    }
  }),
  State: "ENABLED",
  Description: "Routes order events to inventory service"
};

eventbridge.putRule(ruleParams).promise()
  .then(function() {
    return eventbridge.putTargets({
      Rule: "route-order-to-inventory",
      EventBusName: "ecommerce-events",
      Targets: [
        {
          Id: "inventory-lambda",
          Arn: "arn:aws:lambda:us-east-1:123456789:function:inventory-handler",
          RetryPolicy: {
            MaximumRetryAttempts: 3,
            MaximumEventAgeInSeconds: 3600
          },
          DeadLetterConfig: {
            Arn: "arn:aws:sqs:us-east-1:123456789:inventory-dlq"
          }
        }
      ]
    }).promise();
  });

Content-based filtering supports prefix matching, suffix matching, numeric comparisons, and exists checks. Use these to avoid over-triggering consumers:

{
  "source": ["order-service"],
  "detail-type": ["OrderPlaced"],
  "detail": {
    "items": {
      "productId": [{ "prefix": "prod-electronics-" }]
    },
    "total": [{ "numeric": [">=", 100, "<=", 10000] }],
    "promoCode": [{ "exists": true }]
  }
}
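To build intuition for how these operators behave, here is a simplified, illustrative matcher for single-field prefix, numeric, and exists filters. This is not EventBridge's actual implementation — the real matcher also handles arrays, nesting, suffix matching, and more — it only sketches the semantics of the three operators shown above:

```javascript
// Illustrative sketch of EventBridge-style matching for a single field.
// A filter list matches if any one of its entries matches the value.
function matchesFilter(value, filters) {
  return filters.some(function(filter) {
    if (typeof filter === "string") return value === filter; // exact match
    if (filter.prefix !== undefined) {
      return typeof value === "string" && value.indexOf(filter.prefix) === 0;
    }
    if (filter.exists !== undefined) {
      return filter.exists === (value !== undefined);
    }
    if (filter.numeric !== undefined) {
      // numeric comparisons come as operator/operand pairs,
      // e.g. [">=", 100, "<=", 10000] means 100 <= value <= 10000
      var ops = filter.numeric;
      for (var i = 0; i < ops.length; i += 2) {
        var op = ops[i], operand = ops[i + 1];
        if (op === ">" && !(value > operand)) return false;
        if (op === ">=" && !(value >= operand)) return false;
        if (op === "<" && !(value < operand)) return false;
        if (op === "<=" && !(value <= operand)) return false;
        if (op === "=" && !(value === operand)) return false;
      }
      return true;
    }
    return false;
  });
}

console.log(matchesFilter(250, [{ numeric: [">=", 100, "<=", 10000] }]));          // true
console.log(matchesFilter("prod-electronics-tv", [{ prefix: "prod-electronics-" }])); // true
console.log(matchesFilter(undefined, [{ exists: true }]));                         // false
```

The takeaway: filters within a list are OR'd, while separate fields in a pattern are AND'd, so the sample pattern above only matches orders that satisfy all three conditions at once.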

S3 Event Notifications

S3 buckets are natural event producers. File uploads, deletions, and transitions trigger events that you can route through EventBridge. This is essential for data pipelines, image processing, and ETL workflows.

// Lambda triggered by S3 event via EventBridge
var AWS = require("aws-sdk");

exports.handler = function(event, context, callback) {
  var s3Event = event.detail;
  var bucket = s3Event.bucket.name;
  var key = s3Event.object.key;
  var size = s3Event.object.size;

  console.log("Processing file:", bucket + "/" + key, "Size:", size);

  if (key.startsWith("uploads/invoices/")) {
    return processInvoice(bucket, key)
      .then(function(result) {
        callback(null, result);
      })
      .catch(function(err) {
        console.error("Invoice processing failed:", err);
        callback(err);
      });
  }

  if (key.startsWith("uploads/images/")) {
    return generateThumbnails(bucket, key)
      .then(function(result) {
        callback(null, result);
      })
      .catch(function(err) {
        console.error("Thumbnail generation failed:", err);
        callback(err);
      });
  }

  callback(null, { message: "No handler for prefix" });
};

function processInvoice(bucket, key) {
  var s3 = new AWS.S3();
  return s3.getObject({ Bucket: bucket, Key: key }).promise()
    .then(function(data) {
      var content = data.Body.toString("utf-8");
      // Parse and store invoice data
      return { processed: true, key: key };
    });
}

Enable EventBridge notifications on your S3 bucket through CloudFormation or the SDK:

# CloudFormation
InvoiceBucket:
  Type: AWS::S3::Bucket
  Properties:
    BucketName: ecommerce-invoices
    NotificationConfiguration:
      EventBridgeConfiguration:
        EventBridgeEnabled: true
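The same toggle is available from the SDK through `putBucketNotificationConfiguration`, where an empty `EventBridgeConfiguration` object enables delivery. A sketch of the request parameters (the bucket name carries over from the template above):

```javascript
// Parameters you would pass to
// s3.putBucketNotificationConfiguration(params).promise() —
// an empty EventBridgeConfiguration object turns EventBridge delivery on.
var params = {
  Bucket: "ecommerce-invoices",
  NotificationConfiguration: {
    EventBridgeConfiguration: {}
  }
};
```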

DynamoDB Streams as Event Sources

DynamoDB Streams captures every write to a table and delivers the changes as a time-ordered sequence of records. This is the change data capture (CDC) pattern — your Lambda reacts to inserts, updates, and deletes without polling.

// Lambda triggered by DynamoDB Stream
var AWS = require("aws-sdk");

exports.handler = function(event, context, callback) {
  var promises = event.Records.map(function(record) {
    var eventName = record.eventName; // INSERT, MODIFY, REMOVE
    var newImage = record.dynamodb.NewImage;
    var oldImage = record.dynamodb.OldImage;

    if (eventName === "INSERT") {
      return handleNewOrder(AWS.DynamoDB.Converter.unmarshall(newImage));
    }

    if (eventName === "MODIFY") {
      var newData = AWS.DynamoDB.Converter.unmarshall(newImage);
      var oldData = AWS.DynamoDB.Converter.unmarshall(oldImage);

      if (oldData.status !== newData.status) {
        return handleStatusChange(newData, oldData.status, newData.status);
      }
    }

    if (eventName === "REMOVE") {
      return handleOrderCancelled(AWS.DynamoDB.Converter.unmarshall(oldImage));
    }

    return Promise.resolve();
  });

  Promise.all(promises)
    .then(function() { callback(null, "Success"); })
    .catch(function(err) { callback(err); });
};

function handleStatusChange(order, oldStatus, newStatus) {
  console.log("Order", order.orderId, "changed from", oldStatus, "to", newStatus);

  return publishEvent("order-service", "OrderStatusChanged", {
    orderId: order.orderId,
    previousStatus: oldStatus,
    newStatus: newStatus,
    timestamp: Date.now()
  });
}

Configure the event source mapping to control batch size and parallelization:

# SAM template
OrderStreamFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: stream-handler.handler
    Runtime: nodejs18.x
    Events:
      OrderStream:
        Type: DynamoDB
        Properties:
          Stream: !GetAtt OrdersTable.StreamArn
          StartingPosition: TRIM_HORIZON
          BatchSize: 25
          MaximumBatchingWindowInSeconds: 5
          MaximumRetryAttempts: 3
          BisectBatchOnFunctionError: true
          ParallelizationFactor: 5
          DestinationConfig:
            OnFailure:
              Destination: !GetAtt StreamDLQ.Arn

API Gateway WebSocket for Real-Time Events

When clients need real-time updates (order status changes, live notifications), WebSocket APIs through API Gateway let you push events directly to connected browsers or mobile apps.

var AWS = require("aws-sdk");

// Connection handler
exports.connect = function(event, context, callback) {
  var connectionId = event.requestContext.connectionId;
  var userId = (event.queryStringParameters || {}).userId;

  var dynamodb = new AWS.DynamoDB.DocumentClient();

  dynamodb.put({
    TableName: "websocket-connections",
    Item: {
      connectionId: connectionId,
      userId: userId,
      connectedAt: Date.now(),
      ttl: Math.floor(Date.now() / 1000) + 86400
    }
  }).promise()
  .then(function() {
    callback(null, { statusCode: 200, body: "Connected" });
  })
  .catch(function(err) {
    callback(null, { statusCode: 500, body: "Failed to connect" });
  });
};

// Push event to connected clients
exports.pushToUser = function(userId, payload) {
  var dynamodb = new AWS.DynamoDB.DocumentClient();
  var apigateway;

  return dynamodb.query({
    TableName: "websocket-connections",
    IndexName: "userId-index",
    KeyConditionExpression: "userId = :uid",
    ExpressionAttributeValues: { ":uid": userId }
  }).promise()
  .then(function(result) {
    apigateway = new AWS.ApiGatewayManagementApi({
      endpoint: process.env.WEBSOCKET_ENDPOINT
    });

    var sends = result.Items.map(function(connection) {
      return apigateway.postToConnection({
        ConnectionId: connection.connectionId,
        Data: JSON.stringify(payload)
      }).promise()
      .catch(function(err) {
        if (err.statusCode === 410) {
          // Connection is stale, clean it up
          return dynamodb.delete({
            TableName: "websocket-connections",
            Key: { connectionId: connection.connectionId }
          }).promise();
        }
        throw err;
      });
    });

    return Promise.all(sends);
  });
};

Event Sourcing Pattern in Serverless

Event sourcing stores every state change as an immutable event rather than overwriting the current state. The current state is derived by replaying events in order. This gives you a complete audit trail and the ability to reconstruct state at any point in time.

var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();

var EVENT_STORE_TABLE = "event-store";

function appendEvent(aggregateId, eventType, eventData, expectedVersion) {
  var version = expectedVersion + 1;

  var params = {
    TableName: EVENT_STORE_TABLE,
    Item: {
      aggregateId: aggregateId,
      version: version,
      eventType: eventType,
      data: eventData,
      timestamp: new Date().toISOString(),
      eventId: require("crypto").randomUUID()
    },
    ConditionExpression: "attribute_not_exists(aggregateId) AND attribute_not_exists(version)"
  };

  return dynamodb.put(params).promise();
}
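The ConditionExpression is what makes appends safe under concurrency: if two writers race to append version N+1, only one put succeeds and the other receives a ConditionalCheckFailedException. Here is the same check sketched against an in-memory store, purely to illustrate the behavior — a real handler would catch the DynamoDB exception, re-read the events, and retry with the current version:

```javascript
// In-memory sketch of the optimistic-concurrency check used by appendEvent.
function makeInMemoryEventStore() {
  var events = {}; // aggregateId -> { version -> event }

  return {
    append: function(aggregateId, eventType, data, expectedVersion) {
      var version = expectedVersion + 1;
      events[aggregateId] = events[aggregateId] || {};
      if (events[aggregateId][version]) {
        // mirrors DynamoDB's ConditionalCheckFailedException
        throw new Error("ConditionalCheckFailed: version " + version + " already exists");
      }
      events[aggregateId][version] = { eventType: eventType, data: data, version: version };
      return version;
    }
  };
}

var store = makeInMemoryEventStore();
store.append("ord-1", "OrderCreated", { total: 100 }, 0); // writes version 1
store.append("ord-1", "OrderConfirmed", {}, 1);           // writes version 2
// A concurrent writer still holding version 1 loses the race:
// store.append("ord-1", "ItemAdded", {}, 1) -> throws
```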

function getEvents(aggregateId) {
  return dynamodb.query({
    TableName: EVENT_STORE_TABLE,
    KeyConditionExpression: "aggregateId = :id",
    ExpressionAttributeValues: { ":id": aggregateId },
    ScanIndexForward: true
  }).promise()
  .then(function(result) {
    return result.Items;
  });
}

function rebuildState(aggregateId) {
  return getEvents(aggregateId).then(function(events) {
    var state = { status: "unknown", items: [], total: 0 };

    events.forEach(function(event) {
      switch (event.eventType) {
        case "OrderCreated":
          state.orderId = event.data.orderId;
          state.customerId = event.data.customerId;
          state.status = "created";
          state.createdAt = event.timestamp;
          break;
        case "ItemAdded":
          state.items.push(event.data.item);
          state.total = state.total + (event.data.item.price * event.data.item.quantity);
          break;
        case "ItemRemoved":
          state.items = state.items.filter(function(i) {
            return i.productId !== event.data.productId;
          });
          state.total = state.items.reduce(function(sum, i) {
            return sum + (i.price * i.quantity);
          }, 0);
          break;
        case "OrderConfirmed":
          state.status = "confirmed";
          state.confirmedAt = event.timestamp;
          break;
        case "OrderShipped":
          state.status = "shipped";
          state.trackingNumber = event.data.trackingNumber;
          break;
        case "OrderCancelled":
          state.status = "cancelled";
          state.cancelReason = event.data.reason;
          break;
      }
    });

    state.version = events.length;
    return state;
  });
}

CQRS with Serverless

Command Query Responsibility Segregation (CQRS) separates your write model from your read model. Commands mutate state through events. Queries read from optimized projections. In serverless, DynamoDB Streams naturally bridge the two sides.

// Command side — write handler
exports.placeOrder = function(event, context, callback) {
  var command = JSON.parse(event.body);

  // Validate command
  if (!command.customerId || !command.items || command.items.length === 0) {
    return callback(null, {
      statusCode: 400,
      body: JSON.stringify({ error: "Invalid order command" })
    });
  }

  var orderId = "ord-" + Date.now().toString(36);

  // Write to event store (command side)
  appendEvent(orderId, "OrderCreated", {
    orderId: orderId,
    customerId: command.customerId,
    items: command.items,
    total: command.items.reduce(function(sum, item) {
      return sum + (item.price * item.quantity);
    }, 0)
  }, 0)
  .then(function() {
    callback(null, {
      statusCode: 201,
      body: JSON.stringify({ orderId: orderId })
    });
  })
  .catch(function(err) {
    callback(null, {
      statusCode: 500,
      body: JSON.stringify({ error: "Failed to place order" })
    });
  });
};

// Query side — read projection handler
// Triggered by DynamoDB Stream on the event store table
exports.updateProjection = function(event, context, callback) {
  var dynamodb = new AWS.DynamoDB.DocumentClient();

  var updates = event.Records
    .filter(function(record) { return record.eventName === "INSERT"; })
    .map(function(record) {
      var item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);

      if (item.eventType === "OrderCreated") {
        return dynamodb.put({
          TableName: "orders-read-model",
          Item: {
            orderId: item.data.orderId,
            customerId: item.data.customerId,
            status: "created",
            total: item.data.total,
            itemCount: item.data.items.length,
            createdAt: item.timestamp,
            updatedAt: item.timestamp
          }
        }).promise();
      }

      if (item.eventType === "OrderShipped") {
        return dynamodb.update({
          TableName: "orders-read-model",
          Key: { orderId: item.aggregateId },
          UpdateExpression: "SET #s = :status, trackingNumber = :tracking, updatedAt = :ts",
          ExpressionAttributeNames: { "#s": "status" },
          ExpressionAttributeValues: {
            ":status": "shipped",
            ":tracking": item.data.trackingNumber,
            ":ts": item.timestamp
          }
        }).promise();
      }

      return Promise.resolve();
    });

  Promise.all(updates)
    .then(function() { callback(null, "Projections updated"); })
    .catch(function(err) { callback(err); });
};

// Query side — read handler
exports.getOrder = function(event, context, callback) {
  var orderId = event.pathParameters.orderId;
  var dynamodb = new AWS.DynamoDB.DocumentClient();

  dynamodb.get({
    TableName: "orders-read-model",
    Key: { orderId: orderId }
  }).promise()
  .then(function(result) {
    if (!result.Item) {
      return callback(null, { statusCode: 404, body: JSON.stringify({ error: "Not found" }) });
    }
    callback(null, { statusCode: 200, body: JSON.stringify(result.Item) });
  })
  .catch(function(err) {
    console.error("Read failed:", err);
    callback(null, { statusCode: 500, body: JSON.stringify({ error: "Internal error" }) });
  });
};

Saga Pattern for Distributed Transactions

In microservices there is no distributed transaction coordinator. The saga pattern chains local transactions across services using events. Each step either succeeds and triggers the next step, or fails and triggers compensating actions to roll back previous steps.

var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var eventbridge = new AWS.EventBridge();

var SAGA_TABLE = "order-saga-state";

// Step 1: Start saga — reserve inventory
exports.startOrderSaga = function(event, context, callback) {
  var order = event.detail;
  var sagaId = "saga-" + order.orderId;

  dynamodb.put({
    TableName: SAGA_TABLE,
    Item: {
      sagaId: sagaId,
      orderId: order.orderId,
      status: "STARTED",
      steps: {
        inventory: "PENDING",
        payment: "PENDING",
        shipping: "PENDING"
      },
      createdAt: Date.now()
    }
  }).promise()
  .then(function() {
    return publishEvent("saga-orchestrator", "ReserveInventory", {
      sagaId: sagaId,
      orderId: order.orderId,
      items: order.items
    });
  })
  .then(function() {
    callback(null, { sagaId: sagaId, status: "STARTED" });
  })
  .catch(function(err) {
    console.error("Saga start failed:", err);
    callback(err);
  });
};

// Step 2: Inventory reserved — process payment
exports.handleInventoryReserved = function(event, context, callback) {
  var detail = event.detail;
  var sagaId = detail.sagaId;

  updateSagaStep(sagaId, "inventory", "COMPLETED")
    .then(function() {
      return publishEvent("saga-orchestrator", "ProcessPayment", {
        sagaId: sagaId,
        orderId: detail.orderId,
        amount: detail.totalAmount
      });
    })
    .then(function() { callback(null, "Payment step triggered"); })
    .catch(function(err) { callback(err); });
};

// Step 3: Payment processed — arrange shipping
exports.handlePaymentProcessed = function(event, context, callback) {
  var detail = event.detail;
  var sagaId = detail.sagaId;

  updateSagaStep(sagaId, "payment", "COMPLETED")
    .then(function() {
      return publishEvent("saga-orchestrator", "ArrangeShipping", {
        sagaId: sagaId,
        orderId: detail.orderId,
        address: detail.shippingAddress
      });
    })
    .then(function() { callback(null, "Shipping step triggered"); })
    .catch(function(err) { callback(err); });
};

// Compensation: Payment failed — release inventory
exports.handlePaymentFailed = function(event, context, callback) {
  var detail = event.detail;
  var sagaId = detail.sagaId;

  updateSagaStep(sagaId, "payment", "FAILED")
    .then(function() {
      return publishEvent("saga-orchestrator", "ReleaseInventory", {
        sagaId: sagaId,
        orderId: detail.orderId,
        items: detail.items,
        reason: "Payment failed: " + detail.error
      });
    })
    .then(function() {
      return updateSagaStatus(sagaId, "COMPENSATING");
    })
    .then(function() { callback(null, "Compensation triggered"); })
    .catch(function(err) { callback(err); });
};

function updateSagaStep(sagaId, step, status) {
  return dynamodb.update({
    TableName: SAGA_TABLE,
    Key: { sagaId: sagaId },
    UpdateExpression: "SET steps.#step = :status, updatedAt = :ts",
    ExpressionAttributeNames: { "#step": step },
    ExpressionAttributeValues: { ":status": status, ":ts": Date.now() }
  }).promise();
}

function updateSagaStatus(sagaId, status) {
  return dynamodb.update({
    TableName: SAGA_TABLE,
    Key: { sagaId: sagaId },
    UpdateExpression: "SET #s = :status, updatedAt = :ts",
    ExpressionAttributeNames: { "#s": "status" },
    ExpressionAttributeValues: { ":status": status, ":ts": Date.now() }
  }).promise();
}
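One detail the handlers above leave implicit is how the overall saga status derives from the individual step states. A small helper (illustrative only — the handlers above set the status explicitly instead) makes the rule concrete:

```javascript
// Derives an overall saga status from the per-step states.
// Illustrative helper, not part of the orchestrator above.
function deriveSagaStatus(steps) {
  var states = Object.keys(steps).map(function(k) { return steps[k]; });

  if (states.indexOf("FAILED") !== -1) {
    return "COMPENSATING"; // a step failed; completed steps must be rolled back
  }
  if (states.every(function(s) { return s === "COMPLETED"; })) {
    return "COMPLETED";
  }
  if (states.every(function(s) { return s === "PENDING"; })) {
    return "STARTED";
  }
  return "IN_PROGRESS";
}

console.log(deriveSagaStatus({ inventory: "COMPLETED", payment: "FAILED", shipping: "PENDING" }));
// -> "COMPENSATING"
```

Keeping this rule in one place avoids the drift you get when each handler re-derives the overall status independently.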

Idempotency in Event Handlers

Events can be delivered more than once. EventBridge guarantees at-least-once delivery, and Lambda retries failed batches from DynamoDB Streams, so the same record can be processed repeatedly. Your handlers must be idempotent — processing the same event twice should produce the same result as processing it once.

var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var crypto = require("crypto");

var IDEMPOTENCY_TABLE = "idempotency-keys";
var TTL_SECONDS = 86400; // 24 hours

function withIdempotency(eventId, handler) {
  var now = Math.floor(Date.now() / 1000);

  return dynamodb.put({
    TableName: IDEMPOTENCY_TABLE,
    Item: {
      idempotencyKey: eventId,
      status: "PROCESSING",
      createdAt: now,
      ttl: now + TTL_SECONDS
    },
    ConditionExpression: "attribute_not_exists(idempotencyKey)"
  }).promise()
  .then(function() {
    return handler();
  })
  .then(function(result) {
    return dynamodb.update({
      TableName: IDEMPOTENCY_TABLE,
      Key: { idempotencyKey: eventId },
      UpdateExpression: "SET #s = :status, #r = :result",
      ExpressionAttributeNames: { "#s": "status", "#r": "result" },
      ExpressionAttributeValues: {
        ":status": "COMPLETED",
        ":result": JSON.stringify(result)
      }
    }).promise()
    .then(function() { return result; });
  })
  .catch(function(err) {
    if (err.code === "ConditionalCheckFailedException") {
      console.log("Duplicate event detected, skipping:", eventId);
      return dynamodb.get({
        TableName: IDEMPOTENCY_TABLE,
        Key: { idempotencyKey: eventId }
      }).promise()
      .then(function(data) {
        if (data.Item && data.Item.status === "COMPLETED") {
          return JSON.parse(data.Item.result);
        }
        // Still processing from another invocation
        return { status: "ALREADY_PROCESSING" };
      });
    }
    throw err;
  });
}

// Usage
exports.handler = function(event, context, callback) {
  var detail = event.detail;
  var eventId = detail.orderId + "-" + detail.eventType + "-" + detail.timestamp;

  withIdempotency(eventId, function() {
    return processOrder(detail);
  })
  .then(function(result) {
    callback(null, result);
  })
  .catch(function(err) {
    callback(err);
  });
};

Event Replay and Dead Letter Handling

When things go wrong, and they will, you need dead letter queues (DLQs) and the ability to replay failed events. Every EventBridge rule target should have a DLQ configured. Build a replay mechanism that redrives messages from the DLQ back to the event bus.

var AWS = require("aws-sdk");
var sqs = new AWS.SQS();
var eventbridge = new AWS.EventBridge();

// Replay failed events from DLQ
exports.replayDLQ = function(event, context, callback) {
  var dlqUrl = process.env.DLQ_URL;
  var maxMessages = parseInt((event.queryStringParameters || {}).max, 10) || 10;

  function processMessages(count) {
    if (count >= maxMessages) {
      return Promise.resolve({ replayed: count });
    }

    return sqs.receiveMessage({
      QueueUrl: dlqUrl,
      MaxNumberOfMessages: Math.min(10, maxMessages - count),
      WaitTimeSeconds: 1
    }).promise()
    .then(function(data) {
      if (!data.Messages || data.Messages.length === 0) {
        return { replayed: count };
      }

      var replays = data.Messages.map(function(msg) {
        var originalEvent = JSON.parse(msg.Body);

        return eventbridge.putEvents({
          Entries: [{
            Source: originalEvent.source,
            DetailType: originalEvent["detail-type"],
            Detail: JSON.stringify(originalEvent.detail),
            EventBusName: "ecommerce-events"
          }]
        }).promise()
        .then(function() {
          return sqs.deleteMessage({
            QueueUrl: dlqUrl,
            ReceiptHandle: msg.ReceiptHandle
          }).promise();
        });
      });

      return Promise.all(replays).then(function() {
        return processMessages(count + data.Messages.length);
      });
    });
  }

  processMessages(0)
    .then(function(result) {
      callback(null, {
        statusCode: 200,
        body: JSON.stringify(result)
      });
    })
    .catch(function(err) {
      callback(null, {
        statusCode: 500,
        body: JSON.stringify({ error: err.message })
      });
    });
};

Set up CloudWatch alarms on your DLQ so you know immediately when events fail:

DLQAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: inventory-dlq-messages
    AlarmDescription: Messages in inventory dead letter queue
    MetricName: ApproximateNumberOfMessagesVisible
    Namespace: AWS/SQS
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 1
    Threshold: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Dimensions:
      - Name: QueueName
        Value: !GetAtt InventoryDLQ.QueueName
    AlarmActions:
      - !Ref AlertSNSTopic

Cross-Service Event Contracts

When multiple teams own different services, you need explicit contracts for events. I recommend a shared npm package that contains event types, schemas, and helper functions. This prevents drift between producers and consumers.

// shared-events/index.js — published as @company/event-contracts
var schemas = require("./schemas");
var validators = {};

var EVENT_TYPES = {
  ORDER_PLACED: "OrderPlaced",
  ORDER_CONFIRMED: "OrderConfirmed",
  ORDER_SHIPPED: "OrderShipped",
  ORDER_CANCELLED: "OrderCancelled",
  INVENTORY_RESERVED: "InventoryReserved",
  INVENTORY_RELEASED: "InventoryReleased",
  PAYMENT_PROCESSED: "PaymentProcessed",
  PAYMENT_FAILED: "PaymentFailed",
  NOTIFICATION_SENT: "NotificationSent"
};

var SOURCES = {
  ORDER_SERVICE: "order-service",
  INVENTORY_SERVICE: "inventory-service",
  PAYMENT_SERVICE: "payment-service",
  NOTIFICATION_SERVICE: "notification-service"
};

function createEvent(source, detailType, detail) {
  if (!SOURCES[source]) {
    throw new Error("Unknown source: " + source);
  }
  if (!Object.values(EVENT_TYPES).includes(detailType)) {
    throw new Error("Unknown event type: " + detailType);
  }

  return {
    Source: SOURCES[source],
    DetailType: detailType,
    Detail: JSON.stringify(detail),
    EventBusName: "ecommerce-events",
    Time: new Date()
  };
}

module.exports = {
  EVENT_TYPES: EVENT_TYPES,
  SOURCES: SOURCES,
  createEvent: createEvent,
  schemas: schemas
};

Complete Working Example: E-Commerce Event System

Here is a complete SAM template and Lambda functions for an e-commerce event system. When an order is placed, EventBridge routes the event to three independent consumers: inventory reservation, customer notification, and analytics ingestion.

# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
  Function:
    Runtime: nodejs18.x
    Timeout: 30
    MemorySize: 256
    Environment:
      Variables:
        EVENT_BUS_NAME: !Ref EcommerceEventBus

Resources:
  EcommerceEventBus:
    Type: AWS::Events::EventBus
    Properties:
      Name: ecommerce-events

  # Producer: Order API
  OrderFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/order/handler.createOrder
      Policies:
        - EventBridgePutEventsPolicy:
            EventBusName: !Ref EcommerceEventBus
        - DynamoDBCrudPolicy:
            TableName: !Ref OrdersTable
      Events:
        CreateOrder:
          Type: Api
          Properties:
            Path: /orders
            Method: POST

  # Consumer 1: Inventory Service
  InventoryFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/inventory/handler.reserveInventory
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref InventoryTable
        - EventBridgePutEventsPolicy:
            EventBusName: !Ref EcommerceEventBus
      Events:
        OrderPlaced:
          Type: EventBridgeRule
          Properties:
            EventBusName: !Ref EcommerceEventBus
            Pattern:
              source: ["order-service"]
              detail-type: ["OrderPlaced"]
            DeadLetterConfig:
              Arn: !GetAtt InventoryDLQ.Arn
            RetryPolicy:
              MaximumRetryAttempts: 3

  # Consumer 2: Notification Service
  NotificationFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/notification/handler.sendOrderConfirmation
      Policies:
        - SESCrudPolicy:
            IdentityName: !Ref SESIdentity
      Events:
        OrderPlaced:
          Type: EventBridgeRule
          Properties:
            EventBusName: !Ref EcommerceEventBus
            Pattern:
              source: ["order-service"]
              detail-type: ["OrderPlaced"]

  # Consumer 3: Analytics Service
  AnalyticsFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/analytics/handler.ingestOrderEvent
      Policies:
        - KinesisCrudPolicy:
            StreamName: !Ref AnalyticsStream
      Events:
        AllOrderEvents:
          Type: EventBridgeRule
          Properties:
            EventBusName: !Ref EcommerceEventBus
            Pattern:
              source: ["order-service"]

  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: orders
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: orderId
          AttributeType: S
      KeySchema:
        - AttributeName: orderId
          KeyType: HASH

  InventoryTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: inventory
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: productId
          AttributeType: S
      KeySchema:
        - AttributeName: productId
          KeyType: HASH

  InventoryDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: inventory-dlq
      MessageRetentionPeriod: 1209600

  AnalyticsStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: order-analytics
      ShardCount: 1

The order handler (producer). Note that the nodejs18.x runtime bundles AWS SDK v3 (the modular @aws-sdk/* clients), not the legacy aws-sdk v2 package, so the handler uses the v3 clients with async/await:

// src/order/handler.js
const { DynamoDBClient } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocumentClient, PutCommand } = require("@aws-sdk/lib-dynamodb");
const { EventBridgeClient, PutEventsCommand } = require("@aws-sdk/client-eventbridge");

const dynamodb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const eventbridge = new EventBridgeClient({});

exports.createOrder = async function (event) {
  try {
    const body = JSON.parse(event.body);
    const orderId = "ord-" + Date.now().toString(36) + Math.random().toString(36).slice(2, 6);

    const order = {
      orderId,
      customerId: body.customerId,
      customerEmail: body.email,
      items: body.items,
      total: body.items.reduce((sum, item) => sum + item.price * item.quantity, 0),
      status: "placed",
      createdAt: new Date().toISOString()
    };

    await dynamodb.send(new PutCommand({ TableName: "orders", Item: order }));

    const result = await eventbridge.send(new PutEventsCommand({
      Entries: [{
        Source: "order-service",
        DetailType: "OrderPlaced",
        Detail: JSON.stringify(order),
        EventBusName: process.env.EVENT_BUS_NAME
      }]
    }));

    if (result.FailedEntryCount > 0) {
      console.error("EventBridge publish partial failure:", JSON.stringify(result.Entries));
    }

    return {
      statusCode: 201,
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ orderId, status: "placed" })
    };
  } catch (err) {
    console.error("Order creation failed:", err);
    return {
      statusCode: 500,
      body: JSON.stringify({ error: "Failed to create order" })
    };
  }
};

The inventory consumer:

// src/inventory/handler.js
const { DynamoDBClient } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocumentClient, UpdateCommand } = require("@aws-sdk/lib-dynamodb");
const { EventBridgeClient, PutEventsCommand } = require("@aws-sdk/client-eventbridge");

const dynamodb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const eventbridge = new EventBridgeClient({});

exports.reserveInventory = async function (event) {
  const order = event.detail;
  console.log("Reserving inventory for order:", order.orderId);

  const results = await Promise.all(order.items.map(async (item) => {
    try {
      const result = await dynamodb.send(new UpdateCommand({
        TableName: "inventory",
        Key: { productId: item.productId },
        UpdateExpression: "SET availableQty = availableQty - :qty, reservedQty = reservedQty + :qty",
        ConditionExpression: "availableQty >= :qty",
        ExpressionAttributeValues: { ":qty": item.quantity },
        ReturnValues: "ALL_NEW"
      }));
      return { productId: item.productId, reserved: true, remaining: result.Attributes.availableQty };
    } catch (err) {
      if (err.name === "ConditionalCheckFailedException") {
        return { productId: item.productId, reserved: false, reason: "Insufficient stock" };
      }
      throw err;
    }
  }));

  const allReserved = results.every((r) => r.reserved);

  if (allReserved) {
    await eventbridge.send(new PutEventsCommand({
      Entries: [{
        Source: "inventory-service",
        DetailType: "InventoryReserved",
        Detail: JSON.stringify({
          orderId: order.orderId,
          items: results,
          totalAmount: order.total
        }),
        EventBusName: process.env.EVENT_BUS_NAME
      }]
    }));
  } else {
    const failedItems = results.filter((r) => !r.reserved);

    // Roll back the reservations that did succeed
    await Promise.all(results
      .filter((r) => r.reserved)
      .map((r) => {
        const originalItem = order.items.find((i) => i.productId === r.productId);
        return dynamodb.send(new UpdateCommand({
          TableName: "inventory",
          Key: { productId: r.productId },
          UpdateExpression: "SET availableQty = availableQty + :qty, reservedQty = reservedQty - :qty",
          ExpressionAttributeValues: { ":qty": originalItem.quantity }
        }));
      }));

    await eventbridge.send(new PutEventsCommand({
      Entries: [{
        Source: "inventory-service",
        DetailType: "InventoryReservationFailed",
        Detail: JSON.stringify({
          orderId: order.orderId,
          failedItems: failedItems,
          reason: "Insufficient stock for one or more items"
        }),
        EventBusName: process.env.EVENT_BUS_NAME
      }]
    }));
  }

  return "Inventory processed";
};

The notification consumer:

// src/notification/handler.js
const { SESClient, SendEmailCommand } = require("@aws-sdk/client-ses");

const ses = new SESClient({});

exports.sendOrderConfirmation = async function (event) {
  const order = event.detail;

  const itemList = order.items.map(function (item) {
    return "  - " + item.productId + " (qty: " + item.quantity + ") — $" + item.price.toFixed(2);
  }).join("\n");

  const result = await ses.send(new SendEmailCommand({
    Source: "orders@example.com", // placeholder: must be an SES-verified identity
    Destination: { ToAddresses: [order.customerEmail] },
    Message: {
      Subject: { Data: "Order Confirmation — " + order.orderId },
      Body: {
        Text: {
          Data: "Thank you for your order!\n\n" +
            "Order ID: " + order.orderId + "\n" +
            "Items:\n" + itemList + "\n\n" +
            "Total: $" + order.total.toFixed(2) + "\n\n" +
            "We will notify you when your order ships."
        }
      }
    }
  }));

  console.log("Confirmation sent:", result.MessageId);
  return "Email sent";
};

The analytics consumer:

// src/analytics/handler.js
const { KinesisClient, PutRecordCommand } = require("@aws-sdk/client-kinesis");

const kinesis = new KinesisClient({});

exports.ingestOrderEvent = async function (event) {
  const detail = event.detail;
  const detailType = event["detail-type"];

  const analyticsRecord = {
    eventType: detailType,
    orderId: detail.orderId,
    customerId: detail.customerId,
    total: detail.total,
    itemCount: detail.items ? detail.items.length : 0,
    timestamp: new Date().toISOString(),
    source: event.source
  };

  // SDK v3 expects the record data as bytes, not a string
  await kinesis.send(new PutRecordCommand({
    StreamName: "order-analytics",
    Data: Buffer.from(JSON.stringify(analyticsRecord)),
    PartitionKey: detail.orderId
  }));

  console.log("Analytics event ingested:", detailType, detail.orderId);
  return "Ingested";
};

Common Issues and Troubleshooting

1. EventBridge event never reaches the target Lambda

Symptom: PutEvents returns success, but the target Lambda is never invoked.

EventBridge silently drops events that match no rule; there is no error to find in any log. This usually means your event pattern does not match the event structure. Remember that detail-type in the rule pattern must exactly match the DetailType you publish, and matching is case-sensitive. Test your pattern in the EventBridge console: select your bus and open the event pattern tester (sandbox). Also verify the rule is in the ENABLED state and attached to the correct event bus.
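To build intuition for why a pattern fails, exact-value matching can be modeled in a few lines of JavaScript. This is a simplification: real EventBridge patterns also match nested detail fields and support prefix, numeric, anything-but, and exists operators.

```javascript
// Simplified model of EventBridge exact matching: every field named in the
// pattern must have the event's value listed in the pattern's array.
// Fields the pattern does not mention are ignored.
function matchesPattern(event, pattern) {
  return Object.keys(pattern).every(
    (field) => Array.isArray(pattern[field]) && pattern[field].includes(event[field])
  );
}

const rulePattern = { source: ["order-service"], "detail-type": ["OrderPlaced"] };

matchesPattern({ source: "order-service", "detail-type": "OrderPlaced" }, rulePattern); // true
matchesPattern({ source: "order-service", "detail-type": "orderplaced" }, rulePattern); // false: case matters
```

If a publish succeeds but nothing fires, run the real event payload against the rule pattern this way (or in the console tester) before digging further.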

2. DynamoDB Stream Lambda receives partial batches or duplicate records

Error: Lambda function timeout exceeded. Records will be retried.

When your Lambda times out while processing a DynamoDB Stream batch, the entire batch retries from the beginning. This is why idempotency matters. Set BisectBatchOnFunctionError: true in your event source mapping so Lambda splits the batch in half and retries smaller chunks. Also reduce BatchSize and increase function timeout.
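In SAM, those settings live on the stream event source mapping. An illustrative sketch, assuming the table has a stream enabled and `StreamDLQ` is an SQS queue you define elsewhere in the template:

# Illustrative DynamoDB Stream event source settings for a SAM function
Events:
  OrdersStream:
    Type: DynamoDB
    Properties:
      Stream: !GetAtt OrdersTable.StreamArn
      StartingPosition: LATEST
      BatchSize: 25                        # smaller batches finish within the timeout
      BisectBatchOnFunctionError: true     # split and retry the failing half
      MaximumRetryAttempts: 3
      DestinationConfig:
        OnFailure:
          Destination: !GetAtt StreamDLQ.Arn   # records that never succeed land here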

3. EventBridge PutEvents returns success but FailedEntryCount is greater than zero

// Response from putEvents
{
  "FailedEntryCount": 1,
  "Entries": [
    {
      "ErrorCode": "InternalFailure",
      "ErrorMessage": "An internal error occurred"
    }
  ]
}

Always check result.FailedEntryCount after calling putEvents(). A 200 response does not mean all events succeeded. Implement retry logic for failed entries with exponential backoff. This is a common production bug — most developers only check the HTTP status code.
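A minimal retry helper for partial failures might look like the sketch below. The `putEvents` argument is any function that takes `{ Entries }` and resolves to the PutEvents response shape, for example a thin wrapper around `eventbridge.send(new PutEventsCommand(params))`.

```javascript
// Retry only the entries that failed, with exponential backoff. Failed
// entries are identified by their position: the response Entries array
// is index-aligned with the request, and failed slots carry an ErrorCode.
async function putEventsWithRetry(putEvents, entries, maxAttempts = 3) {
  let pending = entries;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const result = await putEvents({ Entries: pending });
    if (!result.FailedEntryCount) return;

    pending = pending.filter((_, i) => result.Entries[i].ErrorCode);
    if (attempt < maxAttempts) {
      await new Promise((resolve) => setTimeout(resolve, 100 * Math.pow(2, attempt - 1)));
    }
  }
  throw new Error(pending.length + " event(s) failed after " + maxAttempts + " attempts");
}
```

Throwing after the final attempt matters: it lets Lambda's own retry and DLQ machinery take over instead of the failure vanishing.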

4. WebSocket postToConnection fails with 410 GoneException

GoneException: Connection ID abcd1234 is no longer available

Connections go stale when clients disconnect without sending a proper close frame. Mobile clients frequently do this. Always wrap postToConnection in a try-catch, and when you get a 410 status code, delete the stale connection record from your connections table. Do not let stale connections accumulate or you will waste Lambda invocations trying to post to dead connections.
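The cleanup logic can be sketched as follows. Here `post(connectionId, data)` and `removeConnection(connectionId)` are hypothetical wrappers, e.g. around ApiGatewayManagementApi's postToConnection and a DynamoDB delete on your connections table; injecting them keeps the logic testable.

```javascript
// Deliver a payload to one WebSocket connection; on a stale connection
// (410 Gone / GoneException) prune its record instead of failing the batch.
async function safePost(post, removeConnection, connectionId, payload) {
  try {
    await post(connectionId, JSON.stringify(payload));
    return true;
  } catch (err) {
    // 410 means the client already disconnected without a close frame
    if (err.name === "GoneException" || err.statusCode === 410) {
      await removeConnection(connectionId); // delete the stale record
      return false;
    }
    throw err; // anything else is a real failure and should surface
  }
}
```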

5. Saga state gets corrupted when multiple events arrive simultaneously

ConditionalCheckFailedException: The conditional request failed

When two saga steps try to update the same saga record at the same time, you get conflicts. Use DynamoDB conditional writes with version numbers to detect conflicts. Alternatively, structure your saga state table so each step writes to a separate attribute (as shown in the saga example above), which avoids write conflicts entirely.
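A sketch of the version-number approach, with illustrative table and attribute names: the update succeeds only if the version the caller read is still stored, so a concurrent writer triggers ConditionalCheckFailedException and the loser re-reads and retries.

```javascript
// Build optimistic-locking update params for a saga record. Pass the
// result to an UpdateCommand; on ConditionalCheckFailedException,
// re-read the record to get the new version and retry.
function buildSagaUpdate(sagaId, step, result, expectedVersion) {
  return {
    TableName: "saga-state",
    Key: { sagaId: sagaId },
    UpdateExpression: "SET #step = :result, version = :next",
    ConditionExpression: "version = :expected",
    ExpressionAttributeNames: { "#step": step },
    ExpressionAttributeValues: {
      ":result": result,
      ":expected": expectedVersion,
      ":next": expectedVersion + 1
    }
  };
}
```

Writing each step to its own attribute (via the `#step` name placeholder) also keeps steps from clobbering each other's results.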

Best Practices

  • Always configure dead letter queues on every EventBridge rule target and every Lambda event source mapping. Events that cannot be processed must go somewhere observable, not disappear silently.

  • Validate events at the boundary. Validate schema at publish time (producer side) and at consumption time (consumer side). Schemas will evolve, and defensive parsing prevents cascading failures across services.

  • Design events as facts, not commands. Name events in past tense — OrderPlaced, not PlaceOrder. Events describe what happened. Commands describe what should happen. Mixing the two creates tight coupling.

  • Keep event payloads lean. Include identifiers and essential data, not entire database records. Consumers can look up additional details if needed. Large payloads hit EventBridge's 256KB limit and increase costs.

  • Implement idempotency in every consumer. At-least-once delivery is the norm. Use idempotency keys derived from the event's unique identifier plus the event type. Store processing results in DynamoDB with a TTL so the idempotency table does not grow unbounded.

  • Version your event schemas. Add a schemaVersion field to every event. Consumers should handle unknown versions gracefully by logging and skipping rather than crashing. This allows you to evolve schemas without coordinating deployments across all teams simultaneously.

  • Use content-based filtering aggressively. Do not have consumers receive all events and filter in code. Push filtering to EventBridge rules. This reduces Lambda invocations, lowers cost, and improves latency.

  • Monitor event age and DLQ depth. Set CloudWatch alarms on ApproximateAgeOfOldestMessage for SQS queues and IngestionToInvocationStartLatency for EventBridge. If events are getting old, your consumers cannot keep up.

  • Test event flows with integration tests. Unit testing individual Lambda handlers is not enough. Deploy to a test environment and publish real events through the bus. Verify that the right consumers fire in the right order. Use EventBridge's archive and replay feature to capture production event patterns for test fixtures.
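The idempotency guard described above can be sketched like this. The `idempotency` table name, the one-week TTL, and the `putItem` wrapper (e.g. around a DynamoDB PutCommand that rejects with ConditionalCheckFailedException on a duplicate key) are all assumptions for illustration.

```javascript
// Consumer-side idempotency guard: a conditional put keyed on the event's
// unique id plus its type. If the key already exists, the event was handled
// before and the handler can exit early.
const TTL_SECONDS = 7 * 24 * 60 * 60; // expire records after a week via table TTL

function idempotencyKey(event) {
  return event.id + "#" + event["detail-type"]; // EventBridge assigns a unique id
}

async function markProcessed(putItem, event) {
  try {
    await putItem({
      TableName: "idempotency",
      Item: {
        pk: idempotencyKey(event),
        expiresAt: Math.floor(Date.now() / 1000) + TTL_SECONDS
      },
      ConditionExpression: "attribute_not_exists(pk)"
    });
    return true; // first time this event has been seen
  } catch (err) {
    if (err.name === "ConditionalCheckFailedException") return false; // duplicate delivery
    throw err;
  }
}
```

A consumer calls `markProcessed` first and returns immediately when it yields false, making at-least-once delivery safe.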
