Security Best Practices for Gobstopper¶
This guide covers security best practices for building production-ready Gobstopper applications. Gobstopper includes built-in security features through the SecurityMiddleware, but proper configuration and usage are essential.
Production Environment Setup¶
Environment Variable¶
Always set the ENV environment variable to production in production deployments:
export ENV=production
This automatically enforces secure defaults:
Cookie Secure flag (HTTPS only)
Cookie HttpOnly flag (prevents JavaScript access)
Cookie SameSite=Strict (prevents CSRF attacks)
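To see what these three flags look like on the wire, the sketch below builds a Set-Cookie header value with Python's standard http.cookies module. It is illustrative only and independent of Gobstopper's own cookie handling; the helper name build_session_cookie and the cookie name session_id are placeholders.

```python
from http.cookies import SimpleCookie


def build_session_cookie(value: str) -> str:
    """Return a Set-Cookie header value carrying the hardened flags."""
    cookie = SimpleCookie()
    cookie["session_id"] = value
    cookie["session_id"]["secure"] = True        # only sent over HTTPS
    cookie["session_id"]["httponly"] = True      # hidden from document.cookie
    cookie["session_id"]["samesite"] = "Strict"  # withheld on cross-site requests
    cookie["session_id"]["path"] = "/"
    return cookie["session_id"].OutputString()
```

Inspecting the response headers of a deployed application for the same three attributes is a quick way to confirm that the production defaults are active.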
Secret Key Management¶
Never hardcode secret keys in your source code.
# ❌ NEVER DO THIS
security = SecurityMiddleware(secret_key='my-secret-key')
# ✅ DO THIS
import os
security = SecurityMiddleware(
    secret_key=os.environ['GOBSTOPPER_SECRET_KEY']
)
Generate strong secret keys:
# Generate a secure secret key
python -c "import secrets; print(secrets.token_urlsafe(32))"
Store secrets in:
Environment variables
Secret management services (AWS Secrets Manager, HashiCorp Vault, etc.)
Encrypted configuration files
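Whichever store you choose, it helps to fail at startup when the key is missing or obviously weak. The following is a minimal sketch; the helper name load_secret_key and the 32-character minimum are assumptions for illustration, not part of Gobstopper.

```python
import os

MIN_SECRET_LENGTH = 32  # assumed minimum; adjust to your own policy


def load_secret_key(env=os.environ, name="GOBSTOPPER_SECRET_KEY") -> str:
    """Return the secret key, or raise if it is missing or too short."""
    key = env.get(name, "")
    if len(key) < MIN_SECRET_LENGTH:
        raise RuntimeError(
            f"{name} must be set to at least {MIN_SECRET_LENGTH} characters"
        )
    return key
```

A key produced by secrets.token_urlsafe(32) is 43 characters long, so it passes this check.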
HTTPS Enforcement¶
Always use HTTPS in production. The SecurityMiddleware sets HSTS headers, but your deployment must handle TLS termination:
security = SecurityMiddleware(
    secret_key=os.environ['GOBSTOPPER_SECRET_KEY'],
    enable_security_headers=True,
    hsts_max_age=31536000,  # 1 year
    hsts_include_subdomains=True
)
Deployment options:
Use a reverse proxy (Nginx, Caddy) with TLS certificates
Use load balancer TLS termination (AWS ELB, GCP Load Balancer)
Use Let’s Encrypt for free certificates
Consider using Caddy for automatic HTTPS
CSRF Protection¶
CSRF (Cross-Site Request Forgery) protection is enabled by default and validates tokens for all state-changing requests (POST, PUT, DELETE, PATCH).
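As a rough illustration of the general technique (not Gobstopper's actual implementation), a session-bound CSRF check stores a random token in the session and compares it, in constant time, with the value submitted on state-changing requests. The function names below are hypothetical.

```python
import hmac
import secrets
from typing import Optional

SAFE_METHODS = {"GET", "HEAD", "OPTIONS"}


def issue_csrf_token(session: dict) -> str:
    """Return the session's CSRF token, creating one on first use."""
    token = session.get("csrf_token")
    if token is None:
        token = secrets.token_urlsafe(32)
        session["csrf_token"] = token
    return token


def is_request_allowed(method: str, session: dict, submitted: Optional[str]) -> bool:
    """Allow safe methods; otherwise require a matching token."""
    if method.upper() in SAFE_METHODS:
        return True
    expected = session.get("csrf_token")
    if not expected or not submitted:
        return False
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected.encode(), submitted.encode())
```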
Basic CSRF Setup¶
import os
from gobstopper import Gobstopper
from gobstopper.middleware import SecurityMiddleware
app = Gobstopper(__name__)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    enable_csrf=True  # Default
)
app.add_middleware(security)
Including CSRF Tokens in Forms¶
Generate and include CSRF tokens in your templates:
@app.get('/form')
async def show_form(request):
    csrf_token = security.generate_csrf_token(request.session)
    return await app.render_template('form.html', csrf_token=csrf_token)
Template (Jinja2):
<form method="POST" action="/submit">
    <input type="hidden" name="csrf_token" value="{{ csrf_token }}">
    <input type="text" name="username">
    <button type="submit">Submit</button>
</form>
CSRF for AJAX Requests¶
For AJAX requests, include the token in the X-CSRF-Token header:
// Get token from meta tag or data attribute
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;
fetch('/api/update', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'X-CSRF-Token': csrfToken
  },
  body: JSON.stringify(data)
});
Template meta tag:
<meta name="csrf-token" content="{{ csrf_token }}">
Exempting Endpoints¶
For public APIs that don’t use session-based authentication, you may want to disable CSRF for specific endpoints. Currently, you can:
Use a separate app instance without SecurityMiddleware for API routes
Implement route-level middleware to skip CSRF validation
Use token-based authentication (JWT) instead of sessions
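If you take the route-level approach, the decision of whether a request needs a CSRF check can be kept in a small, testable predicate. This is a sketch only: the function name, the exempt path prefixes, and how you wire it into your middleware are all assumptions, since Gobstopper's hook for skipping validation is not described here.

```python
STATE_CHANGING = {"POST", "PUT", "PATCH", "DELETE"}
CSRF_EXEMPT_PREFIXES = ("/api/public/", "/webhooks/")  # hypothetical paths


def needs_csrf_check(method: str, path: str, exempt=CSRF_EXEMPT_PREFIXES) -> bool:
    """Return True when the request changes state and is not explicitly exempt."""
    if method.upper() not in STATE_CHANGING:
        return False
    # str.startswith accepts a tuple of prefixes
    return not path.startswith(exempt)
```

Keeping the exempt list short and explicit makes it easier to audit which endpoints rely on another form of authentication.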
Session Management¶
Session Storage Backends¶
Gobstopper supports multiple session storage backends:
import os
from pathlib import Path
from gobstopper.middleware import SecurityMiddleware
from gobstopper.sessions.storage import (
    FileSessionStorage,    # File-based (default)
    MemorySessionStorage,  # In-memory (dev only)
    SQLSessionStorage,     # SQL database
    RedisSessionStorage    # Redis cache
)
# File-based (simple deployments)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    session_storage=FileSessionStorage(Path('./sessions'))
)
# Redis-based (production recommended)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    session_storage=RedisSessionStorage(
        redis_url='redis://localhost:6379/0',
        prefix='gobstopper:session:'
    )
)
# SQL-based (database deployments)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    session_storage=SQLSessionStorage(
        connection_string='postgresql://user:pass@localhost/db'
    )
)
Session Configuration¶
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    cookie_name='session_id',
    cookie_max_age=86400,   # 24 hours
    rolling_sessions=True,  # Extend session on each request
    sign_session_id=True    # Sign session IDs to prevent tampering
)
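For intuition about what signing a session ID buys you, here is a generic HMAC-based sketch. It is not Gobstopper's implementation; the function names and the value.signature layout are assumptions chosen for illustration.

```python
import hashlib
import hmac
from typing import Optional


def sign_value(value: str, secret_key: str) -> str:
    """Append an HMAC-SHA256 signature to the value."""
    mac = hmac.new(secret_key.encode(), value.encode(), hashlib.sha256).hexdigest()
    return f"{value}.{mac}"


def unsign_value(signed: str, secret_key: str) -> Optional[str]:
    """Return the original value if the signature is valid, else None."""
    value, sep, mac = signed.rpartition(".")
    if not sep:
        return None
    expected = hmac.new(secret_key.encode(), value.encode(), hashlib.sha256).hexdigest()
    if hmac.compare_digest(mac.encode(), expected.encode()):
        return value
    return None
```

A client that alters the session ID cannot produce a matching signature without the secret key, so the tampered cookie is rejected before any storage lookup happens.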
Session Regeneration¶
Always regenerate session IDs after privilege escalation (login, permission changes):
@app.post('/login')
async def login(request):
    data = await request.json()
    # Validate credentials
    user = await authenticate(data['username'], data['password'])
    if not user:
        return JSONResponse({'error': 'Invalid credentials'}, status=401)
    # ✅ Regenerate session ID to prevent session fixation
    new_session_id = await security.regenerate_session_id(request)
    # Store user data in session
    request.session['user_id'] = user.id
    request.session['authenticated'] = True
    request.session['role'] = user.role
    # Set the new session cookie
    response = JSONResponse({'success': True})
    signed_id = security.sign_cookie_value(new_session_id)
    response.set_cookie(
        security.cookie_name,
        signed_id,
        max_age=security.cookie_max_age,
        secure=security.cookie_secure,
        httponly=security.cookie_httponly,
        samesite=security.cookie_samesite,
        path=security.cookie_path
    )
    return response
Session Destruction (Logout)¶
@app.post('/logout')
async def logout(request):
    session_id = security.get_session_id(request)
    if session_id:
        # Destroy session data
        await security.destroy_session(session_id)
    # Clear the cookie
    response = JSONResponse({'success': True})
    response.delete_cookie(security.cookie_name)
    return response
Session Timeouts¶
Implement both absolute and idle timeouts:
import time
@app.before_request
async def check_session_timeout(request):
    """Check session expiration and idle timeout."""
    if not hasattr(request, 'session') or not request.session:
        return
    now = time.time()
    # Absolute timeout: 24 hours from creation
    created_at = request.session.get('created_at')
    if not created_at:
        request.session['created_at'] = now
        created_at = now
    if now - created_at > 86400:  # 24 hours
        session_id = security.get_session_id(request)
        if session_id:
            await security.destroy_session(session_id)
        request.session = {}
        return Response('Session expired', status=401)
    # Idle timeout: 30 minutes of inactivity
    last_activity = request.session.get('last_activity', created_at)
    if now - last_activity > 1800:  # 30 minutes
        session_id = security.get_session_id(request)
        if session_id:
            await security.destroy_session(session_id)
        request.session = {}
        return Response('Session expired due to inactivity', status=401)
    # Update last activity time
    request.session['last_activity'] = now
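The timeout decision itself can be factored into a pure function so it can be unit-tested without a running application. The sketch below mirrors the 24-hour and 30-minute limits used above; the function name session_expiry_reason is hypothetical.

```python
from typing import Optional

ABSOLUTE_TIMEOUT = 86400  # 24 hours
IDLE_TIMEOUT = 1800       # 30 minutes


def session_expiry_reason(session: dict, now: float) -> Optional[str]:
    """Return 'absolute', 'idle', or None if the session is still valid."""
    created_at = session.get("created_at", now)
    last_activity = session.get("last_activity", created_at)
    if now - created_at > ABSOLUTE_TIMEOUT:
        return "absolute"
    if now - last_activity > IDLE_TIMEOUT:
        return "idle"
    return None
```

The hook then only needs to call this function, destroy the session when a reason is returned, and update last_activity otherwise.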
Security Headers¶
The SecurityMiddleware automatically adds comprehensive security headers. Configure them for your application:
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    enable_security_headers=True,
    # HSTS: Force HTTPS
    hsts_max_age=31536000,  # 1 year
    hsts_include_subdomains=True,
    # CSP: Prevent XSS attacks
    csp_policy=(
        "default-src 'self'; "
        "script-src 'self' https://cdn.example.com; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data: https:; "
        "font-src 'self' https://fonts.gstatic.com; "
        "connect-src 'self' https://api.example.com; "
        "frame-ancestors 'none'"
    ),
    # Referrer policy
    referrer_policy='strict-origin-when-cross-origin',
    # Cross-Origin policies
    coop_policy='same-origin',
    coep_policy='require-corp'
)
Content Security Policy (CSP)¶
Customize CSP based on your application’s needs:
# Strict CSP (recommended)
csp_policy = "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self' data:"
# Allow specific CDNs
csp_policy = (
    "default-src 'self'; "
    "script-src 'self' https://cdn.jsdelivr.net; "
    "style-src 'self' https://fonts.googleapis.com; "
    "font-src 'self' https://fonts.gstatic.com"
)
# For development (relaxed, DO NOT use in production)
csp_policy = "default-src 'self' 'unsafe-inline' 'unsafe-eval'"
CSP Testing:
Start with the Content-Security-Policy-Report-Only header to test without blocking
Use CSP violation reporting to identify issues
Gradually tighten the policy
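One way to manage that progression is to keep the policy as data and render it into either header. The sketch below is a standalone helper, not a Gobstopper API; whether SecurityMiddleware can emit the report-only header is not covered here, so build_csp and the /csp-report endpoint are assumptions.

```python
def build_csp(directives: dict, report_only: bool = False, report_uri: str = ""):
    """Return a (header_name, header_value) pair for the given directives."""
    parts = [f"{name} {' '.join(sources)}" for name, sources in directives.items()]
    if report_uri:
        parts.append(f"report-uri {report_uri}")
    header_name = (
        "Content-Security-Policy-Report-Only"
        if report_only
        else "Content-Security-Policy"
    )
    return header_name, "; ".join(parts)


policy = {
    "default-src": ["'self'"],
    "img-src": ["'self'", "data:"],
}
name, value = build_csp(policy, report_only=True, report_uri="/csp-report")
```

Once the violation reports are quiet, switching report_only to False enforces the same policy without retyping it.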
Input Validation¶
Request Body Size Limits¶
Gobstopper includes built-in request body size limits to prevent DoS attacks:
from gobstopper.http.request import Request
# Global limit is set in Request class
# Default: 10MB for JSON, 100MB for multipart
MAX_BODY_BYTES = 10_000_000  # 10MB
# Check the declared body size in middleware
@app.middleware
async def enforce_size_limit(request, call_next):
    content_length = request.headers.get('content-length')
    if content_length is not None:
        # Reject malformed values instead of letting int() raise
        if not content_length.isdigit():
            return Response('Invalid Content-Length', status=400)
        if int(content_length) > MAX_BODY_BYTES:
            return Response('Request too large', status=413)
    return await call_next(request)
JSON Validation¶
Validate and sanitize all user input:
import re
from dataclasses import dataclass
from typing import Optional
EMAIL_RE = re.compile(r'^[\w\.-]+@[\w\.-]+\.\w+$')
@dataclass
class CreateUserRequest:
    username: str
    email: str
    age: Optional[int] = None
@app.post('/users')
async def create_user(request):
    # Only JSON parsing is wrapped, so later errors are not misreported as bad JSON
    try:
        data = await request.json()
    except ValueError:
        return JSONResponse({'error': 'Invalid JSON'}, status=400)
    # The body must be a JSON object, not a list or scalar
    if not isinstance(data, dict):
        return JSONResponse(
            {'error': 'Request body must be a JSON object'},
            status=400
        )
    # Validate required fields
    if not data.get('username') or not data.get('email'):
        return JSONResponse(
            {'error': 'Missing required fields'},
            status=400
        )
    # Validate types (both fields are used as strings below)
    if not isinstance(data['username'], str) or not isinstance(data['email'], str):
        return JSONResponse(
            {'error': 'username and email must be strings'},
            status=400
        )
    # Validate length
    if len(data['username']) > 50:
        return JSONResponse(
            {'error': 'username too long'},
            status=400
        )
    # Validate format (email regex)
    if not EMAIL_RE.match(data['email']):
        return JSONResponse(
            {'error': 'Invalid email format'},
            status=400
        )
    # Process validated data
    user = await create_user_in_db(data)
    return JSONResponse(user, status=201)
Use OpenAPI for Validation¶
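The OpenAPI extension documents your request and response schemas using type adapters. As the example notes, it does not validate incoming data, so manual validation is still required: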
from gobstopper.extensions.openapi import attach_openapi
from gobstopper.extensions.openapi.decorators import doc, request_body, response
from dataclasses import dataclass
attach_openapi(app, title="My API", version="1.0.0")
@dataclass
class CreateUserRequest:
    username: str
    email: str
    age: int = 18
@app.post('/users')
@doc(summary="Create user", tags=["Users"])
@request_body(model=CreateUserRequest, required=True)
@response(201, description="User created")
async def create_user(request):
    data = await request.json()
    # OpenAPI extension documents the schema
    # You still need to validate manually
    return JSONResponse(data, status=201)
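Because validation remains manual, one option is to parse the decoded JSON into the documented dataclass in a single place, so the schema and the checks stay together. This is a plain-Python sketch that does not depend on Gobstopper; parse_create_user is a hypothetical helper, and the 50-character limit and email pattern are carried over from the earlier example.

```python
import re
from dataclasses import dataclass
from typing import Optional

EMAIL_RE = re.compile(r"^[\w.\-]+@[\w.\-]+\.\w+$")


@dataclass
class CreateUserRequest:
    username: str
    email: str
    age: Optional[int] = None


def parse_create_user(data) -> CreateUserRequest:
    """Validate decoded JSON and return a typed request, or raise ValueError."""
    if not isinstance(data, dict):
        raise ValueError("body must be a JSON object")
    username, email, age = data.get("username"), data.get("email"), data.get("age")
    if not isinstance(username, str) or not 1 <= len(username) <= 50:
        raise ValueError("username must be a string of 1-50 characters")
    if not isinstance(email, str) or not EMAIL_RE.match(email):
        raise ValueError("invalid email format")
    # bool is a subclass of int, so reject it explicitly
    if age is not None and (
        isinstance(age, bool) or not isinstance(age, int) or age < 0
    ):
        raise ValueError("age must be a non-negative integer")
    return CreateUserRequest(username=username, email=email, age=age)
```

A handler can then catch ValueError once and return a 400 response with the message.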
Rate Limiting¶
Protect your endpoints from abuse with rate limiting:
from gobstopper.utils.rate_limiter import TokenBucketLimiter, rate_limit
# Create a rate limiter
limiter = TokenBucketLimiter(
    rate=10,            # 10 tokens per second
    capacity=100,       # Burst capacity of 100
    cost_per_request=1  # 1 token per request
)
# Apply to specific routes
@app.get('/api/data')
@rate_limit(limiter)
async def get_data(request):
    return JSONResponse({'data': 'sensitive'})
# Or apply globally via middleware
@app.middleware
async def rate_limit_middleware(request, call_next):
    # X-Forwarded-For is client-controlled unless a trusted proxy sets it,
    # and it may hold a comma-separated chain; use the first entry.
    forwarded = request.headers.get('x-forwarded-for')
    if forwarded:
        client_ip = forwarded.split(',')[0].strip()
    else:
        client = request.scope.get('client') or ('',)
        client_ip = client[0]
    if not await limiter.allow(client_ip):
        return JSONResponse(
            {'error': 'Rate limit exceeded'},
            status=429,
            headers={'Retry-After': '60'}
        )
    return await call_next(request)
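For readers unfamiliar with the algorithm behind the rate and capacity parameters, here is a minimal token bucket sketch. It illustrates the general technique and is not Gobstopper's TokenBucketLimiter: the class name is made up, and unlike the limiter above its allow method is synchronous.

```python
import time


class SimpleTokenBucket:
    """Per-key token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self._buckets = {}  # key -> (tokens, time of last update)

    def allow(self, key: str, cost: float = 1.0) -> bool:
        now = self.clock()
        tokens, last = self._buckets.get(key, (self.capacity, now))
        # Refill for the elapsed time, never exceeding the burst capacity
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        allowed = tokens >= cost
        if allowed:
            tokens -= cost
        self._buckets[key] = (tokens, now)
        return allowed
```

With rate=10 and capacity=100, a client can burst up to 100 requests and is then held to roughly 10 requests per second as tokens refill.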
Per-User Rate Limiting¶
@app.get('/api/user-data')
@rate_limit(limiter)
async def get_user_data(request):
    user_id = request.session.get('user_id')
    if not user_id:
        return JSONResponse({'error': 'Unauthorized'}, status=401)
    # Use user_id as rate limit key
    key = f"user:{user_id}"
    if not await limiter.allow(key):
        return JSONResponse(
            {'error': 'Rate limit exceeded'},
            status=429
        )
    return JSONResponse({'data': 'user-specific'})
Secure File Uploads¶
File Upload Validation¶
import secrets
from pathlib import Path
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'pdf'}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB
def allowed_file(filename: str) -> bool:
    """Check if file extension is allowed."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.post('/upload')
async def upload_file(request):
    # Get file from multipart form
    form = await request.get_form()
    file = form.get('file')
    if not file:
        return JSONResponse({'error': 'No file provided'}, status=400)
    filename = file.filename
    # Read at most one byte past the limit so oversized files are not fully buffered
    content = file.file.read(MAX_FILE_SIZE + 1)
    # Validate file size
    if len(content) > MAX_FILE_SIZE:
        return JSONResponse({'error': 'File too large'}, status=413)
    # Validate file extension
    if not allowed_file(filename):
        return JSONResponse({'error': 'File type not allowed'}, status=400)
    # Generate a random, unguessable filename (prevents path traversal);
    # the client-supplied name is never used on disk
    ext = filename.rsplit('.', 1)[1].lower()
    safe_filename = f"{secrets.token_hex(8)}.{ext}"
    # Save to secure location outside web root
    upload_dir = Path('/var/app/uploads')
    upload_dir.mkdir(parents=True, exist_ok=True)
    file_path = upload_dir / safe_filename
    with open(file_path, 'wb') as f:
        f.write(content)
    return JSONResponse({
        'success': True,
        'filename': safe_filename
    })
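An extension check trusts the client-supplied name. A cheap additional check is to compare the first bytes of the upload against the well-known signature for the claimed type. The sketch below covers only the extensions in ALLOWED_EXTENSIONS; the helper name is hypothetical, and a signature match is a sanity check rather than full content validation.

```python
MAGIC_NUMBERS = {
    "png": (b"\x89PNG\r\n\x1a\n",),
    "jpg": (b"\xff\xd8\xff",),
    "jpeg": (b"\xff\xd8\xff",),
    "gif": (b"GIF87a", b"GIF89a"),
    "pdf": (b"%PDF-",),
}


def content_matches_extension(content: bytes, ext: str) -> bool:
    """Return True if the content starts with a known signature for ext."""
    signatures = MAGIC_NUMBERS.get(ext.lower())
    if signatures is None:
        return False
    # bytes.startswith accepts a tuple of candidate prefixes
    return content.startswith(signatures)
```

In the upload handler this would run after the extension check, rejecting the file with a 400 response when it returns False.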
File Download Security¶
from pathlib import Path
@app.get('/download/<filename>')
async def download_file(request, filename: str):
    # Validate filename (prevent path traversal)
    if '..' in filename or '/' in filename or '\\' in filename:
        return Response('Invalid filename', status=400)
    upload_dir = Path('/var/app/uploads').resolve()
    # Resolve symlinks, then require the file to sit directly in the upload directory.
    # A string prefix check would also accept siblings such as /var/app/uploads_old.
    file_path = (upload_dir / filename).resolve()
    if file_path.parent != upload_dir or not file_path.is_file():
        return Response('File not found', status=404)
    # Serve file
    return FileResponse(
        str(file_path),
        filename=filename,
        content_type='application/octet-stream'
    )
Logging & Monitoring¶
Security Event Logging¶
import logging
# Configure security logger
security_logger = logging.getLogger('security')
security_logger.setLevel(logging.INFO)
handler = logging.FileHandler('/var/log/app/security.log')
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(levelname)s - %(message)s'
))
security_logger.addHandler(handler)
# Log authentication events
@app.post('/login')
async def login(request):
    data = await request.json()
    username = data.get('username')
    client_ip = request.headers.get('x-forwarded-for',
                                    request.scope.get('client', ['unknown'])[0])
    user = await authenticate(username, data.get('password'))
    # %r escapes newlines in user-controlled values, so they cannot forge log lines
    if user:
        security_logger.info(
            "Login success: user=%r ip=%r", username, client_ip
        )
        return JSONResponse({'success': True})
    else:
        security_logger.warning(
            "Login failed: user=%r ip=%r", username, client_ip
        )
        return JSONResponse({'error': 'Invalid credentials'}, status=401)
# Log authorization failures
@app.get('/admin/users')
async def admin_users(request):
    user_role = request.session.get('role')
    if user_role != 'admin':
        security_logger.warning(
            "Unauthorized access attempt: user=%r role=%r path=/admin/users",
            request.session.get('user_id'), user_role
        )
        return JSONResponse({'error': 'Forbidden'}, status=403)
    # ... admin logic
Monitor Security Metrics¶
from collections import defaultdict
import time
# Track failed login attempts (in-memory, per process)
failed_logins = defaultdict(list)
@app.post('/login')
async def login(request):
    data = await request.json()
    username = data.get('username')
    client_ip = request.headers.get('x-forwarded-for', 'unknown')
    # Keep only failures from the last 5 minutes so the list cannot grow without bound
    now = time.time()
    recent_failures = [t for t in failed_logins[username] if now - t < 300]
    failed_logins[username] = recent_failures
    # Check for brute force attempts
    if len(recent_failures) >= 5:
        security_logger.warning(
            "Brute force detected: user=%r ip=%r", username, client_ip
        )
        return JSONResponse(
            {'error': 'Too many failed attempts'},
            status=429
        )
    user = await authenticate(username, data.get('password'))
    if not user:
        failed_logins[username].append(time.time())
        return JSONResponse({'error': 'Invalid credentials'}, status=401)
    # Clear failed attempts on success
    failed_logins.pop(username, None)
    # ... successful login logic
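The same sliding-window idea can be wrapped in a small class so that pruning, counting, and resetting live in one place and can be tested with a fake clock. This is a sketch with hypothetical names; like the dictionary above it is in-memory and per process, so a multi-worker deployment would need shared storage instead.

```python
import time
from collections import defaultdict, deque


class FailureTracker:
    """Sliding-window counter of failed attempts per key."""

    def __init__(self, max_failures: int = 5, window: float = 300.0,
                 clock=time.monotonic):
        self.max_failures = max_failures
        self.window = window
        self.clock = clock
        self._failures = defaultdict(deque)

    def _prune(self, key: str) -> None:
        """Drop attempts older than the window and forget empty keys."""
        cutoff = self.clock() - self.window
        attempts = self._failures[key]
        while attempts and attempts[0] <= cutoff:
            attempts.popleft()
        if not attempts:
            del self._failures[key]

    def record_failure(self, key: str) -> None:
        self._failures[key].append(self.clock())

    def is_blocked(self, key: str) -> bool:
        self._prune(key)
        return len(self._failures.get(key, ())) >= self.max_failures

    def reset(self, key: str) -> None:
        self._failures.pop(key, None)
```

Tracking by both username and client IP (two tracker instances or two keys) limits attacks that spread guesses across many accounts.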
Security Checklist¶
Use this checklist before deploying to production:
Environment & Configuration¶
[ ] ENV=production is set
[ ] Strong, random secret key stored in environment variable
[ ] HTTPS is enforced via reverse proxy or load balancer
[ ] TLS certificates are valid and up-to-date
[ ] Sensitive data is not logged or exposed in error messages
Middleware & Headers¶
[ ] SecurityMiddleware is enabled with production settings
[ ] CSRF protection is enabled for state-changing requests
[ ] Security headers are configured (HSTS, CSP, COOP, COEP)
[ ] Cookie flags are secure (Secure, HttpOnly, SameSite=Strict)
[ ] CORS is properly configured if needed
Session Management¶
[ ] Session storage backend is production-ready (Redis/SQL, not Memory)
[ ] Session IDs are signed to prevent tampering
[ ] Session regeneration after authentication/privilege changes
[ ] Session timeout and idle timeout are implemented
[ ] Proper session cleanup on logout
Authentication & Authorization¶
[ ] Strong password requirements are enforced
[ ] Passwords are hashed (bcrypt, Argon2, never plain text)
[ ] JWT tokens have reasonable expiration times
[ ] Role-based access control is implemented
[ ] API endpoints have proper authentication
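For the password-hashing item, bcrypt and Argon2 require third-party packages. As a dependency-free illustration of the same idea (a salted, deliberately slow key-derivation function), the sketch below uses scrypt from Python's standard hashlib instead. The cost parameters and the salt-plus-digest storage layout are assumptions; tune them for your hardware.

```python
import hashlib
import hmac
import os

SCRYPT_PARAMS = {"n": 2**14, "r": 8, "p": 1}  # assumed cost parameters
SALT_BYTES = 16


def hash_password(password: str) -> bytes:
    """Return salt + scrypt digest, suitable for storing as one value."""
    salt = os.urandom(SALT_BYTES)
    digest = hashlib.scrypt(password.encode(), salt=salt, dklen=32, **SCRYPT_PARAMS)
    return salt + digest


def verify_password(password: str, stored: bytes) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt, expected = stored[:SALT_BYTES], stored[SALT_BYTES:]
    digest = hashlib.scrypt(password.encode(), salt=salt, dklen=32, **SCRYPT_PARAMS)
    return hmac.compare_digest(digest, expected)
```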
Input Validation¶
[ ] All user input is validated and sanitized
[ ] Request body size limits are enforced
[ ] File uploads are validated (type, size, content)
[ ] Path traversal attacks are prevented
[ ] SQL injection is prevented (use parameterized queries)
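For the parameterized-query item, the sketch below uses the standard sqlite3 module to show the pattern: the SQL text contains only placeholders, and user input is passed separately so it is never interpreted as SQL. The table and function names are illustrative; other database drivers use the same approach with their own placeholder syntax.

```python
import sqlite3


def find_user(conn: sqlite3.Connection, username: str):
    """Look up a user by name using a bound parameter, not string formatting."""
    cur = conn.execute(
        "SELECT id, username FROM users WHERE username = ?",
        (username,),
    )
    return cur.fetchone()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES (?)", ("alice",))
```

An input such as alice' OR '1'='1 is treated as a literal username and matches nothing, whereas the same string interpolated into the query text would change its meaning.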
Rate Limiting¶
[ ] Rate limiting is applied to sensitive endpoints
[ ] Authentication endpoints have brute-force protection
[ ] API endpoints have appropriate rate limits
Monitoring & Logging¶
[ ] Security events are logged (login, authorization failures)
[ ] Failed authentication attempts are monitored
[ ] Logs are stored securely and rotated
[ ] Alerts are configured for security events
[ ] Regular security audits are performed
Dependencies & Updates¶
[ ] All dependencies are up-to-date
[ ] Security advisories are monitored
[ ] Regular dependency updates are performed
[ ] Automated vulnerability scanning is enabled
Testing¶
[ ] Security tests are included in test suite
[ ] CSRF protection is tested
[ ] Authentication and authorization are tested
[ ] Input validation is tested with malicious inputs
[ ] Penetration testing is performed before launch
Additional Resources¶
Getting Help¶
If you discover security vulnerabilities in Gobstopper:
Do not open a public GitHub issue
Email security concerns to the maintainers
Allow reasonable time for a fix before public disclosure
For security questions about your application:
Review OWASP guidelines
Consult with security professionals
Perform regular security audits
Stay informed about emerging threats