diff options
| -rw-r--r-- | src/conn.rs | 286 | ||||
| -rw-r--r-- | src/main.rs | 65 | ||||
| -rw-r--r-- | src/types.rs | 88 | 
3 files changed, 266 insertions, 173 deletions
| diff --git a/src/conn.rs b/src/conn.rs index de0f704..5325bf4 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,151 +1,175 @@  use crate::types::*; -use futures_util::{SinkExt, StreamExt, TryStreamExt}; -use warp::ws::Message; +use std::{ +    sync::Arc, +    net::SocketAddr, +}; +use tokio::sync::RwLock; +use futures_util::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; +use warp::ws::{ WebSocket, Message }; -pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) { -    let addr = conndata.remote_addr; -    let peers = &conndata.peers; -    let cmd_tx = conndata.cmd_tx.clone(); -    println!("Incoming TCP connection from: {}", conndata.remote_addr); - -    let mut peer_name = "unknown".to_string(); - -    // for game -> conn comms +pub async fn lobby(socket: WebSocket, addr: SocketAddr, room: Arc<RwLock<Room>>) {      let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); -    // for server -> client comms + +    // server <-> client comms      let (mut outgoing, mut incoming) = socket.split(); -    let process_incoming = async { -        while let Ok(cmd) = incoming.try_next().await { -            if let Some(cmd) = cmd { -                if cmd.is_close() { -                    println!("closing \"{peer_name}\"@{addr}"); -                    let mut peers = peers.write().await; -                    if let Some(_) = peers.get(&addr) { -                        peers.remove(&addr); -                    } -                    for (paddr, p) in peers.iter() { -                        if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) { -                            println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); -                        } -                    } -                    break; -                } -                // if it ain't text we can't handle it -                if !cmd.is_text() { continue; } -                let cmd = cmd.to_str().to_owned().unwrap(); +    println!("Incoming TCP connection from: {}", addr); -                let mut fields = cmd.split(" "); -                let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { -                    let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); -                    let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); -                    x.zip(y) -                }; -                if let Some(cmd_name) = fields.next() { -                    use crate::minesweeper::{Move,MoveType}; -                    match cmd_name { -                        "pos" => { -                            match parse_pos(fields) { -                                Some(pos) => { -                                    let (name, clr, id) = { -                                        let mut peers = peers.write().await; -                                        let mut entry = peers.get_mut(&addr).unwrap(); -                                        entry.position = pos.clone(); -                                        (entry.name.clone(), entry.clr.clone(), entry.seq_id) -                                    }; -                                    let sanitized_name = name.replace(" ", " ").to_string(); -                                    { -                                        let peers = peers.read().await; -                                        for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { -                                            let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); -                                            if let Err(e) = r { -                                                println!("error sending pos update: {e}"); -                                            } -                                        } -                                    } -                                }, -                                None => { -                                    println!("bad position update from \"{peer_name}@{addr}\""); -                                }, -                            } -                        }, -                        "reveal" => { -                            match parse_pos(fields) { -                                Some(pos) => { -                                    println!("{cmd} from \"{peer_name}\"@{addr}"); -                                    if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { -                                        println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); -                                    }; -                                }, -                                None => { -                                    println!("bad reveal from \"{peer_name}\"@{addr}"); -                                } -                            } -                        }, -                        "flag" => { -                            match parse_pos(fields) { -                                Some(pos) => { -                                    println!("{cmd} from \"{peer_name}\"@{addr}"); -                                    if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { -                                        println!("couldn't process \"{peer_name}\"'s flag command: {e}"); -                                    }; -                                }, -                                None => { -                                    println!("bad flag from \"{peer_name}\"@{addr}"); -                                } -                            } -                        }, -                        "reset" => { -                            println!("{cmd} from \"{peer_name}\"@{addr}"); -                            if let Err(e) = cmd_tx.send(MetaMove::Reset) { -                                println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); -                            } -                        }, -                        "register" => { -                            let all_fields = fields.collect::<Vec<&str>>(); -                            let fcount = all_fields.len(); -                            peer_name = "anon".into(); -                            let id; -                            if fcount >= 2 { -                                peer_name = all_fields[..fcount-1].join(" "); -                            } -                            let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>(); -                            if clr.is_empty() { -                                clr = "#f03333".into(); -                            } -                            {   // new scope cuz paranoid bout deadlocks -                                let mut peers = peers.write().await; -                                id = peers.len(); -                                peers.insert(addr, Peer { -                                    tx: tx.clone(), -                                    seq_id: id, -                                    name: peer_name.clone(), -                                    clr, -                                    position: (0,0) -                                }); -                            } -                            tx.send(Message::text(format!("id {}", id))).unwrap(); -                            if let Err(e) = cmd_tx.send(MetaMove::Dump) { -                                println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); -                            } -                        }, -                        e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), +    let mut registered = false; +    while !registered { +        match incoming.try_next().await { +            Ok(cmd) => { +                if let Some(cmd) = cmd { +                    // if it ain't text we can't handle it +                    if !cmd.is_text() { return; } +                    let cmd = cmd.to_str().to_owned().unwrap(); +                    let mut fields = cmd.split(" "); + +                    if fields.next() == Some("register") { +                        let all_fields = fields.collect::<Vec<&str>>(); +                        let fcount = all_fields.len(); +                        let mut name = "anon".to_string(); +                        if fcount >= 2 { +                            name = all_fields[..fcount-1].join(" "); +                        } +                        let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>(); +                        if clr.is_empty() { +                            clr = "#f03333".into(); +                        } +                        println!("registered {name}"); +                        let uid = { +                            // new scope cuz paranoid bout deadlocks +                            let conn = Conn { addr, tx: tx.clone() }; +                            room.write().await.players.insert_conn(conn, name, clr).await +                        }; +                        tx.send(Message::text(format!("id {uid}"))).expect("couldn't send register ack"); +                        if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { +                            println!("couldn't request game dump in behalf of {addr}: {e}"); +                        } +                        registered = true;                      }                  } +            }, +            Err(e) => { +                println!("error reading socket {addr}: {e}");              }          } -    }; +    } + +    let drive_game = handle_room(incoming, addr, room.clone());      let send_to_client = async move {          while let Some(m) = rx.recv().await {              if let Err(e) = outgoing.send(m).await {                  println!("something went bad lol: {e}"); -            }; +            }          }      }; -    futures_util::pin_mut!(process_incoming, send_to_client); -    futures_util::future::select(process_incoming, send_to_client).await; +    tokio::select! { +        _ = drive_game => (), +        _ = send_to_client => { println!("anomalous close for {addr}"); } +    }; +    let room_lock = room.read().await; +    let mut players = room_lock.players.write().await; +    if players.contains_key(&addr) { +        players.remove(&addr); +    } +    for (paddr, p) in players.iter() { +        if let Err(e) = p.conn.tx.send(Message::text("logoff {p.seqid} {p.name}")) { +            println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); +        } +    }      println!("{addr} disconnected"); -    peers.write().await.remove(&addr); +} + +pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, room: Arc<RwLock<Room>>) { +    let (players, cmd_tx) = { +        let room = room.read().await; +        (room.players.clone(), room.cmd_stream.clone()) +    }; +    while let Ok(cmd) = incoming.try_next().await { +        if let Some(cmd) = cmd { +            // if it ain't text we can't handle it +            if !cmd.is_text() { continue; } +            let cmd = cmd.to_str().to_owned().unwrap(); + +            let mut fields = cmd.split(" "); +            let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { +                let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); +                let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); +                x.zip(y) +            }; +            if let Some(cmd_name) = fields.next() { +                use crate::minesweeper::{Move,MoveType}; +                let me_lock = players.read().await; +                let me = me_lock.get(&addr).expect("player not found"); +                match cmd_name { +                    "pos" => { +                        match parse_pos(fields) { +                            Some(pos) => { +                                drop(me); +                                drop(me_lock); // So the read lock isn't held +                                let (name, clr, uid) = { +                                    let mut players = players.write().await; +                                    let mut entry = players.get_mut(&addr).unwrap(); +                                    entry.position = pos.clone(); +                                    (entry.name.clone(), entry.clr.clone(), entry.uid) +                                }; +                                let sanitized_name = name.replace(" ", " ").to_string(); +                                { +                                    let players = players.read().await; +                                    for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { +                                        let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); +                                        if let Err(e) = r { +                                            println!("error sending pos update: {e}"); +                                        } +                                    } +                                } +                            }, +                            None => { +                                println!("bad position update from {}", me); +                            }, +                        } +                    }, +                    "reveal" => { +                        match parse_pos(fields) { +                            Some(pos) => { +                                println!("{cmd} from {me}"); +                                if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { +                                    println!("couldn't process {me}'s reveal command: {e}"); +                                }; +                            }, +                            None => { +                                println!("bad reveal from {me}"); +                            } +                        } +                    }, +                    "flag" => { +                        match parse_pos(fields) { +                            Some(pos) => { +                                println!("{cmd} from {me}"); +                                if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { +                                    println!("couldn't process {me}'s flag command: {e}"); +                                }; +                            }, +                            None => { +                                println!("bad flag from {me}"); +                            } +                        } +                    }, +                    "reset" => { +                        println!("{cmd} from {me}"); +                        if let Err(e) = cmd_tx.send(MetaMove::Reset) { +                            println!("couldn't request game dump in behalf of {me}: {e}"); +                        } +                    }, +                    e => println!("unknown command {e:?} from {me}, \"{cmd}\""), +                } +            } +        } else { +            println!("reached end of stream for {addr}"); +            break; +        } +    }  } diff --git a/src/main.rs b/src/main.rs index fe55ab2..0a17346 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@  use std::{ -    collections::HashMap,      error::Error,      net::SocketAddr, +    sync::Arc,  };  use warp::Filter; @@ -22,43 +22,47 @@ fn main() -> Result<(), Box<dyn Error>> {          socket_addr: ([0,0,0,0],31235).into(),      }; -    let state = State { -        conf, -        peers: PeerMap::new(RwLock::new(HashMap::new())), -    }; - -    tokio_main(state) +    tokio_main(conf)  }  #[tokio::main] -async fn tokio_main(state: State) -> Result<(), Box<dyn Error>> { -    // Start the temporary single lobby -    let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); -    let _game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); - -    let cmd_filter = warp::any().map(move || cmd_tx.clone()); +async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> { +    // Start the temporary single room +    let room = Arc::new(RwLock::new({ +        let name = "Testing room".to_string(); +        let players = PlayerMap::default(); +        let bconf = BoardConf { w: 75, h: 35, mine_ratio: (1, 8) }; +        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); +        let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), bconf)); +        Room { +            name, +            players, +            peer_limit: 32, +            board_conf: bconf, +            cmd_stream: cmd_tx, +            driver: handle, +        } +    }));      let page_route = {          use warp::*;          get()          .and(path("font.ttf"))          .map(|| FONT_FILE) -        .or(fs::file(state.conf.page_path.clone())) +        .or(fs::file(conf.page_path.clone()))      };      let websocket_route = { -        let state = state.clone(); +        let room = room.clone();          use warp::*;          path("ws")              .and(ws())              .and(addr::remote()) -            .and(cmd_filter) -            .map(move |ws: warp::ws::Ws, saddr: Option<SocketAddr>, cmd_tx: CmdTx| { -                let state = state.clone(); +            .map(move |ws: warp::ws::Ws, saddr: Option<SocketAddr>| { +                let room = room.clone();                  println!("conn from {saddr:?}");                  ws.on_upgrade(move |socket| { -                    let conn_data = types::ConnData { remote_addr: saddr.unwrap(), cmd_tx: cmd_tx.clone(), peers: state.peers.clone() }; -                    conn::peer_connection(socket, conn_data) +                    conn::lobby(socket, saddr.expect("socket without address"), room.clone())                  })              })      }; @@ -66,18 +70,19 @@ async fn tokio_main(state: State) -> Result<(), Box<dyn Error>> {      let server = warp::serve(routes)          .tls() -        .cert_path(state.conf.cert_path) -        .key_path(state.conf.pkey_path) -        .run(state.conf.socket_addr); -    println!("Serving on {}", state.conf.socket_addr); +        .cert_path(conf.cert_path) +        .key_path(conf.pkey_path) +        .run(conf.socket_addr); +    println!("Serving on {}", conf.socket_addr);      server.await;      Ok(())  }  // If a move is made, broadcast new board, else just send current board -async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) { +async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, players: PlayerMapData, bconf: BoardConf) {      use minesweeper::*; -    let mut game = Game::new(Board::new(75,35), (75*35)/8); +    let mine_cnt = (bconf.w * bconf.h * bconf.mine_ratio.0)/(bconf.mine_ratio.1); +    let mut game = Game::new(Board::new(bconf.w, bconf.h), mine_cnt);      let mut latest_player_name = None;      while let Some(req) = move_rx.recv().await {          let done = game.phase == Phase::Die || game.phase == Phase::Win; @@ -87,10 +92,10 @@ async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, p                  if game.phase == Phase::Win || game.phase == Phase::Die {                      game.board = game.board.grade();                  } -                latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone()); +                latest_player_name = players.read().await.get(&o).map(|p| p.name.clone());              },              MetaMove::Dump => (), -            MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, +            MetaMove::Reset => { game = Game::new(Board::new(bconf.w, bconf.h), mine_cnt); },          }          use warp::ws::Message;          let mut reply = vec![Message::binary(game.board.render())]; @@ -101,10 +106,10 @@ async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, p              _ => (),          }          { -            let peers = peers.read().await; +            let peers = players.read().await;              for (addr, p) in peers.iter() {                  for r in reply.iter() { -                    if let Err(e) = p.tx.send(r.clone()) { +                    if let Err(e) = p.conn.tx.send(r.clone()) {                          println!("couldn't send game update {r:?} to {addr}: {e}");                      }                  } diff --git a/src/types.rs b/src/types.rs index 54a93bc..86f1e32 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,7 +1,12 @@  use std::{      collections::HashMap,      net::SocketAddr, -    sync::Arc, +    sync::{ +        Arc, +        atomic::{ AtomicUsize, Ordering }, +    }, +    fmt::Display, +    ops::{ Deref, DerefMut },  };  use warp::ws::Message;  use tokio::sync::RwLock; @@ -15,10 +20,28 @@ pub struct Config {      pub socket_addr: SocketAddr,  } -#[derive(Debug, Clone)] -pub struct State { -    pub conf: Config, -    pub peers: PeerMap, +#[derive(Debug, Clone, Copy)] +pub struct BoardConf { +    pub w: usize, +    pub h: usize, +    /// tiles to mines, expressed as (numerator, denominator) +    pub mine_ratio: (usize,usize), +} + +impl Display for BoardConf { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        write!(f, "{}x{} {}/{}", self.w, self.h, self.mine_ratio.0, self.mine_ratio.1) +    } +} + +#[derive(Debug)] +pub struct Room { +    pub name: String, +    pub players: PlayerMap, +    pub peer_limit: u32, +    pub driver: tokio::task::JoinHandle<()>, +    pub cmd_stream: CmdTx, +    pub board_conf: BoardConf,  }  #[derive(Debug)] @@ -29,19 +52,60 @@ pub enum MetaMove {  }  #[derive(Debug)] -pub struct Peer { +pub struct Conn {      pub tx: tokio::sync::mpsc::UnboundedSender<Message>, -    pub seq_id: usize, +    pub addr: SocketAddr, +} + +#[derive(Debug)] +pub struct Player { +    pub conn: Conn, +    pub uid: usize,      pub name: String,      pub clr: String,      pub position: (usize, usize),  } -pub struct ConnData { -    pub cmd_tx: CmdTx, -    pub remote_addr: SocketAddr, -    pub peers: PeerMap, +impl Display for Player { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        write!(f, "\"{}\"@{}", self.name, self.conn.addr) +    }  }  pub type CmdTx = tokio::sync::mpsc::UnboundedSender<MetaMove>; -pub type PeerMap = Arc<RwLock<HashMap<SocketAddr, Peer>>>; +pub type PlayerMapData = Arc<RwLock<HashMap<SocketAddr, Player>>>; +#[derive(Debug)] +pub struct PlayerMap { +    inner: PlayerMapData, +    uid_counter: AtomicUsize, +} + +impl Deref for PlayerMap { +    type Target = Arc<RwLock<HashMap<SocketAddr, Player>>>; +    fn deref(&self) -> &Self::Target { +        &self.inner +    } +} +impl DerefMut for PlayerMap { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.inner +    } +} +impl Default for PlayerMap { +    fn default() -> Self { +        Self { inner: Arc::new(RwLock::new(HashMap::new())), uid_counter: 0.into() } +    } +} + +impl PlayerMap { +    pub async fn insert_conn(&mut self, conn: Conn, name: String, clr: String) -> usize { +        let mut map = self.write().await; +        let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed); +        map.insert( +            conn.addr, +            Player { conn, uid, name, clr, position: (0,0) }, +        ); +        uid +    } +} + | 
