Source code for gobstopper.sessions.redis_storage

# src/wopr/sessions/redis_storage.py
import json
from typing import Optional, Dict, Any

try:
    from redis.asyncio import Redis
    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False
    Redis = None

from .storage import AsyncBaseSessionStorage, SESSION_EXPIRATION_TIME

[docs] class AsyncRedisSessionStorage(AsyncBaseSessionStorage): """ An asynchronous session storage backend using Redis. This backend is recommended for production deployments. """
[docs] def __init__(self, client: Redis, key_prefix: str = "session:", ttl: int = SESSION_EXPIRATION_TIME): self.client = client self.key_prefix = key_prefix self.ttl = ttl
def _key(self, session_id: str) -> str: return f"{self.key_prefix}{session_id}"
[docs] async def load(self, session_id: str) -> Optional[Dict[str, Any]]: raw = await self.client.get(self._key(session_id)) if not raw: return None try: return json.loads(raw) except Exception: # Handle cases where data in Redis is corrupted or not valid JSON return None
[docs] async def save(self, session_id: str, data: Dict[str, Any]) -> None: await self.client.set(self._key(session_id), json.dumps(data), ex=self.ttl)
[docs] async def delete(self, session_id: str) -> None: await self.client.delete(self._key(session_id))
[docs] async def cleanup(self) -> None: # Redis handles TTL automatically, so no explicit cleanup is needed. pass