WebSocket Support
Ultimo provides zero-dependency, RFC 6455 compliant WebSocket support with built-in pub/sub functionality.
Features
- ✅ Zero Dependencies - Built directly on hyper's upgrade mechanism
- ✅ Type-Safe - WebSocketHandler trait with typed context data
- ✅ Built-in Pub/Sub - Topic-based messaging with ChannelManager
- ✅ Production Ready - 93 comprehensive tests, fully compliant with RFC 6455
- ✅ Easy Integration - Seamless router integration with
app.websocket()
Quick Start
Basic WebSocket Handler
use ultimo::prelude::*;
use ultimo::websocket::{WebSocketHandler, WebSocket, Message};
use async_trait::async_trait;
#[derive(Clone)]
struct ChatHandler;
#[async_trait]
impl WebSocketHandler for ChatHandler {
type Data = ();
async fn on_open(&self, ws: &WebSocket<Self::Data>) {
println!("Client connected");
ws.send("Welcome to the chat!").await.ok();
}
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
match msg {
Message::Text(text) => {
println!("Received: {}", text);
ws.send(format!("Echo: {}", text)).await.ok();
}
Message::Close(_) => {
println!("Client disconnected");
}
_ => {}
}
}
}
#[tokio::main]
async fn main() {
let mut app = Ultimo::new();
// Add WebSocket route
app.websocket("/ws", ChatHandler);
app.listen("127.0.0.1:3000").await.unwrap();
}Client Connection
const ws = new WebSocket("ws://localhost:3000/ws");
ws.addEventListener("open", () => {
console.log("Connected!");
ws.send("Hello, server!");
});
ws.addEventListener("message", (event) => {
console.log("Received:", event.data);
});Typed Context Data
WebSocket connections can carry typed context data:
use serde::{Serialize, Deserialize};
#[derive(Clone, Serialize, Deserialize)]
struct UserContext {
user_id: String,
username: String,
}
#[derive(Clone)]
struct AuthHandler;
#[async_trait]
impl WebSocketHandler for AuthHandler {
type Data = UserContext;
async fn on_open(&self, ws: &WebSocket<Self::Data>) {
let user = ws.data();
println!("User {} connected", user.username);
ws.send(format!("Welcome, {}!", user.username)).await.ok();
}
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
let user = ws.data();
// Access user context in message handler
println!("Message from {}: {:?}", user.username, msg);
}
}Pub/Sub System
Built-in topic-based pub/sub for broadcasting messages:
#[derive(Clone)]
struct ChatRoomHandler;
#[async_trait]
impl WebSocketHandler for ChatRoomHandler {
type Data = String; // User ID
async fn on_open(&self, ws: &WebSocket<Self::Data>) {
let user_id = ws.data();
// Subscribe to chat room
ws.subscribe("chat:general").await.ok();
// Announce join
let msg = serde_json::json!({
"type": "join",
"user_id": user_id,
});
ws.publish("chat:general", &msg).await.ok();
}
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
if let Message::Text(text) = msg {
let user_id = ws.data();
// Broadcast to all subscribers
let msg = serde_json::json!({
"type": "message",
"user_id": user_id,
"text": text,
});
ws.publish("chat:general", &msg).await.ok();
}
}
async fn on_close(&self, ws: &WebSocket<Self::Data>, _code: u16, _reason: &str) {
let user_id = ws.data();
// Announce leave
let msg = serde_json::json!({
"type": "leave",
"user_id": user_id,
});
ws.publish("chat:general", &msg).await.ok();
// Cleanup
ws.unsubscribe("chat:general").await.ok();
}
}Lifecycle Callbacks
WebSocketHandler provides several lifecycle hooks:
#[async_trait]
impl WebSocketHandler for MyHandler {
type Data = ();
// Called when connection is established
async fn on_open(&self, ws: &WebSocket<Self::Data>) {
// Initialize connection
}
// Called when message is received
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
// Handle incoming messages
}
// Called when connection is closed
async fn on_close(&self, ws: &WebSocket<Self::Data>, code: u16, reason: &str) {
// Cleanup resources
}
}Message Types
WebSocket supports multiple message types:
use ultimo::websocket::Message;
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
match msg {
Message::Text(text) => {
println!("Text message: {}", text);
}
Message::Binary(data) => {
println!("Binary message: {} bytes", data.len());
}
Message::Ping(data) => {
// Pings are automatically handled
println!("Ping received");
}
Message::Pong(data) => {
println!("Pong received");
}
Message::Close(frame) => {
if let Some(f) = frame {
println!("Close: code={}, reason={}", f.code, f.reason);
}
}
}
}JSON Messages
Send and receive JSON messages easily:
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct ChatMessage {
user: String,
text: String,
timestamp: u64,
}
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
if let Message::Text(text) = msg {
// Parse JSON
if let Ok(chat_msg) = serde_json::from_str::<ChatMessage>(&text) {
println!("{} says: {}", chat_msg.user, chat_msg.text);
// Send JSON response
let response = ChatMessage {
user: "Server".to_string(),
text: "Message received".to_string(),
timestamp: now(),
};
ws.send_json(&response).await.ok();
}
}
}Examples
Check out the complete examples in the repository:
Simple Chat
A basic HTML/JS chat application demonstrating real-time messaging and pub/sub.
cargo run -p websocket-chatReact Chat
Modern React + TypeScript chat with shadcn/ui components, featuring:
- Type-safe WebSocket hook
- Real-time message broadcasting
- Connection status indicators
- Message history with auto-scroll
cargo run -p websocket-chat-reactArchitecture
Ultimo's WebSocket implementation:
- Frame Codec - RFC 6455 compliant frame parsing and encoding
- Connection - WebSocket connection state management
- Upgrade - HTTP to WebSocket upgrade mechanism
- Pub/Sub - ChannelManager for topic-based messaging
- Router - Optimized Radix Tree for efficient route matching
Testing
The WebSocket implementation includes 93 comprehensive tests:
- 21 unit tests (frame codec, pub/sub, upgrade)
- 9 integration tests (real connections, JSON, pub/sub)
- 12 property-based tests (thousands of random test cases)
- 5 router integration tests
- 14 error handling tests
- 11 concurrency tests
- 18 edge case tests
Performance
- Zero Dependencies - No tokio-tungstenite or similar libraries
- Efficient - O(L) router lookups with Radix Tree
- Memory - Minimal per-connection overhead
- Tested - 210 tests passing, production-ready
Coming in Phase 2
Future enhancements planned:
- Message fragmentation for large payloads
- Automatic ping/pong heartbeat
- Backpressure handling
- Per-message deflate compression (RFC 7692)
- Configuration system (timeouts, buffer sizes)
- Graceful shutdown support