WebSocket Support¶
Gobstopper provides first-class WebSocket support through the RSGI protocol, enabling real-time bidirectional communication between clients and servers. The WebSocket system includes connection management, room-based broadcasting, and efficient message handling.
Overview¶
The WebSocket system consists of:
WebSocket: Connection wrapper for RSGI protocol communication
WebSocketManager: Room-based connection management and broadcasting
RSGI Integration: Direct integration with Granian’s high-performance WebSocket implementation
Note: When a client attempts to open a WebSocket connection to a path with no matching route, the server now responds by initiating a close with code 1001 (Going Away).
WebSocket Class¶
The WebSocket class provides a high-level interface for WebSocket communication.
Connection Lifecycle¶
@app.websocket("/ws")
async def websocket_handler(websocket):
# 1. Accept the connection
await websocket.accept()
# 2. Handle messages
try:
while True:
message = await websocket.receive()
if message.type == "text":
await websocket.send_text(f"Echo: {message.data}")
elif message.type == "close":
break
except ConnectionError:
# Handle disconnection
pass
Methods¶
accept()¶
Accept the WebSocket connection and establish transport:
@app.websocket("/ws/connect")
async def connect_handler(websocket):
try:
await websocket.accept()
await websocket.send_text("Connected successfully!")
except ConnectionError:
# Connection failed
app.logger.error("Failed to accept WebSocket connection")
send_text(data: str)¶
Send text messages to the client:
@app.websocket("/ws/notifications")
async def notification_handler(websocket):
await websocket.accept()
# Send simple message
await websocket.send_text("Welcome to notifications!")
# Send JSON data
import json
notification = {
"type": "alert",
"message": "New message received",
"timestamp": datetime.utcnow().isoformat()
}
await websocket.send_text(json.dumps(notification))
# Send formatted message
user_id = get_user_id_from_connection(websocket)
await websocket.send_text(f"User {user_id} connected at {datetime.now()}")
send_bytes(data: bytes)¶
Send binary messages to the client:
@app.websocket("/ws/binary")
async def binary_handler(websocket):
await websocket.accept()
# Send binary data
image_data = await load_image_bytes("welcome.png")
await websocket.send_bytes(image_data)
# Send compressed data
import gzip
large_data = get_large_dataset()
compressed = gzip.compress(json.dumps(large_data).encode())
await websocket.send_bytes(compressed)
receive()¶
Receive messages from the client:
@app.websocket("/ws/chat")
async def chat_handler(websocket):
await websocket.accept()
while True:
message = await websocket.receive()
if message.type == "text":
# Handle text message
data = json.loads(message.data)
await process_chat_message(data)
elif message.type == "bytes":
# Handle binary message
await process_binary_data(message.data)
elif message.type == "close":
# Client disconnected
break
elif message.type == "error":
# Handle error
app.logger.error(f"WebSocket error: {message.data}")
break
WebSocket Patterns¶
Echo Server¶
@app.websocket("/ws/echo")
async def echo_server(websocket):
await websocket.accept()
try:
while True:
message = await websocket.receive()
if message.type == "text":
await websocket.send_text(f"Echo: {message.data}")
elif message.type == "bytes":
await websocket.send_bytes(b"Echo: " + message.data)
elif message.type == "close":
break
except ConnectionError:
app.logger.info("Echo client disconnected")
Real-time Updates¶
import asyncio
@app.websocket("/ws/live-data")
async def live_data_stream(websocket):
await websocket.accept()
try:
# Send periodic updates
while True:
data = await get_live_metrics()
update = {
"timestamp": datetime.utcnow().isoformat(),
"cpu_usage": data["cpu"],
"memory_usage": data["memory"],
"active_connections": data["connections"]
}
await websocket.send_text(json.dumps(update))
await asyncio.sleep(1) # Update every second
except ConnectionError:
app.logger.info("Live data client disconnected")
File Upload Progress¶
@app.websocket("/ws/upload-progress/<upload_id>")
async def upload_progress(websocket, upload_id):
await websocket.accept()
try:
while True:
# Check upload progress
progress = await get_upload_progress(upload_id)
if progress is None:
await websocket.send_text(json.dumps({
"error": "Upload not found"
}))
break
await websocket.send_text(json.dumps({
"upload_id": upload_id,
"progress": progress["percentage"],
"bytes_uploaded": progress["bytes_uploaded"],
"total_bytes": progress["total_bytes"],
"status": progress["status"]
}))
if progress["status"] in ["completed", "failed"]:
break
await asyncio.sleep(0.5)
except ConnectionError:
app.logger.info(f"Upload progress client disconnected: {upload_id}")
WebSocketManager¶
The WebSocketManager class provides room-based connection management for building chat systems, live collaboration tools, and broadcast features.
Basic Usage¶
from gobstopper.websocket.manager import WebSocketManager
# Create global manager instance
websocket_manager = WebSocketManager()
@app.websocket("/ws/chat")
async def chat_handler(websocket):
await websocket.accept()
# Generate unique connection ID
connection_id = str(uuid.uuid4())
# Add to manager
websocket_manager.add_connection(connection_id, websocket)
try:
while True:
message = await websocket.receive()
if message.type == "text":
data = json.loads(message.data)
if data["type"] == "join_room":
# Join a chat room
websocket_manager.join_room(connection_id, data["room"])
await websocket.send_text(json.dumps({
"type": "joined",
"room": data["room"]
}))
elif data["type"] == "chat_message":
# Broadcast to room
message_data = {
"type": "message",
"user": data["user"],
"message": data["message"],
"timestamp": datetime.utcnow().isoformat()
}
await websocket_manager.broadcast_to_room(
data["room"],
json.dumps(message_data)
)
elif message.type == "close":
break
except ConnectionError:
pass
finally:
# Cleanup
websocket_manager.remove_connection(connection_id)
Room Management¶
# Chat rooms with user management
user_connections = {} # user_id -> connection_id mapping
@app.websocket("/ws/rooms/<room_id>")
async def room_handler(websocket, room_id):
await websocket.accept()
# Authenticate user (example)
auth_token = websocket.scope.query_string.decode().split('token=')[1]
user = await authenticate_token(auth_token)
if not user:
await websocket.close(1008, "Authentication failed")
return
connection_id = str(uuid.uuid4())
user_connections[user.id] = connection_id
# Add to manager and join room
websocket_manager.add_connection(connection_id, websocket)
websocket_manager.join_room(connection_id, room_id)
# Notify others of new user
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "user_joined",
"user_id": user.id,
"username": user.username,
"timestamp": datetime.utcnow().isoformat()
}))
try:
async for message in receive_messages(websocket):
if message["type"] == "chat":
# Broadcast chat message
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "message",
"user_id": user.id,
"username": user.username,
"message": message["content"],
"timestamp": datetime.utcnow().isoformat()
}))
elif message["type"] == "typing":
# Broadcast typing indicator (exclude sender)
typing_msg = json.dumps({
"type": "typing",
"user_id": user.id,
"username": user.username,
"is_typing": message["is_typing"]
})
room_connections = websocket_manager.get_room_connections(room_id)
for conn_id in room_connections:
if conn_id != connection_id: # Don't send to sender
conn = websocket_manager.connections.get(conn_id)
if conn:
await conn.send_text(typing_msg)
except ConnectionError:
pass
finally:
# User left
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "user_left",
"user_id": user.id,
"username": user.username,
"timestamp": datetime.utcnow().isoformat()
}))
# Cleanup
websocket_manager.remove_connection(connection_id)
if user.id in user_connections:
del user_connections[user.id]
async def receive_messages(websocket):
"""Async generator for receiving messages"""
try:
while True:
message = await websocket.receive()
if message.type == "text":
yield json.loads(message.data)
elif message.type == "close":
break
except ConnectionError:
pass
Broadcasting Patterns¶
# Global announcements
@app.post("/api/broadcast")
async def send_global_announcement(request):
data = await request.json()
announcement = {
"type": "announcement",
"title": data["title"],
"message": data["message"],
"priority": data.get("priority", "normal"),
"timestamp": datetime.utcnow().isoformat()
}
await websocket_manager.broadcast_to_all(json.dumps(announcement))
return {"status": "broadcasted", "connections": len(websocket_manager.connections)}
# Room-specific notifications
@app.post("/api/notify-room")
async def notify_room(request):
data = await request.json()
notification = {
"type": "room_notification",
"message": data["message"],
"from_admin": True,
"timestamp": datetime.utcnow().isoformat()
}
await websocket_manager.broadcast_to_room(
data["room_id"],
json.dumps(notification)
)
room_size = len(websocket_manager.get_room_connections(data["room_id"]))
return {"status": "sent", "room_connections": room_size}
# User-to-user messaging
@app.post("/api/send-private-message")
async def send_private_message(request):
data = await request.json()
# Find recipient connection
recipient_connection_id = user_connections.get(data["recipient_id"])
if not recipient_connection_id:
return {"error": "Recipient not online"}, 404
websocket = websocket_manager.connections.get(recipient_connection_id)
if not websocket:
return {"error": "Recipient connection not found"}, 404
message = {
"type": "private_message",
"from_user_id": data["sender_id"],
"message": data["message"],
"timestamp": datetime.utcnow().isoformat()
}
try:
await websocket.send_text(json.dumps(message))
return {"status": "delivered"}
except ConnectionError:
return {"error": "Failed to deliver message"}, 500
Advanced WebSocket Features¶
Connection Authentication¶
async def authenticate_websocket(websocket):
"""Authenticate WebSocket connection before accepting"""
# Get token from query string or headers
query = websocket.scope.query_string.decode()
token = None
for param in query.split('&'):
if param.startswith('token='):
token = param.split('=')[1]
break
if not token:
# Check headers as fallback
for name, value in websocket.scope.headers:
if name.decode() == 'authorization':
auth_header = value.decode()
if auth_header.startswith('Bearer '):
token = auth_header[7:]
break
if not token:
return None
# Verify token
try:
user = await verify_jwt_token(token)
return user
except Exception:
return None
@app.websocket("/ws/authenticated")
async def authenticated_websocket(websocket):
# Authenticate before accepting
user = await authenticate_websocket(websocket)
if not user:
await websocket.close(1008, "Authentication required")
return
await websocket.accept()
# Add user info to websocket
websocket.user = user
# Continue with authenticated handler
connection_id = str(uuid.uuid4())
websocket_manager.add_connection(connection_id, websocket)
# ... rest of handler
Message Validation¶
from dataclasses import dataclass
from typing import Optional
@dataclass
class ChatMessage:
type: str
content: str
room: Optional[str] = None
recipient: Optional[str] = None
def validate_message(data: dict) -> Optional[ChatMessage]:
"""Validate incoming WebSocket messages"""
try:
msg_type = data.get("type")
if not msg_type or not isinstance(msg_type, str):
return None
content = data.get("content", "").strip()
if not content or len(content) > 1000: # Message length limit
return None
return ChatMessage(
type=msg_type,
content=content,
room=data.get("room"),
recipient=data.get("recipient")
)
except (KeyError, ValueError, TypeError):
return None
@app.websocket("/ws/validated-chat")
async def validated_chat_handler(websocket):
await websocket.accept()
connection_id = str(uuid.uuid4())
websocket_manager.add_connection(connection_id, websocket)
try:
while True:
raw_message = await websocket.receive()
if raw_message.type == "text":
try:
data = json.loads(raw_message.data)
message = validate_message(data)
if not message:
await websocket.send_text(json.dumps({
"type": "error",
"message": "Invalid message format"
}))
continue
# Process valid message
if message.type == "chat" and message.room:
websocket_manager.join_room(connection_id, message.room)
await websocket_manager.broadcast_to_room(
message.room,
json.dumps({
"type": "message",
"content": message.content,
"timestamp": datetime.utcnow().isoformat()
})
)
except json.JSONDecodeError:
await websocket.send_text(json.dumps({
"type": "error",
"message": "Invalid JSON"
}))
elif raw_message.type == "close":
break
except ConnectionError:
pass
finally:
websocket_manager.remove_connection(connection_id)
Connection Monitoring¶
import time
class ConnectionMonitor:
def __init__(self, websocket_manager):
self.websocket_manager = websocket_manager
self.connection_stats = {} # connection_id -> stats
def track_connection(self, connection_id: str):
"""Start tracking a connection"""
self.connection_stats[connection_id] = {
"connected_at": time.time(),
"messages_sent": 0,
"messages_received": 0,
"bytes_sent": 0,
"bytes_received": 0
}
def record_message(self, connection_id: str, sent: bool, message_size: int):
"""Record message statistics"""
if connection_id in self.connection_stats:
stats = self.connection_stats[connection_id]
if sent:
stats["messages_sent"] += 1
stats["bytes_sent"] += message_size
else:
stats["messages_received"] += 1
stats["bytes_received"] += message_size
def get_connection_stats(self, connection_id: str):
"""Get statistics for a connection"""
return self.connection_stats.get(connection_id, {})
def cleanup_connection(self, connection_id: str):
"""Clean up tracking data"""
self.connection_stats.pop(connection_id, None)
# Global monitor instance
monitor = ConnectionMonitor(websocket_manager)
@app.websocket("/ws/monitored")
async def monitored_websocket(websocket):
await websocket.accept()
connection_id = str(uuid.uuid4())
# Start monitoring
websocket_manager.add_connection(connection_id, websocket)
monitor.track_connection(connection_id)
try:
while True:
message = await websocket.receive()
if message.type == "text":
# Record received message
monitor.record_message(connection_id, False, len(message.data))
# Echo back
response = f"Received: {message.data}"
await websocket.send_text(response)
# Record sent message
monitor.record_message(connection_id, True, len(response))
elif message.type == "close":
break
except ConnectionError:
pass
finally:
websocket_manager.remove_connection(connection_id)
monitor.cleanup_connection(connection_id)
@app.get("/api/websocket-stats")
async def get_websocket_stats(request):
"""Get WebSocket connection statistics"""
stats = {
"total_connections": len(websocket_manager.connections),
"rooms": {
room: len(connections)
for room, connections in websocket_manager.rooms.items()
},
"connection_details": {}
}
for conn_id in websocket_manager.connections:
connection_stats = monitor.get_connection_stats(conn_id)
if connection_stats:
stats["connection_details"][conn_id] = connection_stats
return stats
Error Handling¶
Connection Error Handling¶
@app.websocket("/ws/robust")
async def robust_websocket_handler(websocket):
connection_id = str(uuid.uuid4())
try:
await websocket.accept()
websocket_manager.add_connection(connection_id, websocket)
# Send welcome message
await websocket.send_text(json.dumps({
"type": "welcome",
"connection_id": connection_id
}))
while True:
try:
message = await websocket.receive()
if message.type == "text":
await handle_text_message(websocket, message.data)
elif message.type == "bytes":
await handle_binary_message(websocket, message.data)
elif message.type == "close":
app.logger.info(f"Client {connection_id} disconnected normally")
break
elif message.type == "error":
app.logger.error(f"WebSocket error for {connection_id}: {message.data}")
break
except json.JSONDecodeError:
await websocket.send_text(json.dumps({
"type": "error",
"message": "Invalid JSON format"
}))
except Exception as e:
app.logger.error(f"Error handling message for {connection_id}: {e}")
await websocket.send_text(json.dumps({
"type": "error",
"message": "Internal server error"
}))
except ConnectionError:
app.logger.info(f"Connection {connection_id} lost unexpectedly")
except Exception as e:
app.logger.error(f"Unexpected error in websocket {connection_id}: {e}", exc_info=True)
finally:
# Always cleanup
websocket_manager.remove_connection(connection_id)
app.logger.info(f"Cleaned up connection {connection_id}")
async def handle_text_message(websocket, data):
"""Handle text messages with error recovery"""
try:
message = json.loads(data)
# Process message...
except Exception as e:
app.logger.error(f"Text message handling error: {e}")
raise
Performance Considerations¶
Connection Limits¶
MAX_CONNECTIONS = 1000
MAX_CONNECTIONS_PER_IP = 10
connection_counts = defaultdict(int) # ip -> count
async def check_connection_limits(websocket):
"""Check if connection should be allowed"""
client_ip = get_client_ip(websocket.scope)
# Global limit
if len(websocket_manager.connections) >= MAX_CONNECTIONS:
return False, "Server at capacity"
# Per-IP limit
if connection_counts[client_ip] >= MAX_CONNECTIONS_PER_IP:
return False, "Too many connections from this IP"
return True, "OK"
@app.websocket("/ws/limited")
async def limited_websocket(websocket):
allowed, reason = await check_connection_limits(websocket)
if not allowed:
await websocket.close(1008, reason)
return
client_ip = get_client_ip(websocket.scope)
connection_counts[client_ip] += 1
try:
await websocket.accept()
# ... handle connection
finally:
connection_counts[client_ip] -= 1
Memory Management¶
# Limit room sizes
MAX_ROOM_SIZE = 100
def can_join_room(room_id: str) -> bool:
"""Check if room has space"""
room_connections = websocket_manager.get_room_connections(room_id)
return len(room_connections) < MAX_ROOM_SIZE
# Message rate limiting
import time
from collections import defaultdict, deque
message_rates = defaultdict(deque) # connection_id -> timestamps
MAX_MESSAGES_PER_MINUTE = 60
def is_rate_limited(connection_id: str) -> bool:
"""Check if connection is sending too many messages"""
now = time.time()
minute_ago = now - 60
# Remove old timestamps
timestamps = message_rates[connection_id]
while timestamps and timestamps[0] < minute_ago:
timestamps.popleft()
# Check rate
if len(timestamps) >= MAX_MESSAGES_PER_MINUTE:
return True
# Record this message
timestamps.append(now)
return False
Integration Examples¶
Complete Chat Application¶
import uuid
import json
from datetime import datetime
from collections import defaultdict
# Global state
websocket_manager = WebSocketManager()
user_connections = {} # user_id -> connection_id
active_rooms = defaultdict(set) # room_id -> user_ids
@app.websocket("/ws/chat")
async def chat_websocket(websocket):
# Authenticate
user = await authenticate_websocket(websocket)
if not user:
await websocket.close(1008, "Authentication required")
return
await websocket.accept()
connection_id = str(uuid.uuid4())
user_connections[user.id] = connection_id
websocket_manager.add_connection(connection_id, websocket)
# User online notification
await notify_user_status(user.id, "online")
try:
while True:
message = await websocket.receive()
if message.type == "text":
data = json.loads(message.data)
await handle_chat_command(user, connection_id, data)
elif message.type == "close":
break
except ConnectionError:
pass
finally:
# Cleanup
await handle_user_disconnect(user.id, connection_id)
async def handle_chat_command(user, connection_id, data):
"""Handle different chat commands"""
command = data.get("command")
if command == "join_room":
room_id = data["room_id"]
if can_join_room(room_id):
websocket_manager.join_room(connection_id, room_id)
active_rooms[room_id].add(user.id)
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "user_joined",
"user": user.to_dict(),
"timestamp": datetime.utcnow().isoformat()
}))
elif command == "send_message":
room_id = data["room_id"]
if user.id in active_rooms[room_id]:
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "message",
"user": user.to_dict(),
"message": data["message"],
"timestamp": datetime.utcnow().isoformat()
}))
elif command == "leave_room":
room_id = data["room_id"]
websocket_manager.leave_room(connection_id, room_id)
active_rooms[room_id].discard(user.id)
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "user_left",
"user": user.to_dict(),
"timestamp": datetime.utcnow().isoformat()
}))
async def handle_user_disconnect(user_id, connection_id):
"""Handle user disconnection"""
# Remove from all rooms
for room_id, users in active_rooms.items():
if user_id in users:
users.discard(user_id)
await websocket_manager.broadcast_to_room(room_id, json.dumps({
"type": "user_disconnected",
"user_id": user_id,
"timestamp": datetime.utcnow().isoformat()
}))
# Cleanup
websocket_manager.remove_connection(connection_id)
user_connections.pop(user_id, None)
# User offline notification
await notify_user_status(user_id, "offline")
# REST API endpoints for chat management
@app.get("/api/chat/rooms")
async def list_rooms(request):
"""List active chat rooms"""
rooms = []
for room_id, users in active_rooms.items():
if users: # Only include rooms with users
rooms.append({
"room_id": room_id,
"user_count": len(users),
"users": list(users)
})
return {"rooms": rooms}
@app.post("/api/chat/broadcast")
async def admin_broadcast(request):
"""Admin broadcast to all users"""
data = await request.json()
message = {
"type": "admin_broadcast",
"message": data["message"],
"timestamp": datetime.utcnow().isoformat()
}
await websocket_manager.broadcast_to_all(json.dumps(message))
return {
"status": "broadcasted",
"recipient_count": len(websocket_manager.connections)
}
The WebSocket system in Gobstopper provides a solid foundation for building real-time applications with efficient connection management, room-based broadcasting, and robust error handling.