# Security Best Practices for Gobstopper This guide covers security best practices for building production-ready Gobstopper applications. Gobstopper includes built-in security features through the SecurityMiddleware, but proper configuration and usage are essential. ## Table of Contents - [Production Environment Setup](#production-environment-setup) - [CSRF Protection](#csrf-protection) - [Session Management](#session-management) - [Security Headers](#security-headers) - [Cookie Security](#cookie-security) - [Input Validation](#input-validation) - [Rate Limiting](#rate-limiting) - [Authentication & Authorization](#authentication--authorization) - [Secure File Uploads](#secure-file-uploads) - [Logging & Monitoring](#logging--monitoring) - [Security Checklist](#security-checklist) ## Production Environment Setup ### Environment Variable Always set the `ENV` environment variable to `production` in production deployments: ```bash export ENV=production ``` This automatically enforces secure defaults: - Cookie `Secure` flag (HTTPS only) - Cookie `HttpOnly` flag (prevents JavaScript access) - Cookie `SameSite=Strict` (prevents CSRF attacks) ### Secret Key Management **Never hardcode secret keys in your source code.** ```python # ❌ NEVER DO THIS security = SecurityMiddleware(secret_key='my-secret-key') # ✅ DO THIS import os security = SecurityMiddleware( secret_key=os.environ['GOBSTOPPER_SECRET_KEY'] ) ``` Generate strong secret keys: ```bash # Generate a secure secret key python -c "import secrets; print(secrets.token_urlsafe(32))" ``` Store secrets in: - Environment variables - Secret management services (AWS Secrets Manager, HashiCorp Vault, etc.) - Encrypted configuration files ### HTTPS Enforcement **Always use HTTPS in production.** The SecurityMiddleware sets HSTS headers, but your deployment must handle TLS termination: ```python security = SecurityMiddleware( secret_key=os.environ['GOBSTOPPER_SECRET_KEY'], enable_security_headers=True, hsts_max_age=31536000, # 1 year hsts_include_subdomains=True ) ``` **Deployment options:** - Use a reverse proxy (Nginx, Caddy) with TLS certificates - Use load balancer TLS termination (AWS ELB, GCP Load Balancer) - Use Let's Encrypt for free certificates - Consider using Caddy for automatic HTTPS ## CSRF Protection CSRF (Cross-Site Request Forgery) protection is enabled by default and validates tokens for all state-changing requests (POST, PUT, DELETE, PATCH). ### Basic CSRF Setup ```python from gobstopper import Gobstopper from gobstopper.middleware import SecurityMiddleware app = Gobstopper(__name__) security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], enable_csrf=True # Default ) app.add_middleware(security) ``` ### Including CSRF Tokens in Forms Generate and include CSRF tokens in your templates: ```python @app.get('/form') async def show_form(request): csrf_token = security.generate_csrf_token(request.session) return await app.render_template('form.html', csrf_token=csrf_token) ``` **Template (Jinja2):** ```html
``` ### CSRF for AJAX Requests For AJAX requests, include the token in the `X-CSRF-Token` header: ```javascript // Get token from meta tag or data attribute const csrfToken = document.querySelector('meta[name="csrf-token"]').content; fetch('/api/update', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-CSRF-Token': csrfToken }, body: JSON.stringify(data) }); ``` Template meta tag: ```html ``` ### Exempting Endpoints For public APIs that don't use session-based authentication, you may want to disable CSRF for specific endpoints. Currently, you can: 1. **Use a separate app instance** without SecurityMiddleware for API routes 2. **Implement route-level middleware** to skip CSRF validation 3. **Use token-based authentication** (JWT) instead of sessions ## Session Management ### Session Storage Backends Gobstopper supports multiple session storage backends: ```python from gobstopper.middleware import SecurityMiddleware from gobstopper.sessions.storage import ( FileSessionStorage, # File-based (default) MemorySessionStorage, # In-memory (dev only) SQLSessionStorage, # SQL database RedisSessionStorage # Redis cache ) from pathlib import Path # File-based (simple deployments) security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], session_storage=FileSessionStorage(Path('./sessions')) ) # Redis-based (production recommended) security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], session_storage=RedisSessionStorage( redis_url='redis://localhost:6379/0', prefix='gobstopper:session:' ) ) # SQL-based (database deployments) security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], session_storage=SQLSessionStorage( connection_string='postgresql://user:pass@localhost/db' ) ) ``` ### Session Configuration ```python security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], cookie_name='session_id', cookie_max_age=86400, # 24 hours rolling_sessions=True, # Extend session on each request sign_session_id=True # Sign session IDs to prevent tampering ) ``` ### Session Regeneration **Always regenerate session IDs after privilege escalation** (login, permission changes): ```python @app.post('/login') async def login(request): data = await request.json() # Validate credentials user = await authenticate(data['username'], data['password']) if not user: return JSONResponse({'error': 'Invalid credentials'}, status=401) # ✅ Regenerate session ID to prevent session fixation new_session_id = await security.regenerate_session_id(request) # Store user data in session request.session['user_id'] = user.id request.session['authenticated'] = True request.session['role'] = user.role # Set the new session cookie response = JSONResponse({'success': True}) signed_id = security.sign_cookie_value(new_session_id) response.set_cookie( security.cookie_name, signed_id, max_age=security.cookie_max_age, secure=security.cookie_secure, httponly=security.cookie_httponly, samesite=security.cookie_samesite, path=security.cookie_path ) return response ``` ### Session Destruction (Logout) ```python @app.post('/logout') async def logout(request): session_id = security.get_session_id(request) if session_id: # Destroy session data await security.destroy_session(session_id) # Clear the cookie response = JSONResponse({'success': True}) response.delete_cookie(security.cookie_name) return response ``` ### Session Timeouts Implement both absolute and idle timeouts: ```python import time @app.before_request async def check_session_timeout(request): """Check session expiration and idle timeout.""" if not hasattr(request, 'session') or not request.session: return now = time.time() # Absolute timeout: 24 hours from creation created_at = request.session.get('created_at') if not created_at: request.session['created_at'] = now created_at = now if now - created_at > 86400: # 24 hours session_id = security.get_session_id(request) if session_id: await security.destroy_session(session_id) request.session = {} return Response('Session expired', status=401) # Idle timeout: 30 minutes of inactivity last_activity = request.session.get('last_activity', created_at) if now - last_activity > 1800: # 30 minutes session_id = security.get_session_id(request) if session_id: await security.destroy_session(session_id) request.session = {} return Response('Session expired due to inactivity', status=401) # Update last activity time request.session['last_activity'] = now ``` ## Security Headers The SecurityMiddleware automatically adds comprehensive security headers. Configure them for your application: ```python security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], enable_security_headers=True, # HSTS: Force HTTPS hsts_max_age=31536000, # 1 year hsts_include_subdomains=True, # CSP: Prevent XSS attacks csp_policy=( "default-src 'self'; " "script-src 'self' https://cdn.example.com; " "style-src 'self' 'unsafe-inline'; " "img-src 'self' data: https:; " "font-src 'self' https://fonts.gstatic.com; " "connect-src 'self' https://api.example.com; " "frame-ancestors 'none'" ), # Referrer policy referrer_policy='strict-origin-when-cross-origin', # Cross-Origin policies coop_policy='same-origin', coep_policy='require-corp' ) ``` ### Content Security Policy (CSP) Customize CSP based on your application's needs: ```python # Strict CSP (recommended) csp_policy = "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self' data:" # Allow specific CDNs csp_policy = ( "default-src 'self'; " "script-src 'self' https://cdn.jsdelivr.net; " "style-src 'self' https://fonts.googleapis.com; " "font-src 'self' https://fonts.gstatic.com" ) # For development (relaxed, DO NOT use in production) csp_policy = "default-src 'self' 'unsafe-inline' 'unsafe-eval'" ``` **CSP Testing:** 1. Start with `Content-Security-Policy-Report-Only` header to test without blocking 2. Use CSP violation reporting to identify issues 3. Gradually tighten the policy ## Cookie Security ### Production Cookie Settings ```python security = SecurityMiddleware( secret_key=os.environ['SECRET_KEY'], cookie_secure=True, # Require HTTPS (forced in production) cookie_httponly=True, # Prevent JavaScript access (forced in production) cookie_samesite='Strict', # Prevent CSRF (forced in production) cookie_path='/', cookie_max_age=86400, # 24 hours sign_session_id=True # Sign cookies to prevent tampering ) ``` ### Cookie Attributes Explained - **Secure**: Only send cookie over HTTPS (prevents man-in-the-middle attacks) - **HttpOnly**: Prevent JavaScript from accessing cookie (prevents XSS cookie theft) - **SameSite=Strict**: Only send cookie for same-site requests (prevents CSRF) - **SameSite=Lax**: Send cookie for same-site and top-level navigation - **SameSite=None**: Send cookie for all requests (requires Secure=True) **When to use SameSite values:** - `Strict`: Best security, but breaks some flows (OAuth redirects) - `Lax`: Good balance for most applications - `None`: Only for cross-site integrations (iframes, OAuth) ## Input Validation ### Request Body Size Limits Gobstopper includes built-in request body size limits to prevent DoS attacks: ```python from gobstopper.http.request import Request # Global limit is set in Request class # Default: 10MB for JSON, 100MB for multipart # Check body size in middleware @app.middleware async def enforce_size_limit(request, call_next): content_length = request.headers.get('content-length') if content_length and int(content_length) > 10_000_000: # 10MB return Response('Request too large', status=413) return await call_next(request) ``` ### JSON Validation Validate and sanitize all user input: ```python from dataclasses import dataclass from typing import Optional @dataclass class CreateUserRequest: username: str email: str age: Optional[int] = None @app.post('/users') async def create_user(request): try: data = await request.json() # Validate required fields if not data.get('username') or not data.get('email'): return JSONResponse( {'error': 'Missing required fields'}, status=400 ) # Validate types if not isinstance(data['username'], str): return JSONResponse( {'error': 'username must be a string'}, status=400 ) # Validate length if len(data['username']) > 50: return JSONResponse( {'error': 'username too long'}, status=400 ) # Validate format (email regex) import re if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', data['email']): return JSONResponse( {'error': 'Invalid email format'}, status=400 ) # Process validated data user = await create_user_in_db(data) return JSONResponse(user, status=201) except ValueError: return JSONResponse({'error': 'Invalid JSON'}, status=400) ``` ### Use OpenAPI for Validation For automatic validation, use the OpenAPI extension with type adapters: ```python from gobstopper.extensions.openapi import attach_openapi from gobstopper.extensions.openapi.decorators import doc, request_body, response from dataclasses import dataclass attach_openapi(app, title="My API", version="1.0.0") @dataclass class CreateUserRequest: username: str email: str age: int = 18 @app.post('/users') @doc(summary="Create user", tags=["Users"]) @request_body(model=CreateUserRequest, required=True) @response(201, description="User created") async def create_user(request): data = await request.json() # OpenAPI extension documents the schema # You still need to validate manually return JSONResponse(data, status=201) ``` ## Rate Limiting Protect your endpoints from abuse with rate limiting: ```python from gobstopper.utils.rate_limiter import TokenBucketLimiter, rate_limit # Create a rate limiter limiter = TokenBucketLimiter( rate=10, # 10 tokens per second capacity=100, # Burst capacity of 100 cost_per_request=1 # 1 token per request ) # Apply to specific routes @app.get('/api/data') @rate_limit(limiter) async def get_data(request): return JSONResponse({'data': 'sensitive'}) # Or apply globally via middleware @app.middleware async def rate_limit_middleware(request, call_next): client_ip = request.headers.get('x-forwarded-for', request.scope.get('client', [''])[0]) if not await limiter.allow(client_ip): return JSONResponse( {'error': 'Rate limit exceeded'}, status=429, headers={'Retry-After': '60'} ) return await call_next(request) ``` ### Per-User Rate Limiting ```python @app.get('/api/user-data') @rate_limit(limiter) async def get_user_data(request): user_id = request.session.get('user_id') if not user_id: return JSONResponse({'error': 'Unauthorized'}, status=401) # Use user_id as rate limit key key = f"user:{user_id}" if not await limiter.allow(key): return JSONResponse( {'error': 'Rate limit exceeded'}, status=429 ) return JSONResponse({'data': 'user-specific'}) ``` ## Authentication & Authorization ### JWT-Based Authentication ```python import jwt from datetime import datetime, timedelta SECRET_KEY = os.environ['JWT_SECRET_KEY'] def create_jwt_token(user_id: int, role: str) -> str: """Create a JWT token for a user.""" payload = { 'user_id': user_id, 'role': role, 'exp': datetime.utcnow() + timedelta(hours=24), 'iat': datetime.utcnow() } return jwt.encode(payload, SECRET_KEY, algorithm='HS256') def verify_jwt_token(token: str) -> dict: """Verify and decode a JWT token.""" try: return jwt.decode(token, SECRET_KEY, algorithms=['HS256']) except jwt.ExpiredSignatureError: raise ValueError('Token expired') except jwt.InvalidTokenError: raise ValueError('Invalid token') # Middleware for JWT authentication @app.middleware async def jwt_auth_middleware(request, call_next): # Skip auth for public endpoints if request.path in ['/login', '/register', '/health']: return await call_next(request) # Extract token from Authorization header auth_header = request.headers.get('authorization', '') if not auth_header.startswith('Bearer '): return JSONResponse({'error': 'Missing token'}, status=401) token = auth_header[7:] # Remove "Bearer " prefix try: payload = verify_jwt_token(token) request.state.user_id = payload['user_id'] request.state.role = payload['role'] except ValueError as e: return JSONResponse({'error': str(e)}, status=401) return await call_next(request) # Login endpoint @app.post('/login') async def login(request): data = await request.json() user = await authenticate(data['username'], data['password']) if not user: return JSONResponse({'error': 'Invalid credentials'}, status=401) token = create_jwt_token(user.id, user.role) return JSONResponse({'token': token}) ``` ### Role-Based Access Control (RBAC) ```python from functools import wraps def require_role(*roles): """Decorator to require specific roles.""" def decorator(func): @wraps(func) async def wrapper(request, *args, **kwargs): user_role = getattr(request.state, 'role', None) if user_role not in roles: return JSONResponse( {'error': 'Insufficient permissions'}, status=403 ) return await func(request, *args, **kwargs) return wrapper return decorator # Usage @app.delete('/users/') @require_role('admin', 'moderator') async def delete_user(request, user_id: int): await delete_user_from_db(user_id) return JSONResponse({'success': True}) ``` ## Secure File Uploads ### File Upload Validation ```python import os import hashlib from pathlib import Path ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'pdf'} MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB def allowed_file(filename: str) -> bool: """Check if file extension is allowed.""" return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.post('/upload') async def upload_file(request): # Get file from multipart form form = await request.get_form() file = form.get('file') if not file: return JSONResponse({'error': 'No file provided'}, status=400) filename = file.filename content = file.file.read() # Validate file size if len(content) > MAX_FILE_SIZE: return JSONResponse({'error': 'File too large'}, status=413) # Validate file extension if not allowed_file(filename): return JSONResponse({'error': 'File type not allowed'}, status=400) # Generate safe filename (prevent path traversal) safe_filename = hashlib.sha256( f"{filename}{time.time()}".encode() ).hexdigest()[:16] ext = filename.rsplit('.', 1)[1].lower() safe_filename = f"{safe_filename}.{ext}" # Save to secure location outside web root upload_dir = Path('/var/app/uploads') upload_dir.mkdir(exist_ok=True) file_path = upload_dir / safe_filename with open(file_path, 'wb') as f: f.write(content) return JSONResponse({ 'success': True, 'filename': safe_filename }) ``` ### File Download Security ```python @app.get('/download/') async def download_file(request, filename: str): # Validate filename (prevent path traversal) if '..' in filename or '/' in filename or '\\' in filename: return Response('Invalid filename', status=400) upload_dir = Path('/var/app/uploads') file_path = upload_dir / filename # Check if file exists and is within upload directory if not file_path.exists() or not str(file_path).startswith(str(upload_dir)): return Response('File not found', status=404) # Serve file return FileResponse( str(file_path), filename=filename, content_type='application/octet-stream' ) ``` ## Logging & Monitoring ### Security Event Logging ```python import logging from datetime import datetime # Configure security logger security_logger = logging.getLogger('security') security_logger.setLevel(logging.INFO) handler = logging.FileHandler('/var/log/app/security.log') handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) security_logger.addHandler(handler) # Log authentication events @app.post('/login') async def login(request): data = await request.json() username = data.get('username') client_ip = request.headers.get('x-forwarded-for', request.scope.get('client', ['unknown'])[0]) user = await authenticate(username, data.get('password')) if user: security_logger.info( f"Login success: user={username} ip={client_ip}" ) return JSONResponse({'success': True}) else: security_logger.warning( f"Login failed: user={username} ip={client_ip}" ) return JSONResponse({'error': 'Invalid credentials'}, status=401) # Log authorization failures @app.get('/admin/users') async def admin_users(request): user_role = request.session.get('role') if user_role != 'admin': security_logger.warning( f"Unauthorized access attempt: " f"user={request.session.get('user_id')} " f"role={user_role} path=/admin/users" ) return JSONResponse({'error': 'Forbidden'}, status=403) # ... admin logic ``` ### Monitor Security Metrics ```python from collections import defaultdict import time # Track failed login attempts failed_logins = defaultdict(list) @app.post('/login') async def login(request): data = await request.json() username = data.get('username') client_ip = request.headers.get('x-forwarded-for', 'unknown') # Check for brute force attempts recent_failures = [ t for t in failed_logins[username] if time.time() - t < 300 # Last 5 minutes ] if len(recent_failures) >= 5: security_logger.warning( f"Brute force detected: user={username} ip={client_ip}" ) return JSONResponse( {'error': 'Too many failed attempts'}, status=429 ) user = await authenticate(username, data.get('password')) if not user: failed_logins[username].append(time.time()) return JSONResponse({'error': 'Invalid credentials'}, status=401) # Clear failed attempts on success failed_logins[username] = [] # ... successful login logic ``` ## Security Checklist Use this checklist before deploying to production: ### Environment & Configuration - [ ] `ENV=production` is set - [ ] Strong, random secret key stored in environment variable - [ ] HTTPS is enforced via reverse proxy or load balancer - [ ] TLS certificates are valid and up-to-date - [ ] Sensitive data is not logged or exposed in error messages ### Middleware & Headers - [ ] SecurityMiddleware is enabled with production settings - [ ] CSRF protection is enabled for state-changing requests - [ ] Security headers are configured (HSTS, CSP, COOP, COEP) - [ ] Cookie flags are secure (Secure, HttpOnly, SameSite=Strict) - [ ] CORS is properly configured if needed ### Session Management - [ ] Session storage backend is production-ready (Redis/SQL, not Memory) - [ ] Session IDs are signed to prevent tampering - [ ] Session regeneration after authentication/privilege changes - [ ] Session timeout and idle timeout are implemented - [ ] Proper session cleanup on logout ### Authentication & Authorization - [ ] Strong password requirements are enforced - [ ] Passwords are hashed (bcrypt, Argon2, never plain text) - [ ] JWT tokens have reasonable expiration times - [ ] Role-based access control is implemented - [ ] API endpoints have proper authentication ### Input Validation - [ ] All user input is validated and sanitized - [ ] Request body size limits are enforced - [ ] File uploads are validated (type, size, content) - [ ] Path traversal attacks are prevented - [ ] SQL injection is prevented (use parameterized queries) ### Rate Limiting - [ ] Rate limiting is applied to sensitive endpoints - [ ] Authentication endpoints have brute-force protection - [ ] API endpoints have appropriate rate limits ### Monitoring & Logging - [ ] Security events are logged (login, authorization failures) - [ ] Failed authentication attempts are monitored - [ ] Logs are stored securely and rotated - [ ] Alerts are configured for security events - [ ] Regular security audits are performed ### Dependencies & Updates - [ ] All dependencies are up-to-date - [ ] Security advisories are monitored - [ ] Regular dependency updates are performed - [ ] Automated vulnerability scanning is enabled ### Testing - [ ] Security tests are included in test suite - [ ] CSRF protection is tested - [ ] Authentication and authorization are tested - [ ] Input validation is tested with malicious inputs - [ ] Penetration testing is performed before launch ## Additional Resources - [OWASP Top 10](https://owasp.org/www-project-top-ten/) - [OWASP Cheat Sheet Series](https://cheatsheetseries.owasp.org/) - [Mozilla Web Security Guidelines](https://infosec.mozilla.org/guidelines/web_security) - [Security Headers Best Practices](https://securityheaders.com/) - [CSP Evaluator](https://csp-evaluator.withgoogle.com/) ## Getting Help If you discover security vulnerabilities in Gobstopper: 1. **Do not** open a public GitHub issue 2. Email security concerns to the maintainers 3. Allow reasonable time for a fix before public disclosure For security questions about your application: - Review OWASP guidelines - Consult with security professionals - Perform regular security audits - Stay informed about emerging threats