From 0f010aca2957468c46cd849284873cb26f3a0df3 Mon Sep 17 00:00:00 2001 From: stale Date: Fri, 13 May 2022 17:49:36 -0300 Subject: laying the groundwork for rooms --- src/conn.rs | 286 ++++++++++++++++++++++++++++++++--------------------------- src/main.rs | 65 +++++++------- src/types.rs | 88 +++++++++++++++--- 3 files changed, 266 insertions(+), 173 deletions(-) (limited to 'src') diff --git a/src/conn.rs b/src/conn.rs index de0f704..5325bf4 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,151 +1,175 @@ use crate::types::*; -use futures_util::{SinkExt, StreamExt, TryStreamExt}; -use warp::ws::Message; +use std::{ + sync::Arc, + net::SocketAddr, +}; +use tokio::sync::RwLock; +use futures_util::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; +use warp::ws::{ WebSocket, Message }; -pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) { - let addr = conndata.remote_addr; - let peers = &conndata.peers; - let cmd_tx = conndata.cmd_tx.clone(); - println!("Incoming TCP connection from: {}", conndata.remote_addr); - - let mut peer_name = "unknown".to_string(); - - // for game -> conn comms +pub async fn lobby(socket: WebSocket, addr: SocketAddr, room: Arc>) { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - // for server -> client comms + + // server <-> client comms let (mut outgoing, mut incoming) = socket.split(); - let process_incoming = async { - while let Ok(cmd) = incoming.try_next().await { - if let Some(cmd) = cmd { - if cmd.is_close() { - println!("closing \"{peer_name}\"@{addr}"); - let mut peers = peers.write().await; - if let Some(_) = peers.get(&addr) { - peers.remove(&addr); - } - for (paddr, p) in peers.iter() { - if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) { - println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); - } - } - break; - } - // if it ain't text we can't handle it - if !cmd.is_text() { continue; } - let cmd = cmd.to_str().to_owned().unwrap(); + println!("Incoming TCP connection from: {}", addr); - let mut fields = cmd.split(" "); - let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { - let x = fields.next().and_then(|xstr| xstr.parse::().ok()); - let y = fields.next().and_then(|ystr| ystr.parse::().ok()); - x.zip(y) - }; - if let Some(cmd_name) = fields.next() { - use crate::minesweeper::{Move,MoveType}; - match cmd_name { - "pos" => { - match parse_pos(fields) { - Some(pos) => { - let (name, clr, id) = { - let mut peers = peers.write().await; - let mut entry = peers.get_mut(&addr).unwrap(); - entry.position = pos.clone(); - (entry.name.clone(), entry.clr.clone(), entry.seq_id) - }; - let sanitized_name = name.replace(" ", " ").to_string(); - { - let peers = peers.read().await; - for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { - let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); - if let Err(e) = r { - println!("error sending pos update: {e}"); - } - } - } - }, - None => { - println!("bad position update from \"{peer_name}@{addr}\""); - }, - } - }, - "reveal" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { - println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); - }; - }, - None => { - println!("bad reveal from \"{peer_name}\"@{addr}"); - } - } - }, - "flag" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { - println!("couldn't process \"{peer_name}\"'s flag command: {e}"); - }; - }, - None => { - println!("bad flag from \"{peer_name}\"@{addr}"); - } - } - }, - "reset" => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Reset) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - "register" => { - let all_fields = fields.collect::>(); - let fcount = all_fields.len(); - peer_name = "anon".into(); - let id; - if fcount >= 2 { - peer_name = all_fields[..fcount-1].join(" "); - } - let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::(); - if clr.is_empty() { - clr = "#f03333".into(); - } - { // new scope cuz paranoid bout deadlocks - let mut peers = peers.write().await; - id = peers.len(); - peers.insert(addr, Peer { - tx: tx.clone(), - seq_id: id, - name: peer_name.clone(), - clr, - position: (0,0) - }); - } - tx.send(Message::text(format!("id {}", id))).unwrap(); - if let Err(e) = cmd_tx.send(MetaMove::Dump) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), + let mut registered = false; + while !registered { + match incoming.try_next().await { + Ok(cmd) => { + if let Some(cmd) = cmd { + // if it ain't text we can't handle it + if !cmd.is_text() { return; } + let cmd = cmd.to_str().to_owned().unwrap(); + let mut fields = cmd.split(" "); + + if fields.next() == Some("register") { + let all_fields = fields.collect::>(); + let fcount = all_fields.len(); + let mut name = "anon".to_string(); + if fcount >= 2 { + name = all_fields[..fcount-1].join(" "); + } + let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::(); + if clr.is_empty() { + clr = "#f03333".into(); + } + println!("registered {name}"); + let uid = { + // new scope cuz paranoid bout deadlocks + let conn = Conn { addr, tx: tx.clone() }; + room.write().await.players.insert_conn(conn, name, clr).await + }; + tx.send(Message::text(format!("id {uid}"))).expect("couldn't send register ack"); + if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { + println!("couldn't request game dump in behalf of {addr}: {e}"); + } + registered = true; } } + }, + Err(e) => { + println!("error reading socket {addr}: {e}"); } } - }; + } + + let drive_game = handle_room(incoming, addr, room.clone()); let send_to_client = async move { while let Some(m) = rx.recv().await { if let Err(e) = outgoing.send(m).await { println!("something went bad lol: {e}"); - }; + } } }; - futures_util::pin_mut!(process_incoming, send_to_client); - futures_util::future::select(process_incoming, send_to_client).await; + tokio::select! { + _ = drive_game => (), + _ = send_to_client => { println!("anomalous close for {addr}"); } + }; + let room_lock = room.read().await; + let mut players = room_lock.players.write().await; + if players.contains_key(&addr) { + players.remove(&addr); + } + for (paddr, p) in players.iter() { + if let Err(e) = p.conn.tx.send(Message::text("logoff {p.seqid} {p.name}")) { + println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); + } + } println!("{addr} disconnected"); - peers.write().await.remove(&addr); +} + +pub async fn handle_room(mut incoming: SplitStream, addr: SocketAddr, room: Arc>) { + let (players, cmd_tx) = { + let room = room.read().await; + (room.players.clone(), room.cmd_stream.clone()) + }; + while let Ok(cmd) = incoming.try_next().await { + if let Some(cmd) = cmd { + // if it ain't text we can't handle it + if !cmd.is_text() { continue; } + let cmd = cmd.to_str().to_owned().unwrap(); + + let mut fields = cmd.split(" "); + let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { + let x = fields.next().and_then(|xstr| xstr.parse::().ok()); + let y = fields.next().and_then(|ystr| ystr.parse::().ok()); + x.zip(y) + }; + if let Some(cmd_name) = fields.next() { + use crate::minesweeper::{Move,MoveType}; + let me_lock = players.read().await; + let me = me_lock.get(&addr).expect("player not found"); + match cmd_name { + "pos" => { + match parse_pos(fields) { + Some(pos) => { + drop(me); + drop(me_lock); // So the read lock isn't held + let (name, clr, uid) = { + let mut players = players.write().await; + let mut entry = players.get_mut(&addr).unwrap(); + entry.position = pos.clone(); + (entry.name.clone(), entry.clr.clone(), entry.uid) + }; + let sanitized_name = name.replace(" ", " ").to_string(); + { + let players = players.read().await; + for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { + let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1))); + if let Err(e) = r { + println!("error sending pos update: {e}"); + } + } + } + }, + None => { + println!("bad position update from {}", me); + }, + } + }, + "reveal" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from {me}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { + println!("couldn't process {me}'s reveal command: {e}"); + }; + }, + None => { + println!("bad reveal from {me}"); + } + } + }, + "flag" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from {me}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { + println!("couldn't process {me}'s flag command: {e}"); + }; + }, + None => { + println!("bad flag from {me}"); + } + } + }, + "reset" => { + println!("{cmd} from {me}"); + if let Err(e) = cmd_tx.send(MetaMove::Reset) { + println!("couldn't request game dump in behalf of {me}: {e}"); + } + }, + e => println!("unknown command {e:?} from {me}, \"{cmd}\""), + } + } + } else { + println!("reached end of stream for {addr}"); + break; + } + } } diff --git a/src/main.rs b/src/main.rs index fe55ab2..0a17346 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use std::{ - collections::HashMap, error::Error, net::SocketAddr, + sync::Arc, }; use warp::Filter; @@ -22,43 +22,47 @@ fn main() -> Result<(), Box> { socket_addr: ([0,0,0,0],31235).into(), }; - let state = State { - conf, - peers: PeerMap::new(RwLock::new(HashMap::new())), - }; - - tokio_main(state) + tokio_main(conf) } #[tokio::main] -async fn tokio_main(state: State) -> Result<(), Box> { - // Start the temporary single lobby - let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); - let _game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); - - let cmd_filter = warp::any().map(move || cmd_tx.clone()); +async fn tokio_main(conf: Config) -> Result<(), Box> { + // Start the temporary single room + let room = Arc::new(RwLock::new({ + let name = "Testing room".to_string(); + let players = PlayerMap::default(); + let bconf = BoardConf { w: 75, h: 35, mine_ratio: (1, 8) }; + let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), bconf)); + Room { + name, + players, + peer_limit: 32, + board_conf: bconf, + cmd_stream: cmd_tx, + driver: handle, + } + })); let page_route = { use warp::*; get() .and(path("font.ttf")) .map(|| FONT_FILE) - .or(fs::file(state.conf.page_path.clone())) + .or(fs::file(conf.page_path.clone())) }; let websocket_route = { - let state = state.clone(); + let room = room.clone(); use warp::*; path("ws") .and(ws()) .and(addr::remote()) - .and(cmd_filter) - .map(move |ws: warp::ws::Ws, saddr: Option, cmd_tx: CmdTx| { - let state = state.clone(); + .map(move |ws: warp::ws::Ws, saddr: Option| { + let room = room.clone(); println!("conn from {saddr:?}"); ws.on_upgrade(move |socket| { - let conn_data = types::ConnData { remote_addr: saddr.unwrap(), cmd_tx: cmd_tx.clone(), peers: state.peers.clone() }; - conn::peer_connection(socket, conn_data) + conn::lobby(socket, saddr.expect("socket without address"), room.clone()) }) }) }; @@ -66,18 +70,19 @@ async fn tokio_main(state: State) -> Result<(), Box> { let server = warp::serve(routes) .tls() - .cert_path(state.conf.cert_path) - .key_path(state.conf.pkey_path) - .run(state.conf.socket_addr); - println!("Serving on {}", state.conf.socket_addr); + .cert_path(conf.cert_path) + .key_path(conf.pkey_path) + .run(conf.socket_addr); + println!("Serving on {}", conf.socket_addr); server.await; Ok(()) } // If a move is made, broadcast new board, else just send current board -async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver, peers: PeerMap) { +async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver, players: PlayerMapData, bconf: BoardConf) { use minesweeper::*; - let mut game = Game::new(Board::new(75,35), (75*35)/8); + let mine_cnt = (bconf.w * bconf.h * bconf.mine_ratio.0)/(bconf.mine_ratio.1); + let mut game = Game::new(Board::new(bconf.w, bconf.h), mine_cnt); let mut latest_player_name = None; while let Some(req) = move_rx.recv().await { let done = game.phase == Phase::Die || game.phase == Phase::Win; @@ -87,10 +92,10 @@ async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver, p if game.phase == Phase::Win || game.phase == Phase::Die { game.board = game.board.grade(); } - latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone()); + latest_player_name = players.read().await.get(&o).map(|p| p.name.clone()); }, MetaMove::Dump => (), - MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, + MetaMove::Reset => { game = Game::new(Board::new(bconf.w, bconf.h), mine_cnt); }, } use warp::ws::Message; let mut reply = vec![Message::binary(game.board.render())]; @@ -101,10 +106,10 @@ async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver, p _ => (), } { - let peers = peers.read().await; + let peers = players.read().await; for (addr, p) in peers.iter() { for r in reply.iter() { - if let Err(e) = p.tx.send(r.clone()) { + if let Err(e) = p.conn.tx.send(r.clone()) { println!("couldn't send game update {r:?} to {addr}: {e}"); } } diff --git a/src/types.rs b/src/types.rs index 54a93bc..86f1e32 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,7 +1,12 @@ use std::{ collections::HashMap, net::SocketAddr, - sync::Arc, + sync::{ + Arc, + atomic::{ AtomicUsize, Ordering }, + }, + fmt::Display, + ops::{ Deref, DerefMut }, }; use warp::ws::Message; use tokio::sync::RwLock; @@ -15,10 +20,28 @@ pub struct Config { pub socket_addr: SocketAddr, } -#[derive(Debug, Clone)] -pub struct State { - pub conf: Config, - pub peers: PeerMap, +#[derive(Debug, Clone, Copy)] +pub struct BoardConf { + pub w: usize, + pub h: usize, + /// tiles to mines, expressed as (numerator, denominator) + pub mine_ratio: (usize,usize), +} + +impl Display for BoardConf { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}x{} {}/{}", self.w, self.h, self.mine_ratio.0, self.mine_ratio.1) + } +} + +#[derive(Debug)] +pub struct Room { + pub name: String, + pub players: PlayerMap, + pub peer_limit: u32, + pub driver: tokio::task::JoinHandle<()>, + pub cmd_stream: CmdTx, + pub board_conf: BoardConf, } #[derive(Debug)] @@ -29,19 +52,60 @@ pub enum MetaMove { } #[derive(Debug)] -pub struct Peer { +pub struct Conn { pub tx: tokio::sync::mpsc::UnboundedSender, - pub seq_id: usize, + pub addr: SocketAddr, +} + +#[derive(Debug)] +pub struct Player { + pub conn: Conn, + pub uid: usize, pub name: String, pub clr: String, pub position: (usize, usize), } -pub struct ConnData { - pub cmd_tx: CmdTx, - pub remote_addr: SocketAddr, - pub peers: PeerMap, +impl Display for Player { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "\"{}\"@{}", self.name, self.conn.addr) + } } pub type CmdTx = tokio::sync::mpsc::UnboundedSender; -pub type PeerMap = Arc>>; +pub type PlayerMapData = Arc>>; +#[derive(Debug)] +pub struct PlayerMap { + inner: PlayerMapData, + uid_counter: AtomicUsize, +} + +impl Deref for PlayerMap { + type Target = Arc>>; + fn deref(&self) -> &Self::Target { + &self.inner + } +} +impl DerefMut for PlayerMap { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} +impl Default for PlayerMap { + fn default() -> Self { + Self { inner: Arc::new(RwLock::new(HashMap::new())), uid_counter: 0.into() } + } +} + +impl PlayerMap { + pub async fn insert_conn(&mut self, conn: Conn, name: String, clr: String) -> usize { + let mut map = self.write().await; + let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed); + map.insert( + conn.addr, + Player { conn, uid, name, clr, position: (0,0) }, + ); + uid + } +} + -- cgit v1.2.3