Performance Tuning Guide for Gobstopper

This comprehensive guide covers performance optimization strategies for Gobstopper applications, from development through production deployment.

Performance Overview

Gobstopper is designed for high performance through:

  • RSGI Protocol: Native async Python interface optimized for speed

  • Rust Components: Optional Rust-powered router, templates, and static files

  • Efficient Task Queue: Background processing with DuckDB persistence

  • Smart Caching: Template and static file caching

  • Minimal Overhead: Lean middleware chain with optional features

  • Lazy Loading: Headers, query params, and body parsing only when accessed

  • Pre-compilation: Middleware chains and decoders built at registration time

Validated Performance Metrics

Real-world benchmarks with PostgreSQL backend:

  • JSON Serialization: 17,000+ RPS

  • Plaintext Response: 18,000+ RPS

  • Single DB Query: 14,000+ RPS

  • Multiple Queries (20): 5,500+ RPS

  • Database Updates (5): 7,700+ RPS

Version 0.3.6 Performance Improvements

The 0.3.6 release includes conservative internal optimizations that provide a 3-8% throughput improvement:

Optimization              Impact   Workload
------------------------  -------  ----------------------------------
Rust URL Decoding         ~3-5%    Routes with dynamic parameters
Lazy Header Access        ~2-3%    Endpoints not using headers
msgspec Decoder Caching   ~2-3%    JSON endpoints with msgspec models
Lazy Query Parsing        ~1-2%    Routes without query params
Combined Effect           3-8%     Typical web applications

These optimizations are automatic; no code changes are required to benefit from them. The gains are modest but measurable, and, more importantly, they preserve code simplicity and debuggability.

Rust Components

Gobstopper includes optional Rust components that provide significant performance improvements. These are automatically detected and used when available.

Rust Router

The Rust router provides 2-3x faster path matching compared to the Python implementation.

Installation:

# Install with Rust components
cd rust/gobstopper_core_rs
cargo build --release

# Or install from project root
uv sync --extra rust

Verification:

from gobstopper import Gobstopper

app = Gobstopper(__name__)

# Check if Rust router is active
if hasattr(app, '_use_rust_router') and app._use_rust_router:
    print("✅ Using Rust router")
else:
    print("⚠️ Using Python router")

Performance Impact:

  • Path matching: 2-3x faster

  • Parameter extraction: 1.5-2x faster

  • Route registration: Similar to Python

  • Best for: Applications with many routes (50+)

Rust Template Engine

The Rust template engine (Tera) is largely compatible with Jinja2 template syntax and offers:

  • 3-5x faster rendering for large templates

  • Lower memory usage

  • Built-in template caching

Setup:

from gobstopper import Gobstopper

app = Gobstopper(__name__, use_rust_templates=True)

# Verify Rust templates are active
if app.template_engine and 'Rust' in str(type(app.template_engine)):
    print("✅ Using Rust templates")

When to Use:

  • Complex templates with many loops/conditionals

  • High-traffic template rendering

  • Applications serving many concurrent users

  • Memory-constrained environments

Limitations:

  • Some advanced Jinja2 features may not be available

  • Custom filters must be registered differently

  • Template debugging is less detailed

Rust Static File Middleware

Optimized static file serving with better performance characteristics:

from gobstopper.middleware import RustStaticFileMiddleware

app = Gobstopper(__name__)
app.add_middleware(RustStaticFileMiddleware(
    directory='./static',
    url_prefix='/static',
    cache_max_age=31536000  # 1 year for hashed assets
))

Benefits:

  • 2-4x faster file serving

  • Better handling of concurrent requests

  • Lower memory footprint

  • Automatic content type detection

Template Engine Optimization

Template Caching

Both Jinja2 and Tera engines cache compiled templates by default:

from gobstopper import Gobstopper

app = Gobstopper(__name__)

# Jinja2 caching (default)
app.template_engine.env.auto_reload = False  # Disable reload in production
# Note: Jinja2 sizes its template cache when the Environment is created,
# so raise cache_size there; assigning env.cache_size later has no effect.

# Monitor cache hits
print(f"Cache info: {app.template_engine.env.cache}")

Template Best Practices

1. Minimize Template Complexity

<!-- ❌ Slow: Complex loop with nested conditionals -->
{% for user in users %}
  {% if user.active %}
    {% for post in user.posts %}
      {% if post.published %}
        <div>{{ post.title }}</div>
      {% endif %}
    {% endfor %}
  {% endif %}
{% endfor %}

<!-- ✅ Fast: Pre-filter data in Python -->
{% for post in published_posts %}
  <div>{{ post.title }}</div>
{% endfor %}
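
A matching view pre-filters in Python before rendering; a minimal sketch (the get_users() loader and posts.html template are hypothetical):

@app.get('/posts')
async def list_posts(request):
    users = await get_users()  # hypothetical loader
    # Flatten and pre-filter here so the template stays a single simple loop
    published_posts = [
        post
        for user in users if user.active
        for post in user.posts if post.published
    ]
    return await app.render_template('posts.html', published_posts=published_posts)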

2. Use Template Inheritance Efficiently

<!-- base.html: Keep base templates lean -->
<!DOCTYPE html>
<html>
<head>
    <title>{% block title %}Default{% endblock %}</title>
    {% block head %}{% endblock %}
</head>
<body>
    {% block content %}{% endblock %}
</body>
</html>

<!-- page.html: Extend efficiently -->
{% extends "base.html" %}
{% block title %}{{ page_title }}{% endblock %}
{% block content %}
    {{ content | safe }}
{% endblock %}

3. Avoid Heavy Filters in Templates

# ❌ Slow: Complex filter in template
@app.template_filter('format_markdown')
def format_markdown(text):
    return markdown.markdown(text, extensions=['tables', 'fenced_code'])

# ✅ Fast: Pre-process in view
@app.get('/post/<int:id>')
async def show_post(request, id: int):
    post = await get_post(id)
    post.html_content = markdown.markdown(post.content)  # Pre-process
    return await app.render_template('post.html', post=post)

Template Streaming

For large pages, use streaming templates to improve perceived performance:

from gobstopper.http.response import StreamResponse

@app.get('/large-report')
async def large_report(request):
    async def generate():
        # Yield header immediately
        yield await app.render_template_partial('header.html')

        # Stream data as it becomes available
        async for chunk in get_report_data():
            yield await app.render_template_partial('row.html', data=chunk)

        # Yield footer
        yield await app.render_template_partial('footer.html')

    return StreamResponse(generate(), content_type='text/html')
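
Here get_report_data stands in for any async generator. A sketch using the asyncpg pool pattern from the database section below (the report_rows table is illustrative):

async def get_report_data():
    """Yield report rows as they arrive from the database."""
    async with db.acquire() as conn:
        async with conn.transaction():  # asyncpg cursors require a transaction
            async for row in conn.cursor('SELECT * FROM report_rows'):
                yield row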

Static File Serving

Production Static File Strategy

Use a CDN or dedicated static server in production:

import os

# Development: Serve locally
if os.getenv('ENV') != 'production':
    from gobstopper.middleware import RustStaticFileMiddleware
    app.add_middleware(RustStaticFileMiddleware(
        directory='./static',
        url_prefix='/static'
    ))
else:
    # Production: Use CDN URLs
    app.static_url_prefix = 'https://cdn.example.com/static'

Asset Optimization

1. File Compression

Pre-compress static assets:

# Compress CSS/JS with gzip and brotli
find static -type f \( -name "*.css" -o -name "*.js" \) -exec gzip -k {} \;
find static -type f \( -name "*.css" -o -name "*.js" \) -exec brotli {} \;

Configure middleware to serve pre-compressed files:

from gobstopper.middleware import RustStaticFileMiddleware

app.add_middleware(RustStaticFileMiddleware(
    directory='./static',
    url_prefix='/static',
    precompressed=True  # Serve .br and .gz files if available
))

2. Asset Versioning

Use content hashing for cache busting:

import hashlib
from pathlib import Path

def hash_static_file(filepath: str) -> str:
    """Generate hash for static file."""
    with open(filepath, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()[:8]

# Build asset manifest
static_dir = Path('./static')
assets = {}
for file in static_dir.rglob('*'):
    if file.is_file() and '.' in file.name:  # skip extension-less files
        rel_path = str(file.relative_to(static_dir))
        file_hash = hash_static_file(file)
        name, ext = rel_path.rsplit('.', 1)
        hashed_name = f"{name}.{file_hash}.{ext}"
        assets[rel_path] = hashed_name

# Use in templates
@app.context_processor
async def inject_assets():
    return {'asset': lambda path: f"/static/{assets.get(path, path)}"}

Template usage:

<link rel="stylesheet" href="{{ asset('css/main.css') }}">
<script src="{{ asset('js/app.js') }}"></script>

3. Cache Headers

Set appropriate cache headers:

from gobstopper.middleware import RustStaticFileMiddleware

app.add_middleware(RustStaticFileMiddleware(
    directory='./static',
    url_prefix='/static',
    cache_max_age=31536000,  # 1 year for versioned assets
    immutable=True           # Assets never change
))

Database Optimization

Connection Pooling

Use connection pools for database access:

import asyncpg
from contextlib import asynccontextmanager

class DatabasePool:
    def __init__(self, dsn: str, min_size: int = 10, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60
        )

    async def close(self):
        if self.pool:
            await self.pool.close()

    @asynccontextmanager
    async def acquire(self):
        async with self.pool.acquire() as conn:
            yield conn

# Application setup
db = DatabasePool('postgresql://user:pass@localhost/db')

@app.on_startup
async def startup():
    await db.connect()

@app.on_shutdown
async def shutdown():
    await db.close()

# Usage in routes
@app.get('/users/<int:id>')
async def get_user(request, id: int):
    async with db.acquire() as conn:
        user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', id)
        return JSONResponse(dict(user))

Query Optimization

1. Use Prepared Statements

# ❌ Slow: New query each time
async def get_user(user_id: int):
    async with db.acquire() as conn:
        return await conn.fetchrow(
            f'SELECT * FROM users WHERE id = {user_id}'  # SQL injection risk!
        )

# ✅ Fast: Prepared statement
async def get_user(user_id: int):
    async with db.acquire() as conn:
        return await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id  # Safe and cached
        )

2. Batch Queries

# ❌ Slow: N+1 queries
async def get_users_with_posts(user_ids: list[int]):
    users = []
    async with db.acquire() as conn:
        for user_id in user_ids:
            user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
            posts = await conn.fetch('SELECT * FROM posts WHERE user_id = $1', user_id)
            users.append({'user': user, 'posts': posts})
    return users

# ✅ Fast: Batch query with JOIN
async def get_users_with_posts(user_ids: list[int]):
    async with db.acquire() as conn:
        rows = await conn.fetch('''
            SELECT u.*, p.id as post_id, p.title, p.content
            FROM users u
            LEFT JOIN posts p ON p.user_id = u.id
            WHERE u.id = ANY($1)
        ''', user_ids)
        # Group results in Python
        return group_by_user(rows)
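
group_by_user is left undefined above; a minimal sketch (column names follow the SELECT; rows are asyncpg Records):

def group_by_user(rows):
    """Group flat JOIN rows into {'user': ..., 'posts': [...]} entries."""
    grouped = {}
    for row in rows:
        entry = grouped.setdefault(row['id'], {
            'user': dict(row),  # includes joined post columns; trim as needed
            'posts': [],
        })
        if row['post_id'] is not None:  # LEFT JOIN: users may have no posts
            entry['posts'].append({
                'id': row['post_id'],
                'title': row['title'],
                'content': row['content'],
            })
    return list(grouped.values())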

3. Use Indexes

-- Create indexes for frequently queried columns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_user_id ON posts(user_id);
CREATE INDEX idx_posts_created_at ON posts(created_at DESC);

-- Composite indexes for multi-column queries
CREATE INDEX idx_posts_user_status ON posts(user_id, status);

Database Query Caching

Cache expensive query results:

import asyncio

# Simple in-memory cache with TTL
cache = {}
cache_timestamps = {}
CACHE_TTL = 60  # 60 seconds

async def cached_query(key: str, query_func, ttl: int = CACHE_TTL):
    """Cache query results with TTL."""
    now = asyncio.get_event_loop().time()

    if key in cache:
        if now - cache_timestamps[key] < ttl:
            return cache[key]

    result = await query_func()
    cache[key] = result
    cache_timestamps[key] = now
    return result

# Usage
@app.get('/stats/users')
async def user_stats(request):
    async def get_stats():
        async with db.acquire() as conn:
            return await conn.fetchrow('SELECT COUNT(*) as total FROM users')

    stats = await cached_query('user_stats', get_stats, ttl=300)
    return JSONResponse(dict(stats))

Caching Strategies

Response Caching

Cache entire responses for read-heavy endpoints:

from functools import wraps
import asyncio
import hashlib
import json

response_cache = {}

def cache_response(ttl: int = 60):
    """Decorator to cache responses."""
    def decorator(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            # Create cache key from route and args
            cache_key = hashlib.md5(
                f"{request.path}:{json.dumps(kwargs)}".encode()
            ).hexdigest()

            # Check cache
            if cache_key in response_cache:
                cached, timestamp = response_cache[cache_key]
                if asyncio.get_event_loop().time() - timestamp < ttl:
                    return cached

            # Generate response
            response = await func(request, *args, **kwargs)

            # Cache it
            response_cache[cache_key] = (response, asyncio.get_event_loop().time())
            return response

        return wrapper
    return decorator

# Usage
@app.get('/api/popular-posts')
@cache_response(ttl=300)  # Cache for 5 minutes
async def popular_posts(request):
    posts = await get_popular_posts()
    return JSONResponse(posts)

Redis Caching

For distributed caching across multiple servers:

import aioredis
import json

class RedisCache:
    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None

    async def connect(self):
        # from_url returns a client immediately; it is not awaitable in aioredis 2.x
        self.redis = aioredis.from_url(self.redis_url)

    async def get(self, key: str):
        value = await self.redis.get(key)
        return json.loads(value) if value else None

    async def set(self, key: str, value, ttl: int = 60):
        await self.redis.setex(key, ttl, json.dumps(value))

    async def delete(self, key: str):
        await self.redis.delete(key)

# Setup
cache = RedisCache('redis://localhost:6379/0')

@app.on_startup
async def startup():
    await cache.connect()

# Usage
@app.get('/api/user/<int:id>')
async def get_user(request, id: int):
    cache_key = f"user:{id}"

    # Try cache first
    user = await cache.get(cache_key)
    if user:
        return JSONResponse(user)

    # Fetch from database
    async with db.acquire() as conn:
        user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', id)
        user = dict(user)

    # Cache for 5 minutes
    await cache.set(cache_key, user, ttl=300)

    return JSONResponse(user)
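
Invalidate on writes so cached reads do not serve stale data; a sketch using the RedisCache above (the update route is hypothetical):

@app.post('/api/user/<int:id>')
async def update_user(request, id: int):
    data = await request.json()
    async with db.acquire() as conn:
        await conn.execute(
            'UPDATE users SET name = $1 WHERE id = $2', data['name'], id
        )
    await cache.delete(f"user:{id}")  # next read refetches fresh data
    return JSONResponse({'updated': id})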

Background Tasks

Task Queue Optimization

Configure the task queue for optimal performance:

from gobstopper import Gobstopper
from pathlib import Path

app = Gobstopper(__name__)

# Configure task storage
app.task_storage.cleanup_interval = 3600  # Cleanup every hour
app.task_storage.retention_days = 7      # Keep completed tasks for 7 days

# Start workers with concurrency
@app.on_startup
async def startup():
    # Start multiple workers for CPU-intensive tasks
    await app.start_workers(
        num_workers=4,           # 4 concurrent workers
        categories=['processing', 'heavy']
    )

    # Start dedicated worker for I/O tasks
    await app.start_workers(
        num_workers=8,           # More workers for I/O
        categories=['email', 'api']
    )

Task Priorities

Use priorities for critical tasks:

from gobstopper.tasks.models import TaskPriority

# High priority task
@app.task(name='send_verification_email', category='email', priority=TaskPriority.HIGH)
async def send_verification_email(user_id: int, email: str):
    await send_email(email, 'Verify your account', '...')

# Normal priority task
@app.task(name='generate_report', category='processing', priority=TaskPriority.NORMAL)
async def generate_report(report_id: int):
    await generate_pdf_report(report_id)

# Low priority task
@app.task(name='cleanup_old_data', category='maintenance', priority=TaskPriority.LOW)
async def cleanup_old_data():
    await delete_old_records()

Batch Processing

Process multiple items in batches:

@app.task(name='process_batch', category='processing')
async def process_batch(item_ids: list[int]):
    """Process multiple items at once."""
    async with db.acquire() as conn:
        # Fetch all items in one query
        items = await conn.fetch(
            'SELECT * FROM items WHERE id = ANY($1)',
            item_ids
        )

        # Process batch
        for item in items:
            await process_item(item)

# Queue batches instead of individual items
@app.post('/api/process')
async def queue_processing(request):
    data = await request.json()
    item_ids = data['item_ids']

    # Split into batches of 100
    batch_size = 100
    for i in range(0, len(item_ids), batch_size):
        batch = item_ids[i:i + batch_size]
        await app.queue_task('process_batch', item_ids=batch)

    return JSONResponse({'queued': len(item_ids)})

Request/Response Optimization

Automatic Optimizations (v0.3.6+)

Gobstopper 0.3.6 includes several automatic optimizations that improve performance without any code changes:

Lazy Header Access

Headers are no longer parsed and lowercased on every request. Instead, they’re computed only when accessed:

@app.get('/api/data')
async def get_data(request):
    # If you don't access request.headers, no processing occurs
    # This saves ~2-3% overhead for simple endpoints
    return JSONResponse({'data': 'value'})

@app.get('/api/auth')
async def check_auth(request):
    # Headers are parsed only when needed
    token = request.headers.get('authorization')  # Lazy computation happens here
    return JSONResponse({'authenticated': bool(token)})

Best Practice: Only access headers when necessary to maximize this optimization.

msgspec Decoder Caching

JSON decoders for msgspec.Struct models are cached on the model class:

import msgspec

class User(msgspec.Struct):
    name: str
    email: str
    age: int

@app.post('/api/users')
async def create_user(request, user: User):
    # First request: decoder created and cached on User class
    # Subsequent requests: cached decoder reused (~2-3% faster)
    return JSONResponse({'created': user.name})

No configuration is needed; caching happens automatically.

Lazy Query String Parsing

Query parameters are parsed only when request.args is accessed:

@app.get('/api/search')
async def search(request):
    # If you don't use query params, parsing is skipped
    # For requests without query strings, this is a free optimization
    query = request.args.get('q', [''])[0] if request.args else ''
    return JSONResponse({'query': query})

Rust-Level URL Decoding

URL decoding is performed inside the Rust router, so it runs at native speed:

# URL with encoded characters: /api/users/John%20Doe
@app.get('/api/users/<name>')
async def get_user(request, name: str):
    # 'name' is decoded at Rust level (3-5% faster)
    # You receive: name = "John Doe"
    return JSONResponse({'user': name})

JSON Serialization

Use fast JSON serializers:

# Option 1: msgspec (fastest)
import msgspec

@app.get('/api/data')
async def get_data(request):
    data = {'items': [...]}  # Large dataset
    json_bytes = msgspec.json.encode(data)
    return Response(json_bytes, content_type='application/json')

# Option 2: orjson (very fast)
import orjson

@app.get('/api/data')
async def get_data(request):
    data = {'items': [...]}
    json_bytes = orjson.dumps(data)
    return Response(json_bytes, content_type='application/json')

Response Compression

Enable compression for text responses:

from gobstopper.middleware import CompressionMiddleware

app.add_middleware(CompressionMiddleware(
    minimum_size=1000,      # Only compress responses > 1KB
    compression_level=6     # Balance between speed and size
))

Request Body Parsing

Parse request bodies efficiently:

# ❌ Slow: Multiple awaits
@app.post('/api/users')
async def create_user(request):
    json_data = await request.json()
    form_data = await request.get_form()  # Never used
    # ...

# ✅ Fast: Parse only what you need
@app.post('/api/users')
async def create_user(request):
    json_data = await request.json()  # Only parse JSON
    # ...

Granian Server Configuration

Optimize Granian server settings for production:

# Development
granian --interface rsgi --reload example_app:app

# Production: Multiple workers
granian --interface rsgi \
    --workers 4 \
    --threads 2 \
    --blocking-threads 2 \
    --backlog 2048 \
    example_app:app

# Production: Optimized for high concurrency
granian --interface rsgi \
    --workers $(nproc) \
    --threads 4 \
    --blocking-threads 4 \
    --backlog 4096 \
    --http1-buffer-size 8192 \
    --http1-keep-alive \
    --http2-max-concurrent-streams 1000 \
    example_app:app

Configuration guidelines (a sizing sketch follows the list):

  • Workers: num_cpu_cores or num_cpu_cores + 1

  • Threads: 2-4 per worker for mixed I/O and CPU work

  • Blocking Threads: Match threads for blocking operations

  • Backlog: 2048-4096 for high-traffic sites

  • Keep-Alive: Enable for persistent connections
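
To turn these guidelines into concrete flags, a small sizing sketch (plain Python; the numbers are the heuristics above, not Granian defaults):

import os

cores = os.cpu_count() or 1
workers = cores + 1     # num_cpu_cores or num_cpu_cores + 1
threads = 2             # 2-4 per worker for mixed I/O and CPU work
blocking = threads      # match threads for blocking operations
backlog = 4096          # 2048-4096 for high-traffic sites

print(
    f"granian --interface rsgi --workers {workers} --threads {threads} "
    f"--blocking-threads {blocking} --backlog {backlog} app:app"
)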

Worker Strategy

Choose worker strategy based on workload:

# CPU-intensive: More workers, fewer threads
granian --interface rsgi --workers 8 --threads 1 app:app

# I/O-intensive: Fewer workers, more threads
granian --interface rsgi --workers 2 --threads 8 app:app

# Mixed workload: Balanced (recommended)
granian --interface rsgi --workers 4 --threads 2 app:app

Monitoring & Profiling

Application Profiling

Profile your application to identify bottlenecks:

import cProfile
import pstats
from pstats import SortKey

async def profile_route(request):
    """Profile a specific route (async, since the handler is awaited)."""
    profiler = cProfile.Profile()
    profiler.enable()

    # Call the handler under test here
    response = await some_route(request)

    profiler.disable()
    stats = pstats.Stats(profiler)
    stats.sort_stats(SortKey.TIME)
    stats.print_stats(20)  # Top 20 functions
    return response

# Or use middleware for profiling
@app.middleware
async def profiling_middleware(request, call_next):
    if request.path.startswith('/profile/'):
        profiler = cProfile.Profile()
        profiler.enable()
        response = await call_next(request)
        profiler.disable()

        # Save stats
        profiler.dump_stats(f'profile_{request.path.replace("/", "_")}.prof')
        return response

    return await call_next(request)

Performance Metrics

Track key metrics:

import time
from collections import defaultdict

metrics = {
    'request_count': defaultdict(int),
    'request_duration': defaultdict(list),
    'error_count': defaultdict(int)
}

@app.middleware
async def metrics_middleware(request, call_next):
    start_time = time.time()

    try:
        response = await call_next(request)
        metrics['request_count'][request.path] += 1

        duration = time.time() - start_time
        metrics['request_duration'][request.path].append(duration)

        return response

    except Exception:
        metrics['error_count'][request.path] += 1
        raise

# Expose metrics endpoint
@app.get('/metrics')
async def get_metrics(request):
    stats = {}
    for path, durations in metrics['request_duration'].items():
        stats[path] = {
            'count': metrics['request_count'][path],
            'avg_duration': sum(durations) / len(durations),
            'max_duration': max(durations),
            'errors': metrics['error_count'][path]
        }
    return JSONResponse(stats)
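
The endpoint above reports averages and maxima; for percentiles like the p99 figures later in this guide, a small nearest-rank helper (pure Python) is enough:

def percentile(durations: list[float], pct: float) -> float:
    """Nearest-rank percentile; adequate for coarse monitoring."""
    if not durations:
        return 0.0
    ordered = sorted(durations)
    rank = min(len(ordered) - 1, int(len(ordered) * pct / 100))
    return ordered[rank]

# e.g. 'p99_duration': percentile(durations, 99) inside the stats loop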

Load Testing

Use load testing tools to validate performance:

# wrk: HTTP benchmarking
wrk -t12 -c400 -d30s http://localhost:8000/api/data

# Apache Bench
ab -n 10000 -c 100 http://localhost:8000/

# Locust: Python-based load testing
locust -f locustfile.py --host=http://localhost:8000

Example Locust file:

from locust import HttpUser, task, between

class WebsiteUser(HttpUser):
    wait_time = between(1, 3)

    @task(3)
    def get_homepage(self):
        self.client.get("/")

    @task(2)
    def get_api_data(self):
        self.client.get("/api/data")

    @task(1)
    def post_data(self):
        self.client.post("/api/users", json={
            "name": "Test User",
            "email": "test@example.com"
        })

Production Deployment

Optimization Checklist

  • [ ] Enable Rust components (router, templates, static files)

  • [ ] Configure database connection pooling

  • [ ] Set up Redis for distributed caching

  • [ ] Enable response compression

  • [ ] Use CDN for static assets

  • [ ] Configure proper cache headers

  • [ ] Enable HTTP/2 in reverse proxy

  • [ ] Set up multiple Granian workers

  • [ ] Implement response caching for read-heavy endpoints

  • [ ] Profile and optimize slow queries

  • [ ] Monitor application metrics

  • [ ] Set up load balancing

  • [ ] Enable database query result caching

  • [ ] Optimize template rendering

  • [ ] Use background tasks for heavy operations

Deployment Architecture

Recommended production setup:

Internet
    ↓
Load Balancer (AWS ELB, Nginx)
    ↓
┌─────────────────────────────────┐
│  Reverse Proxy (Nginx/Caddy)   │
│  - TLS termination              │
│  - Static file serving          │
│  - Gzip/Brotli compression      │
│  - Rate limiting                │
└─────────────────────────────────┘
    ↓
┌─────────────────────────────────┐
│  Granian Workers (4-8)          │
│  - Gobstopper application       │
│  - Rust components enabled      │
│  - Connection pooling           │
└─────────────────────────────────┘
    ↓                         ↓
┌──────────────┐    ┌──────────────┐
│  PostgreSQL  │    │  Redis Cache │
│  (Primary)   │    │              │
└──────────────┘    └──────────────┘

Nginx Configuration

upstream gobstopper_app {
    least_conn;
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    server 127.0.0.1:8003;
}

server {
    listen 443 ssl http2;
    server_name example.com;

    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;

    # Gzip compression
    gzip on;
    gzip_types text/plain text/css application/json application/javascript;
    gzip_min_length 1000;

    # Static files
    location /static/ {
        alias /var/www/static/;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }

    # Application
    location / {
        proxy_pass http://gobstopper_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffer settings
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
    }
}

Benchmarking Results

Test Environment

  • Hardware: 8 CPU cores, 16GB RAM

  • Database: PostgreSQL 15 with connection pool (20 connections)

  • Configuration: 4 Granian workers, Rust components enabled

Results

Test Type              Requests/sec   Latency (avg)   Latency (p99)
---------------------  -------------  --------------  --------------
Plaintext              18,241         2.1ms           5.2ms
JSON                   17,486         2.3ms           6.1ms
Single Query           14,203         2.8ms           8.4ms
Multiple Queries (20)  5,523          7.2ms           18.1ms
Updates (5)            7,741          5.1ms           14.2ms
Templates              12,004         3.3ms           9.7ms

Comparison with Other Frameworks

Gobstopper performance is competitive with major Python async frameworks:

  • FastAPI: Similar performance for JSON serialization

  • Starlette: Comparable baseline performance

  • BlackSheep: Similar template rendering speed

  • Sanic: Gobstopper is typically 10-15% faster with Rust components enabled

Continuous Performance Testing

Set up automated benchmarks:

#!/bin/bash
# benchmark.sh - Run performance tests

echo "Running Gobstopper benchmarks..."

# Start application
granian --interface rsgi --workers 4 benchmark_simple:app &
APP_PID=$!
sleep 2

# Run tests
wrk -t4 -c100 -d30s http://localhost:8000/json > results_json.txt
wrk -t4 -c100 -d30s http://localhost:8000/db > results_db.txt

# Stop application
kill $APP_PID

# Parse and compare results
python compare_benchmarks.py results_json.txt results_db.txt
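
compare_benchmarks.py is not shown here; a minimal sketch that pulls the Requests/sec figure from standard wrk output:

import re
import sys

def requests_per_sec(path: str) -> float:
    """Extract the 'Requests/sec:' figure from a wrk output file."""
    with open(path) as f:
        match = re.search(r'Requests/sec:\s+([\d.]+)', f.read())
    if not match:
        raise ValueError(f"No Requests/sec line in {path}")
    return float(match.group(1))

if __name__ == '__main__':
    for path in sys.argv[1:]:
        print(f"{path}: {requests_per_sec(path):,.0f} req/s")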

Conclusion

Gobstopper provides excellent performance out of the box, but following these optimization strategies can push performance even higher:

  1. Enable Rust components for 2-4x performance gains

  2. Use connection pooling for database access

  3. Implement caching at multiple levels

  4. Optimize database queries with proper indexes

  5. Configure Granian with appropriate worker settings

  6. Monitor and profile to identify bottlenecks

  7. Use CDN for static assets in production

On hardware comparable to the test environment above, these optimizations can sustain 15,000+ RPS with sub-5ms average latency for typical applications.