use crate::types::*; use std::{ net::SocketAddr, convert::Infallible, pin::Pin, task::Poll, }; use hyper::{ Request, Response, Body, service::Service }; use hyper_tungstenite::{ HyperWebsocket, tungstenite::Message, }; use futures::{ Future, SinkExt, StreamExt, TryStreamExt, TryFuture, TryFutureExt, channel::mpsc::unbounded, pin_mut, }; const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); #[derive(Clone)] pub struct PerConnHandler { pub state: State, pub local_addr: SocketAddr, pub remote_addr: SocketAddr, pub game_cmd_tx: MovReqTx, pub page: String, } impl PerConnHandler { async fn handle_req(self, req: Request) -> std::result::Result, Infallible> { if hyper_tungstenite::is_upgrade_request(&req) { let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket"); tokio::spawn(async move { peer_connection(self, wsocket).await; }); return Ok(resp); } //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?; let mut uri_path = req.uri().path().split('/').skip(1); let actual_path = uri_path.next(); use hyper::{ Method, StatusCode }; match (req.method(), actual_path) { (&Method::GET, None | Some("")) => { Ok(Response::builder() .status(StatusCode::OK) .body(Body::from(self.page)) .unwrap()) }, (&Method::GET, Some("font.ttf")) => { Ok(Response::builder() .status(StatusCode::OK) .body(Body::from(FONT_FILE)) .unwrap()) }, _ => { Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::empty()) .unwrap()) } } } } impl Service> for PerConnHandler { type Response = Response; type Error = Infallible; type Future = Pin> + Send>>; fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: Request) -> Self::Future { let cloned_self = self.clone(); Box::pin(cloned_self.handle_req(req)) } } pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { let socket = socket.await.expect("couldn't finish websocket connection"); let addr = conn.remote_addr; let peers = &conn.state.peers; let mut cmd_tx = &conn.game_cmd_tx; println!("Incoming TCP connection from: {addr}"); let mut peer_name = "unknown".to_string(); // for game -> conn comms let (tx, rx) = unbounded(); // for server -> client comms let (outgoing, mut incoming) = socket.split(); let process_incoming = async { while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { if cmd.is_close() { println!("closing \"{peer_name}\"@{addr}"); let mut peers = peers.write().await; if let Some(_) = peers.get(&addr) { peers.remove(&addr); } for (paddr, p) in peers.iter() { if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) { println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); } } break; } // if it ain't text we can't handle it if !cmd.is_text() { continue; } let cmd = cmd.to_text().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { let x = fields.next().and_then(|xstr| xstr.parse::().ok()); let y = fields.next().and_then(|ystr| ystr.parse::().ok()); x.zip(y) }; if let Some(cmd_name) = fields.next() { use crate::minesweeper::{Move,MoveType}; match cmd_name { "pos" => { match parse_pos(fields) { Some(pos) => { let (name, id) = { let mut peers = peers.write().await; let mut entry = peers.get_mut(&addr).unwrap(); entry.position = pos.clone(); (entry.name.clone(), entry.seq_id) }; let sanitized_name = name.replace(" ", " ").to_string(); { let peers = peers.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } } } }, None => { println!("bad position update from \"{peer_name}@{addr}\""); }, } }, "reveal" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await { println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); }; }, None => { println!("bad reveal from \"{peer_name}\"@{addr}"); } } }, "flag" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await { println!("couldn't process \"{peer_name}\"'s flag command: {e}"); }; }, None => { println!("bad flag from \"{peer_name}\"@{addr}"); } } }, "reset" => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Reset).await { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, "register" => { let name = fields.collect::>().join(&" "); let id; if name.is_empty() { peer_name = "anon".to_string(); } else { peer_name = name; } { // new scope cuz paranoid bout deadlocks let mut peers = peers.write().await; id = peers.len(); peers.insert(addr, Peer { tx: tx.clone(), seq_id: id, name: peer_name.clone(), position: (0,0) }); } tx.unbounded_send(Message::text(format!("id {id}"))).unwrap(); if let Err(e) = cmd_tx.send(MetaMove::Dump).await { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), } } } } }; let send_to_browser = rx.map(Ok).forward(outgoing); pin_mut!(process_incoming, send_to_browser); futures_util::future::select(process_incoming, send_to_browser).await; println!("{addr} disconnected"); peers.write().await.remove(&addr); }