diff options
Diffstat (limited to 'src/conn.rs')
-rw-r--r-- | src/conn.rs | 65 |
1 files changed, 50 insertions, 15 deletions
diff --git a/src/conn.rs b/src/conn.rs index ed4192c..f67121b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -4,8 +4,10 @@ use std::{ net::SocketAddr, }; use tokio::sync::RwLock; +use tokio::sync::mpsc as tokio_mpsc; use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; use warp::ws::{ WebSocket, Message }; +use crate::livepos; use ammonia; const MAX_IN: usize = 2048; @@ -46,6 +48,9 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc let room_lock = room.read().await; let mut players = room_lock.players.write().await; if let Some(disconn_p) = players.remove(&addr) { + if let Err(e) = room_lock.pos_stream.send(livepos::Req { id: disconn_p.uid, data: livepos::ReqData::Quit }) { + println!("{room_id} E: couldn't send removal request for {disconn_p} from the live position system: {e}"); + } for p in players.values() { if let Err(e) = p.conn.tx.send(Message::text(format!("logoff {}", disconn_p.uid))) { println!("{room_id} E: couldn't deliver logoff info to {}: {}", p, e); @@ -57,12 +62,14 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc } } -pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::UnboundedSender<Message>), addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { +type RoomStreams = (SplitStream<WebSocket>,tokio_mpsc::UnboundedSender<Message>); + +pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { let (mut incoming, tx) = streams; let (room_id, room) = rinfo; - let (players, cmd_tx, room_conf) = { + let (players, cmd_tx, pos_tx, room_conf) = { let room = room.read().await; - (room.players.clone(), room.cmd_stream.clone(), room.conf.clone()) + (room.players.clone(), room.cmd_stream.clone(), room.pos_stream.clone(), room.conf.clone()) }; while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { @@ -89,18 +96,22 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb "pos" => { match parse_pos(fields) { Some(pos) => { - me.position = pos.clone(); - let sanitized_name = me.name.replace(" ", " ").to_string(); - let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); - for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { - let r = peer_tx.send(Message::text(&msg)); - if let Err(e) = r { - println!("{room_id} E: error sending pos update: {e}"); - } - } + if let Err(e) = pos_tx.send(livepos::Req { id: me.uid, data: livepos::ReqData::Pos(pos) }) { + println!("{room_id} E: couldn't process {me}'s position update: {e}"); + }; + // me.position = pos.clone(); + // let sanitized_name = me.name.replace(" ", " ").to_string(); + // let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); + // for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { + // let r = peer_tx.send(Message::text(&msg)); + // if let Err(e) = r { + // println!("{room_id} E: error sending pos update: {e}"); + // } + // } }, None => { - println!("{room_id} E: bad position update from {me}"); + // Too spammy + //println!("{room_id} E: bad position update from {me}"); }, } }, @@ -154,11 +165,27 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb let conn = Conn { addr, tx: tx.clone() }; room.write().await.players.insert_conn(conn, name.clone(), clr).await }; + let players_lock = players.read().await; + let me = players_lock.get(&addr).unwrap(); tx.send(Message::text(format!("regack {} {} {} {}", room_conf.name.replace(" ", " "), name.replace(" ", " "), uid, room_conf.board_conf)) ).expect("couldn't send register ack"); - if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { - println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}"); + + { + let msg = Message::text(format!("players {}", + jsonenc_players(players_lock.values()) + .expect("couldn't JSONify players"))); + for p in players_lock.values() { + if let Err(e) = p.conn.tx.send(msg.clone()) { + println!("{room_id} E: couldn't dump players for {me}: {e}"); + } + } + } + if let Err(e) = pos_tx.send(livepos::Req { id: uid, data: livepos::ReqData::StateDump }) { + println!("{room_id} E: couldn't request position dump for {me}: {e}"); + } + if let Err(e) = cmd_tx.send(MetaMove::Dump) { + println!("{room_id} E: couldn't request game dump for {me}: {e}"); } } } @@ -170,3 +197,11 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb } } } + +fn jsonenc_players<'a, I: IntoIterator<Item=&'a Player>>(players: I) -> Result<String, serde_json::Error> { + let mut pairs = Vec::new(); + for player in players { + pairs.push((player.uid, player.name.replace(" ", " "), player.clr.clone())); + } + serde_json::to_string(&pairs) +} |