diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/conn.rs | 180 |
1 files changed, 81 insertions, 99 deletions
diff --git a/src/conn.rs b/src/conn.rs index 47d0ddc..dfd6623 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -8,12 +8,14 @@ use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; use warp::ws::{ WebSocket, Message }; use ammonia; +const MAX_IN: usize = 2048; + pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLock<Room>>)) { let (room_id, room) = rinfo; let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); // server <-> client comms - let (mut outgoing, mut incoming) = socket.split(); + let (mut outgoing, incoming) = socket.split(); println!("{room_id} I: Incoming TCP connection from: {}", addr); @@ -24,50 +26,7 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc pl.len() >= pcap }; if full { return } - - let mut registered = false; - while !registered { - match incoming.try_next().await { - Ok(cmd) => { - if let Some(cmd) = cmd { - // if it ain't text we can't handle it - if !cmd.is_text() { return; } - let cmd = cmd.to_str().to_owned().unwrap(); - let mut fields = cmd.split(" "); - - if fields.next() == Some("register") { - let mut all_fields = fields.collect::<Vec<&str>>(); - let clr = all_fields.pop().expect("register without color").chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>(); - let name = { - let def = "anon".to_string(); - if all_fields.is_empty() { def } - else { - let n = ammonia::clean(&all_fields.join(" ")); - if n.is_empty() { def } else { n } - } - }; - println!("{room_id} I: registered \"{name}@{addr}\""); - let uid = { - // new scope cuz paranoid bout deadlocks - let conn = Conn { addr, tx: tx.clone() }; - room.write().await.players.insert_conn(conn, name.clone(), clr).await - }; - tx.send(Message::text(format!("regack {} {uid} {}", name.replace(" ", " "), room.read().await.conf.board_conf))).expect("couldn't send register ack"); - if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { - println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}"); - } - registered = true; - } - } - }, - Err(e) => { - println!("{room_id} E: error reading socket {addr}: {e}"); - } - } - } - - let drive_game = handle_room(incoming, addr, (room_id.clone(),room.clone())); - + let drive_game = handle_room((incoming,tx), addr, (room_id.clone(),room.clone())); let send_to_client = { let room_id = room_id.clone(); async move { @@ -98,7 +57,8 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc } } -pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { +pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::UnboundedSender<Message>), addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { + let (mut incoming, tx) = streams; let (room_id, room) = rinfo; let (players, cmd_tx) = { let room = room.read().await; @@ -107,8 +67,13 @@ pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { // if it ain't text we can't handle it - if !cmd.is_text() { continue; } - let cmd = cmd.to_str().to_owned().unwrap(); + let cmd = match cmd.to_str() { + Ok(cmd) => { if cmd.len() > MAX_IN { + println!("{room_id} E: string too big: {cmd}"); + return + } else { cmd.to_owned() } }, + Err(_) => return + }; let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { @@ -118,66 +83,83 @@ pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, }; if let Some(cmd_name) = fields.next() { use crate::minesweeper::{Move,MoveType}; - let me_lock = players.read().await; - let me = me_lock.get(&addr).expect("player not found"); - match cmd_name { - "pos" => { - match parse_pos(fields) { - Some(pos) => { - drop(me); - drop(me_lock); // So the read lock isn't held - let (name, clr, uid) = { - let mut players = players.write().await; - let mut entry = players.get_mut(&addr).unwrap(); - entry.position = pos.clone(); - (entry.name.clone(), entry.clr.clone(), entry.uid) - }; - let sanitized_name = name.replace(" ", " ").to_string(); - { - let players = players.read().await; - for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { - let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); + let mut players_lock = players.write().await; + match players_lock.get_mut(&addr) { + Some(me) => match cmd_name { + "pos" => { + match parse_pos(fields) { + Some(pos) => { + me.position = pos.clone(); + let sanitized_name = me.name.replace(" ", " ").to_string(); + let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); + for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { + let r = peer_tx.send(Message::text(&msg)); if let Err(e) = r { println!("{room_id} E: error sending pos update: {e}"); } } + }, + None => { + println!("{room_id} E: bad position update from {me}"); + }, + } + }, + "reveal" => { + match parse_pos(fields) { + Some(pos) => { + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { + println!("{room_id} E: couldn't process {me}'s reveal command: {e}"); + }; + }, + None => { + println!("{room_id} E: bad reveal from {me}"); } - }, - None => { - println!("{room_id} E: bad position update from {me}"); - }, - } - }, - "reveal" => { - match parse_pos(fields) { - Some(pos) => { - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { - println!("{room_id} E: couldn't process {me}'s reveal command: {e}"); - }; - }, - None => { - println!("{room_id} E: bad reveal from {me}"); } - } - }, - "flag" => { - match parse_pos(fields) { - Some(pos) => { - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { - println!("{room_id} E: couldn't process {me}'s flag command: {e}"); - }; - }, - None => { - println!("{room_id} E: bad flag from {me}"); + }, + "flag" => { + match parse_pos(fields) { + Some(pos) => { + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { + println!("{room_id} E: couldn't process {me}'s flag command: {e}"); + }; + }, + None => { + println!("{room_id} E: bad flag from {me}"); + } } - } + }, + "reset" => { + if let Err(e) = cmd_tx.send(MetaMove::Reset) { + println!("{room_id} E: couldn't request game dump in behalf of {me}: {e}"); + } + }, + e => println!("{room_id} E: unknown command {e:?} from {me}: \"{cmd}\""), }, - "reset" => { - if let Err(e) = cmd_tx.send(MetaMove::Reset) { - println!("{room_id} E: couldn't request game dump in behalf of {me}: {e}"); + None => { + if cmd_name == "register" { + let mut all_fields = fields.collect::<Vec<&str>>(); + let clr = all_fields.pop().expect("register without color").chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>(); + let name = { + let def = "anon".to_string(); + if all_fields.is_empty() { def } + else { + let n = ammonia::clean(&all_fields.join(" ")); + if n.is_empty() { def } else { n } + } + }; + println!("{room_id} I: registered \"{name}@{addr}\""); + drop(players_lock); + let uid = { + // new scope cuz paranoid bout deadlocks + let conn = Conn { addr, tx: tx.clone() }; + room.write().await.players.insert_conn(conn, name.clone(), clr).await + }; + tx.send(Message::text(format!("regack {} {uid} {}", name.replace(" ", " "), room.read().await.conf.board_conf))).expect("couldn't send register ack"); + if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { + println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}"); + } } - }, - e => println!("{room_id} E: unknown command {e:?} from {me}: \"{cmd}\""), + } } } } else { |