use axum::{ extract::{ FromRequest,FromRequestParts, State, }, http::StatusCode, response::{sse::KeepAlive, IntoResponse}, }; //use axum::extract::ws::Message; use chrono::NaiveDateTime; use rusqlite::params; use serde::Serialize; use serde_json::json; use crate::chat::extractor::{ AddUserConversationPayload, CreateConversationPayload, GetMessagesPayload, SendMessagePayload }; use crate::utils::db_pool::{AppState}; use crate::utils::auth::AuthClaims; pub async fn create_conversation( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, CreateConversationPayload(payload): CreateConversationPayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let result = conn.execute( "INSERT INTO conversation (creator_id, title) VALUES (?1, ?2)", params![&user_id, &payload.title], ); match result { Ok(rows) if rows > 0 => (StatusCode::OK, format!("Created conversation {}", payload.title)), Ok(_) => (StatusCode::NOT_FOUND, "not able to create the conversation".to_string() ), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Error when creating the conversation : {err}")), } } pub async fn add_user_to_conv( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, AddUserConversationPayload(payload):AddUserConversationPayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let mut statement = match conn.prepare( "SELECT 1 FROM conversation WHERE creator_id = ?1 AND id = ?2" , ){ Ok(statement) => statement, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "prepare failed".to_string()) }; match statement.exists(params![user_id, payload.conv_id]) { Ok(true) => { //user is creator } Ok(false) => { //user is not the creator return (StatusCode::FORBIDDEN, "Not the creato of the conversation".to_string()) } Err(_) => { return(StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string()) } } for target_id in &payload.users { let rows_inserted = match conn.execute( "INSERT INTO conversation_participants (conversation_id, user_id) VALUES (?1, ?2)", params![payload.conv_id, target_id], ) { Ok(n) => n, Err(err) => { return (StatusCode::INTERNAL_SERVER_ERROR, format!("Err adding user {}: {}", target_id, err)); } }; if rows_inserted == 0 { return (StatusCode::NOT_FOUND, format!("Could not add user {}", target_id)); } } return (StatusCode::OK, "ok".to_string()); } pub async fn send_message( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, SendMessagePayload(payload):SendMessagePayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let mut statement = match conn.prepare( "SELECT 1 FROM conversation_participants WHERE user_id = ?1 AND conversation_id = ?2" , ){ Ok(statement) => statement, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "prepare failed".to_string()) }; match statement.exists(params![user_id, payload.conv_id]) { Ok(true) => { // user is part of the conversation — continue } Ok(false) => { // early exit: not part of the conversation return (StatusCode::FORBIDDEN, "Not part of the conversation".to_string()); } Err(_) => { // query failed return (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string()); } } let result = conn.execute( "INSERT INTO message (sender_id, content, conversation_id) VALUES (?1, ?2, ?3)", params![&user_id, &payload.message, &payload.conv_id], ); match result { Ok(rows) if rows > 0 => { // --- send to conversation participants --- let mut stmt_participants = conn .prepare("SELECT user_id FROM conversation_participants WHERE conversation_id = ?1") .expect("prepare participants failed"); let participant_ids: Vec = stmt_participants .query_map(params![payload.conv_id], |row| row.get(0)) .expect("query_map failed") .filter_map(Result::ok) .collect(); if let Some(hotel_users) = state.ws_map.get(&hotel_id) { let update_msg = serde_json::json!({ "conv_id": payload.conv_id, "sender": user_id, "content": payload.message, }) .to_string(); for uid in &participant_ids { if let Some(sender) = hotel_users.get(&uid) { let _ = sender.send(axum::extract::ws::Message::Text(update_msg.clone().into())); } } } (StatusCode::OK, format!("sent message: {}, to users:{:?}", payload.message, participant_ids)) } Ok(_) => (StatusCode::NOT_FOUND, "Conversation not found".to_string()), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Error from DB: {err}")), } } #[derive(Debug, Serialize)] struct Message { id: i32, sender_id: i32, content: String, sent_at: String, } pub async fn get_message( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, GetMessagesPayload(payload):GetMessagesPayload ) -> Result { let pool = state.hotel_pools.get_pool(hotel_id); let conn = pool.get() .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Pool error".to_string()))?; let from_time = match payload.timestamp.as_deref() { Some("0") | None => "1970-01-01 00:00:00", // default to epoch Some(ts) => ts, }; let mut stmt = conn.prepare( "SELECT id, sender_id, content, sent_at FROM message WHERE conversation_id = ?1 AND sent_at > ?2 ORDER BY sent_at DESC LIMIT 50" ).map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Prepare failed".to_string()))?; let messages = stmt.query_map( params![payload.conv_id, from_time], |row| { Ok(Message { id: row.get(0)?, sender_id: row.get(1)?, content: row.get(2)?, sent_at: row.get(3)?, }) } ) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string()))? .collect::, _>>() .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Collect failed".to_string()))?; let response = serde_json::to_string(&messages) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Serialisation failed")); Ok((StatusCode::OK, response )) } #[derive(Serialize)] struct User { id: i32, username: String, display_name: String, } pub async fn get_hotel_users( State(state): State, AuthClaims { hotel_id, .. }: AuthClaims, ) -> impl IntoResponse { let conn = match state.logs_pool.get() { Ok(c) => c, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "DB connection error".to_string()), }; let mut stmt = match conn.prepare( "SELECT id, username, displayname FROM users WHERE hotel_id = ?1", ) { Ok(s) => s, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Prepare failed: {}", e)), }; let users_iter = match stmt.query_map(params![hotel_id], |row| { Ok(User { id: row.get(0)?, username: row.get(1)?, display_name: row.get(2)?, }) }) { Ok(iter) => iter, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Query failed: {}", e)), }; let users: Vec = match users_iter.collect::, _>>() { Ok(u) => u, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Collect failed: {}", e)), }; match serde_json::to_string(&users) { Ok(json) => (StatusCode::OK, json), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Serialization failed: {}", e)), } }