summaryrefslogtreecommitdiff
path: root/src/conn.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/conn.rs')
-rw-r--r--src/conn.rs38
1 files changed, 16 insertions, 22 deletions
diff --git a/src/conn.rs b/src/conn.rs
index 7adeeae..047d7db 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -4,19 +4,15 @@ use std::{
net::SocketAddr,
};
use tokio::sync::RwLock;
-use tokio::sync::mpsc as tokio_mpsc;
-use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream};
+use futures::{SinkExt, TryStreamExt, StreamExt, stream::SplitStream};
use warp::ws::{ WebSocket, Message };
use crate::livepos;
-const MAX_IN: usize = 2048;
-
-pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLock<Room>>)) {
+pub async fn setup_conn(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLock<Room>>), max_in: usize) {
let (room_id, room) = rinfo;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
-
- // server <-> client comms
let (mut outgoing, incoming) = socket.split();
+ let conn = Conn { addr, tx };
println!("{room_id} I: Incoming TCP connection from: {}", addr);
@@ -24,10 +20,10 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc
let rl = room.read().await;
let pcap = rl.conf.player_cap;
let pl = rl.players.read().await;
- pl.len() >= pcap
+ pl.len() >= pcap.get()
};
if full { return }
- let drive_game = handle_room((incoming,tx), addr, (room_id.clone(),room.clone()));
+ let drive_game = drive_conn((conn, incoming), (room_id.clone(),room.clone()), max_in);
let send_to_client = {
let room_id = room_id.clone();
async move {
@@ -61,10 +57,9 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc
}
}
-type RoomStreams = (SplitStream<WebSocket>,tokio_mpsc::UnboundedSender<Message>);
-pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) {
- let (mut incoming, tx) = streams;
+pub async fn drive_conn(conn: (Conn, SplitStream<WebSocket>), rinfo: (RoomId, Arc<RwLock<Room>>), max_in: usize) {
+ let (conn, mut incoming) = conn;
let (room_id, room) = rinfo;
let (players, cmd_tx, pos_tx, room_conf) = {
let room = room.read().await;
@@ -74,7 +69,7 @@ pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId,
if let Some(cmd) = cmd {
// if it ain't text we can't handle it
let cmd = match cmd.to_str() {
- Ok(cmd) => { if cmd.len() > MAX_IN {
+ Ok(cmd) => { if cmd.len() > max_in {
println!("{room_id} E: string too big: {cmd}");
return
} else { cmd.to_owned() } },
@@ -90,7 +85,7 @@ pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId,
if let Some(cmd_name) = fields.next() {
use crate::minesweeper::{Move,MoveType};
let mut players_lock = players.write().await;
- match players_lock.get_mut(&addr) {
+ match players_lock.get_mut(&conn.addr) {
Some(me) => match cmd_name {
"pos" => {
if let Some(pos) = parse_pos(fields) {
@@ -102,7 +97,7 @@ pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId,
"reveal" => {
match parse_pos(fields) {
Some(pos) => {
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, conn.addr)) {
println!("{room_id} E: couldn't process {me}'s reveal command: {e}");
};
},
@@ -114,7 +109,7 @@ pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId,
"flag" => {
match parse_pos(fields) {
Some(pos) => {
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, conn.addr)) {
println!("{room_id} E: couldn't process {me}'s flag command: {e}");
};
},
@@ -142,16 +137,15 @@ pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId,
if n.is_empty() { def } else { n }
}
};
- println!("{room_id} I: registered \"{name}@{addr}\"");
+ println!("{room_id} I: registered \"{name}@{}\"", conn.addr);
drop(players_lock);
let uid = {
// new scope cuz paranoid bout deadlocks
- let conn = Conn { addr, tx: tx.clone() };
- room.write().await.players.write().await.insert_conn(conn, name.clone(), clr)
+ room.write().await.players.write().await.insert_conn(conn.clone(), name.clone(), clr)
};
let players_lock = players.read().await;
- let me = players_lock.get(&addr).unwrap();
- tx.send(Message::text(format!("regack {} {} {} {}",
+ let me = players_lock.get(&conn.addr).unwrap();
+ conn.tx.send(Message::text(format!("regack {} {} {} {}",
room_conf.name.replace(' ', "&nbsp;"), name.replace(' ', "&nbsp;"), uid, room_conf.board_conf))
).expect("couldn't send register ack");
@@ -176,7 +170,7 @@ pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId,
}
}
} else {
- println!("{room_id} E: reached end of stream for {addr}");
+ println!("{room_id} E: reached end of stream for {}", conn.addr);
break;
}
}