# Middleware System

Gobstopper provides a flexible middleware system for handling cross-cutting concerns such as security, CORS, static file serving, and custom request/response processing. Middleware functions execute in a prioritized chain around each request.

## Overview

The middleware system features:

- **Prioritized Execution**: Middleware executes in priority order (higher numbers first)
- **Request/Response Processing**: Full access to modify requests and responses
- **Built-in Middleware**: Security, CORS, static file serving with Rust optimization
- **Custom Middleware**: Easy creation of custom middleware functions
- **Flexible Integration**: Works with both sync and async middleware

## Middleware Architecture

### Execution Order

Middleware executes in a chain around the route handler:

```
Request → Middleware 1 → Middleware 2 → Route Handler → Middleware 2 → Middleware 1 → Response
```

Higher priority middleware executes first (outer layers) and can:

- Modify the request before it reaches lower priority middleware
- Short-circuit the chain by returning a response early
- Modify the response after it comes back from the handler

## Adding Middleware

### Basic Middleware Registration

```python
import time

from gobstopper import Gobstopper

app = Gobstopper()

# Add middleware with priority (higher numbers execute first)
async def logging_middleware(request, next_handler):
    """Log request information"""
    start_time = time.time()
    app.logger.info(f"→ {request.method} {request.path}")

    # Continue to next handler
    response = await next_handler(request)

    duration = time.time() - start_time
    app.logger.info(f"← {response.status} ({duration:.3f}s)")
    return response

# Register with priority
app.add_middleware(logging_middleware, priority=100)
```

### Middleware with Classes

```python
import time

class RequestTimingMiddleware:
    """Middleware class for request timing"""

    def __init__(self, slow_request_threshold: float = 1.0):
        self.threshold = slow_request_threshold

    async def __call__(self, request, next_handler):
        start_time = time.time()
        response = await next_handler(request)
        duration = time.time() - start_time

        # Add timing header
        response.headers['X-Response-Time'] = f"{duration:.3f}s"

        # Log slow requests
        if duration > self.threshold:
            app.logger.warning(f"Slow request: {request.path} took {duration:.3f}s")

        return response

# Register class-based middleware
app.add_middleware(RequestTimingMiddleware(slow_request_threshold=0.5), priority=90)
```

## Built-in Middleware

### Security Middleware

Provides CSRF protection, security headers, and a pluggable session management system.

```python
from gobstopper.middleware.security import SecurityMiddleware
from gobstopper.sessions.redis_storage import AsyncRedisSessionStorage
from redis.asyncio import Redis

# Example of advanced configuration
redis_client = Redis.from_url("redis://localhost")
storage = AsyncRedisSessionStorage(client=redis_client, ttl=86400)  # 1 day TTL

security_middleware = SecurityMiddleware(
    secret_key="your-super-secret-key-that-is-long-and-random",
    session_storage=storage,
    rolling_sessions=True,
    sign_session_id=True,
    cookie_secure=True,
    cookie_samesite="Lax"
)
app.add_middleware(security_middleware, priority=100)
```

#### Security Headers

The `SecurityMiddleware` automatically adds modern security headers to every response. These can be customized in the constructor.

#### Session Management

The session management system is highly configurable and supports multiple backends. See the [Application Core documentation](./application.md#session-management) for details on choosing a backend.

**Configuration Options:**

- `session_storage`: An instance of a session storage backend (e.g., `MemorySessionStorage`, `AsyncRedisSessionStorage`). Defaults to `FileSessionStorage`.
- `cookie_name`: The name of the session cookie. Defaults to `"session_id"`.
- `cookie_secure`: Whether the cookie should only be sent over HTTPS. Defaults to `True`.
- `cookie_httponly`: Whether the cookie should be inaccessible to JavaScript. Defaults to `True`.
- `cookie_samesite`: The `SameSite` attribute of the cookie (`"Strict"`, `"Lax"`, `"None"`). Defaults to `"Strict"`.
- `cookie_path`: The path for the cookie. Defaults to `/`.
- `cookie_domain`: The domain for the cookie. Defaults to `None`.
- `cookie_max_age`: The maximum age of the cookie in seconds.
- `rolling_sessions`: If `True`, the session's expiration time is refreshed on every request. Defaults to `False`.
- `sign_session_id`: If `True`, the session ID stored in the cookie is cryptographically signed to prevent tampering. Requires `secret_key`. Defaults to `False`.

**Usage in Route Handlers:**

```python
from gobstopper.http.response import Response

@app.post("/login")
async def login(request):
    # ... authenticate user ...

    # Create a new session
    session_data = {"user_id": user.id}
    session_id = await security_middleware.create_session(session_data)

    # Sign the cookie value if signing is enabled
    cookie_val = (
        security_middleware._sign(session_id)
        if security_middleware.sign_session_id
        else session_id
    )

    resp = Response("", status=302, headers={"Location": "/dashboard"})

    # Use the response helper to set the cookie
    resp.set_cookie(
        security_middleware.cookie_name,
        cookie_val,
        path=security_middleware.cookie_path,
        domain=security_middleware.cookie_domain,
        max_age=security_middleware.cookie_max_age,
        secure=security_middleware.cookie_secure,
        httponly=security_middleware.cookie_httponly,
        samesite=security_middleware.cookie_samesite,
    )
    return resp

@app.get("/profile")
async def get_profile(request):
    # The session data is automatically loaded into request.session
    if not request.session:
        return Response("Not authenticated", status=401)
    return {"user_id": request.session.get("user_id")}

@app.post("/logout")
async def logout(request):
    # Destroy the session on the backend
    if getattr(request, "_session_id", None):
        await security_middleware.destroy_session(request._session_id)

    resp = Response("", status=302, headers={"Location": "/"})

    # Use the response helper to delete the cookie from the browser
    resp.delete_cookie(
        security_middleware.cookie_name,
        path=security_middleware.cookie_path,
        domain=security_middleware.cookie_domain
    )
    return resp
```

### CORS Middleware

Handles Cross-Origin Resource Sharing for API access:

```python
from gobstopper.middleware.cors import CORSMiddleware

# Allow all origins (development only)
app.add_middleware(CORSMiddleware(), priority=95)

# Production CORS configuration
cors_middleware = CORSMiddleware(
    origins=["https://myapp.com", "https://admin.myapp.com"],
    methods=["GET", "POST", "PUT", "DELETE"],
    headers=["Content-Type", "Authorization", "X-API-Key"],
    allow_credentials=True,
    max_age=3600  # Cache preflight for 1 hour
)
app.add_middleware(cors_middleware, priority=95)

# API-specific CORS
api_cors = CORSMiddleware(
    origins=["*"],  # Allow all for public API
    methods=["GET", "POST"],
    headers=["Content-Type"],
    allow_credentials=False
)

# Apply only to API routes (middleware can be conditional)
async def conditional_cors_middleware(request, next_handler):
    if request.path.startswith("/api/public"):
        return await api_cors(request, next_handler)
    else:
        return await next_handler(request)

app.add_middleware(conditional_cors_middleware, priority=90)
```

### Static File Middleware

#### Python Static Middleware

Basic static file serving:

```python
from gobstopper.middleware.static import StaticFileMiddleware

# Basic static serving
app.add_middleware(
    StaticFileMiddleware(static_dir="static", url_prefix="/static"),
    priority=80
)

# Multiple static directories
app.add_middleware(
    StaticFileMiddleware(static_dir="assets", url_prefix="/assets"),
    priority=80
)
app.add_middleware(
    StaticFileMiddleware(static_dir="uploads", url_prefix="/uploads"),
    priority=79
)
```

#### Rust Static Middleware

High-performance static file serving with caching:

```python
from gobstopper.middleware.rust_static import RustStaticMiddleware

# Ultra-fast Rust static serving
app.add_middleware(
    RustStaticMiddleware(directory="static", url_prefix="/static"),
    priority=85
)

# Advanced hybrid serving
from gobstopper.middleware.rust_static import HybridStaticMiddleware

hybrid_static = HybridStaticMiddleware(
    directory="static",
    url_prefix="/static"
)
app.add_middleware(hybrid_static, priority=85)

# Clear cache when needed
@app.post("/admin/clear-static-cache")
async def clear_cache(request):
    hybrid_static.rust_middleware.clear_cache()
    return {"status": "cache_cleared"}
```

### Limits Middleware

Provides request body size and read timeout protection. This middleware integrates with request body reading to enforce limits early and fail fast.

- `max_body_bytes`: Maximum allowed request body size in bytes. Requests exceeding this limit result in 413 Request Entity Too Large.
- `timeout_s`: Maximum time in seconds to read the request body before returning 504 Gateway Timeout.

Example:

```python
from gobstopper.middleware.limits import LimitsMiddleware

# Enforce a 5 MB body limit and 10s read timeout
app.add_middleware(LimitsMiddleware(max_body_bytes=5 * 1024 * 1024, timeout_s=10), priority=120)
```

Notes:

- Limits are applied to streaming body reads as well.
- The 413 and 504 responses use RFC 7807 problem+json when appropriate via error handlers.
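The fail-fast idea behind these limits can be sketched without any Gobstopper internals. The following is a minimal illustration, not the library's implementation: `read_body_limited`, `BodyTooLarge`, `BodyReadTimeout`, and `fake_stream` are hypothetical names introduced here, and the request body is modeled as an async iterator of byte chunks.

```python
import asyncio


class BodyTooLarge(Exception):
    """Hypothetical error; would correspond to a 413 response."""


class BodyReadTimeout(Exception):
    """Hypothetical error; would correspond to a 504 response."""


async def read_body_limited(chunks, max_body_bytes, timeout_s):
    """Read a chunked body, stopping as soon as a limit is exceeded."""

    async def collect():
        total = 0
        parts = []
        async for chunk in chunks:
            total += len(chunk)
            # Fail fast: stop reading once the size limit is crossed
            if total > max_body_bytes:
                raise BodyTooLarge(total)
            parts.append(chunk)
        return b"".join(parts)

    try:
        # The timeout covers the whole read, not each individual chunk
        return await asyncio.wait_for(collect(), timeout=timeout_s)
    except asyncio.TimeoutError:
        raise BodyReadTimeout(timeout_s) from None


async def fake_stream(parts, delay=0.0):
    """Stand-in for a streaming request body (assumption for this sketch)."""
    for part in parts:
        if delay:
            await asyncio.sleep(delay)
        yield part


if __name__ == "__main__":
    body = asyncio.run(
        read_body_limited(fake_stream([b"ab", b"cd"]), max_body_bytes=10, timeout_s=1.0)
    )
    print(body)
```

Because the size check runs per chunk, an oversized upload is rejected after reading only slightly more than `max_body_bytes`, rather than after buffering the whole body.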
## Custom Middleware

### Authentication Middleware

```python
import jwt

# Assumption: JSONResponse is importable from the same module as Response
from gobstopper.http.response import JSONResponse

async def jwt_auth_middleware(request, next_handler):
    """JWT authentication middleware"""
    # Skip auth for public endpoints
    if request.path in ["/login", "/register", "/health"]:
        return await next_handler(request)

    # Get token from header
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        return JSONResponse({"error": "Authentication required"}, status=401)

    token = auth_header[7:]  # Remove "Bearer "

    try:
        # Verify JWT token; PyJWT also checks the "exp" claim here
        payload = jwt.decode(token, app.config["JWT_SECRET"], algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        return JSONResponse({"error": "Token expired"}, status=401)
    except jwt.InvalidTokenError:
        return JSONResponse({"error": "Invalid token"}, status=401)

    # Add user info to request
    request.user = {
        "id": payload["user_id"],
        "username": payload["username"],
        "role": payload.get("role", "user"),
        "exp": payload.get("exp")
    }

    return await next_handler(request)

app.add_middleware(jwt_auth_middleware, priority=95)

# Routes can now access request.user
@app.get("/api/profile")
async def get_profile(request):
    return {"user": request.user}
```

### Rate Limiting Middleware

```python
import time
from collections import defaultdict, deque

class RateLimitMiddleware:
    """Rate limiting middleware"""

    # Burst window in seconds; matches the Retry-After sent for burst rejections
    BURST_WINDOW = 10

    def __init__(self, requests_per_minute: int = 60, burst_limit: int = 10):
        self.requests_per_minute = requests_per_minute
        self.burst_limit = burst_limit
        self.request_times = defaultdict(deque)  # ip -> deque of timestamps
        self.burst_counts = defaultdict(int)     # ip -> current burst count
        self.last_reset = defaultdict(float)     # ip -> last reset time

    async def __call__(self, request, next_handler):
        client_ip = request.client_ip
        now = time.time()

        # Reset burst counter once the burst window has elapsed
        if now - self.last_reset[client_ip] > self.BURST_WINDOW:
            self.burst_counts[client_ip] = 0
            self.last_reset[client_ip] = now

        # Remove old timestamps (outside 1-minute window)
        while (self.request_times[client_ip] and
               now - self.request_times[client_ip][0] > 60):
            self.request_times[client_ip].popleft()

        # Check rate limits
        requests_in_minute = len(self.request_times[client_ip])
        if requests_in_minute >= self.requests_per_minute:
            return JSONResponse(
                {"error": "Rate limit exceeded", "retry_after": 60},
                status=429,
                headers={"Retry-After": "60"}
            )

        # Check burst limit
        if self.burst_counts[client_ip] >= self.burst_limit:
            return JSONResponse(
                {"error": "Too many requests", "retry_after": 10},
                status=429,
                headers={"Retry-After": "10"}
            )

        # Record this request
        self.request_times[client_ip].append(now)
        self.burst_counts[client_ip] += 1

        # Process request
        response = await next_handler(request)

        # Add rate limit headers
        response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
        response.headers["X-RateLimit-Remaining"] = str(
            self.requests_per_minute - requests_in_minute - 1
        )
        response.headers["X-RateLimit-Reset"] = str(int(now + 60))

        return response

# Create the limiter once so its counters persist across requests
rate_limiter = RateLimitMiddleware(requests_per_minute=100, burst_limit=20)

# Apply to API routes only
async def api_rate_limiter(request, next_handler):
    if request.path.startswith("/api/"):
        return await rate_limiter(request, next_handler)
    else:
        return await next_handler(request)

app.add_middleware(api_rate_limiter, priority=90)
```

### Request Validation Middleware

```python
import json
from jsonschema import validate, ValidationError

class RequestValidationMiddleware:
    """JSON schema validation middleware"""

    def __init__(self, schemas: dict):
        self.schemas = schemas  # path -> schema mapping

    async def __call__(self, request, next_handler):
        # Only validate POST/PUT/PATCH requests
        if request.method not in ["POST", "PUT", "PATCH"]:
            return await next_handler(request)

        # Check if we have a schema for this path
        schema = self.schemas.get(request.path)
        if not schema:
            return await next_handler(request)

        try:
            # Parse request body
            data = await request.get_json()

            # Validate against schema
            validate(instance=data, schema=schema)
        except json.JSONDecodeError:
            return JSONResponse(
                {"error": "Invalid JSON", "details": "Request body must be valid JSON"},
                status=400
            )
        except ValidationError as e:
            return JSONResponse(
                {"error": "Validation failed", "details": str(e)},
                status=400
            )

        # Store validated data for handler
        request.validated_data = data
        return await next_handler(request)

# Define schemas
# Note: jsonschema only enforces "format" when validate() is given a format_checker
user_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "minLength": 1, "maxLength": 100},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0, "maximum": 150}
    },
    "required": ["name", "email"],
    "additionalProperties": False
}

post_schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string", "minLength": 1, "maxLength": 200},
        "content": {"type": "string", "minLength": 1},
        "tags": {
            "type": "array",
            "items": {"type": "string"},
            "maxItems": 10
        }
    },
    "required": ["title", "content"],
    "additionalProperties": False
}

# Register validation middleware
validation_middleware = RequestValidationMiddleware({
    "/api/users": user_schema,
    "/api/posts": post_schema
})
app.add_middleware(validation_middleware, priority=85)

# Handlers can access validated data
@app.post("/api/users")
async def create_user(request):
    # request.validated_data contains the validated JSON
    user_data = request.validated_data
    user = await create_user_record(user_data)
    return {"user": user}
```

### Request Transformation Middleware

```python
import json
import re

async def api_transformation_middleware(request, next_handler):
    """Transform API requests and responses"""
    # Only transform API requests
    if not request.path.startswith("/api/"):
        return await next_handler(request)

    # Transform request
    if request.method in ["POST", "PUT", "PATCH"]:
        try:
            data = await request.get_json()
            # Convert snake_case to camelCase for API compatibility
            transformed_data = snake_to_camel(data)
            # Store transformed data
            request._transformed_json = transformed_data
        except Exception:
            pass  # Leave original data if transformation fails

    # Process request
    response = await next_handler(request)

    # Transform response
    if isinstance(response, JSONResponse):
        try:
            # Parse response data
            response_data = json.loads(response.body)
            # Convert camelCase back to snake_case
            transformed_response = camel_to_snake(response_data)
            # Create new response with transformed data, keeping the original headers
            original = response
            response = JSONResponse(transformed_response, status=original.status)
            response.headers.update(original.headers)
        except Exception:
            pass  # Return original response if transformation fails

    return response

def snake_to_camel(data):
    """Convert snake_case keys to camelCase"""
    if isinstance(data, dict):
        return {
            camel_case(k): snake_to_camel(v)
            for k, v in data.items()
        }
    elif isinstance(data, list):
        return [snake_to_camel(item) for item in data]
    else:
        return data

def camel_to_snake(data):
    """Convert camelCase keys to snake_case"""
    if isinstance(data, dict):
        return {
            snake_case(k): camel_to_snake(v)
            for k, v in data.items()
        }
    elif isinstance(data, list):
        return [camel_to_snake(item) for item in data]
    else:
        return data

def camel_case(snake_str):
    """Convert snake_case to camelCase"""
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])

def snake_case(camel_str):
    """Convert camelCase to snake_case"""
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_str)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

app.add_middleware(api_transformation_middleware, priority=80)
```

## Middleware Patterns

### Conditional Middleware

```python
async def conditional_middleware(request, next_handler):
    """Apply different logic based on request properties"""
    # Different handling for different paths
    if request.path.startswith("/admin/"):
        return await admin_middleware(request, next_handler)
    elif request.path.startswith("/api/"):
        return await api_middleware(request, next_handler)
    else:
        return await web_middleware(request, next_handler)

async def admin_middleware(request, next_handler):
    """Admin-specific middleware"""
    # Check admin authentication
    if not await is_admin_user(request):
        return Response("Admin access required", status=403)
    return await next_handler(request)

async def api_middleware(request, next_handler):
    """API-specific middleware"""
    # Add API versioning, rate limiting, etc.
    response = await next_handler(request)
    response.headers["X-API-Version"] = "1.0"
    return response

async def web_middleware(request, next_handler):
    """Web UI specific middleware"""
    # Add CSP headers for web pages
    response = await next_handler(request)
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response

app.add_middleware(conditional_middleware, priority=75)
```

### Middleware Composition

```python
def compose_middleware(*middlewares):
    """Compose multiple middleware functions into one"""

    def bind(middleware, next_handler):
        # Bind eagerly so each wrapper keeps its own middleware and next handler
        async def handler(request):
            return await middleware(request, next_handler)
        return handler

    async def composed_middleware(request, next_handler):
        # Create a chain of middleware
        handler = next_handler

        # Apply middleware in reverse order so the first one listed runs first
        for middleware in reversed(middlewares):
            handler = bind(middleware, handler)

        return await handler(request)

    return composed_middleware

# Compose the middleware defined in the earlier examples for API endpoints
api_middleware_stack = compose_middleware(
    jwt_auth_middleware,
    api_rate_limiter,
    validation_middleware,
    api_transformation_middleware
)
app.add_middleware(api_middleware_stack, priority=85)
```

### Error Handling Middleware

```python
# AuthenticationError and DatabaseError stand in for your application's own exception types
async def error_handling_middleware(request, next_handler):
    """Catch and handle errors from downstream middleware/handlers"""
    try:
        return await next_handler(request)
    except ValidationError as e:
        app.logger.warning(f"Validation error for {request.path}: {e}")
        return JSONResponse(
            {"error": "Invalid request", "details": str(e)},
            status=400
        )
    except AuthenticationError as e:
        app.logger.warning(f"Auth error for {request.path}: {e}")
        return JSONResponse(
            {"error": "Authentication failed"},
            status=401
        )
    except PermissionError as e:
        app.logger.warning(f"Permission error for {request.path}: {e}")
        return JSONResponse(
            {"error": "Access denied"},
            status=403
        )
    except DatabaseError as e:
        app.logger.error(f"Database error for {request.path}: {e}", exc_info=True)
        return JSONResponse(
            {"error": "Internal server error"},
            status=500
        )
    except Exception as e:
        app.logger.error(f"Unexpected error for {request.path}: {e}", exc_info=True)
        return JSONResponse(
            {"error": "Internal server error"},
            status=500
        )

# Apply error handling at high priority to catch all errors
app.add_middleware(error_handling_middleware, priority=110)
```

## Advanced Middleware Features

### Middleware with State

```python
import time

class StatefulMiddleware:
    """Middleware that maintains state across requests"""

    def __init__(self):
        self.request_count = 0
        self.unique_ips = set()
        self.start_time = time.time()

    async def __call__(self, request, next_handler):
        # Update statistics
        self.request_count += 1
        self.unique_ips.add(request.client_ip)

        # Add statistics headers
        response = await next_handler(request)
        response.headers["X-Total-Requests"] = str(self.request_count)
        response.headers["X-Unique-IPs"] = str(len(self.unique_ips))
        response.headers["X-Uptime"] = str(int(time.time() - self.start_time))
        return response

app.add_middleware(StatefulMiddleware(), priority=70)
```

### Async Context Middleware

```python
import contextvars
import uuid

# Context variables for request-scoped data
request_id_var = contextvars.ContextVar('request_id')
user_var = contextvars.ContextVar('user')

async def context_middleware(request, next_handler):
    """Set up request context"""
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)

    # Set user context if authenticated; request.user is only present here
    # if a higher-priority middleware has already set it
    if hasattr(request, 'user'):
        user_var.set(request.user)

    # Add request ID to response headers
    response = await next_handler(request)
    response.headers["X-Request-ID"] = request_id
    return response

app.add_middleware(context_middleware, priority=105)

# Use context in other parts of the application
def get_current_user():
    """Get current user from context"""
    return user_var.get(None)

def get_request_id():
    """Get current request ID from context"""
    return request_id_var.get(None)
```

## Middleware Best Practices

### Performance Considerations

1. **Order Matters**: High-priority middleware executes first on the way in and last on the way out
2. **Early Returns**: Return responses early when possible to skip unnecessary processing
3. **Async Operations**: Use async/await for I/O operations to avoid blocking
4. **Resource Management**: Clean up resources in finally blocks or context managers

### Security Best Practices

1. **Security First**: Apply security middleware at high priority
2. **Input Validation**: Validate inputs early in the middleware chain
3. **Rate Limiting**: Implement rate limiting to prevent abuse
4. **Error Handling**: Don't expose internal errors to clients

### Development Tips

```python
import os

# Middleware execution tracker (development only)
if app.config.get("DEBUG"):
    async def debug_middleware(request, next_handler):
        print(f"→ Entering middleware chain for {request.path}")
        response = await next_handler(request)
        print(f"← Exiting middleware chain with status {response.status}")
        return response

    app.add_middleware(debug_middleware, priority=200)

# Conditional middleware based on environment
if os.getenv("ENABLE_PROFILING"):
    from .profiling import ProfilingMiddleware
    app.add_middleware(ProfilingMiddleware(), priority=60)

if os.getenv("ENABLE_METRICS"):
    from .metrics import MetricsMiddleware
    app.add_middleware(MetricsMiddleware(), priority=55)
```

The middleware system in Gobstopper provides a flexible way to handle cross-cutting concerns while maintaining clean separation of responsibilities, with execution order controlled by priority.
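The ordering rule, where higher-priority middleware runs first on the way in and last on the way out, can be checked with a small framework-independent sketch. This is an illustration of the onion model only, not Gobstopper's dispatcher: `build_chain`, `bind`, and `make_tracer` are hypothetical helpers, and middleware is passed as `(priority, callable)` pairs.

```python
import asyncio


def bind(middleware, next_handler):
    """Wrap next_handler so that middleware runs around it."""
    async def call(request):
        return await middleware(request, next_handler)
    return call


def build_chain(middlewares, handler):
    """Build one callable from (priority, middleware) pairs."""
    # Wrap in ascending priority so the highest priority ends up outermost
    for _, middleware in sorted(middlewares, key=lambda item: item[0]):
        handler = bind(middleware, handler)
    return handler


def make_tracer(name, trace):
    """Create a middleware that records when it is entered and exited."""
    async def tracer(request, next_handler):
        trace.append(f"enter {name}")
        response = await next_handler(request)
        trace.append(f"exit {name}")
        return response
    return tracer


async def route_handler(request):
    return "ok"


trace = []
chain = build_chain(
    [(50, make_tracer("low", trace)), (100, make_tracer("high", trace))],
    route_handler,
)
result = asyncio.run(chain("request"))
print(trace)
# ['enter high', 'enter low', 'exit low', 'exit high']
```

The trace shows why error handling and request limits are usually given the highest priorities: they wrap everything registered below them.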