Building Agent APIs with Express.js
Build REST APIs for AI agents with Express.js including session management, SSE streaming, authentication, and rate limiting.
Overview
AI agents need well-structured APIs to receive tasks, stream responses, and report status. Express.js is a natural fit for building these APIs because it gives you full control over request handling, middleware composition, and response streaming without abstracting away the HTTP fundamentals that agent protocols depend on. In this article, we will build a production-grade agent API server from scratch, covering session management, Server-Sent Events streaming, task polling, authentication, rate limiting, and OpenAPI documentation.
Prerequisites
- Node.js v18 or later installed
- Familiarity with Express.js routing and middleware
- Basic understanding of how LLM-based agents work (tool calling, multi-step reasoning)
- A working OpenAI or Anthropic API key for the agent backend
- npm packages:
express, express-validator, express-rate-limit, uuid, cors, swagger-ui-express, jsonwebtoken, ws
Install everything up front:
npm install express express-validator express-rate-limit uuid cors swagger-ui-express jsonwebtoken ws
Designing REST APIs for Agent Interactions
Agent APIs differ from traditional CRUD APIs in several important ways. First, agent tasks are inherently asynchronous. A user submits a task, the agent reasons for seconds or minutes, and the result comes back later. Second, agents maintain conversational state across multiple interactions within a session. Third, agents produce streaming output as they think, which users expect to see in real time.
The resource model that I have found works best in production looks like this:
POST /api/v1/agents/sessions Create a new agent session
GET /api/v1/agents/sessions/:id Get session details
DELETE /api/v1/agents/sessions/:id End a session
POST /api/v1/agents/sessions/:id/tasks Submit a task to a session
GET /api/v1/agents/tasks/:taskId Poll task status
GET /api/v1/agents/tasks/:taskId/stream Stream task output via SSE
GET /api/v1/agents/capabilities List available agent tools
Sessions hold conversational context. Tasks represent individual units of work within a session. This separation matters because it lets you run multiple tasks in a session sequentially (the agent remembers previous results) or tear down a session and start fresh.
Creating Agent Sessions
Sessions are the entry point for all agent interactions. When a client creates a session, the server allocates a conversation context, assigns a unique ID, and optionally configures which tools the agent can use.
var express = require("express");
var { v4: uuidv4 } = require("uuid");
var router = express.Router();
// In-memory session store (use Redis in production)
var sessions = {};
router.post("/sessions", function (req, res) {
var sessionId = uuidv4();
var session = {
id: sessionId,
userId: req.user.id,
createdAt: new Date().toISOString(),
status: "active",
tools: req.body.tools || ["web_search", "code_execution", "file_read"],
messages: [],
tasks: []
};
sessions[sessionId] = session;
res.status(201).json({
id: session.id,
status: session.status,
tools: session.tools,
createdAt: session.createdAt
});
});
router.get("/sessions/:id", function (req, res) {
var session = sessions[req.params.id];
if (!session) {
return res.status(404).json({ error: "Session not found" });
}
if (session.userId !== req.user.id) {
return res.status(403).json({ error: "Access denied" });
}
res.json(session);
});
router.delete("/sessions/:id", function (req, res) {
var session = sessions[req.params.id];
if (!session) {
return res.status(404).json({ error: "Session not found" });
}
if (session.userId !== req.user.id) {
return res.status(403).json({ error: "Access denied" });
}
session.status = "terminated";
delete sessions[req.params.id];
res.status(204).end();
});
module.exports = router;
In production, I store sessions in Redis with a TTL of 30 minutes. In-memory storage works for development but falls apart the moment you run multiple instances behind a load balancer.
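If you want the TTL behavior without wiring Redis into a local experiment, the expiry logic can be sketched behind the same get/set/delete interface. This is a minimal sketch, not part of the article's code (`createSessionStore` is my name); in production the same interface maps onto Redis `SET key value EX 1800`, `GET`, and `DEL`.

```javascript
// TTL-backed session store sketch. The Map stands in for Redis here; route
// handlers that go through this interface do not change when you swap in a
// real Redis client.
function createSessionStore(ttlSeconds) {
  var entries = new Map();
  return {
    set: function (id, session) {
      entries.set(id, { session: session, expiresAt: Date.now() + ttlSeconds * 1000 });
    },
    get: function (id) {
      var entry = entries.get(id);
      if (!entry) return null;
      if (Date.now() > entry.expiresAt) {
        entries.delete(id); // lazy expiry, like a Redis TTL firing on access
        return null;
      }
      return entry.session;
    },
    delete: function (id) {
      entries.delete(id);
    }
  };
}
```

The point of the indirection is that expiry becomes a storage concern rather than something every route handler has to remember to check.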
Submitting Tasks to Agents
Tasks are where the real work happens. A task includes the user's prompt, gets queued for processing, and eventually produces a result. The key design decision is making task submission asynchronous: the POST returns immediately with a task ID, and the client polls or streams for results.
var tasks = {};
router.post("/sessions/:id/tasks", function (req, res) {
var session = sessions[req.params.id];
if (!session) {
return res.status(404).json({ error: "Session not found" });
}
var taskId = uuidv4();
var task = {
id: taskId,
sessionId: session.id,
prompt: req.body.prompt,
status: "queued",
result: null,
steps: [],
createdAt: new Date().toISOString(),
completedAt: null
};
tasks[taskId] = task;
session.tasks.push(taskId);
// Add the user message to session history
session.messages.push({ role: "user", content: req.body.prompt });
// Process the task asynchronously
processTask(task, session);
res.status(202).json({
taskId: task.id,
status: task.status,
pollUrl: "/api/v1/agents/tasks/" + taskId,
streamUrl: "/api/v1/agents/tasks/" + taskId + "/stream"
});
});
function processTask(task, session) {
task.status = "running";
// Simulate agent processing (replace with actual LLM calls)
callAgent(session.messages, session.tools)
.then(function (result) {
task.status = "completed";
task.result = result.content;
task.steps = result.steps;
task.completedAt = new Date().toISOString();
session.messages.push({ role: "assistant", content: result.content });
})
.catch(function (err) {
task.status = "failed";
task.result = { error: err.message };
task.completedAt = new Date().toISOString();
});
}
The 202 Accepted response is intentional. It tells the client that the request was received and is being processed, but the result is not ready yet. This is the correct HTTP semantics for async operations. I have seen too many agent APIs return 200 and block the request for 30+ seconds while the agent thinks. Do not do that.
Streaming Agent Responses via SSE
Server-Sent Events are the simplest way to stream agent output to browser clients. Unlike WebSockets, SSE works over standard HTTP, passes through proxies and load balancers without special configuration, and automatically reconnects on failure.
var EventEmitter = require("events");
var taskEmitters = {};
function getTaskEmitter(taskId) {
if (!taskEmitters[taskId]) {
taskEmitters[taskId] = new EventEmitter();
}
return taskEmitters[taskId];
}
router.get("/tasks/:taskId/stream", function (req, res) {
var task = tasks[req.params.taskId];
if (!task) {
return res.status(404).json({ error: "Task not found" });
}
// Set SSE headers
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
});
// Send initial status
res.write("event: status\n");
res.write("data: " + JSON.stringify({ status: task.status }) + "\n\n");
// If task is already complete, send result and close
if (task.status === "completed" || task.status === "failed") {
res.write("event: result\n");
res.write("data: " + JSON.stringify({ result: task.result }) + "\n\n");
res.end();
return;
}
var emitter = getTaskEmitter(task.id);
var onStep = function (step) {
res.write("event: step\n");
res.write("data: " + JSON.stringify(step) + "\n\n");
};
var onToken = function (token) {
res.write("event: token\n");
res.write("data: " + JSON.stringify({ token: token }) + "\n\n");
};
var onComplete = function (result) {
res.write("event: result\n");
res.write("data: " + JSON.stringify({ result: result }) + "\n\n");
res.write("event: done\n");
res.write("data: {}\n\n");
cleanup();
res.end();
};
var onError = function (error) {
res.write("event: error\n");
res.write("data: " + JSON.stringify({ error: error.message }) + "\n\n");
cleanup();
res.end();
};
function cleanup() {
emitter.removeListener("step", onStep);
emitter.removeListener("token", onToken);
emitter.removeListener("complete", onComplete);
emitter.removeListener("error", onError);
// Drop the emitter once the last listener detaches, or the map leaks one entry per task
if (emitter.listenerCount("complete") === 0) {
delete taskEmitters[task.id];
}
}
emitter.on("step", onStep);
emitter.on("token", onToken);
emitter.on("complete", onComplete);
emitter.on("error", onError);
// Clean up if client disconnects
req.on("close", function () {
cleanup();
});
});
The X-Accel-Buffering: no header is critical if you run behind nginx. Without it, nginx buffers the entire response and your client sees nothing until the stream ends, which defeats the purpose entirely. I spent two hours debugging this in production before I realized nginx was the culprit.
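The repeated pairs of `res.write` calls above can be collapsed into a small formatter. A minimal sketch (the helper name `sseFrame` is mine, not part of the article's code): each frame is an `event:` line, a `data:` line, and a blank-line terminator.

```javascript
// Format one Server-Sent Events frame. The trailing blank line ("\n\n")
// is what tells the browser the frame is complete.
function sseFrame(event, data) {
  return "event: " + event + "\ndata: " + JSON.stringify(data) + "\n\n";
}
```

Usage in the handler then becomes `res.write(sseFrame("token", { token: token }));`, which makes it harder to forget the terminating blank line.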
On the client side, consuming the SSE stream is straightforward:
var eventSource = new EventSource("/api/v1/agents/tasks/" + taskId + "/stream");
eventSource.addEventListener("token", function (e) {
var data = JSON.parse(e.data);
document.getElementById("output").textContent += data.token;
});
eventSource.addEventListener("step", function (e) {
var step = JSON.parse(e.data);
console.log("Agent step:", step.tool, step.input);
});
eventSource.addEventListener("done", function () {
eventSource.close();
});
eventSource.addEventListener("error", function () {
eventSource.close();
});
Polling for Task Status
Not every client can use SSE. Mobile apps, CLI tools, and some enterprise environments need a simple polling endpoint. The key is returning enough metadata that the client can make smart decisions about poll frequency.
router.get("/tasks/:taskId", function (req, res) {
var task = tasks[req.params.taskId];
if (!task) {
return res.status(404).json({ error: "Task not found" });
}
var response = {
id: task.id,
sessionId: task.sessionId,
status: task.status,
createdAt: task.createdAt,
completedAt: task.completedAt,
stepsCompleted: task.steps.length
};
if (task.status === "completed") {
response.result = task.result;
} else if (task.status === "failed") {
response.error = task.result;
}
// Suggest poll interval based on how long the task has been running
var elapsed = Date.now() - new Date(task.createdAt).getTime();
if (elapsed < 5000) {
response.retryAfter = 1;
} else if (elapsed < 30000) {
response.retryAfter = 3;
} else {
response.retryAfter = 5;
}
res.json(response);
});
I include a retryAfter field to hint at how often the client should poll. Early in the task, poll every second. After 5 seconds, back off to every 3 seconds. After 30 seconds, every 5. This keeps the API responsive without hammering the server.
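On the client, honoring the hint is a short loop. A sketch assuming Node 18+'s global `fetch`; the injectable `fetchFn` parameter is my addition so the loop can be stubbed in tests.

```javascript
// Poll a task endpoint until it reaches a terminal state, sleeping for the
// server-suggested retryAfter (in seconds) between requests.
async function pollUntilDone(url, fetchFn) {
  var doFetch = fetchFn || fetch;
  for (;;) {
    var res = await doFetch(url);
    var body = await res.json();
    if (body.status === "completed" || body.status === "failed") {
      return body;
    }
    // Fall back to 3 seconds if the server omits the hint
    await new Promise(function (r) { setTimeout(r, (body.retryAfter || 3) * 1000); });
  }
}
```

Because the backoff schedule lives on the server, you can tune it later without shipping a client update.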
Listing Agent Capabilities and Tools
Clients need to know what an agent can do before submitting tasks. A capabilities endpoint describes the available tools, supported models, and any constraints.
var AGENT_CAPABILITIES = {
tools: [
{
name: "web_search",
description: "Search the web for current information",
parameters: {
query: { type: "string", required: true },
maxResults: { type: "integer", default: 5 }
}
},
{
name: "code_execution",
description: "Execute JavaScript code in a sandboxed environment",
parameters: {
code: { type: "string", required: true },
timeout: { type: "integer", default: 30000 }
}
},
{
name: "file_read",
description: "Read file contents from the workspace",
parameters: {
path: { type: "string", required: true }
}
}
],
models: ["gpt-4o", "claude-sonnet-4"],
maxSessionDuration: 1800,
maxTasksPerSession: 50,
maxConcurrentTasks: 3
};
router.get("/capabilities", function (req, res) {
res.json(AGENT_CAPABILITIES);
});
This endpoint is essentially a service discovery mechanism. I have found it invaluable for building dynamic UIs that adapt to what the agent can actually do rather than hardcoding tool lists on the frontend.
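A client can also use the payload to reject unsupported tool requests before creating a session. A small sketch; `unknownTools` is a hypothetical helper, not part of the article's API.

```javascript
// Compare a client's requested tool names against the capabilities payload,
// returning any names the agent does not actually support.
function unknownTools(requested, capabilities) {
  var known = capabilities.tools.map(function (t) { return t.name; });
  return requested.filter(function (name) { return known.indexOf(name) === -1; });
}
```

For example, `unknownTools(["web_search", "sql_query"], AGENT_CAPABILITIES)` returns `["sql_query"]` against the capabilities object above, so the client can surface the problem before burning a session on it.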
Implementing Authentication and Per-User Agent Limits
Agent APIs consume expensive resources (LLM tokens, compute time), so authentication and usage limits are not optional. I use JWT tokens for authentication and track per-user quotas in memory (Redis in production).
var jwt = require("jsonwebtoken");
var JWT_SECRET = process.env.JWT_SECRET || "dev-secret-change-me";
var userQuotas = {};
function authMiddleware(req, res, next) {
var authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return res.status(401).json({ error: "Missing or invalid authorization header" });
}
var token = authHeader.split(" ")[1];
try {
var decoded = jwt.verify(token, JWT_SECRET);
req.user = decoded;
// Initialize quota tracking
if (!userQuotas[decoded.id]) {
userQuotas[decoded.id] = {
sessionsToday: 0,
tasksToday: 0,
lastReset: new Date().toDateString()
};
}
// Reset daily quotas
var quota = userQuotas[decoded.id];
if (quota.lastReset !== new Date().toDateString()) {
quota.sessionsToday = 0;
quota.tasksToday = 0;
quota.lastReset = new Date().toDateString();
}
next();
} catch (err) {
return res.status(401).json({ error: "Invalid or expired token" });
}
}
function quotaMiddleware(resource) {
return function (req, res, next) {
var quota = userQuotas[req.user.id];
var plan = req.user.plan || "free";
var limits = {
free: { sessionsPerDay: 5, tasksPerDay: 20 },
pro: { sessionsPerDay: 50, tasksPerDay: 500 },
enterprise: { sessionsPerDay: 1000, tasksPerDay: 10000 }
};
var userLimits = limits[plan] || limits.free;
if (resource === "session" && quota.sessionsToday >= userLimits.sessionsPerDay) {
return res.status(429).json({
error: "Daily session limit reached",
limit: userLimits.sessionsPerDay,
resetsAt: getNextMidnight()
});
}
if (resource === "task" && quota.tasksToday >= userLimits.tasksPerDay) {
return res.status(429).json({
error: "Daily task limit reached",
limit: userLimits.tasksPerDay,
resetsAt: getNextMidnight()
});
}
// Consume the quota here so the middleware sketch is self-contained;
// alternatively, increment in the route handler after the work succeeds
if (resource === "session") quota.sessionsToday++;
if (resource === "task") quota.tasksToday++;
next();
};
}
function getNextMidnight() {
var tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
tomorrow.setHours(0, 0, 0, 0);
return tomorrow.toISOString();
}
The quota middleware is separate from rate limiting. Rate limiting protects the server from abuse. Quotas enforce business rules about how many agent interactions each pricing tier allows. Both are necessary.
Rate Limiting Agent API Endpoints
Agent endpoints are expensive to process, so rate limiting needs to be aggressive compared to a typical REST API. I apply different limits to different endpoint categories.
var rateLimit = require("express-rate-limit");
var sessionLimiter = rateLimit({
windowMs: 60 * 1000,
max: 10,
message: { error: "Too many session creation requests. Try again in a minute." },
standardHeaders: true,
legacyHeaders: false,
keyGenerator: function (req) {
return req.user ? req.user.id : req.ip;
}
});
var taskLimiter = rateLimit({
windowMs: 60 * 1000,
max: 30,
message: { error: "Too many task submissions. Slow down." },
standardHeaders: true,
legacyHeaders: false,
keyGenerator: function (req) {
return req.user ? req.user.id : req.ip;
}
});
var pollLimiter = rateLimit({
windowMs: 1000,
max: 10,
message: { error: "Polling too frequently. Use the retryAfter hint." },
standardHeaders: true,
legacyHeaders: false,
keyGenerator: function (req) {
return req.user ? req.user.id : req.ip;
}
});
Notice I key rate limits on req.user.id when available, falling back to IP. This prevents one user from exhausting the rate limit for everyone behind a corporate NAT.
WebSocket Support for Real-Time Agent Interaction
SSE works well for one-directional streaming, but some use cases need bidirectional communication. A user might want to interrupt an agent, provide additional input mid-task, or send rapid-fire messages in a conversational flow. WebSockets handle this cleanly.
var WebSocket = require("ws");
var http = require("http");
function setupWebSocket(server) {
var wss = new WebSocket.Server({ server: server, path: "/ws/agents" });
wss.on("connection", function (ws, req) {
var token = new URL(req.url, "http://localhost").searchParams.get("token");
try {
var user = jwt.verify(token, JWT_SECRET);
ws.user = user;
} catch (err) {
ws.close(4001, "Authentication failed");
return;
}
var sessionId = null;
ws.on("message", function (raw) {
var msg;
try {
msg = JSON.parse(raw);
} catch (e) {
ws.send(JSON.stringify({ type: "error", message: "Invalid JSON" }));
return;
}
if (msg.type === "create_session") {
sessionId = uuidv4();
sessions[sessionId] = {
id: sessionId,
userId: ws.user.id,
createdAt: new Date().toISOString(),
status: "active",
tools: msg.tools || ["web_search"],
messages: [],
tasks: []
};
ws.send(JSON.stringify({ type: "session_created", sessionId: sessionId }));
}
if (msg.type === "submit_task" && sessionId) {
var session = sessions[sessionId];
if (!session) {
ws.send(JSON.stringify({ type: "error", message: "Session not found" }));
return;
}
var taskId = uuidv4();
session.messages.push({ role: "user", content: msg.prompt });
ws.send(JSON.stringify({ type: "task_started", taskId: taskId }));
// Stream tokens back over WebSocket (streamAgentResponse wraps the same agent backend as callAgent earlier)
streamAgentResponse(session, function (event, data) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: event, taskId: taskId, data: data }));
}
});
}
if (msg.type === "cancel_task") {
ws.send(JSON.stringify({ type: "task_cancelled", taskId: msg.taskId }));
}
});
ws.on("close", function () {
if (sessionId && sessions[sessionId]) {
sessions[sessionId].status = "disconnected";
}
});
});
return wss;
}
I pass the JWT token as a query parameter on the WebSocket URL because the WebSocket API in browsers does not support custom headers. This is a well-established pattern. Just make sure your token is short-lived if you go this route.
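On the client, deriving the socket URL from the API origin is worth getting right, since https must become wss (and http must become ws). A sketch; `buildAgentSocketUrl` is my name, not part of the article's code.

```javascript
// Build the authenticated WebSocket URL from the API origin, switching the
// scheme and attaching the (ideally short-lived) token as a query parameter.
function buildAgentSocketUrl(apiOrigin, token) {
  var url = new URL("/ws/agents", apiOrigin);
  url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
  url.searchParams.set("token", token);
  return url.toString();
}
```

The browser client then connects with `new WebSocket(buildAgentSocketUrl("https://api.example.com", shortLivedJwt))`, where the origin and token are whatever your deployment issues.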
Handling Long-Running Agent Tasks with Async Patterns
Some agent tasks take minutes, not seconds. A research task that searches the web, reads multiple documents, and synthesizes a report might run for two to five minutes. You need a robust async pattern to handle this without tying up Express connections.
var taskQueue = [];
var MAX_CONCURRENT = 3;
var running = 0;
function enqueueTask(task, session) {
return new Promise(function (resolve, reject) {
taskQueue.push({
task: task,
session: session,
resolve: resolve,
reject: reject
});
processQueue();
});
}
function processQueue() {
while (running < MAX_CONCURRENT && taskQueue.length > 0) {
// Hand each item to a helper so the async callbacks close over their own
// item, not a shared var that the while loop rebinds on the next iteration
runQueueItem(taskQueue.shift());
}
}
function runQueueItem(item) {
running++;
executeAgentTask(item.task, item.session)
.then(function (result) {
running--;
item.resolve(result);
processQueue();
})
.catch(function (err) {
running--;
item.reject(err);
processQueue();
});
}
function executeAgentTask(task, session) {
var emitter = getTaskEmitter(task.id);
return new Promise(function (resolve, reject) {
var timeout = setTimeout(function () {
task.status = "timeout";
emitter.emit("error", new Error("Task timed out after 5 minutes"));
reject(new Error("Task timed out"));
}, 5 * 60 * 1000);
callAgent(session.messages, session.tools, {
onStep: function (step) {
task.steps.push(step);
emitter.emit("step", step);
},
onToken: function (token) {
emitter.emit("token", token);
}
})
.then(function (result) {
clearTimeout(timeout);
task.status = "completed";
task.result = result.content;
task.completedAt = new Date().toISOString();
emitter.emit("complete", result.content);
resolve(result);
})
.catch(function (err) {
clearTimeout(timeout);
task.status = "failed";
task.result = { error: err.message };
emitter.emit("error", err);
reject(err);
});
});
}
The concurrency limiter is critical. Without it, a burst of 50 simultaneous task submissions will spawn 50 concurrent LLM API calls, exhaust your rate limits, and probably crash your process. I have seen this happen in production when a client integration had a retry loop with no backoff. Three concurrent tasks is a reasonable default; adjust based on your LLM provider's rate limits.
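The queue above is wired to tasks and sessions; the same back-pressure idea can be packaged as a reusable wrapper. A generic sketch, not from the server code: `limit(fn)` runs `fn` immediately if a slot is free, otherwise parks it until one opens.

```javascript
// Generic concurrency limiter: at most `max` wrapped functions run at once,
// the rest wait in FIFO order for a slot to free up.
function createLimiter(max) {
  var running = 0;
  var waiting = [];
  function release() {
    running--;
    if (waiting.length > 0) waiting.shift()();
  }
  return function limit(fn) {
    return new Promise(function (resolve, reject) {
      function run() {
        running++;
        Promise.resolve()
          .then(fn)
          .then(function (v) { release(); resolve(v); },
                function (e) { release(); reject(e); });
      }
      if (running < max) run();
      else waiting.push(run);
    });
  };
}
```

Wrapping the LLM call site (`limit(function () { return callAgent(...); })`) gives you the same protection as the queue without threading queue state through every handler.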
Request Validation with express-validator
Never trust client input, especially when it gets forwarded to an LLM. Prompt injection is a real concern, but basic validation catches the obvious problems first.
var { body, param, validationResult } = require("express-validator");
var validateTaskSubmission = [
body("prompt")
.isString()
.trim()
.isLength({ min: 1, max: 10000 })
.withMessage("Prompt must be between 1 and 10000 characters"),
body("tools")
.optional()
.isArray()
.withMessage("Tools must be an array"),
body("tools.*")
.optional()
.isString()
.isIn(["web_search", "code_execution", "file_read"])
.withMessage("Invalid tool specified")
];
var validateSessionId = [
param("id")
.isUUID(4)
.withMessage("Invalid session ID format")
];
function handleValidationErrors(req, res, next) {
var errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({
error: "Validation failed",
details: errors.array().map(function (e) {
return { field: e.path, message: e.msg };
})
});
}
next();
}
// Usage in routes
router.post(
"/sessions/:id/tasks",
validateSessionId,
validateTaskSubmission,
handleValidationErrors,
function (req, res) {
// Handle validated request
}
);
The 10,000 character limit on prompts is intentional. Long prompts consume more tokens and increase costs. Set this limit based on what your pricing model supports. I have seen free-tier users submit 50KB prompts with entire codebases pasted in; the character limit prevents that.
API Versioning for Agent Endpoints
Agent APIs evolve rapidly. The tool schema changes, response formats shift, and new capabilities get added. Version your API from day one to avoid breaking existing clients.
var v1Router = require("./routes/v1/agents");
var v2Router = require("./routes/v2/agents");
app.use("/api/v1/agents", authMiddleware, v1Router);
app.use("/api/v2/agents", authMiddleware, v2Router);
// Redirect unversioned to latest stable
app.use("/api/agents", function (req, res) {
res.redirect(307, "/api/v1/agents" + req.url);
});
I use URL path versioning because it is explicit, easy to route, and works with any HTTP client. Header-based versioning (Accept headers) is more RESTful in theory but creates debugging headaches in practice. When a support ticket comes in, a URL like /api/v1/agents/sessions is immediately understandable. A header like Accept: application/vnd.agents.v1+json requires reading the full request logs.
CORS Configuration for Browser-Based Agent Clients
If your agent API serves browser-based clients (and it will), CORS configuration needs to be precise. Do not just set Access-Control-Allow-Origin: * and call it done.
var cors = require("cors");
var corsOptions = {
origin: function (origin, callback) {
var allowedOrigins = [
"https://app.yoursite.com",
"https://dashboard.yoursite.com"
];
// Allow requests with no origin (mobile apps, Postman, etc.)
if (!origin) {
return callback(null, true);
}
if (allowedOrigins.indexOf(origin) !== -1) {
callback(null, true);
} else {
callback(new Error("Not allowed by CORS"));
}
},
credentials: true,
methods: ["GET", "POST", "DELETE", "OPTIONS"],
allowedHeaders: ["Content-Type", "Authorization"],
exposedHeaders: ["X-RateLimit-Remaining", "X-RateLimit-Reset"],
maxAge: 86400
};
app.use("/api", cors(corsOptions));
Expose the rate limit headers so your frontend can show users how many requests they have left. The maxAge of 86400 seconds (24 hours) caches preflight responses so the browser does not send an OPTIONS request before every actual request. This cuts your request volume roughly in half for browser clients.
Implementing Agent API Documentation with OpenAPI/Swagger
Document your agent API with OpenAPI so clients can auto-generate SDKs and explore endpoints interactively.
var swaggerUi = require("swagger-ui-express");
var openApiSpec = {
openapi: "3.0.3",
info: {
title: "Agent API",
version: "1.0.0",
description: "REST API for interacting with AI agents"
},
servers: [
{ url: "/api/v1", description: "Production" }
],
paths: {
"/agents/sessions": {
post: {
summary: "Create a new agent session",
tags: ["Sessions"],
security: [{ bearerAuth: [] }],
requestBody: {
content: {
"application/json": {
schema: {
type: "object",
properties: {
tools: {
type: "array",
items: { type: "string" },
example: ["web_search", "code_execution"]
}
}
}
}
}
},
responses: {
"201": { description: "Session created" },
"401": { description: "Unauthorized" },
"429": { description: "Rate limit exceeded" }
}
}
},
"/agents/sessions/{id}/tasks": {
post: {
summary: "Submit a task to an agent session",
tags: ["Tasks"],
security: [{ bearerAuth: [] }],
parameters: [
{ name: "id", in: "path", required: true, schema: { type: "string", format: "uuid" } }
],
requestBody: {
content: {
"application/json": {
schema: {
type: "object",
required: ["prompt"],
properties: {
prompt: { type: "string", maxLength: 10000 }
}
}
}
}
},
responses: {
"202": { description: "Task accepted for processing" },
"404": { description: "Session not found" }
}
}
},
"/agents/tasks/{taskId}": {
get: {
summary: "Get task status and result",
tags: ["Tasks"],
security: [{ bearerAuth: [] }],
parameters: [
{ name: "taskId", in: "path", required: true, schema: { type: "string", format: "uuid" } }
],
responses: {
"200": { description: "Task status" },
"404": { description: "Task not found" }
}
}
},
"/agents/tasks/{taskId}/stream": {
get: {
summary: "Stream task output via Server-Sent Events",
tags: ["Tasks"],
security: [{ bearerAuth: [] }],
parameters: [
{ name: "taskId", in: "path", required: true, schema: { type: "string", format: "uuid" } }
],
responses: {
"200": {
description: "SSE stream of task events",
content: { "text/event-stream": {} }
}
}
}
},
"/agents/capabilities": {
get: {
summary: "List available agent capabilities and tools",
tags: ["Discovery"],
security: [{ bearerAuth: [] }],
responses: {
"200": { description: "Agent capabilities" }
}
}
}
},
components: {
securitySchemes: {
bearerAuth: {
type: "http",
scheme: "bearer",
bearerFormat: "JWT"
}
}
}
};
app.use("/docs", swaggerUi.serve, swaggerUi.setup(openApiSpec));
Complete Working Example
Here is a full Express.js agent API server that ties everything together. Save this as server.js and run it with node server.js.
var express = require("express");
var http = require("http");
var cors = require("cors");
var rateLimit = require("express-rate-limit");
var jwt = require("jsonwebtoken");
var { v4: uuidv4 } = require("uuid");
var { body, param, validationResult } = require("express-validator");
var WebSocket = require("ws");
var swaggerUi = require("swagger-ui-express");
var EventEmitter = require("events");
var app = express();
var server = http.createServer(app);
var JWT_SECRET = process.env.JWT_SECRET || "dev-secret-change-me";
var PORT = process.env.PORT || 3000;
// --------------- Data Stores ---------------
var sessions = {};
var tasks = {};
var taskEmitters = {};
var userQuotas = {};
var taskQueue = [];
var runningTasks = 0;
var MAX_CONCURRENT = 3;
// --------------- Middleware ---------------
app.use(express.json({ limit: "1mb" }));
app.use(cors({
origin: function (origin, callback) {
if (!origin) return callback(null, true);
var allowed = (process.env.ALLOWED_ORIGINS || "http://localhost:3000").split(",");
if (allowed.indexOf(origin) !== -1) {
callback(null, true);
} else {
callback(new Error("Not allowed by CORS"));
}
},
credentials: true,
exposedHeaders: ["X-RateLimit-Remaining", "X-RateLimit-Reset"]
}));
// --------------- Auth ---------------
function authMiddleware(req, res, next) {
var authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return res.status(401).json({ error: "Missing or invalid authorization header" });
}
try {
req.user = jwt.verify(authHeader.split(" ")[1], JWT_SECRET);
if (!userQuotas[req.user.id]) {
userQuotas[req.user.id] = { sessionsToday: 0, tasksToday: 0, lastReset: new Date().toDateString() };
}
var q = userQuotas[req.user.id];
if (q.lastReset !== new Date().toDateString()) {
q.sessionsToday = 0;
q.tasksToday = 0;
q.lastReset = new Date().toDateString();
}
next();
} catch (err) {
return res.status(401).json({ error: "Invalid or expired token" });
}
}
// --------------- Rate Limiters ---------------
var sessionLimiter = rateLimit({
windowMs: 60000, max: 10,
message: { error: "Too many session requests" },
keyGenerator: function (req) { return req.user ? req.user.id : req.ip; }
});
var taskLimiter = rateLimit({
windowMs: 60000, max: 30,
message: { error: "Too many task submissions" },
keyGenerator: function (req) { return req.user ? req.user.id : req.ip; }
});
// --------------- Helpers ---------------
function getTaskEmitter(taskId) {
if (!taskEmitters[taskId]) {
taskEmitters[taskId] = new EventEmitter();
}
return taskEmitters[taskId];
}
function simulateAgent(messages, tools, callbacks) {
return new Promise(function (resolve) {
var response = "Based on my analysis, here is what I found:\n\n";
var chars = response.split(""); // stream character by character to mimic token streaming
var index = 0;
callbacks.onStep({ tool: "reasoning", input: messages[messages.length - 1].content });
var interval = setInterval(function () {
if (index < chars.length) {
callbacks.onToken(chars[index]);
index++;
} else {
clearInterval(interval);
resolve({ content: response, steps: [{ tool: "reasoning", result: "Analysis complete" }] });
}
}, 20);
});
}
function processQueue() {
while (runningTasks < MAX_CONCURRENT && taskQueue.length > 0) {
var item = taskQueue.shift();
runningTasks++;
executeTask(item.task, item.session).finally(function () {
runningTasks--;
processQueue();
});
}
}
function executeTask(task, session) {
var emitter = getTaskEmitter(task.id);
task.status = "running";
return new Promise(function (resolve) {
var timeout = setTimeout(function () {
task.status = "timeout";
task.completedAt = new Date().toISOString();
emitter.emit("error", new Error("Task timed out after 5 minutes"));
resolve();
}, 5 * 60 * 1000);
simulateAgent(session.messages, session.tools, {
onStep: function (step) {
task.steps.push(step);
emitter.emit("step", step);
},
onToken: function (token) {
emitter.emit("token", token);
}
}).then(function (result) {
clearTimeout(timeout);
task.status = "completed";
task.result = result.content;
task.completedAt = new Date().toISOString();
session.messages.push({ role: "assistant", content: result.content });
emitter.emit("complete", result.content);
resolve();
}).catch(function (err) {
clearTimeout(timeout);
task.status = "failed";
task.result = { error: err.message };
task.completedAt = new Date().toISOString();
emitter.emit("error", err);
resolve();
});
});
}
// --------------- Validation ---------------
function handleValidation(req, res, next) {
var errors = validationResult(req);
if (!errors.isEmpty()) {
return res.status(400).json({
error: "Validation failed",
details: errors.array().map(function (e) { return { field: e.path, message: e.msg }; })
});
}
next();
}
// --------------- Routes ---------------
var router = express.Router();
// Create session
router.post("/sessions",
sessionLimiter,
body("tools").optional().isArray(),
handleValidation,
function (req, res) {
var id = uuidv4();
var quota = userQuotas[req.user.id];
quota.sessionsToday++;
sessions[id] = {
id: id,
userId: req.user.id,
createdAt: new Date().toISOString(),
status: "active",
tools: req.body.tools || ["web_search", "code_execution", "file_read"],
messages: [],
tasks: []
};
res.status(201).json({ id: id, status: "active", tools: sessions[id].tools, createdAt: sessions[id].createdAt });
}
);
// Get session
router.get("/sessions/:id",
param("id").isUUID(4),
handleValidation,
function (req, res) {
var s = sessions[req.params.id];
if (!s) return res.status(404).json({ error: "Session not found" });
if (s.userId !== req.user.id) return res.status(403).json({ error: "Access denied" });
res.json({ id: s.id, status: s.status, tools: s.tools, taskCount: s.tasks.length, createdAt: s.createdAt });
}
);
// Delete session
router.delete("/sessions/:id",
param("id").isUUID(4),
handleValidation,
function (req, res) {
var s = sessions[req.params.id];
if (!s) return res.status(404).json({ error: "Session not found" });
if (s.userId !== req.user.id) return res.status(403).json({ error: "Access denied" });
delete sessions[req.params.id];
res.status(204).end();
}
);
// Submit task
router.post("/sessions/:id/tasks",
taskLimiter,
param("id").isUUID(4),
body("prompt").isString().trim().isLength({ min: 1, max: 10000 }),
handleValidation,
function (req, res) {
var s = sessions[req.params.id];
if (!s) return res.status(404).json({ error: "Session not found" });
if (s.userId !== req.user.id) return res.status(403).json({ error: "Access denied" });
var quota = userQuotas[req.user.id];
quota.tasksToday++;
var taskId = uuidv4();
var task = {
id: taskId,
sessionId: s.id,
prompt: req.body.prompt,
status: "queued",
result: null,
steps: [],
createdAt: new Date().toISOString(),
completedAt: null
};
tasks[taskId] = task;
s.tasks.push(taskId);
s.messages.push({ role: "user", content: req.body.prompt });
taskQueue.push({ task: task, session: s });
processQueue();
res.status(202).json({
taskId: taskId,
status: "queued",
pollUrl: "/api/v1/agents/tasks/" + taskId,
streamUrl: "/api/v1/agents/tasks/" + taskId + "/stream"
});
}
);
// Poll task status
router.get("/tasks/:taskId",
param("taskId").isUUID(4),
handleValidation,
function (req, res) {
var t = tasks[req.params.taskId];
if (!t) return res.status(404).json({ error: "Task not found" });
var response = {
id: t.id, sessionId: t.sessionId, status: t.status,
stepsCompleted: t.steps.length, createdAt: t.createdAt, completedAt: t.completedAt
};
if (t.status === "completed") response.result = t.result;
if (t.status === "failed") response.error = t.result;
var elapsed = Date.now() - new Date(t.createdAt).getTime();
response.retryAfter = elapsed < 5000 ? 1 : elapsed < 30000 ? 3 : 5;
res.json(response);
}
);
// Stream task via SSE
router.get("/tasks/:taskId/stream",
param("taskId").isUUID(4),
handleValidation,
function (req, res) {
var t = tasks[req.params.taskId];
if (!t) return res.status(404).json({ error: "Task not found" });
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
});
res.write("event: status\ndata: " + JSON.stringify({ status: t.status }) + "\n\n");
if (t.status === "completed" || t.status === "failed") {
res.write("event: result\ndata: " + JSON.stringify({ result: t.result }) + "\n\n");
res.end();
return;
}
var emitter = getTaskEmitter(t.id);
var onStep = function (s) { res.write("event: step\ndata: " + JSON.stringify(s) + "\n\n"); };
var onToken = function (tk) { res.write("event: token\ndata: " + JSON.stringify({ token: tk }) + "\n\n"); };
var onComplete = function (r) {
res.write("event: result\ndata: " + JSON.stringify({ result: r }) + "\n\n");
res.write("event: done\ndata: {}\n\n");
cleanup();
res.end();
};
var onError = function (e) {
res.write("event: error\ndata: " + JSON.stringify({ error: e.message }) + "\n\n");
cleanup();
res.end();
};
function cleanup() {
emitter.removeListener("step", onStep);
emitter.removeListener("token", onToken);
emitter.removeListener("complete", onComplete);
emitter.removeListener("error", onError);
}
emitter.on("step", onStep);
emitter.on("token", onToken);
emitter.on("complete", onComplete);
emitter.on("error", onError);
req.on("close", cleanup);
}
);
// Capabilities
router.get("/capabilities", function (req, res) {
res.json({
tools: [
{ name: "web_search", description: "Search the web" },
{ name: "code_execution", description: "Execute JavaScript in sandbox" },
{ name: "file_read", description: "Read workspace files" }
],
models: ["gpt-4o", "claude-sonnet-4"],
maxSessionDuration: 1800,
maxTasksPerSession: 50
});
});
// Mount routes
app.use("/api/v1/agents", authMiddleware, router);
// Swagger docs (no auth required)
app.use("/docs", swaggerUi.serve, swaggerUi.setup({
openapi: "3.0.3",
info: { title: "Agent API", version: "1.0.0" },
servers: [{ url: "/api/v1" }],
paths: {
"/agents/sessions": { post: { summary: "Create session", tags: ["Sessions"] } },
"/agents/sessions/{id}/tasks": { post: { summary: "Submit task", tags: ["Tasks"] } },
"/agents/tasks/{taskId}": { get: { summary: "Poll task status", tags: ["Tasks"] } },
"/agents/tasks/{taskId}/stream": { get: { summary: "Stream task output", tags: ["Tasks"] } },
"/agents/capabilities": { get: { summary: "List capabilities", tags: ["Discovery"] } }
},
components: { securitySchemes: { bearerAuth: { type: "http", scheme: "bearer", bearerFormat: "JWT" } } }
}));
// Token generator for testing
app.post("/auth/token", express.json(), function (req, res) {
var token = jwt.sign(
{ id: req.body.userId || "user-1", plan: req.body.plan || "free" },
JWT_SECRET,
{ expiresIn: "24h" }
);
res.json({ token: token });
});
// WebSocket setup
var wss = new WebSocket.Server({ server: server, path: "/ws/agents" });
wss.on("connection", function (ws, req) {
var token = new URL(req.url, "http://localhost").searchParams.get("token");
try {
ws.user = jwt.verify(token, JWT_SECRET);
} catch (err) {
ws.close(4001, "Authentication failed");
return;
}
ws.on("message", function (raw) {
try {
var msg = JSON.parse(raw);
ws.send(JSON.stringify({ type: "ack", received: msg.type }));
} catch (e) {
ws.send(JSON.stringify({ type: "error", message: "Invalid JSON" }));
}
});
});
// Health check
app.get("/health", function (req, res) {
res.json({ status: "ok", activeSessions: Object.keys(sessions).length, queuedTasks: taskQueue.length });
});
server.listen(PORT, function () {
console.log("Agent API server running on port " + PORT);
console.log("Docs available at http://localhost:" + PORT + "/docs");
console.log("Health check at http://localhost:" + PORT + "/health");
});
Test it with curl:
# Get a JWT token
curl -X POST http://localhost:3000/auth/token \
-H "Content-Type: application/json" \
-d '{"userId": "user-1", "plan": "pro"}'
# Create a session (use the token from above)
curl -X POST http://localhost:3000/api/v1/agents/sessions \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{"tools": ["web_search"]}'
# Submit a task
curl -X POST http://localhost:3000/api/v1/agents/sessions/SESSION_ID/tasks \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{"prompt": "Analyze the current state of WebAssembly adoption"}'
# Poll for results
curl http://localhost:3000/api/v1/agents/tasks/TASK_ID \
-H "Authorization: Bearer YOUR_TOKEN"
# Stream results via SSE
curl -N http://localhost:3000/api/v1/agents/tasks/TASK_ID/stream \
-H "Authorization: Bearer YOUR_TOKEN"
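From JavaScript, the browser's EventSource API cannot send an Authorization header, so a fetch-based reader is often more practical. The sketch below is illustrative client code, not part of the server listing; `parseSSEChunk` and `streamTask` are hypothetical helper names:

```javascript
// Split a raw SSE buffer on blank lines into complete events, keeping
// any trailing partial event for the next network chunk.
function parseSSEChunk(buffer) {
  var parts = buffer.split("\n\n");
  return { events: parts.slice(0, -1), rest: parts[parts.length - 1] };
}

// Sketch of a fetch-based consumer (works in Node 18+ and browsers),
// which lets us attach the Bearer token that EventSource cannot send.
async function streamTask(baseUrl, taskId, token) {
  var res = await fetch(baseUrl + "/api/v1/agents/tasks/" + taskId + "/stream", {
    headers: { Authorization: "Bearer " + token }
  });
  var reader = res.body.getReader();
  var decoder = new TextDecoder();
  var buffer = "";
  for (;;) {
    var chunk = await reader.read();
    if (chunk.done) break;
    var parsed = parseSSEChunk(buffer + decoder.decode(chunk.value, { stream: true }));
    buffer = parsed.rest;
    parsed.events.forEach(function (evt) { console.log(evt); });
  }
}
```

The buffering matters: a network chunk can end mid-event, so only text up to the last blank line is parsed and the remainder is carried forward.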
Common Issues and Troubleshooting
1. SSE connections drop after 60 seconds
Error: ESOCKETTIMEDOUT
This happens when a reverse proxy (nginx, ALB, Cloudflare) has a shorter timeout than your agent's processing time. For nginx, increase proxy_read_timeout:
location /api/v1/agents/tasks/ {
proxy_pass http://backend;
proxy_read_timeout 300s;
proxy_buffering off;
}
For AWS ALB, raise the idle timeout attribute, which is configured on the load balancer itself. The default is 60 seconds, which is too short for most agent tasks.
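Independent of proxy settings, a periodic heartbeat keeps the stream from ever looking idle. This is a minimal sketch; `startHeartbeat` is an illustrative helper name, and it assumes access to the SSE `res` object from the stream route:

```javascript
// Keep-alive helper for SSE responses. Lines starting with ":" are
// comments in the SSE protocol and are ignored by EventSource clients,
// so they are safe filler that keeps proxies from seeing an idle stream.
function startHeartbeat(res, intervalMs) {
  var tick = function () { res.write(": heartbeat\n\n"); };
  tick(); // write one immediately so intermediaries see traffic right away
  var timer = setInterval(tick, intervalMs);
  return function stop() { clearInterval(timer); };
}
```

In the stream route, call `var stop = startHeartbeat(res, 15000);` after `res.writeHead()` and invoke `stop()` wherever the existing listener cleanup runs.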
2. "EventSource's response has a MIME type that is not text/event-stream"
EventSource's response has a MIME type ("application/json") that is not
"text/event-stream". Aborting the connection.
This means your SSE endpoint hit an error path that returned JSON instead of starting the event stream. Run every check that can fail before res.writeHead(), so errors go out as ordinary HTTP responses; once the SSE headers are sent, failures must be reported as SSE error events instead:
// Wrong: headers already switched to SSE, then an error path sends JSON
res.writeHead(200, { "Content-Type": "text/event-stream" });
var task = tasks[req.params.taskId];
if (!task) return res.status(404).json({ error: "Not found" }); // throws ERR_HTTP_HEADERS_SENT
// Right: the existence check happens before res.writeHead()
var task = tasks[req.params.taskId];
if (!task) return res.status(404).json({ error: "Not found" });
res.writeHead(200, { "Content-Type": "text/event-stream" });
3. Memory leak from abandoned EventEmitters
MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
11 complete listeners added to [EventEmitter].
This happens when clients connect to the SSE endpoint, disconnect, and their event listeners are never removed from the shared task emitter, so listeners accumulate. Always clean up listeners when the request closes:
req.on("close", function () {
  emitter.removeListener("step", onStep);
  emitter.removeListener("token", onToken);
  emitter.removeListener("complete", onComplete);
  emitter.removeListener("error", onError);
});
Delete the emitter itself in the task processor once the task completes or fails, not in the disconnect handler: removing the map entry when one client disconnects means any client that connects afterwards gets a fresh emitter from getTaskEmitter that the running task never emits on.
4. WebSocket authentication fails with "Unexpected server response: 401"
WebSocket connection to 'ws://localhost:3000/ws/agents' failed:
Unexpected server response: 401
You cannot send custom headers with browser WebSocket connections. Pass the token as a query parameter instead:
var ws = new WebSocket("ws://localhost:3000/ws/agents?token=" + jwtToken);
On the server, extract it from the URL:
var token = new URL(req.url, "http://localhost").searchParams.get("token");
5. "Request entity too large" when submitting tasks with context
PayloadTooLargeError: request entity too large
Express defaults to a 100KB body limit. For agent APIs where users paste documents or codebases as context, increase it:
app.use(express.json({ limit: "1mb" }));
Set this based on your maximum prompt size. I use 1MB as the Express limit and 10,000 characters as the validated prompt limit. The body limit catches oversized requests before they hit your validation logic.
Best Practices
Use 202 Accepted for task submission. Agent tasks are inherently asynchronous. Returning 200 and blocking until the agent finishes ties up connections and creates timeout issues. Return 202 with a poll URL and let the client decide how to wait for results.
Implement circuit breakers on LLM provider calls. If your LLM provider is down, fail fast instead of queuing hundreds of tasks that will all time out. Track error rates and open the circuit breaker after 5 consecutive failures.
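A circuit breaker can be a small wrapper around the provider call. This is an illustrative sketch under the stated policy (open after N consecutive failures, fail fast during a cooldown window); `createCircuitBreaker` is a hypothetical name, not part of the server listing:

```javascript
// Wraps an async function so that after `threshold` consecutive failures
// the circuit opens and calls reject immediately for `cooldownMs`,
// instead of queuing work against a provider that is down.
function createCircuitBreaker(fn, threshold, cooldownMs) {
  var failures = 0;
  var openedAt = null;
  return async function () {
    var args = arguments;
    if (openedAt !== null) {
      if (Date.now() - openedAt < cooldownMs) {
        throw new Error("Circuit open: failing fast");
      }
      openedAt = null; // cooldown elapsed: allow one trial call through
    }
    try {
      var result = await fn.apply(null, args);
      failures = 0; // any success closes the circuit again
      return result;
    } catch (err) {
      failures++;
      if (failures >= threshold) openedAt = Date.now();
      throw err;
    }
  };
}
```

Wrap the LLM call once at startup, e.g. `var guardedCall = createCircuitBreaker(callLLM, 5, 30000);`, and have the task processor use the guarded version.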
Set hard timeouts on every agent task. I have seen agents enter infinite loops, especially when tool calls return unexpected results. A 5-minute timeout kills the task and returns a meaningful error. Without it, the task runs forever and consumes resources.
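A hard timeout is easiest to enforce with Promise.race. The sketch below assumes the agent task is exposed as a promise; `runWithTimeout` is an illustrative helper, not part of the listing above:

```javascript
// Races a task promise against a timer. If the timer wins, the task is
// marked failed with a meaningful error instead of running forever.
function runWithTimeout(promise, ms, label) {
  var timer;
  var timeout = new Promise(function (resolve, reject) {
    timer = setTimeout(function () {
      reject(new Error(label + " timed out after " + ms + "ms"));
    }, ms);
  });
  return Promise.race([promise, timeout]).finally(function () {
    clearTimeout(timer); // don't keep the event loop alive after settling
  });
}
```

In the task processor this would look like `runWithTimeout(executeAgentTask(task), 5 * 60 * 1000, "Task " + task.id)`. Note this rejects the promise but cannot abort work already in flight; pairing it with an AbortController passed to the LLM client is the usual next step.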
Separate rate limiting from business quotas. Rate limiting (10 requests per minute) protects server stability. Quotas (20 tasks per day on the free plan) enforce business rules. They serve different purposes and should be implemented as separate middleware layers.
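The quota side can be a plain middleware that runs after express-rate-limit. This sketch reuses the in-memory userQuotas shape from the listing; `PLAN_LIMITS` and `quotaMiddleware` are hypothetical names for illustration:

```javascript
// Business quota as its own middleware layer, distinct from the
// per-minute rate limiter that protects server stability.
var PLAN_LIMITS = { free: 20, pro: 500 }; // tasks per day, illustrative values

function quotaMiddleware(userQuotas) {
  return function (req, res, next) {
    var q = userQuotas[req.user.id] ||
      (userQuotas[req.user.id] = { tasksToday: 0, day: new Date().toDateString() });
    var today = new Date().toDateString();
    if (q.day !== today) { q.tasksToday = 0; q.day = today; } // daily reset
    var limit = PLAN_LIMITS[req.user.plan] || PLAN_LIMITS.free;
    if (q.tasksToday >= limit) {
      return res.status(429).json({ error: "Daily task quota exceeded", limit: limit });
    }
    next();
  };
}
```

Mounting order matters: put the rate limiter first so abusive bursts never reach quota accounting, then the quota check, then the route handler that increments `tasksToday`.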
Version your API from day one. Agent capabilities change rapidly. What starts as a simple text-in/text-out API will grow to support tool calling, file uploads, multi-turn conversations, and structured output. URL-based versioning (/api/v1/agents) is the most pragmatic approach.
Clean up resources aggressively. Sessions, task emitters, and WebSocket connections all consume memory. Set TTLs on sessions (30 minutes of inactivity), delete task emitters after completion, and handle WebSocket disconnects immediately.
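For the session TTL, a periodic sweep is enough for the in-memory store. This sketch assumes each session records a `lastActivityAt` timestamp (the listing above stores only `createdAt`, so you would update this field on each task submission); `sweepSessions` is an illustrative name:

```javascript
// Removes sessions idle longer than the TTL. Returns the number removed
// so the sweep can be logged or exposed via the health check.
var SESSION_TTL_MS = 30 * 60 * 1000; // 30 minutes of inactivity

function sweepSessions(sessions, now) {
  var removed = 0;
  Object.keys(sessions).forEach(function (id) {
    if (now - new Date(sessions[id].lastActivityAt).getTime() > SESSION_TTL_MS) {
      delete sessions[id];
      removed++;
    }
  });
  return removed;
}

// In the server: setInterval(function () { sweepSessions(sessions, Date.now()); }, 60000);
```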
Log every agent interaction for debugging and auditing. When a user reports that the agent gave a wrong answer, you need the full conversation history, tool calls, and intermediate steps. Log the complete task lifecycle: creation, each step, completion or failure, and the final result.
Never forward raw user input to the LLM without sanitization. Even with validation, sanitize prompts to remove potential injection patterns. At minimum, strip control characters and enforce the character limit before sending anything to the LLM provider.
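A minimal sanitizer along these lines might look as follows; `sanitizePrompt` is an illustrative helper, and the character classes shown are a baseline, not a complete injection defense:

```javascript
// Strip control characters (keeping tab, LF, and CR) and enforce the
// hard length cap before anything is sent to the LLM provider.
var MAX_PROMPT_CHARS = 10000; // matches the express-validator limit above

function sanitizePrompt(raw) {
  var cleaned = String(raw).replace(
    /[\u0000-\u0008\u000B\u000C\u000E-\u001F\u007F]/g, ""
  );
  return cleaned.slice(0, MAX_PROMPT_CHARS).trim();
}
```

Running this after express-validator gives defense in depth: validation rejects clearly bad input with a 400, while sanitization guarantees nothing hostile slips through in accepted input.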
Use Redis for session storage in production. In-memory stores work for development, but any production deployment with multiple server instances needs shared state. Redis with a 30-minute TTL keeps sessions available across instances and automatically cleans up abandoned sessions.
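A thin store abstraction keeps the Redis migration small. The sketch below follows the node-redis v4 call shape (`set` with an `EX` TTL option, `get`, `del`); `createSessionStore` is a hypothetical name, and accepting the client as a parameter keeps the store testable without a Redis server:

```javascript
// Session store over a redis-like client. In production, pass a real
// client from require("redis").createClient() after awaiting connect().
var SESSION_TTL_SECONDS = 1800; // 30 minutes, refreshed on each save

function createSessionStore(client) {
  return {
    save: async function (session) {
      await client.set("session:" + session.id, JSON.stringify(session),
        { EX: SESSION_TTL_SECONDS });
    },
    load: async function (id) {
      var raw = await client.get("session:" + id);
      return raw ? JSON.parse(raw) : null;
    },
    remove: async function (id) {
      await client.del("session:" + id);
    }
  };
}
```

Because every `save` re-applies the TTL, touching the session on each task submission doubles as the activity-based expiry, and Redis deletes abandoned sessions with no sweep code at all.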