# Traffic Analysis MCP Server

Extend DevTools with AI-powered traffic analysis for code modernization. Infer types from real API responses, generate test fixtures from captured traffic, and validate that changes haven't broken API contracts.

{% hint style="info" %}
**Need real-time debugging instead?** See the [DevTools MCP Server](/guides/devtools-guides/devtools-mcp-server.md) for live traffic inspection.
{% endhint %}

**What you'll build:**

```
You: "Capture 2 minutes of traffic while I run my test suite"

AI: [Calling store_traffic tool...]
"Captured 156 requests across 12 unique routes. Ready for analysis."

You: "What endpoints are we hitting?"

AI: [Calling get_routes tool...]
"Found 12 routes:
- /v1/charges (23 requests, POST, all 200s)
- /v1/customers/{id} (18 requests, GET, 200/404)
- /v1/balance (12 requests, GET, all 200s)
..."

You: "Generate types for the charges endpoint"

AI: [Calling generate_types tool...]
"Generated ChargesResponse with 45 typed fields including nested
BillingDetails, PaymentMethodDetails, and Card objects."

You: "Create test fixtures from this traffic"

AI: [Calling generate_test_data tool...]
"Created pytest fixtures with 5 test cases covering 200, 400, and 402 responses."
```

***

## Available Tools

| Tool                 | Use Case                              | Best For                          |
| -------------------- | ------------------------------------- | --------------------------------- |
| `store_traffic`      | Capture live traffic to SQLite cache  | Building a corpus before analysis |
| `clear_traffic`      | Remove old traffic from cache         | Cleaning up after sessions        |
| `get_traffic_stats`  | Cache statistics                      | Checking what's available         |
| `get_routes`         | Discover routes grouped by pattern    | Finding endpoints to analyze      |
| `get_route_examples` | Representative request/response pairs | Understanding payload shapes      |
| `infer_schema`       | JSON Schema from observed traffic     | Understanding data contracts      |
| `generate_types`     | Type definitions in multiple formats  | Adding types to code              |
| `generate_test_data` | Test fixtures from real traffic       | Creating test cases               |
| `compare_traffic`    | Schema drift detection                | Validating changes                |

All tools work with cached traffic. Capture first with `store_traffic`, then analyze.
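
To make "grouped by pattern" concrete, here is a minimal sketch of how concrete paths could collapse into route patterns. It follows the same segment-by-segment idea as the server later in this guide, but with only two patterns; `normalize` and `group_routes` are illustrative names, not tools exposed by the server.

```python
import re
from collections import Counter

# Simplified subset of patterns (assumption: only UUIDs and plain integers
# are treated as dynamic segments in this sketch).
PATTERNS = [
    (re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.I), "{uuid}"),
    (re.compile(r"\d+"), "{id}"),
]


def normalize(path: str) -> str:
    """Replace dynamic path segments with placeholders, ignoring the query string."""
    path = path.split("?", 1)[0]
    out = []
    for segment in path.split("/"):
        for pattern, placeholder in PATTERNS:
            # fullmatch: the whole segment must look dynamic, not just part of it
            if pattern.fullmatch(segment):
                segment = placeholder
                break
        out.append(segment)
    return "/".join(out)


def group_routes(paths: list[str]) -> Counter:
    """Count requests per normalized route pattern."""
    return Counter(normalize(p) for p in paths)


routes = group_routes(["/v1/customers/42", "/v1/customers/7?expand=cards", "/v1/balance"])
print(routes.most_common())
```

Both customer requests end up under `/v1/customers/{id}`, which is why `get_routes` can report one route with a request count instead of one row per customer.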

{% hint style="warning" %}
**Data Sensitivity:** Traffic is stored in `.traffic_cache.db` alongside the script, including full headers and bodies. This may contain API keys, tokens, and PII. Use `clear_traffic()` regularly and avoid committing the database to version control.
{% endhint %}
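
One way to reduce that exposure is to redact sensitive headers before anything is written to the cache. The sketch below is not part of the server in this guide: `SENSITIVE_HEADERS` and `redact_headers` are hypothetical names, and the header list is an assumption you would tune for your own APIs.

```python
# Hypothetical helper: mask header values that commonly carry credentials.
SENSITIVE_HEADERS = {
    "authorization",
    "proxy-authorization",
    "cookie",
    "set-cookie",
    "x-api-key",
}


def redact_headers(headers: dict[str, str]) -> dict[str, str]:
    """Return a copy of the headers with sensitive values replaced."""
    return {
        name: "[REDACTED]" if name.lower() in SENSITIVE_HEADERS else value
        for name, value in headers.items()
    }


safe = redact_headers({"Authorization": "Bearer sk_test_123", "Accept": "application/json"})
print(safe)
```

Header names are compared case-insensitively because HTTP header names are not case-sensitive. Note that this does nothing for secrets or PII inside request and response bodies.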

**Limits:** Capture is capped at 5 minutes and 1,000 HTTP transactions per call. Schema inference uses only successful (2xx) responses.

***

## Use Cases

### Adding Types to Legacy Code

Capture traffic from your running application, then generate type definitions:

```
"Capture traffic while I run the app, then generate Pydantic models for all Stripe endpoints"
```
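
Type generation starts from a schema inferred across several observed responses. As a rough sketch of the idea, assuming flat JSON objects only: a field counts as required when every sample contains it, and its type is the union of the types seen. `infer_fields` and the sample payloads are made up for illustration; the server in this guide handles nesting, arrays, and string formats as well.

```python
import json

# Map Python runtime types to the annotation text we want to emit.
PY_TYPES = {bool: "bool", int: "int", float: "float", str: "str", type(None): "None"}


def infer_fields(bodies: list[str]) -> dict[str, dict]:
    """Infer a type and required flag per top-level field from JSON bodies."""
    samples = [json.loads(body) for body in bodies]
    fields: dict[str, dict] = {}
    for sample in samples:
        for key, value in sample.items():
            entry = fields.setdefault(key, {"types": set(), "seen": 0})
            entry["types"].add(PY_TYPES.get(type(value), "Any"))
            entry["seen"] += 1
    return {
        key: {
            "type": " | ".join(sorted(entry["types"])),
            # required only if the field appeared in every sample
            "required": entry["seen"] == len(samples),
        }
        for key, entry in fields.items()
    }


bodies = [
    '{"id": "ch_1", "amount": 500, "refunded": false}',
    '{"id": "ch_2", "amount": 1200, "refunded": true, "description": null}',
]
result = infer_fields(bodies)
for name, info in result.items():
    print(name, info)
```

The more varied the captured traffic, the more trustworthy the required/optional split becomes, which is why capturing during a full test run tends to give better models than a handful of manual requests.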

### Generating Test Fixtures

Create realistic test data from actual API responses:

```
"Generate pytest fixtures for the /v1/customers endpoint with examples of success and error responses"
```
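
A simple way to think about fixture generation is "one representative response per status code". The sketch below assumes examples shaped like the output of `get_route_examples` (a `status_code` plus a `response.body` string); `pick_cases` and the sample data are hypothetical and only illustrate the selection step, not the actual `generate_test_data` implementation.

```python
import json


def pick_cases(examples: list[dict]) -> dict[int, dict]:
    """Keep the first JSON response seen for each status code."""
    cases: dict[int, dict] = {}
    for example in examples:
        status = example["status_code"]
        if status in cases:
            continue
        try:
            body = json.loads(example["response"]["body"])
        except (TypeError, json.JSONDecodeError):
            continue  # skip empty or non-JSON bodies
        cases[status] = body
    return cases


examples = [
    {"status_code": 200, "response": {"body": '{"id": "cus_1"}'}},
    {"status_code": 200, "response": {"body": '{"id": "cus_2"}'}},
    {"status_code": 404, "response": {"body": '{"error": {"code": "resource_missing"}}'}},
    {"status_code": 500, "response": {"body": None}},
]
CASES = pick_cases(examples)
print(sorted(CASES))
```

A dictionary like `CASES` can then feed `pytest.mark.parametrize` or be returned from a fixture, so each test sees a payload that actually occurred in production-like traffic.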

### Validating API Contracts

Capture traffic before and after a dependency upgrade, then compare the two captures to detect breaking changes:

```
"Capture traffic before the upgrade, then again after - show me any schema drift"
```
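
Schema drift can be reduced to a comparison of two inferred schemas. The following sketch assumes JSON Schema dictionaries with a top-level `properties` key, like those produced by schema inference in this guide; `diff_schemas` is an illustrative helper, not the implementation of `compare_traffic`, and it only looks at top-level fields.

```python
def diff_schemas(before: dict, after: dict) -> dict[str, list[str]]:
    """Report top-level fields that were removed, added, or changed type."""
    old = before.get("properties", {})
    new = after.get("properties", {})
    return {
        "removed": sorted(old.keys() - new.keys()),
        "added": sorted(new.keys() - old.keys()),
        "type_changed": sorted(
            name
            for name in old.keys() & new.keys()
            if old[name].get("type") != new[name].get("type")
        ),
    }


before = {"properties": {
    "id": {"type": "string"},
    "amount": {"type": "integer"},
    "legacy_flag": {"type": "boolean"},
}}
after = {"properties": {
    "id": {"type": "string"},
    "amount": {"type": "string"},
    "currency": {"type": "string"},
}}
drift = diff_schemas(before, after)
print(drift)
```

Removed fields and type changes are the usual signs of a breaking change; added fields are typically safe for consumers that ignore unknown keys.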

### Dependency Modernization

When migrating between library versions (e.g., Pydantic v1 to v2), generate models for both versions and compare them:

```
"Generate both Pydantic v1 and v2 models for this endpoint so I can compare the migration"
```

***

## Prerequisites

* Linux host with Qtap installed and DevTools enabled
* Python 3.10+
* The `mcp` and `httpx` Python packages

***

## Create the MCP Server

### Step 1: Install Dependencies

```bash
pip install mcp httpx
```

### Step 2: Create the Server

Save this as `traffic_analysis_mcp.py`:

```python
#!/usr/bin/env python3
"""
MCP Server for traffic-based code modernization.

Captures HTTP traffic and provides tools for:
- Route discovery and grouping
- Schema inference from observed payloads
- Type generation (Pydantic, TypedDict, dataclass, etc.)
- Test data generation
- Before/after traffic comparison
"""

import base64
import hashlib
import json
import re
import sqlite3
import time
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Traffic Analysis")

DEVTOOLS_URL = "http://localhost:10001/devtools/api/events"
DB_PATH = Path(__file__).parent / ".traffic_cache.db"

# =============================================================================
# Path Normalization Patterns
# =============================================================================

DYNAMIC_PATTERNS = [
    (re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', re.I), '{uuid}'),
    (re.compile(r'\d{4}-\d{2}-\d{2}'), '{date}'),
    (re.compile(r'[0-9a-f]{24}', re.I), '{object_id}'),
    (re.compile(r'[A-Za-z0-9_-]{20,}'), '{token}'),
    (re.compile(r'\d+'), '{id}'),
]


def _normalize_path(path: str) -> str:
    """Normalize a path by replacing dynamic segments with placeholders."""
    # Drop the query string; only the path determines the route pattern.
    path = path.split('?', 1)[0]

    segments = path.split('/')
    normalized = []
    for segment in segments:
        if not segment:
            normalized.append(segment)
            continue

        replaced = segment
        for pattern, placeholder in DYNAMIC_PATTERNS:
            if pattern.fullmatch(segment):
                replaced = placeholder
                break
        normalized.append(replaced)

    return '/'.join(normalized)


# =============================================================================
# SQLite Database Management
# =============================================================================

def _get_db() -> sqlite3.Connection:
    """Get a database connection, creating schema if needed."""
    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row

    db.executescript("""
        CREATE TABLE IF NOT EXISTS traffic (
            id INTEGER PRIMARY KEY,
            captured_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            host TEXT NOT NULL,
            method TEXT NOT NULL,
            path TEXT NOT NULL,
            route_pattern TEXT NOT NULL,
            status_code INTEGER,
            request_headers TEXT,
            request_body TEXT,
            response_headers TEXT,
            response_body TEXT,
            process_exe TEXT,
            content_type TEXT,
            body_hash TEXT
        );

        CREATE INDEX IF NOT EXISTS idx_route ON traffic(route_pattern, host);
        CREATE INDEX IF NOT EXISTS idx_captured ON traffic(captured_at);
        CREATE INDEX IF NOT EXISTS idx_host ON traffic(host);
    """)

    return db


def _hash_body(body: str | None) -> str | None:
    """Create a hash of body content for deduplication."""
    if not body:
        return None
    return hashlib.sha256(body.encode()).hexdigest()[:16]


# =============================================================================
# Traffic Capture (from DevTools API)
# =============================================================================

def _capture_raw_events(seconds: int = 30, max_events: int = 500):
    """Capture raw events from DevTools SSE stream."""
    seconds = min(seconds, 300)

    try:
        with httpx.Client(timeout=seconds + 10) as client:
            events = []
            current_event_type = None
            first_event_time = None

            with client.stream("GET", DEVTOOLS_URL, timeout=seconds + 10) as response:
                start = time.time()
                for line in response.iter_lines():
                    if time.time() - start > seconds:
                        break

                    if line.startswith("event: "):
                        current_event_type = line[7:]
                    elif line.startswith("data: "):
                        try:
                            event_data = json.loads(line[6:])
                            event_ts = event_data.get("ts", "")

                            if first_event_time is None:
                                first_event_time = event_ts

                            if current_event_type == "process.started":
                                if event_ts[:20] == first_event_time[:20]:
                                    continue

                            events.append({
                                "type": current_event_type,
                                "ts": event_ts,
                                "data": event_data.get("data", {})
                            })
                        except json.JSONDecodeError:
                            pass

                    if len(events) >= max_events:
                        break

            return events

    except httpx.ConnectError:
        return {"error": "Cannot connect to DevTools API at localhost:10001",
                "hint": "Start Qtap with --enable-dev-tools or ENABLE_DEV_TOOLS=true"}
    except Exception as e:
        return {"error": str(e)}


def _decode_http_transaction(event):
    """Decode a raw http_transaction event into structured data."""
    inner = event.get("data", {})
    if isinstance(inner, dict) and "data" in inner:
        payload = inner.get("data")
        if isinstance(payload, dict) and "data" in payload:
            payload = payload.get("data")
        if not isinstance(payload, str):
            return None
        try:
            decoded = json.loads(base64.b64decode(payload))

            res = decoded.get("response", {})
            if res.get("body"):
                try:
                    res["body"] = base64.b64decode(res["body"]).decode('utf-8', errors='replace')
                except Exception:  # invalid base64; avoid a bare except that also traps KeyboardInterrupt
                    res["body"] = "[binary data]"

            req = decoded.get("request", {})
            if req.get("body"):
                try:
                    req["body"] = base64.b64decode(req["body"]).decode('utf-8', errors='replace')
                except Exception:  # invalid base64; avoid a bare except that also traps KeyboardInterrupt
                    req["body"] = "[binary data]"

            return {
                "ts": event.get("ts"),
                "metadata": decoded.get("metadata", {}),
                "request": req,
                "response": res
            }
        except Exception:
            # Malformed payload (bad base64 or JSON): skip this event.
            pass
    return None


def _capture_http(seconds: int = 30, max_http: int = 500):
    """Capture and decode HTTP transactions."""
    max_raw_events = max(seconds * 500, 10000)
    events = _capture_raw_events(seconds=seconds, max_events=max_raw_events)

    if isinstance(events, dict) and "error" in events:
        return events

    http_events = []
    for event in events:
        if event.get("type") == "request.http_transaction":
            decoded = _decode_http_transaction(event)
            if decoded:
                http_events.append(decoded)
                if len(http_events) >= max_http:
                    break

    return http_events


# =============================================================================
# Schema Inference
# =============================================================================

def _infer_type(value: Any) -> dict:
    """Infer JSON Schema type from a Python value."""
    if value is None:
        return {"type": "null"}
    elif isinstance(value, bool):
        return {"type": "boolean"}
    elif isinstance(value, int):
        return {"type": "integer"}
    elif isinstance(value, float):
        return {"type": "number"}
    elif isinstance(value, str):
        if re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', value):
            return {"type": "string", "format": "date-time"}
        elif re.match(r'^\d{4}-\d{2}-\d{2}$', value):
            return {"type": "string", "format": "date"}
        elif re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
            return {"type": "string", "format": "email"}
        elif re.match(r'^https?://', value):
            return {"type": "string", "format": "uri"}
        elif re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', value, re.I):
            return {"type": "string", "format": "uuid"}
        return {"type": "string"}
    elif isinstance(value, list):
        if not value:
            return {"type": "array", "items": {}}
        item_schemas = [_infer_type(item) for item in value[:10] if item is not None]
        if item_schemas:
            merged = _merge_schemas(item_schemas)
            return {"type": "array", "items": merged}
        return {"type": "array", "items": {}}
    elif isinstance(value, dict):
        properties = {}
        for k, v in value.items():
            properties[k] = _infer_type(v)
        return {
            "type": "object",
            "properties": properties,
            "required": list(value.keys())
        }
    return {}


def _merge_schemas(schemas: list[dict]) -> dict:
    """Merge multiple JSON schemas into one that accepts all observed shapes."""
    if not schemas:
        return {}
    if len(schemas) == 1:
        return schemas[0]

    types = set()
    all_properties = defaultdict(list)
    all_required = None
    formats = set()
    item_schemas = []

    for schema in schemas:
        schema_type = schema.get("type")
        if schema_type:
            types.add(schema_type)

        if schema.get("format"):
            formats.add(schema["format"])

        if schema_type == "object" and "properties" in schema:
            for prop, prop_schema in schema["properties"].items():
                all_properties[prop].append(prop_schema)

            required = set(schema.get("required", []))
            if all_required is None:
                all_required = required
            else:
                all_required = all_required & required

        if schema_type == "array" and "items" in schema:
            item_schemas.append(schema["items"])

    result = {}

    if len(types) == 1:
        result["type"] = types.pop()
    elif len(types) > 1:
        if types == {"integer", "number"}:
            result["type"] = "number"
        elif "null" in types:
            other_types = types - {"null"}
            if len(other_types) == 1:
                result["type"] = [other_types.pop(), "null"]
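            else:
                # Sort so the output is deterministic (set order varies between runs).
                result["type"] = sorted(types)
        else:
            # Sort so the output is deterministic (set order varies between runs).
            result["type"] = sorted(types)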

    if len(formats) == 1:
        result["format"] = formats.pop()

    if all_properties:
        merged_props = {}
        for prop, prop_schemas in all_properties.items():
            merged_props[prop] = _merge_schemas(prop_schemas)
        result["properties"] = merged_props
        if all_required:
            result["required"] = sorted(all_required)

    if item_schemas:
        result["items"] = _merge_schemas(item_schemas)

    return result


def _infer_schema_from_bodies(bodies: list[str], target_name: str = "Root") -> dict:
    """Infer JSON Schema from multiple body strings."""
    schemas = []

    for body in bodies:
        if not body or body == "[binary data]":
            continue
        try:
            data = json.loads(body)
            schemas.append(_infer_type(data))
        except json.JSONDecodeError:
            continue

    if not schemas:
        return {"error": "No valid JSON bodies to analyze"}

    merged = _merge_schemas(schemas)
    merged["$schema"] = "http://json-schema.org/draft-07/schema#"
    merged["title"] = target_name

    return merged


# =============================================================================
# Code Generation
# =============================================================================

def _schema_to_python_type(schema: dict, name: str = "Value") -> str:
    """Convert JSON Schema to Python type annotation."""
    schema_type = schema.get("type")

    if isinstance(schema_type, list):
        types = [_schema_to_python_type({"type": t}, name) for t in schema_type]
        return " | ".join(types)

    if schema_type == "null":
        return "None"
    elif schema_type == "boolean":
        return "bool"
    elif schema_type == "integer":
        return "int"
    elif schema_type == "number":
        return "float"
    elif schema_type == "string":
        return "str"
    elif schema_type == "array":
        items = schema.get("items", {})
        item_type = _schema_to_python_type(items, f"{name}Item")
        return f"list[{item_type}]"
    elif schema_type == "object":
        return name

    return "Any"


def _to_class_name(name: str) -> str:
    """Convert a string to PascalCase class name."""
    parts = re.split(r'[_-]', name)
    return ''.join(word.capitalize() for word in parts)


def _generate_pydantic_v2(schema: dict, class_name: str = "Model") -> str:
    """Generate Pydantic v2 model from JSON Schema."""
    lines = [
        "from pydantic import BaseModel, Field",
        "from typing import Any",
        ""
    ]

    def generate_class(schema: dict, name: str, indent: int = 0) -> list[str]:
        result = []
        prefix = "    " * indent

        if schema.get("type") != "object" or "properties" not in schema:
            return result

        for prop_name, prop_schema in schema.get("properties", {}).items():
            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                nested_name = _to_class_name(prop_name)
                result.extend(generate_class(prop_schema, nested_name, indent))
                result.append("")

        required = set(schema.get("required", []))
        result.append(f"{prefix}class {name}(BaseModel):")

        properties = schema.get("properties", {})
        if not properties:
            result.append(f"{prefix}    pass")
            return result

        for prop_name, prop_schema in properties.items():
            is_required = prop_name in required

            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                py_type = _to_class_name(prop_name)
            else:
                py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))

            if not is_required:
                py_type = f"{py_type} | None"

            field_args = []
            if not is_required:
                field_args.append("default=None")

            if field_args:
                result.append(f"{prefix}    {prop_name}: {py_type} = Field({', '.join(field_args)})")
            else:
                result.append(f"{prefix}    {prop_name}: {py_type}")

        return result

    lines.extend(generate_class(schema, class_name))
    return "\n".join(lines)


def _generate_pydantic_v1(schema: dict, class_name: str = "Model") -> str:
    """Generate Pydantic v1 compatible model from JSON Schema."""
    lines = [
        "from pydantic import BaseModel",
        "from typing import Any, List, Optional",
        ""
    ]

    def generate_class(schema: dict, name: str) -> list[str]:
        result = []

        if schema.get("type") != "object" or "properties" not in schema:
            return result

        for prop_name, prop_schema in schema.get("properties", {}).items():
            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                nested_name = _to_class_name(prop_name)
                result.extend(generate_class(prop_schema, nested_name))
                result.append("")

        required = set(schema.get("required", []))
        result.append(f"class {name}(BaseModel):")

        properties = schema.get("properties", {})
        if not properties:
            result.append("    pass")
            return result

        for prop_name, prop_schema in properties.items():
            is_required = prop_name in required

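            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                py_type = _to_class_name(prop_name)
                nullable = False
            else:
                py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
                # Remember nullability before stripping the "| None" suffix,
                # otherwise required-but-nullable fields would reject null values.
                nullable = " | None" in py_type
                py_type = py_type.replace(" | None", "")
                py_type = py_type.replace("list[", "List[")

            if not is_required:
                result.append(f"    {prop_name}: Optional[{py_type}] = None")
            elif nullable:
                # Pydantic v1: "= ..." keeps the field required while allowing None.
                result.append(f"    {prop_name}: Optional[{py_type}] = ...")
            else:
                result.append(f"    {prop_name}: {py_type}")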

        return result

    lines.extend(generate_class(schema, class_name))
    return "\n".join(lines)


def _generate_typeddict(schema: dict, class_name: str = "Model") -> str:
    """Generate TypedDict from JSON Schema."""
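    # Note: the generated code imports typing.NotRequired, which requires
    # Python 3.11+. On Python 3.10, import it from typing_extensions instead.
    lines = [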
        "from typing import TypedDict, NotRequired, Any",
        ""
    ]

    def generate_class(schema: dict, name: str) -> list[str]:
        result = []

        if schema.get("type") != "object" or "properties" not in schema:
            return result

        for prop_name, prop_schema in schema.get("properties", {}).items():
            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                nested_name = _to_class_name(prop_name)
                result.extend(generate_class(prop_schema, nested_name))
                result.append("")

        required = set(schema.get("required", []))
        result.append(f"class {name}(TypedDict):")

        properties = schema.get("properties", {})
        if not properties:
            result.append("    pass")
            return result

        for prop_name, prop_schema in properties.items():
            is_required = prop_name in required

            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                py_type = _to_class_name(prop_name)
            else:
                py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))

            if not is_required:
                result.append(f"    {prop_name}: NotRequired[{py_type}]")
            else:
                result.append(f"    {prop_name}: {py_type}")

        return result

    lines.extend(generate_class(schema, class_name))
    return "\n".join(lines)


def _generate_dataclass(schema: dict, class_name: str = "Model") -> str:
    """Generate dataclass from JSON Schema."""
    lines = [
        "from dataclasses import dataclass, field",
        "from typing import Any",
        ""
    ]

    def generate_class(schema: dict, name: str) -> list[str]:
        result = []

        if schema.get("type") != "object" or "properties" not in schema:
            return result

        for prop_name, prop_schema in schema.get("properties", {}).items():
            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                nested_name = _to_class_name(prop_name)
                result.extend(generate_class(prop_schema, nested_name))
                result.append("")

        required = set(schema.get("required", []))

        result.append("@dataclass")
        result.append(f"class {name}:")

        properties = schema.get("properties", {})
        if not properties:
            result.append("    pass")
            return result

        required_props = [(k, v) for k, v in properties.items() if k in required]
        optional_props = [(k, v) for k, v in properties.items() if k not in required]

        for prop_name, prop_schema in required_props:
            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                py_type = _to_class_name(prop_name)
            else:
                py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
            result.append(f"    {prop_name}: {py_type}")

        for prop_name, prop_schema in optional_props:
            if prop_schema.get("type") == "object" and "properties" in prop_schema:
                py_type = _to_class_name(prop_name)
            else:
                py_type = _schema_to_python_type(prop_schema, _to_class_name(prop_name))
            result.append(f"    {prop_name}: {py_type} | None = None")

        return result

    lines.extend(generate_class(schema, class_name))
    return "\n".join(lines)


# =============================================================================
# MCP Tools - Traffic Persistence
# =============================================================================

@mcp.tool()
def store_traffic(seconds: int = 60) -> dict:
    """
    Capture live traffic and store it in the SQLite cache for analysis.

    Best for: Building up a corpus of traffic examples before running analysis.

    Args:
        seconds: Capture duration (default 60, max 300 = 5 minutes)

    Returns: Number of requests captured and stored.
    """
    events = _capture_http(seconds=seconds, max_http=1000)

    if isinstance(events, dict) and "error" in events:
        return events

    db = _get_db()
    stored = 0

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        meta = event.get("metadata", {})

        host = req.get("authority", "unknown")
        method = req.get("method", "GET")
        path = req.get("path", "/")
        route_pattern = _normalize_path(path)
        status_code = res.get("status", 0)

        req_body = req.get("body", "")
        res_body = res.get("body", "")

        db.execute("""
            INSERT INTO traffic (
                host, method, path, route_pattern, status_code,
                request_headers, request_body, response_headers, response_body,
                process_exe, content_type, body_hash
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            host, method, path, route_pattern, status_code,
            json.dumps(req.get("headers", {})),
            req_body,
            json.dumps(res.get("headers", {})),
            res_body,
            meta.get("process_exe", ""),
            res.get("content_type", ""),
            _hash_body(req_body)
        ))
        stored += 1

    db.commit()
    db.close()

    return {
        "captured": len(events),
        "stored": stored,
        "message": f"Stored {stored} requests in cache"
    }


@mcp.tool()
def clear_traffic(older_than: str = "24h") -> dict:
    """
    Clear old traffic from the cache.

    Args:
        older_than: Age threshold (e.g., "30m", "1h", "24h", "7d"). Traffic older than this is deleted.

    Returns: Number of records deleted.
    """
    match = re.match(r'^(\d+)([hdm])$', older_than)
    if not match:
        return {"error": "Invalid time format. Use a format like '30m', '1h', '24h', '7d'"}

    amount, unit = int(match.group(1)), match.group(2)

    if unit == 'h':
        delta = timedelta(hours=amount)
    elif unit == 'd':
        delta = timedelta(days=amount)
    elif unit == 'm':
        delta = timedelta(minutes=amount)
    else:
        return {"error": "Invalid time unit"}

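    # SQLite's CURRENT_TIMESTAMP is stored in UTC, so compute the cutoff in UTC
    # as well; using local time would shift the window by the UTC offset.
    cutoff = datetime(*time.gmtime()[:6]) - delta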

    db = _get_db()
    cursor = db.execute("DELETE FROM traffic WHERE captured_at < ?", (cutoff.strftime('%Y-%m-%d %H:%M:%S'),))
    deleted = cursor.rowcount
    db.commit()
    db.close()

    return {
        "deleted": deleted,
        "cutoff": cutoff.isoformat(),
        "message": f"Deleted {deleted} records older than {older_than}"
    }


@mcp.tool()
def get_traffic_stats() -> dict:
    """
    Get statistics about the traffic cache.

    Returns: Total records, unique routes, hosts, time range, and size.
    """
    db = _get_db()

    stats = {}

    row = db.execute("SELECT COUNT(*) as count FROM traffic").fetchone()
    stats["total_records"] = row["count"]

    row = db.execute("SELECT COUNT(DISTINCT route_pattern) as count FROM traffic").fetchone()
    stats["unique_routes"] = row["count"]

    row = db.execute("SELECT COUNT(DISTINCT host) as count FROM traffic").fetchone()
    stats["unique_hosts"] = row["count"]

    row = db.execute("SELECT MIN(captured_at) as min_ts, MAX(captured_at) as max_ts FROM traffic").fetchone()
    stats["oldest_record"] = row["min_ts"]
    stats["newest_record"] = row["max_ts"]

    rows = db.execute("""
        SELECT route_pattern, host, COUNT(*) as count
        FROM traffic
        GROUP BY route_pattern, host
        ORDER BY count DESC
        LIMIT 10
    """).fetchall()
    stats["top_routes"] = [{"route": r["route_pattern"], "host": r["host"], "count": r["count"]} for r in rows]

    if DB_PATH.exists():
        stats["cache_size_mb"] = round(DB_PATH.stat().st_size / (1024 * 1024), 2)

    db.close()

    return stats


# =============================================================================
# MCP Tools - Route Discovery
# =============================================================================

@mcp.tool()
def get_routes(host: str | None = None, min_count: int = 1) -> dict:
    """
    Discover all routes in the traffic cache, grouped by normalized pattern.

    Best for: Understanding what endpoints exist in your API.

    Args:
        host: Optional filter by host
        min_count: Minimum request count to include (default 1)

    Returns: List of routes with request counts, methods, and status codes.
    """
    db = _get_db()

    if host:
        rows = db.execute("""
            SELECT
                route_pattern,
                host,
                COUNT(*) as request_count,
                GROUP_CONCAT(DISTINCT method) as methods,
                GROUP_CONCAT(DISTINCT status_code) as status_codes,
                MIN(captured_at) as first_seen,
                MAX(captured_at) as last_seen
            FROM traffic
            WHERE host LIKE ?
            GROUP BY route_pattern, host
            HAVING COUNT(*) >= ?
            ORDER BY request_count DESC
        """, (f"%{host}%", min_count)).fetchall()
    else:
        rows = db.execute("""
            SELECT
                route_pattern,
                host,
                COUNT(*) as request_count,
                GROUP_CONCAT(DISTINCT method) as methods,
                GROUP_CONCAT(DISTINCT status_code) as status_codes,
                MIN(captured_at) as first_seen,
                MAX(captured_at) as last_seen
            FROM traffic
            GROUP BY route_pattern, host
            HAVING COUNT(*) >= ?
            ORDER BY request_count DESC
        """, (min_count,)).fetchall()

    routes = []
    for row in rows:
        routes.append({
            "route": row["route_pattern"],
            "host": row["host"],
            "request_count": row["request_count"],
            "methods": row["methods"].split(",") if row["methods"] else [],
            "status_codes": [int(s) for s in row["status_codes"].split(",") if s] if row["status_codes"] else [],
            "first_seen": row["first_seen"],
            "last_seen": row["last_seen"]
        })

    db.close()

    return {
        "route_count": len(routes),
        "routes": routes
    }


@mcp.tool()
def get_route_examples(
    route_pattern: str,
    host: str | None = None,
    max_examples: int = 5,
    include_errors: bool = True,
    dedupe_by: str = "shape"
) -> dict:
    """
    Get representative examples for a specific route pattern.

    Best for: Understanding request/response shapes for a specific endpoint.

    Args:
        route_pattern: The normalized route pattern (e.g., "/users/{id}")
        host: Optional filter by host
        max_examples: Maximum examples to return (default 5)
        include_errors: Include 4xx/5xx responses (default True)
        dedupe_by: Deduplication strategy - "shape" (unique request bodies by hash), "status" (one per status code), or "none"

    Returns: List of request/response examples.
    """
    db = _get_db()

    query = "SELECT * FROM traffic WHERE route_pattern = ?"
    params = [route_pattern]

    if host:
        query += " AND host LIKE ?"
        params.append(f"%{host}%")

    if not include_errors:
        query += " AND (status_code < 400 OR status_code IS NULL)"

    query += " ORDER BY captured_at DESC LIMIT 100"

    rows = db.execute(query, params).fetchall()
    db.close()

    examples = []
    seen_shapes = set()
    seen_statuses = set()

    for row in rows:
        if len(examples) >= max_examples:
            break

        if dedupe_by == "shape":
            shape_key = _hash_body(row["request_body"] or "") or "empty"
            if shape_key in seen_shapes:
                continue
            seen_shapes.add(shape_key)
        elif dedupe_by == "status":
            if row["status_code"] in seen_statuses:
                continue
            seen_statuses.add(row["status_code"])

        example = {
            "method": row["method"],
            "path": row["path"],
            "host": row["host"],
            "status_code": row["status_code"],
            "captured_at": row["captured_at"],
            "request": {
                "headers": json.loads(row["request_headers"]) if row["request_headers"] else {},
                "body": row["request_body"]
            },
            "response": {
                "headers": json.loads(row["response_headers"]) if row["response_headers"] else {},
                "body": row["response_body"]
            }
        }
        examples.append(example)

    return {
        "route_pattern": route_pattern,
        "host": host,
        "example_count": len(examples),
        "examples": examples
    }


# =============================================================================
# MCP Tools - Schema Inference
# =============================================================================

@mcp.tool()
def infer_schema(
    route_pattern: str,
    host: str = None,
    target: str = "both"
) -> dict:
    """
    Infer JSON Schema from observed traffic for a route.

    Best for: Understanding the data contract for an endpoint.

    Note: Only uses successful (2xx) responses, limited to 100 samples.

    Args:
        route_pattern: The normalized route pattern (e.g., "/users/{id}")
        host: Optional host filter (substring match, e.g. "stripe" matches "api.stripe.com")
        target: What to analyze - "request", "response", or "both" (default)

    Returns: JSON Schema for request and/or response bodies.
    """
    db = _get_db()

    query = "SELECT request_body, response_body, status_code FROM traffic WHERE route_pattern = ?"
    params = [route_pattern]

    if host:
        query += " AND host LIKE ?"
        params.append(f"%{host}%")

    query += " AND status_code >= 200 AND status_code < 300"
    query += " LIMIT 100"

    rows = db.execute(query, params).fetchall()
    db.close()

    if not rows:
        return {"error": f"No successful (2xx) traffic found for route {route_pattern}"}

    result = {
        "route_pattern": route_pattern,
        "samples_analyzed": len(rows)
    }

    if target in ("request", "both"):
        request_bodies = [row["request_body"] for row in rows if row["request_body"]]
        if request_bodies:
            result["request_schema"] = _infer_schema_from_bodies(request_bodies, "Request")
        else:
            result["request_schema"] = {"note": "No request bodies found"}

    if target in ("response", "both"):
        response_bodies = [row["response_body"] for row in rows if row["response_body"]]
        if response_bodies:
            result["response_schema"] = _infer_schema_from_bodies(response_bodies, "Response")
        else:
            result["response_schema"] = {"note": "No response bodies found"}

    return result


# =============================================================================
# MCP Tools - Code Generation
# =============================================================================

@mcp.tool()
def generate_types(
    route_pattern: str,
    host: str = None,
    format: str = "pydantic_v2",
    class_name: str = None,
    target: str = "both"
) -> dict:
    """
    Generate type definitions from observed traffic.

    Best for: Creating type annotations for your codebase based on real traffic.

    Args:
        route_pattern: The normalized route pattern (e.g., "/users/{id}")
        host: Optional host filter (substring match, e.g. "stripe" matches "api.stripe.com")
        format: Output format - "json_schema", "pydantic_v2", "pydantic_v1", "typeddict", "dataclass"
        class_name: Override the generated class name (default: derived from route)
        target: What to generate - "request", "response", or "both" (default)

    Returns: Generated type definitions as code strings.
    """
    schema_result = infer_schema(route_pattern, host, target)

    if "error" in schema_result:
        return schema_result

    result = {
        "route_pattern": route_pattern,
        "format": format,
        "samples_analyzed": schema_result.get("samples_analyzed", 0)
    }

    if not class_name:
        # Build the name from the last two static (non-parameter) path segments.
        parts = [p for p in route_pattern.strip("/").split("/") if p and not p.startswith("{")]
        class_name = _to_class_name("_".join(parts[-2:]) if parts else "Model")

    generators = {
        "json_schema": lambda s, n: json.dumps(s, indent=2),
        "pydantic_v2": _generate_pydantic_v2,
        "pydantic_v1": _generate_pydantic_v1,
        "typeddict": _generate_typeddict,
        "dataclass": _generate_dataclass,
    }

    if format not in generators:
        return {"error": f"Unknown format: {format}. Available: {', '.join(generators.keys())}"}

    generator = generators[format]

    if target in ("request", "both") and "request_schema" in schema_result:
        schema = schema_result["request_schema"]
        if "error" not in schema and schema.get("type") == "object":
            result["request_types"] = generator(schema, f"{class_name}Request")

    if target in ("response", "both") and "response_schema" in schema_result:
        schema = schema_result["response_schema"]
        if "error" not in schema and schema.get("type") == "object":
            result["response_types"] = generator(schema, f"{class_name}Response")

    return result


# =============================================================================
# MCP Tools - Test Generation
# =============================================================================

@mcp.tool()
def generate_test_data(
    route_pattern: str,
    host: str = None,
    format: str = "json",
    max_examples: int = 10
) -> dict:
    """
    Generate test data from observed traffic.

    Best for: Creating test fixtures based on real API interactions.

    Args:
        route_pattern: The normalized route pattern (e.g., "/users/{id}")
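        host: Optional host filter (substring match, e.g. "stripe" matches "api.stripe.com")
        format: Output format - "json" (raw data) or "pytest" (fixture code)
        max_examples: Maximum examples to include (default 10); examples are deduplicated to one per status code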

    Returns: Test data in requested format.
    """
    examples_result = get_route_examples(route_pattern, host, max_examples, include_errors=True, dedupe_by="status")

    if "error" in examples_result:
        return examples_result

    examples = examples_result.get("examples", [])

    if not examples:
        return {"error": f"No examples found for route {route_pattern}"}

    if format == "json":
        test_cases = []
        for i, ex in enumerate(examples):
            test_case = {
                "id": f"test_case_{i + 1}",
                "method": ex["method"],
                "path": ex["path"],
                "status_code": ex["status_code"],
                "request_body": None,
                "response_body": None
            }

            if ex["request"]["body"]:
                try:
                    test_case["request_body"] = json.loads(ex["request"]["body"])
                except json.JSONDecodeError:
                    test_case["request_body"] = ex["request"]["body"]

            if ex["response"]["body"]:
                try:
                    test_case["response_body"] = json.loads(ex["response"]["body"])
                except json.JSONDecodeError:
                    test_case["response_body"] = ex["response"]["body"]

            test_cases.append(test_case)

        return {
            "route_pattern": route_pattern,
            "format": "json",
            "test_cases": test_cases
        }

    elif format == "pytest":
        fixture_name = _to_class_name(route_pattern.strip("/").replace("/", "_").replace("{", "").replace("}", "")).lower()

        lines = [
            "import pytest",
            "from typing import Any",
            "",
            "",
            f"# Test data for route: {route_pattern}",
            f"# Host: {host or 'any'}",
            f"# Generated from {len(examples)} observed requests",
            "",
            "",
            "@pytest.fixture",
            f"def {fixture_name}_test_cases() -> list[dict[str, Any]]:",
            '    """Test cases captured from real traffic."""',
            "    return ["
        ]

        for i, ex in enumerate(examples):
            req_body = "None"
            res_body = "None"

            if ex["request"]["body"]:
                try:
                    req_body = repr(json.loads(ex["request"]["body"]))
                except json.JSONDecodeError:
                    req_body = repr(ex["request"]["body"])

            if ex["response"]["body"]:
                try:
                    res_body = repr(json.loads(ex["response"]["body"]))
                except json.JSONDecodeError:
                    res_body = repr(ex["response"]["body"])

            lines.append("        {")
            lines.append(f'            "method": {repr(ex["method"])},')
            lines.append(f'            "path": {repr(ex["path"])},')
            lines.append(f'            "status_code": {ex["status_code"]},')
            lines.append(f'            "request_body": {req_body},')
            lines.append(f'            "response_body": {res_body},')
            lines.append("        },")

        lines.append("    ]")
        lines.append("")
        lines.append("")
        lines.append("@pytest.fixture(params=[")
        for i in range(len(examples)):
            lines.append(f'    "case_{i + 1}",')
        lines.append("])")
        lines.append(f"def {fixture_name}_case(request, {fixture_name}_test_cases):")
        lines.append('    """Parameterized test case fixture."""')
        lines.append("    idx = int(request.param.split('_')[1]) - 1")
        lines.append(f"    return {fixture_name}_test_cases[idx]")

        return {
            "route_pattern": route_pattern,
            "format": "pytest",
            "code": "\n".join(lines)
        }

    else:
        return {"error": f"Unknown format: {format}. Available: json, pytest"}


# =============================================================================
# MCP Tools - Traffic Comparison
# =============================================================================

@mcp.tool()
def compare_traffic(
    route_pattern: str,
    host: str = None,
    baseline_minutes: int = 60
) -> dict:
    """
    Compare recent traffic against a baseline to detect schema drift.

    Best for: Validating changes haven't broken API contracts.

    Note: Only uses successful (2xx) responses, limited to 50 samples per period.

    Args:
        route_pattern: The normalized route pattern (e.g., "/users/{id}")
        host: Optional host filter (substring match, e.g. "stripe" matches "api.stripe.com")
        baseline_minutes: Compare traffic from last N minutes against older traffic (default 60)

    Returns: Comparison report showing new fields, removed fields, and type changes (top-level properties only).
    """
    db = _get_db()

    cutoff = (datetime.now() - timedelta(minutes=baseline_minutes)).strftime('%Y-%m-%d %H:%M:%S')

    query = "SELECT request_body, response_body FROM traffic WHERE route_pattern = ? AND captured_at < ?"
    params = [route_pattern, cutoff]
    if host:
        query += " AND host LIKE ?"
        params.append(f"%{host}%")
    # Order by capture time so the 50-row sample is deterministic and favors the newest rows.
    query += " AND status_code >= 200 AND status_code < 300 ORDER BY captured_at DESC LIMIT 50"

    baseline_rows = db.execute(query, params).fetchall()

    query = "SELECT request_body, response_body FROM traffic WHERE route_pattern = ? AND captured_at >= ?"
    params = [route_pattern, cutoff]
    if host:
        query += " AND host LIKE ?"
        params.append(f"%{host}%")
    query += " AND status_code >= 200 AND status_code < 300 ORDER BY captured_at DESC LIMIT 50"

    recent_rows = db.execute(query, params).fetchall()
    db.close()

    if not baseline_rows:
        return {"error": "No baseline traffic found (traffic older than cutoff)"}

    if not recent_rows:
        return {"error": "No recent traffic found"}

    baseline_req = [r["request_body"] for r in baseline_rows if r["request_body"]]
    baseline_res = [r["response_body"] for r in baseline_rows if r["response_body"]]
    recent_req = [r["request_body"] for r in recent_rows if r["request_body"]]
    recent_res = [r["response_body"] for r in recent_rows if r["response_body"]]

    result = {
        "route_pattern": route_pattern,
        "baseline_samples": len(baseline_rows),
        "recent_samples": len(recent_rows),
        "cutoff": cutoff,
        "changes": []
    }

    def compare_schemas(baseline_bodies, recent_bodies, label):
        if not baseline_bodies or not recent_bodies:
            return

        baseline_schema = _infer_schema_from_bodies(baseline_bodies, "Baseline")
        recent_schema = _infer_schema_from_bodies(recent_bodies, "Recent")

        baseline_props = set(baseline_schema.get("properties", {}).keys())
        recent_props = set(recent_schema.get("properties", {}).keys())

        new_fields = recent_props - baseline_props
        removed_fields = baseline_props - recent_props

        if new_fields:
            result["changes"].append({
                "type": "new_fields",
                "location": label,
                "fields": sorted(new_fields)  # sorted for stable, diff-friendly output
            })

        if removed_fields:
            result["changes"].append({
                "type": "removed_fields",
                "location": label,
                "fields": sorted(removed_fields)
            })

        common = baseline_props & recent_props
        for field in sorted(common):
            baseline_type = baseline_schema.get("properties", {}).get(field, {}).get("type")
            recent_type = recent_schema.get("properties", {}).get(field, {}).get("type")
            if baseline_type != recent_type:
                result["changes"].append({
                    "type": "type_change",
                    "location": label,
                    "field": field,
                    "baseline_type": baseline_type,
                    "recent_type": recent_type
                })

    compare_schemas(baseline_req, recent_req, "request")
    compare_schemas(baseline_res, recent_res, "response")

    result["has_drift"] = len(result["changes"]) > 0

    return result


if __name__ == "__main__":
    mcp.run()
```

Optionally make it executable (only needed if you run the script directly rather than through `python3`):

```bash
chmod +x traffic_analysis_mcp.py
```

***

## Connect to AI Assistants

### Option 1: Codex CLI

[Codex CLI](https://github.com/openai/codex) is OpenAI's terminal-based coding assistant that supports MCP servers natively.

**Add the MCP server to your config:**

```bash
codex mcp add traffic-analysis -- python3 /path/to/traffic_analysis_mcp.py
```

Or edit `~/.codex/config.toml` directly:

```toml
[mcp_servers.traffic-analysis]
command = "python3"
args = ["/path/to/traffic_analysis_mcp.py"]
```

{% hint style="info" %}
**Using a virtual environment?** Replace `python3` with your venv Python path:

```bash
codex mcp add traffic-analysis -- /path/to/venv/bin/python /path/to/traffic_analysis_mcp.py
```

{% endhint %}

**Use it:**

```bash
codex
> Capture 2 minutes of traffic and show me what routes exist
```

***

### Option 2: ChatGPT Developer Mode

ChatGPT can connect to MCP servers via Developer Mode connectors. Because ChatGPT only connects to publicly reachable HTTPS endpoints, you'll need to serve the MCP server over HTTP and expose it through a tunnel.

**1. Start the MCP server with HTTP transport:**

Create a separate entrypoint file `traffic_analysis_http.py` to avoid modifying the main script (which would break Codex/Claude stdio mode):

```python
#!/usr/bin/env python3
# traffic_analysis_http.py - HTTP entrypoint for ChatGPT
from traffic_analysis_mcp import mcp

if __name__ == "__main__":
    mcp.run(transport="http", host="0.0.0.0", port=8081)
```

Then run it:

```bash
python3 traffic_analysis_http.py
```

**2. Expose your local server via HTTPS tunnel:**

{% tabs %}
{% tab title="Cloudflared (Recommended)" %}

```bash
# One-liner, no account required
cloudflared tunnel --url http://localhost:8081
```

This creates a temporary `*.trycloudflare.com` URL you can use immediately.
{% endtab %}

{% tab title="ngrok" %}

```bash
ngrok http 8081
```

Requires a free ngrok account.
{% endtab %}
{% endtabs %}

**3. In ChatGPT:**

* Go to Settings > Connectors > Advanced Settings
* Enable Developer Mode
* Add a new connector with your tunnel URL

***

### Option 3: Claude Code

Claude Code supports MCP servers natively via the CLI.

**Add the MCP server:**

```bash
claude mcp add --transport stdio traffic-analysis -- python3 /path/to/traffic_analysis_mcp.py
```

{% hint style="info" %}
**Using a virtual environment?** Replace `python3` with your venv Python path:

```bash
claude mcp add --transport stdio traffic-analysis -- /path/to/venv/bin/python /path/to/traffic_analysis_mcp.py
```

{% endhint %}

**Verify it's connected:**

```bash
claude mcp list
# Should show: traffic-analysis: ... - ✓ Connected
```

**Use it in Claude Code:**

```
Capture traffic while I run my tests, then generate types for the user API
```

***

## Example Workflow

### Step 1: Start Traffic Generation

Run your application, test suite, or any code that makes HTTP requests:

```bash
# Option A: Run your app
python my_app.py

# Option B: Run your test suite
pytest tests/ -v

# Option C: Use curl/scripts
./run_api_tests.sh
```

### Step 2: Capture Traffic

While your application is running, ask your AI assistant to capture:

```
"Capture 2 minutes of traffic"
```

The assistant will call `store_traffic(seconds=120)` and report what was captured.

{% hint style="info" %}
**Note:** Traffic capture blocks for the specified duration. During a 2-minute capture, the assistant waits until capture completes before responding.
{% endhint %}

### Step 3: Discover Routes

```
"What routes did we capture?"
```

The assistant calls `get_routes()` and shows you all endpoints grouped by pattern.

### Step 4: Examine Examples

```
"Show me examples for the /v1/charges endpoint"
```

The assistant calls `get_route_examples("/v1/charges")` to show request/response pairs.

### Step 5: Generate Types

```
"Generate Pydantic models for that endpoint"
```

The assistant calls `generate_types("/v1/charges", format="pydantic_v2")` to create type definitions.

### Step 6: Create Test Fixtures

```
"Create pytest fixtures from this traffic"
```

The assistant calls `generate_test_data("/v1/charges", format="pytest")` to generate test code.
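
If you ask for the `json` format instead, the returned `test_cases` list can be consumed directly in your own tooling. The sketch below assumes the result shape produced by `generate_test_data(..., format="json")` in the script above; the helper name `split_by_outcome` and the sample payload are illustrative and not part of the server.

```python
from typing import Any


def split_by_outcome(result: dict[str, Any]) -> tuple[list[dict], list[dict]]:
    """Split generate_test_data(format="json") output into 2xx and non-2xx cases."""
    successes, non_successes = [], []
    for case in result.get("test_cases", []):
        status = case.get("status_code")
        # A missing status code is grouped with non-2xx so it is not silently trusted.
        if status is not None and 200 <= status < 300:
            successes.append(case)
        else:
            non_successes.append(case)
    return successes, non_successes


# Hypothetical payload shaped like the tool's JSON output.
sample = {
    "route_pattern": "/v1/charges",
    "format": "json",
    "test_cases": [
        {"id": "test_case_1", "method": "POST", "path": "/v1/charges",
         "status_code": 200, "request_body": {"amount": 500},
         "response_body": {"id": "ch_1"}},
        {"id": "test_case_2", "method": "POST", "path": "/v1/charges",
         "status_code": 402, "request_body": {"amount": 500},
         "response_body": {"error": "card_declined"}},
    ],
}

ok_cases, error_cases = split_by_outcome(sample)
print([c["id"] for c in ok_cases], [c["id"] for c in error_cases])
```

Keeping the happy-path and error cases separate makes it easier to write one parametrized test per group.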

***

## Example Prompts

**Route Discovery:**

* "What endpoints are we hitting?"
* "Show me all routes for api.stripe.com"
* "Which routes have error responses?"

**Type Generation:**

* "Generate types for the /users endpoint"
* "Create Pydantic v1 models for backwards compatibility"
* "Generate TypedDicts instead of Pydantic models"
* "What does the response schema look like for /v1/charges?"

**Test Generation:**

* "Create test fixtures for the payment API"
* "Generate JSON test data I can use in my tests"
* "Build pytest fixtures with examples of error responses"

**Schema Validation:**

* "Has the user API schema changed since yesterday?"
* "Compare recent traffic against the baseline"
* "Are there any breaking changes in the response format?"
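
The drift report returned by `compare_traffic` can also be post-processed in code, for example to fail a CI step. This is a minimal sketch that assumes the report shape built by the script above; the helper name `breaking_changes`, the sample report, and the rule that removed fields and type changes count as breaking (while new fields do not) are assumptions for illustration.

```python
from typing import Any

# Assumption: removed fields and type changes break consumers; new fields do not.
BREAKING_TYPES = {"removed_fields", "type_change"}


def breaking_changes(report: dict[str, Any]) -> list[dict[str, Any]]:
    """Return only the entries of a compare_traffic report that are likely breaking."""
    if "error" in report:
        # compare_traffic reports missing baseline/recent traffic as an error dict.
        raise ValueError(report["error"])
    return [c for c in report.get("changes", []) if c.get("type") in BREAKING_TYPES]


# Hypothetical report shaped like compare_traffic's return value.
report = {
    "route_pattern": "/users/{id}",
    "baseline_samples": 50,
    "recent_samples": 12,
    "cutoff": "2024-01-15 10:00:00",
    "changes": [
        {"type": "new_fields", "location": "response", "fields": ["nickname"]},
        {"type": "type_change", "location": "response", "field": "age",
         "baseline_type": "integer", "recent_type": "string"},
    ],
    "has_drift": True,
}

for change in breaking_changes(report):
    print(change["location"], change["type"], change.get("field") or change.get("fields"))
```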

**Cache Management:**

* "How much traffic have we captured?"
* "Clear traffic older than 1 hour"
* "What's in the cache?"

***

## Example Output

### get\_routes()

```json
{
  "route_count": 2,
  "routes": [
    {
      "route": "/v1/charges",
      "host": "api.stripe.com",
      "request_count": 23,
      "methods": ["POST"],
      "status_codes": [200, 402],
      "first_seen": "2024-01-15 10:30:00",
      "last_seen": "2024-01-15 10:45:00"
    },
    {
      "route": "/v1/customers/{id}",
      "host": "api.stripe.com",
      "request_count": 18,
      "methods": ["GET"],
      "status_codes": [200, 404],
      "first_seen": "2024-01-15 10:30:15",
      "last_seen": "2024-01-15 10:44:30"
    }
  ]
}
```
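
Because `status_codes` is returned as a list of integers per route, a question like "Which routes have error responses?" reduces to a simple filter. The following is a sketch over the output shape shown above; `routes_with_errors` is a hypothetical helper name, not one of the server's tools.

```python
from typing import Any


def routes_with_errors(result: dict[str, Any]) -> list[tuple[str, str, list[int]]]:
    """List (host, route, error_codes) for routes that saw any 4xx/5xx status."""
    flagged = []
    for route in result.get("routes", []):
        error_codes = sorted(code for code in route.get("status_codes", []) if code >= 400)
        if error_codes:
            flagged.append((route["host"], route["route"], error_codes))
    return flagged


# Trimmed copy of the example output above.
result = {
    "routes": [
        {"route": "/v1/charges", "host": "api.stripe.com", "status_codes": [200, 402]},
        {"route": "/v1/customers/{id}", "host": "api.stripe.com", "status_codes": [200, 404]},
    ],
}

for host, route, codes in routes_with_errors(result):
    print(f"{host}{route}: {codes}")
```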

### infer\_schema()

```json
{
  "route_pattern": "/v1/charges",
  "samples_analyzed": 23,
  "response_schema": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Response",
    "type": "object",
    "properties": {
      "id": {"type": "string"},
      "amount": {"type": "integer"},
      "currency": {"type": "string"},
      "status": {"type": "string"},
      "created": {"type": "integer"},
      "billing_details": {
        "type": "object",
        "properties": {
          "name": {"type": "string"},
          "email": {"type": "string", "format": "email"}
        }
      }
    },
    "required": ["id", "amount", "currency", "status", "created"]
  }
}
```
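
An inferred schema like this can serve as a lightweight contract check. The sketch below uses only the standard library and looks at `required` and primitive `type` entries, recursing into nested objects; it is not a full JSON Schema validator (a library such as `jsonschema` would be the better choice for that). The name `check_payload` and the sample schema are assumptions for illustration.

```python
from typing import Any

# JSON Schema primitive names mapped to Python types.
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def check_payload(payload: dict[str, Any], schema: dict[str, Any], path: str = "") -> list[str]:
    """Return human-readable problems; an empty list means no problems were found."""
    problems = []
    for name in schema.get("required", []):
        if name not in payload:
            problems.append(f"{path}{name}: missing required field")
    for name, spec in schema.get("properties", {}).items():
        if name not in payload:
            continue
        value = payload[name]
        type_name = spec.get("type")
        # Union types (lists) and unknown type names are skipped in this sketch.
        expected = _JSON_TYPES.get(type_name) if isinstance(type_name, str) else None
        # bool is a subclass of int in Python, so reject it for numeric fields explicitly.
        bool_as_number = isinstance(value, bool) and type_name in ("integer", "number")
        if expected and (not isinstance(value, expected) or bool_as_number):
            problems.append(f"{path}{name}: expected {type_name}, got {type(value).__name__}")
        elif type_name == "object":
            problems.extend(check_payload(value, spec, f"{path}{name}."))
    return problems


# Hypothetical schema in the shape returned under "response_schema".
schema = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "amount": {"type": "integer"},
        "billing_details": {
            "type": "object",
            "properties": {"name": {"type": "string"}},
        },
    },
    "required": ["id", "amount"],
}

print(check_payload({"id": "ch_1", "amount": "500", "billing_details": {"name": 7}}, schema))
```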

### generate\_types() - Pydantic v2

```python
from pydantic import BaseModel, Field
from typing import Any

class BillingDetails(BaseModel):
    name: str
    email: str | None = Field(default=None)

class ChargesResponse(BaseModel):
    id: str
    amount: int
    currency: str
    status: str
    created: int
    billing_details: BillingDetails | None = Field(default=None)
```

***

## Path Normalization

Routes are automatically normalized to group similar paths:

| Raw Path                                       | Normalized Pattern      |
| ---------------------------------------------- | ----------------------- |
| `/users/123`                                   | `/users/{id}`           |
| `/users/456`                                   | `/users/{id}`           |
| `/orders/550e8400-e29b-41d4-a716-446655440000` | `/orders/{uuid}`        |
| `/events/2024-01-15`                           | `/events/{date}`        |
| `/sessions/abc123def456789012345678`           | `/sessions/{object_id}` |
| `/tokens/sk_test_abc123...`                    | `/tokens/{token}`       |

This grouping ensures you get meaningful aggregates even with high-volume traffic containing many unique IDs.
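
To make the idea concrete, here is a minimal sketch of segment-by-segment normalization with regular expressions. The patterns, their order, and the name `normalize_path_sketch` are assumptions chosen to reproduce the placeholder names in the table; the rules in the actual script may differ.

```python
import re

# Ordered from most to least specific; the first matching rule wins.
# These patterns are illustrative guesses, not the script's actual rules.
_SEGMENT_RULES = [
    ("{uuid}", re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.I)),
    ("{date}", re.compile(r"^\d{4}-\d{2}-\d{2}$")),
    ("{object_id}", re.compile(r"^[0-9a-f]{24}$", re.I)),
    ("{id}", re.compile(r"^\d+$")),
    ("{token}", re.compile(r"^[a-z]{2,}_(test|live)_[A-Za-z0-9]+$")),
]


def normalize_path_sketch(path: str) -> str:
    """Replace ID-like path segments with placeholders so similar paths group together."""
    normalized = []
    for segment in path.split("/"):
        for placeholder, pattern in _SEGMENT_RULES:
            if pattern.match(segment):
                segment = placeholder
                break
        normalized.append(segment)
    return "/".join(normalized)


for raw in ("/users/123", "/events/2024-01-15", "/v1/charges"):
    print(raw, "->", normalize_path_sketch(raw))
```

Checking the most specific patterns first matters: a 24-character string of digits would otherwise be classified as a plain numeric `{id}` rather than an `{object_id}`.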

***

## Supported Type Formats

| Format        | Description                    | Use Case                   |
| ------------- | ------------------------------ | -------------------------- |
| `json_schema` | Raw JSON Schema (draft-07)     | Language-agnostic, OpenAPI |
| `pydantic_v2` | Pydantic v2 BaseModel          | Modern Python              |
| `pydantic_v1` | Pydantic v1 compatible         | Legacy Python              |
| `typeddict`   | typing.TypedDict               | Lightweight typing         |
| `dataclass`   | @dataclass                     | Simple data containers     |

***

## Troubleshooting

### "Cannot connect to DevTools API"

Ensure Qtap is running with DevTools enabled:

{% tabs %}
{% tab title="Binary" %}

```bash
sudo qtap --enable-dev-tools
```

{% endtab %}

{% tab title="Docker" %}

```bash
docker run -d --name qtap \
  --user 0:0 --privileged \
  --cap-add CAP_BPF --cap-add CAP_SYS_ADMIN \
  --pid=host --network=host \
  -v /sys:/sys \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -e TINI_SUBREAPER=1 \
  -e ENABLE_DEV_TOOLS=true \
  --ulimit=memlock=-1 \
  us-docker.pkg.dev/qpoint-edge/public/qtap:v0
```

{% endtab %}
{% endtabs %}

Verify it's accessible:

```bash
curl -s http://localhost:10001/devtools/api/events | head -3
```

### "No traffic captured"

Traffic capture is **live and on-demand**, not persistent background collection.

* Ensure traffic is flowing **while** `store_traffic()` runs
* Run your app/tests in one terminal, capture in another
* Try longer capture times: `store_traffic(seconds=120)`

### "No valid JSON bodies to analyze"

The endpoint may return non-JSON responses (HTML, XML, binary).

* Check `get_route_examples()` to see actual response bodies
* Schema inference only works with JSON content
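
If you want to check this programmatically, the sketch below scans a `get_route_examples()` result for response bodies that fail to parse as JSON. It assumes the result shape returned by the script above; `non_json_examples` and the sample data are hypothetical.

```python
import json
from typing import Any


def non_json_examples(result: dict[str, Any]) -> list[dict[str, Any]]:
    """Return examples whose response body is present but is not valid JSON."""
    flagged = []
    for example in result.get("examples", []):
        body = example.get("response", {}).get("body")
        if not body:
            continue  # Empty bodies are skipped by schema inference anyway.
        try:
            json.loads(body)
        except (json.JSONDecodeError, TypeError):
            flagged.append(example)
    return flagged


# Hypothetical result shaped like get_route_examples() output.
result = {
    "examples": [
        {"path": "/health", "status_code": 200,
         "response": {"headers": {}, "body": "<html>OK</html>"}},
        {"path": "/v1/balance", "status_code": 200,
         "response": {"headers": {}, "body": "{\"available\": 10}"}},
    ],
}

for example in non_json_examples(result):
    print(example["path"], example["response"]["body"][:40])
```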

### "MCP server not found"

Check your configuration path:

```bash
# Codex
cat ~/.codex/config.toml

# Claude Code
claude mcp list
```

Ensure the script path in your configuration is absolute and that the file exists at that location.

***

## Next Steps

* [DevTools MCP Server](/guides/devtools-guides/devtools-mcp-server.md) - Real-time traffic debugging
* [DevTools API Reference](/guides/devtools-guides/devtools-api.md) - Full API documentation
* [DevTools Interface Guide](/guides/devtools-guides/devtools-interface-guide.md) - Browser UI walkthrough

