WebSocket Support

Gobstopper provides first-class WebSocket support through the RSGI protocol, enabling real-time bidirectional communication between clients and servers. The WebSocket system includes connection management, room-based broadcasting, and efficient message handling.

Overview

The WebSocket system consists of:

  • WebSocket: Connection wrapper for RSGI protocol communication

  • WebSocketManager: Room-based connection management and broadcasting

  • RSGI Integration: Direct integration with Granian’s high-performance WebSocket implementation

Note: When a client attempts to open a WebSocket connection to a path with no matching route, the server now responds by initiating a close with code 1001 (Going Away).

WebSocket Class

The WebSocket class provides a high-level interface for WebSocket communication.

Connection Lifecycle

@app.websocket("/ws")
async def websocket_handler(websocket):
    # 1. Accept the connection
    await websocket.accept()
    
    # 2. Handle messages
    try:
        while True:
            message = await websocket.receive()
            if message.type == "text":
                await websocket.send_text(f"Echo: {message.data}")
            elif message.type == "close":
                break
    except ConnectionError:
        # Handle disconnection
        pass

Methods

accept()

Accept the WebSocket connection and establish transport:

@app.websocket("/ws/connect")
async def connect_handler(websocket):
    try:
        await websocket.accept()
        await websocket.send_text("Connected successfully!")
    except ConnectionError:
        # Connection failed
        app.logger.error("Failed to accept WebSocket connection")

send_text(data: str)

Send text messages to the client:

@app.websocket("/ws/notifications")
async def notification_handler(websocket):
    await websocket.accept()
    
    # Send simple message
    await websocket.send_text("Welcome to notifications!")
    
    # Send JSON data
    import json
    notification = {
        "type": "alert",
        "message": "New message received",
        "timestamp": datetime.utcnow().isoformat()
    }
    await websocket.send_text(json.dumps(notification))
    
    # Send formatted message
    user_id = get_user_id_from_connection(websocket)
    await websocket.send_text(f"User {user_id} connected at {datetime.now()}")

send_bytes(data: bytes)

Send binary messages to the client:

@app.websocket("/ws/binary")
async def binary_handler(websocket):
    await websocket.accept()
    
    # Send binary data
    image_data = await load_image_bytes("welcome.png")
    await websocket.send_bytes(image_data)
    
    # Send compressed data
    import gzip
    large_data = get_large_dataset()
    compressed = gzip.compress(json.dumps(large_data).encode())
    await websocket.send_bytes(compressed)

receive()

Receive messages from the client:

@app.websocket("/ws/chat")
async def chat_handler(websocket):
    await websocket.accept()
    
    while True:
        message = await websocket.receive()
        
        if message.type == "text":
            # Handle text message
            data = json.loads(message.data)
            await process_chat_message(data)
            
        elif message.type == "bytes":
            # Handle binary message
            await process_binary_data(message.data)
            
        elif message.type == "close":
            # Client disconnected
            break
            
        elif message.type == "error":
            # Handle error
            app.logger.error(f"WebSocket error: {message.data}")
            break

WebSocket Patterns

Echo Server

@app.websocket("/ws/echo")
async def echo_server(websocket):
    await websocket.accept()
    
    try:
        while True:
            message = await websocket.receive()
            if message.type == "text":
                await websocket.send_text(f"Echo: {message.data}")
            elif message.type == "bytes":
                await websocket.send_bytes(b"Echo: " + message.data)
            elif message.type == "close":
                break
    except ConnectionError:
        app.logger.info("Echo client disconnected")

Real-time Updates

import asyncio

@app.websocket("/ws/live-data")
async def live_data_stream(websocket):
    await websocket.accept()
    
    try:
        # Send periodic updates
        while True:
            data = await get_live_metrics()
            update = {
                "timestamp": datetime.utcnow().isoformat(),
                "cpu_usage": data["cpu"],
                "memory_usage": data["memory"],
                "active_connections": data["connections"]
            }
            await websocket.send_text(json.dumps(update))
            await asyncio.sleep(1)  # Update every second
            
    except ConnectionError:
        app.logger.info("Live data client disconnected")

File Upload Progress

@app.websocket("/ws/upload-progress/<upload_id>")
async def upload_progress(websocket, upload_id):
    await websocket.accept()
    
    try:
        while True:
            # Check upload progress
            progress = await get_upload_progress(upload_id)
            
            if progress is None:
                await websocket.send_text(json.dumps({
                    "error": "Upload not found"
                }))
                break
                
            await websocket.send_text(json.dumps({
                "upload_id": upload_id,
                "progress": progress["percentage"],
                "bytes_uploaded": progress["bytes_uploaded"],
                "total_bytes": progress["total_bytes"],
                "status": progress["status"]
            }))
            
            if progress["status"] in ["completed", "failed"]:
                break
                
            await asyncio.sleep(0.5)
            
    except ConnectionError:
        app.logger.info(f"Upload progress client disconnected: {upload_id}")

WebSocketManager

The WebSocketManager class provides room-based connection management for building chat systems, live collaboration tools, and broadcast features.

Basic Usage

from gobstopper.websocket.manager import WebSocketManager

# Create global manager instance
websocket_manager = WebSocketManager()

@app.websocket("/ws/chat")
async def chat_handler(websocket):
    await websocket.accept()
    
    # Generate unique connection ID
    connection_id = str(uuid.uuid4())
    
    # Add to manager
    websocket_manager.add_connection(connection_id, websocket)
    
    try:
        while True:
            message = await websocket.receive()
            if message.type == "text":
                data = json.loads(message.data)
                
                if data["type"] == "join_room":
                    # Join a chat room
                    websocket_manager.join_room(connection_id, data["room"])
                    await websocket.send_text(json.dumps({
                        "type": "joined",
                        "room": data["room"]
                    }))
                    
                elif data["type"] == "chat_message":
                    # Broadcast to room
                    message_data = {
                        "type": "message",
                        "user": data["user"],
                        "message": data["message"],
                        "timestamp": datetime.utcnow().isoformat()
                    }
                    await websocket_manager.broadcast_to_room(
                        data["room"], 
                        json.dumps(message_data)
                    )
                    
            elif message.type == "close":
                break
                
    except ConnectionError:
        pass
    finally:
        # Cleanup
        websocket_manager.remove_connection(connection_id)

Room Management

# Chat rooms with user management
user_connections = {}  # user_id -> connection_id mapping

@app.websocket("/ws/rooms/<room_id>")
async def room_handler(websocket, room_id):
    await websocket.accept()
    
    # Authenticate user (example)
    auth_token = websocket.scope.query_string.decode().split('token=')[1]
    user = await authenticate_token(auth_token)
    if not user:
        await websocket.close(1008, "Authentication failed")
        return
    
    connection_id = str(uuid.uuid4())
    user_connections[user.id] = connection_id
    
    # Add to manager and join room
    websocket_manager.add_connection(connection_id, websocket)
    websocket_manager.join_room(connection_id, room_id)
    
    # Notify others of new user
    await websocket_manager.broadcast_to_room(room_id, json.dumps({
        "type": "user_joined",
        "user_id": user.id,
        "username": user.username,
        "timestamp": datetime.utcnow().isoformat()
    }))
    
    try:
        async for message in receive_messages(websocket):
            if message["type"] == "chat":
                # Broadcast chat message
                await websocket_manager.broadcast_to_room(room_id, json.dumps({
                    "type": "message",
                    "user_id": user.id,
                    "username": user.username,
                    "message": message["content"],
                    "timestamp": datetime.utcnow().isoformat()
                }))
                
            elif message["type"] == "typing":
                # Broadcast typing indicator (exclude sender)
                typing_msg = json.dumps({
                    "type": "typing",
                    "user_id": user.id,
                    "username": user.username,
                    "is_typing": message["is_typing"]
                })
                
                room_connections = websocket_manager.get_room_connections(room_id)
                for conn_id in room_connections:
                    if conn_id != connection_id:  # Don't send to sender
                        conn = websocket_manager.connections.get(conn_id)
                        if conn:
                            await conn.send_text(typing_msg)
                            
    except ConnectionError:
        pass
    finally:
        # User left
        await websocket_manager.broadcast_to_room(room_id, json.dumps({
            "type": "user_left",
            "user_id": user.id,
            "username": user.username,
            "timestamp": datetime.utcnow().isoformat()
        }))
        
        # Cleanup
        websocket_manager.remove_connection(connection_id)
        if user.id in user_connections:
            del user_connections[user.id]

async def receive_messages(websocket):
    """Async generator for receiving messages"""
    try:
        while True:
            message = await websocket.receive()
            if message.type == "text":
                yield json.loads(message.data)
            elif message.type == "close":
                break
    except ConnectionError:
        pass

Broadcasting Patterns

# Global announcements
@app.post("/api/broadcast")
async def send_global_announcement(request):
    data = await request.json()
    
    announcement = {
        "type": "announcement",
        "title": data["title"],
        "message": data["message"],
        "priority": data.get("priority", "normal"),
        "timestamp": datetime.utcnow().isoformat()
    }
    
    await websocket_manager.broadcast_to_all(json.dumps(announcement))
    
    return {"status": "broadcasted", "connections": len(websocket_manager.connections)}

# Room-specific notifications
@app.post("/api/notify-room")
async def notify_room(request):
    data = await request.json()
    
    notification = {
        "type": "room_notification",
        "message": data["message"],
        "from_admin": True,
        "timestamp": datetime.utcnow().isoformat()
    }
    
    await websocket_manager.broadcast_to_room(
        data["room_id"], 
        json.dumps(notification)
    )
    
    room_size = len(websocket_manager.get_room_connections(data["room_id"]))
    return {"status": "sent", "room_connections": room_size}

# User-to-user messaging
@app.post("/api/send-private-message")
async def send_private_message(request):
    data = await request.json()
    
    # Find recipient connection
    recipient_connection_id = user_connections.get(data["recipient_id"])
    if not recipient_connection_id:
        return {"error": "Recipient not online"}, 404
    
    websocket = websocket_manager.connections.get(recipient_connection_id)
    if not websocket:
        return {"error": "Recipient connection not found"}, 404
    
    message = {
        "type": "private_message",
        "from_user_id": data["sender_id"],
        "message": data["message"],
        "timestamp": datetime.utcnow().isoformat()
    }
    
    try:
        await websocket.send_text(json.dumps(message))
        return {"status": "delivered"}
    except ConnectionError:
        return {"error": "Failed to deliver message"}, 500

Advanced WebSocket Features

Connection Authentication

async def authenticate_websocket(websocket):
    """Authenticate WebSocket connection before accepting"""
    # Get token from query string or headers
    query = websocket.scope.query_string.decode()
    token = None
    
    for param in query.split('&'):
        if param.startswith('token='):
            token = param.split('=')[1]
            break
    
    if not token:
        # Check headers as fallback
        for name, value in websocket.scope.headers:
            if name.decode() == 'authorization':
                auth_header = value.decode()
                if auth_header.startswith('Bearer '):
                    token = auth_header[7:]
                break
    
    if not token:
        return None
    
    # Verify token
    try:
        user = await verify_jwt_token(token)
        return user
    except Exception:
        return None

@app.websocket("/ws/authenticated")
async def authenticated_websocket(websocket):
    # Authenticate before accepting
    user = await authenticate_websocket(websocket)
    if not user:
        await websocket.close(1008, "Authentication required")
        return
    
    await websocket.accept()
    
    # Add user info to websocket
    websocket.user = user
    
    # Continue with authenticated handler
    connection_id = str(uuid.uuid4())
    websocket_manager.add_connection(connection_id, websocket)
    
    # ... rest of handler

Message Validation

from dataclasses import dataclass
from typing import Optional

@dataclass
class ChatMessage:
    type: str
    content: str
    room: Optional[str] = None
    recipient: Optional[str] = None

def validate_message(data: dict) -> Optional[ChatMessage]:
    """Validate incoming WebSocket messages"""
    try:
        msg_type = data.get("type")
        if not msg_type or not isinstance(msg_type, str):
            return None
        
        content = data.get("content", "").strip()
        if not content or len(content) > 1000:  # Message length limit
            return None
        
        return ChatMessage(
            type=msg_type,
            content=content,
            room=data.get("room"),
            recipient=data.get("recipient")
        )
    except (KeyError, ValueError, TypeError):
        return None

@app.websocket("/ws/validated-chat")
async def validated_chat_handler(websocket):
    await websocket.accept()
    connection_id = str(uuid.uuid4())
    websocket_manager.add_connection(connection_id, websocket)
    
    try:
        while True:
            raw_message = await websocket.receive()
            if raw_message.type == "text":
                try:
                    data = json.loads(raw_message.data)
                    message = validate_message(data)
                    
                    if not message:
                        await websocket.send_text(json.dumps({
                            "type": "error",
                            "message": "Invalid message format"
                        }))
                        continue
                    
                    # Process valid message
                    if message.type == "chat" and message.room:
                        websocket_manager.join_room(connection_id, message.room)
                        await websocket_manager.broadcast_to_room(
                            message.room,
                            json.dumps({
                                "type": "message",
                                "content": message.content,
                                "timestamp": datetime.utcnow().isoformat()
                            })
                        )
                        
                except json.JSONDecodeError:
                    await websocket.send_text(json.dumps({
                        "type": "error",
                        "message": "Invalid JSON"
                    }))
                    
            elif raw_message.type == "close":
                break
                
    except ConnectionError:
        pass
    finally:
        websocket_manager.remove_connection(connection_id)

Connection Monitoring

import time

class ConnectionMonitor:
    def __init__(self, websocket_manager):
        self.websocket_manager = websocket_manager
        self.connection_stats = {}  # connection_id -> stats
    
    def track_connection(self, connection_id: str):
        """Start tracking a connection"""
        self.connection_stats[connection_id] = {
            "connected_at": time.time(),
            "messages_sent": 0,
            "messages_received": 0,
            "bytes_sent": 0,
            "bytes_received": 0
        }
    
    def record_message(self, connection_id: str, sent: bool, message_size: int):
        """Record message statistics"""
        if connection_id in self.connection_stats:
            stats = self.connection_stats[connection_id]
            if sent:
                stats["messages_sent"] += 1
                stats["bytes_sent"] += message_size
            else:
                stats["messages_received"] += 1
                stats["bytes_received"] += message_size
    
    def get_connection_stats(self, connection_id: str):
        """Get statistics for a connection"""
        return self.connection_stats.get(connection_id, {})
    
    def cleanup_connection(self, connection_id: str):
        """Clean up tracking data"""
        self.connection_stats.pop(connection_id, None)

# Global monitor instance
monitor = ConnectionMonitor(websocket_manager)

@app.websocket("/ws/monitored")
async def monitored_websocket(websocket):
    await websocket.accept()
    connection_id = str(uuid.uuid4())
    
    # Start monitoring
    websocket_manager.add_connection(connection_id, websocket)
    monitor.track_connection(connection_id)
    
    try:
        while True:
            message = await websocket.receive()
            if message.type == "text":
                # Record received message
                monitor.record_message(connection_id, False, len(message.data))
                
                # Echo back
                response = f"Received: {message.data}"
                await websocket.send_text(response)
                
                # Record sent message
                monitor.record_message(connection_id, True, len(response))
                
            elif message.type == "close":
                break
                
    except ConnectionError:
        pass
    finally:
        websocket_manager.remove_connection(connection_id)
        monitor.cleanup_connection(connection_id)

@app.get("/api/websocket-stats")
async def get_websocket_stats(request):
    """Get WebSocket connection statistics"""
    stats = {
        "total_connections": len(websocket_manager.connections),
        "rooms": {
            room: len(connections) 
            for room, connections in websocket_manager.rooms.items()
        },
        "connection_details": {}
    }
    
    for conn_id in websocket_manager.connections:
        connection_stats = monitor.get_connection_stats(conn_id)
        if connection_stats:
            stats["connection_details"][conn_id] = connection_stats
    
    return stats

Error Handling

Connection Error Handling

@app.websocket("/ws/robust")
async def robust_websocket_handler(websocket):
    connection_id = str(uuid.uuid4())
    
    try:
        await websocket.accept()
        websocket_manager.add_connection(connection_id, websocket)
        
        # Send welcome message
        await websocket.send_text(json.dumps({
            "type": "welcome",
            "connection_id": connection_id
        }))
        
        while True:
            try:
                message = await websocket.receive()
                
                if message.type == "text":
                    await handle_text_message(websocket, message.data)
                elif message.type == "bytes":
                    await handle_binary_message(websocket, message.data)
                elif message.type == "close":
                    app.logger.info(f"Client {connection_id} disconnected normally")
                    break
                elif message.type == "error":
                    app.logger.error(f"WebSocket error for {connection_id}: {message.data}")
                    break
                    
            except json.JSONDecodeError:
                await websocket.send_text(json.dumps({
                    "type": "error",
                    "message": "Invalid JSON format"
                }))
            except Exception as e:
                app.logger.error(f"Error handling message for {connection_id}: {e}")
                await websocket.send_text(json.dumps({
                    "type": "error", 
                    "message": "Internal server error"
                }))
                
    except ConnectionError:
        app.logger.info(f"Connection {connection_id} lost unexpectedly")
    except Exception as e:
        app.logger.error(f"Unexpected error in websocket {connection_id}: {e}", exc_info=True)
    finally:
        # Always cleanup
        websocket_manager.remove_connection(connection_id)
        app.logger.info(f"Cleaned up connection {connection_id}")

async def handle_text_message(websocket, data):
    """Handle text messages with error recovery"""
    try:
        message = json.loads(data)
        # Process message...
    except Exception as e:
        app.logger.error(f"Text message handling error: {e}")
        raise

Performance Considerations

Connection Limits

MAX_CONNECTIONS = 1000
MAX_CONNECTIONS_PER_IP = 10

connection_counts = defaultdict(int)  # ip -> count

async def check_connection_limits(websocket):
    """Check if connection should be allowed"""
    client_ip = get_client_ip(websocket.scope)
    
    # Global limit
    if len(websocket_manager.connections) >= MAX_CONNECTIONS:
        return False, "Server at capacity"
    
    # Per-IP limit
    if connection_counts[client_ip] >= MAX_CONNECTIONS_PER_IP:
        return False, "Too many connections from this IP"
    
    return True, "OK"

@app.websocket("/ws/limited")
async def limited_websocket(websocket):
    allowed, reason = await check_connection_limits(websocket)
    
    if not allowed:
        await websocket.close(1008, reason)
        return
    
    client_ip = get_client_ip(websocket.scope)
    connection_counts[client_ip] += 1
    
    try:
        await websocket.accept()
        # ... handle connection
    finally:
        connection_counts[client_ip] -= 1

Memory Management

# Limit room sizes
MAX_ROOM_SIZE = 100

def can_join_room(room_id: str) -> bool:
    """Check if room has space"""
    room_connections = websocket_manager.get_room_connections(room_id)
    return len(room_connections) < MAX_ROOM_SIZE

# Message rate limiting
import time
from collections import defaultdict, deque

message_rates = defaultdict(deque)  # connection_id -> timestamps
MAX_MESSAGES_PER_MINUTE = 60

def is_rate_limited(connection_id: str) -> bool:
    """Check if connection is sending too many messages"""
    now = time.time()
    minute_ago = now - 60
    
    # Remove old timestamps
    timestamps = message_rates[connection_id]
    while timestamps and timestamps[0] < minute_ago:
        timestamps.popleft()
    
    # Check rate
    if len(timestamps) >= MAX_MESSAGES_PER_MINUTE:
        return True
    
    # Record this message
    timestamps.append(now)
    return False

Integration Examples

Complete Chat Application

import uuid
import json
from datetime import datetime
from collections import defaultdict

# Global state
websocket_manager = WebSocketManager()
user_connections = {}  # user_id -> connection_id
active_rooms = defaultdict(set)  # room_id -> user_ids

@app.websocket("/ws/chat")
async def chat_websocket(websocket):
    # Authenticate
    user = await authenticate_websocket(websocket)
    if not user:
        await websocket.close(1008, "Authentication required")
        return
    
    await websocket.accept()
    
    connection_id = str(uuid.uuid4())
    user_connections[user.id] = connection_id
    websocket_manager.add_connection(connection_id, websocket)
    
    # User online notification
    await notify_user_status(user.id, "online")
    
    try:
        while True:
            message = await websocket.receive()
            if message.type == "text":
                data = json.loads(message.data)
                await handle_chat_command(user, connection_id, data)
            elif message.type == "close":
                break
                
    except ConnectionError:
        pass
    finally:
        # Cleanup
        await handle_user_disconnect(user.id, connection_id)

async def handle_chat_command(user, connection_id, data):
    """Handle different chat commands"""
    command = data.get("command")
    
    if command == "join_room":
        room_id = data["room_id"]
        if can_join_room(room_id):
            websocket_manager.join_room(connection_id, room_id)
            active_rooms[room_id].add(user.id)
            
            await websocket_manager.broadcast_to_room(room_id, json.dumps({
                "type": "user_joined",
                "user": user.to_dict(),
                "timestamp": datetime.utcnow().isoformat()
            }))
        
    elif command == "send_message":
        room_id = data["room_id"]
        if user.id in active_rooms[room_id]:
            await websocket_manager.broadcast_to_room(room_id, json.dumps({
                "type": "message",
                "user": user.to_dict(),
                "message": data["message"],
                "timestamp": datetime.utcnow().isoformat()
            }))
    
    elif command == "leave_room":
        room_id = data["room_id"]
        websocket_manager.leave_room(connection_id, room_id)
        active_rooms[room_id].discard(user.id)
        
        await websocket_manager.broadcast_to_room(room_id, json.dumps({
            "type": "user_left",
            "user": user.to_dict(),
            "timestamp": datetime.utcnow().isoformat()
        }))

async def handle_user_disconnect(user_id, connection_id):
    """Handle user disconnection"""
    # Remove from all rooms
    for room_id, users in active_rooms.items():
        if user_id in users:
            users.discard(user_id)
            await websocket_manager.broadcast_to_room(room_id, json.dumps({
                "type": "user_disconnected",
                "user_id": user_id,
                "timestamp": datetime.utcnow().isoformat()
            }))
    
    # Cleanup
    websocket_manager.remove_connection(connection_id)
    user_connections.pop(user_id, None)
    
    # User offline notification
    await notify_user_status(user_id, "offline")

# REST API endpoints for chat management
@app.get("/api/chat/rooms")
async def list_rooms(request):
    """List active chat rooms"""
    rooms = []
    for room_id, users in active_rooms.items():
        if users:  # Only include rooms with users
            rooms.append({
                "room_id": room_id,
                "user_count": len(users),
                "users": list(users)
            })
    return {"rooms": rooms}

@app.post("/api/chat/broadcast")
async def admin_broadcast(request):
    """Admin broadcast to all users"""
    data = await request.json()
    
    message = {
        "type": "admin_broadcast",
        "message": data["message"],
        "timestamp": datetime.utcnow().isoformat()
    }
    
    await websocket_manager.broadcast_to_all(json.dumps(message))
    
    return {
        "status": "broadcasted",
        "recipient_count": len(websocket_manager.connections)
    }

The WebSocket system in Gobstopper provides a solid foundation for building real-time applications with efficient connection management, room-based broadcasting, and robust error handling.