diff options
| -rw-r--r-- | assets/client.js | 115 | ||||
| -rw-r--r-- | src/conn.rs | 65 | ||||
| -rw-r--r-- | src/livepos.rs | 78 | ||||
| -rw-r--r-- | src/main.rs | 13 | ||||
| -rw-r--r-- | src/types.rs | 8 | 
5 files changed, 219 insertions, 60 deletions
| diff --git a/assets/client.js b/assets/client.js index 33ad796..174628f 100644 --- a/assets/client.js +++ b/assets/client.js @@ -1,4 +1,12 @@  window.player = { uid: NaN }; +window.info_elem = document.getElementById("miscinfo"); +window.identform = document.getElementById("identform"); +window.statusline = document.getElementsByClassName("statusline")[0]; +window.bcont_elem = document.getElementById("board-container"); +window.board_elem = document.getElementById("board"); +window.cursor_frame = document.getElementById("cursor-frame"); +window.queued_pos = undefined; +  window.room = {    name: undefined,    bconf: { w: NaN, h: NaN, tile_w: NaN, tile_h: NaN, mine_ratio: undefined }, @@ -9,12 +17,7 @@ window.room = {    identity: JSON.parse(localStorage.getItem("identity")),    cursors: new Map(),  }; -window.info_elem = document.getElementById("miscinfo"); -window.identform = document.getElementById("identform"); -window.statusline = document.getElementsByClassName("statusline")[0]; -window.bcont_elem = document.getElementById("board-container"); -window.board_elem = document.getElementById("board"); -window.cursor_frame = document.getElementById("cursor-frame"); +  if (room.identity == null) {    statusline.style.display = "none"; @@ -56,18 +59,31 @@ function connect() {        let fields = d.split(" ");        switch (fields[0]) {          case "pos": { -          let oid = Number(fields[1]); -          let name = fields[2]; -          let clr = fields[3]; -          let x = fields[4]; -          let y = fields[5]; -          if (!room.cursors.has(oid)) { -            createCursor(oid, name, clr); -          } -          let celem = room.cursors.get(oid).elem; -          celem.style.left = x + 'px'; -          celem.style.top = y + 'px'; -          movSelWin(room.cursors.get(oid).selwin, x, y); +          let posdata = JSON.parse(fields[1]); +          posdata.forEach(pdat => { +            let oid = Number(pdat[0]); +            let x = pdat[1][0]; +            let y = pdat[1][1]; +            let curs = room.cursors.get(oid); +            if (curs != undefined) { +              movCursor(curs, x, y); +            } else { +              console.log("livepos sys incoherent"); +            } +          }); +        } break; +        case "players": { +          let pdata = JSON.parse(fields[1]); +          console.log(pdata); +          pdata.forEach(p => { +            let oid = Number(p[0]); +            let name = p[1]; +            let clr = p[2]; +            console.log(oid, name, clr); +            if (!room.cursors.has(oid)) { +              createCursor(oid, name, clr); +            } +          });          } break;          case "regack": {            room.name = fields[1]; @@ -148,6 +164,7 @@ function acceptBoard(data) {      last = room.board[i];    }    board_elem.innerHTML = split_board.join(""); +  room.cbounds = getBoardBounds();  }  function createCursor(id, name, clr) { @@ -166,20 +183,31 @@ function createCursor(id, name, clr) {    cursor.style.color = clr;    document.getElementById('cursor-frame').append(cursor);    document.getElementById('cursor-frame').append(selection_window); +  let c = { name: name, elem: cursor, selwin: selection_window };    if (id == window.player.uid) {      document.addEventListener('mousemove', e => { -      cursor.style.left = e.pageX + 'px'; -      cursor.style.top = e.pageY + 'px'; -      movSelWin(selection_window, e.pageX, e.pageY); -      room.socket.send(`pos ${e.pageX} ${e.pageY}`); +      let bcoords = pageToBoardPx(e.pageX, e.pageY); +      movCursor(c, bcoords[0], bcoords[1]); +      window.queued_pos = bcoords;      },        false);    }    room.cursors.set(id, {name: name, elem: cursor, selwin: selection_window});    return cursor;  } -function movSelWin(win, x, y) { -  let tpos = tilepos(x,y); + +function pageToBoardPx(x,y) { +  return [Math.floor(x - room.cbounds.ox), Math.floor(y - room.cbounds.oy)]; +} + +function movCursor(c, bx, by) { +  c.elem.style.left = (room.cbounds.ox + bx) + 'px'; +  c.elem.style.top = (room.cbounds.oy + by) + 'px'; +  movSelWin(c.selwin, bx, by); +} +function movSelWin(win, bx, by) { +  let tpos = tilepos(bx,by); +  console.log(tpos);    if (tpos.x > (room.bconf.w - 1) || tpos.x < 0 || tpos.y > (room.bconf.h - 1) || tpos.y < 0) {      win.style.display = "none";    } else { @@ -202,31 +230,38 @@ function getBoardBounds() {      h: a.height    };  } +window.onresize = () => { +  room.cbounds = getBoardBounds(); +}  bcont_elem.onclick = function(e) { -  let tpos = tilepos(e.pageX, e.pageY); +  let bcoords = pageToBoardPx(e.pageX, e.pageY); +  let tpos = tilepos(bcoords[0], bcoords[1]);    let cmd = `reveal ${tpos.x} ${tpos.y}`; -  //console.log(cmd);    room.socket.send(cmd);  }  bcont_elem.oncontextmenu = function(e) { -  let tpos = tilepos(e.pageX, e.pageY); +  let bcoords = pageToBoardPx(e.pageX, e.pageY); +  let tpos = tilepos(bcoords[0], bcoords[1]);    let cmd = `flag ${tpos.x} ${tpos.y}`; -  //console.log(cmd);    room.socket.send(cmd);    return false;  } -function tilepos(x,y) { -  let b = getBoardBounds(); -  room.cbounds = b; -  let cx = x - b.ox; -  let cy = y - b.oy; -  //let cx = x; -  //let cy = y; -  let tilex = Math.floor(room.bconf.w * cx/b.w); -  let tiley = Math.floor(room.bconf.h * cy/b.h); +// these are board-px coords +function tilepos(bx,by) { +  let b = room.cbounds; // we can assume it is already computed by earlier aux calls +  let tilex = Math.floor(room.bconf.w * bx/b.w); +  let tiley = Math.floor(room.bconf.h * by/b.h);    return { x: tilex, y: tiley };  } -function sleep(ms) { -  return new Promise(resolve => setTimeout(resolve, ms)); -} + +(function sendPos() { +  let qp = window.queued_pos; +  if (qp) { +    room.socket.send(`pos ${qp[0]} ${qp[1]}`); +    window.queued_pos = undefined; +  } +  setTimeout(function() { +    sendPos(); +  }, 16); +})(); diff --git a/src/conn.rs b/src/conn.rs index ed4192c..f67121b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -4,8 +4,10 @@ use std::{      net::SocketAddr,  };  use tokio::sync::RwLock; +use tokio::sync::mpsc as tokio_mpsc;  use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream};  use warp::ws::{ WebSocket, Message }; +use crate::livepos;  use ammonia;  const MAX_IN: usize = 2048; @@ -46,6 +48,9 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc      let room_lock = room.read().await;      let mut players = room_lock.players.write().await;      if let Some(disconn_p) = players.remove(&addr) { +        if let Err(e) = room_lock.pos_stream.send(livepos::Req { id: disconn_p.uid, data: livepos::ReqData::Quit }) { +            println!("{room_id} E: couldn't send removal request for {disconn_p} from the live position system: {e}"); +        }          for p in players.values() {              if let Err(e) = p.conn.tx.send(Message::text(format!("logoff {}", disconn_p.uid))) {                  println!("{room_id} E: couldn't deliver logoff info to {}: {}", p, e); @@ -57,12 +62,14 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc      }  } -pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::UnboundedSender<Message>), addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) { +type RoomStreams = (SplitStream<WebSocket>,tokio_mpsc::UnboundedSender<Message>); + +pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) {      let (mut incoming, tx) = streams;      let (room_id, room) = rinfo; -    let (players, cmd_tx, room_conf) = { +    let (players, cmd_tx, pos_tx, room_conf) = {          let room = room.read().await; -        (room.players.clone(), room.cmd_stream.clone(), room.conf.clone()) +        (room.players.clone(), room.cmd_stream.clone(), room.pos_stream.clone(), room.conf.clone())      };      while let Ok(cmd) = incoming.try_next().await {          if let Some(cmd) = cmd { @@ -89,18 +96,22 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb                          "pos" => {                              match parse_pos(fields) {                                  Some(pos) => { -                                    me.position = pos.clone(); -                                    let sanitized_name = me.name.replace(" ", " ").to_string(); -                                    let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); -                                    for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { -                                        let r = peer_tx.send(Message::text(&msg)); -                                        if let Err(e) = r { -                                            println!("{room_id} E: error sending pos update: {e}"); -                                        } -                                    } +                                    if let Err(e) = pos_tx.send(livepos::Req { id: me.uid, data: livepos::ReqData::Pos(pos) }) { +                                        println!("{room_id} E: couldn't process {me}'s position update: {e}"); +                                    }; +                                    // me.position = pos.clone(); +                                    // let sanitized_name = me.name.replace(" ", " ").to_string(); +                                    // let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1); +                                    // for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) { +                                    //     let r = peer_tx.send(Message::text(&msg)); +                                    //     if let Err(e) = r { +                                    //         println!("{room_id} E: error sending pos update: {e}"); +                                    //     } +                                    // }                                  },                                  None => { -                                    println!("{room_id} E: bad position update from {me}"); +                                    // Too spammy +                                    //println!("{room_id} E: bad position update from {me}");                                  },                              }                          }, @@ -154,11 +165,27 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb                                  let conn = Conn { addr, tx: tx.clone() };                                  room.write().await.players.insert_conn(conn, name.clone(), clr).await                              }; +                            let players_lock = players.read().await; +                            let me = players_lock.get(&addr).unwrap();                              tx.send(Message::text(format!("regack {} {} {} {}",                                      room_conf.name.replace(" ", " "), name.replace(" ", " "), uid, room_conf.board_conf))                              ).expect("couldn't send register ack"); -                            if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) { -                                println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}"); + +                            { +                                let msg = Message::text(format!("players {}", +                                            jsonenc_players(players_lock.values()) +                                            .expect("couldn't JSONify players"))); +                                for p in players_lock.values() { +                                    if let Err(e) = p.conn.tx.send(msg.clone()) { +                                        println!("{room_id} E: couldn't dump players for {me}: {e}"); +                                    } +                                } +                            } +                            if let Err(e) = pos_tx.send(livepos::Req { id: uid, data: livepos::ReqData::StateDump }) { +                                println!("{room_id} E: couldn't request position dump for {me}: {e}"); +                            } +                            if let Err(e) = cmd_tx.send(MetaMove::Dump) { +                                println!("{room_id} E: couldn't request game dump for {me}: {e}");                              }                          }                      } @@ -170,3 +197,11 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb          }      }  } + +fn jsonenc_players<'a, I: IntoIterator<Item=&'a Player>>(players: I) -> Result<String, serde_json::Error> { +    let mut pairs = Vec::new(); +    for player in players { +        pairs.push((player.uid, player.name.replace(" ", " "), player.clr.clone())); +    } +    serde_json::to_string(&pairs) +} diff --git a/src/livepos.rs b/src/livepos.rs new file mode 100644 index 0000000..be56b1e --- /dev/null +++ b/src/livepos.rs @@ -0,0 +1,78 @@ +use crate::types::*; +use tokio::sync::mpsc as tokio_mpsc; +use tokio::sync::Mutex; +use std::collections::{HashMap,HashSet}; +use tokio::time::{self, Duration}; +use warp::ws::Message; + +pub enum ReqData { +    Pos((usize,usize)), +    StateDump, +    Quit, +} + +pub struct Req { +    pub id: usize, +    pub data: ReqData, +} + +pub async fn livepos(players: PlayerMapData, mut recv: tokio_mpsc::UnboundedReceiver<Req>) { +    let positions = Mutex::new(HashMap::new()); +    let dirty = Mutex::new(HashSet::new()); +    let process_upds = async { +        while let Some(update) = recv.recv().await { +            let mut dirty = dirty.lock().await; +            let mut positions = positions.lock().await; +            match update.data { +                ReqData::Pos(p) => { +                    let old = positions.get(&update.id).unwrap_or(&(0,0)); +                    if p != *old { +                        dirty.insert(update.id); +                    } +                    positions.insert(update.id, p); +                }, +                ReqData::StateDump => { +                    dirty.clear(); +                    dirty.extend(positions.keys().map(|x| *x)); +                }, +                ReqData::Quit => { +                    positions.remove(&update.id); +                    dirty.retain(|x| *x != update.id); +                } +            } +        } +    }; +    let periodic_send = async { +        let mut interv = tokio::time::interval(Duration::from_millis(16)); +        interv.set_missed_tick_behavior(time::MissedTickBehavior::Skip); +        loop { +            interv.tick().await; +            let mut dirty = dirty.lock().await; +            if dirty.len() > 0 { +                let mut positions = positions.lock().await; +                let msg = jsonenc_ids(&mut positions, &*dirty).expect("couldn't JSONify player positions"); +                dirty.clear(); +                let plock = players.read().await; +                for player in plock.values() { +                    if let Err(e) = player.conn.tx.send(Message::text(format!("pos {}", msg))) { +                        println!("E: couldn't send livepos update to {}: {}", player, e); +                    } +                } +            } +        } +    }; + +    tokio::select!( +        _ = process_upds => (), +        _ = periodic_send => () +    ); +} + +fn jsonenc_ids<'a, I: IntoIterator<Item=&'a usize>>(positions: &mut HashMap<usize, (usize,usize)>, ids: I) -> Result<String, serde_json::Error> { +    let mut pairs = Vec::new(); +    for id in ids { +        pairs.push((id, positions[id])); +    }; +    serde_json::to_string(&pairs) +} + diff --git a/src/main.rs b/src/main.rs index 5012e95..82f1c8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use std::{  use futures::stream::StreamExt;  mod types; +mod livepos;  mod conn;  mod minesweeper;  use types::*; @@ -129,9 +130,15 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {                          let n = rinfo.get("rname").unwrap().to_owned();                          if n.is_empty() { uid.to_string() } else { n }                      }; +                      let players = PlayerMap::default(); +                      let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); -                    let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf)); +                    let game_handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf)); + +                    let (pos_tx, pos_rx) = tokio::sync::mpsc::unbounded_channel(); +                    let livepos_handle = tokio::spawn(livepos::livepos(players.clone(), pos_rx)); +                      let room_conf = RoomConf {                          name,                          player_cap: match limit { Some(i) => i, None => usize::MAX }, @@ -141,8 +148,10 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {                      let new_room = Room {                          conf: room_conf,                          players, -                        driver: handle, +                        game_driver: game_handle,                          cmd_stream: cmd_tx, +                        livepos_driver: livepos_handle, +                        pos_stream: pos_tx,                      };                      if access.is_some() {                          pubs.write().await.insert(uid.clone(), serde_json::to_string(&new_room.conf).unwrap()); diff --git a/src/types.rs b/src/types.rs index 2d6ae22..cb16042 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,7 @@ use warp::ws::Message;  use tokio::sync::RwLock;  use serde::Serialize;  use crate::minesweeper; +use crate::livepos;  #[derive(Debug, Clone)]  pub struct Config { @@ -35,8 +36,10 @@ pub struct RoomConf {  pub struct Room {      pub conf: RoomConf,      pub players: PlayerMap, -    pub driver: tokio::task::JoinHandle<()>, +    pub game_driver: tokio::task::JoinHandle<()>,      pub cmd_stream: CmdTx, +    pub livepos_driver: tokio::task::JoinHandle<()>, +    pub pos_stream: tokio::sync::mpsc::UnboundedSender<livepos::Req>,  }  #[derive(Debug)] @@ -58,7 +61,6 @@ pub struct Player {      pub uid: usize,      pub name: String,      pub clr: String, -    pub position: (usize, usize),  }  impl Display for Player { @@ -125,7 +127,7 @@ impl PlayerMap {          let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed);          map.insert(              conn.addr, -            Player { conn, uid, name, clr, position: (0,0) }, +            Player { conn, uid, name, clr },          );          uid      } | 
