HTTP Request and Response Handling

Gobstopper provides comprehensive HTTP request and response handling through RSGI protocol integration. The system supports async request parsing, multiple response types, and efficient data handling for high-performance applications.

Overview

The HTTP layer consists of:

  • Request: Async request parsing with lazy loading

  • Response: Flexible response generation with multiple content types

  • JSONResponse: JSON serialization and content type handling

  • FileResponse: Static file serving with MIME detection

  • StreamResponse: Streaming responses for real-time data

Request Class

The Request class provides async access to HTTP request data with lazy parsing for optimal performance.

Constructor

from gobstopper.http.request import Request

# Typically created automatically by Gobstopper
# request = Request(scope, protocol)

Parameters:

  • scope: RSGI request scope containing metadata

  • protocol: RSGI protocol instance for body reading

Properties

Basic Request Information

@app.get("/api/info")
async def request_info(request):
    info = {
        "method": request.method,           # GET, POST, PUT, DELETE, etc.
        "path": request.path,               # /api/info
        "query_string": request.query_string, # ?page=1&limit=10
        "client_ip": request.client_ip,     # Client IP (respects X-Forwarded-For)
        "user_agent": request.headers.get("user-agent", "Unknown")
    }
    return info

Headers

Request headers are accessible as a case-insensitive dictionary:

@app.post("/api/data")
async def handle_data(request):
    # Access headers
    content_type = request.headers.get("content-type")
    authorization = request.headers.get("authorization")
    custom_header = request.headers.get("x-custom-header")
    
    # Check authentication
    if not authorization:
        return {"error": "Authentication required"}, 401
        
    return {"received": True}

Query Parameters

Query parameters are parsed into a dictionary of lists:

@app.get("/search")
async def search(request):
    # URL: /search?q=python&category=web&category=api&page=2
    
    query = request.args.get("q", [""])[0]              # "python"
    categories = request.args.get("category", [])        # ["web", "api"]  
    page = int(request.args.get("page", ["1"])[0])       # 2
    
    return {
        "query": query,
        "categories": categories,
        "page": page
    }

# Utility function for simpler access
def get_arg(request, key, default="", as_type=str):
    """Get a single query parameter converted with as_type, or default"""
    values = request.args.get(key)
    if not values:
        # Key missing (or present without values): nothing to convert
        return default
    try:
        return as_type(values[0])
    except (ValueError, TypeError):
        return default

@app.get("/api/posts")
async def list_posts(request):
    page = get_arg(request, "page", 1, int)
    limit = get_arg(request, "limit", 10, int)
    category = get_arg(request, "category", "all")
    
    posts = await get_posts(page=page, limit=limit, category=category)
    return {"posts": posts}
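The dictionary-of-lists shape is the same one Python's standard `urllib.parse.parse_qs` produces. The sketch below uses it on the query string from the `/search` example to show why single values still need `[0]`. Whether Gobstopper calls `parse_qs` internally is an assumption; only the resulting shape is taken from the description above.

```python
from urllib.parse import parse_qs

# Same query string as in the /search example
raw_query = "q=python&category=web&category=api&page=2"

# parse_qs groups repeated keys, so every value is a list of strings
args = parse_qs(raw_query)

query = args.get("q", [""])[0]            # single value: take the first item
categories = args.get("category", [])     # repeated key: keep the whole list
page = int(args.get("page", ["1"])[0])    # values are strings until converted
```

Note that values arrive as strings, so numeric parameters such as `page` need an explicit conversion.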

Async Data Parsing

JSON Data

Parse JSON request bodies asynchronously:

@app.post("/api/users")
async def create_user(request):
    try:
        data = await request.get_json()
        
        if not data:
            return {"error": "No data provided"}, 400
            
        # Validate required fields
        required_fields = ["name", "email"]
        for field in required_fields:
            if field not in data:
                return {"error": f"{field} is required"}, 400
        
        # Create user
        user = await create_user_record(data)
        return {"user": user, "created": True}
        
    except UnicodeDecodeError:
        # Must come first: UnicodeDecodeError is a subclass of ValueError
        return {"error": "Invalid character encoding"}, 400
    except ValueError:
        return {"error": "Invalid JSON format"}, 400

# Complex JSON handling
@app.post("/api/bulk-import")
async def bulk_import(request):
    data = await request.get_json()
    
    if not isinstance(data, dict) or "items" not in data:
        return {"error": "Expected JSON object with 'items' array"}, 400
    
    items = data["items"]
    if not isinstance(items, list):
        return {"error": "'items' must be an array"}, 400
    
    results = []
    for i, item in enumerate(items):
        try:
            result = await process_item(item)
            results.append({"index": i, "status": "success", "result": result})
        except Exception as e:
            results.append({"index": i, "status": "error", "error": str(e)})
    
    return {"results": results, "total": len(items)}

Form Data

Parse form-encoded data (application/x-www-form-urlencoded):

@app.post("/contact")
async def contact_form(request):
    form = await request.get_form()
    
    # Extract form fields
    name = form.get("name", [""])[0]
    email = form.get("email", [""])[0]
    subject = form.get("subject", [""])[0]
    message = form.get("message", [""])[0]
    
    # Handle multiple values (checkboxes, multiple selects)
    interests = form.get("interests", [])  # Multiple checkboxes
    
    # Validation
    if not name or not email or not message:
        return {"error": "Name, email, and message are required"}, 400
    
    # Process form submission
    await send_contact_email(name, email, subject, message, interests)
    
    return {"success": True, "message": "Contact form submitted"}

# Form with file uploads (planned feature)
@app.post("/upload")
async def upload_file(request):
    # Note: Multipart/form-data support is planned
    # Current version only supports URL-encoded forms
    
    form = await request.get_form()
    description = form.get("description", [""])[0]
    
    # File upload handling will be added in future versions
    return {"error": "File uploads not yet supported"}, 501

Raw Body Data

Access raw request body bytes:

import json

@app.post("/webhook")
async def webhook_handler(request):
    # Get raw body for signature verification
    body = await request.get_data()
    
    # Verify webhook signature
    signature = request.headers.get("x-signature")
    if not verify_signature(body, signature):
        return {"error": "Invalid signature"}, 401
    
    # Process webhook
    try:
        data = json.loads(body.decode('utf-8'))
        await process_webhook(data)
        return {"status": "processed"}
    except json.JSONDecodeError:
        return {"error": "Invalid JSON in webhook payload"}, 400

@app.post("/api/binary-data")
async def handle_binary(request):
    data = await request.get_data()
    
    # Process binary data
    if request.headers.get("content-type") == "application/octet-stream":
        result = await process_binary_data(data)
        return {"processed_bytes": len(data), "result": result}
    else:
        return {"error": "Expected binary data"}, 400
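The webhook example calls `verify_signature` without defining it. A minimal sketch follows, assuming the sender signs the raw body with HMAC-SHA256 and sends the hex digest in `x-signature`. The secret, the digest algorithm, and the header format are all hypothetical and depend on the webhook provider.

```python
import hashlib
import hmac

# Hypothetical shared secret; load it from configuration in real code
WEBHOOK_SECRET = b"change-me"

def verify_signature(body: bytes, signature, secret: bytes = WEBHOOK_SECRET) -> bool:
    """Return True if `signature` is the hex HMAC-SHA256 of `body`."""
    if not signature:
        # Header missing: reject instead of raising
        return False
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected.encode(), signature.encode())
```

The signature has to be computed over the raw bytes from `get_data()`, not over re-serialized JSON, because re-encoding can change whitespace and key order.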

Client Information

IP Address Resolution

The client IP property respects proxy headers:

@app.get("/api/client-info")
async def client_info(request):
    # Respects X-Forwarded-For header for proxied requests
    client_ip = request.client_ip
    
    # Additional client information
    user_agent = request.headers.get("user-agent", "Unknown")
    accept_language = request.headers.get("accept-language", "")
    
    # Rate limiting example
    if await is_rate_limited(client_ip):
        return {"error": "Rate limit exceeded"}, 429
    
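    return {
        "client_ip": client_ip,
        "user_agent": user_agent,
        # Strip whitespace and drop empty entries (e.g. when the header is missing)
        "languages": [lang.strip() for lang in accept_language.split(",") if lang.strip()]
    }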
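How `client_ip` is resolved internally is not shown here. The sketch below illustrates one common approach, assuming the leftmost `X-Forwarded-For` entry is the original client and falling back to the socket peer address. `resolve_client_ip` and its parameters are hypothetical names, not part of Gobstopper.

```python
def resolve_client_ip(headers: dict, peer_ip: str) -> str:
    """Pick the client address, preferring X-Forwarded-For when present."""
    forwarded = headers.get("x-forwarded-for", "")
    # Header format is "client, proxy1, proxy2"; the leftmost entry is the client
    first = forwarded.split(",")[0].strip()
    return first or peer_ip
```

Because clients can send this header themselves, the value is only trustworthy when the application sits behind a proxy that sets or overwrites it.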

Custom Attributes

Add custom attributes to request objects:

@app.before_request
async def authenticate_user(request):
    """Add user information to request"""
    token = request.headers.get("authorization")
    if token and token.startswith("Bearer "):
        user = await verify_jwt_token(token[7:])
        request.user = user
        request.is_authenticated = True
    else:
        request.user = None
        request.is_authenticated = False

@app.get("/api/profile")
async def get_profile(request):
    if not request.is_authenticated:
        return {"error": "Authentication required"}, 401
    
    profile = await get_user_profile(request.user.id)
    return {"profile": profile}

Problem Details (RFC 7807)

Gobstopper provides a helper for generating machine-readable error responses that conform to RFC 7807 using the media type application/problem+json.

  • Helper: gobstopper.http.problem.problem()

  • Typical fields: type, title, status, detail, instance, plus extension members

Example:

from gobstopper.http.problem import problem

@app.get("/api/items/<id>")
async def get_item(request, id: str):
    item = await load_item(id)
    if not item:
        return problem(
            status=404,
            title="Item not found",
            type="https://example.com/probs/not-found",
            detail=f"No item with id {id}",
            instance=request.path,
            extra={"item_id": id}
        )
    return item

Notes:

  • Framework error handlers may also return problem+json for certain errors (e.g., 415/422 from typed parsers, 413 for oversized bodies).

  • Content negotiation utilities ensure the correct content type.
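To make the shape of a problem document concrete, the sketch below assembles one by hand with the standard library. `build_problem` is a hypothetical stand-in and not the implementation of `gobstopper.http.problem.problem()`. It only reflects the RFC 7807 layout, in which extension members sit at the top level next to the standard fields.

```python
import json

def build_problem(status, title, type="about:blank", detail=None, instance=None, **extra):
    """Assemble an RFC 7807 problem object as a plain dict."""
    body = {"type": type, "title": title, "status": status}
    if detail is not None:
        body["detail"] = detail
    if instance is not None:
        body["instance"] = instance
    # Extension members are added as top-level keys
    body.update(extra)
    return body

payload = json.dumps(
    build_problem(
        404,
        "Item not found",
        detail="No item with id 42",
        instance="/api/items/42",
        item_id="42",
    )
)
```

A response carrying this payload would use the `application/problem+json` media type mentioned above.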

Request Too Large (413)

If a request body exceeds configured limits, the framework raises RequestTooLarge and returns a 413 response. When the LimitsMiddleware is enabled, this is enforced during body read.

# See Middleware docs for enabling limits
# Response example (problem+json):
{
  "type": "about:blank",
  "title": "Request Entity Too Large",
  "status": 413,
  "detail": "Request body exceeds 5 MB limit"
}
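Enforcing the limit during the body read means the check runs as chunks arrive, so an oversized upload is rejected before it is fully buffered. The sketch below illustrates the idea with a plain async iterator. `read_limited` and `fake_body` are hypothetical, and the `RequestTooLarge` class here is a local stand-in for the framework exception of the same name.

```python
import asyncio

class RequestTooLarge(Exception):
    """Local stand-in for the framework exception of the same name."""

async def read_limited(chunks, max_bytes: int) -> bytes:
    """Collect body chunks, aborting as soon as the limit is exceeded."""
    received = bytearray()
    async for chunk in chunks:
        received.extend(chunk)
        if len(received) > max_bytes:
            raise RequestTooLarge(f"Request body exceeds {max_bytes} bytes")
    return bytes(received)

async def fake_body(parts):
    # Simulates a protocol that delivers the body piece by piece
    for part in parts:
        yield part

body = asyncio.run(read_limited(fake_body([b"ab", b"cd"]), max_bytes=10))
```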

Body Read Timeout (504)

When body reading exceeds the configured timeout, a 504 Gateway Timeout is returned. This is also typically formatted as problem+json.
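A read timeout of this kind can be sketched with `asyncio.wait_for`. The example below is an illustration under assumptions, not Gobstopper's implementation: `read_with_timeout`, `slow_body`, and `fast_body` are hypothetical names, and the returned dict only mimics the problem+json fields.

```python
import asyncio

async def read_with_timeout(read_body, timeout: float):
    """Await `read_body()` but give up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(read_body(), timeout=timeout)
    except asyncio.TimeoutError:
        # Map the timeout to the status code described above
        return {"title": "Gateway Timeout", "status": 504}

async def slow_body():
    # Simulates a client that sends its body too slowly
    await asyncio.sleep(1)
    return b"never delivered in time"

async def fast_body():
    return b"ok"

slow_result = asyncio.run(read_with_timeout(slow_body, timeout=0.05))
fast_result = asyncio.run(read_with_timeout(fast_body, timeout=0.05))
```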

Response Classes

Basic Response

The Response class handles text, HTML, and binary content:

from gobstopper.http.response import Response

@app.get("/")
async def index(request):
    # Simple text response
    return Response("Hello World")

@app.get("/html")
async def html_page(request):
    html = "<h1>Welcome</h1><p>This is HTML content</p>"
    return Response(html, content_type="text/html")

@app.get("/api/status")
async def status(request):
    # Custom status code
    return Response("Service Unavailable", status=503)

@app.get("/custom-headers")
async def custom_headers(request):
    # Add custom headers
    return Response("OK", headers={
        "X-Custom-Header": "CustomValue",
        "Cache-Control": "no-cache",
        "X-Rate-Limit": "100"
    })

JSON Response

The JSONResponse class automatically serializes Python objects:

from gobstopper.http.response import JSONResponse

@app.get("/api/users")
async def list_users(request):
    users = await get_all_users()
    return JSONResponse({
        "users": users,
        "count": len(users),
        "page": 1,
        "total_pages": 10
    })

@app.get("/api/user/<user_id>")
async def get_user(request, user_id):
    user = await find_user(user_id)
    
    if not user:
        return JSONResponse({"error": "User not found"}, status=404)
    
    return JSONResponse({"user": user})

# Complex data structures
@app.get("/api/analytics")
async def analytics(request):
    data = {
        "metrics": {
            "page_views": 15420,
            "unique_visitors": 8934,
            "bounce_rate": 0.34
        },
        "top_pages": [
            {"path": "/", "views": 5234},
            {"path": "/about", "views": 2156},
            {"path": "/contact", "views": 1876}
        ],
        # Requires: from datetime import datetime, timezone
        "generated_at": datetime.now(timezone.utc).isoformat()
    }
    return JSONResponse(data)

File Response

The FileResponse class serves files with proper MIME types:

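import os

from gobstopper.http.response import FileResponse, JSONResponse

@app.get("/download/<filename>")
async def download_file(request, filename):
    # Accept only a bare file name, so "../" or absolute paths cannot escape uploads/
    if filename in ("", ".", "..") or filename != os.path.basename(filename):
        return JSONResponse({"error": "File not found"}, status=404)

    file_path = os.path.join("uploads", filename)
    if not os.path.isfile(file_path):
        return JSONResponse({"error": "File not found"}, status=404)

    return FileResponse(file_path)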

@app.get("/reports/<report_id>")
async def download_report(request, report_id):
    report_path = await generate_report(report_id)
    
    # Force specific filename
    return FileResponse(
        report_path, 
        filename=f"report_{report_id}.pdf"
    )

@app.get("/images/<image_id>")
async def serve_image(request, image_id):
    # Serve images with appropriate MIME types
    image_path = f"images/{image_id}.jpg"
    return FileResponse(image_path)
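MIME detection by file extension can be illustrated with the standard `mimetypes` module. Whether `FileResponse` uses this module is an assumption, and `guess_content_type` is a hypothetical helper. The sketch only shows the kind of mapping involved.

```python
import mimetypes

def guess_content_type(path: str, default: str = "application/octet-stream") -> str:
    """Guess a Content-Type from the file extension, with a binary fallback."""
    content_type, _encoding = mimetypes.guess_type(path)
    return content_type or default

pdf_type = guess_content_type("reports/report_7.pdf")
jpg_type = guess_content_type("images/cat.jpg")
```

Unknown extensions fall back to `application/octet-stream`, which browsers treat as a generic download.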

Stream Response

The StreamResponse class enables real-time data streaming:

import asyncio
from datetime import datetime

from gobstopper.http.response import StreamResponse

@app.get("/events")
async def server_sent_events(request):
    """Server-Sent Events endpoint"""
    async def event_stream():
        # Initial event announcing the connection
        yield "event: connected\n"
        yield "data: Connected to event stream\n\n"
        
        counter = 0
        while counter < 100:
            yield "event: update\n"
            yield f"data: {{\"count\": {counter}, \"timestamp\": \"{datetime.utcnow().isoformat()}\"}}\n\n"
            counter += 1
            await asyncio.sleep(1)
        
        yield "event: complete\n"
        yield "data: Stream complete\n\n"
    
    return StreamResponse(
        event_stream(),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*"
        }
    )

@app.get("/export/csv")
async def export_large_csv(request):
    """Stream large CSV export"""
    async def csv_generator():
        # CSV header
        yield "id,name,email,created_at\n"
        
        # Stream data in chunks
        async for batch in get_users_in_batches(batch_size=1000):
            for user in batch:
                yield f"{user.id},{user.name},{user.email},{user.created_at}\n"
    
    return StreamResponse(
        csv_generator(),
        content_type="text/csv",
        headers={
            "Content-Disposition": "attachment; filename=users.csv"
        }
    )

import json

@app.get("/progress/<task_id>")
async def task_progress(request, task_id):
    """Stream task progress updates"""
    async def progress_stream():
        task = await get_task(task_id)
        
        while not task.is_complete:
            progress = await get_task_progress(task_id)
            # json.dumps escapes the values, so the payload is always valid JSON
            payload = json.dumps({"progress": progress, "status": task.status})
            yield f"data: {payload}\n\n"
            await asyncio.sleep(0.5)
            task = await get_task(task_id)
        
        # Final status
        payload = json.dumps({"progress": 100, "status": "complete", "result": task.result})
        yield f"data: {payload}\n\n"
    
    return StreamResponse(
        progress_stream(),
        content_type="text/event-stream"
    )
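The streaming examples build Server-Sent Events messages by hand. A small formatting helper keeps the wire format in one place. `format_sse` is a hypothetical helper, not part of Gobstopper. It relies only on the SSE rules that each field goes on its own line and a blank line ends the message.

```python
import json

def format_sse(data, event=None) -> str:
    """Serialize one Server-Sent Events message."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps escapes quotes and newlines, keeping the payload on one line
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"

message = format_sse({"count": 1}, event="update")
```

Inside a generator passed to `StreamResponse`, each `yield format_sse(...)` would then emit one complete event.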

Advanced Usage Patterns

Request Validation

from dataclasses import dataclass
from typing import Optional

@dataclass
class CreateUserRequest:
    name: str
    email: str
    age: Optional[int] = None

async def validate_user_request(request) -> CreateUserRequest:
    """Validate and parse user creation request"""
    data = await request.get_json()
    
    if not data:
        raise ValueError("Request body required")
    
    if "name" not in data or not data["name"].strip():
        raise ValueError("Name is required")
    
    if "email" not in data or "@" not in data["email"]:
        raise ValueError("Valid email is required")
    
    age = data.get("age")
    if age is not None:
        try:
            age = int(age)
        except (ValueError, TypeError):
            raise ValueError("Age must be a valid integer")
        # Range check sits outside the try so its message is not overwritten
        if age < 0 or age > 150:
            raise ValueError("Age must be between 0 and 150")
    
    return CreateUserRequest(
        name=data["name"].strip(),
        email=data["email"].strip().lower(),
        age=age
    )

@app.post("/api/users")
async def create_user(request):
    try:
        user_data = await validate_user_request(request)
        user = await create_user_record(user_data)
        return JSONResponse({"user": user}, status=201)
    except ValueError as e:
        return JSONResponse({"error": str(e)}, status=400)

Content Negotiation

@app.get("/api/data")
async def get_data(request):
    """Return data in requested format"""
    data = await fetch_data()
    accept_header = request.headers.get("accept", "application/json")
    
    if "application/json" in accept_header:
        return JSONResponse(data)
    elif "text/csv" in accept_header:
        async def csv_generator():
            yield "id,name,value\n"
            for item in data:
                yield f"{item['id']},{item['name']},{item['value']}\n"
        return StreamResponse(csv_generator(), content_type="text/csv")
    elif "application/xml" in accept_header or "text/xml" in accept_header:
        xml_data = convert_to_xml(data)
        return Response(xml_data, content_type="application/xml")
    elif "*/*" in accept_header:
        # Client accepts anything: fall back to JSON
        return JSONResponse(data)
    else:
        return JSONResponse({"error": "No acceptable response format"}, status=406)
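The substring checks above ignore client preferences expressed through `q` values. A sketch of ordering media types by preference is shown below. `parse_accept` is a hypothetical helper that handles only the `q` parameter and does not resolve wildcards.

```python
def parse_accept(header: str):
    """Return media types from an Accept header, most preferred first."""
    choices = []
    for part in header.split(","):
        media_type, *params = [p.strip() for p in part.split(";")]
        if not media_type:
            continue
        quality = 1.0  # default weight when no q parameter is given
        for param in params:
            name, _, value = param.partition("=")
            if name.strip() == "q":
                try:
                    quality = float(value)
                except ValueError:
                    quality = 0.0
        choices.append((media_type.lower(), quality))
    # sorted() is stable, so equal q values keep their header order
    ranked = sorted(choices, key=lambda choice: choice[1], reverse=True)
    # q=0 means "not acceptable", so those entries are dropped
    return [media_type for media_type, quality in ranked if quality > 0]

preferred = parse_accept("text/csv;q=0.5, application/json, text/xml;q=0")
```

A handler could then walk the returned list and respond with the first format it supports, returning 406 if none match.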

Response Middleware

async def response_headers_middleware(request, next_handler):
    """Add security headers to all responses"""
    response = await next_handler(request)
    
    # Add security headers
    security_headers = {
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
        "X-XSS-Protection": "1; mode=block",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains"
    }
    
    if hasattr(response, 'headers'):
        response.headers.update(security_headers)
    
    return response

app.add_middleware(response_headers_middleware, priority=50)

Performance Considerations

Request Parsing Optimization

  • Lazy Loading: Request data is only parsed when accessed

  • Caching: Parsed data is cached for subsequent access

  • Memory Efficiency: Large request bodies are handled efficiently

  • Async Operations: Body parsing (get_json, get_form, get_data) is awaited, so reading the request body does not block the event loop
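The lazy-loading and caching points can be illustrated with a toy body wrapper that reads once and parses on first access. This is a sketch of the pattern only. `LazyBody` and `read_body` are hypothetical and say nothing about how the real Request class is implemented.

```python
import asyncio
import json

class LazyBody:
    """Toy request body that is read once and parsed on demand."""

    def __init__(self, read_body):
        self._read_body = read_body  # async callable returning bytes
        self._raw = None
        self._json = None
        self._json_parsed = False

    async def get_data(self) -> bytes:
        # The body is read only on first access, then reused
        if self._raw is None:
            self._raw = await self._read_body()
        return self._raw

    async def get_json(self):
        # Parsing happens at most once; later calls return the cached object
        if not self._json_parsed:
            self._json = json.loads(await self.get_data())
            self._json_parsed = True
        return self._json

read_log = []

async def read_body():
    read_log.append("read")  # records how often the body is actually read
    return b'{"name": "Ada"}'

async def demo():
    body = LazyBody(read_body)
    return await body.get_json(), await body.get_json()

first, second = asyncio.run(demo())
```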

Response Optimization

  • JSON Serialization: Uses compact JSON format (no whitespace)

  • Content-Type Detection: Automatic MIME type detection

  • Streaming: Large responses can be streamed to reduce memory usage

  • Header Optimization: Efficient header handling for RSGI protocol
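The effect of compact serialization is easy to see with the standard `json` module. The example below is illustrative only, since which JSON encoder Gobstopper uses is not stated here.

```python
import json

payload = {"users": [{"id": 1, "name": "Ada"}], "count": 1}

# Default separators add a space after "," and ":"
default_encoding = json.dumps(payload)

# Compact separators remove that whitespace
compact_encoding = json.dumps(payload, separators=(",", ":"))
```

The saving per response is small, but it adds up for large arrays of objects.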

Best Practices

  1. Validate Early: Validate request data as early as possible

  2. Use Appropriate Response Types: Choose the right response class for your content

  3. Stream Large Data: Use StreamResponse for large datasets

  4. Handle Errors Gracefully: Provide meaningful error messages

  5. Security Headers: Add security headers through middleware

  6. Content Negotiation: Support multiple response formats when appropriate

Error Handling

import json

@app.post("/api/process")
async def process_data(request):
    try:
        # Parse request
        data = await request.get_json()
        
        # Validate data
        if not data or "items" not in data:
            return JSONResponse(
                {"error": "Invalid request format", "code": "INVALID_FORMAT"}, 
                status=400
            )
        
        # Process data
        result = await process_items(data["items"])
        
        return JSONResponse({
            "success": True,
            "processed": len(data["items"]),
            "result": result
        })
        
    except json.JSONDecodeError:
        return JSONResponse(
            {"error": "Invalid JSON", "code": "JSON_DECODE_ERROR"}, 
            status=400
        )
    except UnicodeDecodeError:
        return JSONResponse(
            {"error": "Invalid encoding", "code": "ENCODING_ERROR"}, 
            status=400
        )
    except Exception as e:
        request.logger.error(f"Unexpected error: {e}", exc_info=True)
        return JSONResponse(
            {"error": "Internal server error", "code": "INTERNAL_ERROR"}, 
            status=500
        )

The HTTP request and response system in Gobstopper provides a robust foundation for building high-performance web applications with clean, intuitive APIs and efficient data handling.