Build a Personal Chat Application Using Rust and Axum with WebSocket
Build a real-time personal chat application using Rust and the Axum framework. Enable seamless, bidirectional communication between users, allowing them to send and receive messages instantly.
Required Dependencies
[dependencies]
async-trait = "0.1.85"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header"] }
futures = "0.3.31"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
headers = "0.4.0"
tokio = { version = "1.43.0", features = ["full"] }
tokio-tungstenite = "0.26.1"
tower-http = { version = "0.6.2", features = ["fs", "trace"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
Define the Query Parameters
To allow users to specify who they are and who they want to chat with, Define a struct to represent the query parameters for the WebSocket connection. This struct will be used to extract query parameters from the WebSocket connection URL, such as /ws?user_id=alice&recipient_id=bob.
#[derive(Debug, Deserialize)]
struct ChatParam {
user_id: String,
recipient_id: String,
}
Set Up Shared State
To manage active WebSocket connections, create a shared state using Arc (atomic reference-counted pointer) and Mutex (mutual exclusion). This ensures thread-safe access to the connections map
struct AppState {
connections: Mutex<HashMap<String, broadcast::Sender<String>>>,
}
impl AppState {
fn new() -> Arc<Self> {
Arc::new(Self {
connections: Mutex::new(HashMap::new()),
})
}
}
Handle WebSocket Connections
Next, implement the WebSocket handler. This function extracts the WebSocket connection, client address, query parameters, and shared state. It then upgrades the HTTP connection to a WebSocket connection and passes control to the handle_message function.
ws.on_upgrade: Upgrades the HTTP connection to a WebSocket connection.handle_message: Manages the WebSocket connection after the upgrade.
async fn ws_handle(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Query(params): Query<ChatParam>,
state: State<Arc<AppState>>,
) -> impl IntoResponse {
let state = Arc::clone(&state.0);
let user_id = params.user_id;
let recipient_id = params.recipient_id;
ws.on_upgrade(move |socket| handle_message(socket, addr, user_id, recipient_id, state))
}
Manage WebSocket Messages
The handle_message function is responsible for managing the WebSocket connection. It splits the WebSocket into a sender and receiver, creates a broadcast channel for the recipient, and spawns tasks to handle incoming and outgoing messages.
async fn handle_message(
socket: WebSocket,
who: SocketAddr,
user_id: String,
recipient_id: String,
state: Arc<AppState>,
) {
let (mut sender, mut receiver) = socket.split();
let (private_tx, _) = broadcast::channel(16);
{
let mut private_connection = state.connections.lock().unwrap();
private_connection.insert(recipient_id.clone(), private_tx.clone());
}
let mut rx = private_tx.subscribe();
if sender
.send(Message::Ping(vec![1, 2, 3].into()))
.await
.is_err()
{
return;
}
let send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if sender.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
});
let state_clone = Arc::clone(&state);
let recipient_id_clone = recipient_id.clone();
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
if receive_message(msg, &user_id, &recipient_id_clone, &state_clone).is_break() {
break;
}
}
});
tokio::select! {
_ = send_task =>{},
_ = recv_task =>{},
}
{
let mut private_connection = state.connections.lock().unwrap();
private_connection.remove(&recipient_id);
}
}
Process Incoming Messages
The receive_message function processes incoming WebSocket messages. If the message is text, it forwards the message to the recipient using the broadcast::Sender. If the message is a close frame, it breaks the loop and closes the connection.
fn receive_message(
msg: Message,
user_id: &str,
recipient_id: &str,
state: &Arc<AppState>,
) -> std::ops::ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
let private_connection = state.connections.lock().unwrap();
if let Some(tx) = private_connection.get(user_id) {
let _ = tx.send(format!("{} : {}", user_id, t));
}
}
Message::Close(_) => {
return std::ops::ControlFlow::Break(());
}
_ => {}
}
std::ops::ControlFlow::Continue(())
}
Run the Server
Finally, we’ll set up the Axum server and start listening for WebSocket connections.
#[tokio::main]
async fn main() {
let assets_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("assets");
let state = AppState::new();
let app: Router = Router::new()
.fallback_service(ServeDir::new(assets_dir).append_index_html_on_directories(true))
.route(
"/ws",
get(ws_handle).with_state(state).layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
),
);
let listener = TcpListener::bind("127.0.0.1:3000").await.unwrap();
tracing::debug!("listening on {}", listener.local_addr().unwrap());
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
}
Testing the Chat Application Using websocat
To ensure everything works as expected. For this, we’ll use websocat, a command-line WebSocket client that allows you to connect to WebSocket servers, send messages, and receive responses.
- Install
websocat:
cargo install websocat
- Start the Server
cargo run
-
Connect as a User Open two terminal windows to simulate two users: one for Alice and one for Bob.
-
In the first terminal
websocat "ws://127.0.0.1:3000/ws?user_id=alice&recipient_id=bob"
- In the second terminal,
websocat "ws://127.0.0.1:3000/ws?user_id=bob&recipient_id=alice"
- Send and Receive Messages
In the Alice terminal, type a message and press Enter:
Hello, Bob!
In the Bob terminal, you should see the message:
alice: Hello, Bob!
Thanks for reading! If you found this article helpful, feel free to share it with others who might benefit.