Performance Tuning Guide for Gobstopper
This comprehensive guide covers performance optimization strategies for Gobstopper applications, from development through production deployment.
Performance Overview
Gobstopper is designed for high performance through:
RSGI Protocol: Native async Python interface optimized for speed
Rust Components: Optional Rust-powered router, templates, and static files
Efficient Task Queue: Background processing with DuckDB persistence
Smart Caching: Template and static file caching
Minimal Overhead: Lean middleware chain with optional features
Lazy Loading: Headers, query params, and body parsing only when accessed
Pre-compilation: Middleware chains and decoders built at registration time
Validated Performance Metrics
Real-world benchmarks with PostgreSQL backend:
JSON Serialization: 17,000+ RPS
Plaintext Response: 18,000+ RPS
Single DB Query: 14,000+ RPS
Multiple Queries (20): 5,500+ RPS
Database Updates (5): 7,700+ RPS
Version 0.3.6 Performance Improvements
The 0.3.6 release includes conservative internal optimizations providing 3-8% throughput improvement:
| Optimization | Impact | Workload |
|---|---|---|
| Rust URL Decoding | ~3-5% | Routes with dynamic parameters |
| Lazy Header Access | ~2-3% | Endpoints not using headers |
| msgspec Decoder Caching | ~2-3% | JSON endpoints with msgspec models |
| Lazy Query Parsing | ~1-2% | Routes without query params |
| Combined Effect | 3-8% | Typical web applications |
These optimizations are automatic - no code changes required to benefit from them! The gains are modest but measurable, and more importantly, these changes maintain code simplicity and debuggability.
Rust Components
Gobstopper includes optional Rust components that provide significant performance improvements. These are automatically detected and used when available.
Rust Router
The Rust router provides 2-3x faster path matching compared to the Python implementation.
Installation:
# Install with Rust components
cd rust/gobstopper_core_rs
cargo build --release
# Or install from project root
uv sync --extra rust
Verification:
from gobstopper import Gobstopper
app = Gobstopper(__name__)
# Check if Rust router is active
if hasattr(app, '_use_rust_router') and app._use_rust_router:
    print("✅ Using Rust router")
else:
    print("⚠️ Using Python router")
Performance Impact:
Path matching: 2-3x faster
Parameter extraction: 1.5-2x faster
Route registration: Similar to Python
Best for: Applications with many routes (50+)
Rust Template Engine
The Rust template engine (Tera) uses Jinja2-style syntax, so most Jinja2 templates work with little or no change (see Limitations below). It offers:
3-5x faster rendering for large templates
Lower memory usage
Built-in template caching
Setup:
from gobstopper import Gobstopper
app = Gobstopper(__name__, use_rust_templates=True)
# Verify Rust templates are active
if app.template_engine and 'Rust' in str(type(app.template_engine)):
    print("✅ Using Rust templates")
When to Use:
Complex templates with many loops/conditionals
High-traffic template rendering
Applications serving many concurrent users
Memory-constrained environments
Limitations:
Some advanced Jinja2 features may not be available
Custom filters must be registered differently
Template debugging is less detailed
Rust Static File Middleware
Optimized static file serving with better performance characteristics:
from gobstopper import Gobstopper
from gobstopper.middleware import RustStaticFileMiddleware
app = Gobstopper(__name__)
app.add_middleware(RustStaticFileMiddleware(
    directory='./static',
    url_prefix='/static',
    cache_max_age=31536000  # 1 year for hashed assets
))
Benefits:
2-4x faster file serving
Better handling of concurrent requests
Lower memory footprint
Automatic content type detection
Template Engine Optimization
Template Caching
Both Jinja2 and Tera engines cache compiled templates by default:
from gobstopper import Gobstopper
app = Gobstopper(__name__)
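# Jinja2 caching (default)
app.template_engine.env.auto_reload = False  # Disable reload in production
# Jinja2 sizes its template cache when the Environment is constructed
# (cache_size defaults to 400), so assigning env.cache_size afterwards does
# not resize the existing cache. Pass cache_size at construction instead.
# Inspect the compiled-template cache
print(f"Cache info: {app.template_engine.env.cache}")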
Template Best Practices
1. Minimize Template Complexity
<!-- ❌ Slow: Complex loop with nested conditionals -->
{% for user in users %}
  {% if user.active %}
    {% for post in user.posts %}
      {% if post.published %}
        <div>{{ post.title }}</div>
      {% endif %}
    {% endfor %}
  {% endif %}
{% endfor %}
<!-- ✅ Fast: Pre-filter data in Python -->
{% for post in published_posts %}
<div>{{ post.title }}</div>
{% endfor %}
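The pre-filtering that the fast template relies on happens in the view, before rendering. A minimal sketch, assuming users expose `active` and `posts` and posts expose `published` and `title`; the `User` and `Post` dataclasses and the `collect_published_posts` helper are hypothetical stand-ins for your own models:

```python
from dataclasses import dataclass, field


@dataclass
class Post:  # hypothetical stand-in for your post model
    title: str
    published: bool


@dataclass
class User:  # hypothetical stand-in for your user model
    active: bool
    posts: list[Post] = field(default_factory=list)


def collect_published_posts(users: list[User]) -> list[Post]:
    """Flatten the nested template loops into one pre-filtered list."""
    return [
        post
        for user in users
        if user.active
        for post in user.posts
        if post.published
    ]
```

The result would be passed to the template as `published_posts`, so the template itself only iterates.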
2. Use Template Inheritance Efficiently
<!-- base.html: Keep base templates lean -->
<!DOCTYPE html>
<html>
<head>
<title>{% block title %}Default{% endblock %}</title>
{% block head %}{% endblock %}
</head>
<body>
{% block content %}{% endblock %}
</body>
</html>
<!-- page.html: Extend efficiently -->
{% extends "base.html" %}
{% block title %}{{ page_title }}{% endblock %}
{% block content %}
{{ content | safe }}
{% endblock %}
3. Avoid Heavy Filters in Templates
import markdown
# ❌ Slow: Complex filter in template
@app.template_filter('format_markdown')
def format_markdown(text):
    return markdown.markdown(text, extensions=['tables', 'fenced_code'])
# ✅ Fast: Pre-process in view
@app.get('/post/<int:id>')
async def show_post(request, id: int):
    post = await get_post(id)
    post.html_content = markdown.markdown(post.content)  # Pre-process
    return await app.render_template('post.html', post=post)
Template Streaming
For large pages, use streaming templates to improve perceived performance:
from gobstopper.http.response import StreamResponse
@app.get('/large-report')
async def large_report(request):
    async def generate():
        # Yield header immediately
        yield await app.render_template_partial('header.html')
        # Stream data as it becomes available
        async for chunk in get_report_data():
            yield await app.render_template_partial('row.html', data=chunk)
        # Yield footer
        yield await app.render_template_partial('footer.html')
    return StreamResponse(generate(), content_type='text/html')
Static File Serving
Production Static File Strategy
Use a CDN or dedicated static server in production:
import os
# Development: Serve locally
if os.getenv('ENV') != 'production':
    from gobstopper.middleware import RustStaticFileMiddleware
    app.add_middleware(RustStaticFileMiddleware(
        directory='./static',
        url_prefix='/static'
    ))
else:
    # Production: Use CDN URLs
    app.static_url_prefix = 'https://cdn.example.com/static'
Asset Optimization
1. File Compression
Pre-compress static assets:
# Compress CSS/JS with gzip and brotli
find static -type f \( -name "*.css" -o -name "*.js" \) -exec gzip -k {} \;
find static -type f \( -name "*.css" -o -name "*.js" \) -exec brotli {} \;
Configure middleware to serve pre-compressed files:
from gobstopper.middleware import RustStaticFileMiddleware
app.add_middleware(RustStaticFileMiddleware(
directory='./static',
url_prefix='/static',
precompressed=True # Serve .br and .gz files if available
))
2. Asset Versioning
Use content hashing for cache busting:
import hashlib
from pathlib import Path
def hash_static_file(filepath: Path) -> str:
    """Generate a short content hash for a static file."""
    # MD5 is acceptable here: the hash is for cache busting, not security
    return hashlib.md5(filepath.read_bytes()).hexdigest()[:8]
# Build asset manifest
static_dir = Path('./static')
assets = {}
for file in static_dir.rglob('*'):
    if file.is_file():
        rel_path = file.relative_to(static_dir)
        file_hash = hash_static_file(file)
        # with_suffix also handles files that have no extension
        hashed = rel_path.with_suffix(f".{file_hash}{rel_path.suffix}")
        assets[rel_path.as_posix()] = hashed.as_posix()
# Use in templates
@app.context_processor
async def inject_assets():
    return {'asset': lambda path: f"/static/{assets.get(path, path)}"}
Template usage:
<link rel="stylesheet" href="{{ asset('css/main.css') }}">
<script src="{{ asset('js/app.js') }}"></script>
3. Cache Headers
Set appropriate cache headers:
from gobstopper.middleware import RustStaticFileMiddleware
app.add_middleware(RustStaticFileMiddleware(
directory='./static',
url_prefix='/static',
cache_max_age=31536000, # 1 year for versioned assets
immutable=True # Assets never change
))
Database Optimization
Connection Pooling
Use connection pools for database access:
import asyncpg
from contextlib import asynccontextmanager
class DatabasePool:
    def __init__(self, dsn: str, min_size: int = 10, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None
    async def connect(self):
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60
        )
    async def close(self):
        if self.pool:
            await self.pool.close()
    @asynccontextmanager
    async def acquire(self):
        async with self.pool.acquire() as conn:
            yield conn
# Application setup
db = DatabasePool('postgresql://user:pass@localhost/db')
@app.on_startup
async def startup():
    await db.connect()
@app.on_shutdown
async def shutdown():
    await db.close()
# Usage in routes
@app.get('/users/<int:id>')
async def get_user(request, id: int):
    async with db.acquire() as conn:
        user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', id)
    return JSONResponse(dict(user))
Query Optimization
1. Use Prepared Statements
# ❌ Slow: New query each time
async def get_user(user_id: int):
    async with db.acquire() as conn:
        return await conn.fetchrow(
            f'SELECT * FROM users WHERE id = {user_id}'  # SQL injection risk!
        )
# ✅ Fast: Prepared statement
async def get_user(user_id: int):
    async with db.acquire() as conn:
        return await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id  # Safe and cached
        )
2. Batch Queries
# ❌ Slow: N+1 queries
async def get_users_with_posts(user_ids: list[int]):
    users = []
    async with db.acquire() as conn:
        for user_id in user_ids:
            user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
            posts = await conn.fetch('SELECT * FROM posts WHERE user_id = $1', user_id)
            users.append({'user': user, 'posts': posts})
    return users
# ✅ Fast: Batch query with JOIN
async def get_users_with_posts(user_ids: list[int]):
    async with db.acquire() as conn:
        rows = await conn.fetch('''
            SELECT u.*, p.id as post_id, p.title, p.content
            FROM users u
            LEFT JOIN posts p ON p.user_id = u.id
            WHERE u.id = ANY($1)
        ''', user_ids)
    # Group results in Python
    return group_by_user(rows)
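The batch version calls `group_by_user`, which is not defined in this guide. One possible shape for it is sketched below, assuming each row is a mapping with the user's columns plus the `post_id`, `title`, and `content` columns selected in the JOIN, and that the `users` table has no columns with those names:

```python
from collections.abc import Iterable, Mapping
from typing import Any

# Columns that belong to the post side of the JOIN (assumed from the query)
POST_COLUMNS = ("post_id", "title", "content")


def group_by_user(rows: Iterable[Mapping[str, Any]]) -> list[dict[str, Any]]:
    """Collapse JOINed rows into one entry per user with a list of posts."""
    grouped: dict[Any, dict[str, Any]] = {}
    for row in rows:
        entry = grouped.get(row["id"])
        if entry is None:
            user = {k: v for k, v in row.items() if k not in POST_COLUMNS}
            entry = grouped[row["id"]] = {"user": user, "posts": []}
        # A LEFT JOIN yields post_id = NULL for users without posts
        if row["post_id"] is not None:
            entry["posts"].append(
                {"id": row["post_id"], "title": row["title"], "content": row["content"]}
            )
    return list(grouped.values())
```

Users keep the order in which they first appear in the result set, and users without posts get an empty `posts` list.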
3. Use Indexes
-- Create indexes for frequently queried columns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_user_id ON posts(user_id);
CREATE INDEX idx_posts_created_at ON posts(created_at DESC);
-- Composite indexes for multi-column queries
CREATE INDEX idx_posts_user_status ON posts(user_id, status);
Database Query Caching
Cache expensive query results:
import time
# Simple in-memory cache with TTL
cache = {}
cache_timestamps = {}
CACHE_TTL = 60  # 60 seconds
async def cached_query(key: str, query_func, ttl: int = CACHE_TTL):
    """Cache query results with TTL."""
    now = time.monotonic()  # monotonic clock: unaffected by system time changes
    if key in cache and now - cache_timestamps[key] < ttl:
        return cache[key]
    result = await query_func()
    cache[key] = result
    cache_timestamps[key] = now
    return result
# Usage
@app.get('/stats/users')
async def user_stats(request):
    async def get_stats():
        async with db.acquire() as conn:
            return await conn.fetchrow('SELECT COUNT(*) as total FROM users')
    stats = await cached_query('user_stats', get_stats, ttl=300)
    return JSONResponse(dict(stats))
Caching Strategies
Response Caching
Cache entire responses for read-heavy endpoints:
from functools import wraps
import hashlib
import json
import time
response_cache = {}
def cache_response(ttl: int = 60):
    """Decorator to cache responses."""
    def decorator(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            # Create cache key from route and args
            cache_key = hashlib.md5(
                f"{request.path}:{json.dumps(kwargs)}".encode()
            ).hexdigest()
            # Check cache
            if cache_key in response_cache:
                cached, timestamp = response_cache[cache_key]
                if time.monotonic() - timestamp < ttl:
                    return cached
            # Generate response
            response = await func(request, *args, **kwargs)
            # Cache it
            response_cache[cache_key] = (response, time.monotonic())
            return response
        return wrapper
    return decorator
# Usage
@app.get('/api/popular-posts')
@cache_response(ttl=300)  # Cache for 5 minutes
async def popular_posts(request):
    posts = await get_popular_posts()
    return JSONResponse(posts)
Redis Caching
For distributed caching across multiple servers:
# The standalone aioredis package was merged into redis-py (4.2+)
from redis import asyncio as aioredis
import json
class RedisCache:
    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None
    async def connect(self):
        self.redis = await aioredis.from_url(self.redis_url)
    async def get(self, key: str):
        value = await self.redis.get(key)
        return json.loads(value) if value else None
    async def set(self, key: str, value, ttl: int = 60):
        await self.redis.setex(key, ttl, json.dumps(value))
    async def delete(self, key: str):
        await self.redis.delete(key)
# Setup
cache = RedisCache('redis://localhost:6379/0')
@app.on_startup
async def startup():
    await cache.connect()
# Usage
@app.get('/api/user/<int:id>')
async def get_user(request, id: int):
    cache_key = f"user:{id}"
    # Try cache first
    user = await cache.get(cache_key)
    if user:
        return JSONResponse(user)
    # Fetch from database
    async with db.acquire() as conn:
        user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', id)
    user = dict(user)
    # Cache for 5 minutes
    await cache.set(cache_key, user, ttl=300)
    return JSONResponse(user)
Background Tasks
Task Queue Optimization
Configure the task queue for optimal performance:
from gobstopper import Gobstopper
from pathlib import Path
app = Gobstopper(__name__)
# Configure task storage
app.task_storage.cleanup_interval = 3600 # Cleanup every hour
app.task_storage.retention_days = 7 # Keep completed tasks for 7 days
# Start workers with concurrency
@app.on_startup
async def startup():
    # Start multiple workers for CPU-intensive tasks
    await app.start_workers(
        num_workers=4,  # 4 concurrent workers
        categories=['processing', 'heavy']
    )
    # Start dedicated worker for I/O tasks
    await app.start_workers(
        num_workers=8,  # More workers for I/O
        categories=['email', 'api']
    )
Task Priorities
Use priorities for critical tasks:
from gobstopper.tasks.models import TaskPriority
# High priority task
@app.task(name='send_verification_email', category='email', priority=TaskPriority.HIGH)
async def send_verification_email(user_id: int, email: str):
    await send_email(email, 'Verify your account', '...')
# Normal priority task
@app.task(name='generate_report', category='processing', priority=TaskPriority.NORMAL)
async def generate_report(report_id: int):
    await generate_pdf_report(report_id)
# Low priority task
@app.task(name='cleanup_old_data', category='maintenance', priority=TaskPriority.LOW)
async def cleanup_old_data():
    await delete_old_records()
Batch Processing
Process multiple items in batches:
@app.task(name='process_batch', category='processing')
async def process_batch(item_ids: list[int]):
    """Process multiple items at once."""
    async with db.acquire() as conn:
        # Fetch all items in one query
        items = await conn.fetch(
            'SELECT * FROM items WHERE id = ANY($1)',
            item_ids
        )
    # Process batch
    for item in items:
        await process_item(item)
# Queue batches instead of individual items
@app.post('/api/process')
async def queue_processing(request):
    data = await request.json()
    item_ids = data['item_ids']
    # Split into batches of 100
    batch_size = 100
    for i in range(0, len(item_ids), batch_size):
        batch = item_ids[i:i + batch_size]
        await app.queue_task('process_batch', item_ids=batch)
    return JSONResponse({'queued': len(item_ids)})
Request/Response Optimization
Automatic Optimizations (v0.3.6+)
Gobstopper 0.3.6 includes several automatic optimizations that improve performance without any code changes:
Lazy Header Access
Headers are no longer parsed and lowercased on every request. Instead, they’re computed only when accessed:
@app.get('/api/data')
async def get_data(request):
    # If you don't access request.headers, no processing occurs
    # This saves ~2-3% overhead for simple endpoints
    return JSONResponse({'data': 'value'})
@app.get('/api/auth')
async def check_auth(request):
    # Headers are parsed only when needed
    token = request.headers.get('authorization')  # Lazy computation happens here
    return JSONResponse({'authenticated': bool(token)})
Best Practice: Only access headers when necessary to maximize this optimization.
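The general pattern behind lazy header access can be illustrated with `functools.cached_property`. This is a sketch of the idea only: `LazyRequest` is a hypothetical class, not Gobstopper's actual `Request` implementation, and `parse_count` exists purely to make the laziness observable.

```python
from functools import cached_property


class LazyRequest:
    """Illustrative only: not Gobstopper's actual Request class."""

    def __init__(self, raw_headers: list[tuple[str, str]]):
        self._raw_headers = raw_headers
        self.parse_count = 0  # instrumentation for this example

    @cached_property
    def headers(self) -> dict[str, str]:
        # Runs on first access only; the result is then stored on the instance
        self.parse_count += 1
        return {name.lower(): value for name, value in self._raw_headers}
```

A handler that never touches `headers` pays nothing for parsing, and a handler that reads it several times pays once.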
msgspec Decoder Caching
JSON decoders for msgspec.Struct models are cached on the model class:
import msgspec
class User(msgspec.Struct):
    name: str
    email: str
    age: int
@app.post('/api/users')
async def create_user(request: Request, user: User):
    # First request: decoder created and cached on User class
    # Subsequent requests: cached decoder reused (~2-3% faster)
    return JSONResponse({'created': user.name})
No configuration needed - caching happens automatically.
Lazy Query String Parsing
Query parameters are parsed only when request.args is accessed:
@app.get('/api/search')
async def search(request):
    # If you don't use query params, parsing is skipped
    # For requests without query strings, this is a free optimization
    query = request.args.get('q', [''])[0] if request.args else ''
    return JSONResponse({'query': query})
Rust-Level URL Decoding
URL decoding moved to Rust router for native-speed processing:
# URL with encoded characters: /api/users/John%20Doe
@app.get('/api/users/<name>')
async def get_user(request, name: str):
    # 'name' is decoded at Rust level (3-5% faster)
    # You receive: name = "John Doe"
    return JSONResponse({'user': name})
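The decoding in question is standard percent-decoding of the path segment. For reference, the pure-Python equivalent from the standard library looks like the sketch below; `decode_path_param` is a hypothetical name used only for illustration, and the Rust router's exact behavior on malformed input is not covered here.

```python
from urllib.parse import unquote


def decode_path_param(raw: str) -> str:
    """Percent-decode one path segment (UTF-8), as a Python router would."""
    return unquote(raw)
```

For example, `decode_path_param("John%20Doe")` returns `"John Doe"`. Note that `unquote` leaves `+` untouched, since `+` only means a space in form-encoded query strings.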
JSON Serialization
Use fast JSON serializers:
# Option 1: msgspec (fastest)
import msgspec
@app.get('/api/data')
async def get_data(request):
    data = {'items': [...]}  # Large dataset
    json_bytes = msgspec.json.encode(data)
    return Response(json_bytes, content_type='application/json')
# Option 2: orjson (very fast)
import orjson
@app.get('/api/data')
async def get_data(request):
    data = {'items': [...]}
    json_bytes = orjson.dumps(data)
    return Response(json_bytes, content_type='application/json')
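Which serializer wins depends on your payloads, so it is worth measuring with data shaped like your own. The harness below is a rough sketch: `time_encoder` and the sample payload are made up for illustration, the standard-library `json` module serves as the baseline, and `msgspec` and `orjson` are included only if they are installed.

```python
import json
import timeit
from typing import Any, Callable


def time_encoder(encode: Callable[[Any], Any], payload: Any, number: int = 1000) -> float:
    """Return the average seconds per call for one encoder on one payload."""
    return timeit.timeit(lambda: encode(payload), number=number) / number


# Hypothetical payload; replace with data shaped like your real responses
payload = {"items": [{"id": i, "name": f"item-{i}"} for i in range(1000)]}

encoders: dict[str, Callable[[Any], Any]] = {
    "json": lambda obj: json.dumps(obj).encode(),  # stdlib baseline
}
try:
    import msgspec
    encoders["msgspec"] = msgspec.json.encode
except ImportError:
    pass
try:
    import orjson
    encoders["orjson"] = orjson.dumps
except ImportError:
    pass

if __name__ == "__main__":
    for name, encode in encoders.items():
        print(f"{name}: {time_encoder(encode, payload) * 1e6:.1f} µs per call")
```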
Response Compression
Enable compression for text responses:
from gobstopper.middleware import CompressionMiddleware
app.add_middleware(CompressionMiddleware(
minimum_size=1000, # Only compress responses > 1KB
compression_level=6 # Balance between speed and size
))
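The reason for a minimum size is that compressing small bodies costs CPU time and can even make them larger. The standard-library sketch below illustrates that trade-off; `maybe_compress` is a hypothetical helper and is not how `CompressionMiddleware` is implemented.

```python
import gzip


def maybe_compress(body: bytes, minimum_size: int = 1000, level: int = 6) -> tuple[bytes, bool]:
    """Gzip the body only when it is larger than minimum_size bytes.

    Returns the (possibly compressed) body and whether compression was applied.
    """
    if len(body) <= minimum_size:
        return body, False
    compressed = gzip.compress(body, compresslevel=level)
    # Keep the original if compression did not actually shrink it
    if len(compressed) >= len(body):
        return body, False
    return compressed, True
```

When the second value is true, the response would also need a `Content-Encoding: gzip` header.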
Request Body Parsing
Parse request bodies efficiently:
# ❌ Slow: Multiple awaits
@app.post('/api/users')
async def create_user(request):
    json_data = await request.json()
    form_data = await request.get_form()  # Never used
    # ...
# ✅ Fast: Parse only what you need
@app.post('/api/users')
async def create_user(request):
    json_data = await request.json()  # Only parse JSON
    # ...
Granian Server Configuration
Optimize Granian server settings for production:
# Development
granian --interface rsgi --reload example_app:app
# Production: Multiple workers
granian --interface rsgi \
--workers 4 \
--threads 2 \
--blocking-threads 2 \
--backlog 2048 \
example_app:app
# Production: Optimized for high concurrency
granian --interface rsgi \
--workers $(nproc) \
--threads 4 \
--blocking-threads 4 \
--backlog 4096 \
--http1-buffer-size 8192 \
--http1-keep-alive \
--http2-max-concurrent-streams 1000 \
example_app:app
Configuration guidelines:
Workers: num_cpu_cores or num_cpu_cores + 1
Threads: 2-4 per worker for mixed I/O and CPU work
Blocking Threads: Match threads for blocking operations
Backlog: 2048-4096 for high-traffic sites
Keep-Alive: Enable for persistent connections
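These guidelines can be turned into a small helper that builds a Granian command line from the detected core count. This is a sketch only: `granian_args` is a hypothetical helper, the flags are the ones used in the commands above, and the defaults (one worker per core, 2 threads, backlog 2048) are just one point within the suggested ranges.

```python
import os
from typing import Optional


def granian_args(app: str, cpu_count: Optional[int] = None, threads: int = 2) -> list[str]:
    """Build a Granian command line with one worker per CPU core."""
    cores = cpu_count or os.cpu_count() or 1
    return [
        "granian", "--interface", "rsgi",
        "--workers", str(cores),
        "--threads", str(threads),
        "--blocking-threads", str(threads),  # match threads, per the guideline
        "--backlog", "2048",
        app,
    ]
```

The returned list can be passed to `subprocess.run` or joined into a string for a process manager's configuration.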
Worker Strategy
Choose worker strategy based on workload:
# CPU-intensive: More workers, fewer threads
granian --interface rsgi --workers 8 --threads 1 app:app
# I/O-intensive: Fewer workers, more threads
granian --interface rsgi --workers 2 --threads 8 app:app
# Mixed workload: Balanced (recommended)
granian --interface rsgi --workers 4 --threads 2 app:app
Monitoring & Profiling
Application Profiling
Profile your application to identify bottlenecks:
import cProfile
import pstats
from pstats import SortKey
async def profile_route(request):
    """Profile a specific route."""
    profiler = cProfile.Profile()
    profiler.enable()
    # Your code here (await requires an async function)
    response = await some_route(request)
    profiler.disable()
    stats = pstats.Stats(profiler)
    stats.sort_stats(SortKey.TIME)
    stats.print_stats(20)  # Top 20 functions
    return response
# Or use middleware for profiling
@app.middleware
async def profiling_middleware(request, call_next):
    if request.path.startswith('/profile/'):
        profiler = cProfile.Profile()
        profiler.enable()
        response = await call_next(request)
        profiler.disable()
        # Save stats
        profiler.dump_stats(f'profile_{request.path.replace("/", "_")}.prof')
        return response
    return await call_next(request)
Performance Metrics
Track key metrics:
import time
from collections import defaultdict
metrics = {
    'request_count': defaultdict(int),
    'request_duration': defaultdict(list),
    'error_count': defaultdict(int)
}
@app.middleware
async def metrics_middleware(request, call_next):
    start_time = time.perf_counter()  # high-resolution clock for durations
    try:
        response = await call_next(request)
        metrics['request_count'][request.path] += 1
        duration = time.perf_counter() - start_time
        metrics['request_duration'][request.path].append(duration)
        return response
    except Exception:
        metrics['error_count'][request.path] += 1
        raise
# Expose metrics endpoint
@app.get('/metrics')
async def get_metrics(request):
    stats = {}
    for path, durations in metrics['request_duration'].items():
        stats[path] = {
            'count': metrics['request_count'][path],
            'avg_duration': sum(durations) / len(durations),
            'max_duration': max(durations),
            'errors': metrics['error_count'][path]
        }
    return JSONResponse(stats)
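Averages hide tail latency, and a plain list of durations grows without bound on a long-running server. A bounded window with a 99th-percentile summary is one way to address both. The sketch below uses only the standard library; `DurationWindow` and its default window size are hypothetical choices, not part of Gobstopper.

```python
import statistics
from collections import deque


class DurationWindow:
    """Keep the most recent request durations and summarize them."""

    def __init__(self, maxlen: int = 10_000):
        # A bounded deque silently drops the oldest samples when full
        self._samples: deque[float] = deque(maxlen=maxlen)

    def add(self, seconds: float) -> None:
        self._samples.append(seconds)

    def summary(self) -> dict[str, float]:
        if not self._samples:
            return {}
        data = sorted(self._samples)
        result = {"count": len(data), "avg": statistics.fmean(data), "max": data[-1]}
        if len(data) >= 2:
            # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile
            result["p99"] = statistics.quantiles(data, n=100)[98]
        return result
```

One window per route path could replace the `defaultdict(list)` used for `request_duration`.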
Load Testing
Use load testing tools to validate performance:
# wrk: HTTP benchmarking
wrk -t12 -c400 -d30s http://localhost:8000/api/data
# Apache Bench
ab -n 10000 -c 100 http://localhost:8000/
# Locust: Python-based load testing
locust -f locustfile.py --host=http://localhost:8000
Example Locust file:
from locust import HttpUser, task, between
class WebsiteUser(HttpUser):
    wait_time = between(1, 3)
    @task(3)
    def get_homepage(self):
        self.client.get("/")
    @task(2)
    def get_api_data(self):
        self.client.get("/api/data")
    @task(1)
    def post_data(self):
        self.client.post("/api/users", json={
            "name": "Test User",
            "email": "test@example.com"
        })
Production Deployment
Optimization Checklist
[ ] Enable Rust components (router, templates, static files)
[ ] Configure database connection pooling
[ ] Set up Redis for distributed caching
[ ] Enable response compression
[ ] Use CDN for static assets
[ ] Configure proper cache headers
[ ] Enable HTTP/2 in reverse proxy
[ ] Set up multiple Granian workers
[ ] Implement response caching for read-heavy endpoints
[ ] Profile and optimize slow queries
[ ] Monitor application metrics
[ ] Set up load balancing
[ ] Enable database query result caching
[ ] Optimize template rendering
[ ] Use background tasks for heavy operations
Deployment Architecture
Recommended production setup:
Internet
↓
Load Balancer (AWS ELB, Nginx)
↓
┌─────────────────────────────────┐
│ Reverse Proxy (Nginx/Caddy) │
│ - TLS termination │
│ - Static file serving │
│ - Gzip/Brotli compression │
│ - Rate limiting │
└─────────────────────────────────┘
↓
┌─────────────────────────────────┐
│ Granian Workers (4-8) │
│ - Gobstopper application │
│ - Rust components enabled │
│ - Connection pooling │
└─────────────────────────────────┘
↓ ↓
┌──────────────┐ ┌──────────────┐
│ PostgreSQL │ │ Redis Cache │
│ (Primary) │ │ │
└──────────────┘ └──────────────┘
Nginx Configuration
upstream gobstopper_app {
least_conn;
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
}
server {
listen 443 ssl http2;
server_name example.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
# Gzip compression
gzip on;
gzip_types text/plain text/css application/json application/javascript;
gzip_min_length 1000;
# Static files
location /static/ {
alias /var/www/static/;
expires 1y;
add_header Cache-Control "public, immutable";
}
# Application
location / {
proxy_pass http://gobstopper_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
# Buffer settings
proxy_buffering on;
proxy_buffer_size 4k;
proxy_buffers 8 4k;
}
}
Benchmarking Results
Test Environment
Hardware: 8 CPU cores, 16GB RAM
Database: PostgreSQL 15 with connection pool (20 connections)
Configuration: 4 Granian workers, Rust components enabled
Results
| Test Type | Requests/sec | Latency (avg) | Latency (p99) |
|---|---|---|---|
| Plaintext | 18,241 | 2.1ms | 5.2ms |
| JSON | 17,486 | 2.3ms | 6.1ms |
| Single Query | 14,203 | 2.8ms | 8.4ms |
| Multiple Queries (20) | 5,523 | 7.2ms | 18.1ms |
| Updates (5) | 7,741 | 5.1ms | 14.2ms |
| Templates | 12,004 | 3.3ms | 9.7ms |
Comparison with Other Frameworks
Gobstopper performance is competitive with major Python async frameworks:
FastAPI: Similar performance for JSON serialization
Starlette: Comparable baseline performance
BlackSheep: Similar template rendering speed
Sanic: Gobstopper typically 10-15% faster with Rust components
Continuous Performance Testing
Set up automated benchmarks:
#!/bin/bash
# benchmark.sh - Run performance tests
echo "Running Gobstopper benchmarks..."
# Start application
granian --interface rsgi --workers 4 benchmark_simple:app &
APP_PID=$!
sleep 2
# Run tests
wrk -t4 -c100 -d30s http://localhost:8000/json > results_json.txt
wrk -t4 -c100 -d30s http://localhost:8000/db > results_db.txt
# Stop application
kill $APP_PID
# Parse and compare results
python compare_benchmarks.py results_json.txt results_db.txt
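The script calls `compare_benchmarks.py`, which is not shown in this guide. A minimal sketch of what such a script might contain follows, assuming it only needs the `Requests/sec` line that wrk prints in its summary; the function names and the 5% tolerance are illustrative assumptions.

```python
import re
import sys
from pathlib import Path

# wrk prints a summary line such as "Requests/sec:  17486.12"
REQUESTS_PER_SEC = re.compile(r"^Requests/sec:\s+([\d.]+)", re.MULTILINE)


def parse_requests_per_sec(wrk_output: str) -> float:
    """Extract the throughput figure from wrk's text output."""
    match = REQUESTS_PER_SEC.search(wrk_output)
    if match is None:
        raise ValueError("no 'Requests/sec' line found in wrk output")
    return float(match.group(1))


def check_regression(current: float, baseline: float, tolerance: float = 0.05) -> bool:
    """Return True when current throughput is within tolerance of baseline."""
    return current >= baseline * (1 - tolerance)


if __name__ == "__main__":
    for path in sys.argv[1:]:
        rps = parse_requests_per_sec(Path(path).read_text())
        print(f"{path}: {rps:,.0f} requests/sec")
```

In a CI job, `check_regression` could compare each result against a stored baseline and fail the build when throughput drops by more than the tolerance.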
Conclusion
Gobstopper provides excellent performance out of the box, but following these optimization strategies can push performance even higher:
Enable Rust components for roughly 2-5x faster routing, template rendering, and static file serving
Use connection pooling for database access
Implement caching at multiple levels
Optimize database queries with proper indexes
Configure Granian with appropriate worker settings
Monitor and profile to identify bottlenecks
Use CDN for static assets in production
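In the benchmarks above, plaintext and JSON endpoints exceeded 17,000 RPS with average latency under 3 ms, while database-backed endpoints ranged from about 5,500 to 14,000 RPS depending on the number of queries per request.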