API Reference¶
This section contains the complete API reference for the Gobstopper framework.
Core Application¶
Main Gobstopper application class
- class gobstopper.core.app.Gobstopper(name: str = 'gobstopper.core.app', debug: bool = False, slash_policy: str = 'strict')[source]¶
Bases: object
- __init__(name: str = 'gobstopper.core.app', debug: bool = False, slash_policy: str = 'strict')[source]¶
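A minimal construction sketch; the parameters follow the signature above, and the behaviors noted in the comments are assumptions based on the documented defaults:
>>> from gobstopper import Gobstopper
>>> app = Gobstopper(__name__)                 # defaults: debug=False, slash_policy='strict'
>>> dev_app = Gobstopper("myapp", debug=True)  # assumed: verbose errors for development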
- init_templates(template_folder: str = 'templates', use_rust: bool = None, **kwargs)[source]¶
Initialize template engine with Jinja2 or Rust backend.
Configures the template rendering system with either the Python Jinja2 engine or the high-performance Rust template engine. The Rust engine provides significant performance improvements and supports streaming template rendering for large datasets.
- Parameters:
template_folder – Directory path containing template files. Either relative to the application root or an absolute path.
use_rust – Template engine selection:
  - None: Auto-detect (uses Rust if available)
  - True: Force Rust engine (raises ImportError if unavailable)
  - False: Force Python Jinja2 engine
**kwargs – Additional template engine options:
  - auto_reload (bool): Watch templates for changes (default: True)
  - cache_size (int): Template cache size (default: 400)
  - enable_streaming (bool): Enable streaming rendering (Rust only)
  - enable_hot_reload (bool): Hot reload templates (Rust only)
- Raises:
ImportError – If Jinja2 is not installed or Rust engine requested but unavailable
FileNotFoundError – If template_folder does not exist
Examples
Auto-detection (recommended):
>>> app.init_templates("templates") # Uses Rust if available
Force specific engine:
>>> app.init_templates("templates", use_rust=True)   # Rust only
>>> app.init_templates("templates", use_rust=False)  # Jinja2 only
With custom options:
>>> app.init_templates(
...     "templates",
...     auto_reload=False,      # Disable file watching
...     cache_size=1000,        # Larger cache
...     enable_streaming=True   # Rust streaming (if available)
... )
Note
Must be called before using render_template(). The Rust engine provides a 2-5x performance improvement over Jinja2.
See also
render_template(): Render templates
context_processor(): Add global template context
- add_middleware(middleware: Callable[[Request, Callable[[Request], Awaitable[Any]]], Awaitable[Response]], priority: int = 0)[source]¶
Add middleware to the application with priority-based execution order.
Registers middleware functions that intercept HTTP requests and responses. Middleware executes in priority order (highest first) and can modify requests, responses, or short-circuit the handler chain. This is the primary mechanism for cross-cutting concerns like authentication, logging, and request preprocessing.
- Parameters:
middleware – Callable accepting (request, next_handler) and returning Response. Can be sync or async. Must call next_handler(request) to continue the chain or return a Response directly to short-circuit.
priority – Execution priority (default: 0). Higher values execute first. Use negative priorities for post-processing middleware.
Examples
Basic middleware:
>>> async def auth_middleware(request, next_handler):
...     if not request.headers.get('authorization'):
...         return Response("Unauthorized", status=401)
...     return await next_handler(request)
>>> app.add_middleware(auth_middleware, priority=100)
Logging middleware:
>>> async def log_middleware(request, next_handler):
...     start = time.time()
...     response = await next_handler(request)
...     duration = time.time() - start
...     app.logger.info(f"{request.method} {request.path} - {duration:.3f}s")
...     return response
>>> app.add_middleware(log_middleware, priority=50)
Note
Middleware execution order:
1. Application-level (highest priority first)
2. Blueprint-level (outer to inner, by priority)
3. Route-level (as registered)
Middleware can be sync or async. The framework handles both transparently.
See also
register_blueprint(): Blueprint middleware composition
gobstopper.middleware.cors.CORSMiddleware: Built-in CORS middleware
gobstopper.middleware.security.SecurityMiddleware: Security headers
- route(path: str, methods: list[str] = None, name: str = None)[source]¶
Decorator to register HTTP routes with path parameters and method filtering.
Primary routing decorator that maps URL patterns to handler functions. Supports static paths, path parameters with optional type conversion, and multiple HTTP methods. This is the foundation of Gobstopper’s routing system.
- Parameters:
path – URL pattern to match. Supported formats:
  - Static: "/users"
  - Path parameters: "/users/<user_id>" or "/users/<int:user_id>"
  - Multiple params: "/posts/<post_id>/comments/<comment_id>"
  Supported parameter types: str (default), int, float, uuid, path, date
methods – List of HTTP methods (default: ['GET']). Common values: GET, POST, PUT, DELETE, PATCH, OPTIONS
name – Optional name for reverse routing with url_for(). Defaults to function name.
- Returns:
Decorator function that registers the handler and returns it unchanged.
Examples
Simple GET route:
>>> @app.route("/") >>> async def home(request): ... return {"message": "Welcome"}
Multiple methods:
>>> @app.route("/api/data", methods=['GET', 'POST']) >>> async def data_handler(request): ... if request.method == 'GET': ... return {"data": [...]} ... return {"created": True}
Path parameters with type conversion:
>>> @app.route("/users/<int:user_id>") >>> async def get_user(request, user_id: int): ... # user_id is automatically converted to int ... return {"user_id": user_id}
Complex patterns:
>>> @app.route("/blog/<date:pub_date>/posts/<uuid:post_id>") >>> async def get_post(request, pub_date, post_id): ... return {"date": pub_date, "id": post_id}
Note
Route conflicts are detected and logged but do not prevent registration
Routes are matched in registration order when using Python router
Rust router provides O(1) lookup for static routes
Path parameters are automatically decoded from URL encoding
Type conversion errors return 400 Bad Request automatically
See also
get(): Convenience decorator for GET routes
post(): Convenience decorator for POST routes
put(): Convenience decorator for PUT routes
delete(): Convenience decorator for DELETE routes
websocket(): WebSocket route registration
- get(path: str, name: str = None)[source]¶
Convenience decorator for registering GET routes.
Shorthand for @app.route(path, methods=['GET']). Use for read-only operations that retrieve data without side effects.
- Parameters:
path – URL pattern to match (same format as route()).
- Returns:
Decorator function that registers the GET handler.
Examples
>>> @app.get("/users") >>> async def list_users(request): ... return {"users": [...]}
See also
route(): Full routing documentation and path parameter syntax
- post(path: str, name: str = None)[source]¶
Convenience decorator for registering POST routes.
Shorthand for @app.route(path, methods=['POST']). Use for creating resources or submitting data with side effects.
- Parameters:
path – URL pattern to match (same format as route()).
- Returns:
Decorator function that registers the POST handler.
Examples
>>> @app.post("/users") >>> async def create_user(request): ... data = await request.json() ... return {"id": new_user_id}, 201
See also
route(): Full routing documentation
- put(path: str, name: str = None)[source]¶
Convenience decorator for registering PUT routes.
Shorthand for @app.route(path, methods=['PUT']). Use for full resource updates (replacing the entire resource).
- Parameters:
path – URL pattern to match (same format as route()).
- Returns:
Decorator function that registers the PUT handler.
Examples
>>> @app.put("/users/<int:user_id>")
>>> async def update_user(request, user_id: int):
...     data = await request.json()
...     return {"updated": True}
- delete(path: str, name: str = None)[source]¶
Convenience decorator for registering DELETE routes.
Shorthand for @app.route(path, methods=['DELETE']). Use for removing resources.
- Parameters:
path – URL pattern to match (same format as route()).
- Returns:
Decorator function that registers the DELETE handler.
Examples
>>> @app.delete("/users/<int:user_id>")
>>> async def delete_user(request, user_id: int):
...     return {"deleted": True}, 204
See also
route(): Full routing documentation
- patch(path: str, name: str = None)[source]¶
Convenience decorator for registering PATCH routes.
Shorthand for @app.route(path, methods=['PATCH']). Use for partial resource updates (modifying specific fields).
- Parameters:
path – URL pattern to match (same format as route()).
- Returns:
Decorator function that registers the PATCH handler.
Examples
>>> @app.patch("/users/<int:user_id>")
>>> async def patch_user(request, user_id: int):
...     data = await request.json()  # Only changed fields
...     return {"updated": True}
- options(path: str, name: str = None)[source]¶
Convenience decorator for registering OPTIONS routes.
Shorthand for @app.route(path, methods=['OPTIONS']). Use for CORS preflight requests or capability discovery.
- Parameters:
path – URL pattern to match (same format as route()).
- Returns:
Decorator function that registers the OPTIONS handler.
Examples
>>> @app.options("/api/data")
>>> async def data_options(request):
...     return Response("", headers={"Allow": "GET, POST"})
See also
route(): Full routing documentation
gobstopper.middleware.cors.CORSMiddleware: Automatic CORS handling
- mount(path: str, app: Gobstopper)[source]¶
Mount a sub-application at the given path prefix.
Registers a complete Gobstopper application to handle requests under a specific path prefix. This enables modular application architecture where different subsystems can be developed independently and composed together.
- Parameters:
path – URL prefix for the sub-application. Must start with ‘/’. Trailing slashes are automatically normalized. Examples: “/api”, “/admin”, “/v1”
app – Gobstopper application instance to mount. All routes in the mounted app will be accessible under the specified prefix.
- Returns:
The mounted application instance (for chaining).
Examples
Basic mounting:
>>> # Create admin sub-application
>>> admin_app = Gobstopper("admin")
>>> @admin_app.get("/dashboard")
>>> async def admin_dashboard(request):
...     return {"admin": True}
>>>
>>> # Mount under /admin prefix
>>> app.mount("/admin", admin_app)
>>> # Now accessible at: /admin/dashboard
Multiple mounts:
>>> api_v1 = Gobstopper("api_v1")
>>> api_v2 = Gobstopper("api_v2")
>>> app.mount("/api/v1", api_v1)
>>> app.mount("/api/v2", api_v2)
Mounting with separate configurations:
>>> debug_app = Gobstopper("debug", debug=True)
>>> @debug_app.get("/info")
>>> async def debug_info(request):
...     return {"debug": True}
>>> app.mount("/_debug", debug_app)
Note
Mounted apps maintain their own middleware, error handlers, and configuration
Path stripping is automatic: mounted app sees paths relative to mount point
Mount order matters: earlier mounts are checked first
Mount points should not overlap with main app routes
See also
register_blueprint(): Alternative for simpler route grouping
- register_blueprint(blueprint, url_prefix: str | None = None)[source]¶
Register a Blueprint on this app with an optional URL prefix.
Blueprints provide a structured way to organize related routes, middleware, and handlers into reusable components. They support nesting, scoped middleware, and can have their own template and static file directories.
- Parameters:
blueprint – Blueprint instance to register. The blueprint contains routes, middleware, hooks (before_request, after_request), and optional template/static folder configurations.
url_prefix – Optional URL prefix for all blueprint routes (default: None). If provided, overrides blueprint’s own url_prefix. Must start with ‘/’. Examples: “/api”, “/admin”, “/v1”
Examples
Basic blueprint registration:
>>> from gobstopper.core.blueprint import Blueprint
>>> api = Blueprint("api", url_prefix="/api")
>>>
>>> @api.get("/users")
>>> async def list_users(request):
...     return {"users": [...]}
>>>
>>> app.register_blueprint(api)
>>> # Route available at: /api/users
Override prefix at registration:
>>> admin_bp = Blueprint("admin", url_prefix="/admin")
>>> app.register_blueprint(admin_bp, url_prefix="/dashboard")
>>> # Routes use /dashboard instead of /admin
Nested blueprints:
>>> api = Blueprint("api", url_prefix="/api")
>>> v1 = Blueprint("v1", url_prefix="/v1")
>>> v1.get("/users")(user_handler)
>>> api.register_blueprint(v1)
>>> app.register_blueprint(api)
>>> # Route available at: /api/v1/users
Blueprint with middleware:
>>> auth_bp = Blueprint("auth")
>>> auth_bp.add_middleware(auth_middleware, priority=100)
>>> @auth_bp.get("/profile")
>>> async def profile(request):
...     return {"user": request.user}
>>> app.register_blueprint(auth_bp, url_prefix="/secure")
Note
Middleware execution order with blueprints:
1. Application-level middleware (highest priority first)
2. Parent blueprint middleware (outer to inner)
3. Child blueprint middleware
4. Route-level middleware (innermost)
Before/after request handlers from blueprints are attached to the app. Blueprint template folders are added to template search paths. Blueprint static folders automatically get static file middleware.
See also
gobstopper.core.blueprint.Blueprint: Blueprint class documentation
add_middleware(): Adding middleware to applications
mount(): Alternative for mounting complete sub-applications
- websocket(path: str)[source]¶
Decorator for registering WebSocket routes with path parameters.
Registers WebSocket connection handlers that manage bidirectional communication channels. WebSocket routes support the same path parameter syntax as HTTP routes but use a different protocol lifecycle (connect, message exchange, disconnect).
- Parameters:
path – URL pattern to match. Supports the same format as HTTP routes:
  - Static: "/ws/chat"
  - Path parameters: "/ws/room/<room_id>"
  - Type conversion: "/ws/user/<int:user_id>"
- Returns:
Decorator function that registers the WebSocket handler.
Examples
Basic WebSocket echo:
>>> @app.websocket("/ws/echo") >>> async def echo_handler(websocket): ... await websocket.accept() ... async for message in websocket: ... await websocket.send(f"Echo: {message}")
WebSocket with path parameters:
>>> @app.websocket("/ws/room/<room_id>") >>> async def room_handler(websocket, room_id: str): ... await websocket.accept() ... # Join room ... async for message in websocket: ... # Broadcast to room ... await broadcast_to_room(room_id, message)
WebSocket with authentication:
>>> @app.websocket("/ws/notifications") >>> async def notifications(websocket): ... token = websocket.query_params.get('token') ... if not validate_token(token): ... await websocket.close(code=1008) # Policy violation ... return ... await websocket.accept() ... # Send notifications ... while True: ... notification = await get_next_notification() ... await websocket.send_json(notification)
Note
WebSocket handlers must call await websocket.accept() before communication
Handlers should handle connection cleanup (use try/finally)
Path parameters work identically to HTTP routes
WebSocket middleware is not yet supported (use before_request for auth)
Connection errors are logged automatically
See also
gobstopper.websocket.connection.WebSocket: WebSocket connection API
route(): HTTP route registration for comparison
- url_for(name: str, **params) str[source]¶
Build a URL for a named route with parameters (reverse routing).
Flask/Quart-style reverse routing that generates URLs from route names. Routes are automatically named with their function name, or you can provide a custom name using the name parameter in route decorators. Blueprint routes are qualified with the blueprint name (e.g., 'admin.login').
- Parameters:
name – Route name (function name, custom name, or ‘blueprint.function’)
**params – URL parameters to substitute into the route pattern
- Returns:
Generated URL path as a string
- Raises:
ValueError – If the named route doesn’t exist or parameters are missing
Examples
Basic usage:
>>> @app.get('/users/<int:id>', name='user_detail')
>>> async def get_user(request):
...     return {"user": ...}
>>>
>>> app.url_for('user_detail', id=123)
'/users/123'
With multiple parameters:
>>> @app.get('/posts/<int:year>/<int:month>')
>>> async def posts_archive(request):
...     return {"posts": ...}
>>>
>>> app.url_for('posts_archive', year=2024, month=12)
'/posts/2024/12'
Blueprint routes (Flask-style):
>>> admin = Blueprint('admin')
>>> @admin.get('/login')
>>> async def login(request):
...     return {"login": True}
>>>
>>> app.register_blueprint(admin, url_prefix='/admin')
>>> app.url_for('admin.login')
'/admin/login'
In request handlers with redirect:
>>> @app.post('/users')
>>> async def create_user(request):
...     new_id = save_user()
...     return redirect(app.url_for('user_detail', id=new_id))
Note
Requires Rust router for best performance
Falls back to route scanning if Rust router unavailable
Route names default to function names
Custom names are provided via the decorator name parameter
See also
redirect(): Convenience function for redirecting to URLs
route(): How to register named routes
- task(name: str = None, category: str = 'default')[source]¶
Decorator to register background task handlers with categorization.
Registers async functions as background tasks that can be queued and executed asynchronously. Tasks are persisted to DuckDB, support retries, priority levels, and progress tracking. Tasks are organized by category for separate worker pools.
- Parameters:
name – Task identifier (default: function name). Used when queuing tasks via add_background_task(). Must be unique within category.
category – Task category for worker pool isolation (default: "default"). Tasks in different categories run in separate worker pools. Examples: "email", "data_processing", "notifications"
- Returns:
Decorator function that registers the task handler.
Examples
Basic task registration:
>>> @app.task("send_email", category="notifications") >>> async def send_email(to: str, subject: str, body: str): ... await email_client.send(to, subject, body) ... return {"sent": True}
Task with default name (uses function name):
>>> @app.task(category="data") >>> async def process_upload(file_path: str): ... data = await read_file(file_path) ... result = await process_data(data) ... return {"rows": len(result)}
Task with progress tracking:
>>> @app.task("import_data", category="imports") >>> async def import_large_dataset(source_url: str): ... total = await get_row_count(source_url) ... for i, batch in enumerate(fetch_batches(source_url)): ... await process_batch(batch) ... # Progress tracked automatically ... return {"imported": total}
Queuing registered tasks:
>>> # In a request handler
>>> task_id = await app.add_background_task(
...     "send_email",
...     category="notifications",
...     priority=TaskPriority.HIGH,
...     to="user@example.com",
...     subject="Welcome",
...     body="Hello!"
... )
>>> return {"task_id": task_id}
Note
Tasks must be async functions
Task return values are stored and can be retrieved later
Failed tasks can be automatically retried based on max_retries
Tasks persist across application restarts (stored in DuckDB)
Each category needs worker processes started via start_task_workers()
See also
add_background_task(): Queue tasks for execution
start_task_workers(): Start worker pools for task categories
gobstopper.tasks.queue.TaskQueue: Task queue implementation
gobstopper.tasks.queue.TaskPriority: Priority levels (LOW, NORMAL, HIGH, URGENT)
- before_request(func: Callable[[...], Any]) Callable[[...], Any][source]¶
Register a before-request handler that runs before each HTTP request.
Before-request handlers execute after middleware but before the route handler. They can perform request validation, inject request-scoped data, or short-circuit the request by returning a Response directly.
- Parameters:
func – Callable accepting (request) and optionally returning Response. Can be sync or async. If it returns a Response, the route handler is skipped and that response is returned immediately.
- Returns:
The handler function (for decorator chaining).
Examples
Request logging:
>>> @app.before_request
>>> async def log_requests(request):
...     app.logger.info(f"Request: {request.method} {request.path}")
...     # No return - continues to handler
Authentication check:
>>> @app.before_request
>>> async def require_auth(request):
...     if not request.headers.get('authorization'):
...         return Response("Unauthorized", status=401)
...     # Auth successful - continues to handler
Inject request-scoped data:
>>> @app.before_request
>>> async def add_user_context(request):
...     token = request.headers.get('authorization')
...     request.user = await validate_and_get_user(token)
Note
Runs for every HTTP request (not WebSocket)
Multiple handlers run in registration order
First handler to return a Response short-circuits the chain
Exceptions propagate and trigger error handlers
See also
after_request(): Post-processing hook
add_middleware(): Alternative for cross-cutting concerns
- after_request(func: Callable[[...], Any]) Callable[[...], Any][source]¶
Register an after-request handler that runs after each HTTP request.
After-request handlers execute after the route handler but before sending the response. They can modify responses, add headers, or perform logging. Must accept both request and response parameters.
- Parameters:
func – Callable accepting (request, response) and optionally returning modified Response. Can be sync or async. If it returns a Response, that replaces the original. If it returns None, original is used.
- Returns:
The handler function (for decorator chaining).
Examples
Add response headers:
>>> @app.after_request
>>> async def add_headers(request, response):
...     response.headers['X-Processed-By'] = 'Gobstopper'
...     return response
Response logging:
>>> @app.after_request
>>> async def log_response(request, response):
...     app.logger.info(f"Response: {response.status}")
...     # No return - original response used
Security headers:
>>> @app.after_request
>>> def add_security_headers(request, response):
...     response.headers['X-Content-Type-Options'] = 'nosniff'
...     response.headers['X-Frame-Options'] = 'DENY'
...     return response
Note
Runs for every successful HTTP request (not WebSocket)
Multiple handlers run in registration order
Each handler receives the response from the previous handler
Does not run if an exception occurs (use error handlers instead)
See also
before_request(): Pre-processing hook
error_handler(): Handle exceptions
- context_processor(func: Callable[[], dict]) Callable[[], dict][source]¶
Register a template context processor for global template variables.
Context processors provide variables that are automatically available in all rendered templates. They run before each template render and can inject dynamic global context like current user, configuration, or request data.
- Parameters:
func – Callable returning a dictionary of template variables. Can be sync or async. The returned dict is merged into every template’s context. Should be idempotent and fast.
- Returns:
The processor function (for decorator chaining).
Examples
Add global template variables:
>>> @app.context_processor
>>> def inject_globals():
...     return {
...         'app_name': 'Gobstopper',
...         'version': '1.0.0',
...         'current_year': 2025
...     }
Inject current user:
>>> @app.context_processor
>>> async def inject_user():
...     # Access request context if available
...     return {'user': getattr(request, 'user', None)}
Configuration values:
>>> @app.context_processor
>>> def inject_config():
...     return {
...         'debug': app.debug,
...         'site_name': os.getenv('SITE_NAME', 'Gobstopper App')
...     }
Note
Runs before every template render
Multiple processors are merged in registration order
Later processors can override earlier ones
Should be fast - runs on every template render
Only affects templates, not JSON responses
See also
render_template(): Template rendering
init_templates(): Template engine initialization
template_filter(): Custom template filters
template_global(): Custom template globals
- template_filter(name: str = None)[source]¶
Decorator to register a template filter.
Note: When using the Rust template engine, custom filters are not supported; in that case this becomes a no-op to avoid noisy warnings.
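A brief usage sketch (Jinja2 backend only, per the note above); the filter name and template usage shown are illustrative assumptions:
>>> @app.template_filter("reverse")
>>> def reverse_filter(s: str) -> str:
...     return s[::-1]  # Registered under the name "reverse"
>>> # In a template: {{ "hello" | reverse }} renders "olleh"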
- template_global(name: str = None)[source]¶
Decorator to register a template global.
Note: When using the Rust template engine, custom globals are not supported; in that case this becomes a no-op to avoid noisy warnings.
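A brief usage sketch (Jinja2 backend only, per the note above); the global name is an illustrative assumption:
>>> from datetime import datetime, timezone
>>> @app.template_global("utcnow")
>>> def utcnow():
...     return datetime.now(timezone.utc)  # Callable in templates as utcnow()
>>> # In a template: {{ utcnow() }}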
- error_handler(status_code: int)[source]¶
Decorator to register custom error handlers for HTTP status codes.
Registers handlers that customize error responses for specific HTTP status codes. Error handlers receive the request and exception, allowing for custom error pages, logging, or error tracking integration.
- Parameters:
status_code – HTTP status code to handle (e.g., 404, 500, 403, 429). Common codes: 400 (Bad Request), 401 (Unauthorized), 403 (Forbidden), 404 (Not Found), 500 (Internal Server Error).
- Returns:
Decorator function that registers the error handler.
Examples
Custom 404 page:
>>> @app.error_handler(404)
>>> async def not_found(request, error):
...     return await app.render_template(
...         "errors/404.html",
...         path=request.path
...     )
Custom 500 with error tracking:
>>> @app.error_handler(500)
>>> async def server_error(request, error):
...     # Log to error tracking service
...     await error_tracker.capture(error, request)
...     return JSONResponse(
...         {"error": "Internal server error", "request_id": request.id},
...         status=500
...     )
Rate limit exceeded:
>>> @app.error_handler(429)
>>> async def rate_limited(request, error):
...     return JSONResponse(
...         {"error": "Too many requests", "retry_after": 60},
...         status=429,
...         headers={"Retry-After": "60"}
...     )
Unauthorized with custom message:
>>> @app.error_handler(401)
>>> async def unauthorized(request, error):
...     return JSONResponse(
...         {
...             "error": "Authentication required",
...             "login_url": "/auth/login"
...         },
...         status=401
...     )
Note
Default handlers for 404 and 500 use themed error pages
Error handlers run outside middleware chain
Exceptions in error handlers fall back to basic text responses
Error handlers can be sync or async
See also
before_request(): Pre-request validation
gobstopper.http.response.Response: Response construction
gobstopper.http.response.JSONResponse: JSON error responses
- async render_template(template_name: str, stream: bool = False, **context) Response[source]¶
Render template with context data and return HTTP response.
Renders the specified template file with provided context variables, returning a properly formatted HTTP response. Supports both traditional rendering and progressive streaming for large datasets.
- Parameters:
template_name – Name of template file relative to template folder. Must include file extension (e.g., “page.html”, “email.txt”).
stream – Enable progressive rendering (Rust engine only):
  - False: Traditional rendering - template fully rendered before response
  - True: Streaming rendering - template chunks sent as generated
**context – Template context variables passed to the template. Variables become available in the template as {{ variable_name }}.
- Returns:
Response: HTTP response with rendered HTML content. StreamResponse: For streaming renders (when stream=True).
- Return type:
Response (StreamResponse when stream=True)
- Raises:
RuntimeError – If template engine not initialized with init_templates()
FileNotFoundError – If template file does not exist
TemplateRenderError – If template rendering fails (syntax errors, missing variables)
Examples
Basic template rendering:
>>> @app.get("/") >>> async def index(request): ... return await app.render_template("index.html", ... title="Home Page", ... user_name="Alice")
With complex context:
>>> @app.get("/dashboard") >>> async def dashboard(request): ... users = await get_users() ... return await app.render_template("dashboard.html", ... users=users, ... page_title="Dashboard")
Streaming large datasets (Rust only):
>>> @app.get("/report") >>> async def large_report(request): ... big_data = await get_large_dataset() ... return await app.render_template("report.html", ... stream=True, ... data=big_data)
Note
Context processors are automatically applied to provide global variables. Streaming requires Rust template engine and compatible templates.
See also
init_templates(): Initialize template system
context_processor(): Add global template context
gobstopper.http.Response: Response objects
- async add_background_task(name: str, category: str = 'default', priority: TaskPriority = TaskPriority.NORMAL, max_retries: int = 0, *args, **kwargs) str[source]¶
Add a background task to the queue for asynchronous execution.
Queues a registered task for execution by worker processes. Tasks are persisted to DuckDB immediately and executed based on priority and worker availability. Returns a task ID for tracking progress and retrieving results.
- Parameters:
name – Name of the registered task (as specified in the task() decorator).
category – Task category (default: "default"). Must match task registration. Category determines which worker pool executes the task.
priority – Execution priority (default: TaskPriority.NORMAL). Options: TaskPriority.LOW, NORMAL, HIGH, URGENT. Higher priority tasks execute before lower priority.
max_retries – Maximum retry attempts on failure (default: 0). Task will be retried up to this many times if it raises an exception.
*args – Positional arguments passed to task handler.
**kwargs – Keyword arguments passed to task handler.
- Returns:
Task ID (UUID string) for tracking and result retrieval.
- Raises:
Examples
Queue a simple task:
>>> task_id = await app.add_background_task(
...     "send_email",
...     to="user@example.com",
...     subject="Welcome",
...     body="Hello!"
... )
>>> return {"task_id": task_id}
High-priority task with retries:
>>> task_id = await app.add_background_task(
...     "process_payment",
...     category="payments",
...     priority=TaskPriority.HIGH,
...     max_retries=3,
...     payment_id=123,
...     amount=99.99
... )
Urgent task (process immediately):
>>> task_id = await app.add_background_task(
...     "send_alert",
...     category="notifications",
...     priority=TaskPriority.URGENT,
...     message="System alert",
...     recipients=["admin@example.com"]
... )
Track task status:
>>> # In route handler
>>> task_id = await app.add_background_task("long_process", data=data)
>>> # Later, check status
>>> task_info = await app.task_queue.get_task(task_id)
>>> return {
...     "status": task_info.status,
...     "progress": task_info.progress,
...     "result": task_info.result
... }
Note
Tasks execute asynchronously in separate worker processes
Task parameters are serialized to JSON (must be JSON-serializable)
Workers must be started via start_task_workers() for execution
Task status can be queried using the returned task_id
Failed tasks remain in database for inspection
See also
task(): Register task handlers
start_task_workers(): Start worker pools
gobstopper.tasks.queue.TaskPriority: Priority levels
gobstopper.tasks.queue.TaskQueue: Task queue operations
- async start_task_workers(category: str = 'default', worker_count: int = 1)[source]¶
Start background worker processes for task execution.
Launches worker processes that poll for and execute queued tasks in the specified category. Workers run continuously until application shutdown. Multiple workers can process tasks concurrently within the same category.
- Parameters:
category – Task category to process (default: "default"). Workers only execute tasks queued in this category. Must match the category used in the task() decorator and add_background_task().
worker_count – Number of concurrent workers to start (default: 1). More workers enable parallel task processing. Consider CPU-bound vs I/O-bound tasks when sizing worker pools.
Examples
Start default workers:
>>> # In startup handler
>>> @app.on_startup
>>> async def start_workers():
...     await app.start_task_workers("default", worker_count=4)
Multiple categories with different worker counts:
>>> @app.on_startup
>>> async def start_all_workers():
...     # I/O-bound tasks (email, API calls)
...     await app.start_task_workers("notifications", worker_count=10)
...     # CPU-bound tasks (image processing)
...     await app.start_task_workers("processing", worker_count=2)
...     # Critical tasks
...     await app.start_task_workers("payments", worker_count=4)
Single worker for sequential processing:
>>> @app.on_startup
>>> async def start_sequential_worker():
...     # Process tasks one at a time
...     await app.start_task_workers("imports", worker_count=1)
Note
Workers run as background asyncio tasks
Each worker polls the database for pending tasks
Workers respect task priority (URGENT > HIGH > NORMAL > LOW)
Workers automatically retry failed tasks based on max_retries
Workers shut down gracefully on application shutdown
This should be called in on_startup() handlers
See also
task(): Register task handlers
add_background_task(): Queue tasks
on_startup(): Application startup hooks
- on_startup(func)[source]¶
Decorator to register startup handlers that run once before first request.
Registers functions to execute during application initialization. Startup handlers run lazily on the first request (not at module import time), allowing for async resource initialization like database connections, cache warming, or worker startup.
- Parameters:
func – Callable to run at startup. Can be sync or async. Should handle its own exceptions or allow them to propagate to prevent startup completion.
- Returns:
The handler function (for decorator chaining).
Examples
Initialize database connection:
>>> @app.on_startup
>>> async def init_database():
...     app.db = await create_db_pool(DATABASE_URL)
...     app.logger.info("Database connected")
Start background workers:
>>> @app.on_startup
>>> async def start_workers():
...     await app.start_task_workers("default", worker_count=4)
...     await app.start_task_workers("email", worker_count=2)
Warm up caches:
>>> @app.on_startup
>>> async def warm_cache():
...     app.config = await load_config()
...     app.cache = await init_redis_cache()
Load ML models:
>>> @app.on_startup
>>> def load_models():
...     app.ml_model = load_pretrained_model("model.pkl")
...     app.logger.info("Model loaded")
Note
Handlers run once on first request, not at import time
Multiple handlers run in registration order
Startup is protected by asyncio.Lock for thread safety
Exceptions prevent startup completion and will be retried
All handlers must complete before first request proceeds
See also
on_shutdown(): Cleanup handlers
start_task_workers(): Starting background workers
- on_shutdown(func)[source]¶
Decorator to register shutdown handlers for graceful cleanup.
Registers functions to execute during application shutdown. Shutdown handlers run after the application stops accepting new requests and in-flight requests complete, allowing for resource cleanup like closing database connections, flushing caches, or stopping background workers.
- Parameters:
func – Callable to run at shutdown. Can be sync or async. Exceptions are logged but do not prevent other shutdown handlers from running.
- Returns:
The handler function (for decorator chaining).
Examples
Close database connections:
>>> @app.on_shutdown
>>> async def close_database():
...     await app.db.close()
...     app.logger.info("Database connection closed")
Flush caches and save state:
>>> @app.on_shutdown
>>> async def save_state():
...     await app.cache.flush()
...     await save_application_state()
...     app.logger.info("State saved")
Stop external services:
>>> @app.on_shutdown
>>> async def stop_services():
...     await app.websocket_manager.close_all()
...     await app.metrics_client.close()
Cleanup temporary files:
>>> @app.on_shutdown
>>> def cleanup_temp_files():
...     import shutil
...     shutil.rmtree("/tmp/app_uploads", ignore_errors=True)
Note
Handlers run after server stops accepting new requests
In-flight requests are allowed to complete (with timeout)
Multiple handlers run in registration order
Exceptions in handlers are logged but don’t stop other handlers
Task queue shutdown happens automatically after custom handlers
Default shutdown timeout is 10 seconds (configurable via WOPR_SHUTDOWN_TIMEOUT)
See also
on_startup(): Initialization handlers
shutdown(): Programmatic shutdown trigger
- visualize_routing()[source]¶
Print a detailed visualization of application routing structure.
Logs a comprehensive view of the routing hierarchy including mounts, blueprints, middleware ordering, and effective middleware chains for each route. Useful for debugging routing issues, understanding middleware execution order, and verifying blueprint composition.
Examples
Call during startup:
>>> @app.on_startup
>>> def show_routes():
...     app.visualize_routing()
Output shows:
>>> # === Routing Visualization ===
>>> # Mount: /api -> api_v1
>>> # Blueprint: auth prefix=/auth
>>> #   MW(prio=100): auth_middleware
>>> # Blueprint: admin prefix=/admin
>>> #   MW(prio=50): admin_check
>>> # Route ['GET'] /api/users -> MW order: ['cors', 'auth', 'rate_limit']
>>> # === End Visualization ===
Note
Shows mounts, blueprints hierarchy, and middleware
Displays effective middleware order for each route
Middleware order reflects actual execution (app → blueprints → route)
Useful for debugging middleware composition issues
Output goes to application logger at INFO level
See also
register_blueprint(): Blueprint registration
mount(): Sub-application mounting
add_middleware(): Middleware registration
- async shutdown(timeout: float | None = None)[source]¶
Initiate graceful application shutdown with in-flight request handling.
Performs a graceful shutdown sequence: stops accepting new requests, waits for in-flight requests to complete (with timeout), runs shutdown hooks, and closes the task queue. This ensures no requests are abruptly terminated and resources are properly cleaned up.
- Parameters:
timeout – Maximum seconds to wait for in-flight requests (default: None). If None, reads from environment variable WOPR_SHUTDOWN_TIMEOUT (default: 10 seconds). After timeout, shutdown proceeds anyway.
Examples
Manual shutdown:
>>> # In a special endpoint
>>> @app.post("/admin/shutdown")
>>> async def trigger_shutdown(request):
...     asyncio.create_task(app.shutdown(timeout=30))
...     return {"status": "shutting down"}
Signal handler:
>>> import signal
>>> def handle_sigterm(signum, frame):
...     asyncio.create_task(app.shutdown())
>>> signal.signal(signal.SIGTERM, handle_sigterm)
Custom timeout:
>>> await app.shutdown(timeout=60) # Wait up to 60s
Note
Shutdown sequence:
1. Stop accepting new requests (503 responses)
2. Wait for in-flight requests to complete (up to timeout)
3. Run registered shutdown hooks via on_shutdown()
4. Shut down task queue and workers
5. Log any errors but continue shutdown
Environment variable WOPR_SHUTDOWN_TIMEOUT (seconds) sets the default timeout. In-flight request count is tracked automatically. Shutdown hooks run even if the timeout expires.
See also
on_shutdown(): Register cleanup handlers
start_task_workers(): Background workers
- async __rsgi__(scope: Scope, protocol)[source]¶
RSGI application entry point called by Granian server.
Main entry point for the RSGI (Rust Server Gateway Interface) protocol. This method is called by the Granian server for each incoming connection and routes to the appropriate protocol handler (HTTP or WebSocket).
- Parameters:
scope – RSGI scope object containing request metadata. Expected to have a ‘proto’ field indicating protocol type (‘http’ or ‘ws’/’websocket’).
protocol – RSGI protocol object (HTTPProtocol or WebsocketProtocol) for managing the connection and sending responses.
Note
This is the application's RSGI interface. The __rsgi__ name is required by the RSGI specification and is automatically detected by Granian when you run: granian --interface rsgi app:app
Protocol dispatch:
- 'http' → _handle_http()
- 'ws'/'websocket' → _handle_websocket()
The scope is converted to a SimpleNamespace if provided as a dict (for test client compatibility).
Examples
Run with Granian:
>>> # In your app.py
>>> from gobstopper import Gobstopper
>>> app = Gobstopper(__name__)
>>> @app.get("/")
>>> async def hello(request):
...     return {"message": "Hello"}
Command line:
>>> # granian --interface rsgi app:app
>>> # granian --interface rsgi --reload app:app  # With hot reload
See also
_handle_http(): HTTP request handler
_handle_websocket(): WebSocket connection handler
Blueprint system for grouping routes and related hooks.
Provides a Flask-like Blueprint API that allows organizing routes into reusable modules that can be registered on an application with an optional URL prefix.
- class gobstopper.core.blueprint.Blueprint(name: str, url_prefix: str | None = None, *, static_folder: str | None = None, template_folder: str | None = None)[source]¶
Bases: object
A collection of routes and hooks that can be registered on a Gobstopper app.
- Parameters:
name – Identifier for the blueprint.
url_prefix – Optional URL prefix applied when registering on an app.
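A brief construction sketch, combining the constructor above with the route decorators and register_blueprint() documented earlier:
>>> from gobstopper.core.blueprint import Blueprint
>>> users = Blueprint("users", url_prefix="/users")
>>> @users.get("/<int:user_id>")
>>> async def get_user(request, user_id: int):
...     return {"user_id": user_id}
>>> app.register_blueprint(users)
>>> # Route available at: /users/<user_id>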
HTTP Components¶
Request¶
HTTP Request handling for Gobstopper framework
- class gobstopper.http.request.Request(scope: Scope, protocol: RSGIHTTPProtocol)[source]¶
Bases: object
HTTP request object with async parsing capabilities.
Wraps RSGI request scope and protocol, providing convenient access to request data including headers, query parameters, form data, and JSON payloads. All parsing operations are async and lazy-loaded for optimal performance.
- Parameters:
scope – RSGI request scope containing request metadata (method, path, headers, etc.)
protocol – RSGI protocol instance for reading request body asynchronously
- scope¶
Original RSGI scope object with request metadata
- Type:
Scope
- protocol¶
RSGI protocol for async body reading
- Type:
HTTPProtocol
- logger¶
Application or framework logger instance for request-scoped logging
- Type:
Logger
Examples
Basic request handling:
>>> @app.post("/api/users") >>> async def create_user(request: Request): ... data = await request.json() ... user_ip = request.client_ip ... request.logger.info(f"User created from {user_ip}") ... return {"status": "created", "data": data}
Access query parameters:
>>> @app.get("/search") >>> async def search(request: Request): ... # URL: /search?q=python&page=2 ... query = request.args.get("q", [""])[0] ... page = int(request.args.get("page", ["1"])[0]) ... return {"query": query, "page": page}
Session management:
>>> @app.get("/profile") >>> async def profile(request: Request): ... if not request.session or not request.session.get("user_id"): ... return {"error": "Not authenticated"}, 401 ... user_id = request.session["user_id"] ... return {"user_id": user_id}
Note
Request parsing is lazy - methods like json(), form() only parse when called. Subsequent calls to the same parsing method return cached results. Body reading is async and may involve network I/O operations. Session attributes require SecurityMiddleware to be configured.
See also
Response: HTTP response objects
JSONResponse: JSON response helper
SecurityMiddleware: Provides session management
- scope¶
- protocol¶
- session¶
- endpoint¶
- view_args¶
- url_rule¶
- property client_ip: str¶
Get the client’s IP address with proxy header support.
Attempts to determine the real client IP address by checking the X-Forwarded-For header (for proxied requests) before falling back to the direct connection IP from the RSGI scope.
- Returns:
Client IP address as string. Returns "unknown" if the IP cannot be determined from headers or scope.
- Return type:
str
Note
When behind a proxy or load balancer, X-Forwarded-For header is respected. Takes the first IP in the chain for multi-proxy setups. For direct connections, uses the client address from RSGI scope.
Examples
Log client IP for requests:
>>> @app.get("/api/endpoint") >>> async def handler(request: Request): ... ip = request.client_ip ... print(f"Request from: {ip}") ... return {"client": ip}
Rate limiting by IP:
>>> from gobstopper.utils.rate_limiter import RateLimiter
>>> limiter = RateLimiter(max_requests=100, window=60)
>>>
>>> @app.post("/api/action")
>>> async def protected(request: Request):
...     if not limiter.check(request.client_ip):
...         return {"error": "Rate limit exceeded"}, 429
...     return {"status": "success"}
- property method: str¶
Get the HTTP request method.
- Returns:
HTTP method in uppercase (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS, etc.)
- Return type:
str
Examples
Method-based routing logic:
>>> @app.route("/resource", methods=["GET", "POST"]) >>> async def handle_resource(request: Request): ... if request.method == "GET": ... return {"action": "retrieve"} ... elif request.method == "POST": ... data = await request.json() ... return {"action": "create", "data": data}
- property path: str¶
Get the request path without query string.
- Returns:
URL path component (e.g., "/api/users/123"). Does not include query string, fragment, or domain.
- Return type:
str
Examples
Path-based logic:
>>> @app.get("/*") >>> async def catch_all(request: Request): ... if request.path.startswith("/admin"): ... # Check admin permissions ... pass ... return {"path": request.path}
Extract path segments:
>>> @app.get("/api/*") >>> async def api_handler(request: Request): ... segments = request.path.strip("/").split("/") ... # segments = ["api", "users", "123"] ... return {"segments": segments}
- property query_string: str¶
Get the raw query string from the URL.
- Returns:
Raw query string without leading '?' (e.g., "page=1&limit=10"). Empty string if no query parameters present.
- Return type:
str
Note
For parsed query parameters as a dict, use the args property instead.
Examples
Get raw query string:
>>> @app.get("/search") >>> async def search(request: Request): ... # URL: /search?q=python&page=2 ... raw = request.query_string # "q=python&page=2" ... return {"raw_query": raw}
See also
args: Parsed query parameters as dict
- property args: dict[str, list[str]]¶
Get parsed query parameters as a dictionary.
Parses the query string into a dictionary mapping parameter names to lists of values. Supports multiple values for the same parameter name. Parsing is lazy-loaded and cached.
- Returns:
Dictionary mapping parameter names to lists of values. Empty dict if no query parameters present.
- Return type:
dict[str, list[str]]
Examples
Single parameter values:
>>> @app.get("/search") >>> async def search(request: Request): ... # URL: /search?q=python&page=2 ... query = request.args.get("q", [""])[0] # "python" ... page = int(request.args.get("page", ["1"])[0]) # 2 ... return {"query": query, "page": page}
Multiple values for same parameter:
>>> @app.get("/filter") >>> async def filter_items(request: Request): ... # URL: /filter?tag=python&tag=web&tag=async ... tags = request.args.get("tag", []) # ["python", "web", "async"] ... return {"tags": tags}
Check if parameter exists:
>>> @app.get("/report") >>> async def report(request: Request): ... include_details = "details" in request.args ... return {"detailed": include_details}
Note
Returns lists even for single values to handle multiple values uniformly. Use request.args.get(“param”, [“default”])[0] to get single value with default. Parsing result is cached for subsequent access.
See also
query_string: Raw unparsed query string
- property headers: dict[str, str]¶
Get HTTP request headers with case-insensitive access.
Provides access to all HTTP headers sent with the request. Header names are normalized to lowercase for case-insensitive lookups. Headers are lazily computed only when accessed for optimal performance.
- Returns:
Dictionary mapping lowercase header names to values. Common headers include 'content-type', 'authorization', 'user-agent', 'accept', 'cookie', etc.
- Return type:
dict[str, str]
Examples
Check content type:
>>> @app.post("/upload") >>> async def upload(request: Request): ... content_type = request.headers.get("content-type", "") ... if content_type.startswith("application/json"): ... data = await request.json() ... elif content_type.startswith("multipart/form-data"): ... data = await request.multipart() ... return {"received": True}
Authentication header:
>>> @app.get("/protected") >>> async def protected(request: Request): ... auth = request.headers.get("authorization", "") ... if not auth.startswith("Bearer "): ... return {"error": "Unauthorized"}, 401 ... token = auth[7:] # Remove "Bearer " prefix ... # Validate token... ... return {"status": "authenticated"}
Custom headers:
>>> @app.get("/api/data") >>> async def api(request: Request): ... api_key = request.headers.get("x-api-key") ... if not api_key: ... return {"error": "API key required"}, 401 ... return {"data": "sensitive"}
Note
Header names are normalized to lowercase (e.g., “Content-Type” becomes “content-type”). Case-insensitive per HTTP specification (RFC 7230). Lazily computed only when accessed for optimal performance.
- property session_id: str | None¶
Get the current session ID if session middleware is enabled.
Returns the session identifier set by SecurityMiddleware. This is a read-only property - session IDs cannot be modified directly.
- Returns:
Session ID string if SecurityMiddleware is enabled and a session exists. None if no session middleware or no active session.
- Return type:
Optional[str]
Examples
Check if user has active session:
>>> @app.get("/dashboard") >>> async def dashboard(request: Request): ... if not request.session_id: ... return {"error": "No active session"}, 401 ... return {"session_id": request.session_id}
Logging with session tracking:
>>> @app.post("/action") >>> async def action(request: Request): ... session = request.session_id or "anonymous" ... request.logger.info(f"Action by session: {session}") ... return {"status": "success"}
Note
Requires SecurityMiddleware to be configured with session management. Session ID is typically a cryptographically secure random string. This is read-only - use request.session dict to store session data.
See also
session: Session data dictionary
SecurityMiddleware: Session management middleware
- property logger: Logger¶
Get the application logger instance for request-scoped logging.
Returns the application’s configured logger if available, otherwise falls back to the framework’s default logger. Useful for logging request-specific events and errors.
- Returns:
Python logging.Logger instance for writing log messages.
- Return type:
Logger
Examples
Log request information:
>>> @app.post("/api/create") >>> async def create(request: Request): ... request.logger.info(f"Creating resource from {request.client_ip}") ... data = await request.json() ... request.logger.debug(f"Received data: {data}") ... return {"status": "created"}, 201
Error logging:
>>> @app.get("/api/data") >>> async def get_data(request: Request): ... try: ... result = await fetch_data() ... return {"data": result} ... except Exception as e: ... request.logger.error(f"Data fetch failed: {e}", exc_info=True) ... return {"error": "Internal error"}, 500
Conditional debug logging:
>>> @app.post("/api/process") >>> async def process(request: Request): ... if request.logger.isEnabledFor(logging.DEBUG): ... request.logger.debug(f"Headers: {request.headers}") ... return {"status": "processed"}
Note
Prefers application logger if set, falls back to framework logger. Logger is attached to request for convenient access in handlers.
See also
Gobstopper.logger: Application-level logger configuration
- property cookies: dict[str, str]¶
Get cookies from Cookie header as a dictionary.
Parses the Cookie header into a dictionary mapping cookie names to values. Parsing is lazy-loaded and cached. Provides Flask/Quart-style convenient cookie access without manual header parsing.
- Returns:
Dictionary mapping cookie names to values. Empty dict if no cookies present.
- Return type:
dict[str, str]
Examples
Access session cookie:
>>> @app.get("/dashboard") >>> async def dashboard(request: Request): ... session_id = request.cookies.get('session_id') ... if not session_id: ... return {"error": "Not authenticated"}, 401 ... return {"session": session_id}
Check for specific cookie:
>>> @app.get("/preferences") >>> async def preferences(request: Request): ... theme = request.cookies.get('theme', 'light') ... language = request.cookies.get('lang', 'en') ... return {"theme": theme, "language": language}
Multiple cookies:
>>> @app.get("/tracking") >>> async def tracking(request: Request): ... user_id = request.cookies.get('user_id') ... tracking_id = request.cookies.get('tracking_id') ... return {"user": user_id, "tracking": tracking_id}
Note
Cookies are parsed from the Cookie header using standard HTTP format. Multi-value cookies and cookie attributes (path, domain, etc.) are not included - only name=value pairs. Parsing result is cached for subsequent access.
See also
Response.set_cookie(): Set cookies in response
Response.delete_cookie(): Delete cookies
- property is_json: bool¶
Check if request has JSON content type.
Convenience property for checking if Content-Type header indicates JSON. Useful for conditional parsing logic and content negotiation.
- Returns:
True if Content-Type is application/json, False otherwise.
- Return type:
bool
Examples
Conditional JSON parsing:
>>> @app.post("/data") >>> async def submit_data(request: Request): ... if request.is_json: ... data = await request.get_json() ... return {"type": "json", "data": data} ... else: ... return {"error": "Expected JSON"}, 400
Content negotiation:
>>> @app.post("/process") >>> async def process(request: Request): ... if request.is_json: ... data = await request.get_json() ... elif 'form' in request.headers.get('content-type', ''): ... data = await request.get_form() ... else: ... return {"error": "Unsupported content type"}, 415 ... return {"received": True}
Note
Checks if Content-Type starts with ‘application/json’. Also matches ‘application/json; charset=utf-8’ and similar variants.
- property scheme: str¶
Get the URL scheme (http or https).
Determines the scheme from RSGI scope, respecting X-Forwarded-Proto header for proxied requests. Flask/Quart compatibility property.
- Returns:
‘http’ or ‘https’ depending on connection type.
- Return type:
str
Examples
Build URLs with correct scheme:
>>> @app.get("/redirect") >>> async def redirect_to_secure(request: Request): ... if request.scheme == 'http': ... # Redirect to HTTPS ... url = f"https://{request.host}{request.path}" ... return redirect(url, status=301) ... return {"secure": True}
Generate absolute URLs:
>>> @app.get("/share") >>> async def share(request: Request): ... full_url = f"{request.scheme}://{request.host}{request.path}" ... return {"share_url": full_url}
Note
Respects X-Forwarded-Proto header for proxy/load balancer scenarios. Defaults to scope.proto from RSGI.
- property host: str¶
Get the Host header value (hostname and optional port).
Returns the Host header value, which includes the hostname and optionally the port. Flask/Quart compatibility property.
- Returns:
Host header value (e.g., ‘localhost:8000’, ‘example.com’).
- Return type:
str
Examples
Get hostname:
>>> @app.get("/info") >>> async def info(request: Request): ... return {"host": request.host}
Domain-based routing:
>>> @app.get("/") >>> async def home(request: Request): ... if 'admin' in request.host: ... return {"site": "admin"} ... return {"site": "main"}
Note
Respects X-Forwarded-Host header for proxy scenarios. Includes port number if non-standard (e.g., ‘:8000’).
- property host_url: str¶
Get the base URL including scheme and host.
Constructs the base URL from scheme and host. Useful for building absolute URLs. Flask/Quart compatibility property.
- Returns:
Base URL (e.g., ‘http://localhost:8000’, ‘https://example.com’).
- Return type:
str
Examples
Build absolute URLs:
>>> @app.post("/users") >>> async def create_user(request: Request): ... user_id = await save_user() ... location = f"{request.host_url}/users/{user_id}" ... return {"location": location}, 201
API base URL:
>>> @app.get("/") >>> async def index(request: Request): ... return { ... "api_base": request.host_url, ... "docs_url": f"{request.host_url}/docs" ... }
- property base_url: str¶
Get the request URL without query string.
Constructs the full URL including scheme, host, and path, but excluding query string. Flask/Quart compatibility property.
- Returns:
Complete URL without query string (e.g., ‘http://localhost:8000/users/123’).
- Return type:
str
Examples
Get current page URL:
>>> @app.get("/article/<id>") >>> async def article(request, id): ... canonical_url = request.base_url ... return {"canonical": canonical_url}
Share URL without parameters:
>>> @app.get("/search") >>> async def search(request: Request): ... # URL: /search?q=python&page=2 ... share_url = request.base_url # Without ?q=python&page=2 ... return {"share": share_url}
- property url: str¶
Get the complete request URL including query string.
Constructs the full URL including scheme, host, path, and query string. Flask/Quart compatibility property.
- Returns:
Complete URL with query string (e.g., ‘http://localhost:8000/search?q=test’).
- Return type:
str
Examples
Get full URL:
>>> @app.get("/search") >>> async def search(request: Request): ... # URL: /search?q=python&page=2 ... full_url = request.url ... # Returns: 'http://localhost:8000/search?q=python&page=2' ... return {"url": full_url}
Logging:
>>> @app.before_request
>>> async def log_request(request: Request):
...     request.logger.info(f"Request: {request.url}")
- async get_data() bytes[source]¶
Get the raw request body as bytes with optional size enforcement.
Reads the complete request body from the RSGI protocol. Enforces maximum body size limit if configured via LimitsMiddleware. Body is lazy-loaded and cached for subsequent calls.
- Returns:
Complete request body as raw bytes. Empty bytes object (b'') if no body sent with request.
- Return type:
bytes
- Raises:
RequestTooLarge – If body size exceeds configured max_body_bytes limit (typically set by LimitsMiddleware).
Examples
Read raw body:
>>> @app.post("/upload/raw") >>> async def upload_raw(request: Request): ... body = await request.get_data() ... size = len(body) ... return {"received_bytes": size}
Custom binary processing:
>>> @app.post("/image/process") >>> async def process_image(request: Request): ... image_data = await request.get_data() ... # Process binary image data... ... return {"processed": True}
Size limit handling:
>>> from gobstopper.http.errors import RequestTooLarge
>>>
>>> @app.post("/upload")
>>> async def upload(request: Request):
...     try:
...         data = await request.get_data()
...         return {"size": len(data)}
...     except RequestTooLarge:
...         return {"error": "File too large"}, 413
Note
Body reading is async and may involve network I/O. Result is cached - subsequent calls return the same bytes object. Size limit is enforced if the max_body_bytes attribute is set by middleware. For JSON/form data, prefer json() or form().
See also
get_body(): Alias for backwards compatibility
json(): Parse JSON body
form(): Parse form body
LimitsMiddleware: Configures body size limits
- async get_body() bytes[source]¶
Get the raw request body as bytes (alias for get_data).
Backwards-compatible alias for get_data(). Provides the same functionality with identical behavior and caching.
- Returns:
Complete request body as raw bytes.
- Return type:
bytes
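Examples
Since this is a strict alias, a handler can use either name interchangeably (route name here is illustrative):
>>> @app.post("/legacy/raw")
>>> async def legacy_raw(request: Request):
...     body = await request.get_body()  # same cached bytes as get_data()
...     return {"received_bytes": len(body)}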
Note
This is an alias maintained for backwards compatibility. New code should prefer get_data().
See also
get_data(): Primary method for reading raw body
- async get_json() Any[source]¶
Parse request body as JSON asynchronously using msgspec.
Reads and parses the request body as JSON data. The parsing is lazy-loaded and cached - subsequent calls return the same parsed data.
- Returns:
Parsed JSON data as Python dict, list, or primitive type. Returns None for empty request body.
- Raises:
msgspec.DecodeError – If request body is not valid JSON.
Examples
Parse JSON payload:
>>> @app.post("/api/data") >>> async def handle_data(request: Request): ... try: ... data = await request.get_json() ... return {"received": data} ... except msgspec.DecodeError: ... return {"error": "Invalid JSON"}, 400
Note
Uses high-performance msgspec decoder. Result is cached for subsequent calls.
See also
get_form(): Parse form data
get_data(): Get raw request body
- async get_form() dict[str, list[str]][source]¶
Parse request body as form data asynchronously.
Reads and parses the request body as URL-encoded or multipart form data. Supports both application/x-www-form-urlencoded and multipart/form-data.
- Returns:
Dict mapping field names to lists of values. Multiple values with same name are preserved in lists. Empty dict for non-form requests or empty body.
- Raises:
ValueError – If form data is malformed
UnicodeDecodeError – If form data contains invalid UTF-8
Examples
Handle form submission (inside an async handler):
form = await request.get_form()
name = form.get("name", [""])[0]
message = form.get("message", [""])[0]
Handle multiple values:
# Form with multiple checkboxes: hobbies=reading&hobbies=coding
hobbies = form.get("hobbies", [])  # ["reading", "coding"]
Form validation:
if not form.get("email"):
    return {"error": "Email is required"}, 400
Note
Supports both application/x-www-form-urlencoded and multipart/form-data. For multipart requests, only returns form fields (not files). Result is cached for subsequent calls.
See also
get_json(): Parse JSON data
get_data(): Get raw request body
get_files(): Get uploaded files from multipart forms
args: Query string parameters
- async get_files() dict[str, FileStorage][source]¶
Parse uploaded files from multipart/form-data request.
Extracts uploaded files from multipart/form-data requests. Returns a dictionary mapping field names to FileStorage objects. Flask/Quart compatibility method.
- Returns:
Mapping of field names to uploaded files. Empty dict if no files uploaded or not a multipart request.
- Return type:
Dict[str, FileStorage]
Examples
Handle single file upload:
>>> @app.post('/upload')
>>> async def upload(request: Request):
...     files = await request.get_files()
...     avatar = files.get('avatar')
...     if avatar and avatar.filename:
...         avatar.save(f'uploads/{avatar.filename}')
...         return {"uploaded": avatar.filename}
...     return {"error": "No file uploaded"}, 400
Handle multiple files:
>>> @app.post('/upload-multiple')
>>> async def upload_multiple(request: Request):
...     files = await request.get_files()
...     uploaded = []
...     for field_name, file in files.items():
...         if file.filename:
...             file.save(f'uploads/{file.filename}')
...             uploaded.append(file.filename)
...     return {"uploaded": uploaded}
With form fields:
>>> @app.post('/profile')
>>> async def update_profile(request: Request):
...     # Get form data
...     body = await request.get_data()
...     content_type = request.headers.get('content-type', '')
...     from gobstopper.http.multipart import parse_multipart
...     form, files = parse_multipart(body, content_type)
...
...     # Access form fields
...     name = form.get('name', [''])[0]
...
...     # Access file
...     avatar = files.get('avatar')
...     if avatar:
...         avatar.save(f'uploads/{name}_avatar.jpg')
...
...     return {"name": name, "avatar": bool(avatar)}
Note
Requires Content-Type: multipart/form-data
Files are loaded into memory - consider streaming for large files
Result is cached for subsequent calls
See also
files: Property wrapper for this method
FileStorage: File upload wrapper
secure_filename(): Sanitize filenames
- property files: dict[str, FileStorage]¶
Uploaded files from multipart/form-data request (async property).
Flask/Quart-style property for accessing uploaded files. This is an async property that must be awaited.
- Returns:
Uploaded files by field name
- Return type:
Dict[str, FileStorage]
Examples
Access uploaded file:
>>> @app.post('/upload')
>>> async def upload(request: Request):
...     files = await request.files
...     avatar = files.get('avatar')
...     if avatar:
...         avatar.save('uploads/avatar.jpg')
...     return {"uploaded": True}
Note
This is an async property - must use await request.files
For compatibility, also available as await request.get_files()
See also
get_files(): Get uploaded files (method form)
FileStorage: File storage class
- async multipart(model: type[Struct] | None = None, max_size: int | None = None) dict[str, list[str]] | Struct | None[source]¶
Parse multipart/form-data request body (text fields only).
Parses multipart/form-data encoded request bodies, supporting text fields only in the current implementation. Enforces Content-Type header validation and optional body size limits. File uploads are detected but not yet supported.
- Parameters:
model – Optional msgspec.Struct type to decode parsed fields into. When provided, field values are converted to model instance.
max_size – Optional maximum body size in bytes. Raises BodyValidationError if body exceeds this limit.
- Returns:
dict[str, list[str]]: Field name to values mapping if model is None.
msgspec.Struct: Model instance if model type provided.
None: If no body present and no model required.
- Return type:
dict[str, list[str]] | Struct | None
- Raises:
UnsupportedMediaType – If Content-Type is not multipart/form-data when body exists.
BodyValidationError – If multipart parsing fails, size exceeds max_size, file upload encountered, or model conversion fails.
Examples
Parse multipart text fields:
>>> @app.post("/form/multipart") >>> async def handle_multipart(request: Request): ... fields = await request.multipart() ... name = fields.get("name", [""])[0] ... email = fields.get("email", [""])[0] ... return {"name": name, "email": email}
With size limit:
>>> @app.post("/upload/limited") >>> async def limited_upload(request: Request): ... try: ... # Limit to 1MB ... fields = await request.multipart(max_size=1024*1024) ... return {"status": "success"} ... except BodyValidationError as e: ... return {"error": str(e)}, 413
With model validation:
>>> import msgspec
>>>
>>> class ContactForm(msgspec.Struct):
...     name: str
...     email: str
...     message: str
>>>
>>> @app.post("/contact")
>>> async def contact(request: Request):
...     try:
...         form = await request.multipart(model=ContactForm)
...         # form is ContactForm instance with validated fields
...         return {"received": form.name}
...     except BodyValidationError as e:
...         return {"error": "Invalid form data"}, 400
Note
Current implementation supports text fields only. File uploads (filename= parameter) will raise BodyValidationError. Full file upload support is planned for future versions. Enforces CRLF line endings per multipart/form-data specification. When model provided, takes last value for fields with multiple values.
- async json(model: type[Struct] | None = None) Any | Struct | None[source]¶
Parse the request body as JSON with optional model validation.
Parses JSON request body using high-performance msgspec decoder. Enforces Content-Type header validation and provides optional automatic model conversion and validation.
- Parameters:
model – Optional msgspec.Struct type to decode JSON directly into. When provided, JSON is validated against model schema and returns a typed model instance. None for raw JSON dict/list parsing.
- Returns:
Any: Decoded JSON as dict, list, or primitive type if model is None.
msgspec.Struct: Validated model instance if model type provided.
None: If request body is empty and no model required.
- Return type:
Any
- Raises:
UnsupportedMediaType – If Content-Type header is not application/json or compatible JSON media type (e.g., application/*+json) when body exists.
BodyValidationError – If JSON decoding fails, validation fails, or empty body provided when model requires data.
Examples
Parse JSON without model:
>>> @app.post("/api/data") >>> async def handle_data(request: Request): ... data = await request.json() ... # data is dict, list, or primitive ... return {"received": data}
With model validation:
>>> import msgspec
>>>
>>> class User(msgspec.Struct):
...     name: str
...     email: str
...     age: int
>>>
>>> @app.post("/api/users")
>>> async def create_user(request: Request):
...     try:
...         user = await request.json(model=User)
...         # user is User instance with validated fields
...         return {"created": user.name, "email": user.email}
...     except BodyValidationError as e:
...         return {"error": str(e)}, 400
Handle empty body:
>>> @app.post("/api/optional") >>> async def optional_data(request: Request): ... data = await request.json() ... if data is None: ... return {"message": "No data provided"} ... return {"data": data}
Content-Type validation:
>>> from gobstopper.http.errors import UnsupportedMediaType
>>>
>>> @app.post("/api/strict")
>>> async def strict_json(request: Request):
...     try:
...         data = await request.json()
...         return {"data": data}
...     except UnsupportedMediaType:
...         return {"error": "Content-Type must be application/json"}, 415
Note
Uses high-performance msgspec JSON decoder. Accepts application/json and application/*+json media types. Empty body returns None if no model specified. Empty body with model requirement raises BodyValidationError. Result is cached internally via get_body() mechanism.
See also
form(): Parse form-encoded data
multipart(): Parse multipart form data
get_json(): Legacy JSON parsing without model support
- async form(model: type[Struct] | None = None) dict[str, list[str]] | Struct | None[source]¶
Parse application/x-www-form-urlencoded request body with optional model validation.
Parses URL-encoded form data from request body. Enforces Content-Type header validation and provides optional automatic model conversion and validation using msgspec.
- Parameters:
model – Optional msgspec.Struct type to convert parsed form data into. When provided, form values are converted to model instance with validation. None for raw dict[str, list[str]] parsing.
- Returns:
dict[str, list[str]]: Field name to values mapping if model is None. Lists preserve multiple values for same field name.
msgspec.Struct: Validated model instance if model type provided.
None: If request body is empty and no model required.
- Return type:
dict[str, list[str]] | Struct | None
- Raises:
UnsupportedMediaType – If Content-Type is not application/x-www-form-urlencoded when body exists.
BodyValidationError – If form parsing fails or model conversion/validation fails.
Examples
Parse form data without model:
>>> @app.post("/submit") >>> async def submit_form(request: Request): ... form = await request.form() ... # form = {"name": ["John"], "email": ["john@example.com"]} ... name = form.get("name", [""])[0] ... email = form.get("email", [""])[0] ... return {"name": name, "email": email}
Handle multiple values:
>>> @app.post("/preferences") >>> async def preferences(request: Request): ... form = await request.form() ... # Form: hobbies=reading&hobbies=coding&hobbies=gaming ... hobbies = form.get("hobbies", []) # ["reading", "coding", "gaming"] ... return {"hobbies": hobbies}
With model validation:
>>> import msgspec
>>>
>>> class LoginForm(msgspec.Struct):
...     username: str
...     password: str
>>>
>>> @app.post("/login")
>>> async def login(request: Request):
...     try:
...         creds = await request.form(model=LoginForm)
...         # creds is LoginForm instance with validated fields
...         return {"username": creds.username}
...     except BodyValidationError as e:
...         return {"error": "Invalid form data"}, 400
Content-Type enforcement:
>>> from gobstopper.http.errors import UnsupportedMediaType
>>>
>>> @app.post("/form/strict")
>>> async def strict_form(request: Request):
...     try:
...         form = await request.form()
...         return {"received": True}
...     except UnsupportedMediaType:
...         return {"error": "Content-Type must be application/x-www-form-urlencoded"}, 415
Note
Enforces Content-Type header must be application/x-www-form-urlencoded. When model provided, takes last value for fields with multiple values. Uses Python’s urllib.parse.parse_qs for form parsing. Result is cached internally via get_form() mechanism. Empty body returns None if no model specified.
See also
json(): Parse JSON data
multipart(): Parse multipart form data
get_form(): Legacy form parsing without model support
args: Query string parameters
- id¶
- app¶
- max_body_bytes¶
- max_json_depth¶
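Example
The attributes above are listed without descriptions; as a sketch (semantics inferred from the notes on get_data(): max_body_bytes is the body-size limit set by LimitsMiddleware, and max_json_depth presumably bounds JSON nesting), a handler might inspect them:
>>> @app.get("/debug/limits")
>>> async def debug_limits(request: Request):
...     return {
...         "request_id": str(request.id),  # per-request identifier
...         "max_body_bytes": request.max_body_bytes,
...         "max_json_depth": request.max_json_depth,
...     }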
Response¶
HTTP Response handling for Gobstopper framework.
This module provides response classes and utilities for handling HTTP responses in the Gobstopper web framework. It includes support for standard responses, JSON serialization, file serving, and streaming responses with full RSGI protocol integration.
- Classes:
Response: Standard HTTP response with flexible content types and headers JSONResponse: JSON response with high-performance msgspec serialization FileResponse: File serving response with automatic MIME type detection StreamResponse: Streaming response for server-sent events and large content
- Key Features:
Automatic content type detection
Secure cookie management with production enforcement
RSGI protocol compatibility
High-performance JSON serialization via msgspec
File serving with MIME type detection
Streaming support for progressive content delivery
Comprehensive header and cookie management
Example
Basic response types:
>>> # Standard text/HTML response
>>> return Response("Hello World")
>>> # JSON API response
>>> return JSONResponse({"status": "success", "data": results})
>>> # File download
>>> return FileResponse("reports/data.pdf", filename="report.pdf")
>>> # Streaming response
>>> async def event_stream():
... for i in range(100):
... yield f"data: Event {i}\n\n"
>>> return StreamResponse(event_stream(), content_type="text/event-stream")
See also
gobstopper.http.helpers: Helper functions (jsonify, send_file, stream_template)
gobstopper.http.sse: Server-sent events support (SSEStream, format_sse)
gobstopper.core.app.Gobstopper: Main application class with RSGI protocol handling
- class gobstopper.http.response.Response(body: str | bytes = '', status: int = 200, headers: dict[str, str] | None = None, content_type: str | None = None)[source]¶
Bases:
object
HTTP response with flexible content types and headers.
Represents an HTTP response with configurable status code, headers, and body content. Supports automatic content type detection and header management for RSGI protocol compatibility.
- Parameters:
body – Response content as string or bytes
status – HTTP status code (default: 200)
headers – HTTP headers as dict (optional)
content_type – MIME type override (auto-detected if not provided)
- body¶
Response body content
- status¶
HTTP status code
- headers¶
Response headers dict
Examples
Text response:
>>> return Response("Hello World") >>> return Response("Error message", status=400)
HTML response:
>>> html = "<h1>Welcome</h1>"
>>> return Response(html, content_type="text/html")
Custom headers:
>>> return Response("OK", headers={"X-Custom": "value"})
Binary content:
>>> return Response(image_bytes, content_type="image/png")
Note
Content-Type is auto-detected: string content defaults to text/html, bytes content defaults to application/octet-stream.
- __init__(body: str | bytes = '', status: int = 200, headers: dict[str, str] | None = None, content_type: str | None = None)[source]¶
- set_cookie(name: str, value: str, *, path: str = '/', domain: str | None = None, max_age: int | None = None, expires: str | None = None, secure: bool = True, httponly: bool = True, samesite: str | None = 'Strict')[source]¶
Set a cookie with comprehensive security options.
Adds a Set-Cookie header to the response with secure defaults appropriate for production environments. In production (ENV=production), enforces secure cookie settings unless explicitly disabled via environment variable.
- Parameters:
name – Cookie name
value – Cookie value
path – Cookie path scope (default: “/”)
domain – Cookie domain scope (optional, defaults to current domain)
max_age – Cookie lifetime in seconds (optional, None means session cookie)
expires – Cookie expiration date in HTTP format (optional, overridden by max_age)
secure – Require HTTPS transmission (default: True, enforced in production)
httponly – Prevent JavaScript access (default: True, enforced in production)
samesite – CSRF protection level - “Strict”, “Lax”, or “None” (default: “Strict”, enforced to at least “Lax” in production)
Examples
Basic session cookie:
>>> response = Response("Welcome") >>> response.set_cookie("session_id", "abc123")
Persistent cookie with expiration:
>>> response.set_cookie("user_pref", "dark_mode", ... max_age=86400*30) # 30 days
Subdomain cookie:
>>> response.set_cookie("auth_token", token, ... domain=".example.com", ... secure=True, ... httponly=True)
Development-only insecure cookie:
>>> # Set ENV=development and WOPR_ALLOW_INSECURE_COOKIES=true
>>> response.set_cookie("debug", "1", secure=False)
Note
Production security enforcement (when ENV=production):
- secure=False is overridden to True with warning
- httponly=False is overridden to True with warning
- samesite=None is overridden to “Lax” with warning
Set WOPR_ALLOW_INSECURE_COOKIES=true to disable enforcement.
See also
delete_cookie(): Remove a cookie
- delete_cookie(name: str, *, path: str = '/', domain: str | None = None)[source]¶
Delete a cookie by setting it to expire immediately.
Removes a cookie from the client by setting an expired Set-Cookie header. The path and domain must match the original cookie for proper deletion.
- Parameters:
name – Cookie name to delete
path – Cookie path scope (must match original, default: “/”)
domain – Cookie domain scope (must match original, optional)
Examples
Delete a session cookie:
>>> response = Response("Logged out") >>> response.delete_cookie("session_id")
Delete a subdomain cookie:
>>> response.delete_cookie("auth_token", domain=".example.com")
Delete a path-specific cookie:
>>> response.delete_cookie("cart_id", path="/shop")
Note
To successfully delete a cookie, path and domain must match the values used when the cookie was originally set.
See also
set_cookie(): Set a cookie
- to_rsgi_headers() list[tuple[str, str]][source]¶
Convert response headers to RSGI protocol format.
Transforms the headers dictionary and cookie list into the list of tuples format required by the RSGI (Rust Server Gateway Interface) protocol.
- Returns:
List of (name, value) tuples for RSGI response headers. Each cookie generates a separate (“set-cookie”, cookie_string) tuple.
Examples
>>> response = Response("OK", headers={"X-Custom": "value"}) >>> response.set_cookie("session", "abc123") >>> response.to_rsgi_headers() [('content-type', 'text/html; charset=utf-8'), ('X-Custom', 'value'), ('set-cookie', 'session=abc123; Path=/; Secure; HttpOnly; SameSite=Strict')]
Note
This method is called internally by the Gobstopper framework when sending responses through the RSGI protocol. You typically don’t need to call it directly.
Multiple Set-Cookie headers are properly handled as separate tuples, as required by HTTP specifications and RSGI protocol.
See also
Gobstopper: Main application class handling RSGI protocol
- class gobstopper.http.response.JSONResponse(data: Any, status: int = 200, **kwargs)[source]¶
Bases:
Response
HTTP response for JSON data with automatic, high-performance serialization.
Convenience class for returning JSON responses with proper Content-Type headers and optimized JSON serialization using msgspec.
- Parameters:
data – Python object to serialize as JSON (dict, list, primitives)
status – HTTP status code (default: 200)
**kwargs – Additional arguments passed to parent Response class
Examples
Dictionary response:
>>> return JSONResponse({"message": "Success", "data": results})
List response:
>>> return JSONResponse([1, 2, 3, 4, 5])
With custom status:
>>> return JSONResponse({"error": "Not found"}, status=404)
Note
Uses high-performance msgspec for serialization. Automatically sets Content-Type to ‘application/json’.
See also
Response: Base response class
- class gobstopper.http.response.FileResponse(path: str | Path, filename: str | None = None, status: int = 200, headers: dict[str, str] | None = None)[source]¶
Bases:
Response
HTTP response for serving files with proper headers and MIME detection.
Optimized response class for serving static files, downloads, and attachments. Automatically detects MIME types, sets appropriate headers, and handles file serving through the RSGI protocol.
- Parameters:
path – File path as string or Path object
filename – Download filename (defaults to basename of path)
status – HTTP status code (default: 200)
headers – Additional headers (optional)
- file_path¶
Resolved file path as string
- filename¶
Filename for Content-Disposition header
Examples
Serve static file:
>>> @app.get("/download/<filename>") >>> async def download(request, filename): ... return FileResponse(f"uploads/{filename}")
Force download with custom name:
>>> return FileResponse("report.pdf", filename="monthly_report.pdf")
Image serving:
>>> @app.get("/images/<image_id>") >>> async def serve_image(request, image_id): ... path = f"images/{image_id}.jpg" ... return FileResponse(path)
Note
Automatically sets Content-Type based on file extension. Sets Content-Disposition: attachment for download behavior. File path resolution and existence checking is handled by RSGI protocol.
See also
Response: Base response class
StreamResponse: For streaming large files
- class gobstopper.http.response.StreamResponse(generator: Callable[[], Awaitable], status: int = 200, headers: dict[str, str] | None = None, content_type: str = 'text/plain')[source]¶
Bases:
object
HTTP streaming response for real-time data and large content.
Enables streaming HTTP responses for server-sent events, chunked transfer encoding, and progressive content delivery. Ideal for large datasets, real-time updates, and template streaming.
- Parameters:
generator – Async generator function that yields string or bytes chunks
status – HTTP status code (default: 200)
headers – Additional headers (optional)
content_type – MIME type (default: ‘text/plain’)
- generator¶
Async generator for content chunks
- status¶
HTTP status code
- headers¶
Response headers dict
Examples
Server-sent events:
>>> @app.get("/events") >>> async def stream_events(request): ... async def event_generator(): ... for i in range(100): ... yield f"data: Event {i}\n\n" ... await asyncio.sleep(1) ... return StreamResponse(event_generator(), ... content_type="text/event-stream")
Large data streaming:
>>> @app.get("/large-csv") >>> async def stream_csv(request): ... async def csv_generator(): ... yield "id,name,email\n" ... async for user in get_all_users_stream(): ... yield f"{user.id},{user.name},{user.email}\n" ... return StreamResponse(csv_generator(), ... content_type="text/csv")
Template streaming (with Rust engine):
>>> async def stream_template():
...     return await app.render_template("large_page.html",
...                                      stream=True,
...                                      data=huge_dataset)
Note
Generator function must be async and yield string or bytes. Streaming reduces memory usage for large responses. Compatible with template streaming when using Rust engine.
See also
Response: Standard response class
Gobstopper.render_template(): Template rendering with streaming
- gobstopper.http.response.redirect(location: str, status: int = 302) Response[source]¶
Create a redirect response to the given location.
Flask/Quart-style redirect helper that creates an HTTP redirect response. Commonly used with url_for() for type-safe redirects to named routes.
- Parameters:
location – URL to redirect to. Can be absolute (/path) or relative (path) or full URL (https://example.com/path)
status – HTTP redirect status code. Common values:
- 301: Moved Permanently (cached by browsers)
- 302: Found (temporary, default)
- 303: See Other (POST → GET redirect)
- 307: Temporary Redirect (preserves method)
- 308: Permanent Redirect (preserves method)
- Returns:
Response object with Location header and appropriate status code
Examples
Simple redirect:
>>> return redirect('/dashboard')
Redirect to named route:
>>> return redirect(app.url_for('user_profile', user_id=123))
Permanent redirect:
>>> return redirect('/new-location', status=301)
Post-redirect-get pattern:
>>> @app.post('/users')
>>> async def create_user(request):
...     user_id = save_user()
...     return redirect(app.url_for('user_detail', id=user_id), status=303)
External redirect:
>>> return redirect('https://example.com/login')
Note
Default 302 is safe for most use cases
Use 301 carefully as browsers cache it permanently
Use 303 for POST → GET redirects (RESTful pattern)
Use 307/308 if you need to preserve the HTTP method
See also
Gobstopper.url_for(): Build URLs for named routes
Response: Base response class
Background Tasks¶
Task queue management for Gobstopper framework.
This module provides the core task queue system for Gobstopper, featuring:
- Priority-based task queuing with category separation
- Async worker pool management with graceful shutdown
- Automatic retry logic with exponential backoff
- Persistent storage using DuckDB (optional)
- Feature flag control via environment variable
The task system is disabled by default and must be explicitly enabled via the WOPR_TASKS_ENABLED environment variable or constructor parameter. This prevents accidental database creation and resource usage.
- Classes:
NoopStorage: Placeholder storage when tasks are disabled. TaskQueue: Main task queue with worker management and execution.
Example
Basic task queue usage:
from gobstopper.tasks.queue import TaskQueue
from gobstopper.tasks.models import TaskPriority
import os
# Enable tasks
os.environ["WOPR_TASKS_ENABLED"] = "1"
# Create queue
queue = TaskQueue(enabled=True)
# Register task functions
@queue.register_task("send_email", category="notifications")
async def send_email(to: str, subject: str):
# Send email logic
return {"sent": True, "to": to}
# Start workers for category
await queue.start_workers("notifications", worker_count=3)
# Queue a task
task_id = await queue.add_task(
    "send_email",
    "notifications",       # category
    TaskPriority.HIGH,     # priority
    3,                     # max_retries
    "user@example.com",    # positional arg passed to send_email
    subject="Welcome!"
)
# Check task status
task_info = await queue.get_task_info(task_id)
print(f"Status: {task_info.status}")
# Shutdown gracefully
await queue.shutdown()
- Environment Variables:
- WOPR_TASKS_ENABLED: Set to “1”, “true”, “True”, or “yes” to enable the task system. Defaults to disabled.
Note
Tasks are disabled by default to prevent unintended side effects
Storage (DuckDB) is created lazily on first use
Each category has its own queue and worker pool
Tasks within a category are ordered by priority then FIFO
Workers shut down gracefully, completing current tasks
- class gobstopper.tasks.queue.NoopStorage[source]¶
Bases:
object
A storage implementation that does nothing (used when tasks are disabled).
This class provides a null-object pattern implementation of the storage interface, allowing the task queue to function without raising errors when tasks are disabled. All methods are no-ops that return safe defaults.
This prevents database file creation and avoids import errors when DuckDB is not installed but tasks are disabled anyway.
Example
NoopStorage is used automatically when tasks are disabled:
queue = TaskQueue(enabled=False)
# queue.storage will be NoopStorage()
# No database file created, no errors raised
Note
All save operations are silently ignored
All query operations return None or empty lists
No exceptions are raised
Used internally by TaskQueue when enabled=False
- class gobstopper.tasks.queue.TaskQueue(enabled: bool | None = None, storage_factory=None)[source]¶
Bases:
object
Priority-based background task queue with worker pool management.
TaskQueue provides a complete async task processing system with:
- Category-based task organization (separate queues per category)
- Priority ordering within each category queue
- Configurable worker pools per category
- Automatic retry logic with exponential backoff
- Persistent storage using DuckDB (optional)
- Graceful shutdown with task completion
- Task cancellation and manual retry support
The queue system is disabled by default and must be explicitly enabled via environment variable (WOPR_TASKS_ENABLED=1) or constructor parameter. Storage is created lazily on first use to avoid unnecessary database files.
- enabled¶
Whether the task system is enabled.
- queues¶
Dict mapping category names to asyncio.PriorityQueue objects.
- running_tasks¶
Dict of currently executing tasks by task_id.
- workers¶
Dict mapping category names to lists of worker tasks.
- shutdown_event¶
asyncio.Event signaling worker shutdown.
- task_functions¶
Dict mapping task names to their registered functions.
Example
Complete task queue workflow:
import asyncio
from gobstopper.tasks.queue import TaskQueue
from gobstopper.tasks.models import TaskPriority, TaskStatus

# Enable and create queue
queue = TaskQueue(enabled=True)

# Register task functions
@queue.register_task("send_email", category="notifications")
async def send_email(to: str, subject: str, body: str):
    # Email sending logic
    await asyncio.sleep(1)  # Simulate work
    return {"sent": True, "to": to}

@queue.register_task("process_data", category="analytics")
def process_data(data: dict):  # Sync functions work too
    # Processing logic
    return {"records": len(data)}

# Start workers
await queue.start_workers("notifications", worker_count=3)
await queue.start_workers("analytics", worker_count=5)

# Queue tasks with different priorities (category, priority, and
# max_retries precede the task's own positional args)
task1 = await queue.add_task(
    "send_email", "notifications", TaskPriority.HIGH, 3,
    "user@example.com",
    subject="Welcome",
    body="Thanks for signing up!"
)
task2 = await queue.add_task(
    "process_data", "analytics", TaskPriority.NORMAL, 0,
    {"users": 1000}
)

# Monitor task status
while True:
    info = await queue.get_task_info(task1)
    if info.is_completed:
        if info.status == TaskStatus.SUCCESS:
            print(f"Result: {info.result}")
        else:
            print(f"Failed: {info.error}")
        break
    await asyncio.sleep(0.5)

# Get statistics
stats = await queue.get_task_stats()
print(f"Total tasks: {stats['total']}")
print(f"Running: {stats['running']}")
print(f"By status: {stats['by_status']}")

# Graceful shutdown
await queue.shutdown()
Note
Each category has its own queue and worker pool
Tasks are ordered by priority (descending), then FIFO
Both async and sync task functions are supported
Sync functions run in thread pool executor to avoid blocking
Retry delay uses exponential backoff: min(2^retry_count, 60) seconds (see the sketch after this list)
Workers complete current tasks before shutting down
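For illustration, the retry-delay formula above produces this schedule (a sketch, not framework code):

def retry_delay(retry_count: int) -> int:
    # min(2^retry_count, 60): 1s, 2s, 4s, 8s, 16s, 32s, then capped at 60s
    return min(2 ** retry_count, 60)

assert [retry_delay(n) for n in range(8)] == [1, 2, 4, 8, 16, 32, 60, 60]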
- __init__(enabled: bool | None = None, storage_factory=None)[source]¶
Initialize a new TaskQueue.
- Parameters:
enabled – Whether to enable the task system. If None, reads from WOPR_TASKS_ENABLED environment variable. Defaults to disabled.
storage_factory – Optional factory function to create custom storage. If None, uses TaskStorage with DuckDB. Useful for testing.
Example
Creating queues with different configurations:
# Use environment variable
import os
os.environ["WOPR_TASKS_ENABLED"] = "1"
queue1 = TaskQueue()  # enabled=True from env

# Explicitly enable
queue2 = TaskQueue(enabled=True)

# Explicitly disable
queue3 = TaskQueue(enabled=False)

# Custom storage for testing
from unittest.mock import Mock
mock_storage = Mock()
queue4 = TaskQueue(
    enabled=True,
    storage_factory=lambda: mock_storage
)
- property storage¶
Get the storage backend, creating it lazily if needed.
Returns NoopStorage if tasks are disabled, otherwise creates and caches a TaskStorage instance (or custom storage from factory).
- Returns:
Storage object implementing save_task, get_task, get_tasks, and cleanup_old_tasks methods.
Note
Storage is created only on first access (lazy initialization)
Returns NoopStorage if enabled=False
Uses storage_factory if provided, otherwise creates TaskStorage
DuckDB is imported only when storage is actually needed
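Example
A minimal usage sketch (assuming tasks are enabled and task_id refers to a previously saved task; the backend exposes the methods listed above):

storage = queue.storage               # NoopStorage when tasks are disabled
task = storage.get_task(task_id)      # TaskInfo or None
recent = storage.get_tasks(limit=10)  # most recent tasks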
- register_task(name: str, category: str = 'default')[source]¶
Decorator to register a task function with the queue.
Registers a function (sync or async) to be available for task execution. The function can then be queued by name using add_task(). Both regular functions and coroutine functions are supported.
- Parameters:
name – Unique name to identify this task function.
category – Category for organizing tasks. Tasks in the same category share a queue and worker pool. Defaults to “default”.
- Returns:
Decorator function that registers and returns the original function.
Example
Registering task functions:
queue = TaskQueue(enabled=True)

# Register async task
@queue.register_task("send_email", category="notifications")
async def send_email(to: str, subject: str):
    # Async email logic
    return {"sent": True}

# Register sync task
@queue.register_task("process_file", category="analytics")
def process_file(filename: str):
    # Sync file processing
    return {"lines": 1000}

# Register with default category
@queue.register_task("cleanup")
def cleanup():
    # Cleanup logic
    pass
Note
Task name must be unique across all categories
Category determines which queue and workers handle the task
Both sync and async functions are supported
The original function is returned unchanged (can be called directly)
- async add_task(name: str, category: str = 'default', priority: TaskPriority = TaskPriority.NORMAL, max_retries: int = 0, *args, **kwargs) str[source]¶
Add a task to the queue for execution.
Creates a TaskInfo object with a unique ID, saves it to storage, and adds it to the appropriate category queue. The task will be picked up by a worker when available.
- Parameters:
name – Name of the registered task function to execute.
category – Category queue to add the task to. Defaults to “default”.
priority – Task priority for queue ordering. Higher priority tasks execute first. Defaults to TaskPriority.NORMAL.
max_retries – Maximum number of retry attempts on failure. Defaults to 0 (no retries).
*args – Positional arguments to pass to the task function.
**kwargs – Keyword arguments to pass to the task function.
- Returns:
Unique task ID (UUID4) for tracking the task.
- Return type:
str
- Raises:
RuntimeError – If task system is disabled (enabled=False).
ValueError – If task name is not registered.
Example
Adding tasks with different configurations:
queue = TaskQueue(enabled=True)

# Simple task with no args
task_id1 = await queue.add_task("cleanup")

# Task with positional args (category, priority, and max_retries
# must be passed positionally before the task's own args)
task_id2 = await queue.add_task(
    "send_email", "notifications", TaskPriority.NORMAL, 0,
    "user@example.com",
    "Welcome to our service"
)

# Task with keyword args and high priority
task_id3 = await queue.add_task(
    "process_order",
    category="orders",
    priority=TaskPriority.HIGH,
    max_retries=3,
    order_id=12345,
    customer_id=67890
)

# Critical task with immediate execution
task_id4 = await queue.add_task(
    "send_alert",
    category="alerts",
    priority=TaskPriority.CRITICAL,
    alert_type="security_breach"
)
Note
Task function must be registered before adding to queue
Tasks are stored persistently before queueing
Priority queue ensures higher priority tasks execute first
Category queue is created automatically if it doesn’t exist
Workers must be started for the category to process tasks
- async get_task_info(task_id: str) TaskInfo | None[source]¶
Retrieve task information and current status.
Checks running tasks first (in-memory), then queries storage for completed or pending tasks. Provides real-time task status.
- Parameters:
task_id – Unique identifier of the task to retrieve.
- Returns:
TaskInfo object if found, None if task doesn’t exist or tasks are disabled.
Example
Monitoring task progress:
task_id = await queue.add_task("long_process") # Poll for completion while True: info = await queue.get_task_info(task_id) if info: print(f"Status: {info.status.value}") print(f"Progress: {info.progress}%") print(f"Message: {info.progress_message}") if info.is_completed: if info.status == TaskStatus.SUCCESS: print(f"Result: {info.result}") print(f"Duration: {info.elapsed_seconds}s") else: print(f"Error: {info.error}") break await asyncio.sleep(1)
Note
Returns None if tasks are disabled
Checks running tasks first for immediate status
Falls back to storage for persistent lookup
Real-time progress tracking if task updates progress field
- async cancel_task(task_id: str) bool[source]¶
Cancel a pending task before execution.
Marks a task as CANCELLED if it’s still pending. Cannot cancel tasks that are already running or completed.
- Parameters:
task_id – Unique identifier of the task to cancel.
- Returns:
True if task was successfully cancelled, False if task doesn’t exist, is not pending, or tasks are disabled.
- Return type:
bool
Example
Cancelling tasks conditionally:
# Queue a low priority task
task_id = await queue.add_task(
    "generate_report",
    priority=TaskPriority.LOW
)

# Later, decide to cancel it
if await queue.cancel_task(task_id):
    print("Task cancelled successfully")
else:
    print("Task already started or completed")
Note
Only PENDING tasks can be cancelled
Running tasks cannot be cancelled this way
Cancelled tasks are marked with completed_at timestamp
Returns False if tasks are disabled
- async retry_task(task_id: str) bool[source]¶
Manually retry a failed task.
Resets a failed task’s status and requeues it for execution. Useful for retrying tasks that failed due to transient issues after the automatic retry limit was reached.
- Parameters:
task_id – Unique identifier of the task to retry.
- Returns:
True if task was successfully requeued, False if task doesn’t exist, is not failed, or tasks are disabled.
- Return type:
bool
Example
Manual retry after investigation:
# Check failed task
task = await queue.get_task_info(task_id)
if task and task.status == TaskStatus.FAILED:
    print(f"Task failed: {task.error}")

    # Fix underlying issue (e.g., restore network)
    # ...

    # Retry the task
    if await queue.retry_task(task_id):
        print("Task requeued for retry")
    else:
        print("Could not retry task")
Note
Only FAILED tasks can be manually retried
Resets task status to PENDING
Clears error, timing, and progress information
Does not increment retry_count
Task is added back to its original category queue
Returns False if tasks are disabled
- async get_task_stats() dict[source]¶
Get aggregate statistics about tasks across all categories.
Queries storage for recent tasks and computes statistics including counts by status, counts by category, running tasks, and queued tasks.
- Returns:
- dict: Statistics dictionary with keys:
total: Total number of tasks in storage (up to 1000 recent)
by_status: Dict mapping status values to counts
by_category: Dict mapping category names to counts
running: Number of currently executing tasks
queued: Number of tasks waiting in all queues
Example
Monitoring queue health:
stats = await queue.get_task_stats()
print(f"Total tasks: {stats['total']}")
print(f"Currently running: {stats['running']}")
print(f"Queued: {stats['queued']}")
print("\nBy status:")
for status, count in stats['by_status'].items():
    print(f"  {status}: {count}")
print("\nBy category:")
for category, count in stats['by_category'].items():
    print(f"  {category}: {count}")
Note
Returns zero/empty stats if tasks are disabled
Limited to 1000 most recent tasks for performance
by_status uses status string values, not enums
running count is from in-memory tracking
queued count is sum of all category queues
- async start_workers(category: str, worker_count: int = 1)[source]¶
Start worker tasks to process tasks in a category queue.
Creates and starts async worker tasks that continuously poll the category queue and execute tasks. Workers run until shutdown is called or they are cancelled.
- Parameters:
category – Name of the category queue to start workers for.
worker_count – Number of worker tasks to create. Defaults to 1.
Example
Starting workers for different workloads:
queue = TaskQueue(enabled=True)

# Start 3 workers for email notifications
await queue.start_workers("notifications", worker_count=3)

# Start 10 workers for heavy processing
await queue.start_workers("analytics", worker_count=10)

# Start 1 worker for low-volume admin tasks
await queue.start_workers("admin", worker_count=1)

# Now tasks in these categories will be processed
Note
Does nothing if tasks are disabled
Category queue is created automatically if needed
Workers are tracked in self.workers[category]
Multiple calls add more workers to existing pool
Workers process tasks by priority, then FIFO
Each worker handles one task at a time
Workers shut down gracefully on queue.shutdown()
- async shutdown()[source]¶
Shutdown all workers gracefully, completing current tasks.
Sets the shutdown event, cancels all worker tasks, and waits for them to finish their current task and exit. Ensures clean shutdown without leaving tasks in inconsistent states.
Example
Graceful application shutdown:
queue = TaskQueue(enabled=True)

# Register tasks and start workers
# ...

try:
    # Run application
    await app.run()
finally:
    # Ensure workers shut down cleanly
    await queue.shutdown()
    print("All workers stopped")
Note
Does nothing if tasks are disabled
Sets shutdown_event to signal workers to stop
Cancels all worker tasks across all categories
Waits for workers to complete current task execution
Safe to call multiple times (idempotent)
Exceptions during worker shutdown are suppressed
Task data models for Gobstopper framework.
This module provides the core data models and enums for the Gobstopper task system, including task status tracking, priority levels, and comprehensive metadata.
The models are implemented using msgspec.Struct for high performance serialization and deserialization, making them suitable for storage in DuckDB and efficient memory usage.
- Classes:
TaskStatus: Enumeration of all possible task execution states. TaskPriority: Enumeration of task priority levels for queue ordering. TaskInfo: Complete task metadata and execution state tracking.
Example
Creating a task info object for tracking:
from datetime import datetime
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
task = TaskInfo(
id="task-123",
name="send_email",
category="notifications",
priority=TaskPriority.HIGH,
status=TaskStatus.PENDING,
created_at=datetime.now(),
args=("user@example.com",),
kwargs={"subject": "Welcome", "template": "welcome.html"}
)
Note
All datetime fields use timezone-naive datetime objects. It’s recommended to use UTC for consistency across distributed systems.
- class gobstopper.tasks.models.TaskStatus(*values)[source]¶
Bases:
Enum
Task execution status enumeration.
Tracks the lifecycle state of a task from creation through completion. Tasks progress through these states as they are queued, executed, and finalized.
- PENDING¶
Task is queued and waiting to be picked up by a worker.
- STARTED¶
Task is currently being executed by a worker.
- SUCCESS¶
Task completed successfully without errors.
- FAILED¶
Task failed and exceeded its retry limit.
- CANCELLED¶
Task was explicitly cancelled before execution.
- RETRY¶
Task failed but will be retried (transient state).
Example
Checking task status:
if task.status == TaskStatus.PENDING:
    print("Task is waiting in queue")
elif task.status == TaskStatus.STARTED:
    print(f"Task is running: {task.progress}% complete")
elif task.status == TaskStatus.SUCCESS:
    print(f"Task completed in {task.elapsed_seconds}s")
Note
RETRY is a transient state - tasks in RETRY status are automatically requeued as PENDING after an exponential backoff delay.
- PENDING = 'pending'¶
- STARTED = 'started'¶
- SUCCESS = 'success'¶
- FAILED = 'failed'¶
- CANCELLED = 'cancelled'¶
- RETRY = 'retry'¶
- class gobstopper.tasks.models.TaskPriority(*values)[source]¶
Bases:
Enum
Task priority levels for queue ordering.
Higher priority tasks are executed before lower priority tasks within the same category queue. Priority values determine the execution order in the priority queue.
- LOW¶
Low priority (1) - for non-urgent background tasks.
- NORMAL¶
Normal priority (2) - default for most tasks.
- HIGH¶
High priority (3) - for time-sensitive operations.
- CRITICAL¶
Critical priority (4) - for urgent, must-run-first tasks.
Example
Setting task priority:
# Regular background cleanup
await queue.add_task(
    "cleanup_temp_files",
    priority=TaskPriority.LOW
)

# User-initiated email (positional task args follow
# category, priority, and max_retries)
await queue.add_task(
    "send_email", "default", TaskPriority.HIGH, 0,
    "user@example.com"
)

# System alert notification
await queue.add_task(
    "send_alert",
    priority=TaskPriority.CRITICAL,
    alert_type="security"
)
Note
Tasks with the same priority are processed in FIFO order. Priority only affects ordering within a category queue.
- LOW = 1¶
- NORMAL = 2¶
- HIGH = 3¶
- CRITICAL = 4¶
- class gobstopper.tasks.models.TaskInfo(*, id: str, name: str, category: str, priority: TaskPriority, status: TaskStatus, created_at: datetime.datetime, started_at: datetime.datetime | None = None, completed_at: datetime.datetime | None = None, elapsed_seconds: float = 0.0, result: Any = None, error: str | None = None, retry_count: int = 0, max_retries: int = 0, args: tuple = (), kwargs: dict = <factory>, progress: float = 0.0, progress_message: str = '')[source]¶
Bases:
Struct
Complete task metadata and execution state tracking.
Stores all information about a task from creation through completion, including timing data, execution results, error information, and progress tracking. Uses msgspec.Struct for efficient serialization and memory usage.
- priority¶
Task priority level determining execution order.
- status¶
Current execution state of the task.
- created_at¶
Timestamp when the task was created and queued.
- Type:
datetime.datetime
- started_at¶
Timestamp when worker began executing (None if not started).
- Type:
datetime.datetime | None
- completed_at¶
Timestamp when execution finished (None if not complete).
- Type:
datetime.datetime | None
- result¶
Return value from the task function (None if no return value).
- Type:
Any
Example
Creating and tracking a task:
from datetime import datetime
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority

# Create task info when queuing
task = TaskInfo(
    id="550e8400-e29b-41d4-a716-446655440000",
    name="send_welcome_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    status=TaskStatus.PENDING,
    created_at=datetime.now(),
    max_retries=3,
    args=("user@example.com",),
    kwargs={
        "template": "welcome.html",
        "language": "en"
    }
)

# Update progress during execution
task.progress = 50.0
task.progress_message = "Rendering email template..."

# Mark completion
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
task.elapsed_seconds = 2.5
task.progress = 100.0
Note
All datetime fields are timezone-naive; use UTC for consistency.
TaskInfo instances are stored in DuckDB for persistence.
The result field can store any JSON-serializable data.
Progress tracking is optional; tasks work fine with progress=0.0.
- priority: TaskPriority¶
- status: TaskStatus¶
- property is_running: bool¶
Check if the task is currently being executed.
- Returns:
True if task status is STARTED, False otherwise.
- Return type:
bool
Example
Monitoring task execution:
task_info = await queue.get_task_info(task_id)
if task_info.is_running:
    print(f"Task is {task_info.progress}% complete")
    print(f"Status: {task_info.progress_message}")
- property is_completed: bool¶
Check if the task has reached a terminal state.
A task is considered completed if it has reached SUCCESS, FAILED, or CANCELLED status. These are terminal states where no further execution will occur.
- Returns:
True if task is in a terminal state, False if still pending, running, or eligible for retry.
- Return type:
bool
Example
Waiting for task completion:
while True:
    task_info = await queue.get_task_info(task_id)
    if task_info.is_completed:
        if task_info.status == TaskStatus.SUCCESS:
            print(f"Result: {task_info.result}")
        else:
            print(f"Error: {task_info.error}")
        break
    await asyncio.sleep(1)
Note
RETRY status is not considered completed - the task will be requeued automatically.
Task storage with DuckDB backend for Gobstopper framework.
This module provides persistent storage for task metadata using DuckDB, an embedded analytical database. Tasks are stored with full metadata including status, timing, results, and error information for tracking and debugging.
The storage system features:
- Lazy database initialization (no file created until first use)
- Automatic table and index creation
- Efficient querying with category and status filters
- Intelligent cleanup of old completed tasks
- JSON serialization for complex data types
- Classes:
TaskStorage: DuckDB-based persistent task storage with indexing.
Example
Basic storage operations:
from gobstopper.tasks.storage import TaskStorage
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
from datetime import datetime
# Create storage instance
storage = TaskStorage("my_tasks.duckdb")
# Save a task
task = TaskInfo(
id="task-123",
name="process_data",
category="analytics",
priority=TaskPriority.NORMAL,
status=TaskStatus.PENDING,
created_at=datetime.now()
)
storage.save_task(task)
# Retrieve task
retrieved = storage.get_task("task-123")
print(f"Status: {retrieved.status}")
# Query by category
analytics_tasks = storage.get_tasks(category="analytics", limit=50)
# Cleanup old completed tasks
deleted = storage.cleanup_old_tasks(days=30)
print(f"Deleted {deleted} old tasks")
Note
DuckDB is required for task storage. Install with: uv add duckdb
The database file is created lazily on first use.
- class gobstopper.tasks.storage.TaskStorage(db_path: str | Path = 'wopr_tasks.duckdb')[source]¶
Bases:
object
DuckDB-based task storage with intelligent cleanup.
Provides persistent storage for TaskInfo objects using DuckDB as the backend. The database is created lazily on first use, with automatic schema creation and index optimization for common query patterns.
The storage layer handles:
- Conversion between TaskInfo objects and database rows
- JSON serialization of complex fields (args, kwargs, result)
- Datetime string conversion for database compatibility
- Efficient indexing on category, status, created_at, and priority
- Safe concurrent access through DuckDB’s transaction handling
- db_path¶
Path to the DuckDB database file.
- connection¶
DuckDB connection object (created lazily).
Example
Creating and using storage:
from gobstopper.tasks.storage import TaskStorage
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
from datetime import datetime

# Initialize storage (no database file created yet)
storage = TaskStorage("tasks.duckdb")

# Create and save a task
task = TaskInfo(
    id="550e8400-e29b-41d4-a716-446655440000",
    name="send_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    status=TaskStatus.PENDING,
    created_at=datetime.now(),
    args=("user@example.com",),
    kwargs={"subject": "Welcome"}
)
storage.save_task(task)  # Database file created here

# Update task status
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
storage.save_task(task)  # Updates existing record

# Query tasks
pending = storage.get_tasks(status=TaskStatus.PENDING, limit=100)
email_tasks = storage.get_tasks(category="notifications")
- Raises:
ImportError – If DuckDB is not installed.
Note
Database initialization is deferred until first actual use
The database file is created with full schema on first write
All datetime values are stored as ISO 8601 strings
JSON fields (args, kwargs, result) support any JSON-serializable data
- __init__(db_path: str | Path = 'wopr_tasks.duckdb')[source]¶
Initialize TaskStorage with a database path.
- Parameters:
db_path – Path to the DuckDB database file. Can be relative or absolute. Defaults to “wopr_tasks.duckdb” in current directory.
- Raises:
ImportError – If DuckDB package is not installed.
Note
The database file is not created until the first operation that requires it. This allows TaskStorage to be instantiated without side effects.
- save_task(task_info: TaskInfo)[source]¶
Save or update task information in the database.
Performs an upsert operation (INSERT OR REPLACE) to either create a new task record or update an existing one. All TaskInfo fields are serialized appropriately for database storage.
- Parameters:
task_info – TaskInfo object to save or update.
Example
Saving and updating a task:
task = TaskInfo(
    id="task-123",
    name="process_data",
    category="analytics",
    priority=TaskPriority.NORMAL,
    status=TaskStatus.PENDING,
    created_at=datetime.now()
)

# Initial save
storage.save_task(task)

# Update after execution
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
task.elapsed_seconds = 5.2
task.result = {"records_processed": 1000}
storage.save_task(task)  # Updates existing record
Note
Datetime objects are converted to ISO 8601 strings
TaskPriority and TaskStatus enums are stored as their values
args, kwargs, and result are JSON-serialized
The database is initialized automatically if needed
- get_task(task_id: str) TaskInfo | None[source]¶
Retrieve a single task by its unique ID.
- Parameters:
task_id – Unique identifier of the task to retrieve.
- Returns:
TaskInfo object if found, None if no task exists with that ID.
Example
Retrieving and checking a task:
task = storage.get_task("550e8400-e29b-41d4-a716-446655440000")
if task:
    print(f"Task: {task.name}")
    print(f"Status: {task.status.value}")
    if task.status == TaskStatus.SUCCESS:
        print(f"Result: {task.result}")
else:
    print("Task not found")
Note
Returns None rather than raising an exception if task doesn’t exist.
- get_tasks(category: str | None = None, status: TaskStatus | None = None, limit: int = 100, offset: int = 0) List[TaskInfo][source]¶
Query tasks with optional filtering and pagination.
Retrieves multiple tasks from storage with optional filters on category and status. Results are ordered by creation time (newest first) and support pagination via limit/offset.
- Parameters:
category – Filter by task category (e.g., “email”, “reports”). If None, returns tasks from all categories.
status – Filter by TaskStatus enum value. If None, returns tasks in any status.
limit – Maximum number of tasks to return. Defaults to 100.
offset – Number of tasks to skip (for pagination). Defaults to 0.
- Returns:
List of TaskInfo objects matching the filters, ordered by created_at DESC (newest first). Empty list if no matches.
Example
Querying tasks with filters:
# Get all pending email tasks
pending_emails = storage.get_tasks(
    category="email",
    status=TaskStatus.PENDING,
    limit=50
)

# Get next page of results
next_page = storage.get_tasks(
    category="email",
    status=TaskStatus.PENDING,
    limit=50,
    offset=50
)

# Get recent failed tasks across all categories
failed = storage.get_tasks(
    status=TaskStatus.FAILED,
    limit=20
)

# Get all tasks in analytics category
analytics = storage.get_tasks(category="analytics")
Note
Results are always ordered by created_at DESC
Efficient queries due to indexes on category and status
Both category and status filters are optional
- cleanup_old_tasks(days: int = None, months: int = None, years: int = None)[source]¶
Delete old completed tasks to manage database size.
Removes tasks that completed before a calculated cutoff date. Only deletes tasks in terminal states (SUCCESS, FAILED, CANCELLED) to avoid removing active or pending tasks.
- Parameters:
days – Number of days to retain. Tasks older than this are deleted.
months – Number of months to retain (converted to 30-day periods).
years – Number of years to retain (converted to 365-day periods).
- Returns:
Number of task records deleted. Returns 0 if no time period is specified.
- Return type:
int
Example
Cleaning up old tasks:
# Delete tasks completed over 30 days ago
deleted = storage.cleanup_old_tasks(days=30)
print(f"Cleaned up {deleted} old tasks")

# Delete tasks completed over 3 months ago
deleted = storage.cleanup_old_tasks(months=3)

# Delete tasks completed over 1 year ago
deleted = storage.cleanup_old_tasks(years=1)

# Combine periods (90 days + 6 months)
deleted = storage.cleanup_old_tasks(days=90, months=6)
Note
Only deletes tasks in SUCCESS, FAILED, or CANCELLED status
PENDING and STARTED tasks are never deleted
Time periods are cumulative when multiple are specified
Safe to run periodically (e.g., daily cron job)
Returns 0 if no time period arguments provided
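Since only terminal tasks are removed, a periodic in-process pruning loop is one option; a minimal sketch (the interval and retention window here are illustrative):

import asyncio

async def cleanup_loop(storage):
    # Prune finished tasks once a day; 30-day retention is an example value
    while True:
        deleted = storage.cleanup_old_tasks(days=30)
        print(f"Cleaned up {deleted} old tasks")
        await asyncio.sleep(86400)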
WebSockets¶
WebSocket connection handling for Gobstopper framework.
This module provides WebSocket support through Granian’s RSGI protocol, offering a high-level interface for real-time bidirectional communication between server and clients.
The WebSocket class wraps the low-level RSGI WebSocket protocol and provides:
Automatic connection lifecycle management
Message size limits and validation (configurable via environment)
Automatic chunking for large messages with backpressure
Support for both text and binary messages
Type-safe message receiving with proper error handling
- Security Features:
Message size limits (default 1 MiB) to prevent memory exhaustion
Automatic connection closure on oversized messages (WebSocket code 1009)
Chunked transmission to provide backpressure and prevent buffer overflow
- Configuration:
Environment variables for tuning:
MAX_WS_MESSAGE_BYTES: Maximum message size in bytes (default: 1048576 = 1 MiB)
WS_SEND_CHUNK_BYTES: Chunk size for large messages (default: 65536 = 64 KiB)
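For example, these can be set before the server starts (a minimal sketch; the values are illustrative, and this assumes the variables are read from the process environment at runtime):

import os

# Illustrative values: raise the message cap to 4 MiB, send in 128 KiB chunks
os.environ["MAX_WS_MESSAGE_BYTES"] = str(4 * 1024 * 1024)
os.environ["WS_SEND_CHUNK_BYTES"] = str(128 * 1024)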
Examples
Simple echo server:
>>> from gobstopper import Gobstopper
>>> app = Gobstopper()
>>>
>>> @app.websocket("/ws/echo")
>>> async def echo(websocket):
... await websocket.accept()
... while True:
... message = await websocket.receive()
... if message.type == "close":
... break
... if message.type == "text":
... await websocket.send_text(f"Echo: {message.data}")
JSON message handling:
>>> import json
>>>
>>> @app.websocket("/ws/api")
>>> async def api_handler(websocket):
... await websocket.accept()
... async for message in websocket:
... if message.type == "text":
... data = json.loads(message.data)
... result = await process_request(data)
... await websocket.send_text(json.dumps(result))
Binary data streaming:
>>> @app.websocket("/ws/stream")
>>> async def stream_handler(websocket):
... await websocket.accept()
... # Stream large binary data in chunks
... with open("large_file.bin", "rb") as f:
... while chunk := f.read(65536):
... await websocket.send_bytes(chunk)
Note
This module requires Granian with RSGI support. Fallback types are provided for development environments without Granian installed.
See also
WebSocketManager: Room-based connection management
gobstopper.core.app: Main Gobstopper application class
- class gobstopper.websocket.connection.WebSocket(scope: Scope, protocol: RSGIWebsocketProtocol)[source]¶
Bases:
object
WebSocket connection wrapper for RSGI protocol.
Provides a high-level interface for WebSocket communication through Granian’s RSGI WebSocket protocol. Handles connection lifecycle, message sending/receiving, and connection management.
- Parameters:
scope – RSGI WebSocket scope containing connection metadata
protocol – RSGI WebSocket protocol instance for communication
- scope¶
Original RSGI scope object with connection info
- protocol¶
RSGI WebSocket protocol for low-level operations
- transport¶
Active transport instance (set after accept())
Examples
Basic echo server:
>>> @app.websocket("/ws/echo") >>> async def echo_handler(websocket: WebSocket): ... await websocket.accept() ... while True: ... message = await websocket.receive() ... if message.type == "text": ... await websocket.send_text(f"Echo: {message.data}") ... elif message.type == "close": ... break
Chat room implementation:
>>> @app.websocket("/ws/chat") >>> async def chat_handler(websocket: WebSocket): ... await websocket.accept() ... try: ... while True: ... message = await websocket.receive() ... if message.type == "text": ... # Broadcast to all connected clients ... await broadcast_message(message.data) ... except ConnectionClosed: ... pass # Client disconnected
Binary data handling:
>>> @app.websocket("/ws/data") >>> async def data_handler(websocket: WebSocket): ... await websocket.accept() ... async for message in websocket.receive_iter(): ... if message.type == "bytes": ... processed = process_binary_data(message.data) ... await websocket.send_bytes(processed)
Note
Must call accept() before sending/receiving messages. WebSocket connections are persistent until explicitly closed. Use try/except to handle connection errors gracefully.
See also
WebSocketManager: Room-based connection management
Gobstopper.websocket(): WebSocket route decorator
- async accept()[source]¶
Accept the WebSocket connection and establish transport.
Must be called before sending or receiving messages. Creates the transport layer for bidirectional communication.
- Returns:
Transport instance for low-level operations (usually not needed)
- Raises:
ConnectionError – If connection cannot be established
RuntimeError – If already accepted or connection closed
Examples
Standard connection acceptance:
>>> @app.websocket("/ws") >>> async def handler(websocket): ... await websocket.accept() ... # Now ready to send/receive
Note
This must be the first operation after receiving WebSocket instance. Connection cannot be accepted twice.
- async send_text(data: str)[source]¶
Send text message to WebSocket client with automatic chunking.
Sends text messages with automatic chunking for large payloads to provide backpressure management. Messages exceeding the configured size limit will cause the connection to close with status code 1009 (Message Too Big).
The message is encoded to UTF-8 and checked against the maximum message size limit (configurable via MAX_WS_MESSAGE_BYTES environment variable, default 1 MiB). Large messages are automatically split into chunks (configurable via WS_SEND_CHUNK_BYTES, default 64 KiB).
- Parameters:
data – Text message to send. Will be UTF-8 encoded before transmission.
- Raises:
RuntimeError – If WebSocket connection not accepted (transport is None)
ConnectionError – If connection fails during transmission
Examples
Sending simple text messages:
>>> await websocket.accept()
>>> await websocket.send_text("Hello, client!")
>>> await websocket.send_text("Another message")
Sending JSON data:
>>> import json
>>> data = {"type": "notification", "message": "Update available"}
>>> await websocket.send_text(json.dumps(data))
Handling large messages:
>>> # Large message is automatically chunked
>>> large_text = "x" * 500_000  # 500 KB text
>>> await websocket.send_text(large_text)
>>> # Sent in chunks with automatic backpressure
Broadcasting to multiple clients:
>>> for ws in active_connections:
...     await ws.send_text(broadcast_message)
Note
Messages larger than max_message_bytes will close the connection. Chunking happens automatically for messages > send_chunk_bytes. The transport.drain() method is called after each chunk if available. Silent failure if transport is None (connection not accepted).
See also
send_bytes(): Send binary data
accept(): Must be called before sending
- async send_bytes(data: bytes)[source]¶
Send binary message to WebSocket client with automatic chunking.
Sends binary messages with automatic chunking for large payloads to provide backpressure management. Messages exceeding the configured size limit will cause the connection to close with status code 1009 (Message Too Big).
The binary data is checked against the maximum message size limit (configurable via MAX_WS_MESSAGE_BYTES environment variable, default 1 MiB). Large messages are automatically split into chunks (configurable via WS_SEND_CHUNK_BYTES, default 64 KiB).
- Parameters:
data – Binary data to send as bytes or bytearray.
- Raises:
RuntimeError – If WebSocket connection not accepted (transport is None)
ConnectionError – If connection fails during transmission
Examples
Sending binary data:
>>> await websocket.accept()
>>> binary_data = b"\x00\x01\x02\x03"
>>> await websocket.send_bytes(binary_data)
Sending image data:
>>> with open("image.png", "rb") as f: ... image_data = f.read() >>> await websocket.send_bytes(image_data)
Sending protocol buffers:
>>> import my_proto_pb2
>>> message = my_proto_pb2.MyMessage()
>>> message.field = "value"
>>> await websocket.send_bytes(message.SerializeToString())
Chunked large binary transfer:
>>> # Large binary is automatically chunked
>>> large_data = bytes(500_000)  # 500 KB
>>> await websocket.send_bytes(large_data)
>>> # Sent in 64 KB chunks with backpressure
Note
Messages larger than max_message_bytes will close the connection. Chunking happens automatically for messages > send_chunk_bytes. The transport.drain() method is called after each chunk if available. Silent failure if transport is None (connection not accepted).
See also
send_text(): Send text messages
accept(): Must be called before sending
- async receive()[source]¶
Receive message from WebSocket client.
Waits for and receives the next message from the client. Returns a message object with type and data attributes.
- Returns:
Message object with:
type: Message type ("text", "bytes", "close", "error")
data: Message content (string for text, bytes for binary)
Returns None if the transport is not available.
- Return type:
Message object, or None
- Raises:
ConnectionError – If connection is closed unexpectedly
RuntimeError – If connection not accepted
Examples
Basic message receiving:
>>> message = await websocket.receive()
>>> if message.type == "text":
...     print(f"Received: {message.data}")
... elif message.type == "close":
...     print("Client disconnected")
Message type handling:
>>> while True:
...     message = await websocket.receive()
...     if message.type == "text":
...         await handle_text_message(message.data)
...     elif message.type == "bytes":
...         await handle_binary_data(message.data)
...     elif message.type == "close":
...         break
Note
This method blocks until a message is received. Handle connection close messages to avoid errors.
See also
send_text(): Send text messages
send_bytes(): Send binary messages
- async close(code: int = 1000, reason: str = '')[source]¶
Close the WebSocket connection gracefully.
Closes the WebSocket connection with an optional close code and reason. Following the WebSocket protocol specification for standard close codes.
- Standard WebSocket close codes:
1000: Normal closure (default)
1001: Going away (server shutdown, browser navigation)
1002: Protocol error
1003: Unsupported data type
1007: Invalid payload data
1008: Policy violation
1009: Message too big
1011: Internal server error
- Parameters:
code – WebSocket close status code (default: 1000 for normal closure). Must be a valid WebSocket close code (1000-4999).
reason – Optional human-readable close reason string. Must be UTF-8 and no longer than 123 bytes when encoded.
- Raises:
RuntimeError – If connection not accepted (transport is None)
ValueError – If code is invalid or reason is too long
Examples
Normal closure:
>>> await websocket.accept()
>>> # ... handle messages ...
>>> await websocket.close()
Closure with reason:
>>> await websocket.close(1000, "Session ended")
Error closure:
>>> try:
...     await process_message(message)
... except ValidationError:
...     await websocket.close(1008, "Invalid message format")
Server shutdown:
>>> for ws in active_connections:
...     await ws.close(1001, "Server shutting down")
Note
After closing, no further messages can be sent or received. Client may also initiate close; handle “close” message type in receive(). Connection is automatically cleaned up after close.
WebSocket room management for Gobstopper framework.
This module provides a comprehensive WebSocket connection manager with room-based organization for building real-time applications like chat systems, live dashboards, collaborative editing, and multiplayer games.
The WebSocketManager class offers:
Centralized connection tracking and lifecycle management
Room-based organization for targeted message broadcasting
Automatic cleanup of dead connections
Single-event-loop operation (not thread-safe; see the Thread Safety note below)
Efficient broadcast mechanisms for one-to-many communication
- Key Concepts:
Connection: Individual WebSocket client identified by unique connection_id
Room: Named group of connections that can receive broadcasts together
Broadcast: Sending messages to multiple connections simultaneously
- Architecture:
The manager maintains two key data structures:
connections: Dict mapping connection_id -> WebSocket instance
rooms: Dict mapping room_name -> Set of connection_ids
This separation allows:
O(1) connection lookups
O(1) room membership checks
Efficient multi-room membership per connection
Easy cleanup when connections close
- Use Cases:
Chat rooms: Users join named chat rooms to exchange messages
Live updates: Broadcast data updates to subscribers of specific topics
Notifications: Send alerts to specific user groups
Gaming: Manage players in different game lobbies
Collaboration: Sync state across users editing the same document
Examples
Basic chat room implementation:
>>> from gobstopper import Gobstopper
>>> from gobstopper.websocket import WebSocketManager
>>> import uuid
>>>
>>> app = Gobstopper()
>>> manager = WebSocketManager()
>>>
>>> @app.websocket("/ws/chat/<room>")
>>> async def chat_handler(websocket, room):
... conn_id = str(uuid.uuid4())
... await websocket.accept()
... manager.add_connection(conn_id, websocket)
... manager.join_room(conn_id, room)
...
... try:
... while True:
... message = await websocket.receive()
... if message.type == "close":
... break
... if message.type == "text":
... await manager.broadcast_to_room(room, message.data)
... finally:
... manager.remove_connection(conn_id)
Multi-room user presence:
>>> @app.websocket("/ws/notifications")
>>> async def notification_handler(websocket):
... conn_id = str(uuid.uuid4())
... await websocket.accept()
... manager.add_connection(conn_id, websocket)
...
... # User can join multiple notification topics
... manager.join_room(conn_id, "global")
... manager.join_room(conn_id, f"user_{user_id}")
... manager.join_room(conn_id, f"team_{team_id}")
...
... # Now receives broadcasts from all three rooms
... while True:
... message = await websocket.receive()
... if message.type == "close":
... break
Admin broadcast to all users:
>>> async def notify_all_users(message: str):
...     await manager.broadcast_to_all(message)
Room statistics and monitoring:
>>> def get_room_stats(room: str) -> dict:
...     connections = manager.get_room_connections(room)
...     return {
...         "room": room,
...         "active_users": len(connections),
...         "connection_ids": list(connections)
...     }
- Security Considerations:
Always validate connection_ids to prevent injection attacks
Implement authentication before adding connections
Sanitize room names to prevent unauthorized access
Rate limit broadcast operations to prevent abuse
Monitor connection counts to detect DoS attempts
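A minimal sketch tying the first three points together, assuming an application-side user.can_access() permission check and a room-name policy of your choosing (both are illustrative, not part of the framework):

import re

ROOM_NAME_RE = re.compile(r"^[a-z0-9_-]{1,64}$")  # example naming policy

def safe_join(manager, conn_id: str, room: str, user) -> bool:
    # Reject names that could collide with private or system rooms
    if not ROOM_NAME_RE.fullmatch(room):
        return False
    # user.can_access() is an assumed application-level permission check
    if not user.can_access(room):
        return False
    manager.join_room(conn_id, room)
    return True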
- Performance Notes:
Room lookups are O(1) with hash-based dictionaries
Broadcasting is O(n) where n is room size
Connection cleanup is O(m) where m is number of rooms
Memory usage scales with: connections + (rooms × avg_room_size)
See also
WebSocket: Individual WebSocket connection wrapper
gobstopper.core.app: Main Gobstopper application for routing
- class gobstopper.websocket.manager.WebSocketManager[source]¶
Bases:
object
Centralized WebSocket connection manager with room-based organization.
Manages WebSocket connections with support for rooms (groups), broadcasting, and automatic connection lifecycle management. Designed for building real-time applications with many concurrent connections and targeted message delivery.
The manager provides efficient data structures for:
Fast connection lookups by ID
Organizing connections into named rooms
Broadcasting messages to rooms or all connections
Tracking which rooms a connection belongs to
Automatic cleanup of disconnected clients
- connections¶
Dictionary mapping connection_id (str) to WebSocket instances. Maintains all active WebSocket connections.
- rooms¶
Dictionary mapping room names (str) to Sets of connection_ids. Organizes connections into logical groups for targeted broadcasting.
- Thread Safety:
This implementation is NOT thread-safe by default. For multi-threaded applications, wrap operations in locks or use asyncio’s thread-safe primitives.
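One way to follow that advice when handlers run on multiple threads is a small proxy that serializes mutating calls behind a lock; a hedged sketch (the wrapper is illustrative, not part of the framework):

import threading

class LockedManagerProxy:
    # Serializes manager mutations across threads
    def __init__(self, manager):
        self._manager = manager
        self._lock = threading.Lock()

    def add_connection(self, conn_id, websocket):
        with self._lock:
            self._manager.add_connection(conn_id, websocket)

    def remove_connection(self, conn_id):
        with self._lock:
            self._manager.remove_connection(conn_id)

    def join_room(self, conn_id, room):
        with self._lock:
            self._manager.join_room(conn_id, room)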
Examples
Initialize manager and handle connections:
>>> from gobstopper.websocket import WebSocketManager
>>> manager = WebSocketManager()
>>>
>>> # In your WebSocket handler
>>> conn_id = "user_123"
>>> manager.add_connection(conn_id, websocket)
>>> manager.join_room(conn_id, "lobby")
Complete chat application:
>>> import uuid
>>> from gobstopper import Gobstopper
>>>
>>> app = Gobstopper()
>>> manager = WebSocketManager()
>>>
>>> @app.websocket("/chat/<room_name>")
... async def chat(websocket, room_name):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     manager.join_room(conn_id, room_name)
...
...     try:
...         # Send join notification
...         await manager.broadcast_to_room(
...             room_name,
...             f"User {conn_id[:8]} joined"
...         )
...
...         # Message loop
...         while True:
...             msg = await websocket.receive()
...             if msg.type == "close":
...                 break
...             if msg.type == "text":
...                 await manager.broadcast_to_room(room_name, msg.data)
...     finally:
...         manager.remove_connection(conn_id)
...         await manager.broadcast_to_room(
...             room_name,
...             f"User {conn_id[:8]} left"
...         )
Multi-room presence (user in multiple rooms):
>>> conn_id = "user_456" >>> manager.add_connection(conn_id, websocket) >>> manager.join_room(conn_id, "general") >>> manager.join_room(conn_id, "announcements") >>> manager.join_room(conn_id, "team_alpha") >>> >>> # User receives broadcasts from all three rooms >>> rooms = manager.get_connection_rooms(conn_id) >>> print(rooms) # {'general', 'announcements', 'team_alpha'}
Room monitoring and statistics:
>>> def get_all_room_stats():
...     stats = {}
...     for room in manager.rooms.keys():
...         connections = manager.get_room_connections(room)
...         stats[room] = {
...             "users": len(connections),
...             "connection_ids": list(connections)
...         }
...     return stats
Graceful shutdown:
>>> async def shutdown():
...     # Notify all users
...     await manager.broadcast_to_all(
...         "Server shutting down in 10 seconds"
...     )
...     # Close all connections
...     for conn_id, ws in list(manager.connections.items()):
...         await ws.close(1001, "Server shutdown")
...         manager.remove_connection(conn_id)
See also
WebSocket: Individual WebSocket connection
broadcast_to_room(): Send message to specific room
broadcast_to_all(): Send message to all connections
- __init__()[source]¶
Initialize empty WebSocket manager.
Creates a new manager with no connections or rooms. The connections dictionary and rooms defaultdict are initialized and ready for use.
Examples
>>> manager = WebSocketManager()
>>> print(len(manager.connections))  # 0
>>> print(len(manager.rooms))  # 0
- add_connection(connection_id: str, websocket: WebSocket)[source]¶
Register a new WebSocket connection with the manager.
Adds a WebSocket connection to the manager’s tracking dictionary. This must be called after accepting the WebSocket connection and before using any room or broadcast features.
The connection_id should be unique across all active connections. Using UUIDs or session tokens is recommended to ensure uniqueness.
- Parameters:
connection_id – Unique identifier for this connection. Must be hashable and unique. Recommended to use UUID4 or secure random string.
websocket – The WebSocket instance to register. Should already be accepted (websocket.accept() called).
Examples
Using UUID for connection ID:
>>> import uuid
>>> conn_id = str(uuid.uuid4())
>>> manager.add_connection(conn_id, websocket)
Using session-based ID:
>>> session_id = request.session["user_id"]
>>> conn_id = f"user_{session_id}"
>>> manager.add_connection(conn_id, websocket)
Complete connection lifecycle:
>>> @app.websocket("/ws") >>> async def handler(websocket): ... conn_id = str(uuid.uuid4()) ... await websocket.accept() ... manager.add_connection(conn_id, websocket) ... try: ... # Handle messages... ... pass ... finally: ... manager.remove_connection(conn_id)
Note
If connection_id already exists, it will be overwritten. Ensure IDs are unique to avoid connection conflicts.
See also
remove_connection(): Remove connection from manager
join_room(): Add connection to a room
- remove_connection(connection_id: str)[source]¶
Remove a WebSocket connection and clean up all room memberships.
Removes the connection from the manager and automatically removes it from all rooms it was a member of. This is the proper way to clean up when a connection closes, ensuring no dangling references remain.
The operation is idempotent - calling it multiple times with the same connection_id is safe and has no effect after the first removal.
- Parameters:
connection_id – The unique identifier of the connection to remove.
Examples
Basic cleanup in finally block:
>>> try:
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     # Handle messages...
... finally:
...     manager.remove_connection(conn_id)
Cleanup with logging:
>>> def cleanup_connection(conn_id: str):
...     rooms = manager.get_connection_rooms(conn_id)
...     manager.remove_connection(conn_id)
...     logger.info(f"Removed {conn_id} from {len(rooms)} rooms")
Graceful disconnect with notification:
>>> async def disconnect_user(conn_id: str):
...     # Get rooms before removal
...     user_rooms = manager.get_connection_rooms(conn_id)
...
...     # Notify rooms about departure
...     for room in user_rooms:
...         await manager.broadcast_to_room(
...             room,
...             f"User {conn_id} disconnected"
...         )
...
...     # Remove connection
...     manager.remove_connection(conn_id)
Bulk cleanup on shutdown:
>>> async def cleanup_all():
...     conn_ids = list(manager.connections.keys())
...     for conn_id in conn_ids:
...         manager.remove_connection(conn_id)
Note
This method is safe to call even if connection_id doesn’t exist. All room memberships are automatically cleaned up. Empty rooms remain in the rooms dictionary (with empty sets).
See also
add_connection(): Register a new connection
leave_room(): Remove from specific room only
- join_room(connection_id: str, room: str)[source]¶
Add a connection to a named room for targeted broadcasting.
Adds the specified connection to a room, allowing it to receive broadcasts sent to that room. Connections can be members of multiple rooms simultaneously. If the room doesn’t exist, it will be created automatically.
This operation only succeeds if the connection exists in the manager. Attempting to add a non-existent connection to a room will silently fail.
- Parameters:
connection_id – Unique identifier of the connection to add to the room. Must be a connection previously registered with add_connection().
room – Name of the room to join. Can be any string identifier. Room is created automatically if it doesn’t exist.
Examples
Join single room:
>>> manager.add_connection(conn_id, websocket)
>>> manager.join_room(conn_id, "general")
Join multiple rooms:
>>> manager.join_room(conn_id, "global_chat") >>> manager.join_room(conn_id, "announcements") >>> manager.join_room(conn_id, f"user_{user_id}_private")
Dynamic room joining based on user:
>>> @app.websocket("/ws/subscribe") >>> async def subscribe_handler(websocket): ... await websocket.accept() ... conn_id = str(uuid.uuid4()) ... manager.add_connection(conn_id, websocket) ... ... # User sends room names to join ... while True: ... message = await websocket.receive() ... if message.type == "text": ... data = json.loads(message.data) ... if data["action"] == "join": ... manager.join_room(conn_id, data["room"]) ... await websocket.send_text( ... f"Joined room: {data['room']}" ... )
Topic-based subscriptions:
>>> # User subscribes to multiple topics
>>> user_topics = ["python", "javascript", "devops"]
>>> for topic in user_topics:
...     manager.join_room(conn_id, f"topic_{topic}")
Permission-based room access:
>>> async def join_with_permission(conn_id, room, user):
...     if await user.has_permission(room):
...         manager.join_room(conn_id, room)
...         return True
...     return False
Note
Silently does nothing if connection_id not in manager.connections. Joining the same room multiple times has no additional effect. Room is created automatically on first join.
See also
leave_room(): Remove connection from room
broadcast_to_room(): Send message to room members
get_room_connections(): Get all connections in room
- leave_room(connection_id: str, room: str)[source]¶
Remove a connection from a specific room.
Removes the connection from the specified room without affecting its membership in other rooms or removing it from the manager. This is useful for managing dynamic subscriptions where users can leave specific channels without disconnecting entirely.
The operation is idempotent - calling it multiple times or on a connection that isn’t in the room has no effect.
- Parameters:
connection_id – Unique identifier of the connection to remove from room.
room – Name of the room to leave.
Examples
Leave single room:
>>> manager.leave_room(conn_id, "general")
Dynamic unsubscribe:
>>> @app.websocket("/ws/manage") >>> async def manage_subscriptions(websocket): ... conn_id = str(uuid.uuid4()) ... await websocket.accept() ... manager.add_connection(conn_id, websocket) ... ... while True: ... message = await websocket.receive() ... if message.type == "text": ... data = json.loads(message.data) ... if data["action"] == "leave": ... manager.leave_room(conn_id, data["room"]) ... await websocket.send_text( ... f"Left room: {data['room']}" ... )
Leave multiple rooms:
>>> rooms_to_leave = ["channel1", "channel2", "channel3"] >>> for room in rooms_to_leave: ... manager.leave_room(conn_id, room)
Permission revocation:
>>> async def revoke_access(conn_id: str, room: str):
...     manager.leave_room(conn_id, room)
...     # Notify user
...     ws = manager.connections.get(conn_id)
...     if ws:
...         await ws.send_text(
...             f"Access to {room} has been revoked"
...         )
Clean exit from specific room:
>>> # Leave room but stay connected
>>> rooms = manager.get_connection_rooms(conn_id)
>>> if "temporary_room" in rooms:
...     manager.leave_room(conn_id, "temporary_room")
Note
Does not remove connection from manager, only from the room. Safe to call even if connection not in room or room doesn’t exist. Connection remains active and in other rooms.
See also
join_room(): Add connection to room
remove_connection(): Remove connection entirely
get_connection_rooms(): Check which rooms connection is in
- async broadcast_to_room(room: str, message: str)[source]¶
Broadcast text message to all connections in a specific room.
Sends the same text message to every connection currently in the specified room. Failed sends (due to closed connections) automatically trigger cleanup of that connection from the manager.
This is an async operation that sends messages sequentially to each connection in the room. For better performance with large rooms, consider implementing parallel sends with asyncio.gather().
- Parameters:
room – Name of the room to broadcast to.
message – Text message to send to all room members. Must be a string.
Examples
Simple room broadcast:
>>> await manager.broadcast_to_room("lobby", "Welcome everyone!")
Chat message broadcasting:
>>> @app.websocket("/chat/<room>") >>> async def chat_handler(websocket, room): ... conn_id = str(uuid.uuid4()) ... await websocket.accept() ... manager.add_connection(conn_id, websocket) ... manager.join_room(conn_id, room) ... ... while True: ... msg = await websocket.receive() ... if msg.type == "text": ... # Broadcast to everyone in room ... await manager.broadcast_to_room( ... room, ... f"{conn_id[:8]}: {msg.data}" ... )
JSON broadcast:
>>> import json
>>> data = {"type": "update", "value": 42}
>>> await manager.broadcast_to_room(
...     "notifications",
...     json.dumps(data)
... )
Notify room about events:
>>> async def notify_room_event(room: str, event: str, data: dict):
...     message = json.dumps({
...         "event": event,
...         "data": data,
...         "timestamp": time.time()
...     })
...     await manager.broadcast_to_room(room, message)
System announcements:
>>> async def system_announce(room: str, announcement: str):
...     await manager.broadcast_to_room(
...         room,
...         f"[SYSTEM] {announcement}"
...     )
Multiple room broadcast:
>>> async def broadcast_to_multiple_rooms(rooms: list, message: str):
...     for room in rooms:
...         await manager.broadcast_to_room(room, message)
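As the description above suggests, large rooms can use parallel sends with asyncio.gather(). A hedged sketch (this helper is illustrative, not part of the manager; the cleanup-on-failure mirrors the documented sequential behavior):

import asyncio

async def broadcast_parallel(manager, room: str, message: str):
    # Snapshot members so concurrent joins/leaves don't affect iteration
    members = [
        (cid, manager.connections[cid])
        for cid in manager.get_room_connections(room)
        if cid in manager.connections
    ]
    results = await asyncio.gather(
        *(ws.send_text(message) for _, ws in members),
        return_exceptions=True,
    )
    # Drop connections whose send raised, mirroring the sequential cleanup
    for (cid, _), result in zip(members, results):
        if isinstance(result, Exception):
            manager.remove_connection(cid)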
Note
Messages are sent sequentially, not in parallel. Failed sends automatically clean up dead connections. If room doesn’t exist or is empty, nothing happens. For binary data, use websocket.send_bytes() directly on each connection.
See also
broadcast_to_all(): Broadcast to all connections
get_room_connections(): Get connection IDs in room
WebSocket.send_text: Underlying send method
- async broadcast_to_all(message: str)[source]¶
Broadcast text message to every active connection.
Sends the same text message to all connections registered with the manager, regardless of room membership. This is useful for system-wide announcements, critical alerts, or global events.
Failed sends (due to closed connections) automatically trigger cleanup of that connection. Uses list() to create a snapshot of connections to avoid modification during iteration.
- Parameters:
message – Text message to send to all connections. Must be a string.
Examples
System-wide announcement:
>>> await manager.broadcast_to_all(
...     "Server will restart in 5 minutes"
... )
Global notification:
>>> import json
>>> notification = {
...     "type": "announcement",
...     "message": "New feature deployed!",
...     "priority": "high"
... }
>>> await manager.broadcast_to_all(json.dumps(notification))
Scheduled broadcast:
>>> import asyncio
>>>
>>> async def scheduled_announcement():
...     while True:
...         await asyncio.sleep(3600)  # Every hour
...         await manager.broadcast_to_all(
...             "Hourly server status: All systems operational"
...         )
Emergency alert:
>>> async def emergency_broadcast(alert_message: str):
...     priority_msg = json.dumps({
...         "type": "emergency",
...         "message": alert_message,
...         "timestamp": time.time()
...     })
...     await manager.broadcast_to_all(priority_msg)
Server shutdown notification:
>>> async def notify_shutdown():
...     await manager.broadcast_to_all(
...         "Server shutting down for maintenance"
...     )
...     await asyncio.sleep(2)  # Give time to send
...     # Proceed with shutdown...
Broadcast with counter:
>>> async def count_broadcast_recipients(message: str):
...     initial_count = len(manager.connections)
...     await manager.broadcast_to_all(message)
...     final_count = len(manager.connections)
...     failed = initial_count - final_count
...     return {"sent": final_count, "failed": failed}
Note
Sends to ALL connections, ignoring room memberships. Messages are sent sequentially, not in parallel. Creates connection snapshot to handle concurrent modifications. Failed sends automatically clean up dead connections. For large connection counts, consider rate limiting or batching.
See also
broadcast_to_room(): Broadcast to specific room
get_room_connections(): Get connections in a room
WebSocket.send_text: Underlying send method
- get_room_connections(room: str) → Set[str][source]¶
Get all connection IDs currently in a specific room.
Returns a copy of the set of connection IDs that are members of the specified room. The returned set is a copy, so modifications won’t affect the manager’s internal state.
- Parameters:
room – Name of the room to query.
- Returns:
Set of connection_id strings for all connections in the room. Returns empty set if room doesn’t exist or has no members.
Examples
Get room size:
>>> connections = manager.get_room_connections("lobby")
>>> print(f"Lobby has {len(connections)} users")
Check if connection in room:
>>> room_members = manager.get_room_connections("vip_lounge") >>> if conn_id in room_members: ... print("User is in VIP lounge")
Room statistics:
>>> def get_room_info(room: str) -> dict:
...     connections = manager.get_room_connections(room)
...     return {
...         "room": room,
...         "member_count": len(connections),
...         "members": list(connections)
...     }
Iterate over room members:
>>> room_members = manager.get_room_connections("announcements") >>> for conn_id in room_members: ... ws = manager.connections.get(conn_id) ... if ws: ... print(f"Connection {conn_id} is active")
Compare room sizes:
>>> def find_most_popular_room() -> str:
...     max_size = 0
...     popular = None
...     for room in manager.rooms.keys():
...         size = len(manager.get_room_connections(room))
...         if size > max_size:
...             max_size = size
...             popular = room
...     return popular
Note
Returns a copy of the set, safe to modify without affecting manager. Empty set returned if room doesn’t exist. Connection IDs in set may reference closed connections.
See also
get_connection_rooms(): Get rooms a connection is in
join_room(): Add connection to room
broadcast_to_room(): Send message to room
- get_connection_rooms(connection_id: str) → Set[str][source]¶
Get all rooms that a specific connection is a member of.
Iterates through all rooms to find which ones contain the specified connection. Returns a new set containing the room names, so modifications won’t affect the manager’s internal state.
- Parameters:
connection_id – Unique identifier of the connection to query.
- Returns:
Set of room name strings that the connection is a member of. Returns empty set if connection is not in any rooms.
Examples
Check user’s room memberships:
>>> rooms = manager.get_connection_rooms(conn_id)
>>> print(f"User is in {len(rooms)} rooms: {rooms}")
Verify room membership:
>>> user_rooms = manager.get_connection_rooms(conn_id)
>>> if "admin_only" in user_rooms:
...     print("User has admin access")
Display user’s subscriptions:
>>> async def show_subscriptions(websocket, conn_id):
...     rooms = manager.get_connection_rooms(conn_id)
...     await websocket.send_text(
...         f"You are subscribed to: {', '.join(rooms)}"
...     )
Leave all rooms except one:
>>> def keep_only_room(conn_id: str, keep_room: str):
...     current_rooms = manager.get_connection_rooms(conn_id)
...     for room in current_rooms:
...         if room != keep_room:
...             manager.leave_room(conn_id, room)
Connection activity report:
>>> def get_connection_report(conn_id: str) -> dict:
...     rooms = manager.get_connection_rooms(conn_id)
...     ws = manager.connections.get(conn_id)
...     return {
...         "connection_id": conn_id,
...         "active": ws is not None,
...         "room_count": len(rooms),
...         "rooms": list(rooms)
...     }
Cleanup before disconnect:
>>> async def graceful_disconnect(conn_id: str):
...     # Notify all rooms user was in
...     rooms = manager.get_connection_rooms(conn_id)
...     for room in rooms:
...         await manager.broadcast_to_room(
...             room,
...             f"User {conn_id} has left {room}"
...         )
...     # Remove connection
...     manager.remove_connection(conn_id)
Note
Returns a new set, safe to modify without affecting manager. Empty set if connection not in any rooms. Performance is O(n) where n is total number of rooms.
See also
get_room_connections(): Get connections in a room
join_room(): Add connection to room
leave_room(): Remove from specific room
Sessions¶
Session storage backends for Gobstopper
- class gobstopper.sessions.storage.BaseSessionStorage[source]¶
Bases:
ABC
Abstract base class for session storage.
- class gobstopper.sessions.storage.FileSessionStorage(directory: Path)[source]¶
Bases:
BaseSessionStorage
File-based session storage.
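A brief usage sketch based on the constructor signature above (the directory is illustrative); the resulting object can be passed to SecurityMiddleware via its session_storage argument:

from pathlib import Path
from gobstopper.sessions.storage import FileSessionStorage

# Keep session files in a dedicated directory (path is illustrative)
storage = FileSessionStorage(Path("./.sessions"))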
Middleware¶
Static file middleware for Gobstopper framework.
This module provides secure static file serving with automatic MIME type detection, caching headers, and directory traversal protection. This is the pure Python implementation, suitable for development and production use.
- class gobstopper.middleware.static.StaticFileMiddleware(static_dir: str = 'static', url_prefix: str = '/static')[source]¶
Bases:
object
Middleware for serving static files securely.
This middleware serves static files from a designated directory with built-in security features to prevent directory traversal attacks. It automatically detects MIME types and adds appropriate caching headers.
- Security Features:
Directory traversal protection (prevents ‘..’ attacks)
Path validation to ensure files are within static directory
Automatic MIME type detection
403 Forbidden for invalid paths
404 Not Found for missing files
- Performance Features:
Cache-Control headers (1 hour default)
Content-Type auto-detection
Content-Length headers
Efficient file reading
- Parameters:
static_dir – Directory containing static files. Can be relative or absolute. Default is ‘static’. The directory must exist.
url_prefix – URL prefix for static routes. Must start with ‘/’. Default is ‘/static’. Requests to this prefix will serve static files.
Examples
Basic static file serving:
from gobstopper.middleware import StaticFileMiddleware

# Serve files from './static' at '/static' URLs
static = StaticFileMiddleware()
app.add_middleware(static)
Custom directory and prefix:
static = StaticFileMiddleware(
    static_dir='public',
    url_prefix='/assets'
)
app.add_middleware(static)
Multiple static directories:
# Serve from different directories with different prefixes
app.add_middleware(StaticFileMiddleware('css', '/css'))
app.add_middleware(StaticFileMiddleware('js', '/js'))
app.add_middleware(StaticFileMiddleware('images', '/images'))
Note
For production with high traffic, consider using RustStaticMiddleware
The middleware automatically adds ‘public, max-age=3600’ cache headers
MIME types are detected using Python’s mimetypes module
Unknown file types default to ‘application/octet-stream’
- Security Considerations:
Never serve files from user-controlled directories
Be careful with symlinks (they can escape the static directory)
Consider using Content-Security-Policy headers
Validate file extensions if serving user-uploaded content
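To illustrate the symlink caveat, a hedged containment check (not the middleware's actual code): resolving the candidate path follows symlinks, so a link that points outside the static root fails the test.

from pathlib import Path

def is_inside_static_root(static_dir: str, requested: str) -> bool:
    # resolve() follows symlinks, so escapes via links are caught as well
    root = Path(static_dir).resolve()
    target = (root / requested.lstrip("/")).resolve()
    return target.is_relative_to(root)  # Python 3.9+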
See also
RustStaticMiddleware: High-performance Rust-based static serving
HybridStaticMiddleware: Combines Rust and Python for optimal performance
- async __call__(request: Request, call_next: Callable[[Request], Awaitable[Any]]) → Response[source]¶
Process request and serve static files if needed.
Checks if the request path matches the static URL prefix. If so, serves the corresponding file; otherwise, passes the request to the next handler.
- Parameters:
request – The incoming HTTP request.
call_next – The next middleware or handler in the chain.
- Returns:
Response containing the file contents if path matches static prefix, otherwise the result from call_next().
Note
Only processes requests matching the configured url_prefix
Other requests are passed through unchanged
CORS middleware for Gobstopper framework.
This module provides Cross-Origin Resource Sharing (CORS) support for web applications, enabling secure cross-origin requests from browsers. CORS is essential for modern web applications that need to make requests from different domains.
- class gobstopper.middleware.cors.CORSMiddleware(origins: List[str] | None = None, methods: List[str] | None = None, headers: List[str] | None = None, allow_credentials: bool = False, max_age: int = 3600)[source]¶
Bases:
object
Middleware for handling Cross-Origin Resource Sharing (CORS).
This middleware adds appropriate CORS headers to responses and handles preflight OPTIONS requests. It supports configurable origins, methods, headers, and credential handling.
- Security Considerations:
Use specific origins instead of ‘*’ when possible
Never use ‘*’ with credentials enabled (browsers will reject)
Always include Vary header when using specific origins
Consider the security implications of exposed headers
- Parameters:
origins – List of allowed origins. Use ['*'] for all origins (default). Examples: ['https://example.com', 'https://app.example.com'] Note: '*' cannot be used with credentials.
methods – List of allowed HTTP methods. Defaults to [‘GET’, ‘POST’, ‘PUT’, ‘DELETE’, ‘OPTIONS’].
headers – List of allowed request headers. Defaults to [‘Content-Type’, ‘Authorization’].
allow_credentials – Whether to allow credentials (cookies, auth headers). Default is False. When True, specific origins must be listed.
max_age – How long (in seconds) browsers should cache preflight responses. Default is 3600 (1 hour). Maximum varies by browser (usually 86400).
Examples
Basic CORS for public API:
from gobstopper.middleware import CORSMiddleware

# Allow all origins (no credentials)
cors = CORSMiddleware()
app.add_middleware(cors)
CORS with specific origins and credentials:
cors = CORSMiddleware(
    origins=['https://app.example.com', 'https://admin.example.com'],
    methods=['GET', 'POST', 'PUT', 'DELETE'],
    headers=['Content-Type', 'Authorization', 'X-Custom-Header'],
    allow_credentials=True,
    max_age=7200  # 2 hours
)
app.add_middleware(cors)
Development setup (permissive):
cors = CORSMiddleware(
    origins=['http://localhost:3000', 'http://localhost:8080'],
    allow_credentials=True
)
app.add_middleware(cors)
Note
Preflight requests (OPTIONS) are handled automatically
The middleware adds ‘Vary: Origin’ header when using specific origins
Browsers enforce CORS; this middleware just provides the headers
Empty origin headers are rejected for security
See also
SecurityMiddleware: For additional security headers
MDN CORS documentation: https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS
- __init__(origins: List[str] | None = None, methods: List[str] | None = None, headers: List[str] | None = None, allow_credentials: bool = False, max_age: int = 3600)[source]¶
- async __call__(request: Request, call_next: Callable[[Request], Awaitable[Any]]) → Response[source]¶
Process request and add CORS headers.
This method handles both preflight OPTIONS requests and regular requests, adding appropriate CORS headers based on the middleware configuration.
- Parameters:
request – The incoming HTTP request.
call_next – The next middleware or handler in the chain.
- Returns:
Response with CORS headers added if the origin is allowed.
Note
Preflight requests receive a 204 No Content response
Regular requests are passed through with CORS headers added
Origins are validated before headers are added
Security middleware for Gobstopper framework.
This module provides comprehensive security features for web applications including: - CSRF (Cross-Site Request Forgery) protection - Secure session management with multiple storage backends - Modern security headers (HSTS, CSP, COOP, COEP, etc.) - Cookie security with environment-aware defaults - Session ID signing and verification
The middleware is designed to be production-ready with secure defaults that are automatically enforced in production environments.
- class gobstopper.middleware.security.SecurityMiddleware(secret_key: str | None = None, enable_csrf: bool = True, enable_security_headers: bool = True, hsts_max_age: int = 31536000, hsts_include_subdomains: bool = True, csp_policy: str = "default-src 'self'; object-src 'none'", referrer_policy: str = 'strict-origin-when-cross-origin', coop_policy: str = 'same-origin', coep_policy: str = 'require-corp', session_storage: BaseSessionStorage | AsyncBaseSessionStorage | None = None, debug: bool = False, cookie_name: str = 'session_id', cookie_secure: bool = True, cookie_httponly: bool = True, cookie_samesite: str = 'Strict', cookie_path: str = '/', cookie_domain: str | None = None, cookie_max_age: int = 604800, rolling_sessions: bool = False, sign_session_id: bool = False)[source]¶
Bases:
object
Middleware for comprehensive application security.
This middleware provides multiple layers of security protection including CSRF tokens, secure session management, and modern security headers. It’s designed to be production-ready with secure defaults that are automatically enforced based on the environment.
- Security Features:
CSRF Protection: Validates tokens for state-changing requests
Session Management: Secure sessions with multiple storage backends
Security Headers: HSTS, CSP, COOP, COEP, X-Frame-Options, etc.
Cookie Security: Secure, HttpOnly, SameSite flags with environment enforcement
Session ID Signing: HMAC-SHA256 signing to prevent tampering
Rolling Sessions: Optional session renewal on each request
- Production Behavior:
When ENV=production, the middleware enforces:
Cookie Secure flag (HTTPS only)
HttpOnly flag (prevents JavaScript access)
SameSite=Strict (prevents CSRF attacks)
These cannot be disabled in production for safety.
- Parameters:
secret_key – Secret key for CSRF and session signing. Required for production. Should be a long, random string. If not provided, a random key is generated but this is not suitable for multi-server deployments.
enable_csrf – Enable CSRF protection for POST/PUT/DELETE/PATCH requests. Default is True. Recommended to keep enabled.
enable_security_headers – Add security headers to responses. Default is True. Includes HSTS, CSP, COOP, COEP, etc.
hsts_max_age – HTTP Strict Transport Security max-age in seconds. Default is 31536000 (1 year). Tells browsers to use HTTPS.
hsts_include_subdomains – Include subdomains in HSTS policy. Default is True. Applies HSTS to all subdomains.
csp_policy – Content Security Policy directive string. Default restricts to same-origin and blocks objects. Customize based on your application’s needs.
referrer_policy – Referrer-Policy header value. Default is ‘strict-origin-when-cross-origin’.
coop_policy – Cross-Origin-Opener-Policy header value. Default is ‘same-origin’. Prevents window.opener access.
coep_policy – Cross-Origin-Embedder-Policy header value. Default is ‘require-corp’. Enables SharedArrayBuffer.
session_storage – Storage backend for sessions. Defaults to FileSessionStorage. Can be FileSessionStorage, MemorySessionStorage, or custom implementation.
debug – Debug mode. Disables some security warnings. Default is False. Never use True in production.
cookie_name – Name of the session cookie. Default is ‘session_id’.
cookie_secure – Require HTTPS for session cookies. Default is True. Forced to True in production.
cookie_httponly – Prevent JavaScript access to session cookies. Default is True. Forced to True in production.
cookie_samesite – SameSite cookie attribute. Default is ‘Strict’. Forced to ‘Strict’ in production. Options: ‘Strict’, ‘Lax’, ‘None’ (requires Secure=True).
cookie_path – Path scope for session cookie. Default is ‘/’. Cookie valid for entire site.
cookie_domain – Domain scope for session cookie. Default is None (current domain only).
cookie_max_age – Session cookie lifetime in seconds. Default is 604800 (7 days).
rolling_sessions – Refresh session on every request. Default is False. When True, session expiry is extended on each request.
sign_session_id – Sign session IDs with HMAC-SHA256. Default is False. Prevents session ID tampering when enabled.
Examples
Basic security setup:
from gobstopper.middleware import SecurityMiddleware

security = SecurityMiddleware(
    secret_key='your-secret-key-here',
    enable_csrf=True,
    enable_security_headers=True
)
app.add_middleware(security)
Production configuration:
import os

security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    enable_csrf=True,
    enable_security_headers=True,
    cookie_secure=True,
    cookie_httponly=True,
    cookie_samesite='Strict',
    rolling_sessions=True,
    sign_session_id=True,
    csp_policy="default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'"
)
app.add_middleware(security)
Custom session storage:
from gobstopper.sessions.storage import MemorySessionStorage

security = SecurityMiddleware(
    secret_key='your-secret-key',
    session_storage=MemorySessionStorage()
)
app.add_middleware(security)
Development setup (relaxed):
security = SecurityMiddleware(
    secret_key='dev-key',
    cookie_secure=False,  # Allow HTTP in development
    cookie_samesite='Lax',
    debug=True
)
app.add_middleware(security)
Note
Always use a strong, persistent secret_key in production
CSRF tokens must be included in forms or headers for state-changing requests
Session data is available via request.session dictionary
Call generate_csrf_token() to create tokens for templates
Use regenerate_session_id() after privilege escalation (e.g., login)
- Security Best Practices:
Set ENV=production for production deployments
Use HTTPS in production (required for secure cookies)
Regenerate session ID after authentication changes
Implement session timeout and idle timeout
Use database-backed sessions for multi-server deployments
Regularly rotate the secret_key (requires session invalidation)
Monitor and log security events
See also
CORSMiddleware: For cross-origin request handling
gobstopper.sessions.storage: Session storage backends
OWASP CSRF Prevention: https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html
- __init__(secret_key: str | None = None, enable_csrf: bool = True, enable_security_headers: bool = True, hsts_max_age: int = 31536000, hsts_include_subdomains: bool = True, csp_policy: str = "default-src 'self'; object-src 'none'", referrer_policy: str = 'strict-origin-when-cross-origin', coop_policy: str = 'same-origin', coep_policy: str = 'require-corp', session_storage: BaseSessionStorage | AsyncBaseSessionStorage | None = None, debug: bool = False, cookie_name: str = 'session_id', cookie_secure: bool = True, cookie_httponly: bool = True, cookie_samesite: str = 'Strict', cookie_path: str = '/', cookie_domain: str | None = None, cookie_max_age: int = 604800, rolling_sessions: bool = False, sign_session_id: bool = False)[source]¶
- sign_cookie_value(session_id: str) → str[source]¶
Return a signed cookie value when signing is enabled.
Signs the session ID using HMAC-SHA256 to prevent tampering. The signature is appended to the session ID with a dot separator.
- Parameters:
session_id – The session ID to sign.
- Returns:
Signed session ID in format “session_id.signature” if signing is enabled, otherwise returns the original session_id unchanged.
Note
Only signs if sign_session_id=True was set during initialization
Uses HMAC-SHA256 with the middleware’s secret key
Signature prevents session ID tampering and fixation attacks
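A short usage sketch, assuming the middleware was constructed with sign_session_id=True:

sid = security.create_session_id()
signed = security.sign_cookie_value(sid)
# signed has the form "<session_id>.<signature>"; with signing disabled,
# sign_cookie_value() returns sid unchanged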
- get_session_id(request: Request) → str | None[source]¶
Public accessor for the current request’s session ID.
Retrieves the session ID that was set by the middleware during request processing.
- Parameters:
request – The request object.
- Returns:
The session ID string if a session is active, None otherwise.
Note
This is the preferred way to access session IDs from application code
Returns None if no session exists or middleware hasn’t run yet
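A minimal route-level sketch (the route path and response shape are illustrative):

@app.get('/session-status')
async def session_status(request):
    sid = security.get_session_id(request)
    return {'active': sid is not None}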
- async __call__(request: Request, call_next: Callable[[Request], Awaitable[Any]]) → Response[source]¶
Process request through security middleware.
Handles session loading, CSRF validation, request processing, and session saving. Also adds security headers to responses.
- Parameters:
request – The incoming HTTP request.
call_next – The next middleware or handler in the chain.
- Returns:
Response with security headers added, or 403 Forbidden if CSRF validation fails.
Note
Loads session from storage if session cookie exists
Validates CSRF token for POST/PUT/DELETE/PATCH requests
Saves session if modified or rolling_sessions is enabled
Adds security headers to all responses
Sets request.session dict for application use
- generate_csrf_token(session: Dict[str, Any]) → str[source]¶
Generate a CSRF token and store it in the session.
Creates a cryptographically secure random token and stores it in the session. This token must be included in subsequent state-changing requests.
- Parameters:
session – The session dictionary to store the token in.
- Returns:
The generated CSRF token string (URL-safe base64).
Note
Token is stored in session[‘csrf_token’]
Should be called when rendering forms that will submit data
Tokens are validated on POST/PUT/DELETE/PATCH requests
Token is unique per session and remains valid until session expires
Example
In a route handler:
@app.get('/form')
async def show_form(request):
    token = security.generate_csrf_token(request.session)
    return template('form.html', csrf_token=token)
- create_session_id() → str[source]¶
Generate a new cryptographically secure session ID.
Creates a URL-safe base64-encoded random session ID suitable for use as a session identifier.
- Returns:
A 32-byte URL-safe random session ID string.
Note
Uses secrets.token_urlsafe() for cryptographic randomness
IDs are 43 characters long (32 bytes base64-encoded)
Safe to use in cookies and URLs
- async create_session(data: dict) → str[source]¶
Create a new session with the provided data.
Generates a new session ID, saves the session data to storage, and returns the session ID.
- Parameters:
data – Dictionary of session data to store.
- Returns:
The newly created session ID.
Note
Automatically generates a secure session ID
Saves to configured session storage
Use this for programmatic session creation (e.g., after login)
Example:
session_id = await security.create_session({
    'user_id': user.id,
    'authenticated': True
})
# Set cookie with session_id
- async destroy_session(session_id: str)[source]¶
Destroy a session by deleting it from storage.
Removes all session data from the storage backend. This is typically called during logout.
- Parameters:
session_id – The session ID to destroy.
Note
Deletes session from storage immediately
Does not clear the client’s cookie (handle separately)
Safe to call even if session doesn’t exist
Example:
@app.post('/logout')
async def logout(request):
    session_id = security.get_session_id(request)
    if session_id:
        await security.destroy_session(session_id)
    # Also clear the cookie in response
    return Response('Logged out')
- async regenerate_session_id(request: Request) → str | None[source]¶
Regenerate session ID for the current session.
Creates a new session ID and migrates existing session data to it, then deletes the old session. This is critical for preventing session fixation attacks after privilege escalation.
- Parameters:
request – The request object with the current session.
- Returns:
The new session ID string, or None if no session was active.
Note
Always call this after login or privilege changes
Preserves all session data while changing the ID
Old session is deleted to prevent reuse
Updates request._session_id with the new ID
Example
After successful login:
@app.post('/login')
async def login(request):
    if validate_credentials(username, password):
        # Regenerate to prevent session fixation
        new_sid = await security.regenerate_session_id(request)
        request.session['user_id'] = user.id
        request.session['authenticated'] = True
    return Response('Login successful')
Templates¶
Jinja2 template engine for Gobstopper framework
- class gobstopper.templates.engine.TemplateEngine(template_folder: str | Path = 'templates', auto_reload: bool = True, cache_size: int = 400)[source]¶
Bases:
object
Jinja2-based template engine with async support
- __init__(template_folder: str | Path = 'templates', auto_reload: bool = True, cache_size: int = 400)[source]¶
- add_search_path(path: str | Path)[source]¶
Add an additional search path for templates (used by blueprints).
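A brief construction sketch based on the signatures above (the blueprint path is illustrative):

from gobstopper.templates.engine import TemplateEngine

engine = TemplateEngine(
    template_folder="templates",
    auto_reload=True,   # watch template files for changes
    cache_size=400      # compiled-template cache entries
)
# Let a blueprint contribute its own template directory (path is illustrative)
engine.add_search_path("blueprints/admin/templates")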
Utilities¶
Simple in-memory token-bucket rate limiter and middleware helpers.
- Usage example:
from gobstopper.utils.rate_limiter import TokenBucketLimiter, rate_limit

limiter = TokenBucketLimiter(rate=5, capacity=10)  # 5 tokens/sec, burst 10

@app.get('/limited')
@rate_limit(limiter, key=lambda req: req.client_ip)
async def limited(request):
    return {'ok': True}
- class gobstopper.utils.rate_limiter.TokenBucketLimiter(rate: float, capacity: int)[source]¶
Bases:
object
- gobstopper.utils.rate_limiter.rate_limit(limiter: TokenBucketLimiter, key: Callable[[Any], str] | None = None, cost: float = 1.0)[source]¶
Middleware decorator factory to enforce rate limits.
Example
limiter = TokenBucketLimiter(rate=5, capacity=10)

@app.get('/api/x')
@rate_limit(limiter, key=lambda req: req.client_ip)
async def handler(request):
    return {'ok': True}
Idempotency helper for POST endpoints.
Provides a simple in-memory TTL store to deduplicate requests based on an Idempotency-Key header or provided key argument. Suitable for a single process. For multi-process deployments, use a shared cache (Redis, etc.).
- gobstopper.utils.idempotency.use_idempotency(ttl_seconds: float = 60.0)[source]¶
Decorator to enforce idempotency for handlers.
Reads the key from the Idempotency-Key request header or request.args["idempotency_key"]. Caches the result for ttl_seconds and returns the cached response for repeated keys.
Note: Returns the exact previous handler return value. If the handler returns a Response, it will be reused as-is; for dict/list the JSONResponse wrapping will happen as usual in the app pipeline.
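A minimal sketch (the route, TTL, and process_payment helper are illustrative): a client that retries the POST with the same Idempotency-Key within the TTL receives the cached result instead of re-running the handler.

from gobstopper.utils.idempotency import use_idempotency

@app.post('/payments')
@use_idempotency(ttl_seconds=120.0)
async def create_payment(request):
    # Runs once per Idempotency-Key; repeats within the TTL get the cached value
    result = await process_payment(request)  # process_payment is an assumed app helper
    return result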