diff options
-rw-r--r-- | assets/client.js | 115 | ||||
-rw-r--r-- | src/conn.rs | 65 | ||||
-rw-r--r-- | src/livepos.rs | 78 | ||||
-rw-r--r-- | src/main.rs | 13 | ||||
-rw-r--r-- | src/types.rs | 8 |
5 files changed, 219 insertions, 60 deletions
diff --git a/assets/client.js b/assets/client.js index 33ad796..174628f 100644 --- a/assets/client.js +++ b/assets/client.js @@ -1,4 +1,12 @@ window.player = { uid: NaN }; +window.info_elem = document.getElementById("miscinfo"); +window.identform = document.getElementById("identform"); +window.statusline = document.getElementsByClassName("statusline")[0]; +window.bcont_elem = document.getElementById("board-container"); +window.board_elem = document.getElementById("board"); +window.cursor_frame = document.getElementById("cursor-frame"); +window.queued_pos = undefined; + window.room = { name: undefined, bconf: { w: NaN, h: NaN, tile_w: NaN, tile_h: NaN, mine_ratio: undefined }, @@ -9,12 +17,7 @@ window.room = { identity: JSON.parse(localStorage.getItem("identity")), cursors: new Map(), }; -window.info_elem = document.getElementById("miscinfo"); -window.identform = document.getElementById("identform"); -window.statusline = document.getElementsByClassName("statusline")[0]; -window.bcont_elem = document.getElementById("board-container"); -window.board_elem = document.getElementById("board"); -window.cursor_frame = document.getElementById("cursor-frame"); + if (room.identity == null) { statusline.style.display = "none"; @@ -56,18 +59,31 @@ function connect() { let fields = d.split(" "); switch (fields[0]) { case "pos": { - let oid = Number(fields[1]); - let name = fields[2]; - let clr = fields[3]; - let x = fields[4]; - let y = fields[5]; - if (!room.cursors.has(oid)) { - createCursor(oid, name, clr); - } - let celem = room.cursors.get(oid).elem; - celem.style.left = x + 'px'; - celem.style.top = y + 'px'; - movSelWin(room.cursors.get(oid).selwin, x, y); + let posdata = JSON.parse(fields[1]); + posdata.forEach(pdat => { + let oid = Number(pdat[0]); + let x = pdat[1][0]; + let y = pdat[1][1]; + let curs = room.cursors.get(oid); + if (curs != undefined) { + movCursor(curs, x, y); + } else { + console.log("livepos sys incoherent"); + } + }); + } break; + case "players": { + let pdata = JSON.parse(fields[1]); + console.log(pdata); + pdata.forEach(p => { + let oid = Number(p[0]); + let name = p[1]; + let clr = p[2]; + console.log(oid, name, clr); + if (!room.cursors.has(oid)) { + createCursor(oid, name, clr); + } + }); } break; case "regack": { room.name = fields[1]; @@ -148,6 +164,7 @@ function acceptBoard(data) { last = room.board[i]; } board_elem.innerHTML = split_board.join(""); + room.cbounds = getBoardBounds(); } function createCursor(id, name, clr) { @@ -166,20 +183,31 @@ function createCursor(id, name, clr) { cursor.style.color = clr; document.getElementById('cursor-frame').append(cursor); document.getElementById('cursor-frame').append(selection_window); + let c = { name: name, elem: cursor, selwin: selection_window }; if (id == window.player.uid) { document.addEventListener('mousemove', e => { - cursor.style.left = e.pageX + 'px'; - cursor.style.top = e.pageY + 'px'; - movSelWin(selection_window, e.pageX, e.pageY); - room.socket.send(`pos ${e.pageX} ${e.pageY}`); + let bcoords = pageToBoardPx(e.pageX, e.pageY); + movCursor(c, bcoords[0], bcoords[1]); + window.queued_pos = bcoords; }, false); } room.cursors.set(id, {name: name, elem: cursor, selwin: selection_window}); return cursor; } -function movSelWin(win, x, y) { - let tpos = tilepos(x,y); + +function pageToBoardPx(x,y) { + return [Math.floor(x - room.cbounds.ox), Math.floor(y - room.cbounds.oy)]; +} + +function movCursor(c, bx, by) { + c.elem.style.left = (room.cbounds.ox + bx) + 'px'; + c.elem.style.top = (room.cbounds.oy + by) + 'px'; + movSelWin(c.selwin, bx, by); +} +function movSelWin(win, bx, by) { + let tpos = tilepos(bx,by); + console.log(tpos); if (tpos.x > (room.bconf.w - 1) || tpos.x < 0 || tpos.y > (room.bconf.h - 1) || tpos.y < 0) { win.style.display = "none"; } else { @@ -202,31 +230,38 @@ function getBoardBounds() { h: a.height }; } +window.onresize = () => { + room.cbounds = getBoardBounds(); +} bcont_elem.onclick = function(e) { - let tpos = tilepos(e.pageX, e.pageY); + let bcoords = pageToBoardPx(e.pageX, e.pageY); + let tpos = tilepos(bcoords[0], bcoords[1]); let cmd = `reveal ${tpos.x} ${tpos.y}`; - //console.log(cmd); room.socket.send(cmd); } bcont_elem.oncontextmenu = function(e) { - let tpos = tilepos(e.pageX, e.pageY); + let bcoords = pageToBoardPx(e.pageX, e.pageY); + let tpos = tilepos(bcoords[0], bcoords[1]); let cmd = `flag ${tpos.x} ${tpos.y}`; - //console.log(cmd); room.socket.send(cmd); return false; } -function tilepos(x,y) { - let b = getBoardBounds(); - room.cbounds = b; - let cx = x - b.ox; - let cy = y - b.oy; - //let cx = x; - //let cy = y; - let tilex = Math.floor(room.bconf.w * cx/b.w); - let tiley = Math.floor(room.bconf.h * cy/b.h); +// these are board-px coords +function tilepos(bx,by) { + let b = room.cbounds; // we can assume it is already computed by earlier aux calls + let tilex = Math.floor(room.bconf.w * bx/b.w); + let tiley = Math.floor(room.bconf.h * by/b.h); return { x: tilex, y: tiley }; } -function sleep(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); -} + +(function sendPos() { + let qp = window.queued_pos; + if (qp) { + room.socket.send(`pos ${qp[0]} ${qp[1]}`); + window.queued_pos = undefined; + } + setTimeout(function() { + sendPos(); + }, 16); +})(); diff --git a/src/conn.rs b/src/conn.rs index ed4192c..f67121b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -4,8 +4,10 @@ use std::{ net::SocketAddr, }; use tokio::sync::RwLock; +use tokio::sync::mpsc as tokio_mpsc; use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream}; use warp::ws::{ WebSocket, Message }; +use crate::livepos; use ammonia; const MAX_IN: usize = 2048; @@ -46,6 +48,9 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc let room_lock = room.read().await; let mut players = room_lock.players.write().await; if let Some(disconn_p) = players.remove(&addr) { + if let Err(e) = room_lock.pos_stream.send(livepos::Req { id: disconn_p.uid, data: livepos::ReqData::Quit }) { + println!("{room_id} E: couldn't send removal request for {disconn_p} from the live position system: {e}"); + } for p in players.values() { if let Err(e) = p.conn.tx.send(Message::text(format!("logoff {}", disconn_p.uid))) { println!("{room_id} E: couldn't deliver logoff info to {}: {}", p, e); @@ -57,12 +62,14 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc } } -pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::UnboundedSender<Message>), addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { +type RoomStreams = (SplitStream<WebSocket>,tokio_mpsc::UnboundedSender<Message>); + +pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { let (mut incoming, tx) = streams; let (room_id, room) = rinfo; - let (players, cmd_tx, room_conf) = { + let (players, cmd_tx, pos_tx, room_conf) = { let room = room.read().await; - (room.players.clone(), room.cmd_stream.clone(), room.conf.clone()) + (room.players.clone(), room.cmd_stream.clone(), room.pos_stream.clone(), room.conf.clone()) }; while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { @@ -89,18 +96,22 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb "pos" => { match parse_pos(fields) { Some(pos) => { - me.position = pos.clone(); - let sanitized_name = me.name.replace(" ", " ").to_string(); - let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); - for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { - let r = peer_tx.send(Message::text(&msg)); - if let Err(e) = r { - println!("{room_id} E: error sending pos update: {e}"); - } - } + if let Err(e) = pos_tx.send(livepos::Req { id: me.uid, data: livepos::ReqData::Pos(pos) }) { + println!("{room_id} E: couldn't process {me}'s position update: {e}"); + }; + // me.position = pos.clone(); + // let sanitized_name = me.name.replace(" ", " ").to_string(); + // let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); + // for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { + // let r = peer_tx.send(Message::text(&msg)); + // if let Err(e) = r { + // println!("{room_id} E: error sending pos update: {e}"); + // } + // } }, None => { - println!("{room_id} E: bad position update from {me}"); + // Too spammy + //println!("{room_id} E: bad position update from {me}"); }, } }, @@ -154,11 +165,27 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb let conn = Conn { addr, tx: tx.clone() }; room.write().await.players.insert_conn(conn, name.clone(), clr).await }; + let players_lock = players.read().await; + let me = players_lock.get(&addr).unwrap(); tx.send(Message::text(format!("regack {} {} {} {}", room_conf.name.replace(" ", " "), name.replace(" ", " "), uid, room_conf.board_conf)) ).expect("couldn't send register ack"); - if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { - println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}"); + + { + let msg = Message::text(format!("players {}", + jsonenc_players(players_lock.values()) + .expect("couldn't JSONify players"))); + for p in players_lock.values() { + if let Err(e) = p.conn.tx.send(msg.clone()) { + println!("{room_id} E: couldn't dump players for {me}: {e}"); + } + } + } + if let Err(e) = pos_tx.send(livepos::Req { id: uid, data: livepos::ReqData::StateDump }) { + println!("{room_id} E: couldn't request position dump for {me}: {e}"); + } + if let Err(e) = cmd_tx.send(MetaMove::Dump) { + println!("{room_id} E: couldn't request game dump for {me}: {e}"); } } } @@ -170,3 +197,11 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb } } } + +fn jsonenc_players<'a, I: IntoIterator<Item=&'a Player>>(players: I) -> Result<String, serde_json::Error> { + let mut pairs = Vec::new(); + for player in players { + pairs.push((player.uid, player.name.replace(" ", " "), player.clr.clone())); + } + serde_json::to_string(&pairs) +} diff --git a/src/livepos.rs b/src/livepos.rs new file mode 100644 index 0000000..be56b1e --- /dev/null +++ b/src/livepos.rs @@ -0,0 +1,78 @@ +use crate::types::*; +use tokio::sync::mpsc as tokio_mpsc; +use tokio::sync::Mutex; +use std::collections::{HashMap,HashSet}; +use tokio::time::{self, Duration}; +use warp::ws::Message; + +pub enum ReqData { + Pos((usize,usize)), + StateDump, + Quit, +} + +pub struct Req { + pub id: usize, + pub data: ReqData, +} + +pub async fn livepos(players: PlayerMapData, mut recv: tokio_mpsc::UnboundedReceiver<Req>) { + let positions = Mutex::new(HashMap::new()); + let dirty = Mutex::new(HashSet::new()); + let process_upds = async { + while let Some(update) = recv.recv().await { + let mut dirty = dirty.lock().await; + let mut positions = positions.lock().await; + match update.data { + ReqData::Pos(p) => { + let old = positions.get(&update.id).unwrap_or(&(0,0)); + if p != *old { + dirty.insert(update.id); + } + positions.insert(update.id, p); + }, + ReqData::StateDump => { + dirty.clear(); + dirty.extend(positions.keys().map(|x| *x)); + }, + ReqData::Quit => { + positions.remove(&update.id); + dirty.retain(|x| *x != update.id); + } + } + } + }; + let periodic_send = async { + let mut interv = tokio::time::interval(Duration::from_millis(16)); + interv.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { + interv.tick().await; + let mut dirty = dirty.lock().await; + if dirty.len() > 0 { + let mut positions = positions.lock().await; + let msg = jsonenc_ids(&mut positions, &*dirty).expect("couldn't JSONify player positions"); + dirty.clear(); + let plock = players.read().await; + for player in plock.values() { + if let Err(e) = player.conn.tx.send(Message::text(format!("pos {}", msg))) { + println!("E: couldn't send livepos update to {}: {}", player, e); + } + } + } + } + }; + + tokio::select!( + _ = process_upds => (), + _ = periodic_send => () + ); +} + +fn jsonenc_ids<'a, I: IntoIterator<Item=&'a usize>>(positions: &mut HashMap<usize, (usize,usize)>, ids: I) -> Result<String, serde_json::Error> { + let mut pairs = Vec::new(); + for id in ids { + pairs.push((id, positions[id])); + }; + serde_json::to_string(&pairs) +} + diff --git a/src/main.rs b/src/main.rs index 5012e95..82f1c8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use std::{ use futures::stream::StreamExt; mod types; +mod livepos; mod conn; mod minesweeper; use types::*; @@ -129,9 +130,15 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> { let n = rinfo.get("rname").unwrap().to_owned(); if n.is_empty() { uid.to_string() } else { n } }; + let players = PlayerMap::default(); + let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); - let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf)); + let game_handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf)); + + let (pos_tx, pos_rx) = tokio::sync::mpsc::unbounded_channel(); + let livepos_handle = tokio::spawn(livepos::livepos(players.clone(), pos_rx)); + let room_conf = RoomConf { name, player_cap: match limit { Some(i) => i, None => usize::MAX }, @@ -141,8 +148,10 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> { let new_room = Room { conf: room_conf, players, - driver: handle, + game_driver: game_handle, cmd_stream: cmd_tx, + livepos_driver: livepos_handle, + pos_stream: pos_tx, }; if access.is_some() { pubs.write().await.insert(uid.clone(), serde_json::to_string(&new_room.conf).unwrap()); diff --git a/src/types.rs b/src/types.rs index 2d6ae22..cb16042 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,7 @@ use warp::ws::Message; use tokio::sync::RwLock; use serde::Serialize; use crate::minesweeper; +use crate::livepos; #[derive(Debug, Clone)] pub struct Config { @@ -35,8 +36,10 @@ pub struct RoomConf { pub struct Room { pub conf: RoomConf, pub players: PlayerMap, - pub driver: tokio::task::JoinHandle<()>, + pub game_driver: tokio::task::JoinHandle<()>, pub cmd_stream: CmdTx, + pub livepos_driver: tokio::task::JoinHandle<()>, + pub pos_stream: tokio::sync::mpsc::UnboundedSender<livepos::Req>, } #[derive(Debug)] @@ -58,7 +61,6 @@ pub struct Player { pub uid: usize, pub name: String, pub clr: String, - pub position: (usize, usize), } impl Display for Player { @@ -125,7 +127,7 @@ impl PlayerMap { let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed); map.insert( conn.addr, - Player { conn, uid, name, clr, position: (0,0) }, + Player { conn, uid, name, clr }, ); uid } |