
Levo AI Security Research Panel
Research Team
TL;DR
This is a hands-on, developer-first security play: you’ll scaffold a minimal MCP server, lock down inputs, stop SSRF and prompt-driven abuse, add redaction and region routing, make writes idempotent, cap cost and concurrency, emit useful spans, and ship adversarial tests, so your agent can act without blowing up prod.
Who this is for
Full-stack developers and platform engineers wiring MCP tools into real systems who want practical, copy-paste controls that don’t stall delivery.
What you’ll ship in ~90 minutes
- A secure MCP server (Node or Python) with two tools (orders.export read + flag.set write).
- Transport/auth defaults (mTLS, short-TTL JWT, KMS secrets).
- Input schemas & allow-lists, SSRF-safe fetch, redaction/DLP.
- Idempotency keys, per-principal rate limits & budgets, stop rules.
- OpenTelemetry spans with actor/scope/policy attributes.
- Adversarial tests & fuzzing you can drop into CI.
- Kill switches (tool quarantine, read-only mode).
Threat model in 60 seconds
- Prompt injection & tool misuse → deny risky verbs without approvals; strict schemas.
- Over-scoped authority → purpose scopes, short sessions, JIT elevation.
- SSRF via fetch → URL allow-list, block link-local and private ranges.
- Mass assignment / injection → schema allow-lists, reject unknowns, escape output.
- Exfil & privacy drift → inline redaction, region routing, vendor allow-lists.
- Denial of wallet (loops) → budgets, rate & concurrency caps, stop rules.
Play 1 - Scaffold a secure MCP server
TypeScript (Node) quickstart
npm init -y
npm i @modelcontextprotocol/sdk zod
npm i -D typescript ts-node
// src/server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const mcp = new McpServer({ name: "team-tools", version: "1.0.0" });

/** Utility: reject unknown fields everywhere */
const strict = <S extends z.ZodRawShape>(s: z.ZodObject<S>) => s.strict();

/** READ: orders.export (masked by default) */
const ExportSchema = strict(z.object({
  from: z.string().regex(/^\d{4}-\d{2}-\d{2}$/),
  to: z.string().regex(/^\d{4}-\d{2}-\d{2}$/),
  mask: z.array(z.enum(["email","phone"])).optional().default(["email"])
}));

// Depending on the SDK version, inputSchema may need to be a raw Zod shape
// rather than a z.object; check the version you install.
mcp.registerTool("orders.export",
  { title: "Export orders", description: "Date range export with masking", inputSchema: ExportSchema },
  async ({ from, to, mask }) => {
    // Demo rows; replace with a real, minimal select
    const rows = [
      { orderId: "1001", email: "a@ex.com", phone: "555-123", total: 30.5 },
      { orderId: "1002", email: "b@ex.com", phone: "555-987", total: 18.0 }
    ];
    const masked = rows.map(r => ({
      ...r,
      email: mask?.includes("email") ? "***@***" : r.email,
      phone: mask?.includes("phone") ? "***-***" : r.phone
    }));
    // MCP has no "json" content type; return the payload as serialized text
    const payload = { range: { from, to }, count: masked.length, rows: masked };
    return { content: [{ type: "text", text: JSON.stringify(payload) }] };
  }
);

/** WRITE: flag.set (prod >10% requires change ticket) + idempotency key */
const FlagSchema = strict(z.object({
  name: z.string().min(1),
  env: z.enum(["dev","staging","prod"]),
  rolloutPct: z.number().min(0).max(100),
  ticket: z.string().optional(),
  idemKey: z.string().uuid().optional()
}));

// In-memory only: lost on restart and not shared across instances
const seenIdem = new Set<string>();

mcp.registerTool("flag.set",
  { title: "Set feature flag", description: "Set rollout % with safe defaults", inputSchema: FlagSchema },
  async ({ name, env, rolloutPct, ticket, idemKey }) => {
    if (env === "prod" && rolloutPct > 10 && !ticket) {
      return { content: [{ type: "text", text: "DENY: prod change >10% requires change ticket" }] };
    }
    if (idemKey) {
      if (seenIdem.has(idemKey)) return { content: [{ type: "text", text: `OK (idempotent): ${name}@${rolloutPct}%` }] };
      seenIdem.add(idemKey);
    }
    // TODO: call your flag service with retries/backoff
    return { content: [{ type: "text", text: `Flag ${name} set to ${rolloutPct}% in ${env}` }] };
  }
);

await mcp.connect(new StdioServerTransport());
// Log to stderr: on the stdio transport, stdout carries the protocol messages
console.error("MCP secure server ready");
Run:
node --loader ts-node/esm src/server.ts
Python (alternate)
# server.py
from typing import List, Literal, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, ConfigDict, Field

# FastMCP provides the @tool() decorator and a stdio runner
srv = FastMCP("team-tools")
_seen = set()  # in-memory idempotency keys; use a shared store in production


class ExportSchema(BaseModel):
    # Pydantic ignores unknown fields by default; forbid them explicitly
    model_config = ConfigDict(extra="forbid")
    from_: str = Field(pattern=r"^\d{4}-\d{2}-\d{2}$", alias="from")
    to: str = Field(pattern=r"^\d{4}-\d{2}-\d{2}$")
    mask: List[Literal["email", "phone"]] = ["email"]


@srv.tool()
def orders_export(from_: str, to: str, mask: Optional[List[str]] = None):
    # Validate date formats and mask values before touching data
    args = ExportSchema.model_validate({"from": from_, "to": to, "mask": mask or ["email"]})
    rows = [{"orderId": "1001", "email": "a@ex.com", "phone": "555-123", "total": 30.5},
            {"orderId": "1002", "email": "b@ex.com", "phone": "555-987", "total": 18.0}]
    for r in rows:
        if "email" in args.mask: r["email"] = "***@***"
        if "phone" in args.mask: r["phone"] = "***-***"
    return {"range": {"from": args.from_, "to": args.to}, "count": len(rows), "rows": rows}


@srv.tool()
def flag_set(name: str, env: Literal["dev", "staging", "prod"], rolloutPct: float,
             ticket: Optional[str] = None, idemKey: Optional[str] = None):
    if not 0 <= rolloutPct <= 100:
        return "ERROR: rolloutPct must be within 0..100"
    if env == "prod" and rolloutPct > 10 and not ticket:
        return "DENY: prod change >10% requires change ticket"
    if idemKey:
        if idemKey in _seen:
            return f"OK (idempotent): {name}@{rolloutPct}%"
        _seen.add(idemKey)
    return f"Flag {name} set to {rolloutPct}% in {env}"


if __name__ == "__main__":
    srv.run()  # stdio transport by default
Play 2 - Transport, tokens, secrets (sane defaults)
# security.baseline.yaml
transport:
  tls_min_version: "1.2"
  mtls: true
  cert_rotation_days: 90
auth:
  token_type: "JWT"
  claims: ["aud","iss","sub","exp","nbf","jti"]
  max_ttl_seconds: 600
  replay_window_minutes: 15
secrets:
  provider: "KMS|Vault"
  rotation_days: 60
  forbid_env_for_prod: true
Notes
- Verify aud/iss, reject clock skew >60s, store jti for 15m for replay defense.
- Use purpose-scoped grants: orders.read, flags.write:staging; deny wildcards.
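The claim checks in these notes can be sketched as a small pure function. This is a minimal illustration, not a full verifier: it assumes the token signature has already been verified by a JWT library, and the audience and issuer values, the Claims shape, and the helper names checkClaims and grantAllows are hypothetical.

```typescript
// Sketch only: signature verification is assumed to have happened upstream.
interface Claims { aud: string; iss: string; sub: string; exp: number; nbf: number; jti: string }

const EXPECTED_AUD = "mcp://team-tools";          // hypothetical audience
const EXPECTED_ISS = "https://idp.example.com";   // hypothetical issuer
const MAX_SKEW_S = 60;                            // tolerated clock skew
const MAX_TTL_S = 600;                            // max_ttl_seconds from the baseline
const REPLAY_WINDOW_S = 15 * 60;                  // replay_window_minutes from the baseline

// jti -> time (epoch seconds) until which the jti is remembered.
// In-memory for illustration; a shared store is needed across instances.
const seenJti = new Map<string, number>();

/** Returns null when the claims pass, otherwise a short rejection reason. */
function checkClaims(c: Claims, nowS: number): string | null {
  if (c.aud !== EXPECTED_AUD) return "bad aud";
  if (c.iss !== EXPECTED_ISS) return "bad iss";
  if (nowS > c.exp + MAX_SKEW_S) return "expired";
  if (nowS < c.nbf - MAX_SKEW_S) return "not yet valid";
  if (c.exp - c.nbf > MAX_TTL_S) return "ttl too long";
  // Drop replay records that have aged out, then check for reuse.
  seenJti.forEach((until, id) => { if (until <= nowS) seenJti.delete(id); });
  if (seenJti.has(c.jti)) return "replayed jti";
  seenJti.set(c.jti, nowS + REPLAY_WINDOW_S);
  return null;
}

/** Purpose-scoped grant check: any wildcard grant is refused outright. */
function grantAllows(grants: string[], needed: string): boolean {
  if (grants.some(g => g.indexOf("*") !== -1)) return false;
  return grants.indexOf(needed) !== -1;
}
```

The replay window (15m) is deliberately longer than the maximum TTL plus skew, so a jti record always outlives the token it belongs to.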
Play 3 - Schema allow-lists & mass-assignment defense
- Reject unknown fields (see strict(z.object(...)) in Play 1; in Pydantic, set extra="forbid", because the default silently ignores unknown fields).
- Enumerate strings where possible (env, segment, region).
- Bound numbers (0–100 for rolloutPct).
- Validate email/URL formats only when you truly need them.
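To make the rules above concrete without any library, here is a dependency-free sketch of what a strict schema does under the hood. The Rule type, validateStrict, and flagRules are hypothetical names for illustration; in practice the Zod or Pydantic schemas from Play 1 do this work, and unlike this sketch they also support optional fields.

```typescript
type Rule =
  | { kind: "enum"; values: readonly string[] }
  | { kind: "number"; min: number; max: number }
  | { kind: "string"; minLength: number };

/** Returns a list of validation errors; an empty list means the input is accepted. */
function validateStrict(input: Record<string, unknown>, rules: Record<string, Rule>): string[] {
  const errors: string[] = [];
  const has = (key: string) => Object.prototype.hasOwnProperty.call(rules, key);
  // Mass-assignment defense: anything not explicitly declared is rejected.
  for (const key of Object.keys(input)) {
    if (!has(key)) errors.push(`unknown field: ${key}`);
  }
  // Every declared field is required in this sketch.
  for (const key of Object.keys(rules)) {
    const rule = rules[key];
    const value = input[key];
    if (rule.kind === "enum") {
      if (typeof value !== "string" || rule.values.indexOf(value) === -1) errors.push(`${key}: not in allow-list`);
    } else if (rule.kind === "number") {
      if (typeof value !== "number" || !isFinite(value) || value < rule.min || value > rule.max) errors.push(`${key}: out of bounds`);
    } else {
      if (typeof value !== "string" || value.length < rule.minLength) errors.push(`${key}: too short`);
    }
  }
  return errors;
}

// Rules mirroring the required fields of flag.set
const flagRules: Record<string, Rule> = {
  name: { kind: "string", minLength: 1 },
  env: { kind: "enum", values: ["dev", "staging", "prod"] },
  rolloutPct: { kind: "number", min: 0, max: 100 },
};
```

The own-property check matters: a plain `key in rules` test would let inherited names such as constructor slip past the unknown-field check.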
Play 4 - SSRF-safe fetch tool (copyable)
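import { URL } from "node:url";

const DENY_HOSTS = new Set(["169.254.169.254", "metadata.google.internal", "localhost", "[::1]"]);
// Private, loopback, and link-local IPv4 ranges. These patterns only match literal IPs in the URL;
// a hostname that resolves to an internal address passes, so also check the resolved IP.
const PRIVATE_RANGES = [/^10\./, /^192\.168\./, /^172\.(1[6-9]|2\d|3[0-1])\./, /^127\./, /^0\./, /^169\.254\./];
function isPrivate(host: string) { return PRIVATE_RANGES.some(r => r.test(host)); }

mcp.registerTool("web.fetchSafe",
  { title: "Safe fetch", description: "HTTP GET with SSRF protections",
    inputSchema: strict(z.object({ url: z.string().url() })) },
  async ({ url }) => {
    const u = new URL(url);
    // Only plain web schemes; z.string().url() accepts others
    if (u.protocol !== "https:" && u.protocol !== "http:") {
      return { content: [{ type: "text", text: "DENY: unsupported scheme" }] };
    }
    if (DENY_HOSTS.has(u.hostname) || isPrivate(u.hostname)) {
      return { content: [{ type: "text", text: "DENY: SSRF-protected host" }] };
    }
    // TODO: add outbound allow-list
    // redirect:"error" stops a public URL from bouncing to an internal one
    const res = await fetch(u, { method: "GET", redirect: "error" });
    const text = await res.text();
    return { content: [{ type: "text", text: text.slice(0, 2000) }] }; // size cap
  });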
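The regex check in web.fetchSafe only sees what is written in the URL. A hedged sketch of the next step follows: an outbound allow-list plus a check on the addresses the hostname actually resolves to. DNS resolution itself is left out (for example, Node's dns lookup could supply resolvedIps), and ALLOWED_HOSTS, isBlockedIPv4, and egressAllowed are hypothetical names. Anything that does not parse as IPv4, including IPv6 literals, is blocked, which is a conservative simplification.

```typescript
/** Parses dotted-quad IPv4 into octets; returns null for anything else. */
function parseIPv4(ip: string): number[] | null {
  const parts = ip.split(".");
  if (parts.length !== 4) return null;
  const nums = parts.map(p => (/^\d{1,3}$/.test(p) ? Number(p) : NaN));
  return nums.every(n => n >= 0 && n <= 255) ? nums : null;
}

/** True for private, loopback, link-local, CGNAT, multicast, and reserved space. */
function isBlockedIPv4(ip: string): boolean {
  const o = parseIPv4(ip);
  if (!o) return true; // fail closed on anything unparsable (including IPv6)
  const a = o[0];
  const b = o[1];
  return a === 0 || a === 10 || a === 127 ||
    (a === 169 && b === 254) ||              // link-local, incl. cloud metadata
    (a === 172 && b >= 16 && b <= 31) ||
    (a === 192 && b === 168) ||
    (a === 100 && b >= 64 && b <= 127) ||    // 100.64.0.0/10 (CGNAT)
    a >= 224;                                // multicast and reserved
}

// Hypothetical outbound allow-list; deny everything not listed.
const ALLOWED_HOSTS = new Set(["example.com", "api.example.com"]);

/** Host must be allow-listed AND every resolved address must be public. */
function egressAllowed(hostname: string, resolvedIps: string[]): boolean {
  if (!ALLOWED_HOSTS.has(hostname.toLowerCase())) return false;
  return resolvedIps.length > 0 && resolvedIps.every(ip => !isBlockedIPv4(ip));
}
```

Checking resolved addresses rather than the hostname string is what defeats a public-looking name that points at 10.x or 169.254.x. To close the gap between check and use, the connection should be made to the address that was checked.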
Play 5 - Redaction & region routing (in the path)
# policy.yaml
- rule: redact-email-by-default
  when: { tool: "orders.export" }
  decision: allow
  redact:
    - field: "email"
      mode: "mask"
- rule: route-eu-personal-data
  when: { data.contains_pii: true, subject.region: "EU" }
  route: "eu-west"
- rule: vendor-allow-list
  when: { egress.domain: "*" }
  decision: deny
  unless: { egress.domain_in_allowlist: true }
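One way to read these three rules is as a single function that sits in the request path. The sketch below hard-codes them for illustration rather than parsing the YAML; PolicyInput, PolicyOutcome, evaluatePolicy, the allow-list contents, and the default route name are all assumptions.

```typescript
type Row = Record<string, unknown>;

interface PolicyInput {
  tool: string;
  subjectRegion: string;
  containsPii: boolean;
  egressDomain?: string;   // set only when the call leaves your network
  rows: Row[];
}

interface PolicyOutcome { decision: "allow" | "deny"; route: string; rows: Row[] }

const EGRESS_ALLOWLIST = new Set(["api.example.com"]); // hypothetical vendor list
const DEFAULT_ROUTE = "us-east";                        // hypothetical default region

function evaluatePolicy(req: PolicyInput): PolicyOutcome {
  // vendor-allow-list: deny any egress domain that is not explicitly listed
  if (req.egressDomain !== undefined && !EGRESS_ALLOWLIST.has(req.egressDomain)) {
    return { decision: "deny", route: DEFAULT_ROUTE, rows: [] };
  }
  // route-eu-personal-data: EU subjects with PII stay in eu-west
  const route = req.containsPii && req.subjectRegion === "EU" ? "eu-west" : DEFAULT_ROUTE;
  // redact-email-by-default: mask email on every orders.export row
  const rows = req.tool === "orders.export"
    ? req.rows.map((r): Row => (Object.prototype.hasOwnProperty.call(r, "email") ? { ...r, email: "***@***" } : r))
    : req.rows;
  return { decision: "allow", route, rows };
}
```

Because redaction runs inside the policy step, a tool that forgets to mask still cannot leak the field.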
Play 6 - Idempotency & safe writes
- Require idemKey (UUID) for any mutating tool.
- Cache decisions for 24h; return OK (idempotent) for repeats.
- On the service side, store operation IDs to dedupe retries.
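The 24h decision cache can be sketched as a small keyed store with expiry. This is an in-memory illustration only; IdempotencyCache and its method names are hypothetical, the operation is synchronous to keep the example short, and a real deployment would need a shared store and handling for concurrent in-flight requests with the same key.

```typescript
interface StoredResult<T> { result: T; expiresAt: number }

class IdempotencyCache<T> {
  private entries = new Map<string, StoredResult<T>>();

  constructor(private ttlMs: number = 24 * 60 * 60 * 1000) {}

  /**
   * Runs `op` at most once per key within the TTL.
   * Repeats inside the window get the stored result and replayed=true.
   * Scope the key, e.g. `${tenant}:${tool}:${idemKey}`, so callers cannot collide.
   */
  run(key: string, nowMs: number, op: () => T): { result: T; replayed: boolean } {
    const hit = this.entries.get(key);
    if (hit && hit.expiresAt > nowMs) return { result: hit.result, replayed: true };
    const result = op();
    this.entries.set(key, { result, expiresAt: nowMs + this.ttlMs });
    return { result, replayed: false };
  }
}
```

Returning the stored result, rather than a fresh message built from the new arguments, means a retry with the same key but a different payload cannot change what the caller is told happened.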
Play 7 - Rate limits, budgets, and stop rules
{
  "principal": "agent:copilot",
  "limits": { "rpm": 60, "burst": 100, "concurrency": 5, "daily_cost_usd": 50 },
  "stop_rules": { "max_plan_depth": 12, "max_retries": 3 }
}
- Enforce per principal and tenant.
- Emit budget use in spans so dashboards are trivial.
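A rough sketch of how those limits might be enforced together: a token bucket for rpm/burst, an in-flight counter for concurrency, and a running spend total for the daily budget. The Limiter class, its return codes, and shouldStop are hypothetical; daily budget reset and cross-instance state are left out, and the key passed as principal is assumed to combine tenant and principal.

```typescript
interface Limits { rpm: number; burst: number; concurrency: number; dailyCostUsd: number }
interface LimiterState { tokens: number; lastRefillMs: number; inFlight: number; spentUsd: number }
type Verdict = "ok" | "rate" | "concurrency" | "budget";

class Limiter {
  private state = new Map<string, LimiterState>();

  constructor(private limits: Limits) {}

  /** Call before a tool runs; pair every "ok" with a later release(). */
  tryAcquire(principal: string, nowMs: number, estCostUsd: number): Verdict {
    let s = this.state.get(principal);
    if (!s) {
      s = { tokens: this.limits.burst, lastRefillMs: nowMs, inFlight: 0, spentUsd: 0 };
      this.state.set(principal, s);
    }
    // Token bucket: refill at rpm per minute, capped at burst.
    const refill = ((nowMs - s.lastRefillMs) / 60000) * this.limits.rpm;
    s.tokens = Math.min(this.limits.burst, s.tokens + refill);
    s.lastRefillMs = nowMs;
    if (s.spentUsd + estCostUsd > this.limits.dailyCostUsd) return "budget";
    if (s.inFlight >= this.limits.concurrency) return "concurrency";
    if (s.tokens < 1) return "rate";
    s.tokens -= 1;
    s.inFlight += 1;
    s.spentUsd += estCostUsd;
    return "ok";
  }

  release(principal: string): void {
    const s = this.state.get(principal);
    if (s && s.inFlight > 0) s.inFlight -= 1;
  }
}

/** Stop rules: halt the agent loop once depth or retries exceed the caps. */
function shouldStop(planDepth: number, retries: number, maxPlanDepth = 12, maxRetries = 3): boolean {
  return planDepth > maxPlanDepth || retries > maxRetries;
}
```

Mapping "rate" and "concurrency" to a 429-style response and "budget" to a hard stop keeps a looping agent from turning into a denial-of-wallet incident.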
Play 8 - Spans & signed evidence you’ll actually use
Span names
agent.plan | mcp.tool.invoke | service.api.call | policy.evaluate | dlp.redact
Attributes (add to every span)
actor.id, actor.type, tool.name, scope.grants, policy.decision, pii.count, idem.key, tenant.id
- Store append-only signed logs keyed by traceId for audits.
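As an illustration of "append-only signed logs", the sketch below chains each entry to the previous one with an HMAC, so editing or removing an earlier entry breaks verification. It uses Node's built-in crypto module; the entry shape, function names, and the idea of passing the key as a string are assumptions, and in practice the key would come from KMS/Vault and the log from durable storage.

```typescript
import { createHmac } from "node:crypto";

type Attrs = Record<string, string | number>;
interface LogEntry { traceId: string; span: string; attrs: Attrs; prevSig: string; sig: string }

// The attribute set listed above; entries missing any of them are refused.
const REQUIRED_ATTRS = ["actor.id", "actor.type", "tool.name", "scope.grants",
  "policy.decision", "pii.count", "idem.key", "tenant.id"];

/** HMAC over the previous signature plus a key-sorted view of this entry. */
function signEntry(key: string, prevSig: string, traceId: string, span: string, attrs: Attrs): string {
  const sorted = Object.keys(attrs).sort().map(k => [k, attrs[k]]);
  const canonical = JSON.stringify([prevSig, traceId, span, sorted]);
  return createHmac("sha256", key).update(canonical).digest("hex");
}

function appendEntry(log: LogEntry[], key: string, traceId: string, span: string, attrs: Attrs): LogEntry {
  const missing = REQUIRED_ATTRS.filter(a => !Object.prototype.hasOwnProperty.call(attrs, a));
  if (missing.length > 0) throw new Error(`missing span attributes: ${missing.join(", ")}`);
  const prevSig = log.length > 0 ? log[log.length - 1].sig : "";
  const entry: LogEntry = { traceId, span, attrs: { ...attrs }, prevSig, sig: signEntry(key, prevSig, traceId, span, attrs) };
  log.push(entry);
  return entry;
}

/** Recomputes the chain; false if any entry was altered, reordered, or removed mid-chain. */
function verifyLog(log: LogEntry[], key: string): boolean {
  let prev = "";
  for (const e of log) {
    if (e.prevSig !== prev) return false;
    if (e.sig !== signEntry(key, e.prevSig, e.traceId, e.span, e.attrs)) return false;
    prev = e.sig;
  }
  return true;
}
```

Note that a chain like this detects tampering but not truncation of the newest entries; periodically anchoring the latest signature somewhere external covers that gap.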
Play 9 - Adversarial tests & fuzzing (drop into CI)
Jest (Node) snippets
// callTool, text, and error are test helpers you provide for your own harness
it("denies risky prod flag change without ticket", async () => {
  const res = await callTool("flag.set", { name:"checkout_v2", env:"prod", rolloutPct:50 });
  expect(text(res)).toMatch(/DENY/);
});
it("rejects unknown fields (mass assignment)", async () => {
  const res = await callTool("flag.set", { name:"f", env:"dev", rolloutPct:5, evil:"oops" });
  // Zod reports strict-mode violations as "Unrecognized key..."
  expect(error(res)).toMatch(/unrecognized|unknown/i);
});
fast-check fuzzing idea
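import fc from "fast-check";
it("rolloutPct stays in bounds", async () => {
  // Async predicates need asyncProperty, and the assert must be awaited
  await fc.assert(fc.asyncProperty(fc.integer({ min: -100, max: 200 }), async pct => {
    const r = await callTool("flag.set", { name:"x", env:"dev", rolloutPct:pct });
    // Expect validation error outside 0..100
    if (pct < 0 || pct > 100) expect(error(r)).toBeTruthy();
  }));
});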
Pytest + hypothesis
from hypothesis import given, strategies as st

@given(st.integers(min_value=-50, max_value=150))
def test_pct_bounds(pct):
    res = call_tool("flag_set", {"name":"x","env":"dev","rolloutPct":pct})
    assert (0 <= pct <= 100 and "set to" in res) or ("error" in res.lower())
Play 10 - Kill switches & quarantine
- Read-only mode: env flag that disables all mutating tools.
- Tool quarantine: deny a specific tool ID; return diagnostic text.
- Egress blocklist: instant add → enforced in fetch layer.
- Runbook: who approves, response timers, evidence export, rollback steps.
Example (very simple):
const READ_ONLY = process.env.MCP_READ_ONLY === "1";
mcp.registerTool("flag.set", /* ... */, async (args) => {
if (READ_ONLY) return { content: [{ type:"text", text:"DENY: MCP in read-only mode" }] };
// ...
});
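Read-only mode and tool quarantine can share one guard. The sketch below is one possible shape, with the switch state held in memory and checked on every call so it can be flipped without a restart (unlike a constant read from the environment at startup). The names switches, MUTATING_TOOLS, killSwitchReason, and guarded are hypothetical.

```typescript
type ToolResult = { content: Array<{ type: "text"; text: string }> };

// Runtime switch state; in practice this would be backed by a config service or flag store.
const switches = { readOnly: false, quarantined: new Set<string>() };

// Tools that change state; everything else is treated as read-only safe.
const MUTATING_TOOLS = new Set(["flag.set"]);

/** Returns diagnostic deny text when a kill switch applies, otherwise null. */
function killSwitchReason(tool: string): string | null {
  if (switches.quarantined.has(tool)) return `DENY: tool ${tool} is quarantined`;
  if (switches.readOnly && MUTATING_TOOLS.has(tool)) return "DENY: MCP in read-only mode";
  return null;
}

/** Wraps a tool handler so the switches are evaluated before every invocation. */
function guarded<A>(tool: string, handler: (args: A) => Promise<ToolResult>): (args: A) => Promise<ToolResult> {
  return async (args: A): Promise<ToolResult> => {
    const reason = killSwitchReason(tool);
    if (reason !== null) return { content: [{ type: "text", text: reason }] };
    return handler(args);
  };
}
```

Wrapping every handler at registration time, for example passing guarded("flag.set", handler) as the callback, keeps the check out of individual tool bodies.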
Quick host config (Claude Desktop example)
{
  "mcpServers": {
    "team-tools-node": { "command": "node", "args": ["--loader","ts-node/esm","/ABS/PATH/src/server.ts"] },
    "team-tools-py": { "command": "python", "args": ["/ABS/PATH/server.py"] }
  }
}
Smoke test:
- “Export orders from 2025-08-25 to 2025-08-31 (emails masked)”
- “Set flag checkout_v2 to 50% in prod” → DENY without ticket
- “Fetch https://example.com with web.fetchSafe” → returns body (size-capped)
OWASP API Top 10 → MCP controls (pocket table)
| Risk | Control | Test hint |
| --- | --- | --- |
| BOLA/BFLA | Owner checks in tool logic; purpose scopes | Foreign ID → 403 |
| Broken auth | mTLS + JWT (aud/exp/nbf/jti), short TTL | Replay jti → 401 |
| Excess exposure | Redaction defaults; minimal selects | Masked fields present |
| Lack of limits | RPM/RPS & concurrency caps | Stress → 429 not 5xx |
| Mass assignment | Strict schemas; reject unknowns | Extra field → 400 |
| Injection | No eval; escape & validate | Quotes/payloads safe |
| SSRF | Allow-list; block link-local/private | 169.254.* denied |
| Misconfig | Cache/header hygiene; size caps | Headers present |
| Inventory | Catalog + signed servers; drift scans | No unsigned servers |
| Logging | PII scrubbing; token masking | Log scan clean |
Paste-ready checklists
Ship checklist
- Tools have strict schemas & examples
- SSRF-safe fetch with allow-list/blocklist
- Redaction + region routing policies applied
- Idempotency keys on writes
- Limits (RPM/RPS/concurrency) + budgets per principal/tenant
- Spans with actor/scope/policy + evidence logs signed
- Adversarial tests in CI (prompt injection, SSRF, mass assignment)
- Kill switches (read-only, quarantine, egress block)
Operate checklist
- Cert rotation ≤ 90 days; secrets via KMS/Vault
- Token TTL ≤ 10m; replay defense (jti)
- Tabletop IR quarterly; logs retained per policy
- Drift report of catalog & policies daily