use std::collections::{HashMap, HashSet}; use axum::{ Json, extract::{ FromRequest,FromRequestParts, State, Path, }, http::StatusCode, response::{IntoResponse, sse::KeepAlive} }; //use axum::extract::ws::Message; use chrono::NaiveDateTime; use rusqlite::{Name, Statement, params}; use rusqlite::OptionalExtension; use serde::Serialize; use serde_json::{json, to_value}; use crate::chat::extractor::{ AddUserConversationPayload, CreateConversationPayload, GetMessagesPayload, SendMessagePayload }; use crate::utils::db_pool::{AppState}; use crate::utils::auth::AuthClaims; pub async fn create_conversation( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, CreateConversationPayload(payload): CreateConversationPayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let result = conn.execute( "INSERT INTO conversation (creator_id, title) VALUES (?1, ?2)", params![&user_id, &payload.title], ); match result { Ok(rows) if rows > 0 => (StatusCode::OK, format!("Created conversation {}", payload.title)), Ok(_) => (StatusCode::NOT_FOUND, "not able to create the conversation".to_string() ), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Error when creating the conversation : {err}")), } } pub async fn add_user_to_conv( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, AddUserConversationPayload(payload):AddUserConversationPayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let mut conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let is_creator = match conn .query_row( "SELECT 1 FROM conversation WHERE creator_id = ?1 AND id = ?2", params![user_id, payload.conv_id], |_| Ok(()), ) .optional() { Ok(Some(_)) => true, Ok(None) => false, Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, "Creator check failed".to_string(), ) } }; if !is_creator { return ( StatusCode::FORBIDDEN, "Not the creator of the conversation".to_string(), ); } //fix this let existing: HashSet = { let mut stmt = match conn.prepare( "SELECT user_id FROM conversation_participants WHERE conversation_id = ?1", ) { Ok(s) => s, Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, "Prepare participants stmt failed".to_string(), ) } }; match stmt.query_map(params![payload.conv_id], |row| row.get(0)) { Ok(rows) => rows.filter_map(Result::ok).collect(), Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, "Query participants failed".to_string(), ) } } }; // ← stmt dropped HERE let payload_users: HashSet = payload.users.into_iter().collect(); let to_add: Vec = payload_users .difference(&existing) .copied() .collect(); let to_remove : Vec = existing .difference(&payload_users) .copied() .collect(); let tx = match conn.transaction() { Ok(t) => t, Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, "Transaction start failed for update conv participants".to_string(), ) } }; for user_id in to_add { if let Err(err) = tx.execute( "INSERT INTO conversation_participants (conversation_id, user_id) VALUES (?1, ?2)", params![payload.conv_id, user_id], ) { return ( StatusCode::INTERNAL_SERVER_ERROR, format!("Insert failed for {}: {}", user_id, err), ); } } for user_id in to_remove { if let Err(err) = tx.execute( "DELETE FROM conversation_participants WHERE conversation_id = ?1 AND user_id = ?2", params![payload.conv_id, user_id], ) { return ( StatusCode::INTERNAL_SERVER_ERROR, format!("Delete failed for {}: {}", user_id, err), ); } } if let Err(_) = tx.commit() { return ( StatusCode::INTERNAL_SERVER_ERROR, "Transaction commit failed".to_string(), ); } return (StatusCode::OK, "ok".to_string()); } /* #[derive(Deserialize, Serialize, Debug)] pub struct Get */ pub async fn get_conv_users( State(state): State, AuthClaims{user_id, hotel_id}: AuthClaims, Path(conv_id): Path<(i32)>, ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get (){ Ok(c)=> c, Err(err)=> return (StatusCode::INTERNAL_SERVER_ERROR, format!("Error opening pol connection : {err}") ) }; let mut stmt = match conn.prepare( "SELECT conversation_id, name FROM conversation_participants WHERE user_id = ?1", ) { Ok(s) => s, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Prepare failed: {}", e)), }; //fix this let existing: HashSet = { let mut stmt = match conn.prepare( "SELECT user_id FROM conversation_participants WHERE conversation_id = ?1", ) { Ok(s) => s, Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, "Prepare participants stmt failed".to_string(), ) } }; match stmt.query_map(params![conv_id], |row| row.get(0)) { Ok(rows) => rows.filter_map(Result::ok).collect(), Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, "Query participants failed".to_string(), ) } } }; // ← stmt dropped HERE let present: Vec = existing .into_iter() .collect(); match serde_json::to_string(&present) { Ok(json) => (StatusCode::OK, json), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Serialization failed: {}", e)), } } pub async fn send_message( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, SendMessagePayload(payload):SendMessagePayload ) -> impl IntoResponse { //TODO: make sur the convid is valid let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let mut statement = match conn.prepare( "SELECT 1 FROM conversation_participants WHERE user_id = ?1 AND conversation_id = ?2" , ){ Ok(statement) => statement, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "prepare failed".to_string()) }; match statement.exists(params![user_id, payload.conv_id]) { Ok(true) => { // user is part of the conversation — continue } Ok(false) => { // early exit: not part of the conversation return (StatusCode::FORBIDDEN, "Not part of the conversation".to_string()); } Err(_) => { // query failed return (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string()); } } let result = conn.execute( "INSERT INTO message (sender_id, content, conversation_id) VALUES (?1, ?2, ?3)", params![&user_id, &payload.message, &payload.conv_id], ); match result { Ok(rows) if rows > 0 => { // --- send to conversation participants --- let mut stmt_participants = conn .prepare("SELECT user_id FROM conversation_participants WHERE conversation_id = ?1") .expect("prepare participants failed"); let participant_ids: Vec = stmt_participants .query_map(params![payload.conv_id], |row| row.get(0)) .expect("query_map failed") .filter_map(Result::ok) .collect(); if let Some(hotel_users) = state.ws_map.get(&hotel_id) { let update_msg = serde_json::json!({ "event-type": "chat-message", "conv_id": payload.conv_id, "sender": user_id, "content": payload.message, }) .to_string(); for uid in &participant_ids { if let Some(sender) = hotel_users.get(&uid) { let _ = sender.send(axum::extract::ws::Message::Text(update_msg.clone().into())); } } } (StatusCode::OK, format!("sent message: {}, to users:{:?}", payload.message, participant_ids)) } Ok(_) => (StatusCode::NOT_FOUND, "Conversation not found".to_string()), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Error from DB: {err}")), } } #[derive(Debug, Serialize)] struct Message { id: i32, sender_id: i32, content: String, sent_at: String, } pub async fn get_message( State(state): State, AuthClaims {user_id, hotel_id}: AuthClaims, GetMessagesPayload(payload):GetMessagesPayload ) -> Result { let pool = state.hotel_pools.get_pool(hotel_id); let conn = pool.get() .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Pool error".to_string()))?; let from_time = match payload.timestamp.as_deref() { Some("0") | None => "1970-01-01 00:00:00", // default to epoch Some(ts) => ts, }; let mut stmt = conn.prepare( "SELECT id, sender_id, content, sent_at FROM message WHERE conversation_id = ?1 AND sent_at > ?2 ORDER BY sent_at DESC LIMIT 50" ).map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Prepare failed".to_string()))?; let messages = stmt.query_map( params![payload.conv_id, from_time], |row| { Ok(Message { id: row.get(0)?, sender_id: row.get(1)?, content: row.get(2)?, sent_at: row.get(3)?, }) } ) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string()))? .collect::, _>>() .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Collect failed".to_string()))?; let response = serde_json::to_string(&messages) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Serialisation failed")); Ok((StatusCode::OK, response )) } #[derive(Serialize)] struct User { id: i32, username: String, //display_name: String, } pub async fn get_hotel_users( State(state): State, AuthClaims { hotel_id, .. }: AuthClaims, ) -> impl IntoResponse { let conn = match state.logs_pool.get() { Ok(c) => c, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "DB connection error".to_string()), }; let mut stmt = match conn.prepare( "SELECT user_id, username FROM hotel_user_link WHERE hotel_id = ?1", ) { Ok(s) => s, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Prepare failed: {}", e)), }; let users_iter = match stmt.query_map(params![hotel_id], |row| { Ok(User { id: row.get(0)?, username: row.get(1)?, //display_name: row.get(2)?, }) }) { Ok(iter) => iter, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Query failed: {}", e)), }; let users: Vec = match users_iter.collect::, _>>() { Ok(u) => u, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Collect failed: {}", e)), }; match serde_json::to_string(&users) { Ok(json) => (StatusCode::OK, json), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Serialization failed: {}", e)), } } #[derive(Debug, Serialize)] struct Conversation { id: i32, title: String, } pub async fn get_convs( State(state): State, //Path((item_name, item_amount)): Path<(String, i32)>, AuthClaims{ user_id, hotel_id}: AuthClaims, ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error: {err}")), }; let mut stmt = match conn.prepare( "SELECT conversation_id, name FROM conversation_participants WHERE user_id = ?1", ) { Ok(s) => s, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Prepare failed: {}", e)), }; let rows = match stmt.query_map(params![user_id], |row| { let conversation_id: i32 = row.get(0)?; let name: String = row.get(1)?; Ok(Conversation { id: row.get(0)?, title: row.get(1)?, }) }) { Ok(rows) => rows, //Ok(_) => {}, IMPLEMENT NO CONV ? Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Query failed: {}", e)), }; let convs: Vec = match rows.collect::, _>>() { Ok(u) => u, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Collect failed: {}", e)), }; //.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json("error".to_string()))); match serde_json::to_string(&convs) { Ok(json) => (StatusCode::OK, json), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Serialization failed: {}", e)), } } /* pub async fn get_convs( State(state): State, //Path((item_name, item_amount)): Path<(String, i32)>, AuthClaims{ user_id, hotel_id}: AuthClaims, ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error: {}", err).into_response() ) }; let mut stmt = match conn.prepare( "SELECT id, title FROM conversation WHERE creator_id = ?1", ) { Ok(s) => s, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Prepare failed: {}", e).into_response() ) }; let rows = match stmt.query_map(params![user_id], |row| { let id: i32 = row.get(0)?; let title: String = row.get(1)?; Ok((title, id)) }) { Ok(rows) => rows, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Query failed: {}", e).into_response() ) }; let mut map = HashMap::new(); // ✅ Iterate through the row results for row_result in rows { match row_result { Ok((title, id)) => { map.insert(title, id); } Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Row parsing failed: {}", e).into_response() ) } } let conv_map_json = match to_value(map) { Ok(c) => c, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("List unwrapping failed: {}", e).into_response() ) }; let conv_map_clean_json = serde_json::to_value(map) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Serialization failed: {}", e).into_response() )); (StatusCode::OK, Json(conv_map_clean_json)).into_response() } */