From f70c1df91afa0a211de0b743da6c0dad59fd23be Mon Sep 17 00:00:00 2001
From: stale <redkugelblitzin@gmail.com>
Date: Thu, 30 Jun 2022 06:34:59 -0300
Subject: redid the live cursor position system

---
 assets/client.js | 115 ++++++++++++++++++++++++++++++++++++-------------------
 src/conn.rs      |  65 +++++++++++++++++++++++--------
 src/livepos.rs   |  78 +++++++++++++++++++++++++++++++++++++
 src/main.rs      |  13 ++++++-
 src/types.rs     |   8 ++--
 5 files changed, 219 insertions(+), 60 deletions(-)
 create mode 100644 src/livepos.rs

diff --git a/assets/client.js b/assets/client.js
index 33ad796..174628f 100644
--- a/assets/client.js
+++ b/assets/client.js
@@ -1,4 +1,12 @@
 window.player = { uid: NaN };
+window.info_elem = document.getElementById("miscinfo");
+window.identform = document.getElementById("identform");
+window.statusline = document.getElementsByClassName("statusline")[0];
+window.bcont_elem = document.getElementById("board-container");
+window.board_elem = document.getElementById("board");
+window.cursor_frame = document.getElementById("cursor-frame");
+window.queued_pos = undefined;
+
 window.room = {
   name: undefined,
   bconf: { w: NaN, h: NaN, tile_w: NaN, tile_h: NaN, mine_ratio: undefined },
@@ -9,12 +17,7 @@ window.room = {
   identity: JSON.parse(localStorage.getItem("identity")),
   cursors: new Map(),
 };
-window.info_elem = document.getElementById("miscinfo");
-window.identform = document.getElementById("identform");
-window.statusline = document.getElementsByClassName("statusline")[0];
-window.bcont_elem = document.getElementById("board-container");
-window.board_elem = document.getElementById("board");
-window.cursor_frame = document.getElementById("cursor-frame");
+
 
 if (room.identity == null) {
   statusline.style.display = "none";
@@ -56,18 +59,31 @@ function connect() {
       let fields = d.split(" ");
       switch (fields[0]) {
         case "pos": {
-          let oid = Number(fields[1]);
-          let name = fields[2];
-          let clr = fields[3];
-          let x = fields[4];
-          let y = fields[5];
-          if (!room.cursors.has(oid)) {
-            createCursor(oid, name, clr);
-          }
-          let celem = room.cursors.get(oid).elem;
-          celem.style.left = x + 'px';
-          celem.style.top = y + 'px';
-          movSelWin(room.cursors.get(oid).selwin, x, y);
+          let posdata = JSON.parse(fields[1]);
+          posdata.forEach(pdat => {
+            let oid = Number(pdat[0]);
+            let x = pdat[1][0];
+            let y = pdat[1][1];
+            let curs = room.cursors.get(oid);
+            if (curs != undefined) {
+              movCursor(curs, x, y);
+            } else {
+              console.log("livepos sys incoherent");
+            }
+          });
+        } break;
+        case "players": {
+          let pdata = JSON.parse(fields[1]);
+          console.log(pdata);
+          pdata.forEach(p => {
+            let oid = Number(p[0]);
+            let name = p[1];
+            let clr = p[2];
+            console.log(oid, name, clr);
+            if (!room.cursors.has(oid)) {
+              createCursor(oid, name, clr);
+            }
+          });
         } break;
         case "regack": {
           room.name = fields[1];
@@ -148,6 +164,7 @@ function acceptBoard(data) {
     last = room.board[i];
   }
   board_elem.innerHTML = split_board.join("");
+  room.cbounds = getBoardBounds();
 }
 
 function createCursor(id, name, clr) {
@@ -166,20 +183,31 @@ function createCursor(id, name, clr) {
   cursor.style.color = clr;
   document.getElementById('cursor-frame').append(cursor);
   document.getElementById('cursor-frame').append(selection_window);
+  let c = { name: name, elem: cursor, selwin: selection_window };
   if (id == window.player.uid) {
     document.addEventListener('mousemove', e => {
-      cursor.style.left = e.pageX + 'px';
-      cursor.style.top = e.pageY + 'px';
-      movSelWin(selection_window, e.pageX, e.pageY);
-      room.socket.send(`pos ${e.pageX} ${e.pageY}`);
+      let bcoords = pageToBoardPx(e.pageX, e.pageY);
+      movCursor(c, bcoords[0], bcoords[1]);
+      window.queued_pos = bcoords;
     },
       false);
   }
   room.cursors.set(id, {name: name, elem: cursor, selwin: selection_window});
   return cursor;
 }
-function movSelWin(win, x, y) {
-  let tpos = tilepos(x,y);
+
+function pageToBoardPx(x,y) {
+  return [Math.floor(x - room.cbounds.ox), Math.floor(y - room.cbounds.oy)];
+}
+
+function movCursor(c, bx, by) {
+  c.elem.style.left = (room.cbounds.ox + bx) + 'px';
+  c.elem.style.top = (room.cbounds.oy + by) + 'px';
+  movSelWin(c.selwin, bx, by);
+}
+function movSelWin(win, bx, by) {
+  let tpos = tilepos(bx,by);
+  console.log(tpos);
   if (tpos.x > (room.bconf.w - 1) || tpos.x < 0 || tpos.y > (room.bconf.h - 1) || tpos.y < 0) {
     win.style.display = "none";
   } else {
@@ -202,31 +230,38 @@ function getBoardBounds() {
     h: a.height
   };
 }
+window.onresize = () => {
+  room.cbounds = getBoardBounds();
+}
 
 bcont_elem.onclick = function(e) {
-  let tpos = tilepos(e.pageX, e.pageY);
+  let bcoords = pageToBoardPx(e.pageX, e.pageY);
+  let tpos = tilepos(bcoords[0], bcoords[1]);
   let cmd = `reveal ${tpos.x} ${tpos.y}`;
-  //console.log(cmd);
   room.socket.send(cmd);
 }
 bcont_elem.oncontextmenu = function(e) {
-  let tpos = tilepos(e.pageX, e.pageY);
+  let bcoords = pageToBoardPx(e.pageX, e.pageY);
+  let tpos = tilepos(bcoords[0], bcoords[1]);
   let cmd = `flag ${tpos.x} ${tpos.y}`;
-  //console.log(cmd);
   room.socket.send(cmd);
   return false;
 }
-function tilepos(x,y) {
-  let b = getBoardBounds();
-  room.cbounds = b;
-  let cx = x - b.ox;
-  let cy = y - b.oy;
-  //let cx = x;
-  //let cy = y;
-  let tilex = Math.floor(room.bconf.w * cx/b.w);
-  let tiley = Math.floor(room.bconf.h * cy/b.h);
+// these are board-px coords
+function tilepos(bx,by) {
+  let b = room.cbounds; // we can assume it is already computed by earlier aux calls
+  let tilex = Math.floor(room.bconf.w * bx/b.w);
+  let tiley = Math.floor(room.bconf.h * by/b.h);
   return { x: tilex, y: tiley };
 }
-function sleep(ms) {
-  return new Promise(resolve => setTimeout(resolve, ms));
-}
+
+(function sendPos() {
+  let qp = window.queued_pos;
+  if (qp) {
+    room.socket.send(`pos ${qp[0]} ${qp[1]}`);
+    window.queued_pos = undefined;
+  }
+  setTimeout(function() {
+    sendPos();
+  }, 16);
+})();
diff --git a/src/conn.rs b/src/conn.rs
index ed4192c..f67121b 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -4,8 +4,10 @@ use std::{
     net::SocketAddr,
 };
 use tokio::sync::RwLock;
+use tokio::sync::mpsc as tokio_mpsc;
 use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream};
 use warp::ws::{ WebSocket, Message };
+use crate::livepos;
 use ammonia;
 
 const MAX_IN: usize = 2048;
@@ -46,6 +48,9 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc
     let room_lock = room.read().await;
     let mut players = room_lock.players.write().await;
     if let Some(disconn_p) = players.remove(&addr) {
+        if let Err(e) = room_lock.pos_stream.send(livepos::Req { id: disconn_p.uid, data: livepos::ReqData::Quit }) {
+            println!("{room_id} E: couldn't send removal request for {disconn_p} from the live position system: {e}");
+        }
         for p in players.values() {
             if let Err(e) = p.conn.tx.send(Message::text(format!("logoff {}", disconn_p.uid))) {
                 println!("{room_id} E: couldn't deliver logoff info to {}: {}", p, e);
@@ -57,12 +62,14 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc
     }
 }
 
-pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::UnboundedSender<Message>), addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) {
+type RoomStreams = (SplitStream<WebSocket>,tokio_mpsc::UnboundedSender<Message>);
+
+pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) {
     let (mut incoming, tx) = streams;
     let (room_id, room) = rinfo;
-    let (players, cmd_tx, room_conf) = {
+    let (players, cmd_tx, pos_tx, room_conf) = {
         let room = room.read().await;
-        (room.players.clone(), room.cmd_stream.clone(), room.conf.clone())
+        (room.players.clone(), room.cmd_stream.clone(), room.pos_stream.clone(), room.conf.clone())
     };
     while let Ok(cmd) = incoming.try_next().await {
         if let Some(cmd) = cmd {
@@ -89,18 +96,22 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb
                         "pos" => {
                             match parse_pos(fields) {
                                 Some(pos) => {
-                                    me.position = pos.clone();
-                                    let sanitized_name = me.name.replace(" ", "&nbsp").to_string();
-                                    let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1);
-                                    for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) {
-                                        let r = peer_tx.send(Message::text(&msg));
-                                        if let Err(e) = r {
-                                            println!("{room_id} E: error sending pos update: {e}");
-                                        }
-                                    }
+                                    if let Err(e) = pos_tx.send(livepos::Req { id: me.uid, data: livepos::ReqData::Pos(pos) }) {
+                                        println!("{room_id} E: couldn't process {me}'s position update: {e}");
+                                    };
+                                    // me.position = pos.clone();
+                                    // let sanitized_name = me.name.replace(" ", "&nbsp").to_string();
+                                    // let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1);
+                                    // for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) {
+                                    //     let r = peer_tx.send(Message::text(&msg));
+                                    //     if let Err(e) = r {
+                                    //         println!("{room_id} E: error sending pos update: {e}");
+                                    //     }
+                                    // }
                                 },
                                 None => {
-                                    println!("{room_id} E: bad position update from {me}");
+                                    // Too spammy
+                                    //println!("{room_id} E: bad position update from {me}");
                                 },
                             }
                         },
@@ -154,11 +165,27 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb
                                 let conn = Conn { addr, tx: tx.clone() };
                                 room.write().await.players.insert_conn(conn, name.clone(), clr).await
                             };
+                            let players_lock = players.read().await;
+                            let me = players_lock.get(&addr).unwrap();
                             tx.send(Message::text(format!("regack {} {} {} {}",
                                     room_conf.name.replace(" ", "&nbsp;"), name.replace(" ", "&nbsp;"), uid, room_conf.board_conf))
                             ).expect("couldn't send register ack");
-                            if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) {
-                                println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}");
+
+                            {
+                                let msg = Message::text(format!("players {}",
+                                            jsonenc_players(players_lock.values())
+                                            .expect("couldn't JSONify players")));
+                                for p in players_lock.values() {
+                                    if let Err(e) = p.conn.tx.send(msg.clone()) {
+                                        println!("{room_id} E: couldn't dump players for {me}: {e}");
+                                    }
+                                }
+                            }
+                            if let Err(e) = pos_tx.send(livepos::Req { id: uid, data: livepos::ReqData::StateDump }) {
+                                println!("{room_id} E: couldn't request position dump for {me}: {e}");
+                            }
+                            if let Err(e) = cmd_tx.send(MetaMove::Dump) {
+                                println!("{room_id} E: couldn't request game dump for {me}: {e}");
                             }
                         }
                     }
@@ -170,3 +197,11 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb
         }
     }
 }
+
+fn jsonenc_players<'a, I: IntoIterator<Item=&'a Player>>(players: I) -> Result<String, serde_json::Error> {
+    let mut pairs = Vec::new();
+    for player in players {
+        pairs.push((player.uid, player.name.replace(" ", "&nbsp"), player.clr.clone()));
+    }
+    serde_json::to_string(&pairs)
+}
diff --git a/src/livepos.rs b/src/livepos.rs
new file mode 100644
index 0000000..be56b1e
--- /dev/null
+++ b/src/livepos.rs
@@ -0,0 +1,78 @@
+use crate::types::*;
+use tokio::sync::mpsc as tokio_mpsc;
+use tokio::sync::Mutex;
+use std::collections::{HashMap,HashSet};
+use tokio::time::{self, Duration};
+use warp::ws::Message;
+
+pub enum ReqData {
+    Pos((usize,usize)),
+    StateDump,
+    Quit,
+}
+
+pub struct Req {
+    pub id: usize,
+    pub data: ReqData,
+}
+
+pub async fn livepos(players: PlayerMapData, mut recv: tokio_mpsc::UnboundedReceiver<Req>) {
+    let positions = Mutex::new(HashMap::new());
+    let dirty = Mutex::new(HashSet::new());
+    let process_upds = async {
+        while let Some(update) = recv.recv().await {
+            let mut dirty = dirty.lock().await;
+            let mut positions = positions.lock().await;
+            match update.data {
+                ReqData::Pos(p) => {
+                    let old = positions.get(&update.id).unwrap_or(&(0,0));
+                    if p != *old {
+                        dirty.insert(update.id);
+                    }
+                    positions.insert(update.id, p);
+                },
+                ReqData::StateDump => {
+                    dirty.clear();
+                    dirty.extend(positions.keys().map(|x| *x));
+                },
+                ReqData::Quit => {
+                    positions.remove(&update.id);
+                    dirty.retain(|x| *x != update.id);
+                }
+            }
+        }
+    };
+    let periodic_send = async {
+        let mut interv = tokio::time::interval(Duration::from_millis(16));
+        interv.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
+        loop {
+            interv.tick().await;
+            let mut dirty = dirty.lock().await;
+            if dirty.len() > 0 {
+                let mut positions = positions.lock().await;
+                let msg = jsonenc_ids(&mut positions, &*dirty).expect("couldn't JSONify player positions");
+                dirty.clear();
+                let plock = players.read().await;
+                for player in plock.values() {
+                    if let Err(e) = player.conn.tx.send(Message::text(format!("pos {}", msg))) {
+                        println!("E: couldn't send livepos update to {}: {}", player, e);
+                    }
+                }
+            }
+        }
+    };
+
+    tokio::select!(
+        _ = process_upds => (),
+        _ = periodic_send => ()
+    );
+}
+
+fn jsonenc_ids<'a, I: IntoIterator<Item=&'a usize>>(positions: &mut HashMap<usize, (usize,usize)>, ids: I) -> Result<String, serde_json::Error> {
+    let mut pairs = Vec::new();
+    for id in ids {
+        pairs.push((id, positions[id]));
+    };
+    serde_json::to_string(&pairs)
+}
+
diff --git a/src/main.rs b/src/main.rs
index 5012e95..82f1c8e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,6 +8,7 @@ use std::{
 use futures::stream::StreamExt;
 
 mod types;
+mod livepos;
 mod conn;
 mod minesweeper;
 use types::*;
@@ -129,9 +130,15 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {
                         let n = rinfo.get("rname").unwrap().to_owned();
                         if n.is_empty() { uid.to_string() } else { n }
                     };
+
                     let players = PlayerMap::default();
+
                     let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
-                    let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf));
+                    let game_handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf));
+
+                    let (pos_tx, pos_rx) = tokio::sync::mpsc::unbounded_channel();
+                    let livepos_handle = tokio::spawn(livepos::livepos(players.clone(), pos_rx));
+
                     let room_conf = RoomConf {
                         name,
                         player_cap: match limit { Some(i) => i, None => usize::MAX },
@@ -141,8 +148,10 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {
                     let new_room = Room {
                         conf: room_conf,
                         players,
-                        driver: handle,
+                        game_driver: game_handle,
                         cmd_stream: cmd_tx,
+                        livepos_driver: livepos_handle,
+                        pos_stream: pos_tx,
                     };
                     if access.is_some() {
                         pubs.write().await.insert(uid.clone(), serde_json::to_string(&new_room.conf).unwrap());
diff --git a/src/types.rs b/src/types.rs
index 2d6ae22..cb16042 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -12,6 +12,7 @@ use warp::ws::Message;
 use tokio::sync::RwLock;
 use serde::Serialize;
 use crate::minesweeper;
+use crate::livepos;
 
 #[derive(Debug, Clone)]
 pub struct Config {
@@ -35,8 +36,10 @@ pub struct RoomConf {
 pub struct Room {
     pub conf: RoomConf,
     pub players: PlayerMap,
-    pub driver: tokio::task::JoinHandle<()>,
+    pub game_driver: tokio::task::JoinHandle<()>,
     pub cmd_stream: CmdTx,
+    pub livepos_driver: tokio::task::JoinHandle<()>,
+    pub pos_stream: tokio::sync::mpsc::UnboundedSender<livepos::Req>,
 }
 
 #[derive(Debug)]
@@ -58,7 +61,6 @@ pub struct Player {
     pub uid: usize,
     pub name: String,
     pub clr: String,
-    pub position: (usize, usize),
 }
 
 impl Display for Player {
@@ -125,7 +127,7 @@ impl PlayerMap {
         let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed);
         map.insert(
             conn.addr,
-            Player { conn, uid, name, clr, position: (0,0) },
+            Player { conn, uid, name, clr },
         );
         uid
     }
-- 
cgit v1.2.3