Unified Gateway Architecture Guide¶
Overview¶
The Unified Gateway (mcp_http_gateway.py) is a single entry point that serves both:
1. MCP Tools - Via subprocess worker pool (isolated, scalable)
2. REST APIs - Via direct function calls (low-latency, efficient)
This hybrid architecture allows you to choose the best execution model for each operation.
Architecture Diagram¶
┌────────────────────────────────────────────────────────┐
│ Unified Gateway │
│ (mcp_http_gateway.py) │
├────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Request Router │ │
│ │ - Classifies requests by path │ │
│ │ - Routes to appropriate handler │ │
│ └──────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌───────────────┴───────────────┐ │
│ │ │ │
│ ┌──▼─────────────┐ ┌────────────▼──────────┐ │
│ │ MCP Routes │ │ REST API Routes │ │
│ │ /tools/* │ │ /api/v1/* │ │
│ │ /ws/tools/* │ │ - Journey Management │ │
│ └────────┬───────┘ │ - Analytics │ │
│ │ │ - Authentication │ │
│ │ │ - Health Checks │ │
│ │ └────────────┬───────────┘ │
│ │ │ │
│ ┌────────▼───────┐ ┌───────────▼───────────┐ │
│ │ Worker Pool │ │ Direct Import │ │
│ │ (Subprocess) │ │ (Same Process) │ │
│ └────────┬───────┘ └───────────┬───────────┘ │
│ │ │ │
└───────────┼────────────────────────┼──────────────────┘
│ │
┌───────▼───────┐ ┌───────▼───────┐
│ MCP Server │ │ API Services │
│ (src/server) │ │ (src/apis) │
└───────────────┘ └───────────────┘
Endpoint Structure¶
MCP Tool Endpoints (Worker Pool)¶
- `POST /tools/{tool_name}` - Execute an MCP tool
- `POST /tools/{tool_name}/stream` - Stream tool execution (SSE)
- `WS /ws/tools/{tool_name}` - WebSocket streaming
- `GET /tools` - List available tools
- `GET /stats` - Worker pool statistics
REST API Endpoints (Direct Call)¶
- `/api/v1/journeys/*` - Journey management
- `/api/v1/analytics/*` - Analytics and metrics
- `/api/v1/auth/*` - Authentication
- `/api/v1/health/*` - Health checks
Hybrid Endpoints¶
- `/api/v1/hybrid/*` - Can use either a worker or a direct call
When to Use Each Type¶
Use MCP Workers (Subprocess) For:¶
| Scenario | Reason |
|---|---|
| Heavy computation | Process isolation prevents blocking |
| Long-running tasks | Non-blocking execution |
| Memory-intensive operations | Separate memory space |
| Untrusted code execution | Security isolation |
| ML model inference | Resource isolation |
| File processing | Prevents memory leaks |
| Batch operations | Parallel processing |
Use Direct REST APIs For:¶
| Scenario | Reason |
|---|---|
| Simple CRUD operations | Lower latency (~10ms vs ~100ms) |
| Database queries | Connection pool efficiency |
| Cache lookups | Minimal overhead |
| Authentication checks | Fast response needed |
| Health checks | Quick status checks |
| Real-time data | Minimal latency |
| High-frequency calls | Avoid subprocess overhead |
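The choice between the two execution models is ultimately a path-prefix decision made by the request router. As a minimal sketch (the prefixes mirror the endpoint structure above, but `classify_route` itself is an illustrative helper, not the gateway's actual code):

```python
def classify_route(path: str) -> str:
    """Return the execution model for an incoming request path.

    Illustrative only - the real router in mcp_http_gateway.py may differ.
    """
    if path.startswith("/tools") or path.startswith("/ws/tools"):
        return "mcp_worker"   # subprocess worker pool
    if path.startswith("/api/v1/hybrid"):
        return "hybrid"       # may use either model
    if path.startswith("/api/v1/"):
        return "rest_api"     # direct in-process call
    return "system"           # /stats, /docs, health, etc.
```

Note that the `/api/v1/hybrid` check must come before the general `/api/v1/` check, since the hybrid prefix is a subset of the REST prefix.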
API Implementation Examples¶
1. Simple REST API (Direct Call)¶
```python
# src/apis/journey_api.py
@router.get("/{journey_id}/status")
async def get_journey_status(journey_id: str):
    """Quick status check - direct database query"""
    service = JourneyService()
    result = await service.get_status(journey_id)
    return {"status": result.status, "progress": result.progress}
```
2. Heavy Processing (Use Worker)¶
```python
# For heavy operations, delegate to an MCP worker internally
@router.post("/{journey_id}/analyze")
async def analyze_journey(journey_id: str):
    """Complex analysis - delegate to a worker"""
    # This internally uses the worker pool
    worker = await pool.get_worker()
    message = {
        "method": "tools/call",
        "params": {"name": "analyze_tool", "arguments": {...}},
    }
    return await worker.send_one(message, timeout=60)
```
3. Hybrid Approach¶
```python
from typing import List, Optional

@app.post("/api/v1/hybrid/process")
async def hybrid_process(items: List[str], use_worker: Optional[bool] = None):
    """Smart routing based on workload"""
    if use_worker is None:
        # Auto-decide based on batch size
        use_worker = len(items) > 10
    if use_worker:
        # Large batch - use a worker
        return await process_with_worker(items)
    # Small batch - direct processing
    return await process_directly(items)
```
Configuration¶
Environment Variables¶
```bash
# Worker Pool Configuration
MIN_WORKERS=2            # Minimum subprocess workers
MAX_WORKERS=8            # Maximum subprocess workers
MCP_REQUEST_TIMEOUT=60.0 # Worker request timeout (seconds)

# API Configuration
API_RATE_LIMIT=100       # Requests per minute
API_CACHE_TTL=300        # Cache TTL in seconds
ENABLE_API_CACHE=true    # Enable response caching

# Authentication
API_KEY=super-secret     # API key for authentication
JWT_SECRET=change-this   # JWT signing secret

# Logging
LOG_LEVEL=INFO           # DEBUG, INFO, WARNING, ERROR
```
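These variables can be read at startup with plain `os.environ` lookups. A hedged sketch of such a loader (the variable names match the table above; the `GatewayConfig` dataclass and `load_config` helper are illustrative, not the gateway's actual code):

```python
import os
from dataclasses import dataclass

@dataclass
class GatewayConfig:
    """Illustrative container for a subset of the gateway settings."""
    min_workers: int
    max_workers: int
    request_timeout: float
    enable_api_cache: bool

def load_config() -> GatewayConfig:
    """Read gateway settings from the environment, falling back to defaults."""
    return GatewayConfig(
        min_workers=int(os.environ.get("MIN_WORKERS", "2")),
        max_workers=int(os.environ.get("MAX_WORKERS", "8")),
        request_timeout=float(os.environ.get("MCP_REQUEST_TIMEOUT", "60.0")),
        enable_api_cache=os.environ.get("ENABLE_API_CACHE", "true").lower() == "true",
    )
```

Parsing each value at one choke point keeps type errors (e.g. a non-numeric `MAX_WORKERS`) visible at startup rather than mid-request.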
Starting the Gateway¶
Basic Start¶

```bash
python mcp_http_gateway.py
```
With Custom Configuration¶
```bash
export MIN_WORKERS=4
export MAX_WORKERS=16
export LOG_LEVEL=DEBUG
python mcp_http_gateway.py --port 8000
```
Using Docker¶
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["python", "mcp_http_gateway.py"]
```
Testing¶
Test All Components¶
```bash
# Test the unified gateway (MCP + APIs)
python test_unified_gateway.py

# Test MCP integration only
python test_mcp_integration.py
```
Example API Calls¶
REST API (Direct)¶
```bash
# Quick journey status (direct call, ~10ms)
curl -X GET http://localhost:8000/api/v1/journeys/JRN-001/status \
  -H "X-API-Key: super-secret"

# Login (direct call)
curl -X POST http://localhost:8000/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "admin", "password": "admin123"}'
```
MCP Tool (Worker)¶
```bash
# Complex journey analysis (worker process, isolated)
curl -X POST http://localhost:8000/tools/journeys_tool \
  -H "X-API-Key: super-secret" \
  -H "Content-Type: application/json" \
  -d '{"arguments": {"action": "analyze", "journey_id": "JRN-001"}}'
```
Streaming (SSE)¶
```bash
# Stream metrics (Server-Sent Events)
curl -X POST http://localhost:8000/api/v1/analytics/stream \
  -H "X-API-Key: super-secret" \
  -H "Content-Type: application/json" \
  -d '{"stream_type": "metrics"}'
```
Performance Comparison¶
| Operation Type | Direct API | MCP Worker | Recommendation |
|---|---|---|---|
| Simple GET | ~10ms | ~100ms | Use Direct API |
| Database Query | ~20ms | ~120ms | Use Direct API |
| Complex Analysis | ~500ms | ~550ms | Use Worker (isolation) |
| Batch Processing | O(n) | O(n/workers) | Use Worker (parallel) |
| Memory Heavy | Risk OOM | Isolated | Use Worker |
| Real-time Stream | Low latency | Higher latency | Use Direct API |
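The batch row's O(n) vs O(n/workers) claim is simple arithmetic: items are split across workers, so the wall-clock time is driven by the largest per-worker share. A toy sketch (the `estimated_batch_seconds` helper and the timing numbers are illustrative, not measurements of this gateway):

```python
import math

def estimated_batch_seconds(items: int, per_item: float, workers: int = 1) -> float:
    """Rough batch duration: items divided evenly across workers,
    with math.ceil accounting for the last, partially filled round."""
    return math.ceil(items / workers) * per_item

# 100 items at an assumed 0.5 s each:
direct = estimated_batch_seconds(100, 0.5)      # one process, 100 rounds -> 50.0 s
pooled = estimated_batch_seconds(100, 0.5, 8)   # 8 workers, 13 rounds   -> 6.5 s
```

The model ignores subprocess startup and IPC overhead, which is exactly why small batches (see the hybrid example earlier) are better served by the direct path.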
Security Considerations¶
Authentication Options¶
- API Key - Simple, stateless
- JWT Token - Stateful, user-specific
- OAuth 2.0 - Third-party integration
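For the API-key option, the core check is a constant-time comparison of the `X-API-Key` header against the configured key. A minimal sketch, assuming the key comes from the `API_KEY` environment variable shown earlier (`verify_api_key` is a hypothetical helper, not the gateway's actual function):

```python
import hmac
from typing import Optional

def verify_api_key(provided: Optional[str], expected: str) -> bool:
    """Check the X-API-Key header value against the configured key.

    hmac.compare_digest avoids timing side channels that a plain
    `provided == expected` comparison could leak.
    """
    if not provided:
        return False
    return hmac.compare_digest(provided, expected)
```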
Rate Limiting¶
```python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/v1/resource")
@limiter.limit("10/minute")
async def limited_resource(request: Request):  # slowapi requires the Request argument
    return {"data": "limited"}
```
Input Validation¶
All inputs validated through Pydantic models:
```python
from pydantic import BaseModel, Field

class JourneyCreateRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    priority: str = Field(..., pattern="^(low|medium|high)$")
```
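To see what the model above enforces without pulling in Pydantic, the same constraints can be checked by hand. A sketch for illustration only (`validate_journey_request` is a hypothetical helper mirroring the `Field` rules, not part of the codebase):

```python
import re
from typing import List

def validate_journey_request(name: str, priority: str) -> List[str]:
    """Mirror the JourneyCreateRequest constraints; return a list of errors."""
    errors = []
    if not 1 <= len(name) <= 200:          # min_length=1, max_length=200
        errors.append("name must be 1-200 characters")
    if not re.match(r"^(low|medium|high)$", priority):  # pattern constraint
        errors.append("priority must be low, medium, or high")
    return errors
```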
Monitoring¶
Built-in Metrics¶
- Worker pool statistics: `/stats`
- Health checks: `/api/v1/health/`
- Unified status: `/api/status`
Custom Headers¶
Every response includes:
- `X-Route-Type`: `mcp_worker` | `rest_api` | `system`
- `X-Response-Time`: Response time in seconds
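These headers can be stamped by a small piece of middleware. A framework-free sketch in plain ASGI terms (the `RoutingHeadersMiddleware` class and its fixed `route_type` are illustrative assumptions, not the gateway's actual implementation, which would derive the route type per request):

```python
import time

class RoutingHeadersMiddleware:
    """Add X-Route-Type and X-Response-Time to every HTTP response."""

    def __init__(self, app, route_type: str = "rest_api"):
        self.app = app
        self.route_type = route_type

    async def __call__(self, scope, receive, send):
        start = time.perf_counter()

        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                elapsed = time.perf_counter() - start
                # ASGI headers are (bytes, bytes) pairs
                message.setdefault("headers", []).extend([
                    (b"x-route-type", self.route_type.encode()),
                    (b"x-response-time", f"{elapsed:.3f}".encode()),
                ])
            await send(message)

        await self.app(scope, receive, send_with_headers)
```

Wrapping the `send` callable rather than the response object keeps the middleware usable under any ASGI framework, FastAPI included.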
Logging¶
```
# All requests are logged with routing information
[mcp_worker] POST /tools/analyze - Status: 200, Duration: 1.234s
[rest_api] GET /api/v1/journeys - Status: 200, Duration: 0.015s
```
Best Practices¶
- Use Direct APIs for UI - Lower latency for better UX
- Use Workers for Background Jobs - Process isolation
- Implement Caching - Reduce database load
- Monitor Worker Health - Auto-recovery enabled
- Use Streaming for Large Data - Reduce memory usage
- Implement Circuit Breakers - Fail gracefully
- Version Your APIs - Use `/api/v1/`, `/api/v2/`
- Document Everything - OpenAPI at `/docs`
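The circuit-breaker practice deserves a concrete shape: stop sending requests to a failing backend after repeated errors, then probe again after a cooldown. A minimal in-process sketch (the `CircuitBreaker` class and its thresholds are illustrative assumptions, not part of the gateway):

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; allow a retry
    once `cooldown` seconds have elapsed since the circuit opened."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Return True if the next request may proceed."""
        if self.failures < self.threshold:
            return True
        # Circuit is open: permit a probe only after the cooldown
        return time.monotonic() - self.opened_at >= self.cooldown

    def record_success(self) -> None:
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()
```

A caller would check `allow()` before dispatching to a worker, and record the outcome afterwards, failing fast with a 503 while the circuit is open.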
Troubleshooting¶
Gateway Not Starting¶
```bash
# Check if the port is in use
lsof -i :8000

# Check the Python version (3.8+ required)
python --version

# Check dependencies
pip list | grep fastapi
```
Workers Not Processing¶
```bash
# Check worker status
curl http://localhost:8000/stats

# Check the MCP server directly
python src/server.py --stdio

# Check logs
tail -f logs/gateway.log
```
API Errors¶
```bash
# Check the API documentation
open http://localhost:8000/docs

# Test the health endpoint
curl http://localhost:8000/api/v1/health/

# Check authentication
curl -I http://localhost:8000/api/v1/journeys \
  -H "X-API-Key: your-key"
```
Future Enhancements¶
- [ ] GraphQL endpoint support
- [ ] gRPC for binary streaming
- [ ] Redis queue for job management
- [ ] Prometheus metrics export
- [ ] Distributed tracing (OpenTelemetry)
- [ ] WebSocket pub/sub for real-time events
- [ ] API versioning middleware
- [ ] Response compression
- [ ] Request/response caching layer
- [ ] Database connection pooling optimization