summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/conn.rs286
-rw-r--r--src/main.rs65
-rw-r--r--src/types.rs88
3 files changed, 266 insertions, 173 deletions
diff --git a/src/conn.rs b/src/conn.rs
index de0f704..5325bf4 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -1,151 +1,175 @@
use crate::types::*;
-use futures_util::{SinkExt, StreamExt, TryStreamExt};
-use warp::ws::Message;
+use std::{
+ sync::Arc,
+ net::SocketAddr,
+};
+use tokio::sync::RwLock;
+use futures_util::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream};
+use warp::ws::{ WebSocket, Message };
-pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) {
- let addr = conndata.remote_addr;
- let peers = &conndata.peers;
- let cmd_tx = conndata.cmd_tx.clone();
- println!("Incoming TCP connection from: {}", conndata.remote_addr);
-
- let mut peer_name = "unknown".to_string();
-
- // for game -> conn comms
+pub async fn lobby(socket: WebSocket, addr: SocketAddr, room: Arc<RwLock<Room>>) {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- // for server -> client comms
+
+ // server <-> client comms
let (mut outgoing, mut incoming) = socket.split();
- let process_incoming = async {
- while let Ok(cmd) = incoming.try_next().await {
- if let Some(cmd) = cmd {
- if cmd.is_close() {
- println!("closing \"{peer_name}\"@{addr}");
- let mut peers = peers.write().await;
- if let Some(_) = peers.get(&addr) {
- peers.remove(&addr);
- }
- for (paddr, p) in peers.iter() {
- if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) {
- println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
- }
- }
- break;
- }
- // if it ain't text we can't handle it
- if !cmd.is_text() { continue; }
- let cmd = cmd.to_str().to_owned().unwrap();
+ println!("Incoming TCP connection from: {}", addr);
- let mut fields = cmd.split(" ");
- let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
- let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
- let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
- x.zip(y)
- };
- if let Some(cmd_name) = fields.next() {
- use crate::minesweeper::{Move,MoveType};
- match cmd_name {
- "pos" => {
- match parse_pos(fields) {
- Some(pos) => {
- let (name, clr, id) = {
- let mut peers = peers.write().await;
- let mut entry = peers.get_mut(&addr).unwrap();
- entry.position = pos.clone();
- (entry.name.clone(), entry.clr.clone(), entry.seq_id)
- };
- let sanitized_name = name.replace(" ", "&nbsp").to_string();
- {
- let peers = peers.read().await;
- for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) {
- let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {} {}", clr, pos.0, pos.1)));
- if let Err(e) = r {
- println!("error sending pos update: {e}");
- }
- }
- }
- },
- None => {
- println!("bad position update from \"{peer_name}@{addr}\"");
- },
- }
- },
- "reveal" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
- println!("couldn't process \"{peer_name}\"'s reveal command: {e}");
- };
- },
- None => {
- println!("bad reveal from \"{peer_name}\"@{addr}");
- }
- }
- },
- "flag" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
- println!("couldn't process \"{peer_name}\"'s flag command: {e}");
- };
- },
- None => {
- println!("bad flag from \"{peer_name}\"@{addr}");
- }
- }
- },
- "reset" => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Reset) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- "register" => {
- let all_fields = fields.collect::<Vec<&str>>();
- let fcount = all_fields.len();
- peer_name = "anon".into();
- let id;
- if fcount >= 2 {
- peer_name = all_fields[..fcount-1].join("&nbsp");
- }
- let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>();
- if clr.is_empty() {
- clr = "#f03333".into();
- }
- { // new scope cuz paranoid bout deadlocks
- let mut peers = peers.write().await;
- id = peers.len();
- peers.insert(addr, Peer {
- tx: tx.clone(),
- seq_id: id,
- name: peer_name.clone(),
- clr,
- position: (0,0)
- });
- }
- tx.send(Message::text(format!("id {}", id))).unwrap();
- if let Err(e) = cmd_tx.send(MetaMove::Dump) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
+ let mut registered = false;
+ while !registered {
+ match incoming.try_next().await {
+ Ok(cmd) => {
+ if let Some(cmd) = cmd {
+ // if it ain't text we can't handle it
+ if !cmd.is_text() { return; }
+ let cmd = cmd.to_str().to_owned().unwrap();
+ let mut fields = cmd.split(" ");
+
+ if fields.next() == Some("register") {
+ let all_fields = fields.collect::<Vec<&str>>();
+ let fcount = all_fields.len();
+ let mut name = "anon".to_string();
+ if fcount >= 2 {
+ name = all_fields[..fcount-1].join("&nbsp");
+ }
+ let mut clr = all_fields[fcount-1].chars().filter(|c| c.is_digit(16) || *c == '#').collect::<String>();
+ if clr.is_empty() {
+ clr = "#f03333".into();
+ }
+ println!("registered {name}");
+ let uid = {
+ // new scope cuz paranoid bout deadlocks
+ let conn = Conn { addr, tx: tx.clone() };
+ room.write().await.players.insert_conn(conn, name, clr).await
+ };
+ tx.send(Message::text(format!("id {uid}"))).expect("couldn't send register ack");
+ if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) {
+ println!("couldn't request game dump in behalf of {addr}: {e}");
+ }
+ registered = true;
}
}
+ },
+ Err(e) => {
+ println!("error reading socket {addr}: {e}");
}
}
- };
+ }
+
+ let drive_game = handle_room(incoming, addr, room.clone());
let send_to_client = async move {
while let Some(m) = rx.recv().await {
if let Err(e) = outgoing.send(m).await {
println!("something went bad lol: {e}");
- };
+ }
}
};
- futures_util::pin_mut!(process_incoming, send_to_client);
- futures_util::future::select(process_incoming, send_to_client).await;
+ tokio::select! {
+ _ = drive_game => (),
+ _ = send_to_client => { println!("anomalous close for {addr}"); }
+ };
+ let room_lock = room.read().await;
+ let mut players = room_lock.players.write().await;
+ if players.contains_key(&addr) {
+ players.remove(&addr);
+ }
+ for (paddr, p) in players.iter() {
+ if let Err(e) = p.conn.tx.send(Message::text("logoff {p.seqid} {p.name}")) {
+ println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
+ }
+ }
println!("{addr} disconnected");
- peers.write().await.remove(&addr);
+}
+
+pub async fn handle_room(mut incoming: SplitStream<WebSocket>, addr: SocketAddr, room: Arc<RwLock<Room>>) {
+ let (players, cmd_tx) = {
+ let room = room.read().await;
+ (room.players.clone(), room.cmd_stream.clone())
+ };
+ while let Ok(cmd) = incoming.try_next().await {
+ if let Some(cmd) = cmd {
+ // if it ain't text we can't handle it
+ if !cmd.is_text() { continue; }
+ let cmd = cmd.to_str().to_owned().unwrap();
+
+ let mut fields = cmd.split(" ");
+ let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
+ let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
+ let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
+ x.zip(y)
+ };
+ if let Some(cmd_name) = fields.next() {
+ use crate::minesweeper::{Move,MoveType};
+ let me_lock = players.read().await;
+ let me = me_lock.get(&addr).expect("player not found");
+ match cmd_name {
+ "pos" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ drop(me);
+ drop(me_lock); // So the read lock isn't held
+ let (name, clr, uid) = {
+ let mut players = players.write().await;
+ let mut entry = players.get_mut(&addr).unwrap();
+ entry.position = pos.clone();
+ (entry.name.clone(), entry.clr.clone(), entry.uid)
+ };
+ let sanitized_name = name.replace(" ", "&nbsp").to_string();
+ {
+ let players = players.read().await;
+ for peer_tx in players.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) {
+ let r = peer_tx.send(Message::text(format!("pos {uid} {sanitized_name} {} {} {}", clr, pos.0, pos.1)));
+ if let Err(e) = r {
+ println!("error sending pos update: {e}");
+ }
+ }
+ }
+ },
+ None => {
+ println!("bad position update from {}", me);
+ },
+ }
+ },
+ "reveal" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from {me}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
+ println!("couldn't process {me}'s reveal command: {e}");
+ };
+ },
+ None => {
+ println!("bad reveal from {me}");
+ }
+ }
+ },
+ "flag" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from {me}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
+ println!("couldn't process {me}'s flag command: {e}");
+ };
+ },
+ None => {
+ println!("bad flag from {me}");
+ }
+ }
+ },
+ "reset" => {
+ println!("{cmd} from {me}");
+ if let Err(e) = cmd_tx.send(MetaMove::Reset) {
+ println!("couldn't request game dump in behalf of {me}: {e}");
+ }
+ },
+ e => println!("unknown command {e:?} from {me}, \"{cmd}\""),
+ }
+ }
+ } else {
+ println!("reached end of stream for {addr}");
+ break;
+ }
+ }
}
diff --git a/src/main.rs b/src/main.rs
index fe55ab2..0a17346 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,7 @@
use std::{
- collections::HashMap,
error::Error,
net::SocketAddr,
+ sync::Arc,
};
use warp::Filter;
@@ -22,43 +22,47 @@ fn main() -> Result<(), Box<dyn Error>> {
socket_addr: ([0,0,0,0],31235).into(),
};
- let state = State {
- conf,
- peers: PeerMap::new(RwLock::new(HashMap::new())),
- };
-
- tokio_main(state)
+ tokio_main(conf)
}
#[tokio::main]
-async fn tokio_main(state: State) -> Result<(), Box<dyn Error>> {
- // Start the temporary single lobby
- let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
- let _game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone()));
-
- let cmd_filter = warp::any().map(move || cmd_tx.clone());
+async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {
+ // Start the temporary single room
+ let room = Arc::new(RwLock::new({
+ let name = "Testing room".to_string();
+ let players = PlayerMap::default();
+ let bconf = BoardConf { w: 75, h: 35, mine_ratio: (1, 8) };
+ let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
+ let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), bconf));
+ Room {
+ name,
+ players,
+ peer_limit: 32,
+ board_conf: bconf,
+ cmd_stream: cmd_tx,
+ driver: handle,
+ }
+ }));
let page_route = {
use warp::*;
get()
.and(path("font.ttf"))
.map(|| FONT_FILE)
- .or(fs::file(state.conf.page_path.clone()))
+ .or(fs::file(conf.page_path.clone()))
};
let websocket_route = {
- let state = state.clone();
+ let room = room.clone();
use warp::*;
path("ws")
.and(ws())
.and(addr::remote())
- .and(cmd_filter)
- .map(move |ws: warp::ws::Ws, saddr: Option<SocketAddr>, cmd_tx: CmdTx| {
- let state = state.clone();
+ .map(move |ws: warp::ws::Ws, saddr: Option<SocketAddr>| {
+ let room = room.clone();
println!("conn from {saddr:?}");
ws.on_upgrade(move |socket| {
- let conn_data = types::ConnData { remote_addr: saddr.unwrap(), cmd_tx: cmd_tx.clone(), peers: state.peers.clone() };
- conn::peer_connection(socket, conn_data)
+ conn::lobby(socket, saddr.expect("socket without address"), room.clone())
})
})
};
@@ -66,18 +70,19 @@ async fn tokio_main(state: State) -> Result<(), Box<dyn Error>> {
let server = warp::serve(routes)
.tls()
- .cert_path(state.conf.cert_path)
- .key_path(state.conf.pkey_path)
- .run(state.conf.socket_addr);
- println!("Serving on {}", state.conf.socket_addr);
+ .cert_path(conf.cert_path)
+ .key_path(conf.pkey_path)
+ .run(conf.socket_addr);
+ println!("Serving on {}", conf.socket_addr);
server.await;
Ok(())
}
// If a move is made, broadcast new board, else just send current board
-async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
+async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, players: PlayerMapData, bconf: BoardConf) {
use minesweeper::*;
- let mut game = Game::new(Board::new(75,35), (75*35)/8);
+ let mine_cnt = (bconf.w * bconf.h * bconf.mine_ratio.0)/(bconf.mine_ratio.1);
+ let mut game = Game::new(Board::new(bconf.w, bconf.h), mine_cnt);
let mut latest_player_name = None;
while let Some(req) = move_rx.recv().await {
let done = game.phase == Phase::Die || game.phase == Phase::Win;
@@ -87,10 +92,10 @@ async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, p
if game.phase == Phase::Win || game.phase == Phase::Die {
game.board = game.board.grade();
}
- latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone());
+ latest_player_name = players.read().await.get(&o).map(|p| p.name.clone());
},
MetaMove::Dump => (),
- MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); },
+ MetaMove::Reset => { game = Game::new(Board::new(bconf.w, bconf.h), mine_cnt); },
}
use warp::ws::Message;
let mut reply = vec![Message::binary(game.board.render())];
@@ -101,10 +106,10 @@ async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, p
_ => (),
}
{
- let peers = peers.read().await;
+ let peers = players.read().await;
for (addr, p) in peers.iter() {
for r in reply.iter() {
- if let Err(e) = p.tx.send(r.clone()) {
+ if let Err(e) = p.conn.tx.send(r.clone()) {
println!("couldn't send game update {r:?} to {addr}: {e}");
}
}
diff --git a/src/types.rs b/src/types.rs
index 54a93bc..86f1e32 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -1,7 +1,12 @@
use std::{
collections::HashMap,
net::SocketAddr,
- sync::Arc,
+ sync::{
+ Arc,
+ atomic::{ AtomicUsize, Ordering },
+ },
+ fmt::Display,
+ ops::{ Deref, DerefMut },
};
use warp::ws::Message;
use tokio::sync::RwLock;
@@ -15,10 +20,28 @@ pub struct Config {
pub socket_addr: SocketAddr,
}
-#[derive(Debug, Clone)]
-pub struct State {
- pub conf: Config,
- pub peers: PeerMap,
+#[derive(Debug, Clone, Copy)]
+pub struct BoardConf {
+ pub w: usize,
+ pub h: usize,
+ /// tiles to mines, expressed as (numerator, denominator)
+ pub mine_ratio: (usize,usize),
+}
+
+impl Display for BoardConf {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}x{} {}/{}", self.w, self.h, self.mine_ratio.0, self.mine_ratio.1)
+ }
+}
+
+#[derive(Debug)]
+pub struct Room {
+ pub name: String,
+ pub players: PlayerMap,
+ pub peer_limit: u32,
+ pub driver: tokio::task::JoinHandle<()>,
+ pub cmd_stream: CmdTx,
+ pub board_conf: BoardConf,
}
#[derive(Debug)]
@@ -29,19 +52,60 @@ pub enum MetaMove {
}
#[derive(Debug)]
-pub struct Peer {
+pub struct Conn {
pub tx: tokio::sync::mpsc::UnboundedSender<Message>,
- pub seq_id: usize,
+ pub addr: SocketAddr,
+}
+
+#[derive(Debug)]
+pub struct Player {
+ pub conn: Conn,
+ pub uid: usize,
pub name: String,
pub clr: String,
pub position: (usize, usize),
}
-pub struct ConnData {
- pub cmd_tx: CmdTx,
- pub remote_addr: SocketAddr,
- pub peers: PeerMap,
+impl Display for Player {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "\"{}\"@{}", self.name, self.conn.addr)
+ }
}
pub type CmdTx = tokio::sync::mpsc::UnboundedSender<MetaMove>;
-pub type PeerMap = Arc<RwLock<HashMap<SocketAddr, Peer>>>;
+pub type PlayerMapData = Arc<RwLock<HashMap<SocketAddr, Player>>>;
+#[derive(Debug)]
+pub struct PlayerMap {
+ inner: PlayerMapData,
+ uid_counter: AtomicUsize,
+}
+
+impl Deref for PlayerMap {
+ type Target = Arc<RwLock<HashMap<SocketAddr, Player>>>;
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+impl DerefMut for PlayerMap {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.inner
+ }
+}
+impl Default for PlayerMap {
+ fn default() -> Self {
+ Self { inner: Arc::new(RwLock::new(HashMap::new())), uid_counter: 0.into() }
+ }
+}
+
+impl PlayerMap {
+ pub async fn insert_conn(&mut self, conn: Conn, name: String, clr: String) -> usize {
+ let mut map = self.write().await;
+ let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed);
+ map.insert(
+ conn.addr,
+ Player { conn, uid, name, clr, position: (0,0) },
+ );
+ uid
+ }
+}
+