Middleware System

Gobstopper's middleware system handles cross-cutting concerns such as security, CORS, static file serving, and custom request/response processing. Middleware executes in a prioritized chain around each request.

Overview

The middleware system features:

  • Prioritized Execution: Middleware executes in priority order (higher numbers first)

  • Request/Response Processing: Full access to modify requests and responses

  • Built-in Middleware: Security, CORS, request limits, and static file serving (with an optional Rust-backed implementation)

  • Custom Middleware: Easy creation of custom middleware functions

  • Flexible Integration: Works with both sync and async middleware

Middleware Architecture

Execution Order

Middleware executes in a chain around the route handler:

Request → Middleware 1 → Middleware 2 → Route Handler → Middleware 2 → Middleware 1 → Response

Higher-priority middleware forms the outer layers of the chain: it runs first on the way in and last on the way out. A middleware can:

  • Modify the request before it reaches lower priority middleware

  • Short-circuit the chain by returning a response early

  • Modify the response after it comes back from the handler
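The ordering and short-circuit behavior described above can be sketched without the framework. This is a minimal simulation, not Gobstopper's implementation: `build_chain`, `tracer`, and `block_admin` are hypothetical names, requests are plain dicts, and the only assumption carried over from the text is that higher priority numbers wrap lower ones.

```python
import asyncio

def _bind(mw, inner):
    # Bind one middleware to the handler it wraps
    async def call(request):
        return await mw(request, inner)
    return call

def build_chain(middlewares, handler):
    """Wrap handler so the highest-priority middleware ends up outermost."""
    # Sort ascending: the highest priority is wrapped last, so it runs first
    for _, mw in sorted(middlewares, key=lambda item: item[0]):
        handler = _bind(mw, handler)
    return handler

calls = []

def tracer(name):
    async def middleware(request, next_handler):
        calls.append(f"{name} in")
        response = await next_handler(request)
        calls.append(f"{name} out")
        return response
    return middleware

async def block_admin(request, next_handler):
    # Short-circuit: return early without calling next_handler
    if request["path"].startswith("/admin"):
        return {"status": 403}
    return await next_handler(request)

async def route_handler(request):
    calls.append("handler")
    return {"status": 200}

chain = build_chain(
    [(50, tracer("inner")), (100, tracer("outer")), (75, block_admin)],
    route_handler,
)

ok = asyncio.run(chain({"path": "/home"}))
first_calls = list(calls)
print(first_calls)  # ['outer in', 'inner in', 'handler', 'inner out', 'outer out']

calls.clear()
denied = asyncio.run(chain({"path": "/admin/users"}))
print(denied, calls)  # {'status': 403} ['outer in', 'outer out']
```

In the second run the priority-75 middleware returns early, so neither the priority-50 middleware nor the route handler runs, while the priority-100 middleware still sees the response on the way out.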

Adding Middleware

Basic Middleware Registration

import time

from gobstopper import Gobstopper

app = Gobstopper()

# Add middleware with priority (higher numbers execute first)
async def logging_middleware(request, next_handler):
    """Log request information"""
    start_time = time.time()
    
    app.logger.info(f"→ {request.method} {request.path}")
    
    # Continue to next handler
    response = await next_handler(request)
    
    duration = time.time() - start_time
    app.logger.info(f"← {response.status} ({duration:.3f}s)")
    
    return response

# Register with priority
app.add_middleware(logging_middleware, priority=100)

Middleware with Classes

class RequestTimingMiddleware:
    """Middleware class for request timing"""
    
    def __init__(self, slow_request_threshold: float = 1.0):
        self.threshold = slow_request_threshold
    
    async def __call__(self, request, next_handler):
        start_time = time.time()
        
        response = await next_handler(request)
        
        duration = time.time() - start_time
        
        # Add timing header
        response.headers['X-Response-Time'] = f"{duration:.3f}s"
        
        # Log slow requests
        if duration > self.threshold:
            app.logger.warning(f"Slow request: {request.path} took {duration:.3f}s")
        
        return response

# Register class-based middleware
app.add_middleware(RequestTimingMiddleware(slow_request_threshold=0.5), priority=90)

Built-in Middleware

Security Middleware

Provides CSRF protection, security headers, and a powerful, pluggable session management system.

from gobstopper.middleware.security import SecurityMiddleware
from gobstopper.sessions.redis_storage import AsyncRedisSessionStorage
from redis.asyncio import Redis

# Example of advanced configuration
redis_client = Redis.from_url("redis://localhost")
storage = AsyncRedisSessionStorage(client=redis_client, ttl=86400) # 1 day TTL

security_middleware = SecurityMiddleware(
    secret_key="your-super-secret-key-that-is-long-and-random",
    session_storage=storage,
    rolling_sessions=True,
    sign_session_id=True,
    cookie_secure=True,
    cookie_samesite="Lax"
)
app.add_middleware(security_middleware, priority=100)

Security Headers

The SecurityMiddleware automatically adds modern security headers to every response. These can be customized in the constructor.
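The header set and the constructor options that control it are not listed here. As a rough illustration of what this kind of header injection involves, the sketch below is a standalone middleware with overridable defaults. It is not SecurityMiddleware's implementation: `security_headers`, `DEFAULT_SECURITY_HEADERS`, and `FakeResponse` are hypothetical stand-ins, and the header values are common choices rather than Gobstopper's documented defaults.

```python
import asyncio

# Hypothetical defaults for illustration; not Gobstopper's actual header set
DEFAULT_SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "Referrer-Policy": "strict-origin-when-cross-origin",
}

class FakeResponse:
    """Stand-in response object with a dict of headers."""
    def __init__(self, body="", status=200):
        self.body = body
        self.status = status
        self.headers = {}

def security_headers(overrides=None):
    """Build a middleware that adds the defaults, with optional overrides."""
    headers = {**DEFAULT_SECURITY_HEADERS, **(overrides or {})}

    async def middleware(request, next_handler):
        response = await next_handler(request)
        for name, value in headers.items():
            # setdefault keeps any value the handler set explicitly
            response.headers.setdefault(name, value)
        return response

    return middleware

async def handler(request):
    return FakeResponse("ok")

mw = security_headers({"X-Frame-Options": "SAMEORIGIN"})
resp = asyncio.run(mw({}, handler))
print(resp.headers["X-Frame-Options"])  # SAMEORIGIN
```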

Session Management

The session management system is highly configurable and supports multiple backends. See the Application Core documentation for details on choosing a backend.

Configuration Options:

  • session_storage: An instance of a session storage backend (e.g., MemorySessionStorage, AsyncRedisSessionStorage). Defaults to FileSessionStorage.

  • cookie_name: The name of the session cookie. Defaults to "session_id".

  • cookie_secure: Whether the cookie should only be sent over HTTPS. Defaults to True.

  • cookie_httponly: Whether the cookie should be inaccessible to JavaScript. Defaults to True.

  • cookie_samesite: The SameSite attribute of the cookie ("Strict", "Lax", "None"). Defaults to "Strict".

  • cookie_path: The path for the cookie. Defaults to /.

  • cookie_domain: The domain for the cookie. Defaults to None.

  • cookie_max_age: The maximum age of the cookie in seconds.

  • rolling_sessions: If True, the session’s expiration time is refreshed on every request. Defaults to False.

  • sign_session_id: If True, the session ID stored in the cookie is cryptographically signed to prevent tampering. Requires secret_key. Defaults to False.
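To make the sign_session_id option more concrete, here is a minimal sketch of how a session ID can be signed and verified with HMAC-SHA256 from the standard library. It illustrates the general technique only. The function names and the `id.signature` cookie format are assumptions, and Gobstopper's own signing scheme may differ.

```python
import hashlib
import hmac

def _signature(session_id: str, secret_key: str) -> str:
    return hmac.new(
        secret_key.encode(), session_id.encode(), hashlib.sha256
    ).hexdigest()

def sign_session_id(session_id: str, secret_key: str) -> str:
    """Return 'session_id.signature' for storage in the cookie."""
    return f"{session_id}.{_signature(session_id, secret_key)}"

def unsign_session_id(cookie_value: str, secret_key: str):
    """Return the session ID if the signature is valid, else None."""
    session_id, sep, signature = cookie_value.rpartition(".")
    if not sep:
        return None  # No signature part present
    expected = _signature(session_id, secret_key)
    # compare_digest avoids leaking information through timing differences
    if hmac.compare_digest(signature, expected):
        return session_id
    return None

SECRET = "a-long-random-secret"
cookie = sign_session_id("abc123", SECRET)
print(unsign_session_id(cookie, SECRET))           # abc123
print(unsign_session_id("evil" + cookie, SECRET))  # None
```

A tampered cookie fails verification, so the server never looks up a session ID that the client invented or altered.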

Usage in Route Handlers:

from gobstopper.http.response import Response

@app.post("/login")
async def login(request):
    # ... authenticate user ...
    
    # Create a new session
    session_data = {"user_id": user.id}
    session_id = await security_middleware.create_session(session_data)

    # Sign the cookie value if signing is enabled
    cookie_val = (
        security_middleware._sign(session_id)
        if security_middleware.sign_session_id
        else session_id
    )

    resp = Response("", status=302, headers={"Location": "/dashboard"})

    # Use the response helper to set the cookie
    resp.set_cookie(
        security_middleware.cookie_name,
        cookie_val,
        path=security_middleware.cookie_path,
        domain=security_middleware.cookie_domain,
        max_age=security_middleware.cookie_max_age,
        secure=security_middleware.cookie_secure,
        httponly=security_middleware.cookie_httponly,
        samesite=security_middleware.cookie_samesite,
    )
    return resp

@app.get("/profile")
async def get_profile(request):
    # The session data is automatically loaded into request.session
    if not request.session:
        return Response("Not authenticated", status=401)
    
    return {"user_id": request.session.get("user_id")}

@app.post("/logout")
async def logout(request):
    # Destroy the session on the backend
    if getattr(request, "_session_id", None):
        await security_middleware.destroy_session(request._session_id)

    resp = Response("", status=302, headers={"Location": "/"})
    
    # Use the response helper to delete the cookie from the browser
    resp.delete_cookie(
        security_middleware.cookie_name,
        path=security_middleware.cookie_path,
        domain=security_middleware.cookie_domain
    )
    return resp

CORS Middleware

Handles Cross-Origin Resource Sharing for API access:

from gobstopper.middleware.cors import CORSMiddleware

# Allow all origins (development only)
app.add_middleware(CORSMiddleware(), priority=95)

# Production CORS configuration (register this instead of the permissive default above)
cors_middleware = CORSMiddleware(
    origins=["https://myapp.com", "https://admin.myapp.com"],
    methods=["GET", "POST", "PUT", "DELETE"],
    headers=["Content-Type", "Authorization", "X-API-Key"],
    allow_credentials=True,
    max_age=3600  # Cache preflight for 1 hour
)
app.add_middleware(cors_middleware, priority=95)

# API-specific CORS
api_cors = CORSMiddleware(
    origins=["*"],  # Allow all for public API
    methods=["GET", "POST"],
    headers=["Content-Type"],
    allow_credentials=False
)

# Apply only to API routes (middleware can be conditional)
async def conditional_cors_middleware(request, next_handler):
    if request.path.startswith("/api/public"):
        return await api_cors(request, next_handler)
    else:
        return await next_handler(request)

app.add_middleware(conditional_cors_middleware, priority=90)

Static File Middleware

Python Static Middleware

Basic static file serving:

from gobstopper.middleware.static import StaticFileMiddleware

# Basic static serving
app.add_middleware(
    StaticFileMiddleware(static_dir="static", url_prefix="/static"), 
    priority=80
)

# Multiple static directories
app.add_middleware(
    StaticFileMiddleware(static_dir="assets", url_prefix="/assets"),
    priority=80
)
app.add_middleware(
    StaticFileMiddleware(static_dir="uploads", url_prefix="/uploads"),
    priority=79
)

Rust Static Middleware

High-performance static file serving with caching:

from gobstopper.middleware.rust_static import RustStaticMiddleware

# Ultra-fast Rust static serving
app.add_middleware(
    RustStaticMiddleware(directory="static", url_prefix="/static"),
    priority=85
)

# Advanced hybrid serving
from gobstopper.middleware.rust_static import HybridStaticMiddleware

hybrid_static = HybridStaticMiddleware(
    directory="static",
    url_prefix="/static"
)
app.add_middleware(hybrid_static, priority=85)

# Clear cache when needed
@app.post("/admin/clear-static-cache")
async def clear_cache(request):
    hybrid_static.rust_middleware.clear_cache()
    return {"status": "cache_cleared"}

Limits Middleware

Provides request body size and read timeout protection. This middleware integrates with Request body reading to enforce limits early and fail fast.

  • max_body_bytes: Maximum allowed request body size in bytes. Requests exceeding this limit result in 413 Request Entity Too Large.

  • timeout_s: Maximum time in seconds to read the request body before returning 504 Gateway Timeout.

Example:

from gobstopper.middleware.limits import LimitsMiddleware

# Enforce a 5 MB body limit and 10s read timeout
app.add_middleware(LimitsMiddleware(max_body_bytes=5 * 1024 * 1024, timeout_s=10), priority=120)

Notes:

  • Limits are applied to streaming body reads as well.

  • The 413 and 504 responses use RFC 7807 problem details (application/problem+json) when appropriate, via the error handlers.
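The fail-fast behavior on streaming reads can be sketched as follows. This is a standalone illustration under stated assumptions, not LimitsMiddleware's code: `read_limited`, `BodyTooLarge`, and `fake_stream` are hypothetical names, and the body is modeled as an async iterator of byte chunks.

```python
import asyncio

class BodyTooLarge(Exception):
    """Raised when the body exceeds max_body_bytes (would map to HTTP 413)."""

async def read_limited(chunks, max_body_bytes, timeout_s):
    """Read an async stream of byte chunks, failing fast on size or time."""
    async def _read():
        received = bytearray()
        async for chunk in chunks:
            # Check before buffering so an oversized body is rejected early
            if len(received) + len(chunk) > max_body_bytes:
                raise BodyTooLarge(f"body exceeds {max_body_bytes} bytes")
            received.extend(chunk)
        return bytes(received)

    # wait_for raises asyncio.TimeoutError if reading exceeds timeout_s
    return await asyncio.wait_for(_read(), timeout=timeout_s)

async def fake_stream(parts, delay=0.0):
    for part in parts:
        await asyncio.sleep(delay)
        yield part

async def demo():
    results = []
    results.append(await read_limited(fake_stream([b"ab", b"cd"]), 10, 1.0))
    try:
        await read_limited(fake_stream([b"x" * 8, b"y" * 8]), 10, 1.0)
    except BodyTooLarge:
        results.append("413")
    try:
        await read_limited(fake_stream([b"a", b"b"], delay=0.2), 10, 0.05)
    except asyncio.TimeoutError:
        results.append("504")
    return results

results = asyncio.run(demo())
print(results)  # [b'abcd', '413', '504']
```

The size check runs per chunk, so the second request is rejected as soon as the running total would pass the limit rather than after the whole body has been buffered.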

Custom Middleware

Authentication Middleware

import jwt

# Assumption: JSONResponse is importable from the same module as Response
from gobstopper.http.response import JSONResponse

async def jwt_auth_middleware(request, next_handler):
    """JWT authentication middleware"""
    # Skip auth for public endpoints
    if request.path in ["/login", "/register", "/health"]:
        return await next_handler(request)
    
    # Get token from header
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        return JSONResponse({"error": "Authentication required"}, status=401)
    
    token = auth_header[len("Bearer "):]
    
    try:
        # Verify the signature and required claims. PyJWT validates "exp"
        # itself and raises ExpiredSignatureError for expired tokens, so no
        # manual timestamp comparison is needed.
        payload = jwt.decode(
            token,
            app.config["JWT_SECRET"],
            algorithms=["HS256"],
            options={"require": ["exp", "user_id", "username"]},
        )
    except jwt.ExpiredSignatureError:
        return JSONResponse({"error": "Token expired"}, status=401)
    except jwt.InvalidTokenError:
        return JSONResponse({"error": "Invalid token"}, status=401)
    
    # Add user info to request only after the token has been fully verified
    request.user = {
        "id": payload["user_id"],
        "username": payload["username"],
        "role": payload.get("role", "user"),
        "exp": payload["exp"]
    }
    
    # Call the handler outside the try block so downstream errors are not
    # mistaken for token errors
    return await next_handler(request)

app.add_middleware(jwt_auth_middleware, priority=95)

# Routes can now access request.user
@app.get("/api/profile")
async def get_profile(request):
    return {"user": request.user}

Rate Limiting Middleware

import time
from collections import defaultdict, deque

class RateLimitMiddleware:
    """Rate limiting middleware"""
    
    def __init__(self, requests_per_minute: int = 60, burst_limit: int = 10):
        self.requests_per_minute = requests_per_minute
        self.burst_limit = burst_limit
        self.request_times = defaultdict(deque)  # ip -> deque of timestamps
        self.burst_counts = defaultdict(int)     # ip -> current burst count
        self.last_reset = defaultdict(float)     # ip -> last reset time
    
    async def __call__(self, request, next_handler):
        client_ip = request.client_ip
        now = time.time()
        
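        # Reset the burst counter every 10 seconds, matching the Retry-After
        # value sent when the burst limit is hit. With a 60-second window,
        # burst_limit would act as a stricter per-minute cap.
        if now - self.last_reset[client_ip] > 10:
            self.burst_counts[client_ip] = 0
            self.last_reset[client_ip] = now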
        
        # Remove old timestamps (outside 1-minute window)
        while (self.request_times[client_ip] and 
               now - self.request_times[client_ip][0] > 60):
            self.request_times[client_ip].popleft()
        
        # Check rate limits
        requests_in_minute = len(self.request_times[client_ip])
        
        if requests_in_minute >= self.requests_per_minute:
            return JSONResponse(
                {"error": "Rate limit exceeded", "retry_after": 60},
                status=429,
                headers={"Retry-After": "60"}
            )
        
        # Check burst limit
        if self.burst_counts[client_ip] >= self.burst_limit:
            return JSONResponse(
                {"error": "Too many requests", "retry_after": 10},
                status=429,
                headers={"Retry-After": "10"}
            )
        
        # Record this request
        self.request_times[client_ip].append(now)
        self.burst_counts[client_ip] += 1
        
        # Process request
        response = await next_handler(request)
        
        # Add rate limit headers
        response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
        response.headers["X-RateLimit-Remaining"] = str(
            self.requests_per_minute - requests_in_minute - 1
        )
        response.headers["X-RateLimit-Reset"] = str(int(now + 60))
        
        return response

# Apply to API routes only. Create the limiter once at startup so its
# counters persist across requests; building it inside the middleware
# would reset the counts on every call.
api_limiter = RateLimitMiddleware(requests_per_minute=100, burst_limit=20)

async def api_rate_limiter(request, next_handler):
    if request.path.startswith("/api/"):
        return await api_limiter(request, next_handler)
    return await next_handler(request)

app.add_middleware(api_rate_limiter, priority=90)

Request Validation Middleware

import json
from jsonschema import validate, ValidationError

class RequestValidationMiddleware:
    """JSON schema validation middleware"""
    
    def __init__(self, schemas: dict):
        self.schemas = schemas  # path -> schema mapping
    
    async def __call__(self, request, next_handler):
        # Only validate POST/PUT/PATCH requests
        if request.method not in ["POST", "PUT", "PATCH"]:
            return await next_handler(request)
        
        # Check if we have a schema for this path
        schema = self.schemas.get(request.path)
        if not schema:
            return await next_handler(request)
        
        try:
            # Parse request body
            data = await request.get_json()
            
            # Validate against schema
            validate(instance=data, schema=schema)
            
            # Store validated data for handler
            request.validated_data = data
            
            return await next_handler(request)
            
        except json.JSONDecodeError:
            return JSONResponse(
                {"error": "Invalid JSON", "details": "Request body must be valid JSON"},
                status=400
            )
        except ValidationError as e:
            return JSONResponse(
                {"error": "Validation failed", "details": str(e)},
                status=400
            )

# Define schemas
user_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "minLength": 1, "maxLength": 100},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0, "maximum": 150}
    },
    "required": ["name", "email"],
    "additionalProperties": False
}

post_schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string", "minLength": 1, "maxLength": 200},
        "content": {"type": "string", "minLength": 1},
        "tags": {
            "type": "array",
            "items": {"type": "string"},
            "maxItems": 10
        }
    },
    "required": ["title", "content"],
    "additionalProperties": False
}

# Register validation middleware
validation_middleware = RequestValidationMiddleware({
    "/api/users": user_schema,
    "/api/posts": post_schema
})
app.add_middleware(validation_middleware, priority=85)

# Handlers can access validated data
@app.post("/api/users")
async def create_user(request):
    # request.validated_data contains the validated JSON
    user_data = request.validated_data
    user = await create_user_record(user_data)
    return {"user": user}

Request Transformation Middleware

async def api_transformation_middleware(request, next_handler):
    """Transform API requests and responses"""
    
    # Only transform API requests
    if not request.path.startswith("/api/"):
        return await next_handler(request)
    
    # Transform request
    if request.method in ["POST", "PUT", "PATCH"]:
        try:
            data = await request.get_json()
            
            # Convert snake_case to camelCase for API compatibility
            transformed_data = snake_to_camel(data)
            
            # Store transformed data
            request._transformed_json = transformed_data
            
        except Exception:
            pass  # Leave original data if transformation fails
    
    # Process request
    response = await next_handler(request)
    
    # Transform response
    if isinstance(response, JSONResponse):
        try:
            # Parse response data
            response_data = json.loads(response.body)
            
            # Convert camelCase back to snake_case
            transformed_response = camel_to_snake(response_data)
            
            # Build a new response and carry over the original headers,
            # except Content-Length, which no longer matches the new body
            new_response = JSONResponse(transformed_response, status=response.status)
            for name, value in response.headers.items():
                if name.lower() != "content-length":
                    new_response.headers[name] = value
            response = new_response
            
        except Exception:
            pass  # Return original response if transformation fails
    
    return response

def snake_to_camel(data):
    """Convert snake_case keys to camelCase"""
    if isinstance(data, dict):
        return {
            camel_case(k): snake_to_camel(v) 
            for k, v in data.items()
        }
    elif isinstance(data, list):
        return [snake_to_camel(item) for item in data]
    else:
        return data

def camel_to_snake(data):
    """Convert camelCase keys to snake_case"""
    if isinstance(data, dict):
        return {
            snake_case(k): camel_to_snake(v) 
            for k, v in data.items()
        }
    elif isinstance(data, list):
        return [camel_to_snake(item) for item in data]
    else:
        return data

def camel_case(snake_str):
    """Convert snake_case to camelCase"""
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])

def snake_case(camel_str):
    """Convert camelCase to snake_case"""
    import re
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_str)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

app.add_middleware(api_transformation_middleware, priority=80)

Middleware Patterns

Conditional Middleware

async def conditional_middleware(request, next_handler):
    """Apply different logic based on request properties"""
    
    # Different handling for different paths
    if request.path.startswith("/admin/"):
        return await admin_middleware(request, next_handler)
    elif request.path.startswith("/api/"):
        return await api_middleware(request, next_handler)
    else:
        return await web_middleware(request, next_handler)

async def admin_middleware(request, next_handler):
    """Admin-specific middleware"""
    # Check admin authentication
    if not await is_admin_user(request):
        return Response("Admin access required", status=403)
    return await next_handler(request)

async def api_middleware(request, next_handler):
    """API-specific middleware"""
    # Add API versioning, rate limiting, etc.
    response = await next_handler(request)
    response.headers["X-API-Version"] = "1.0"
    return response

async def web_middleware(request, next_handler):
    """Web UI specific middleware"""
    # Add CSP headers for web pages
    response = await next_handler(request)
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response

app.add_middleware(conditional_middleware, priority=75)

Middleware Composition

def compose_middleware(*middlewares):
    """Compose multiple middleware functions into one"""
    def wrap(mw, inner):
        # Bind mw and inner per iteration to avoid late-binding closures
        async def handler(request):
            return await mw(request, inner)
        return handler
    
    async def composed_middleware(request, next_handler):
        # Create a chain of middleware ending at next_handler
        handler = next_handler
        
        # Wrap in reverse order so the first middleware listed runs outermost
        for middleware in reversed(middlewares):
            handler = wrap(middleware, handler)
        
        return await handler(request)
    
    return composed_middleware

# Compose middleware for API endpoints, reusing the middleware defined in
# the earlier sections
api_middleware_stack = compose_middleware(
    jwt_auth_middleware,
    api_rate_limiter,
    validation_middleware,
    api_transformation_middleware
)

app.add_middleware(api_middleware_stack, priority=85)

Error Handling Middleware

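# ValidationError, AuthenticationError, and DatabaseError stand in for
# exception classes defined by your application or its libraries;
# PermissionError is the Python built-in.
async def error_handling_middleware(request, next_handler):
    """Catch and handle errors from downstream middleware/handlers"""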
    try:
        return await next_handler(request)
        
    except ValidationError as e:
        app.logger.warning(f"Validation error for {request.path}: {e}")
        return JSONResponse(
            {"error": "Invalid request", "details": str(e)},
            status=400
        )
        
    except AuthenticationError as e:
        app.logger.warning(f"Auth error for {request.path}: {e}")
        return JSONResponse(
            {"error": "Authentication failed"},
            status=401
        )
        
    except PermissionError as e:
        app.logger.warning(f"Permission error for {request.path}: {e}")
        return JSONResponse(
            {"error": "Access denied"},
            status=403
        )
        
    except DatabaseError as e:
        app.logger.error(f"Database error for {request.path}: {e}", exc_info=True)
        return JSONResponse(
            {"error": "Internal server error"},
            status=500
        )
        
    except Exception as e:
        app.logger.error(f"Unexpected error for {request.path}: {e}", exc_info=True)
        return JSONResponse(
            {"error": "Internal server error"},
            status=500
        )

# Register at high priority so it wraps every lower-priority middleware and handler
app.add_middleware(error_handling_middleware, priority=110)

Advanced Middleware Features

Middleware with State

class StatefulMiddleware:
    """Middleware that maintains state across requests"""
    
    def __init__(self):
        self.request_count = 0
        self.unique_ips = set()
        self.start_time = time.time()
    
    async def __call__(self, request, next_handler):
        # Update statistics
        self.request_count += 1
        self.unique_ips.add(request.client_ip)
        
        # Add statistics headers
        response = await next_handler(request)
        
        response.headers["X-Total-Requests"] = str(self.request_count)
        response.headers["X-Unique-IPs"] = str(len(self.unique_ips))
        response.headers["X-Uptime"] = str(int(time.time() - self.start_time))
        
        return response

app.add_middleware(StatefulMiddleware(), priority=70)

Async Context Middleware

import contextvars
import uuid

# Context variables for request-scoped data
request_id_var = contextvars.ContextVar('request_id')
user_var = contextvars.ContextVar('user')

async def context_middleware(request, next_handler):
    """Set up request context"""
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    
    # Set user context if authenticated
    if hasattr(request, 'user'):
        user_var.set(request.user)
    
    # Add request ID to response headers
    response = await next_handler(request)
    response.headers["X-Request-ID"] = request_id
    
    return response

app.add_middleware(context_middleware, priority=105)

# Use context in other parts of the application
def get_current_user():
    """Get current user from context"""
    return user_var.get(None)

def get_request_id():
    """Get current request ID from context"""
    return request_id_var.get(None)
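Context variables are useful here because each asyncio task works on its own copy of the context, so values set for one request do not leak into another that runs concurrently. The sketch below demonstrates that isolation with the standard library alone. It assumes the server runs each request in its own task, and `handle` is a hypothetical stand-in for a middleware plus handler. It also resets the variable with the token returned by `set()`, which is a tidy habit when the same task may be reused.

```python
import asyncio
import contextvars

request_id_var = contextvars.ContextVar("request_id", default=None)

async def handle(request_id, delay):
    # set() returns a token that restores the previous value on reset()
    token = request_id_var.set(request_id)
    try:
        # Other requests run while this one waits
        await asyncio.sleep(delay)
        return request_id_var.get()
    finally:
        request_id_var.reset(token)

async def main():
    # gather() runs each coroutine in its own task, each with its own context
    return await asyncio.gather(handle("req-a", 0.02), handle("req-b", 0.01))

seen = asyncio.run(main())
print(seen)  # ['req-a', 'req-b']
```

Although the two handlers overlap in time, each reads back the ID it set itself.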

Middleware Best Practices

Performance Considerations

  1. Order Matters: High-priority middleware runs first on the request path and last on the response path

  2. Early Returns: Return responses early when possible to skip unnecessary processing

  3. Async Operations: Use async/await for I/O operations to avoid blocking

  4. Resource Management: Clean up resources in finally blocks or context managers
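As an illustration of the resource management point, the following sketch releases a per-request resource in a `finally` block so that it is returned even when the handler raises. `ConnectionPool` and `db_connection_middleware` are hypothetical stand-ins, and the request is a plain dict rather than a Gobstopper request object.

```python
import asyncio

class ConnectionPool:
    """Stand-in pool that tracks how many connections are checked out."""
    def __init__(self):
        self.in_use = 0

    async def acquire(self):
        self.in_use += 1
        return object()

    async def release(self, conn):
        self.in_use -= 1

pool = ConnectionPool()

async def db_connection_middleware(request, next_handler):
    conn = await pool.acquire()
    request["db"] = conn
    try:
        return await next_handler(request)
    finally:
        # Runs whether the handler returned normally or raised
        await pool.release(conn)

async def failing_handler(request):
    raise RuntimeError("boom")

async def demo():
    try:
        await db_connection_middleware({}, failing_handler)
    except RuntimeError:
        pass
    return pool.in_use

leaked = asyncio.run(demo())
print(leaked)  # 0
```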

Security Best Practices

  1. Security First: Apply security middleware at high priority

  2. Input Validation: Validate inputs early in the middleware chain

  3. Rate Limiting: Implement rate limiting to prevent abuse

  4. Error Handling: Don’t expose internal errors to clients

Development Tips

# Middleware execution tracker (development only)
if app.config.get("DEBUG"):
    async def debug_middleware(request, next_handler):
        print(f"→ Entering middleware chain for {request.path}")
        response = await next_handler(request)
        print(f"← Exiting middleware chain with status {response.status}")
        return response
    
    app.add_middleware(debug_middleware, priority=200)

# Conditional middleware based on environment
import os

if os.getenv("ENABLE_PROFILING"):
    from .profiling import ProfilingMiddleware
    app.add_middleware(ProfilingMiddleware(), priority=60)

if os.getenv("ENABLE_METRICS"):
    from .metrics import MetricsMiddleware
    app.add_middleware(MetricsMiddleware(), priority=55)

Gobstopper's middleware system keeps cross-cutting concerns separate from route handlers, and priority ordering makes explicit which layer sees each request first and each response last.