API Reference

This section contains the complete API reference for the Gobstopper framework.

Core Application

Main Gobstopper application class

class gobstopper.core.app.Gobstopper(name: str = 'gobstopper.core.app', debug: bool = False, slash_policy: str = 'strict')[source]

Bases: object

__init__(name: str = 'gobstopper.core.app', debug: bool = False, slash_policy: str = 'strict')[source]

init_templates(template_folder: str = 'templates', use_rust: bool = None, **kwargs)[source]

Initialize template engine with Jinja2 or Rust backend.

Configures the template rendering system with either the Python Jinja2 engine or the high-performance Rust template engine. The Rust engine provides significant performance improvements and supports streaming template rendering for large datasets.

Parameters:
  • template_folder – Directory containing template files. Either a path relative to the application root or an absolute path.

  • use_rust – Template engine selection:
      - None: Auto-detect (uses Rust if available)
      - True: Force Rust engine (raises ImportError if unavailable)
      - False: Force Python Jinja2 engine

  • **kwargs – Additional template engine options:
      - auto_reload (bool): Watch templates for changes (default: True)
      - cache_size (int): Template cache size (default: 400)
      - enable_streaming (bool): Enable streaming rendering (Rust only)
      - enable_hot_reload (bool): Hot reload templates (Rust only)

Raises:
  • ImportError – If Jinja2 is not installed or Rust engine requested but unavailable

  • FileNotFoundError – If template_folder does not exist

Examples

Auto-detection (recommended):

>>> app.init_templates("templates")  # Uses Rust if available

Force specific engine:

>>> app.init_templates("templates", use_rust=True)   # Rust only
>>> app.init_templates("templates", use_rust=False)  # Jinja2 only

With custom options:

>>> app.init_templates(
...     "templates",
...     auto_reload=False,     # Disable file watching
...     cache_size=1000,       # Larger cache
...     enable_streaming=True  # Rust streaming (if available)
... )

Note

Must be called before using render_template(). Rust engine provides 2-5x performance improvement over Jinja2.
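The use_rust selection rules above can be pictured with a small standalone sketch. It does not call Gobstopper; choose_engine and the module name hypothetical_rust_templates are made up for illustration, and the framework's real detection logic may differ.

```python
import importlib.util


def choose_engine(use_rust=None, rust_module="hypothetical_rust_templates"):
    """Pick an engine name following the None / True / False rules described above."""
    # find_spec returns None when a top-level module is not installed.
    rust_available = importlib.util.find_spec(rust_module) is not None
    if use_rust is None:
        # Auto-detect: prefer Rust when it can be imported.
        return "rust" if rust_available else "jinja2"
    if use_rust and not rust_available:
        raise ImportError("Rust template engine requested but not available")
    return "rust" if use_rust else "jinja2"
```

Passing use_rust=True on a machine without the (hypothetical) Rust module raises ImportError, which mirrors the behaviour listed under Raises.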

See also

  • render_template(): Render templates

  • context_processor(): Add global template context

add_middleware(middleware: Callable[[Request, Callable[[Request], Awaitable[Any]]], Awaitable[Response]], priority: int = 0)[source]

Add middleware to the application with priority-based execution order.

Registers middleware functions that intercept HTTP requests and responses. Middleware executes in priority order (highest first) and can modify requests, responses, or short-circuit the handler chain. This is the primary mechanism for cross-cutting concerns like authentication, logging, and request preprocessing.

Parameters:
  • middleware – Callable accepting (request, next_handler) and returning Response. Can be sync or async. Must call next_handler(request) to continue the chain or return a Response directly to short-circuit.

  • priority – Execution priority (default: 0). Higher values execute first. Use negative priorities for post-processing middleware.

Examples

Basic middleware:

>>> async def auth_middleware(request, next_handler):
...     if not request.headers.get('authorization'):
...         return Response("Unauthorized", status=401)
...     return await next_handler(request)
>>> app.add_middleware(auth_middleware, priority=100)

Logging middleware:

>>> import time
>>> async def log_middleware(request, next_handler):
...     start = time.time()
...     response = await next_handler(request)
...     duration = time.time() - start
...     app.logger.info(f"{request.method} {request.path} - {duration:.3f}s")
...     return response
>>> app.add_middleware(log_middleware, priority=50)

Note

Middleware execution order:
  1. Application-level (highest priority first)
  2. Blueprint-level (outer to inner, by priority)
  3. Route-level (as registered)

Middleware can be sync or async. The framework handles both transparently.
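As a rough illustration of priority ordering and mixed sync/async middleware, the following standalone sketch composes a chain by hand. It does not use Gobstopper; build_chain and _maybe_await are hypothetical helpers, and the framework's internals may be organised differently.

```python
import asyncio
import inspect


async def _maybe_await(value):
    # Accept results from both sync and async callables.
    return await value if inspect.isawaitable(value) else value


def build_chain(middlewares, handler):
    """Wrap `handler` so that higher-priority middleware runs first (outermost)."""
    chain = handler
    # Wrap from the inside out: the last middleware wrapped (highest priority)
    # becomes the outermost layer and therefore sees the request first.
    for _priority, mw in sorted(middlewares, key=lambda item: item[0]):
        def layer(request, mw=mw, nxt=chain):
            return _maybe_await(mw(request, nxt))
        chain = layer
    return chain


calls = []


async def handler(request):
    calls.append("handler")
    return "ok"


async def auth(request, next_handler):
    calls.append("auth")
    return await next_handler(request)


def log(request, next_handler):  # sync middleware: returns the awaitable unchanged
    calls.append("log")
    return next_handler(request)


app_chain = build_chain([(50, log), (100, auth)], handler)
result = asyncio.run(app_chain({"path": "/"}))
print(calls)  # ['auth', 'log', 'handler']
```

A middleware short-circuits simply by returning a value without calling next_handler; the inner layers are then never invoked.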

See also

  • register_blueprint(): Blueprint middleware composition

  • gobstopper.middleware.cors.CORSMiddleware: Built-in CORS middleware

  • gobstopper.middleware.security.SecurityMiddleware: Security headers

route(path: str, methods: list[str] = None, name: str = None)[source]

Decorator to register HTTP routes with path parameters and method filtering.

Primary routing decorator that maps URL patterns to handler functions. Supports static paths, path parameters with optional type conversion, and multiple HTTP methods. This is the foundation of Gobstopper’s routing system.

Parameters:
  • path – URL pattern to match. Supported formats:
      - Static: "/users"
      - Path parameters: "/users/<user_id>" or "/users/<int:user_id>"
      - Multiple params: "/posts/<post_id>/comments/<comment_id>"
    Supported parameter types: str (default), int, float, uuid, path, date

  • methods – List of HTTP methods (default: ['GET']). Common values: GET, POST, PUT, DELETE, PATCH, OPTIONS

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the handler and returns it unchanged.

Examples

Simple GET route:

>>> @app.route("/")
>>> async def home(request):
...     return {"message": "Welcome"}

Multiple methods:

>>> @app.route("/api/data", methods=['GET', 'POST'])
>>> async def data_handler(request):
...     if request.method == 'GET':
...         return {"data": [...]}
...     return {"created": True}

Path parameters with type conversion:

>>> @app.route("/users/<int:user_id>")
>>> async def get_user(request, user_id: int):
...     # user_id is automatically converted to int
...     return {"user_id": user_id}

Complex patterns:

>>> @app.route("/blog/<date:pub_date>/posts/<uuid:post_id>")
>>> async def get_post(request, pub_date, post_id):
...     return {"date": pub_date, "id": post_id}

Note

  • Route conflicts are detected and logged but do not prevent registration

  • Routes are matched in registration order when using Python router

  • Rust router provides O(1) lookup for static routes

  • Path parameters are automatically decoded from URL encoding

  • Type conversion errors return 400 Bad Request automatically
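To make the path parameter rules concrete, here is a minimal standalone sketch of how a pattern such as "/users/<int:user_id>" could be matched, URL-decoded, and converted, with a 400 on conversion failure. The CONVERTERS table, compile_route, and match are hypothetical names; only a subset of the listed types is covered, and Gobstopper's Python and Rust routers may work differently.

```python
import re
import uuid
from urllib.parse import unquote

# Hypothetical converter table: type name -> (regex fragment, conversion function).
CONVERTERS = {
    "str": (r"[^/]+", unquote),
    "int": (r"[^/]+", int),
    "float": (r"[^/]+", float),
    "uuid": (r"[^/]+", uuid.UUID),
    "path": (r".+", unquote),
}

_PARAM = re.compile(r"<(?:(?P<type>\w+):)?(?P<name>\w+)>")


def compile_route(pattern):
    """Turn "/users/<int:user_id>" into a compiled regex plus per-parameter converters."""
    regex, converters, pos = "", {}, 0
    for m in _PARAM.finditer(pattern):
        fragment, convert = CONVERTERS[m.group("type") or "str"]
        regex += re.escape(pattern[pos:m.start()]) + f"(?P<{m.group('name')}>{fragment})"
        converters[m.group("name")] = convert
        pos = m.end()
    regex += re.escape(pattern[pos:])
    return re.compile(f"^{regex}$"), converters


def match(pattern, path):
    """Return (status, params): 200 with converted params, 400 on a bad value, 404 on no match."""
    regex, converters = compile_route(pattern)
    m = regex.match(path)
    if m is None:
        return 404, {}
    try:
        return 200, {key: converters[key](value) for key, value in m.groupdict().items()}
    except ValueError:
        # int(), float() and uuid.UUID() all raise ValueError on malformed input.
        return 400, {}


print(match("/users/<int:user_id>", "/users/42"))   # (200, {'user_id': 42})
print(match("/users/<int:user_id>", "/users/abc"))  # (400, {})
```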

See also

  • get(): Convenience decorator for GET routes

  • post(): Convenience decorator for POST routes

  • put(): Convenience decorator for PUT routes

  • delete(): Convenience decorator for DELETE routes

  • websocket(): WebSocket route registration

get(path: str, name: str = None)[source]

Convenience decorator for registering GET routes.

Shorthand for @app.route(path, methods=['GET']). Use for read-only operations that retrieve data without side effects.

Parameters:

  • path – URL pattern to match (same format as route()).

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the GET handler.

Examples

>>> @app.get("/users")
>>> async def list_users(request):
...     return {"users": [...]}

See also

route(): Full routing documentation and path parameter syntax

post(path: str, name: str = None)[source]

Convenience decorator for registering POST routes.

Shorthand for @app.route(path, methods=['POST']). Use for creating resources or submitting data with side effects.

Parameters:

  • path – URL pattern to match (same format as route()).

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the POST handler.

Examples

>>> @app.post("/users")
>>> async def create_user(request):
...     data = await request.json()
...     return {"id": new_user_id}, 201

See also

route(): Full routing documentation

put(path: str, name: str = None)[source]

Convenience decorator for registering PUT routes.

Shorthand for @app.route(path, methods=['PUT']). Use for full resource updates (replacing entire resource).

Parameters:

  • path – URL pattern to match (same format as route()).

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the PUT handler.

Examples

>>> @app.put("/users/<int:user_id>")
>>> async def update_user(request, user_id: int):
...     data = await request.json()
...     return {"updated": True}

See also

  • route(): Full routing documentation

  • patch(): Partial resource updates

delete(path: str, name: str = None)[source]

Convenience decorator for registering DELETE routes.

Shorthand for @app.route(path, methods=['DELETE']). Use for removing resources.

Parameters:

  • path – URL pattern to match (same format as route()).

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the DELETE handler.

Examples

>>> @app.delete("/users/<int:user_id>")
>>> async def delete_user(request, user_id: int):
...     return Response("", status=204)  # 204 No Content must not carry a body

See also

route(): Full routing documentation

patch(path: str, name: str = None)[source]

Convenience decorator for registering PATCH routes.

Shorthand for @app.route(path, methods=['PATCH']). Use for partial resource updates (modifying specific fields).

Parameters:

  • path – URL pattern to match (same format as route()).

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the PATCH handler.

Examples

>>> @app.patch("/users/<int:user_id>")
>>> async def patch_user(request, user_id: int):
...     data = await request.json()  # Only changed fields
...     return {"updated": True}

See also

  • route(): Full routing documentation

  • put(): Full resource updates

options(path: str, name: str = None)[source]

Convenience decorator for registering OPTIONS routes.

Shorthand for @app.route(path, methods=['OPTIONS']). Use for CORS preflight requests or capability discovery.

Parameters:

  • path – URL pattern to match (same format as route()).

  • name – Optional name for reverse routing with url_for(). Defaults to function name.

Returns:

Decorator function that registers the OPTIONS handler.

Examples

>>> @app.options("/api/data")
>>> async def data_options(request):
...     return Response("", headers={"Allow": "GET, POST"})

See also

  • route(): Full routing documentation

  • gobstopper.middleware.cors.CORSMiddleware: Automatic CORS handling

mount(path: str, app: Gobstopper)[source]

Mount a sub-application at the given path prefix.

Registers a complete Gobstopper application to handle requests under a specific path prefix. This enables modular application architecture where different subsystems can be developed independently and composed together.

Parameters:
  • path – URL prefix for the sub-application. Must start with ‘/’. Trailing slashes are automatically normalized. Examples: “/api”, “/admin”, “/v1”

  • app – Gobstopper application instance to mount. All routes in the mounted app will be accessible under the specified prefix.

Returns:

The mounted application instance (for chaining).

Examples

Basic mounting:

>>> # Create admin sub-application
>>> admin_app = Gobstopper("admin")
>>> @admin_app.get("/dashboard")
>>> async def admin_dashboard(request):
...     return {"admin": True}
>>>
>>> # Mount under /admin prefix
>>> app.mount("/admin", admin_app)
>>> # Now accessible at: /admin/dashboard

Multiple mounts:

>>> api_v1 = Gobstopper("api_v1")
>>> api_v2 = Gobstopper("api_v2")
>>> app.mount("/api/v1", api_v1)
>>> app.mount("/api/v2", api_v2)

Mounting with separate configurations:

>>> debug_app = Gobstopper("debug", debug=True)
>>> @debug_app.get("/info")
>>> async def debug_info(request):
...     return {"debug": True}
>>> app.mount("/_debug", debug_app)

Note

  • Mounted apps maintain their own middleware, error handlers, and configuration

  • Path stripping is automatic: mounted app sees paths relative to mount point

  • Mount order matters: earlier mounts are checked first

  • Mount points should not overlap with main app routes
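The prefix matching and path stripping described in these notes can be sketched without the framework. resolve_mount is a hypothetical helper that works on (prefix, name) pairs rather than real application objects; it only illustrates the first-match and relative-path behaviour.

```python
def resolve_mount(mounts, path):
    """Return (app_name, relative_path) for the first mount whose prefix matches, else None."""
    for prefix, app_name in mounts:  # earlier mounts are checked first
        prefix = prefix.rstrip("/")  # normalize trailing slashes
        if path == prefix or path.startswith(prefix + "/"):
            # Strip the prefix so the sub-application sees a path relative to its mount point.
            return app_name, path[len(prefix):] or "/"
    return None


mounts = [("/api/v1", "api_v1"), ("/api", "api")]
print(resolve_mount(mounts, "/api/v1/users"))  # ('api_v1', '/users')
print(resolve_mount(mounts, "/api/health"))    # ('api', '/health')
```

Because the first matching mount wins, registering "/api" before "/api/v1" in this sketch would send every "/api/v1/..." request to the "/api" application, which is why mount order matters.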

See also

register_blueprint(): Alternative for simpler route grouping

register_blueprint(blueprint, url_prefix: str | None = None)[source]

Register a Blueprint on this app with an optional URL prefix.

Blueprints provide a structured way to organize related routes, middleware, and handlers into reusable components. They support nesting, scoped middleware, and can have their own template and static file directories.

Parameters:
  • blueprint – Blueprint instance to register. The blueprint contains routes, middleware, hooks (before_request, after_request), and optional template/static folder configurations.

  • url_prefix – Optional URL prefix for all blueprint routes (default: None). If provided, overrides blueprint’s own url_prefix. Must start with ‘/’. Examples: “/api”, “/admin”, “/v1”

Examples

Basic blueprint registration:

>>> from gobstopper.core.blueprint import Blueprint
>>> api = Blueprint("api", url_prefix="/api")
>>>
>>> @api.get("/users")
>>> async def list_users(request):
...     return {"users": [...]}
>>>
>>> app.register_blueprint(api)
>>> # Route available at: /api/users

Override prefix at registration:

>>> admin_bp = Blueprint("admin", url_prefix="/admin")
>>> app.register_blueprint(admin_bp, url_prefix="/dashboard")
>>> # Routes use /dashboard instead of /admin

Nested blueprints:

>>> api = Blueprint("api", url_prefix="/api")
>>> v1 = Blueprint("v1", url_prefix="/v1")
>>> v1.get("/users")(user_handler)
>>> api.register_blueprint(v1)
>>> app.register_blueprint(api)
>>> # Route available at: /api/v1/users

Blueprint with middleware:

>>> auth_bp = Blueprint("auth")
>>> auth_bp.add_middleware(auth_middleware, priority=100)
>>> @auth_bp.get("/profile")
>>> async def profile(request):
...     return {"user": request.user}
>>> app.register_blueprint(auth_bp, url_prefix="/secure")

Note

Middleware execution order with blueprints:
  1. Application-level middleware (highest priority first)
  2. Parent blueprint middleware (outer to inner)
  3. Child blueprint middleware
  4. Route-level middleware (innermost)

Before/after request handlers from blueprints are attached to the app. Blueprint template folders are added to template search paths. Blueprint static folders automatically get static file middleware.
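How nested prefixes and a registration-time override combine can be shown with a small standalone model. BlueprintSketch, join_prefix, and flatten are hypothetical stand-ins, not the gobstopper.core.blueprint.Blueprint class, and they model only URL prefixes (no middleware or hooks).

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class BlueprintSketch:
    name: str
    url_prefix: Optional[str] = None
    routes: list = field(default_factory=list)    # route paths, e.g. "/users"
    children: list = field(default_factory=list)  # nested BlueprintSketch objects


def join_prefix(*parts):
    """Join path fragments with single slashes, ignoring empty or None parts."""
    cleaned = [p.strip("/") for p in parts if p and p.strip("/")]
    return "/" + "/".join(cleaned)


def flatten(bp, parent_prefix="", override=None):
    """Yield the full path of every route in `bp` and its nested blueprints."""
    # A prefix given at registration time replaces the blueprint's own prefix.
    own = override if override is not None else bp.url_prefix
    base = join_prefix(parent_prefix, own)
    for route in bp.routes:
        yield join_prefix(base, route)
    for child in bp.children:
        yield from flatten(child, base)


api = BlueprintSketch("api", "/api")
api.children.append(BlueprintSketch("v1", "/v1", routes=["/users"]))
print(list(flatten(api)))  # ['/api/v1/users']
```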

See also

  • gobstopper.core.blueprint.Blueprint: Blueprint class documentation

  • add_middleware(): Adding middleware to applications

  • mount(): Alternative for mounting complete sub-applications

websocket(path: str)[source]

Decorator for registering WebSocket routes with path parameters.

Registers WebSocket connection handlers that manage bidirectional communication channels. WebSocket routes support the same path parameter syntax as HTTP routes but use a different protocol lifecycle (connect, message exchange, disconnect).

Parameters:

path – URL pattern to match. Supports the same formats as HTTP routes:
  - Static: "/ws/chat"
  - Path parameters: "/ws/room/<room_id>"
  - Type conversion: "/ws/user/<int:user_id>"

Returns:

Decorator function that registers the WebSocket handler.

Examples

Basic WebSocket echo:

>>> @app.websocket("/ws/echo")
>>> async def echo_handler(websocket):
...     await websocket.accept()
...     async for message in websocket:
...         await websocket.send(f"Echo: {message}")

WebSocket with path parameters:

>>> @app.websocket("/ws/room/<room_id>")
>>> async def room_handler(websocket, room_id: str):
...     await websocket.accept()
...     # Join room
...     async for message in websocket:
...         # Broadcast to room
...         await broadcast_to_room(room_id, message)

WebSocket with authentication:

>>> @app.websocket("/ws/notifications")
>>> async def notifications(websocket):
...     token = websocket.query_params.get('token')
...     if not validate_token(token):
...         await websocket.close(code=1008)  # Policy violation
...         return
...     await websocket.accept()
...     # Send notifications
...     while True:
...         notification = await get_next_notification()
...         await websocket.send_json(notification)

Note

  • WebSocket handlers must call await websocket.accept() before communication

  • Handlers should handle connection cleanup (use try/finally)

  • Path parameters work identically to HTTP routes

  • WebSocket middleware is not yet supported, and before_request hooks run only for HTTP requests, so authenticate inside the handler before calling accept()

  • Connection errors are logged automatically
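The try/finally cleanup recommended above might look like the following sketch. FakeWebSocket is a hypothetical stand-in that offers only the accept/send/iteration surface used in the examples, so the snippet runs without a server; the connected set is likewise illustrative.

```python
import asyncio


class FakeWebSocket:
    """Stand-in exposing only accept(), send() and async iteration."""

    def __init__(self, incoming):
        self._incoming = list(incoming)
        self.sent = []
        self.accepted = False

    async def accept(self):
        self.accepted = True

    async def send(self, message):
        self.sent.append(message)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._incoming:
            raise StopAsyncIteration  # models the peer disconnecting
        return self._incoming.pop(0)


connected = set()


async def echo_handler(websocket):
    await websocket.accept()  # must happen before any send/receive
    connected.add(websocket)
    try:
        async for message in websocket:
            await websocket.send(f"Echo: {message}")
    finally:
        # Runs on normal disconnect and on errors alike, so the registry never leaks.
        connected.discard(websocket)


ws = FakeWebSocket(["a", "b"])
asyncio.run(echo_handler(ws))
print(ws.sent)  # ['Echo: a', 'Echo: b']
```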

See also

  • gobstopper.websocket.connection.WebSocket: WebSocket connection API

  • route(): HTTP route registration for comparison

url_for(name: str, **params) str[source]

Build a URL for a named route with parameters (reverse routing).

Flask/Quart-style reverse routing that generates URLs from route names. Routes are automatically named with their function name, or you can provide a custom name using the name parameter in route decorators. Blueprint routes are qualified with the blueprint name (e.g., ‘admin.login’).

Parameters:
  • name – Route name (function name, custom name, or ‘blueprint.function’)

  • **params – URL parameters to substitute into the route pattern

Returns:

Generated URL path as a string

Raises:

ValueError – If the named route doesn’t exist or parameters are missing

Examples

Basic usage:

>>> @app.get('/users/<int:id>', name='user_detail')
>>> async def get_user(request, id: int):
...     return {"user": ...}
>>>
>>> app.url_for('user_detail', id=123)
'/users/123'

With multiple parameters:

>>> @app.get('/posts/<int:year>/<int:month>')
>>> async def posts_archive(request, year: int, month: int):
...     return {"posts": ...}
>>>
>>> app.url_for('posts_archive', year=2024, month=12)
'/posts/2024/12'

Blueprint routes (Flask-style):

>>> admin = Blueprint('admin')
>>> @admin.get('/login')
>>> async def login(request):
...     return {"login": True}
>>>
>>> app.register_blueprint(admin, url_prefix='/admin')
>>> app.url_for('admin.login')  # Returns '/admin/login'
'/admin/login'

In request handlers with redirect:

>>> @app.post('/users')
>>> async def create_user(request):
...     new_id = save_user()
...     return redirect(app.url_for('user_detail', id=new_id))

Note

  • Uses the Rust router when available for the fastest lookups

  • Falls back to scanning registered routes if the Rust router is unavailable

  • Route names default to function names

  • Custom names provided via decorator name parameter
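Reverse routing amounts to substituting parameters back into a stored pattern. The sketch below shows the idea with a plain dict of named patterns; build_url and ROUTES are hypothetical, and the percent-encoding of values is an assumption rather than documented url_for() behaviour.

```python
import re
from urllib.parse import quote

_PARAM = re.compile(r"<(?:\w+:)?(\w+)>")

# Hypothetical registry: route name -> URL pattern.
ROUTES = {
    "user_detail": "/users/<int:id>",
    "posts_archive": "/posts/<int:year>/<int:month>",
}


def build_url(routes, name, **params):
    """Fill the named route's pattern with params; raise ValueError on unknown name or missing param."""
    if name not in routes:
        raise ValueError(f"No route named {name!r}")

    def substitute(m):
        key = m.group(1)
        if key not in params:
            raise ValueError(f"Missing parameter {key!r} for route {name!r}")
        # Assumption: values are percent-encoded when placed into the path.
        return quote(str(params[key]), safe="")

    return _PARAM.sub(substitute, routes[name])


print(build_url(ROUTES, "user_detail", id=123))                  # /users/123
print(build_url(ROUTES, "posts_archive", year=2024, month=12))   # /posts/2024/12
```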

See also

  • redirect(): Convenience function for redirecting to URLs

  • route(): How to register named routes

task(name: str = None, category: str = 'default')[source]

Decorator to register background task handlers with categorization.

Registers async functions as background tasks that can be queued and executed asynchronously. Tasks are persisted to DuckDB, support retries, priority levels, and progress tracking. Tasks are organized by category for separate worker pools.

Parameters:
  • name – Task identifier (default: function name). Used when queuing tasks via add_background_task(). Must be unique within category.

  • category – Task category for worker pool isolation (default: “default”). Tasks in different categories run in separate worker pools. Examples: “email”, “data_processing”, “notifications”

Returns:

Decorator function that registers the task handler.

Examples

Basic task registration:

>>> @app.task("send_email", category="notifications")
>>> async def send_email(to: str, subject: str, body: str):
...     await email_client.send(to, subject, body)
...     return {"sent": True}

Task with default name (uses function name):

>>> @app.task(category="data")
>>> async def process_upload(file_path: str):
...     data = await read_file(file_path)
...     result = await process_data(data)
...     return {"rows": len(result)}

Task with progress tracking:

>>> @app.task("import_data", category="imports")
>>> async def import_large_dataset(source_url: str):
...     total = await get_row_count(source_url)
...     for i, batch in enumerate(fetch_batches(source_url)):
...         await process_batch(batch)
...         # Progress tracked automatically
...     return {"imported": total}

Queuing registered tasks:

>>> from gobstopper.tasks.queue import TaskPriority
>>> # In a request handler
>>> task_id = await app.add_background_task(
...     "send_email",
...     category="notifications",
...     priority=TaskPriority.HIGH,
...     to="user@example.com",
...     subject="Welcome",
...     body="Hello!"
... )
>>> return {"task_id": task_id}

Note

  • Tasks must be async functions

  • Task return values are stored and can be retrieved later

  • Failed tasks can be automatically retried based on max_retries

  • Tasks persist across application restarts (stored in DuckDB)

  • Each category needs worker processes started via start_task_workers()

See also

  • add_background_task(): Queue tasks for execution

  • start_task_workers(): Start worker pools for task categories

  • gobstopper.tasks.queue.TaskQueue: Task queue implementation

  • gobstopper.tasks.queue.TaskPriority: Priority levels (LOW, NORMAL, HIGH, URGENT)

before_request(func: Callable[[...], Any]) Callable[[...], Any][source]

Register a before-request handler that runs before each HTTP request.

Before-request handlers execute after middleware but before the route handler. They can perform request validation, inject request-scoped data, or short-circuit the request by returning a Response directly.

Parameters:

func – Callable accepting (request) and optionally returning Response. Can be sync or async. If it returns a Response, the route handler is skipped and that response is returned immediately.

Returns:

The handler function (for decorator chaining).

Examples

Request logging:

>>> @app.before_request
>>> async def log_requests(request):
...     app.logger.info(f"Request: {request.method} {request.path}")
...     # No return - continues to handler

Authentication check:

>>> @app.before_request
>>> async def require_auth(request):
...     if not request.headers.get('authorization'):
...         return Response("Unauthorized", status=401)
...     # Auth successful - continues to handler

Inject request-scoped data:

>>> @app.before_request
>>> async def add_user_context(request):
...     token = request.headers.get('authorization')
...     request.user = await validate_and_get_user(token)

Note

  • Runs for every HTTP request (not WebSocket)

  • Multiple handlers run in registration order

  • First handler to return a Response short-circuits the chain

  • Exceptions propagate and trigger error handlers
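The registration-order and short-circuit rules can be illustrated with a standalone runner. run_before_hooks is a hypothetical helper operating on plain dicts and tuples instead of Request and Response objects; it is a sketch of the described behaviour, not the framework's dispatcher.

```python
import asyncio
import inspect


async def run_before_hooks(hooks, request):
    """Run hooks in registration order; return the first non-None result, else None."""
    for hook in hooks:
        result = hook(request)
        if inspect.isawaitable(result):  # support both sync and async hooks
            result = await result
        if result is not None:
            return result  # short-circuit: later hooks and the route handler are skipped
    return None


seen = []


def log_request(request):  # sync hook, returns None so the chain continues
    seen.append("log")


async def require_auth(request):  # async hook that may short-circuit
    seen.append("auth")
    if "authorization" not in request["headers"]:
        return ("Unauthorized", 401)


def never_reached(request):
    seen.append("late")


outcome = asyncio.run(
    run_before_hooks([log_request, require_auth, never_reached], {"headers": {}})
)
print(outcome, seen)  # ('Unauthorized', 401) ['log', 'auth']
```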

See also

  • after_request(): Post-processing hook

  • add_middleware(): Alternative for cross-cutting concerns

after_request(func: Callable[[...], Any]) Callable[[...], Any][source]

Register an after-request handler that runs after each HTTP request.

After-request handlers execute after the route handler but before sending the response. They can modify responses, add headers, or perform logging. Must accept both request and response parameters.

Parameters:

func – Callable accepting (request, response) and optionally returning modified Response. Can be sync or async. If it returns a Response, that replaces the original. If it returns None, original is used.

Returns:

The handler function (for decorator chaining).

Examples

Add response headers:

>>> @app.after_request
>>> async def add_headers(request, response):
...     response.headers['X-Processed-By'] = 'Gobstopper'
...     return response

Response logging:

>>> @app.after_request
>>> async def log_response(request, response):
...     app.logger.info(f"Response: {response.status}")
...     # No return - original response used

Security headers:

>>> @app.after_request
>>> def add_security_headers(request, response):
...     response.headers['X-Content-Type-Options'] = 'nosniff'
...     response.headers['X-Frame-Options'] = 'DENY'
...     return response

Note

  • Runs for every successful HTTP request (not WebSocket)

  • Multiple handlers run in registration order

  • Each handler receives the response from the previous handler

  • Does not run if an exception occurs (use error handlers instead)
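The way each after-request handler receives the previous handler's response, with None meaning "keep the current one", can be sketched as follows. run_after_hooks is hypothetical and uses plain dicts in place of Response objects.

```python
import asyncio
import inspect


async def run_after_hooks(hooks, request, response):
    """Thread the response through hooks in registration order."""
    for hook in hooks:
        result = hook(request, response)
        if inspect.isawaitable(result):  # support both sync and async hooks
            result = await result
        if result is not None:
            response = result  # a returned value replaces the response
        # a None result keeps the current response unchanged
    return response


def add_header(request, response):
    response["headers"]["X-Processed-By"] = "Gobstopper"
    return response


async def log_only(request, response):
    return None  # nothing returned: the response passes through untouched


def upper_body(request, response):
    return {**response, "body": response["body"].upper()}


final = asyncio.run(
    run_after_hooks(
        [add_header, log_only, upper_body],
        {},
        {"status": 200, "headers": {}, "body": "ok"},
    )
)
print(final["body"], final["headers"])  # OK {'X-Processed-By': 'Gobstopper'}
```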

See also

  • before_request(): Pre-processing hook

  • error_handler(): Handle exceptions

context_processor(func: Callable[[], dict]) Callable[[], dict][source]

Register a template context processor for global template variables.

Context processors provide variables that are automatically available in all rendered templates. They run before each template render and can inject dynamic global context like current user, configuration, or request data.

Parameters:

func – Callable returning a dictionary of template variables. Can be sync or async. The returned dict is merged into every template’s context. Should be idempotent and fast.

Returns:

The processor function (for decorator chaining).

Examples

Add global template variables:

>>> @app.context_processor
>>> def inject_globals():
...     return {
...         'app_name': 'Gobstopper',
...         'version': '1.0.0',
...         'current_year': 2025
...     }

Inject current user:

>>> @app.context_processor
>>> async def inject_user():
...     # Access request context if available
...     return {'user': getattr(request, 'user', None)}

Configuration values:

>>> @app.context_processor
>>> def inject_config():
...     return {
...         'debug': app.debug,
...         'site_name': os.getenv('SITE_NAME', 'Gobstopper App')
...     }

Note

  • Runs before every template render

  • Multiple processors are merged in registration order

  • Later processors can override earlier ones

  • Should be fast - runs on every template render

  • Only affects templates, not JSON responses
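The merge order described above can be shown with a short standalone sketch. build_context is a hypothetical helper; in particular, letting the variables passed to a render call override processor output is an assumption, not something this reference states.

```python
import asyncio
import inspect


async def build_context(processors, **explicit):
    """Merge processor output in registration order, then apply per-render variables."""
    context = {}
    for processor in processors:
        value = processor()
        if inspect.isawaitable(value):  # processors may be sync or async
            value = await value
        context.update(value)  # later processors override earlier ones
    context.update(explicit)  # assumption: explicit render variables win over globals
    return context


def inject_globals():
    return {"app_name": "Gobstopper", "debug": False}


async def inject_debug():
    return {"debug": True}


context = asyncio.run(build_context([inject_globals, inject_debug], title="Home"))
print(context)  # {'app_name': 'Gobstopper', 'debug': True, 'title': 'Home'}
```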

See also

  • render_template(): Template rendering

  • init_templates(): Template engine initialization

  • template_filter(): Custom template filters

  • template_global(): Custom template globals

template_filter(name: str = None)[source]

Decorator to register a custom template filter.

Note: The Rust template engine does not support custom filters. When that engine is active, this decorator is a silent no-op (no warning is emitted).

template_global(name: str = None)[source]

Decorator to register a custom template global.

Note: The Rust template engine does not support custom globals. When that engine is active, this decorator is a silent no-op (no warning is emitted).

error_handler(status_code: int)[source]

Decorator to register custom error handlers for HTTP status codes.

Registers handlers that customize error responses for specific HTTP status codes. Error handlers receive the request and exception, allowing for custom error pages, logging, or error tracking integration.

Parameters:

status_code – HTTP status code to handle (e.g., 404, 500, 403, 429). Common codes: 400 (Bad Request), 401 (Unauthorized), 403 (Forbidden), 404 (Not Found), 500 (Internal Server Error).

Returns:

Decorator function that registers the error handler.

Examples

Custom 404 page:

>>> @app.error_handler(404)
>>> async def not_found(request, error):
...     return await app.render_template(
...         "errors/404.html",
...         path=request.path
...     )

Custom 500 with error tracking:

>>> @app.error_handler(500)
>>> async def server_error(request, error):
...     # Log to error tracking service
...     await error_tracker.capture(error, request)
...     return JSONResponse(
...         {"error": "Internal server error", "request_id": request.id},
...         status=500
...     )

Rate limit exceeded:

>>> @app.error_handler(429)
>>> async def rate_limited(request, error):
...     return JSONResponse(
...         {"error": "Too many requests", "retry_after": 60},
...         status=429,
...         headers={"Retry-After": "60"}
...     )

Unauthorized with custom message:

>>> @app.error_handler(401)
>>> async def unauthorized(request, error):
...     return JSONResponse(
...         {
...             "error": "Authentication required",
...             "login_url": "/auth/login"
...         },
...         status=401
...     )

Note

  • Default handlers for 404 and 500 use themed error pages

  • Error handlers run outside middleware chain

  • Exceptions in error handlers fall back to basic text responses

  • Error handlers can be sync or async

See also

  • before_request(): Pre-request validation

  • gobstopper.http.response.Response: Response construction

  • gobstopper.http.response.JSONResponse: JSON error responses

async render_template(template_name: str, stream: bool = False, **context) Response[source]

Render template with context data and return HTTP response.

Renders the specified template file with provided context variables, returning a properly formatted HTTP response. Supports both traditional rendering and progressive streaming for large datasets.

Parameters:
  • template_name – Name of template file relative to template folder. Must include file extension (e.g., “page.html”, “email.txt”).

  • stream – Enable progressive rendering (Rust engine only):
      - False: Traditional rendering; the template is fully rendered before the response is sent
      - True: Streaming rendering; template chunks are sent as they are generated

  • **context – Template context variables passed to template. Variables become available in template as {{ variable_name }}.

Returns:

HTTP response with rendered HTML content, or a StreamResponse for streaming renders (when stream=True).

Return type:

Response

Examples

Basic template rendering:

>>> @app.get("/")
>>> async def index(request):
...     return await app.render_template("index.html",
...                                    title="Home Page",
...                                    user_name="Alice")

With complex context:

>>> @app.get("/dashboard")
>>> async def dashboard(request):
...     users = await get_users()
...     return await app.render_template("dashboard.html",
...                                    users=users,
...                                    page_title="Dashboard")

Streaming large datasets (Rust only):

>>> @app.get("/report")
>>> async def large_report(request):
...     big_data = await get_large_dataset()
...     return await app.render_template("report.html",
...                                    stream=True,
...                                    data=big_data)

Note

Context processors are automatically applied to provide global variables. Streaming requires Rust template engine and compatible templates.

See also

  • init_templates(): Initialize template system

  • context_processor(): Add global template context

  • gobstopper.http.Response: Response objects

async add_background_task(name: str, category: str = 'default', priority: TaskPriority = TaskPriority.NORMAL, max_retries: int = 0, *args, **kwargs) str[source]

Add a background task to the queue for asynchronous execution.

Queues a registered task for execution by worker processes. Tasks are persisted to DuckDB immediately and executed based on priority and worker availability. Returns a task ID for tracking progress and retrieving results.

Parameters:
  • name – Name of the registered task (as specified in task() decorator).

  • category – Task category (default: “default”). Must match task registration. Category determines which worker pool executes the task.

  • priority – Execution priority (default: TaskPriority.NORMAL). Options: TaskPriority.LOW, NORMAL, HIGH, URGENT. Higher priority tasks execute before lower priority.

  • max_retries – Maximum retry attempts on failure (default: 0). Task will be retried up to this many times if it raises an exception.

  • *args – Positional arguments passed to task handler.

  • **kwargs – Keyword arguments passed to task handler.

Returns:

Task ID (UUID string) for tracking and result retrieval.

Raises:
  • KeyError – If task name is not registered in the specified category.

  • Exception – If task queueing fails (database error, etc.).

Examples

Queue a simple task:

>>> task_id = await app.add_background_task(
...     "send_email",
...     to="user@example.com",
...     subject="Welcome",
...     body="Hello!"
... )
>>> return {"task_id": task_id}

High-priority task with retries:

>>> task_id = await app.add_background_task(
...     "process_payment",
...     category="payments",
...     priority=TaskPriority.HIGH,
...     max_retries=3,
...     payment_id=123,
...     amount=99.99
... )

Urgent task (process immediately):

>>> task_id = await app.add_background_task(
...     "send_alert",
...     category="notifications",
...     priority=TaskPriority.URGENT,
...     message="System alert",
...     recipients=["admin@example.com"]
... )

Track task status:

>>> # In route handler
>>> task_id = await app.add_background_task("long_process", data=data)
>>> # Later, check status
>>> task_info = await app.task_queue.get_task(task_id)
>>> return {
...     "status": task_info.status,
...     "progress": task_info.progress,
...     "result": task_info.result
... }

Note

  • Tasks execute asynchronously in separate worker processes

  • Task parameters are serialized to JSON (must be JSON-serializable)

  • Workers must be started via start_task_workers() for execution

  • Task status can be queried using the returned task_id

  • Failed tasks remain in database for inspection
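
Because task parameters must be JSON-serializable, it can help to validate arguments before queueing. The following is a minimal sketch using only the standard library; check_task_arguments is a hypothetical helper, not part of Gobstopper, and the {"args": ..., "kwargs": ...} layout is an assumption about how parameters might be encoded.

```python
import json
from typing import Any


def check_task_arguments(*args: Any, **kwargs: Any) -> str:
    """Return a JSON encoding of the arguments, or raise TypeError.

    Hypothetical helper: it only mirrors the documented requirement that
    task parameters be JSON-serializable.
    """
    try:
        return json.dumps({"args": list(args), "kwargs": kwargs})
    except (TypeError, ValueError) as exc:
        # json.dumps raises TypeError for unsupported types and
        # ValueError for circular references.
        raise TypeError(f"task arguments are not JSON-serializable: {exc}") from exc
```

Values such as datetime objects or custom classes fail this check; converting them first (for example with datetime.isoformat()) keeps the call to add_background_task() safe.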

See also

task(): Register task handlers
start_task_workers(): Start worker pools
gobstopper.tasks.queue.TaskPriority: Priority levels
gobstopper.tasks.queue.TaskQueue: Task queue operations

async start_task_workers(category: str = 'default', worker_count: int = 1)[source]

Start background worker processes for task execution.

Launches worker processes that poll for and execute queued tasks in the specified category. Workers run continuously until application shutdown. Multiple workers can process tasks concurrently within the same category.

Parameters:
  • category – Task category to process (default: “default”). Workers only execute tasks queued in this category. Must match category used in task() decorator and add_background_task().

  • worker_count – Number of concurrent workers to start (default: 1). More workers enable parallel task processing. Consider CPU-bound vs I/O-bound tasks when sizing worker pools.

Examples

Start default workers:

>>> # In startup handler
>>> @app.on_startup
>>> async def start_workers():
...     await app.start_task_workers("default", worker_count=4)

Multiple categories with different worker counts:

>>> @app.on_startup
>>> async def start_all_workers():
...     # I/O-bound tasks (email, API calls)
...     await app.start_task_workers("notifications", worker_count=10)
...     # CPU-bound tasks (image processing)
...     await app.start_task_workers("processing", worker_count=2)
...     # Critical tasks
...     await app.start_task_workers("payments", worker_count=4)

Single worker for sequential processing:

>>> @app.on_startup
>>> async def start_sequential_worker():
...     # Process tasks one at a time
...     await app.start_task_workers("imports", worker_count=1)

Note

  • Workers run as background asyncio tasks

  • Each worker polls the database for pending tasks

  • Workers respect task priority (URGENT > HIGH > NORMAL > LOW)

  • Workers automatically retry failed tasks based on max_retries

  • Workers shut down gracefully on application shutdown

  • This should be called in on_startup() handlers
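
The priority rule above (URGENT > HIGH > NORMAL > LOW) can be pictured with a small in-memory queue. This is only a sketch: Priority is a local stand-in for TaskPriority with assumed numeric values, PendingTasks is hypothetical, and first-in-first-out order within one priority level is an assumption rather than documented behavior. The real workers poll the database, not a heap.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    """Stand-in for TaskPriority; the numeric values are assumed."""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4


class PendingTasks:
    """In-memory queue: highest priority first, FIFO within a level."""

    def __init__(self) -> None:
        self._heap: list = []
        self._counter = itertools.count()  # tie-breaker that preserves insertion order

    def push(self, name: str, priority: Priority = Priority.NORMAL) -> None:
        # heapq is a min-heap, so the priority is negated to pop URGENT first.
        heapq.heappush(self._heap, (-priority, next(self._counter), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```

With several workers in one category, each worker would take the next item in this order as soon as it becomes free.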

See also

task(): Register task handlers
add_background_task(): Queue tasks
on_startup(): Application startup hooks

on_startup(func)[source]

Decorator to register startup handlers that run once before first request.

Registers functions to execute during application initialization. Startup handlers run lazily on the first request (not at module import time), allowing for async resource initialization like database connections, cache warming, or worker startup.

Parameters:

func – Callable to run at startup. Can be sync or async. Should handle its own exceptions or allow them to propagate to prevent startup completion.

Returns:

The handler function (for decorator chaining).

Examples

Initialize database connection:

>>> @app.on_startup
>>> async def init_database():
...     app.db = await create_db_pool(DATABASE_URL)
...     app.logger.info("Database connected")

Start background workers:

>>> @app.on_startup
>>> async def start_workers():
...     await app.start_task_workers("default", worker_count=4)
...     await app.start_task_workers("email", worker_count=2)

Warm up caches:

>>> @app.on_startup
>>> async def warm_cache():
...     app.config = await load_config()
...     app.cache = await init_redis_cache()

Load ML models:

>>> @app.on_startup
>>> def load_models():
...     app.ml_model = load_pretrained_model("model.pkl")
...     app.logger.info("Model loaded")

Note

  • Handlers run once on first request, not at import time

  • Multiple handlers run in registration order

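  • Startup is protected by an asyncio.Lock, which serializes concurrent requests on the event loop (asyncio.Lock itself is not thread-safe)

  • An exception in a handler prevents startup from completing; startup will then be retried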

  • All handlers must complete before first request proceeds
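
The run-once behavior described in the notes above can be sketched with a flag and an asyncio.Lock. LazyStartup and ensure_started are hypothetical names, not Gobstopper internals, and retrying on the next caller after a failure is an assumption about how "will be retried" is realized.

```python
import asyncio
import inspect
from typing import Any, Callable


class LazyStartup:
    """Runs registered handlers once, on the first call to ensure_started()."""

    def __init__(self) -> None:
        self._handlers: list = []
        self._lock = asyncio.Lock()
        self._done = False

    def on_startup(self, func: Callable[[], Any]) -> Callable[[], Any]:
        self._handlers.append(func)
        return func  # returned unchanged so the decorator can be chained

    async def ensure_started(self) -> None:
        if self._done:  # fast path once startup has completed
            return
        async with self._lock:
            if self._done:  # another request finished startup while this one waited
                return
            for handler in self._handlers:  # registration order
                result = handler()
                if inspect.isawaitable(result):
                    await result  # async handlers are awaited, sync ones already ran
            # Reached only when no handler raised, so a failed startup
            # is attempted again by the next caller.
            self._done = True
```

Every request would await ensure_started() before dispatch; only the first one pays the startup cost, and the others wait on the lock until it finishes.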

See also

on_shutdown(): Cleanup handlers
start_task_workers(): Starting background workers

on_shutdown(func)[source]

Decorator to register shutdown handlers for graceful cleanup.

Registers functions to execute during application shutdown. Shutdown handlers run after the application stops accepting new requests and in-flight requests complete, allowing for resource cleanup like closing database connections, flushing caches, or stopping background workers.

Parameters:

func – Callable to run at shutdown. Can be sync or async. Exceptions are logged but do not prevent other shutdown handlers from running.

Returns:

The handler function (for decorator chaining).

Examples

Close database connections:

>>> @app.on_shutdown
>>> async def close_database():
...     await app.db.close()
...     app.logger.info("Database connection closed")

Flush caches and save state:

>>> @app.on_shutdown
>>> async def save_state():
...     await app.cache.flush()
...     await save_application_state()
...     app.logger.info("State saved")

Stop external services:

>>> @app.on_shutdown
>>> async def stop_services():
...     await app.websocket_manager.close_all()
...     await app.metrics_client.close()

Cleanup temporary files:

>>> @app.on_shutdown
>>> def cleanup_temp_files():
...     import shutil
...     shutil.rmtree("/tmp/app_uploads", ignore_errors=True)

Note

  • Handlers run after server stops accepting new requests

  • In-flight requests are allowed to complete (with timeout)

  • Multiple handlers run in registration order

  • Exceptions in handlers are logged but don’t stop other handlers

  • Task queue shutdown happens automatically after custom handlers

  • Default shutdown timeout is 10 seconds (configurable via WOPR_SHUTDOWN_TIMEOUT)
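
The handler rules above (registration order, sync or async, errors logged without stopping the rest) can be sketched as follows. run_shutdown_handlers and the logger name are hypothetical; this illustrates the documented behavior and is not the framework's implementation.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger("shutdown-sketch")


async def run_shutdown_handlers(handlers: list) -> list:
    """Run handlers in order; return the names of those that raised."""
    failed = []
    for handler in handlers:
        try:
            result = handler()
            if inspect.isawaitable(result):
                await result
        except Exception:
            # Log with traceback and keep going so later handlers still run.
            logger.exception("shutdown handler %s failed", handler.__name__)
            failed.append(handler.__name__)
    return failed
```

Because one failing handler does not block the others, cleanup steps that depend on each other are safer combined into a single handler.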

See also

on_startup(): Initialization handlers
shutdown(): Programmatic shutdown trigger

visualize_routing()[source]

Print a detailed visualization of application routing structure.

Logs a comprehensive view of the routing hierarchy including mounts, blueprints, middleware ordering, and effective middleware chains for each route. Useful for debugging routing issues, understanding middleware execution order, and verifying blueprint composition.

Examples

Call during startup:

>>> @app.on_startup
>>> def show_routes():
...     app.visualize_routing()

Output shows:

>>> # === Routing Visualization ===
>>> # Mount: /api -> api_v1
>>> # Blueprint: auth prefix=/auth
>>> #   MW(prio=100): auth_middleware
>>> #   Blueprint: admin prefix=/admin
>>> #     MW(prio=50): admin_check
>>> # Route ['GET'] /api/users -> MW order: ['cors', 'auth', 'rate_limit']
>>> # === End Visualization ===

Note

  • Shows mounts, blueprints hierarchy, and middleware

  • Displays effective middleware order for each route

  • Middleware order reflects actual execution (app → blueprints → route)

  • Useful for debugging middleware composition issues

  • Output goes to application logger at INFO level

See also

register_blueprint(): Blueprint registration
mount(): Sub-application mounting
add_middleware(): Middleware registration

async shutdown(timeout: float | None = None)[source]

Initiate graceful application shutdown with in-flight request handling.

Performs a graceful shutdown sequence: stops accepting new requests, waits for in-flight requests to complete (with timeout), runs shutdown hooks, and closes the task queue. This ensures no requests are abruptly terminated and resources are properly cleaned up.

Parameters:

timeout – Maximum seconds to wait for in-flight requests (default: None). If None, reads from environment variable WOPR_SHUTDOWN_TIMEOUT (default: 10 seconds). After timeout, shutdown proceeds anyway.

Examples

Manual shutdown:

>>> import asyncio
>>>
>>> # In a special endpoint
>>> @app.post("/admin/shutdown")
>>> async def trigger_shutdown(request):
...     asyncio.create_task(app.shutdown(timeout=30))
...     return {"status": "shutting down"}

Signal handler:

>>> import asyncio
>>> import signal
>>> def handle_sigterm(signum, frame):
...     asyncio.create_task(app.shutdown())
>>> signal.signal(signal.SIGTERM, handle_sigterm)

Custom timeout:

>>> await app.shutdown(timeout=60)  # Wait up to 60s

Note

Shutdown sequence:

  1. Stop accepting new requests (503 responses)
  2. Wait for in-flight requests to complete (up to timeout)
  3. Run registered shutdown hooks via on_shutdown()
  4. Shut down the task queue and workers
  5. Log any errors but continue shutdown

Environment variable WOPR_SHUTDOWN_TIMEOUT (seconds) sets default timeout. In-flight request count is tracked automatically. Shutdown hooks run even if timeout expires.
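
The timeout handling described above can be sketched in two small pieces: resolving the effective timeout, and waiting for in-flight requests without blocking shutdown forever. Both function names are hypothetical, and using an asyncio.Event to signal "no requests in flight" is an assumption about how the count might be tracked.

```python
import asyncio
import os
from typing import Optional


def resolve_shutdown_timeout(timeout: Optional[float] = None) -> float:
    """Explicit argument wins, then WOPR_SHUTDOWN_TIMEOUT, then 10 seconds."""
    if timeout is not None:
        return float(timeout)
    return float(os.environ.get("WOPR_SHUTDOWN_TIMEOUT", "10"))


async def wait_for_drain(drained: asyncio.Event, timeout: float) -> bool:
    """Return True if in-flight requests finished, False if the timeout expired."""
    try:
        await asyncio.wait_for(drained.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        # Shutdown proceeds anyway; the caller still runs the shutdown hooks.
        return False
```

The sketch does not handle a non-numeric WOPR_SHUTDOWN_TIMEOUT value; float() would raise ValueError in that case.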

See also

on_shutdown(): Register cleanup handlers
start_task_workers(): Background workers

async __rsgi__(scope: Scope, protocol)[source]

RSGI application entry point called by Granian server.

Main entry point for the RSGI (Rust Server Gateway Interface) protocol. This method is called by the Granian server for each incoming connection and routes to the appropriate protocol handler (HTTP or WebSocket).

Parameters:
  • scope – RSGI scope object containing request metadata. Expected to have a ‘proto’ field indicating protocol type (‘http’ or ‘ws’/‘websocket’).

  • protocol – RSGI protocol object (HTTPProtocol or WebsocketProtocol) for managing the connection and sending responses.

Note

This is the application’s RSGI interface. The __rsgi__ name is required by the RSGI specification and is automatically detected by Granian when you run: granian --interface rsgi app:app

Protocol dispatch:

  • ‘http’ → _handle_http()
  • ‘ws’/‘websocket’ → _handle_websocket()

The scope is converted to a SimpleNamespace if provided as dict (for test client compatibility).
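
The dispatch and scope conversion described above can be sketched as below. normalize_scope and dispatch are hypothetical names, the handlers are passed in as plain coroutines, and raising ValueError for an unknown protocol is an assumption; the source does not say what happens in that case.

```python
import asyncio
from types import SimpleNamespace
from typing import Any


def normalize_scope(scope: Any) -> Any:
    """Wrap dict scopes so scope.proto works for both input shapes."""
    return SimpleNamespace(**scope) if isinstance(scope, dict) else scope


async def dispatch(scope: Any, handle_http, handle_websocket) -> Any:
    scope = normalize_scope(scope)
    if scope.proto == "http":
        return await handle_http(scope)
    if scope.proto in ("ws", "websocket"):
        return await handle_websocket(scope)
    # Assumed behavior for anything else.
    raise ValueError(f"unsupported protocol: {scope.proto!r}")
```

Accepting plain dicts lets a test client build a scope as {"proto": "http", "path": "/"} without constructing a real RSGI scope object.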

Examples

Run with Granian:

>>> # In your app.py
>>> from gobstopper import Gobstopper
>>> app = Gobstopper(__name__)
>>> @app.get("/")
>>> async def hello(request):
...     return {"message": "Hello"}

Command line:

>>> # granian --interface rsgi app:app
>>> # granian --interface rsgi --reload app:app  # With hot reload

See also

_handle_http(): HTTP request handler
_handle_websocket(): WebSocket connection handler

Blueprint system for grouping routes and related hooks.

Provides a Flask-like Blueprint API that allows organizing routes into reusable modules that can be registered on an application with an optional URL prefix.

class gobstopper.core.blueprint.Blueprint(name: str, url_prefix: str | None = None, *, static_folder: str | None = None, template_folder: str | None = None)[source]

Bases: object

A collection of routes and hooks that can be registered on a Gobstopper app.

Parameters:
  • name – Identifier for the blueprint.

  • url_prefix – Optional URL prefix applied when registering on an app.

__init__(name: str, url_prefix: str | None = None, *, static_folder: str | None = None, template_folder: str | None = None)[source]
route(path: str, methods: list[str] | None = None)[source]
get(path: str)[source]
post(path: str)[source]
put(path: str)[source]
delete(path: str)[source]
patch(path: str)[source]
options(path: str)[source]
websocket(path: str)[source]
register_blueprint(blueprint: Blueprint, url_prefix: str | None = None)[source]
before_request(func: Callable[[...], Any]) Callable[[...], Any][source]
after_request(func: Callable[[...], Any]) Callable[[...], Any][source]
add_middleware(middleware: Middleware, priority: int = 0)[source]

HTTP Components

Request

HTTP Request handling for Gobstopper framework

class gobstopper.http.request.Request(scope: Scope, protocol: RSGIHTTPProtocol)[source]

Bases: object

HTTP request object with async parsing capabilities.

Wraps RSGI request scope and protocol, providing convenient access to request data including headers, query parameters, form data, and JSON payloads. All parsing operations are async and lazy-loaded for optimal performance.

Parameters:
  • scope – RSGI request scope containing request metadata (method, path, headers, etc.)

  • protocol – RSGI protocol instance for reading request body asynchronously

scope

Original RSGI scope object with request metadata

Type:

Scope

protocol

RSGI protocol for async body reading

Type:

HTTPProtocol

method

HTTP method (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS, etc.)

Type:

str

path

Request path without query string (e.g., “/api/users/123”)

Type:

str

query_string

Raw query string without leading ‘?’ (e.g., “page=1&limit=10”)

Type:

str

headers

HTTP headers with case-insensitive access (lowercase keys)

Type:

Dict[str, str]

args

Parsed query parameters (supports multiple values)

Type:

Dict[str, List[str]]

client_ip

Client IP address (respects X-Forwarded-For header)

Type:

str

session

Session data dictionary (set by SecurityMiddleware)

Type:

Optional[Dict[str, Any]]

session_id

Read-only session identifier (set by SecurityMiddleware)

Type:

Optional[str]

logger

Application or framework logger instance for request-scoped logging

Type:

Logger

Examples

Basic request handling:

>>> @app.post("/api/users")
>>> async def create_user(request: Request):
...     data = await request.json()
...     user_ip = request.client_ip
...     request.logger.info(f"User created from {user_ip}")
...     return {"status": "created", "data": data}

Access query parameters:

>>> @app.get("/search")
>>> async def search(request: Request):
...     # URL: /search?q=python&page=2
...     query = request.args.get("q", [""])[0]
...     page = int(request.args.get("page", ["1"])[0])
...     return {"query": query, "page": page}

Session management:

>>> @app.get("/profile")
>>> async def profile(request: Request):
...     if not request.session or not request.session.get("user_id"):
...         return {"error": "Not authenticated"}, 401
...     user_id = request.session["user_id"]
...     return {"user_id": user_id}

Note

Request parsing is lazy - methods like json(), form() only parse when called. Subsequent calls to the same parsing method return cached results. Body reading is async and may involve network I/O operations. Session attributes require SecurityMiddleware to be configured.
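
The lazy, cached parsing described in this note can be sketched with a sentinel-guarded attribute. LazyJsonBody is a hypothetical class, read_body stands in for the RSGI protocol read, and the standard-library json module is used here in place of msgspec so the example has no third-party dependency.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

_UNSET = object()  # sentinel, because a parsed body may legitimately be None


class LazyJsonBody:
    def __init__(self, read_body: Callable[[], Awaitable[bytes]]) -> None:
        self._read_body = read_body
        self._parsed: Any = _UNSET

    async def json(self) -> Any:
        if self._parsed is _UNSET:
            raw = await self._read_body()  # body is read only on first use
            self._parsed = json.loads(raw) if raw else None
        return self._parsed  # later calls return the cached object
```

A plain None check would not work as the cache test, since an empty body parses to None and would trigger a second read.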

See also

Response: HTTP response objects
JSONResponse: JSON response helper
SecurityMiddleware: Provides session management

__init__(scope: Scope, protocol: RSGIHTTPProtocol)[source]
scope
protocol
session
endpoint
view_args
url_rule
property client_ip: str

Get the client’s IP address with proxy header support.

Attempts to determine the real client IP address by checking the X-Forwarded-For header (for proxied requests) before falling back to the direct connection IP from the RSGI scope.

Returns:

Client IP address as string. Returns “unknown” if IP cannot be determined from headers or scope.

Return type:

str

Note

When behind a proxy or load balancer, X-Forwarded-For header is respected. Takes the first IP in the chain for multi-proxy setups. For direct connections, uses the client address from RSGI scope.
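
The lookup order in this note can be sketched as a small function. resolve_client_ip is a hypothetical helper; it assumes headers use lowercase keys and that peer is the direct connection address already extracted from the scope.

```python
from typing import Mapping, Optional


def resolve_client_ip(headers: Mapping[str, str], peer: Optional[str]) -> str:
    """X-Forwarded-For (first entry) wins, then the direct peer, then 'unknown'."""
    forwarded = headers.get("x-forwarded-for", "")
    first = forwarded.split(",")[0].strip()  # left-most entry is the original client
    if first:
        return first
    if peer:
        return peer
    return "unknown"
```

X-Forwarded-For is supplied by the client unless a trusted proxy overwrites it, so the value should only be relied on for rate limiting or access control when the application sits behind such a proxy.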

Examples

Log client IP for requests:

>>> @app.get("/api/endpoint")
>>> async def handler(request: Request):
...     ip = request.client_ip
...     print(f"Request from: {ip}")
...     return {"client": ip}

Rate limiting by IP:

>>> from gobstopper.utils.rate_limiter import RateLimiter
>>> limiter = RateLimiter(max_requests=100, window=60)
>>>
>>> @app.post("/api/action")
>>> async def protected(request: Request):
...     if not limiter.check(request.client_ip):
...         return {"error": "Rate limit exceeded"}, 429
...     return {"status": "success"}
property method: str

Get the HTTP request method.

Returns:

HTTP method in uppercase (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS, etc.)

Return type:

str

Examples

Method-based routing logic:

>>> @app.route("/resource", methods=["GET", "POST"])
>>> async def handle_resource(request: Request):
...     if request.method == "GET":
...         return {"action": "retrieve"}
...     elif request.method == "POST":
...         data = await request.json()
...         return {"action": "create", "data": data}
property path: str

Get the request path without query string.

Returns:

URL path component (e.g., “/api/users/123”). Does not include query string, fragment, or domain.

Return type:

str

Examples

Path-based logic:

>>> @app.get("/*")
>>> async def catch_all(request: Request):
...     if request.path.startswith("/admin"):
...         # Check admin permissions
...         pass
...     return {"path": request.path}

Extract path segments:

>>> @app.get("/api/*")
>>> async def api_handler(request: Request):
...     segments = request.path.strip("/").split("/")
...     # segments = ["api", "users", "123"]
...     return {"segments": segments}
property query_string: str

Get the raw query string from the URL.

Returns:

Raw query string without leading ‘?’ (e.g., “page=1&limit=10”). Empty string if no query parameters present.

Return type:

str

Note

For parsed query parameters as a dict, use the args property instead.

Examples

Get raw query string:

>>> @app.get("/search")
>>> async def search(request: Request):
...     # URL: /search?q=python&page=2
...     raw = request.query_string  # "q=python&page=2"
...     return {"raw_query": raw}

See also

args: Parsed query parameters as dict

property args: dict[str, list[str]]

Get parsed query parameters as a dictionary.

Parses the query string into a dictionary mapping parameter names to lists of values. Supports multiple values for the same parameter name. Parsing is lazy-loaded and cached.

Returns:

Dictionary mapping parameter names to lists of values. Empty dict if no query parameters present.

Return type:

Dict[str, List[str]]

Examples

Single parameter values:

>>> @app.get("/search")
>>> async def search(request: Request):
...     # URL: /search?q=python&page=2
...     query = request.args.get("q", [""])[0]  # "python"
...     page = int(request.args.get("page", ["1"])[0])  # 2
...     return {"query": query, "page": page}

Multiple values for same parameter:

>>> @app.get("/filter")
>>> async def filter_items(request: Request):
...     # URL: /filter?tag=python&tag=web&tag=async
...     tags = request.args.get("tag", [])  # ["python", "web", "async"]
...     return {"tags": tags}

Check if parameter exists:

>>> @app.get("/report")
>>> async def report(request: Request):
...     include_details = "details" in request.args
...     return {"detailed": include_details}

Note

Returns lists even for single values to handle multiple values uniformly. Use request.args.get(“param”, [“default”])[0] to get single value with default. Parsing result is cached for subsequent access.
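
The list-valued mapping described here has the same shape that the standard library's urllib.parse.parse_qs produces, which makes it easy to experiment with outside a request. Whether Gobstopper uses parse_qs internally is not stated in the source; parse_args and first are hypothetical helpers, and keep_blank_values=True is an assumption.

```python
from urllib.parse import parse_qs


def parse_args(query_string: str) -> dict:
    """Parse a raw query string into {name: [values, ...]}."""
    # keep_blank_values=True keeps bare flags such as "?details" (assumed behavior).
    return parse_qs(query_string, keep_blank_values=True)


def first(args: dict, name: str, default: str = "") -> str:
    """Return the first value for name, or default when the name is absent."""
    values = args.get(name)
    return values[0] if values else default
```

A helper like first() avoids repeating the args.get("param", ["default"])[0] idiom in every handler.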

See also

query_string: Raw unparsed query string

property headers: dict[str, str]

Get HTTP request headers with case-insensitive access.

Provides access to all HTTP headers sent with the request. Header names are normalized to lowercase for case-insensitive lookups. Headers are lazily computed only when accessed for optimal performance.

Returns:

Dictionary mapping lowercase header names to values.

Common headers include ‘content-type’, ‘authorization’, ‘user-agent’, ‘accept’, ‘cookie’, etc.

Return type:

Dict[str, str]

Examples

Check content type:

>>> @app.post("/upload")
>>> async def upload(request: Request):
...     content_type = request.headers.get("content-type", "")
...     if content_type.startswith("application/json"):
...         data = await request.json()
...     elif content_type.startswith("multipart/form-data"):
...         data = await request.multipart()
...     return {"received": True}

Authentication header:

>>> @app.get("/protected")
>>> async def protected(request: Request):
...     auth = request.headers.get("authorization", "")
...     if not auth.startswith("Bearer "):
...         return {"error": "Unauthorized"}, 401
...     token = auth[7:]  # Remove "Bearer " prefix
...     # Validate token...
...     return {"status": "authenticated"}

Custom headers:

>>> @app.get("/api/data")
>>> async def api(request: Request):
...     api_key = request.headers.get("x-api-key")
...     if not api_key:
...         return {"error": "API key required"}, 401
...     return {"data": "sensitive"}

Note

Header names are normalized to lowercase (e.g., “Content-Type” becomes “content-type”). Case-insensitive per HTTP specification (RFC 7230). Lazily computed only when accessed for optimal performance.
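
The lowercase normalization can be sketched as below. normalize_headers is a hypothetical function, the input is assumed to be a sequence of (name, value) pairs, and joining repeated headers with ", " is an assumption about how duplicates are folded into a Dict[str, str].

```python
from typing import Iterable, Tuple


def normalize_headers(raw: Iterable[Tuple[str, str]]) -> dict:
    """Build a dict keyed by lowercase header name."""
    headers: dict = {}
    for name, value in raw:
        key = name.lower()
        # Repeated headers are combined into one comma-separated value (assumed).
        headers[key] = f"{headers[key]}, {value}" if key in headers else value
    return headers
```

If the mapping is a plain dict, as the Dict[str, str] type suggests, lookups need the lowercase spelling: headers.get("content-type") finds the value, while headers.get("Content-Type") would not.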

property session_id: str | None

Get the current session ID if session middleware is enabled.

Returns the session identifier set by SecurityMiddleware. This is a read-only property - session IDs cannot be modified directly.

Returns:

Session ID string if SecurityMiddleware is enabled and a session exists. None if no session middleware or no active session.

Return type:

Optional[str]

Examples

Check if user has active session:

>>> @app.get("/dashboard")
>>> async def dashboard(request: Request):
...     if not request.session_id:
...         return {"error": "No active session"}, 401
...     return {"session_id": request.session_id}

Logging with session tracking:

>>> @app.post("/action")
>>> async def action(request: Request):
...     session = request.session_id or "anonymous"
...     request.logger.info(f"Action by session: {session}")
...     return {"status": "success"}

Note

Requires SecurityMiddleware to be configured with session management. Session ID is typically a cryptographically secure random string. This is read-only - use request.session dict to store session data.

See also

session: Session data dictionary
SecurityMiddleware: Session management middleware

property logger: Logger

Get the application logger instance for request-scoped logging.

Returns the application’s configured logger if available, otherwise falls back to the framework’s default logger. Useful for logging request-specific events and errors.

Returns:

Python logging.Logger instance for writing log messages.

Return type:

Logger

Examples

Log request information:

>>> @app.post("/api/create")
>>> async def create(request: Request):
...     request.logger.info(f"Creating resource from {request.client_ip}")
...     data = await request.json()
...     request.logger.debug(f"Received data: {data}")
...     return {"status": "created"}, 201

Error logging:

>>> @app.get("/api/data")
>>> async def get_data(request: Request):
...     try:
...         result = await fetch_data()
...         return {"data": result}
...     except Exception as e:
...         request.logger.error(f"Data fetch failed: {e}", exc_info=True)
...         return {"error": "Internal error"}, 500

Conditional debug logging:

>>> import logging
>>>
>>> @app.post("/api/process")
>>> async def process(request: Request):
...     if request.logger.isEnabledFor(logging.DEBUG):
...         request.logger.debug(f"Headers: {request.headers}")
...     return {"status": "processed"}

Note

Prefers application logger if set, falls back to framework logger. Logger is attached to request for convenient access in handlers.

See also

Gobstopper.logger: Application-level logger configuration

property cookies: dict[str, str]

Get cookies from Cookie header as a dictionary.

Parses the Cookie header into a dictionary mapping cookie names to values. Parsing is lazy-loaded and cached. Provides Flask/Quart-style convenient cookie access without manual header parsing.

Returns:

Dictionary mapping cookie names to values. Empty dict if no cookies present.

Return type:

Dict[str, str]

Examples

Access session cookie:

>>> @app.get("/dashboard")
>>> async def dashboard(request: Request):
...     session_id = request.cookies.get('session_id')
...     if not session_id:
...         return {"error": "Not authenticated"}, 401
...     return {"session": session_id}

Check for specific cookie:

>>> @app.get("/preferences")
>>> async def preferences(request: Request):
...     theme = request.cookies.get('theme', 'light')
...     language = request.cookies.get('lang', 'en')
...     return {"theme": theme, "language": language}

Multiple cookies:

>>> @app.get("/tracking")
>>> async def tracking(request: Request):
...     user_id = request.cookies.get('user_id')
...     tracking_id = request.cookies.get('tracking_id')
...     return {"user": user_id, "tracking": tracking_id}

Note

Cookies are parsed from the Cookie header using standard HTTP format. Multi-value cookies and cookie attributes (path, domain, etc.) are not included - only name=value pairs. Parsing result is cached for subsequent access.
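
The name=value parsing described here can be reproduced with the standard library's http.cookies.SimpleCookie. This is a sketch for experimenting with Cookie header strings; parse_cookie_header is a hypothetical helper, and the source does not say which parser Gobstopper uses.

```python
from http.cookies import SimpleCookie


def parse_cookie_header(header: str) -> dict:
    """Turn a Cookie header value into {name: value}."""
    jar = SimpleCookie()
    jar.load(header)
    # Each entry is a Morsel; only its plain value is kept.
    return {name: morsel.value for name, morsel in jar.items()}
```

For example, parse_cookie_header("theme=dark; lang=en") yields a dict with the keys "theme" and "lang", matching the request.cookies.get('theme', 'light') pattern in the examples above.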

See also

Response.set_cookie(): Set cookies in response
Response.delete_cookie(): Delete cookies

property is_json: bool

Check if request has JSON content type.

Convenience property for checking if Content-Type header indicates JSON. Useful for conditional parsing logic and content negotiation.

Returns:

True if Content-Type is application/json, False otherwise.

Return type:

bool

Examples

Conditional JSON parsing:

>>> @app.post("/data")
>>> async def submit_data(request: Request):
...     if request.is_json:
...         data = await request.get_json()
...         return {"type": "json", "data": data}
...     else:
...         return {"error": "Expected JSON"}, 400

Content negotiation:

>>> @app.post("/process")
>>> async def process(request: Request):
...     if request.is_json:
...         data = await request.get_json()
...     elif 'form' in request.headers.get('content-type', ''):
...         data = await request.get_form()
...     else:
...         return {"error": "Unsupported content type"}, 415
...     return {"received": True}

Note

Checks if Content-Type starts with ‘application/json’. Also matches ‘application/json; charset=utf-8’ and similar variants.
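
The prefix check in this note can be sketched in one line. is_json_content_type is a hypothetical function; lowercasing and stripping the header value first are assumptions.

```python
def is_json_content_type(content_type: str) -> bool:
    """True when the media type begins with application/json."""
    return content_type.strip().lower().startswith("application/json")
```

A prefix test accepts parameters such as "; charset=utf-8", but it does not match structured-syntax types like application/problem+json, so handlers that accept those need their own check.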

property scheme: str

Get the URL scheme (http or https).

Determines the scheme from RSGI scope, respecting X-Forwarded-Proto header for proxied requests. Flask/Quart compatibility property.

Returns:

‘http’ or ‘https’ depending on connection type.

Return type:

str

Examples

Build URLs with correct scheme:

>>> @app.get("/redirect")
>>> async def redirect_to_secure(request: Request):
...     if request.scheme == 'http':
...         # Redirect to HTTPS
...         url = f"https://{request.host}{request.path}"
...         return redirect(url, status=301)
...     return {"secure": True}

Generate absolute URLs:

>>> @app.get("/share")
>>> async def share(request: Request):
...     full_url = f"{request.scheme}://{request.host}{request.path}"
...     return {"share_url": full_url}

Note

Respects X-Forwarded-Proto header for proxy/load balancer scenarios. Defaults to scope.proto from RSGI.

property host: str

Get the host header value (hostname:port).

Returns the Host header value, which includes hostname and optionally port. Flask/Quart compatibility property.

Returns:

Host header value (e.g., ‘localhost:8000’, ‘example.com’).

Return type:

str

Examples

Get hostname:

>>> @app.get("/info")
>>> async def info(request: Request):
...     return {"host": request.host}

Domain-based routing:

>>> @app.get("/")
>>> async def home(request: Request):
...     if 'admin' in request.host:
...         return {"site": "admin"}
...     return {"site": "main"}

Note

Respects X-Forwarded-Host header for proxy scenarios. Includes port number if non-standard (e.g., ‘:8000’).

property host_url: str

Get the base URL including scheme and host.

Constructs the base URL from scheme and host. Useful for building absolute URLs. Flask/Quart compatibility property.

Returns:

Base URL (e.g., ‘http://localhost:8000’, ‘https://example.com’).

Return type:

str

Examples

Build absolute URLs:

>>> @app.post("/users")
>>> async def create_user(request: Request):
...     user_id = await save_user()
...     location = f"{request.host_url}/users/{user_id}"
...     return {"location": location}, 201

API base URL:

>>> @app.get("/")
>>> async def index(request: Request):
...     return {
...         "api_base": request.host_url,
...         "docs_url": f"{request.host_url}/docs"
...     }
property base_url: str

Get the request URL without query string.

Constructs the full URL including scheme, host, and path, but excluding query string. Flask/Quart compatibility property.

Returns:

Complete URL without query string (e.g., ‘http://localhost:8000/users/123’).

Return type:

str

Examples

Get current page URL:

>>> @app.get("/article/<id>")
>>> async def article(request, id):
...     canonical_url = request.base_url
...     return {"canonical": canonical_url}

Share URL without parameters:

>>> @app.get("/search")
>>> async def search(request: Request):
...     # URL: /search?q=python&page=2
...     share_url = request.base_url  # Without ?q=python&page=2
...     return {"share": share_url}
property url: str

Get the complete request URL including query string.

Constructs the full URL including scheme, host, path, and query string. Flask/Quart compatibility property.

Returns:

Complete URL with query string (e.g., ‘http://localhost:8000/search?q=test’).

Return type:

str

Examples

Get full URL:

>>> @app.get("/search")
>>> async def search(request: Request):
...     # URL: /search?q=python&page=2
...     full_url = request.url
...     # Returns: 'http://localhost:8000/search?q=python&page=2'
...     return {"url": full_url}

Logging:

>>> @app.before_request
>>> async def log_request(request: Request):
...     request.logger.info(f"Request: {request.url}")
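
The relationship between host_url, base_url, and url can be summarized in a short sketch. build_urls is a hypothetical function that follows the formats shown in the Returns sections above (no trailing slash on host_url, query string appended only when present); it is not the framework's implementation.

```python
def build_urls(scheme: str, host: str, path: str, query_string: str = "") -> dict:
    """Compose the three URL forms from their parts."""
    host_url = f"{scheme}://{host}"      # scheme and host only
    base_url = f"{host_url}{path}"       # plus path, no query string
    url = f"{base_url}?{query_string}" if query_string else base_url
    return {"host_url": host_url, "base_url": base_url, "url": url}
```

Each form extends the previous one, so base_url is the natural choice for canonical links and url for logging the exact request.
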
async get_data() bytes[source]

Get the raw request body as bytes with optional size enforcement.

Reads the complete request body from the RSGI protocol. Enforces maximum body size limit if configured via LimitsMiddleware. Body is lazy-loaded and cached for subsequent calls.

Returns:

Complete request body as raw bytes. Empty bytes object (b'') if no body sent with request.

Return type:

bytes

Raises:

RequestTooLarge – If body size exceeds configured max_body_bytes limit (typically set by LimitsMiddleware).

Examples

Read raw body:

>>> @app.post("/upload/raw")
>>> async def upload_raw(request: Request):
...     body = await request.get_data()
...     size = len(body)
...     return {"received_bytes": size}

Custom binary processing:

>>> @app.post("/image/process")
>>> async def process_image(request: Request):
...     image_data = await request.get_data()
...     # Process binary image data...
...     return {"processed": True}

Size limit handling:

>>> from gobstopper.http.errors import RequestTooLarge
>>>
>>> @app.post("/upload")
>>> async def upload(request: Request):
...     try:
...         data = await request.get_data()
...         return {"size": len(data)}
...     except RequestTooLarge:
...         return {"error": "File too large"}, 413

Note

Body reading is async and may involve network I/O. Result is cached - subsequent calls return same bytes object. Size limit enforced if max_body_bytes attribute set by middleware. For JSON/form data, prefer using json() or form().
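
The size enforcement mentioned in this note can be sketched as a bounded read. Everything here is illustrative: RequestTooLarge is a local stand-in for gobstopper.http.errors.RequestTooLarge, read_limited is a hypothetical function, and reading the body as a stream of chunks is an assumption about how the limit might be applied.

```python
import asyncio
from typing import AsyncIterator, Optional


class RequestTooLarge(Exception):
    """Local stand-in for gobstopper.http.errors.RequestTooLarge."""


async def read_limited(chunks: AsyncIterator[bytes], max_body_bytes: Optional[int]) -> bytes:
    """Collect the body, failing as soon as the limit is exceeded."""
    received = bytearray()
    async for chunk in chunks:
        received.extend(chunk)
        # None means no limit was configured by middleware.
        if max_body_bytes is not None and len(received) > max_body_bytes:
            raise RequestTooLarge(f"body exceeds {max_body_bytes} bytes")
    return bytes(received)
```

Checking after each chunk means an oversized upload is rejected before the whole payload is held in memory.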

See also

get_body(): Alias for backwards compatibility
json(): Parse JSON body
form(): Parse form body
LimitsMiddleware: Configures body size limits

async get_body() bytes[source]

Get the raw request body as bytes (alias for get_data).

Backwards-compatible alias for get_data(). Provides the same functionality with identical behavior and caching.

Returns:

Complete request body as raw bytes.

Return type:

bytes

Note

This is an alias maintained for backwards compatibility. New code should prefer using get_data().

See also

get_data(): Primary method for reading raw body

async get_json() Any[source]

Parse request body as JSON asynchronously using msgspec.

Reads and parses the request body as JSON data. The parsing is lazy-loaded and cached - subsequent calls return the same parsed data.

Returns:

Parsed JSON data as Python dict, list, or primitive type. Returns None for empty request body.

Raises:

msgspec.DecodeError – If request body is not valid JSON.

Examples

Parse JSON payload:

>>> import msgspec
>>>
>>> @app.post("/api/data")
... async def handle_data(request: Request):
...     try:
...         data = await request.get_json()
...         return {"received": data}
...     except msgspec.DecodeError:
...         return {"error": "Invalid JSON"}, 400

Note

Uses high-performance msgspec decoder. Result is cached for subsequent calls.

See also

get_form(): Parse form data
get_data(): Get raw request body

async get_form() dict[str, list[str]][source]

Parse request body as form data asynchronously.

Reads and parses the request body as URL-encoded or multipart form data. Supports both application/x-www-form-urlencoded and multipart/form-data.

Returns:

Dict mapping field names to lists of values. Multiple values with same name are preserved in lists. Empty dict for non-form requests or empty body.

Raises:

Examples

Handle form submission:

Use inside an async handler:

>>> form = await request.get_form()
>>> name = form.get("name", [""])[0]
>>> message = form.get("message", [""])[0]

Handle multiple values:

>>> # Form with multiple checkboxes: hobbies=reading&hobbies=coding
>>> hobbies = form.get("hobbies", [])  # ["reading", "coding"]

Form validation:

>>> if not form.get("email"):
...     return {"error": "Email is required"}, 400

Note

Supports both application/x-www-form-urlencoded and multipart/form-data. For multipart requests, only returns form fields (not files). Result is cached for subsequent calls.

See also

get_json(): Parse JSON data
get_data(): Get raw request body
get_files(): Get uploaded files from multipart forms
args: Query string parameters

async get_files() dict[str, FileStorage][source]

Parse uploaded files from multipart/form-data request.

Extracts uploaded files from multipart/form-data requests. Returns a dictionary mapping field names to FileStorage objects. Flask/Quart compatibility method.

Returns:

Mapping of field names to uploaded files.

Empty dict if no files uploaded or not multipart request.

Return type:

Dict[str, FileStorage]

Examples

Handle single file upload:

>>> @app.post('/upload')
... async def upload(request: Request):
...     files = await request.get_files()
...     avatar = files.get('avatar')
...     if avatar and avatar.filename:
...         avatar.save(f'uploads/{avatar.filename}')
...         return {"uploaded": avatar.filename}
...     return {"error": "No file uploaded"}, 400

Handle multiple files:

>>> @app.post('/upload-multiple')
... async def upload_multiple(request: Request):
...     files = await request.get_files()
...     uploaded = []
...     for field_name, file in files.items():
...         if file.filename:
...             file.save(f'uploads/{file.filename}')
...             uploaded.append(file.filename)
...     return {"uploaded": uploaded}

With form fields:

>>> @app.post('/profile')
... async def update_profile(request: Request):
...     # Get form data
...     body = await request.get_data()
...     content_type = request.headers.get('content-type', '')
...     from gobstopper.http.multipart import parse_multipart
...     form, files = parse_multipart(body, content_type)
...
...     # Access form fields
...     name = form.get('name', [''])[0]
...
...     # Access file
...     avatar = files.get('avatar')
...     if avatar:
...         avatar.save(f'uploads/{name}_avatar.jpg')
...
...     return {"name": name, "avatar": bool(avatar)}

Note

  • Requires Content-Type: multipart/form-data

  • Files are loaded into memory; consider streaming for large files

  • Result is cached for subsequent calls

See also

files: Property wrapper for this method
FileStorage: File upload wrapper
secure_filename(): Sanitize filenames
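
The upload examples above build the destination path directly from the client-supplied filename, which is untrusted input. The behavior of secure_filename() is not shown in this section, so the following is only a standard-library sketch of the general idea; safe_upload_path is a hypothetical helper, not part of Gobstopper:

```python
import re
from pathlib import Path


def safe_upload_path(upload_dir: str, client_filename: str) -> Path:
    """Return a path inside upload_dir built from an untrusted filename."""
    # Keep only the final path component, whichever separator the client used.
    base = client_filename.replace("\\", "/").rsplit("/", 1)[-1]
    # Replace anything outside a conservative allow-list, drop leading dots.
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", base).lstrip(".")
    if not cleaned:
        raise ValueError("filename is empty after sanitizing")
    return Path(upload_dir) / cleaned


target = safe_upload_path("uploads", "../../etc/passwd")
```

A handler could then call file.save() with the sanitized path instead of interpolating file.filename into the path itself.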

property files: dict[str, FileStorage]

Uploaded files from multipart/form-data request (async property).

Flask/Quart-style property for accessing uploaded files. This is an async property that must be awaited.

Returns:

Uploaded files by field name

Return type:

Dict[str, FileStorage]

Examples

Access uploaded file:

>>> @app.post('/upload')
... async def upload(request: Request):
...     files = await request.files
...     avatar = files.get('avatar')
...     if avatar:
...         avatar.save('uploads/avatar.jpg')
...         return {"uploaded": True}

Note

  • This is an async property; it must be used as await request.files

  • For compatibility, also available as await request.get_files()

See also

get_files(): Get uploaded files (method form)
FileStorage: File storage class

async multipart(model: type[Struct] | None = None, max_size: int | None = None) dict[str, list[str]] | Struct | None[source]

Parse multipart/form-data request body (text fields only).

Parses multipart/form-data encoded request bodies, supporting text fields only in the current implementation. Enforces Content-Type header validation and optional body size limits. File uploads are detected but not yet supported.

Parameters:
  • model – Optional msgspec.Struct type to decode parsed fields into. When provided, field values are converted to model instance.

  • max_size – Optional maximum body size in bytes. Raises BodyValidationError if body exceeds this limit.

Returns:

Field name to values mapping if model is None.
msgspec.Struct: Model instance if a model type is provided.
None: If no body is present and no model is required.

Return type:

Dict[str, List[str]]

Raises:
  • UnsupportedMediaType – If Content-Type is not multipart/form-data when body exists.

  • BodyValidationError – If multipart parsing fails, size exceeds max_size, file upload encountered, or model conversion fails.

Examples

Parse multipart text fields:

>>> @app.post("/form/multipart")
... async def handle_multipart(request: Request):
...     fields = await request.multipart()
...     name = fields.get("name", [""])[0]
...     email = fields.get("email", [""])[0]
...     return {"name": name, "email": email}

With size limit:

>>> @app.post("/upload/limited")
... async def limited_upload(request: Request):
...     try:
...         # Limit to 1MB
...         fields = await request.multipart(max_size=1024*1024)
...         return {"status": "success"}
...     except BodyValidationError as e:
...         return {"error": str(e)}, 413

With model validation:

>>> import msgspec
>>>
>>> class ContactForm(msgspec.Struct):
...     name: str
...     email: str
...     message: str
>>>
>>> @app.post("/contact")
... async def contact(request: Request):
...     try:
...         form = await request.multipart(model=ContactForm)
...         # form is ContactForm instance with validated fields
...         return {"received": form.name}
...     except BodyValidationError as e:
...         return {"error": "Invalid form data"}, 400

Note

The current implementation supports text fields only. File uploads (parts with a filename= parameter) raise BodyValidationError; full file upload support is planned for future versions. CRLF line endings are enforced, per the multipart/form-data specification. When a model is provided, the last value is used for fields with multiple values.
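
Because CRLF line endings are enforced, hand-built test bodies need to be assembled carefully. The sketch below shows what a text-only multipart/form-data body looks like on the wire; build_multipart is a hypothetical test helper, not a Gobstopper function, and it does no escaping of field names or values:

```python
def build_multipart(fields: dict[str, str], boundary: str) -> tuple[bytes, str]:
    """Encode text fields as a multipart/form-data body with CRLF line endings."""
    lines = []
    for name, value in fields.items():
        lines.append(f"--{boundary}")
        lines.append(f'Content-Disposition: form-data; name="{name}"')
        lines.append("")                  # blank line separates headers from value
        lines.append(value)
    lines.append(f"--{boundary}--")       # closing delimiter
    lines.append("")
    body = "\r\n".join(lines).encode("utf-8")
    content_type = f"multipart/form-data; boundary={boundary}"
    return body, content_type


body, content_type = build_multipart(
    {"name": "Ada", "email": "ada@example.com"}, "XyZ123"
)
```

The returned content_type string is what a client would send in the Content-Type header alongside the body. None of the parts carries a filename= parameter, so the body stays within the text-only subset described above.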

See also

form(): Parse application/x-www-form-urlencoded data
json(): Parse JSON data

async json(model: type[Struct] | None = None) Any | Struct | None[source]

Parse the request body as JSON with optional model validation.

Parses JSON request body using high-performance msgspec decoder. Enforces Content-Type header validation and provides optional automatic model conversion and validation.

Parameters:

model – Optional msgspec.Struct type to decode JSON directly into. When provided, JSON is validated against model schema and returns a typed model instance. None for raw JSON dict/list parsing.

Returns:

Decoded JSON as dict, list, or primitive type if model is None.
msgspec.Struct: Validated model instance if a model type is provided.
None: If the request body is empty and no model is required.

Return type:

Any

Raises:
  • UnsupportedMediaType – If Content-Type header is not application/json or compatible JSON media type (e.g., application/*+json) when body exists.

  • BodyValidationError – If JSON decoding fails, validation fails, or empty body provided when model requires data.

Examples

Parse JSON without model:

>>> @app.post("/api/data")
... async def handle_data(request: Request):
...     data = await request.json()
...     # data is dict, list, or primitive
...     return {"received": data}

With model validation:

>>> import msgspec
>>>
>>> class User(msgspec.Struct):
...     name: str
...     email: str
...     age: int
>>>
>>> @app.post("/api/users")
... async def create_user(request: Request):
...     try:
...         user = await request.json(model=User)
...         # user is User instance with validated fields
...         return {"created": user.name, "email": user.email}
...     except BodyValidationError as e:
...         return {"error": str(e)}, 400

Handle empty body:

>>> @app.post("/api/optional")
... async def optional_data(request: Request):
...     data = await request.json()
...     if data is None:
...         return {"message": "No data provided"}
...     return {"data": data}

Content-Type validation:

>>> from gobstopper.http.errors import UnsupportedMediaType
>>>
>>> @app.post("/api/strict")
... async def strict_json(request: Request):
...     try:
...         data = await request.json()
...         return {"data": data}
...     except UnsupportedMediaType:
...         return {"error": "Content-Type must be application/json"}, 415

Note

Uses high-performance msgspec JSON decoder. Accepts application/json and application/*+json media types. Empty body returns None if no model specified. Empty body with model requirement raises BodyValidationError. Result is cached internally via get_body() mechanism.
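
The rule "application/json or application/*+json" can be expressed as a small predicate. This is a sketch of one plausible check, assuming parameters such as charset are ignored and matching is case-insensitive; is_json_media_type is a hypothetical name and the framework's actual check may differ:

```python
def is_json_media_type(content_type: str) -> bool:
    """True for application/json and application/*+json, ignoring parameters."""
    # Drop parameters such as "; charset=utf-8" and normalize case.
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type == "application/json":
        return True
    return media_type.startswith("application/") and media_type.endswith("+json")


accepted = is_json_media_type("application/problem+json; charset=utf-8")
rejected = is_json_media_type("text/plain")
```

Under this reading, application/json, application/vnd.api+json, and application/problem+json all pass, while text/plain would lead to UnsupportedMediaType.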

See also

form(): Parse form-encoded data
multipart(): Parse multipart form data
get_json(): Legacy JSON parsing without model support

async form(model: type[Struct] | None = None) dict[str, list[str]] | Struct | None[source]

Parse application/x-www-form-urlencoded request body with optional model validation.

Parses URL-encoded form data from request body. Enforces Content-Type header validation and provides optional automatic model conversion and validation using msgspec.

Parameters:

model – Optional msgspec.Struct type to convert parsed form data into. When provided, form values are converted to model instance with validation. None for raw dict[str, list[str]] parsing.

Returns:

Field name to values mapping if model is None. Lists preserve multiple values for the same field name.
msgspec.Struct: Validated model instance if a model type is provided.
None: If the request body is empty and no model is required.

Return type:

Dict[str, List[str]]

Raises:
  • UnsupportedMediaType – If Content-Type is not application/x-www-form-urlencoded when body exists.

  • BodyValidationError – If form parsing fails or model conversion/validation fails.

Examples

Parse form data without model:

>>> @app.post("/submit")
... async def submit_form(request: Request):
...     form = await request.form()
...     # form = {"name": ["John"], "email": ["john@example.com"]}
...     name = form.get("name", [""])[0]
...     email = form.get("email", [""])[0]
...     return {"name": name, "email": email}

Handle multiple values:

>>> @app.post("/preferences")
... async def preferences(request: Request):
...     form = await request.form()
...     # Form: hobbies=reading&hobbies=coding&hobbies=gaming
...     hobbies = form.get("hobbies", [])  # ["reading", "coding", "gaming"]
...     return {"hobbies": hobbies}

With model validation:

>>> import msgspec
>>>
>>> class LoginForm(msgspec.Struct):
...     username: str
...     password: str
>>>
>>> @app.post("/login")
... async def login(request: Request):
...     try:
...         creds = await request.form(model=LoginForm)
...         # creds is LoginForm instance with validated fields
...         return {"username": creds.username}
...     except BodyValidationError as e:
...         return {"error": "Invalid form data"}, 400

Content-Type enforcement:

>>> from gobstopper.http.errors import UnsupportedMediaType
>>>
>>> @app.post("/form/strict")
... async def strict_form(request: Request):
...     try:
...         form = await request.form()
...         return {"received": True}
...     except UnsupportedMediaType:
...         return {"error": "Content-Type must be application/x-www-form-urlencoded"}, 415

Note

The Content-Type header must be application/x-www-form-urlencoded. When a model is provided, the last value is used for fields with multiple values. Form parsing uses Python’s urllib.parse.parse_qs. The result is cached internally via the get_form() mechanism. An empty body returns None if no model is specified.
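
The two behaviors in this note, list-valued parsing and "last value wins" for models, can be reproduced with the standard library alone. In this sketch the Preferences dataclass is a hypothetical stand-in for a msgspec.Struct model, and the collapsing step is an illustration of the described rule rather than the framework's code:

```python
from dataclasses import dataclass
from urllib.parse import parse_qs


@dataclass
class Preferences:      # hypothetical model, standing in for a msgspec.Struct
    user: str
    hobbies: str


raw = "user=ada&hobbies=reading&hobbies=coding"

# parse_qs always returns lists, even for fields that appear once.
fields = parse_qs(raw)

# Collapse each list to its last value, as described for model conversion.
last_values = {name: values[-1] for name, values in fields.items()}
prefs = Preferences(**last_values)
```

Note that parse_qs drops fields with empty values unless keep_blank_values=True is passed; whether Gobstopper sets that flag is not stated in this section.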

See also

json(): Parse JSON data
multipart(): Parse multipart form data
get_form(): Legacy form parsing without model support
args: Query string parameters

id
app
max_body_bytes
max_json_depth

Response

HTTP Response handling for Gobstopper framework.

This module provides response classes and utilities for handling HTTP responses in the Gobstopper web framework. It includes support for standard responses, JSON serialization, file serving, and streaming responses with full RSGI protocol integration.

Classes:

Response: Standard HTTP response with flexible content types and headers
JSONResponse: JSON response with high-performance msgspec serialization
FileResponse: File serving response with automatic MIME type detection
StreamResponse: Streaming response for server-sent events and large content

Key Features:
  • Automatic content type detection

  • Secure cookie management with production enforcement

  • RSGI protocol compatibility

  • High-performance JSON serialization via msgspec

  • File serving with MIME type detection

  • Streaming support for progressive content delivery

  • Comprehensive header and cookie management

Example

Basic response types:

>>> # Standard text/HTML response
>>> return Response("Hello World")
>>> # JSON API response
>>> return JSONResponse({"status": "success", "data": results})
>>> # File download
>>> return FileResponse("reports/data.pdf", filename="report.pdf")
>>> # Streaming response
>>> async def event_stream():
...     for i in range(100):
...         yield f"data: Event {i}\n\n"
>>> return StreamResponse(event_stream(), content_type="text/event-stream")

See also

gobstopper.http.helpers: Helper functions (jsonify, send_file, stream_template)
gobstopper.http.sse: Server-sent events support (SSEStream, format_sse)
gobstopper.core.app.Gobstopper: Main application class with RSGI protocol handling

class gobstopper.http.response.Response(body: str | bytes = '', status: int = 200, headers: dict[str, str] | None = None, content_type: str | None = None)[source]

Bases: object

HTTP response with flexible content types and headers.

Represents an HTTP response with configurable status code, headers, and body content. Supports automatic content type detection and header management for RSGI protocol compatibility.

Parameters:
  • body – Response content as string or bytes

  • status – HTTP status code (default: 200)

  • headers – HTTP headers as dict (optional)

  • content_type – MIME type override (auto-detected if not provided)

body

Response body content

status

HTTP status code

headers

Response headers dict

Examples

Text response:

>>> return Response("Hello World")
>>> return Response("Error message", status=400)

HTML response:

>>> html = "<h1>Welcome</h1>"
>>> return Response(html, content_type="text/html")

Custom headers:

>>> return Response("OK", headers={"X-Custom": "value"})

Binary content:

>>> return Response(image_bytes, content_type="image/png")

Note

Content-Type is auto-detected: string content defaults to text/html, bytes content defaults to application/octet-stream.

__init__(body: str | bytes = '', status: int = 200, headers: dict[str, str] | None = None, content_type: str | None = None)[source]

set_cookie()

Set a cookie with comprehensive security options.

Adds a Set-Cookie header to the response with secure defaults appropriate for production environments. In production (ENV=production), enforces secure cookie settings unless explicitly disabled via environment variable.

Parameters:
  • name – Cookie name

  • value – Cookie value

  • path – Cookie path scope (default: “/”)

  • domain – Cookie domain scope (optional, defaults to current domain)

  • max_age – Cookie lifetime in seconds (optional, None means session cookie)

  • expires – Cookie expiration date in HTTP format (optional, overridden by max_age)

  • secure – Require HTTPS transmission (default: True, enforced in production)

  • httponly – Prevent JavaScript access (default: True, enforced in production)

  • samesite – CSRF protection level - “Strict”, “Lax”, or “None” (default: “Strict”, enforced to at least “Lax” in production)

Examples

Basic session cookie:

>>> response = Response("Welcome")
>>> response.set_cookie("session_id", "abc123")

Persistent cookie with expiration:

>>> response.set_cookie("user_pref", "dark_mode",
...                    max_age=86400*30)  # 30 days

Subdomain cookie:

>>> response.set_cookie("auth_token", token,
...                    domain=".example.com",
...                    secure=True,
...                    httponly=True)

Development-only insecure cookie:

>>> # Set ENV=development and WOPR_ALLOW_INSECURE_COOKIES=true
>>> response.set_cookie("debug", "1", secure=False)

Note

Production security enforcement (when ENV=production):
  • secure=False is overridden to True with a warning

  • httponly=False is overridden to True with a warning

  • samesite=None is overridden to “Lax” with a warning

Set WOPR_ALLOW_INSECURE_COOKIES=true to disable enforcement.
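
The enforcement rules above amount to a small decision function. The following is a sketch of that logic, not the framework's code: enforce_cookie_policy and its env and allow_insecure parameters are hypothetical, and treating both Python None and the string "None" as "no SameSite protection" is an assumption:

```python
import warnings


def enforce_cookie_policy(secure, httponly, samesite, *, env, allow_insecure=False):
    """Return (secure, httponly, samesite) after production enforcement."""
    # Outside production, or with the opt-out set, pass values through as-is.
    if env != "production" or allow_insecure:
        return secure, httponly, samesite
    if not secure:
        warnings.warn("secure=False overridden to True in production")
        secure = True
    if not httponly:
        warnings.warn("httponly=False overridden to True in production")
        httponly = True
    if samesite is None or str(samesite).lower() == "none":
        warnings.warn("samesite=None overridden to 'Lax' in production")
        samesite = "Lax"
    return secure, httponly, samesite


dev_settings = enforce_cookie_policy(False, False, "None", env="development")
```

Here env plays the role of the ENV variable and allow_insecure the role of WOPR_ALLOW_INSECURE_COOKIES=true.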

See also

delete_cookie(): Remove a cookie

delete_cookie()

Delete a cookie by setting it to expire immediately.

Removes a cookie from the client by setting an expired Set-Cookie header. The path and domain must match the original cookie for proper deletion.

Parameters:
  • name – Cookie name to delete

  • path – Cookie path scope (must match original, default: “/”)

  • domain – Cookie domain scope (must match original, optional)

Examples

Delete a session cookie:

>>> response = Response("Logged out")
>>> response.delete_cookie("session_id")

Delete a subdomain cookie:

>>> response.delete_cookie("auth_token", domain=".example.com")

Delete a path-specific cookie:

>>> response.delete_cookie("cart_id", path="/shop")

Note

To successfully delete a cookie, path and domain must match the values used when the cookie was originally set.

See also

set_cookie(): Set a cookie

to_rsgi_headers() list[tuple[str, str]][source]

Convert response headers to RSGI protocol format.

Transforms the headers dictionary and cookie list into the list of tuples format required by the RSGI (Rust Server Gateway Interface) protocol.

Returns:

List of (name, value) tuples for RSGI response headers. Each cookie generates a separate (“set-cookie”, cookie_string) tuple.

Examples

>>> response = Response("OK", headers={"X-Custom": "value"})
>>> response.set_cookie("session", "abc123")
>>> response.to_rsgi_headers()
[('content-type', 'text/html; charset=utf-8'),
 ('X-Custom', 'value'),
 ('set-cookie', 'session=abc123; Path=/; Secure; HttpOnly; SameSite=Strict')]

Note

This method is called internally by the Gobstopper framework when sending responses through the RSGI protocol. You typically don’t need to call it directly.

Multiple Set-Cookie headers are properly handled as separate tuples, as required by HTTP specifications and RSGI protocol.
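
The reason cookies become separate tuples is that a dict cannot hold two values under the same header name. A minimal sketch of the flattening step, assuming headers are kept in a dict and cookies in a list of pre-formatted strings (to_header_tuples is a hypothetical helper, not the method itself):

```python
def to_header_tuples(headers: dict[str, str], cookies: list[str]) -> list[tuple[str, str]]:
    """Flatten a header dict and a list of cookie strings into (name, value) tuples."""
    pairs = list(headers.items())
    # One tuple per cookie: Set-Cookie values must not be merged into one header.
    pairs.extend(("set-cookie", cookie) for cookie in cookies)
    return pairs


pairs = to_header_tuples(
    {"content-type": "text/html; charset=utf-8", "X-Custom": "value"},
    ["session=abc123; Path=/", "theme=dark; Path=/"],
)
```

With two cookies, the resulting list contains two distinct set-cookie entries after the regular headers.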

See also

Gobstopper: Main application class handling RSGI protocol

class gobstopper.http.response.JSONResponse(data: Any, status: int = 200, **kwargs)[source]

Bases: Response

HTTP response for JSON data with automatic, high-performance serialization.

Convenience class for returning JSON responses with proper Content-Type headers and optimized JSON serialization using msgspec.

Parameters:
  • data – Python object to serialize as JSON (dict, list, primitives)

  • status – HTTP status code (default: 200)

  • **kwargs – Additional arguments passed to parent Response class

Examples

Dictionary response:

>>> return JSONResponse({"message": "Success", "data": results})

List response:

>>> return JSONResponse([1, 2, 3, 4, 5])

With custom status:

>>> return JSONResponse({"error": "Not found"}, status=404)

Note

Uses high-performance msgspec for serialization. Automatically sets Content-Type to ‘application/json’.

See also

Response: Base response class

__init__(data: Any, status: int = 200, **kwargs)[source]
class gobstopper.http.response.FileResponse(path: str | Path, filename: str | None = None, status: int = 200, headers: dict[str, str] | None = None)[source]

Bases: Response

HTTP response for serving files with proper headers and MIME detection.

Optimized response class for serving static files, downloads, and attachments. Automatically detects MIME types, sets appropriate headers, and handles file serving through the RSGI protocol.

Parameters:
  • path – File path as string or Path object

  • filename – Download filename (defaults to basename of path)

  • status – HTTP status code (default: 200)

  • headers – Additional headers (optional)

file_path

Resolved file path as string

filename

Filename for Content-Disposition header

Examples

Serve static file:

>>> @app.get("/download/<filename>")
... async def download(request, filename):
...     return FileResponse(f"uploads/{filename}")

Force download with custom name:

>>> return FileResponse("report.pdf", filename="monthly_report.pdf")

Image serving:

>>> @app.get("/images/<image_id>")
... async def serve_image(request, image_id):
...     path = f"images/{image_id}.jpg"
...     return FileResponse(path)

Note

Content-Type is set automatically from the file extension. Content-Disposition: attachment is set to trigger download behavior. File path resolution and existence checking are handled by the RSGI protocol.
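
Extension-based MIME detection and the attachment disposition can be illustrated with the standard mimetypes module. This is a sketch under the assumption that unknown extensions fall back to application/octet-stream; file_headers is a hypothetical helper, and it does not escape quotes in the filename:

```python
import mimetypes
from pathlib import Path
from typing import Optional


def file_headers(path: str, filename: Optional[str] = None) -> dict:
    """Guess Content-Type from the extension and build a download disposition."""
    content_type, _encoding = mimetypes.guess_type(path)
    # Default the download name to the basename of the path.
    download_name = filename or Path(path).name
    return {
        "content-type": content_type or "application/octet-stream",
        "content-disposition": f'attachment; filename="{download_name}"',
    }


headers = file_headers("reports/data.pdf", filename="report.pdf")
```

When filename is omitted, the basename of the path is used, mirroring the documented default for the filename parameter.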

See also

Response: Base response class
StreamResponse: For streaming large files

__init__(path: str | Path, filename: str | None = None, status: int = 200, headers: dict[str, str] | None = None)[source]
class gobstopper.http.response.StreamResponse(generator: Callable[[], Awaitable], status: int = 200, headers: dict[str, str] | None = None, content_type: str = 'text/plain')[source]

Bases: object

HTTP streaming response for real-time data and large content.

Enables streaming HTTP responses for server-sent events, chunked transfer encoding, and progressive content delivery. Ideal for large datasets, real-time updates, and template streaming.

Parameters:
  • generator – Async generator function that yields string or bytes chunks

  • status – HTTP status code (default: 200)

  • headers – Additional headers (optional)

  • content_type – MIME type (default: ‘text/plain’)

generator

Async generator for content chunks

status

HTTP status code

headers

Response headers dict

Examples

Server-sent events:

>>> import asyncio
>>>
>>> @app.get("/events")
... async def stream_events(request):
...     async def event_generator():
...         for i in range(100):
...             yield f"data: Event {i}\n\n"
...             await asyncio.sleep(1)
...     return StreamResponse(event_generator(),
...                         content_type="text/event-stream")

Large data streaming:

>>> @app.get("/large-csv")
... async def stream_csv(request):
...     async def csv_generator():
...         yield "id,name,email\n"
...         async for user in get_all_users_stream():
...             yield f"{user.id},{user.name},{user.email}\n"
...     return StreamResponse(csv_generator(),
...                         content_type="text/csv")

Template streaming (with Rust engine):

>>> async def stream_template():
...     return await app.render_template("large_page.html",
...                                     stream=True,
...                                     data=huge_dataset)

Note

Generator function must be async and yield string or bytes. Streaming reduces memory usage for large responses. Compatible with template streaming when using Rust engine.
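
To see why streaming keeps memory usage low, it helps to look at how a server might consume such a generator: one chunk at a time, never holding the whole response. The sketch below is stand-alone; send_stream is a hypothetical consumer that collects encoded chunks in a list purely so the result can be inspected, whereas a real transport would write each chunk to the connection:

```python
import asyncio


async def event_generator(count: int):
    """Yield server-sent-event frames one at a time."""
    for i in range(count):
        yield f"data: Event {i}\n\n"
        await asyncio.sleep(0)      # hand control back to the event loop


async def send_stream(chunks) -> list[bytes]:
    """Hypothetical transport: encode str chunks and 'send' them one by one."""
    sent = []
    async for chunk in chunks:
        sent.append(chunk.encode("utf-8") if isinstance(chunk, str) else chunk)
    return sent


frames = asyncio.run(send_stream(event_generator(3)))
```

Each frame is produced only when the consumer asks for it, so a slow client naturally slows the producer.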

See also

Response: Standard response class
Gobstopper.render_template(): Template rendering with streaming

__init__(generator: Callable[[], Awaitable], status: int = 200, headers: dict[str, str] | None = None, content_type: str = 'text/plain')[source]
gobstopper.http.response.redirect(location: str, status: int = 302) Response[source]

Create a redirect response to the given location.

Flask/Quart-style redirect helper that creates an HTTP redirect response. Commonly used with url_for() for type-safe redirects to named routes.

Parameters:
  • location – URL to redirect to. Can be an absolute path (/path), a relative path (path), or a full URL (https://example.com/path)

  • status – HTTP redirect status code. Common values:

      - 301: Moved Permanently (cached by browsers)
      - 302: Found (temporary, default)
      - 303: See Other (POST → GET redirect)
      - 307: Temporary Redirect (preserves method)
      - 308: Permanent Redirect (preserves method)

Returns:

Response object with Location header and appropriate status code

Examples

Simple redirect:

>>> return redirect('/dashboard')

Redirect to named route:

>>> return redirect(app.url_for('user_profile', user_id=123))

Permanent redirect:

>>> return redirect('/new-location', status=301)

Post-redirect-get pattern:

>>> @app.post('/users')
... async def create_user(request):
...     user_id = save_user()
...     return redirect(app.url_for('user_detail', id=user_id), status=303)

External redirect:

>>> return redirect('https://example.com/login')

Note

  • Default 302 is safe for most use cases

  • Use 301 carefully as browsers cache it permanently

  • Use 303 for POST → GET redirects (RESTful pattern)

  • Use 307/308 if you need to preserve the HTTP method

See also

Gobstopper.url_for(): Build URLs for named routes
Response: Base response class

Background Tasks

Task queue management for Gobstopper framework.

This module provides the core task queue system for Gobstopper, featuring:
  • Priority-based task queuing with category separation

  • Async worker pool management with graceful shutdown

  • Automatic retry logic with exponential backoff

  • Persistent storage using DuckDB (optional)

  • Feature flag control via environment variable

The task system is disabled by default and must be explicitly enabled via the WOPR_TASKS_ENABLED environment variable or constructor parameter. This prevents accidental database creation and resource usage.

Classes:

NoopStorage: Placeholder storage when tasks are disabled.
TaskQueue: Main task queue with worker management and execution.

Example

Basic task queue usage:

from gobstopper.tasks.queue import TaskQueue
from gobstopper.tasks.models import TaskPriority
import os

# Enable tasks
os.environ["WOPR_TASKS_ENABLED"] = "1"

# Create queue
queue = TaskQueue(enabled=True)

# Register task functions
@queue.register_task("send_email", category="notifications")
async def send_email(to: str, subject: str):
    # Send email logic
    return {"sent": True, "to": to}

# Start workers for category
await queue.start_workers("notifications", worker_count=3)

# Queue a task
task_id = await queue.add_task(
    "send_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    max_retries=3,
    to="user@example.com",  # keyword form: positional args cannot follow keyword args
    subject="Welcome!"
)

# Check task status
task_info = await queue.get_task_info(task_id)
print(f"Status: {task_info.status}")

# Shutdown gracefully
await queue.shutdown()

Environment Variables:

WOPR_TASKS_ENABLED: Set to “1”, “true”, “True”, or “yes” to enable the task system. Defaults to disabled.

Note

  • Tasks are disabled by default to prevent unintended side effects

  • Storage (DuckDB) is created lazily on first use

  • Each category has its own queue and worker pool

  • Tasks within a category are ordered by priority then FIFO

  • Workers shut down gracefully, completing current tasks
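
The "priority, then FIFO" ordering can be reproduced with asyncio.PriorityQueue by queuing tuples whose second element is a monotonically increasing counter. This is a sketch of the general technique, not the module's code; integer priorities where larger means more urgent, and the make_entry helper, are assumptions made for illustration:

```python
import asyncio
import itertools

_sequence = itertools.count()    # tie-breaker that preserves insertion order


def make_entry(priority: int, task_name: str):
    # PriorityQueue pops the smallest item first, so negate the priority to
    # get "highest priority first"; the counter gives FIFO within a level.
    return (-priority, next(_sequence), task_name)


async def demo():
    queue = asyncio.PriorityQueue()
    for priority, name in [(1, "low-a"), (3, "high-a"), (1, "low-b"), (3, "high-b")]:
        await queue.put(make_entry(priority, name))
    order = []
    while not queue.empty():
        _, _, name = await queue.get()
        order.append(name)
    return order


order = asyncio.run(demo())
```

Both high-priority tasks come out before the low-priority ones, and within each level the insertion order is kept.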

class gobstopper.tasks.queue.NoopStorage[source]

Bases: object

A storage implementation that does nothing (used when tasks are disabled).

This class provides a null-object pattern implementation of the storage interface, allowing the task queue to function without raising errors when tasks are disabled. All methods are no-ops that return safe defaults.

This prevents database file creation and avoids import errors when DuckDB is not installed but tasks are disabled anyway.

Example

NoopStorage is used automatically when tasks are disabled:

queue = TaskQueue(enabled=False)
# queue.storage will be NoopStorage()
# No database file created, no errors raised

Note

  • All save operations are silently ignored

  • All query operations return None or empty lists

  • No exceptions are raised

  • Used internally by TaskQueue when enabled=False

save_task(task_info: TaskInfo)[source]

No-op: task is not saved.

get_task(task_id: str) TaskInfo | None[source]

No-op: always returns None.

get_tasks(**kwargs)[source]

No-op: always returns empty list.

cleanup_old_tasks(**kwargs)[source]

No-op: always returns 0 deleted.
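
The value of the null-object pattern is that calling code never has to ask whether storage is enabled. A stand-alone sketch with the same four method names follows; NullStorage, count_pending, and the status keyword are hypothetical illustrations rather than the library's classes:

```python
class NullStorage:
    """Storage stand-in whose methods accept calls and do nothing."""

    def save_task(self, task_info) -> None:
        return None                  # silently ignored

    def get_task(self, task_id: str):
        return None                  # nothing is ever stored

    def get_tasks(self, **kwargs) -> list:
        return []                    # any filter yields an empty result

    def cleanup_old_tasks(self, **kwargs) -> int:
        return 0                     # zero rows deleted


def count_pending(storage) -> int:
    """Caller code needs no 'is storage enabled?' branch."""
    return len(storage.get_tasks(status="pending"))


pending = count_pending(NullStorage())
```

Any object with these four methods can be swapped in, which is also what makes a custom storage_factory practical in tests.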

class gobstopper.tasks.queue.TaskQueue(enabled: bool | None = None, storage_factory=None)[source]

Bases: object

Priority-based background task queue with worker pool management.

TaskQueue provides a complete async task processing system with:
  • Category-based task organization (separate queues per category)

  • Priority ordering within each category queue

  • Configurable worker pools per category

  • Automatic retry logic with exponential backoff

  • Persistent storage using DuckDB (optional)

  • Graceful shutdown with task completion

  • Task cancellation and manual retry support

The queue system is disabled by default and must be explicitly enabled via environment variable (WOPR_TASKS_ENABLED=1) or constructor parameter. Storage is created lazily on first use to avoid unnecessary database files.

enabled

Whether the task system is enabled.

queues

Dict mapping category names to asyncio.PriorityQueue objects.

running_tasks

Dict of currently executing tasks by task_id.

workers

Dict mapping category names to lists of worker tasks.

shutdown_event

asyncio.Event signaling worker shutdown.

task_functions

Dict mapping task names to their registered functions.

Example

Complete task queue workflow:

import asyncio
from gobstopper.tasks.queue import TaskQueue
from gobstopper.tasks.models import TaskPriority, TaskStatus

# Enable and create queue
queue = TaskQueue(enabled=True)

# Register task functions
@queue.register_task("send_email", category="notifications")
async def send_email(to: str, subject: str, body: str):
    # Email sending logic
    await asyncio.sleep(1)  # Simulate work
    return {"sent": True, "to": to}

@queue.register_task("process_data", category="analytics")
def process_data(data: dict):  # Sync functions work too
    # Processing logic
    return {"records": len(data)}

# Start workers
await queue.start_workers("notifications", worker_count=3)
await queue.start_workers("analytics", worker_count=5)

# Queue tasks with different priorities
task1 = await queue.add_task(
    "send_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    max_retries=3,
    to="user@example.com",  # keyword form: positional args cannot follow keyword args
    subject="Welcome",
    body="Thanks for signing up!"
)

task2 = await queue.add_task(
    "process_data",
    category="analytics",
    priority=TaskPriority.NORMAL,
    data={"users": 1000}  # keyword form: positional args cannot follow keyword args
)

# Monitor task status
while True:
    info = await queue.get_task_info(task1)
    if info.is_completed:
        if info.status == TaskStatus.SUCCESS:
            print(f"Result: {info.result}")
        else:
            print(f"Failed: {info.error}")
        break
    await asyncio.sleep(0.5)

# Get statistics
stats = await queue.get_task_stats()
print(f"Total tasks: {stats['total']}")
print(f"Running: {stats['running']}")
print(f"By status: {stats['by_status']}")

# Graceful shutdown
await queue.shutdown()

Note

  • Each category has its own queue and worker pool

  • Tasks are ordered by priority (descending), then FIFO

  • Both async and sync task functions are supported

  • Sync functions run in thread pool executor to avoid blocking

  • Retry delay uses exponential backoff: min(2^retry_count, 60) seconds

  • Workers complete current tasks before shutting down
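
The retry delay formula in the notes above can be sketched as a small helper. The name retry_delay and its cap parameter are hypothetical and not part of Gobstopper; only the formula min(2^retry_count, 60) comes from the note.

```python
def retry_delay(retry_count: int, cap: float = 60.0) -> float:
    """Seconds to wait before the next retry: min(2 ** retry_count, cap)."""
    return min(2.0 ** retry_count, cap)


# Delays for the first eight retries; the cap is reached at retry_count=6
# because 2 ** 6 = 64 exceeds 60.
delays = [retry_delay(n) for n in range(8)]
```

Under this formula the delay doubles with each attempt until it reaches the 60-second ceiling, so a task with a large max_retries never waits longer than one minute between attempts.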

__init__(enabled: bool | None = None, storage_factory=None)[source]

Initialize a new TaskQueue.

Parameters:
  • enabled – Whether to enable the task system. If None, reads from WOPR_TASKS_ENABLED environment variable. Defaults to disabled.

  • storage_factory – Optional factory function to create custom storage. If None, uses TaskStorage with DuckDB. Useful for testing.

Example

Creating queues with different configurations:

# Use environment variable
import os
os.environ["WOPR_TASKS_ENABLED"] = "1"
queue1 = TaskQueue()  # enabled=True from env

# Explicitly enable
queue2 = TaskQueue(enabled=True)

# Explicitly disable
queue3 = TaskQueue(enabled=False)

# Custom storage for testing
from unittest.mock import Mock
mock_storage = Mock()
queue4 = TaskQueue(
    enabled=True,
    storage_factory=lambda: mock_storage
)
property storage

Get the storage backend, creating it lazily if needed.

Returns NoopStorage if tasks are disabled, otherwise creates and caches a TaskStorage instance (or custom storage from factory).

Returns:

Storage object implementing save_task, get_task, get_tasks, and cleanup_old_tasks methods.

Note

  • Storage is created only on first access (lazy initialization)

  • Returns NoopStorage if enabled=False

  • Uses storage_factory if provided, otherwise creates TaskStorage

  • DuckDB is imported only when storage is actually needed
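
The lazy-initialization behaviour described above might look roughly like the following sketch. LazyQueue and NoopStore are hypothetical stand-ins written for illustration, not the actual TaskQueue or NoopStorage classes, and the dict fallback is only a placeholder for a real backend.

```python
class NoopStore:
    """Hypothetical stand-in that discards everything (tasks disabled)."""

    def save_task(self, task):
        return None


class LazyQueue:
    """Hypothetical queue that builds its storage only on first access."""

    def __init__(self, enabled: bool, storage_factory=None):
        self.enabled = enabled
        self._storage_factory = storage_factory
        self._storage = None
        self.created = 0  # how many times a backend was actually built

    @property
    def storage(self):
        if not self.enabled:
            return NoopStore()
        if self._storage is None:
            # Use the custom factory if given; dict is a placeholder backend.
            factory = self._storage_factory or dict
            self._storage = factory()
            self.created += 1
        return self._storage


queue = LazyQueue(enabled=True, storage_factory=list)
before = queue.created           # nothing built yet
first, second = queue.storage, queue.storage
```

The same instance is returned on every access after the first, which is why a heavy dependency can be imported inside the factory rather than at module import time.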

register_task(name: str, category: str = 'default')[source]

Decorator to register a task function with the queue.

Registers a function (sync or async) to be available for task execution. The function can then be queued by name using add_task(). Both regular functions and coroutine functions are supported.

Parameters:
  • name – Unique name to identify this task function.

  • category – Category for organizing tasks. Tasks in the same category share a queue and worker pool. Defaults to “default”.

Returns:

Decorator function that registers and returns the original function.

Example

Registering task functions:

queue = TaskQueue(enabled=True)

# Register async task
@queue.register_task("send_email", category="notifications")
async def send_email(to: str, subject: str):
    # Async email logic
    return {"sent": True}

# Register sync task
@queue.register_task("process_file", category="analytics")
def process_file(filename: str):
    # Sync file processing
    return {"lines": 1000}

# Register with default category
@queue.register_task("cleanup")
def cleanup():
    # Cleanup logic
    pass

Note

  • Task name must be unique across all categories

  • Category determines which queue and workers handle the task

  • Both sync and async functions are supported

  • The original function is returned unchanged (can be called directly)
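
A registration decorator of this kind can be sketched as follows. This is a minimal illustration under stated assumptions, not Gobstopper's implementation: the module-level dictionaries task_functions and task_categories are hypothetical, and the sketch does not model how duplicate names are handled.

```python
task_functions = {}    # task name -> registered function
task_categories = {}   # task name -> category


def register_task(name: str, category: str = "default"):
    """Return a decorator that records func under name and returns it as-is."""

    def decorator(func):
        task_functions[name] = func
        task_categories[name] = category
        return func  # unchanged, so it can still be called directly

    return decorator


@register_task("double", category="math")
def double(x: int) -> int:
    return x * 2


direct_result = double(4)  # calling the function directly still works
```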

async add_task(name: str, category: str = 'default', priority: TaskPriority = TaskPriority.NORMAL, max_retries: int = 0, *args, **kwargs) str[source]

Add a task to the queue for execution.

Creates a TaskInfo object with a unique ID, saves it to storage, and adds it to the appropriate category queue. The task will be picked up by a worker when available.

Parameters:
  • name – Name of the registered task function to execute.

  • category – Category queue to add the task to. Defaults to “default”.

  • priority – Task priority for queue ordering. Higher priority tasks execute first. Defaults to TaskPriority.NORMAL.

  • max_retries – Maximum number of retry attempts on failure. Defaults to 0 (no retries).

  • *args – Positional arguments to pass to the task function.

  • **kwargs – Keyword arguments to pass to the task function.

Returns:

Unique task ID (UUID4) for tracking the task.

Return type:

str

Example

Adding tasks with different configurations:

queue = TaskQueue(enabled=True)

# Simple task with no args
task_id1 = await queue.add_task("cleanup")

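# Task with positional args. category, priority, and max_retries must be
# given positionally so that the task's own arguments can follow them.
task_id2 = await queue.add_task(
    "send_email",
    "notifications",         # category
    TaskPriority.NORMAL,     # priority
    0,                       # max_retries
    "user@example.com",
    "Welcome to our service"
)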

# Task with keyword args and high priority
task_id3 = await queue.add_task(
    "process_order",
    category="orders",
    priority=TaskPriority.HIGH,
    max_retries=3,
    order_id=12345,
    customer_id=67890
)

# Critical task (dequeued ahead of lower-priority tasks in its category)
task_id4 = await queue.add_task(
    "send_alert",
    category="alerts",
    priority=TaskPriority.CRITICAL,
    alert_type="security_breach"
)

Note

  • Task function must be registered before adding to queue

  • Tasks are stored persistently before queueing

  • Priority queue ensures higher priority tasks execute first

  • Category queue is created automatically if it doesn’t exist

  • Workers must be started for the category to process tasks

async get_task_info(task_id: str) TaskInfo | None[source]

Retrieve task information and current status.

Checks running tasks first (in-memory), then queries storage for completed or pending tasks. Provides real-time task status.

Parameters:

task_id – Unique identifier of the task to retrieve.

Returns:

TaskInfo object if found, None if task doesn’t exist or tasks are disabled.

Example

Monitoring task progress:

task_id = await queue.add_task("long_process")

# Poll for completion
while True:
    info = await queue.get_task_info(task_id)
    if info:
        print(f"Status: {info.status.value}")
        print(f"Progress: {info.progress}%")
        print(f"Message: {info.progress_message}")

        if info.is_completed:
            if info.status == TaskStatus.SUCCESS:
                print(f"Result: {info.result}")
                print(f"Duration: {info.elapsed_seconds}s")
            else:
                print(f"Error: {info.error}")
            break
    await asyncio.sleep(1)

Note

  • Returns None if tasks are disabled

  • Checks running tasks first for immediate status

  • Falls back to storage for persistent lookup

  • Real-time progress tracking if task updates progress field

async cancel_task(task_id: str) bool[source]

Cancel a pending task before execution.

Marks a task as CANCELLED if it’s still pending. Cannot cancel tasks that are already running or completed.

Parameters:

task_id – Unique identifier of the task to cancel.

Returns:

True if task was successfully cancelled, False if task doesn’t exist, is not pending, or tasks are disabled.

Return type:

bool

Example

Cancelling tasks conditionally:

# Queue a low priority task
task_id = await queue.add_task(
    "generate_report",
    priority=TaskPriority.LOW
)

# Later, decide to cancel it
if await queue.cancel_task(task_id):
    print("Task cancelled successfully")
else:
    print("Task already started or completed")

Note

  • Only PENDING tasks can be cancelled

  • Running tasks cannot be cancelled this way

  • Cancelled tasks are marked with completed_at timestamp

  • Returns False if tasks are disabled
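
The cancellation rule in the notes above amounts to a guard on the task's status. The sketch below is illustrative only: Status, Task, and cancel are hypothetical stand-ins for TaskStatus, TaskInfo, and cancel_task, and storage is left out entirely.

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional


class Status(Enum):  # hypothetical subset of TaskStatus
    PENDING = "pending"
    STARTED = "started"
    CANCELLED = "cancelled"


@dataclass
class Task:  # hypothetical stand-in for TaskInfo
    status: Status
    completed_at: Optional[datetime] = None


def cancel(task: Optional[Task]) -> bool:
    """Cancel only PENDING tasks; report False for anything else."""
    if task is None or task.status is not Status.PENDING:
        return False
    task.status = Status.CANCELLED
    task.completed_at = datetime.now()  # cancelled tasks get a completion time
    return True


pending, running = Task(Status.PENDING), Task(Status.STARTED)
outcomes = (cancel(pending), cancel(running), cancel(None))
```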

async retry_task(task_id: str) bool[source]

Manually retry a failed task.

Resets a failed task’s status and requeues it for execution. Useful for retrying tasks that failed due to transient issues after the automatic retry limit was reached.

Parameters:

task_id – Unique identifier of the task to retry.

Returns:

True if task was successfully requeued, False if task doesn’t exist, is not failed, or tasks are disabled.

Return type:

bool

Example

Manual retry after investigation:

# Check failed task
task = await queue.get_task_info(task_id)
if task and task.status == TaskStatus.FAILED:
    print(f"Task failed: {task.error}")

    # Fix underlying issue (e.g., restore network)
    # ...

    # Retry the task
    if await queue.retry_task(task_id):
        print("Task requeued for retry")
    else:
        print("Could not retry task")

Note

  • Only FAILED tasks can be manually retried

  • Resets task status to PENDING

  • Clears error, timing, and progress information

  • Does not increment retry_count

  • Task is added back to its original category queue

  • Returns False if tasks are disabled

async get_task_stats() dict[source]

Get aggregate statistics about tasks across all categories.

Queries storage for recent tasks and computes statistics including counts by status, counts by category, running tasks, and queued tasks.

Returns:
dict: Statistics dictionary with keys:
  • total: Total number of tasks in storage (up to 1000 recent)

  • by_status: Dict mapping status values to counts

  • by_category: Dict mapping category names to counts

  • running: Number of currently executing tasks

  • queued: Number of tasks waiting in all queues

Example

Monitoring queue health:

stats = await queue.get_task_stats()

print(f"Total tasks: {stats['total']}")
print(f"Currently running: {stats['running']}")
print(f"Queued: {stats['queued']}")

print("\nBy status:")
for status, count in stats['by_status'].items():
    print(f"  {status}: {count}")

print("\nBy category:")
for category, count in stats['by_category'].items():
    print(f"  {category}: {count}")

Note

  • Returns zero/empty stats if tasks are disabled

  • Limited to 1000 most recent tasks for performance

  • by_status uses status string values, not enums

  • running count is from in-memory tracking

  • queued count is sum of all category queues
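
The shape of the returned dictionary can be illustrated with a small aggregation over hypothetical records. The summarize helper and the (status, category) tuples below are assumptions made for this sketch; the real method reads from storage and from in-memory tracking.

```python
from collections import Counter

# Hypothetical task records as (status, category) string pairs.
records = [
    ("success", "notifications"),
    ("success", "analytics"),
    ("failed", "analytics"),
    ("pending", "notifications"),
]


def summarize(records, running: int = 0, queued: int = 0) -> dict:
    recent = records[:1000]  # mirror the 1000-task cap described above
    return {
        "total": len(recent),
        "by_status": dict(Counter(status for status, _ in recent)),
        "by_category": dict(Counter(category for _, category in recent)),
        "running": running,   # would come from in-memory tracking
        "queued": queued,     # would be the sum over all category queues
    }


stats = summarize(records, running=1, queued=2)
```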

async start_workers(category: str, worker_count: int = 1)[source]

Start worker tasks to process tasks in a category queue.

Creates and starts async worker tasks that continuously poll the category queue and execute tasks. Workers run until shutdown is called or they are cancelled.

Parameters:
  • category – Name of the category queue to start workers for.

  • worker_count – Number of worker tasks to create. Defaults to 1.

Example

Starting workers for different workloads:

queue = TaskQueue(enabled=True)

# Start 3 workers for email notifications
await queue.start_workers("notifications", worker_count=3)

# Start 10 workers for heavy processing
await queue.start_workers("analytics", worker_count=10)

# Start 1 worker for low-volume admin tasks
await queue.start_workers("admin", worker_count=1)

# Now tasks in these categories will be processed

Note

  • Does nothing if tasks are disabled

  • Category queue is created automatically if needed

  • Workers are tracked in self.workers[category]

  • Multiple calls add more workers to existing pool

  • Workers process tasks by priority, then FIFO

  • Each worker handles one task at a time

  • Workers shut down gracefully on queue.shutdown()
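
The "priority, then FIFO" ordering and the one-task-at-a-time worker loop can be sketched with asyncio.PriorityQueue from the standard library. This is an illustration of the technique, not Gobstopper's worker code: run_demo, enqueue, and the string labels are hypothetical, and the integer priorities simply mirror the TaskPriority values.

```python
import asyncio
import itertools


async def run_demo():
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker that preserves FIFO order
    processed = []

    def enqueue(priority: int, label: str):
        # Negate the priority so that larger values are dequeued first.
        queue.put_nowait((-priority, next(counter), label))

    async def worker():
        while True:
            _, _, label = await queue.get()
            processed.append(label)  # one item at a time per worker
            queue.task_done()

    for priority, label in [(2, "normal-a"), (1, "low"),
                            (4, "critical"), (2, "normal-b")]:
        enqueue(priority, label)

    workers = [asyncio.create_task(worker())]  # a single worker
    await queue.join()                         # wait until all items are done
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed


result = asyncio.run(run_demo())
```

With a single worker the processing order is deterministic. With several workers, items are still dequeued in priority order, but their completion order depends on how long each one runs.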

async shutdown()[source]

Shutdown all workers gracefully, completing current tasks.

Sets the shutdown event, cancels all worker tasks, and waits for them to finish their current task and exit. Ensures clean shutdown without leaving tasks in inconsistent states.

Example

Graceful application shutdown:

queue = TaskQueue(enabled=True)

# Register tasks and start workers
# ...

try:
    # Run application
    await app.run()
finally:
    # Ensure workers shut down cleanly
    await queue.shutdown()
    print("All workers stopped")

Note

  • Does nothing if tasks are disabled

  • Sets shutdown_event to signal workers to stop

  • Cancels all worker tasks across all categories

  • Waits for workers to complete current task execution

  • Safe to call multiple times (idempotent)

  • Exceptions during worker shutdown are suppressed
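
The signal, cancel, and wait sequence listed above can be sketched with plain asyncio primitives. The worker body here is a hypothetical placeholder that only sleeps, so the sketch does not model finishing an in-flight task; it shows only the ordering of the three shutdown steps.

```python
import asyncio


async def demo_shutdown():
    shutdown_event = asyncio.Event()
    finished = []

    async def worker(name: str):
        try:
            while not shutdown_event.is_set():
                await asyncio.sleep(0.01)  # stand-in for waiting on a queue
        except asyncio.CancelledError:
            pass                           # treat cancellation as a normal exit
        finally:
            finished.append(name)

    workers = [asyncio.create_task(worker(f"w{i}")) for i in range(3)]
    await asyncio.sleep(0.02)              # let the workers start

    shutdown_event.set()                   # 1. signal workers to stop
    for task in workers:
        task.cancel()                      # 2. cancel anything still waiting
    # 3. wait for every worker; return_exceptions=True suppresses errors
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(finished)


stopped = asyncio.run(demo_shutdown())
```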

Task data models for Gobstopper framework.

This module provides the core data models and enums for the Gobstopper task system, including task status tracking, priority levels, and comprehensive metadata.

The models are implemented using msgspec.Struct for high performance serialization and deserialization, making them suitable for storage in DuckDB and efficient memory usage.

Classes:

  • TaskStatus: Enumeration of all possible task execution states.

  • TaskPriority: Enumeration of task priority levels for queue ordering.

  • TaskInfo: Complete task metadata and execution state tracking.

Example

Creating a task info object for tracking:

from datetime import datetime
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority

task = TaskInfo(
    id="task-123",
    name="send_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    status=TaskStatus.PENDING,
    created_at=datetime.now(),
    args=("user@example.com",),
    kwargs={"subject": "Welcome", "template": "welcome.html"}
)

Note

All datetime fields use timezone-naive datetime objects. It’s recommended to use UTC for consistency across distributed systems.

class gobstopper.tasks.models.TaskStatus(*values)[source]

Bases: Enum

Task execution status enumeration.

Tracks the lifecycle state of a task from creation through completion. Tasks progress through these states as they are queued, executed, and finalized.

PENDING

Task is queued and waiting to be picked up by a worker.

STARTED

Task is currently being executed by a worker.

SUCCESS

Task completed successfully without errors.

FAILED

Task failed and exceeded its retry limit.

CANCELLED

Task was explicitly cancelled before execution.

RETRY

Task failed but will be retried (transient state).

Example

Checking task status:

if task.status == TaskStatus.PENDING:
    print("Task is waiting in queue")
elif task.status == TaskStatus.STARTED:
    print(f"Task is running: {task.progress}% complete")
elif task.status == TaskStatus.SUCCESS:
    print(f"Task completed in {task.elapsed_seconds}s")

Note

RETRY is a transient state - tasks in RETRY status are automatically requeued as PENDING after an exponential backoff delay.

PENDING = 'pending'
STARTED = 'started'
SUCCESS = 'success'
FAILED = 'failed'
CANCELLED = 'cancelled'
RETRY = 'retry'
class gobstopper.tasks.models.TaskPriority(*values)[source]

Bases: Enum

Task priority levels for queue ordering.

Higher priority tasks are executed before lower priority tasks within the same category queue. Priority values determine the execution order in the priority queue.

LOW

Low priority (1) - for non-urgent background tasks.

NORMAL

Normal priority (2) - default for most tasks.

HIGH

High priority (3) - for time-sensitive operations.

CRITICAL

Critical priority (4) - for urgent, must-run-first tasks.

Example

Setting task priority:

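# Regular background cleanup
await queue.add_task(
    "cleanup_temp_files",
    priority=TaskPriority.LOW
)

# User-initiated email. The recipient is a positional argument for the task,
# so category, priority, and max_retries are passed positionally before it.
await queue.add_task(
    "send_email",
    "default",             # category
    TaskPriority.HIGH,     # priority
    0,                     # max_retries
    "user@example.com"
)

# System alert notification. Extra keyword arguments go to the task function.
await queue.add_task(
    "send_alert",
    priority=TaskPriority.CRITICAL,
    alert_type="security"
)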

Note

Tasks with the same priority are processed in FIFO order. Priority only affects ordering within a category queue.

LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
class gobstopper.tasks.models.TaskInfo(*, id: str, name: str, category: str, priority: ~gobstopper.tasks.models.TaskPriority, status: ~gobstopper.tasks.models.TaskStatus, created_at: ~datetime.datetime, started_at: ~datetime.datetime | None = None, completed_at: ~datetime.datetime | None = None, elapsed_seconds: float = 0.0, result: ~typing.Any = None, error: str | None = None, retry_count: int = 0, max_retries: int = 0, args: tuple = (), kwargs: dict = <factory>, progress: float = 0.0, progress_message: str = '')[source]

Bases: Struct

Complete task metadata and execution state tracking.

Stores all information about a task from creation through completion, including timing data, execution results, error information, and progress tracking. Uses msgspec.Struct for efficient serialization and memory usage.

id

Unique task identifier (UUID4 string).

Type:

str

name

Name of the registered task function to execute.

Type:

str

category

Category queue this task belongs to (e.g., “email”, “reports”).

Type:

str

priority

Task priority level determining execution order.

Type:

gobstopper.tasks.models.TaskPriority

status

Current execution state of the task.

Type:

gobstopper.tasks.models.TaskStatus

created_at

Timestamp when the task was created and queued.

Type:

datetime.datetime

started_at

Timestamp when worker began executing (None if not started).

Type:

datetime.datetime | None

completed_at

Timestamp when execution finished (None if not complete).

Type:

datetime.datetime | None

elapsed_seconds

Total execution time in seconds (0.0 if not complete).

Type:

float

result

Return value from the task function (None if no return value).

Type:

Any

error

Error message if task failed (None if successful).

Type:

str | None

retry_count

Number of retry attempts made so far.

Type:

int

max_retries

Maximum number of retries allowed before marking as FAILED.

Type:

int

args

Positional arguments passed to the task function.

Type:

tuple

kwargs

Keyword arguments passed to the task function.

Type:

dict

progress

Completion percentage (0.0 to 100.0) for progress tracking.

Type:

float

progress_message

Human-readable progress status message.

Type:

str

Example

Creating and tracking a task:

from datetime import datetime
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority

# Create task info when queuing
task = TaskInfo(
    id="550e8400-e29b-41d4-a716-446655440000",
    name="send_welcome_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    status=TaskStatus.PENDING,
    created_at=datetime.now(),
    max_retries=3,
    args=("user@example.com",),
    kwargs={
        "template": "welcome.html",
        "language": "en"
    }
)

# Update progress during execution
task.progress = 50.0
task.progress_message = "Rendering email template..."

# Mark completion
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
task.elapsed_seconds = 2.5
task.progress = 100.0

Note

  • All datetime fields are timezone-naive; use UTC for consistency.

  • TaskInfo instances are stored in DuckDB for persistence.

  • The result field can store any JSON-serializable data.

  • Progress tracking is optional; tasks work fine with progress=0.0.

id: str
name: str
category: str
priority: TaskPriority
status: TaskStatus
created_at: datetime
started_at: datetime | None
completed_at: datetime | None
elapsed_seconds: float
result: Any
error: str | None
retry_count: int
max_retries: int
args: tuple
kwargs: dict
progress: float
progress_message: str
property is_running: bool

Check if the task is currently being executed.

Returns:

True if task status is STARTED, False otherwise.

Return type:

bool

Example

Monitoring task execution:

task_info = await queue.get_task_info(task_id)
if task_info and task_info.is_running:  # get_task_info may return None
    print(f"Task is {task_info.progress}% complete")
    print(f"Status: {task_info.progress_message}")
property is_completed: bool

Check if the task has reached a terminal state.

A task is considered completed if it has reached SUCCESS, FAILED, or CANCELLED status. These are terminal states where no further execution will occur.

Returns:

True if task is in a terminal state, False if still pending, running, or eligible for retry.

Return type:

bool

Example

Waiting for task completion:

while True:
    task_info = await queue.get_task_info(task_id)
    if task_info and task_info.is_completed:  # get_task_info may return None
        if task_info.status == TaskStatus.SUCCESS:
            print(f"Result: {task_info.result}")
        else:
            print(f"Error: {task_info.error}")
        break
    await asyncio.sleep(1)

Note

RETRY status is not considered completed - the task will be requeued automatically.

Task storage with DuckDB backend for Gobstopper framework.

This module provides persistent storage for task metadata using DuckDB, an embedded analytical database. Tasks are stored with full metadata including status, timing, results, and error information for tracking and debugging.

The storage system features:

  • Lazy database initialization (no file created until first use)

  • Automatic table and index creation

  • Efficient querying with category and status filters

  • Intelligent cleanup of old completed tasks

  • JSON serialization for complex data types

Classes:

TaskStorage: DuckDB-based persistent task storage with indexing.

Example

Basic storage operations:

from gobstopper.tasks.storage import TaskStorage
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
from datetime import datetime

# Create storage instance
storage = TaskStorage("my_tasks.duckdb")

# Save a task
task = TaskInfo(
    id="task-123",
    name="process_data",
    category="analytics",
    priority=TaskPriority.NORMAL,
    status=TaskStatus.PENDING,
    created_at=datetime.now()
)
storage.save_task(task)

# Retrieve task
retrieved = storage.get_task("task-123")
print(f"Status: {retrieved.status}")

# Query by category
analytics_tasks = storage.get_tasks(category="analytics", limit=50)

# Cleanup old completed tasks
deleted = storage.cleanup_old_tasks(days=30)
print(f"Deleted {deleted} old tasks")

Note

DuckDB is required for task storage. Install with: uv add duckdb

The database file is created lazily on first use.

class gobstopper.tasks.storage.TaskStorage(db_path: str | Path = 'wopr_tasks.duckdb')[source]

Bases: object

DuckDB-based task storage with intelligent cleanup.

Provides persistent storage for TaskInfo objects using DuckDB as the backend. The database is created lazily on first use, with automatic schema creation and index optimization for common query patterns.

The storage layer handles:

  • Conversion between TaskInfo objects and database rows

  • JSON serialization of complex fields (args, kwargs, result)

  • Datetime string conversion for database compatibility

  • Efficient indexing on category, status, created_at, and priority

  • Safe concurrent access through DuckDB’s transaction handling

db_path

Path to the DuckDB database file.

connection

DuckDB connection object (created lazily).

Example

Creating and using storage:

from gobstopper.tasks.storage import TaskStorage
from gobstopper.tasks.models import TaskInfo, TaskStatus, TaskPriority
from datetime import datetime

# Initialize storage (no database file created yet)
storage = TaskStorage("tasks.duckdb")

# Create and save a task
task = TaskInfo(
    id="550e8400-e29b-41d4-a716-446655440000",
    name="send_email",
    category="notifications",
    priority=TaskPriority.HIGH,
    status=TaskStatus.PENDING,
    created_at=datetime.now(),
    args=("user@example.com",),
    kwargs={"subject": "Welcome"}
)
storage.save_task(task)  # Database file created here

# Update task status
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
storage.save_task(task)  # Updates existing record

# Query tasks
pending = storage.get_tasks(status=TaskStatus.PENDING, limit=100)
email_tasks = storage.get_tasks(category="notifications")
Raises:

ImportError – If DuckDB is not installed.

Note

  • Database initialization is deferred until first actual use

  • The database file is created with full schema on first write

  • All datetime values are stored as ISO 8601 strings

  • JSON fields (args, kwargs, result) support any JSON-serializable data

__init__(db_path: str | Path = 'wopr_tasks.duckdb')[source]

Initialize TaskStorage with a database path.

Parameters:

db_path – Path to the DuckDB database file. Can be relative or absolute. Defaults to “wopr_tasks.duckdb” in current directory.

Raises:

ImportError – If DuckDB package is not installed.

Note

The database file is not created until the first operation that requires it. This allows TaskStorage to be instantiated without side effects.

save_task(task_info: TaskInfo)[source]

Save or update task information in the database.

Performs an upsert operation (INSERT OR REPLACE) to either create a new task record or update an existing one. All TaskInfo fields are serialized appropriately for database storage.

Parameters:

task_info – TaskInfo object to save or update.

Example

Saving and updating a task:

task = TaskInfo(
    id="task-123",
    name="process_data",
    category="analytics",
    priority=TaskPriority.NORMAL,
    status=TaskStatus.PENDING,
    created_at=datetime.now()
)

# Initial save
storage.save_task(task)

# Update after execution
task.status = TaskStatus.SUCCESS
task.completed_at = datetime.now()
task.elapsed_seconds = 5.2
task.result = {"records_processed": 1000}
storage.save_task(task)  # Updates existing record

Note

  • Datetime objects are converted to ISO 8601 strings

  • TaskPriority and TaskStatus enums are stored as their values

  • args, kwargs, and result are JSON-serialized

  • The database is initialized automatically if needed
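
The conversions listed in the notes above can be sketched with the standard library. This is an assumption-laden illustration: Status, to_row, and from_row are hypothetical, a plain dict stands in for TaskInfo, and the real storage layer may serialize fields differently.

```python
import json
from datetime import datetime
from enum import Enum


class Status(Enum):  # hypothetical stand-in for TaskStatus
    PENDING = "pending"
    SUCCESS = "success"


def to_row(task: dict) -> dict:
    """Convert a task dict into database-friendly scalar values."""
    return {
        "id": task["id"],
        "status": task["status"].value,                # enum -> its value
        "created_at": task["created_at"].isoformat(),  # datetime -> ISO 8601
        "args": json.dumps(list(task["args"])),        # tuple -> JSON array
        "kwargs": json.dumps(task["kwargs"]),
        "result": json.dumps(task["result"]),
    }


def from_row(row: dict) -> dict:
    """Reverse to_row, restoring Python types."""
    return {
        "id": row["id"],
        "status": Status(row["status"]),
        "created_at": datetime.fromisoformat(row["created_at"]),
        "args": tuple(json.loads(row["args"])),
        "kwargs": json.loads(row["kwargs"]),
        "result": json.loads(row["result"]),
    }


task = {
    "id": "task-123",
    "status": Status.PENDING,
    "created_at": datetime(2024, 1, 2, 3, 4, 5),
    "args": ("user@example.com",),
    "kwargs": {"subject": "Welcome"},
    "result": None,
}
row = to_row(task)
restored = from_row(row)
```

A round trip like this only preserves values that JSON can represent, which is why the result field is documented as holding JSON-serializable data.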

get_task(task_id: str) TaskInfo | None[source]

Retrieve a single task by its unique ID.

Parameters:

task_id – Unique identifier of the task to retrieve.

Returns:

TaskInfo object if found, None if no task exists with that ID.

Example

Retrieving and checking a task:

task = storage.get_task("550e8400-e29b-41d4-a716-446655440000")
if task:
    print(f"Task: {task.name}")
    print(f"Status: {task.status.value}")
    if task.status == TaskStatus.SUCCESS:
        print(f"Result: {task.result}")
else:
    print("Task not found")

Note

Returns None rather than raising an exception if task doesn’t exist.

get_tasks(category: str | None = None, status: TaskStatus | None = None, limit: int = 100, offset: int = 0) List[TaskInfo][source]

Query tasks with optional filtering and pagination.

Retrieves multiple tasks from storage with optional filters on category and status. Results are ordered by creation time (newest first) and support pagination via limit/offset.

Parameters:
  • category – Filter by task category (e.g., “email”, “reports”). If None, returns tasks from all categories.

  • status – Filter by TaskStatus enum value. If None, returns tasks in any status.

  • limit – Maximum number of tasks to return. Defaults to 100.

  • offset – Number of tasks to skip (for pagination). Defaults to 0.

Returns:

List of TaskInfo objects matching the filters, ordered by created_at DESC (newest first). Empty list if no matches.

Example

Querying tasks with filters:

# Get all pending email tasks
pending_emails = storage.get_tasks(
    category="email",
    status=TaskStatus.PENDING,
    limit=50
)

# Get next page of results
next_page = storage.get_tasks(
    category="email",
    status=TaskStatus.PENDING,
    limit=50,
    offset=50
)

# Get recent failed tasks across all categories
failed = storage.get_tasks(
    status=TaskStatus.FAILED,
    limit=20
)

# Get all tasks in analytics category
analytics = storage.get_tasks(category="analytics")

Note

  • Results are always ordered by created_at DESC

  • Efficient queries due to indexes on category and status

  • Both category and status filters are optional
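
The filtering and pagination behaviour can be illustrated with a query builder. The sketch below uses the standard-library sqlite3 module purely as a stand-in so that it runs without DuckDB; the table layout, index name, and this get_tasks function are hypothetical and only mirror the documented semantics (optional filters, created_at DESC ordering, limit/offset).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id TEXT PRIMARY KEY, category TEXT, "
    "status TEXT, created_at TEXT)"
)
conn.execute("CREATE INDEX idx_tasks_category ON tasks (category)")

# Ten tasks, alternating categories, one per day in ascending order.
rows = [
    (f"t{i}", "email" if i % 2 == 0 else "reports", "pending",
     f"2024-01-{i + 1:02d}T00:00:00")
    for i in range(10)
]
conn.executemany("INSERT OR REPLACE INTO tasks VALUES (?, ?, ?, ?)", rows)


def get_tasks(category=None, status=None, limit=100, offset=0):
    """Return task ids, newest first, with optional filters."""
    clauses, params = [], []
    if category is not None:
        clauses.append("category = ?")
        params.append(category)
    if status is not None:
        clauses.append("status = ?")
        params.append(status)
    where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
    sql = (f"SELECT id FROM tasks {where} "
           "ORDER BY created_at DESC LIMIT ? OFFSET ?")
    return [r[0] for r in conn.execute(sql, (*params, limit, offset))]


page1 = get_tasks(category="email", limit=2)
page2 = get_tasks(category="email", limit=2, offset=2)
```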

cleanup_old_tasks(days: int = None, months: int = None, years: int = None)[source]

Delete old completed tasks to manage database size.

Removes tasks that completed before a calculated cutoff date. Only deletes tasks in terminal states (SUCCESS, FAILED, CANCELLED) to avoid removing active or pending tasks.

Parameters:
  • days – Number of days to retain. Tasks older than this are deleted.

  • months – Number of months to retain (converted to 30-day periods).

  • years – Number of years to retain (converted to 365-day periods).

Returns:

Number of task records deleted.

Return type:

int

Raises:

Nothing is raised. Returns 0 if no time period is specified.

Example

Cleaning up old tasks:

# Delete tasks completed over 30 days ago
deleted = storage.cleanup_old_tasks(days=30)
print(f"Cleaned up {deleted} old tasks")

# Delete tasks completed over 3 months ago
deleted = storage.cleanup_old_tasks(months=3)

# Delete tasks completed over 1 year ago
deleted = storage.cleanup_old_tasks(years=1)

# Combine periods (90 days + 6 months)
deleted = storage.cleanup_old_tasks(days=90, months=6)

Note

  • Only deletes tasks in SUCCESS, FAILED, or CANCELLED status

  • PENDING and STARTED tasks are never deleted

  • Time periods are cumulative if multiple specified

  • Safe to run periodically (e.g., daily cron job)

  • Returns 0 if no time period arguments provided
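
The cutoff arithmetic described above (months as 30-day periods, years as 365-day periods, all periods summed) can be sketched as follows. The helpers retention_days and cutoff are hypothetical names used for illustration; they are not part of TaskStorage.

```python
from datetime import datetime, timedelta


def retention_days(days=None, months=None, years=None) -> int:
    """Total retention window in days; periods are cumulative."""
    return (days or 0) + 30 * (months or 0) + 365 * (years or 0)


def cutoff(now: datetime, days=None, months=None, years=None):
    """Timestamp before which completed tasks would be deleted, or None."""
    total = retention_days(days, months, years)
    if total == 0:
        return None  # no period given, so nothing would be deleted
    return now - timedelta(days=total)


combined = retention_days(days=90, months=6)   # 90 + 6 * 30
boundary = cutoff(datetime(2024, 12, 31), days=30)
```

Because the periods add up, days=90 together with months=6 retains tasks for 270 days rather than for whichever period is longer.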

WebSockets

WebSocket connection handling for Gobstopper framework.

This module provides WebSocket support through Granian’s RSGI protocol, offering a high-level interface for real-time bidirectional communication between server and clients.

The WebSocket class wraps the low-level RSGI WebSocket protocol and provides:

  • Automatic connection lifecycle management

  • Message size limits and validation (configurable via environment)

  • Automatic chunking for large messages with backpressure

  • Support for both text and binary messages

  • Type-safe message receiving with proper error handling

Security Features:
  • Message size limits (default 1 MiB) to prevent memory exhaustion

  • Automatic connection closure on oversized messages (WebSocket code 1009)

  • Chunked transmission to provide backpressure and prevent buffer overflow

Configuration:

Environment variables for tuning:

  • MAX_WS_MESSAGE_BYTES: Maximum message size in bytes (default: 1048576 = 1 MiB)

  • WS_SEND_CHUNK_BYTES: Chunk size for large messages (default: 65536 = 64 KiB)
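
How the two limits interact can be sketched as a size check followed by slicing. The functions read_limits and split_message are hypothetical and exist only for this illustration; the real class closes the connection with code 1009 instead of raising, and its internal chunking may differ.

```python
import os


def read_limits(env=os.environ):
    """Read both limits, falling back to the documented defaults."""
    max_bytes = int(env.get("MAX_WS_MESSAGE_BYTES", "1048576"))
    chunk_bytes = int(env.get("WS_SEND_CHUNK_BYTES", "65536"))
    return max_bytes, chunk_bytes


def split_message(payload: bytes, max_bytes: int, chunk_bytes: int) -> list:
    """Reject oversized payloads, otherwise slice into fixed-size chunks."""
    if len(payload) > max_bytes:
        # The real connection would be closed with code 1009 here.
        raise ValueError("message too big")
    return [payload[i:i + chunk_bytes]
            for i in range(0, len(payload), chunk_bytes)]


# An empty mapping is passed so the documented defaults apply.
max_bytes, chunk_bytes = read_limits({})
chunks = split_message(b"x" * 150_000, max_bytes, chunk_bytes)
```

With the defaults, a 150,000-byte payload is below the 1 MiB limit and is split into two full 64 KiB chunks plus a remainder.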

Examples

Simple echo server:

>>> from gobstopper import Gobstopper
>>> app = Gobstopper()
>>>
>>> @app.websocket("/ws/echo")
... async def echo(websocket):
...     await websocket.accept()
...     while True:
...         message = await websocket.receive()
...         if message.type == "close":
...             break
...         if message.type == "text":
...             await websocket.send_text(f"Echo: {message.data}")

JSON message handling:

>>> import json
>>>
>>> @app.websocket("/ws/api")
... async def api_handler(websocket):
...     await websocket.accept()
...     async for message in websocket:
...         if message.type == "text":
...             data = json.loads(message.data)
...             result = await process_request(data)
...             await websocket.send_text(json.dumps(result))

Binary data streaming:

>>> @app.websocket("/ws/stream")
... async def stream_handler(websocket):
...     await websocket.accept()
...     # Stream large binary data in chunks
...     with open("large_file.bin", "rb") as f:
...         while chunk := f.read(65536):
...             await websocket.send_bytes(chunk)

Note

This module requires Granian with RSGI support. Fallback types are provided for development environments without Granian installed.

See also

WebSocketManager: Room-based connection management

gobstopper.core.app: Main Gobstopper application class

class gobstopper.websocket.connection.WebSocket(scope: Scope, protocol: RSGIWebsocketProtocol)[source]

Bases: object

WebSocket connection wrapper for RSGI protocol.

Provides a high-level interface for WebSocket communication through Granian’s RSGI WebSocket protocol. Handles connection lifecycle, message sending/receiving, and connection management.

Parameters:
  • scope – RSGI WebSocket scope containing connection metadata

  • protocol – RSGI WebSocket protocol instance for communication

scope

Original RSGI scope object with connection info

protocol

RSGI WebSocket protocol for low-level operations

transport

Active transport instance (set after accept())

Examples

Basic echo server:

>>> @app.websocket("/ws/echo")
... async def echo_handler(websocket: WebSocket):
...     await websocket.accept()
...     while True:
...         message = await websocket.receive()
...         if message.type == "text":
...             await websocket.send_text(f"Echo: {message.data}")
...         elif message.type == "close":
...             break

Chat room implementation:

>>> @app.websocket("/ws/chat")
... async def chat_handler(websocket: WebSocket):
...     await websocket.accept()
...     try:
...         while True:
...             message = await websocket.receive()
...             if message.type == "text":
...                 # Broadcast to all connected clients
...                 await broadcast_message(message.data)
...     except ConnectionClosed:
...         pass  # Client disconnected

Binary data handling:

>>> @app.websocket("/ws/data")
... async def data_handler(websocket: WebSocket):
...     await websocket.accept()
...     async for message in websocket.receive_iter():
...         if message.type == "bytes":
...             processed = process_binary_data(message.data)
...             await websocket.send_bytes(processed)

Note

Must call accept() before sending/receiving messages. WebSocket connections are persistent until explicitly closed. Use try/except to handle connection errors gracefully.

See also

  • WebSocketManager: Room-based connection management

  • Gobstopper.websocket(): WebSocket route decorator

__init__(scope: Scope, protocol: RSGIWebsocketProtocol)[source]
async accept()[source]

Accept the WebSocket connection and establish transport.

Must be called before sending or receiving messages. Creates the transport layer for bidirectional communication.

Returns:

Transport instance for low-level operations (usually not needed)

Examples

Standard connection acceptance:

>>> @app.websocket("/ws")
... async def handler(websocket):
...     await websocket.accept()
...     # Now ready to send/receive

Note

This must be the first operation after receiving the WebSocket instance. A connection cannot be accepted twice.

async send_text(data: str)[source]

Send text message to WebSocket client with automatic chunking.

Sends text messages with automatic chunking for large payloads to provide backpressure management. Messages exceeding the configured size limit will cause the connection to close with status code 1009 (Message Too Big).

The message is encoded to UTF-8 and checked against the maximum message size limit (configurable via MAX_WS_MESSAGE_BYTES environment variable, default 1 MiB). Large messages are automatically split into chunks (configurable via WS_SEND_CHUNK_BYTES, default 64 KiB).

Parameters:

data – Text message to send. Will be UTF-8 encoded before transmission.

Raises:
  • RuntimeError – If WebSocket connection not accepted (transport is None)

  • ConnectionError – If connection fails during transmission

Examples

Sending simple text messages:

>>> await websocket.accept()
>>> await websocket.send_text("Hello, client!")
>>> await websocket.send_text("Another message")

Sending JSON data:

>>> import json
>>> data = {"type": "notification", "message": "Update available"}
>>> await websocket.send_text(json.dumps(data))

Handling large messages:

>>> # Large message is automatically chunked
>>> large_text = "x" * 500_000  # 500 KB text
>>> await websocket.send_text(large_text)
>>> # Sent in chunks with automatic backpressure

Broadcasting to multiple clients:

>>> for ws in active_connections:
...     await ws.send_text(broadcast_message)

Note

Messages larger than max_message_bytes will close the connection. Chunking happens automatically for messages > send_chunk_bytes. The transport.drain() method is called after each chunk if available. Silent failure if transport is None (connection not accepted).
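
The size check and chunking described above can be sketched roughly as follows. This is an illustration only, not Gobstopper's implementation: plan_text_send is a hypothetical helper, the constants simply mirror the documented defaults, and the sketch raises ValueError at the point where the real connection would close with status code 1009.

```python
MAX_MESSAGE_BYTES = 1024 * 1024  # documented default for MAX_WS_MESSAGE_BYTES (1 MiB)
SEND_CHUNK_BYTES = 64 * 1024     # documented default for WS_SEND_CHUNK_BYTES (64 KiB)


def plan_text_send(text, max_bytes=MAX_MESSAGE_BYTES, chunk_bytes=SEND_CHUNK_BYTES):
    """Hypothetical helper: return the byte chunks a send would be split into."""
    payload = text.encode("utf-8")
    if len(payload) > max_bytes:
        # The real connection would be closed with 1009 (Message Too Big) here.
        raise ValueError("message exceeds max_bytes")
    # Fixed-size slices of the encoded payload; the last slice may be shorter.
    return [payload[i:i + chunk_bytes] for i in range(0, len(payload), chunk_bytes)]


chunks = plan_text_send("x" * 500_000)
print(len(chunks), len(chunks[-1]))
```

With the default 64 KiB chunk size, the 500,000-byte message from the example above is split into seven full chunks and one shorter final chunk.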

See also

  • send_bytes(): Send binary data

  • accept(): Must be called before sending

async send_bytes(data: bytes)[source]

Send binary message to WebSocket client with automatic chunking.

Sends binary messages with automatic chunking for large payloads to provide backpressure management. Messages exceeding the configured size limit will cause the connection to close with status code 1009 (Message Too Big).

The binary data is checked against the maximum message size limit (configurable via MAX_WS_MESSAGE_BYTES environment variable, default 1 MiB). Large messages are automatically split into chunks (configurable via WS_SEND_CHUNK_BYTES, default 64 KiB).

Parameters:

data – Binary data to send as bytes or bytearray.

Raises:
  • RuntimeError – If WebSocket connection not accepted (transport is None)

  • ConnectionError – If connection fails during transmission

Examples

Sending binary data:

>>> await websocket.accept()
>>> binary_data = b"\x00\x01\x02\x03"
>>> await websocket.send_bytes(binary_data)

Sending image data:

>>> with open("image.png", "rb") as f:
...     image_data = f.read()
>>> await websocket.send_bytes(image_data)

Sending protocol buffers:

>>> import my_proto_pb2
>>> message = my_proto_pb2.MyMessage()
>>> message.field = "value"
>>> await websocket.send_bytes(message.SerializeToString())

Chunked large binary transfer:

>>> # Large binary is automatically chunked
>>> large_data = bytes(500_000)  # 500 KB
>>> await websocket.send_bytes(large_data)
>>> # Sent in 64 KB chunks with backpressure

Note

Messages larger than max_message_bytes will close the connection. Chunking happens automatically for messages > send_chunk_bytes. The transport.drain() method is called after each chunk if available. Silent failure if transport is None (connection not accepted).

See also

  • send_text(): Send text messages

  • accept(): Must be called before sending

async receive()[source]

Receive message from WebSocket client.

Waits for and receives the next message from the client. Returns a message object with type and data attributes.

Returns:

Message object with the following attributes, or None if the transport is not available:

  • type: Message type (“text”, “bytes”, “close”, “error”)

  • data: Message content (string for text, bytes for binary)

Examples

Basic message receiving:

>>> message = await websocket.receive()
>>> if message.type == "text":
...     print(f"Received: {message.data}")
... elif message.type == "close":
...     print("Client disconnected")

Message type handling:

>>> while True:
...     message = await websocket.receive()
...     if message.type == "text":
...         await handle_text_message(message.data)
...     elif message.type == "bytes":
...         await handle_binary_data(message.data)
...     elif message.type == "close":
...         break

Note

This coroutine suspends the calling task until a message is received; other tasks on the event loop keep running. Handle “close” messages to avoid errors.

See also

  • send_text(): Send text messages

  • send_bytes(): Send binary messages

async close(code: int = 1000, reason: str = '')[source]

Close the WebSocket connection gracefully.

Closes the WebSocket connection with an optional close code and reason, following the WebSocket protocol specification (RFC 6455) for standard close codes.

Standard WebSocket close codes:
  • 1000: Normal closure (default)

  • 1001: Going away (server shutdown, browser navigation)

  • 1002: Protocol error

  • 1003: Unsupported data type

  • 1007: Invalid payload data

  • 1008: Policy violation

  • 1009: Message too big

  • 1011: Internal server error

Parameters:
  • code – WebSocket close status code (default: 1000 for normal closure). Must be a valid WebSocket close code (1000-4999).

  • reason – Optional human-readable close reason string. Must be UTF-8 and no longer than 123 bytes when encoded.

Raises:
  • RuntimeError – If connection not accepted (transport is None)

  • ValueError – If code is invalid or reason is too long
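
The two documented constraints (code in the range 1000-4999, reason at most 123 bytes once UTF-8 encoded) can be checked before calling close(). The helper below is a hypothetical pre-check written for illustration; it is not part of Gobstopper and does not claim to match how close() validates its arguments internally.

```python
def validate_close(code=1000, reason=""):
    """Hypothetical pre-check mirroring the documented close() constraints."""
    if not 1000 <= code <= 4999:
        raise ValueError(f"invalid close code: {code}")
    encoded = reason.encode("utf-8")
    # The limit applies to encoded bytes, not characters.
    if len(encoded) > 123:
        raise ValueError("close reason exceeds 123 bytes when UTF-8 encoded")
    return code, encoded


checked = validate_close(1008, "Invalid message format")
```

Measuring the encoded length matters for non-ASCII reasons: a string of 62 “é” characters is only 62 characters long but 124 bytes in UTF-8, so it is rejected.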

Examples

Normal closure:

>>> await websocket.accept()
>>> # ... handle messages ...
>>> await websocket.close()

Closure with reason:

>>> await websocket.close(1000, "Session ended")

Error closure:

>>> try:
...     await process_message(message)
... except ValidationError:
...     await websocket.close(1008, "Invalid message format")

Server shutdown:

>>> for ws in active_connections:
...     await ws.close(1001, "Server shutting down")

Note

After closing, no further messages can be sent or received. Client may also initiate close; handle “close” message type in receive(). Connection is automatically cleaned up after close.

See also

  • accept(): Accept connection before closing

  • receive(): Receive close messages from client

WebSocket room management for Gobstopper framework.

This module provides a comprehensive WebSocket connection manager with room-based organization for building real-time applications like chat systems, live dashboards, collaborative editing, and multiplayer games.

The WebSocketManager class offers:
  • Centralized connection tracking and lifecycle management

  • Room-based organization for targeted message broadcasting

  • Automatic cleanup of dead connections

  • Support for concurrent access from a single event loop (not thread-safe; see Thread Safety under WebSocketManager)

  • Efficient broadcast mechanisms for one-to-many communication

Key Concepts:
  • Connection: Individual WebSocket client identified by a unique connection_id

  • Room: Named group of connections that can receive broadcasts together

  • Broadcast: Sending messages to multiple connections simultaneously

Architecture:

The manager maintains two key data structures:
  • connections: Dict mapping connection_id -> WebSocket instance

  • rooms: Dict mapping room_name -> Set of connection_ids

This separation allows:
  • O(1) connection lookups

  • O(1) room membership checks

  • Efficient multi-room membership per connection

  • Easy cleanup when connections close
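
The two structures can be pictured with a small stand-in. RoomIndex below is a hypothetical class written for illustration, not the real WebSocketManager; it only mirrors the documented shape (a dict of connections plus a defaultdict of room sets) and the documented rule that unregistered connections cannot join rooms.

```python
from collections import defaultdict


class RoomIndex:
    """Illustrative stand-in for the two structures; not the real manager."""

    def __init__(self):
        self.connections = {}          # connection_id -> WebSocket instance
        self.rooms = defaultdict(set)  # room_name -> set of connection_ids

    def add(self, conn_id, websocket):
        self.connections[conn_id] = websocket

    def join(self, conn_id, room):
        # Only registered connections may join a room.
        if conn_id in self.connections:
            self.rooms[room].add(conn_id)

    def remove(self, conn_id):
        # Drop the connection, then discard it from every room.
        self.connections.pop(conn_id, None)
        for members in self.rooms.values():
            members.discard(conn_id)


index = RoomIndex()
index.add("c1", object())
index.join("c1", "lobby")
index.join("c1", "news")
index.join("ghost", "lobby")  # ignored: "ghost" was never added
index.remove("c1")
```

After remove(), both rooms still exist as empty sets, which matches the documented behaviour that empty rooms stay in the rooms dictionary.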

Use Cases:
  • Chat rooms: Users join named chat rooms to exchange messages

  • Live updates: Broadcast data updates to subscribers of specific topics

  • Notifications: Send alerts to specific user groups

  • Gaming: Manage players in different game lobbies

  • Collaboration: Sync state across users editing the same document

Examples

Basic chat room implementation:

>>> from gobstopper import Gobstopper
>>> from gobstopper.websocket import WebSocketManager
>>> import uuid
>>>
>>> app = Gobstopper()
>>> manager = WebSocketManager()
>>>
>>> @app.websocket("/ws/chat/<room>")
... async def chat_handler(websocket, room):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     manager.join_room(conn_id, room)
...
...     try:
...         while True:
...             message = await websocket.receive()
...             if message.type == "close":
...                 break
...             if message.type == "text":
...                 await manager.broadcast_to_room(room, message.data)
...     finally:
...         manager.remove_connection(conn_id)

Multi-room user presence:

>>> @app.websocket("/ws/notifications")
... async def notification_handler(websocket):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...
...     # User can join multiple notification topics
...     # (user_id and team_id are assumed to come from your auth layer)
...     manager.join_room(conn_id, "global")
...     manager.join_room(conn_id, f"user_{user_id}")
...     manager.join_room(conn_id, f"team_{team_id}")
...
...     # Now receives broadcasts from all three rooms
...     while True:
...         message = await websocket.receive()
...         if message.type == "close":
...             break

Admin broadcast to all users:

>>> async def notify_all_users(message: str):
...     await manager.broadcast_to_all(message)

Room statistics and monitoring:

>>> def get_room_stats(room: str) -> dict:
...     connections = manager.get_room_connections(room)
...     return {
...         "room": room,
...         "active_users": len(connections),
...         "connection_ids": list(connections)
...     }

Security Considerations:
  • Always validate connection_ids to prevent injection attacks

  • Implement authentication before adding connections

  • Sanitize room names to prevent unauthorized access

  • Rate limit broadcast operations to prevent abuse

  • Monitor connection counts to detect DoS attempts
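
As one way to act on the room-name point above, client-supplied names can be checked against an allow-list pattern before calling join_room(). The pattern and the 64-character limit below are assumptions chosen for this sketch, not a policy defined by Gobstopper, and is_valid_room_name is a hypothetical helper.

```python
import re

# Assumed policy: letters, digits, underscore and hyphen, 1 to 64 characters.
ROOM_NAME = re.compile(r"[A-Za-z0-9_-]{1,64}")


def is_valid_room_name(name):
    """Hypothetical check to run on client-supplied room names."""
    return ROOM_NAME.fullmatch(name) is not None


print(is_valid_room_name("team_alpha"))
print(is_valid_room_name("../admin"))
```

A format check like this does not replace authorization; whether a given user may join a given room still has to be decided by the application.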

Performance Notes:
  • Room lookups are O(1) with hash-based dictionaries

  • Broadcasting is O(n) where n is room size

  • Connection cleanup is O(m) where m is number of rooms

  • Memory usage scales with: connections + (rooms × avg_room_size)

See also

  • WebSocket: Individual WebSocket connection wrapper

  • gobstopper.core.app: Main Gobstopper application for routing

class gobstopper.websocket.manager.WebSocketManager[source]

Bases: object

Centralized WebSocket connection manager with room-based organization.

Manages WebSocket connections with support for rooms (groups), broadcasting, and automatic connection lifecycle management. Designed for building real-time applications with many concurrent connections and targeted message delivery.

The manager provides efficient data structures for:
  • Fast connection lookups by ID

  • Organizing connections into named rooms

  • Broadcasting messages to rooms or all connections

  • Tracking which rooms a connection belongs to

  • Automatic cleanup of disconnected clients

connections

Dictionary mapping connection_id (str) to WebSocket instances. Maintains all active WebSocket connections.

rooms

Dictionary mapping room names (str) to Sets of connection_ids. Organizes connections into logical groups for targeted broadcasting.

Thread Safety:

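This implementation is NOT thread-safe by default. For multi-threaded applications, wrap operations in locks or hand the work to the event loop through asyncio’s thread-safe entry points, such as asyncio.run_coroutine_threadsafe() or loop.call_soon_threadsafe().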
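
A minimal sketch of the second option, assuming a worker thread needs to trigger a broadcast. The broadcast coroutine here is a stand-in for a manager call such as broadcast_to_all(), and notify_from_worker is a hypothetical helper; only the asyncio functions are real library calls.

```python
import asyncio


async def broadcast(log, message):
    """Stand-in for a manager broadcast; always runs on the event loop."""
    log.append(message)
    return len(log)


def notify_from_worker(loop, log, message):
    """Called from a non-loop thread: hand the coroutine to the loop."""
    future = asyncio.run_coroutine_threadsafe(broadcast(log, message), loop)
    # Block this worker thread (not the loop) until the coroutine has run.
    return future.result(timeout=5)


async def main():
    loop = asyncio.get_running_loop()
    log = []
    # asyncio.to_thread runs the worker in a separate thread (Python 3.9+).
    await asyncio.to_thread(notify_from_worker, loop, log, "deploy finished")
    return log


delivered = asyncio.run(main())
```

Because the manager state is only ever touched from the loop thread, no lock is needed around the manager itself in this arrangement.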

Examples

Initialize manager and handle connections:

>>> from gobstopper.websocket import WebSocketManager
>>> manager = WebSocketManager()
>>>
>>> # In your WebSocket handler
>>> conn_id = "user_123"
>>> manager.add_connection(conn_id, websocket)
>>> manager.join_room(conn_id, "lobby")

Complete chat application:

>>> import uuid
>>> from gobstopper import Gobstopper
>>>
>>> app = Gobstopper()
>>> manager = WebSocketManager()
>>>
>>> @app.websocket("/chat/<room_name>")
... async def chat(websocket, room_name):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     manager.join_room(conn_id, room_name)
...
...     try:
...         # Send join notification
...         await manager.broadcast_to_room(
...             room_name,
...             f"User {conn_id[:8]} joined"
...         )
...
...         # Message loop
...         while True:
...             msg = await websocket.receive()
...             if msg.type == "close":
...                 break
...             if msg.type == "text":
...                 await manager.broadcast_to_room(room_name, msg.data)
...     finally:
...         manager.remove_connection(conn_id)
...         await manager.broadcast_to_room(
...             room_name,
...             f"User {conn_id[:8]} left"
...         )

Multi-room presence (user in multiple rooms):

>>> conn_id = "user_456"
>>> manager.add_connection(conn_id, websocket)
>>> manager.join_room(conn_id, "general")
>>> manager.join_room(conn_id, "announcements")
>>> manager.join_room(conn_id, "team_alpha")
>>>
>>> # User receives broadcasts from all three rooms
>>> rooms = manager.get_connection_rooms(conn_id)
>>> print(rooms)  # {'general', 'announcements', 'team_alpha'}

Room monitoring and statistics:

>>> def get_all_room_stats():
...     stats = {}
...     for room in manager.rooms.keys():
...         connections = manager.get_room_connections(room)
...         stats[room] = {
...             "users": len(connections),
...             "connection_ids": list(connections)
...         }
...     return stats

Graceful shutdown:

>>> async def shutdown():
...     # Notify all users
...     await manager.broadcast_to_all(
...         "Server shutting down in 10 seconds"
...     )
...     # Close all connections
...     for conn_id, ws in list(manager.connections.items()):
...         await ws.close(1001, "Server shutdown")
...         manager.remove_connection(conn_id)

See also

  • WebSocket: Individual WebSocket connection

  • broadcast_to_room(): Send message to specific room

  • broadcast_to_all(): Send message to all connections

__init__()[source]

Initialize empty WebSocket manager.

Creates a new manager with no connections or rooms. The connections dictionary and rooms defaultdict are initialized and ready for use.

Examples

>>> manager = WebSocketManager()
>>> print(len(manager.connections))  # 0
>>> print(len(manager.rooms))  # 0

add_connection(connection_id: str, websocket: WebSocket)[source]

Register a new WebSocket connection with the manager.

Adds a WebSocket connection to the manager’s tracking dictionary. This must be called after accepting the WebSocket connection and before using any room or broadcast features.

The connection_id should be unique across all active connections. Using UUIDs or session tokens is recommended to ensure uniqueness.

Parameters:
  • connection_id – Unique identifier for this connection. Must be hashable and unique. Recommended to use UUID4 or secure random string.

  • websocket – The WebSocket instance to register. Should already be accepted (websocket.accept() called).

Examples

Using UUID for connection ID:

>>> import uuid
>>> conn_id = str(uuid.uuid4())
>>> manager.add_connection(conn_id, websocket)

Using session-based ID:

>>> session_id = request.session["user_id"]
>>> conn_id = f"user_{session_id}"
>>> manager.add_connection(conn_id, websocket)

Complete connection lifecycle:

>>> @app.websocket("/ws")
... async def handler(websocket):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     try:
...         # Handle messages...
...         pass
...     finally:
...         manager.remove_connection(conn_id)

Note

If connection_id already exists, it will be overwritten. Ensure IDs are unique to avoid connection conflicts.

See also

  • remove_connection(): Remove connection from manager

  • join_room(): Add connection to a room

remove_connection(connection_id: str)[source]

Remove a WebSocket connection and clean up all room memberships.

Removes the connection from the manager and automatically removes it from all rooms it was a member of. This is the proper way to clean up when a connection closes, ensuring no dangling references remain.

The operation is idempotent - calling it multiple times with the same connection_id is safe and has no effect after the first removal.

Parameters:

connection_id – The unique identifier of the connection to remove.

Examples

Basic cleanup in finally block:

>>> try:
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     # Handle messages...
... finally:
...     manager.remove_connection(conn_id)

Cleanup with logging:

>>> import logging
>>> logger = logging.getLogger(__name__)
>>>
>>> def cleanup_connection(conn_id: str):
...     rooms = manager.get_connection_rooms(conn_id)
...     manager.remove_connection(conn_id)
...     logger.info(f"Removed {conn_id} from {len(rooms)} rooms")

Graceful disconnect with notification:

>>> async def disconnect_user(conn_id: str):
...     # Get rooms before removal
...     user_rooms = manager.get_connection_rooms(conn_id)
...
...     # Notify rooms about departure
...     for room in user_rooms:
...         await manager.broadcast_to_room(
...             room,
...             f"User {conn_id} disconnected"
...         )
...
...     # Remove connection
...     manager.remove_connection(conn_id)

Bulk cleanup on shutdown:

>>> async def cleanup_all():
...     conn_ids = list(manager.connections.keys())
...     for conn_id in conn_ids:
...         manager.remove_connection(conn_id)

Note

This method is safe to call even if connection_id doesn’t exist. All room memberships are automatically cleaned up. Empty rooms remain in the rooms dictionary (with empty sets).
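
Because empty rooms are left behind, a long-running application with many short-lived room names may want to prune them from time to time. The helper below is a hypothetical sketch that works on any mapping of room name to set of connection IDs; passing manager.rooms to it assumes that attribute can be mutated directly, which this reference does not state.

```python
def prune_empty_rooms(rooms):
    """Delete rooms with no members and return the names that were removed."""
    # Collect names first so the dict is not modified while iterating over it.
    empty = [name for name, members in rooms.items() if not members]
    for name in empty:
        del rooms[name]
    return empty


rooms = {"lobby": {"conn-a"}, "old_game": set(), "tmp": set()}
removed = prune_empty_rooms(rooms)
```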

See also

  • add_connection(): Register a new connection

  • leave_room(): Remove from specific room only

join_room(connection_id: str, room: str)[source]

Add a connection to a named room for targeted broadcasting.

Adds the specified connection to a room, allowing it to receive broadcasts sent to that room. Connections can be members of multiple rooms simultaneously. If the room doesn’t exist, it will be created automatically.

This operation only succeeds if the connection exists in the manager. Attempting to add a non-existent connection to a room will silently fail.

Parameters:
  • connection_id – Unique identifier of the connection to add to the room. Must be a connection previously registered with add_connection().

  • room – Name of the room to join. Can be any string identifier. Room is created automatically if it doesn’t exist.

Examples

Join single room:

>>> manager.add_connection(conn_id, websocket)
>>> manager.join_room(conn_id, "general")

Join multiple rooms:

>>> manager.join_room(conn_id, "global_chat")
>>> manager.join_room(conn_id, "announcements")
>>> manager.join_room(conn_id, f"user_{user_id}_private")

Dynamic room joining based on user:

>>> import json
>>> import uuid
>>>
>>> @app.websocket("/ws/subscribe")
... async def subscribe_handler(websocket):
...     await websocket.accept()
...     conn_id = str(uuid.uuid4())
...     manager.add_connection(conn_id, websocket)
...
...     try:
...         # User sends room names to join
...         while True:
...             message = await websocket.receive()
...             if message.type == "close":
...                 break
...             if message.type == "text":
...                 data = json.loads(message.data)
...                 if data.get("action") == "join":
...                     manager.join_room(conn_id, data["room"])
...                     await websocket.send_text(
...                         f"Joined room: {data['room']}"
...                     )
...     finally:
...         manager.remove_connection(conn_id)

Topic-based subscriptions:

>>> # User subscribes to multiple topics
>>> user_topics = ["python", "javascript", "devops"]
>>> for topic in user_topics:
...     manager.join_room(conn_id, f"topic_{topic}")

Permission-based room access:

>>> async def join_with_permission(conn_id, room, user):
...     if await user.has_permission(room):
...         manager.join_room(conn_id, room)
...         return True
...     return False

Note

Silently does nothing if connection_id not in manager.connections. Joining the same room multiple times has no additional effect. Room is created automatically on first join.

See also

  • leave_room(): Remove connection from room

  • broadcast_to_room(): Send message to room members

  • get_room_connections(): Get all connections in room

leave_room(connection_id: str, room: str)[source]

Remove a connection from a specific room.

Removes the connection from the specified room without affecting its membership in other rooms or removing it from the manager. This is useful for managing dynamic subscriptions where users can leave specific channels without disconnecting entirely.

The operation is idempotent - calling it multiple times or on a connection that isn’t in the room has no effect.

Parameters:
  • connection_id – Unique identifier of the connection to remove from room.

  • room – Name of the room to leave.

Examples

Leave single room:

>>> manager.leave_room(conn_id, "general")

Dynamic unsubscribe:

>>> import json
>>> import uuid
>>>
>>> @app.websocket("/ws/manage")
... async def manage_subscriptions(websocket):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...
...     try:
...         while True:
...             message = await websocket.receive()
...             if message.type == "close":
...                 break
...             if message.type == "text":
...                 data = json.loads(message.data)
...                 if data.get("action") == "leave":
...                     manager.leave_room(conn_id, data["room"])
...                     await websocket.send_text(
...                         f"Left room: {data['room']}"
...                     )
...     finally:
...         manager.remove_connection(conn_id)

Leave multiple rooms:

>>> rooms_to_leave = ["channel1", "channel2", "channel3"]
>>> for room in rooms_to_leave:
...     manager.leave_room(conn_id, room)

Permission revocation:

>>> async def revoke_access(conn_id: str, room: str):
...     manager.leave_room(conn_id, room)
...     # Notify user
...     ws = manager.connections.get(conn_id)
...     if ws:
...         await ws.send_text(
...             f"Access to {room} has been revoked"
...         )

Clean exit from specific room:

>>> # Leave room but stay connected
>>> rooms = manager.get_connection_rooms(conn_id)
>>> if "temporary_room" in rooms:
...     manager.leave_room(conn_id, "temporary_room")

Note

Does not remove connection from manager, only from the room. Safe to call even if connection not in room or room doesn’t exist. Connection remains active and in other rooms.

See also

  • join_room(): Add connection to room

  • remove_connection(): Remove connection entirely

  • get_connection_rooms(): Check which rooms connection is in

async broadcast_to_room(room: str, message: str)[source]

Broadcast text message to all connections in a specific room.

Sends the same text message to every connection currently in the specified room. Failed sends (due to closed connections) automatically trigger cleanup of that connection from the manager.

This is an async operation that sends messages sequentially to each connection in the room. For better performance with large rooms, consider implementing parallel sends with asyncio.gather().

Parameters:
  • room – Name of the room to broadcast to.

  • message – Text message to send to all room members. Must be a string.

Examples

Simple room broadcast:

>>> await manager.broadcast_to_room("lobby", "Welcome everyone!")

Chat message broadcasting:

>>> @app.websocket("/chat/<room>")
... async def chat_handler(websocket, room):
...     conn_id = str(uuid.uuid4())
...     await websocket.accept()
...     manager.add_connection(conn_id, websocket)
...     manager.join_room(conn_id, room)
...
...     try:
...         while True:
...             msg = await websocket.receive()
...             if msg.type == "close":
...                 break
...             if msg.type == "text":
...                 # Broadcast to everyone in room
...                 await manager.broadcast_to_room(
...                     room,
...                     f"{conn_id[:8]}: {msg.data}"
...                 )
...     finally:
...         manager.remove_connection(conn_id)

JSON broadcast:

>>> import json
>>> data = {"type": "update", "value": 42}
>>> await manager.broadcast_to_room(
...     "notifications",
...     json.dumps(data)
... )

Notify room about events:

>>> import json
>>> import time
>>>
>>> async def notify_room_event(room: str, event: str, data: dict):
...     message = json.dumps({
...         "event": event,
...         "data": data,
...         "timestamp": time.time()
...     })
...     await manager.broadcast_to_room(room, message)

System announcements:

>>> async def system_announce(room: str, announcement: str):
...     await manager.broadcast_to_room(
...         room,
...         f"[SYSTEM] {announcement}"
...     )

Multiple room broadcast:

>>> async def broadcast_to_multiple_rooms(rooms: list, message: str):
...     for room in rooms:
...         await manager.broadcast_to_room(room, message)

Note

Messages are sent sequentially, not in parallel. Failed sends automatically clean up dead connections. If room doesn’t exist or is empty, nothing happens. For binary data, use websocket.send_bytes() directly on each connection.
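
For large rooms, the sequential loop can be replaced by concurrent sends with asyncio.gather(). The sketch below is not part of Gobstopper: broadcast_parallel is a hypothetical helper and FakeSocket is a test double standing in for WebSocket so that the example runs on its own.

```python
import asyncio


class FakeSocket:
    """Test double standing in for a WebSocket; records or fails sends."""

    def __init__(self, fail=False):
        self.fail = fail
        self.sent = []

    async def send_text(self, data):
        if self.fail:
            raise ConnectionError("peer went away")
        self.sent.append(data)


async def broadcast_parallel(connections, member_ids, message):
    """Send to all members concurrently; return the IDs whose send failed."""
    ids = [cid for cid in member_ids if cid in connections]
    results = await asyncio.gather(
        *(connections[cid].send_text(message) for cid in ids),
        return_exceptions=True,  # one dead peer must not abort the others
    )
    return [cid for cid, res in zip(ids, results) if isinstance(res, Exception)]


connections = {"a": FakeSocket(), "b": FakeSocket(fail=True), "c": FakeSocket()}
failed = asyncio.run(broadcast_parallel(connections, {"a", "b", "c"}, "hello"))
```

With a real manager, the arguments would presumably be manager.connections and manager.get_room_connections(room), and each returned ID could then be passed to manager.remove_connection() to reproduce the automatic cleanup.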

See also

  • broadcast_to_all(): Broadcast to all connections

  • get_room_connections(): Get connection IDs in room

  • WebSocket.send_text: Underlying send method

async broadcast_to_all(message: str)[source]

Broadcast text message to every active connection.

Sends the same text message to all connections registered with the manager, regardless of room membership. This is useful for system-wide announcements, critical alerts, or global events.

Failed sends (due to closed connections) automatically trigger cleanup of that connection. Uses list() to create a snapshot of connections to avoid modification during iteration.

Parameters:

message – Text message to send to all connections. Must be a string.

Examples

System-wide announcement:

>>> await manager.broadcast_to_all(
...     "Server will restart in 5 minutes"
... )

Global notification:

>>> import json
>>> notification = {
...     "type": "announcement",
...     "message": "New feature deployed!",
...     "priority": "high"
... }
>>> await manager.broadcast_to_all(json.dumps(notification))

Scheduled broadcast:

>>> import asyncio
>>>
>>> async def scheduled_announcement():
...     while True:
...         await asyncio.sleep(3600)  # Every hour
...         await manager.broadcast_to_all(
...             "Hourly server status: All systems operational"
...         )

Emergency alert:

>>> import time
>>>
>>> async def emergency_broadcast(alert_message: str):
...     priority_msg = json.dumps({
...         "type": "emergency",
...         "message": alert_message,
...         "timestamp": time.time()
...     })
...     await manager.broadcast_to_all(priority_msg)

Server shutdown notification:

>>> async def notify_shutdown():
...     await manager.broadcast_to_all(
...         "Server shutting down for maintenance"
...     )
...     await asyncio.sleep(2)  # Give time to send
...     # Proceed with shutdown...

Broadcast with counter:

>>> async def count_broadcast_recipients(message: str):
...     initial_count = len(manager.connections)
...     await manager.broadcast_to_all(message)
...     final_count = len(manager.connections)
...     failed = initial_count - final_count
...     return {"sent": final_count, "failed": failed}

Note

Sends to ALL connections, ignoring room memberships. Messages are sent sequentially, not in parallel. Creates connection snapshot to handle concurrent modifications. Failed sends automatically clean up dead connections. For large connection counts, consider rate limiting or batching.
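
One possible shape for the batching suggested above is to send to a fixed number of connections at a time and yield to the event loop between batches. broadcast_in_batches and RecordingSocket are hypothetical names used only for this sketch, and the batch size and pause are arbitrary values, not Gobstopper settings.

```python
import asyncio


class RecordingSocket:
    """Test double standing in for a WebSocket; records what was sent."""

    def __init__(self):
        self.sent = []

    async def send_text(self, data):
        self.sent.append(data)


async def broadcast_in_batches(sockets, message, batch_size=100, pause=0.0):
    """Send concurrently within each batch; return the number of successful sends."""
    delivered = 0
    for start in range(0, len(sockets), batch_size):
        batch = sockets[start:start + batch_size]
        results = await asyncio.gather(
            *(ws.send_text(message) for ws in batch),
            return_exceptions=True,
        )
        delivered += sum(1 for res in results if not isinstance(res, Exception))
        # Give other tasks a turn (and optionally throttle) between batches.
        await asyncio.sleep(pause)
    return delivered


sockets = [RecordingSocket() for _ in range(250)]
delivered = asyncio.run(broadcast_in_batches(sockets, "maintenance at 02:00"))
```

Raising pause above zero turns the same loop into a simple rate limiter, at the cost of a longer total broadcast time.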

See also

  • broadcast_to_room(): Broadcast to specific room

  • get_room_connections(): Get connections in a room

  • WebSocket.send_text: Underlying send method

get_room_connections(room: str) → Set[str][source]

Get all connection IDs currently in a specific room.

Returns the set of connection IDs that are members of the specified room. The returned set is a copy, so modifying it does not affect the manager’s internal state.

Parameters:

room – Name of the room to query.

Returns:

Set of connection_id strings for all connections in the room. Returns empty set if room doesn’t exist or has no members.

Examples

Get room size:

>>> connections = manager.get_room_connections("lobby")
>>> print(f"Lobby has {len(connections)} users")

Check if connection in room:

>>> room_members = manager.get_room_connections("vip_lounge")
>>> if conn_id in room_members:
...     print("User is in VIP lounge")

Room statistics:

>>> def get_room_info(room: str) -> dict:
...     connections = manager.get_room_connections(room)
...     return {
...         "room": room,
...         "member_count": len(connections),
...         "members": list(connections)
...     }

Iterate over room members:

>>> room_members = manager.get_room_connections("announcements")
>>> for conn_id in room_members:
...     ws = manager.connections.get(conn_id)
...     if ws:
...         print(f"Connection {conn_id} is active")

Compare room sizes:

>>> def find_most_popular_room() -> str:
...     max_size = 0
...     popular = None
...     for room in manager.rooms.keys():
...         size = len(manager.get_room_connections(room))
...         if size > max_size:
...             max_size = size
...             popular = room
...     return popular

Note

Returns a copy of the set, safe to modify without affecting manager. Empty set returned if room doesn’t exist. Connection IDs in set may reference closed connections.

See also

  • get_connection_rooms(): Get rooms a connection is in

  • join_room(): Add connection to room

  • broadcast_to_room(): Send message to room

get_connection_rooms(connection_id: str) → Set[str][source]

Get all rooms that a specific connection is a member of.

Iterates through all rooms to find which ones contain the specified connection. Returns a new set containing the room names, so modifications won’t affect the manager’s internal state.

Parameters:

connection_id – Unique identifier of the connection to query.

Returns:

Set of room name strings that the connection is a member of. Returns empty set if connection is not in any rooms.

Examples

Check user’s room memberships:

>>> rooms = manager.get_connection_rooms(conn_id)
>>> print(f"User is in {len(rooms)} rooms: {rooms}")

Verify room membership:

>>> user_rooms = manager.get_connection_rooms(conn_id)
>>> if "admin_only" in user_rooms:
...     print("User has admin access")

Display user’s subscriptions:

>>> async def show_subscriptions(websocket, conn_id):
...     rooms = manager.get_connection_rooms(conn_id)
...     await websocket.send_text(
...         f"You are subscribed to: {', '.join(rooms)}"
...     )

Leave all rooms except one:

>>> def keep_only_room(conn_id: str, keep_room: str):
...     current_rooms = manager.get_connection_rooms(conn_id)
...     for room in current_rooms:
...         if room != keep_room:
...             manager.leave_room(conn_id, room)

Connection activity report:

>>> def get_connection_report(conn_id: str) -> dict:
...     rooms = manager.get_connection_rooms(conn_id)
...     ws = manager.connections.get(conn_id)
...     return {
...         "connection_id": conn_id,
...         "active": ws is not None,
...         "room_count": len(rooms),
...         "rooms": list(rooms)
...     }

Cleanup before disconnect:

>>> async def graceful_disconnect(conn_id: str):
...     # Notify all rooms user was in
...     rooms = manager.get_connection_rooms(conn_id)
...     for room in rooms:
...         await manager.broadcast_to_room(
...             room,
...             f"User {conn_id} has left {room}"
...         )
...     # Remove connection
...     manager.remove_connection(conn_id)

Note

Returns a new set, so it is safe to modify without affecting the manager. An empty set is returned if the connection is not in any rooms. Performance is O(n), where n is the total number of rooms.

See also

  • get_room_connections(): Get connections in a room

  • join_room(): Add connection to room

  • leave_room(): Remove from specific room
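
The room scan described for get_connection_rooms() can be sketched as below. This is a minimal illustration, assuming rooms are stored as a dict mapping room names to sets of connection IDs; the `rooms` table and the free function are hypothetical stand-ins, not Gobstopper's internals.

```python
from typing import Dict, Set

# Hypothetical stand-in for the manager's room table: room name -> connection IDs.
rooms: Dict[str, Set[str]] = {
    "announcements": {"conn-1", "conn-2"},
    "admin_only": {"conn-1"},
    "lobby": {"conn-3"},
}


def get_connection_rooms(rooms: Dict[str, Set[str]], connection_id: str) -> Set[str]:
    """Scan every room and collect the names of those containing connection_id."""
    # One membership test per room, so the cost grows with the number of rooms.
    return {name for name, members in rooms.items() if connection_id in members}


found = get_connection_rooms(rooms, "conn-1")
found.add("scratch")  # The result is a fresh set; the room table is untouched.
```

Because the result is built fresh on each call, callers can iterate over it while calling leave_room() without mutating the collection they are looping over.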

Sessions

Session storage backends for Gobstopper

class gobstopper.sessions.storage.BaseSessionStorage[source]

Bases: ABC

Abstract base class for session storage.

abstractmethod load(session_id: str) Dict[str, Any] | None[source]

Load a session from storage.

abstractmethod save(session_id: str, data: Dict[str, Any])[source]

Save a session to storage.

abstractmethod delete(session_id: str)[source]

Delete a session from storage.

abstractmethod cleanup()[source]

Clean up expired sessions.
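
A custom backend only needs the four methods listed above. The following is a rough sketch of a dict-backed store; `SessionStorageInterface` is a local stand-in for BaseSessionStorage so the example runs on its own, and `DictSessionStorage` with its fixed `ttl_seconds` expiry rule is an assumption made for illustration.

```python
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple


class SessionStorageInterface(ABC):
    """Local stand-in mirroring the four abstract methods listed above."""

    @abstractmethod
    def load(self, session_id: str) -> Optional[Dict[str, Any]]: ...

    @abstractmethod
    def save(self, session_id: str, data: Dict[str, Any]) -> None: ...

    @abstractmethod
    def delete(self, session_id: str) -> None: ...

    @abstractmethod
    def cleanup(self) -> None: ...


class DictSessionStorage(SessionStorageInterface):
    """Hypothetical backend keeping sessions in a dict with a fixed lifetime."""

    def __init__(self, ttl_seconds: float = 3600.0) -> None:
        self.ttl_seconds = ttl_seconds
        # session_id -> (expiry timestamp, session data)
        self._sessions: Dict[str, Tuple[float, Dict[str, Any]]] = {}

    def load(self, session_id: str) -> Optional[Dict[str, Any]]:
        entry = self._sessions.get(session_id)
        if entry is None or entry[0] <= time.time():
            return None  # Unknown or expired session.
        return dict(entry[1])  # Copy so callers cannot mutate stored state.

    def save(self, session_id: str, data: Dict[str, Any]) -> None:
        self._sessions[session_id] = (time.time() + self.ttl_seconds, dict(data))

    def delete(self, session_id: str) -> None:
        self._sessions.pop(session_id, None)  # Missing IDs are ignored.

    def cleanup(self) -> None:
        now = time.time()
        expired = [sid for sid, (expires, _) in self._sessions.items() if expires <= now]
        for sid in expired:
            del self._sessions[sid]
```

In a real application the class would subclass gobstopper.sessions.storage.BaseSessionStorage instead of the stand-in.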

class gobstopper.sessions.storage.AsyncBaseSessionStorage[source]

Bases: ABC

abstractmethod async load(session_id: str) Dict[str, Any] | None[source]
abstractmethod async save(session_id: str, data: Dict[str, Any]) None[source]
abstractmethod async delete(session_id: str) None[source]
async cleanup() None[source]
async gobstopper.sessions.storage.maybe_await(result)[source]
class gobstopper.sessions.storage.FileSessionStorage(directory: Path)[source]

Bases: BaseSessionStorage

File-based session storage.

__init__(directory: Path)[source]
load(session_id: str) Dict[str, Any] | None[source]

Load session data from a file.

save(session_id: str, data: Dict[str, Any])[source]

Save session data to a file.

delete(session_id: str)[source]

Delete a session file.

cleanup()[source]

Remove expired session files.

class gobstopper.sessions.redis_storage.AsyncRedisSessionStorage(client: None, key_prefix: str = 'session:', ttl: int = 604800)[source]

Bases: AsyncBaseSessionStorage

An asynchronous session storage backend using Redis. This backend is recommended for production deployments.

__init__(client: None, key_prefix: str = 'session:', ttl: int = 604800)[source]
async load(session_id: str) Dict[str, Any] | None[source]
async save(session_id: str, data: Dict[str, Any]) None[source]
async delete(session_id: str) None[source]
async cleanup() None[source]

Middleware

Static file middleware for Gobstopper framework.

This module provides secure static file serving with automatic MIME type detection, caching headers, and directory traversal protection. This is the pure Python implementation, suitable for development and production use.

class gobstopper.middleware.static.StaticFileMiddleware(static_dir: str = 'static', url_prefix: str = '/static')[source]

Bases: object

Middleware for serving static files securely.

This middleware serves static files from a designated directory with built-in security features to prevent directory traversal attacks. It automatically detects MIME types and adds appropriate caching headers.

Security Features:
  • Directory traversal protection (prevents ‘..’ attacks)

  • Path validation to ensure files are within static directory

  • Automatic MIME type detection

  • 403 Forbidden for invalid paths

  • 404 Not Found for missing files

Performance Features:
  • Cache-Control headers (1 hour default)

  • Content-Type auto-detection

  • Content-Length headers

  • Efficient file reading

Parameters:
  • static_dir – Directory containing static files. Can be relative or absolute. Default is ‘static’. The directory must exist.

  • url_prefix – URL prefix for static routes. Must start with ‘/’. Default is ‘/static’. Requests to this prefix will serve static files.

Examples

Basic static file serving:

from gobstopper.middleware import StaticFileMiddleware

# Serve files from './static' at '/static' URLs
static = StaticFileMiddleware()
app.add_middleware(static)

Custom directory and prefix:

static = StaticFileMiddleware(
    static_dir='public',
    url_prefix='/assets'
)
app.add_middleware(static)

Multiple static directories:

# Serve from different directories with different prefixes
app.add_middleware(StaticFileMiddleware('css', '/css'))
app.add_middleware(StaticFileMiddleware('js', '/js'))
app.add_middleware(StaticFileMiddleware('images', '/images'))

Note

  • For production with high traffic, consider using RustStaticMiddleware

  • The middleware automatically adds ‘public, max-age=3600’ cache headers

  • MIME types are detected using Python’s mimetypes module

  • Unknown file types default to ‘application/octet-stream’

Security Considerations:
  • Never serve files from user-controlled directories

  • Be careful with symlinks (they can escape the static directory)

  • Consider using Content-Security-Policy headers

  • Validate file extensions if serving user-uploaded content

See also

  • RustStaticMiddleware: High-performance Rust-based static serving

  • HybridStaticMiddleware: Combines Rust and Python for optimal performance

__init__(static_dir: str = 'static', url_prefix: str = '/static')[source]
async __call__(request: Request, call_next: Callable[[Request], Awaitable[Any]]) Response[source]

Process request and serve static files if needed.

Checks if the request path matches the static URL prefix. If so, serves the corresponding file; otherwise, passes the request to the next handler.

Parameters:
  • request – The incoming HTTP request.

  • call_next – The next middleware or handler in the chain.

Returns:

Response containing the file contents if path matches static prefix, otherwise the result from call_next().

Note

  • Only processes requests matching the configured url_prefix

  • Other requests are passed through unchanged
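
The prefix check, traversal protection, and MIME fallback described above can be sketched with the standard library alone. The helper names `resolve_static_path` and `guess_content_type`, and the use of status 0 to mean "not a static request", are assumptions for illustration and are not taken from the middleware's source.

```python
import mimetypes
from pathlib import Path
from typing import Optional, Tuple


def resolve_static_path(static_dir: Path, url_prefix: str, request_path: str) -> Tuple[int, Optional[Path]]:
    """Return (status, file_path); status 0 means the request is not a static one."""
    prefix = url_prefix.rstrip("/") + "/"
    if not request_path.startswith(prefix):
        return 0, None  # Not ours: a middleware would hand off to call_next().
    root = static_dir.resolve()
    candidate = (root / request_path[len(prefix):]).resolve()
    # After resolving '..' segments, the target must still sit under the root.
    if candidate != root and root not in candidate.parents:
        return 403, None
    if not candidate.is_file():
        return 404, None
    return 200, candidate


def guess_content_type(path: Path) -> str:
    """Detect the MIME type, falling back to a generic binary type."""
    content_type, _ = mimetypes.guess_type(path.name)
    return content_type or "application/octet-stream"
```

Note that `Path.resolve()` also follows symlinks, so in this sketch a symlink pointing outside the static directory is rejected with 403; whether the middleware behaves the same way is not stated in this reference.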

CORS middleware for Gobstopper framework.

This module provides Cross-Origin Resource Sharing (CORS) support for web applications, enabling secure cross-origin requests from browsers. CORS is essential for modern web applications that need to make requests from different domains.

class gobstopper.middleware.cors.CORSMiddleware(origins: List[str] | None = None, methods: List[str] | None = None, headers: List[str] | None = None, allow_credentials: bool = False, max_age: int = 3600)[source]

Bases: object

Middleware for handling Cross-Origin Resource Sharing (CORS).

This middleware adds appropriate CORS headers to responses and handles preflight OPTIONS requests. It supports configurable origins, methods, headers, and credential handling.

Security Considerations:
  • Use specific origins instead of ‘*’ when possible

  • Never use ‘*’ with credentials enabled (browsers will reject)

  • Always include Vary header when using specific origins

  • Consider the security implications of exposed headers

Parameters:
  • origins – List of allowed origins. Use [‘*’] for all origins (default). Examples: [‘https://example.com’, ‘https://app.example.com’]. Note: ‘*’ cannot be used with credentials.

  • methods – List of allowed HTTP methods. Defaults to [‘GET’, ‘POST’, ‘PUT’, ‘DELETE’, ‘OPTIONS’].

  • headers – List of allowed request headers. Defaults to [‘Content-Type’, ‘Authorization’].

  • allow_credentials – Whether to allow credentials (cookies, auth headers). Default is False. When True, specific origins must be listed.

  • max_age – How long (in seconds) browsers should cache preflight responses. Default is 3600 (1 hour). Maximum varies by browser (usually 86400).

Examples

Basic CORS for public API:

from gobstopper.middleware import CORSMiddleware

# Allow all origins (no credentials)
cors = CORSMiddleware()
app.add_middleware(cors)

CORS with specific origins and credentials:

cors = CORSMiddleware(
    origins=['https://app.example.com', 'https://admin.example.com'],
    methods=['GET', 'POST', 'PUT', 'DELETE'],
    headers=['Content-Type', 'Authorization', 'X-Custom-Header'],
    allow_credentials=True,
    max_age=7200  # 2 hours
)
app.add_middleware(cors)

Development setup (permissive):

cors = CORSMiddleware(
    origins=['http://localhost:3000', 'http://localhost:8080'],
    allow_credentials=True
)
app.add_middleware(cors)

Note

  • Preflight requests (OPTIONS) are handled automatically

  • The middleware adds ‘Vary: Origin’ header when using specific origins

  • Browsers enforce CORS; this middleware just provides the headers

  • Empty origin headers are rejected for security

__init__(origins: List[str] | None = None, methods: List[str] | None = None, headers: List[str] | None = None, allow_credentials: bool = False, max_age: int = 3600)[source]
async __call__(request: Request, call_next: Callable[[Request], Awaitable[Any]]) Response[source]

Process request and add CORS headers.

This method handles both preflight OPTIONS requests and regular requests, adding appropriate CORS headers based on the middleware configuration.

Parameters:
  • request – The incoming HTTP request.

  • call_next – The next middleware or handler in the chain.

Returns:

Response with CORS headers added if the origin is allowed.

Note

  • Preflight requests receive a 204 No Content response

  • Regular requests are passed through with CORS headers added

  • Origins are validated before headers are added
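
As a rough sketch of the decision logic described above, the functions below compute CORS headers for a regular request and for a preflight. The function names and the choice to return an empty dict for disallowed origins are assumptions; only the header names are standard.

```python
from typing import Dict, List, Optional, Tuple


def cors_headers(origin: Optional[str], allowed_origins: List[str],
                 allow_credentials: bool = False) -> Dict[str, str]:
    """Build CORS response headers; an empty dict means the origin is not allowed."""
    if not origin:
        return {}  # Missing or empty Origin header: nothing to grant.
    if "*" in allowed_origins and not allow_credentials:
        return {"Access-Control-Allow-Origin": "*"}
    if origin not in allowed_origins:
        return {}
    headers = {
        "Access-Control-Allow-Origin": origin,
        "Vary": "Origin",  # The response differs per origin, so caches must key on it.
    }
    if allow_credentials:
        headers["Access-Control-Allow-Credentials"] = "true"
    return headers


def preflight_response(origin: Optional[str], allowed_origins: List[str], methods: List[str],
                       allowed_headers: List[str], max_age: int = 3600) -> Tuple[int, Dict[str, str]]:
    """Answer an OPTIONS preflight with 204 and the negotiated CORS headers."""
    headers = cors_headers(origin, allowed_origins)
    if headers:
        headers["Access-Control-Allow-Methods"] = ", ".join(methods)
        headers["Access-Control-Allow-Headers"] = ", ".join(allowed_headers)
        headers["Access-Control-Max-Age"] = str(max_age)
    return 204, headers
```

With `allowed_origins=['*']` and `allow_credentials=True` the sketch grants nothing, which mirrors the rule that a wildcard cannot be combined with credentials.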

Security middleware for Gobstopper framework.

This module provides comprehensive security features for web applications including:
  • CSRF (Cross-Site Request Forgery) protection

  • Secure session management with multiple storage backends

  • Modern security headers (HSTS, CSP, COOP, COEP, etc.)

  • Cookie security with environment-aware defaults

  • Session ID signing and verification

The middleware is designed to be production-ready with secure defaults that are automatically enforced in production environments.

class gobstopper.middleware.security.SecurityMiddleware(secret_key: str | None = None, enable_csrf: bool = True, enable_security_headers: bool = True, hsts_max_age: int = 31536000, hsts_include_subdomains: bool = True, csp_policy: str = "default-src 'self'; object-src 'none'", referrer_policy: str = 'strict-origin-when-cross-origin', coop_policy: str = 'same-origin', coep_policy: str = 'require-corp', session_storage: BaseSessionStorage | AsyncBaseSessionStorage | None = None, debug: bool = False, cookie_name: str = 'session_id', cookie_secure: bool = True, cookie_httponly: bool = True, cookie_samesite: str = 'Strict', cookie_path: str = '/', cookie_domain: str | None = None, cookie_max_age: int = 604800, rolling_sessions: bool = False, sign_session_id: bool = False)[source]

Bases: object

Middleware for comprehensive application security.

This middleware provides multiple layers of security protection including CSRF tokens, secure session management, and modern security headers. It’s designed to be production-ready with secure defaults that are automatically enforced based on the environment.

Security Features:
  • CSRF Protection: Validates tokens for state-changing requests

  • Session Management: Secure sessions with multiple storage backends

  • Security Headers: HSTS, CSP, COOP, COEP, X-Frame-Options, etc.

  • Cookie Security: Secure, HttpOnly, SameSite flags with environment enforcement

  • Session ID Signing: HMAC-SHA256 signing to prevent tampering

  • Rolling Sessions: Optional session renewal on each request

Production Behavior:

When ENV=production, the middleware enforces:
  • Cookie Secure flag (HTTPS only)

  • HttpOnly flag (prevents JavaScript access)

  • SameSite=Strict (prevents CSRF attacks)

These cannot be disabled in production for safety.
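
One way to picture the production override is the sketch below, which builds a Set-Cookie value with the standard library. The function `build_session_cookie` and its `env` argument are hypothetical; the middleware's own implementation may differ.

```python
from http.cookies import SimpleCookie


def build_session_cookie(session_id: str, env: str, secure: bool = True, httponly: bool = True,
                         samesite: str = "Strict", max_age: int = 604800, path: str = "/") -> str:
    """Return a Set-Cookie header value, overriding relaxed flags in production."""
    if env == "production":
        # Relaxed settings passed by the caller are ignored in production.
        secure, httponly, samesite = True, True, "Strict"
    cookie = SimpleCookie()
    cookie["session_id"] = session_id
    morsel = cookie["session_id"]
    morsel["path"] = path
    morsel["max-age"] = max_age
    morsel["samesite"] = samesite
    if secure:
        morsel["secure"] = True
    if httponly:
        morsel["httponly"] = True
    return morsel.OutputString()
```

A caller would typically pass something like `os.environ.get("ENV", "")` as `env`, so that the same configuration yields relaxed cookies locally and strict ones in production.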

Parameters:
  • secret_key – Secret key for CSRF and session signing. Required for production. Should be a long, random string. If not provided, a random key is generated but this is not suitable for multi-server deployments.

  • enable_csrf – Enable CSRF protection for POST/PUT/DELETE/PATCH requests. Default is True. Recommended to keep enabled.

  • enable_security_headers – Add security headers to responses. Default is True. Includes HSTS, CSP, COOP, COEP, etc.

  • hsts_max_age – HTTP Strict Transport Security max-age in seconds. Default is 31536000 (1 year). Tells browsers to use HTTPS.

  • hsts_include_subdomains – Include subdomains in HSTS policy. Default is True. Applies HSTS to all subdomains.

  • csp_policy – Content Security Policy directive string. Default restricts to same-origin and blocks objects. Customize based on your application’s needs.

  • referrer_policy – Referrer-Policy header value. Default is ‘strict-origin-when-cross-origin’.

  • coop_policy – Cross-Origin-Opener-Policy header value. Default is ‘same-origin’. Prevents window.opener access.

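  • coep_policy – Cross-Origin-Embedder-Policy header value. Default is ‘require-corp’. Together with a ‘same-origin’ COOP policy, this makes the page cross-origin isolated, which browsers require for SharedArrayBuffer.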

  • session_storage – Storage backend for sessions. Defaults to FileSessionStorage. Can be FileSessionStorage, MemorySessionStorage, or custom implementation.

  • debug – Debug mode. Disables some security warnings. Default is False. Never use True in production.

  • cookie_name – Name of the session cookie. Default is ‘session_id’.

  • cookie_secure – Require HTTPS for session cookies. Default is True. Forced to True in production.

  • cookie_httponly – Prevent JavaScript access to session cookies. Default is True. Forced to True in production.

  • cookie_samesite – SameSite cookie attribute. Default is ‘Strict’. Forced to ‘Strict’ in production. Options: ‘Strict’, ‘Lax’, ‘None’ (requires Secure=True).

  • cookie_path – Path scope for session cookie. Default is ‘/’. Cookie valid for entire site.

  • cookie_domain – Domain scope for session cookie. Default is None (current domain only).

  • cookie_max_age – Session cookie lifetime in seconds. Default is 604800 (7 days).

  • rolling_sessions – Refresh session on every request. Default is False. When True, session expiry is extended on each request.

  • sign_session_id – Sign session IDs with HMAC-SHA256. Default is False. Prevents session ID tampering when enabled.
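
To show how the header-related parameters above map onto response headers, here is a minimal sketch. The function `build_security_headers` is an assumption for illustration; the header names are the standard ones, and headers whose values this reference does not give (such as X-Frame-Options) are left out.

```python
from typing import Dict


def build_security_headers(
    hsts_max_age: int = 31536000,
    hsts_include_subdomains: bool = True,
    csp_policy: str = "default-src 'self'; object-src 'none'",
    referrer_policy: str = "strict-origin-when-cross-origin",
    coop_policy: str = "same-origin",
    coep_policy: str = "require-corp",
) -> Dict[str, str]:
    """Translate the documented defaults into standard security headers."""
    hsts = f"max-age={hsts_max_age}"
    if hsts_include_subdomains:
        hsts += "; includeSubDomains"
    return {
        "Strict-Transport-Security": hsts,
        "Content-Security-Policy": csp_policy,
        "Referrer-Policy": referrer_policy,
        "Cross-Origin-Opener-Policy": coop_policy,
        "Cross-Origin-Embedder-Policy": coep_policy,
    }


default_headers = build_security_headers()
```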

Examples

Basic security setup:

from gobstopper.middleware import SecurityMiddleware

security = SecurityMiddleware(
    secret_key='your-secret-key-here',
    enable_csrf=True,
    enable_security_headers=True
)
app.add_middleware(security)

Production configuration:

import os

security = SecurityMiddleware(
    secret_key=os.environ['SECRET_KEY'],
    enable_csrf=True,
    enable_security_headers=True,
    cookie_secure=True,
    cookie_httponly=True,
    cookie_samesite='Strict',
    rolling_sessions=True,
    sign_session_id=True,
    csp_policy="default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'"
)
app.add_middleware(security)

Custom session storage:

from gobstopper.sessions.storage import MemorySessionStorage

security = SecurityMiddleware(
    secret_key='your-secret-key',
    session_storage=MemorySessionStorage()
)
app.add_middleware(security)

Development setup (relaxed):

security = SecurityMiddleware(
    secret_key='dev-key',
    cookie_secure=False,  # Allow HTTP in development
    cookie_samesite='Lax',
    debug=True
)
app.add_middleware(security)

Note

  • Always use a strong, persistent secret_key in production

  • CSRF tokens must be included in forms or headers for state-changing requests

  • Session data is available via request.session dictionary

  • Call generate_csrf_token() to create tokens for templates

  • Use regenerate_session_id() after privilege escalation (e.g., login)

Security Best Practices:
  • Set ENV=production for production deployments

  • Use HTTPS in production (required for secure cookies)

  • Regenerate session ID after authentication changes

  • Implement session timeout and idle timeout

  • Use database-backed sessions for multi-server deployments

  • Regularly rotate the secret_key (requires session invalidation)

  • Monitor and log security events

__init__(secret_key: str | None = None, enable_csrf: bool = True, enable_security_headers: bool = True, hsts_max_age: int = 31536000, hsts_include_subdomains: bool = True, csp_policy: str = "default-src 'self'; object-src 'none'", referrer_policy: str = 'strict-origin-when-cross-origin', coop_policy: str = 'same-origin', coep_policy: str = 'require-corp', session_storage: BaseSessionStorage | AsyncBaseSessionStorage | None = None, debug: bool = False, cookie_name: str = 'session_id', cookie_secure: bool = True, cookie_httponly: bool = True, cookie_samesite: str = 'Strict', cookie_path: str = '/', cookie_domain: str | None = None, cookie_max_age: int = 604800, rolling_sessions: bool = False, sign_session_id: bool = False)[source]

Return a signed cookie value when signing is enabled.

Signs the session ID using HMAC-SHA256 to prevent tampering. The signature is appended to the session ID with a dot separator.

Parameters:

session_id – The session ID to sign.

Returns:

Signed session ID in format “session_id.signature” if signing is enabled, otherwise returns the original session_id unchanged.

Note

  • Only signs if sign_session_id=True was set during initialization

  • Uses HMAC-SHA256 with the middleware’s secret key

  • Signature prevents session ID tampering and fixation attacks
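
The “session_id.signature” format can be sketched with the standard hmac module. The function names, the hex encoding of the signature, and the verification step are assumptions; the reference only states that HMAC-SHA256 and a dot separator are used.

```python
import hashlib
import hmac
from typing import Optional


def sign_session_id(session_id: str, secret_key: str) -> str:
    """Append an HMAC-SHA256 signature (hex, an assumed encoding) after a dot."""
    signature = hmac.new(secret_key.encode(), session_id.encode(), hashlib.sha256).hexdigest()
    return f"{session_id}.{signature}"


def verify_signed_session_id(cookie_value: str, secret_key: str) -> Optional[str]:
    """Return the session ID if the signature matches, otherwise None."""
    session_id, separator, signature = cookie_value.rpartition(".")
    if not separator:
        return None  # No signature part present.
    expected = hmac.new(secret_key.encode(), session_id.encode(), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking how many leading characters matched.
    if hmac.compare_digest(signature.encode(), expected.encode()):
        return session_id
    return None
```

Splitting on the last dot keeps the sketch working even if a session ID itself contains dots.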

get_session_id(request: Request) str | None[source]

Public accessor for the current request’s session ID.

Retrieves the session ID that was set by the middleware during request processing.

Parameters:

request – The request object.

Returns:

The session ID string if a session is active, None otherwise.

Note

  • This is the preferred way to access session IDs from application code

  • Returns None if no session exists or middleware hasn’t run yet

async __call__(request: Request, call_next: Callable[[Request], Awaitable[Any]]) Response[source]

Process request through security middleware.

Handles session loading, CSRF validation, request processing, and session saving. Also adds security headers to responses.

Parameters:
  • request – The incoming HTTP request.

  • call_next – The next middleware or handler in the chain.

Returns:

Response with security headers added, or 403 Forbidden if CSRF validation fails.

Note

  • Loads session from storage if session cookie exists

  • Validates CSRF token for POST/PUT/DELETE/PATCH requests

  • Saves session if modified or rolling_sessions is enabled

  • Adds security headers to all responses

  • Sets request.session dict for application use

generate_csrf_token(session: Dict[str, Any]) str[source]

Generate a CSRF token and store it in the session.

Creates a cryptographically secure random token and stores it in the session. This token must be included in subsequent state-changing requests.

Parameters:

session – The session dictionary to store the token in.

Returns:

The generated CSRF token string (URL-safe base64).

Note

  • Token is stored in session[‘csrf_token’]

  • Should be called when rendering forms that will submit data

  • Tokens are validated on POST/PUT/DELETE/PATCH requests

  • Token is unique per session and remains valid until session expires
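
The token lifecycle in the notes above can be sketched as follows. The 32-byte token size and the `is_valid_csrf` helper are assumptions, and this sketch issues a fresh token on every call, which may not match how the middleware reuses tokens within a session.

```python
import hmac
import secrets
from typing import Any, Dict, Optional


def generate_csrf_token(session: Dict[str, Any]) -> str:
    """Create a URL-safe random token and remember it in the session."""
    token = secrets.token_urlsafe(32)  # 32 random bytes is an assumed size.
    session["csrf_token"] = token
    return token


def is_valid_csrf(session: Dict[str, Any], submitted: Optional[str]) -> bool:
    """Compare the submitted token with the stored one in constant time."""
    expected = session.get("csrf_token")
    if not expected or not submitted:
        return False
    return hmac.compare_digest(expected.encode(), submitted.encode())
```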

Example

In a route handler:

@app.get('/form')
async def show_form(request):
    token = security.generate_csrf_token(request.session)
    return template('form.html', csrf_token=token)

create_session_id() str[source]

Generate a new cryptographically secure session ID.

Creates a URL-safe base64-encoded random session ID suitable for use as a session identifier.

Returns:

A URL-safe random session ID string generated from 32 random bytes.

Note

  • Uses secrets.token_urlsafe() for cryptographic randomness

  • IDs are 43 characters long (32 bytes base64-encoded)

  • Safe to use in cookies and URLs
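
The 43-character length follows from the encoding: 32 bytes are 256 bits, base64 carries 6 bits per character, and 256 / 6 rounds up to 43 once padding is stripped. A small check, written as a standalone function rather than the middleware method:

```python
import secrets
import string


def create_session_id() -> str:
    """Generate a URL-safe ID from 32 bytes of cryptographic randomness."""
    return secrets.token_urlsafe(32)


sid = create_session_id()
# Characters allowed by the URL-safe base64 alphabet.
URL_SAFE_ALPHABET = set(string.ascii_letters + string.digits + "-_")
```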

async create_session(data: dict) str[source]

Create a new session with the provided data.

Generates a new session ID, saves the session data to storage, and returns the session ID.

Parameters:

data – Dictionary of session data to store.

Returns:

The newly created session ID.

Note

  • Automatically generates a secure session ID

  • Saves to configured session storage

  • Use this for programmatic session creation (e.g., after login)

Example:

session_id = await security.create_session({
    'user_id': user.id,
    'authenticated': True
})
# Set cookie with session_id

async destroy_session(session_id: str)[source]

Destroy a session by deleting it from storage.

Removes all session data from the storage backend. This is typically called during logout.

Parameters:

session_id – The session ID to destroy.

Note

  • Deletes session from storage immediately

  • Does not clear the client’s cookie (handle separately)

  • Safe to call even if session doesn’t exist

Example:

@app.post('/logout')
async def logout(request):
    session_id = security.get_session_id(request)
    if session_id:
        await security.destroy_session(session_id)
    # Also clear the cookie in response
    return Response('Logged out')

async regenerate_session_id(request: Request) str | None[source]

Regenerate session ID for the current session.

Creates a new session ID and migrates existing session data to it, then deletes the old session. This is critical for preventing session fixation attacks after privilege escalation.

Parameters:

request – The request object with the current session.

Returns:

The new session ID string, or None if no session was active.

Note

  • Always call this after login or privilege changes

  • Preserves all session data while changing the ID

  • Old session is deleted to prevent reuse

  • Updates request._session_id with the new ID
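
The migrate-then-delete behaviour can be pictured with a plain dict standing in for the storage backend. The synchronous free function and the `store` argument are simplifications for illustration; the real method is async and works on the request.

```python
import secrets
from typing import Any, Dict, Optional


def regenerate_session_id(store: Dict[str, Dict[str, Any]], old_id: Optional[str]) -> Optional[str]:
    """Move session data to a freshly generated ID and drop the old one."""
    if old_id is None or old_id not in store:
        return None  # No active session to migrate.
    new_id = secrets.token_urlsafe(32)
    # pop() removes the old entry, so an ID planted before login stops resolving.
    store[new_id] = store.pop(old_id)
    return new_id
```

An attacker who fixed the victim's session ID before login is left holding an ID that no longer maps to any session.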

Example

After successful login:

@app.post('/login')
async def login(request):
    # username, password, and user are placeholders for values obtained
    # from your own form parsing and user lookup.
    if validate_credentials(username, password):
        # Regenerate to prevent session fixation
        new_sid = await security.regenerate_session_id(request)
        request.session['user_id'] = user.id
        request.session['authenticated'] = True
        return Response('Login successful')

Templates

Jinja2 template engine for Gobstopper framework

class gobstopper.templates.engine.TemplateEngine(template_folder: str | Path = 'templates', auto_reload: bool = True, cache_size: int = 400)[source]

Bases: object

Jinja2-based template engine with async support

__init__(template_folder: str | Path = 'templates', auto_reload: bool = True, cache_size: int = 400)[source]
add_search_path(path: str | Path)[source]

Add an additional search path for templates (used by blueprints).

async render_template_async(template_name: str, **context) str[source]

Render template asynchronously

add_filter(name: str, func)[source]

Add a custom filter

add_global(name: str, func)[source]

Add a custom global function

Utilities

Simple in-memory token-bucket rate limiter and middleware helpers.

Usage example:

from gobstopper.utils.rate_limiter import TokenBucketLimiter, rate_limit

limiter = TokenBucketLimiter(rate=5, capacity=10)  # 5 tokens/sec, burst 10

@app.get('/limited')
@rate_limit(limiter, key=lambda req: req.client_ip)
async def limited(request):
    return {'ok': True}

class gobstopper.utils.rate_limiter.TokenBucketLimiter(rate: float, capacity: int)[source]

Bases: object

__init__(rate: float, capacity: int)[source]

Parameters:
  • rate – Tokens added per second.

  • capacity – Maximum number of tokens (burst).

allow(key: str, cost: float = 1.0) bool[source]
gobstopper.utils.rate_limiter.rate_limit(limiter: TokenBucketLimiter, key: Callable[[Any], str] | None = None, cost: float = 1.0)[source]

Middleware decorator factory to enforce rate limits.

Example

limiter = TokenBucketLimiter(rate=5, capacity=10)

@app.get('/api/x')
@rate_limit(limiter, key=lambda req: req.client_ip)
async def handler(request):
    return {'ok': True}
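
For readers unfamiliar with the algorithm, a token bucket with the same `rate`, `capacity`, and `allow(key, cost)` shape can be sketched as below. `SimpleTokenBucket` and its injectable `clock` are illustrative assumptions, not the TokenBucketLimiter source.

```python
import time
from typing import Callable, Dict, Tuple


class SimpleTokenBucket:
    """Illustrative per-key token bucket; not Gobstopper's TokenBucketLimiter."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate          # Tokens added per second.
        self.capacity = capacity  # Maximum tokens a bucket can hold (burst).
        self._clock = clock
        # key -> (tokens currently available, time of last refill)
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def allow(self, key: str, cost: float = 1.0) -> bool:
        now = self._clock()
        # New keys start with a full bucket.
        tokens, last = self._buckets.get(key, (float(self.capacity), now))
        # Refill for the elapsed time, never exceeding capacity.
        tokens = min(float(self.capacity), tokens + (now - last) * self.rate)
        allowed = tokens >= cost
        if allowed:
            tokens -= cost
        self._buckets[key] = (tokens, now)
        return allowed
```

With `rate=5, capacity=10`, a client can burst 10 requests at once and then sustain 5 per second; each key (for example a client IP) gets its own bucket.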

Idempotency helper for POST endpoints.

Provides a simple in-memory TTL store to deduplicate requests based on an Idempotency-Key header or provided key argument. Suitable for a single process. For multi-process deployments, use a shared cache (Redis, etc.).

gobstopper.utils.idempotency.use_idempotency(ttl_seconds: float = 60.0)[source]

Decorator to enforce idempotency for handlers.

Reads key from request headers (Idempotency-Key) or request.args["idempotency_key"]. Caches the result for ttl_seconds and returns cached response for repeated keys.

Note: Returns the exact previous handler return value. If the handler returns a Response, it will be reused as-is; for dict/list the JSONResponse wrapping will happen as usual in the app pipeline.

gobstopper.utils.idempotency.remember_idempotency(key: str, value: Any, ttl_seconds: float = 60.0)[source]

Manually store a result for an idempotency key with TTL.

gobstopper.utils.idempotency.get_idempotent(key: str) Any | None[source]

Get a cached result for a given idempotency key if present and not expired.
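
The in-memory TTL store behind these helpers might look roughly like the sketch below. `IdempotencyCache` and its injectable `clock` are hypothetical names chosen for illustration; the module's actual data structure is not documented here.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class IdempotencyCache:
    """Hypothetical single-process TTL store keyed by idempotency key."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (expiry time, cached handler result)
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def remember(self, key: str, value: Any, ttl_seconds: float = 60.0) -> None:
        """Store a result that stays valid for ttl_seconds."""
        self._entries[key] = (self._clock() + ttl_seconds, value)

    def get(self, key: str) -> Optional[Any]:
        """Return the cached result if present and not expired."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if expires_at <= self._clock():
            del self._entries[key]  # Drop stale entries lazily on lookup.
            return None
        return value
```

Because the entries live in one process's memory, two workers would each keep their own cache, which is why a shared store is advised for multi-process deployments.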