diff options
Diffstat (limited to 'src/conn.rs')
-rw-r--r-- | src/conn.rs | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 0000000..6d08942 --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,227 @@ +use crate::types::*; +use std::{ + net::SocketAddr, + convert::Infallible, + pin::Pin, + task::Poll, +}; +use hyper::{ Request, Response, Body, service::Service }; +use hyper_tungstenite::{ + HyperWebsocket, + tungstenite::Message, +}; +use futures::{ + Future, + SinkExt, + StreamExt, + TryStreamExt, + TryFuture, + TryFutureExt, + channel::mpsc::unbounded, + pin_mut, +}; + +const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); + +#[derive(Clone)] +pub struct PerConnHandler { + pub state: State, + pub local_addr: SocketAddr, + pub remote_addr: SocketAddr, + pub game_cmd_tx: MovReqTx, + pub page: String, +} + +impl PerConnHandler { + async fn handle_req(self, req: Request<Body>) + -> std::result::Result<Response<Body>, Infallible> + { + if hyper_tungstenite::is_upgrade_request(&req) { + let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket"); + tokio::spawn(async move { + peer_connection(self, wsocket).await; + }); + return Ok(resp); + } + + //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?; + let mut uri_path = req.uri().path().split('/').skip(1); + let actual_path = uri_path.next(); + + use hyper::{ Method, StatusCode }; + match (req.method(), actual_path) { + (&Method::GET, None | Some("")) => { + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(self.page)) + .unwrap()) + }, + (&Method::GET, Some("font.ttf")) => { + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(FONT_FILE)) + .unwrap()) + }, + _ => { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap()) + } + } + } +} + +impl Service<Request<Body>> for PerConnHandler +{ + type Response = Response<Body>; + type Error = Infallible; + type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response,Self::Error>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + let cloned_self = self.clone(); + Box::pin(cloned_self.handle_req(req)) + } +} + +pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { + let socket = socket.await.expect("couldn't finish websocket connection"); + let addr = conn.remote_addr; + let peers = &conn.state.peers; + let mut cmd_tx = &conn.game_cmd_tx; + println!("Incoming TCP connection from: {addr}"); + + let mut peer_name = "unknown".to_string(); + + // for game -> conn comms + let (tx, rx) = unbounded(); + // for server -> client comms + let (outgoing, mut incoming) = socket.split(); + + let process_incoming = async { + while let Ok(cmd) = incoming.try_next().await { + if let Some(cmd) = cmd { + if cmd.is_close() { + println!("closing \"{peer_name}\"@{addr}"); + let mut peers = peers.write().await; + if let Some(_) = peers.get(&addr) { + peers.remove(&addr); + } + for (paddr, p) in peers.iter() { + if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) { + println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); + } + } + break; + } + // if it ain't text we can't handle it + if !cmd.is_text() { continue; } + let cmd = cmd.to_text().unwrap(); + + let mut fields = cmd.split(" "); + let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { + let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); + let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); + x.zip(y) + }; + if let Some(cmd_name) = fields.next() { + use crate::minesweeper::{Move,MoveType}; + match cmd_name { + "pos" => { + match parse_pos(fields) { + Some(pos) => { + let (name, id) = { + let mut peers = peers.write().await; + let mut entry = peers.get_mut(&addr).unwrap(); + entry.position = pos.clone(); + (entry.name.clone(), entry.seq_id) + }; + let sanitized_name = name.replace(" ", " ").to_string(); + { + let peers = peers.read().await; + for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { + let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); + if let Err(e) = r { + println!("error sending pos update: {e}"); + } + } + } + }, + None => { + println!("bad position update from \"{peer_name}@{addr}\""); + }, + } + }, + "reveal" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from \"{peer_name}\"@{addr}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await { + println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); + }; + }, + None => { + println!("bad reveal from \"{peer_name}\"@{addr}"); + } + } + }, + "flag" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from \"{peer_name}\"@{addr}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await { + println!("couldn't process \"{peer_name}\"'s flag command: {e}"); + }; + }, + None => { + println!("bad flag from \"{peer_name}\"@{addr}"); + } + } + }, + "reset" => { + println!("{cmd} from \"{peer_name}\"@{addr}"); + if let Err(e) = cmd_tx.send(MetaMove::Reset).await { + println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); + } + }, + "register" => { + let name = fields.collect::<Vec<&str>>().join(&" "); + let id; + if name.is_empty() { + peer_name = "anon".to_string(); + } else { + peer_name = name; + } + { // new scope cuz paranoid bout deadlocks + let mut peers = peers.write().await; + id = peers.len(); + peers.insert(addr, Peer { + tx: tx.clone(), + seq_id: id, + name: peer_name.clone(), + position: (0,0) + }); + } + tx.unbounded_send(Message::text(format!("id {id}"))).unwrap(); + if let Err(e) = cmd_tx.send(MetaMove::Dump).await { + println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); + } + }, + e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), + } + } + } + } + }; + let send_to_browser = rx.map(Ok).forward(outgoing); + + pin_mut!(process_incoming, send_to_browser); + futures_util::future::select(process_incoming, send_to_browser).await; + + println!("{addr} disconnected"); + peers.write().await.remove(&addr); +} |