#!/usr/bin/env python3
"""
MCP Server for traffic-based code modernization.
Captures HTTP traffic and provides tools for:
- Route discovery and grouping
- Schema inference from observed payloads
- Type generation (Pydantic, TypedDict, dataclass, etc.)
- Test data generation
- Before/after traffic comparison
"""
import base64
import hashlib
import json
import re
import sqlite3
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Traffic Analysis")
DEVTOOLS_URL = "http://localhost:10001/devtools/api/events"
DB_PATH = Path(__file__).parent / ".traffic_cache.db"
# =============================================================================
# Path Normalization Patterns
# =============================================================================
DYNAMIC_PATTERNS = [
(re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', re.I), '{uuid}'),
(re.compile(r'\d{4}-\d{2}-\d{2}'), '{date}'),
(re.compile(r'[0-9a-f]{24}', re.I), '{object_id}'),
(re.compile(r'[A-Za-z0-9_-]{20,}'), '{token}'),
(re.compile(r'\d+'), '{id}'),
]
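# Note: order matters here. The more specific patterns (UUIDs, dates, hex
# object IDs, long tokens) must be tried before the generic numeric {id}
# pattern, because _normalize_path stops at the first full match.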
def _normalize_path(path: str) -> str:
"""Normalize a path by replacing dynamic segments with placeholders."""
    # Drop any query string; only the path itself is normalized.
    path = path.split('?', 1)[0]
segments = path.split('/')
normalized = []
for segment in segments:
if not segment:
normalized.append(segment)
continue
replaced = segment
for pattern, placeholder in DYNAMIC_PATTERNS:
if pattern.fullmatch(segment):
replaced = placeholder
break
normalized.append(replaced)
return '/'.join(normalized)
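# Illustrative examples of the normalization above (assumed inputs, shown for
# orientation only):
#   _normalize_path("/users/42/orders")  -> "/users/{id}/orders"
#   _normalize_path("/items/550e8400-e29b-41d4-a716-446655440000?page=2")
#                                        -> "/items/{uuid}"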
# =============================================================================
# SQLite Database Management
# =============================================================================
def _get_db() -> sqlite3.Connection:
"""Get a database connection, creating schema if needed."""
db = sqlite3.connect(str(DB_PATH))
db.row_factory = sqlite3.Row
db.executescript("""
CREATE TABLE IF NOT EXISTS traffic (
id INTEGER PRIMARY KEY,
captured_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
host TEXT NOT NULL,
method TEXT NOT NULL,
path TEXT NOT NULL,
route_pattern TEXT NOT NULL,
status_code INTEGER,
request_headers TEXT,
request_body TEXT,
response_headers TEXT,
response_body TEXT,
process_exe TEXT,
content_type TEXT,
body_hash TEXT
);
CREATE INDEX IF NOT EXISTS idx_route ON traffic(route_pattern, host);
CREATE INDEX IF NOT EXISTS idx_captured ON traffic(captured_at);
CREATE INDEX IF NOT EXISTS idx_host ON traffic(host);
""")
return db
def _hash_body(body: str | None) -> str | None:
"""Create a hash of body content for deduplication."""
if not body:
return None
return hashlib.sha256(body.encode()).hexdigest()[:16]
# =============================================================================
# Traffic Capture (from DevTools API)
# =============================================================================
def _capture_raw_events(seconds: int = 30, max_events: int = 500):
"""Capture raw events from DevTools SSE stream."""
seconds = min(seconds, 300)
try:
with httpx.Client(timeout=seconds + 10) as client:
events = []
current_event_type = None
first_event_time = None
with client.stream("GET", DEVTOOLS_URL, timeout=seconds + 10) as response:
start = time.time()
for line in response.iter_lines():
if time.time() - start > seconds:
break
if line.startswith("event: "):
current_event_type = line[7:]
elif line.startswith("data: "):
try:
event_data = json.loads(line[6:])
event_ts = event_data.get("ts", "")
if first_event_time is None:
first_event_time = event_ts
if current_event_type == "process.started":
if event_ts[:20] == first_event_time[:20]:
continue
events.append({
"type": current_event_type,
"ts": event_ts,
"data": event_data.get("data", {})
})
except json.JSONDecodeError:
pass
if len(events) >= max_events:
break
return events
except httpx.ConnectError:
return {"error": "Cannot connect to DevTools API at localhost:10001",
"hint": "Start Qtap with --enable-dev-tools or ENABLE_DEV_TOOLS=true"}
except Exception as e:
return {"error": str(e)}
def _decode_http_transaction(event):
"""Decode a raw http_transaction event into structured data."""
inner = event.get("data", {})
if isinstance(inner, dict) and "data" in inner:
payload = inner.get("data")
if isinstance(payload, dict) and "data" in payload:
payload = payload.get("data")
if not isinstance(payload, str):
return None
try:
decoded = json.loads(base64.b64decode(payload))
res = decoded.get("response", {})
if res.get("body"):
try:
res["body"] = base64.b64decode(res["body"]).decode('utf-8', errors='replace')
            except Exception:
res["body"] = "[binary data]"
req = decoded.get("request", {})
if req.get("body"):
try:
req["body"] = base64.b64decode(req["body"]).decode('utf-8', errors='replace')
            except Exception:
req["body"] = "[binary data]"
return {
"ts": event.get("ts"),
"metadata": decoded.get("metadata", {}),
"request": req,
"response": res
}
    except Exception:
pass
return None
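# The unwrapping above tolerates one or two levels of nesting under "data".
# Based on the access pattern in this module (not a documented contract), the
# innermost value is assumed to be base64-encoded JSON shaped roughly like:
#   {"metadata": {...},
#    "request":  {"method": ..., "path": ..., "body": "<base64>"},
#    "response": {"status": ..., "body": "<base64>"}}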
def _capture_http(seconds: int = 30, max_http: int = 500):
"""Capture and decode HTTP transactions."""
max_raw_events = max(seconds * 500, 10000)
events = _capture_raw_events(seconds=seconds, max_events=max_raw_events)
if isinstance(events, dict) and "error" in events:
return events
http_events = []
for event in events:
if event.get("type") == "request.http_transaction":
decoded = _decode_http_transaction(event)
if decoded:
http_events.append(decoded)
if len(http_events) >= max_http:
break
return http_events
# =============================================================================
# Schema Inference
# =============================================================================
def _infer_type(value: Any) -> dict:
"""Infer JSON Schema type from a Python value."""
if value is None:
return {"type": "null"}
elif isinstance(value, bool):
return {"type": "boolean"}
elif isinstance(value, int):
return {"type": "integer"}
elif isinstance(value, float):
return {"type": "number"}
elif isinstance(value, str):
if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', value):
return {"type": "string", "format": "date-time"}
elif re.match(r'^\d{4}-\d{2}-\d{2}$', value):
return {"type": "string", "format": "date"}
elif re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
return {"type": "string", "format": "email"}
elif re.match(r'^https?://', value):
return {"type": "string", "format": "uri"}
elif re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', value, re.I):
return {"type": "string", "format": "uuid"}
return {"type": "string"}
elif isinstance(value, list):
if not value:
return {"type": "array", "items": {}}
item_schemas = [_infer_type(item) for item in value[:10] if item is not None]
if item_schemas:
merged = _merge_schemas(item_schemas)
return {"type": "array", "items": merged}
return {"type": "array", "items": {}}
elif isinstance(value, dict):
properties = {}
for k, v in value.items():
properties[k] = _infer_type(v)
return {
"type": "object",
"properties": properties,
"required": list(value.keys())
}
return {}
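# Illustrative example (assumed payload):
#   _infer_type({"id": 7, "email": "a@example.com", "tags": ["x"]})
#   -> {"type": "object",
#       "properties": {"id": {"type": "integer"},
#                      "email": {"type": "string", "format": "email"},
#                      "tags": {"type": "array", "items": {"type": "string"}}},
#       "required": ["id", "email", "tags"]}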
def _merge_schemas(schemas: list[dict]) -> dict:
"""Merge multiple JSON schemas into one that accepts all observed shapes."""
if not schemas:
return {}
if len(schemas) == 1:
return schemas[0]
types = set()
all_properties = defaultdict(list)
all_required = None
formats = set()
item_schemas = []
for schema in schemas:
schema_type = schema.get("type")
if schema_type:
types.add(schema_type)
if schema.get("format"):
formats.add(schema["format"])
if schema_type == "object" and "properties" in schema:
for prop, prop_schema in schema["properties"].items():
all_properties[prop].append(prop_schema)
required = set(schema.get("required", []))
if all_required is None:
all_required = required
else:
all_required = all_required & required
if schema_type == "array" and "items" in schema:
item_schemas.append(schema["items"])
result = {}
if len(types) == 1:
result["type"] = types.pop()
elif len(types) > 1:
if types == {"integer", "number"}:
result["type"] = "number"
elif "null" in types:
other_types = types - {"null"}
if len(other_types) == 1:
result["type"] = [other_types.pop(), "null"]
else:
result["type"] = list(types)
else:
result["type"] = list(types)
if len(formats) == 1:
result["format"] = formats.pop()
if all_properties:
merged_props = {}
for prop, prop_schemas in all_properties.items():
merged_props[prop] = _merge_schemas(prop_schemas)
result["properties"] = merged_props
if all_required:
result["required"] = sorted(all_required)
if item_schemas:
result["items"] = _merge_schemas(item_schemas)
return result
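# Illustrative example: a field observed as both a string and null merges into
# a nullable type, and "required" keeps only the keys present in every sample.
#   _merge_schemas([{"type": "string"}, {"type": "null"}])  -> {"type": ["string", "null"]}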
def _infer_schema_from_bodies(bodies: list[str], target_name: str = "Root") -> dict:
"""Infer JSON Schema from multiple body strings."""
schemas = []
for body in bodies:
if not body or body == "[binary data]":
continue
try:
data = json.loads(body)
schemas.append(_infer_type(data))
except json.JSONDecodeError:
continue
if not schemas:
return {"error": "No valid JSON bodies to analyze"}
merged = _merge_schemas(schemas)
merged["$schema"] = "http://json-schema.org/draft-07/schema#"
merged["title"] = target_name
return merged
# =============================================================================
# Code Generation
# =============================================================================
def _schema_to_python_type(schema: dict, name: str = "Value") -> str:
"""Convert JSON Schema to Python type annotation."""
schema_type = schema.get("type")
if isinstance(schema_type, list):
types = [_schema_to_python_type({"type": t}, name) for t in schema_type]
return " | ".join(types)
if schema_type == "null":
return "None"
elif schema_type == "boolean":
return "bool"
elif schema_type == "integer":
return "int"
elif schema_type == "number":
return "float"
elif schema_type == "string":
return "str"
elif schema_type == "array":
items = schema.get("items", {})
item_type = _schema_to_python_type(items, f"{name}Item")
return f"list[{item_type}]"
elif schema_type == "object":
return name
return "Any"
def _to_class_name(name: str) -> str:
"""Convert a string to PascalCase class name."""
parts = re.split(r'[_-]', name)
return ''.join(word.capitalize() for word in parts)
def _generate_pydantic_v2(schema: dict, class_name: str = "Model") -> str:
"""Generate Pydantic v2 model from JSON Schema."""
lines = [
"from pydantic import BaseModel, Field",
"from typing import Any",
""
]
def generate_class(schema: dict, name: str, indent: int = 0) -> list[str]:
result = []
prefix = " " * indent
if schema.get("type") != "object" or "properties" not in schema:
return result
for prop_name, prop_schema in schema.get("properties", {}).items():
if prop_schema.get("type") == "object" and "properties" in prop_schema:
nested_name = _to_class_name(prop_name)
result.extend(generate_class(prop_schema, nested_name, indent))
result.append("")
required = set(schema.get("required", []))
result.append(f"{prefix}class {name}(BaseModel):")
properties = schema.get("properties", {})
if not properties:
result.append(f"{prefix} pass")
return result
for prop_name, prop_schema in properties.items():
is_required = prop_name in required
if prop_schema.get("type") == "object" and "properties" in prop_schema:
py_type = _to_class_name(prop_name)
else:
py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
if not is_required:
py_type = f"{py_type} | None"
field_args = []
if not is_required:
field_args.append("default=None")
if field_args:
result.append(f"{prefix} {prop_name}: {py_type} = Field({', '.join(field_args)})")
else:
result.append(f"{prefix} {prop_name}: {py_type}")
return result
lines.extend(generate_class(schema, class_name))
return "\n".join(lines)
def _generate_pydantic_v1(schema: dict, class_name: str = "Model") -> str:
"""Generate Pydantic v1 compatible model from JSON Schema."""
lines = [
"from pydantic import BaseModel",
"from typing import Any, List, Optional",
""
]
def generate_class(schema: dict, name: str) -> list[str]:
result = []
if schema.get("type") != "object" or "properties" not in schema:
return result
for prop_name, prop_schema in schema.get("properties", {}).items():
if prop_schema.get("type") == "object" and "properties" in prop_schema:
nested_name = _to_class_name(prop_name)
result.extend(generate_class(prop_schema, nested_name))
result.append("")
required = set(schema.get("required", []))
result.append(f"class {name}(BaseModel):")
properties = schema.get("properties", {})
if not properties:
result.append(" pass")
return result
for prop_name, prop_schema in properties.items():
is_required = prop_name in required
if prop_schema.get("type") == "object" and "properties" in prop_schema:
py_type = _to_class_name(prop_name)
else:
py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
py_type = py_type.replace(" | None", "")
py_type = py_type.replace("list[", "List[")
if not is_required:
result.append(f" {prop_name}: Optional[{py_type}] = None")
else:
result.append(f" {prop_name}: {py_type}")
return result
lines.extend(generate_class(schema, class_name))
return "\n".join(lines)
def _generate_typeddict(schema: dict, class_name: str = "Model") -> str:
"""Generate TypedDict from JSON Schema."""
lines = [
"from typing import TypedDict, NotRequired, Any",
""
]
def generate_class(schema: dict, name: str) -> list[str]:
result = []
if schema.get("type") != "object" or "properties" not in schema:
return result
for prop_name, prop_schema in schema.get("properties", {}).items():
if prop_schema.get("type") == "object" and "properties" in prop_schema:
nested_name = _to_class_name(prop_name)
result.extend(generate_class(prop_schema, nested_name))
result.append("")
required = set(schema.get("required", []))
result.append(f"class {name}(TypedDict):")
properties = schema.get("properties", {})
if not properties:
result.append(" pass")
return result
for prop_name, prop_schema in properties.items():
is_required = prop_name in required
if prop_schema.get("type") == "object" and "properties" in prop_schema:
py_type = _to_class_name(prop_name)
else:
py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
if not is_required:
result.append(f" {prop_name}: NotRequired[{py_type}]")
else:
result.append(f" {prop_name}: {py_type}")
return result
lines.extend(generate_class(schema, class_name))
return "\n".join(lines)
def _generate_dataclass(schema: dict, class_name: str = "Model") -> str:
"""Generate dataclass from JSON Schema."""
lines = [
"from dataclasses import dataclass, field",
"from typing import Any",
""
]
def generate_class(schema: dict, name: str) -> list[str]:
result = []
if schema.get("type") != "object" or "properties" not in schema:
return result
for prop_name, prop_schema in schema.get("properties", {}).items():
if prop_schema.get("type") == "object" and "properties" in prop_schema:
nested_name = _to_class_name(prop_name)
result.extend(generate_class(prop_schema, nested_name))
result.append("")
required = set(schema.get("required", []))
result.append("@dataclass")
result.append(f"class {name}:")
properties = schema.get("properties", {})
if not properties:
result.append(" pass")
return result
required_props = [(k, v) for k, v in properties.items() if k in required]
optional_props = [(k, v) for k, v in properties.items() if k not in required]
for prop_name, prop_schema in required_props:
if prop_schema.get("type") == "object" and "properties" in prop_schema:
py_type = _to_class_name(prop_name)
else:
py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
result.append(f" {prop_name}: {py_type}")
for prop_name, prop_schema in optional_props:
if prop_schema.get("type") == "object" and "properties" in prop_schema:
py_type = _to_class_name(prop_name)
else:
py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
result.append(f" {prop_name}: {py_type} | None = None")
return result
lines.extend(generate_class(schema, class_name))
return "\n".join(lines)
# =============================================================================
# MCP Tools - Traffic Persistence
# =============================================================================
@mcp.tool()
def store_traffic(seconds: int = 60) -> dict:
"""
Capture live traffic and store it in the SQLite cache for analysis.
Best for: Building up a corpus of traffic examples before running analysis.
Args:
seconds: Capture duration (default 60, max 300 = 5 minutes)
Returns: Number of requests captured and stored.
"""
events = _capture_http(seconds=seconds, max_http=1000)
if isinstance(events, dict) and "error" in events:
return events
db = _get_db()
stored = 0
for event in events:
req = event.get("request", {})
res = event.get("response", {})
meta = event.get("metadata", {})
host = req.get("authority", "unknown")
method = req.get("method", "GET")
path = req.get("path", "/")
route_pattern = _normalize_path(path)
status_code = res.get("status", 0)
req_body = req.get("body", "")
res_body = res.get("body", "")
db.execute("""
INSERT INTO traffic (
host, method, path, route_pattern, status_code,
request_headers, request_body, response_headers, response_body,
process_exe, content_type, body_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
host, method, path, route_pattern, status_code,
json.dumps(req.get("headers", {})),
req_body,
json.dumps(res.get("headers", {})),
res_body,
meta.get("process_exe", ""),
res.get("content_type", ""),
_hash_body(req_body)
))
stored += 1
db.commit()
db.close()
return {
"captured": len(events),
"stored": stored,
"message": f"Stored {stored} requests in cache"
}
@mcp.tool()
def clear_traffic(older_than: str = "24h") -> dict:
"""
Clear old traffic from the cache.
Args:
        older_than: Time window to keep (e.g., "30m", "24h", "7d"). Traffic older than this is deleted.
Returns: Number of records deleted.
"""
match = re.match(r'^(\d+)([hdm])$', older_than)
if not match:
return {"error": "Invalid time format. Use format like '1h', '24h', '7d'"}
amount, unit = int(match.group(1)), match.group(2)
if unit == 'h':
delta = timedelta(hours=amount)
elif unit == 'd':
delta = timedelta(days=amount)
elif unit == 'm':
delta = timedelta(minutes=amount)
else:
return {"error": "Invalid time unit"}
    # captured_at defaults to SQLite's CURRENT_TIMESTAMP, which is UTC, so compare in UTC.
    cutoff = datetime.now(timezone.utc) - delta
db = _get_db()
cursor = db.execute("DELETE FROM traffic WHERE captured_at < ?", (cutoff.strftime('%Y-%m-%d %H:%M:%S'),))
deleted = cursor.rowcount
db.commit()
db.close()
return {
"deleted": deleted,
"cutoff": cutoff.isoformat(),
"message": f"Deleted {deleted} records older than {older_than}"
}
@mcp.tool()
def get_traffic_stats() -> dict:
"""
Get statistics about the traffic cache.
Returns: Total records, unique routes, hosts, time range, and size.
"""
db = _get_db()
stats = {}
row = db.execute("SELECT COUNT(*) as count FROM traffic").fetchone()
stats["total_records"] = row["count"]
row = db.execute("SELECT COUNT(DISTINCT route_pattern) as count FROM traffic").fetchone()
stats["unique_routes"] = row["count"]
row = db.execute("SELECT COUNT(DISTINCT host) as count FROM traffic").fetchone()
stats["unique_hosts"] = row["count"]
row = db.execute("SELECT MIN(captured_at) as min_ts, MAX(captured_at) as max_ts FROM traffic").fetchone()
stats["oldest_record"] = row["min_ts"]
stats["newest_record"] = row["max_ts"]
rows = db.execute("""
SELECT route_pattern, host, COUNT(*) as count
FROM traffic
GROUP BY route_pattern, host
ORDER BY count DESC
LIMIT 10
""").fetchall()
stats["top_routes"] = [{"route": r["route_pattern"], "host": r["host"], "count": r["count"]} for r in rows]
if DB_PATH.exists():
stats["cache_size_mb"] = round(DB_PATH.stat().st_size / (1024 * 1024), 2)
db.close()
return stats
# =============================================================================
# MCP Tools - Route Discovery
# =============================================================================
@mcp.tool()
def get_routes(host: str | None = None, min_count: int = 1) -> dict:
"""
Discover all routes in the traffic cache, grouped by normalized pattern.
Best for: Understanding what endpoints exist in your API.
Args:
host: Optional filter by host
min_count: Minimum request count to include (default 1)
Returns: List of routes with request counts, methods, and status codes.
"""
db = _get_db()
if host:
rows = db.execute("""
SELECT
route_pattern,
host,
COUNT(*) as request_count,
GROUP_CONCAT(DISTINCT method) as methods,
GROUP_CONCAT(DISTINCT status_code) as status_codes,
MIN(captured_at) as first_seen,
MAX(captured_at) as last_seen
FROM traffic
WHERE host LIKE ?
GROUP BY route_pattern, host
HAVING COUNT(*) >= ?
ORDER BY request_count DESC
""", (f"%{host}%", min_count)).fetchall()
else:
rows = db.execute("""
SELECT
route_pattern,
host,
COUNT(*) as request_count,
GROUP_CONCAT(DISTINCT method) as methods,
GROUP_CONCAT(DISTINCT status_code) as status_codes,
MIN(captured_at) as first_seen,
MAX(captured_at) as last_seen
FROM traffic
GROUP BY route_pattern, host
HAVING COUNT(*) >= ?
ORDER BY request_count DESC
""", (min_count,)).fetchall()
routes = []
for row in rows:
routes.append({
"route": row["route_pattern"],
"host": row["host"],
"request_count": row["request_count"],
"methods": row["methods"].split(",") if row["methods"] else [],
"status_codes": [int(s) for s in row["status_codes"].split(",") if s] if row["status_codes"] else [],
"first_seen": row["first_seen"],
"last_seen": row["last_seen"]
})
db.close()
return {
"route_count": len(routes),
"routes": routes
}
@mcp.tool()
def get_route_examples(
route_pattern: str,
    host: str | None = None,
max_examples: int = 5,
include_errors: bool = True,
dedupe_by: str = "shape"
) -> dict:
"""
Get representative examples for a specific route pattern.
Best for: Understanding request/response shapes for a specific endpoint.
Args:
route_pattern: The normalized route pattern (e.g., "/users/{id}")
host: Optional filter by host
max_examples: Maximum examples to return (default 5)
include_errors: Include 4xx/5xx responses (default True)
dedupe_by: Deduplication strategy - "shape" (unique request bodies by hash), "status" (one per status code), or "none"
Returns: List of request/response examples.
"""
db = _get_db()
query = "SELECT * FROM traffic WHERE route_pattern = ?"
params = [route_pattern]
if host:
query += " AND host LIKE ?"
params.append(f"%{host}%")
if not include_errors:
query += " AND (status_code < 400 OR status_code IS NULL)"
query += " ORDER BY captured_at DESC LIMIT 100"
rows = db.execute(query, params).fetchall()
db.close()
examples = []
seen_shapes = set()
seen_statuses = set()
for row in rows:
if len(examples) >= max_examples:
break
if dedupe_by == "shape":
shape_key = _hash_body(row["request_body"] or "") or "empty"
if shape_key in seen_shapes:
continue
seen_shapes.add(shape_key)
elif dedupe_by == "status":
if row["status_code"] in seen_statuses:
continue
seen_statuses.add(row["status_code"])
example = {
"method": row["method"],
"path": row["path"],
"host": row["host"],
"status_code": row["status_code"],
"captured_at": row["captured_at"],
"request": {
"headers": json.loads(row["request_headers"]) if row["request_headers"] else {},
"body": row["request_body"]
},
"response": {
"headers": json.loads(row["response_headers"]) if row["response_headers"] else {},
"body": row["response_body"]
}
}
examples.append(example)
return {
"route_pattern": route_pattern,
"host": host,
"example_count": len(examples),
"examples": examples
}
# =============================================================================
# MCP Tools - Schema Inference
# =============================================================================
@mcp.tool()
def infer_schema(
route_pattern: str,
    host: str | None = None,
target: str = "both"
) -> dict:
"""
Infer JSON Schema from observed traffic for a route.
Best for: Understanding the data contract for an endpoint.
Note: Only uses successful (2xx) responses, limited to 100 samples.
Args:
route_pattern: The normalized route pattern (e.g., "/users/{id}")
host: Optional filter by host
target: What to analyze - "request", "response", or "both" (default)
Returns: JSON Schema for request and/or response bodies.
"""
db = _get_db()
query = "SELECT request_body, response_body, status_code FROM traffic WHERE route_pattern = ?"
params = [route_pattern]
if host:
query += " AND host LIKE ?"
params.append(f"%{host}%")
query += " AND status_code >= 200 AND status_code < 300"
query += " LIMIT 100"
rows = db.execute(query, params).fetchall()
db.close()
if not rows:
return {"error": f"No traffic found for route {route_pattern}"}
result = {
"route_pattern": route_pattern,
"samples_analyzed": len(rows)
}
if target in ("request", "both"):
request_bodies = [row["request_body"] for row in rows if row["request_body"]]
if request_bodies:
result["request_schema"] = _infer_schema_from_bodies(request_bodies, "Request")
else:
result["request_schema"] = {"note": "No request bodies found"}
if target in ("response", "both"):
response_bodies = [row["response_body"] for row in rows if row["response_body"]]
if response_bodies:
result["response_schema"] = _infer_schema_from_bodies(response_bodies, "Response")
else:
result["response_schema"] = {"note": "No response bodies found"}
return result
# =============================================================================
# MCP Tools - Code Generation
# =============================================================================
@mcp.tool()
def generate_types(
route_pattern: str,
    host: str | None = None,
    format: str = "pydantic_v2",
    class_name: str | None = None,
target: str = "both"
) -> dict:
"""
Generate type definitions from observed traffic.
Best for: Creating type annotations for your codebase based on real traffic.
Args:
route_pattern: The normalized route pattern (e.g., "/users/{id}")
host: Optional filter by host
format: Output format - "json_schema", "pydantic_v2", "pydantic_v1", "typeddict", "dataclass"
class_name: Override the generated class name (default: derived from route)
target: What to generate - "request", "response", or "both" (default)
Returns: Generated type definitions as code strings.
"""
schema_result = infer_schema(route_pattern, host, target)
if "error" in schema_result:
return schema_result
result = {
"route_pattern": route_pattern,
"format": format,
"samples_analyzed": schema_result.get("samples_analyzed", 0)
}
if not class_name:
parts = route_pattern.strip("/").split("/")
parts = [p for p in parts if not p.startswith("{")]
class_name = _to_class_name("_".join(parts[-2:]) if len(parts) >= 2 else parts[-1] if parts else "Model")
generators = {
"json_schema": lambda s, n: json.dumps(s, indent=2),
"pydantic_v2": _generate_pydantic_v2,
"pydantic_v1": _generate_pydantic_v1,
"typeddict": _generate_typeddict,
"dataclass": _generate_dataclass,
}
if format not in generators:
return {"error": f"Unknown format: {format}. Available: {', '.join(generators.keys())}"}
generator = generators[format]
if target in ("request", "both") and "request_schema" in schema_result:
schema = schema_result["request_schema"]
if "error" not in schema and schema.get("type") == "object":
result["request_types"] = generator(schema, f"{class_name}Request")
if target in ("response", "both") and "response_schema" in schema_result:
schema = schema_result["response_schema"]
if "error" not in schema and schema.get("type") == "object":
result["response_types"] = generator(schema, f"{class_name}Response")
return result
# =============================================================================
# MCP Tools - Test Generation
# =============================================================================
@mcp.tool()
def generate_test_data(
route_pattern: str,
    host: str | None = None,
format: str = "json",
max_examples: int = 10
) -> dict:
"""
Generate test data from observed traffic.
Best for: Creating test fixtures based on real API interactions.
Args:
route_pattern: The normalized route pattern (e.g., "/users/{id}")
host: Optional filter by host
format: Output format - "json" (raw data) or "pytest" (fixture code)
max_examples: Maximum examples to include (default 10)
Returns: Test data in requested format.
"""
examples_result = get_route_examples(route_pattern, host, max_examples, include_errors=True, dedupe_by="status")
if "error" in examples_result:
return examples_result
examples = examples_result.get("examples", [])
if not examples:
return {"error": f"No examples found for route {route_pattern}"}
if format == "json":
test_cases = []
for i, ex in enumerate(examples):
test_case = {
"id": f"test_case_{i + 1}",
"method": ex["method"],
"path": ex["path"],
"status_code": ex["status_code"],
"request_body": None,
"response_body": None
}
if ex["request"]["body"]:
try:
test_case["request_body"] = json.loads(ex["request"]["body"])
except json.JSONDecodeError:
test_case["request_body"] = ex["request"]["body"]
if ex["response"]["body"]:
try:
test_case["response_body"] = json.loads(ex["response"]["body"])
except json.JSONDecodeError:
test_case["response_body"] = ex["response"]["body"]
test_cases.append(test_case)
return {
"route_pattern": route_pattern,
"format": "json",
"test_cases": test_cases
}
elif format == "pytest":
fixture_name = _to_class_name(route_pattern.strip("/").replace("/", "_").replace("{", "").replace("}", "")).lower()
lines = [
"import pytest",
"from typing import Any",
"",
"",
f"# Test data for route: {route_pattern}",
f"# Host: {host or 'any'}",
f"# Generated from {len(examples)} observed requests",
"",
"",
f"@pytest.fixture",
f"def {fixture_name}_test_cases() -> list[dict[str, Any]]:",
' """Test cases captured from real traffic."""',
" return ["
]
for i, ex in enumerate(examples):
req_body = "None"
res_body = "None"
if ex["request"]["body"]:
try:
req_body = repr(json.loads(ex["request"]["body"]))
except json.JSONDecodeError:
req_body = repr(ex["request"]["body"])
if ex["response"]["body"]:
try:
res_body = repr(json.loads(ex["response"]["body"]))
except json.JSONDecodeError:
res_body = repr(ex["response"]["body"])
lines.append(" {")
lines.append(f' "method": {repr(ex["method"])},')
lines.append(f' "path": {repr(ex["path"])},')
lines.append(f' "status_code": {ex["status_code"]},')
lines.append(f' "request_body": {req_body},')
lines.append(f' "response_body": {res_body},')
lines.append(" },")
lines.append(" ]")
lines.append("")
lines.append("")
lines.append(f"@pytest.fixture(params=[")
for i in range(len(examples)):
lines.append(f' "case_{i + 1}",')
lines.append("])")
lines.append(f"def {fixture_name}_case(request, {fixture_name}_test_cases):")
lines.append(' """Parameterized test case fixture."""')
lines.append(f" idx = int(request.param.split('_')[1]) - 1")
lines.append(f" return {fixture_name}_test_cases[idx]")
return {
"route_pattern": route_pattern,
"format": "pytest",
"code": "\n".join(lines)
}
else:
return {"error": f"Unknown format: {format}. Available: json, pytest"}
# =============================================================================
# MCP Tools - Traffic Comparison
# =============================================================================
@mcp.tool()
def compare_traffic(
route_pattern: str,
    host: str | None = None,
baseline_minutes: int = 60
) -> dict:
"""
Compare recent traffic against a baseline to detect schema drift.
Best for: Validating changes haven't broken API contracts.
Note: Only uses successful (2xx) responses, limited to 50 samples per period.
Args:
route_pattern: The normalized route pattern (e.g., "/users/{id}")
host: Optional filter by host
baseline_minutes: Compare traffic from last N minutes against older traffic (default 60)
Returns: Comparison report showing new fields, removed fields, type changes.
"""
db = _get_db()
    # Compare in UTC to match SQLite's CURRENT_TIMESTAMP default.
    cutoff = (datetime.now(timezone.utc) - timedelta(minutes=baseline_minutes)).strftime('%Y-%m-%d %H:%M:%S')
query = "SELECT request_body, response_body FROM traffic WHERE route_pattern = ? AND captured_at < ?"
params = [route_pattern, cutoff]
if host:
query += " AND host LIKE ?"
params.append(f"%{host}%")
query += " AND status_code >= 200 AND status_code < 300 LIMIT 50"
baseline_rows = db.execute(query, params).fetchall()
query = "SELECT request_body, response_body FROM traffic WHERE route_pattern = ? AND captured_at >= ?"
params = [route_pattern, cutoff]
if host:
query += " AND host LIKE ?"
params.append(f"%{host}%")
query += " AND status_code >= 200 AND status_code < 300 LIMIT 50"
recent_rows = db.execute(query, params).fetchall()
db.close()
if not baseline_rows:
return {"error": "No baseline traffic found (traffic older than cutoff)"}
if not recent_rows:
return {"error": "No recent traffic found"}
baseline_req = [r["request_body"] for r in baseline_rows if r["request_body"]]
baseline_res = [r["response_body"] for r in baseline_rows if r["response_body"]]
recent_req = [r["request_body"] for r in recent_rows if r["request_body"]]
recent_res = [r["response_body"] for r in recent_rows if r["response_body"]]
result = {
"route_pattern": route_pattern,
"baseline_samples": len(baseline_rows),
"recent_samples": len(recent_rows),
"cutoff": cutoff,
"changes": []
}
def compare_schemas(baseline_bodies, recent_bodies, label):
if not baseline_bodies or not recent_bodies:
return
baseline_schema = _infer_schema_from_bodies(baseline_bodies, "Baseline")
recent_schema = _infer_schema_from_bodies(recent_bodies, "Recent")
baseline_props = set(baseline_schema.get("properties", {}).keys())
recent_props = set(recent_schema.get("properties", {}).keys())
new_fields = recent_props - baseline_props
removed_fields = baseline_props - recent_props
if new_fields:
result["changes"].append({
"type": "new_fields",
"location": label,
"fields": list(new_fields)
})
if removed_fields:
result["changes"].append({
"type": "removed_fields",
"location": label,
"fields": list(removed_fields)
})
common = baseline_props & recent_props
for field in common:
baseline_type = baseline_schema.get("properties", {}).get(field, {}).get("type")
recent_type = recent_schema.get("properties", {}).get(field, {}).get("type")
if baseline_type != recent_type:
result["changes"].append({
"type": "type_change",
"location": label,
"field": field,
"baseline_type": baseline_type,
"recent_type": recent_type
})
compare_schemas(baseline_req, recent_req, "request")
compare_schemas(baseline_res, recent_res, "response")
result["has_drift"] = len(result["changes"]) > 0
return result
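# =============================================================================
# Typical workflow (illustrative; hostnames and routes below are examples only)
# =============================================================================
# 1. store_traffic(seconds=120)                 - build a corpus in the cache
# 2. get_routes(host="api.example.com")         - discover normalized routes
# 3. infer_schema("/users/{id}")                - inspect the inferred contract
# 4. generate_types("/users/{id}", format="pydantic_v2")
# 5. compare_traffic("/users/{id}")             - check for schema drift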
if __name__ == "__main__":
mcp.run()