Source code for gobstopper.utils.idempotency

"""
Idempotency helper for POST endpoints.

Provides a simple in-memory TTL store to deduplicate requests based on
an Idempotency-Key header or provided key argument. Suitable for a single
process. For multi-process deployments, use a shared cache (Redis, etc.).
"""
from __future__ import annotations

import time
import threading
from typing import Any, Callable, Optional, Tuple


class _TTLCache:
    def __init__(self):
        self._lock = threading.Lock()
        self._data: dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any, ttl_seconds: float):
        expire_at = time.time() + ttl_seconds
        with self._lock:
            self._data[key] = (expire_at, value)

    def get(self, key: str) -> Optional[Any]:
        now = time.time()
        with self._lock:
            item = self._data.get(key)
            if not item:
                return None
            expire_at, value = item
            if expire_at < now:
                # expired; remove
                self._data.pop(key, None)
                return None
            return value

    def exists(self, key: str) -> bool:
        return self.get(key) is not None


_memory_cache = _TTLCache()


[docs] def use_idempotency(ttl_seconds: float = 60.0): """Decorator to enforce idempotency for handlers. Reads key from request headers (Idempotency-Key) or request.args["idempotency_key"]. Caches the result for ttl_seconds and returns cached response for repeated keys. Note: Returns the exact previous handler return value. If the handler returns a Response, it will be reused as-is; for dict/list the JSONResponse wrapping will happen as usual in the app pipeline. """ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: async def wrapper(request, *args, **kwargs): key = request.headers.get('idempotency-key') if not key: # try query args try: key = (request.args.get('idempotency_key') or [''])[0] except Exception: key = None if not key: return await func(request, *args, **kwargs) cached = _memory_cache.get(key) if cached is not None: return cached result = await func(request, *args, **kwargs) _memory_cache.set(key, result, ttl_seconds) return result return wrapper return decorator
[docs] def remember_idempotency(key: str, value: Any, ttl_seconds: float = 60.0): """Manually store a result for an idempotency key with TTL.""" _memory_cache.set(key, value, ttl_seconds)
[docs] def get_idempotent(key: str) -> Optional[Any]: """Get a cached result for a given idempotency key if present and not expired.""" return _memory_cache.get(key)