use crate::types::*; use std::{ sync::Arc, net::SocketAddr, }; use tokio::sync::RwLock; use futures_util::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; use warp::ws::{ WebSocket, Message }; pub async fn lobby(socket: WebSocket, addr: SocketAddr, room: Arc>) { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); // server <-> client comms let (mut outgoing, mut incoming) = socket.split(); println!("Incoming TCP connection from: {}", addr); let mut registered = false; while !registered { match incoming.try_next().await { Ok(cmd) => { if let Some(cmd) = cmd { // if it ain't text we can't handle it if !cmd.is_text() { return; } let cmd = cmd.to_str().to_owned().unwrap(); let mut fields = cmd.split(" "); if fields.next() == Some("register") { let mut all_fields = fields.collect::>(); let clr = all_fields.pop().expect("register without color").chars().filter(|c| c.is_digit(16) || *c == '#').collect::(); let name = { let def = "anon".to_string(); if all_fields.is_empty() { def } else { let n = all_fields.join(" "); if n.is_empty() { def } else { n } } }; println!("registered \"{name}@{addr}\""); let uid = { // new scope cuz paranoid bout deadlocks let conn = Conn { addr, tx: tx.clone() }; room.write().await.players.insert_conn(conn, name.clone(), clr).await }; tx.send(Message::text(format!("regack {} {uid} {}", name.replace(" ", " "), room.read().await.board_conf))).expect("couldn't send register ack"); if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { println!("couldn't request game dump in behalf of {addr}: {e}"); } registered = true; } } }, Err(e) => { println!("error reading socket {addr}: {e}"); } } } let drive_game = handle_room(incoming, addr, room.clone()); let send_to_client = async move { while let Some(m) = rx.recv().await { if let Err(e) = outgoing.send(m).await { println!("something went bad lol: {e}"); } } }; tokio::select! { _ = drive_game => (), _ = send_to_client => { println!("anomalous close for {addr}"); } }; let room_lock = room.read().await; let mut players = room_lock.players.write().await; let disconn_p = players.remove(&addr); for p in players.values() { if let Err(e) = p.conn.tx.send(Message::text("logoff {disconn_p.seqid} {disconn_p.name}")) { println!("couldn't deliver logoff info to {}: {}", p, e); } } println!("{} disconnected", if let Some(p) = disconn_p { p.to_string() } else { addr.to_string() }); } pub async fn handle_room(mut incoming: SplitStream, addr: SocketAddr, room: Arc>) { let (players, cmd_tx) = { let room = room.read().await; (room.players.clone(), room.cmd_stream.clone()) }; while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { // if it ain't text we can't handle it if !cmd.is_text() { continue; } let cmd = cmd.to_str().to_owned().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { let x = fields.next().and_then(|xstr| xstr.parse::().ok()); let y = fields.next().and_then(|ystr| ystr.parse::().ok()); x.zip(y) }; if let Some(cmd_name) = fields.next() { use crate::minesweeper::{Move,MoveType}; let me_lock = players.read().await; let me = me_lock.get(&addr).expect("player not found"); match cmd_name { "pos" => { match parse_pos(fields) { Some(pos) => { drop(me); drop(me_lock); // So the read lock isn't held let (name, clr, uid) = { let mut players = players.write().await; let mut entry = players.get_mut(&addr).unwrap(); entry.position = pos.clone(); (entry.name.clone(), entry.clr.clone(), entry.uid) }; let sanitized_name = name.replace(" ", " ").to_string(); { let players = players.read().await; for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } } } }, None => { println!("bad position update from {}", me); }, } }, "reveal" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from {me}"); if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { println!("couldn't process {me}'s reveal command: {e}"); }; }, None => { println!("bad reveal from {me}"); } } }, "flag" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from {me}"); if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { println!("couldn't process {me}'s flag command: {e}"); }; }, None => { println!("bad flag from {me}"); } } }, "reset" => { println!("{cmd} from {me}"); if let Err(e) = cmd_tx.send(MetaMove::Reset) { println!("couldn't request game dump in behalf of {me}: {e}"); } }, e => println!("unknown command {e:?} from {me}, \"{cmd}\""), } } } else { println!("reached end of stream for {addr}"); break; } } }