summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/conn.rs65
-rw-r--r--src/livepos.rs78
-rw-r--r--src/main.rs13
-rw-r--r--src/types.rs8
4 files changed, 144 insertions, 20 deletions
diff --git a/src/conn.rs b/src/conn.rs
index ed4192c..f67121b 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -4,8 +4,10 @@ use std::{
net::SocketAddr,
};
use tokio::sync::RwLock;
+use tokio::sync::mpsc as tokio_mpsc;
use futures::{SinkExt, StreamExt, TryStreamExt, stream::SplitStream};
use warp::ws::{ WebSocket, Message };
+use crate::livepos;
use ammonia;
const MAX_IN: usize = 2048;
@@ -46,6 +48,9 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc
let room_lock = room.read().await;
let mut players = room_lock.players.write().await;
if let Some(disconn_p) = players.remove(&addr) {
+ if let Err(e) = room_lock.pos_stream.send(livepos::Req { id: disconn_p.uid, data: livepos::ReqData::Quit }) {
+ println!("{room_id} E: couldn't send removal request for {disconn_p} from the live position system: {e}");
+ }
for p in players.values() {
if let Err(e) = p.conn.tx.send(Message::text(format!("logoff {}", disconn_p.uid))) {
println!("{room_id} E: couldn't deliver logoff info to {}: {}", p, e);
@@ -57,12 +62,14 @@ pub async fn lobby(socket: WebSocket, addr: SocketAddr, rinfo: (RoomId,Arc<RwLoc
}
}
-pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::UnboundedSender<Message>), addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) {
+type RoomStreams = (SplitStream<WebSocket>,tokio_mpsc::UnboundedSender<Message>);
+
+pub async fn handle_room(streams: RoomStreams, addr: SocketAddr, rinfo: (RoomId, Arc<RwLock<Room>>)) {
let (mut incoming, tx) = streams;
let (room_id, room) = rinfo;
- let (players, cmd_tx, room_conf) = {
+ let (players, cmd_tx, pos_tx, room_conf) = {
let room = room.read().await;
- (room.players.clone(), room.cmd_stream.clone(), room.conf.clone())
+ (room.players.clone(), room.cmd_stream.clone(), room.pos_stream.clone(), room.conf.clone())
};
while let Ok(cmd) = incoming.try_next().await {
if let Some(cmd) = cmd {
@@ -89,18 +96,22 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb
"pos" => {
match parse_pos(fields) {
Some(pos) => {
- me.position = pos.clone();
- let sanitized_name = me.name.replace(" ", "&nbsp").to_string();
- let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1);
- for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) {
- let r = peer_tx.send(Message::text(&msg));
- if let Err(e) = r {
- println!("{room_id} E: error sending pos update: {e}");
- }
- }
+ if let Err(e) = pos_tx.send(livepos::Req { id: me.uid, data: livepos::ReqData::Pos(pos) }) {
+ println!("{room_id} E: couldn't process {me}'s position update: {e}");
+ };
+ // me.position = pos.clone();
+ // let sanitized_name = me.name.replace(" ", "&nbsp").to_string();
+ // let msg = format!("pos {} {sanitized_name} {} {} {}", me.uid, me.clr, pos.0, pos.1);
+ // for peer_tx in players_lock.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.conn.tx) {
+ // let r = peer_tx.send(Message::text(&msg));
+ // if let Err(e) = r {
+ // println!("{room_id} E: error sending pos update: {e}");
+ // }
+ // }
},
None => {
- println!("{room_id} E: bad position update from {me}");
+ // Too spammy
+ //println!("{room_id} E: bad position update from {me}");
},
}
},
@@ -154,11 +165,27 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb
let conn = Conn { addr, tx: tx.clone() };
room.write().await.players.insert_conn(conn, name.clone(), clr).await
};
+ let players_lock = players.read().await;
+ let me = players_lock.get(&addr).unwrap();
tx.send(Message::text(format!("regack {} {} {} {}",
room_conf.name.replace(" ", "&nbsp;"), name.replace(" ", "&nbsp;"), uid, room_conf.board_conf))
).expect("couldn't send register ack");
- if let Err(e) = room.read().await.cmd_stream.send(MetaMove::Dump) {
- println!("{room_id} E: couldn't request game dump in behalf of {addr}: {e}");
+
+ {
+ let msg = Message::text(format!("players {}",
+ jsonenc_players(players_lock.values())
+ .expect("couldn't JSONify players")));
+ for p in players_lock.values() {
+ if let Err(e) = p.conn.tx.send(msg.clone()) {
+ println!("{room_id} E: couldn't dump players for {me}: {e}");
+ }
+ }
+ }
+ if let Err(e) = pos_tx.send(livepos::Req { id: uid, data: livepos::ReqData::StateDump }) {
+ println!("{room_id} E: couldn't request position dump for {me}: {e}");
+ }
+ if let Err(e) = cmd_tx.send(MetaMove::Dump) {
+ println!("{room_id} E: couldn't request game dump for {me}: {e}");
}
}
}
@@ -170,3 +197,11 @@ pub async fn handle_room(streams: (SplitStream<WebSocket>,tokio::sync::mpsc::Unb
}
}
}
+
+fn jsonenc_players<'a, I: IntoIterator<Item=&'a Player>>(players: I) -> Result<String, serde_json::Error> {
+ let mut pairs = Vec::new();
+ for player in players {
+ pairs.push((player.uid, player.name.replace(" ", "&nbsp"), player.clr.clone()));
+ }
+ serde_json::to_string(&pairs)
+}
diff --git a/src/livepos.rs b/src/livepos.rs
new file mode 100644
index 0000000..be56b1e
--- /dev/null
+++ b/src/livepos.rs
@@ -0,0 +1,78 @@
+use crate::types::*;
+use tokio::sync::mpsc as tokio_mpsc;
+use tokio::sync::Mutex;
+use std::collections::{HashMap,HashSet};
+use tokio::time::{self, Duration};
+use warp::ws::Message;
+
+pub enum ReqData {
+ Pos((usize,usize)),
+ StateDump,
+ Quit,
+}
+
+pub struct Req {
+ pub id: usize,
+ pub data: ReqData,
+}
+
+pub async fn livepos(players: PlayerMapData, mut recv: tokio_mpsc::UnboundedReceiver<Req>) {
+ let positions = Mutex::new(HashMap::new());
+ let dirty = Mutex::new(HashSet::new());
+ let process_upds = async {
+ while let Some(update) = recv.recv().await {
+ let mut dirty = dirty.lock().await;
+ let mut positions = positions.lock().await;
+ match update.data {
+ ReqData::Pos(p) => {
+ let old = positions.get(&update.id).unwrap_or(&(0,0));
+ if p != *old {
+ dirty.insert(update.id);
+ }
+ positions.insert(update.id, p);
+ },
+ ReqData::StateDump => {
+ dirty.clear();
+ dirty.extend(positions.keys().map(|x| *x));
+ },
+ ReqData::Quit => {
+ positions.remove(&update.id);
+ dirty.retain(|x| *x != update.id);
+ }
+ }
+ }
+ };
+ let periodic_send = async {
+ let mut interv = tokio::time::interval(Duration::from_millis(16));
+ interv.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
+ loop {
+ interv.tick().await;
+ let mut dirty = dirty.lock().await;
+ if dirty.len() > 0 {
+ let mut positions = positions.lock().await;
+ let msg = jsonenc_ids(&mut positions, &*dirty).expect("couldn't JSONify player positions");
+ dirty.clear();
+ let plock = players.read().await;
+ for player in plock.values() {
+ if let Err(e) = player.conn.tx.send(Message::text(format!("pos {}", msg))) {
+ println!("E: couldn't send livepos update to {}: {}", player, e);
+ }
+ }
+ }
+ }
+ };
+
+ tokio::select!(
+ _ = process_upds => (),
+ _ = periodic_send => ()
+ );
+}
+
+fn jsonenc_ids<'a, I: IntoIterator<Item=&'a usize>>(positions: &mut HashMap<usize, (usize,usize)>, ids: I) -> Result<String, serde_json::Error> {
+ let mut pairs = Vec::new();
+ for id in ids {
+ pairs.push((id, positions[id]));
+ };
+ serde_json::to_string(&pairs)
+}
+
diff --git a/src/main.rs b/src/main.rs
index 5012e95..82f1c8e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,6 +8,7 @@ use std::{
use futures::stream::StreamExt;
mod types;
+mod livepos;
mod conn;
mod minesweeper;
use types::*;
@@ -129,9 +130,15 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {
let n = rinfo.get("rname").unwrap().to_owned();
if n.is_empty() { uid.to_string() } else { n }
};
+
let players = PlayerMap::default();
+
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
- let handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf));
+ let game_handle = tokio::spawn(gameloop(cmd_rx, players.clone(), board_conf));
+
+ let (pos_tx, pos_rx) = tokio::sync::mpsc::unbounded_channel();
+ let livepos_handle = tokio::spawn(livepos::livepos(players.clone(), pos_rx));
+
let room_conf = RoomConf {
name,
player_cap: match limit { Some(i) => i, None => usize::MAX },
@@ -141,8 +148,10 @@ async fn tokio_main(conf: Config) -> Result<(), Box<dyn Error>> {
let new_room = Room {
conf: room_conf,
players,
- driver: handle,
+ game_driver: game_handle,
cmd_stream: cmd_tx,
+ livepos_driver: livepos_handle,
+ pos_stream: pos_tx,
};
if access.is_some() {
pubs.write().await.insert(uid.clone(), serde_json::to_string(&new_room.conf).unwrap());
diff --git a/src/types.rs b/src/types.rs
index 2d6ae22..cb16042 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -12,6 +12,7 @@ use warp::ws::Message;
use tokio::sync::RwLock;
use serde::Serialize;
use crate::minesweeper;
+use crate::livepos;
#[derive(Debug, Clone)]
pub struct Config {
@@ -35,8 +36,10 @@ pub struct RoomConf {
pub struct Room {
pub conf: RoomConf,
pub players: PlayerMap,
- pub driver: tokio::task::JoinHandle<()>,
+ pub game_driver: tokio::task::JoinHandle<()>,
pub cmd_stream: CmdTx,
+ pub livepos_driver: tokio::task::JoinHandle<()>,
+ pub pos_stream: tokio::sync::mpsc::UnboundedSender<livepos::Req>,
}
#[derive(Debug)]
@@ -58,7 +61,6 @@ pub struct Player {
pub uid: usize,
pub name: String,
pub clr: String,
- pub position: (usize, usize),
}
impl Display for Player {
@@ -125,7 +127,7 @@ impl PlayerMap {
let uid = self.uid_counter.fetch_add(1, Ordering::Relaxed);
map.insert(
conn.addr,
- Player { conn, uid, name, clr, position: (0,0) },
+ Player { conn, uid, name, clr },
);
uid
}