# HTTP Request and Response Handling Gobstopper provides comprehensive HTTP request and response handling through RSGI protocol integration. The system supports async request parsing, multiple response types, and efficient data handling for high-performance applications. ## Overview The HTTP layer consists of: - **Request**: Async request parsing with lazy loading - **Response**: Flexible response generation with multiple content types - **JSONResponse**: JSON serialization and content type handling - **FileResponse**: Static file serving with MIME detection - **StreamResponse**: Streaming responses for real-time data ## Request Class The `Request` class provides async access to HTTP request data with lazy parsing for optimal performance. ### Constructor ```python from gobstopper.http.request import Request # Typically created automatically by Gobstopper # request = Request(scope, protocol) ``` **Parameters:** - `scope`: RSGI request scope containing metadata - `protocol`: RSGI protocol instance for body reading ### Properties #### Basic Request Information ```python @app.get("/api/info") async def request_info(request): info = { "method": request.method, # GET, POST, PUT, DELETE, etc. "path": request.path, # /api/info "query_string": request.query_string, # ?page=1&limit=10 "client_ip": request.client_ip, # Client IP (respects X-Forwarded-For) "user_agent": request.headers.get("user-agent", "Unknown") } return info ``` #### Headers Request headers are accessible as a case-insensitive dictionary: ```python @app.post("/api/data") async def handle_data(request): # Access headers content_type = request.headers.get("content-type") authorization = request.headers.get("authorization") custom_header = request.headers.get("x-custom-header") # Check authentication if not authorization: return {"error": "Authentication required"}, 401 return {"received": True} ``` #### Query Parameters Query parameters are parsed into a dictionary of lists: ```python @app.get("/search") async def search(request): # URL: /search?q=python&category=web&category=api&page=2 query = request.args.get("q", [""])[0] # "python" categories = request.args.get("category", []) # ["web", "api"] page = int(request.args.get("page", ["1"])[0]) # 2 return { "query": query, "categories": categories, "page": page } # Utility function for simpler access def get_arg(request, key, default="", as_type=str): """Get single query parameter with type conversion""" value = request.args.get(key, [default])[0] try: return as_type(value) if value != default else default except (ValueError, TypeError): return default @app.get("/api/posts") async def list_posts(request): page = get_arg(request, "page", 1, int) limit = get_arg(request, "limit", 10, int) category = get_arg(request, "category", "all") posts = await get_posts(page=page, limit=limit, category=category) return {"posts": posts} ``` ### Async Data Parsing #### JSON Data Parse JSON request bodies asynchronously: ```python @app.post("/api/users") async def create_user(request): try: data = await request.get_json() if not data: return {"error": "No data provided"}, 400 # Validate required fields required_fields = ["name", "email"] for field in required_fields: if field not in data: return {"error": f"{field} is required"}, 400 # Create user user = await create_user_record(data) return {"user": user, "created": True} except ValueError: return {"error": "Invalid JSON format"}, 400 except UnicodeDecodeError: return {"error": "Invalid character encoding"}, 400 # Complex JSON handling @app.post("/api/bulk-import") async def bulk_import(request): data = await request.get_json() if not isinstance(data, dict) or "items" not in data: return {"error": "Expected JSON object with 'items' array"}, 400 items = data["items"] if not isinstance(items, list): return {"error": "'items' must be an array"}, 400 results = [] for i, item in enumerate(items): try: result = await process_item(item) results.append({"index": i, "status": "success", "result": result}) except Exception as e: results.append({"index": i, "status": "error", "error": str(e)}) return {"results": results, "total": len(items)} ``` #### Form Data Parse form-encoded data (application/x-www-form-urlencoded): ```python @app.post("/contact") async def contact_form(request): form = await request.get_form() # Extract form fields name = form.get("name", [""])[0] email = form.get("email", [""])[0] subject = form.get("subject", [""])[0] message = form.get("message", [""])[0] # Handle multiple values (checkboxes, multiple selects) interests = form.get("interests", []) # Multiple checkboxes # Validation if not name or not email or not message: return {"error": "Name, email, and message are required"}, 400 # Process form submission await send_contact_email(name, email, subject, message, interests) return {"success": True, "message": "Contact form submitted"} # Form with file uploads (planned feature) @app.post("/upload") async def upload_file(request): # Note: Multipart/form-data support is planned # Current version only supports URL-encoded forms form = await request.get_form() description = form.get("description", [""])[0] # File upload handling will be added in future versions return {"error": "File uploads not yet supported"}, 501 ``` #### Raw Body Data Access raw request body bytes: ```python @app.post("/webhook") async def webhook_handler(request): # Get raw body for signature verification body = await request.get_data() # Verify webhook signature signature = request.headers.get("x-signature") if not verify_signature(body, signature): return {"error": "Invalid signature"}, 401 # Process webhook try: data = json.loads(body.decode('utf-8')) await process_webhook(data) return {"status": "processed"} except json.JSONDecodeError: return {"error": "Invalid JSON in webhook payload"}, 400 @app.post("/api/binary-data") async def handle_binary(request): data = await request.get_data() # Process binary data if request.headers.get("content-type") == "application/octet-stream": result = await process_binary_data(data) return {"processed_bytes": len(data), "result": result} else: return {"error": "Expected binary data"}, 400 ``` ### Client Information #### IP Address Resolution The client IP property respects proxy headers: ```python @app.get("/api/client-info") async def client_info(request): # Respects X-Forwarded-For header for proxied requests client_ip = request.client_ip # Additional client information user_agent = request.headers.get("user-agent", "Unknown") accept_language = request.headers.get("accept-language", "") # Rate limiting example if await is_rate_limited(client_ip): return {"error": "Rate limit exceeded"}, 429 return { "client_ip": client_ip, "user_agent": user_agent, "languages": accept_language.split(",") } ``` #### Custom Attributes Add custom attributes to request objects: ```python @app.before_request async def authenticate_user(request): """Add user information to request""" token = request.headers.get("authorization") if token and token.startswith("Bearer "): user = await verify_jwt_token(token[7:]) request.user = user request.is_authenticated = True else: request.user = None request.is_authenticated = False @app.get("/api/profile") async def get_profile(request): if not request.is_authenticated: return {"error": "Authentication required"}, 401 profile = await get_user_profile(request.user.id) return {"profile": profile} ``` ## Problem Details (RFC 7807) Gobstopper provides a helper for generating machine-readable error responses that conform to RFC 7807 using the media type `application/problem+json`. - Helper: gobstopper.http.problem.problem() - Typical fields: type, title, status, detail, instance, plus extension members Example: ```python from gobstopper.http.problem import problem @app.get("/api/items/") async def get_item(request, id: str): item = await load_item(id) if not item: return problem( status=404, title="Item not found", type="https://example.com/probs/not-found", detail=f"No item with id {id}", instance=request.path, extra={"item_id": id} ) return item ``` Notes: - Framework error handlers may also return problem+json for certain errors (e.g., 415/422 from typed parsers, 413 for oversized bodies). - Content negotiation utilities ensure the correct content type. ### Request Too Large (413) If a request body exceeds configured limits, the framework raises RequestTooLarge and returns a 413 response. When the LimitsMiddleware is enabled, this is enforced during body read. ```python # See Middleware docs for enabling limits # Response example (problem+json): { "type": "about:blank", "title": "Request Entity Too Large", "status": 413, "detail": "Request body exceeds 5 MB limit" } ``` ### Body Read Timeout (504) When body reading exceeds the configured timeout, a 504 Gateway Timeout is returned. This is also typically formatted as problem+json. ## Response Classes ### Basic Response The `Response` class handles text, HTML, and binary content: ```python from gobstopper.http.response import Response @app.get("/") async def index(request): # Simple text response return Response("Hello World") @app.get("/html") async def html_page(request): html = "

Welcome

This is HTML content

" return Response(html, content_type="text/html") @app.get("/api/status") async def status(request): # Custom status code return Response("Service Unavailable", status=503) @app.get("/custom-headers") async def custom_headers(request): # Add custom headers return Response("OK", headers={ "X-Custom-Header": "CustomValue", "Cache-Control": "no-cache", "X-Rate-Limit": "100" }) ``` ### JSON Response The `JSONResponse` class automatically serializes Python objects: ```python from gobstopper.http.response import JSONResponse @app.get("/api/users") async def list_users(request): users = await get_all_users() return JSONResponse({ "users": users, "count": len(users), "page": 1, "total_pages": 10 }) @app.get("/api/user/") async def get_user(request, user_id): user = await find_user(user_id) if not user: return JSONResponse({"error": "User not found"}, status=404) return JSONResponse({"user": user}) # Complex data structures @app.get("/api/analytics") async def analytics(request): data = { "metrics": { "page_views": 15420, "unique_visitors": 8934, "bounce_rate": 0.34 }, "top_pages": [ {"path": "/", "views": 5234}, {"path": "/about", "views": 2156}, {"path": "/contact", "views": 1876} ], "generated_at": datetime.utcnow().isoformat() } return JSONResponse(data) ``` ### File Response The `FileResponse` class serves files with proper MIME types: ```python from gobstopper.http.response import FileResponse @app.get("/download/") async def download_file(request, filename): file_path = f"uploads/{filename}" # Check file exists and is safe if not os.path.exists(file_path) or ".." in filename: return JSONResponse({"error": "File not found"}, status=404) return FileResponse(file_path) @app.get("/reports/") async def download_report(request, report_id): report_path = await generate_report(report_id) # Force specific filename return FileResponse( report_path, filename=f"report_{report_id}.pdf" ) @app.get("/images/") async def serve_image(request, image_id): # Serve images with appropriate MIME types image_path = f"images/{image_id}.jpg" return FileResponse(image_path) ``` ### Stream Response The `StreamResponse` class enables real-time data streaming: ```python from gobstopper.http.response import StreamResponse import asyncio @app.get("/events") async def server_sent_events(request): """Server-Sent Events endpoint""" async def event_stream(): # SSE headers yield "event: connected\n" yield "data: Connected to event stream\n\n" counter = 0 while counter < 100: yield f"event: update\n" yield f"data: {{\"count\": {counter}, \"timestamp\": \"{datetime.utcnow().isoformat()}\"}}\n\n" counter += 1 await asyncio.sleep(1) yield "event: complete\n" yield "data: Stream complete\n\n" return StreamResponse( event_stream(), content_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*" } ) @app.get("/export/csv") async def export_large_csv(request): """Stream large CSV export""" async def csv_generator(): # CSV header yield "id,name,email,created_at\n" # Stream data in chunks async for batch in get_users_in_batches(batch_size=1000): for user in batch: yield f"{user.id},{user.name},{user.email},{user.created_at}\n" return StreamResponse( csv_generator(), content_type="text/csv", headers={ "Content-Disposition": "attachment; filename=users.csv" } ) @app.get("/progress/") async def task_progress(request, task_id): """Stream task progress updates""" async def progress_stream(): task = await get_task(task_id) while not task.is_complete: progress = await get_task_progress(task_id) yield f"data: {{\"progress\": {progress}, \"status\": \"{task.status}\"}}\n\n" await asyncio.sleep(0.5) task = await get_task(task_id) # Final status yield f"data: {{\"progress\": 100, \"status\": \"complete\", \"result\": {task.result}}}\n\n" return StreamResponse( progress_stream(), content_type="text/event-stream" ) ``` ## Advanced Usage Patterns ### Request Validation ```python from dataclasses import dataclass from typing import Optional @dataclass class CreateUserRequest: name: str email: str age: Optional[int] = None async def validate_user_request(request) -> CreateUserRequest: """Validate and parse user creation request""" data = await request.get_json() if not data: raise ValueError("Request body required") if "name" not in data or not data["name"].strip(): raise ValueError("Name is required") if "email" not in data or "@" not in data["email"]: raise ValueError("Valid email is required") age = data.get("age") if age is not None: try: age = int(age) if age < 0 or age > 150: raise ValueError("Age must be between 0 and 150") except (ValueError, TypeError): raise ValueError("Age must be a valid integer") return CreateUserRequest( name=data["name"].strip(), email=data["email"].strip().lower(), age=age ) @app.post("/api/users") async def create_user(request): try: user_data = await validate_user_request(request) user = await create_user_record(user_data) return JSONResponse({"user": user}, status=201) except ValueError as e: return JSONResponse({"error": str(e)}, status=400) ``` ### Content Negotiation ```python @app.get("/api/data") async def get_data(request): """Return data in requested format""" data = await fetch_data() accept_header = request.headers.get("accept", "application/json") if "application/json" in accept_header: return JSONResponse(data) elif "text/csv" in accept_header: async def csv_generator(): yield "id,name,value\n" for item in data: yield f"{item['id']},{item['name']},{item['value']}\n" return StreamResponse(csv_generator(), content_type="text/csv") elif "text/xml" in accept_header: xml_data = convert_to_xml(data) return Response(xml_data, content_type="application/xml") else: return JSONResponse({"error": "Unsupported content type"}, status=406) ``` ### Response Middleware ```python async def response_headers_middleware(request, next_handler): """Add security headers to all responses""" response = await next_handler(request) # Add security headers security_headers = { "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", "X-XSS-Protection": "1; mode=block", "Strict-Transport-Security": "max-age=31536000; includeSubDomains" } if hasattr(response, 'headers'): response.headers.update(security_headers) return response app.add_middleware(response_headers_middleware, priority=50) ``` ## Performance Considerations ### Request Parsing Optimization - **Lazy Loading**: Request data is only parsed when accessed - **Caching**: Parsed data is cached for subsequent access - **Memory Efficiency**: Large request bodies are handled efficiently - **Async Operations**: All parsing operations are async to avoid blocking ### Response Optimization - **JSON Serialization**: Uses compact JSON format (no whitespace) - **Content-Type Detection**: Automatic MIME type detection - **Streaming**: Large responses can be streamed to reduce memory usage - **Header Optimization**: Efficient header handling for RSGI protocol ### Best Practices 1. **Validate Early**: Validate request data as early as possible 2. **Use Appropriate Response Types**: Choose the right response class for your content 3. **Stream Large Data**: Use StreamResponse for large datasets 4. **Handle Errors Gracefully**: Provide meaningful error messages 5. **Security Headers**: Add security headers through middleware 6. **Content Negotiation**: Support multiple response formats when appropriate ## Error Handling ```python @app.post("/api/process") async def process_data(request): try: # Parse request data = await request.get_json() # Validate data if not data or "items" not in data: return JSONResponse( {"error": "Invalid request format", "code": "INVALID_FORMAT"}, status=400 ) # Process data result = await process_items(data["items"]) return JSONResponse({ "success": True, "processed": len(data["items"]), "result": result }) except json.JSONDecodeError: return JSONResponse( {"error": "Invalid JSON", "code": "JSON_DECODE_ERROR"}, status=400 ) except UnicodeDecodeError: return JSONResponse( {"error": "Invalid encoding", "code": "ENCODING_ERROR"}, status=400 ) except Exception as e: request.logger.error(f"Unexpected error: {e}", exc_info=True) return JSONResponse( {"error": "Internal server error", "code": "INTERNAL_ERROR"}, status=500 ) ``` The HTTP request and response system in Gobstopper provides a robust foundation for building high-performance web applications with clean, intuitive APIs and efficient data handling.