Upgrade your MCP server to use server-side event filtering for optimized AI-powered troubleshooting. Instead of receiving all events and filtering in Python, tell Qtap exactly what you want and receive only matching events.
Requires Qtap v0.14.0+: server-side filtering is not available in earlier versions. See Server-Side Event Filtering for the full filter reference.
What you'll build:
You: "Show me errors from the payment service"
AI: [Calling get_errors_by_container with filter: res.status >= 400 AND container.name == "payment-service"]
"Found 3 errors in payment-service over the last 60 seconds:
- POST /v1/charges → 402 Payment Required (insufficient funds)
- POST /v1/charges → 500 Internal Server Error (Stripe timeout)
- GET /v1/customers/cus_xxx → 404 Not Found
..."
Why Server-Side Filtering?
| Aspect | Standard MCP (Client-Side) | Enhanced MCP (Server-Side) |
| --- | --- | --- |
| Network | Receive 1000s of events | Receive only matching events |
| Processing | Decode/filter all in Python | Server handles filtering |
| Latency | Wait for full capture | Faster results |
| Bandwidth | High in busy environments | Minimal |
| Use Case | General exploration | Targeted troubleshooting |
When to use this approach:
High-traffic environments where bandwidth matters
Incident response requiring focused debugging
Container-scoped troubleshooting in microservices
Security audits targeting specific patterns
Architecture Comparison
Standard MCP Server
Enhanced MCP Server
The server-side filter is applied inside Qtap before events are streamed, so only matching events cross the network and only those events are decoded in Python.
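As a rough sketch of how the two request shapes differ, the helper below picks between a plain GET and a filtered POST. The endpoint URL and the `{"topics": {"http": ...}}` payload shape are taken from the server code in this guide; the helper name `build_filter_request` is hypothetical and exists only for illustration.

```python
import json

DEVTOOLS_URL = "http://localhost:10001/devtools/api/events"


def build_filter_request(filter_expr=None):
    """Return (method, request kwargs) for the DevTools events endpoint."""
    if filter_expr:
        # Server-side filtering: POST the expression under the "http" topic.
        return "POST", {"json": {"topics": {"http": filter_expr}}}
    # No filter: a plain GET streams every event.
    return "GET", {}


method, kwargs = build_filter_request(
    'res.status >= 400 AND container.name == "payment-service"'
)
print(method, DEVTOOLS_URL)
print(json.dumps(kwargs, indent=2))
```

The returned pair can be passed to an HTTP client's streaming call, for example `client.stream(method, DEVTOOLS_URL, **kwargs)` with httpx.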
Enhanced MCP Server Code
Save this as devtools_mcp_enhanced.py:
Make it executable:
Troubleshooting Workflows
Workflow 1: Error-First Debugging
Scenario: Production incident - users reporting failures in the payment service.
Filter: res.status >= 400 AND container.name == "payment-service"
Example session:
Workflow 2: External API Audit
Scenario: Audit what external APIs your services are calling.
Filter: direction == "egress-external"
Example session:
Workflow 3: Container-Scoped Debugging
Scenario: A specific microservice is behaving unexpectedly.
Filter: container.name == "user-service"
Example session:
Workflow 4: Process-Scoped Debugging
Scenario: Need to understand what a specific process is doing.
Standard MCP Server:
DevTools API ──GET──> All Events ──> Python MCP Server ──> Filter ──> AI
                      (1000s)        (decode all)          (discard most)

Enhanced MCP Server:
DevTools API ──POST + Filter──> Matching Events Only ──> Python MCP Server ──> AI
                                (10s or 100s)            (decode matches)
#!/usr/bin/env python3
"""
Enhanced MCP Server with Server-Side Filtering for Qtap DevTools.

Uses POST with filter expressions (v0.14.0+) for targeted event capture.
Works with Codex CLI, ChatGPT, and Claude Code.

Tools:
- get_errors: HTTP errors with server-side status filtering
- get_errors_by_container: Errors scoped to specific container
- get_container_traffic: All traffic for a specific container
- get_process_traffic: All traffic from a specific process
- get_external_traffic: External egress traffic for API auditing
- get_traffic_filtered: Custom filter expression support
- get_traffic_summary: Quick overview of patterns
- get_hosts: External API dependency mapping
- check_sensitive_data: Security audit for leaked secrets
"""

import base64
import json
import re
import time

import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Qtap DevTools Enhanced")

DEVTOOLS_URL = "http://localhost:10001/devtools/api/events"

# Sensitive data patterns for security scanning
SENSITIVE_PATTERNS = [
    (r'[Aa]uthorization', 'Authorization header'),
    (r'[Bb]earer\s+[A-Za-z0-9\-_\.]+', 'Bearer token'),
    (r'[Aa]pi[-_]?[Kk]ey', 'API key'),
    (r'[Ss]ecret', 'Secret'),
    (r'[Pp]assword', 'Password'),
    (r'[Tt]oken', 'Token'),
    (r'[Cc]ookie', 'Cookie'),
    (r'[Ss]ession[-_]?[Ii]d', 'Session ID'),
    (r'[Cc]redential', 'Credential'),
    (r'[Pp]rivate[-_]?[Kk]ey', 'Private key'),
    (r'sk-[a-zA-Z0-9]{20,}', 'OpenAI API key'),
    (r'ghp_[a-zA-Z0-9]{36}', 'GitHub PAT'),
    (r'xox[baprs]-[a-zA-Z0-9-]+', 'Slack token'),
]


def _capture_http_filtered(
    seconds: int = 30,
    max_http: int = 500,
    filter_expr: str | None = None
) -> list | dict:
    """
    Capture HTTP transactions with optional server-side filtering (v0.14.0+).

    Args:
        seconds: Capture duration (max 300)
        max_http: Maximum HTTP transactions to return
        filter_expr: Rulekit filter expression for server-side filtering

    Returns:
        List of decoded HTTP transactions or error dict
    """
    seconds = min(seconds, 300)
    events = []

    try:
        with httpx.Client(timeout=seconds + 10) as client:
            first_event_time = None

            # Build request based on filter
            if filter_expr:
                # Server-side filtering via POST
                payload = {"topics": {"http": filter_expr}}
                response_ctx = client.stream(
                    "POST",
                    DEVTOOLS_URL,
                    json=payload,
                    timeout=seconds + 10
                )
            else:
                # No filter - GET all events
                response_ctx = client.stream(
                    "GET",
                    DEVTOOLS_URL,
                    timeout=seconds + 10
                )

            with response_ctx as response:
                current_event_type = None
                start = time.time()

                for line in response.iter_lines():
                    # The deadline is only checked when a line arrives
                    if time.time() - start > seconds:
                        break

                    if line.startswith("event: "):
                        current_event_type = line[7:]
                    elif line.startswith("data: "):
                        try:
                            event_data = json.loads(line[6:])
                            event_ts = event_data.get("ts", "")

                            if first_event_time is None:
                                first_event_time = event_ts

                            # Only process HTTP transactions
                            if current_event_type == "request.http_transaction":
                                decoded = _decode_http_transaction({
                                    "type": current_event_type,
                                    "ts": event_ts,
                                    "data": event_data.get("data", {})
                                })
                                if decoded:
                                    events.append(decoded)
                        except json.JSONDecodeError:
                            pass

                    if len(events) >= max_http:
                        break

            return events

    except httpx.ConnectError:
        return {
            "error": "Cannot connect to DevTools API at localhost:10001",
            "hint": "Start Qtap with --enable-dev-tools or ENABLE_DEV_TOOLS=true"
        }
    except httpx.ReadTimeout:
        # A quiet stream (e.g. a filter with no matches) is not an error:
        # return whatever was captured before the read timed out.
        return events
    except Exception as e:
        return {"error": str(e)}


def _decode_http_transaction(event: dict) -> dict | None:
    """Decode a raw http_transaction event into structured data."""
    inner = event.get("data", {})
    if isinstance(inner, dict) and "data" in inner:
        payload = inner.get("data")
        if isinstance(payload, dict) and "data" in payload:
            payload = payload.get("data")
        if not isinstance(payload, str):
            return None

        try:
            # The transaction itself is base64-encoded JSON
            decoded = json.loads(base64.b64decode(payload))

            # Decode response body
            res = decoded.get("response", {})
            if res.get("body"):
                try:
                    res["body"] = base64.b64decode(res["body"]).decode('utf-8', errors='replace')
                except Exception:
                    res["body"] = "[binary data]"

            # Decode request body
            req = decoded.get("request", {})
            if req.get("body"):
                try:
                    req["body"] = base64.b64decode(req["body"]).decode('utf-8', errors='replace')
                except Exception:
                    req["body"] = "[binary data]"

            return {
                "ts": event.get("ts"),
                "metadata": decoded.get("metadata", {}),
                "request": req,
                "response": res
            }
        except Exception:
            # Undecodable payloads are skipped rather than failing the capture
            pass
    return None


# =============================================================================
# Server-Side Filtered Tools
# =============================================================================

@mcp.tool()
def get_errors(seconds: int = 60) -> dict:
    """
    Get HTTP errors (4xx/5xx) using server-side filtering.

    Best for: "What's failing?", "Show me all errors", "Debug 500s"

    Uses server-side filter: res.status >= 400

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Error requests with full details for debugging.
    """
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=500,
        filter_expr="res.status >= 400"
    )

    if isinstance(events, dict) and "error" in events:
        return events

    errors = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        errors.append({
            "ts": event.get("ts"),
            "host": req.get("authority"),
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "request_headers": req.get("headers", {}),
            "request_body": req.get("body", "")[:1000] if req.get("body") else None,
            "response_headers": res.get("headers", {}),
            "response_body": res.get("body", "")[:3000] if res.get("body") else None
        })

    by_status = {}
    for err in errors:
        status = str(err["status"])
        by_status[status] = by_status.get(status, 0) + 1

    return {
        "filter_used": "res.status >= 400",
        "capture_seconds": seconds,
        "error_count": len(errors),
        "by_status": by_status,
        "errors": errors[:200]
    }


@mcp.tool()
def get_errors_by_container(container_name: str, seconds: int = 60) -> dict:
    """
    Get HTTP errors from a specific container using server-side filtering.

    Best for: "What's failing in payment-service?", "Errors from my-app container"

    Uses server-side filter: res.status >= 400 AND container.name == "..."

    Args:
        container_name: Docker container name to filter by
        seconds: Capture duration (default 60, max 300)

    Returns: Errors from the specified container only.
    """
    filter_expr = f'res.status >= 400 AND container.name == "{container_name}"'
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=500,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    errors = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        errors.append({
            "ts": event.get("ts"),
            "host": req.get("authority"),
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "response_body": res.get("body", "")[:3000] if res.get("body") else None
        })

    return {
        "filter_used": filter_expr,
        "container": container_name,
        "capture_seconds": seconds,
        "error_count": len(errors),
        "errors": errors[:100]
    }


@mcp.tool()
def get_process_traffic(process_name: str, seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get all HTTP traffic from a specific process using server-side filtering.

    Best for: "What's python doing?", "Traffic from node", "Process debugging"

    Uses server-side filter: process.binary == "..."

    Args:
        process_name: Process binary name to filter by (e.g., "python3.11", "node")
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: All HTTP traffic from the specified process.
    """
    filter_expr = f'process.binary == "{process_name}"'
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    hosts = {}
    methods = {}
    status_codes = {}

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")
        method = req.get("method", "?")
        status = res.get("status", 0)

        hosts[host] = hosts.get(host, 0) + 1
        methods[method] = methods.get(method, 0) + 1
        status_codes[str(status)] = status_codes.get(str(status), 0) + 1

        requests.append({
            "ts": event.get("ts"),
            "host": host,
            "method": method,
            "path": req.get("path"),
            "status": status
        })

    return {
        "filter_used": filter_expr,
        "process": process_name,
        "capture_seconds": seconds,
        "total_requests": len(requests),
        "hosts": dict(sorted(hosts.items(), key=lambda x: -x[1])),
        "methods": methods,
        "status_codes": dict(sorted(status_codes.items())),
        "requests": requests
    }


@mcp.tool()
def get_container_traffic(container_name: str, seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get all HTTP traffic from a specific container using server-side filtering.

    Best for: "What's payment-service doing?", "Traffic from my-app"

    Uses server-side filter: container.name == "..."

    Args:
        container_name: Docker container name to filter by
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: All HTTP traffic from the specified container.
    """
    filter_expr = f'container.name == "{container_name}"'
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    hosts = {}
    methods = {}
    status_codes = {}

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")
        method = req.get("method", "?")
        status = res.get("status", 0)

        hosts[host] = hosts.get(host, 0) + 1
        methods[method] = methods.get(method, 0) + 1
        status_codes[str(status)] = status_codes.get(str(status), 0) + 1

        requests.append({
            "ts": event.get("ts"),
            "host": host,
            "method": method,
            "path": req.get("path"),
            "status": status
        })

    return {
        "filter_used": filter_expr,
        "container": container_name,
        "capture_seconds": seconds,
        "total_requests": len(requests),
        "hosts": dict(sorted(hosts.items(), key=lambda x: -x[1])),
        "methods": methods,
        "status_codes": dict(sorted(status_codes.items())),
        "requests": requests
    }


@mcp.tool()
def get_external_traffic(seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get external egress HTTP traffic using server-side filtering.

    Best for: "What external APIs are we calling?", "Outbound traffic audit"

    Uses server-side filter: direction == "egress-external"

    Args:
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: External HTTP requests for API dependency analysis.
    """
    filter_expr = 'direction == "egress-external"'
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    by_host = {}

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        meta = event.get("metadata", {})
        host = req.get("authority", "unknown")

        by_host[host] = by_host.get(host, 0) + 1

        requests.append({
            "ts": event.get("ts"),
            "host": host,
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "process": meta.get("process_exe")
        })

    return {
        "filter_used": filter_expr,
        "capture_seconds": seconds,
        "external_requests": len(requests),
        "by_host": dict(sorted(by_host.items(), key=lambda x: -x[1])),
        "requests": requests
    }


@mcp.tool()
def get_traffic_filtered(filter_expr: str, seconds: int = 60, max_requests: int = 200) -> dict:
    """
    Get HTTP traffic with a custom server-side filter expression.

    Best for: Complex queries requiring custom filter logic.

    Filter examples:
    - "res.status >= 500" - Server errors only
    - "req.method == \"POST\"" - POST requests only
    - "req.host == \"api.stripe.com\"" - Stripe API calls
    - "process.binary == \"python3.11\"" - Python process traffic
    - "res.status >= 400 AND container.name == \"my-app\"" - Container errors
    - "direction == \"egress-external\"" - External API traffic

    Args:
        filter_expr: Rulekit filter expression
        seconds: Capture duration (default 60, max 300)
        max_requests: Maximum requests to return (default 200)

    Returns: HTTP transactions matching the filter.
    """
    events = _capture_http_filtered(
        seconds=seconds,
        max_http=max_requests,
        filter_expr=filter_expr
    )

    if isinstance(events, dict) and "error" in events:
        return events

    requests = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        meta = event.get("metadata", {})
        requests.append({
            "ts": event.get("ts"),
            "host": req.get("authority"),
            "method": req.get("method"),
            "path": req.get("path"),
            "status": res.get("status"),
            "process": meta.get("process_exe"),
            "request_headers": req.get("headers", {}),
            "response_body": res.get("body", "")[:2000] if res.get("body") else None
        })

    return {
        "filter_used": filter_expr,
        "capture_seconds": seconds,
        "match_count": len(requests),
        "requests": requests
    }


# =============================================================================
# Standard Tools (No Server-Side Filtering)
# =============================================================================

@mcp.tool()
def get_traffic_summary(seconds: int = 60) -> dict:
    """
    Quick overview of HTTP traffic patterns.

    Best for: "What's happening?", "Traffic overview", "Any errors?"

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Host counts, method breakdown, status distribution.
    """
    events = _capture_http_filtered(seconds=seconds, max_http=1000)

    if isinstance(events, dict) and "error" in events:
        return events

    hosts = {}
    status_codes = {}
    methods = {}
    errors = []

    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")
        method = req.get("method", "?")
        status = res.get("status", 0)

        hosts[host] = hosts.get(host, 0) + 1
        methods[method] = methods.get(method, 0) + 1
        status_codes[str(status)] = status_codes.get(str(status), 0) + 1

        if status >= 400:
            errors.append({
                "host": host,
                "method": method,
                "path": req.get("path", "/")[:60],
                "status": status
            })

    return {
        "total_requests": len(events),
        "unique_hosts": len(hosts),
        "hosts": dict(sorted(hosts.items(), key=lambda x: -x[1])[:25]),
        "methods": methods,
        "status_codes": dict(sorted(status_codes.items())),
        "error_count": len(errors),
        "errors": errors[:30]
    }


@mcp.tool()
def get_hosts(seconds: int = 60) -> dict:
    """
    Map all external hosts/APIs being called.

    Best for: "What APIs are we calling?", "External dependencies"

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Hosts with request counts, methods, status codes, paths.
    """
    events = _capture_http_filtered(seconds=seconds, max_http=1000)

    if isinstance(events, dict) and "error" in events:
        return events

    hosts = {}
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")

        if host not in hosts:
            hosts[host] = {
                "count": 0,
                "methods": set(),
                "status_codes": set(),
                "paths": [],
                "errors": 0
            }

        hosts[host]["count"] += 1
        hosts[host]["methods"].add(req.get("method", "?"))
        hosts[host]["status_codes"].add(res.get("status", 0))
        if res.get("status", 0) >= 400:
            hosts[host]["errors"] += 1
        if len(hosts[host]["paths"]) < 5:
            hosts[host]["paths"].append(req.get("path", "/")[:60])

    # Convert sets to sorted lists so the result is JSON-serializable
    result = {}
    for host, data in sorted(hosts.items(), key=lambda x: -x[1]["count"]):
        result[host] = {
            "requests": data["count"],
            "errors": data["errors"],
            "methods": sorted(data["methods"]),
            "status_codes": sorted(data["status_codes"]),
            "sample_paths": data["paths"]
        }

    return {
        "capture_seconds": seconds,
        "total_requests": len(events),
        "unique_hosts": len(result),
        "hosts": result
    }


@mcp.tool()
def check_sensitive_data(seconds: int = 60) -> dict:
    """
    Scan traffic for sensitive data: API keys, tokens, passwords.

    Best for: "Are we leaking secrets?", "Security audit"

    Args:
        seconds: Capture duration (default 60, max 300)

    Returns: Report of sensitive patterns found.
    """
    events = _capture_http_filtered(seconds=seconds, max_http=500)

    if isinstance(events, dict) and "error" in events:
        return events

    findings = []
    for event in events:
        req = event.get("request", {})
        res = event.get("response", {})
        host = req.get("authority", "unknown")
        path = req.get("path", "/")
        event_findings = []

        # Check request headers
        for header, value in req.get("headers", {}).items():
            for pattern, label in SENSITIVE_PATTERNS:
                if re.search(pattern, header, re.IGNORECASE) or re.search(pattern, str(value)):
                    event_findings.append({
                        "location": f"request header: {header}",
                        "type": label,
                        "preview": str(value)[:80] + ("..." if len(str(value)) > 80 else "")
                    })
                    break

        # Check response headers
        for header, value in res.get("headers", {}).items():
            for pattern, label in SENSITIVE_PATTERNS:
                if re.search(pattern, header, re.IGNORECASE):
                    event_findings.append({
                        "location": f"response header: {header}",
                        "type": label
                    })
                    break

        # Check request body
        req_body = str(req.get("body", ""))
        if req_body:
            for pattern, label in SENSITIVE_PATTERNS:
                match = re.search(f'.{{0,30}}{pattern}.{{0,30}}', req_body, re.IGNORECASE)
                if match:
                    event_findings.append({
                        "location": "request body",
                        "type": label,
                        "context": match.group(0)
                    })

        # Check response body
        res_body = str(res.get("body", ""))
        if res_body and len(res_body) < 50000:
            for pattern, label in SENSITIVE_PATTERNS:
                match = re.search(f'.{{0,30}}{pattern}.{{0,30}}', res_body, re.IGNORECASE)
                if match:
                    event_findings.append({
                        "location": "response body",
                        "type": label,
                        "context": match.group(0)
                    })

        if event_findings:
            findings.append({
                "host": host,
                "method": req.get("method"),
                "path": path[:100],
                "findings": event_findings
            })

    return {
        "requests_scanned": len(events),
        "requests_with_sensitive_data": len(findings),
        "risk_level": "HIGH" if len(findings) > 5 else "MEDIUM" if len(findings) > 0 else "LOW",
        "findings": findings
    }


if __name__ == "__main__":
    mcp.run()
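The capture loop in `_capture_http_filtered` reads the event stream line by line, remembering the most recent `event: ` line and decoding the `data: ` line that follows it. The standalone sketch below isolates that parsing step so it can be tried without a running Qtap instance. The function name `parse_sse_lines`, the `heartbeat` event type, and the sample timestamps are hypothetical; only the `event: `/`data: ` prefixes and the `request.http_transaction` type come from the server code.

```python
import json


def parse_sse_lines(lines, wanted_type="request.http_transaction"):
    """Yield the JSON payload of each event whose type matches wanted_type."""
    current_type = None
    for line in lines:
        if line.startswith("event: "):
            current_type = line[len("event: "):]
        elif line.startswith("data: "):
            if current_type != wanted_type:
                continue  # ignore payloads of other event types
            try:
                yield json.loads(line[len("data: "):])
            except json.JSONDecodeError:
                continue  # skip malformed payloads, as the server code does


# Hypothetical sample stream: one HTTP transaction, one unrelated event.
sample = [
    "event: request.http_transaction",
    'data: {"ts": "2024-01-01T00:00:00Z", "data": {}}',
    "",
    "event: heartbeat",
    'data: {"ts": "2024-01-01T00:00:01Z"}',
]

events = list(parse_sse_lines(sample))
print(len(events), events[0]["ts"])  # prints: 1 2024-01-01T00:00:00Z
```

Keeping the parser separate from the HTTP call makes it straightforward to unit-test against recorded streams.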
chmod +x devtools_mcp_enhanced.py
You: "What errors are happening in the payment service?"
AI: [Calling get_errors_by_container("payment-service")]
"Found 5 errors in payment-service over the last 60 seconds:
1. POST /v1/charges → 402 Payment Required
Response: {"error": {"code": "card_declined", "message": "Your card has insufficient funds"}}
2. POST /v1/charges → 500 Internal Server Error
Response: {"error": {"type": "api_error", "message": "Stripe timeout after 30s"}}
3. GET /v1/customers/cus_xxx → 404 Not Found
Response: {"error": {"code": "resource_missing"}}
The 500 error suggests a timeout issue with Stripe. Consider:
- Checking Stripe's status page
- Reviewing timeout configuration
- Adding retry logic for transient failures"
You: "What external APIs are we calling? Any credential leaks?"
AI: [Calling get_external_traffic()]
"Found 25 external requests over the last 60 seconds:
By host:
- api.stripe.com: 8 requests
- api.github.com: 7 requests
- api.openai.com: 5 requests
- api.sendgrid.com: 5 requests
Let me scan for sensitive data leaks..."
AI: [Calling check_sensitive_data()]
"Security scan complete:
- 25 requests scanned
- 0 sensitive data leaks detected
- Risk level: LOW
All credentials are properly contained in Authorization headers."
You: "What's the user-service container doing?"
AI: [Calling get_container_traffic("user-service")]
"Traffic from user-service over 60 seconds:
Total: 45 requests
Hosts:
- postgres-primary:5432: 28 requests
- redis-cache:6379: 12 requests
- api.sendgrid.com: 5 requests
Methods: GET (35), POST (10)
Status codes: 200 (42), 201 (2), 500 (1)
There's 1 error - let me get details..."
AI: [Calling get_errors_by_container("user-service")]
"1 error found:
- POST to api.sendgrid.com/v3/mail/send → 500
Response: Rate limit exceeded
The user-service is hitting SendGrid rate limits."
You: "What HTTP calls is Python making?"
AI: [Calling get_process_traffic("python3.11")]
"Found 45 requests from python3.11 over 60 seconds:
By host:
- api.stripe.com: 15 requests
- api.openai.com: 12 requests
- postgres-primary:5432: 10 requests
- redis-cache:6379: 8 requests
Methods: GET (28), POST (17)
Status codes: 200 (40), 201 (3), 500 (2)
There are 2 errors - let me get details..."
AI: [Calling get_traffic_filtered("process.binary == \"python3.11\" AND res.status >= 400")]
"2 errors from Python:
- POST to api.stripe.com/v1/charges → 402 (card declined)
- POST to api.openai.com/v1/chat/completions → 500 (rate limit)
The OpenAI 500 suggests you're hitting rate limits."
# Errors from a specific container
res.status >= 400 AND container.name == "payment-service"
# External API errors
res.status >= 400 AND direction == "egress-external"
# Stripe errors from Python
req.host == "api.stripe.com" AND res.status >= 400 AND process.binary == "python3.11"
# POST requests to external services
req.method == "POST" AND direction == "egress-external"
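The tools above build combined filters like these by interpolating `container_name` and `process_name` straight into the expression, so a value containing a double quote could change the meaning of the filter. One possible guard, sketched below, is to accept only a conservative set of characters before quoting a value. The helpers `quoted` and `all_of` are hypothetical, and the allowlist is an assumption that fits typical container names, binary names, and hostnames; it does not rely on any particular escaping rule in the filter language.

```python
import re

# Conservative allowlist: letters, digits, underscore, dot, hyphen.
_SAFE_VALUE = re.compile(r"[A-Za-z0-9][A-Za-z0-9_.-]*")


def quoted(value: str) -> str:
    """Return value as a double-quoted literal, rejecting unsafe input."""
    if not _SAFE_VALUE.fullmatch(value):
        raise ValueError(f"unsafe filter value: {value!r}")
    return f'"{value}"'


def all_of(*clauses: str) -> str:
    """Join non-empty clauses with AND."""
    return " AND ".join(c for c in clauses if c)


expr = all_of(
    "res.status >= 400",
    f"container.name == {quoted('payment-service')}",
)
print(expr)  # prints: res.status >= 400 AND container.name == "payment-service"
```

A tool such as `get_errors_by_container` could call `quoted(container_name)` and return an error dict when `ValueError` is raised, rather than sending a malformed filter to the server.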
# Install dependencies
pip install mcp httpx
# Save the enhanced MCP server code
# (copy from above to devtools_mcp_enhanced.py)
# Make executable
chmod +x devtools_mcp_enhanced.py