This commit is contained in:
@@ -1,32 +1,33 @@
|
||||
use axum::extract::Path;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::{
|
||||
Extension,
|
||||
extract::{
|
||||
State,
|
||||
ws::{Message, WebSocket, WebSocketUpgrade},
|
||||
},
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use reqwest::StatusCode;
|
||||
use std::sync::Arc;
|
||||
use axum::{Extension, extract::{State, ws::{Message, WebSocket, WebSocketUpgrade}}};
|
||||
use tokio::sync::mpsc;
|
||||
use axum::extract::Path;
|
||||
use axum::response::IntoResponse;
|
||||
//use futures_util::stream::stream::StreamExt;
|
||||
use futures_util::{StreamExt, SinkExt};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
|
||||
use crate::utils::{auth::{AuthClaims, JwtKeys, auth_claims_from_token}, db_pool::{AppState, HotelPool}};
|
||||
use crate::utils::{
|
||||
auth::{AuthClaims, JwtKeys, auth_claims_from_token},
|
||||
db_pool::{AppState, HotelPool},
|
||||
};
|
||||
|
||||
|
||||
|
||||
/// Type alias: user_id → sender to that user
|
||||
/// Type alias: user_id → sender to that user
|
||||
pub type UserMap = DashMap<i32, mpsc::UnboundedSender<Message>>;
|
||||
/// hotel_id → users
|
||||
pub type HotelMap = DashMap<i32, Arc<UserMap>>;
|
||||
/// global map of all hotels
|
||||
pub type WsMap = Arc<HotelMap>;
|
||||
/// Type alias: user_id → sender to that user
|
||||
/// Type alias: user_id → sender to that user
|
||||
|
||||
|
||||
async fn handle_socket(
|
||||
mut socket: WebSocket,
|
||||
state: AppState,
|
||||
hotel_id: i32,
|
||||
user_id: i32,
|
||||
) {
|
||||
async fn handle_socket(mut socket: WebSocket, state: AppState, hotel_id: i32, user_id: i32) {
|
||||
// channel for sending messages TO this client
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
|
||||
|
||||
@@ -45,8 +46,6 @@ async fn handle_socket(
|
||||
// split socket into sender/receiver
|
||||
let (mut sender, mut receiver) = socket.split();
|
||||
|
||||
|
||||
|
||||
// task for sending messages from server to client
|
||||
let mut rx_task = tokio::spawn(async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
@@ -96,21 +95,17 @@ pub async fn ws_handler(
|
||||
State(state): State<AppState>,
|
||||
Path((req_token)): Path<(String)>,
|
||||
) -> impl IntoResponse {
|
||||
|
||||
|
||||
let token = req_token;
|
||||
|
||||
let claims = match auth_claims_from_token(&token, &keys) {
|
||||
|
||||
let claims = match auth_claims_from_token(&token, &keys) {
|
||||
Err(_) => {
|
||||
print!("error during auth claims processing");
|
||||
return StatusCode::UNAUTHORIZED.into_response();
|
||||
|
||||
}
|
||||
Ok(c) => c
|
||||
|
||||
Ok(c) => c,
|
||||
};
|
||||
|
||||
print!("{token}, web socket tried to connect", );
|
||||
|
||||
print!("{token}, web socket tried to connect",);
|
||||
|
||||
/*
|
||||
let claims = match auth_claims_from_token(&token, &keys) {
|
||||
@@ -119,7 +114,6 @@ pub async fn ws_handler(
|
||||
};
|
||||
*/
|
||||
|
||||
|
||||
ws.on_upgrade(move |socket| handle_socket(socket, state, claims.hotel_id, claims.user_id))
|
||||
}
|
||||
|
||||
@@ -132,4 +126,4 @@ fn print_ws_state(state: &AppState) {
|
||||
println!("Hotel {hotel_id}: users {:?}", users);
|
||||
}
|
||||
println!("--------------------------------");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user