Deploying AI Agents in Production
Running an AI agent on your laptop is one thing. Running it in production where real users depend on it, where it needs to survive node failures, handle bursty traffic, and degrade gracefully when an upstream API goes down — that is an entirely different problem. This article walks through the full production deployment lifecycle for AI agent services in Node.js, from containerization and queue-based scaling to circuit breakers, blue-green deployments, and structured observability.
Prerequisites
- Solid understanding of Node.js and Express
- Working knowledge of Docker and container orchestration basics
- Familiarity with message queues (Redis, RabbitMQ, or similar)
- Experience with PostgreSQL for persistent state
- Basic understanding of AI agent architecture (tool calling, multi-step reasoning)
- A DigitalOcean account (or equivalent cloud platform)
Production Readiness Checklist
Before you deploy an agent service, run through this checklist. I have seen every item on it cause an outage in production at least once:
- Timeouts on every external call. LLM APIs can hang for 60+ seconds. Your HTTP client needs explicit timeouts (see the sketch after this list).
- Retry logic with exponential backoff. Transient failures from OpenAI, Anthropic, and tool APIs are normal, not exceptional.
- Graceful shutdown. An agent mid-task cannot just be killed. It needs to finish or checkpoint its work.
- Health checks. Your load balancer and orchestrator need to know if your service is alive and ready.
- Structured logging. console.log("something happened") does not scale. You need JSON logs with correlation IDs.
- Persistent state. Agent tasks, conversation history, and tool results must survive process restarts.
- Rate limiting. Both inbound (protecting your service) and outbound (respecting API quotas).
- Circuit breakers. When a tool dependency is down, fail fast instead of queuing up timeout after timeout.
- Monitoring and alerting. You need to know when agents are failing, not when users tell you.
- Rollback plan. New agent versions can behave unpredictably. You need a fast path back to the previous version.
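To make the first two items concrete, here is a minimal sketch of an outbound LLM call with an explicit timeout and exponential backoff, using Node's built-in fetch and AbortController. The endpoint URL and environment variable names (LLM_API_URL, LLM_API_KEY) are placeholders, not any specific provider's API:

// Sketch: hard timeout plus exponential backoff on retryable failures.
function callWithTimeoutAndRetry(payload, maxAttempts) {
  var attempt = 0;
  function tryOnce() {
    attempt++;
    var controller = new AbortController();
    var timer = setTimeout(function() { controller.abort(); }, 60000); // 60s hard timeout
    return fetch(process.env.LLM_API_URL, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + process.env.LLM_API_KEY
      },
      body: JSON.stringify(payload),
      signal: controller.signal
    }).then(function(res) {
      clearTimeout(timer);
      if (res.status === 429 || res.status >= 500) {
        throw new Error("retryable: HTTP " + res.status);
      }
      return res.json();
    }).catch(function(err) {
      clearTimeout(timer);
      if (attempt >= maxAttempts) { throw err; }
      // 2s, 4s, 8s... capped at 30s between attempts
      var backoff = Math.min(1000 * Math.pow(2, attempt), 30000);
      return new Promise(function(resolve) { setTimeout(resolve, backoff); }).then(tryOnce);
    });
  }
  return tryOnce();
}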
Containerizing Agent Services with Docker
Agent services have specific containerization concerns that generic web apps do not. The main one is that agent tasks are long-running. A typical web request finishes in milliseconds; an agent task can run for minutes. Your container needs to account for this.
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY . .
EXPOSE 8080
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://localhost:8080/health || exit 1
CMD ["node", "server.js"]
The key detail here is the HEALTHCHECK directive. The --start-period=15s gives the agent service time to connect to its database and message queue before the orchestrator starts checking. The --interval=30s is generous because agent services can be CPU-intensive during inference coordination and you do not want health check overhead competing with real work.
Build and tag your image:
docker build -t agent-service:1.0.0 .
docker tag agent-service:1.0.0 registry.digitalocean.com/myregistry/agent-service:1.0.0
docker push registry.digitalocean.com/myregistry/agent-service:1.0.0
Deploying to DigitalOcean App Platform
DigitalOcean App Platform handles SSL, routing, and basic scaling. Here is an app spec for an agent service:
name: agent-service
region: nyc
services:
  - name: agent-api
    image:
      registry_type: DOCR
      repository: agent-service
      tag: 1.0.0
    instance_count: 2
    instance_size_slug: professional-s
    http_port: 8080
    health_check:
      http_path: /health
      initial_delay_seconds: 15
      period_seconds: 30
      timeout_seconds: 10
      success_threshold: 1
      failure_threshold: 3
    envs:
      - key: NODE_ENV
        value: production
      - key: DATABASE_URL
        type: SECRET
        value: "${db.DATABASE_URL}"
      - key: REDIS_URL
        type: SECRET
        value: "${redis.DATABASE_URL}"
      - key: OPENAI_API_KEY
        type: SECRET
        value: "your-key-here"
      - key: ANTHROPIC_API_KEY
        type: SECRET
        value: "your-key-here"
workers:
  - name: agent-worker
    image:
      registry_type: DOCR
      repository: agent-service
      tag: 1.0.0
    instance_count: 3
    instance_size_slug: professional-m
    envs:
      - key: WORKER_MODE
        value: "true"
      - key: DATABASE_URL
        type: SECRET
        value: "${db.DATABASE_URL}"
      - key: REDIS_URL
        type: SECRET
        value: "${redis.DATABASE_URL}"
      - key: OPENAI_API_KEY
        type: SECRET
        value: "your-key-here"
databases:
  - name: db
    engine: PG
    version: "16"
    size: db-s-1vcpu-1gb
    num_nodes: 1
  - name: redis
    engine: REDIS
    version: "7"
    size: db-s-1vcpu-1gb
    num_nodes: 1
Notice the separation between services (the HTTP API that accepts agent task requests) and workers (the background processes that actually execute agent tasks). This separation is critical. The API should be lightweight and fast. The workers are where the heavy computation happens, and they scale independently.
Horizontal Scaling with Queue-Based Architecture
The single most important architectural decision for production agents is decoupling task submission from task execution. You do this with a message queue.
The pattern is straightforward: the API receives a request to run an agent task, writes the task to PostgreSQL, publishes a message to Redis, and returns a task ID immediately. Worker processes pick up messages from the queue and execute the agent logic. The client polls for results or receives a webhook callback.
Client -> API Server -> PostgreSQL (task record)
                     -> Redis Queue (task message)
Worker <- Redis Queue (picks up message)
Worker -> executes agent task
Worker -> PostgreSQL (updates task with result)
Worker -> Webhook callback (optional)
This architecture gives you several things for free: natural load distribution across workers, automatic retry on worker failure, the ability to scale workers independently of your API, and backpressure when the system is overloaded (tasks queue up instead of overwhelming workers).
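The submit side of the pattern is small. Here is a minimal sketch, assuming DATABASE_URL and REDIS_URL are set; the agent_tasks schema appears later in this article, and "agent-tasks" is the queue name used throughout:

var express = require("express");
var pg = require("pg");
var Redis = require("ioredis");

var app = express();
app.use(express.json());
var pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
var redis = new Redis(process.env.REDIS_URL);

app.post("/api/tasks", function(req, res) {
  // Persist the task first, then enqueue only its ID and return immediately.
  pool.query(
    "INSERT INTO agent_tasks (task_type, input) VALUES ($1, $2) RETURNING id",
    [req.body.task_type, JSON.stringify(req.body.input)]
  ).then(function(result) {
    var taskId = result.rows[0].id;
    return redis.lpush("agent-tasks", JSON.stringify({ taskId: taskId })).then(function() {
      res.status(201).json({ task_id: taskId, status: "pending" });
    });
  }).catch(function(err) {
    res.status(500).json({ error: "internal_error" });
  });
});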
Health Checks and Liveness Probes
Agent services need two types of health checks: liveness (is the process alive?) and readiness (can it accept new work?). The distinction matters. A worker that is alive but has lost its database connection should not receive new tasks.
var express = require("express");
var app = express();
var isReady = false;
var isShuttingDown = false;
var activeTaskCount = 0;
app.get("/health", function(req, res) {
if (isShuttingDown) {
return res.status(503).json({ status: "shutting_down" });
}
res.json({ status: "ok", uptime: process.uptime() });
});
app.get("/ready", function(req, res) {
if (!isReady || isShuttingDown) {
return res.status(503).json({
status: "not_ready",
reason: isShuttingDown ? "shutting_down" : "initializing"
});
}
res.json({
status: "ready",
active_tasks: activeTaskCount
});
});
The readiness probe is particularly important for workers. When a worker is processing its maximum concurrent tasks, it should report as not ready so the orchestrator does not route new work to it. When it finishes a task and has capacity, it reports ready again.
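Extending the snippet above, a capacity-aware readiness probe for a worker might look like this sketch (MAX_CONCURRENT_TASKS is whatever per-worker limit you choose):

var MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS, 10) || 3;

app.get("/ready", function(req, res) {
  if (!isReady || isShuttingDown) {
    return res.status(503).json({ status: "not_ready" });
  }
  // Report not ready while at the concurrency limit so the orchestrator
  // stops routing new work here until a slot frees up.
  if (activeTaskCount >= MAX_CONCURRENT_TASKS) {
    return res.status(503).json({ status: "at_capacity", active_tasks: activeTaskCount });
  }
  res.json({ status: "ready", active_tasks: activeTaskCount });
});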
Graceful Shutdown
This is where most agent deployments fail the first time. When your orchestrator sends a SIGTERM (because of a deploy, a scale-down, or a node drain), your process has a limited window to clean up. For agent services, "clean up" means finishing in-progress tasks or checkpointing them so they can be resumed.
var SHUTDOWN_TIMEOUT = 30000; // 30 seconds
function gracefulShutdown(signal) {
console.log(JSON.stringify({
level: "info",
message: "Shutdown signal received",
signal: signal,
active_tasks: activeTaskCount
}));
isShuttingDown = true;
// Stop accepting new tasks from the queue
if (queueSubscription) {
queueSubscription.unsubscribe();
}
// Stop accepting new HTTP requests
server.close(function() {
console.log(JSON.stringify({
level: "info",
message: "HTTP server closed"
}));
});
// Wait for active tasks to complete
var shutdownTimer = setInterval(function() {
if (activeTaskCount === 0) {
clearInterval(shutdownTimer);
console.log(JSON.stringify({
level: "info",
message: "All tasks completed, exiting"
}));
process.exit(0);
}
}, 1000);
// Force exit after timeout
setTimeout(function() {
console.log(JSON.stringify({
level: "warn",
message: "Shutdown timeout reached, forcing exit",
abandoned_tasks: activeTaskCount
}));
process.exit(1);
}, SHUTDOWN_TIMEOUT);
}
process.on("SIGTERM", function() { gracefulShutdown("SIGTERM"); });
process.on("SIGINT", function() { gracefulShutdown("SIGINT"); });
The 30-second timeout is not arbitrary. Most orchestrators (including DigitalOcean App Platform and Kubernetes) default to a 30-second grace period before sending SIGKILL. If your agent tasks can run longer than that, you need to implement checkpointing — saving the agent's intermediate state to the database so another worker can resume it.
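A checkpoint can be as simple as writing the agent's intermediate state back to the checkpoint column described in the next section and releasing the task. A minimal sketch, assuming your agent loop tracks its messages and current step:

function checkpointTask(pool, taskId, state) {
  // Persist intermediate state (messages so far, completed tool calls, step index)
  // and return the task to 'pending' so another worker can resume it.
  return pool.query(
    "UPDATE agent_tasks SET status = 'pending', checkpoint = $1, worker_id = NULL, updated_at = NOW() WHERE id = $2",
    [JSON.stringify(state), taskId]
  );
}

// Called from the shutdown path for each in-flight task, e.g.:
// checkpointTask(pool, task.id, { messages: messagesSoFar, step: currentStep });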
Persistent State with PostgreSQL
Agent tasks need durable state. The task record, the conversation history, tool call results, and the final output all need to survive process restarts. PostgreSQL is the right choice here because you need ACID transactions (an agent task moving from "processing" to "completed" needs to be atomic with writing the result).
CREATE TABLE agent_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_type VARCHAR(100) NOT NULL,
input JSONB NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
result JSONB,
error TEXT,
worker_id VARCHAR(100),
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 3,
checkpoint JSONB,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_agent_tasks_status ON agent_tasks(status);
CREATE INDEX idx_agent_tasks_created ON agent_tasks(created_at);
CREATE INDEX idx_agent_tasks_worker ON agent_tasks(worker_id);
The checkpoint column is where you store intermediate agent state. If a worker dies mid-task, another worker can pick it up and resume from the checkpoint instead of starting over. The attempts and max_attempts columns give you automatic retry logic — if a task fails, it goes back to the queue as long as it has not exceeded its retry limit.
var pg = require("pg");
var pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
function createTask(taskType, input) {
return pool.query(
"INSERT INTO agent_tasks (task_type, input) VALUES ($1, $2) RETURNING id",
[taskType, JSON.stringify(input)]
).then(function(result) {
return result.rows[0].id;
});
}
function claimTask(taskId, workerId) {
return pool.query(
"UPDATE agent_tasks SET status = 'processing', worker_id = $1, started_at = NOW(), attempts = attempts + 1, updated_at = NOW() WHERE id = $2 AND status = 'pending' RETURNING *",
[workerId, taskId]
).then(function(result) {
return result.rows[0] || null;
});
}
function completeTask(taskId, result) {
return pool.query(
"UPDATE agent_tasks SET status = 'completed', result = $1, completed_at = NOW(), updated_at = NOW() WHERE id = $2",
[JSON.stringify(result), taskId]
);
}
function failTask(taskId, error, checkpoint) {
return pool.query(
"UPDATE agent_tasks SET status = CASE WHEN attempts >= max_attempts THEN 'failed' ELSE 'pending' END, error = $1, checkpoint = $2, worker_id = NULL, updated_at = NOW() WHERE id = $3",
[error, checkpoint ? JSON.stringify(checkpoint) : null, taskId]
);
}
The claimTask function uses a WHERE clause that checks status = 'pending'. If two workers try to claim the same task simultaneously, PostgreSQL's row-level locking makes the second UPDATE wait, re-evaluate the WHERE clause against the now-updated row, and match nothing — so only one worker ever gets the task back.
Rate Limiting and Quota Management
Production agent services face rate limits from two directions: inbound requests from clients and outbound calls to LLM APIs. You need to manage both.
For inbound rate limiting, use a sliding window counter in Redis:
var Redis = require("ioredis");
var redis = new Redis(process.env.REDIS_URL);
function rateLimiter(options) {
var windowMs = options.windowMs || 60000;
var max = options.max || 100;
var keyPrefix = options.keyPrefix || "rl";
return function(req, res, next) {
var key = keyPrefix + ":" + (req.ip || req.headers["x-forwarded-for"]);
var now = Date.now();
var windowStart = now - windowMs;
redis.multi()
.zremrangebyscore(key, 0, windowStart)
.zadd(key, now, now + ":" + Math.random())
.zcard(key)
.pexpire(key, windowMs)
.exec(function(err, results) {
if (err) {
return next(); // fail open
}
var count = results[2][1];
res.set("X-RateLimit-Limit", max);
res.set("X-RateLimit-Remaining", Math.max(0, max - count));
if (count > max) {
return res.status(429).json({
error: "rate_limit_exceeded",
retry_after_ms: windowMs
});
}
next();
});
};
}
app.use("/api/agents", rateLimiter({ windowMs: 60000, max: 30 }));
For outbound quota management against LLM APIs, use a token bucket pattern. This is critical because LLM APIs charge per token and have strict rate limits. Blowing through your quota means your entire agent service goes down.
function TokenBucket(tokensPerSecond, maxBurst) {
this.tokensPerSecond = tokensPerSecond;
this.maxBurst = maxBurst;
this.tokens = maxBurst;
this.lastRefill = Date.now();
}
TokenBucket.prototype.consume = function(count) {
var self = this;
this._refill();
if (this.tokens >= count) {
this.tokens -= count;
return Promise.resolve();
}
var waitTime = ((count - this.tokens) / this.tokensPerSecond) * 1000;
return new Promise(function(resolve) {
setTimeout(function() {
self._refill();
self.tokens -= count;
resolve();
}, waitTime);
});
};
TokenBucket.prototype._refill = function() {
var now = Date.now();
var elapsed = (now - this.lastRefill) / 1000;
this.tokens = Math.min(this.maxBurst, this.tokens + elapsed * this.tokensPerSecond);
this.lastRefill = now;
};
var apiQuota = new TokenBucket(50, 200); // 50 requests/sec, burst of 200
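Wrap every outbound LLM request in a consume() so all calls within a process share the same budget. A sketch of the usage (callLLM stands in for your actual LLM client call; note this bucket is per-process — the troubleshooting section covers a Redis-based variant shared across workers):

function callLLMWithQuota(payload) {
  // Wait for a token before making the request; under sustained load this
  // queues calls instead of tripping the provider's rate limit.
  return apiQuota.consume(1).then(function() {
    return callLLM(payload);
  });
}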
Circuit Breakers for Tool Dependencies
AI agents call external tools — web search APIs, databases, internal microservices. When one of those dependencies goes down, you do not want your agent to keep hammering it with requests that will time out. A circuit breaker detects failure patterns and short-circuits calls to unhealthy dependencies.
function CircuitBreaker(options) {
this.name = options.name;
this.threshold = options.threshold || 5;
this.resetTimeout = options.resetTimeout || 30000;
this.state = "closed"; // closed = normal, open = failing, half-open = testing
this.failures = 0;
this.lastFailure = null;
this.successesInHalfOpen = 0;
}
CircuitBreaker.prototype.call = function(fn) {
var self = this;
if (this.state === "open") {
if (Date.now() - this.lastFailure > this.resetTimeout) {
this.state = "half-open";
this.successesInHalfOpen = 0;
} else {
return Promise.reject(new Error(
"Circuit breaker " + this.name + " is open. Dependency unavailable."
));
}
}
return fn().then(function(result) {
if (self.state === "half-open") {
self.successesInHalfOpen++;
if (self.successesInHalfOpen >= 3) {
self.state = "closed";
self.failures = 0;
}
} else {
self.failures = 0;
}
return result;
}).catch(function(err) {
self.failures++;
self.lastFailure = Date.now();
if (self.failures >= self.threshold) {
self.state = "open";
console.log(JSON.stringify({
level: "error",
message: "Circuit breaker opened",
breaker: self.name,
failures: self.failures
}));
}
throw err;
});
};
var searchBreaker = new CircuitBreaker({ name: "web-search", threshold: 3, resetTimeout: 60000 });
var llmBreaker = new CircuitBreaker({ name: "llm-api", threshold: 5, resetTimeout: 30000 });
When the circuit is open, the agent can skip that tool and either use cached results, try a fallback tool, or inform the user that the capability is temporarily unavailable. This is much better than waiting 30 seconds for a timeout on every single request.
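For example, a tool call wrapped in the breaker with a degraded fallback might look like this sketch (searchWeb is a placeholder for your real search tool):

function searchWithFallback(query) {
  return searchBreaker.call(function() {
    return searchWeb(query); // placeholder: your real search tool call
  }).catch(function(err) {
    // Circuit open or the call failed: degrade instead of blocking the agent.
    console.log(JSON.stringify({ level: "warn", message: "search degraded", error: err.message }));
    return { results: [], degraded: true };
  });
}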
Blue-Green Deployments for Agent Updates
Agent behavior can change dramatically between versions. A small change to a system prompt or tool definition can completely alter the agent's output quality. Blue-green deployments let you run the old and new versions simultaneously, shift traffic gradually, and roll back instantly if the new version underperforms.
In DigitalOcean App Platform, you achieve this by deploying the new version as a separate worker pool while keeping the old workers running:
workers:
  - name: agent-worker-blue
    image:
      repository: agent-service
      tag: 1.0.0
    instance_count: 3
  - name: agent-worker-green
    image:
      repository: agent-service
      tag: 1.1.0
    instance_count: 1
Your task routing logic in the API determines which worker pool handles each task:
var CANARY_PERCENTAGE = 10; // send 10% of traffic to new version
function routeTask(taskId, taskType, input) {
var version = (Math.random() * 100 < CANARY_PERCENTAGE) ? "1.1.0" : "1.0.0";
return pool.query(
"UPDATE agent_tasks SET metadata = jsonb_set(COALESCE(metadata, '{}'), '{agent_version}', $1) WHERE id = $2",
[JSON.stringify(version), taskId]
).then(function() {
return redis.lpush("agent-tasks:" + version, JSON.stringify({ taskId: taskId }));
});
}
Workers subscribe to their version-specific queue. This lets you ramp up traffic to the new version gradually: 10%, 25%, 50%, 100%. At any point, if you see degraded performance, set CANARY_PERCENTAGE to 0 and all traffic goes back to the stable version.
Rollback Strategies
Rollback for agents is more nuanced than for typical web services because agent output quality is subjective and may not be immediately obvious. Here are the signals I watch:
- Task failure rate. If the new version fails more than 5% of tasks, roll back immediately.
- Average task duration. If tasks take significantly longer, the agent may be stuck in reasoning loops.
- Token usage per task. A spike in token consumption means the agent is doing more work, which may indicate confused behavior.
- User feedback scores. If you collect ratings on agent output, monitor the rolling average.
Automate the rollback decision:
function checkCanaryHealth(newVersion) {
return pool.query(
"SELECT COUNT(*) FILTER (WHERE status = 'failed') as failures, COUNT(*) FILTER (WHERE status = 'completed') as successes, AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration FROM agent_tasks WHERE metadata->>'agent_version' = $1 AND created_at > NOW() - INTERVAL '1 hour'",
[newVersion]
).then(function(result) {
var stats = result.rows[0];
var total = parseInt(stats.failures) + parseInt(stats.successes);
if (total < 10) {
return { action: "wait", reason: "insufficient_data" };
}
var failureRate = parseInt(stats.failures) / total;
if (failureRate > 0.05) {
return { action: "rollback", reason: "high_failure_rate", rate: failureRate };
}
if (parseFloat(stats.avg_duration) > 120) {
return { action: "rollback", reason: "high_latency", avg_seconds: stats.avg_duration };
}
return { action: "proceed", stats: stats };
});
}
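You can run this check on a timer and flip the canary percentage when it recommends a rollback. A minimal sketch — how you actually persist the new CANARY_PERCENTAGE (environment variable, config service, redeploy) depends on your setup:

var CANARY_CHECK_INTERVAL = 5 * 60 * 1000; // every 5 minutes

setInterval(function() {
  checkCanaryHealth("1.1.0").then(function(decision) {
    if (decision.action === "rollback") {
      CANARY_PERCENTAGE = 0; // all new traffic back to the stable version
      console.log(JSON.stringify({
        level: "error",
        message: "Canary rolled back",
        reason: decision.reason
      }));
      // Optionally: page the on-call engineer here.
    }
  });
}, CANARY_CHECK_INTERVAL);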
Production Logging and Structured Output
Unstructured logs are useless at scale. Every log line from your agent service should be a JSON object with consistent fields. This is non-negotiable.
function createLogger(context) {
var baseFields = {
service: "agent-service",
worker_id: context.workerId || "api",
version: process.env.APP_VERSION || "unknown"
};
return {
info: function(message, fields) {
var entry = Object.assign({}, baseFields, fields || {}, {
level: "info",
message: message,
timestamp: new Date().toISOString()
});
process.stdout.write(JSON.stringify(entry) + "\n");
},
error: function(message, fields) {
var entry = Object.assign({}, baseFields, fields || {}, {
level: "error",
message: message,
timestamp: new Date().toISOString()
});
process.stderr.write(JSON.stringify(entry) + "\n");
},
taskEvent: function(taskId, event, fields) {
var entry = Object.assign({}, baseFields, fields || {}, {
level: "info",
message: "task_event",
task_id: taskId,
event: event,
timestamp: new Date().toISOString()
});
process.stdout.write(JSON.stringify(entry) + "\n");
}
};
}
var logger = createLogger({ workerId: process.env.HOSTNAME || "local" });
Use the taskEvent method to create an audit trail for every agent task. Log when a task is claimed, when each tool is called, when the LLM responds, and when the task completes or fails. This gives you full traceability in your log aggregation platform.
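Inside the worker, the audit trail might look like this — the tool names, model name, and numbers here are illustrative, not prescribed fields:

logger.taskEvent(task.id, "claimed", { attempt: task.attempts });
logger.taskEvent(task.id, "tool_call", { tool: "web_search", latency_ms: 420, ok: true });
logger.taskEvent(task.id, "llm_response", { model: "gpt-4o", input_tokens: 1200, output_tokens: 340 });
logger.taskEvent(task.id, "completed", { duration_ms: 8400 });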
Alerting on Failures and Degraded Performance
Set up alerts on these metrics at minimum:
- Task failure rate > 5% over a 5-minute window: Something is broken.
- Queue depth growing for more than 10 minutes: Workers are not keeping up.
- P95 task duration > 2x baseline: The agent is struggling.
- Circuit breaker open events: A dependency is down.
- Worker restart count > 3 in an hour: OOM kills or crashes.
A simple in-process metrics collector that you can expose for Prometheus or push to your monitoring system:
function MetricsCollector() {
this.counters = {};
this.gauges = {};
this.histograms = {};
}
MetricsCollector.prototype.increment = function(name, labels) {
var key = name + JSON.stringify(labels || {});
this.counters[key] = (this.counters[key] || 0) + 1;
};
MetricsCollector.prototype.gauge = function(name, value) {
this.gauges[name] = value;
};
MetricsCollector.prototype.observe = function(name, value) {
if (!this.histograms[name]) {
this.histograms[name] = [];
}
this.histograms[name].push(value);
// Keep last 1000 observations
if (this.histograms[name].length > 1000) {
this.histograms[name] = this.histograms[name].slice(-1000);
}
};
MetricsCollector.prototype.getSnapshot = function() {
var snapshot = {
counters: Object.assign({}, this.counters),
gauges: Object.assign({}, this.gauges),
histograms: {}
};
var self = this;
Object.keys(this.histograms).forEach(function(name) {
var values = self.histograms[name].slice().sort(function(a, b) { return a - b; });
snapshot.histograms[name] = {
count: values.length,
p50: values[Math.floor(values.length * 0.5)] || 0,
p95: values[Math.floor(values.length * 0.95)] || 0,
p99: values[Math.floor(values.length * 0.99)] || 0
};
});
return snapshot;
};
var metrics = new MetricsCollector();
app.get("/metrics", function(req, res) {
res.json(metrics.getSnapshot());
});
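As a sketch of how you might wire the collector into the worker: count task outcomes and observe durations so the /metrics snapshot exposes failure counts and p95 latency. Here executeAgentTask stands in for whatever function runs your agent task (the real one appears in the complete example below):

function instrumentedExecute(task) {
  var startedAt = Date.now();
  return executeAgentTask(task).then(function(result) {
    metrics.increment("agent_tasks_total", { status: "completed", type: task.task_type });
    metrics.observe("agent_task_duration_ms", Date.now() - startedAt);
    return result;
  }).catch(function(err) {
    metrics.increment("agent_tasks_total", { status: "failed", type: task.task_type });
    metrics.observe("agent_task_duration_ms", Date.now() - startedAt);
    throw err;
  });
}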
Complete Working Example
Here is a production-ready agent service that ties everything together: Docker deployment, health checks, graceful shutdown, queue-based task processing, circuit breakers, and structured logging.
// server.js - Production Agent Service
var express = require("express");
var pg = require("pg");
var Redis = require("ioredis");
var http = require("http");
var os = require("os");
var crypto = require("crypto");
// ---------- Configuration ----------
var PORT = process.env.PORT || 8080;
var WORKER_MODE = process.env.WORKER_MODE === "true";
var WORKER_ID = os.hostname() + "-" + crypto.randomBytes(4).toString("hex");
var MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS) || 3;
var SHUTDOWN_TIMEOUT = parseInt(process.env.SHUTDOWN_TIMEOUT) || 30000;
var QUEUE_NAME = "agent-tasks:" + (process.env.APP_VERSION || "default");
// ---------- Logger ----------
function createLogger(workerId) {
var base = { service: "agent-service", worker_id: workerId };
function write(level, message, fields) {
var entry = Object.assign({}, base, fields || {}, {
level: level,
message: message,
timestamp: new Date().toISOString()
});
var stream = level === "error" ? process.stderr : process.stdout;
stream.write(JSON.stringify(entry) + "\n");
}
return {
info: function(msg, f) { write("info", msg, f); },
warn: function(msg, f) { write("warn", msg, f); },
error: function(msg, f) { write("error", msg, f); }
};
}
var logger = createLogger(WORKER_ID);
// ---------- Database & Redis ----------
var dbPool = new pg.Pool({
connectionString: process.env.DATABASE_URL,
max: 10,
idleTimeoutMillis: 30000
});
var redis = new Redis(process.env.REDIS_URL, {
maxRetriesPerRequest: 3,
retryStrategy: function(times) {
return Math.min(times * 200, 5000);
}
});
// ---------- State ----------
var isReady = false;
var isShuttingDown = false;
var activeTaskCount = 0;
var server = null;
// ---------- Circuit Breaker ----------
function CircuitBreaker(name, threshold, resetTimeout) {
this.name = name;
this.threshold = threshold || 5;
this.resetTimeout = resetTimeout || 30000;
this.state = "closed";
this.failures = 0;
this.lastFailure = null;
}
CircuitBreaker.prototype.call = function(fn) {
var self = this;
if (this.state === "open") {
if (Date.now() - this.lastFailure > this.resetTimeout) {
this.state = "half-open";
} else {
return Promise.reject(new Error("Circuit " + this.name + " is open"));
}
}
return fn().then(function(result) {
if (self.state === "half-open") { self.state = "closed"; }
self.failures = 0;
return result;
}).catch(function(err) {
self.failures++;
self.lastFailure = Date.now();
if (self.failures >= self.threshold) {
self.state = "open";
logger.error("Circuit breaker opened", { breaker: self.name });
}
throw err;
});
};
var llmBreaker = new CircuitBreaker("llm-api", 5, 30000);
// ---------- Task Database Operations ----------
function createTask(taskType, input) {
return dbPool.query(
"INSERT INTO agent_tasks (task_type, input) VALUES ($1, $2) RETURNING id",
[taskType, JSON.stringify(input)]
).then(function(r) { return r.rows[0].id; });
}
function claimTask(taskId) {
return dbPool.query(
"UPDATE agent_tasks SET status = 'processing', worker_id = $1, started_at = NOW(), attempts = attempts + 1, updated_at = NOW() WHERE id = $2 AND (status = 'pending' OR (status = 'processing' AND updated_at < NOW() - INTERVAL '5 minutes')) RETURNING *",
[WORKER_ID, taskId]
).then(function(r) { return r.rows[0] || null; });
}
function completeTask(taskId, result) {
return dbPool.query(
"UPDATE agent_tasks SET status = 'completed', result = $1, completed_at = NOW(), updated_at = NOW() WHERE id = $2",
[JSON.stringify(result), taskId]
);
}
function failTask(taskId, error, checkpoint) {
return dbPool.query(
"UPDATE agent_tasks SET status = CASE WHEN attempts >= max_attempts THEN 'failed' ELSE 'pending' END, error = $1, checkpoint = $2, worker_id = NULL, updated_at = NOW() WHERE id = $3",
[error, checkpoint ? JSON.stringify(checkpoint) : null, taskId]
);
}
// ---------- Agent Execution ----------
function executeAgentTask(task) {
logger.info("Executing agent task", { task_id: task.id, type: task.task_type, attempt: task.attempts });
return llmBreaker.call(function() {
// Replace this with your actual agent logic:
// call LLM, execute tools, iterate until done
return callLLM(task.input, task.checkpoint);
}).then(function(result) {
return completeTask(task.id, result).then(function() {
logger.info("Task completed", { task_id: task.id, type: task.task_type });
});
}).catch(function(err) {
logger.error("Task failed", {
task_id: task.id,
error: err.message,
attempt: task.attempts
});
return failTask(task.id, err.message, null).then(function() {
if (task.attempts < task.max_attempts) {
return redis.lpush(QUEUE_NAME, JSON.stringify({ taskId: task.id }));
}
});
});
}
function callLLM(input, checkpoint) {
// Stub - replace with actual LLM integration
return new Promise(function(resolve) {
setTimeout(function() {
resolve({ response: "Agent completed task", input: input });
}, 2000);
});
}
// ---------- Worker Loop ----------
function startWorkerLoop() {
function poll() {
if (isShuttingDown || activeTaskCount >= MAX_CONCURRENT_TASKS) {
setTimeout(poll, 1000);
return;
}
redis.brpop(QUEUE_NAME, 5, function(err, result) {
if (err) {
logger.error("Queue poll error", { error: err.message });
setTimeout(poll, 2000);
return;
}
if (!result) {
poll();
return;
}
var message = JSON.parse(result[1]);
activeTaskCount++;
claimTask(message.taskId).then(function(task) {
if (!task) {
activeTaskCount--;
poll();
return;
}
// Run the task in the background and keep polling, so this worker can
// process up to MAX_CONCURRENT_TASKS tasks concurrently.
executeAgentTask(task).then(function() {
activeTaskCount--;
}).catch(function(err) {
activeTaskCount--;
logger.error("Task execution error", { error: err.message });
});
poll();
}).catch(function(err) {
activeTaskCount--;
logger.error("Worker error", { error: err.message });
poll();
});
});
}
poll();
logger.info("Worker loop started", { max_concurrent: MAX_CONCURRENT_TASKS, queue: QUEUE_NAME });
}
// ---------- HTTP API ----------
var app = express();
app.use(express.json());
app.get("/health", function(req, res) {
if (isShuttingDown) {
return res.status(503).json({ status: "shutting_down" });
}
res.json({ status: "ok", worker_id: WORKER_ID, uptime: process.uptime() });
});
app.get("/ready", function(req, res) {
if (!isReady || isShuttingDown) {
return res.status(503).json({ status: "not_ready" });
}
res.json({ status: "ready", active_tasks: activeTaskCount });
});
app.post("/api/tasks", function(req, res) {
if (isShuttingDown) {
return res.status(503).json({ error: "service_shutting_down" });
}
var taskType = req.body.task_type;
var input = req.body.input;
if (!taskType || !input) {
return res.status(400).json({ error: "task_type and input are required" });
}
createTask(taskType, input).then(function(taskId) {
return redis.lpush(QUEUE_NAME, JSON.stringify({ taskId: taskId })).then(function() {
logger.info("Task created", { task_id: taskId, type: taskType });
res.status(201).json({ task_id: taskId, status: "pending" });
});
}).catch(function(err) {
logger.error("Failed to create task", { error: err.message });
res.status(500).json({ error: "internal_error" });
});
});
app.get("/api/tasks/:id", function(req, res) {
dbPool.query("SELECT id, task_type, status, result, error, attempts, created_at, completed_at FROM agent_tasks WHERE id = $1", [req.params.id])
.then(function(r) {
if (r.rows.length === 0) {
return res.status(404).json({ error: "task_not_found" });
}
res.json(r.rows[0]);
}).catch(function(err) {
res.status(500).json({ error: "internal_error" });
});
});
// ---------- Graceful Shutdown ----------
function gracefulShutdown(signal) {
logger.info("Shutdown signal received", { signal: signal, active_tasks: activeTaskCount });
isShuttingDown = true;
if (server) {
server.close(function() {
logger.info("HTTP server closed");
});
}
var shutdownCheck = setInterval(function() {
if (activeTaskCount === 0) {
clearInterval(shutdownCheck);
logger.info("Clean shutdown complete");
dbPool.end().then(function() {
redis.quit().then(function() {
process.exit(0);
});
});
}
}, 1000);
setTimeout(function() {
logger.warn("Shutdown timeout, forcing exit", { abandoned_tasks: activeTaskCount });
process.exit(1);
}, SHUTDOWN_TIMEOUT);
}
process.on("SIGTERM", function() { gracefulShutdown("SIGTERM"); });
process.on("SIGINT", function() { gracefulShutdown("SIGINT"); });
// ---------- Startup ----------
function start() {
dbPool.query("SELECT 1").then(function() {
logger.info("Database connected");
return redis.ping();
}).then(function() {
logger.info("Redis connected");
isReady = true;
if (WORKER_MODE) {
startWorkerLoop();
}
server = http.createServer(app);
server.listen(PORT, function() {
logger.info("Server started", { port: PORT, mode: WORKER_MODE ? "worker" : "api" });
});
}).catch(function(err) {
logger.error("Startup failed", { error: err.message });
process.exit(1);
});
}
start();
This single file can run in both modes. Set WORKER_MODE=true for background task processing, or leave it unset for the HTTP API. Both modes include health checks, graceful shutdown, and structured logging.
Common Issues and Troubleshooting
1. Tasks Stuck in "processing" State Forever
ERROR: agent_tasks rows with status='processing' and updated_at older than 30 minutes
This happens when a worker dies without completing or failing the task. The claimTask function above includes a stale-task recovery clause (updated_at < NOW() - INTERVAL '5 minutes'), but you also need a periodic cleanup job:
function recoverStaleTasks() {
return dbPool.query(
"UPDATE agent_tasks SET status = 'pending', worker_id = NULL, updated_at = NOW() WHERE status = 'processing' AND updated_at < NOW() - INTERVAL '10 minutes' RETURNING id"
).then(function(result) {
result.rows.forEach(function(row) {
logger.warn("Recovered stale task", { task_id: row.id });
redis.lpush(QUEUE_NAME, JSON.stringify({ taskId: row.id }));
});
});
}
setInterval(recoverStaleTasks, 60000);
2. Redis Connection Drops Under Load
Error: connect ECONNREFUSED 127.0.0.1:6379
ReplyError: LOADING Redis is loading the dataset in memory
This usually means your Redis instance is too small or is restarting due to memory pressure. Agent task messages can be large if you include full context. Keep queue messages minimal (just the task ID), and store the full payload in PostgreSQL. Also configure the Redis client with retry logic and reconnection:
var redis = new Redis(process.env.REDIS_URL, {
maxRetriesPerRequest: 3,
retryStrategy: function(times) {
if (times > 10) {
logger.error("Redis retry limit exceeded");
return null; // stop retrying
}
return Math.min(times * 200, 5000);
},
reconnectOnError: function(err) {
return err.message.indexOf("READONLY") !== -1;
}
});
3. LLM API Rate Limit Errors Cascade into Mass Task Failures
Error: 429 Too Many Requests - Rate limit exceeded for model gpt-4
Error: 429 Rate limit reached for claude-3 in organization org-xxx
When multiple workers hit the LLM API simultaneously, you can exhaust your rate limit in seconds. The circuit breaker helps, but you also need distributed rate limiting. Use a Redis-based rate limiter that all workers share:
function distributedRateLimit(key, maxPerMinute) {
var now = Date.now();
return redis.multi()
.zremrangebyscore(key, 0, now - 60000)
.zcard(key)
.exec()
.then(function(results) {
var count = results[1][1];
if (count >= maxPerMinute) {
var waitMs = Math.ceil(60000 / maxPerMinute);
return new Promise(function(resolve) {
setTimeout(resolve, waitMs);
}).then(function() {
return distributedRateLimit(key, maxPerMinute);
});
}
return redis.zadd(key, now, now + ":" + Math.random());
});
}
4. Container OOM Killed During Large Agent Tasks
SIGKILL received - container killed by OOM
dmesg: Out of memory: Kill process (node) score 950
Agent tasks that involve large context windows or multiple tool call results can consume significant memory. A single agent conversation with 128k tokens of context can use 500MB+ of memory when including the HTTP response buffers. Solutions:
- Set --max-old-space-size in your Dockerfile: CMD ["node", "--max-old-space-size=512", "server.js"]
- Stream LLM responses instead of buffering them entirely in memory.
- Limit the number of concurrent tasks per worker. Three concurrent tasks on a 1GB container is a good starting point.
- Truncate conversation history aggressively. Keep the system prompt and the last N messages, not the entire conversation (a sketch follows below).
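A simple truncation helper, as a sketch assuming a chat-style messages array with the system prompt first:

function truncateHistory(messages, maxMessages) {
  // Keep the system prompt plus the most recent maxMessages turns.
  if (messages.length <= maxMessages + 1) {
    return messages;
  }
  var system = messages[0];
  var recent = messages.slice(messages.length - maxMessages);
  return [system].concat(recent);
}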
Best Practices
Separate your API from your workers. The API should be fast and lightweight. Workers do the heavy lifting. Scale them independently based on queue depth and task latency.
Make every task idempotent. Workers will retry failed tasks. If a task creates a resource, check whether it already exists before creating it again. Use the task ID as an idempotency key.
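One way to do this is to key every side effect on the task ID. A sketch, assuming a hypothetical created_resources table with a unique constraint on task_id:

function createResourceOnce(pool, taskId, resource) {
  // ON CONFLICT DO NOTHING makes retries harmless: a second attempt finds
  // the row already there and skips the side effect.
  return pool.query(
    "INSERT INTO created_resources (task_id, payload) VALUES ($1, $2) ON CONFLICT (task_id) DO NOTHING RETURNING id",
    [taskId, JSON.stringify(resource)]
  ).then(function(result) {
    return result.rows.length > 0; // true if this call actually created it
  });
}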
Set aggressive timeouts on LLM calls. A single hung LLM request can block a worker for minutes. Set a 60-second timeout on inference calls and a 120-second timeout on the overall task. If it hasn't completed by then, checkpoint and retry.
Log every state transition. When a task moves from pending to processing to completed, log it with the task ID, worker ID, and duration. When an agent calls a tool, log the tool name, latency, and whether it succeeded. This data is invaluable when debugging production issues.
Use canary deployments, not big-bang releases. Agent behavior is harder to test in staging because it depends on real-world input diversity. Send 5-10% of traffic to the new version, monitor for 30 minutes, then ramp up. Have automated rollback triggers.
Keep queue messages small. Store the full task payload in PostgreSQL and only put the task ID in the Redis queue. This keeps Redis memory usage predictable and makes your queue reliable under load.
Implement dead letter queues. After a task has failed its maximum number of retries, move it to a dead letter queue instead of discarding it. This gives you a record of systematic failures and lets you replay tasks after fixing the underlying issue.
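A dead letter queue can be as simple as a separate Redis list that exhausted tasks are pushed to instead of being re-queued. A sketch of the failure path, adapting the failTask flow from earlier:

var DEAD_LETTER_QUEUE = "agent-tasks:dead-letter";

function handleExhaustedTask(task, err) {
  // The task has used all its attempts: record it for inspection and replay
  // instead of silently dropping it.
  return redis.lpush(DEAD_LETTER_QUEUE, JSON.stringify({
    taskId: task.id,
    error: err.message,
    failed_at: new Date().toISOString()
  }));
}

// Replay later, after fixing the root cause, e.g.:
// redis.rpoplpush(DEAD_LETTER_QUEUE, "agent-tasks")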
Monitor token spend per task. LLM costs can spiral quickly in production. Track the token count for each task, set budget limits per task type, and alert when the average token consumption per task increases by more than 20%. A confused agent that loops burns money fast.
Test shutdown behavior explicitly. Do not assume graceful shutdown works. In your staging environment, send SIGTERM to a worker that is processing tasks and verify that tasks complete successfully. Docker's docker stop command is an easy way to test this.