
SQS and SNS Messaging Patterns

Build scalable messaging architectures with AWS SQS and SNS using fan-out patterns, dead letter queues, and Lambda integration

AWS Simple Queue Service (SQS) and Simple Notification Service (SNS) are the backbone of asynchronous communication in most AWS architectures. They decouple producers from consumers, absorb traffic spikes, and let you build systems that fail gracefully instead of catastrophically. If you are building anything beyond a monolith on AWS, you need to understand these services deeply.

Prerequisites

  • An AWS account with IAM permissions for SQS, SNS, and Lambda
  • Node.js 18+ installed locally
  • AWS SDK v3 for JavaScript (@aws-sdk/client-sqs, @aws-sdk/client-sns)
  • Basic understanding of distributed systems concepts
  • AWS CLI configured with valid credentials

Install the required packages:

npm install @aws-sdk/client-sqs @aws-sdk/client-sns @aws-sdk/client-cloudwatch

SQS Standard vs FIFO Queues

SQS offers two queue types, and picking the wrong one will haunt you.

Standard queues provide at-least-once delivery with best-effort ordering. They scale to virtually unlimited throughput. Messages might be delivered more than once, and they might arrive out of order. For most workloads, this is fine. Your consumers should be idempotent anyway.

FIFO queues guarantee exactly-once processing and strict ordering within a message group. By default they cap at 300 API calls per second, or 3,000 messages per second with batching; high throughput mode (enabled below via DeduplicationScope and FifoThroughputLimit) raises that ceiling by partitioning on message group ID. FIFO queue names must end with .fifo.

var { SQSClient, CreateQueueCommand } = require("@aws-sdk/client-sqs");

var sqs = new SQSClient({ region: "us-east-1" });

// Create a standard queue
function createStandardQueue(name) {
  var params = {
    QueueName: name,
    Attributes: {
      DelaySeconds: "0",
      MessageRetentionPeriod: "345600", // 4 days
      VisibilityTimeout: "30",
      ReceiveMessageWaitTimeSeconds: "20" // long polling
    }
  };

  return sqs.send(new CreateQueueCommand(params));
}

// Create a FIFO queue
function createFifoQueue(name) {
  var params = {
    QueueName: name + ".fifo",
    Attributes: {
      FifoQueue: "true",
      ContentBasedDeduplication: "true",
      DeduplicationScope: "messageGroup",
      FifoThroughputLimit: "perMessageGroupId"
    }
  };

  return sqs.send(new CreateQueueCommand(params));
}

Use standard queues unless you have a hard requirement for ordering or exactly-once processing. I have seen teams choose FIFO queues "just in case" and then struggle with the throughput ceiling during a traffic spike.

Sending and Receiving Messages

The fundamental operations are straightforward, but there are details that matter in production.

Sending Messages

var { SendMessageCommand } = require("@aws-sdk/client-sqs");

var QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue";

function sendMessage(body, attributes) {
  var params = {
    QueueUrl: QUEUE_URL,
    MessageBody: JSON.stringify(body),
    MessageAttributes: attributes || {}
  };

  return sqs.send(new SendMessageCommand(params));
}

// Send a single order message
sendMessage(
  { orderId: "ORD-1234", customerId: "C-5678", total: 99.99 },
  {
    orderType: {
      DataType: "String",
      StringValue: "standard"
    },
    priority: {
      DataType: "Number",
      StringValue: "1"
    }
  }
).then(function(result) {
  console.log("Message sent:", result.MessageId);
}).catch(function(err) {
  console.error("Failed to send:", err.message);
});

Receiving Messages

var { ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");

function receiveMessages(maxMessages) {
  var params = {
    QueueUrl: QUEUE_URL,
    MaxNumberOfMessages: maxMessages || 10,
    WaitTimeSeconds: 20, // long polling
    VisibilityTimeout: 30,
    MessageAttributeNames: ["All"],
    AttributeNames: ["All"]
  };

  return sqs.send(new ReceiveMessageCommand(params));
}

function deleteMessage(receiptHandle) {
  var params = {
    QueueUrl: QUEUE_URL,
    ReceiptHandle: receiptHandle
  };

  return sqs.send(new DeleteMessageCommand(params));
}

// Poll and process messages
function pollQueue() {
  receiveMessages(10).then(function(response) {
    var messages = response.Messages || [];

    if (messages.length === 0) {
      console.log("No messages available");
      return pollQueue();
    }

    var promises = messages.map(function(message) {
      var body = JSON.parse(message.Body);
      console.log("Processing order:", body.orderId);

      return processOrder(body).then(function() {
        return deleteMessage(message.ReceiptHandle);
      });
    });

    return Promise.all(promises).then(function() {
      return pollQueue();
    });
  }).catch(function(err) {
    console.error("Poll error:", err.message);
    setTimeout(pollQueue, 5000); // back off on error
  });
}

Long Polling and Batch Operations

Short polling returns immediately even when the queue is empty, which wastes money and API calls. Long polling waits up to 20 seconds for a message to arrive. Always use long polling.

// Long polling is set per-receive or as a queue attribute
var params = {
  QueueUrl: QUEUE_URL,
  WaitTimeSeconds: 20, // 0 = short polling, 1-20 = long polling
  MaxNumberOfMessages: 10
};

Batch operations reduce API calls and cost. You can send or delete up to 10 messages in a single request.

var { SendMessageBatchCommand, DeleteMessageBatchCommand } = require("@aws-sdk/client-sqs");

function sendBatch(messages) {
  var entries = messages.map(function(msg, index) {
    return {
      Id: "msg_" + index,
      MessageBody: JSON.stringify(msg.body),
      MessageAttributes: msg.attributes || {},
      DelaySeconds: msg.delay || 0
    };
  });

  // SQS batch limit is 10 messages
  var batches = [];
  for (var i = 0; i < entries.length; i += 10) {
    batches.push(entries.slice(i, i + 10));
  }

  var promises = batches.map(function(batch) {
    return sqs.send(new SendMessageBatchCommand({
      QueueUrl: QUEUE_URL,
      Entries: batch
    }));
  });

  return Promise.all(promises).then(function(results) {
    var failed = [];
    results.forEach(function(result) {
      if (result.Failed && result.Failed.length > 0) {
        failed = failed.concat(result.Failed);
      }
    });

    if (failed.length > 0) {
      console.error("Failed to send", failed.length, "messages");
    }

    return results;
  });
}

function deleteBatch(messages) {
  var entries = messages.map(function(msg, index) {
    return {
      Id: "msg_" + index,
      ReceiptHandle: msg.ReceiptHandle
    };
  });

  return sqs.send(new DeleteMessageBatchCommand({
    QueueUrl: QUEUE_URL,
    Entries: entries
  }));
}

Dead Letter Queues

A dead letter queue (DLQ) catches messages that fail processing repeatedly. Without a DLQ, poison messages cycle forever through your queue, burning compute and hiding the problem.

// Create the DLQ first
function setupDeadLetterQueue() {
  return createStandardQueue("orders-dlq").then(function(dlqResult) {
    var dlqUrl = dlqResult.QueueUrl;

    // Get the DLQ ARN
    var { GetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

    return sqs.send(new GetQueueAttributesCommand({
      QueueUrl: dlqUrl,
      AttributeNames: ["QueueArn"]
    })).then(function(attrs) {
      var dlqArn = attrs.Attributes.QueueArn;

      // Create the main queue with the redrive policy
      return sqs.send(new CreateQueueCommand({
        QueueName: "orders-queue",
        Attributes: {
          RedrivePolicy: JSON.stringify({
            deadLetterTargetArn: dlqArn,
            maxReceiveCount: "3" // move to DLQ after 3 failed attempts
          }),
          VisibilityTimeout: "30",
          ReceiveMessageWaitTimeSeconds: "20"
        }
      }));
    });
  });
}

Set maxReceiveCount thoughtfully. Too low and transient errors push good messages into the DLQ. Too high and you waste processing on genuinely bad messages. I find 3-5 works for most workloads.

Always monitor your DLQ. A growing DLQ means something upstream is broken.

// Check DLQ depth
function checkDlqDepth(dlqUrl) {
  var { GetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

  return sqs.send(new GetQueueAttributesCommand({
    QueueUrl: dlqUrl,
    AttributeNames: ["ApproximateNumberOfMessages"]
  })).then(function(result) {
    var depth = parseInt(result.Attributes.ApproximateNumberOfMessages);

    if (depth > 0) {
      console.warn("DLQ has", depth, "messages - investigate failures");
    }

    return depth;
  });
}

Message Visibility Timeout

When a consumer receives a message, SQS hides it from other consumers for the visibility timeout period. If the consumer does not delete the message before the timeout expires, the message becomes visible again and another consumer can pick it up. This is how SQS handles failed consumers without losing messages.

The default visibility timeout is 30 seconds. If your processing takes longer, you need to extend it.

var { ChangeMessageVisibilityCommand } = require("@aws-sdk/client-sqs");

function extendVisibility(receiptHandle, seconds) {
  return sqs.send(new ChangeMessageVisibilityCommand({
    QueueUrl: QUEUE_URL,
    ReceiptHandle: receiptHandle,
    VisibilityTimeout: seconds
  }));
}

// For long-running tasks, extend periodically
function processWithHeartbeat(message) {
  var handle = message.ReceiptHandle;
  var keepAlive = true;

  // Extend visibility every 20 seconds
  var heartbeat = setInterval(function() {
    if (!keepAlive) return;

    extendVisibility(handle, 30).catch(function(err) {
      console.error("Failed to extend visibility:", err.message);
    });
  }, 20000);

  return processOrder(JSON.parse(message.Body))
    .then(function(result) {
      keepAlive = false;
      clearInterval(heartbeat);
      return deleteMessage(handle);
    })
    .catch(function(err) {
      keepAlive = false;
      clearInterval(heartbeat);
      throw err;
    });
}

Set the visibility timeout to at least 6 times your expected processing time. This accounts for retries and variable latency without making messages invisible for too long on failure.
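
As a sketch, you can apply that rule when sizing the queue attribute. The helper name and the 6x multiplier here just restate the guideline above; they are not an official API:

var { SetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

// Derive the visibility timeout from expected processing time (6x rule)
function setVisibilityForProcessingTime(queueUrl, expectedSeconds) {
  var timeout = Math.min(expectedSeconds * 6, 43200); // SQS caps at 12 hours
  return sqs.send(new SetQueueAttributesCommand({
    QueueUrl: queueUrl,
    Attributes: { VisibilityTimeout: String(timeout) }
  }));
}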

SNS Topics and Subscriptions

SNS is a pub/sub service. Producers publish messages to a topic, and SNS delivers copies to all subscribers. Subscribers can be SQS queues, Lambda functions, HTTP endpoints, email addresses, or SMS numbers.

var { SNSClient, CreateTopicCommand, SubscribeCommand, PublishCommand } = require("@aws-sdk/client-sns");

var sns = new SNSClient({ region: "us-east-1" });

function createTopic(name) {
  return sns.send(new CreateTopicCommand({ Name: name }));
}

function subscribeQueue(topicArn, queueArn) {
  return sns.send(new SubscribeCommand({
    TopicArn: topicArn,
    Protocol: "sqs",
    Endpoint: queueArn,
    Attributes: {
      RawMessageDelivery: "true" // skip SNS envelope wrapping
    }
  }));
}

function publishMessage(topicArn, message, attributes) {
  return sns.send(new PublishCommand({
    TopicArn: topicArn,
    Message: JSON.stringify(message),
    MessageAttributes: attributes || {}
  }));
}

Enable RawMessageDelivery for SQS subscriptions. Without it, SNS wraps your message in an envelope that your consumers have to unwrap. Raw delivery gives you the message body directly, which is almost always what you want.
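
If you cannot enable raw delivery (say a consumer needs the SNS metadata), the consumer has to unwrap the envelope itself. A minimal sketch of what that looks like:

// Without RawMessageDelivery, the SQS body is an SNS notification envelope
// ({ Type, MessageId, TopicArn, Message, Timestamp, ... })
function unwrapSnsEnvelope(sqsBody) {
  var envelope = JSON.parse(sqsBody);
  return JSON.parse(envelope.Message); // the payload the publisher sent
}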

Fan-Out Pattern: SNS to SQS

The fan-out pattern is the single most useful messaging pattern on AWS. One SNS topic fans out to multiple SQS queues, each serving a different purpose. A single order event can simultaneously trigger fulfillment, send a notification, update analytics, and sync to a data warehouse.

Producer → SNS Topic → SQS Queue (Fulfillment) → Consumer
                      → SQS Queue (Notifications) → Consumer
                      → SQS Queue (Analytics) → Consumer

This decouples the producer completely. Adding a new consumer means subscribing a new queue. No producer changes. No deployments.

function setupFanOut() {
  var topicArn;

  return createTopic("order-events").then(function(result) {
    topicArn = result.TopicArn;

    // Create the downstream queues
    return Promise.all([
      createStandardQueue("order-fulfillment"),
      createStandardQueue("order-notifications"),
      createStandardQueue("order-analytics")
    ]);
  }).then(function(queueResults) {
    // Get queue ARNs for subscriptions
    var { GetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

    var arnPromises = queueResults.map(function(q) {
      return sqs.send(new GetQueueAttributesCommand({
        QueueUrl: q.QueueUrl,
        AttributeNames: ["QueueArn"]
      }));
    });

    return Promise.all(arnPromises);
  }).then(function(arnResults) {
    // Subscribe each queue to the topic
    var subscriptions = arnResults.map(function(result) {
      return subscribeQueue(topicArn, result.Attributes.QueueArn);
    });

    return Promise.all(subscriptions);
  }).then(function() {
    console.log("Fan-out configured:", topicArn);
    return topicArn;
  });
}

You must also set SQS queue policies to allow SNS to send messages:

function allowSnsToSendToSqs(queueUrl, queueArn, topicArn) {
  var { SetQueueAttributesCommand } = require("@aws-sdk/client-sqs");

  var policy = {
    Version: "2012-10-17",
    Statement: [{
      Sid: "AllowSNS",
      Effect: "Allow",
      Principal: { Service: "sns.amazonaws.com" },
      Action: "sqs:SendMessage",
      Resource: queueArn,
      Condition: {
        ArnEquals: { "aws:SourceArn": topicArn }
      }
    }]
  };

  return sqs.send(new SetQueueAttributesCommand({
    QueueUrl: queueUrl,
    Attributes: {
      Policy: JSON.stringify(policy)
    }
  }));
}

Forgetting this policy is the most common fan-out setup mistake. Messages silently disappear because SNS cannot deliver them.

Message Filtering Policies

SNS subscription filters let subscribers receive only the messages they care about. Without filters, every subscriber gets every message. Filters run on the SNS side so unwanted messages never touch your queues.

function subscribeWithFilter(topicArn, queueArn, filterPolicy) {
  return sns.send(new SubscribeCommand({
    TopicArn: topicArn,
    Protocol: "sqs",
    Endpoint: queueArn,
    Attributes: {
      RawMessageDelivery: "true",
      FilterPolicy: JSON.stringify(filterPolicy),
      FilterPolicyScope: "MessageAttributes"
    }
  }));
}

// Only receive high-value orders
subscribeWithFilter(topicArn, analyticsQueueArn, {
  orderType: ["premium", "enterprise"],
  orderTotal: [{ numeric: [">=", 1000] }]
});

// Only receive orders needing physical fulfillment
subscribeWithFilter(topicArn, fulfillmentQueueArn, {
  fulfillmentType: ["physical"],
  region: ["us-east", "us-west", "eu-west"]
});

// When publishing, include the attributes that filters match against
publishMessage(topicArn, orderData, {
  orderType: { DataType: "String", StringValue: "premium" },
  orderTotal: { DataType: "Number", StringValue: "2500" },
  fulfillmentType: { DataType: "String", StringValue: "physical" },
  region: { DataType: "String", StringValue: "us-east" }
});

Filter policies support string matching, numeric comparisons, prefix matching, and existence checks. They can dramatically reduce processing load on downstream consumers.
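
For illustration, a single policy can combine these operators. A sketch (the queue and attribute names here are hypothetical):

// Prefix, negation, existence, and numeric-range matching in one policy
subscribeWithFilter(topicArn, auditQueueArn, {
  eventType: [{ prefix: "order." }],               // order.created, order.paid, ...
  region: [{ "anything-but": ["test"] }],          // drop test traffic
  correlationId: [{ exists: true }],               // attribute must be present
  orderTotal: [{ numeric: [">", 0, "<=", 10000] }] // bounded range
});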

SQS as Lambda Event Source

Lambda can poll SQS directly, removing the need for your own polling infrastructure. This is the preferred pattern for serverless architectures.

// Lambda handler for SQS events
exports.handler = function(event, context, callback) {
  var promises = event.Records.map(function(record) {
    var body = JSON.parse(record.body);
    var messageId = record.messageId;
    var receiptHandle = record.receiptHandle;

    console.log("Processing message:", messageId, body);

    return processOrder(body);
  });

  Promise.all(promises).then(function(results) {
    callback(null, { statusCode: 200 });
  }).catch(function(err) {
    console.error("Processing failed:", err);
    callback(err);
  });
};

function processOrder(order) {
  // Your business logic here
  console.log("Fulfilling order:", order.orderId);
  return Promise.resolve({ processed: true });
}

Configure the event source mapping with the AWS CLI or CloudFormation:

# CloudFormation snippet
OrderProcessor:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !GetAtt OrderQueue.Arn
    FunctionName: !Ref OrderProcessorFunction
    BatchSize: 10
    MaximumBatchingWindowInSeconds: 5
    FunctionResponseTypes:
      - ReportBatchItemFailures

The ReportBatchItemFailures setting is critical. Without it, if one message in a batch fails, Lambda retries the entire batch. With it, you can report which specific messages failed, and only those get retried.

// Lambda handler with partial batch failure reporting
exports.handler = function(event, context, callback) {
  var batchItemFailures = [];

  var promises = event.Records.map(function(record) {
    return processOrder(JSON.parse(record.body)).catch(function(err) {
      console.error("Failed to process:", record.messageId, err.message);
      batchItemFailures.push({ itemIdentifier: record.messageId });
    });
  });

  Promise.all(promises).then(function() {
    callback(null, { batchItemFailures: batchItemFailures });
  });
};

FIFO Exactly-Once Processing

FIFO queues use message group IDs to maintain ordering and deduplication IDs to prevent duplicates within a 5-minute window.

var { SendMessageCommand } = require("@aws-sdk/client-sqs");

var FIFO_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo";

function sendFifoMessage(body, groupId, deduplicationId) {
  var params = {
    QueueUrl: FIFO_QUEUE_URL,
    MessageBody: JSON.stringify(body),
    MessageGroupId: groupId,
    MessageDeduplicationId: deduplicationId
  };

  return sqs.send(new SendMessageCommand(params));
}

// Group by customer to maintain per-customer ordering
sendFifoMessage(
  { orderId: "ORD-001", action: "created", customerId: "C-100" },
  "customer-C-100",      // messages for this customer are ordered
  "ORD-001-created"       // prevents duplicate sends
);

sendFifoMessage(
  { orderId: "ORD-001", action: "paid", customerId: "C-100" },
  "customer-C-100",
  "ORD-001-paid"
);

Use the entity ID as the message group ID. Orders for customer A are ordered relative to each other, but they process in parallel with orders for customer B. This gives you both ordering guarantees and throughput scaling.

If you enable ContentBasedDeduplication on the queue, SQS generates the deduplication ID from a SHA-256 hash of the message body. This is convenient but dangerous if you ever send identical bodies for legitimately different events.
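
If distinct events can legitimately share a body, derive an explicit deduplication ID from the event's identity instead. A sketch, assuming each event carries an orderId, an action, and a timestamp (occurredAt is an assumed field):

var crypto = require("crypto");

// Deduplicate on event identity, not content, so identical bodies
// for legitimately different events are never collapsed into one
function dedupIdFor(event) {
  var identity = event.orderId + ":" + event.action + ":" + event.occurredAt;
  return crypto.createHash("sha256").update(identity).digest("hex"); // 64 chars, under the 128-char limit
}

var event = { orderId: "ORD-001", action: "created", customerId: "C-100", occurredAt: "2024-01-01T00:00:00Z" };
sendFifoMessage(event, "customer-" + event.customerId, dedupIdFor(event));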

Message Attributes

Message attributes carry metadata alongside the body. They are essential for filtering, routing, and avoiding unnecessary deserialization.

function sendWithAttributes(body, metadata) {
  var messageAttributes = {};

  Object.keys(metadata).forEach(function(key) {
    var value = metadata[key];
    var dataType = typeof value === "number" ? "Number" : "String";

    messageAttributes[key] = {
      DataType: dataType,
      StringValue: String(value)
    };
  });

  return sqs.send(new SendMessageCommand({
    QueueUrl: QUEUE_URL,
    MessageBody: JSON.stringify(body),
    MessageAttributes: messageAttributes
  }));
}

// Usage
sendWithAttributes(
  { orderId: "ORD-1234", items: [{ sku: "WIDGET-A", qty: 5 }] },
  {
    eventType: "order.created",
    source: "checkout-service",
    version: 2,
    priority: 1,
    region: "us-east-1"
  }
);

SQS supports up to 10 message attributes per message. Each attribute has a name (up to 256 characters), a data type (String, Number, or Binary), and a value. Keep attribute names consistent across your system. I recommend creating a shared constants file for attribute names.
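
A shared constants module can be as simple as this sketch (the names are illustrative):

// message-attributes.js - one source of truth for attribute names so
// producers, consumers, and filter policies never drift apart
module.exports = {
  EVENT_TYPE: "eventType",
  SOURCE: "source",
  VERSION: "version",
  PRIORITY: "priority",
  REGION: "region"
};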

Retry Strategies

Not all failures are the same. Transient failures (network blips, throttling) should retry quickly. Persistent failures (malformed data, missing dependencies) should not.

function processWithRetry(message, attempt) {
  attempt = attempt || 1;
  var maxAttempts = 3;
  var body = JSON.parse(message.Body);

  return processOrder(body).then(function() {
    return deleteMessage(message.ReceiptHandle);
  }).catch(function(err) {
    if (isTransientError(err) && attempt < maxAttempts) {
      var delay = Math.pow(2, attempt) * 1000; // exponential backoff
      console.warn("Transient error, retry", attempt, "in", delay, "ms");

      return new Promise(function(resolve) {
        setTimeout(resolve, delay);
      }).then(function() {
        return processWithRetry(message, attempt + 1);
      });
    }

    // Permanent failure or max retries exceeded
    console.error("Giving up on message:", message.MessageId, err.message);
    // Let visibility timeout expire so DLQ picks it up
    throw err;
  });
}

function isTransientError(err) {
  var transientCodes = [
    "ThrottlingException",
    "ServiceUnavailable",
    "InternalServerError",
    "ECONNRESET",
    "ETIMEDOUT"
  ];

  return transientCodes.indexOf(err.code || err.name) !== -1;
}

Combine application-level retries with the SQS retry mechanism (redrive policy). Application retries handle transient issues immediately. The DLQ catches genuinely broken messages after the maxReceiveCount threshold.

Monitoring Queue Depth

CloudWatch metrics for SQS are your early warning system. The key metric is ApproximateNumberOfMessagesVisible — if it is growing, your consumers are not keeping up.

var { CloudWatchClient, PutMetricAlarmCommand } = require("@aws-sdk/client-cloudwatch");

var cloudwatch = new CloudWatchClient({ region: "us-east-1" });

function createQueueDepthAlarm(queueName, threshold) {
  return cloudwatch.send(new PutMetricAlarmCommand({
    AlarmName: queueName + "-depth-alarm",
    MetricName: "ApproximateNumberOfMessagesVisible",
    Namespace: "AWS/SQS",
    Statistic: "Average",
    Period: 300, // 5 minutes
    EvaluationPeriods: 3,
    Threshold: threshold,
    ComparisonOperator: "GreaterThanThreshold",
    AlarmActions: ["arn:aws:sns:us-east-1:123456789012:ops-alerts"],
    Dimensions: [{
      Name: "QueueName",
      Value: queueName
    }]
  }));
}

// Alert if more than 1000 messages are waiting for 15 minutes
createQueueDepthAlarm("orders-queue", 1000);

// DLQ should stay at zero - alert on any message
createQueueDepthAlarm("orders-dlq", 0);

Also monitor ApproximateAgeOfOldestMessage. A growing age means messages are sitting in the queue too long. Set alarms on both metrics.
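
The age alarm follows the same pattern; a sketch reusing the client above (the 10-minute threshold is illustrative):

// Alert if the oldest message has been waiting more than maxAgeSeconds
function createQueueAgeAlarm(queueName, maxAgeSeconds) {
  return cloudwatch.send(new PutMetricAlarmCommand({
    AlarmName: queueName + "-age-alarm",
    MetricName: "ApproximateAgeOfOldestMessage",
    Namespace: "AWS/SQS",
    Statistic: "Maximum",
    Period: 300,
    EvaluationPeriods: 2,
    Threshold: maxAgeSeconds,
    ComparisonOperator: "GreaterThanThreshold",
    AlarmActions: ["arn:aws:sns:us-east-1:123456789012:ops-alerts"],
    Dimensions: [{ Name: "QueueName", Value: queueName }]
  }));
}

createQueueAgeAlarm("orders-queue", 600); // 10 minutes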

Complete Working Example: Order Processing System

This example implements an order processing system using SNS fan-out to three SQS queues, each consumed by a Lambda function.

Infrastructure Setup

// infrastructure.js
var { SQSClient, CreateQueueCommand, GetQueueAttributesCommand, SetQueueAttributesCommand } = require("@aws-sdk/client-sqs");
var { SNSClient, CreateTopicCommand, SubscribeCommand } = require("@aws-sdk/client-sns");

var sqs = new SQSClient({ region: "us-east-1" });
var sns = new SNSClient({ region: "us-east-1" });

function setupOrderProcessingSystem() {
  var topicArn;
  var queueConfigs = [
    { name: "order-fulfillment", dlq: "order-fulfillment-dlq" },
    { name: "order-notifications", dlq: "order-notifications-dlq" },
    { name: "order-analytics", dlq: "order-analytics-dlq" }
  ];

  // Step 1: Create the SNS topic
  return sns.send(new CreateTopicCommand({ Name: "order-events" }))
    .then(function(topicResult) {
      topicArn = topicResult.TopicArn;
      console.log("Created topic:", topicArn);

      // Step 2: Create DLQs
      var dlqPromises = queueConfigs.map(function(config) {
        return sqs.send(new CreateQueueCommand({
          QueueName: config.dlq,
          Attributes: { MessageRetentionPeriod: "1209600" } // 14 days
        }));
      });

      return Promise.all(dlqPromises);
    })
    .then(function(dlqResults) {
      // Get DLQ ARNs
      var arnPromises = dlqResults.map(function(dlq) {
        return sqs.send(new GetQueueAttributesCommand({
          QueueUrl: dlq.QueueUrl,
          AttributeNames: ["QueueArn"]
        }));
      });

      return Promise.all(arnPromises);
    })
    .then(function(dlqArnResults) {
      // Step 3: Create main queues with redrive policies
      var queuePromises = queueConfigs.map(function(config, index) {
        var dlqArn = dlqArnResults[index].Attributes.QueueArn;

        return sqs.send(new CreateQueueCommand({
          QueueName: config.name,
          Attributes: {
            VisibilityTimeout: "60",
            ReceiveMessageWaitTimeSeconds: "20",
            RedrivePolicy: JSON.stringify({
              deadLetterTargetArn: dlqArn,
              maxReceiveCount: "3"
            })
          }
        }));
      });

      return Promise.all(queuePromises);
    })
    .then(function(queueResults) {
      // Get main queue ARNs
      var arnPromises = queueResults.map(function(q) {
        return sqs.send(new GetQueueAttributesCommand({
          QueueUrl: q.QueueUrl,
          AttributeNames: ["QueueArn"]
        }));
      });

      return Promise.all(arnPromises).then(function(arns) {
        return { queues: queueResults, arns: arns };
      });
    })
    .then(function(result) {
      // Step 4: Set queue policies and subscribe to SNS
      var setupPromises = result.arns.map(function(arnResult, index) {
        var queueArn = arnResult.Attributes.QueueArn;
        var queueUrl = result.queues[index].QueueUrl;

        // Allow SNS to send messages to this queue
        var policy = {
          Version: "2012-10-17",
          Statement: [{
            Sid: "AllowSNSSend",
            Effect: "Allow",
            Principal: { Service: "sns.amazonaws.com" },
            Action: "sqs:SendMessage",
            Resource: queueArn,
            Condition: {
              ArnEquals: { "aws:SourceArn": topicArn }
            }
          }]
        };

        return sqs.send(new SetQueueAttributesCommand({
          QueueUrl: queueUrl,
          Attributes: { Policy: JSON.stringify(policy) }
        })).then(function() {
          return sns.send(new SubscribeCommand({
            TopicArn: topicArn,
            Protocol: "sqs",
            Endpoint: queueArn,
            Attributes: { RawMessageDelivery: "true" }
          }));
        });
      });

      return Promise.all(setupPromises);
    })
    .then(function() {
      console.log("Order processing system configured successfully");
      return { topicArn: topicArn };
    });
}

setupOrderProcessingSystem().catch(function(err) {
  console.error("Setup failed:", err);
  process.exit(1);
});

Order Publisher

// publisher.js
var { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");

var sns = new SNSClient({ region: "us-east-1" });
var TOPIC_ARN = process.env.ORDER_TOPIC_ARN;

function publishOrderEvent(order, eventType) {
  var message = {
    eventType: eventType,
    timestamp: new Date().toISOString(),
    order: {
      orderId: order.orderId,
      customerId: order.customerId,
      items: order.items,
      total: order.total,
      shippingAddress: order.shippingAddress,
      email: order.email
    }
  };

  var attributes = {
    eventType: { DataType: "String", StringValue: eventType },
    orderTotal: { DataType: "Number", StringValue: String(order.total) },
    region: { DataType: "String", StringValue: order.region || "us-east" }
  };

  return sns.send(new PublishCommand({
    TopicArn: TOPIC_ARN,
    Message: JSON.stringify(message),
    MessageAttributes: attributes
  })).then(function(result) {
    console.log("Published", eventType, "for order", order.orderId, "MessageId:", result.MessageId);
    return result;
  });
}

// Express route handler example
function handleCheckout(req, res) {
  var order = req.body;
  order.orderId = "ORD-" + Date.now();

  // Save to database first, then publish event
  saveOrderToDatabase(order).then(function() {
    return publishOrderEvent(order, "order.created");
  }).then(function() {
    res.json({ success: true, orderId: order.orderId });
  }).catch(function(err) {
    console.error("Checkout failed:", err);
    res.status(500).json({ error: "Order processing failed" });
  });
}

function saveOrderToDatabase(order) {
  // Database save logic
  return Promise.resolve(order);
}

module.exports = { publishOrderEvent: publishOrderEvent };

Fulfillment Consumer (Lambda)

// fulfillment-handler.js
var { DynamoDBClient, PutItemCommand, GetItemCommand } = require("@aws-sdk/client-dynamodb");

var dynamodb = new DynamoDBClient({ region: "us-east-1" });

exports.handler = function(event, context, callback) {
  var batchItemFailures = [];

  var promises = event.Records.map(function(record) {
    var body = JSON.parse(record.body);
    var order = body.order;

    console.log("Processing fulfillment for:", order.orderId);

    // Idempotency check. Note: this Get-then-Put check has a small race
    // window; a DynamoDB conditional write in markAsProcessed would close it
    return checkIfProcessed(order.orderId).then(function(alreadyProcessed) {
      if (alreadyProcessed) {
        console.log("Order already fulfilled:", order.orderId);
        return;
      }

      return fulfillOrder(order).then(function() {
        return markAsProcessed(order.orderId);
      });
    }).catch(function(err) {
      console.error("Fulfillment failed for:", order.orderId, err.message);
      batchItemFailures.push({ itemIdentifier: record.messageId });
    });
  });

  Promise.all(promises).then(function() {
    callback(null, { batchItemFailures: batchItemFailures });
  });
};

function checkIfProcessed(orderId) {
  return dynamodb.send(new GetItemCommand({
    TableName: "order-fulfillment-log",
    Key: { orderId: { S: orderId } }
  })).then(function(result) {
    return !!result.Item;
  });
}

function fulfillOrder(order) {
  console.log("Reserving inventory for", order.items.length, "items");
  console.log("Creating shipping label for:", order.shippingAddress);

  // Actual fulfillment logic here
  return Promise.resolve();
}

function markAsProcessed(orderId) {
  return dynamodb.send(new PutItemCommand({
    TableName: "order-fulfillment-log",
    Item: {
      orderId: { S: orderId },
      processedAt: { S: new Date().toISOString() },
      ttl: { N: String(Math.floor(Date.now() / 1000) + 86400 * 30) } // 30 day TTL
    }
  }));
}

Notification Consumer (Lambda)

// notification-handler.js
var { SESClient, SendEmailCommand } = require("@aws-sdk/client-ses");

var ses = new SESClient({ region: "us-east-1" });

exports.handler = function(event, context, callback) {
  var batchItemFailures = [];

  var promises = event.Records.map(function(record) {
    var body = JSON.parse(record.body);
    var order = body.order;

    console.log("Sending notification for:", order.orderId);

    return sendOrderConfirmation(order).catch(function(err) {
      console.error("Notification failed for:", order.orderId, err.message);
      batchItemFailures.push({ itemIdentifier: record.messageId });
    });
  });

  Promise.all(promises).then(function() {
    callback(null, { batchItemFailures: batchItemFailures });
  });
};

function sendOrderConfirmation(order) {
  var params = {
    Destination: { ToAddresses: [order.email] },
    Message: {
      Subject: { Data: "Order Confirmation: " + order.orderId },
      Body: {
        Html: {
          Data: "<h1>Thank you for your order!</h1>" +
            "<p>Order ID: " + order.orderId + "</p>" +
            "<p>Total: $" + order.total.toFixed(2) + "</p>" +
            "<p>We will notify you when your order ships.</p>"
        }
      }
    },
    Source: "[email protected]"
  };

  return ses.send(new SendEmailCommand(params));
}

Analytics Consumer (Lambda)

// analytics-handler.js
var { FirehoseClient, PutRecordBatchCommand } = require("@aws-sdk/client-firehose");

var firehose = new FirehoseClient({ region: "us-east-1" });

exports.handler = function(event, context, callback) {
  var records = event.Records.map(function(record) {
    var body = JSON.parse(record.body);

    var analyticsEvent = {
      eventType: body.eventType,
      orderId: body.order.orderId,
      customerId: body.order.customerId,
      total: body.order.total,
      itemCount: body.order.items.length,
      region: body.order.region || "unknown",
      timestamp: body.timestamp,
      processedAt: new Date().toISOString()
    };

    return {
      Data: Buffer.from(JSON.stringify(analyticsEvent) + "\n")
    };
  });

  firehose.send(new PutRecordBatchCommand({
    DeliveryStreamName: "order-analytics-stream",
    Records: records
  })).then(function(result) {
    if (result.FailedPutCount > 0) {
      console.error("Failed to deliver", result.FailedPutCount, "records to Firehose");
    }

    console.log("Delivered", records.length, "analytics events");
    callback(null, { statusCode: 200 });
  }).catch(function(err) {
    console.error("Analytics delivery failed:", err);
    callback(err);
  });
};

Common Issues and Troubleshooting

1. Messages Not Arriving in SQS from SNS

Error: No messages received in SQS queue despite publishing to SNS topic
CloudWatch: NumberOfNotificationsFailed increments on the topic while the queue's NumberOfMessagesReceived stays at 0

The queue policy does not allow SNS to send messages. Verify the SQS policy includes sqs:SendMessage with the correct SNS topic ARN as the source condition. Also check that the subscription is in the Confirmed state using aws sns list-subscriptions-by-topic.

2. Lambda Receives Same Messages Repeatedly

Error: Function invocation timed out after 30000ms
CloudWatch: ApproximateNumberOfMessagesNotVisible keeps growing

The Lambda function timeout exceeds the SQS visibility timeout. If Lambda takes 60 seconds but the visibility timeout is 30 seconds, the message becomes visible again while Lambda is still processing it. Set the SQS visibility timeout to at least 6 times the Lambda timeout. If your Lambda timeout is 30 seconds, set the visibility timeout to 180 seconds.

3. FIFO Queue Throttling

OverLimit: The maximum throughput for the FIFO queue has been reached.
Request ID: abc123-def456

You are hitting the FIFO throughput limit. A FIFO queue defaults to 300 API calls per second (3,000 messages per second with batching) across the whole queue; high throughput mode raises the ceiling by partitioning on message group ID, so if all messages share one group ID you are limiting yourself to a single partition. Spread messages across multiple group IDs to increase parallel processing, as in the sketch below. If you need higher throughput and can tolerate at-least-once delivery, switch to a standard queue with application-level deduplication.
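
One way to spread the load while keeping per-entity ordering, as a sketch (the shard count is a tuning assumption):

var crypto = require("crypto");

// Hash the entity ID into one of N message groups: ordering still holds
// per customer, and throughput scales across partitions
function groupIdFor(customerId, shardCount) {
  var hash = crypto.createHash("md5").update(customerId).digest();
  return "shard-" + (hash.readUInt32BE(0) % shardCount);
}

// groupIdFor("C-100", 16) always maps C-100 to the same shard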

4. Messages Going to DLQ Prematurely

ApproximateReceiveCount: 3 (maxReceiveCount is 3)
But processing was successful — message was still moved to DLQ

This happens when the consumer processes the message but fails to delete it before the visibility timeout expires. The message becomes visible, gets received again, and hits the maxReceiveCount threshold. Increase the visibility timeout, add a heartbeat to extend it during long operations, and make sure the delete call happens immediately after successful processing. Also check that you are using the correct receipt handle — receipt handles change on each receive.

5. Batch Partial Failures Not Working

All messages in batch retried even though only one failed
Lambda response: { statusCode: 200 }

You must enable ReportBatchItemFailures on the event source mapping AND return the correct response format: { batchItemFailures: [{ itemIdentifier: "messageId" }] }. Returning a status code does not trigger partial failure reporting. Also verify the response uses batchItemFailures (exact casing) as the key.

Best Practices

  • Always use long polling. Set ReceiveMessageWaitTimeSeconds to 20 on the queue or per request. Short polling wastes money and adds latency. There is no good reason to short poll in production.

  • Make consumers idempotent. SQS standard queues deliver at least once, and even FIFO queues can deliver duplicates during high-availability events. Use a deduplication table (DynamoDB with conditional writes works well) to track processed message IDs.

  • Set up dead letter queues on every queue. A DLQ is your safety net. Without one, poison messages cycle forever. Set maxReceiveCount between 3 and 5. Monitor the DLQ with CloudWatch alarms.

  • Use batch operations everywhere. Sending, receiving, and deleting in batches of 10 reduces your API call count by 10x and your costs proportionally. Always handle partial batch failures — check the Failed array in batch responses.

  • Enable raw message delivery for SQS subscriptions. Without it, SNS wraps your message in an envelope that adds unnecessary parsing complexity. Raw delivery gives your SQS consumers the exact message body from the publisher.

  • Size your visibility timeout correctly. It should be at least 6 times your average processing time. Too short and messages get processed multiple times. Too long and failed messages are invisible for extended periods. For Lambda, set it to 6 times the Lambda function timeout.

  • Use message filtering instead of application-level filtering. SNS filter policies are evaluated before delivery, so rejected messages never consume queue resources or Lambda invocations. Filtering at the SNS level is free. Filtering at the consumer level costs compute.

  • Implement exponential backoff for transient errors. Network issues and throttling are temporary. Immediate retries amplify the problem. Back off exponentially with jitter: delay = min(base * 2^attempt + random_jitter, max_delay). A code sketch follows this list.

  • Monitor queue age, not just depth. A queue with 100 messages might be fine if they arrived 2 seconds ago. It is a problem if the oldest message is 2 hours old. Set alarms on ApproximateAgeOfOldestMessage.

  • Keep messages small. SQS messages are limited to 256 KB. For larger payloads, store the data in S3 and send a reference in the message. The SQS Extended Client Library handles this pattern automatically; a minimal sketch of the idea also follows this list.
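
The backoff formula from the retry bullet, as a sketch in code:

// Exponential backoff with jitter, capped at maxDelayMs
function backoffDelay(attempt, baseMs, maxDelayMs) {
  var exponential = baseMs * Math.pow(2, attempt);
  var jitter = Math.random() * baseMs;
  return Math.min(exponential + jitter, maxDelayMs);
}

// backoffDelay(0, 100, 30000) -> ~100-200ms; backoffDelay(5, 100, 30000) -> ~3200ms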
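
And the large-payload pattern from the last bullet, sketched by hand rather than with the Extended Client Library (the bucket name and key scheme are assumptions):

var { S3Client, PutObjectCommand } = require("@aws-sdk/client-s3");

var s3 = new S3Client({ region: "us-east-1" });

// Store the payload in S3 and enqueue a small pointer to it
function sendLargePayload(payload) {
  var key = "payloads/" + Date.now() + ".json";
  return s3.send(new PutObjectCommand({
    Bucket: "my-message-payloads", // assumed bucket
    Key: key,
    Body: JSON.stringify(payload)
  })).then(function() {
    return sendMessage({ payloadBucket: "my-message-payloads", payloadKey: key });
  });
}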
