Building MCP Servers for Database Access

Complete guide to building MCP servers that provide AI models with structured database access, covering query tools, schema introspection, parameterized queries, connection pooling, result formatting, and production safety patterns.

Overview

Giving AI models access to your database through MCP is one of the most powerful integrations you can build — and one of the most dangerous if done carelessly. A well-built database MCP server lets models query data, explore schemas, and answer questions about your application state. A poorly built one gives models the ability to DROP TABLE in production. I have built database MCP servers for several teams, and the patterns here represent the safe middle ground: useful enough to be valuable, restricted enough to be trustworthy.

Prerequisites

  • Node.js 18 or later
  • @modelcontextprotocol/sdk package installed
  • PostgreSQL or MySQL database with read access credentials
  • pg package for PostgreSQL or mysql2 package for MySQL
  • Understanding of MCP tools and resources
  • A dedicated read-only database user for the MCP server

Database Access Architecture

The MCP server sits between the AI model and your database. Every query passes through validation, parameterization, and result formatting:

AI Model  →  MCP Client  →  MCP Server  →  Query Validator  →  Database
                                ↑
                          Safety Layer:
                          - Read-only enforcement
                          - Query timeout
                          - Row limit
                          - Schema filtering
                          - Audit logging

Creating a Read-Only Database User

Before writing any code, create a dedicated user with minimal permissions:

-- PostgreSQL: Create read-only user for MCP
CREATE USER mcp_reader WITH PASSWORD 'secure-random-password';

-- Grant connect
GRANT CONNECT ON DATABASE myapp TO mcp_reader;

-- Grant usage on schema
GRANT USAGE ON SCHEMA public TO mcp_reader;

-- Grant SELECT on all existing tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mcp_reader;

-- Grant SELECT on future tables
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT ON TABLES TO mcp_reader;

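-- Remove any write privileges previously granted on these tables
-- (REVOKE only undoes earlier grants; PostgreSQL has no standing "deny" rule)
REVOKE INSERT, UPDATE, DELETE, TRUNCATE ON ALL TABLES IN SCHEMA public FROM mcp_reader;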

-- Set statement timeout for safety (30 seconds)
ALTER USER mcp_reader SET statement_timeout = '30s';

-- Set max connections
ALTER USER mcp_reader CONNECTION LIMIT 5;

-- MySQL equivalent
CREATE USER 'mcp_reader'@'%' IDENTIFIED BY 'secure-random-password';
GRANT SELECT ON myapp.* TO 'mcp_reader'@'%';
-- Per-account connection cap (SET GLOBAL max_user_connections would apply to every account)
ALTER USER 'mcp_reader'@'%' WITH MAX_USER_CONNECTIONS 5;
FLUSH PRIVILEGES;
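
Before pointing the server at the database, it can be worth confirming that the grants came out as intended. The sketch below uses a hypothetical helper, `findWritePrivileges` (not part of any library), that takes rows shaped like the output of `information_schema.role_table_grants` and reports anything other than SELECT. The sample rows are made up for illustration. Privileges inherited through PUBLIC or role membership would not show up under `grantee = 'mcp_reader'`, so treat this as a first check rather than a complete audit.

```javascript
// Hypothetical helper: flags any non-SELECT privilege in rows shaped like
// the output of information_schema.role_table_grants.
var READ_ONLY_PRIVILEGES = ["SELECT"];

function findWritePrivileges(grantRows) {
  return grantRows.filter(function(row) {
    return READ_ONLY_PRIVILEGES.indexOf(row.privilege_type) === -1;
  }).map(function(row) {
    return row.table_name + ": " + row.privilege_type;
  });
}

// Illustrative rows, shaped like the result of:
//   SELECT table_name, privilege_type
//   FROM information_schema.role_table_grants
//   WHERE grantee = 'mcp_reader';
var sampleRows = [
  { table_name: "users", privilege_type: "SELECT" },
  { table_name: "orders", privilege_type: "SELECT" },
  { table_name: "orders", privilege_type: "UPDATE" }
];

console.log(findWritePrivileges(sampleRows)); // [ 'orders: UPDATE' ]
```

An empty array means no direct write grants were found for the role.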

Core MCP Database Server

var Server = require("@modelcontextprotocol/sdk/server/index.js").Server;
var StdioTransport = require("@modelcontextprotocol/sdk/server/stdio.js").StdioServerTransport;
var pg = require("pg");

var pool = new pg.Pool({
  connectionString: process.env.DATABASE_URL,
  max: 5,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000,
  statement_timeout: 30000 // 30 second query timeout
});

var MAX_ROWS = 500;
var MAX_QUERY_LENGTH = 2000;

var server = new Server(
  { name: "database-server", version: "1.0.0" },
  {
    capabilities: {
      tools: {},
      resources: {}
    }
  }
);

// ---- Tools ----

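// The SDK's setRequestHandler expects a request schema object, not a method-name string
var ListToolsRequestSchema = require("@modelcontextprotocol/sdk/types.js").ListToolsRequestSchema;

server.setRequestHandler(ListToolsRequestSchema, function() {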
  return {
    tools: [
      {
        name: "query",
        description: "Execute a read-only SQL query against the database. Only SELECT statements are allowed. Results are limited to " + MAX_ROWS + " rows.",
        inputSchema: {
          type: "object",
          properties: {
            sql: {
              type: "string",
              description: "SQL SELECT query to execute"
            },
            params: {
              type: "array",
              items: { type: "string" },
              description: "Query parameters for $1, $2, etc. placeholders"
            }
          },
          required: ["sql"]
        }
      },
      {
        name: "describe_table",
        description: "Get detailed schema information for a specific table including columns, types, constraints, indexes, and row count.",
        inputSchema: {
          type: "object",
          properties: {
            table: {
              type: "string",
              description: "Table name (e.g., 'users' or 'public.users')"
            }
          },
          required: ["table"]
        }
      },
      {
        name: "list_tables",
        description: "List all tables in the database with their column counts and on-disk sizes.",
        inputSchema: {
          type: "object",
          properties: {
            schema: {
              type: "string",
              description: "Schema name (defaults to 'public')"
            }
          }
        }
      },
      {
        name: "explain_query",
        description: "Get the query execution plan without running the actual query. Useful for understanding performance.",
        inputSchema: {
          type: "object",
          properties: {
            sql: {
              type: "string",
              description: "SQL SELECT query to explain"
            }
          },
          required: ["sql"]
        }
      }
    ]
  };
});

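// The SDK's setRequestHandler expects a request schema object, not a method-name string
var CallToolRequestSchema = require("@modelcontextprotocol/sdk/types.js").CallToolRequestSchema;

server.setRequestHandler(CallToolRequestSchema, function(request) {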
  var name = request.params.name;
  var args = request.params.arguments;

  switch (name) {
    case "query":
      return executeQuery(args.sql, args.params || []);
    case "describe_table":
      return describeTable(args.table);
    case "list_tables":
      return listTables(args.schema || "public");
    case "explain_query":
      return explainQuery(args.sql);
    default:
      throw new Error("Unknown tool: " + name);
  }
});

// ---- Query Execution ----

function validateQuery(sql) {
  if (!sql || typeof sql !== "string") {
    throw new Error("Query must be a non-empty string");
  }

  if (sql.length > MAX_QUERY_LENGTH) {
    throw new Error("Query exceeds maximum length of " + MAX_QUERY_LENGTH + " characters");
  }

  var normalized = sql.trim().toUpperCase();

  // Only allow SELECT and WITH (CTE) statements
  if (!normalized.startsWith("SELECT") && !normalized.startsWith("WITH")) {
    throw new Error("Only SELECT queries are allowed. Got: " + normalized.substring(0, 20) + "...");
  }

  // Block dangerous keywords even inside CTEs
  var forbidden = [
    "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE",
    "TRUNCATE", "GRANT", "REVOKE", "EXECUTE", "EXEC",
    "INTO OUTFILE", "INTO DUMPFILE", "LOAD_FILE",
    "pg_read_file", "pg_write_file", "pg_ls_dir",
    "COPY", "\\\\copy"
  ];

  forbidden.forEach(function(keyword) {
    // Check for the keyword as a separate word (not part of another word)
    var regex = new RegExp("\\b" + keyword.replace(/\s+/g, "\\s+") + "\\b", "i");
    if (regex.test(sql)) {
      throw new Error("Forbidden keyword in query: " + keyword);
    }
  });

  return sql;
}

function executeQuery(sql, params) {
  try {
    sql = validateQuery(sql);
  } catch (e) {
    return {
      content: [{ type: "text", text: "Query validation error: " + e.message }],
      isError: true
    };
  }

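  // Add LIMIT if not present. Note: a caller-supplied LIMIT is honored as-is,
  // so it can exceed MAX_ROWS; a LIMIT inside a subquery also counts as "present".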
  var hasLimit = /\bLIMIT\s+\d+/i.test(sql);
  var querySql = hasLimit ? sql : sql.replace(/;?\s*$/, "") + " LIMIT " + MAX_ROWS;

  var startTime = Date.now();

  return pool.query(querySql, params)
    .then(function(result) {
      var duration = Date.now() - startTime;

      var output = {
        rowCount: result.rows.length,
        columns: result.fields.map(function(f) {
          return { name: f.name, dataTypeID: f.dataTypeID };
        }),
        rows: result.rows,
        duration: duration + "ms",
        truncated: !hasLimit && result.rows.length >= MAX_ROWS
      };

      if (output.truncated) {
        output.note = "Results limited to " + MAX_ROWS + " rows. Add your own LIMIT clause for different limits.";
      }

      return {
        content: [{ type: "text", text: JSON.stringify(output, null, 2) }]
      };
    })
    .catch(function(err) {
      return {
        content: [{ type: "text", text: "Query error: " + err.message }],
        isError: true
      };
    });
}

// ---- Schema Introspection ----

function describeTable(tableName) {
  // Parse schema.table format
  var parts = tableName.split(".");
  var schema = parts.length > 1 ? parts[0] : "public";
  var table = parts.length > 1 ? parts[1] : parts[0];

  // Validate identifiers
  if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(schema) || !/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(table)) {
    return {
      content: [{ type: "text", text: "Invalid table name. Use alphanumeric characters and underscores only." }],
      isError: true
    };
  }

  var queries = [
    // Columns
    pool.query(
      "SELECT column_name, data_type, character_maximum_length, " +
      "is_nullable, column_default, ordinal_position " +
      "FROM information_schema.columns " +
      "WHERE table_schema = $1 AND table_name = $2 " +
      "ORDER BY ordinal_position",
      [schema, table]
    ),
    // Primary key
    // (constraint names are only unique per table in PostgreSQL, so join on schema and table too)
    pool.query(
      "SELECT kcu.column_name " +
      "FROM information_schema.table_constraints tc " +
      "JOIN information_schema.key_column_usage kcu " +
      "  ON tc.constraint_name = kcu.constraint_name " +
      "  AND tc.table_schema = kcu.table_schema " +
      "  AND tc.table_name = kcu.table_name " +
      "WHERE tc.table_schema = $1 AND tc.table_name = $2 " +
      "  AND tc.constraint_type = 'PRIMARY KEY' " +
      "ORDER BY kcu.ordinal_position",
      [schema, table]
    ),
    // Foreign keys
    pool.query(
      "SELECT kcu.column_name, ccu.table_name AS foreign_table, " +
      "ccu.column_name AS foreign_column " +
      "FROM information_schema.table_constraints tc " +
      "JOIN information_schema.key_column_usage kcu " +
      "  ON tc.constraint_name = kcu.constraint_name " +
      "  AND tc.table_schema = kcu.table_schema " +
      "  AND tc.table_name = kcu.table_name " +
      "JOIN information_schema.constraint_column_usage ccu " +
      "  ON tc.constraint_name = ccu.constraint_name " +
      "  AND tc.constraint_schema = ccu.constraint_schema " +
      "WHERE tc.table_schema = $1 AND tc.table_name = $2 " +
      "  AND tc.constraint_type = 'FOREIGN KEY'",
      [schema, table]
    ),
    // Indexes
    pool.query(
      "SELECT indexname, indexdef FROM pg_indexes " +
      "WHERE schemaname = $1 AND tablename = $2",
      [schema, table]
    ),
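    // Row count (planner estimate; filtered by schema so a same-named table
    // in another schema is not picked up by mistake)
    pool.query(
      "SELECT c.reltuples::bigint AS estimate " +
      "FROM pg_class c " +
      "JOIN pg_namespace n ON n.oid = c.relnamespace " +
      "WHERE n.nspname = $1 AND c.relname = $2",
      [schema, table]
    ),
    // Sample rows (identifiers were validated above; quoting keeps mixed-case names intact)
    pool.query('SELECT * FROM "' + schema + '"."' + table + '" LIMIT 5')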
  ];

  return Promise.all(queries)
    .then(function(results) {
      var description = {
        table: schema + "." + table,
        columns: results[0].rows.map(function(col) {
          var type = col.data_type;
          if (col.character_maximum_length) {
            type += "(" + col.character_maximum_length + ")";
          }
          return {
            name: col.column_name,
            type: type,
            nullable: col.is_nullable === "YES",
            default: col.column_default,
            position: col.ordinal_position
          };
        }),
        primaryKey: results[1].rows.map(function(r) { return r.column_name; }),
        foreignKeys: results[2].rows.map(function(fk) {
          return {
            column: fk.column_name,
            references: fk.foreign_table + "." + fk.foreign_column
          };
        }),
        indexes: results[3].rows.map(function(idx) {
          return { name: idx.indexname, definition: idx.indexdef };
        }),
        estimatedRows: results[4].rows[0] ? parseInt(results[4].rows[0].estimate, 10) : 0,
        sampleRows: results[5].rows
      };

      return {
        content: [{ type: "text", text: JSON.stringify(description, null, 2) }]
      };
    })
    .catch(function(err) {
      return {
        content: [{ type: "text", text: "Error describing table: " + err.message }],
        isError: true
      };
    });
}

function listTables(schema) {
  if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(schema)) {
    return {
      content: [{ type: "text", text: "Invalid schema name" }],
      isError: true
    };
  }

  return pool.query(
    "SELECT t.table_name, t.table_type, " +
    "  (SELECT COUNT(*) FROM information_schema.columns c " +
    "   WHERE c.table_schema = t.table_schema AND c.table_name = t.table_name) as column_count, " +
    "  pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name)) as size_bytes " +
    "FROM information_schema.tables t " +
    "WHERE t.table_schema = $1 " +
    "ORDER BY t.table_name",
    [schema]
  ).then(function(result) {
    var tables = result.rows.map(function(row) {
      var sizeKB = Math.round(parseInt(row.size_bytes) / 1024);
      return {
        name: row.table_name,
        type: row.table_type,
        columns: parseInt(row.column_count),
        size: sizeKB > 1024 ? (sizeKB / 1024).toFixed(1) + " MB" : sizeKB + " KB"
      };
    });

    return {
      content: [{ type: "text", text: JSON.stringify({ schema: schema, tables: tables }, null, 2) }]
    };
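  }).catch(function(err) {
    // Report failures as a tool error, consistent with the other tools
    return {
      content: [{ type: "text", text: "Error listing tables: " + err.message }],
      isError: true
    };
  });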
}

function explainQuery(sql) {
  try {
    sql = validateQuery(sql);
  } catch (e) {
    return {
      content: [{ type: "text", text: "Query validation error: " + e.message }],
      isError: true
    };
  }

  return pool.query("EXPLAIN (FORMAT JSON, ANALYZE false) " + sql)
    .then(function(result) {
      return {
        content: [{ type: "text", text: JSON.stringify(result.rows[0], null, 2) }]
      };
    })
    .catch(function(err) {
      return {
        content: [{ type: "text", text: "Explain error: " + err.message }],
        isError: true
      };
    });
}

// ---- Resources: Schema Overview ----

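// The SDK's setRequestHandler expects a request schema object, not a method-name string
var ListResourcesRequestSchema = require("@modelcontextprotocol/sdk/types.js").ListResourcesRequestSchema;

server.setRequestHandler(ListResourcesRequestSchema, function() {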
  return pool.query(
    "SELECT table_name FROM information_schema.tables " +
    "WHERE table_schema = 'public' ORDER BY table_name"
  ).then(function(result) {
    var resources = [{
      uri: "db://schema",
      name: "Database Schema",
      description: "Complete schema with all tables, columns, keys, and relationships",
      mimeType: "application/json"
    }];

    result.rows.forEach(function(row) {
      resources.push({
        uri: "db://table/" + row.table_name,
        name: row.table_name,
        description: "Table: " + row.table_name,
        mimeType: "application/json"
      });
    });

    return { resources: resources };
  });
});

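// The SDK's setRequestHandler expects a request schema object, not a method-name string
var ReadResourceRequestSchema = require("@modelcontextprotocol/sdk/types.js").ReadResourceRequestSchema;

server.setRequestHandler(ReadResourceRequestSchema, function(request) {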
  var uri = request.params.uri;

  if (uri === "db://schema") {
    return pool.query(
      "SELECT table_name, column_name, data_type, is_nullable " +
      "FROM information_schema.columns " +
      "WHERE table_schema = 'public' " +
      "ORDER BY table_name, ordinal_position"
    ).then(function(result) {
      var tables = {};
      result.rows.forEach(function(row) {
        if (!tables[row.table_name]) tables[row.table_name] = [];
        tables[row.table_name].push({
          column: row.column_name,
          type: row.data_type,
          nullable: row.is_nullable === "YES"
        });
      });

      return {
        contents: [{
          uri: uri,
          mimeType: "application/json",
          text: JSON.stringify(tables, null, 2)
        }]
      };
    });
  }

  var tableMatch = uri.match(/^db:\/\/table\/([a-zA-Z_]\w*)$/);
  if (tableMatch) {
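    return describeTable(tableMatch[1]).then(function(toolResult) {
      // describeTable reports failures as a tool result; surface them as a resource error
      // instead of returning the error text as if it were table JSON
      if (toolResult.isError) {
        throw new Error(toolResult.content[0].text);
      }
      return {
        contents: [{
          uri: uri,
          mimeType: "application/json",
          text: toolResult.content[0].text
        }]
      };
    });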
  }

  throw new Error("Unknown resource: " + uri);
});

// ---- Audit Logging ----

var originalQuery = pool.query.bind(pool);
pool.query = function(sql, params) {
  var start = Date.now();
  return originalQuery(sql, params).then(function(result) {
    var duration = Date.now() - start;
    console.error("[AUDIT] " + new Date().toISOString()
      + " | " + duration + "ms"
      + " | rows:" + (result.rows ? result.rows.length : 0)
      + " | " + sql.substring(0, 100));
    return result;
  }).catch(function(err) {
    console.error("[AUDIT ERROR] " + new Date().toISOString()
      + " | " + sql.substring(0, 100)
      + " | " + err.message);
    throw err;
  });
};

// ---- Server Startup ----

var transport = new StdioTransport();
server.connect(transport).then(function() {
  console.error("Database MCP server running");
  console.error("  Max rows: " + MAX_ROWS);
  console.error("  Query timeout: 30s");
  console.error("  Connection pool: 5 max");
});

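process.on("SIGINT", function() {
  // pool.end() is asynchronous; wait for connections to close before exiting
  pool.end().then(function() {
    process.exit(0);
  }, function() {
    process.exit(1);
  });
});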
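
One weakness of scanning raw query text for forbidden keywords is false positives: a harmless filter such as `WHERE status = 'delete'` would be rejected. A possible refinement, sketched below, is to blank out string literals and comments before the keyword scan, and to reject input that contains more than one statement. The helper names `stripLiteralsAndComments` and `hasMultipleStatements` are hypothetical, and the sketch assumes standard single-quoted literals with `''` escaping. Dollar-quoted strings, `E'...'` backslash escapes, nested block comments, and quoted identifiers containing apostrophes are not handled.

```javascript
// Matches, in a single left-to-right pass, whichever comes first:
// a single-quoted literal, a line comment, or a (non-nested) block comment.
var LITERAL_OR_COMMENT = /'(?:[^']|'')*'|--[^\n]*|\/\*[\s\S]*?\*\//g;

// Replace literals with '' and comments with a space so later checks
// only see the structural part of the SQL.
function stripLiteralsAndComments(sql) {
  return sql.replace(LITERAL_OR_COMMENT, function(match) {
    return match.charAt(0) === "'" ? "''" : " ";
  });
}

// True if a semicolon remains anywhere except as the final terminator.
function hasMultipleStatements(sql) {
  var stripped = stripLiteralsAndComments(sql).trim().replace(/;\s*$/, "");
  return stripped.indexOf(";") !== -1;
}

var safe = "SELECT * FROM tickets WHERE status = 'delete'";
console.log(/\bDELETE\b/i.test(stripLiteralsAndComments(safe)));         // false
console.log(hasMultipleStatements("SELECT 'a;b';"));                     // false
console.log(hasMultipleStatements("SELECT 1 -- don't\n; DROP TABLE x")); // true
```

In the server above, `validateQuery` could run its keyword loop against the stripped text while still passing the original SQL to the database.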

Connection Pooling and Performance

// Advanced connection pool configuration (replaces the basic pool in the core server)
var pg = require("pg");

var pool = new pg.Pool({
  connectionString: process.env.DATABASE_URL,

  // Pool sizing
  max: 5,                       // Max concurrent connections
  min: 1,                       // Keep at least 1 connection warm
  idleTimeoutMillis: 30000,     // Close idle connections after 30s
  connectionTimeoutMillis: 5000, // Fail fast if can't connect in 5s

  // Query safety
  statement_timeout: 30000,      // Kill queries after 30s
  query_timeout: 35000,          // Client-side timeout (slightly longer)

  // SSL for production
  ssl: process.env.NODE_ENV === "production" ? { rejectUnauthorized: true } : false
});

// Monitor pool health
pool.on("error", function(err) {
  console.error("Unexpected pool error:", err.message);
});

pool.on("connect", function() {
  console.error("New database connection established");
});

pool.on("remove", function() {
  console.error("Database connection removed from pool");
});

// Periodic health check
setInterval(function() {
  pool.query("SELECT 1")
    .then(function() {
      console.error("Pool health: OK (" + pool.totalCount + " total, " + pool.idleCount + " idle, " + pool.waitingCount + " waiting)");
    })
    .catch(function(err) {
      console.error("Pool health: FAILED - " + err.message);
    });
}, 60000);
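
The health check logs raw counters, which leaves interpretation to whoever reads the log. As a rough sketch, the hypothetical helper below (`poolStatus` is an illustrative name, and the "busy" and "saturated" labels are assumptions, not pg terminology) turns the `totalCount`, `idleCount`, and `waitingCount` counters that `pg.Pool` exposes into a single status. Stub objects stand in for a real pool here so the example runs without a database.

```javascript
// Hypothetical helper: classify pool pressure from pg.Pool's counters.
// `max` should match the pool's configured max size.
function poolStatus(pool, max) {
  var active = pool.totalCount - pool.idleCount;
  var level = "ok";
  if (pool.waitingCount > 0) {
    level = "saturated"; // callers are queued waiting for a connection
  } else if (active >= max) {
    level = "busy";      // every connection is in use, but nobody is waiting yet
  }
  return {
    active: active,
    idle: pool.idleCount,
    waiting: pool.waitingCount,
    level: level
  };
}

// Stub objects with the same counter names as pg.Pool
console.log(poolStatus({ totalCount: 2, idleCount: 1, waitingCount: 0 }, 5).level); // ok
console.log(poolStatus({ totalCount: 5, idleCount: 0, waitingCount: 0 }, 5).level); // busy
console.log(poolStatus({ totalCount: 5, idleCount: 0, waitingCount: 3 }, 5).level); // saturated
```

A repeated "saturated" status would suggest slow queries or a pool that is too small for the workload.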

Result Formatting

// Format query results for optimal AI consumption

function formatResults(result, options) {
  options = options || {};
  var format = options.format || "json";

  switch (format) {
    case "json":
      return JSON.stringify({
        columns: result.fields.map(function(f) { return f.name; }),
        rows: result.rows,
        rowCount: result.rows.length
      }, null, 2);

    case "table":
      return formatAsTable(result);

    case "csv":
      return formatAsCsv(result);

    default:
      return JSON.stringify(result.rows, null, 2);
  }
}

function formatAsTable(result) {
  if (result.rows.length === 0) return "(no rows)";

  var columns = result.fields.map(function(f) { return f.name; });

  // Calculate column widths
  var widths = columns.map(function(col, i) {
    var maxWidth = col.length;
    result.rows.forEach(function(row) {
      var val = String(row[col] === null ? "NULL" : row[col]);
      if (val.length > maxWidth) maxWidth = val.length;
    });
    return Math.min(maxWidth, 40); // Cap at 40 chars
  });

  // Build header
  var header = columns.map(function(col, i) {
    return col.padEnd(widths[i]);
  }).join(" | ");

  var separator = widths.map(function(w) {
    return "-".repeat(w);
  }).join("-+-");

  // Build rows
  var rows = result.rows.map(function(row) {
    return columns.map(function(col, i) {
      var val = String(row[col] === null ? "NULL" : row[col]);
      if (val.length > widths[i]) val = val.substring(0, widths[i] - 3) + "...";
      return val.padEnd(widths[i]);
    }).join(" | ");
  });

  return header + "\n" + separator + "\n" + rows.join("\n")
    + "\n\n(" + result.rows.length + " rows)";
}

function formatAsCsv(result) {
  var columns = result.fields.map(function(f) { return f.name; });
  var lines = [columns.join(",")];

  result.rows.forEach(function(row) {
    var values = columns.map(function(col) {
      var val = row[col];
      if (val === null) return "";
      val = String(val);
      if (val.indexOf(",") !== -1 || val.indexOf('"') !== -1 || val.indexOf("\n") !== -1) {
        return '"' + val.replace(/"/g, '""') + '"';
      }
      return val;
    });
    lines.push(values.join(","));
  });

  return lines.join("\n");
}
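
Row limits cap the number of rows, but a single wide text or JSON column can still produce a very large response. One option, sketched below under the assumption that long values can safely be shortened for the model, is to truncate string cells before formatting. `truncateCells` and its default of 200 characters are illustrative choices, not part of the server above.

```javascript
// Hypothetical helper: shorten long string values so one wide column
// cannot dominate the response. Returns new row objects; input is not mutated.
function truncateCells(rows, maxLen) {
  maxLen = maxLen || 200; // assumed default, tune for your data
  return rows.map(function(row) {
    var out = {};
    Object.keys(row).forEach(function(key) {
      var val = row[key];
      if (typeof val === "string" && val.length > maxLen) {
        out[key] = val.substring(0, maxLen) +
          "... [" + (val.length - maxLen) + " more chars]";
      } else {
        out[key] = val;
      }
    });
    return out;
  });
}

var rows = [{ id: 1, body: "xxxxxxxxxx" }];
console.log(truncateCells(rows, 4)); // [ { id: 1, body: 'xxxx... [6 more chars]' } ]
```

It could be applied to `result.rows` just before calling `formatResults`.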

Common Issues & Troubleshooting

"permission denied for table users"

The MCP database user lacks SELECT permission on the table:

-- Check current grants
SELECT grantee, privilege_type
FROM information_schema.role_table_grants
WHERE table_name = 'users' AND grantee = 'mcp_reader';

-- Fix: grant SELECT
GRANT SELECT ON public.users TO mcp_reader;

Query Times Out on Large Tables

Error: canceling statement due to statement timeout

The 30-second timeout protects against runaway queries. Add appropriate WHERE clauses or LIMIT:

-- Instead of: SELECT * FROM logs
-- Use: SELECT * FROM logs WHERE created_at > NOW() - INTERVAL '1 day' LIMIT 100
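
The server's automatic LIMIT is skipped whenever the query text already contains one, so a model can still ask for far more rows than intended. One possible alternative, sketched here, is to wrap the query in an outer SELECT that always applies the cap and fetches one extra row to detect truncation. The names `wrapWithRowCap`, `trimToCap`, and the alias `mcp_capped` are hypothetical. This assumes PostgreSQL, where a subquery with duplicate output column names is still accepted by `SELECT *`. Other databases may reject it, and the outer query is not strictly guaranteed to preserve the inner ORDER BY.

```javascript
// Wrap a validated SELECT so the row cap applies regardless of any inner LIMIT.
function wrapWithRowCap(sql, maxRows) {
  var inner = sql.trim().replace(/;\s*$/, "");
  // The newline before ")" keeps a trailing "-- comment" from swallowing the parenthesis.
  // One extra row is requested so the caller can tell whether results were cut off.
  return "SELECT * FROM (" + inner + "\n) AS mcp_capped LIMIT " + (maxRows + 1);
}

// Drop the extra row, if present, and report whether truncation happened.
function trimToCap(rows, maxRows) {
  return {
    rows: rows.slice(0, maxRows),
    truncated: rows.length > maxRows
  };
}

console.log(wrapWithRowCap("SELECT * FROM logs LIMIT 1000000;", 500));
console.log(trimToCap([1, 2, 3], 2)); // { rows: [ 1, 2 ], truncated: true }
```

With this approach `truncated` reflects what actually happened, not whether the query text mentioned LIMIT.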

"too many connections" Error

The MCP server's pool is exhausting the database's connection limit:

Error: remaining connection slots are reserved for
non-replication superuser connections

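Reduce the pool's max setting or increase the database's max_connections. If the per-role CONNECTION LIMIT is what was hit, PostgreSQL instead reports: too many connections for role "mcp_reader". In that case, keep the pool's max at or below the role's limit. For MCP servers, 3-5 connections is usually sufficient.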

Model Generates INSERT/UPDATE Queries

Despite clear tool descriptions, models sometimes attempt write operations. The query validator blocks these, but it should also return a message that tells the model what to do instead:

// In the error response:
"Only SELECT queries are allowed. If you need to modify data, "
+ "please describe what changes you need and I will execute them "
+ "through the appropriate application API."
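
As a rough sketch of how such a message could be produced, the hypothetical helper below (`rejectionResult` is an illustrative name) detects a leading write keyword and builds a tool result in the same `content`/`isError` shape the server already returns. The wording of the message is only an example.

```javascript
// Leading write keyword, if any. The list mirrors a subset of the validator's forbidden keywords.
var WRITE_STATEMENT = /^\s*(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE)\b/i;

// Hypothetical helper: build a tool error result that names the rejected
// statement type and suggests what to do instead.
function rejectionResult(sql) {
  var match = WRITE_STATEMENT.exec(sql || "");
  var text;
  if (match) {
    text = "This server is read-only, so " + match[1].toUpperCase() +
      " statements are rejected. Rephrase the request as a SELECT, or " +
      "describe the change so it can go through the application API.";
  } else {
    text = "Only SELECT queries are allowed.";
  }
  return { content: [{ type: "text", text: text }], isError: true };
}

console.log(rejectionResult("update users set name = 'x'").content[0].text);
```

Naming the rejected statement type gives the model a concrete reason, which tends to be more useful than a generic refusal.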

Best Practices

  • Always use a read-only database user — Never connect the MCP server with credentials that can modify data. A dedicated read-only user with SELECT-only grants is non-negotiable.
  • Set query timeouts at multiple levels — Database user timeout, connection pool timeout, and application timeout. Belt, suspenders, and a parachute.
  • Validate all SQL before execution — Block INSERT, UPDATE, DELETE, DROP, and other write operations even if the database user cannot execute them. Defense in depth.
  • Limit result set sizes — Add LIMIT clauses automatically. Models rarely need more than 100-500 rows to answer a question.
  • Log every query for audit — Record what was queried, when, and how long it took. This log is essential for debugging and security review.
  • Expose schema as resources, queries as tools — Schema information is static context (resources). Running queries is an action (tools). This separation follows MCP conventions.
  • Use parameterized queries — Even with read-only access, support $1/$2 parameter placeholders to prevent SQL injection patterns in query text.
  • Return metadata with results — Include column names, types, row counts, and query duration. This context helps models interpret results correctly.
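
To make the parameterized-query practice concrete: a tool call keeps values out of the SQL text and passes them in `params`. The sketch below shows such a call's arguments, plus a hypothetical pre-check (`maxPlaceholderIndex` and `checkParams` are illustrative names) that compares the highest `$n` placeholder with the number of parameters supplied. It scans the raw text, so a `$1` inside a string literal would be counted too. The table and column names are made up.

```javascript
// Highest $n placeholder referenced in the SQL text (0 if none).
function maxPlaceholderIndex(sql) {
  var max = 0;
  var re = /\$(\d+)/g;
  var m;
  while ((m = re.exec(sql)) !== null) {
    var n = parseInt(m[1], 10);
    if (n > max) max = n;
  }
  return max;
}

// Returns null when counts line up, otherwise a message for the model.
function checkParams(sql, params) {
  var expected = maxPlaceholderIndex(sql);
  var given = (params || []).length;
  if (expected !== given) {
    return "Query expects " + expected + " parameter(s) but " + given + " were supplied";
  }
  return null;
}

// Example arguments for the "query" tool (illustrative schema)
var args = {
  sql: "SELECT id, email FROM users WHERE created_at > $1 AND status = $2",
  params: ["2024-01-01", "active"]
};

console.log(checkParams(args.sql, args.params)); // null
console.log(checkParams(args.sql, ["2024-01-01"]));
```

Catching the mismatch before the query runs lets the server return a clearer message than the database's own bind error.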
