Agent State Persistence with PostgreSQL
Persist AI agent state with PostgreSQL including session management, checkpointing, execution traces, and resume capabilities in Node.js.
Overview
AI agents that run multi-step workflows need durable state. If your agent process crashes mid-execution, you lose everything unless you have persistence in place. PostgreSQL gives you transactional guarantees, JSONB flexibility, row-level locking for concurrent access, and a query language powerful enough to debug agent behavior after the fact. This article covers schema design, checkpoint management, execution tracing, and crash recovery for agent systems backed by PostgreSQL.
Prerequisites
- Node.js v18+ installed
- PostgreSQL 14+ running locally or remotely
- Familiarity with the pg module for Node.js
- Basic understanding of AI agent architectures (tool-calling loops, multi-step reasoning)
- Working knowledge of SQL including JSONB operations
Install the required packages:
npm install pg uuid
Why Agent State Needs Persistence
Crash Recovery
Agents that orchestrate multi-step workflows can run for minutes or even hours. A typical agentic loop calls an LLM, gets back a tool invocation, executes that tool, feeds the result back to the LLM, and repeats. If your process dies at step 14 of a 20-step plan, you need to resume from step 14 without re-running the first 13 steps. That means storing every step as it completes.
I have seen production agent systems lose 40 minutes of work because someone deployed a new version during a long-running agent task. The fix was simple: persist every state transition to PostgreSQL and resume from the last checkpoint on restart.
Horizontal Scaling
When you scale agents across multiple server instances, in-memory state does not work. You need a shared persistence layer so that any instance can pick up an agent session. PostgreSQL handles this well because it gives you row-level locking, so two instances will never corrupt the same session simultaneously.
Debugging and Observability
Without persistence, debugging a misbehaving agent means reading logs. With a well-designed schema, you can query exactly what the agent did, what tools it called, what results it got, and where it went wrong. This is invaluable when an agent produces an unexpected output and you need to trace through its reasoning.
Audit Trails
In regulated industries or customer-facing applications, you may need to prove what your agent did and why. A PostgreSQL-backed execution trace gives you a permanent, queryable record of every decision the agent made.
Designing the Schema
A solid agent state schema needs four core tables: sessions, steps, checkpoints, and execution traces. Here is the schema I use in production:
-- Agent sessions: one row per agent invocation
CREATE TABLE agent_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_type VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'running'
CHECK (status IN ('running', 'paused', 'completed', 'failed', 'cancelled')),
input JSONB NOT NULL,
output JSONB,
metadata JSONB DEFAULT '{}',
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
version INTEGER NOT NULL DEFAULT 1
);
-- Agent steps: each LLM call or tool execution within a session
CREATE TABLE agent_steps (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
step_number INTEGER NOT NULL,
step_type VARCHAR(50) NOT NULL
CHECK (step_type IN ('llm_call', 'tool_call', 'tool_result', 'decision', 'user_input')),
tool_name VARCHAR(200),
input JSONB,
output JSONB,
token_usage JSONB,
duration_ms INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (session_id, step_number)
);
-- Checkpoints: snapshots of agent state at specific steps
CREATE TABLE agent_checkpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
step_number INTEGER NOT NULL,
conversation_history JSONB NOT NULL,
agent_state JSONB NOT NULL,
pending_actions JSONB DEFAULT '[]',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (session_id, step_number)
);
-- Conversation messages: full message history per session
CREATE TABLE agent_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES agent_sessions(id) ON DELETE CASCADE,
message_index INTEGER NOT NULL,
role VARCHAR(20) NOT NULL CHECK (role IN ('system', 'user', 'assistant', 'tool')),
content TEXT,
tool_calls JSONB,
tool_call_id VARCHAR(200),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (session_id, message_index)
);
-- Indexes for common queries
CREATE INDEX idx_sessions_status ON agent_sessions(status);
CREATE INDEX idx_sessions_agent_type ON agent_sessions(agent_type);
CREATE INDEX idx_sessions_created ON agent_sessions(created_at);
CREATE INDEX idx_steps_session ON agent_steps(session_id, step_number);
CREATE INDEX idx_checkpoints_session ON agent_checkpoints(session_id, step_number);
CREATE INDEX idx_messages_session ON agent_messages(session_id, message_index);
-- GIN index for JSONB queries on metadata
CREATE INDEX idx_sessions_metadata ON agent_sessions USING GIN (metadata);
A few design decisions worth noting. The step_number column is sequential per session, making it easy to resume from a known point. The UNIQUE constraint on (session_id, step_number) prevents duplicate steps from being recorded if a retry occurs. The agent_state JSONB column in checkpoints holds whatever arbitrary state your agent needs, which varies dramatically between agent types.
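As a concrete illustration of the resume-from-a-known-point property, a restarted worker can ask the steps table for the highest recorded step. A minimal sketch, assuming the pg connection pool created in the next section:
// Hypothetical helper: find the last recorded step so a restarted worker
// knows where to pick up (assumes a pg Pool named `pool`, created below).
function getLastStepNumber(sessionId) {
  return pool.query(
    "SELECT COALESCE(MAX(step_number), 0) AS last_step FROM agent_steps WHERE session_id = $1",
    [sessionId]
  ).then(function (result) {
    return result.rows[0].last_step;
  });
}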
Storing Conversation History
Every agent session maintains a conversation with the LLM. You need to persist the full message array so you can reconstruct it on resume. The agent_messages table handles this:
var pg = require("pg");
var pool = new pg.Pool({
connectionString: process.env.POSTGRES_CONNECTION_STRING,
max: 20
});
function saveMessage(sessionId, messageIndex, message) {
var query = {
text: "INSERT INTO agent_messages (session_id, message_index, role, content, tool_calls, tool_call_id) " +
"VALUES ($1, $2, $3, $4, $5, $6) " +
"ON CONFLICT (session_id, message_index) DO UPDATE SET " +
"content = EXCLUDED.content, tool_calls = EXCLUDED.tool_calls",
values: [
sessionId,
messageIndex,
message.role,
message.content || null,
message.tool_calls ? JSON.stringify(message.tool_calls) : null,
message.tool_call_id || null
]
};
return pool.query(query);
}
function loadConversationHistory(sessionId) {
var query = {
text: "SELECT role, content, tool_calls, tool_call_id FROM agent_messages " +
"WHERE session_id = $1 ORDER BY message_index ASC",
values: [sessionId]
};
return pool.query(query).then(function (result) {
return result.rows.map(function (row) {
var msg = { role: row.role };
if (row.content) msg.content = row.content;
if (row.tool_calls) msg.tool_calls = row.tool_calls;
if (row.tool_call_id) msg.tool_call_id = row.tool_call_id;
return msg;
});
});
}
The ON CONFLICT clause is important. If your agent process crashes after writing a message but before advancing to the next step, the retry will attempt to insert the same message again. The upsert ensures idempotency.
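A usage sketch built on saveMessage: persisting an entire message array in order. Re-running it after a crash simply re-upserts the same rows, so it is safe to call again.
// Persist a whole conversation sequentially; idempotent thanks to the upsert.
function saveConversation(sessionId, messages) {
  return messages.reduce(function (chain, message, index) {
    return chain.then(function () {
      return saveMessage(sessionId, index, message);
    });
  }, Promise.resolve());
}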
Checkpoint Management
Checkpoints are snapshots of the full agent state at a given step. They are heavier than individual steps because they include the full conversation history and any internal agent state. I typically checkpoint every 5 steps and always checkpoint before expensive tool calls:
function saveCheckpoint(sessionId, stepNumber, conversationHistory, agentState, pendingActions) {
var query = {
text: "INSERT INTO agent_checkpoints (session_id, step_number, conversation_history, agent_state, pending_actions) " +
"VALUES ($1, $2, $3, $4, $5) " +
"ON CONFLICT (session_id, step_number) DO UPDATE SET " +
"conversation_history = EXCLUDED.conversation_history, " +
"agent_state = EXCLUDED.agent_state, " +
"pending_actions = EXCLUDED.pending_actions",
values: [
sessionId,
stepNumber,
JSON.stringify(conversationHistory),
JSON.stringify(agentState),
JSON.stringify(pendingActions || [])
]
};
return pool.query(query);
}
function loadLatestCheckpoint(sessionId) {
var query = {
text: "SELECT step_number, conversation_history, agent_state, pending_actions " +
"FROM agent_checkpoints WHERE session_id = $1 " +
"ORDER BY step_number DESC LIMIT 1",
values: [sessionId]
};
return pool.query(query).then(function (result) {
if (result.rows.length === 0) return null;
return result.rows[0];
});
}
The checkpoint's pending_actions array stores any tool calls that were requested by the LLM but not yet executed. This is critical for resume: you need to know whether the agent was waiting to execute a tool call when it crashed.
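One way to encode that checkpoint policy is a small predicate the agent loop consults before each step. A sketch; the interval and the tool names are illustrative assumptions, not fixed values:
// Checkpoint every N steps, and always before tools considered expensive.
var CHECKPOINT_INTERVAL = 5;
var EXPENSIVE_TOOLS = ["run_migration", "send_email"]; // hypothetical tool names

function shouldCheckpoint(stepNumber, nextToolName) {
  return stepNumber % CHECKPOINT_INTERVAL === 0 ||
    (nextToolName && EXPENSIVE_TOOLS.indexOf(nextToolName) !== -1);
}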
Implementing Save and Load State
The core persistence layer wraps all database operations into a clean API. Here is the full module:
// agentStateStore.js
var pg = require("pg");
var crypto = require("crypto");
var pool = new pg.Pool({
connectionString: process.env.POSTGRES_CONNECTION_STRING,
max: 20
});
var AgentStateStore = {
createSession: function (agentType, input, metadata) {
var query = {
text: "INSERT INTO agent_sessions (agent_type, input, metadata) " +
"VALUES ($1, $2, $3) RETURNING id, created_at",
values: [agentType, JSON.stringify(input), JSON.stringify(metadata || {})]
};
return pool.query(query).then(function (result) {
return result.rows[0];
});
},
updateSessionStatus: function (sessionId, status, output, errorMessage) {
var completedAt = (status === "completed" || status === "failed") ? "NOW()" : null;
var query = {
text: "UPDATE agent_sessions SET status = $2, output = $3, error_message = $4, " +
"updated_at = NOW(), completed_at = " + (completedAt || "completed_at") + ", " +
"version = version + 1 " +
"WHERE id = $1 RETURNING version",
values: [
sessionId,
status,
output ? JSON.stringify(output) : null,
errorMessage || null
]
};
return pool.query(query).then(function (result) {
return result.rows[0];
});
},
recordStep: function (sessionId, stepNumber, stepType, data) {
var startTime = data.startTime || Date.now();
var durationMs = data.durationMs || (Date.now() - startTime);
var query = {
text: "INSERT INTO agent_steps (session_id, step_number, step_type, tool_name, input, output, token_usage, duration_ms) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8) " +
"ON CONFLICT (session_id, step_number) DO NOTHING",
values: [
sessionId,
stepNumber,
stepType,
data.toolName || null,
data.input ? JSON.stringify(data.input) : null,
data.output ? JSON.stringify(data.output) : null,
data.tokenUsage ? JSON.stringify(data.tokenUsage) : null,
durationMs
]
};
return pool.query(query);
},
getSession: function (sessionId) {
var query = {
text: "SELECT * FROM agent_sessions WHERE id = $1",
values: [sessionId]
};
return pool.query(query).then(function (result) {
return result.rows[0] || null;
});
},
getSessionSteps: function (sessionId) {
var query = {
text: "SELECT * FROM agent_steps WHERE session_id = $1 ORDER BY step_number ASC",
values: [sessionId]
};
return pool.query(query).then(function (result) {
return result.rows;
});
},
getResumableState: function (sessionId) {
var self = this;
return Promise.all([
self.getSession(sessionId),
loadLatestCheckpoint(sessionId),
loadConversationHistory(sessionId)
]).then(function (results) {
var session = results[0];
var checkpoint = results[1];
var messages = results[2];
if (!session) return null;
if (session.status !== "running" && session.status !== "paused") return null;
return {
session: session,
checkpoint: checkpoint,
messages: checkpoint ? messages.slice(0, checkpoint.step_number) : messages,
resumeFromStep: checkpoint ? checkpoint.step_number : 0,
pendingActions: checkpoint ? checkpoint.pending_actions : []
};
});
}
};
module.exports = AgentStateStore;
Tracking Agent Execution Traces
Execution traces are the detailed log of everything the agent did. Unlike structured step records, traces capture the raw decision-making process. I store the trace payloads as JSONB to keep them flexible, recording each one as an additional row in agent_steps:
function recordTrace(sessionId, traceType, data) {
var query = {
text: "INSERT INTO agent_steps (session_id, step_number, step_type, input, output) " +
"VALUES ($1, (SELECT COALESCE(MAX(step_number), 0) + 1 FROM agent_steps WHERE session_id = $1), $2, $3, $4)",
values: [
sessionId,
traceType,
JSON.stringify({ timestamp: new Date().toISOString(), context: data.context }),
JSON.stringify({ result: data.result, reasoning: data.reasoning })
]
};
return pool.query(query);
}
This derives the next step number with a subquery. It is not as fast as tracking the step number in application code, and two concurrent writers can still compute the same number and collide on the UNIQUE (session_id, step_number) constraint, so wrap the insert in a small retry, as sketched below.
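A thin retry wrapper around recordTrace handles the occasional unique-constraint violation (SQLSTATE 23505):
// Retry a few times if concurrent inserts collided on (session_id, step_number).
function recordTraceWithRetry(sessionId, traceType, data, attempt) {
  attempt = attempt || 0;
  return recordTrace(sessionId, traceType, data).catch(function (err) {
    if (err.code === "23505" && attempt < 3) {
      return recordTraceWithRetry(sessionId, traceType, data, attempt + 1);
    }
    throw err;
  });
}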
Querying Agent History for Debugging
This is where PostgreSQL really shines compared to alternatives like Redis or DynamoDB. You can write ad-hoc queries to understand what your agents are doing:
-- Find all failed sessions in the last 24 hours
SELECT id, agent_type, error_message, created_at,
EXTRACT(EPOCH FROM (updated_at - created_at)) AS duration_seconds
FROM agent_sessions
WHERE status = 'failed'
AND created_at > NOW() - INTERVAL '24 hours'
ORDER BY created_at DESC;
-- Get the full execution trace for a specific session
SELECT s.step_number, s.step_type, s.tool_name,
s.input->>'prompt' AS prompt_preview,
s.duration_ms,
s.token_usage->>'total_tokens' AS total_tokens
FROM agent_steps s
WHERE s.session_id = 'your-session-id-here'
ORDER BY s.step_number;
-- Find which tools are slowest on average
SELECT tool_name,
COUNT(*) AS call_count,
AVG(duration_ms) AS avg_duration,
MAX(duration_ms) AS max_duration,
SUM((token_usage->>'total_tokens')::integer) AS total_tokens
FROM agent_steps
WHERE step_type = 'tool_call'
AND tool_name IS NOT NULL
AND created_at > NOW() - INTERVAL '7 days'
GROUP BY tool_name
ORDER BY avg_duration DESC;
-- Find sessions that used more than 50 steps (runaway agents)
SELECT s.id, s.agent_type, s.status, COUNT(st.id) AS step_count,
s.created_at, s.metadata
FROM agent_sessions s
JOIN agent_steps st ON st.session_id = s.id
GROUP BY s.id
HAVING COUNT(st.id) > 50
ORDER BY step_count DESC;
These queries are the primary reason I choose PostgreSQL over document stores for agent state. Being able to aggregate across sessions, join steps with sessions, and filter on JSONB fields is enormously valuable when you are debugging agent behavior at scale.
Here is a Node.js function that exposes a debugging API:
function getSessionDebugInfo(sessionId) {
var query = {
text: "SELECT s.id, s.agent_type, s.status, s.input, s.output, " +
"s.error_message, s.created_at, s.completed_at, s.metadata, " +
"(SELECT COUNT(*) FROM agent_steps WHERE session_id = s.id) AS step_count, " +
"(SELECT SUM(duration_ms) FROM agent_steps WHERE session_id = s.id) AS total_duration_ms, " +
"(SELECT SUM((token_usage->>'total_tokens')::integer) FROM agent_steps WHERE session_id = s.id AND token_usage IS NOT NULL) AS total_tokens " +
"FROM agent_sessions s WHERE s.id = $1",
values: [sessionId]
};
return pool.query(query).then(function (result) {
return result.rows[0] || null;
});
}
State Versioning for Schema Migrations
Agent state schemas evolve. You might add new fields to checkpoints, restructure step data, or change how tool calls are recorded. The version column on agent_sessions helps with this:
var CURRENT_SCHEMA_VERSION = 3;
function migrateSessionState(session) {
if (session.version === CURRENT_SCHEMA_VERSION) return session;
var state = session;
// v1 -> v2: Added token tracking
if (state.version < 2) {
state.metadata = state.metadata || {};
state.metadata.token_budget = state.metadata.token_budget || 100000;
state.version = 2;
}
// v2 -> v3: Restructured tool call format
if (state.version < 3) {
state.metadata.tool_config = state.metadata.tool_config || { max_retries: 3 };
state.version = 3;
}
// Persist the migrated version (fire-and-forget; log failures so they are not silent)
var query = {
text: "UPDATE agent_sessions SET metadata = $2, version = $3 WHERE id = $1",
values: [state.id, JSON.stringify(state.metadata), CURRENT_SCHEMA_VERSION]
};
pool.query(query).catch(function (err) {
console.error("Failed to persist migrated session %s: %s", state.id, err.message);
});
return state;
}
This lazy migration approach works well for agent state because you typically only need to migrate sessions that are being resumed. Completed sessions can stay in their original format since they are only read for debugging.
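In practice the migration hooks into the resume path: load the session, migrate it if needed, then hand it to the runner. A sketch, assuming AgentStateStore and migrateSessionState are available in the same module:
// Apply the lazy migration only when a session is actually being resumed.
function getSessionForResume(sessionId) {
  return AgentStateStore.getSession(sessionId).then(function (session) {
    if (!session) return null;
    return migrateSessionState(session);
  });
}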
Optimizing Queries for Large Agent Histories
After running agents in production for a few months, the agent_steps table will be your largest. A busy system can generate millions of step rows. Here is how to keep queries fast:
Partitioning by Time
-- Convert agent_steps to a partitioned table
CREATE TABLE agent_steps_partitioned (
LIKE agent_steps INCLUDING ALL
) PARTITION BY RANGE (created_at);
-- Create monthly partitions
CREATE TABLE agent_steps_2026_01 PARTITION OF agent_steps_partitioned
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE agent_steps_2026_02 PARTITION OF agent_steps_partitioned
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
CREATE TABLE agent_steps_2026_03 PARTITION OF agent_steps_partitioned
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
Partial Indexes
-- Index only running sessions (the ones you query most)
CREATE INDEX idx_sessions_running
ON agent_sessions(created_at)
WHERE status = 'running';
-- Index only tool call steps for performance analysis
CREATE INDEX idx_steps_tool_calls
ON agent_steps(tool_name, duration_ms)
WHERE step_type = 'tool_call';
Limiting Conversation History
For agents with very long conversations, load only the last N messages plus the system prompt:
function loadRecentHistory(sessionId, maxMessages) {
maxMessages = maxMessages || 50;
var query = {
text: "(" +
" SELECT role, content, tool_calls, tool_call_id, message_index " +
" FROM agent_messages WHERE session_id = $1 AND role = 'system' " +
" ORDER BY message_index ASC LIMIT 1" +
") UNION ALL (" +
" SELECT role, content, tool_calls, tool_call_id, message_index " +
" FROM agent_messages WHERE session_id = $1 AND role != 'system' " +
" ORDER BY message_index DESC LIMIT $2" +
") ORDER BY message_index ASC",
values: [sessionId, maxMessages]
};
return pool.query(query).then(function (result) {
return result.rows.map(function (row) {
var msg = { role: row.role };
if (row.content) msg.content = row.content;
if (row.tool_calls) msg.tool_calls = row.tool_calls;
if (row.tool_call_id) msg.tool_call_id = row.tool_call_id;
return msg;
});
});
}
Archiving Completed Sessions
Completed agent sessions should not sit in your hot tables forever. Move them to an archive table or delete them after a retention period:
function archiveCompletedSessions(olderThanDays) {
olderThanDays = olderThanDays || 90;
var client;
return pool.connect().then(function (c) {
client = c;
return client.query("BEGIN");
}).then(function () {
// Archive sessions
return client.query(
"INSERT INTO agent_sessions_archive SELECT * FROM agent_sessions " +
"WHERE status IN ('completed', 'failed', 'cancelled') " +
"AND completed_at < NOW() - ($1 || ' days')::interval",
[olderThanDays]
);
}).then(function (result) {
var archivedCount = result.rowCount;
// Cascade delete will handle steps, checkpoints, messages
return client.query(
"DELETE FROM agent_sessions " +
"WHERE status IN ('completed', 'failed', 'cancelled') " +
"AND completed_at < NOW() - ($1 || ' days')::interval",
[olderThanDays]
).then(function () {
return client.query("COMMIT").then(function () {
client.release();
return archivedCount;
});
});
}).catch(function (err) {
    if (client) {
      // Roll back and release before propagating the error
      return client.query("ROLLBACK").then(function () {
        client.release();
        throw err;
      });
    }
    throw err;
  });
}
Before running this, make sure you have created the agent_sessions_archive table with the same schema as agent_sessions.
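If you have not created it yet, a one-time setup sketch that mirrors the live table's structure:
// One-time setup: archive table with the same columns, defaults, and indexes.
function ensureArchiveTable() {
  return pool.query(
    "CREATE TABLE IF NOT EXISTS agent_sessions_archive (LIKE agent_sessions INCLUDING ALL)"
  );
}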
State-Based Resume After Server Restart
On server startup, scan for interrupted sessions and resume them:
function findInterruptedSessions() {
var query = {
text: "SELECT id, agent_type, input, metadata FROM agent_sessions " +
"WHERE status = 'running' " +
"ORDER BY created_at ASC"
};
return pool.query(query).then(function (result) {
return result.rows;
});
}
function resumeSession(sessionId, agentRunner) {
var store = require("./agentStateStore");
return store.getResumableState(sessionId).then(function (state) {
if (!state) {
console.log("Session %s is not resumable", sessionId);
return null;
}
console.log("Resuming session %s from step %d", sessionId, state.resumeFromStep);
console.log(" Agent type: %s", state.session.agent_type);
console.log(" Pending actions: %d", state.pendingActions.length);
// Update status to show it is being resumed
return store.updateSessionStatus(sessionId, "running").then(function () {
return agentRunner.run({
sessionId: sessionId,
messages: state.messages,
agentState: state.checkpoint ? state.checkpoint.agent_state : {},
resumeFromStep: state.resumeFromStep,
pendingActions: state.pendingActions
});
});
});
}
function resumeAllOnStartup(agentRunner) {
var store = require("./agentStateStore");
return findInterruptedSessions().then(function (sessions) {
if (sessions.length === 0) {
console.log("No interrupted sessions to resume");
return;
}
console.log("Found %d interrupted sessions", sessions.length);
// Resume sessions sequentially to avoid overwhelming resources
var chain = Promise.resolve();
sessions.forEach(function (session) {
chain = chain.then(function () {
return resumeSession(session.id, agentRunner).catch(function (err) {
console.error("Failed to resume session %s: %s", session.id, err.message);
return store.updateSessionStatus(session.id, "failed", null, err.message);
});
});
});
return chain;
});
}
Call resumeAllOnStartup() in your Express app's startup sequence, after the database pool is ready but before you start accepting new requests.
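A sketch of that ordering, assuming an Express app object and a hypothetical agentRunner module that exposes run():
// Resume interrupted work before the server starts taking new requests.
var agentRunner = require("./agentRunner"); // hypothetical module

resumeAllOnStartup(agentRunner).then(function () {
  app.listen(PORT, function () {
    console.log("Accepting requests on port %d", PORT);
  });
}).catch(function (err) {
  console.error("Startup resume failed: %s", err.message);
  process.exit(1);
});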
Concurrent Agent State Access
When multiple server instances can process the same agent session, you need locking. PostgreSQL's SELECT ... FOR UPDATE provides row-level locking within a transaction:
function acquireSessionLock(sessionId) {
var client;
return pool.connect().then(function (c) {
client = c;
return client.query("BEGIN");
}).then(function () {
return client.query({
text: "SELECT id, status, version FROM agent_sessions WHERE id = $1 FOR UPDATE NOWAIT",
values: [sessionId]
});
}).then(function (result) {
if (result.rows.length === 0) {
throw new Error("Session not found: " + sessionId);
}
return {
session: result.rows[0],
client: client,
release: function (newStatus) {
var updateQuery = newStatus
? client.query("UPDATE agent_sessions SET status = $1, updated_at = NOW(), version = version + 1 WHERE id = $2", [newStatus, sessionId])
: Promise.resolve();
return updateQuery.then(function () {
return client.query("COMMIT");
}).then(function () {
client.release();
});
},
rollback: function () {
return client.query("ROLLBACK").then(function () {
client.release();
});
}
};
}).catch(function (err) {
    if (client) {
      // Roll back the open transaction before returning the client to the pool
      client.query("ROLLBACK").then(function () { client.release(); });
    }
    if (err.code === "55P03") {
      // Lock not available - another instance is processing this session
      throw new Error("Session " + sessionId + " is locked by another process");
    }
    throw err;
  });
}
The NOWAIT keyword is important. Without it, the query will block until the lock is available, which could hang your request. With NOWAIT, it immediately throws error code 55P03 if the row is locked, and you can handle it gracefully.
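A usage sketch for the lock helper: take the lock, do the work, and release with a final status. The work function here (doAgentWork) is a stand-in for your own processing:
// Skip the session gracefully if another instance already holds the lock.
function processSessionExclusively(sessionId) {
  return acquireSessionLock(sessionId).then(function (lock) {
    return doAgentWork(lock.session) // hypothetical work function
      .then(function () { return lock.release("completed"); })
      .catch(function (err) {
        return lock.rollback().then(function () { throw err; });
      });
  }).catch(function (err) {
    console.warn(err.message); // e.g. "locked by another process"
  });
}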
JSONB Columns for Flexible Metadata
The metadata JSONB column on sessions is intentionally unstructured. Different agent types need different metadata. A research agent might track source URLs. A code generation agent might track file paths. Here is how to work with JSONB effectively:
function updateSessionMetadata(sessionId, updates) {
// Merge updates into existing metadata using jsonb_concat (||)
var query = {
text: "UPDATE agent_sessions SET metadata = metadata || $2, updated_at = NOW() WHERE id = $1 RETURNING metadata",
values: [sessionId, JSON.stringify(updates)]
};
return pool.query(query).then(function (result) {
return result.rows[0].metadata;
});
}
// Query sessions by metadata field
function findSessionsByMetadata(key, value) {
var query = {
text: "SELECT id, agent_type, status, metadata FROM agent_sessions WHERE metadata->>$1 = $2",
values: [key, value]
};
return pool.query(query).then(function (result) {
return result.rows;
});
}
// Query sessions with nested JSONB paths
function findSessionsByNestedMetadata(path, value) {
// path example: '{config,model}' to match metadata.config.model
var query = {
text: "SELECT id, agent_type, status, metadata FROM agent_sessions WHERE metadata #>> $1 = $2",
values: [path, value]
};
return pool.query(query).then(function (result) {
return result.rows;
});
}
The GIN index on metadata speeds up containment queries. The ->> and #>> equality filters above do not use it directly, so for keys you query often, either add an expression index on that key or rewrite the filter as a containment check, shown below.
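A containment-based variant that the GIN index can serve:
// Containment query (@>) can use the GIN index on metadata.
function findSessionsByContainment(partialMetadata) {
  return pool.query(
    "SELECT id, agent_type, status, metadata FROM agent_sessions WHERE metadata @> $1",
    [JSON.stringify(partialMetadata)]
  ).then(function (result) {
    return result.rows;
  });
}

// Example: findSessionsByContainment({ customer_id: "abc-123" })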
Complete Working Example
Here is a fully integrated agent persistence layer with an Express.js query API for debugging:
// server.js - Agent state persistence server
var express = require("express");
var pg = require("pg");
var app = express();
app.use(express.json());
var pool = new pg.Pool({
connectionString: process.env.POSTGRES_CONNECTION_STRING,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000
});
// --- Persistence Layer ---
var AgentPersistence = {
createSession: function (agentType, input, metadata) {
return pool.query(
"INSERT INTO agent_sessions (agent_type, input, metadata) VALUES ($1, $2, $3) RETURNING *",
[agentType, JSON.stringify(input), JSON.stringify(metadata || {})]
).then(function (r) { return r.rows[0]; });
},
saveStep: function (sessionId, stepNumber, stepType, data) {
return pool.query(
"INSERT INTO agent_steps (session_id, step_number, step_type, tool_name, input, output, token_usage, duration_ms) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (session_id, step_number) DO NOTHING",
[sessionId, stepNumber, stepType, data.toolName || null,
JSON.stringify(data.input), JSON.stringify(data.output),
data.tokenUsage ? JSON.stringify(data.tokenUsage) : null, data.durationMs || 0]
);
},
saveCheckpoint: function (sessionId, stepNumber, messages, state) {
return pool.query(
"INSERT INTO agent_checkpoints (session_id, step_number, conversation_history, agent_state) " +
"VALUES ($1, $2, $3, $4) ON CONFLICT (session_id, step_number) DO UPDATE SET " +
"conversation_history = EXCLUDED.conversation_history, agent_state = EXCLUDED.agent_state",
[sessionId, stepNumber, JSON.stringify(messages), JSON.stringify(state)]
);
},
saveMessage: function (sessionId, index, msg) {
return pool.query(
"INSERT INTO agent_messages (session_id, message_index, role, content, tool_calls, tool_call_id) " +
"VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (session_id, message_index) DO UPDATE SET content = EXCLUDED.content",
[sessionId, index, msg.role, msg.content || null,
msg.tool_calls ? JSON.stringify(msg.tool_calls) : null, msg.tool_call_id || null]
);
},
completeSession: function (sessionId, status, output, error) {
return pool.query(
"UPDATE agent_sessions SET status = $2, output = $3, error_message = $4, " +
"completed_at = NOW(), updated_at = NOW(), version = version + 1 WHERE id = $1",
[sessionId, status, output ? JSON.stringify(output) : null, error || null]
);
},
getResumableState: function (sessionId) {
return Promise.all([
pool.query("SELECT * FROM agent_sessions WHERE id = $1", [sessionId]),
pool.query("SELECT * FROM agent_checkpoints WHERE session_id = $1 ORDER BY step_number DESC LIMIT 1", [sessionId]),
pool.query("SELECT * FROM agent_messages WHERE session_id = $1 ORDER BY message_index", [sessionId])
]).then(function (results) {
var session = results[0].rows[0];
var checkpoint = results[1].rows[0] || null;
var messages = results[2].rows;
if (!session || (session.status !== "running" && session.status !== "paused")) return null;
return { session: session, checkpoint: checkpoint, messages: messages };
});
}
};
// --- Sample Agent Runner ---
function runAgent(sessionId, options) {
var messages = options.messages || [];
var stepNumber = options.resumeFromStep || 0;
var maxSteps = options.maxSteps || 30;
var checkpointInterval = 5;
function executeStep() {
if (stepNumber >= maxSteps) {
return AgentPersistence.completeSession(sessionId, "completed", { reason: "max_steps" });
}
stepNumber++;
var startTime = Date.now();
// Simulate LLM call (replace with actual API call)
return simulateLLMCall(messages).then(function (response) {
var durationMs = Date.now() - startTime;
// Record the LLM step
return AgentPersistence.saveStep(sessionId, stepNumber, "llm_call", {
input: { messageCount: messages.length },
output: { response: response.content ? response.content.substring(0, 500) : null },
tokenUsage: response.usage,
durationMs: durationMs
}).then(function () {
// Save assistant message
messages.push({ role: "assistant", content: response.content, tool_calls: response.tool_calls });
return AgentPersistence.saveMessage(sessionId, messages.length - 1, messages[messages.length - 1]);
}).then(function () {
// Checkpoint periodically
if (stepNumber % checkpointInterval === 0) {
return AgentPersistence.saveCheckpoint(sessionId, stepNumber, messages, { stepNumber: stepNumber });
}
}).then(function () {
// Check if agent is done
if (response.content && !response.tool_calls) {
return AgentPersistence.completeSession(sessionId, "completed", { finalResponse: response.content });
}
// Handle tool calls
if (response.tool_calls && response.tool_calls.length > 0) {
return handleToolCalls(sessionId, stepNumber, messages, response.tool_calls).then(function (lastStep) {
// Adopt the step counter advanced by the tool steps so numbers stay unique
stepNumber = lastStep;
return executeStep();
});
}
});
}).catch(function (err) {
return AgentPersistence.completeSession(sessionId, "failed", null, err.message);
});
}
return executeStep();
}
function handleToolCalls(sessionId, stepNumber, messages, toolCalls) {
var chain = Promise.resolve();
toolCalls.forEach(function (toolCall) {
chain = chain.then(function () {
stepNumber++;
var startTime = Date.now();
// Simulate tool execution
var result = { output: "Tool result for " + toolCall.function.name };
var durationMs = Date.now() - startTime;
return AgentPersistence.saveStep(sessionId, stepNumber, "tool_call", {
toolName: toolCall.function.name,
input: JSON.parse(toolCall.function.arguments || "{}"),
output: result,
durationMs: durationMs
}).then(function () {
messages.push({ role: "tool", content: JSON.stringify(result), tool_call_id: toolCall.id });
return AgentPersistence.saveMessage(sessionId, messages.length - 1, messages[messages.length - 1]);
});
});
});
// Resolve with the last step number used so the caller can continue numbering
return chain.then(function () { return stepNumber; });
}
function simulateLLMCall(messages) {
// Replace this with actual OpenAI/Anthropic API call
return Promise.resolve({
content: "Based on my analysis, the task is complete.",
tool_calls: null,
usage: { prompt_tokens: 500, completion_tokens: 100, total_tokens: 600 }
});
}
// --- Query API for Debugging ---
app.get("/api/agents/sessions", function (req, res) {
var status = req.query.status;
var agentType = req.query.agent_type;
var limit = parseInt(req.query.limit) || 50;
var conditions = [];
var values = [];
var paramIndex = 1;
if (status) {
conditions.push("status = $" + paramIndex++);
values.push(status);
}
if (agentType) {
conditions.push("agent_type = $" + paramIndex++);
values.push(agentType);
}
var whereClause = conditions.length > 0 ? "WHERE " + conditions.join(" AND ") : "";
pool.query(
"SELECT id, agent_type, status, created_at, completed_at, " +
"EXTRACT(EPOCH FROM (COALESCE(completed_at, NOW()) - created_at)) AS duration_seconds, " +
"(SELECT COUNT(*) FROM agent_steps WHERE session_id = agent_sessions.id) AS step_count " +
"FROM agent_sessions " + whereClause + " ORDER BY created_at DESC LIMIT $" + paramIndex,
values.concat([limit])
).then(function (result) {
res.json({ sessions: result.rows });
}).catch(function (err) {
res.status(500).json({ error: err.message });
});
});
app.get("/api/agents/sessions/:id", function (req, res) {
Promise.all([
pool.query("SELECT * FROM agent_sessions WHERE id = $1", [req.params.id]),
pool.query("SELECT * FROM agent_steps WHERE session_id = $1 ORDER BY step_number", [req.params.id]),
pool.query("SELECT COUNT(*) as msg_count FROM agent_messages WHERE session_id = $1", [req.params.id])
]).then(function (results) {
var session = results[0].rows[0];
if (!session) return res.status(404).json({ error: "Session not found" });
res.json({
session: session,
steps: results[1].rows,
messageCount: parseInt(results[2].rows[0].msg_count)
});
}).catch(function (err) {
res.status(500).json({ error: err.message });
});
});
app.get("/api/agents/analytics", function (req, res) {
var days = parseInt(req.query.days) || 7;
pool.query(
"SELECT agent_type, status, COUNT(*) AS session_count, " +
"AVG(EXTRACT(EPOCH FROM (COALESCE(completed_at, NOW()) - created_at))) AS avg_duration_seconds, " +
"SUM((SELECT COUNT(*) FROM agent_steps WHERE session_id = agent_sessions.id)) AS total_steps " +
"FROM agent_sessions WHERE created_at > NOW() - ($1 || ' days')::interval " +
"GROUP BY agent_type, status ORDER BY session_count DESC",
[days]
).then(function (result) {
res.json({ analytics: result.rows, period_days: days });
}).catch(function (err) {
res.status(500).json({ error: err.message });
});
});
app.post("/api/agents/resume/:id", function (req, res) {
AgentPersistence.getResumableState(req.params.id).then(function (state) {
if (!state) return res.status(404).json({ error: "Session not resumable" });
var messages = state.messages.map(function (m) {
return { role: m.role, content: m.content, tool_calls: m.tool_calls, tool_call_id: m.tool_call_id };
});
runAgent(req.params.id, {
messages: messages,
resumeFromStep: state.checkpoint ? state.checkpoint.step_number : 0
});
res.json({ status: "resuming", sessionId: req.params.id });
}).catch(function (err) {
res.status(500).json({ error: err.message });
});
});
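The test commands below include creating a session over HTTP, which the routes above do not cover. A minimal creation endpoint sketched on top of AgentPersistence.createSession (the request field names are assumptions):
app.post("/api/agents/sessions", function (req, res) {
  var agentType = req.body.agent_type;
  var input = req.body.input;
  if (!agentType || !input) {
    return res.status(400).json({ error: "agent_type and input are required" });
  }
  var userMessage = { role: "user", content: JSON.stringify(input) };
  AgentPersistence.createSession(agentType, input, req.body.metadata).then(function (session) {
    return AgentPersistence.saveMessage(session.id, 0, userMessage).then(function () {
      // Kick off the agent loop in the background; the request returns immediately.
      runAgent(session.id, { messages: [userMessage] });
      res.status(201).json({ session: session });
    });
  }).catch(function (err) {
    res.status(500).json({ error: err.message });
  });
});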
var PORT = process.env.PORT || 3000;
app.listen(PORT, function () {
console.log("Agent persistence server running on port %d", PORT);
// Resume interrupted sessions on startup
pool.query("SELECT id FROM agent_sessions WHERE status = 'running'").then(function (result) {
if (result.rows.length > 0) {
console.log("Found %d interrupted sessions to resume", result.rows.length);
}
});
});
To test it:
# Start the server
POSTGRES_CONNECTION_STRING=postgresql://localhost:5432/agent_db node server.js
# Create a session
curl -X POST http://localhost:3000/api/agents/sessions \
-H "Content-Type: application/json" \
-d '{"agent_type": "research", "input": {"query": "test"}}'
# List sessions
curl http://localhost:3000/api/agents/sessions
# Get session details
curl http://localhost:3000/api/agents/sessions/YOUR_SESSION_ID
# View analytics
curl http://localhost:3000/api/agents/analytics?days=30
Common Issues and Troubleshooting
1. Connection Pool Exhaustion
Error: remaining connection slots are reserved for non-replication superuser connections
This happens when your agent creates too many concurrent database connections. Each agent step that does persistence work holds a connection from the pool. If you have 10 agents running simultaneously and each uses 2-3 concurrent queries, you quickly hit PostgreSQL's default max_connections limit of 100.
Fix: Limit your pool size and use connection queuing:
var pool = new pg.Pool({
connectionString: process.env.POSTGRES_CONNECTION_STRING,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000
});
Also consider PgBouncer for connection pooling at the infrastructure level.
2. JSONB Serialization Errors
Error: invalid input syntax for type json
Detail: Token "undefined" is invalid.
This occurs when an undefined value sneaks into data headed for a JSONB column. JSON.stringify silently drops undefined properties inside objects, but JSON.stringify(undefined) returns undefined rather than a JSON string, and if that value is later coerced to text while building the payload, the literal word "undefined" reaches PostgreSQL and fails to parse as JSON.
Fix: Always validate before insert:
var safeJson = data !== undefined && data !== null ? JSON.stringify(data) : null;
3. Deadlocks During Concurrent Checkpoint Writes
ERROR: deadlock detected
DETAIL: Process 12345 waits for ShareLock on transaction 67890; blocked by process 11111.
This happens when two processes try to update checkpoints for the same session simultaneously. The ON CONFLICT upsert acquires different locks depending on whether the row exists, creating a deadlock window.
Fix: Use advisory locks keyed to the session ID:
function saveCheckpointSafe(sessionId, stepNumber, data) {
  var client;
  return pool.connect().then(function (c) {
    client = c;
    // pg_advisory_xact_lock is held until the transaction ends, so open an
    // explicit transaction; otherwise the lock is released right after the SELECT.
    return client.query("BEGIN");
  }).then(function () {
    // Hash the session UUID to an integer key for the advisory lock
    var lockKey = hashSessionId(sessionId);
    return client.query("SELECT pg_advisory_xact_lock($1)", [lockKey]);
  }).then(function () {
    return client.query(
      "INSERT INTO agent_checkpoints (session_id, step_number, conversation_history, agent_state) " +
      "VALUES ($1, $2, $3, $4) ON CONFLICT (session_id, step_number) DO UPDATE SET " +
      "conversation_history = EXCLUDED.conversation_history, agent_state = EXCLUDED.agent_state",
      [sessionId, stepNumber, JSON.stringify(data.messages), JSON.stringify(data.state)]
    );
  }).then(function (result) {
    return client.query("COMMIT").then(function () {
      client.release();
      return result;
    });
  }).catch(function (err) {
    if (client) {
      client.query("ROLLBACK").then(function () { client.release(); });
    }
    throw err;
  });
}
function hashSessionId(uuid) {
var hash = 0;
for (var i = 0; i < uuid.length; i++) {
var char = uuid.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // Convert to 32-bit integer
}
return hash;
}
4. Conversation History Too Large for JSONB Column
ERROR: the string size (268435456) exceeds the max allowed size (268435455)
PostgreSQL JSONB columns can hold up to 255 MB, but storing enormous conversation histories (thousands of messages with large tool outputs) will cause performance issues long before you hit that limit. Queries slow down, checkpoints take seconds to write, and memory usage spikes.
Fix: Truncate tool outputs before persisting and cap conversation history length:
function truncateForStorage(obj, maxLength) {
maxLength = maxLength || 10000;
var str = JSON.stringify(obj);
if (str.length <= maxLength) return obj;
// For strings, truncate directly
if (typeof obj === "string") {
return obj.substring(0, maxLength) + "... [truncated]";
}
// For objects, serialize and note truncation
return { _truncated: true, preview: str.substring(0, maxLength), originalLength: str.length };
}
5. Stale Sessions After Ungraceful Shutdown
If your process is killed with SIGKILL (e.g., OOM killer), sessions remain in running status permanently. The resume logic will try to pick them up but they may be in an inconsistent state.
Fix: Add a staleness check when resuming:
function findTrulyStaleSessions(staleAfterMinutes) {
staleAfterMinutes = staleAfterMinutes || 30;
return pool.query(
"SELECT id, agent_type, updated_at FROM agent_sessions " +
"WHERE status = 'running' AND updated_at < NOW() - ($1 || ' minutes')::interval",
[staleAfterMinutes]
).then(function (result) {
return result.rows;
});
}
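A follow-up sketch: once truly stale sessions are identified, mark them failed so the resume scan stops picking them up.
function failStaleSessions(staleAfterMinutes) {
  return findTrulyStaleSessions(staleAfterMinutes).then(function (rows) {
    return Promise.all(rows.map(function (row) {
      return pool.query(
        "UPDATE agent_sessions SET status = 'failed', " +
        "error_message = 'marked stale after ungraceful shutdown', updated_at = NOW() " +
        "WHERE id = $1 AND status = 'running'",
        [row.id]
      );
    }));
  });
}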
Best Practices
- Checkpoint at meaningful boundaries, not every step. Checkpointing after every single step adds overhead with little benefit. Checkpoint before expensive operations (API calls, file writes) and at regular intervals (every 5-10 steps). The goal is to limit re-work on resume, not eliminate it entirely.
- Use ON CONFLICT for all inserts. Agent retries and crash recovery mean you will inevitably attempt to insert duplicate rows. Every insert should handle conflicts gracefully with either DO NOTHING or DO UPDATE.
- Keep the agent_steps table lean. Do not store full LLM responses or complete tool outputs in the steps table. Store summaries and metadata there. If you need full content, put it in the messages table or in a separate blob store.
- Set a maximum step limit per session. Runaway agents that loop indefinitely will fill your database. Always enforce a maxSteps limit and mark the session as failed when it is exceeded.
- Version your state schema from day one. The version column costs nothing and saves you from painful migrations later. Every time you change what goes into metadata, agent_state, or checkpoint payloads, increment the version.
- Use FOR UPDATE NOWAIT instead of FOR UPDATE for agent locks. Blocking locks in agent systems lead to cascading timeouts. Fail fast and let the caller retry or route to a different session.
- Archive aggressively. Completed sessions are rarely queried after the first few days. Move them to archive tables or cold storage on a regular schedule. Your active tables should only contain running and recently completed sessions.
- Index JSONB columns with GIN indexes selectively. A GIN index on the full metadata column enables flexible querying, but it is expensive to maintain. If you only query specific keys, use expression indexes instead: CREATE INDEX ON agent_sessions ((metadata->>'customer_id')).
- Monitor checkpoint write latency. If checkpoints start taking more than 100ms, your conversation histories are probably too large. Implement the truncation strategies from the optimization section.
- Use connection pooling at both the application and infrastructure level. The pg pool handles application-level pooling, but PgBouncer between your app and PostgreSQL handles multi-instance pooling and reduces connection overhead significantly.