use axum::{ extract::{ FromRequest,FromRequestParts, State, }, http::StatusCode, response::{sse::KeepAlive, IntoResponse} }; use chrono::NaiveDateTime; use rusqlite::params; use serde::Serialize; use crate::chat::extractor::{ AddUserConversationPayload, CreateConversationPayload, GetMessagesPayload, SendMessagePayload }; use crate::utils::db_pool::{AppState}; use crate::utils::auth::AuthClaims; pub async fn create_conversation( State(state): State, AuthClaims {user_id, hotel_id, username}: AuthClaims, CreateConversationPayload(payload): CreateConversationPayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let result = conn.execute( "INSERT INTO conversation (creator_id, title) VALUES (?1, ?2)", params![&user_id, &payload.title], ); match result { Ok(rows) if rows > 0 => (StatusCode::OK, format!("Created conversation {}", payload.title)), Ok(_) => (StatusCode::NOT_FOUND, "not able to create the conversation".to_string() ), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Error when creating the conversation : {err}")), } } pub async fn add_user_to_conv( State(state): State, AuthClaims {user_id, hotel_id,username}: AuthClaims, AddUserConversationPayload(payload):AddUserConversationPayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let mut statement = match conn.prepare( "SELECT 1 FROM conversation WHERE creator_id = ?1 AND id = ?2" , ){ Ok(statement) => statement, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "prepare failed".to_string()) }; if !statement.exists(params![user_id, payload.conv_id]) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string())).unwrap() { // Early exit if not creator return ((StatusCode::FORBIDDEN, "Not the creator".to_string())); } for target_id in &payload.users { let rows_inserted = match conn.execute( "INSERT INTO conversation_participants (conversation_id, user_id) VALUES (?1, ?2)", params![payload.conv_id, target_id], ) { Ok(n) => n, Err(err) => { return (StatusCode::INTERNAL_SERVER_ERROR, format!("Err adding user {}: {}", target_id, err)); } }; if rows_inserted == 0 { return (StatusCode::NOT_FOUND, format!("Could not add user {}", target_id)); } } return (StatusCode::OK, "ok".to_string()); } pub async fn send_message( State(state): State, AuthClaims {user_id, hotel_id,username}: AuthClaims, SendMessagePayload(payload):SendMessagePayload ) -> impl IntoResponse { let pool = state.hotel_pools.get_pool(hotel_id); let conn = match pool.get(){ Ok(conn) => conn, Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("Pool error")) }; let mut statement = match conn.prepare( "SELECT 1 FROM conversation_participants WHERE user_id = ?1 AND conversation_id = ?2" , ){ Ok(statement) => statement, Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "prepare failed".to_string()) }; if !statement.exists(params![user_id, payload.conv_id]) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string())).unwrap() { // Early exit if not creator return ((StatusCode::FORBIDDEN, "Not the creator".to_string())); } let result = conn.execute( "INSERT INTO message (sender_id, content, conversation_id) VALUES (?1, ?2, ?3)", params![&user_id, &payload.message, &payload.conv_id], ); match result { Ok(rows) if rows > 0 => (StatusCode::OK, "added message succesfully".to_string()), Ok(_) => (StatusCode::NOT_FOUND, "not able to add the message, conversation may not exist".to_string() ), Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Error when adding the message : {err}")), } } #[derive(Debug, Serialize)] struct Message { id: i32, sender_id: i32, content: String, sent_at: String, } pub async fn get_message( State(state): State, AuthClaims {user_id, hotel_id,username}: AuthClaims, GetMessagesPayload(payload):GetMessagesPayload ) -> Result { let pool = state.hotel_pools.get_pool(hotel_id); let conn = pool.get() .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Pool error".to_string()))?; let from_time = match payload.timestamp.as_deref() { Some("0") | None => "1970-01-01 00:00:00", // default to epoch Some(ts) => ts, }; let mut stmt = conn.prepare( "SELECT id, sender_id, content, sent_at FROM message WHERE conversation_id = ?1 AND sent_at > ?2 ORDER BY sent_at DESC LIMIT 50" ).map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Prepare failed".to_string()))?; let messages = stmt.query_map( params![payload.conv_id, from_time], |row| { Ok(Message { id: row.get(0)?, sender_id: row.get(1)?, content: row.get(2)?, sent_at: row.get(3)?, }) } ).map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Query failed".to_string()))? .collect::, _>>() .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Collect failed".to_string()))?; Ok((StatusCode::OK, serde_json::to_string(&messages).unwrap())) }