summaryrefslogtreecommitdiff
path: root/src/conn.rs
diff options
context:
space:
mode:
authorstale <redkugelblitzin@gmail.com>2022-05-13 17:49:36 -0300
committerstale <redkugelblitzin@gmail.com>2022-05-13 17:49:36 -0300
commit0f010aca2957468c46cd849284873cb26f3a0df3 (patch)
treefdc4da96268661ca3831291017339cc842c485a1 /src/conn.rs
parentcedcb4049a5dd690b9617d34b99f25feb0095c7a (diff)
laying the groundwork for rooms
Diffstat (limited to 'src/conn.rs')
-rw-r--r--src/conn.rs286
1 files changed, 155 insertions, 131 deletions
diff --git a/src/conn.rs b/src/conn.rs
index de0f704..5325bf4 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -1,151 +1,175 @@
use crate::types::*;
-use futures_util::{SinkExt, StreamExt, TryStreamExt};
-use warp::ws::Message;
+use std::{
+ sync::Arc,
+ net::SocketAddr,
+};
+use tokio::sync::RwLock;
+use futures_util::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream};
+use warp::ws::{ WebSocket, Message };
-pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) {
- let addr = conndata.remote_addr;
- let peers = &conndata.peers;
- let cmd_tx = conndata.cmd_tx.clone();
- println!("Incoming TCP connection from: {}", conndata.remote_addr);
-
- let mut peer_name = "unknown".to_string();
-
- // for game -> conn comms
+pub async fn lobby(socket: WebSocket, addr: SocketAddr, room: Arc<RwLock<Room>>) {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- // for server -> client comms
+
+ // server <-> client comms
let (mut outgoing, mut incoming) = socket.split();
- let process_incoming = async {
- while let Ok(cmd) = incoming.try_next().await {
- if let Some(cmd) = cmd {
- if cmd.is_close() {
- println!("closing \"{peer_name}\"@{addr}");
- let mut peers = peers.write().await;
- if let Some(_) = peers.get(&addr) {
- peers.remove(&addr);
- }
- for (paddr, p) in peers.iter() {
- if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) {
- println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
- }
- }
- break;
- }
- // if it ain't text we can't handle it
- if !cmd.is_text() { continue; }
- let cmd = cmd.to_str().to_owned().unwrap();
+ println!("Incoming TCP connection from: {}", addr);
- let mut fields = cmd.split(" ");
- let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
- let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
- let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
- x.zip(y)
- };
- if let Some(cmd_name) = fields.next() {
- use crate::minesweeper::{Move,MoveType};
- match cmd_name {
- "pos" => {
- match parse_pos(fields) {
- Some(pos) => {
- let (name, clr, id) = {
- let mut peers = peers.write().await;
- let mut entry = peers.get_mut(&addr).unwrap();
- entry.position = pos.clone();
- (entry.name.clone(), entry.clr.clone(), entry.seq_id)
- };
- let sanitized_name = name.replace(" ", "&nbsp").to_string();
- {
- let peers = peers.read().await;
- for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) {
- let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {} {}", clr, pos.0, pos.1)));
- if let Err(e) = r {
- println!("error sending pos update: {e}");
- }
- }
- }
- },
- None => {
- println!("bad position update from \"{peer_name}@{addr}\"");
- },
- }
- },
- "reveal" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
- println!("couldn't process \"{peer_name}\"'s reveal command: {e}");
- };
- },
- None => {
- println!("bad reveal from \"{peer_name}\"@{addr}");
- }
- }
- },
- "flag" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
- println!("couldn't process \"{peer_name}\"'s flag command: {e}");
- };
- },
- None => {
- println!("bad flag from \"{peer_name}\"@{addr}");
- }
- }
- },
- "reset" => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Reset) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- "register" => {
- let all_fields = fields.collect::<Vec<&str>>();
- let fcount = all_fields.len();
- peer_name = "anon".into();
- let id;
- if fcount >= 2 {
- peer_name = all_fields[..fcount-1].join("&nbsp");
- }
- let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>();
- if clr.is_empty() {
- clr = "#f03333".into();
- }
- { // new scope cuz paranoid bout deadlocks
- let mut peers = peers.write().await;
- id = peers.len();
- peers.insert(addr, Peer {
- tx: tx.clone(),
- seq_id: id,
- name: peer_name.clone(),
- clr,
- position: (0,0)
- });
- }
- tx.send(Message::text(format!("id {}", id))).unwrap();
- if let Err(e) = cmd_tx.send(MetaMove::Dump) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
+ let mut registered = false;
+ while !registered {
+ match incoming.try_next().await {
+ Ok(cmd) => {
+ if let Some(cmd) = cmd {
+ // if it ain't text we can't handle it
+ if !cmd.is_text() { return; }
+ let cmd = cmd.to_str().to_owned().unwrap();
+ let mut fields = cmd.split(" ");
+
+ if fields.next() == Some("register") {
+ let all_fields = fields.collect::<Vec<&str>>();
+ let fcount = all_fields.len();
+ let mut name = "anon".to_string();
+ if fcount >= 2 {
+ name = all_fields[..fcount-1].join("&nbsp");
+ }
+ let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>();
+ if clr.is_empty() {
+ clr = "#f03333".into();
+ }
+ println!("registered {name}");
+ let uid = {
+ // new scope cuz paranoid bout deadlocks
+ let conn = Conn { addr, tx: tx.clone() };
+ room.write().await.players.insert_conn(conn, name, clr).await
+ };
+ tx.send(Message::text(format!("id {uid}"))).expect("couldn't send register ack");
+ if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) {
+ println!("couldn't request game dump in behalf of {addr}: {e}");
+ }
+ registered = true;
}
}
+ },
+ Err(e) => {
+ println!("error reading socket {addr}: {e}");
}
}
- };
+ }
+
+ let drive_game = handle_room(incoming, addr, room.clone());
let send_to_client = async move {
while let Some(m) = rx.recv().await {
if let Err(e) = outgoing.send(m).await {
println!("something went bad lol: {e}");
- };
+ }
}
};
- futures_util::pin_mut!(process_incoming, send_to_client);
- futures_util::future::select(process_incoming, send_to_client).await;
+ tokio::select! {
+ _ = drive_game => (),
+ _ = send_to_client => { println!("anomalous close for {addr}"); }
+ };
+ let room_lock = room.read().await;
+ let mut players = room_lock.players.write().await;
+ if players.contains_key(&addr) {
+ players.remove(&addr);
+ }
+ for (paddr, p) in players.iter() {
+ if let Err(e) = p.conn.tx.send(Message::text("logoff {p.seqid} {p.name}")) {
+ println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
+ }
+ }
println!("{addr} disconnected");
- peers.write().await.remove(&addr);
+}
+
+pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, room: Arc<RwLock<Room>>) {
+ let (players, cmd_tx) = {
+ let room = room.read().await;
+ (room.players.clone(), room.cmd_stream.clone())
+ };
+ while let Ok(cmd) = incoming.try_next().await {
+ if let Some(cmd) = cmd {
+ // if it ain't text we can't handle it
+ if !cmd.is_text() { continue; }
+ let cmd = cmd.to_str().to_owned().unwrap();
+
+ let mut fields = cmd.split(" ");
+ let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
+ let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
+ let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
+ x.zip(y)
+ };
+ if let Some(cmd_name) = fields.next() {
+ use crate::minesweeper::{Move,MoveType};
+ let me_lock = players.read().await;
+ let me = me_lock.get(&addr).expect("player not found");
+ match cmd_name {
+ "pos" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ drop(me);
+ drop(me_lock); // So the read lock isn't held
+ let (name, clr, uid) = {
+ let mut players = players.write().await;
+ let mut entry = players.get_mut(&addr).unwrap();
+ entry.position = pos.clone();
+ (entry.name.clone(), entry.clr.clone(), entry.uid)
+ };
+ let sanitized_name = name.replace(" ", "&nbsp").to_string();
+ {
+ let players = players.read().await;
+ for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) {
+ let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1)));
+ if let Err(e) = r {
+ println!("error sending pos update: {e}");
+ }
+ }
+ }
+ },
+ None => {
+ println!("bad position update from {}", me);
+ },
+ }
+ },
+ "reveal" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from {me}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
+ println!("couldn't process {me}'s reveal command: {e}");
+ };
+ },
+ None => {
+ println!("bad reveal from {me}");
+ }
+ }
+ },
+ "flag" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from {me}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
+ println!("couldn't process {me}'s flag command: {e}");
+ };
+ },
+ None => {
+ println!("bad flag from {me}");
+ }
+ }
+ },
+ "reset" => {
+ println!("{cmd} from {me}");
+ if let Err(e) = cmd_tx.send(MetaMove::Reset) {
+ println!("couldn't request game dump in behalf of {me}: {e}");
+ }
+ },
+ e => println!("unknown command {e:?} from {me}, \"{cmd}\""),
+ }
+ }
+ } else {
+ println!("reached end of stream for {addr}");
+ break;
+ }
+ }
}