# Enhanced MCP with Server-Side Filtering

Upgrade your MCP server to use server-side event filtering for optimized AI-powered troubleshooting. Instead of receiving all events and filtering in Python, tell Qtap exactly what you want and receive only matching events.

{% hint style="info" %}
**Requires Qtap v0.14.0+**: Server-side filtering is available in Qtap v0.14.0 and later. See [Server-Side Event Filtering](/guides/devtools-guides/server-side-event-filtering.md) for the full filter reference.
{% endhint %}

**What you'll build:**

```
You: "Show me errors from the payment service"

AI: [Calling get_container_errors with filter: res.status >= 400 AND container.name == "payment-service"]
"Found 3 errors in payment-service over the last 60 seconds:
- POST /v1/charges → 402 Payment Required (insufficient funds)
- POST /v1/charges → 500 Internal Server Error (Stripe timeout)
- GET /v1/customers/cus_xxx → 404 Not Found
..."
```

***

## Why Server-Side Filtering?

| Aspect     | Standard MCP (Client-Side)  | Enhanced MCP (Server-Side)   |
| ---------- | --------------------------- | ---------------------------- |
| Network    | Receive 1000s of events     | Receive only matching events |
| Processing | Decode/filter all in Python | Server handles filtering     |
| Latency    | Wait for full capture       | Faster results               |
| Bandwidth  | High in busy environments   | Minimal                      |
| Use Case   | General exploration         | Targeted troubleshooting     |

**When to use this approach:**

* High-traffic environments where bandwidth matters
* Incident response requiring focused debugging
* Container-scoped troubleshooting in microservices
* Security audits targeting specific patterns

***

## Architecture Comparison

### Standard MCP Server

```
DevTools API ──GET──> All Events ──> Python MCP Server ──> Filter ──> AI
                      (1000s)           (decode all)        (discard most)
```

### Enhanced MCP Server

```
DevTools API ──POST + Filter──> Matching Events Only ──> Python MCP Server ──> AI
                                (10s or 100s)               (decode matches)
```

The server-side filter is applied at the Qtap level before events are streamed, dramatically reducing network traffic and processing overhead.

***

## Enhanced MCP Server Code

Save this as `devtools_mcp_enhanced.py`:

```python
#!/usr/bin/env python3
"""
Enhanced MCP Server with Server-Side Filtering for Qtap DevTools.

Uses POST with filter expressions (v0.14.0+) for targeted event capture.
Works with Codex CLI, ChatGPT, and Claude Code.

Tools:
- get_errors: HTTP errors with server-side status filtering
- get_errors_by_container: Errors scoped to specific container
- get_container_traffic: All traffic for a specific container
- get_process_traffic: All traffic from a specific process
- get_external_traffic: External egress traffic for API auditing
- get_traffic_filtered: Custom filter expression support
- get_traffic_summary: Quick overview of patterns
- get_hosts: External API dependency mapping
- check_sensitive_data: Security audit for leaked secrets
"""

import base64
import json
import re
import time
import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Qtap DevTools Enhanced")

DEVTOOLS_URL = "http://localhost:10001/devtools/api/events"

# Sensitive data patterns for security scanning
SENSITIVE_PATTERNS = [
    (r'[Aa]uthorization', 'Authorization header'),
    (r'[Bb]earer\s+[A-Za-z0-9\-_\.]+', 'Bearer token'),
    (r'[Aa]pi[-_]?[Kk]ey', 'API key'),
    (r'[Ss]ecret', 'Secret'),
    (r'[Pp]assword', 'Password'),
    (r'[Tt]oken', 'Token'),
    (r'[Cc]ookie', 'Cookie'),
    (r'[Ss]ession[-_]?[Ii]d', 'Session ID'),
    (r'[Cc]redential', 'Credential'),
    (r'[Pp]rivate[-_]?[Kk]ey', 'Private key'),
    (r'sk-[a-zA-Z0-9]{20,}', 'OpenAI API key'),
    (r'ghp_[a-zA-Z0-9]{36}', 'GitHub PAT'),
    (r'xox[baprs]-[a-zA-Z0-9-]+', 'Slack token'),
]


def _capture_http_filtered(
    seconds: int = 30,
    max_http: int = 500,
    filter_expr: str = None
) -> list | dict:
    """
    Capture HTTP transactions with optional server-side filtering (v0.14.0+).

    Args:
        seconds: Capture duration (max 300)
        max_http: Maximum HTTP transactions to return
        filter_expr: Rulekit filter expression for server-side filtering

    Returns:
        List of decoded HTTP transactions or error dict
    """
    seconds = min(seconds, 300)

    try:
        with httpx.Client(timeout=seconds + 10) as client:
            events = []
            first_event_time = None

            # Build request based on filter
            if filter_expr:
                # Server-side filtering via POST
                payload = {"topics": {"http": filter_expr}}
                response_ctx = client.stream(
                    "POST",
                    DEVTOOLS_URL,
                    json=payload,
                    timeout=seconds + 10
                )
            else:
                # No filter - GET all events
                response_ctx = client.stream(
                    "GET",
                    DEVTOOLS_URL,
                    timeout=seconds + 10
                )

            with response_ctx as response:
                current_event_type = None
                start = time.time()

                for line in response.iter_lines():
                    if time.time() - start > seconds:
                        break

                    if line.startswith("event: "):
                        current_event_type = line[7:]
                    elif line.startswith("data: "):
                        try:
                            event_data = json.loads(line[6:])
                            event_ts = event_data.get("ts", "")

                            if first_event_time is None:
                                first_event_time = event_ts

                            # Only process HTTP transactions
                            if current_event_type == "request.http_transaction":
                                decoded = _decode_http_transaction({
                                    "type": current_event_type,
                                    "ts": event_ts,
                                    "data": event_data.get("data", {})
                                })
                                if decoded:
                                    events.append(decoded)

                        except json.JSONDecodeError:
                            pass

                    if len(events) >= max_http:
                        break

            return events

    except httpx.ConnectError:
        return {
            "error": "Cannot connect to DevTools API at localhost:10001",
            "hint": "Start Qtap with --enable-dev-tools or ENABLE_DEV_TOOLS=true"
        }
    except Exception as e:
        return {"error": str(e)}


def _decode_http_transaction(event: dict) -> dict | None:
    """Decode a raw http_transaction event into structured data."""
    inner = event.get("data", {})
    if isinstance(inner, dict) and "data" in inner:
        payload = inner.get("data")
        if isinstance(payload, dict) and "data" in payload:
            payload = payload.get("data")
        if not isinstance(payload, str):
            return None
        try:
            decoded = json.loads(base64.b64decode(payload))

            # Decode response body
            res = decoded.get("response", {})
            if res.get("body"):
                try:
                    res["body"] = base64.b64decode(res["body"]).decode('utf-8', errors='replace')
                except:
                    res["body"] = "[binary data]"

            # Decode request body
            req = decoded.get("request", {})
            if req.get("body"):
                try:
                    req["body"] = base64.b64decode(req["body"]).decode('utf-8', errors='replace')
                except:
                    req["body"] = "[binary data]"

            return {
                "ts": event.get("ts"),
                "metadata": decoded.get("metadata", {}),
                "request": req,
                "response": res
            }
        except:
            pass
    return None


# =============================================================================
# Server-Side Filtered Tools
# =============================================================================

@mcp.tool()
def get_errors(seconds: int = 60) -> dict:
    """
    Get HTTP errors (4xx/5xx) using server-side filtering.

    Best for: "What's failing?", "Show me all errors", "Debug 500s"

    Uses server-side filter: res.status >= 400

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Error requests with full details for debugging.
    """
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=500,
        filter_expr="res.status >= 400"
    )

    if isinstance(events, dict) and "error" in events:
        return events

    errors = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        errors.append({
            "ts": event.get("ts"),
            "host": req.get("authority"),
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "request_headers": req.get("headers", {}),
            "request_body": req.get("body", "")[:1000] if req.get("body") else None,
            "response_headers": res.get("headers", {}),
            "response_body": res.get("body", "")[:3000] if res.get("body") else None
        })

    by_status = {}
    for err in errors:
        status = str(err["status"])
        by_status[status] = by_status.get(status, 0) + 1

    return {
        "filter_used": "res.status >= 400",
        "capture_seconds": seconds,
        "error_count": len(errors),
        "by_status": by_status,
        "errors": errors[:200]
    }


@mcp.tool()
def get_errors_by_container(container_name: str, seconds: int = 60) -> dict:
    """
    Get HTTP errors from a specific container using server-side filtering.

    Best for: "What's failing in payment-service?", "Errors from my-app container"

    Uses server-side filter: res.status >= 400 AND container.name == "..."

    Args:
        container_name: Docker container name to filter by
        seconds: Capture duration (default 60, max 300)

    Returns: Errors from the specified container only.
    """
    filter_expr = f'res.status >= 400 AND container.name == "{container_name}"'

    events = _capture_http_filtered(
        seconds=seconds,
        max_http=500,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    errors = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        errors.append({
            "ts": event.get("ts"),
            "host": req.get("authority"),
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "response_body": res.get("body", "")[:3000] if res.get("body") else None
        })

    return {
        "filter_used": filter_expr,
        "container": container_name,
        "capture_seconds": seconds,
        "error_count": len(errors),
        "errors": errors[:100]
    }


@mcp.tool()
def get_process_traffic(process_name: str, seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get all HTTP traffic from a specific process using server-side filtering.

    Best for: "What's python doing?", "Traffic from node", "Process debugging"

    Uses server-side filter: process.binary == "..."

    Args:
        process_name: Process binary name to filter by (e.g., "python3.11", "node")
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: All HTTP traffic from the specified process.
    """
    filter_expr = f'process.binary == "{process_name}"'

    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    hosts = {}
    methods = {}
    status_codes = {}

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})

        host = req.get("authority", "unknown")
        method = req.get("method", "?")
        status = res.get("status", 0)

        hosts[host] = hosts.get(host, 0) + 1
        methods[method] = methods.get(method, 0) + 1
        status_codes[str(status)] = status_codes.get(str(status), 0) + 1

        requests.append({
            "ts": event.get("ts"),
            "host": host,
            "method": method,
            "path": req.get("path"),
            "status": status
        })

    return {
        "filter_used": filter_expr,
        "process": process_name,
        "capture_seconds": seconds,
        "total_requests": len(requests),
        "hosts": dict(sorted(hosts.items(), key=lambda x: -x[1])),
        "methods": methods,
        "status_codes": dict(sorted(status_codes.items())),
        "requests": requests
    }


@mcp.tool()
def get_container_traffic(container_name: str, seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get all HTTP traffic from a specific container using server-side filtering.

    Best for: "What's payment-service doing?", "Traffic from my-app"

    Uses server-side filter: container.name == "..."

    Args:
        container_name: Docker container name to filter by
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: All HTTP traffic from the specified container.
    """
    filter_expr = f'container.name == "{container_name}"'

    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    hosts = {}
    methods = {}
    status_codes = {}

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})

        host = req.get("authority", "unknown")
        method = req.get("method", "?")
        status = res.get("status", 0)

        hosts[host] = hosts.get(host, 0) + 1
        methods[method] = methods.get(method, 0) + 1
        status_codes[str(status)] = status_codes.get(str(status), 0) + 1

        requests.append({
            "ts": event.get("ts"),
            "host": host,
            "method": method,
            "path": req.get("path"),
            "status": status
        })

    return {
        "filter_used": filter_expr,
        "container": container_name,
        "capture_seconds": seconds,
        "total_requests": len(requests),
        "hosts": dict(sorted(hosts.items(), key=lambda x: -x[1])),
        "methods": methods,
        "status_codes": dict(sorted(status_codes.items())),
        "requests": requests
    }


@mcp.tool()
def get_external_traffic(seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get external egress HTTP traffic using server-side filtering.

    Best for: "What external APIs are we calling?", "Outbound traffic audit"

    Uses server-side filter: direction == "egress-external"

    Args:
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: External HTTP requests for API dependency analysis.
    """
    filter_expr = 'direction == "egress-external"'

    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    by_host = {}

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        meta = event.get("metadata", {})

        host = req.get("authority", "unknown")
        by_host[host] = by_host.get(host, 0) + 1

        requests.append({
            "ts": event.get("ts"),
            "host": host,
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "process": meta.get("process_exe")
        })

    return {
        "filter_used": filter_expr,
        "capture_seconds": seconds,
        "external_requests": len(requests),
        "by_host": dict(sorted(by_host.items(), key=lambda x: -x[1])),
        "requests": requests
    }


@mcp.tool()
def get_traffic_filtered(filter_expr: str, seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get HTTP traffic with a custom server-side filter expression.

    Best for: Complex queries requiring custom filter logic.

    Filter examples:
    - "res.status >= 500" - Server errors only
    - "req.method == \"POST\"" - POST requests only
    - "req.host == \"api.stripe.com\"" - Stripe API calls
    - "process.binary == \"python3.11\"" - Python process traffic
    - "res.status >= 400 AND container.name == \"my-app\"" - Container errors
    - "direction == \"egress-external\"" - External API traffic

    Args:
        filter_expr: Rulekit filter expression
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: HTTP transactions matching the filter.
    """
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        meta = event.get("metadata", {})

        requests.append({
            "ts": event.get("ts"),
            "host": req.get("authority"),
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "process": meta.get("process_exe"),
            "request_headers": req.get("headers", {}),
            "response_body": res.get("body", "")[:2000] if res.get("body") else None
        })

    return {
        "filter_used": filter_expr,
        "capture_seconds": seconds,
        "match_count": len(requests),
        "requests": requests
    }


# =============================================================================
# Standard Tools (No Server-Side Filtering)
# =============================================================================

@mcp.tool()
def get_traffic_summary(seconds: int = 60) -> dict:
    """
    Quick overview of HTTP traffic patterns.

    Best for: "What's happening?", "Traffic overview", "Any errors?"

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Host counts, method breakdown, status distribution.
    """
    events = _capture_http_filtered(seconds=seconds, max_http=1000)

    if isinstance(events, dict) and "error" in events:
        return events

    hosts = {}
    status_codes = {}
    methods = {}
    errors = []

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})

        host = req.get("authority", "unknown")
        method = req.get("method", "?")
        status = res.get("status", 0)

        hosts[host] = hosts.get(host, 0) + 1
        methods[method] = methods.get(method, 0) + 1
        status_codes[str(status)] = status_codes.get(str(status), 0) + 1

        if status >= 400:
            errors.append({
                "host": host,
                "method": method,
                "path": req.get("path", "/")[:60],
                "status": status
            })

    return {
        "total_requests": len(events),
        "unique_hosts": len(hosts),
        "hosts": dict(sorted(hosts.items(), key=lambda x: -x[1])[:25]),
        "methods": methods,
        "status_codes": dict(sorted(status_codes.items())),
        "error_count": len(errors),
        "errors": errors[:30]
    }


@mcp.tool()
def get_hosts(seconds: int = 60) -> dict:
    """
    Map all external hosts/APIs being called.

    Best for: "What APIs are we calling?", "External dependencies"

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Hosts with request counts, methods, status codes, paths.
    """
    events = _capture_http_filtered(seconds=seconds, max_http=1000)

    if isinstance(events, dict) and "error" in events:
        return events

    hosts = {}
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")

        if host not in hosts:
            hosts[host] = {
                "count": 0,
                "methods": set(),
                "status_codes": set(),
                "paths": [],
                "errors": 0
            }

        hosts[host]["count"] += 1
        hosts[host]["methods"].add(req.get("method", "?"))
        hosts[host]["status_codes"].add(res.get("status", 0))
        if res.get("status", 0) >= 400:
            hosts[host]["errors"] += 1
        if len(hosts[host]["paths"]) < 5:
            hosts[host]["paths"].append(req.get("path", "/")[:60])

    result = {}
    for host, data in sorted(hosts.items(), key=lambda x: -x[1]["count"]):
        result[host] = {
            "requests": data["count"],
            "errors": data["errors"],
            "methods": sorted(data["methods"]),
            "status_codes": sorted(data["status_codes"]),
            "sample_paths": data["paths"]
        }

    return {
        "capture_seconds": seconds,
        "total_requests": len(events),
        "unique_hosts": len(result),
        "hosts": result
    }


@mcp.tool()
def check_sensitive_data(seconds: int = 60) -> dict:
    """
    Scan traffic for sensitive data: API keys, tokens, passwords.

    Best for: "Are we leaking secrets?", "Security audit"

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Report of sensitive patterns found.
    """
    events = _capture_http_filtered(seconds=seconds, max_http=500)

    if isinstance(events, dict) and "error" in events:
        return events

    findings = []

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")
        path = req.get("path", "/")

        event_findings = []

        # Check request headers
        for header, value in req.get("headers", {}).items():
            for pattern, label in SENSITIVE_PATTERNS:
                if re.search(pattern, header, re.IGNORECASE) or re.search(pattern, str(value)):
                    event_findings.append({
                        "location": f"request header: {header}",
                        "type": label,
                        "preview": str(value)[:80] + ("..." if len(str(value)) > 80 else "")
                    })
                    break

        # Check response headers
        for header, value in res.get("headers", {}).items():
            for pattern, label in SENSITIVE_PATTERNS:
                if re.search(pattern, header, re.IGNORECASE):
                    event_findings.append({
                        "location": f"response header: {header}",
                        "type": label
                    })
                    break

        # Check request body
        req_body = str(req.get("body", ""))
        if req_body:
            for pattern, label in SENSITIVE_PATTERNS:
                match = re.search(f'.{{0,30}}{pattern}.{{0,30}}', req_body, re.IGNORECASE)
                if match:
                    event_findings.append({
                        "location": "request body",
                        "type": label,
                        "context": match.group(0)
                    })

        # Check response body
        res_body = str(res.get("body", ""))
        if res_body and len(res_body) < 50000:
            for pattern, label in SENSITIVE_PATTERNS:
                match = re.search(f'.{{0,30}}{pattern}.{{0,30}}', res_body, re.IGNORECASE)
                if match:
                    event_findings.append({
                        "location": "response body",
                        "type": label,
                        "context": match.group(0)
                    })

        if event_findings:
            findings.append({
                "host": host,
                "method": req.get("method"),
                "path": path[:100],
                "findings": event_findings
            })

    return {
        "requests_scanned": len(events),
        "requests_with_sensitive_data": len(findings),
        "risk_level": "HIGH" if len(findings) > 5 else "MEDIUM" if len(findings) > 0 else "LOW",
        "findings": findings
    }


if __name__ == "__main__":
    mcp.run()
```

Make it executable:

```bash
chmod +x devtools_mcp_enhanced.py
```

***

## Troubleshooting Workflows

### Workflow 1: Error-First Debugging

**Scenario**: Production incident - users reporting failures in the payment service.

**Filter**: `res.status >= 400 AND container.name == "payment-service"`

**Example session**:

```
You: "What errors are happening in the payment service?"

AI: [Calling get_errors_by_container("payment-service")]
"Found 5 errors in payment-service over the last 60 seconds:

1. POST /v1/charges → 402 Payment Required
   Response: {"error": {"code": "card_declined", "message": "Your card has insufficient funds"}}

2. POST /v1/charges → 500 Internal Server Error
   Response: {"error": {"type": "api_error", "message": "Stripe timeout after 30s"}}

3. GET /v1/customers/cus_xxx → 404 Not Found
   Response: {"error": {"code": "resource_missing"}}

The 500 error suggests a timeout issue with Stripe. Consider:
- Checking Stripe's status page
- Reviewing timeout configuration
- Adding retry logic for transient failures"
```

***

### Workflow 2: External API Audit

**Scenario**: Audit what external APIs your services are calling.

**Filter**: `direction == "egress-external"`

**Example session**:

```
You: "What external APIs are we calling? Any credential leaks?"

AI: [Calling get_external_traffic()]
"Found 25 external requests over the last 60 seconds:

By host:
- api.stripe.com: 8 requests
- api.github.com: 7 requests
- api.openai.com: 5 requests
- api.sendgrid.com: 5 requests

Let me scan for sensitive data leaks..."

AI: [Calling check_sensitive_data()]
"Security scan complete:
- 25 requests scanned
- 0 sensitive data leaks detected
- Risk level: LOW

All credentials are properly contained in Authorization headers."
```

***

### Workflow 3: Container-Scoped Debugging

**Scenario**: A specific microservice is behaving unexpectedly.

**Filter**: `container.name == "user-service"`

**Example session**:

```
You: "What's the user-service container doing?"

AI: [Calling get_container_traffic("user-service")]
"Traffic from user-service over 60 seconds:

Total: 45 requests
Hosts:
- postgres-primary:5432: 28 requests
- redis-cache:6379: 12 requests
- api.sendgrid.com: 5 requests

Methods: GET (35), POST (10)
Status codes: 200 (42), 201 (2), 500 (1)

There's 1 error - let me get details..."

AI: [Calling get_errors_by_container("user-service")]
"1 error found:
- POST to api.sendgrid.com/v3/mail/send → 500
  Response: Rate limit exceeded

The user-service is hitting SendGrid rate limits."
```

***

### Workflow 4: Process-Scoped Debugging

**Scenario**: Need to understand what a specific process is doing.

**Filter**: `process.binary == "python3.11"`

**Example session**:

```
You: "What HTTP calls is Python making?"

AI: [Calling get_process_traffic("python3.11")]
"Found 45 requests from python3.11 over 60 seconds:

By host:
- api.stripe.com: 15 requests
- api.openai.com: 12 requests
- postgres-primary:5432: 10 requests
- redis-cache:6379: 8 requests

Methods: GET (28), POST (17)
Status codes: 200 (40), 201 (3), 500 (2)

There are 2 errors - let me get details..."

AI: [Calling get_traffic_filtered("process.binary == \"python3.11\" AND res.status >= 400")]
"2 errors from Python:
- POST to api.stripe.com/v1/charges → 402 (card declined)
- POST to api.openai.com/v1/chat/completions → 500 (rate limit)

The OpenAI 500 suggests you're hitting rate limits."
```

***

## Filter Expression Quick Reference

### Common Filters

| Use Case             | Filter Expression                         |
| -------------------- | ----------------------------------------- |
| All errors           | `res.status >= 400`                       |
| Server errors only   | `res.status >= 500`                       |
| Client errors only   | `res.status >= 400 AND res.status < 500`  |
| Specific container   | `container.name == "my-app"`              |
| Specific process     | `process.binary == "python3.11"`          |
| POST/PUT/DELETE only | `req.method in ["POST", "PUT", "DELETE"]` |
| Specific host        | `req.host == "api.stripe.com"`            |
| External egress      | `direction == "egress-external"`          |
| HTTPS traffic        | `dst.port == 443`                         |
| HTTP/2 protocol      | `protocol == "http2"`                     |

### Combining Filters

```
# Errors from a specific container
res.status >= 400 AND container.name == "payment-service"

# External API errors
res.status >= 400 AND direction == "egress-external"

# Stripe errors from Python
req.host == "api.stripe.com" AND res.status >= 400 AND process.binary == "python3.11"

# POST requests to external services
req.method == "POST" AND direction == "egress-external"
```

See [Server-Side Event Filtering](/guides/devtools-guides/server-side-event-filtering.md) for the complete operator and field reference.

***

## Setup Instructions

### Prerequisites

* **Qtap v0.14.0+** with DevTools enabled
* **Python 3.10+**
* **mcp** and **httpx** packages

### Installation

```bash
# Install dependencies
pip install mcp httpx

# Save the enhanced MCP server code
# (copy from above to devtools_mcp_enhanced.py)

# Make executable
chmod +x devtools_mcp_enhanced.py
```

### Start Qtap with DevTools

{% tabs %}
{% tab title="Binary" %}

```bash
sudo qtap --enable-dev-tools
```

{% endtab %}

{% tab title="Docker" %}

```bash
docker run -d --name qtap \
  --user 0:0 --privileged \
  --cap-add CAP_BPF --cap-add CAP_SYS_ADMIN \
  --pid=host --network=host \
  -v /sys:/sys \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -e TINI_SUBREAPER=1 \
  -e ENABLE_DEV_TOOLS=true \
  --ulimit=memlock=-1 \
  us-docker.pkg.dev/qpoint-edge/public/qtap:v0
```

{% endtab %}
{% endtabs %}

### Connect to AI Assistants

{% tabs %}
{% tab title="Claude Code" %}

```bash
claude mcp add --transport stdio devtools-enhanced -- python3 /path/to/devtools_mcp_enhanced.py
```

Verify:

```bash
claude mcp list
# Should show: devtools-enhanced: ... - ✓ Connected
```

{% endtab %}

{% tab title="Codex CLI" %}

```bash
codex mcp add devtools-enhanced -- python3 /path/to/devtools_mcp_enhanced.py
```

Or edit `~/.codex/config.toml`:

```toml
[mcp_servers.devtools-enhanced]
command = "python3"
args = ["/path/to/devtools_mcp_enhanced.py"]
```

{% endtab %}

{% tab title="ChatGPT" %}
Modify the server to use HTTP transport:

```python
if __name__ == "__main__":
    mcp.run(transport="http", host="0.0.0.0", port=8080)
```

Expose via HTTPS tunnel:

```bash
cloudflared tunnel --url http://localhost:8080
```

Add the tunnel URL to ChatGPT Developer Mode connectors.
{% endtab %}
{% endtabs %}

***

## Available Tools

| Tool                            | Server-Side Filter                              | Use Case                   |
| ------------------------------- | ----------------------------------------------- | -------------------------- |
| `get_errors()`                  | `res.status >= 400`                             | All HTTP errors            |
| `get_errors_by_container(name)` | `res.status >= 400 AND container.name == "..."` | Container-scoped errors    |
| `get_container_traffic(name)`   | `container.name == "..."`                       | All traffic from container |
| `get_process_traffic(name)`     | `process.binary == "..."`                       | All traffic from process   |
| `get_external_traffic()`        | `direction == "egress-external"`                | External API calls         |
| `get_traffic_filtered(expr)`    | Custom                                          | Any filter expression      |
| `get_traffic_summary()`         | None (all traffic)                              | Quick overview             |
| `get_hosts()`                   | None (all traffic)                              | API dependency mapping     |
| `check_sensitive_data()`        | None (all traffic)                              | Security audit             |

***

## Troubleshooting

### "No events received" with filter

1. **Verify filter syntax**: Test with `"*"` first to confirm events are flowing
2. **Check field names**: Use exact field names (`res.status`, not `status`)
3. **Verify Qtap version**: Server-side filtering requires v0.14.0+
4. **Generate matching traffic**: Your filter might be too restrictive

### Verify subscription accepted

The first event should be `system.connected` with your filter:

```bash
curl -sN -X POST http://localhost:10001/devtools/api/events \
  -H "Content-Type: application/json" \
  -d '{"topics": {"http": "res.status >= 400"}}' | head -3
```

Expected:

```
event: system.connected
data: {"data":{"topics":{"http":"res.status >= 400"}}}
```

### Check Qtap version

```bash
qtap --version
# Should be v0.14.0 or later
```

### Common filter mistakes

| Mistake                 | Fix                                              |
| ----------------------- | ------------------------------------------------ |
| `status >= 400`         | Use full path: `res.status >= 400`               |
| `method == "POST"`      | Use full path: `req.method == "POST"`            |
| `container == "my-app"` | Use full path: `container.name == "my-app"`      |
| Unescaped quotes        | Escape in JSON: `"container.name == \"my-app\""` |

***

## Next Steps

* [Server-Side Event Filtering](/guides/devtools-guides/server-side-event-filtering.md) - Full filter reference
* [DevTools MCP Server](/guides/devtools-guides/devtools-mcp-server.md) - Original MCP server (client-side filtering)
* [DevTools API](/guides/devtools-guides/devtools-api.md) - Event schemas and API reference
* [Traffic Analysis MCP Server](/guides/devtools-guides/traffic-analysis-mcp-server.md) - Type inference and test generation


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.qpoint.io/guides/devtools-guides/enhanced-mcp-troubleshooting.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
