Security Best Practices for Gobstopper

This guide covers security best practices for building production-ready Gobstopper applications. Gobstopper includes built-in security features through the SecurityMiddleware, but proper configuration and usage are essential.

Production Environment Setup

Environment Variable

Always set the ENV environment variable to production in production deployments:

export ENV=production

This automatically enforces secure defaults:

  • Cookie Secure flag (HTTPS only)

  • Cookie HttpOnly flag (prevents JavaScript access)

  • Cookie SameSite=Strict (mitigates CSRF attacks)

Secret Key Management

Never hardcode secret keys in your source code.

# ❌ NEVER DO THIS
security = SecurityMiddleware(secret_key='my-secret-key')

# ✅ DO THIS
import os
security = SecurityMiddleware(
    secret_key=os.environ['GOBSTOPPER_SECRET_KEY']
)

Generate strong secret keys:

# Generate a secure secret key
python -c "import secrets; print(secrets.token_urlsafe(32))"

Store secrets in:

  • Environment variables

  • Secret management services (AWS Secrets Manager, HashiCorp Vault, etc.)

  • Encrypted configuration files
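
Wherever the secret is stored, it helps to fail fast at startup when it is missing or obviously weak. The sketch below is one way to do that; the helper name `load_secret_key` and the 32-character minimum are assumptions made for illustration and are not part of Gobstopper.

```python
import os
from typing import Mapping, Optional


def load_secret_key(
    name: str = "GOBSTOPPER_SECRET_KEY",
    env: Optional[Mapping[str, str]] = None,
    min_length: int = 32,
) -> str:
    """Return the secret stored under `name`, or raise if it is missing or too short.

    Hypothetical helper: the function name and length threshold are illustrative.
    """
    source = os.environ if env is None else env
    value = source.get(name, "")
    if not value:
        raise RuntimeError(f"{name} is not set")
    if len(value) < min_length:
        raise RuntimeError(f"{name} is shorter than {min_length} characters")
    return value
```

The returned value can then be passed as `secret_key` when constructing the middleware, so a misconfigured deployment stops at startup instead of running with an empty key.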

HTTPS Enforcement

Always use HTTPS in production. The SecurityMiddleware sets HSTS headers, but your deployment must handle TLS termination:

security = SecurityMiddleware(
    secret_key=os.environ['GOBSTOPPER_SECRET_KEY'],
    enable_security_headers=True,
    hsts_max_age=31536000,  # 1 year
    hsts_include_subdomains=True
)

Deployment options:

  • Use a reverse proxy (Nginx, Caddy) with TLS certificates

  • Use load balancer TLS termination (AWS ELB, GCP Load Balancer)

  • Use Let’s Encrypt for free certificates

  • Consider using Caddy for automatic HTTPS
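
For reference, the two HSTS settings shown earlier correspond to a single `Strict-Transport-Security` response header. The sketch below builds that value following the syntax in RFC 6797; the function `hsts_header_value` is hypothetical, and the exact string Gobstopper emits is not documented in this guide.

```python
def hsts_header_value(max_age: int, include_subdomains: bool = False) -> str:
    """Build a Strict-Transport-Security header value (RFC 6797 syntax)."""
    if max_age < 0:
        raise ValueError("max_age must be non-negative")
    value = f"max-age={max_age}"
    if include_subdomains:
        value += "; includeSubDomains"
    return value


# One year, covering subdomains, matching the configuration above.
print(hsts_header_value(31536000, include_subdomains=True))
# prints: max-age=31536000; includeSubDomains
```

Comparing this value against the response headers of a deployed app is a quick way to confirm that HSTS is active behind the proxy or load balancer.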

CSRF Protection

CSRF (Cross-Site Request Forgery) protection is enabled by default and validates tokens for all state-changing requests (POST, PUT, DELETE, PATCH).
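
To make "validates tokens" concrete, here is a framework-independent sketch of the session-bound token pattern. It is not Gobstopper's implementation; the function names and the `csrf_token` session key are assumptions chosen for illustration.

```python
import hmac
import secrets
from typing import MutableMapping, Optional

# Methods treated as state-changing, as listed above.
STATE_CHANGING_METHODS = {"POST", "PUT", "DELETE", "PATCH"}


def issue_csrf_token(session: MutableMapping[str, str]) -> str:
    """Create a random token, remember it in the session, and return it."""
    token = secrets.token_urlsafe(32)
    session["csrf_token"] = token
    return token


def csrf_ok(method: str, session: MutableMapping[str, str], submitted: Optional[str]) -> bool:
    """Return True if the request may proceed."""
    if method.upper() not in STATE_CHANGING_METHODS:
        return True  # read-only methods are not checked
    expected = session.get("csrf_token")
    if not expected or not submitted:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode(), submitted.encode())
```

An attacker's page can make the browser send the session cookie, but it cannot read the token stored in the session, so a forged POST fails the comparison.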

Basic CSRF Setup

import os

from gobstopper import Gobstopper
from gobstopper.middleware import SecurityMiddleware

app = Gobstopper(__name__)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    enable_csrf=True  # Default
)
app.add_middleware(security)

Including CSRF Tokens in Forms

Generate and include CSRF tokens in your templates:

@app.get('/form')
async def show_form(request):
    csrf_token = security.generate_csrf_token(request.session)
    return await app.render_template('form.html', csrf_token=csrf_token)

Template (Jinja2):

<form method="POST" action="/submit">
    <input type="hidden" name="csrf_token" value="{{ csrf_token }}">
    <input type="text" name="username">
    <button type="submit">Submit</button>
</form>

CSRF for AJAX Requests

For AJAX requests, include the token in the X-CSRF-Token header:

// Get token from meta tag or data attribute
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;

fetch('/api/update', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'X-CSRF-Token': csrfToken
    },
    body: JSON.stringify(data)
});

Template meta tag:

<meta name="csrf-token" content="{{ csrf_token }}">

Exempting Endpoints

For public APIs that don’t use session-based authentication, you may want to disable CSRF for specific endpoints. Currently, you can:

  1. Use a separate app instance without SecurityMiddleware for API routes

  2. Implement route-level middleware to skip CSRF validation

  3. Use token-based authentication (JWT) instead of sessions
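
For option 2, the decision of whether a request needs a CSRF check can be isolated in a small pure function. This is only a sketch: `requires_csrf_check` is a hypothetical helper, and how it would be wired into `SecurityMiddleware` depends on hooks this guide does not describe.

```python
from typing import Iterable

STATE_CHANGING_METHODS = {"POST", "PUT", "DELETE", "PATCH"}


def requires_csrf_check(method: str, path: str, exempt_prefixes: Iterable[str] = ()) -> bool:
    """Return True when a request should have its CSRF token validated."""
    if method.upper() not in STATE_CHANGING_METHODS:
        return False
    for prefix in exempt_prefixes:
        prefix = prefix.rstrip("/")
        # Match whole path segments so "/api" does not also exempt "/apiary".
        if path == prefix or path.startswith(prefix + "/"):
            return False
    return True
```

Keeping the exemption list short and explicit makes it easier to audit which routes skip validation.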

Session Management

Session Storage Backends

Gobstopper supports multiple session storage backends:

import os

from gobstopper.middleware import SecurityMiddleware
from gobstopper.sessions.storage import (
    FileSessionStorage,      # File-based (default)
    MemorySessionStorage,    # In-memory (dev only)
    SQLSessionStorage,       # SQL database
    RedisSessionStorage      # Redis cache
)
from pathlib import Path

# File-based (simple deployments)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    session_storage=FileSessionStorage(Path('./sessions'))
)

# Redis-based (production recommended)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    session_storage=RedisSessionStorage(
        redis_url='redis://localhost:6379/0',
        prefix='gobstopper:session:'
    )
)

# SQL-based (database deployments)
security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    session_storage=SQLSessionStorage(
        connection_string='postgresql://user:pass@localhost/db'
    )
)

Session Configuration

security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    cookie_name='session_id',
    cookie_max_age=86400,  # 24 hours
    rolling_sessions=True,  # Extend session on each request
    sign_session_id=True   # Sign session IDs to prevent tampering
)
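
As background for `sign_session_id`, signing a cookie value usually means appending a keyed hash so that a modified value can be detected. The sketch below shows the idea with HMAC-SHA256 from the standard library; the `value.signature` format and the function names are assumptions, and Gobstopper's actual cookie format may differ.

```python
import hashlib
import hmac
from typing import Optional


def sign_value(value: str, secret_key: str) -> str:
    """Append an HMAC-SHA256 signature: '<value>.<hex signature>'."""
    signature = hmac.new(secret_key.encode(), value.encode(), hashlib.sha256).hexdigest()
    return f"{value}.{signature}"


def unsign_value(signed: str, secret_key: str) -> Optional[str]:
    """Return the original value if the signature is valid, otherwise None."""
    value, separator, signature = signed.rpartition(".")
    if not separator:
        return None  # no signature present
    expected = hmac.new(secret_key.encode(), value.encode(), hashlib.sha256).hexdigest()
    # Compare as bytes in constant time.
    if hmac.compare_digest(signature.encode(), expected.encode()):
        return value
    return None
```

Signing does not hide the session ID; it only lets the server reject IDs that were not issued with its key.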

Session Regeneration

Always regenerate session IDs after any change in privilege level (login, permission changes):

@app.post('/login')
async def login(request):
    data = await request.json()

    # Validate credentials
    user = await authenticate(data['username'], data['password'])
    if not user:
        return JSONResponse({'error': 'Invalid credentials'}, status=401)

    # ✅ Regenerate session ID to prevent session fixation
    new_session_id = await security.regenerate_session_id(request)

    # Store user data in session
    request.session['user_id'] = user.id
    request.session['authenticated'] = True
    request.session['role'] = user.role

    # Set the new session cookie
    response = JSONResponse({'success': True})
    signed_id = security.sign_cookie_value(new_session_id)
    response.set_cookie(
        security.cookie_name,
        signed_id,
        max_age=security.cookie_max_age,
        secure=security.cookie_secure,
        httponly=security.cookie_httponly,
        samesite=security.cookie_samesite,
        path=security.cookie_path
    )

    return response

Session Destruction (Logout)

@app.post('/logout')
async def logout(request):
    session_id = security.get_session_id(request)

    if session_id:
        # Destroy session data
        await security.destroy_session(session_id)

    # Clear the cookie
    response = JSONResponse({'success': True})
    response.delete_cookie(security.cookie_name)

    return response

Session Timeouts

Implement both absolute and idle timeouts:

import time

@app.before_request
async def check_session_timeout(request):
    """Check session expiration and idle timeout."""
    if not hasattr(request, 'session') or not request.session:
        return

    now = time.time()

    # Absolute timeout: 24 hours from creation
    created_at = request.session.get('created_at')
    if not created_at:
        request.session['created_at'] = now
        created_at = now

    if now - created_at > 86400:  # 24 hours
        session_id = security.get_session_id(request)
        if session_id:
            await security.destroy_session(session_id)
            request.session = {}
        return Response('Session expired', status=401)

    # Idle timeout: 30 minutes of inactivity
    last_activity = request.session.get('last_activity', created_at)
    if now - last_activity > 1800:  # 30 minutes
        session_id = security.get_session_id(request)
        if session_id:
            await security.destroy_session(session_id)
            request.session = {}
        return Response('Session expired due to inactivity', status=401)

    # Update last activity time
    request.session['last_activity'] = now
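
The timeout rules in the hook can also be expressed as a pure function, which makes them easy to unit test without a running app. This is a sketch; `expiry_reason` and the constant names are hypothetical, and the thresholds mirror the values used above.

```python
from typing import Mapping, Optional

ABSOLUTE_TIMEOUT = 86400  # 24 hours from creation
IDLE_TIMEOUT = 1800       # 30 minutes of inactivity


def expiry_reason(session: Mapping[str, float], now: float) -> Optional[str]:
    """Return 'absolute', 'idle', or None if the session is still valid."""
    created_at = session.get("created_at", now)
    if now - created_at > ABSOLUTE_TIMEOUT:
        return "absolute"
    # Fall back to the creation time when no activity has been recorded yet.
    last_activity = session.get("last_activity", created_at)
    if now - last_activity > IDLE_TIMEOUT:
        return "idle"
    return None
```

Passing `now` in as an argument lets tests simulate the passage of time instead of sleeping.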

Security Headers

The SecurityMiddleware automatically adds comprehensive security headers. Configure them for your application:

security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    enable_security_headers=True,

    # HSTS: Force HTTPS
    hsts_max_age=31536000,  # 1 year
    hsts_include_subdomains=True,

    # CSP: Prevent XSS attacks
    csp_policy=(
        "default-src 'self'; "
        "script-src 'self' https://cdn.example.com; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data: https:; "
        "font-src 'self' https://fonts.gstatic.com; "
        "connect-src 'self' https://api.example.com; "
        "frame-ancestors 'none'"
    ),

    # Referrer policy
    referrer_policy='strict-origin-when-cross-origin',

    # Cross-Origin policies
    coop_policy='same-origin',
    coep_policy='require-corp'
)

Content Security Policy (CSP)

Customize CSP based on your application’s needs:

# Strict CSP (recommended)
csp_policy = "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self' data:"

# Allow specific CDNs
csp_policy = (
    "default-src 'self'; "
    "script-src 'self' https://cdn.jsdelivr.net; "
    "style-src 'self' https://fonts.googleapis.com; "
    "font-src 'self' https://fonts.gstatic.com"
)

# For development (relaxed, DO NOT use in production)
csp_policy = "default-src 'self' 'unsafe-inline' 'unsafe-eval'"

CSP Testing:

  1. Start with Content-Security-Policy-Report-Only header to test without blocking

  2. Use CSP violation reporting to identify issues

  3. Gradually tighten the policy
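
Long policy strings are easy to mistype, so one option is to assemble them from a dictionary of directives. The helpers below are a sketch with hypothetical names; whether `SecurityMiddleware` can emit the report-only header directly is not covered in this guide, so `csp_header` only returns the header name and value for you to attach yourself.

```python
from typing import Dict, List, Tuple


def build_csp(directives: Dict[str, List[str]]) -> str:
    """Join directives into a policy string, e.g. "default-src 'self'; img-src 'self' data:"."""
    parts = []
    for name, sources in directives.items():
        parts.append(" ".join([name, *sources]))
    return "; ".join(parts)


def csp_header(directives: Dict[str, List[str]], report_only: bool = False) -> Tuple[str, str]:
    """Return (header name, header value); report-only mode logs violations without blocking."""
    name = "Content-Security-Policy-Report-Only" if report_only else "Content-Security-Policy"
    return name, build_csp(directives)


policy = {
    "default-src": ["'self'"],
    "img-src": ["'self'", "data:"],
}
print(build_csp(policy))
# prints: default-src 'self'; img-src 'self' data:
```

The string from `build_csp` can be passed as `csp_policy`, and the same dictionary can be reused in report-only mode while the policy is being tightened.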

Input Validation

Request Body Size Limits

Gobstopper includes built-in request body size limits to prevent DoS attacks:

from gobstopper.http.request import Request

# Global limit is set in Request class
# Default: 10MB for JSON, 100MB for multipart

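# Check the declared body size in middleware.
# Content-Length can be absent (e.g. chunked uploads), so this complements the built-in limits.
@app.middleware
async def enforce_size_limit(request, call_next):
    content_length = request.headers.get('content-length')
    if content_length:
        try:
            declared_size = int(content_length)
        except ValueError:
            return Response('Invalid Content-Length', status=400)
        if declared_size > 10_000_000:  # 10MB
            return Response('Request too large', status=413)
    return await call_next(request)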

JSON Validation

Validate and sanitize all user input:

from dataclasses import dataclass
from typing import Optional

@dataclass
class CreateUserRequest:
    username: str
    email: str
    age: Optional[int] = None

@app.post('/users')
async def create_user(request):
    try:
        data = await request.json()

        # Validate required fields
        if not data.get('username') or not data.get('email'):
            return JSONResponse(
                {'error': 'Missing required fields'},
                status=400
            )

        # Validate types
        if not isinstance(data['username'], str):
            return JSONResponse(
                {'error': 'username must be a string'},
                status=400
            )

        # Validate length
        if len(data['username']) > 50:
            return JSONResponse(
                {'error': 'username too long'},
                status=400
            )

        # Validate format (simple email regex); non-string values are rejected first
        import re
        email = data['email']
        if not isinstance(email, str) or not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', email):
            return JSONResponse(
                {'error': 'Invalid email format'},
                status=400
            )

        # Process validated data
        user = await create_user_in_db(data)
        return JSONResponse(user, status=201)

    except ValueError:
        return JSONResponse({'error': 'Invalid JSON'}, status=400)

Use OpenAPI for Validation

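The OpenAPI extension documents request and response schemas from typed models. As the comments in the example note, it does not validate incoming data by itself, so handlers still need explicit validation: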

from gobstopper.extensions.openapi import attach_openapi
from gobstopper.extensions.openapi.decorators import doc, request_body, response
from dataclasses import dataclass

attach_openapi(app, title="My API", version="1.0.0")

@dataclass
class CreateUserRequest:
    username: str
    email: str
    age: int = 18

@app.post('/users')
@doc(summary="Create user", tags=["Users"])
@request_body(model=CreateUserRequest, required=True)
@response(201, description="User created")
async def create_user(request):
    data = await request.json()
    # OpenAPI extension documents the schema
    # You still need to validate manually
    return JSONResponse(data, status=201)
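
Since validation remains manual, the dataclass itself can drive the checks. The sketch below is framework-independent and handles only simple field types such as `str`, `int`, and `Optional[...]` of those; `validation_errors` is a hypothetical helper, not part of the OpenAPI extension.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Any, List, Optional, Union, get_args, get_origin, get_type_hints


@dataclass
class CreateUserRequest:
    username: str
    email: str
    age: Optional[int] = None


def validation_errors(model: type, data: Any) -> List[str]:
    """Return a list of problems found when checking `data` against `model`."""
    if not isinstance(data, dict):
        return ["body must be a JSON object"]
    errors = []
    hints = get_type_hints(model)
    for field in fields(model):
        required = field.default is MISSING and field.default_factory is MISSING
        if field.name not in data:
            if required:
                errors.append(f"{field.name} is required")
            continue
        expected = hints[field.name]
        # Optional[X] is Union[X, None]; unpack it into a tuple of accepted types.
        allowed = get_args(expected) if get_origin(expected) is Union else (expected,)
        value = data[field.name]
        # bool is a subclass of int, so reject it explicitly for int fields.
        if not isinstance(value, allowed) or (isinstance(value, bool) and bool not in allowed):
            errors.append(f"{field.name} has the wrong type")
    return errors
```

A handler could return a 400 response listing these errors before touching the database. Length and format rules, such as the email check shown earlier, still need their own checks.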

Rate Limiting

Protect your endpoints from abuse with rate limiting:

from gobstopper.utils.rate_limiter import TokenBucketLimiter, rate_limit

# Create a rate limiter
limiter = TokenBucketLimiter(
    rate=10,           # 10 tokens per second
    capacity=100,      # Burst capacity of 100
    cost_per_request=1 # 1 token per request
)

# Apply to specific routes
@app.get('/api/data')
@rate_limit(limiter)
async def get_data(request):
    return JSONResponse({'data': 'sensitive'})

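# Or apply globally via middleware
@app.middleware
async def rate_limit_middleware(request, call_next):
    # X-Forwarded-For is client-controlled unless a trusted proxy sets it;
    # only rely on it when the app runs behind such a proxy.
    client_ip = request.headers.get('x-forwarded-for',
                                   request.scope.get('client', [''])[0])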

    if not await limiter.allow(client_ip):
        return JSONResponse(
            {'error': 'Rate limit exceeded'},
            status=429,
            headers={'Retry-After': '60'}
        )

    return await call_next(request)
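
To clarify what `rate` and `capacity` mean, here is a minimal synchronous sketch of the token bucket algorithm. It is not the implementation behind `TokenBucketLimiter`; the class name `SimpleTokenBucket` and the injectable `clock` parameter are assumptions made so the behaviour can be tested deterministically.

```python
import time
from typing import Callable, Dict, Tuple


class SimpleTokenBucket:
    """Each key owns a bucket that refills at `rate` tokens per second up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self._buckets: Dict[str, Tuple[float, float]] = {}  # key -> (tokens, last_seen)

    def allow(self, key: str, cost: float = 1.0) -> bool:
        now = self.clock()
        # New keys start with a full bucket.
        tokens, last_seen = self._buckets.get(key, (self.capacity, now))
        # Refill for the elapsed time, never beyond capacity.
        tokens = min(self.capacity, tokens + (now - last_seen) * self.rate)
        allowed = tokens >= cost
        if allowed:
            tokens -= cost
        self._buckets[key] = (tokens, now)
        return allowed
```

With `rate=10` and `capacity=100`, a client can burst up to 100 requests at once and then sustain about 10 requests per second.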

Per-User Rate Limiting

@app.get('/api/user-data')
@rate_limit(limiter)
async def get_user_data(request):
    user_id = request.session.get('user_id')
    if not user_id:
        return JSONResponse({'error': 'Unauthorized'}, status=401)

    # Use user_id as rate limit key
    key = f"user:{user_id}"
    if not await limiter.allow(key):
        return JSONResponse(
            {'error': 'Rate limit exceeded'},
            status=429
        )

    return JSONResponse({'data': 'user-specific'})

Authentication & Authorization

JWT-Based Authentication

import os
import jwt
from datetime import datetime, timedelta, timezone

SECRET_KEY = os.environ['JWT_SECRET_KEY']

def create_jwt_token(user_id: int, role: str) -> str:
    """Create a JWT token for a user."""
    # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'role': role,
        'exp': now + timedelta(hours=24),
        'iat': now
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def verify_jwt_token(token: str) -> dict:
    """Verify and decode a JWT token."""
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        raise ValueError('Token expired')
    except jwt.InvalidTokenError:
        raise ValueError('Invalid token')

# Middleware for JWT authentication
@app.middleware
async def jwt_auth_middleware(request, call_next):
    # Skip auth for public endpoints
    if request.path in ['/login', '/register', '/health']:
        return await call_next(request)

    # Extract token from Authorization header
    auth_header = request.headers.get('authorization', '')
    if not auth_header.startswith('Bearer '):
        return JSONResponse({'error': 'Missing token'}, status=401)

    token = auth_header[7:]  # Remove "Bearer " prefix

    try:
        payload = verify_jwt_token(token)
        request.state.user_id = payload['user_id']
        request.state.role = payload['role']
    except ValueError as e:
        return JSONResponse({'error': str(e)}, status=401)

    return await call_next(request)

# Login endpoint
@app.post('/login')
async def login(request):
    data = await request.json()
    user = await authenticate(data['username'], data['password'])

    if not user:
        return JSONResponse({'error': 'Invalid credentials'}, status=401)

    token = create_jwt_token(user.id, user.role)
    return JSONResponse({'token': token})
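
The `authenticate` helper used in the login endpoint is not defined in this guide. However it is implemented, it should compare password hashes and never plain-text passwords. The sketch below uses PBKDF2 from the standard library as a stand-in so that the example is self-contained; a dedicated bcrypt or Argon2 library is generally preferable. The function names, storage format, and iteration count are assumptions.

```python
import hashlib
import hmac
import secrets

PBKDF2_ITERATIONS = 600_000  # assumed work factor; tune for your hardware


def hash_password(password: str, iterations: int = PBKDF2_ITERATIONS) -> str:
    """Return 'iterations$salt_hex$hash_hex' for storage."""
    salt = secrets.token_bytes(16)  # unique random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    try:
        iterations_text, salt_hex, hash_hex = stored.split("$")
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(hash_hex)
        candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, int(iterations_text))
    except (ValueError, OverflowError):
        return False  # malformed stored value
    return hmac.compare_digest(candidate, expected)
```

Storing the iteration count alongside the hash allows the work factor to be raised later without invalidating existing passwords.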

Role-Based Access Control (RBAC)

from functools import wraps

def require_role(*roles):
    """Decorator to require specific roles."""
    def decorator(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            user_role = getattr(request.state, 'role', None)
            if user_role not in roles:
                return JSONResponse(
                    {'error': 'Insufficient permissions'},
                    status=403
                )
            return await func(request, *args, **kwargs)
        return wrapper
    return decorator

# Usage
@app.delete('/users/<int:user_id>')
@require_role('admin', 'moderator')
async def delete_user(request, user_id: int):
    await delete_user_from_db(user_id)
    return JSONResponse({'success': True})

Secure File Uploads

File Upload Validation

import os
import hashlib
import time
from pathlib import Path

ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'pdf'}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB

def allowed_file(filename: str) -> bool:
    """Check if file extension is allowed."""
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.post('/upload')
async def upload_file(request):
    # Get file from multipart form
    form = await request.get_form()
    file = form.get('file')

    if not file:
        return JSONResponse({'error': 'No file provided'}, status=400)

    filename = file.filename
    content = file.file.read()

    # Validate file size
    if len(content) > MAX_FILE_SIZE:
        return JSONResponse({'error': 'File too large'}, status=413)

    # Validate file extension
    if not allowed_file(filename):
        return JSONResponse({'error': 'File type not allowed'}, status=400)

    # Generate safe filename (prevent path traversal)
    safe_filename = hashlib.sha256(
        f"{filename}{time.time()}".encode()
    ).hexdigest()[:16]
    ext = filename.rsplit('.', 1)[1].lower()
    safe_filename = f"{safe_filename}.{ext}"

    # Save to secure location outside web root
    upload_dir = Path('/var/app/uploads')
    upload_dir.mkdir(parents=True, exist_ok=True)
    file_path = upload_dir / safe_filename

    with open(file_path, 'wb') as f:
        f.write(content)

    return JSONResponse({
        'success': True,
        'filename': safe_filename
    })
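
The extension check above trusts the client-supplied filename. A complementary check is to compare the first bytes of the upload against known file signatures. The sketch below covers only the extensions allowed in this example; the helper names are hypothetical, and a matching signature does not prove the rest of the file is well-formed.

```python
from typing import Optional

# Leading byte signatures for the extensions allowed above.
SIGNATURES = {
    "jpg": (b"\xff\xd8\xff",),
    "png": (b"\x89PNG\r\n\x1a\n",),
    "gif": (b"GIF87a", b"GIF89a"),
    "pdf": (b"%PDF-",),
}


def sniff_type(content: bytes) -> Optional[str]:
    """Return the detected type based on leading bytes, or None if unknown."""
    for kind, prefixes in SIGNATURES.items():
        if content.startswith(prefixes):
            return kind
    return None


def content_matches_extension(content: bytes, extension: str) -> bool:
    """Check that the file content agrees with the claimed extension."""
    ext = extension.lower()
    if ext == "jpeg":
        ext = "jpg"  # both extensions share one signature
    return sniff_type(content) == ext
```

In the upload handler, a call such as `content_matches_extension(content, ext)` could reject a script that was renamed to `.png` before it is written to disk.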

File Download Security

@app.get('/download/<filename>')
async def download_file(request, filename: str):
    # Validate filename (prevent path traversal)
    if '..' in filename or '/' in filename or '\\' in filename:
        return Response('Invalid filename', status=400)

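    # Resolve symlinks so the containment check works on real paths
    upload_dir = Path('/var/app/uploads').resolve()
    file_path = (upload_dir / filename).resolve()

    # Check that the path is a regular file inside the upload directory.
    # is_relative_to (Python 3.9+) avoids the false match a string prefix
    # check gives for sibling paths such as /var/app/uploads_old.
    if not file_path.is_relative_to(upload_dir) or not file_path.is_file():
        return Response('File not found', status=404)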

    # Serve file
    return FileResponse(
        str(file_path),
        filename=filename,
        content_type='application/octet-stream'
    )

Logging & Monitoring

Security Event Logging

import logging
from datetime import datetime

# Configure security logger
security_logger = logging.getLogger('security')
security_logger.setLevel(logging.INFO)
handler = logging.FileHandler('/var/log/app/security.log')
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(levelname)s - %(message)s'
))
security_logger.addHandler(handler)

# Log authentication events
@app.post('/login')
async def login(request):
    data = await request.json()
    username = data.get('username')
    client_ip = request.headers.get('x-forwarded-for',
                                   request.scope.get('client', ['unknown'])[0])

    user = await authenticate(username, data.get('password'))

    if user:
        # %r escapes control characters in user-supplied values (prevents log injection)
        security_logger.info(
            "Login success: user=%r ip=%r", username, client_ip
        )
        return JSONResponse({'success': True})
    else:
        security_logger.warning(
            "Login failed: user=%r ip=%r", username, client_ip
        )
        return JSONResponse({'error': 'Invalid credentials'}, status=401)

# Log authorization failures
@app.get('/admin/users')
async def admin_users(request):
    user_role = request.session.get('role')
    if user_role != 'admin':
        security_logger.warning(
            f"Unauthorized access attempt: "
            f"user={request.session.get('user_id')} "
            f"role={user_role} path=/admin/users"
        )
        return JSONResponse({'error': 'Forbidden'}, status=403)

    # ... admin logic

Monitor Security Metrics

from collections import defaultdict
import time

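# Track failed login attempts.
# This store is in-memory and per-process: it resets on restart and is not
# shared between workers, so use a shared store (e.g. Redis) in production.
failed_logins = defaultdict(list)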

@app.post('/login')
async def login(request):
    data = await request.json()
    username = data.get('username')
    client_ip = request.headers.get('x-forwarded-for', 'unknown')

    # Check for brute force attempts
    recent_failures = [
        t for t in failed_logins[username]
        if time.time() - t < 300  # Last 5 minutes
    ]

    if len(recent_failures) >= 5:
        security_logger.warning(
            f"Brute force detected: user={username} ip={client_ip}"
        )
        return JSONResponse(
            {'error': 'Too many failed attempts'},
            status=429
        )

    user = await authenticate(username, data.get('password'))

    if not user:
        failed_logins[username].append(time.time())
        return JSONResponse({'error': 'Invalid credentials'}, status=401)

    # Clear failed attempts on success
    failed_logins[username] = []

    # ... successful login logic
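
The dictionary above keeps every failure timestamp for as long as the process runs. A sliding-window tracker that prunes old entries keeps memory bounded. The class below is a sketch with hypothetical names; it is still in-memory and per-process, and the injectable `clock` exists only to make it testable.

```python
import time
from collections import defaultdict, deque
from typing import Callable, DefaultDict, Deque


class FailedLoginTracker:
    """Lock a key after `max_failures` failures within `window_seconds`."""

    def __init__(self, max_failures: int = 5, window_seconds: float = 300,
                 clock: Callable[[], float] = time.monotonic):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self.clock = clock
        self._failures: DefaultDict[str, Deque[float]] = defaultdict(deque)

    def _recent(self, key: str) -> Deque[float]:
        """Drop timestamps that have left the window and return the rest."""
        attempts = self._failures[key]
        cutoff = self.clock() - self.window_seconds
        while attempts and attempts[0] <= cutoff:
            attempts.popleft()
        return attempts

    def record_failure(self, key: str) -> None:
        self._recent(key).append(self.clock())

    def is_locked(self, key: str) -> bool:
        locked = len(self._recent(key)) >= self.max_failures
        if not self._failures[key]:
            del self._failures[key]  # avoid keeping empty entries around
        return locked

    def reset(self, key: str) -> None:
        """Clear failures after a successful login."""
        self._failures.pop(key, None)
```

Tracking by both username and client IP (two separate keys) limits attacks that try one password against many accounts as well as many passwords against one account.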

Security Checklist

Use this checklist before deploying to production:

Environment & Configuration

  • [ ] ENV=production is set

  • [ ] Strong, random secret key stored in environment variable

  • [ ] HTTPS is enforced via reverse proxy or load balancer

  • [ ] TLS certificates are valid and up-to-date

  • [ ] Sensitive data is not logged or exposed in error messages

Middleware & Headers

  • [ ] SecurityMiddleware is enabled with production settings

  • [ ] CSRF protection is enabled for state-changing requests

  • [ ] Security headers are configured (HSTS, CSP, COOP, COEP)

  • [ ] Cookie flags are secure (Secure, HttpOnly, SameSite=Strict)

  • [ ] CORS is properly configured if needed

Session Management

  • [ ] Session storage backend is production-ready (Redis/SQL, not Memory)

  • [ ] Session IDs are signed to prevent tampering

  • [ ] Session regeneration after authentication/privilege changes

  • [ ] Session timeout and idle timeout are implemented

  • [ ] Proper session cleanup on logout

Authentication & Authorization

  • [ ] Strong password requirements are enforced

  • [ ] Passwords are hashed (bcrypt, Argon2, never plain text)

  • [ ] JWT tokens have reasonable expiration times

  • [ ] Role-based access control is implemented

  • [ ] API endpoints have proper authentication

Input Validation

  • [ ] All user input is validated and sanitized

  • [ ] Request body size limits are enforced

  • [ ] File uploads are validated (type, size, content)

  • [ ] Path traversal attacks are prevented

  • [ ] SQL injection is prevented (use parameterized queries)

Rate Limiting

  • [ ] Rate limiting is applied to sensitive endpoints

  • [ ] Authentication endpoints have brute-force protection

  • [ ] API endpoints have appropriate rate limits

Monitoring & Logging

  • [ ] Security events are logged (login, authorization failures)

  • [ ] Failed authentication attempts are monitored

  • [ ] Logs are stored securely and rotated

  • [ ] Alerts are configured for security events

  • [ ] Regular security audits are performed

Dependencies & Updates

  • [ ] All dependencies are up-to-date

  • [ ] Security advisories are monitored

  • [ ] Regular dependency updates are performed

  • [ ] Automated vulnerability scanning is enabled

Testing

  • [ ] Security tests are included in test suite

  • [ ] CSRF protection is tested

  • [ ] Authentication and authorization are tested

  • [ ] Input validation is tested with malicious inputs

  • [ ] Penetration testing is performed before launch

Additional Resources

Getting Help

If you discover security vulnerabilities in Gobstopper:

  1. Do not open a public GitHub issue

  2. Email security concerns to the maintainers

  3. Allow reasonable time for a fix before public disclosure

For security questions about your application:

  • Review OWASP guidelines

  • Consult with security professionals

  • Perform regular security audits

  • Stay informed about emerging threats