
Batch Processing with LLM APIs

High-volume batch processing patterns for LLM APIs with concurrency control, job queues, checkpointing, and cost optimization in Node.js.


When you need to process hundreds or thousands of prompts through an LLM, firing off individual API calls in a loop will get you rate-limited, bankrupt, or both. Batch processing is how you turn chaotic, expensive, one-at-a-time API calls into a controlled, cost-efficient pipeline with proper error handling, progress tracking, and the ability to resume after failures.

This article covers the full landscape of batch processing with LLM APIs: native batch endpoints from Anthropic and OpenAI, custom concurrency-controlled processors, Redis-backed job queues, checkpointing strategies, and the real-world patterns I use in production to process tens of thousands of LLM requests reliably.

Prerequisites

  • Node.js 18+ installed
  • Redis server running locally or accessible remotely (for queue-based examples)
  • API keys for Anthropic and/or OpenAI
  • Familiarity with Promises and async patterns in Node.js
  • Basic understanding of LLM API request/response structures
Install the dependencies:

npm install @anthropic-ai/sdk openai bullmq ioredis node-cron uuid

When to Use Batch Processing vs Real-Time Calls

Not every LLM workload needs batch processing. Here is how I decide:

Use real-time calls when:

  • You need sub-second response times for user-facing features
  • Volume is under 50 requests per minute
  • Each request is independent and triggered by user action
  • Latency matters more than cost

Use batch processing when:

  • You have 100+ requests to process in a single run
  • Results are not needed immediately (minutes to hours is acceptable)
  • You want the 50% cost discount from native batch APIs
  • You need to process a dataset: classify documents, generate embeddings, summarize records
  • You want retry logic, progress tracking, and resumability
  • You are running scheduled jobs (nightly processing, weekly reports)

The crossover point in my experience is around 50-100 requests. Below that, a simple loop with rate limiting works fine. Above that, you need structure.
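
For the small-volume case, the "simple loop with rate limiting" can be a minimal sketch like the one below. It assumes a generic callLLM function (a placeholder for whatever single-request helper you already have) that takes a prompt and returns a Promise; the 1500ms delay is an assumption, not a published limit.

// Minimal sketch for small volumes: sequential calls with a fixed delay between them.
// callLLM is a placeholder for your existing single-request function.
function processSequentially(prompts, callLLM, delayMs) {
  delayMs = delayMs || 1500; // roughly 40 requests per minute
  var results = [];

  return prompts.reduce(function (chain, prompt, index) {
    return chain.then(function () {
      return callLLM(prompt);
    }).then(function (response) {
      results.push({ index: index, response: response });
      // Wait before the next request to stay under the rate limit
      return new Promise(function (resolve) { setTimeout(resolve, delayMs); });
    });
  }, Promise.resolve()).then(function () {
    return results;
  });
}

Above the crossover point, the patterns in the rest of this article add what this loop lacks: retries, progress tracking, and resumability.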

Anthropic's Message Batches API

Anthropic offers a native Message Batches API that processes requests asynchronously at a 50% discount. You submit a batch, poll for completion, and retrieve results. This is the cheapest way to process large volumes of Claude requests.

var Anthropic = require("@anthropic-ai/sdk");

var client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

function createAnthropicBatch(items) {
  var requests = items.map(function (item, index) {
    return {
      custom_id: "item-" + index,
      params: {
        model: "claude-sonnet-4-20250514",
        max_tokens: 1024,
        messages: [
          { role: "user", content: item.prompt }
        ]
      }
    };
  });

  return client.messages.batches.create({ requests: requests });
}

function pollBatchUntilDone(batchId, intervalMs) {
  intervalMs = intervalMs || 30000;

  return new Promise(function (resolve, reject) {
    var timer = setInterval(function () {
      client.messages.batches.retrieve(batchId)
        .then(function (batch) {
          console.log(
            "Batch %s: %s — %d/%d complete",
            batchId,
            batch.processing_status,
            batch.request_counts.succeeded + batch.request_counts.errored,
            batch.request_counts.processing + batch.request_counts.succeeded + batch.request_counts.errored
          );

          if (batch.processing_status === "ended") {
            clearInterval(timer);
            resolve(batch);
          }
        })
        .catch(function (err) {
          clearInterval(timer);
          reject(err);
        });
    }, intervalMs);
  });
}

function retrieveBatchResults(batchId) {
  var results = [];

  // The SDK returns the batch results as an async-iterable JSONL stream,
  // so walk it with its async iterator rather than event listeners.
  return client.messages.batches.results(batchId)
    .then(function (resultStream) {
      var iterator = resultStream[Symbol.asyncIterator]();

      function readNext() {
        return iterator.next().then(function (step) {
          if (step.done) {
            return results;
          }

          var entry = step.value;
          results.push({
            id: entry.custom_id,
            type: entry.result.type,
            content: entry.result.type === "succeeded"
              ? entry.result.message.content[0].text
              : entry.result.error
          });

          return readNext();
        });
      }

      return readNext();
    });
}

// Usage
var items = [
  { prompt: "Summarize the key principles of REST API design." },
  { prompt: "Explain database connection pooling in 3 sentences." },
  { prompt: "What are the trade-offs of microservices vs monoliths?" }
  // ... hundreds more
];

createAnthropicBatch(items)
  .then(function (batch) {
    console.log("Batch created: " + batch.id);
    return pollBatchUntilDone(batch.id);
  })
  .then(function (batch) {
    return retrieveBatchResults(batch.id);
  })
  .then(function (results) {
    console.log("Received %d results", results.length);
    var succeeded = results.filter(function (r) { return r.type === "succeeded"; });
    var failed = results.filter(function (r) { return r.type !== "succeeded"; });
    console.log("Succeeded: %d, Failed: %d", succeeded.length, failed.length);
  })
  .catch(function (err) {
    console.error("Batch processing failed:", err.message);
  });

Anthropic batches can hold up to 10,000 requests and results are available for 29 days. The 50% discount applies to all tokens in the batch. For large-scale content generation, classification, or extraction workloads, this is the most cost-effective option.
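
One practical detail: a native batch can take hours, so persist the batch ID as soon as you create it instead of keeping it only in memory. A small sketch of that idea follows; the batches.json file name and its shape are illustrative choices, not part of the Anthropic API.

var fs = require("fs");

// Record each submitted batch so polling can resume after a process restart.
function recordSubmittedBatch(batch, trackingFile) {
  trackingFile = trackingFile || "batches.json";
  var existing = fs.existsSync(trackingFile)
    ? JSON.parse(fs.readFileSync(trackingFile, "utf-8"))
    : [];

  existing.push({
    id: batch.id,
    status: batch.processing_status,
    createdAt: new Date().toISOString()
  });

  fs.writeFileSync(trackingFile, JSON.stringify(existing, null, 2));
}

// After createAnthropicBatch(...) resolves, call recordSubmittedBatch(batch).
// A later run (or a separate process) can read the saved ID and call
// pollBatchUntilDone(savedId) and retrieveBatchResults(savedId).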

OpenAI's Batch API

OpenAI offers a similar batch endpoint. You upload a JSONL file, create a batch, and poll for results. Same 50% discount principle applies.

var OpenAI = require("openai");
var fs = require("fs");
var path = require("path");

var openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

function buildBatchFile(items, outputPath) {
  var lines = items.map(function (item, index) {
    return JSON.stringify({
      custom_id: "request-" + index,
      method: "POST",
      url: "/v1/chat/completions",
      body: {
        model: "gpt-4o-mini",
        messages: [
          { role: "system", content: "You are a technical writing assistant." },
          { role: "user", content: item.prompt }
        ],
        max_tokens: 1024
      }
    });
  });

  fs.writeFileSync(outputPath, lines.join("\n"));
  return outputPath;
}

function submitOpenAIBatch(filePath) {
  return openai.files.create({
    file: fs.createReadStream(filePath),
    purpose: "batch"
  }).then(function (file) {
    console.log("Uploaded file: " + file.id);
    return openai.batches.create({
      input_file_id: file.id,
      endpoint: "/v1/chat/completions",
      completion_window: "24h"
    });
  });
}

function pollOpenAIBatch(batchId) {
  return new Promise(function (resolve, reject) {
    var timer = setInterval(function () {
      openai.batches.retrieve(batchId)
        .then(function (batch) {
          console.log(
            "Batch %s: %s — completed: %d, failed: %d, total: %d",
            batchId,
            batch.status,
            batch.request_counts.completed,
            batch.request_counts.failed,
            batch.request_counts.total
          );

          if (batch.status === "completed" || batch.status === "failed" || batch.status === "expired") {
            clearInterval(timer);
            resolve(batch);
          }
        })
        .catch(function (err) {
          clearInterval(timer);
          reject(err);
        });
    }, 60000); // OpenAI batches can take hours, poll every minute
  });
}

// Usage
var batchFilePath = path.join(__dirname, "batch-input.jsonl");
buildBatchFile(items, batchFilePath);

submitOpenAIBatch(batchFilePath)
  .then(function (batch) {
    console.log("OpenAI batch created: " + batch.id);
    return pollOpenAIBatch(batch.id);
  })
  .then(function (batch) {
    if (batch.output_file_id) {
      return openai.files.content(batch.output_file_id);
    }
    throw new Error("Batch failed with status: " + batch.status);
  })
  .then(function (response) {
    // files.content() resolves to a fetch-style Response; .text() is asynchronous
    return response.text();
  })
  .then(function (text) {
    var results = text.trim().split("\n").map(function (line) {
      return JSON.parse(line);
    });
    console.log("Retrieved %d results from OpenAI batch", results.length);
  })
  .catch(function (err) {
    console.error("OpenAI batch processing failed:", err.message);
  });

OpenAI batches have a 24-hour completion window and support up to 50,000 requests per batch. The JSONL file format is slightly more cumbersome than Anthropic's structured API, but it works well for very large volumes.
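
One addition worth making to the polling flow above: when individual requests in an OpenAI batch fail, their details land in a separate error file referenced by error_file_id on the batch object. A short sketch for pulling it down alongside the output file:

// Download and parse the error file for a finished OpenAI batch, if one exists.
function fetchOpenAIBatchErrors(batch) {
  if (!batch.error_file_id) {
    return Promise.resolve([]);
  }

  return openai.files.content(batch.error_file_id)
    .then(function (response) {
      return response.text();
    })
    .then(function (text) {
      if (!text.trim()) {
        return [];
      }
      return text.trim().split("\n").map(function (line) {
        return JSON.parse(line);
      });
    });
}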

Custom Batch Processor with Concurrency Control

Native batch APIs are great when you can wait hours for results. But often you need results in minutes, not hours, and you still have hundreds of requests. This is where a custom concurrency-controlled processor comes in.

The key pattern is a semaphore-style pool that limits how many requests are in flight simultaneously. This respects rate limits while maximizing throughput.

var uuid = require("uuid");

function BatchProcessor(options) {
  this.concurrency = options.concurrency || 5;
  this.retryAttempts = options.retryAttempts || 3;
  this.retryDelay = options.retryDelay || 2000;
  this.onProgress = options.onProgress || function () {};
  this.onItemComplete = options.onItemComplete || function () {};

  this.results = [];
  this.errors = [];
  this.completed = 0;
  this.total = 0;
  this.batchId = uuid.v4();
}

BatchProcessor.prototype.process = function (items, processFn) {
  var self = this;
  self.total = items.length;
  self.completed = 0;
  self.results = new Array(items.length);
  self.errors = [];

  console.log("Starting batch %s: %d items, concurrency %d", self.batchId, self.total, self.concurrency);

  return self._runPool(items, processFn).then(function () {
    return {
      batchId: self.batchId,
      total: self.total,
      succeeded: self.total - self.errors.length,
      failed: self.errors.length,
      results: self.results,
      errors: self.errors
    };
  });
};

BatchProcessor.prototype._runPool = function (items, processFn) {
  var self = this;
  var queue = items.map(function (item, index) {
    return { item: item, index: index };
  });
  var activeCount = 0;
  var queueIndex = 0;

  return new Promise(function (resolve, reject) {
    function startNext() {
      while (activeCount < self.concurrency && queueIndex < queue.length) {
        var entry = queue[queueIndex++];
        activeCount++;
        self._processWithRetry(entry.item, entry.index, processFn)
          .then(function (result) {
            activeCount--;
            startNext();
          })
          .catch(function (err) {
            activeCount--;
            startNext();
          });
      }

      if (activeCount === 0 && queueIndex >= queue.length) {
        resolve();
      }
    }

    startNext();
  });
};

BatchProcessor.prototype._processWithRetry = function (item, index, processFn) {
  var self = this;
  var attempts = 0;

  function attempt() {
    attempts++;
    return processFn(item).then(function (result) {
      self.results[index] = { status: "success", data: result, index: index };
      self.completed++;
      self.onItemComplete(index, result);
      self.onProgress(self.completed, self.total, self.errors.length);
      return result;
    }).catch(function (err) {
      if (attempts < self.retryAttempts && self._isRetryable(err)) {
        var delay = self.retryDelay * Math.pow(2, attempts - 1);
        console.log("Retrying item %d (attempt %d/%d) in %dms: %s", index, attempts, self.retryAttempts, delay, err.message);
        return new Promise(function (resolve) {
          setTimeout(resolve, delay);
        }).then(attempt);
      }

      self.results[index] = { status: "error", error: err.message, index: index };
      self.errors.push({ index: index, error: err.message, attempts: attempts });
      self.completed++;
      self.onProgress(self.completed, self.total, self.errors.length);
      return null;
    });
  }

  return attempt();
};

BatchProcessor.prototype._isRetryable = function (err) {
  if (err.status === 429) return true;  // Rate limited
  if (err.status === 529) return true;  // Overloaded (Anthropic)
  if (err.status >= 500) return true;   // Server errors
  if (err.code === "ECONNRESET") return true;
  if (err.code === "ETIMEDOUT") return true;
  return false;
};

Usage with the Anthropic SDK:

var Anthropic = require("@anthropic-ai/sdk");
var client = new Anthropic();

var processor = new BatchProcessor({
  concurrency: 10,
  retryAttempts: 3,
  retryDelay: 3000,
  onProgress: function (completed, total, errors) {
    var pct = Math.round((completed / total) * 100);
    process.stdout.write("\rProgress: " + completed + "/" + total + " (" + pct + "%) — " + errors + " errors");
  }
});

var documents = []; // Assumed: your array of 500 raw document strings to classify
var prompts = [];
for (var i = 0; i < documents.length; i++) {
  prompts.push("Classify the following text into one of these categories: tech, business, science, health. Text: " + documents[i]);
}

processor.process(prompts, function (prompt) {
  return client.messages.create({
    model: "claude-sonnet-4-20250514",
    max_tokens: 256,
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    return response.content[0].text;
  });
}).then(function (report) {
  console.log("\nBatch complete: %d succeeded, %d failed", report.succeeded, report.failed);
});

A concurrency of 10 with 3 retries handles most API rate limits comfortably. For Anthropic's API, the default rate limit for Claude Sonnet is typically 50 requests per minute on lower-tier plans, so adjust concurrency accordingly.
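
A rough way to pick that number: at steady state, in-flight requests ≈ (requests per minute / 60) × average latency in seconds. The helper below is a back-of-the-envelope sketch; the 50 requests per minute and 6-second latency in the usage line are assumptions you should replace with your own measurements.

// Rough concurrency estimate: how many requests are in flight at steady state
// if you fully use your per-minute budget. Both inputs are measured assumptions.
function estimateConcurrency(requestsPerMinute, avgLatencyMs) {
  var requestsPerSecond = requestsPerMinute / 60;
  var avgLatencySeconds = avgLatencyMs / 1000;
  var inFlight = requestsPerSecond * avgLatencySeconds;
  // Leave ~20% headroom for retries and latency spikes
  return Math.max(1, Math.floor(inFlight * 0.8));
}

// Example: 50 requests/minute with ~6s average latency => about 4 concurrent requests
console.log(estimateConcurrency(50, 6000));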

Job Queues with BullMQ and Redis

For production workloads, you want durability. If your process crashes halfway through 1,000 requests, you do not want to start over. BullMQ provides persistent job queues backed by Redis with automatic retry, progress tracking, and worker management.

var { Queue, Worker, QueueEvents } = require("bullmq");
var IORedis = require("ioredis");
var Anthropic = require("@anthropic-ai/sdk");

var connection = new IORedis({
  host: process.env.REDIS_HOST || "127.0.0.1",
  port: process.env.REDIS_PORT || 6379,
  maxRetriesPerRequest: null
});

// Create the queue
var llmQueue = new Queue("llm-batch", {
  connection: connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: "exponential",
      delay: 5000
    },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 }
  }
});

// Add jobs to the queue
function enqueueBatch(batchId, items) {
  var jobs = items.map(function (item, index) {
    return {
      name: "llm-request",
      data: {
        batchId: batchId,
        index: index,
        prompt: item.prompt,
        model: item.model || "claude-sonnet-4-20250514",
        maxTokens: item.maxTokens || 1024
      },
      opts: {
        jobId: batchId + "-" + index,
        priority: item.priority || 0
      }
    };
  });

  return llmQueue.addBulk(jobs).then(function (added) {
    console.log("Enqueued %d jobs for batch %s", added.length, batchId);
    return batchId;
  });
}

// Worker that processes jobs
var client = new Anthropic();

var worker = new Worker("llm-batch", function (job) {
  var data = job.data;

  return client.messages.create({
    model: data.model,
    max_tokens: data.maxTokens,
    messages: [{ role: "user", content: data.prompt }]
  }).then(function (response) {
    var result = {
      batchId: data.batchId,
      index: data.index,
      text: response.content[0].text,
      usage: response.usage,
      model: response.model
    };

    job.updateProgress(100);
    return result;
  });
}, {
  connection: connection,
  concurrency: 8,
  limiter: {
    max: 40,
    duration: 60000  // 40 jobs per minute
  }
});

worker.on("completed", function (job, result) {
  console.log("Job %s completed (batch %s, index %d)", job.id, result.batchId, result.index);
});

worker.on("failed", function (job, err) {
  console.error("Job %s failed after %d attempts: %s", job.id, job.attemptsMade, err.message);
});

// Track batch progress
var queueEvents = new QueueEvents("llm-batch", { connection: connection });

function watchBatch(batchId, totalJobs) {
  var completed = 0;
  var failed = 0;

  return new Promise(function (resolve) {
    queueEvents.on("completed", function (event) {
      if (event.jobId.startsWith(batchId)) {
        completed++;
        if (completed + failed >= totalJobs) {
          resolve({ completed: completed, failed: failed });
        }
      }
    });

    queueEvents.on("failed", function (event) {
      if (event.jobId.startsWith(batchId)) {
        failed++;
        if (completed + failed >= totalJobs) {
          resolve({ completed: completed, failed: failed });
        }
      }
    });
  });
}

The BullMQ approach gives you persistence across restarts, built-in rate limiting via the limiter option, automatic exponential backoff on failures, and the ability to add priority to urgent jobs. In production, I run this with a separate worker process that can be scaled horizontally.
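
From the submitting side, you can also poll BullMQ's own counters instead of wiring up QueueEvents. The sketch below uses the queue's getJobCounts method; note that the counts cover the whole queue rather than a single batch, so treat this as a health check rather than per-batch progress.

// Periodically log overall queue state from the submitting process.
function logQueueCounts(queue, intervalMs) {
  return setInterval(function () {
    queue.getJobCounts("waiting", "active", "completed", "failed", "delayed")
      .then(function (counts) {
        console.log(
          "Queue: waiting=%d active=%d completed=%d failed=%d delayed=%d",
          counts.waiting, counts.active, counts.completed, counts.failed, counts.delayed
        );
      });
  }, intervalMs || 10000);
}

// var timer = logQueueCounts(llmQueue, 15000);
// clearInterval(timer) once the batch is done.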

Handling Partial Failures in Batches

Partial failures are the norm with LLM batch processing. Network blips, rate limits, model timeouts, and content policy rejections will cause some percentage of requests to fail. Your system needs to handle this gracefully.

function processBatchWithPartialFailureHandling(items, processFn, options) {
  var maxRetryRounds = options.maxRetryRounds || 3;
  var allResults = new Array(items.length);
  var pendingIndices = items.map(function (_, i) { return i; });

  function runRound(round) {
    if (round > maxRetryRounds || pendingIndices.length === 0) {
      return Promise.resolve(allResults);
    }

    console.log("Round %d: processing %d items", round, pendingIndices.length);

    var promises = pendingIndices.map(function (idx) {
      return processFn(items[idx])
        .then(function (result) {
          return { index: idx, status: "success", data: result };
        })
        .catch(function (err) {
          return { index: idx, status: "error", error: err.message, retryable: isRetryable(err) };
        });
    });

    return Promise.allSettled(promises).then(function (settled) {
      var nextPending = [];

      settled.forEach(function (outcome) {
        var result = outcome.value;
        if (result.status === "success") {
          allResults[result.index] = result;
        } else if (result.retryable && round < maxRetryRounds) {
          nextPending.push(result.index);
        } else {
          allResults[result.index] = result; // Final failure
        }
      });

      pendingIndices = nextPending;

      if (nextPending.length > 0) {
        var delay = 5000 * Math.pow(2, round - 1);
        console.log("Waiting %dms before retry round %d (%d items)", delay, round + 1, nextPending.length);
        return new Promise(function (resolve) {
          setTimeout(resolve, delay);
        }).then(function () {
          return runRound(round + 1);
        });
      }

      return allResults;
    });
  }

  return runRound(1);
}

function isRetryable(err) {
  var retryableStatuses = [429, 500, 502, 503, 529];
  return retryableStatuses.indexOf(err.status) !== -1 ||
         err.code === "ECONNRESET" ||
         err.code === "ETIMEDOUT";
}

This pattern runs multiple "rounds" of processing, collecting successes and retrying only the failures. After the final round, any remaining failures are recorded permanently. I prefer this over per-item retry because it naturally groups retries together, which helps when the issue is a temporary rate limit or service degradation.
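
Wiring it up looks much like the earlier processor. Here is a short usage sketch against the Anthropic client from the previous examples; the prompts array is an assumed placeholder for your own items.

var prompts = []; // Assumed: array of { prompt: "..." } items

processBatchWithPartialFailureHandling(prompts, function (item) {
  return client.messages.create({
    model: "claude-sonnet-4-20250514",
    max_tokens: 256,
    messages: [{ role: "user", content: item.prompt }]
  }).then(function (response) {
    return response.content[0].text;
  });
}, { maxRetryRounds: 3 }).then(function (allResults) {
  var failed = allResults.filter(function (r) { return r.status === "error"; });
  console.log("Done: %d total, %d permanently failed", allResults.length, failed.length);
});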

Checkpointing for Resumable Batch Jobs

For truly large batches (thousands of items), you need checkpointing. If your process crashes at item 847 of 2,000, you want to resume from item 848, not start over.

var fs = require("fs");
var path = require("path");

function CheckpointManager(checkpointDir) {
  this.checkpointDir = checkpointDir;
  if (!fs.existsSync(checkpointDir)) {
    fs.mkdirSync(checkpointDir, { recursive: true });
  }
}

CheckpointManager.prototype.getCheckpointPath = function (batchId) {
  return path.join(this.checkpointDir, batchId + ".json");
};

CheckpointManager.prototype.load = function (batchId) {
  var cpPath = this.getCheckpointPath(batchId);
  if (fs.existsSync(cpPath)) {
    var data = JSON.parse(fs.readFileSync(cpPath, "utf-8"));
    console.log("Loaded checkpoint for batch %s: %d/%d completed", batchId, data.completedCount, data.totalItems);
    return data;
  }
  return null;
};

CheckpointManager.prototype.save = function (batchId, state) {
  var cpPath = this.getCheckpointPath(batchId);
  var data = JSON.stringify(state, null, 2);
  // Write to temp file first, then rename for atomicity
  var tmpPath = cpPath + ".tmp";
  fs.writeFileSync(tmpPath, data);
  fs.renameSync(tmpPath, cpPath);
};

CheckpointManager.prototype.delete = function (batchId) {
  var cpPath = this.getCheckpointPath(batchId);
  if (fs.existsSync(cpPath)) {
    fs.unlinkSync(cpPath);
  }
};

function ResumableBatchProcessor(options) {
  this.concurrency = options.concurrency || 5;
  this.checkpointInterval = options.checkpointInterval || 10; // Save every N completions
  this.checkpointMgr = new CheckpointManager(options.checkpointDir || "./checkpoints");
  this.retryAttempts = options.retryAttempts || 3;
}

ResumableBatchProcessor.prototype.process = function (batchId, items, processFn) {
  var self = this;
  var checkpoint = self.checkpointMgr.load(batchId);
  var results = checkpoint ? checkpoint.results : new Array(items.length);
  var completedSet = {};
  var completedCount = 0;

  if (checkpoint) {
    checkpoint.results.forEach(function (r, i) {
      if (r && r.status === "success") {
        completedSet[i] = true;
        completedCount++;
      }
    });
    console.log("Resuming batch %s from checkpoint: %d already done", batchId, completedCount);
  }

  var pending = [];
  for (var i = 0; i < items.length; i++) {
    if (!completedSet[i]) {
      pending.push({ item: items[i], index: i });
    }
  }

  console.log("Processing %d remaining items (of %d total)", pending.length, items.length);

  var queueIndex = 0;
  var activeCount = 0;
  var sinceLastCheckpoint = 0;

  return new Promise(function (resolve) {
    function maybeCheckpoint() {
      sinceLastCheckpoint++;
      if (sinceLastCheckpoint >= self.checkpointInterval) {
        sinceLastCheckpoint = 0;
        self.checkpointMgr.save(batchId, {
          totalItems: items.length,
          completedCount: completedCount,
          results: results
        });
        console.log("Checkpoint saved: %d/%d", completedCount, items.length);
      }
    }

    function startNext() {
      while (activeCount < self.concurrency && queueIndex < pending.length) {
        var entry = pending[queueIndex++];
        activeCount++;

        (function (entry) {
          self._processWithRetry(entry.item, processFn).then(function (result) {
            results[entry.index] = { status: "success", data: result };
            completedCount++;
            activeCount--;
            maybeCheckpoint();
            startNext();
          }).catch(function (err) {
            results[entry.index] = { status: "error", error: err.message };
            completedCount++;
            activeCount--;
            maybeCheckpoint();
            startNext();
          });
        })(entry);
      }

      if (activeCount === 0 && queueIndex >= pending.length) {
        // Final checkpoint
        self.checkpointMgr.save(batchId, {
          totalItems: items.length,
          completedCount: completedCount,
          results: results
        });
        resolve({
          batchId: batchId,
          total: items.length,
          results: results
        });
      }
    }

    startNext();
  });
};

ResumableBatchProcessor.prototype._processWithRetry = function (item, processFn) {
  var self = this;
  var attempts = 0;

  function attempt() {
    attempts++;
    return processFn(item).catch(function (err) {
      if (attempts < self.retryAttempts && (err.status === 429 || err.status >= 500)) {
        var delay = 2000 * Math.pow(2, attempts - 1);
        return new Promise(function (resolve) {
          setTimeout(resolve, delay);
        }).then(attempt);
      }
      throw err;
    });
  }

  return attempt();
};

The checkpoint file is written atomically (write to temp, then rename) so a crash during the write does not corrupt the checkpoint. The checkpointInterval controls how often we persist — every 10 completions is a good balance between durability and I/O overhead.
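
Usage mirrors the earlier processor, with the addition of a stable batch ID so a rerun can find its checkpoint. The batch ID, directory, and empty items array below are placeholders; the client is the Anthropic client from earlier.

var resumable = new ResumableBatchProcessor({
  concurrency: 8,
  checkpointInterval: 10,
  checkpointDir: "./checkpoints",
  retryAttempts: 3
});

var items = []; // Assumed: array of prompt strings

// Re-running this with the same batch ID after a crash skips completed items.
resumable.process("classification-run-42", items, function (prompt) {
  return client.messages.create({
    model: "claude-sonnet-4-20250514",
    max_tokens: 256,
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    return response.content[0].text;
  });
}).then(function (report) {
  var failed = report.results.filter(function (r) { return r && r.status === "error"; });
  console.log("Batch %s done: %d items, %d failed", report.batchId, report.total, failed.length);
});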

Cost Optimization with Batch APIs

The economics of batch processing with LLMs are significant. Both Anthropic and OpenAI offer 50% discounts on batch API calls compared to real-time pricing.

Here is a cost comparison for processing 10,000 classification requests, each using approximately 500 input tokens and 100 output tokens:

Approach                    Input Cost   Output Cost   Total
Claude Sonnet (real-time)   $15.00       $7.50         $22.50
Claude Sonnet (batch)       $7.50        $3.75         $11.25
Claude Haiku (real-time)    $4.00        $6.25         $10.25
Claude Haiku (batch)        $2.00        $3.125        $5.125
GPT-4o-mini (real-time)     $0.75        $3.00         $3.75
GPT-4o-mini (batch)         $0.375       $1.50         $1.875

For workloads that can tolerate 24-hour turnaround, the batch discount is enormous. A nightly classification job that costs $22.50 per run in real-time drops to $11.25 with batch. Over a month of daily runs, that is $337.50 saved.

Other cost optimization strategies:

  • Right-size your model. Use Haiku for classification and extraction. Use Sonnet for generation and reasoning. Do not use Opus unless you need its full capabilities.
  • Minimize output tokens. For classification, instruct the model to return only the category name, not an explanation. Set max_tokens to the minimum viable value.
  • Cache system prompts. If all requests in a batch share the same system prompt, enable prompt caching to reduce input token costs (see the sketch after this list).
  • Filter before sending. Pre-filter items that do not need LLM processing. A simple regex or keyword check can eliminate 20-30% of items before they hit the API.
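
For the prompt-caching point above, a minimal sketch using the Anthropic client from earlier: mark the shared system prompt with cache_control so repeated requests can reuse the cached prefix. The system prompt text here is a placeholder, and caching only pays off when the shared prefix is long enough to qualify for caching on your chosen model.

// Shared system prompt marked as cacheable; the per-item user prompt still varies.
var sharedSystemPrompt = "You are a document classifier. Categories: tech, business, science, health. ..."; // placeholder

function classifyWithCachedSystemPrompt(prompt) {
  return client.messages.create({
    model: "claude-sonnet-4-20250514",
    max_tokens: 16,
    system: [
      {
        type: "text",
        text: sharedSystemPrompt,
        cache_control: { type: "ephemeral" }
      }
    ],
    messages: [{ role: "user", content: prompt }]
  }).then(function (response) {
    return response.content[0].text;
  });
}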

Output Collection and Aggregation Patterns

Once a batch completes, you need to collect and structure the results. Here are patterns I use frequently.

function aggregateBatchResults(results) {
  var summary = {
    total: results.length,
    succeeded: 0,
    failed: 0,
    byCategory: {},
    tokenUsage: { input: 0, output: 0 },
    avgLatency: 0,
    results: []
  };

  var totalLatency = 0;

  results.forEach(function (r) {
    if (r.status === "success") {
      summary.succeeded++;
      summary.results.push(r.data);

      if (r.data.usage) {
        summary.tokenUsage.input += r.data.usage.input_tokens || 0;
        summary.tokenUsage.output += r.data.usage.output_tokens || 0;
      }

      if (r.data.category) {
        summary.byCategory[r.data.category] = (summary.byCategory[r.data.category] || 0) + 1;
      }

      if (r.data.latencyMs) {
        totalLatency += r.data.latencyMs;
      }
    } else {
      summary.failed++;
    }
  });

  summary.avgLatency = summary.succeeded > 0 ? Math.round(totalLatency / summary.succeeded) : 0;

  return summary;
}

// Write results to JSONL for downstream processing
function writeResultsToJSONL(results, outputPath) {
  var fs = require("fs");
  var stream = fs.createWriteStream(outputPath);
  var written = 0;

  results.forEach(function (r) {
    if (r.status === "success") {
      stream.write(JSON.stringify(r.data) + "\n");
      written++;
    }
  });

  stream.end();
  console.log("Wrote %d successful results to %s", written, outputPath);
}

// Batch results into structured CSV
function writeResultsToCSV(results, outputPath, columns) {
  var fs = require("fs");
  var header = columns.join(",") + "\n";
  var lines = results
    .filter(function (r) { return r.status === "success"; })
    .map(function (r) {
      return columns.map(function (col) {
        var val = r.data[col] || "";
        // Escape commas and quotes in CSV
        if (typeof val === "string" && (val.indexOf(",") !== -1 || val.indexOf('"') !== -1)) {
          val = '"' + val.replace(/"/g, '""') + '"';
        }
        return val;
      }).join(",");
    });

  fs.writeFileSync(outputPath, header + lines.join("\n"));
  console.log("Wrote %d rows to %s", lines.length, outputPath);
}

Scheduling Batch Jobs with node-cron

Batch LLM jobs often run on a schedule — nightly content classification, weekly report generation, daily data enrichment. Use node-cron to trigger these reliably.

var cron = require("node-cron");
var { Queue } = require("bullmq");
var IORedis = require("ioredis");

var connection = new IORedis({ host: "127.0.0.1", port: 6379, maxRetriesPerRequest: null });
var llmQueue = new Queue("llm-batch", { connection: connection });

// Run nightly at 2 AM UTC
cron.schedule("0 2 * * *", function () {
  console.log("[%s] Starting nightly classification batch", new Date().toISOString());

  fetchUnclassifiedDocuments()
    .then(function (docs) {
      if (docs.length === 0) {
        console.log("No unclassified documents found");
        return;
      }

      var batchId = "nightly-" + new Date().toISOString().slice(0, 10);
      var jobs = docs.map(function (doc, i) {
        return {
          name: "classify",
          data: {
            batchId: batchId,
            documentId: doc.id,
            prompt: "Classify this document: " + doc.text.slice(0, 2000)
          },
          opts: { jobId: batchId + "-" + i }
        };
      });

      return llmQueue.addBulk(jobs).then(function () {
        console.log("Enqueued %d classification jobs as batch %s", jobs.length, batchId);
      });
    })
    .catch(function (err) {
      console.error("Nightly batch failed to start:", err.message);
    });
}, {
  timezone: "UTC"
});

function fetchUnclassifiedDocuments() {
  // Your database query here
  return Promise.resolve([]);
}

Monitoring Batch Throughput and Error Rates

In production, you need visibility into how your batches are performing. Track throughput, error rates, and cost per batch.

function BatchMonitor(options) {
  this.metrics = {
    batchesStarted: 0,
    batchesCompleted: 0,
    itemsProcessed: 0,
    itemsFailed: 0,
    totalInputTokens: 0,
    totalOutputTokens: 0,
    totalLatencyMs: 0,
    errors: {}
  };
  this.logInterval = options.logInterval || 30000;
  this.timer = null;
}

BatchMonitor.prototype.start = function () {
  var self = this;
  self.startTime = Date.now();

  self.timer = setInterval(function () {
    self.printStats();
  }, self.logInterval);
};

BatchMonitor.prototype.stop = function () {
  if (this.timer) {
    clearInterval(this.timer);
    this.timer = null;
  }
  this.printStats();
};

BatchMonitor.prototype.recordSuccess = function (usage, latencyMs) {
  this.metrics.itemsProcessed++;
  this.metrics.totalInputTokens += (usage.input_tokens || 0);
  this.metrics.totalOutputTokens += (usage.output_tokens || 0);
  this.metrics.totalLatencyMs += (latencyMs || 0);
};

BatchMonitor.prototype.recordFailure = function (errorType) {
  this.metrics.itemsFailed++;
  this.metrics.errors[errorType] = (this.metrics.errors[errorType] || 0) + 1;
};

BatchMonitor.prototype.printStats = function () {
  var elapsed = (Date.now() - this.startTime) / 1000;
  var m = this.metrics;
  var totalItems = m.itemsProcessed + m.itemsFailed;
  var throughput = totalItems > 0 ? (totalItems / elapsed).toFixed(2) : 0;
  var errorRate = totalItems > 0 ? ((m.itemsFailed / totalItems) * 100).toFixed(1) : 0;
  var avgLatency = m.itemsProcessed > 0 ? Math.round(m.totalLatencyMs / m.itemsProcessed) : 0;

  console.log("\n--- Batch Monitor ---");
  console.log("Elapsed: %ds | Throughput: %s items/sec", Math.round(elapsed), throughput);
  console.log("Processed: %d | Failed: %d | Error rate: %s%%", m.itemsProcessed, m.itemsFailed, errorRate);
  console.log("Tokens — Input: %d | Output: %d", m.totalInputTokens, m.totalOutputTokens);
  console.log("Avg latency: %dms", avgLatency);

  if (Object.keys(m.errors).length > 0) {
    console.log("Error breakdown:", JSON.stringify(m.errors));
  }
  console.log("--------------------\n");
};

Integrate this monitor into your worker:

var monitor = new BatchMonitor({ logInterval: 15000 });
monitor.start();

worker.on("completed", function (job, result) {
  monitor.recordSuccess(result.usage || {}, result.latencyMs || 0);
});

worker.on("failed", function (job, err) {
  var errorType = err.status ? "http_" + err.status : (err.code || "unknown");
  monitor.recordFailure(errorType);
});

// When shutting down
process.on("SIGTERM", function () {
  monitor.stop();
  worker.close();
});

Complete Working Example

Here is a full, production-ready batch processing system that ties everything together: a Node.js application that processes hundreds of LLM requests using BullMQ with concurrency control, progress tracking, error handling, and file-based checkpointing.

// batch-system.js — Complete LLM batch processing system
var { Queue, Worker, QueueEvents } = require("bullmq");
var IORedis = require("ioredis");
var Anthropic = require("@anthropic-ai/sdk");
var fs = require("fs");
var path = require("path");
var uuid = require("uuid");

// ---- Configuration ----
var CONFIG = {
  redis: {
    host: process.env.REDIS_HOST || "127.0.0.1",
    port: parseInt(process.env.REDIS_PORT) || 6379,
    maxRetriesPerRequest: null
  },
  queue: {
    name: "llm-batch-processor",
    concurrency: 8,
    limiter: { max: 40, duration: 60000 },
    retryAttempts: 3,
    retryBackoff: 5000
  },
  checkpoint: {
    dir: path.join(__dirname, "checkpoints"),
    interval: 25
  },
  output: {
    dir: path.join(__dirname, "output")
  }
};

// ---- Redis connection ----
var connection = new IORedis(CONFIG.redis);

// ---- Ensure directories ----
[CONFIG.checkpoint.dir, CONFIG.output.dir].forEach(function (dir) {
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir, { recursive: true });
  }
});

// ---- Anthropic client ----
var anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

// ---- Queue setup ----
var queue = new Queue(CONFIG.queue.name, {
  connection: connection,
  defaultJobOptions: {
    attempts: CONFIG.queue.retryAttempts,
    backoff: { type: "exponential", delay: CONFIG.queue.retryBackoff },
    removeOnComplete: { count: 500 },
    removeOnFail: { count: 2000 }
  }
});

// ---- Checkpoint manager ----
function loadCheckpoint(batchId) {
  var cpPath = path.join(CONFIG.checkpoint.dir, batchId + ".json");
  if (fs.existsSync(cpPath)) {
    return JSON.parse(fs.readFileSync(cpPath, "utf-8"));
  }
  return null;
}

function saveCheckpoint(batchId, data) {
  var cpPath = path.join(CONFIG.checkpoint.dir, batchId + ".json");
  var tmpPath = cpPath + ".tmp";
  fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2));
  fs.renameSync(tmpPath, cpPath);
}

// ---- Submit a batch ----
function submitBatch(batchId, items, options) {
  options = options || {};
  var model = options.model || "claude-sonnet-4-20250514";
  var maxTokens = options.maxTokens || 512;
  var systemPrompt = options.systemPrompt || "";

  // Check for existing checkpoint
  var checkpoint = loadCheckpoint(batchId);
  var completedSet = {};

  if (checkpoint) {
    console.log("Found checkpoint: %d/%d already completed", checkpoint.completedIds.length, checkpoint.totalItems);
    checkpoint.completedIds.forEach(function (id) {
      completedSet[id] = true;
    });
  }

  // Filter out already-completed items
  var toEnqueue = [];
  items.forEach(function (item, index) {
    var jobId = batchId + "-" + index; // use "-" as the delimiter, matching the earlier examples, and avoid ":" in custom BullMQ job IDs
    if (!completedSet[jobId]) {
      toEnqueue.push({
        name: "llm-request",
        data: {
          batchId: batchId,
          jobId: jobId,
          index: index,
          prompt: item.prompt,
          metadata: item.metadata || {},
          model: model,
          maxTokens: maxTokens,
          systemPrompt: systemPrompt
        },
        opts: {
          jobId: jobId,
          priority: item.priority || 0
        }
      });
    }
  });

  if (toEnqueue.length === 0) {
    console.log("All items already completed for batch %s", batchId);
    return Promise.resolve({ batchId: batchId, enqueuedCount: 0 });
  }

  // Initialize checkpoint if new batch
  if (!checkpoint) {
    saveCheckpoint(batchId, {
      batchId: batchId,
      totalItems: items.length,
      completedIds: [],
      results: {},
      errors: {},
      startTime: new Date().toISOString()
    });
  }

  return queue.addBulk(toEnqueue).then(function (added) {
    console.log("Enqueued %d jobs for batch %s (%d skipped from checkpoint)",
      added.length, batchId, items.length - toEnqueue.length);
    return { batchId: batchId, enqueuedCount: added.length, total: items.length };
  });
}

// ---- Worker ----
var worker = new Worker(CONFIG.queue.name, function (job) {
  var data = job.data;
  var startTime = Date.now();

  var messages = [{ role: "user", content: data.prompt }];
  var params = {
    model: data.model,
    max_tokens: data.maxTokens,
    messages: messages
  };

  if (data.systemPrompt) {
    params.system = data.systemPrompt;
  }

  return anthropic.messages.create(params).then(function (response) {
    var latencyMs = Date.now() - startTime;

    return {
      batchId: data.batchId,
      jobId: data.jobId,
      index: data.index,
      text: response.content[0].text,
      metadata: data.metadata,
      usage: response.usage,
      model: response.model,
      latencyMs: latencyMs,
      completedAt: new Date().toISOString()
    };
  });
}, {
  connection: connection,
  concurrency: CONFIG.queue.concurrency,
  limiter: CONFIG.queue.limiter
});

// ---- Progress tracking ----
var batchProgress = {};

worker.on("completed", function (job, result) {
  var batchId = result.batchId;

  if (!batchProgress[batchId]) {
    batchProgress[batchId] = { completed: 0, failed: 0, sinceCheckpoint: 0 };
  }

  batchProgress[batchId].completed++;
  batchProgress[batchId].sinceCheckpoint++;

  // Periodic checkpoint
  if (batchProgress[batchId].sinceCheckpoint >= CONFIG.checkpoint.interval) {
    batchProgress[batchId].sinceCheckpoint = 0;

    var cp = loadCheckpoint(batchId);
    if (cp) {
      cp.completedIds.push(result.jobId);
      cp.results[result.jobId] = {
        index: result.index,
        text: result.text,
        usage: result.usage,
        latencyMs: result.latencyMs
      };
      saveCheckpoint(batchId, cp);
      console.log("[Checkpoint] Batch %s: %d completed", batchId, cp.completedIds.length);
    }
  }
});

worker.on("failed", function (job, err) {
  var batchId = job.data.batchId;

  if (!batchProgress[batchId]) {
    batchProgress[batchId] = { completed: 0, failed: 0, sinceCheckpoint: 0, pending: [] };
  }

  batchProgress[batchId].failed++;
  console.error("[Failed] Job %s (attempt %d): %s", job.id, job.attemptsMade, err.message);

  // Record failure in checkpoint
  var cp = loadCheckpoint(batchId);
  if (cp) {
    cp.errors[job.data.jobId] = {
      index: job.data.index,
      error: err.message,
      attempts: job.attemptsMade
    };
    saveCheckpoint(batchId, cp);
  }
});

// ---- Collect results ----
function collectResults(batchId) {
  var cp = loadCheckpoint(batchId);
  if (!cp) {
    return Promise.reject(new Error("No checkpoint found for batch " + batchId));
  }

  var results = Object.keys(cp.results).map(function (key) {
    return cp.results[key];
  });

  var errors = Object.keys(cp.errors).map(function (key) {
    return cp.errors[key];
  });

  var summary = {
    batchId: batchId,
    totalItems: cp.totalItems,
    succeeded: results.length,
    failed: errors.length,
    remaining: cp.totalItems - results.length - errors.length,
    totalInputTokens: 0,
    totalOutputTokens: 0,
    avgLatencyMs: 0,
    results: results,
    errors: errors
  };

  var totalLatency = 0;
  results.forEach(function (r) {
    if (r.usage) {
      summary.totalInputTokens += r.usage.input_tokens || 0;
      summary.totalOutputTokens += r.usage.output_tokens || 0;
    }
    totalLatency += r.latencyMs || 0;
  });

  summary.avgLatencyMs = results.length > 0 ? Math.round(totalLatency / results.length) : 0;

  // Write results to output file
  var outputPath = path.join(CONFIG.output.dir, batchId + "-results.jsonl");
  var stream = fs.createWriteStream(outputPath);
  results.forEach(function (r) {
    stream.write(JSON.stringify(r) + "\n");
  });
  stream.end();

  console.log("\n=== Batch %s Summary ===", batchId);
  console.log("Succeeded: %d | Failed: %d | Remaining: %d", summary.succeeded, summary.failed, summary.remaining);
  console.log("Input tokens: %d | Output tokens: %d", summary.totalInputTokens, summary.totalOutputTokens);
  console.log("Avg latency: %dms", summary.avgLatencyMs);
  console.log("Results written to: %s", outputPath);

  return Promise.resolve(summary);
}

// ---- CLI interface ----
var command = process.argv[2];
var batchId = process.argv[3];

if (command === "submit") {
  // Example: node batch-system.js submit my-batch-001
  var inputFile = process.argv[4] || "items.json";
  var items = JSON.parse(fs.readFileSync(inputFile, "utf-8"));

  submitBatch(batchId || uuid.v4(), items, {
    model: "claude-sonnet-4-20250514",
    maxTokens: 512,
    systemPrompt: "You are a helpful technical assistant. Be concise."
  }).then(function (result) {
    console.log("Batch submitted:", result);
    console.log("Worker is processing... Press Ctrl+C to stop (progress is checkpointed).");
  });
} else if (command === "results") {
  // Example: node batch-system.js results my-batch-001
  collectResults(batchId).then(function (summary) {
    process.exit(0);
  });
} else if (command === "worker") {
  // Example: node batch-system.js worker
  console.log("Worker started. Concurrency: %d, Rate limit: %d/min",
    CONFIG.queue.concurrency, CONFIG.queue.limiter.max);
  console.log("Waiting for jobs...");
} else {
  console.log("Usage:");
  console.log("  node batch-system.js submit <batch-id> [items.json]");
  console.log("  node batch-system.js worker");
  console.log("  node batch-system.js results <batch-id>");
}

// ---- Graceful shutdown ----
process.on("SIGTERM", function () {
  console.log("Shutting down gracefully...");
  worker.close().then(function () {
    connection.disconnect();
    process.exit(0);
  });
});

process.on("SIGINT", function () {
  console.log("Interrupted. Saving final checkpoint...");
  Object.keys(batchProgress).forEach(function (bid) {
    var cp = loadCheckpoint(bid);
    if (cp) {
      saveCheckpoint(bid, cp);
    }
  });
  worker.close().then(function () {
    connection.disconnect();
    process.exit(0);
  });
});

module.exports = {
  submitBatch: submitBatch,
  collectResults: collectResults,
  queue: queue,
  worker: worker
};

The input file (items.json) should look like this:

[
  { "prompt": "Classify this document as tech, business, or science: Machine learning models are trained on...", "metadata": { "docId": "doc-001" } },
  { "prompt": "Classify this document as tech, business, or science: Q3 revenue exceeded expectations...", "metadata": { "docId": "doc-002" } },
  { "prompt": "Classify this document as tech, business, or science: The double-slit experiment demonstrates...", "metadata": { "docId": "doc-003" } }
]

Run it:

# Terminal 1: Start the worker
node batch-system.js worker

# Terminal 2: Submit a batch
node batch-system.js submit classification-2026-02 items.json

# Later: Collect results
node batch-system.js results classification-2026-02

Expected output:

Enqueued 500 jobs for batch classification-2026-02 (0 skipped from checkpoint)
Batch submitted: { batchId: 'classification-2026-02', enqueuedCount: 500, total: 500 }
Worker is processing... Press Ctrl+C to stop (progress is checkpointed).

[Checkpoint] Batch classification-2026-02: 25 completed
[Checkpoint] Batch classification-2026-02: 50 completed
[Checkpoint] Batch classification-2026-02: 75 completed
...
[Checkpoint] Batch classification-2026-02: 500 completed

=== Batch classification-2026-02 Summary ===
Succeeded: 497 | Failed: 3 | Remaining: 0
Input tokens: 284200 | Output tokens: 14850
Avg latency: 842ms
Results written to: output/classification-2026-02-results.jsonl

If the process crashes at item 300 and you restart it:

Found checkpoint: 300/500 already completed
Enqueued 200 jobs for batch classification-2026-02 (300 skipped from checkpoint)

That is the power of checkpointing. No wasted API calls, no duplicated work.

Common Issues & Troubleshooting

1. Rate Limit Exceeded (HTTP 429)

Error: 429 Too Many Requests
{"error":{"type":"rate_limit_error","message":"Number of request tokens has exceeded your per-minute rate limit"}}

This is the most common batch processing error. Your concurrency is too high for your API tier. Solutions:

  • Reduce the concurrency setting in your worker (try 4-6 for lower-tier plans)
  • Add a limiter to BullMQ: { max: 20, duration: 60000 } for 20 requests per minute
  • Implement exponential backoff in your retry logic (already handled in the examples above)
  • Request a rate limit increase from the API provider if your workload justifies it

2. Redis Connection Lost Mid-Batch

Error: connect ECONNREFUSED 127.0.0.1:6379
[ioredis] Unhandled error event: Error: connect ECONNREFUSED 127.0.0.1:6379

If Redis goes down, BullMQ workers will stop processing but will not lose jobs — they are persisted in Redis. When Redis comes back, the worker reconnects and resumes automatically. However, if Redis data is lost entirely, you lose the queue state. Mitigations:

  • Use Redis persistence (appendonly yes in redis.conf)
  • Configure maxRetriesPerRequest: null in your IORedis options so the client retries indefinitely
  • File-based checkpoints serve as a backup — even if Redis state is lost, you can resubmit only the incomplete items

3. Memory Exhaustion on Large Batches

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory

When processing thousands of results and holding them all in memory, you will hit the V8 heap limit. Solutions:

  • Stream results to disk instead of accumulating in an array: use fs.createWriteStream and write each result as it completes
  • Increase Node.js heap: node --max-old-space-size=4096 batch-system.js worker
  • Use removeOnComplete: { count: 100 } in BullMQ to limit how many completed jobs are kept in Redis memory
  • Process batches in chunks of 500-1000 items instead of submitting 10,000 at once

4. Checkpoint File Corruption

SyntaxError: Unexpected end of JSON input
    at JSON.parse (<anonymous>)
    at loadCheckpoint (batch-system.js:84:15)

This happens when the process crashes during a checkpoint write. The atomic write pattern (write to .tmp, then rename) prevents this in most cases. If you still encounter it:

  • Delete the corrupted checkpoint file and resubmit the batch (completed items will be re-processed, which costs money but is safe)
  • Add a validation step in loadCheckpoint that catches the parse error and returns null (see the sketch after this list)
  • Consider using SQLite instead of JSON files for checkpoint storage on very large batches
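
A defensive version of the loader from the complete example, treating an unreadable checkpoint as "no checkpoint" rather than crashing:

// Tolerant checkpoint loader: a corrupted or truncated file is treated as missing.
function loadCheckpointSafe(batchId) {
  var cpPath = path.join(CONFIG.checkpoint.dir, batchId + ".json");
  if (!fs.existsSync(cpPath)) {
    return null;
  }

  try {
    return JSON.parse(fs.readFileSync(cpPath, "utf-8"));
  } catch (err) {
    console.error("Checkpoint for %s is unreadable (%s); starting fresh", batchId, err.message);
    return null;
  }
}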

5. Anthropic Overloaded Error (HTTP 529)

Error: 529 Overloaded
{"error":{"type":"overloaded_error","message":"Anthropic's API is temporarily overloaded"}}

This is distinct from rate limiting — it means the service itself is under heavy load. The 529 status code is specific to Anthropic. Your retry logic should treat this as retryable with a longer backoff (10-30 seconds). If you see this consistently, switch to the native batch API, which handles queuing server-side.

Best Practices

  • Always implement checkpointing for batches over 100 items. API calls cost money. Reprocessing 500 items because of a crash at item 499 is unacceptable in production. File-based checkpoints add minimal overhead and save you from disaster.

  • Use native batch APIs for workloads that can tolerate latency. The 50% cost savings on both Anthropic and OpenAI batch endpoints adds up fast. A nightly classification job that takes 2 hours instead of 20 minutes is a worthwhile trade-off when it cuts your bill in half.

  • Set concurrency based on your API tier, not your hardware. Your machine can handle 100 concurrent HTTP requests easily. Your API rate limit cannot. Start with concurrency of 5, measure your throughput, and increase gradually. The BullMQ limiter option is your best friend here.

  • Separate your worker process from your submission process. Submit jobs from your application, process them in a dedicated worker. This lets you scale workers independently, restart workers without losing queue state, and run multiple workers for higher throughput.

  • Track token usage per batch and set budget alerts. A bug that generates 10x the expected output tokens will blow through your budget before you notice. Log usage.input_tokens and usage.output_tokens from every response and compare against your expected per-item budget.

  • Use Promise.allSettled instead of Promise.all for parallel processing. Promise.all rejects on the first failure, losing all other results. Promise.allSettled waits for everything and gives you both successes and failures. For batch processing, you never want one bad item to spoil the entire batch.

  • Pre-filter items before they hit the LLM. Simple rules can eliminate 20-30% of unnecessary API calls. Check for empty inputs, duplicate content, items already processed, or items that can be handled by regex. Every filtered item is money saved.

  • Log batch metadata for post-mortem analysis. Record the batch ID, start time, item count, model used, total tokens, error count, and wall-clock duration. When something goes wrong at 2 AM, these logs are the first thing you will reach for.

  • Design for idempotency. If the same item is processed twice, the result should be the same and nothing should break. Use the custom_id or jobId to deduplicate and never assume a batch ran exactly once.
