September 4, 2025

MCP Security: A Developer Play

Buchi Reddy B

CEO & Founder at LEVO

Levo AI Security Research Panel

Research Team

TL;DR

This is a hands-on, developer-first security play: you’ll scaffold a minimal MCP server, lock down inputs, stop SSRF and prompt-driven abuse, add redaction and region routing, make writes idempotent, cap cost and concurrency, emit useful spans, and ship adversarial tests, so your agent can act without blowing up prod.

Who this is for

Full-stack developers and platform engineers wiring MCP tools into real systems who want practical, copy-paste controls that don’t stall delivery.

What you’ll ship in ~90 minutes

  • A secure MCP server (Node or Python) with two tools (orders.export read + flag.set write).
  • Transport/auth defaults (mTLS, short-TTL JWT, KMS secrets).
  • Input schemas & allow-lists, SSRF-safe fetch, redaction/DLP.
  • Idempotency keys, per-principal rate limits & budgets, stop rules.
  • OpenTelemetry spans with actor/scope/policy attributes.
  • Adversarial tests & fuzzing you can drop into CI.
  • Kill switches (tool quarantine, read-only mode).

Threat model in 60 seconds

  • Prompt injection & tool misuse → deny risky verbs without approvals; strict schemas.
  • Over-scoped authority → purpose scopes, short sessions, JIT elevation.
  • SSRF via fetch → URL allow-list, block link-local and private ranges.
  • Mass assignment / injection → schema allow-lists, reject unknowns, escape output.
  • Exfil & privacy drift → inline redaction, region routing, vendor allow-lists.
  • Denial of wallet (loops) → budgets, rate & concurrency caps, stop rules.

Play 1 - Scaffold a secure MCP server

TypeScript (Node) quickstart

BASH
npm init -y
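npm i @modelcontextprotocol/sdk zod
# The run command below uses ts-node, and top-level await needs an ESM package
npm i -D typescript ts-node @types/node
npm pkg set type=module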
TS
// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const mcp = new McpServer({ name: "team-tools", version: "1.0.0" });

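/** Utility: reject unknown fields everywhere.
 *  Note: some SDK versions expect `inputSchema` as a raw Zod shape rather than a
 *  z.object(); if registerTool rejects these schemas, check your SDK version. */
const strict = <T extends z.ZodRawShape>(s: z.ZodObject<T>) => s.strict();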

/** READ: orders.export (masked by default) */
const ExportSchema = strict(z.object({
  from: z.string().regex(/^\d{4}-\d{2}-\d{2}$/),
  to:   z.string().regex(/^\d{4}-\d{2}-\d{2}$/),
  mask: z.array(z.enum(["email","phone"])).optional().default(["email"])
}));
mcp.registerTool("orders.export",
  { title: "Export orders", description: "Date range export with masking", inputSchema: ExportSchema },
  async ({ from, to, mask }) => {
    const rows = [
      { orderId: "1001", email: "a@ex.com", phone: "555-123", total: 30.5 },
      { orderId: "1002", email: "b@ex.com", phone: "555-987", total: 18.0 }
    ];
    const masked = rows.map(r => ({
      ...r,
      email: mask?.includes("email") ? "***@***" : r.email,
      phone: mask?.includes("phone") ? "***-***" : r.phone
    }));
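    // MCP content blocks have no "json" type, so serialize the payload as text
    const payload = { range: { from, to }, count: masked.length, rows: masked };
    return { content: [{ type: "text", text: JSON.stringify(payload) }] };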
  }
);

/** WRITE: flag.set (prod >10% requires change ticket) + idempotency key */
const FlagSchema = strict(z.object({
  name: z.string().min(1),
  env: z.enum(["dev","staging","prod"]),
  rolloutPct: z.number().min(0).max(100),
  ticket: z.string().optional(),
  idemKey: z.string().uuid().optional()
}));
// In-memory only: keys are lost on restart and not shared across instances
const seenIdem = new Set<string>();
mcp.registerTool("flag.set",
  { title: "Set feature flag", description: "Set rollout % with safe defaults", inputSchema: FlagSchema },
  async ({ name, env, rolloutPct, ticket, idemKey }) => {
    if (env === "prod" && rolloutPct > 10 && !ticket) {
      return { content: [{ type: "text", text: "DENY: prod change >10% requires change ticket" }] };
    }
    if (idemKey) {
      if (seenIdem.has(idemKey)) return { content: [{ type: "text", text: `OK (idempotent): ${name}@${rolloutPct}%` }] };
      seenIdem.add(idemKey);
    }
    // TODO: call your flag service with retries/backoff
    return { content: [{ type: "text", text: `Flag ${name} set to ${rolloutPct}% in ${env}` }] };
  }
);

await mcp.connect(new StdioServerTransport());
// On stdio, stdout carries the protocol stream, so log to stderr instead
console.error("MCP secure server ready");

Run:

BASH
node --loader ts-node/esm src/server.ts

Python (alternate)

BASH
pip install mcp pydantic
PYTHON
# server.py
from typing import Annotated, List, Literal, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import Field

# FastMCP provides the @tool() decorator and derives input schemas from type hints
srv = FastMCP("team-tools")
_seen = set()  # in-memory idempotency keys; use a shared store in production

DateStr = Annotated[str, Field(pattern=r"^\d{4}-\d{2}-\d{2}$")]

@srv.tool()
def orders_export(from_: DateStr, to: DateStr,
                  mask: Optional[List[Literal["email", "phone"]]] = None):
    """Date range export with masking (email masked by default)."""
    rows=[{"orderId":"1001","email":"a@ex.com","phone":"555-123","total":30.5},
          {"orderId":"1002","email":"b@ex.com","phone":"555-987","total":18.0}]
    mask = mask or ["email"]
    for r in rows:
        if "email" in mask: r["email"]="***@***"
        if "phone" in mask: r["phone"]="***-***"
    return {"range":{"from":from_,"to":to}, "count":len(rows), "rows":rows}

@srv.tool()
def flag_set(name: str, env: Literal["dev", "staging", "prod"],
             rolloutPct: Annotated[float, Field(ge=0, le=100)],
             ticket: Optional[str]=None, idemKey: Optional[str]=None):
    """Set rollout % with safe defaults (prod >10% requires a change ticket)."""
    if env=="prod" and rolloutPct>10 and not ticket:
        return "DENY: prod change >10% requires change ticket"
    if idemKey:
        if idemKey in _seen: return f"OK (idempotent): {name}@{rolloutPct}%"
        _seen.add(idemKey)
    return f"Flag {name} set to {rolloutPct}% in {env}"

if __name__ == "__main__":
    srv.run()  # stdio transport by default

Play 2 - Transport, tokens, secrets (sane defaults)

YAML
# security.baseline.yaml
transport:
  tls_min_version: "1.2"
  mtls: true
  cert_rotation_days: 90
auth:
  token_type: "JWT"
  claims: ["aud","iss","sub","exp","nbf","jti"]
  max_ttl_seconds: 600
  replay_window_minutes: 15
secrets:
  provider: "KMS|Vault"
  rotation_days: 60
  forbid_env_for_prod: true

Notes

  • Verify aud/iss, reject clock skew >60s, store jti for 15m for replay defense.
  • Use purpose-scoped grants: orders.read, flags.write:staging; deny wildcards.
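
The notes above can be sketched as a claim check that runs after signature verification. This is a minimal illustration, not a full JWT validator: signature checking is assumed to happen in your JWT library, and the names `Claims`, `checkClaims`, and `hasWildcardGrant` are hypothetical.

```typescript
// Sketch: claim checks to run AFTER the token signature has been verified (not shown).
// All names here are illustrative assumptions, not part of any SDK.
interface Claims { aud: string; iss: string; sub: string; exp: number; nbf: number; jti: string }

const MAX_TTL_S = 600;        // auth.max_ttl_seconds
const MAX_SKEW_S = 60;        // tolerate at most 60s of clock skew
const REPLAY_WINDOW_S = 900;  // remember each jti for 15 minutes

// jti -> epoch second at which the replay entry may be forgotten
const seenJti = new Map<string, number>();

/** Returns null when the claims are acceptable, otherwise a short denial reason. */
function checkClaims(c: Claims, nowS: number, expected: { aud: string; iss: string }): string | null {
  if (c.aud !== expected.aud) return "bad aud";
  if (c.iss !== expected.iss) return "bad iss";
  if (c.nbf > nowS + MAX_SKEW_S) return "not yet valid";
  if (c.exp < nowS - MAX_SKEW_S) return "expired";
  if (c.exp - c.nbf > MAX_TTL_S) return "ttl too long";
  // Drop replay entries whose window has passed, then check for reuse
  seenJti.forEach((until, jti) => { if (until <= nowS) seenJti.delete(jti); });
  if (seenJti.has(c.jti)) return "replayed jti";
  seenJti.set(c.jti, nowS + REPLAY_WINDOW_S);
  return null;
}

/** Purpose-scoped grants only: any grant containing "*" is treated as a wildcard. */
function hasWildcardGrant(grants: string[]): boolean {
  return grants.some(g => g.indexOf("*") !== -1);
}
```

The replay map here lives in process memory; with more than one server instance it would need to move to a shared store.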

Play 3 - Schema allow-lists & mass-assignment defense

  • Reject unknown fields (strict(z.object()) in Zod; in Pydantic set extra="forbid" on the model config, because the default silently ignores extras).
  • Enumerate strings where possible (env, segment, region).
  • Bounds for numbers (0–100 for rolloutPct).
  • Validate email/URL formats when you truly need them.

Play 4 - SSRF-safe fetch tool (copyable)

TS
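import { URL } from "node:url";
const DENY_HOSTS = new Set(["169.254.169.254","metadata.google.internal","localhost"]);
// IPv4 private, loopback, and link-local (169.254.0.0/16) ranges
const PRIVATE_V4 = [/^10\./, /^192\.168\./, /^172\.(1[6-9]|2\d|3[0-1])\./, /^127\./, /^0\./, /^169\.254\./];
// IPv6 loopback, unique-local (fc00::/7), link-local (fe80::/10), and IPv4-mapped addresses
const PRIVATE_V6 = [/^::1$/, /^f[cd][0-9a-f]{2}:/i, /^fe[89ab][0-9a-f]:/i, /^::ffff:/i];

function isPrivate(host: string) {
  const h = host.replace(/^\[|\]$/g, ""); // URL.hostname keeps brackets around IPv6 literals
  return [...PRIVATE_V4, ...PRIVATE_V6].some(r => r.test(h));
}

mcp.registerTool("web.fetchSafe",
 { title:"Safe fetch", description:"HTTP GET with SSRF protections",
   inputSchema: strict(z.object({ url: z.string().url() })) },
 async ({ url }) => {
   const u = new URL(url);
   if (u.protocol !== "http:" && u.protocol !== "https:") {
     return { content: [{ type:"text", text:"DENY: unsupported URL scheme" }] };
   }
   if (DENY_HOSTS.has(u.hostname) || isPrivate(u.hostname)) {
     return { content: [{ type:"text", text:"DENY: SSRF-protected host" }] };
   }
   // TODO: add outbound allow-list
   // Caveat: string checks cannot catch DNS names that resolve to private IPs;
   // resolve the host and re-check the addresses before connecting.
   const res = await fetch(u, { method:"GET", redirect:"error" });
   const text = await res.text();
   return { content: [{ type:"text", text: text.slice(0, 2000) }] }; // size cap on returned text
 });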
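
The outbound allow-list left as a TODO above, together with a numeric range check for resolved addresses, could look roughly like the sketch below. The CIDR list, `ALLOWED_HOSTS`, and the function names are assumptions for illustration; only IPv4 is handled here.

```typescript
// Sketch: numeric CIDR checks for resolved IPv4 addresses plus a host allow-list.
// BLOCKED_CIDRS, ALLOWED_HOSTS, and all function names are illustrative assumptions.
const BLOCKED_CIDRS: Array<[string, number]> = [
  ["0.0.0.0", 8], ["10.0.0.0", 8], ["100.64.0.0", 10], ["127.0.0.0", 8],
  ["169.254.0.0", 16], ["172.16.0.0", 12], ["192.168.0.0", 16],
];

/** Parse a dotted-quad IPv4 address into a number, or null if it is malformed. */
function ipv4ToInt(ip: string): number | null {
  const parts = ip.split(".");
  if (parts.length !== 4) return null;
  let n = 0;
  for (const p of parts) {
    if (!/^\d{1,3}$/.test(p)) return null;
    const octet = Number(p);
    if (octet > 255) return null;
    n = n * 256 + octet;
  }
  return n;
}

/** True when the address falls in a blocked range. Unparseable input fails closed. */
function isBlockedIPv4(ip: string): boolean {
  const addr = ipv4ToInt(ip);
  if (addr === null) return true;
  return BLOCKED_CIDRS.some(([base, bits]) => {
    const baseInt = ipv4ToInt(base) as number;
    const size = Math.pow(2, 32 - bits); // number of addresses in the block
    return Math.floor(addr / size) === Math.floor(baseInt / size);
  });
}

// Hypothetical allow-list; in practice this would come from reviewed config
const ALLOWED_HOSTS = new Set(["example.com", "api.example.com"]);

function isAllowedHost(hostname: string): boolean {
  return ALLOWED_HOSTS.has(hostname.toLowerCase());
}
```

One way to use this is to resolve the hostname first (for example with `dns.promises.lookup(host, { all: true })`), run every returned address through `isBlockedIPv4`, and deny the request if any address is blocked. Connecting to the vetted address rather than re-resolving narrows the DNS-rebinding window.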

Play 5 - Redaction & region routing (in the path)

YAML
# policy.yaml
- rule: redact-email-by-default
  when: { tool: "orders.export" }
  decision: allow
  redact:
    - field: "email"
      mode: "mask"

- rule: route-eu-personal-data
  when: { data.contains_pii: true, subject.region: "EU" }
  route: "eu-west"

- rule: vendor-allow-list
  when: { egress.domain: "*" }
  decision: deny
  unless: { egress.domain_in_allowlist: true }
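
To make the rules concrete, here is a small sketch of how a policy layer might evaluate them in code. The source does not specify the policy engine, so the `Ctx` and `Decision` shapes, `EGRESS_ALLOWLIST`, and the function names are all assumptions.

```typescript
// Sketch: the three policy rules expressed as plain functions.
// Every type and name here is an illustrative assumption, not a real policy engine API.
interface Ctx { tool: string; containsPii: boolean; subjectRegion: string; egressDomain?: string }
interface Decision { allow: boolean; route?: string; redactFields: string[]; reason?: string }

const EGRESS_ALLOWLIST = new Set(["api.example.com"]); // hypothetical vendor allow-list

function evaluate(ctx: Ctx): Decision {
  const d: Decision = { allow: true, redactFields: [] };
  // redact-email-by-default
  if (ctx.tool === "orders.export") d.redactFields.push("email");
  // route-eu-personal-data
  if (ctx.containsPii && ctx.subjectRegion === "EU") d.route = "eu-west";
  // vendor-allow-list: any egress is denied unless the domain is listed
  if (ctx.egressDomain !== undefined && !EGRESS_ALLOWLIST.has(ctx.egressDomain)) {
    d.allow = false;
    d.reason = "egress domain not in allow-list";
  }
  return d;
}

/** Return a copy of the row with the listed fields masked. */
function redact(row: Record<string, unknown>, fields: string[]): Record<string, unknown> {
  const out: Record<string, unknown> = { ...row };
  for (const f of fields) {
    if (f in out) out[f] = "***";
  }
  return out;
}
```

Running redaction inside the tool path, before the result leaves the server, means the agent never sees the unmasked value unless a policy explicitly allows it.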

Play 6 - Idempotency & safe writes

  • Require idemKey (UUID) for any mutating tool. The Play 1 scaffold marks it optional; tighten that before production.
  • Cache decisions for 24h; return OK (idempotent) for repeats.
  • On the service side, store operation IDs to dedupe retries.
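
A time-bounded cache for the 24h rule could be sketched as follows. The class name, its methods, and the injectable clock are assumptions made for illustration; a production version would likely sit in a shared store such as a database or cache service.

```typescript
// Sketch: idempotency cache with a TTL (24h by default).
// IdempotencyCache and its methods are illustrative assumptions.
class IdempotencyCache<R> {
  private entries = new Map<string, { result: R; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlMs: number = 24 * 60 * 60 * 1000, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now; // injectable clock keeps the class testable
  }

  /** Return the stored result for a key, or undefined if absent or expired. */
  get(key: string): R | undefined {
    const hit = this.entries.get(key);
    if (hit === undefined) return undefined;
    if (hit.expiresAt <= this.now()) {
      this.entries.delete(key);
      return undefined;
    }
    return hit.result;
  }

  /** Record the result of a completed operation under its idempotency key. */
  put(key: string, result: R): void {
    this.entries.set(key, { result, expiresAt: this.now() + this.ttlMs });
  }
}
```

In a handler, the flow would be: look up `idemKey` with `get`, return the stored result on a hit, otherwise perform the write and `put` the result. Two concurrent requests with the same key can still both miss, which is why the service-side dedupe in the last bullet remains necessary.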

Play 7 - Rate limits, budgets, and stop rules

JSON
{
  "principal": "agent:copilot",
  "limits": { "rpm": 60, "burst": 100, "concurrency": 5, "daily_cost_usd": 50 },
  "stop_rules": { "max_plan_depth": 12, "max_retries": 3 }
}

  • Enforce per principal and tenant.
  • Emit budget use in spans so dashboards are trivial.
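
One possible in-process enforcement of the JSON above is a token bucket combined with concurrency and budget counters. This is a sketch under assumptions: `PrincipalLimiter`, `shouldStop`, and the per-call cost estimate are hypothetical, and the daily budget reset is left out.

```typescript
// Sketch: per-principal token bucket + concurrency cap + cost budget.
// Names are illustrative assumptions; daily budget reset is intentionally omitted.
interface Limits { rpm: number; burst: number; concurrency: number; daily_cost_usd: number }

class PrincipalLimiter {
  private limits: Limits;
  private now: () => number;
  private tokens: number;
  private last: number;
  private inFlight = 0;
  private spentUsd = 0;

  constructor(limits: Limits, now: () => number = () => Date.now()) {
    this.limits = limits;
    this.now = now;
    this.tokens = limits.burst; // start with a full bucket
    this.last = now();
  }

  /** Returns null when the call is admitted, otherwise the reason for a 429-style denial. */
  tryAcquire(estimatedCostUsd: number): string | null {
    const t = this.now();
    // Refill at rpm tokens per minute, capped at the burst size
    const refill = ((t - this.last) * this.limits.rpm) / 60000;
    this.tokens = Math.min(this.limits.burst, this.tokens + refill);
    this.last = t;
    if (this.tokens < 1) return "rate limit";
    if (this.inFlight >= this.limits.concurrency) return "concurrency limit";
    if (this.spentUsd + estimatedCostUsd > this.limits.daily_cost_usd) return "budget exhausted";
    this.tokens -= 1;
    this.inFlight += 1;
    this.spentUsd += estimatedCostUsd;
    return null;
  }

  /** Call when an admitted request finishes, whether it succeeded or failed. */
  release(): void {
    if (this.inFlight > 0) this.inFlight -= 1;
  }
}

/** Stop rules: halt the agent loop once plan depth or retries exceed the caps. */
function shouldStop(planDepth: number, retries: number, maxDepth = 12, maxRetries = 3): boolean {
  return planDepth > maxDepth || retries > maxRetries;
}
```

Keeping one limiter per principal and a second per tenant, and admitting a call only when both return null, matches the "per principal and tenant" note.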

Play 8 - Spans & signed evidence you’ll actually use

Span names

TEXT
agent.plan | mcp.tool.invoke | service.api.call | policy.evaluate | dlp.redact

Attributes (add to every span)

TEXT
actor.id, actor.type, tool.name, scope.grants, policy.decision, pii.count, idem.key, tenant.id

  • Store append-only signed logs keyed by traceId for audits.
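
The source does not say how the evidence logs are signed. As one possible approach, the sketch below chains HMAC-SHA256 signatures so that editing or removing an earlier entry invalidates everything after it. The `Entry` shape and function names are assumptions, and a real deployment would keep the key in KMS/Vault.

```typescript
// Sketch: append-only evidence log with an HMAC chain (an assumed design, not the source's).
import { createHmac } from "node:crypto";

type SpanAttrs = Record<string, string | number | string[]>;
interface Entry { traceId: string; attrs: SpanAttrs; prevSig: string; sig: string }

/** Sign the entry contents together with the previous signature to form a chain. */
function signEntry(key: string, traceId: string, attrs: SpanAttrs, prevSig: string): string {
  return createHmac("sha256", key)
    .update(JSON.stringify([traceId, attrs, prevSig]))
    .digest("hex");
}

function appendEntry(log: Entry[], key: string, traceId: string, attrs: SpanAttrs): Entry {
  const prevSig = log.length > 0 ? log[log.length - 1].sig : "";
  const entry: Entry = { traceId, attrs, prevSig, sig: signEntry(key, traceId, attrs, prevSig) };
  log.push(entry);
  return entry;
}

/** Recompute every signature in order; any edit, reorder, or deletion breaks the chain. */
function verifyLog(log: Entry[], key: string): boolean {
  let prev = "";
  for (const e of log) {
    if (e.prevSig !== prev) return false;
    if (e.sig !== signEntry(key, e.traceId, e.attrs, e.prevSig)) return false;
    prev = e.sig;
  }
  return true;
}
```

An auditor holding the key can call `verifyLog` on the exported entries for a traceId to confirm nothing was altered after the fact.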

Play 9 - Adversarial tests & fuzzing (drop into CI)

Jest (Node) snippets

TS
// callTool, text, and error are test helpers you supply (e.g. thin wrappers over an MCP client)
it("denies risky prod flag change without ticket", async () => {
  const res = await callTool("flag.set", { name:"checkout_v2", env:"prod", rolloutPct:50 });
  expect(text(res)).toMatch(/DENY/);
});

it("rejects unknown fields (mass assignment)", async () => {
  const res = await callTool("flag.set", { name:"f", env:"dev", rolloutPct:5, evil:"oops" });
  // Zod strict mode reports "Unrecognized key(s) in object"
  expect(error(res)).toMatch(/unrecognized|unknown/i);
});

fast-check fuzzing idea

TS
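import fc from "fast-check";
it("rolloutPct stays in bounds", async () => {
  // Async predicates need asyncProperty, and the assertion itself must be awaited
  await fc.assert(fc.asyncProperty(fc.integer({ min: -100, max: 200 }), async pct => {
    const r = await callTool("flag.set", { name:"x", env:"dev", rolloutPct:pct });
    // Expect success inside 0..100 and a validation error outside it
    if (pct >= 0 && pct <= 100) expect(text(r)).toMatch(/set to/);
    else expect(error(r)).toBeTruthy();
  }));
});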

Pytest + hypothesis

PYTHON
from hypothesis import given, strategies as st

# call_tool is a test helper you supply; it returns the tool's text or an error string
@given(st.integers(min_value=-50, max_value=150))
def test_pct_bounds(pct):
    res = call_tool("flag_set", {"name":"x","env":"dev","rolloutPct":pct})
    if 0 <= pct <= 100:
        assert "set to" in res
    else:
        assert "error" in res.lower()

Play 10 - Kill switches & quarantine

  • Read-only mode: env flag that disables all mutating tools.
  • Tool quarantine: deny a specific tool ID; return diagnostic text.
  • Egress blocklist: instant add → enforced in fetch layer.
  • Runbook: who approves, response timers, evidence export, rollback steps.

Example (very simple):

TS
const READ_ONLY = process.env.MCP_READ_ONLY === "1";
mcp.registerTool("flag.set", { /* ... */ }, async (args) => {
  if (READ_ONLY) return { content: [{ type:"text", text:"DENY: MCP in read-only mode" }] };
  // ...
});
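
Read-only mode and tool quarantine can share one guard that wraps every handler. The sketch below is an assumption about how that might be structured; `KillState`, `denialReason`, and `guarded` are hypothetical names, and the state would normally be loaded from config or a flag service rather than built inline.

```typescript
// Sketch: one guard covering read-only mode and per-tool quarantine.
// All names are illustrative assumptions.
interface KillState { readOnly: boolean; quarantined: Set<string>; mutating: Set<string> }

type ToolResult = { content: Array<{ type: "text"; text: string }> };

/** Returns diagnostic text when the tool must be denied, otherwise null. */
function denialReason(tool: string, s: KillState): string | null {
  if (s.quarantined.has(tool)) return `DENY: tool ${tool} is quarantined`;
  if (s.readOnly && s.mutating.has(tool)) return "DENY: MCP in read-only mode";
  return null;
}

/** Wrap a handler so the kill switches are checked on every invocation. */
function guarded<A>(tool: string, s: KillState, handler: (args: A) => Promise<ToolResult>) {
  return async (args: A): Promise<ToolResult> => {
    const reason = denialReason(tool, s);
    if (reason !== null) return { content: [{ type: "text", text: reason }] };
    return handler(args);
  };
}
```

Because the state is read on each call, flipping `readOnly` or adding a tool name to `quarantined` takes effect without restarting the server.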

Quick host config (Claude Desktop example)

JSON
{
  "mcpServers": {
    "team-tools-node": { "command": "node", "args": ["--loader","ts-node/esm","/ABS/PATH/src/server.ts"] },
    "team-tools-py":   { "command": "python", "args": ["/ABS/PATH/server.py"] }
  }
}

Smoke test:

  • “Export orders from 2025-08-25 to 2025-08-31 (emails masked)”
  • “Set flag checkout_v2 to 50% in prod” → DENY without ticket
  • “Fetch https://example.com with web.fetchSafe” → returns body (size-capped)

OWASP API Top 10 → MCP controls (pocket table)

Risk            | Control                                    | Test hint
BOLA/BFLA       | Owner checks in tool logic; purpose scopes | Foreign ID → 403
Broken auth     | mTLS + JWT (aud/exp/nbf/jti), short TTL    | Replay jti → 401
Excess exposure | Redaction defaults; minimal selects        | Masked fields present
Lack of limits  | RPM/RPS & concurrency caps                 | Stress → 429 not 5xx
Mass assignment | Strict schemas; reject unknowns            | Extra field → 400
Injection       | No eval; escape & validate                 | Quotes/payloads safe
SSRF            | Allow-list; block link-local/private       | 169.254.* denied
Misconfig       | Cache/header hygiene; size caps            | Headers present
Inventory       | Catalog + signed servers; drift scans      | No unsigned servers
Logging         | PII scrubbing; token masking               | Log scan clean

Paste-ready checklists

Ship checklist

  • Tools have strict schemas & examples
  • SSRF-safe fetch with allow-list/blocklist
  • Redaction + region routing policies applied
  • Idempotency keys on writes
  • Limits (RPM/RPS/concurrency) + budgets per principal/tenant
  • Spans with actor/scope/policy + evidence logs signed
  • Adversarial tests in CI (prompt injection, SSRF, mass assignment)
  • Kill switches (read-only, quarantine, egress block)

Operate checklist

  • Cert rotation ≤ 90 days; secrets via KMS/Vault
  • Token TTL ≤ 10m; replay defense (jti)
  • Tabletop IR quarterly; logs retained per policy
  • Drift report of catalog & policies daily
