LLM API Webhooks and Async Processing
Build async LLM processing systems with job queues, webhook delivery, status polling, and WebSocket notifications in Node.js.
Large language model API calls are slow. A single completion request can take anywhere from two to sixty seconds depending on the model, the prompt length, and the output size. If you are building any serious application on top of LLM APIs, you cannot afford to hold an HTTP connection open for that long, block your event loop, or leave your users staring at a spinner with no feedback. You need async processing, and you need it done right.
This article walks through the full architecture for asynchronous LLM job processing in Node.js: job queues, webhook delivery, status polling, WebSocket notifications, dead letter queues, priority tiers, and everything else you need to run LLM workloads at scale.
Prerequisites
- Node.js v18 or later
- Redis (local or hosted, required for Bull/BullMQ queues)
- Working familiarity with Express.js
- An LLM API key (OpenAI, Anthropic, or similar)
- Basic understanding of message queues and pub/sub patterns
Install the core dependencies:
npm install express bull ioredis axios ws uuid
The crypto module used for webhook signing is built into Node.js and does not need to be installed from npm.
Why Async Processing Matters for LLM Workloads
Traditional web APIs respond in milliseconds. A database query takes 5-50ms. A cache hit takes 1-5ms. An LLM API call takes 2,000-60,000ms. That is not a typo. You are dealing with response times that are three to four orders of magnitude slower than what most backend engineers are accustomed to.
This creates several problems:
- Connection timeouts. Load balancers, reverse proxies, and clients all have timeout thresholds. A 30-second LLM call will trip many of them.
- Resource exhaustion. Each open connection consumes memory and a slot in your connection pool. Under load, synchronous LLM calls will exhaust your server capacity.
- Poor user experience. Users do not want to wait 30 seconds on a loading screen. They want to submit a request, go do something else, and get notified when it is done.
- Retry complexity. If a synchronous request fails halfway through, the client has to resubmit. With async processing, the queue handles retries automatically.
- No progress visibility. A synchronous call is a black box. An async job can report progress at each step.
The solution is to decouple the request submission from the request processing. The client submits a job, gets back a job ID immediately, and then either polls for status, listens on a WebSocket, or receives a webhook callback when the job completes.
Request-Response vs Fire-and-Forget Patterns
There are two fundamental patterns for handling slow operations:
Synchronous request-response: The client sends a request and waits for the response. Simple, but blocks the client and the server. Only acceptable for LLM calls under about 5 seconds.
Async fire-and-forget: The client submits a job, receives an acknowledgment with a job ID, and retrieves results later. This is what you want for anything that takes more than a few seconds.
Synchronous:
Client --POST /api/generate--> Server --call LLM--> Server --response--> Client
(client blocked for 15-30 seconds)
Asynchronous:
Client --POST /api/jobs--> Server --enqueue--> Queue
Client <--202 { jobId }-- Server
Worker --dequeue--> Worker --call LLM--> Worker --store result-->
Client --GET /api/jobs/:id--> Server --lookup--> { status: "completed", result: "..." }
The async model gives you natural retry behavior, horizontal scaling, priority ordering, and progress reporting. Outside of very short calls, there is no good reason to use synchronous processing for LLM workloads in production.
Building the Async Job Queue with Bull and Redis
Bull is the battle-tested job queue for Node.js. It uses Redis as its backing store, supports delayed jobs, retries, priority queues, rate limiting, and dead letter queues. BullMQ is the newer version with a cleaner API, but Bull is still widely used and well-documented. We will use Bull here.
Queue Setup
var Queue = require("bull");
var Redis = require("ioredis");
var REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
var llmQueue = new Queue("llm-processing", REDIS_URL, {
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 5000
},
removeOnComplete: 100,
removeOnFail: 200,
timeout: 120000
}
});
module.exports = { llmQueue: llmQueue };
A few things to note here. The attempts: 3 with exponential backoff means that if an LLM API call fails (rate limit, server error, timeout), Bull retries it automatically: the second attempt runs 5 seconds after the first failure and the third 10 seconds after the second. The timeout: 120000 kills any job that runs longer than two minutes. The removeOnComplete: 100 keeps only the last 100 completed jobs in Redis to prevent unbounded memory growth.
Submitting Jobs
var express = require("express");
var { v4: uuidv4 } = require("uuid");
var { llmQueue } = require("./queue");
var router = express.Router();
// Submit an LLM processing job
router.post("/api/jobs", function(req, res) {
var idempotencyKey = req.headers["idempotency-key"] || uuidv4();
var prompt = req.body.prompt;
var model = req.body.model || "gpt-4o";
var webhookUrl = req.body.webhook_url || null;
var priority = req.body.priority || 5;
var callbackHeaders = req.body.callback_headers || {};
if (!prompt) {
return res.status(400).json({ error: "prompt is required" });
}
// Check for duplicate submission using idempotency key
llmQueue.getJob(idempotencyKey).then(function(existingJob) {
if (existingJob) {
return res.status(200).json({
job_id: existingJob.id,
status: "already_submitted",
message: "Duplicate submission detected"
});
}
var jobData = {
prompt: prompt,
model: model,
webhook_url: webhookUrl,
callback_headers: callbackHeaders,
submitted_at: new Date().toISOString(),
idempotency_key: idempotencyKey
};
llmQueue.add(jobData, {
jobId: idempotencyKey,
priority: priority
}).then(function(job) {
res.status(202).json({
job_id: job.id,
status: "queued",
poll_url: "/api/jobs/" + job.id,
websocket_url: "/ws/jobs/" + job.id
});
}).catch(function(err) {
console.error("Failed to enqueue job:", err);
res.status(500).json({ error: "Failed to enqueue job" });
});
});
});
module.exports = router;
The 202 Accepted response is deliberate. It tells the client "I received your request and will process it, but it is not done yet." The response includes three ways for the client to get results: a polling URL, a WebSocket URL, and an optional webhook (submitted in the request body).
Idempotency Keys for Reliable Job Submission
Network failures happen. Clients retry. Without idempotency keys, a retry creates a duplicate job. With idempotency keys, the server detects the duplicate and returns the original job ID.
The pattern is simple. The client generates a UUID and sends it in the Idempotency-Key header. The server uses that UUID as the Bull job ID. If a job with that ID already exists, we return it instead of creating a new one.
// Client-side submission with idempotency
var axios = require("axios");
var { v4: uuidv4 } = require("uuid");
function submitLLMJob(prompt, options) {
var idempotencyKey = options.idempotencyKey || uuidv4();
return axios.post("https://api.example.com/api/jobs", {
prompt: prompt,
model: options.model || "gpt-4o",
webhook_url: options.webhookUrl || null,
priority: options.priority || 5
}, {
headers: {
"Content-Type": "application/json",
"Idempotency-Key": idempotencyKey
},
timeout: 10000
}).then(function(response) {
return response.data;
});
}
Store the idempotency key on the client side so that if the initial request times out, the retry uses the same key. This gives you effectively exactly-once job creation no matter how many times the client retries.
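A thin client-side wrapper makes this concrete. The helper below generates the key once and reuses it on every retry; the retry count and backoff values are my own choices:
// Retry wrapper: the same idempotency key is reused on every attempt,
// so a timeout followed by a retry cannot create a second job.
function submitWithRetry(prompt, options, maxAttempts) {
  var key = uuidv4(); // generated once, reused for every attempt
  maxAttempts = maxAttempts || 3;
  function attempt(n) {
    return submitLLMJob(prompt, Object.assign({}, options, { idempotencyKey: key }))
      .catch(function(err) {
        // Retries on any error here for brevity; in production, inspect the error first
        if (n + 1 >= maxAttempts) throw err;
        var delay = Math.pow(2, n) * 1000; // exponential backoff between attempts
        return new Promise(function(resolve) { setTimeout(resolve, delay); })
          .then(function() { return attempt(n + 1); });
      });
  }
  return attempt(0);
}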
Job Status Tracking and Polling Endpoints
Once a job is submitted, the client needs to check on it. The simplest approach is polling.
// Status polling endpoint
router.get("/api/jobs/:jobId", function(req, res) {
var jobId = req.params.jobId;
llmQueue.getJob(jobId).then(function(job) {
if (!job) {
return res.status(404).json({ error: "Job not found" });
}
return job.getState().then(function(state) {
var response = {
job_id: job.id,
status: state,
submitted_at: job.data.submitted_at,
model: job.data.model
};
if (state === "completed") {
response.result = job.returnvalue;
response.completed_at = new Date(job.finishedOn).toISOString();
response.processing_time_ms = job.finishedOn - job.processedOn;
}
if (state === "failed") {
response.error = job.failedReason;
response.attempts_made = job.attemptsMade;
}
if (state === "active") {
response.progress = job.progress();
response.started_at = new Date(job.processedOn).toISOString();
}
res.json(response);
});
}).catch(function(err) {
console.error("Status lookup failed:", err);
res.status(500).json({ error: "Failed to retrieve job status" });
});
});
For polling, use exponential backoff on the client side. Start with a 1-second interval, grow it by 50 percent on each poll, and cap it at 10 seconds. Do not poll every 100ms. That is a denial-of-service attack on your own API.
// Client-side polling with exponential backoff
function pollJobStatus(jobId, onComplete, onError) {
var interval = 1000;
var maxInterval = 10000;
function poll() {
axios.get("https://api.example.com/api/jobs/" + jobId)
.then(function(response) {
var job = response.data;
if (job.status === "completed") {
return onComplete(job);
}
if (job.status === "failed") {
return onError(new Error(job.error));
}
// Still processing, poll again with backoff
interval = Math.min(interval * 1.5, maxInterval);
setTimeout(poll, interval);
})
.catch(function(err) {
onError(err);
});
}
poll();
}
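Call it with the job ID from the 202 response; the completed job object is the same JSON the polling endpoint returns:
// Example usage of the polling helper above
pollJobStatus(jobId, function(job) {
  console.log("Result:", job.result.content);
}, function(err) {
  console.error("Job failed:", err.message);
});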
WebSocket Notifications for Job Completion
Polling works but it is wasteful. WebSockets give you real-time push notifications with zero wasted requests.
var WebSocket = require("ws");
var http = require("http");
var express = require("express");
var { llmQueue } = require("./queue");
var app = express();
var server = http.createServer(app);
var wss = new WebSocket.Server({ server: server, path: "/ws" });
// Track which clients are watching which jobs
var jobSubscribers = {};
wss.on("connection", function(ws, req) {
var subscribedJobs = [];
ws.on("message", function(message) {
try {
var data = JSON.parse(message);
if (data.type === "subscribe" && data.job_id) {
var jobId = data.job_id;
subscribedJobs.push(jobId);
if (!jobSubscribers[jobId]) {
jobSubscribers[jobId] = [];
}
jobSubscribers[jobId].push(ws);
ws.send(JSON.stringify({
type: "subscribed",
job_id: jobId
}));
}
} catch (err) {
ws.send(JSON.stringify({ type: "error", message: "Invalid message format" }));
}
});
ws.on("close", function() {
// Clean up subscriptions
subscribedJobs.forEach(function(jobId) {
if (jobSubscribers[jobId]) {
jobSubscribers[jobId] = jobSubscribers[jobId].filter(function(client) {
return client !== ws;
});
if (jobSubscribers[jobId].length === 0) {
delete jobSubscribers[jobId];
}
}
});
});
});
// Notify subscribers when jobs complete
function notifyJobComplete(jobId, result) {
var subscribers = jobSubscribers[jobId] || [];
var payload = JSON.stringify({
type: "job_completed",
job_id: jobId,
result: result,
completed_at: new Date().toISOString()
});
subscribers.forEach(function(ws) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(payload);
}
});
delete jobSubscribers[jobId];
}
function notifyJobProgress(jobId, progress, stepDescription) {
var subscribers = jobSubscribers[jobId] || [];
var payload = JSON.stringify({
type: "job_progress",
job_id: jobId,
progress: progress,
step: stepDescription
});
subscribers.forEach(function(ws) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(payload);
}
});
}
module.exports = {
server: server,
notifyJobComplete: notifyJobComplete,
notifyJobProgress: notifyJobProgress
};
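One caveat: notifyJobComplete and notifyJobProgress only reach WebSocket clients connected to the same process. If your workers run as separate processes, which is what the rest of this article recommends, bridge the events through Redis pub/sub. Here is a minimal sketch using ioredis; the channel name and function names are my own:
// notifications.js - bridge worker-side events to the process that owns the WebSocket server
var Redis = require("ioredis");
var REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
var CHANNEL = "llm-job-events"; // arbitrary channel name

// Called from the worker process instead of the local notify functions
var pub = new Redis(REDIS_URL);
function publishJobEvent(event) {
  return pub.publish(CHANNEL, JSON.stringify(event));
}

// Called once in the web server process; forwards events to local WebSocket subscribers
function subscribeJobEvents(onEvent) {
  var sub = new Redis(REDIS_URL);
  sub.subscribe(CHANNEL);
  sub.on("message", function(channel, message) {
    onEvent(JSON.parse(message));
  });
}

module.exports = { publishJobEvent: publishJobEvent, subscribeJobEvents: subscribeJobEvents };
The worker then calls publishJobEvent instead of the notify functions, and the web server calls subscribeJobEvents once at startup and forwards each event into notifyJobProgress or notifyJobComplete.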
Implementing Webhook Callbacks
Webhooks let external systems receive results without maintaining a persistent connection. When a job completes, you POST the result to whatever URL the client provided at submission time.
var axios = require("axios");
var crypto = require("crypto");
var WEBHOOK_SECRET = process.env.WEBHOOK_SECRET || "your-webhook-signing-secret";
var WEBHOOK_TIMEOUT = 10000;
var WEBHOOK_MAX_RETRIES = 3;
function signPayload(payload) {
var hmac = crypto.createHmac("sha256", WEBHOOK_SECRET);
hmac.update(JSON.stringify(payload));
return hmac.digest("hex");
}
function deliverWebhook(webhookUrl, payload, callbackHeaders, retryCount) {
retryCount = retryCount || 0;
var signature = signPayload(payload);
var headers = {
"Content-Type": "application/json",
"X-Webhook-Signature": signature,
"X-Webhook-Timestamp": Date.now().toString()
};
// Merge in any custom headers the client requested
Object.keys(callbackHeaders).forEach(function(key) {
headers[key] = callbackHeaders[key];
});
return axios.post(webhookUrl, payload, {
headers: headers,
timeout: WEBHOOK_TIMEOUT
}).then(function(response) {
console.log("Webhook delivered to", webhookUrl, "status:", response.status);
return { success: true, status: response.status };
}).catch(function(err) {
console.error("Webhook delivery failed:", webhookUrl, err.message);
if (retryCount < WEBHOOK_MAX_RETRIES) {
var delay = Math.pow(2, retryCount) * 1000;
return new Promise(function(resolve) {
setTimeout(function() {
resolve(deliverWebhook(webhookUrl, payload, callbackHeaders, retryCount + 1));
}, delay);
});
}
return { success: false, error: err.message, attempts: retryCount + 1 };
});
}
module.exports = { deliverWebhook: deliverWebhook, signPayload: signPayload };
Always sign your webhook payloads. The receiving server should verify the HMAC signature before trusting the payload. This prevents anyone from spoofing webhook deliveries.
On the receiving end, verification looks like this:
// Webhook receiver verification. Verify against the raw request body, not a
// re-serialized req.body (see the troubleshooting section on signature mismatches).
function verifyWebhookSignature(rawBody, signatureHeader, secret) {
  var hmac = crypto.createHmac("sha256", secret);
  hmac.update(rawBody);
  var expected = hmac.digest("hex");
  // timingSafeEqual throws if the inputs differ in length, so bail out early
  if (!signatureHeader || signatureHeader.length !== expected.length) {
    return false;
  }
  return crypto.timingSafeEqual(
    Buffer.from(signatureHeader, "hex"),
    Buffer.from(expected, "hex")
  );
}
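Wired into an Express receiver, with express.raw so the signature is computed over the exact bytes that were sent (the route path and secret env var are examples):
var express = require("express");
var crypto = require("crypto");
var app = express();

// Use the raw body for this route so the HMAC covers exactly what the sender signed
app.post("/webhooks/llm", express.raw({ type: "application/json" }), function(req, res) {
  var ok = verifyWebhookSignature(req.body, req.headers["x-webhook-signature"], process.env.WEBHOOK_SECRET);
  if (!ok) {
    return res.status(401).json({ error: "Invalid signature" });
  }
  var payload = JSON.parse(req.body.toString("utf8"));
  // ...handle payload.event === "job.completed" or "job.failed"...
  res.status(200).json({ received: true });
});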
The Worker: Processing LLM Jobs
The worker is a separate process that pulls jobs from the queue and processes them. This separation is critical. Your web server handles HTTP requests. Your workers handle LLM processing. They scale independently.
var { llmQueue } = require("./queue");
var axios = require("axios");
var { deliverWebhook } = require("./webhooks");
var { notifyJobComplete, notifyJobProgress } = require("./websockets");
var OPENAI_API_KEY = process.env.OPENAI_API_KEY;
function callLLMApi(model, prompt) {
return axios.post("https://api.openai.com/v1/chat/completions", {
model: model,
messages: [{ role: "user", content: prompt }],
max_tokens: 4096
}, {
headers: {
"Authorization": "Bearer " + OPENAI_API_KEY,
"Content-Type": "application/json"
},
timeout: 90000
}).then(function(response) {
return {
content: response.data.choices[0].message.content,
usage: response.data.usage,
model: response.data.model
};
});
}
llmQueue.process(5, function(job) {
var data = job.data;
console.log("Processing job", job.id, "model:", data.model);
job.progress(10);
notifyJobProgress(job.id, 10, "Starting LLM API call");
return callLLMApi(data.model, data.prompt)
.then(function(result) {
job.progress(80);
notifyJobProgress(job.id, 80, "LLM response received");
// Deliver webhook if one was configured
if (data.webhook_url) {
var webhookPayload = {
event: "job.completed",
job_id: job.id,
result: result,
submitted_at: data.submitted_at,
completed_at: new Date().toISOString()
};
return deliverWebhook(data.webhook_url, webhookPayload, data.callback_headers || {})
.then(function() {
job.progress(100);
notifyJobComplete(job.id, result);
return result;
});
}
job.progress(100);
notifyJobComplete(job.id, result);
return result;
});
});
llmQueue.on("failed", function(job, err) {
console.error("Job", job.id, "failed after", job.attemptsMade, "attempts:", err.message);
// Notify via webhook that the job failed
if (job.data.webhook_url && job.attemptsMade >= job.opts.attempts) {
var failurePayload = {
event: "job.failed",
job_id: job.id,
error: err.message,
attempts: job.attemptsMade,
submitted_at: job.data.submitted_at,
failed_at: new Date().toISOString()
};
deliverWebhook(job.data.webhook_url, failurePayload, job.data.callback_headers || {});
}
});
console.log("LLM worker started, processing up to 5 concurrent jobs");
The process(5, ...) call tells Bull to run up to 5 jobs concurrently per worker process. Tune this based on your LLM API rate limits. If you are on OpenAI's tier-2 rate limit, you might be able to run 10-20 concurrent jobs. If you are on a lower tier, keep it at 2-3.
Dead Letter Queues for Failed LLM Jobs
When a job exhausts all retries, you need to know about it. Dead letter queues (DLQs) capture permanently failed jobs for manual review or automated reprocessing.
var Queue = require("bull");
var deadLetterQueue = new Queue("llm-dead-letter", process.env.REDIS_URL || "redis://127.0.0.1:6379");
// Move permanently failed jobs to the dead letter queue
llmQueue.on("failed", function(job, err) {
if (job.attemptsMade >= job.opts.attempts) {
deadLetterQueue.add({
original_job_id: job.id,
original_data: job.data,
error: err.message,
stack: err.stack,
attempts: job.attemptsMade,
failed_at: new Date().toISOString()
}, {
removeOnComplete: false
});
console.error("Job", job.id, "moved to dead letter queue after", job.attemptsMade, "failures");
}
});
// Admin endpoint to inspect and retry dead letter jobs
router.get("/api/admin/dead-letter", function(req, res) {
deadLetterQueue.getWaiting(0, 50).then(function(jobs) {
var results = jobs.map(function(job) {
return {
dlq_id: job.id,
original_job_id: job.data.original_job_id,
error: job.data.error,
failed_at: job.data.failed_at,
prompt_preview: job.data.original_data.prompt.substring(0, 100)
};
});
res.json({ count: results.length, jobs: results });
});
});
// Retry a dead letter job
router.post("/api/admin/dead-letter/:dlqId/retry", function(req, res) {
deadLetterQueue.getJob(req.params.dlqId).then(function(dlqJob) {
if (!dlqJob) {
return res.status(404).json({ error: "Dead letter job not found" });
}
return llmQueue.add(dlqJob.data.original_data, {
priority: 1
}).then(function(newJob) {
return dlqJob.remove().then(function() {
res.json({
message: "Job resubmitted",
new_job_id: newJob.id,
original_job_id: dlqJob.data.original_job_id
});
});
});
});
});
Priority Queues for Different User Tiers
Not all jobs are equal. Your paying customers should get priority over free-tier users. Bull supports numeric priorities where lower numbers mean higher priority.
var PRIORITY_MAP = {
enterprise: 1,
pro: 3,
free: 10
};
// Inside the submission handler shown earlier; jobData and idempotencyKey
// are built exactly as before
router.post("/api/jobs", function(req, res) {
  var userTier = req.user ? req.user.tier : "free";
  var priority = PRIORITY_MAP[userTier] || 10;
  llmQueue.add(jobData, {
    jobId: idempotencyKey,
    priority: priority
  }).then(function(job) {
    // getEstimatedWait returns a promise, so resolve it before responding
    return getEstimatedWait(priority).then(function(estimatedWaitSeconds) {
      res.status(202).json({
        job_id: job.id,
        status: "queued",
        priority: userTier,
        estimated_wait_seconds: estimatedWaitSeconds
      });
    });
  });
});
function getEstimatedWait(priority) {
  return llmQueue.getWaitingCount().then(function(waiting) {
    // Rough estimate: 10 seconds per job ahead of you, adjusted by priority
    var effectiveQueue = Math.floor(waiting * (priority / 5));
    return effectiveQueue * 10;
  });
}
Progress Reporting for Multi-Step LLM Workflows
Real LLM workloads are not single API calls. They are multi-step pipelines: retrieve context, build a prompt, call the LLM, post-process the output, maybe call the LLM again. Report progress at each step.
function processMultiStepJob(job) {
var data = job.data;
var steps = [
{ name: "Retrieving context", weight: 15 },
{ name: "Building prompt", weight: 5 },
{ name: "Calling LLM (step 1)", weight: 30 },
{ name: "Processing intermediate result", weight: 10 },
{ name: "Calling LLM (step 2)", weight: 30 },
{ name: "Formatting output", weight: 10 }
];
var totalProgress = 0;
function updateProgress(stepIndex) {
totalProgress = 0;
for (var i = 0; i < stepIndex; i++) {
totalProgress += steps[i].weight;
}
job.progress(totalProgress);
notifyJobProgress(job.id, totalProgress, steps[stepIndex].name);
}
updateProgress(0);
return retrieveContext(data.prompt)
.then(function(context) {
updateProgress(1);
return buildPrompt(data.prompt, context);
})
.then(function(fullPrompt) {
updateProgress(2);
return callLLMApi(data.model, fullPrompt);
})
.then(function(firstResult) {
updateProgress(3);
return processIntermediate(firstResult);
})
.then(function(refinedPrompt) {
updateProgress(4);
return callLLMApi(data.model, refinedPrompt);
})
.then(function(finalResult) {
updateProgress(5);
return formatOutput(finalResult);
})
.then(function(formattedResult) {
job.progress(100);
notifyJobProgress(job.id, 100, "Complete");
return formattedResult;
});
}
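The pipeline steps themselves (retrieveContext, buildPrompt, processIntermediate, formatOutput) are application-specific. If you just want to run the example end to end, trivial stand-ins like these are enough; they are placeholders, not a prescribed implementation:
// Placeholder pipeline steps so the multi-step example runs end to end.
// Replace each with your real retrieval, prompting, and post-processing logic.
function retrieveContext(prompt) {
  return Promise.resolve("no additional context");
}
function buildPrompt(prompt, context) {
  return Promise.resolve("Context: " + context + "\n\nTask: " + prompt);
}
function processIntermediate(firstResult) {
  return Promise.resolve("Refine the following draft:\n\n" + firstResult.content);
}
function formatOutput(finalResult) {
  return Promise.resolve({ content: finalResult.content.trim(), usage: finalResult.usage });
}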
Combining Webhooks with Server-Sent Events for Real-Time Updates
WebSockets are powerful but require a persistent bidirectional connection. Server-Sent Events (SSE) are simpler for one-way real-time updates and work through HTTP proxies without special configuration.
// SSE endpoint for job status updates
router.get("/api/jobs/:jobId/events", function(req, res) {
var jobId = req.params.jobId;
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
});
res.write("data: " + JSON.stringify({ type: "connected", job_id: jobId }) + "\n\n");
  // Listen for job events. The worker runs in a separate process, so subscribe to
  // Bull's global events, which are relayed to every process through Redis.
  var progressHandler = function(eventJobId, progress) {
    if (eventJobId.toString() === jobId) {
      res.write("data: " + JSON.stringify({
        type: "progress",
        job_id: jobId,
        progress: progress
      }) + "\n\n");
    }
  };
  var completedHandler = function(eventJobId, result) {
    if (eventJobId.toString() === jobId) {
      // For global events, Bull passes the return value as a JSON string
      res.write("data: " + JSON.stringify({
        type: "completed",
        job_id: jobId,
        result: typeof result === "string" ? JSON.parse(result) : result
      }) + "\n\n");
      cleanup();
      res.end();
    }
  };
  var failedHandler = function(eventJobId, failedReason) {
    if (eventJobId.toString() === jobId) {
      res.write("data: " + JSON.stringify({
        type: "failed",
        job_id: jobId,
        error: failedReason
      }) + "\n\n");
      cleanup();
      res.end();
    }
  };
  function cleanup() {
    llmQueue.removeListener("global:progress", progressHandler);
    llmQueue.removeListener("global:completed", completedHandler);
    llmQueue.removeListener("global:failed", failedHandler);
  }
  llmQueue.on("global:progress", progressHandler);
  llmQueue.on("global:completed", completedHandler);
  llmQueue.on("global:failed", failedHandler);
req.on("close", function() {
cleanup();
});
});
SSE is my preferred approach for browser clients. It auto-reconnects, works through CDNs and proxies, and the EventSource API is dead simple on the client side:
// Browser client using SSE
var source = new EventSource("/api/jobs/" + jobId + "/events");
source.onmessage = function(event) {
var data = JSON.parse(event.data);
if (data.type === "progress") {
updateProgressBar(data.progress);
}
if (data.type === "completed") {
displayResult(data.result);
source.close();
}
if (data.type === "failed") {
displayError(data.error);
source.close();
}
};
Scaling Workers Horizontally
Because workers are separate processes, you scale them independently from your web servers. Each worker process connects to the same Redis instance and pulls jobs from the same queue.
// worker.js - run multiple instances of this
var os = require("os");
var cluster = require("cluster");
var WORKER_CONCURRENCY = parseInt(process.env.WORKER_CONCURRENCY) || 5;
var WORKER_PROCESSES = parseInt(process.env.WORKER_PROCESSES) || os.cpus().length;
if (cluster.isPrimary) {
console.log("Master process", process.pid, "starting", WORKER_PROCESSES, "workers");
for (var i = 0; i < WORKER_PROCESSES; i++) {
cluster.fork();
}
cluster.on("exit", function(worker, code) {
console.error("Worker", worker.process.pid, "died with code", code, "- restarting");
cluster.fork();
});
} else {
var { llmQueue } = require("./queue");
llmQueue.process(WORKER_CONCURRENCY, function(job) {
return processLLMJob(job);
});
console.log("Worker", process.pid, "started with concurrency", WORKER_CONCURRENCY);
}
With 4 worker processes each handling 5 concurrent jobs, you get 20 concurrent LLM API calls. Add more machines and you scale linearly. The only bottleneck is Redis and your LLM API rate limits.
Handling Long-Running LLM Tasks
Some LLM tasks take minutes, not seconds. Multi-step agents, batch document processing, RAG pipelines with large retrieval sets. For these, you need to extend your timeout and implement heartbeat-based liveness detection.
// Extended timeout with progress heartbeats
llmQueue.process(2, function(job) {
var heartbeatInterval = setInterval(function() {
// Re-report the current progress so clients can see the job is still alive.
// Stall protection itself comes from the lock settings below (lockDuration,
// stalledInterval), not from progress updates.
job.progress(job.progress());
}, 15000);
return processLongRunningJob(job)
.then(function(result) {
clearInterval(heartbeatInterval);
return result;
})
.catch(function(err) {
clearInterval(heartbeatInterval);
throw err;
});
});
// Configure longer stall detection for long-running jobs
var longRunningQueue = new Queue("llm-long-running", process.env.REDIS_URL, {
settings: {
stalledInterval: 60000,
maxStalledCount: 2,
lockDuration: 300000
},
defaultJobOptions: {
attempts: 2,
timeout: 600000
}
});
The stalledInterval controls how often Bull checks for stalled jobs. The lockDuration is how long a worker holds a job's lock before it must renew it; if the lock expires without renewal, the job is treated as stalled. For long-running tasks, increase both of these significantly.
Batch Processing
When you need to process hundreds or thousands of prompts, submit them as a batch and track overall progress.
router.post("/api/batches", function(req, res) {
var prompts = req.body.prompts;
var batchId = uuidv4();
if (!Array.isArray(prompts) || prompts.length === 0) {
return res.status(400).json({ error: "prompts must be a non-empty array" });
}
if (prompts.length > 1000) {
return res.status(400).json({ error: "Maximum 1000 prompts per batch" });
}
var jobPromises = prompts.map(function(prompt, index) {
return llmQueue.add({
prompt: prompt,
model: req.body.model || "gpt-4o",
batch_id: batchId,
batch_index: index,
batch_total: prompts.length,
webhook_url: null
}, {
priority: 8
});
});
Promise.all(jobPromises).then(function(jobs) {
var jobIds = jobs.map(function(job) { return job.id; });
res.status(202).json({
batch_id: batchId,
job_count: jobs.length,
job_ids: jobIds,
poll_url: "/api/batches/" + batchId
});
});
});
// Batch status endpoint
router.get("/api/batches/:batchId", function(req, res) {
var batchId = req.params.batchId;
llmQueue.getJobs(["completed", "failed", "active", "waiting"]).then(function(jobs) {
var batchJobs = jobs.filter(function(job) {
return job.data.batch_id === batchId;
});
var statusCounts = { completed: 0, failed: 0, active: 0, waiting: 0 };
var statusPromises = batchJobs.map(function(job) {
return job.getState().then(function(state) {
statusCounts[state] = (statusCounts[state] || 0) + 1;
return { id: job.id, state: state, index: job.data.batch_index };
});
});
Promise.all(statusPromises).then(function(jobStatuses) {
var total = batchJobs.length;
var progress = total > 0 ? Math.round((statusCounts.completed / total) * 100) : 0;
res.json({
batch_id: batchId,
total: total,
progress: progress,
status_counts: statusCounts,
is_complete: statusCounts.completed + statusCounts.failed === total
});
});
});
});
Complete Working Example
Here is the full application wired together. Save this as your entry point and run it alongside a worker process.
// server.js - Main API server
var express = require("express");
var http = require("http");
var WebSocket = require("ws");
var Queue = require("bull");
var { v4: uuidv4 } = require("uuid");
var axios = require("axios");
var crypto = require("crypto");
var app = express();
var server = http.createServer(app);
app.use(express.json());
// ---- Queue Setup ----
var REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
var llmQueue = new Queue("llm-processing", REDIS_URL, {
defaultJobOptions: {
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
removeOnComplete: 100,
removeOnFail: 200,
timeout: 120000
}
});
var deadLetterQueue = new Queue("llm-dead-letter", REDIS_URL);
// ---- WebSocket Setup ----
var wss = new WebSocket.Server({ server: server, path: "/ws" });
var jobSubscribers = {};
wss.on("connection", function(ws) {
var subscriptions = [];
ws.on("message", function(raw) {
try {
var msg = JSON.parse(raw);
if (msg.type === "subscribe" && msg.job_id) {
subscriptions.push(msg.job_id);
if (!jobSubscribers[msg.job_id]) jobSubscribers[msg.job_id] = [];
jobSubscribers[msg.job_id].push(ws);
ws.send(JSON.stringify({ type: "subscribed", job_id: msg.job_id }));
}
} catch (e) { /* ignore malformed messages */ }
});
ws.on("close", function() {
subscriptions.forEach(function(jobId) {
if (jobSubscribers[jobId]) {
jobSubscribers[jobId] = jobSubscribers[jobId].filter(function(c) { return c !== ws; });
if (jobSubscribers[jobId].length === 0) delete jobSubscribers[jobId];
}
});
});
});
function broadcastToJob(jobId, payload) {
var subs = jobSubscribers[jobId] || [];
var msg = JSON.stringify(payload);
subs.forEach(function(ws) {
if (ws.readyState === WebSocket.OPEN) ws.send(msg);
});
}
// ---- Webhook Delivery ----
var WEBHOOK_SECRET = process.env.WEBHOOK_SECRET || "change-me-in-production";
function deliverWebhook(url, payload, headers, retries) {
retries = retries || 0;
var signature = crypto.createHmac("sha256", WEBHOOK_SECRET)
.update(JSON.stringify(payload)).digest("hex");
var finalHeaders = { "Content-Type": "application/json", "X-Webhook-Signature": signature };
Object.keys(headers || {}).forEach(function(k) { finalHeaders[k] = headers[k]; });
return axios.post(url, payload, { headers: finalHeaders, timeout: 10000 })
.catch(function(err) {
if (retries < 3) {
return new Promise(function(resolve) {
setTimeout(function() {
resolve(deliverWebhook(url, payload, headers, retries + 1));
}, Math.pow(2, retries) * 1000);
});
}
console.error("Webhook delivery permanently failed:", url, err.message);
});
}
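// ---- Wire queue events to subscribers ----
// Jobs are processed in worker.js, so listen to Bull's *global* events, which are
// relayed to every process through Redis. Webhook delivery is triggered here as one
// option; you could equally deliver webhooks from the worker, as shown earlier.
llmQueue.on("global:progress", function(jobId, progress) {
  broadcastToJob(jobId.toString(), { type: "job_progress", job_id: jobId, progress: progress });
});
llmQueue.on("global:completed", function(jobId, result) {
  var parsed = typeof result === "string" ? JSON.parse(result) : result;
  broadcastToJob(jobId.toString(), { type: "job_completed", job_id: jobId, result: parsed });
  delete jobSubscribers[jobId.toString()];
  llmQueue.getJob(jobId).then(function(job) {
    if (job && job.data.webhook_url) {
      deliverWebhook(job.data.webhook_url, {
        event: "job.completed",
        job_id: jobId,
        result: parsed,
        completed_at: new Date().toISOString()
      }, job.data.callback_headers);
    }
  });
});
llmQueue.on("global:failed", function(jobId, failedReason) {
  broadcastToJob(jobId.toString(), { type: "job_failed", job_id: jobId, error: failedReason });
  delete jobSubscribers[jobId.toString()];
});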
// ---- API Routes ----
// Submit a job
app.post("/api/jobs", function(req, res) {
var idempotencyKey = req.headers["idempotency-key"] || uuidv4();
if (!req.body.prompt) {
return res.status(400).json({ error: "prompt is required" });
}
llmQueue.getJob(idempotencyKey).then(function(existing) {
if (existing) {
return res.status(200).json({ job_id: existing.id, status: "already_submitted" });
}
return llmQueue.add({
prompt: req.body.prompt,
model: req.body.model || "gpt-4o",
webhook_url: req.body.webhook_url || null,
callback_headers: req.body.callback_headers || {},
submitted_at: new Date().toISOString()
}, {
jobId: idempotencyKey,
priority: req.body.priority || 5
}).then(function(job) {
res.status(202).json({
job_id: job.id,
status: "queued",
poll_url: "/api/jobs/" + job.id
});
});
}).catch(function(err) {
console.error("Job submission error:", err);
res.status(500).json({ error: "Internal server error" });
});
});
// Check job status
app.get("/api/jobs/:jobId", function(req, res) {
llmQueue.getJob(req.params.jobId).then(function(job) {
if (!job) return res.status(404).json({ error: "Job not found" });
return job.getState().then(function(state) {
var response = { job_id: job.id, status: state, submitted_at: job.data.submitted_at };
if (state === "completed") {
response.result = job.returnvalue;
response.processing_time_ms = job.finishedOn - job.processedOn;
}
if (state === "failed") {
response.error = job.failedReason;
response.attempts_made = job.attemptsMade;
}
if (state === "active") {
response.progress = job.progress();
}
res.json(response);
});
});
});
// SSE stream for job events
app.get("/api/jobs/:jobId/events", function(req, res) {
var jobId = req.params.jobId;
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
});
  // Jobs run in worker.js, so use Bull's global events here as well
  var onProgress = function(eventJobId, progress) {
    if (eventJobId.toString() === jobId)
      res.write("data: " + JSON.stringify({ type: "progress", progress: progress }) + "\n\n");
  };
  var onComplete = function(eventJobId, result) {
    if (eventJobId.toString() === jobId) {
      // result arrives as a JSON string for global events
      res.write("data: " + JSON.stringify({ type: "completed", result: typeof result === "string" ? JSON.parse(result) : result }) + "\n\n");
      cleanup(); res.end();
    }
  };
  var onFailed = function(eventJobId, failedReason) {
    if (eventJobId.toString() === jobId) {
      res.write("data: " + JSON.stringify({ type: "failed", error: failedReason }) + "\n\n");
      cleanup(); res.end();
    }
  };
  function cleanup() {
    llmQueue.removeListener("global:progress", onProgress);
    llmQueue.removeListener("global:completed", onComplete);
    llmQueue.removeListener("global:failed", onFailed);
  }
  llmQueue.on("global:progress", onProgress);
  llmQueue.on("global:completed", onComplete);
  llmQueue.on("global:failed", onFailed);
req.on("close", cleanup);
});
// ---- Health check ----
app.get("/health", function(req, res) {
res.json({ status: "ok", queue: "llm-processing", timestamp: new Date().toISOString() });
});
// ---- Start server ----
var PORT = process.env.PORT || 3000;
server.listen(PORT, function() {
console.log("LLM async API server running on port", PORT);
});
// worker.js - Run separately: node worker.js
var Queue = require("bull");
var axios = require("axios");
var REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
var OPENAI_API_KEY = process.env.OPENAI_API_KEY;
var llmQueue = new Queue("llm-processing", REDIS_URL);
var deadLetterQueue = new Queue("llm-dead-letter", REDIS_URL);
function callLLM(model, prompt) {
return axios.post("https://api.openai.com/v1/chat/completions", {
model: model,
messages: [{ role: "user", content: prompt }],
max_tokens: 4096
}, {
headers: { "Authorization": "Bearer " + OPENAI_API_KEY },
timeout: 90000
}).then(function(res) {
return {
content: res.data.choices[0].message.content,
usage: res.data.usage,
model: res.data.model
};
});
}
llmQueue.process(5, function(job) {
console.log("Processing job:", job.id);
job.progress(10);
return callLLM(job.data.model, job.data.prompt)
.then(function(result) {
job.progress(100);
console.log("Job completed:", job.id, "tokens:", result.usage.total_tokens);
return result;
});
});
llmQueue.on("failed", function(job, err) {
if (job.attemptsMade >= job.opts.attempts) {
deadLetterQueue.add({
original_job_id: job.id,
original_data: job.data,
error: err.message,
failed_at: new Date().toISOString()
});
console.error("Job", job.id, "moved to DLQ after", job.attemptsMade, "failures");
}
});
console.log("Worker started, concurrency: 5");
Start the system with two terminals:
# Terminal 1: API server
REDIS_URL=redis://localhost:6379 node server.js
# Terminal 2: Worker process
REDIS_URL=redis://localhost:6379 OPENAI_API_KEY=sk-... node worker.js
Test it:
# Submit a job
curl -X POST http://localhost:3000/api/jobs \
-H "Content-Type: application/json" \
-H "Idempotency-Key: test-123" \
-d '{"prompt": "Explain TCP handshake in 3 sentences", "model": "gpt-4o"}'
# Poll for status
curl http://localhost:3000/api/jobs/test-123
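If everything is wired correctly, the submit call returns a 202 immediately and the poll returns the result once the worker finishes. The responses look roughly like this (values will differ); note that the job ID matches the Idempotency-Key you supplied:
# 202 response from the submit call
{ "job_id": "test-123", "status": "queued", "poll_url": "/api/jobs/test-123" }
# Poll response after the worker finishes
{ "job_id": "test-123", "status": "completed", "submitted_at": "...", "result": { "content": "...", "usage": { ... }, "model": "gpt-4o" }, "processing_time_ms": 4100 }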
Common Issues and Troubleshooting
1. Redis Connection Refused
Error: connect ECONNREFUSED 127.0.0.1:6379
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1141:16)
Bull requires a running Redis instance. Make sure Redis is running. On macOS: brew services start redis. On Linux: sudo systemctl start redis. On Windows, use Docker: docker run -d -p 6379:6379 redis:7. Verify with redis-cli ping which should return PONG.
2. Job Stalled and Re-Processed
warning: Job 5f3a-b2c1 stalled and will be re-processed
This happens when a job's Redis lock is not renewed in time, typically because the worker's event loop is blocked or lockDuration is too short for a slow LLM call. Bull assumes the worker died and reassigns the job. Fix it by raising lockDuration and stalledInterval in the queue settings so the lock comfortably outlives your longest expected call; periodic job.progress() calls are also useful for giving clients a liveness signal. If your LLM calls can take more than 30 seconds, set lockDuration and stalledInterval to at least 60000ms.
3. Webhook Signature Verification Fails
Error: Webhook signature mismatch - expected a13f2b... got 7c91d0...
The most common cause is the webhook receiver parsing the body before verifying the signature. If your Express middleware runs express.json() before your verification middleware, the re-serialized body may differ from the original. Use express.raw({ type: 'application/json' }) for webhook endpoints and verify against the raw buffer, then parse JSON yourself.
4. Memory Leak from Completed Jobs
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
Bull stores completed and failed jobs in Redis by default. If you process millions of jobs without cleanup, Redis memory grows unbounded. Always set removeOnComplete and removeOnFail in your job options. Use a number (e.g., removeOnComplete: 100) to keep the last N jobs for debugging, or true to remove them immediately. Monitor Redis memory with redis-cli INFO memory.
5. Rate Limit Errors from LLM Provider
Error: Request failed with status code 429 - Rate limit exceeded
Your workers are submitting requests faster than your LLM API rate limit allows. Use Bull's built-in rate limiter to throttle job processing:
var rateLimitedQueue = new Queue("llm-processing", REDIS_URL, {
limiter: {
max: 50,
duration: 60000
}
});
This caps processing at 50 jobs per minute across all workers. Adjust based on your API tier.
6. WebSocket Connections Dropping Behind Load Balancer
WebSocket connection to 'wss://api.example.com/ws' failed: WebSocket is closed before the connection is established.
Most load balancers and reverse proxies have WebSocket timeout settings that are separate from HTTP timeouts. In Nginx, add proxy_read_timeout 3600s; and ensure proxy_set_header Upgrade $http_upgrade; and proxy_set_header Connection "upgrade"; are set. For AWS ALB, enable sticky sessions for WebSocket routes.
Best Practices
Always return 202 Accepted for async operations. Do not return 200. The 202 status code specifically means "the request has been accepted for processing, but the processing has not been completed." This is semantically correct and tells clients they need to check back later.
Implement idempotency keys on every job submission endpoint. Network failures cause retries. Retries without idempotency create duplicate work. Use UUIDs as job IDs and check for existing jobs before creating new ones.
Separate your web server from your workers. They have completely different scaling characteristics. Your web server needs to handle thousands of quick HTTP requests. Your workers need to handle a handful of slow LLM calls. Run them as separate processes or separate containers.
Set aggressive timeouts on LLM API calls. If a call has not responded in 90 seconds, it is probably not going to. Kill it and retry. The cost of a retry is much lower than the cost of a hung worker.
Monitor your dead letter queue. If jobs are consistently failing and landing in the DLQ, you have a systemic problem (wrong API key, model deprecation, prompt too large). Set up alerts when the DLQ size exceeds a threshold.
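A periodic count check is enough to start with; sendAlert is a placeholder for whatever alerting hook you use (PagerDuty, Slack, email):
// Alert when the dead letter queue grows past a threshold
var DLQ_ALERT_THRESHOLD = 25;
setInterval(function() {
  deadLetterQueue.getWaitingCount().then(function(count) {
    if (count >= DLQ_ALERT_THRESHOLD) {
      sendAlert("llm-dead-letter queue has " + count + " jobs awaiting review");
    }
  });
}, 60000);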
Use exponential backoff everywhere. For retries, for polling, for webhook delivery. Linear retries hammer downstream services. Exponential backoff is gentler and more effective.
Sign your webhook payloads with HMAC-SHA256. This is not optional. Without signatures, anyone can send fake webhook payloads to your clients' endpoints.
Keep job payloads small. Store large prompts and results in a database or object store, and reference them by ID in the job data. Redis is fast but expensive for large values. A 100KB prompt times a million jobs is 100GB of Redis memory.
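Here is a sketch of the reference-by-ID pattern; promptStore.save and promptStore.load are hypothetical wrappers around your database or object store:
// Store the large prompt outside Redis and enqueue only a reference to it
function submitLargePromptJob(prompt, model) {
  return promptStore.save(prompt).then(function(promptId) {
    return llmQueue.add({ prompt_id: promptId, model: model });
  });
}
// In the worker, resolve the reference before calling the LLM
llmQueue.process(5, function(job) {
  return promptStore.load(job.data.prompt_id).then(function(prompt) {
    return callLLM(job.data.model, prompt);
  });
});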
Log job lifecycle events. Log when a job is submitted, started, progressed, completed, failed, and retried. When something goes wrong at 3 AM, these logs are the difference between a 5-minute fix and a 5-hour investigation.
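Bull's global queue events make this easy to wire up from either the API server or the worker; a compact sketch, with the log format left up to you:
// Minimal structured lifecycle logging using Bull's global events,
// which fire in every process connected to the queue
function logJobEvent(event, jobId, detail) {
  console.log(JSON.stringify({
    ts: new Date().toISOString(),
    event: event,
    job_id: jobId,
    detail: detail || null
  }));
}
llmQueue.on("global:waiting", function(jobId) { logJobEvent("queued", jobId); });
llmQueue.on("global:active", function(jobId) { logJobEvent("started", jobId); });
llmQueue.on("global:progress", function(jobId, progress) { logJobEvent("progress", jobId, progress); });
llmQueue.on("global:completed", function(jobId) { logJobEvent("completed", jobId); });
llmQueue.on("global:failed", function(jobId, reason) { logJobEvent("failed", jobId, reason); });
llmQueue.on("global:stalled", function(jobId) { logJobEvent("stalled_and_retried", jobId); });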
Implement graceful shutdown in your workers. When you deploy new code, workers need to finish their current jobs before exiting. Use Bull's queue.close() method, which waits for active jobs to complete before shutting down the worker.
process.on("SIGTERM", function() {
console.log("SIGTERM received, closing queue...");
llmQueue.close().then(function() {
console.log("Queue closed, exiting");
process.exit(0);
});
});