Event-Driven Serverless Patterns
Build event-driven serverless architectures with EventBridge, DynamoDB Streams, and saga patterns for distributed Node.js applications
Overview
Event-driven serverless architecture is the backbone of modern distributed systems that need to scale independently, remain loosely coupled, and react to state changes in real time. Instead of services calling each other directly through synchronous HTTP requests, they emit events and let downstream consumers react asynchronously. This article walks through the core patterns you need to build production-grade event-driven systems on AWS serverless infrastructure using Node.js.
Prerequisites
- Node.js 18+ installed locally
- AWS account with IAM permissions for Lambda, EventBridge, DynamoDB, S3, API Gateway, and SQS
- AWS CLI configured with credentials
- Basic understanding of AWS Lambda and serverless concepts
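- Familiarity with the Serverless Framework or AWS SAM
- The aws-sdk (v2) package included in your deployment bundle: the examples use the v2 SDK, while the nodejs18.x Lambda runtime ships only with SDK v3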
Event-Driven Architecture Fundamentals
The core idea is simple: producers emit events, and consumers subscribe to them. There is no direct coupling between the two. An event represents a fact — something that happened. It is immutable. Once an order is placed, that event exists forever.
There are three roles in any event-driven system:
- Producers — services that generate events when state changes occur
- Event Bus — the routing infrastructure that delivers events to the right consumers
- Consumers — services that react to events they care about
// A simple event structure
var event = {
source: "order-service",
detailType: "OrderPlaced",
time: new Date().toISOString(),
detail: {
orderId: "ord-12345",
customerId: "cust-789",
items: [
{ productId: "prod-001", quantity: 2, price: 29.99 },
{ productId: "prod-002", quantity: 1, price: 49.99 }
],
total: 109.97,
currency: "USD"
}
};
The key distinction from request-response is that the producer does not wait for consumers. It publishes the event and moves on. This decoupling means the inventory service can go down for maintenance without breaking the order flow: as long as delivery is backed by retries or a queue, events accumulate and get processed when the service comes back online.
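To make the decoupling concrete, here is a minimal in-memory sketch of the three roles. It is an illustration only, not how EventBridge works internally; createBus, resume, and the per-consumer queue are hypothetical names invented for this example.

```javascript
// Minimal in-memory stand-in for an event bus (illustration only, not EventBridge).
function createBus() {
  var subscribers = {}; // detailType -> array of consumer records

  // Deliver queued events while the consumer is online.
  function drain(consumer) {
    while (consumer.online && consumer.queue.length > 0) {
      consumer.handler(consumer.queue.shift());
    }
  }

  return {
    // Register a consumer; each consumer gets its own queue.
    subscribe: function(detailType, handler) {
      var consumer = { handler: handler, online: true, queue: [] };
      subscribers[detailType] = subscribers[detailType] || [];
      subscribers[detailType].push(consumer);
      return consumer;
    },
    // The producer returns immediately and never sees consumer results.
    publish: function(event) {
      (subscribers[event.detailType] || []).forEach(function(consumer) {
        consumer.queue.push(event);
        drain(consumer);
      });
    },
    // Bring a consumer back and deliver everything it missed.
    resume: function(consumer) {
      consumer.online = true;
      drain(consumer);
    }
  };
}

// Usage
var bus = createBus();
var reserved = [];
var inventory = bus.subscribe("OrderPlaced", function(event) {
  reserved.push(event.detail.orderId);
});

inventory.online = false; // simulate a maintenance window
bus.publish({ source: "order-service", detailType: "OrderPlaced", detail: { orderId: "ord-1" } });
bus.publish({ source: "order-service", detailType: "OrderPlaced", detail: { orderId: "ord-2" } });
// The producer was not blocked, and nothing has been processed yet.

bus.resume(inventory);
// Both orders are now processed, in publish order.
```

The producer code never references the inventory consumer, which is the property the rest of this article builds on.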
EventBridge as an Event Bus
Amazon EventBridge is the managed event bus that sits at the center of your event-driven architecture. It accepts events from any source, matches them against rules you define, and routes them to target services. Think of it as a sophisticated message router with built-in schema management.
var AWS = require("aws-sdk");
var eventbridge = new AWS.EventBridge({ region: "us-east-1" });
function publishEvent(source, detailType, detail) {
var params = {
Entries: [
{
Source: source,
DetailType: detailType,
Detail: JSON.stringify(detail),
EventBusName: "ecommerce-events",
Time: new Date()
}
]
};
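return eventbridge.putEvents(params).promise()
.then(function(result) {
// putEvents resolves even when individual entries are rejected, so check explicitly
if (result.FailedEntryCount > 0) {
throw new Error("Failed to publish event: " + JSON.stringify(result.Entries));
}
return result;
});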
}
// Usage in a Lambda handler
exports.handler = function(event, context, callback) {
var orderData = JSON.parse(event.body);
publishEvent("order-service", "OrderPlaced", {
orderId: orderData.orderId,
customerId: orderData.customerId,
items: orderData.items,
total: orderData.total,
timestamp: Date.now()
})
.then(function(result) {
console.log("Event published:", JSON.stringify(result));
callback(null, {
statusCode: 200,
body: JSON.stringify({ message: "Order placed", orderId: orderData.orderId })
});
})
.catch(function(err) {
console.error("Failed to publish event:", err);
callback(null, { statusCode: 500, body: JSON.stringify({ error: "Internal error" }) });
});
};
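When you publish several entries in one call, it helps to know which ones were rejected so you can retry only those. The sketch below assumes the documented PutEvents response shape, where Entries lists results in request order and a rejected entry carries ErrorCode and ErrorMessage instead of EventId. The helper name findFailedEntries and the sample response values are made up for illustration.

```javascript
// Pair each request entry with its result; PutEvents returns results in request order.
function findFailedEntries(requestEntries, response) {
  var failed = [];
  (response.Entries || []).forEach(function(result, index) {
    if (result.ErrorCode) {
      failed.push({
        entry: requestEntries[index],
        errorCode: result.ErrorCode,
        errorMessage: result.ErrorMessage
      });
    }
  });
  return failed;
}

// Usage with a hand-written response shaped like the one putEvents resolves with
var entries = [
  { Source: "order-service", DetailType: "OrderPlaced", Detail: "{\"orderId\":\"ord-1\"}" },
  { Source: "order-service", DetailType: "OrderPlaced", Detail: "{\"orderId\":\"ord-2\"}" }
];
var response = {
  FailedEntryCount: 1,
  Entries: [
    { EventId: "11111111-2222-3333-4444-555555555555" },
    { ErrorCode: "InternalFailure", ErrorMessage: "Unexpected error" }
  ]
};
var failed = findFailedEntries(entries, response);
// failed holds one item that points at the second request entry
```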
Creating a custom event bus keeps your events isolated from the default bus and gives you fine-grained control over access policies:
var createBusParams = {
Name: "ecommerce-events",
Tags: [
{ Key: "environment", Value: "production" },
{ Key: "team", Value: "platform" }
]
};
eventbridge.createEventBus(createBusParams).promise()
.then(function(result) {
console.log("Event bus created:", result.EventBusArn);
});
Custom Event Schemas
Defining schemas up front prevents chaos. When ten services produce events with no contract, you end up with inconsistent payloads that break consumers at runtime. EventBridge has a schema registry, but I prefer defining schemas in code and validating them before publishing.
var Ajv = require("ajv");
var ajv = new Ajv();
var orderPlacedSchema = {
type: "object",
required: ["orderId", "customerId", "items", "total", "timestamp"],
properties: {
orderId: { type: "string", pattern: "^ord-[a-z0-9]+$" },
customerId: { type: "string", pattern: "^cust-[a-z0-9]+$" },
items: {
type: "array",
minItems: 1,
items: {
type: "object",
required: ["productId", "quantity", "price"],
properties: {
productId: { type: "string" },
quantity: { type: "integer", minimum: 1 },
price: { type: "number", minimum: 0 }
}
}
},
total: { type: "number", minimum: 0 },
timestamp: { type: "number" }
},
additionalProperties: false
};
var validate = ajv.compile(orderPlacedSchema);
function validateAndPublish(detailType, detail, schema) {
  // Validate against the schema passed in; fall back to the OrderPlaced validator
  var validateDetail = schema ? ajv.compile(schema) : validate;
  var valid = validateDetail(detail);
  if (!valid) {
    var errorMsg = "Schema validation failed: " + JSON.stringify(validateDetail.errors);
    console.error(errorMsg);
    return Promise.reject(new Error(errorMsg));
  }
  return publishEvent("order-service", detailType, detail);
}
Register your schemas in the EventBridge Schema Registry so other teams can discover them:
var schemas = new AWS.Schemas({ region: "us-east-1" });
var schemaParams = {
Content: JSON.stringify({
openapi: "3.0.0",
info: { title: "OrderPlaced", version: "1" },
paths: {},
components: {
schemas: {
OrderPlaced: orderPlacedSchema
}
}
}),
RegistryName: "ecommerce-events",
SchemaName: "order-service@OrderPlaced",
Type: "OpenApi3"
};
schemas.createSchema(schemaParams).promise();
Event Routing Rules
EventBridge rules match incoming events using JSON-based content filtering. You can filter on any field in the event envelope or the detail payload. This is where the real power lives — each consumer only receives the events it cares about.
var ruleParams = {
Name: "route-order-to-inventory",
EventBusName: "ecommerce-events",
EventPattern: JSON.stringify({
source: ["order-service"],
"detail-type": ["OrderPlaced"],
detail: {
total: [{ numeric: [">", 0] }]
}
}),
State: "ENABLED",
Description: "Routes order events to inventory service"
};
eventbridge.putRule(ruleParams).promise()
.then(function() {
return eventbridge.putTargets({
Rule: "route-order-to-inventory",
EventBusName: "ecommerce-events",
Targets: [
{
Id: "inventory-lambda",
Arn: "arn:aws:lambda:us-east-1:123456789012:function:inventory-handler",
RetryPolicy: {
MaximumRetryAttempts: 3,
MaximumEventAgeInSeconds: 3600
},
DeadLetterConfig: {
Arn: "arn:aws:sqs:us-east-1:123456789012:inventory-dlq"
}
}
]
}).promise();
});
Content-based filtering supports prefix matching, suffix matching, numeric comparisons, and exists checks. Use these to avoid over-triggering consumers:
{
"source": ["order-service"],
"detail-type": ["OrderPlaced"],
"detail": {
"items": {
"productId": [{ "prefix": "prod-electronics-" }]
},
"total": [{ "numeric": [">=", 100, "<=", 10000] }],
"promoCode": [{ "exists": true }]
}
}
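For unit tests it can be handy to check a pattern against sample events without calling AWS. The following is a simplified local matcher written for this article, not EventBridge's implementation: it covers only exact values, prefix, suffix, numeric comparisons, and exists, and the function names are hypothetical. Treat the service itself as the source of truth for anything beyond that subset.

```javascript
// Simplified local matcher for a subset of EventBridge pattern syntax.
function matchesPattern(pattern, event) {
  return Object.keys(pattern).every(function(key) {
    var rule = pattern[key];
    var value = event == null ? undefined : event[key];
    if (Array.isArray(rule)) {
      return matchesAny(rule, value);
    }
    // Nested pattern: an array in the event matches if any element matches
    if (Array.isArray(value)) {
      return value.some(function(element) { return matchesPattern(rule, element); });
    }
    return matchesPattern(rule, value);
  });
}

// A field matches when any alternative in the pattern array matches.
function matchesAny(alternatives, value) {
  var candidates = Array.isArray(value) ? value : [value];
  return alternatives.some(function(alternative) {
    if (alternative !== null && typeof alternative === "object") {
      if ("exists" in alternative) {
        return (value !== undefined) === alternative.exists;
      }
      return candidates.some(function(v) { return matchesOperator(alternative, v); });
    }
    return candidates.indexOf(alternative) !== -1;
  });
}

function matchesOperator(op, v) {
  if ("prefix" in op) { return typeof v === "string" && v.startsWith(op.prefix); }
  if ("suffix" in op) { return typeof v === "string" && v.endsWith(op.suffix); }
  if ("numeric" in op) { return typeof v === "number" && matchesNumeric(op.numeric, v); }
  return false; // operator not covered by this sketch
}

// spec is a flat list of [operator, operand] pairs; all pairs must hold.
function matchesNumeric(spec, v) {
  for (var i = 0; i < spec.length; i += 2) {
    var operand = spec[i + 1];
    var ok = false;
    if (spec[i] === "=") { ok = v === operand; }
    if (spec[i] === ">") { ok = v > operand; }
    if (spec[i] === ">=") { ok = v >= operand; }
    if (spec[i] === "<") { ok = v < operand; }
    if (spec[i] === "<=") { ok = v <= operand; }
    if (!ok) { return false; }
  }
  return true;
}

// Usage with the pattern shown above
var pattern = {
  source: ["order-service"],
  "detail-type": ["OrderPlaced"],
  detail: {
    items: { productId: [{ prefix: "prod-electronics-" }] },
    total: [{ numeric: [">=", 100, "<=", 10000] }],
    promoCode: [{ exists: true }]
  }
};
var sampleEvent = {
  source: "order-service",
  "detail-type": "OrderPlaced",
  detail: {
    items: [{ productId: "prod-books-1" }, { productId: "prod-electronics-tv" }],
    total: 250,
    promoCode: "SPRING"
  }
};
var isMatch = matchesPattern(pattern, sampleEvent);
var cheapOrder = JSON.parse(JSON.stringify(sampleEvent));
cheapOrder.detail.total = 50;
var isCheapMatch = matchesPattern(pattern, cheapOrder);
```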
S3 Event Notifications
S3 buckets are natural event producers. File uploads, deletions, and transitions trigger events that you can route through EventBridge. This is essential for data pipelines, image processing, and ETL workflows.
// Lambda triggered by S3 event via EventBridge
exports.handler = function(event, context, callback) {
var s3Event = event.detail;
var bucket = s3Event.bucket.name;
var key = s3Event.object.key;
var size = s3Event.object.size;
console.log("Processing file:", bucket + "/" + key, "Size:", size);
if (key.startsWith("uploads/invoices/")) {
return processInvoice(bucket, key)
.then(function(result) {
callback(null, result);
})
.catch(function(err) {
console.error("Invoice processing failed:", err);
callback(err);
});
}
if (key.startsWith("uploads/images/")) {
return generateThumbnails(bucket, key)
.then(function(result) {
callback(null, result);
})
.catch(function(err) {
console.error("Thumbnail generation failed:", err);
callback(err);
});
}
callback(null, { message: "No handler for prefix" });
};
function processInvoice(bucket, key) {
var s3 = new AWS.S3();
return s3.getObject({ Bucket: bucket, Key: key }).promise()
.then(function(data) {
var content = data.Body.toString("utf-8");
// Parse and store invoice data
return { processed: true, key: key };
});
}
Enable EventBridge notifications on your S3 bucket through CloudFormation or the SDK:
# CloudFormation
InvoiceBucket:
  Type: AWS::S3::Bucket
  Properties:
    BucketName: ecommerce-invoices
    NotificationConfiguration:
      EventBridgeConfiguration:
        EventBridgeEnabled: true
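Enabling notifications only makes the bucket emit events; a rule still has to select them. S3 delivers these events to the account's default event bus with source aws.s3, so the rule belongs there rather than on the custom bus. The helper below is a small sketch that builds such a pattern; s3ObjectCreatedPattern is a hypothetical name, and the bucket and prefix are the ones used in this section.

```javascript
// Build an EventBridge event pattern for new objects under a key prefix.
function s3ObjectCreatedPattern(bucketName, keyPrefix) {
  return {
    source: ["aws.s3"],
    "detail-type": ["Object Created"],
    detail: {
      bucket: { name: [bucketName] },
      object: { key: [{ prefix: keyPrefix }] }
    }
  };
}

var invoicePattern = s3ObjectCreatedPattern("ecommerce-invoices", "uploads/invoices/");
// Pass JSON.stringify(invoicePattern) as EventPattern when creating the rule
```

Filtering by prefix in the rule means the Lambda is invoked only for keys it can handle, instead of branching on the key inside the handler.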
DynamoDB Streams as Event Sources
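DynamoDB Streams capture every item-level change on a table and deliver the changes as a sequence of records, ordered per item. This is the change data capture (CDC) pattern: your Lambda reacts to inserts, updates, and deletes without polling the table. The handler below reads both the old and new item images, which requires the stream's view type to be NEW_AND_OLD_IMAGES.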
// Lambda triggered by DynamoDB Stream
exports.handler = function(event, context, callback) {
var promises = event.Records.map(function(record) {
var eventName = record.eventName; // INSERT, MODIFY, REMOVE
var newImage = record.dynamodb.NewImage;
var oldImage = record.dynamodb.OldImage;
if (eventName === "INSERT") {
return handleNewOrder(AWS.DynamoDB.Converter.unmarshall(newImage));
}
if (eventName === "MODIFY") {
var newData = AWS.DynamoDB.Converter.unmarshall(newImage);
var oldData = AWS.DynamoDB.Converter.unmarshall(oldImage);
if (oldData.status !== newData.status) {
return handleStatusChange(newData, oldData.status, newData.status);
}
}
if (eventName === "REMOVE") {
return handleOrderCancelled(AWS.DynamoDB.Converter.unmarshall(oldImage));
}
return Promise.resolve();
});
Promise.all(promises)
.then(function() { callback(null, "Success"); })
.catch(function(err) { callback(err); });
};
function handleStatusChange(order, oldStatus, newStatus) {
console.log("Order", order.orderId, "changed from", oldStatus, "to", newStatus);
return publishEvent("order-service", "OrderStatusChanged", {
orderId: order.orderId,
previousStatus: oldStatus,
newStatus: newStatus,
timestamp: Date.now()
});
}
Configure the event source mapping to control batch size and parallelization:
# SAM template
OrderStreamFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: stream-handler.handler
    Runtime: nodejs18.x
    Events:
      OrderStream:
        Type: DynamoDB
        Properties:
          Stream: !GetAtt OrdersTable.StreamArn
          StartingPosition: TRIM_HORIZON
          BatchSize: 25
          MaximumBatchingWindowInSeconds: 5
          MaximumRetryAttempts: 3
          BisectBatchOnFunctionError: true
          ParallelizationFactor: 5
          DestinationConfig:
            OnFailure:
              Destination: !GetAtt StreamDLQ.Arn
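Bisecting narrows down a bad record by trial and error. If the mapping also sets FunctionResponseTypes to ReportBatchItemFailures (an assumption, it is not in the template above), the handler can instead tell Lambda exactly where the batch failed by returning the sequence number of the first failed record. The sketch below keeps processing synchronous for brevity; handleBatch and processRecord are hypothetical names, and a real handler would wait for each record's promise in turn.

```javascript
// Process stream records in order and stop at the first failure.
// processRecord is a hypothetical per-record function supplied by the caller.
function handleBatch(event, processRecord) {
  for (var i = 0; i < event.Records.length; i++) {
    var record = event.Records[i];
    try {
      processRecord(record);
    } catch (err) {
      // Report the first failed record; Lambda retries from this sequence number
      return {
        batchItemFailures: [{ itemIdentifier: record.dynamodb.SequenceNumber }]
      };
    }
  }
  // An empty list tells Lambda the whole batch succeeded
  return { batchItemFailures: [] };
}

// Usage with a hand-built batch in which the second record fails
var processed = [];
var sampleBatch = {
  Records: [
    { eventName: "INSERT", dynamodb: { SequenceNumber: "100" } },
    { eventName: "MODIFY", dynamodb: { SequenceNumber: "200" } },
    { eventName: "INSERT", dynamodb: { SequenceNumber: "300" } }
  ]
};
var batchResult = handleBatch(sampleBatch, function(record) {
  if (record.dynamodb.SequenceNumber === "200") {
    throw new Error("simulated failure");
  }
  processed.push(record.dynamodb.SequenceNumber);
});
```

Stopping at the first failure preserves per-item ordering: records after the failed one are not applied until the retry reaches them.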
API Gateway WebSocket for Real-Time Events
When clients need real-time updates (order status changes, live notifications), WebSocket APIs through API Gateway let you push events directly to connected browsers or mobile apps.
var AWS = require("aws-sdk");
// Connection handler
exports.connect = function(event, context, callback) {
var connectionId = event.requestContext.connectionId;
// queryStringParameters is null when the client connects without a query string
var userId = (event.queryStringParameters || {}).userId;
var dynamodb = new AWS.DynamoDB.DocumentClient();
dynamodb.put({
TableName: "websocket-connections",
Item: {
connectionId: connectionId,
userId: userId,
connectedAt: Date.now(),
ttl: Math.floor(Date.now() / 1000) + 86400
}
}).promise()
.then(function() {
callback(null, { statusCode: 200, body: "Connected" });
})
.catch(function(err) {
callback(null, { statusCode: 500, body: "Failed to connect" });
});
};
// Push event to connected clients
exports.pushToUser = function(userId, payload) {
var dynamodb = new AWS.DynamoDB.DocumentClient();
var apigateway;
return dynamodb.query({
TableName: "websocket-connections",
IndexName: "userId-index",
KeyConditionExpression: "userId = :uid",
ExpressionAttributeValues: { ":uid": userId }
}).promise()
.then(function(result) {
apigateway = new AWS.ApiGatewayManagementApi({
endpoint: process.env.WEBSOCKET_ENDPOINT
});
var sends = result.Items.map(function(connection) {
return apigateway.postToConnection({
ConnectionId: connection.connectionId,
Data: JSON.stringify(payload)
}).promise()
.catch(function(err) {
if (err.statusCode === 410) {
// Connection is stale, clean it up
return dynamodb.delete({
TableName: "websocket-connections",
Key: { connectionId: connection.connectionId }
}).promise();
}
throw err;
});
});
return Promise.all(sends);
});
};
Event Sourcing Pattern in Serverless
Event sourcing stores every state change as an immutable event rather than overwriting the current state. The current state is derived by replaying events in order. This gives you a complete audit trail and the ability to reconstruct state at any point in time.
var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var EVENT_STORE_TABLE = "event-store";
function appendEvent(aggregateId, eventType, eventData, expectedVersion) {
var version = expectedVersion + 1;
var params = {
TableName: EVENT_STORE_TABLE,
Item: {
aggregateId: aggregateId,
version: version,
eventType: eventType,
data: eventData,
timestamp: new Date().toISOString(),
eventId: require("crypto").randomUUID()
},
ConditionExpression: "attribute_not_exists(aggregateId) AND attribute_not_exists(version)"
};
return dynamodb.put(params).promise();
}
function getEvents(aggregateId) {
return dynamodb.query({
TableName: EVENT_STORE_TABLE,
KeyConditionExpression: "aggregateId = :id",
ExpressionAttributeValues: { ":id": aggregateId },
ScanIndexForward: true
}).promise()
.then(function(result) {
return result.Items;
});
}
function rebuildState(aggregateId) {
return getEvents(aggregateId).then(function(events) {
var state = { status: "unknown", items: [], total: 0 };
events.forEach(function(event) {
switch (event.eventType) {
case "OrderCreated":
state.orderId = event.data.orderId;
state.customerId = event.data.customerId;
state.status = "created";
state.createdAt = event.timestamp;
break;
case "ItemAdded":
state.items.push(event.data.item);
state.total = state.total + (event.data.item.price * event.data.item.quantity);
break;
case "ItemRemoved":
state.items = state.items.filter(function(i) {
return i.productId !== event.data.productId;
});
state.total = state.items.reduce(function(sum, i) {
return sum + (i.price * i.quantity);
}, 0);
break;
case "OrderConfirmed":
state.status = "confirmed";
state.confirmedAt = event.timestamp;
break;
case "OrderShipped":
state.status = "shipped";
state.trackingNumber = event.data.trackingNumber;
break;
case "OrderCancelled":
state.status = "cancelled";
state.cancelReason = event.data.reason;
break;
}
});
state.version = events.length;
return state;
});
}
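Reconstructing state at a point in time falls out of the same idea once the fold is a pure function. This is a minimal sketch, assuming each stored event carries the version attribute written by appendEvent; applyEvent and replay are hypothetical names, and only a few event types are handled to keep it short.

```javascript
// Pure reducer: apply one event to a state object and return a new state.
function applyEvent(state, event) {
  var next = Object.assign({}, state, { version: event.version });
  switch (event.eventType) {
    case "OrderCreated":
      next.status = "created";
      break;
    case "ItemAdded":
      next.items = state.items.concat([event.data.item]);
      next.total = state.total + event.data.item.price * event.data.item.quantity;
      break;
    case "OrderConfirmed":
      next.status = "confirmed";
      break;
    case "OrderCancelled":
      next.status = "cancelled";
      break;
  }
  return next;
}

// Replay events up to and including maxVersion (omit it to replay everything).
function replay(events, maxVersion) {
  var initial = { status: "unknown", items: [], total: 0, version: 0 };
  return events
    .filter(function(e) { return maxVersion === undefined || e.version <= maxVersion; })
    .reduce(applyEvent, initial);
}

// Usage
var history = [
  { version: 1, eventType: "OrderCreated", data: { orderId: "ord-1" } },
  { version: 2, eventType: "ItemAdded", data: { item: { productId: "prod-001", price: 10, quantity: 2 } } },
  { version: 3, eventType: "OrderConfirmed", data: {} }
];
var asOfVersion2 = replay(history, 2); // state before confirmation
var latest = replay(history);          // current state
```

Because the reducer does no I/O, it can be unit tested without DynamoDB and reused by both rebuildState and any projection code.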
CQRS with Serverless
Command Query Responsibility Segregation (CQRS) separates your write model from your read model. Commands mutate state through events. Queries read from optimized projections. In serverless, DynamoDB Streams naturally bridge the two sides.
// Command side — write handler
exports.placeOrder = function(event, context, callback) {
var command = JSON.parse(event.body);
// Validate command
if (!command.customerId || !command.items || command.items.length === 0) {
return callback(null, {
statusCode: 400,
body: JSON.stringify({ error: "Invalid order command" })
});
}
var orderId = "ord-" + Date.now().toString(36);
// Write to event store (command side)
appendEvent(orderId, "OrderCreated", {
orderId: orderId,
customerId: command.customerId,
items: command.items,
total: command.items.reduce(function(sum, item) {
return sum + (item.price * item.quantity);
}, 0)
}, 0)
.then(function() {
callback(null, {
statusCode: 201,
body: JSON.stringify({ orderId: orderId })
});
})
.catch(function(err) {
callback(null, {
statusCode: 500,
body: JSON.stringify({ error: "Failed to place order" })
});
});
};
// Query side — read projection handler
// Triggered by DynamoDB Stream on the event store table
exports.updateProjection = function(event, context, callback) {
var dynamodb = new AWS.DynamoDB.DocumentClient();
var updates = event.Records
.filter(function(record) { return record.eventName === "INSERT"; })
.map(function(record) {
var item = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
if (item.eventType === "OrderCreated") {
return dynamodb.put({
TableName: "orders-read-model",
Item: {
orderId: item.data.orderId,
customerId: item.data.customerId,
status: "created",
total: item.data.total,
itemCount: item.data.items.length,
createdAt: item.timestamp,
updatedAt: item.timestamp
}
}).promise();
}
if (item.eventType === "OrderShipped") {
return dynamodb.update({
TableName: "orders-read-model",
Key: { orderId: item.aggregateId },
UpdateExpression: "SET #s = :status, trackingNumber = :tracking, updatedAt = :ts",
ExpressionAttributeNames: { "#s": "status" },
ExpressionAttributeValues: {
":status": "shipped",
":tracking": item.data.trackingNumber,
":ts": item.timestamp
}
}).promise();
}
return Promise.resolve();
});
Promise.all(updates)
.then(function() { callback(null, "Projections updated"); })
.catch(function(err) { callback(err); });
};
// Query side — read handler
exports.getOrder = function(event, context, callback) {
var orderId = event.pathParameters.orderId;
var dynamodb = new AWS.DynamoDB.DocumentClient();
dynamodb.get({
TableName: "orders-read-model",
Key: { orderId: orderId }
}).promise()
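.then(function(result) {
if (!result.Item) {
return callback(null, { statusCode: 404, body: JSON.stringify({ error: "Not found" }) });
}
callback(null, { statusCode: 200, body: JSON.stringify(result.Item) });
})
.catch(function(err) {
// Without this, a failed read would leave the request hanging until timeout
console.error("Order lookup failed:", err);
callback(null, { statusCode: 500, body: JSON.stringify({ error: "Failed to load order" }) });
});
};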
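Stream records can be redelivered, so a projection should tolerate seeing the same event twice or seeing an older event after a newer one. One way to sketch that is a version guard on the read model. The lastVersion field and the projectOrder function are assumptions made for this example; the read model shown earlier does not store a version.

```javascript
// Apply an event to a read-model item, ignoring duplicates and stale events.
function projectOrder(current, event) {
  if (current && current.lastVersion >= event.version) {
    return current; // already applied, nothing to do
  }
  var next = Object.assign({}, current, {
    lastVersion: event.version,
    updatedAt: event.timestamp
  });
  if (event.eventType === "OrderCreated") {
    next.orderId = event.data.orderId;
    next.status = "created";
    next.total = event.data.total;
  } else if (event.eventType === "OrderShipped") {
    next.status = "shipped";
    next.trackingNumber = event.data.trackingNumber;
  }
  return next;
}

// Usage
var createdEvent = {
  version: 1, eventType: "OrderCreated", timestamp: "2024-01-01T10:00:00.000Z",
  data: { orderId: "ord-1", total: 40 }
};
var shippedEvent = {
  version: 2, eventType: "OrderShipped", timestamp: "2024-01-02T09:00:00.000Z",
  data: { trackingNumber: "TRACK-1" }
};
var afterCreate = projectOrder(undefined, createdEvent);
var afterShip = projectOrder(afterCreate, shippedEvent);
var afterDuplicate = projectOrder(afterShip, createdEvent); // redelivery is ignored
```

Against DynamoDB the same guard would typically be expressed as a condition on the write rather than a read followed by a write.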
Saga Pattern for Distributed Transactions
In microservices there is no distributed transaction coordinator. The saga pattern chains local transactions across services using events. Each step either succeeds and triggers the next step, or fails and triggers compensating actions to roll back previous steps.
var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var eventbridge = new AWS.EventBridge();
var SAGA_TABLE = "order-saga-state";
// Step 1: Start saga — reserve inventory
exports.startOrderSaga = function(event, context, callback) {
var order = event.detail;
var sagaId = "saga-" + order.orderId;
dynamodb.put({
TableName: SAGA_TABLE,
Item: {
sagaId: sagaId,
orderId: order.orderId,
status: "STARTED",
steps: {
inventory: "PENDING",
payment: "PENDING",
shipping: "PENDING"
},
createdAt: Date.now()
}
}).promise()
.then(function() {
return publishEvent("saga-orchestrator", "ReserveInventory", {
sagaId: sagaId,
orderId: order.orderId,
items: order.items
});
})
.then(function() {
callback(null, { sagaId: sagaId, status: "STARTED" });
})
.catch(function(err) {
console.error("Saga start failed:", err);
callback(err);
});
};
// Step 2: Inventory reserved — process payment
exports.handleInventoryReserved = function(event, context, callback) {
var detail = event.detail;
var sagaId = detail.sagaId;
updateSagaStep(sagaId, "inventory", "COMPLETED")
.then(function() {
return publishEvent("saga-orchestrator", "ProcessPayment", {
sagaId: sagaId,
orderId: detail.orderId,
amount: detail.totalAmount
});
})
.then(function() { callback(null, "Payment step triggered"); })
.catch(function(err) { callback(err); });
};
// Step 3: Payment processed — arrange shipping
exports.handlePaymentProcessed = function(event, context, callback) {
var detail = event.detail;
var sagaId = detail.sagaId;
updateSagaStep(sagaId, "payment", "COMPLETED")
.then(function() {
return publishEvent("saga-orchestrator", "ArrangeShipping", {
sagaId: sagaId,
orderId: detail.orderId,
address: detail.shippingAddress
});
})
.then(function() { callback(null, "Shipping step triggered"); })
.catch(function(err) { callback(err); });
};
// Compensation: Payment failed — release inventory
exports.handlePaymentFailed = function(event, context, callback) {
var detail = event.detail;
var sagaId = detail.sagaId;
updateSagaStep(sagaId, "payment", "FAILED")
.then(function() {
return publishEvent("saga-orchestrator", "ReleaseInventory", {
sagaId: sagaId,
orderId: detail.orderId,
items: detail.items,
reason: "Payment failed: " + detail.error
});
})
.then(function() {
return updateSagaStatus(sagaId, "COMPENSATING");
})
.then(function() { callback(null, "Compensation triggered"); })
.catch(function(err) { callback(err); });
};
function updateSagaStep(sagaId, step, status) {
return dynamodb.update({
TableName: SAGA_TABLE,
Key: { sagaId: sagaId },
UpdateExpression: "SET steps.#step = :status, updatedAt = :ts",
ExpressionAttributeNames: { "#step": step },
ExpressionAttributeValues: { ":status": status, ":ts": Date.now() }
}).promise();
}
function updateSagaStatus(sagaId, status) {
return dynamodb.update({
TableName: SAGA_TABLE,
Key: { sagaId: sagaId },
UpdateExpression: "SET #s = :status, updatedAt = :ts",
ExpressionAttributeNames: { "#s": "status" },
ExpressionAttributeValues: { ":status": status, ":ts": Date.now() }
}).promise();
}
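The handlers above each encode one transition. Collecting the transitions in a single table makes the saga easier to review and test. This is a sketch under assumptions: advanceSaga and SAGA_TRANSITIONS are hypothetical names, and ShippingArranged is an invented completion event, since the handlers above stop at the ArrangeShipping command.

```javascript
// Transition table: incoming event -> step update and the next command to publish.
var SAGA_TRANSITIONS = {
  InventoryReserved: { step: "inventory", stepStatus: "COMPLETED", next: "ProcessPayment" },
  PaymentProcessed: { step: "payment", stepStatus: "COMPLETED", next: "ArrangeShipping" },
  PaymentFailed: {
    step: "payment", stepStatus: "FAILED",
    next: "ReleaseInventory", sagaStatus: "COMPENSATING"
  },
  // Hypothetical event marking the end of the happy path
  ShippingArranged: { step: "shipping", stepStatus: "COMPLETED", next: null, sagaStatus: "COMPLETED" }
};

// Return the updated saga record and the command to publish next (or null).
function advanceSaga(saga, eventType) {
  var transition = SAGA_TRANSITIONS[eventType];
  if (!transition) {
    throw new Error("No saga transition for event: " + eventType);
  }
  var steps = Object.assign({}, saga.steps);
  steps[transition.step] = transition.stepStatus;
  return {
    saga: Object.assign({}, saga, {
      steps: steps,
      status: transition.sagaStatus || saga.status
    }),
    nextCommand: transition.next
  };
}

// Usage
var saga = {
  sagaId: "saga-ord-1",
  status: "STARTED",
  steps: { inventory: "PENDING", payment: "PENDING", shipping: "PENDING" }
};
var afterInventory = advanceSaga(saga, "InventoryReserved");
var afterPaymentFailure = advanceSaga(afterInventory.saga, "PaymentFailed");
```

Each Lambda handler would then persist the returned saga record and publish nextCommand, keeping the decision logic in one place.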
Idempotency in Event Handlers
Events can be delivered more than once. EventBridge guarantees at-least-once delivery, and by default Lambda retries a failed DynamoDB Streams batch as a whole, so records that were already handled get processed again. Your handlers must be idempotent: processing the same event twice should leave the system in the same state as processing it once.
var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var crypto = require("crypto");
var IDEMPOTENCY_TABLE = "idempotency-keys";
var TTL_SECONDS = 86400; // 24 hours
function withIdempotency(eventId, handler) {
  var now = Math.floor(Date.now() / 1000);
  var key = { idempotencyKey: eventId };
  return dynamodb.put({
    TableName: IDEMPOTENCY_TABLE,
    Item: {
      idempotencyKey: eventId,
      status: "PROCESSING",
      createdAt: now,
      ttl: now + TTL_SECONDS
    },
    ConditionExpression: "attribute_not_exists(idempotencyKey)"
  }).promise()
    .then(function() {
      // The key is ours: run the handler and record the outcome
      return Promise.resolve().then(handler).then(function(result) {
        return dynamodb.update({
          TableName: IDEMPOTENCY_TABLE,
          Key: key,
          UpdateExpression: "SET #s = :status, #r = :result",
          ExpressionAttributeNames: { "#s": "status", "#r": "result" },
          ExpressionAttributeValues: {
            ":status": "COMPLETED",
            ":result": JSON.stringify(result === undefined ? null : result)
          }
        }).promise()
          .then(function() { return result; });
      }, function(handlerErr) {
        // Release the key so a retry can run the handler again
        return dynamodb.delete({ TableName: IDEMPOTENCY_TABLE, Key: key }).promise()
          .then(function() { throw handlerErr; });
      });
    }, function(err) {
      // Only a failed conditional put means the event was seen before
      if (err.code !== "ConditionalCheckFailedException") {
        throw err;
      }
      console.log("Duplicate event detected, skipping:", eventId);
      return dynamodb.get({ TableName: IDEMPOTENCY_TABLE, Key: key }).promise()
        .then(function(data) {
          if (data.Item && data.Item.status === "COMPLETED") {
            return JSON.parse(data.Item.result);
          }
          // Still processing from another invocation
          return { status: "ALREADY_PROCESSING" };
        });
    });
}
// Usage
exports.handler = function(event, context, callback) {
var detail = event.detail;
// The event type lives in the envelope's "detail-type" field, not in detail
var eventId = detail.orderId + "-" + event["detail-type"] + "-" + detail.timestamp;
withIdempotency(eventId, function() {
return processOrder(detail);
})
.then(function(result) {
callback(null, result);
})
.catch(function(err) {
callback(err);
});
};
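String concatenation works when every event carries the same identifying fields. A more general option, sketched below, is to hash the source, type, and payload, so the key stays the same even if the event is published again and receives a new EventBridge id. idempotencyKey and stableStringify are hypothetical helpers, and the approach assumes the payload itself is identical on redelivery.

```javascript
var crypto = require("crypto");

// Serialize with sorted keys so logically equal payloads hash identically.
function stableStringify(value) {
  if (Array.isArray(value)) {
    return "[" + value.map(stableStringify).join(",") + "]";
  }
  if (value !== null && typeof value === "object") {
    return "{" + Object.keys(value).sort().map(function(key) {
      return JSON.stringify(key) + ":" + stableStringify(value[key]);
    }).join(",") + "}";
  }
  return JSON.stringify(value);
}

// Derive a fixed-length key from the parts of the event that define its identity.
function idempotencyKey(source, detailType, detail) {
  return crypto.createHash("sha256")
    .update(source + "|" + detailType + "|" + stableStringify(detail))
    .digest("hex");
}

// Usage: property order does not affect the key
var keyA = idempotencyKey("order-service", "OrderPlaced", { orderId: "ord-1", total: 5 });
var keyB = idempotencyKey("order-service", "OrderPlaced", { total: 5, orderId: "ord-1" });
var keyC = idempotencyKey("order-service", "OrderShipped", { orderId: "ord-1", total: 5 });
// keyA equals keyB; keyC differs because the event type differs
```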
Event Replay and Dead Letter Handling
When things go wrong, and they will, you need dead letter queues (DLQs) and the ability to replay failed events. Every EventBridge rule target should have a DLQ configured. Build a replay mechanism that redrives messages from the DLQ back to the event bus.
var AWS = require("aws-sdk");
var sqs = new AWS.SQS();
var eventbridge = new AWS.EventBridge();
// Replay failed events from DLQ
exports.replayDLQ = function(event, context, callback) {
var dlqUrl = process.env.DLQ_URL;
// queryStringParameters is null when no query string is sent
var maxMessages = parseInt((event.queryStringParameters || {}).max, 10) || 10;
function processMessages(count) {
if (count >= maxMessages) {
return Promise.resolve({ replayed: count });
}
return sqs.receiveMessage({
QueueUrl: dlqUrl,
MaxNumberOfMessages: Math.min(10, maxMessages - count),
WaitTimeSeconds: 1
}).promise()
.then(function(data) {
if (!data.Messages || data.Messages.length === 0) {
return { replayed: count };
}
var replays = data.Messages.map(function(msg) {
var originalEvent = JSON.parse(msg.Body);
return eventbridge.putEvents({
Entries: [{
Source: originalEvent.source,
DetailType: originalEvent["detail-type"],
Detail: JSON.stringify(originalEvent.detail),
EventBusName: "ecommerce-events"
}]
}).promise()
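.then(function(result) {
// putEvents resolves even when the entry is rejected; keep the message in the DLQ in that case
if (result.FailedEntryCount > 0) {
throw new Error("Replay rejected: " + JSON.stringify(result.Entries));
}
return sqs.deleteMessage({
QueueUrl: dlqUrl,
ReceiptHandle: msg.ReceiptHandle
}).promise();
});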
});
return Promise.all(replays).then(function() {
return processMessages(count + data.Messages.length);
});
});
}
processMessages(0)
.then(function(result) {
callback(null, {
statusCode: 200,
body: JSON.stringify(result)
});
})
.catch(function(err) {
callback(null, {
statusCode: 500,
body: JSON.stringify({ error: err.message })
});
});
};
Set up CloudWatch alarms on your DLQ so you know immediately when events fail:
DLQAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: inventory-dlq-messages
    AlarmDescription: Messages in inventory dead letter queue
    MetricName: ApproximateNumberOfMessagesVisible
    Namespace: AWS/SQS
    Statistic: Sum
    Period: 60
    EvaluationPeriods: 1
    Threshold: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    Dimensions:
      - Name: QueueName
        Value: !GetAtt InventoryDLQ.QueueName
    AlarmActions:
      - !Ref AlertSNSTopic
Cross-Service Event Contracts
When multiple teams own different services, you need explicit contracts for events. I recommend a shared npm package that contains event types, schemas, and helper functions. This prevents drift between producers and consumers.
// shared-events/index.js — published as @company/event-contracts
var schemas = require("./schemas");
var validators = {};
var EVENT_TYPES = {
ORDER_PLACED: "OrderPlaced",
ORDER_CONFIRMED: "OrderConfirmed",
ORDER_SHIPPED: "OrderShipped",
ORDER_CANCELLED: "OrderCancelled",
INVENTORY_RESERVED: "InventoryReserved",
INVENTORY_RELEASED: "InventoryReleased",
PAYMENT_PROCESSED: "PaymentProcessed",
PAYMENT_FAILED: "PaymentFailed",
NOTIFICATION_SENT: "NotificationSent"
};
var SOURCES = {
ORDER_SERVICE: "order-service",
INVENTORY_SERVICE: "inventory-service",
PAYMENT_SERVICE: "payment-service",
NOTIFICATION_SERVICE: "notification-service"
};
function createEvent(source, detailType, detail) {
if (!SOURCES[source]) {
throw new Error("Unknown source: " + source);
}
if (!Object.values(EVENT_TYPES).includes(detailType)) {
throw new Error("Unknown event type: " + detailType);
}
return {
Source: SOURCES[source],
DetailType: detailType,
Detail: JSON.stringify(detail),
EventBusName: "ecommerce-events",
Time: new Date()
};
}
module.exports = {
EVENT_TYPES: EVENT_TYPES,
SOURCES: SOURCES,
createEvent: createEvent,
schemas: schemas
};
Complete Working Example: E-Commerce Event System
Here is a complete SAM template and Lambda functions for an e-commerce event system. When an order is placed, EventBridge routes the event to three independent consumers: inventory reservation, customer notification, and analytics ingestion.
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Globals:
  Function:
    Runtime: nodejs18.x
    Timeout: 30
    MemorySize: 256
    Environment:
      Variables:
        EVENT_BUS_NAME: !Ref EcommerceEventBus
Resources:
  EcommerceEventBus:
    Type: AWS::Events::EventBus
    Properties:
      Name: ecommerce-events
  # Producer: Order API
  OrderFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/order/handler.createOrder
      Policies:
        - EventBridgePutEventsPolicy:
            EventBusName: !Ref EcommerceEventBus
        - DynamoDBCrudPolicy:
            TableName: !Ref OrdersTable
      Events:
        CreateOrder:
          Type: Api
          Properties:
            Path: /orders
            Method: POST
  # Consumer 1: Inventory Service
  InventoryFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/inventory/handler.reserveInventory
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref InventoryTable
        - EventBridgePutEventsPolicy:
            EventBusName: !Ref EcommerceEventBus
      Events:
        OrderPlaced:
          Type: EventBridgeRule
          Properties:
            EventBusName: !Ref EcommerceEventBus
            Pattern:
              source: ["order-service"]
              detail-type: ["OrderPlaced"]
            DeadLetterConfig:
              Arn: !GetAtt InventoryDLQ.Arn
            RetryPolicy:
              MaximumRetryAttempts: 3
  # Consumer 2: Notification Service
  NotificationFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/notification/handler.sendOrderConfirmation
      Policies:
        - SESCrudPolicy:
            IdentityName: !Ref SESIdentity
      Events:
        OrderPlaced:
          Type: EventBridgeRule
          Properties:
            EventBusName: !Ref EcommerceEventBus
            Pattern:
              source: ["order-service"]
              detail-type: ["OrderPlaced"]
  # Consumer 3: Analytics Service
  AnalyticsFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: src/analytics/handler.ingestOrderEvent
      Policies:
        - KinesisStreamCrudPolicy:
            StreamName: !Ref AnalyticsStream
      Events:
        AllOrderEvents:
          Type: EventBridgeRule
          Properties:
            EventBusName: !Ref EcommerceEventBus
            Pattern:
              source: ["order-service"]
  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: orders
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: orderId
          AttributeType: S
      KeySchema:
        - AttributeName: orderId
          KeyType: HASH
  InventoryTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: inventory
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: productId
          AttributeType: S
      KeySchema:
        - AttributeName: productId
          KeyType: HASH
  InventoryDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: inventory-dlq
      MessageRetentionPeriod: 1209600
  AnalyticsStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: order-analytics
      ShardCount: 1
The order handler (producer):
// src/order/handler.js
var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var eventbridge = new AWS.EventBridge();
exports.createOrder = function(event, context, callback) {
var body = JSON.parse(event.body);
// slice replaces the deprecated substr; both take four characters starting at index 2
var orderId = "ord-" + Date.now().toString(36) + Math.random().toString(36).slice(2, 6);
var order = {
orderId: orderId,
customerId: body.customerId,
customerEmail: body.email,
items: body.items,
total: body.items.reduce(function(sum, item) {
return sum + (item.price * item.quantity);
}, 0),
status: "placed",
createdAt: new Date().toISOString()
};
dynamodb.put({
TableName: "orders",
Item: order
}).promise()
.then(function() {
return eventbridge.putEvents({
Entries: [{
Source: "order-service",
DetailType: "OrderPlaced",
Detail: JSON.stringify(order),
EventBusName: process.env.EVENT_BUS_NAME
}]
}).promise();
})
.then(function(result) {
if (result.FailedEntryCount > 0) {
console.error("EventBridge publish partial failure:", JSON.stringify(result.Entries));
}
callback(null, {
statusCode: 201,
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ orderId: orderId, status: "placed" })
});
})
.catch(function(err) {
console.error("Order creation failed:", err);
callback(null, {
statusCode: 500,
body: JSON.stringify({ error: "Failed to create order" })
});
});
};
The inventory consumer:
// src/inventory/handler.js
var AWS = require("aws-sdk");
var dynamodb = new AWS.DynamoDB.DocumentClient();
var eventbridge = new AWS.EventBridge();
exports.reserveInventory = function(event, context, callback) {
var order = event.detail;
console.log("Reserving inventory for order:", order.orderId);
var reservations = order.items.map(function(item) {
return dynamodb.update({
TableName: "inventory",
Key: { productId: item.productId },
UpdateExpression: "SET availableQty = availableQty - :qty, reservedQty = reservedQty + :qty",
ConditionExpression: "availableQty >= :qty",
ExpressionAttributeValues: { ":qty": item.quantity },
ReturnValues: "ALL_NEW"
}).promise()
.then(function(result) {
return { productId: item.productId, reserved: true, remaining: result.Attributes.availableQty };
})
.catch(function(err) {
if (err.code === "ConditionalCheckFailedException") {
return { productId: item.productId, reserved: false, reason: "Insufficient stock" };
}
throw err;
});
});
Promise.all(reservations)
.then(function(results) {
var allReserved = results.every(function(r) { return r.reserved; });
if (allReserved) {
return eventbridge.putEvents({
Entries: [{
Source: "inventory-service",
DetailType: "InventoryReserved",
Detail: JSON.stringify({
orderId: order.orderId,
items: results,
totalAmount: order.total
}),
EventBusName: process.env.EVENT_BUS_NAME
}]
}).promise();
} else {
var failedItems = results.filter(function(r) { return !r.reserved; });
// Rollback successful reservations
var rollbacks = results
.filter(function(r) { return r.reserved; })
.map(function(r) {
var originalItem = order.items.find(function(i) { return i.productId === r.productId; });
return dynamodb.update({
TableName: "inventory",
Key: { productId: r.productId },
UpdateExpression: "SET availableQty = availableQty + :qty, reservedQty = reservedQty - :qty",
ExpressionAttributeValues: { ":qty": originalItem.quantity }
}).promise();
});
return Promise.all(rollbacks).then(function() {
return eventbridge.putEvents({
Entries: [{
Source: "inventory-service",
DetailType: "InventoryReservationFailed",
Detail: JSON.stringify({
orderId: order.orderId,
failedItems: failedItems,
reason: "Insufficient stock for one or more items"
}),
EventBusName: process.env.EVENT_BUS_NAME
}]
}).promise();
});
}
})
.then(function() { callback(null, "Inventory processed"); })
.catch(function(err) {
console.error("Inventory reservation error:", err);
callback(err);
});
};
The notification consumer:
// src/notification/handler.js
var AWS = require("aws-sdk");
var ses = new AWS.SES();
exports.sendOrderConfirmation = function(event, context, callback) {
var order = event.detail;
var itemList = order.items.map(function(item) {
return " - " + item.productId + " (qty: " + item.quantity + ") — $" + item.price.toFixed(2);
}).join("\n");
var params = {
Source: "orders@example.com", // placeholder; replace with an SES-verified sender address
Destination: { ToAddresses: [order.customerEmail] },
Message: {
Subject: { Data: "Order Confirmation — " + order.orderId },
Body: {
Text: {
Data: "Thank you for your order!\n\n" +
"Order ID: " + order.orderId + "\n" +
"Items:\n" + itemList + "\n\n" +
"Total: $" + order.total.toFixed(2) + "\n\n" +
"We will notify you when your order ships."
}
}
}
};
ses.sendEmail(params).promise()
.then(function(result) {
console.log("Confirmation sent:", result.MessageId);
callback(null, "Email sent");
})
.catch(function(err) {
console.error("Email send failed:", err);
callback(err);
});
};
The analytics consumer:
// src/analytics/handler.js
var AWS = require("aws-sdk");
var kinesis = new AWS.Kinesis();
exports.ingestOrderEvent = function(event, context, callback) {
var detail = event.detail;
var detailType = event["detail-type"];
var analyticsRecord = {
eventType: detailType,
orderId: detail.orderId,
customerId: detail.customerId,
total: detail.total,
itemCount: detail.items ? detail.items.length : 0,
timestamp: new Date().toISOString(),
source: event.source
};
kinesis.putRecord({
StreamName: "order-analytics",
Data: JSON.stringify(analyticsRecord),
PartitionKey: detail.orderId
}).promise()
.then(function() {
console.log("Analytics event ingested:", detailType, detail.orderId);
callback(null, "Ingested");
})
.catch(function(err) {
console.error("Analytics ingestion failed:", err);
callback(err);
});
};
Common Issues and Troubleshooting
1. EventBridge event never reaches the target Lambda
Error: No matching rule found for event
This usually means your event pattern does not match the event structure. Remember that detail-type in the rule pattern must exactly match the DetailType you publish. Case matters. Check your pattern with the EventBridge sandbox: go to the EventBridge console, select your bus, and use the event pattern tester. Also verify the rule is in the ENABLED state and attached to the correct event bus name.
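To make the mismatch easier to spot, the sketch below pairs a rule pattern with a publish entry and compares the two fields that most often drift apart. The helper matchesSourceAndDetailType is hypothetical and covers only exact-match arrays on source and detail-type. Real EventBridge patterns support many more operators, so treat it as a debugging aid, not a reimplementation of the service.

```javascript
// Rule side: patterns use the lowercase, hyphenated key "detail-type".
var rulePattern = {
  source: ["order-service"],
  "detail-type": ["OrderPlaced"]
};

// Publish side: putEvents entries use the PascalCase keys Source and DetailType.
var entry = {
  Source: "order-service",
  DetailType: "OrderPlaced",
  Detail: JSON.stringify({ orderId: "ord-12345" })
};

// Hypothetical helper: exact, case-sensitive comparison of the two top-level
// fields only. A field that is absent from the pattern matches anything.
function matchesSourceAndDetailType(pattern, putEventsEntry) {
  function allowed(values, actual) {
    return values === undefined || values.indexOf(actual) !== -1;
  }
  return allowed(pattern.source, putEventsEntry.Source) &&
    allowed(pattern["detail-type"], putEventsEntry.DetailType);
}

console.log(matchesSourceAndDetailType(rulePattern, entry)); // true
```

Changing DetailType to "orderPlaced" makes the helper return false, which is the same case-sensitivity trap described above.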
2. DynamoDB Stream Lambda receives partial batches or duplicate records
Error: Lambda function timeout exceeded. Records will be retried.
When your Lambda times out while processing a DynamoDB Stream batch, the entire batch retries from the beginning, so records that were already handled are delivered again. This is why idempotency matters. Set BisectBatchOnFunctionError: true in your event source mapping so Lambda splits a failing batch in half and retries the halves separately, which isolates a bad record. Also reduce BatchSize and increase the function timeout.
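These settings map onto the parameters of the Lambda CreateEventSourceMapping API. The sketch below builds them as a plain object. The function name, batch size, retry count, and ARNs are assumptions for illustration, and the same options can be set through your deployment framework instead.

```javascript
// Builds parameters for lambda.createEventSourceMapping() in aws-sdk v2.
// All concrete values here are illustrative, not recommendations.
function buildStreamMappingParams(streamArn, functionName, failureDestinationArn) {
  return {
    EventSourceArn: streamArn,
    FunctionName: functionName,
    StartingPosition: "LATEST",
    BatchSize: 10,                     // smaller batches finish well inside the timeout
    BisectBatchOnFunctionError: true,  // split a failing batch and retry each half
    MaximumRetryAttempts: 3,           // bound retries so one bad record cannot block the shard
    DestinationConfig: {
      // SQS queue or SNS topic that receives details of discarded batches
      OnFailure: { Destination: failureDestinationArn }
    }
  };
}

// Usage (needs AWS credentials, so it is left commented out):
// var AWS = require("aws-sdk");
// new AWS.Lambda().createEventSourceMapping(
//   buildStreamMappingParams(process.env.STREAM_ARN, "process-orders", process.env.DLQ_ARN)
// ).promise();
```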
3. EventBridge PutEvents returns success but FailedEntryCount is greater than zero
// Response from putEvents
{
"FailedEntryCount": 1,
"Entries": [
{
"ErrorCode": "InternalFailure",
"ErrorMessage": "An internal error occurred"
}
]
}
Always check result.FailedEntryCount after calling putEvents(). A 200 response does not mean all events succeeded: the call itself can succeed while individual entries are rejected. Implement retry logic for failed entries with exponential backoff. This is a common production bug, because it is easy to check only the HTTP status code.
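One way to structure that retry is sketched below. It assumes a client object exposing putEvents(params).promise(), such as new AWS.EventBridge() from aws-sdk v2. The name putEventsWithRetry and the maxAttempts and baseDelayMs options are hypothetical, not part of the SDK. It relies on the documented behavior that response entries come back in the same order as the request entries.

```javascript
// Sketch: resend only the entries that EventBridge reports as failed.
function putEventsWithRetry(client, entries, options) {
  var opts = options || {};
  var maxAttempts = opts.maxAttempts || 3;
  var baseDelayMs = opts.baseDelayMs === undefined ? 100 : opts.baseDelayMs;

  function attempt(pending, attemptNumber) {
    return client.putEvents({ Entries: pending }).promise().then(function(result) {
      if (!result.FailedEntryCount) {
        return { failed: [] };
      }
      // A failed response entry carries ErrorCode instead of EventId.
      var failed = pending.filter(function(entry, index) {
        return Boolean(result.Entries[index].ErrorCode);
      });
      if (attemptNumber >= maxAttempts) {
        return { failed: failed }; // caller decides: DLQ, alarm, or throw
      }
      // Exponential backoff: baseDelayMs, then 2x, 4x, ...
      var delayMs = baseDelayMs * Math.pow(2, attemptNumber - 1);
      return new Promise(function(resolve) { setTimeout(resolve, delayMs); })
        .then(function() { return attempt(failed, attemptNumber + 1); });
    });
  }

  return attempt(entries, 1);
}
```

The promise resolves with the entries that still failed after the last attempt, so a handler can send them to a dead letter queue or fail the invocation. Adding random jitter to delayMs is a reasonable extension when many producers retry at once.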
4. WebSocket postToConnection fails with 410 GoneException
GoneException: Connection ID abcd1234 is no longer available
Connections go stale when clients disconnect without sending a proper close frame. Mobile clients frequently do this. Always wrap postToConnection in a try-catch, and when you get a 410 status code, delete the stale connection record from your connections table. Do not let stale connections accumulate or you will waste Lambda invocations trying to post to dead connections.
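A minimal sketch of that cleanup follows. Here api stands in for an AWS.ApiGatewayManagementApi client and db for an AWS.DynamoDB.DocumentClient, both from aws-sdk v2. The function name postOrCleanup, the table name, and the connectionId key are assumptions, so adjust them to match your connections table.

```javascript
// Sketch: post to a WebSocket connection and remove its record if it is gone.
function postOrCleanup(api, db, connectionId, payload) {
  return api.postToConnection({
    ConnectionId: connectionId,
    Data: JSON.stringify(payload)
  }).promise()
    .then(function() {
      return { delivered: true };
    })
    .catch(function(err) {
      if (err.statusCode !== 410) {
        throw err; // anything other than Gone is a real failure
      }
      // 410 Gone: the client disconnected, so drop the stale record.
      return db.delete({
        TableName: "websocket-connections", // hypothetical table name
        Key: { connectionId: connectionId }
      }).promise().then(function() {
        return { delivered: false, removed: true };
      });
    });
}
```

When broadcasting to many connections, calling this helper for each connection ID inside Promise.all keeps one stale connection from failing the whole fan-out.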
5. Saga state gets corrupted when multiple events arrive simultaneously
ConditionalCheckFailedException: The conditional request failed
When two saga steps try to update the same saga record at the same time, you get conflicts. Use DynamoDB conditional writes with version numbers to detect conflicts. Alternatively, structure your saga state table so each step writes to a separate attribute (as shown in the saga example above), which avoids write conflicts entirely.
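The version-number approach can be sketched as a conditional update that succeeds only if the record still has the version the caller read. The table name, key, and attribute names below are hypothetical. The placeholders in ExpressionAttributeNames avoid clashes with DynamoDB reserved words such as status.

```javascript
// Sketch: optimistic locking for a saga record with DocumentClient.update().
// The write applies only when the stored version equals expectedVersion.
function buildVersionedSagaUpdate(sagaId, expectedVersion, newStatus) {
  return {
    TableName: "saga-state", // hypothetical table name
    Key: { sagaId: sagaId },
    UpdateExpression: "SET #status = :status, #version = :next",
    ConditionExpression: "#version = :expected",
    ExpressionAttributeNames: { "#status": "status", "#version": "version" },
    ExpressionAttributeValues: {
      ":status": newStatus,
      ":expected": expectedVersion,
      ":next": expectedVersion + 1
    }
  };
}

// Usage (needs AWS credentials, so it is left commented out):
// dynamodb.update(buildVersionedSagaUpdate("saga-1", 3, "PAYMENT_CAPTURED")).promise()
//   .catch(function(err) {
//     if (err.code === "ConditionalCheckFailedException") {
//       // Another step won the race: re-read the record and retry with its version.
//     }
//     throw err;
//   });
```

A ConditionalCheckFailedException here signals a detected conflict and not corruption, so the usual response is to re-read the saga record and retry with the fresh version.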
Best Practices
Always configure dead letter queues on every EventBridge rule target and every Lambda event source mapping. Events that cannot be processed must go somewhere observable, not disappear silently.
Validate events at the boundary. Validate schema at publish time (producer side) and at consumption time (consumer side). Schemas will evolve, and defensive parsing prevents cascading failures across services.
Design events as facts, not commands. Name events in past tense: OrderPlaced, not PlaceOrder. Events describe what happened. Commands describe what should happen. Mixing the two creates tight coupling.
Keep event payloads lean. Include identifiers and essential data, not entire database records. Consumers can look up additional details if needed. Large payloads hit EventBridge's 256KB limit and increase costs.
Implement idempotency in every consumer. At-least-once delivery is the norm. Use idempotency keys derived from the event's unique identifier plus the event type. Store processing results in DynamoDB with a TTL so the idempotency table does not grow unbounded.
Version your event schemas. Add a schemaVersion field to every event. Consumers should handle unknown versions gracefully by logging and skipping rather than crashing. This allows you to evolve schemas without coordinating deployments across all teams simultaneously.
Use content-based filtering aggressively. Do not have consumers receive all events and filter in code. Push filtering to EventBridge rules. This reduces Lambda invocations, lowers cost, and improves latency.
Monitor event age and DLQ depth. Set CloudWatch alarms on ApproximateAgeOfOldestMessage for SQS queues and IngestionToInvocationStartLatency for EventBridge. If events are getting old, your consumers cannot keep up.
Test event flows with integration tests. Unit testing individual Lambda handlers is not enough. Deploy to a test environment and publish real events through the bus. Verify that the right consumers fire in the right order. Use EventBridge's archive and replay feature to capture production event patterns for test fixtures.
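As a sketch of the idempotency practice from the list above, the code below claims an idempotency key with a conditional put before running the handler. The key combines the event's id with its detail-type, and expiresAt is meant to be configured as the table's TTL attribute. The table name, attribute names, function names, and the 24-hour TTL are assumptions. Here db stands in for an AWS.DynamoDB.DocumentClient from aws-sdk v2.

```javascript
// Builds a conditional put that succeeds only the first time a key is seen.
function buildIdempotencyPut(event, ttlSeconds, nowMs) {
  var now = nowMs === undefined ? Date.now() : nowMs;
  return {
    TableName: "processed-events", // hypothetical table name
    Item: {
      idempotencyKey: event.id + "#" + event["detail-type"],
      expiresAt: Math.floor(now / 1000) + ttlSeconds // TTL attribute, epoch seconds
    },
    ConditionExpression: "attribute_not_exists(idempotencyKey)"
  };
}

// Runs handler(event) at most once per key. If the handler fails, the claim is
// released so that a later retry can run it again.
function processOnce(db, event, handler) {
  var put = buildIdempotencyPut(event, 24 * 60 * 60);
  var key = { idempotencyKey: put.Item.idempotencyKey };

  return db.put(put).promise().then(function() {
    return Promise.resolve()
      .then(function() { return handler(event); })
      .then(function() {
        return { processed: true };
      }, function(handlerErr) {
        return db.delete({ TableName: put.TableName, Key: key }).promise()
          .then(function() { throw handlerErr; });
      });
  }, function(putErr) {
    if (putErr.code === "ConditionalCheckFailedException") {
      return { processed: false }; // duplicate delivery, skip
    }
    throw putErr;
  });
}
```

Claiming the key before doing the work means a crash between the put and the handler leaves the key in place until the TTL expires. Where that window matters, storing a status such as IN_PROGRESS or COMPLETED alongside the key is a common refinement.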