HTTP Request and Response Handling¶
Gobstopper provides comprehensive HTTP request and response handling through RSGI protocol integration. The system supports async request parsing, multiple response types, and efficient data handling for high-performance applications.
Overview¶
The HTTP layer consists of:
Request: Async request parsing with lazy loading
Response: Flexible response generation with multiple content types
JSONResponse: JSON serialization and content type handling
FileResponse: Static file serving with MIME detection
StreamResponse: Streaming responses for real-time data
Request Class¶
The Request class provides async access to HTTP request data with lazy parsing for optimal performance.
Constructor¶
from gobstopper.http.request import Request
# Typically created automatically by Gobstopper
# request = Request(scope, protocol)
Parameters:
scope: RSGI request scope containing metadataprotocol: RSGI protocol instance for body reading
Properties¶
Basic Request Information¶
@app.get("/api/info")
async def request_info(request):
info = {
"method": request.method, # GET, POST, PUT, DELETE, etc.
"path": request.path, # /api/info
"query_string": request.query_string, # ?page=1&limit=10
"client_ip": request.client_ip, # Client IP (respects X-Forwarded-For)
"user_agent": request.headers.get("user-agent", "Unknown")
}
return info
Headers¶
Request headers are accessible as a case-insensitive dictionary:
@app.post("/api/data")
async def handle_data(request):
# Access headers
content_type = request.headers.get("content-type")
authorization = request.headers.get("authorization")
custom_header = request.headers.get("x-custom-header")
# Check authentication
if not authorization:
return {"error": "Authentication required"}, 401
return {"received": True}
Query Parameters¶
Query parameters are parsed into a dictionary of lists:
@app.get("/search")
async def search(request):
# URL: /search?q=python&category=web&category=api&page=2
query = request.args.get("q", [""])[0] # "python"
categories = request.args.get("category", []) # ["web", "api"]
page = int(request.args.get("page", ["1"])[0]) # 2
return {
"query": query,
"categories": categories,
"page": page
}
# Utility function for simpler access
def get_arg(request, key, default="", as_type=str):
"""Get single query parameter with type conversion"""
value = request.args.get(key, [default])[0]
try:
return as_type(value) if value != default else default
except (ValueError, TypeError):
return default
@app.get("/api/posts")
async def list_posts(request):
page = get_arg(request, "page", 1, int)
limit = get_arg(request, "limit", 10, int)
category = get_arg(request, "category", "all")
posts = await get_posts(page=page, limit=limit, category=category)
return {"posts": posts}
Async Data Parsing¶
JSON Data¶
Parse JSON request bodies asynchronously:
@app.post("/api/users")
async def create_user(request):
try:
data = await request.get_json()
if not data:
return {"error": "No data provided"}, 400
# Validate required fields
required_fields = ["name", "email"]
for field in required_fields:
if field not in data:
return {"error": f"{field} is required"}, 400
# Create user
user = await create_user_record(data)
return {"user": user, "created": True}
except ValueError:
return {"error": "Invalid JSON format"}, 400
except UnicodeDecodeError:
return {"error": "Invalid character encoding"}, 400
# Complex JSON handling
@app.post("/api/bulk-import")
async def bulk_import(request):
data = await request.get_json()
if not isinstance(data, dict) or "items" not in data:
return {"error": "Expected JSON object with 'items' array"}, 400
items = data["items"]
if not isinstance(items, list):
return {"error": "'items' must be an array"}, 400
results = []
for i, item in enumerate(items):
try:
result = await process_item(item)
results.append({"index": i, "status": "success", "result": result})
except Exception as e:
results.append({"index": i, "status": "error", "error": str(e)})
return {"results": results, "total": len(items)}
Form Data¶
Parse form-encoded data (application/x-www-form-urlencoded):
@app.post("/contact")
async def contact_form(request):
form = await request.get_form()
# Extract form fields
name = form.get("name", [""])[0]
email = form.get("email", [""])[0]
subject = form.get("subject", [""])[0]
message = form.get("message", [""])[0]
# Handle multiple values (checkboxes, multiple selects)
interests = form.get("interests", []) # Multiple checkboxes
# Validation
if not name or not email or not message:
return {"error": "Name, email, and message are required"}, 400
# Process form submission
await send_contact_email(name, email, subject, message, interests)
return {"success": True, "message": "Contact form submitted"}
# Form with file uploads (planned feature)
@app.post("/upload")
async def upload_file(request):
# Note: Multipart/form-data support is planned
# Current version only supports URL-encoded forms
form = await request.get_form()
description = form.get("description", [""])[0]
# File upload handling will be added in future versions
return {"error": "File uploads not yet supported"}, 501
Raw Body Data¶
Access raw request body bytes:
@app.post("/webhook")
async def webhook_handler(request):
# Get raw body for signature verification
body = await request.get_data()
# Verify webhook signature
signature = request.headers.get("x-signature")
if not verify_signature(body, signature):
return {"error": "Invalid signature"}, 401
# Process webhook
try:
data = json.loads(body.decode('utf-8'))
await process_webhook(data)
return {"status": "processed"}
except json.JSONDecodeError:
return {"error": "Invalid JSON in webhook payload"}, 400
@app.post("/api/binary-data")
async def handle_binary(request):
data = await request.get_data()
# Process binary data
if request.headers.get("content-type") == "application/octet-stream":
result = await process_binary_data(data)
return {"processed_bytes": len(data), "result": result}
else:
return {"error": "Expected binary data"}, 400
Client Information¶
IP Address Resolution¶
The client IP property respects proxy headers:
@app.get("/api/client-info")
async def client_info(request):
# Respects X-Forwarded-For header for proxied requests
client_ip = request.client_ip
# Additional client information
user_agent = request.headers.get("user-agent", "Unknown")
accept_language = request.headers.get("accept-language", "")
# Rate limiting example
if await is_rate_limited(client_ip):
return {"error": "Rate limit exceeded"}, 429
return {
"client_ip": client_ip,
"user_agent": user_agent,
"languages": accept_language.split(",")
}
Custom Attributes¶
Add custom attributes to request objects:
@app.before_request
async def authenticate_user(request):
"""Add user information to request"""
token = request.headers.get("authorization")
if token and token.startswith("Bearer "):
user = await verify_jwt_token(token[7:])
request.user = user
request.is_authenticated = True
else:
request.user = None
request.is_authenticated = False
@app.get("/api/profile")
async def get_profile(request):
if not request.is_authenticated:
return {"error": "Authentication required"}, 401
profile = await get_user_profile(request.user.id)
return {"profile": profile}
Problem Details (RFC 7807)¶
Gobstopper provides a helper for generating machine-readable error responses that conform to RFC 7807 using the media type application/problem+json.
Helper: gobstopper.http.problem.problem()
Typical fields: type, title, status, detail, instance, plus extension members
Example:
from gobstopper.http.problem import problem
@app.get("/api/items/<id>")
async def get_item(request, id: str):
item = await load_item(id)
if not item:
return problem(
status=404,
title="Item not found",
type="https://example.com/probs/not-found",
detail=f"No item with id {id}",
instance=request.path,
extra={"item_id": id}
)
return item
Notes:
Framework error handlers may also return problem+json for certain errors (e.g., 415/422 from typed parsers, 413 for oversized bodies).
Content negotiation utilities ensure the correct content type.
Request Too Large (413)¶
If a request body exceeds configured limits, the framework raises RequestTooLarge and returns a 413 response. When the LimitsMiddleware is enabled, this is enforced during body read.
# See Middleware docs for enabling limits
# Response example (problem+json):
{
"type": "about:blank",
"title": "Request Entity Too Large",
"status": 413,
"detail": "Request body exceeds 5 MB limit"
}
Body Read Timeout (504)¶
When body reading exceeds the configured timeout, a 504 Gateway Timeout is returned. This is also typically formatted as problem+json.
Response Classes¶
Basic Response¶
The Response class handles text, HTML, and binary content:
from gobstopper.http.response import Response
@app.get("/")
async def index(request):
# Simple text response
return Response("Hello World")
@app.get("/html")
async def html_page(request):
html = "<h1>Welcome</h1><p>This is HTML content</p>"
return Response(html, content_type="text/html")
@app.get("/api/status")
async def status(request):
# Custom status code
return Response("Service Unavailable", status=503)
@app.get("/custom-headers")
async def custom_headers(request):
# Add custom headers
return Response("OK", headers={
"X-Custom-Header": "CustomValue",
"Cache-Control": "no-cache",
"X-Rate-Limit": "100"
})
JSON Response¶
The JSONResponse class automatically serializes Python objects:
from gobstopper.http.response import JSONResponse
@app.get("/api/users")
async def list_users(request):
users = await get_all_users()
return JSONResponse({
"users": users,
"count": len(users),
"page": 1,
"total_pages": 10
})
@app.get("/api/user/<user_id>")
async def get_user(request, user_id):
user = await find_user(user_id)
if not user:
return JSONResponse({"error": "User not found"}, status=404)
return JSONResponse({"user": user})
# Complex data structures
@app.get("/api/analytics")
async def analytics(request):
data = {
"metrics": {
"page_views": 15420,
"unique_visitors": 8934,
"bounce_rate": 0.34
},
"top_pages": [
{"path": "/", "views": 5234},
{"path": "/about", "views": 2156},
{"path": "/contact", "views": 1876}
],
"generated_at": datetime.utcnow().isoformat()
}
return JSONResponse(data)
File Response¶
The FileResponse class serves files with proper MIME types:
from gobstopper.http.response import FileResponse
@app.get("/download/<filename>")
async def download_file(request, filename):
file_path = f"uploads/{filename}"
# Check file exists and is safe
if not os.path.exists(file_path) or ".." in filename:
return JSONResponse({"error": "File not found"}, status=404)
return FileResponse(file_path)
@app.get("/reports/<report_id>")
async def download_report(request, report_id):
report_path = await generate_report(report_id)
# Force specific filename
return FileResponse(
report_path,
filename=f"report_{report_id}.pdf"
)
@app.get("/images/<image_id>")
async def serve_image(request, image_id):
# Serve images with appropriate MIME types
image_path = f"images/{image_id}.jpg"
return FileResponse(image_path)
Stream Response¶
The StreamResponse class enables real-time data streaming:
from gobstopper.http.response import StreamResponse
import asyncio
@app.get("/events")
async def server_sent_events(request):
"""Server-Sent Events endpoint"""
async def event_stream():
# SSE headers
yield "event: connected\n"
yield "data: Connected to event stream\n\n"
counter = 0
while counter < 100:
yield f"event: update\n"
yield f"data: {{\"count\": {counter}, \"timestamp\": \"{datetime.utcnow().isoformat()}\"}}\n\n"
counter += 1
await asyncio.sleep(1)
yield "event: complete\n"
yield "data: Stream complete\n\n"
return StreamResponse(
event_stream(),
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*"
}
)
@app.get("/export/csv")
async def export_large_csv(request):
"""Stream large CSV export"""
async def csv_generator():
# CSV header
yield "id,name,email,created_at\n"
# Stream data in chunks
async for batch in get_users_in_batches(batch_size=1000):
for user in batch:
yield f"{user.id},{user.name},{user.email},{user.created_at}\n"
return StreamResponse(
csv_generator(),
content_type="text/csv",
headers={
"Content-Disposition": "attachment; filename=users.csv"
}
)
@app.get("/progress/<task_id>")
async def task_progress(request, task_id):
"""Stream task progress updates"""
async def progress_stream():
task = await get_task(task_id)
while not task.is_complete:
progress = await get_task_progress(task_id)
yield f"data: {{\"progress\": {progress}, \"status\": \"{task.status}\"}}\n\n"
await asyncio.sleep(0.5)
task = await get_task(task_id)
# Final status
yield f"data: {{\"progress\": 100, \"status\": \"complete\", \"result\": {task.result}}}\n\n"
return StreamResponse(
progress_stream(),
content_type="text/event-stream"
)
Advanced Usage Patterns¶
Request Validation¶
from dataclasses import dataclass
from typing import Optional
@dataclass
class CreateUserRequest:
name: str
email: str
age: Optional[int] = None
async def validate_user_request(request) -> CreateUserRequest:
"""Validate and parse user creation request"""
data = await request.get_json()
if not data:
raise ValueError("Request body required")
if "name" not in data or not data["name"].strip():
raise ValueError("Name is required")
if "email" not in data or "@" not in data["email"]:
raise ValueError("Valid email is required")
age = data.get("age")
if age is not None:
try:
age = int(age)
if age < 0 or age > 150:
raise ValueError("Age must be between 0 and 150")
except (ValueError, TypeError):
raise ValueError("Age must be a valid integer")
return CreateUserRequest(
name=data["name"].strip(),
email=data["email"].strip().lower(),
age=age
)
@app.post("/api/users")
async def create_user(request):
try:
user_data = await validate_user_request(request)
user = await create_user_record(user_data)
return JSONResponse({"user": user}, status=201)
except ValueError as e:
return JSONResponse({"error": str(e)}, status=400)
Content Negotiation¶
@app.get("/api/data")
async def get_data(request):
"""Return data in requested format"""
data = await fetch_data()
accept_header = request.headers.get("accept", "application/json")
if "application/json" in accept_header:
return JSONResponse(data)
elif "text/csv" in accept_header:
async def csv_generator():
yield "id,name,value\n"
for item in data:
yield f"{item['id']},{item['name']},{item['value']}\n"
return StreamResponse(csv_generator(), content_type="text/csv")
elif "text/xml" in accept_header:
xml_data = convert_to_xml(data)
return Response(xml_data, content_type="application/xml")
else:
return JSONResponse({"error": "Unsupported content type"}, status=406)
Response Middleware¶
async def response_headers_middleware(request, next_handler):
"""Add security headers to all responses"""
response = await next_handler(request)
# Add security headers
security_headers = {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"X-XSS-Protection": "1; mode=block",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains"
}
if hasattr(response, 'headers'):
response.headers.update(security_headers)
return response
app.add_middleware(response_headers_middleware, priority=50)
Performance Considerations¶
Request Parsing Optimization¶
Lazy Loading: Request data is only parsed when accessed
Caching: Parsed data is cached for subsequent access
Memory Efficiency: Large request bodies are handled efficiently
Async Operations: All parsing operations are async to avoid blocking
Response Optimization¶
JSON Serialization: Uses compact JSON format (no whitespace)
Content-Type Detection: Automatic MIME type detection
Streaming: Large responses can be streamed to reduce memory usage
Header Optimization: Efficient header handling for RSGI protocol
Best Practices¶
Validate Early: Validate request data as early as possible
Use Appropriate Response Types: Choose the right response class for your content
Stream Large Data: Use StreamResponse for large datasets
Handle Errors Gracefully: Provide meaningful error messages
Security Headers: Add security headers through middleware
Content Negotiation: Support multiple response formats when appropriate
Error Handling¶
@app.post("/api/process")
async def process_data(request):
try:
# Parse request
data = await request.get_json()
# Validate data
if not data or "items" not in data:
return JSONResponse(
{"error": "Invalid request format", "code": "INVALID_FORMAT"},
status=400
)
# Process data
result = await process_items(data["items"])
return JSONResponse({
"success": True,
"processed": len(data["items"]),
"result": result
})
except json.JSONDecodeError:
return JSONResponse(
{"error": "Invalid JSON", "code": "JSON_DECODE_ERROR"},
status=400
)
except UnicodeDecodeError:
return JSONResponse(
{"error": "Invalid encoding", "code": "ENCODING_ERROR"},
status=400
)
except Exception as e:
request.logger.error(f"Unexpected error: {e}", exc_info=True)
return JSONResponse(
{"error": "Internal server error", "code": "INTERNAL_ERROR"},
status=500
)
The HTTP request and response system in Gobstopper provides a robust foundation for building high-performance web applications with clean, intuitive APIs and efficient data handling.