use crate::types::*; use futures_util::{SinkExt, StreamExt, TryStreamExt}; use warp::ws::Message; pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) { let addr = conndata.remote_addr; let peers = &conndata.peers; let cmd_tx = conndata.cmd_tx.clone(); println!("Incoming TCP connection from: {}", conndata.remote_addr); let mut peer_name = "unknown".to_string(); // for game -> conn comms let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); // for server -> client comms let (mut outgoing, mut incoming) = socket.split(); let process_incoming = async { while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { if cmd.is_close() { println!("closing \"{peer_name}\"@{addr}"); let mut peers = peers.write().await; if let Some(_) = peers.get(&addr) { peers.remove(&addr); } for (paddr, p) in peers.iter() { if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) { println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); } } break; } // if it ain't text we can't handle it if !cmd.is_text() { continue; } let cmd = cmd.to_str().to_owned().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { let x = fields.next().and_then(|xstr| xstr.parse::().ok()); let y = fields.next().and_then(|ystr| ystr.parse::().ok()); x.zip(y) }; if let Some(cmd_name) = fields.next() { use crate::minesweeper::{Move,MoveType}; match cmd_name { "pos" => { match parse_pos(fields) { Some(pos) => { let (name, id) = { let mut peers = peers.write().await; let mut entry = peers.get_mut(&addr).unwrap(); entry.position = pos.clone(); (entry.name.clone(), entry.seq_id) }; let sanitized_name = name.replace(" ", " ").to_string(); { let peers = peers.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } } } }, None => { println!("bad position update from \"{peer_name}@{addr}\""); }, } }, "reveal" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); }; }, None => { println!("bad reveal from \"{peer_name}\"@{addr}"); } } }, "flag" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { println!("couldn't process \"{peer_name}\"'s flag command: {e}"); }; }, None => { println!("bad flag from \"{peer_name}\"@{addr}"); } } }, "reset" => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Reset) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, "register" => { let name = fields.collect::>().join(&" "); let id; if name.is_empty() { peer_name = "anon".to_string(); } else { peer_name = name; } { // new scope cuz paranoid bout deadlocks let mut peers = peers.write().await; id = peers.len(); peers.insert(addr, Peer { tx: tx.clone(), seq_id: id, name: peer_name.clone(), position: (0,0) }); } tx.send(Message::text(format!("id {id}"))).unwrap(); if let Err(e) = cmd_tx.send(MetaMove::Dump) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), } } } } }; let send_to_client = async move { while let Some(m) = rx.recv().await { if let Err(e) = outgoing.send(m).await { println!("something went bad lol: {e}"); }; } }; futures_util::pin_mut!(process_incoming, send_to_client); futures_util::future::select(process_incoming, send_to_client).await; println!("{addr} disconnected"); peers.write().await.remove(&addr); }