diff options
Diffstat (limited to 'src/conn.rs')
-rw-r--r-- | src/conn.rs | 286 |
1 files changed, 155 insertions, 131 deletions
diff --git a/src/conn.rs b/src/conn.rs index de0f704..5325bf4 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,151 +1,175 @@ use crate::types::*; -use futures_util::{SinkExt, StreamExt, TryStreamExt}; -use warp::ws::Message; +use std::{ + sync::Arc, + net::SocketAddr, +}; +use tokio::sync::RwLock; +use futures_util::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; +use warp::ws::{ WebSocket, Message }; -pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) { - let addr = conndata.remote_addr; - let peers = &conndata.peers; - let cmd_tx = conndata.cmd_tx.clone(); - println!("Incoming TCP connection from: {}", conndata.remote_addr); - - let mut peer_name = "unknown".to_string(); - - // for game -> conn comms +pub async fn lobby(socket: WebSocket, addr: SocketAddr, room: Arc<RwLock<Room>>) { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - // for server -> client comms + + // server <-> client comms let (mut outgoing, mut incoming) = socket.split(); - let process_incoming = async { - while let Ok(cmd) = incoming.try_next().await { - if let Some(cmd) = cmd { - if cmd.is_close() { - println!("closing \"{peer_name}\"@{addr}"); - let mut peers = peers.write().await; - if let Some(_) = peers.get(&addr) { - peers.remove(&addr); - } - for (paddr, p) in peers.iter() { - if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) { - println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); - } - } - break; - } - // if it ain't text we can't handle it - if !cmd.is_text() { continue; } - let cmd = cmd.to_str().to_owned().unwrap(); + println!("Incoming TCP connection from: {}", addr); - let mut fields = cmd.split(" "); - let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { - let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); - let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); - x.zip(y) - }; - if let Some(cmd_name) = fields.next() { - use crate::minesweeper::{Move,MoveType}; - match cmd_name { - "pos" => { - match parse_pos(fields) { - Some(pos) => { - let (name, clr, id) = { - let mut peers = peers.write().await; - let mut entry = peers.get_mut(&addr).unwrap(); - entry.position = pos.clone(); - (entry.name.clone(), entry.clr.clone(), entry.seq_id) - }; - let sanitized_name = name.replace(" ", " ").to_string(); - { - let peers = peers.read().await; - for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { - let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); - if let Err(e) = r { - println!("error sending pos update: {e}"); - } - } - } - }, - None => { - println!("bad position update from \"{peer_name}@{addr}\""); - }, - } - }, - "reveal" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { - println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); - }; - }, - None => { - println!("bad reveal from \"{peer_name}\"@{addr}"); - } - } - }, - "flag" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { - println!("couldn't process \"{peer_name}\"'s flag command: {e}"); - }; - }, - None => { - println!("bad flag from \"{peer_name}\"@{addr}"); - } - } - }, - "reset" => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Reset) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - "register" => { - let all_fields = fields.collect::<Vec<&str>>(); - let fcount = all_fields.len(); - peer_name = "anon".into(); - let id; - if fcount >= 2 { - peer_name = all_fields[..fcount-1].join(" "); - } - let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>(); - if clr.is_empty() { - clr = "#f03333".into(); - } - { // new scope cuz paranoid bout deadlocks - let mut peers = peers.write().await; - id = peers.len(); - peers.insert(addr, Peer { - tx: tx.clone(), - seq_id: id, - name: peer_name.clone(), - clr, - position: (0,0) - }); - } - tx.send(Message::text(format!("id {}", id))).unwrap(); - if let Err(e) = cmd_tx.send(MetaMove::Dump) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), + let mut registered = false; + while !registered { + match incoming.try_next().await { + Ok(cmd) => { + if let Some(cmd) = cmd { + // if it ain't text we can't handle it + if !cmd.is_text() { return; } + let cmd = cmd.to_str().to_owned().unwrap(); + let mut fields = cmd.split(" "); + + if fields.next() == Some("register") { + let all_fields = fields.collect::<Vec<&str>>(); + let fcount = all_fields.len(); + let mut name = "anon".to_string(); + if fcount >= 2 { + name = all_fields[..fcount-1].join(" "); + } + let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>(); + if clr.is_empty() { + clr = "#f03333".into(); + } + println!("registered {name}"); + let uid = { + // new scope cuz paranoid bout deadlocks + let conn = Conn { addr, tx: tx.clone() }; + room.write().await.players.insert_conn(conn, name, clr).await + }; + tx.send(Message::text(format!("id {uid}"))).expect("couldn't send register ack"); + if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { + println!("couldn't request game dump in behalf of {addr}: {e}"); + } + registered = true; } } + }, + Err(e) => { + println!("error reading socket {addr}: {e}"); } } - }; + } + + let drive_game = handle_room(incoming, addr, room.clone()); let send_to_client = async move { while let Some(m) = rx.recv().await { if let Err(e) = outgoing.send(m).await { println!("something went bad lol: {e}"); - }; + } } }; - futures_util::pin_mut!(process_incoming, send_to_client); - futures_util::future::select(process_incoming, send_to_client).await; + tokio::select! { + _ = drive_game => (), + _ = send_to_client => { println!("anomalous close for {addr}"); } + }; + let room_lock = room.read().await; + let mut players = room_lock.players.write().await; + if players.contains_key(&addr) { + players.remove(&addr); + } + for (paddr, p) in players.iter() { + if let Err(e) = p.conn.tx.send(Message::text("logoff {p.seqid} {p.name}")) { + println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); + } + } println!("{addr} disconnected"); - peers.write().await.remove(&addr); +} + +pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, room: Arc<RwLock<Room>>) { + let (players, cmd_tx) = { + let room = room.read().await; + (room.players.clone(), room.cmd_stream.clone()) + }; + while let Ok(cmd) = incoming.try_next().await { + if let Some(cmd) = cmd { + // if it ain't text we can't handle it + if !cmd.is_text() { continue; } + let cmd = cmd.to_str().to_owned().unwrap(); + + let mut fields = cmd.split(" "); + let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { + let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); + let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); + x.zip(y) + }; + if let Some(cmd_name) = fields.next() { + use crate::minesweeper::{Move,MoveType}; + let me_lock = players.read().await; + let me = me_lock.get(&addr).expect("player not found"); + match cmd_name { + "pos" => { + match parse_pos(fields) { + Some(pos) => { + drop(me); + drop(me_lock); // So the read lock isn't held + let (name, clr, uid) = { + let mut players = players.write().await; + let mut entry = players.get_mut(&addr).unwrap(); + entry.position = pos.clone(); + (entry.name.clone(), entry.clr.clone(), entry.uid) + }; + let sanitized_name = name.replace(" ", " ").to_string(); + { + let players = players.read().await; + for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { + let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); + if let Err(e) = r { + println!("error sending pos update: {e}"); + } + } + } + }, + None => { + println!("bad position update from {}", me); + }, + } + }, + "reveal" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from {me}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { + println!("couldn't process {me}'s reveal command: {e}"); + }; + }, + None => { + println!("bad reveal from {me}"); + } + } + }, + "flag" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from {me}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { + println!("couldn't process {me}'s flag command: {e}"); + }; + }, + None => { + println!("bad flag from {me}"); + } + } + }, + "reset" => { + println!("{cmd} from {me}"); + if let Err(e) = cmd_tx.send(MetaMove::Reset) { + println!("couldn't request game dump in behalf of {me}: {e}"); + } + }, + e => println!("unknown command {e:?} from {me}, \"{cmd}\""), + } + } + } else { + println!("reached end of stream for {addr}"); + break; + } + } } |