# WebSocket Support Gobstopper provides first-class WebSocket support through the RSGI protocol, enabling real-time bidirectional communication between clients and servers. The WebSocket system includes connection management, room-based broadcasting, and efficient message handling. ## Overview The WebSocket system consists of: - **WebSocket**: Connection wrapper for RSGI protocol communication - **WebSocketManager**: Room-based connection management and broadcasting - **RSGI Integration**: Direct integration with Granian's high-performance WebSocket implementation Note: When a client attempts to open a WebSocket connection to a path with no matching route, the server now responds by initiating a close with code 1001 (Going Away). ## WebSocket Class The `WebSocket` class provides a high-level interface for WebSocket communication. ### Connection Lifecycle ```python @app.websocket("/ws") async def websocket_handler(websocket): # 1. Accept the connection await websocket.accept() # 2. Handle messages try: while True: message = await websocket.receive() if message.type == "text": await websocket.send_text(f"Echo: {message.data}") elif message.type == "close": break except ConnectionError: # Handle disconnection pass ``` ### Methods #### `accept()` Accept the WebSocket connection and establish transport: ```python @app.websocket("/ws/connect") async def connect_handler(websocket): try: await websocket.accept() await websocket.send_text("Connected successfully!") except ConnectionError: # Connection failed app.logger.error("Failed to accept WebSocket connection") ``` #### `send_text(data: str)` Send text messages to the client: ```python @app.websocket("/ws/notifications") async def notification_handler(websocket): await websocket.accept() # Send simple message await websocket.send_text("Welcome to notifications!") # Send JSON data import json notification = { "type": "alert", "message": "New message received", "timestamp": datetime.utcnow().isoformat() } await 
websocket.send_text(json.dumps(notification)) # Send formatted message user_id = get_user_id_from_connection(websocket) await websocket.send_text(f"User {user_id} connected at {datetime.now()}") ``` #### `send_bytes(data: bytes)` Send binary messages to the client: ```python @app.websocket("/ws/binary") async def binary_handler(websocket): await websocket.accept() # Send binary data image_data = await load_image_bytes("welcome.png") await websocket.send_bytes(image_data) # Send compressed data import gzip large_data = get_large_dataset() compressed = gzip.compress(json.dumps(large_data).encode()) await websocket.send_bytes(compressed) ``` #### `receive()` Receive messages from the client: ```python @app.websocket("/ws/chat") async def chat_handler(websocket): await websocket.accept() while True: message = await websocket.receive() if message.type == "text": # Handle text message data = json.loads(message.data) await process_chat_message(data) elif message.type == "bytes": # Handle binary message await process_binary_data(message.data) elif message.type == "close": # Client disconnected break elif message.type == "error": # Handle error app.logger.error(f"WebSocket error: {message.data}") break ``` ## WebSocket Patterns ### Echo Server ```python @app.websocket("/ws/echo") async def echo_server(websocket): await websocket.accept() try: while True: message = await websocket.receive() if message.type == "text": await websocket.send_text(f"Echo: {message.data}") elif message.type == "bytes": await websocket.send_bytes(b"Echo: " + message.data) elif message.type == "close": break except ConnectionError: app.logger.info("Echo client disconnected") ``` ### Real-time Updates ```python import asyncio @app.websocket("/ws/live-data") async def live_data_stream(websocket): await websocket.accept() try: # Send periodic updates while True: data = await get_live_metrics() update = { "timestamp": datetime.utcnow().isoformat(), "cpu_usage": data["cpu"], "memory_usage": 
data["memory"], "active_connections": data["connections"] } await websocket.send_text(json.dumps(update)) await asyncio.sleep(1) # Update every second except ConnectionError: app.logger.info("Live data client disconnected") ``` ### File Upload Progress ```python @app.websocket("/ws/upload-progress/<upload_id>") async def upload_progress(websocket, upload_id): await websocket.accept() try: while True: # Check upload progress progress = await get_upload_progress(upload_id) if progress is None: await websocket.send_text(json.dumps({ "error": "Upload not found" })) break await websocket.send_text(json.dumps({ "upload_id": upload_id, "progress": progress["percentage"], "bytes_uploaded": progress["bytes_uploaded"], "total_bytes": progress["total_bytes"], "status": progress["status"] })) if progress["status"] in ["completed", "failed"]: break await asyncio.sleep(0.5) except ConnectionError: app.logger.info(f"Upload progress client disconnected: {upload_id}") ``` ## WebSocketManager The `WebSocketManager` class provides room-based connection management for building chat systems, live collaboration tools, and broadcast features. 
### Basic Usage ```python from gobstopper.websocket.manager import WebSocketManager # Create global manager instance websocket_manager = WebSocketManager() @app.websocket("/ws/chat") async def chat_handler(websocket): await websocket.accept() # Generate unique connection ID connection_id = str(uuid.uuid4()) # Add to manager websocket_manager.add_connection(connection_id, websocket) try: while True: message = await websocket.receive() if message.type == "text": data = json.loads(message.data) if data["type"] == "join_room": # Join a chat room websocket_manager.join_room(connection_id, data["room"]) await websocket.send_text(json.dumps({ "type": "joined", "room": data["room"] })) elif data["type"] == "chat_message": # Broadcast to room message_data = { "type": "message", "user": data["user"], "message": data["message"], "timestamp": datetime.utcnow().isoformat() } await websocket_manager.broadcast_to_room( data["room"], json.dumps(message_data) ) elif message.type == "close": break except ConnectionError: pass finally: # Cleanup websocket_manager.remove_connection(connection_id) ``` ### Room Management ```python # Chat rooms with user management user_connections = {} # user_id -> connection_id mapping @app.websocket("/ws/rooms/<room_id>") async def room_handler(websocket, room_id): await websocket.accept() # Authenticate user (example) auth_token = websocket.scope.query_string.decode().split('token=')[1] user = await authenticate_token(auth_token) if not user: await websocket.close(1008, "Authentication failed") return connection_id = str(uuid.uuid4()) user_connections[user.id] = connection_id # Add to manager and join room websocket_manager.add_connection(connection_id, websocket) websocket_manager.join_room(connection_id, room_id) # Notify others of new user await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "user_joined", "user_id": user.id, "username": user.username, "timestamp": datetime.utcnow().isoformat() })) try: async for message in 
receive_messages(websocket): if message["type"] == "chat": # Broadcast chat message await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "message", "user_id": user.id, "username": user.username, "message": message["content"], "timestamp": datetime.utcnow().isoformat() })) elif message["type"] == "typing": # Broadcast typing indicator (exclude sender) typing_msg = json.dumps({ "type": "typing", "user_id": user.id, "username": user.username, "is_typing": message["is_typing"] }) room_connections = websocket_manager.get_room_connections(room_id) for conn_id in room_connections: if conn_id != connection_id: # Don't send to sender conn = websocket_manager.connections.get(conn_id) if conn: await conn.send_text(typing_msg) except ConnectionError: pass finally: # User left await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "user_left", "user_id": user.id, "username": user.username, "timestamp": datetime.utcnow().isoformat() })) # Cleanup websocket_manager.remove_connection(connection_id) if user.id in user_connections: del user_connections[user.id] async def receive_messages(websocket): """Async generator for receiving messages""" try: while True: message = await websocket.receive() if message.type == "text": yield json.loads(message.data) elif message.type == "close": break except ConnectionError: pass ``` ### Broadcasting Patterns ```python # Global announcements @app.post("/api/broadcast") async def send_global_announcement(request): data = await request.json() announcement = { "type": "announcement", "title": data["title"], "message": data["message"], "priority": data.get("priority", "normal"), "timestamp": datetime.utcnow().isoformat() } await websocket_manager.broadcast_to_all(json.dumps(announcement)) return {"status": "broadcasted", "connections": len(websocket_manager.connections)} # Room-specific notifications @app.post("/api/notify-room") async def notify_room(request): data = await request.json() notification = { "type": 
"room_notification", "message": data["message"], "from_admin": True, "timestamp": datetime.utcnow().isoformat() } await websocket_manager.broadcast_to_room( data["room_id"], json.dumps(notification) ) room_size = len(websocket_manager.get_room_connections(data["room_id"])) return {"status": "sent", "room_connections": room_size} # User-to-user messaging @app.post("/api/send-private-message") async def send_private_message(request): data = await request.json() # Find recipient connection recipient_connection_id = user_connections.get(data["recipient_id"]) if not recipient_connection_id: return {"error": "Recipient not online"}, 404 websocket = websocket_manager.connections.get(recipient_connection_id) if not websocket: return {"error": "Recipient connection not found"}, 404 message = { "type": "private_message", "from_user_id": data["sender_id"], "message": data["message"], "timestamp": datetime.utcnow().isoformat() } try: await websocket.send_text(json.dumps(message)) return {"status": "delivered"} except ConnectionError: return {"error": "Failed to deliver message"}, 500 ``` ## Advanced WebSocket Features ### Connection Authentication ```python async def authenticate_websocket(websocket): """Authenticate WebSocket connection before accepting""" # Get token from query string or headers query = websocket.scope.query_string.decode() token = None for param in query.split('&'): if param.startswith('token='): token = param.split('=')[1] break if not token: # Check headers as fallback for name, value in websocket.scope.headers: if name.decode() == 'authorization': auth_header = value.decode() if auth_header.startswith('Bearer '): token = auth_header[7:] break if not token: return None # Verify token try: user = await verify_jwt_token(token) return user except Exception: return None @app.websocket("/ws/authenticated") async def authenticated_websocket(websocket): # Authenticate before accepting user = await authenticate_websocket(websocket) if not user: await 
websocket.close(1008, "Authentication required") return await websocket.accept() # Add user info to websocket websocket.user = user # Continue with authenticated handler connection_id = str(uuid.uuid4()) websocket_manager.add_connection(connection_id, websocket) # ... rest of handler ``` ### Message Validation ```python from dataclasses import dataclass from typing import Optional @dataclass class ChatMessage: type: str content: str room: Optional[str] = None recipient: Optional[str] = None def validate_message(data: dict) -> Optional[ChatMessage]: """Validate incoming WebSocket messages""" try: msg_type = data.get("type") if not msg_type or not isinstance(msg_type, str): return None content = data.get("content", "").strip() if not content or len(content) > 1000: # Message length limit return None return ChatMessage( type=msg_type, content=content, room=data.get("room"), recipient=data.get("recipient") ) except (KeyError, ValueError, TypeError): return None @app.websocket("/ws/validated-chat") async def validated_chat_handler(websocket): await websocket.accept() connection_id = str(uuid.uuid4()) websocket_manager.add_connection(connection_id, websocket) try: while True: raw_message = await websocket.receive() if raw_message.type == "text": try: data = json.loads(raw_message.data) message = validate_message(data) if not message: await websocket.send_text(json.dumps({ "type": "error", "message": "Invalid message format" })) continue # Process valid message if message.type == "chat" and message.room: websocket_manager.join_room(connection_id, message.room) await websocket_manager.broadcast_to_room( message.room, json.dumps({ "type": "message", "content": message.content, "timestamp": datetime.utcnow().isoformat() }) ) except json.JSONDecodeError: await websocket.send_text(json.dumps({ "type": "error", "message": "Invalid JSON" })) elif raw_message.type == "close": break except ConnectionError: pass finally: websocket_manager.remove_connection(connection_id) ``` ### 
Connection Monitoring ```python import time class ConnectionMonitor: def __init__(self, websocket_manager): self.websocket_manager = websocket_manager self.connection_stats = {} # connection_id -> stats def track_connection(self, connection_id: str): """Start tracking a connection""" self.connection_stats[connection_id] = { "connected_at": time.time(), "messages_sent": 0, "messages_received": 0, "bytes_sent": 0, "bytes_received": 0 } def record_message(self, connection_id: str, sent: bool, message_size: int): """Record message statistics""" if connection_id in self.connection_stats: stats = self.connection_stats[connection_id] if sent: stats["messages_sent"] += 1 stats["bytes_sent"] += message_size else: stats["messages_received"] += 1 stats["bytes_received"] += message_size def get_connection_stats(self, connection_id: str): """Get statistics for a connection""" return self.connection_stats.get(connection_id, {}) def cleanup_connection(self, connection_id: str): """Clean up tracking data""" self.connection_stats.pop(connection_id, None) # Global monitor instance monitor = ConnectionMonitor(websocket_manager) @app.websocket("/ws/monitored") async def monitored_websocket(websocket): await websocket.accept() connection_id = str(uuid.uuid4()) # Start monitoring websocket_manager.add_connection(connection_id, websocket) monitor.track_connection(connection_id) try: while True: message = await websocket.receive() if message.type == "text": # Record received message monitor.record_message(connection_id, False, len(message.data)) # Echo back response = f"Received: {message.data}" await websocket.send_text(response) # Record sent message monitor.record_message(connection_id, True, len(response)) elif message.type == "close": break except ConnectionError: pass finally: websocket_manager.remove_connection(connection_id) monitor.cleanup_connection(connection_id) @app.get("/api/websocket-stats") async def get_websocket_stats(request): """Get WebSocket connection statistics""" 
stats = { "total_connections": len(websocket_manager.connections), "rooms": { room: len(connections) for room, connections in websocket_manager.rooms.items() }, "connection_details": {} } for conn_id in websocket_manager.connections: connection_stats = monitor.get_connection_stats(conn_id) if connection_stats: stats["connection_details"][conn_id] = connection_stats return stats ``` ## Error Handling ### Connection Error Handling ```python @app.websocket("/ws/robust") async def robust_websocket_handler(websocket): connection_id = str(uuid.uuid4()) try: await websocket.accept() websocket_manager.add_connection(connection_id, websocket) # Send welcome message await websocket.send_text(json.dumps({ "type": "welcome", "connection_id": connection_id })) while True: try: message = await websocket.receive() if message.type == "text": await handle_text_message(websocket, message.data) elif message.type == "bytes": await handle_binary_message(websocket, message.data) elif message.type == "close": app.logger.info(f"Client {connection_id} disconnected normally") break elif message.type == "error": app.logger.error(f"WebSocket error for {connection_id}: {message.data}") break except json.JSONDecodeError: await websocket.send_text(json.dumps({ "type": "error", "message": "Invalid JSON format" })) except Exception as e: app.logger.error(f"Error handling message for {connection_id}: {e}") await websocket.send_text(json.dumps({ "type": "error", "message": "Internal server error" })) except ConnectionError: app.logger.info(f"Connection {connection_id} lost unexpectedly") except Exception as e: app.logger.error(f"Unexpected error in websocket {connection_id}: {e}", exc_info=True) finally: # Always cleanup websocket_manager.remove_connection(connection_id) app.logger.info(f"Cleaned up connection {connection_id}") async def handle_text_message(websocket, data): """Handle text messages with error recovery""" try: message = json.loads(data) # Process message... 
except Exception as e: app.logger.error(f"Text message handling error: {e}") raise ``` ## Performance Considerations ### Connection Limits ```python MAX_CONNECTIONS = 1000 MAX_CONNECTIONS_PER_IP = 10 connection_counts = defaultdict(int) # ip -> count async def check_connection_limits(websocket): """Check if connection should be allowed""" client_ip = get_client_ip(websocket.scope) # Global limit if len(websocket_manager.connections) >= MAX_CONNECTIONS: return False, "Server at capacity" # Per-IP limit if connection_counts[client_ip] >= MAX_CONNECTIONS_PER_IP: return False, "Too many connections from this IP" return True, "OK" @app.websocket("/ws/limited") async def limited_websocket(websocket): allowed, reason = await check_connection_limits(websocket) if not allowed: await websocket.close(1008, reason) return client_ip = get_client_ip(websocket.scope) connection_counts[client_ip] += 1 try: await websocket.accept() # ... handle connection finally: connection_counts[client_ip] -= 1 ``` ### Memory Management ```python # Limit room sizes MAX_ROOM_SIZE = 100 def can_join_room(room_id: str) -> bool: """Check if room has space""" room_connections = websocket_manager.get_room_connections(room_id) return len(room_connections) < MAX_ROOM_SIZE # Message rate limiting import time from collections import defaultdict, deque message_rates = defaultdict(deque) # connection_id -> timestamps MAX_MESSAGES_PER_MINUTE = 60 def is_rate_limited(connection_id: str) -> bool: """Check if connection is sending too many messages""" now = time.time() minute_ago = now - 60 # Remove old timestamps timestamps = message_rates[connection_id] while timestamps and timestamps[0] < minute_ago: timestamps.popleft() # Check rate if len(timestamps) >= MAX_MESSAGES_PER_MINUTE: return True # Record this message timestamps.append(now) return False ``` ## Integration Examples ### Complete Chat Application ```python import uuid import json from datetime import datetime from collections import defaultdict # 
Global state websocket_manager = WebSocketManager() user_connections = {} # user_id -> connection_id active_rooms = defaultdict(set) # room_id -> user_ids @app.websocket("/ws/chat") async def chat_websocket(websocket): # Authenticate user = await authenticate_websocket(websocket) if not user: await websocket.close(1008, "Authentication required") return await websocket.accept() connection_id = str(uuid.uuid4()) user_connections[user.id] = connection_id websocket_manager.add_connection(connection_id, websocket) # User online notification await notify_user_status(user.id, "online") try: while True: message = await websocket.receive() if message.type == "text": data = json.loads(message.data) await handle_chat_command(user, connection_id, data) elif message.type == "close": break except ConnectionError: pass finally: # Cleanup await handle_user_disconnect(user.id, connection_id) async def handle_chat_command(user, connection_id, data): """Handle different chat commands""" command = data.get("command") if command == "join_room": room_id = data["room_id"] if can_join_room(room_id): websocket_manager.join_room(connection_id, room_id) active_rooms[room_id].add(user.id) await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "user_joined", "user": user.to_dict(), "timestamp": datetime.utcnow().isoformat() })) elif command == "send_message": room_id = data["room_id"] if user.id in active_rooms[room_id]: await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "message", "user": user.to_dict(), "message": data["message"], "timestamp": datetime.utcnow().isoformat() })) elif command == "leave_room": room_id = data["room_id"] websocket_manager.leave_room(connection_id, room_id) active_rooms[room_id].discard(user.id) await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "user_left", "user": user.to_dict(), "timestamp": datetime.utcnow().isoformat() })) async def handle_user_disconnect(user_id, connection_id): """Handle user 
disconnection""" # Remove from all rooms for room_id, users in active_rooms.items(): if user_id in users: users.discard(user_id) await websocket_manager.broadcast_to_room(room_id, json.dumps({ "type": "user_disconnected", "user_id": user_id, "timestamp": datetime.utcnow().isoformat() })) # Cleanup websocket_manager.remove_connection(connection_id) user_connections.pop(user_id, None) # User offline notification await notify_user_status(user_id, "offline") # REST API endpoints for chat management @app.get("/api/chat/rooms") async def list_rooms(request): """List active chat rooms""" rooms = [] for room_id, users in active_rooms.items(): if users: # Only include rooms with users rooms.append({ "room_id": room_id, "user_count": len(users), "users": list(users) }) return {"rooms": rooms} @app.post("/api/chat/broadcast") async def admin_broadcast(request): """Admin broadcast to all users""" data = await request.json() message = { "type": "admin_broadcast", "message": data["message"], "timestamp": datetime.utcnow().isoformat() } await websocket_manager.broadcast_to_all(json.dumps(message)) return { "status": "broadcasted", "recipient_count": len(websocket_manager.connections) } ``` The WebSocket system in Gobstopper provides a solid foundation for building real-time applications with efficient connection management, room-based broadcasting, and robust error handling.