From 5d3ab3dcf8deb887877523e0293237bcee804b1b Mon Sep 17 00:00:00 2001 From: stale Date: Wed, 11 May 2022 07:00:55 -0300 Subject: Ported, it woooorks --- src/conn.rs | 133 ++++++++++++------------------------------------------------ 1 file changed, 26 insertions(+), 107 deletions(-) (limited to 'src/conn.rs') diff --git a/src/conn.rs b/src/conn.rs index 6d08942..fda3a2b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,106 +1,19 @@ use crate::types::*; -use std::{ - net::SocketAddr, - convert::Infallible, - pin::Pin, - task::Poll, -}; -use hyper::{ Request, Response, Body, service::Service }; -use hyper_tungstenite::{ - HyperWebsocket, - tungstenite::Message, -}; -use futures::{ - Future, - SinkExt, - StreamExt, - TryStreamExt, - TryFuture, - TryFutureExt, - channel::mpsc::unbounded, - pin_mut, -}; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; +use warp::ws::Message; -const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); - -#[derive(Clone)] -pub struct PerConnHandler { - pub state: State, - pub local_addr: SocketAddr, - pub remote_addr: SocketAddr, - pub game_cmd_tx: MovReqTx, - pub page: String, -} - -impl PerConnHandler { - async fn handle_req(self, req: Request) - -> std::result::Result, Infallible> - { - if hyper_tungstenite::is_upgrade_request(&req) { - let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket"); - tokio::spawn(async move { - peer_connection(self, wsocket).await; - }); - return Ok(resp); - } - - //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?; - let mut uri_path = req.uri().path().split('/').skip(1); - let actual_path = uri_path.next(); - - use hyper::{ Method, StatusCode }; - match (req.method(), actual_path) { - (&Method::GET, None | Some("")) => { - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(self.page)) - .unwrap()) - }, - (&Method::GET, Some("font.ttf")) => { - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(FONT_FILE)) - .unwrap()) - }, - _ => { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap()) - } - } - } -} - -impl Service> for PerConnHandler -{ - type Response = Response; - type Error = Infallible; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let cloned_self = self.clone(); - Box::pin(cloned_self.handle_req(req)) - } -} - -pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { - let socket = socket.await.expect("couldn't finish websocket connection"); - let addr = conn.remote_addr; - let peers = &conn.state.peers; - let mut cmd_tx = &conn.game_cmd_tx; - println!("Incoming TCP connection from: {addr}"); +pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) { + let addr = conndata.remote_addr; + let peers = &conndata.peers; + let cmd_tx = conndata.cmd_tx.clone(); + println!("Incoming TCP connection from: {}", conndata.remote_addr); let mut peer_name = "unknown".to_string(); // for game -> conn comms - let (tx, rx) = unbounded(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); // for server -> client comms - let (outgoing, mut incoming) = socket.split(); + let (mut outgoing, mut incoming) = socket.split(); let process_incoming = async { while let Ok(cmd) = incoming.try_next().await { @@ -112,7 +25,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { peers.remove(&addr); } for (paddr, p) in peers.iter() { - if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) { + if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) { println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); } } @@ -120,7 +33,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { } // if it ain't text we can't handle it if !cmd.is_text() { continue; } - let cmd = cmd.to_text().unwrap(); + let cmd = cmd.to_str().to_owned().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { @@ -144,7 +57,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { { let peers = peers.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { - let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); + let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } @@ -160,7 +73,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await { + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); }; }, @@ -173,7 +86,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await { + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { println!("couldn't process \"{peer_name}\"'s flag command: {e}"); }; }, @@ -184,7 +97,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { }, "reset" => { println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Reset).await { + if let Err(e) = cmd_tx.send(MetaMove::Reset) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, @@ -206,8 +119,8 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { position: (0,0) }); } - tx.unbounded_send(Message::text(format!("id {id}"))).unwrap(); - if let Err(e) = cmd_tx.send(MetaMove::Dump).await { + tx.send(Message::text(format!("id {id}"))).unwrap(); + if let Err(e) = cmd_tx.send(MetaMove::Dump) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, @@ -217,10 +130,16 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { } } }; - let send_to_browser = rx.map(Ok).forward(outgoing); + let send_to_client = async move { + while let Some(m) = rx.recv().await { + if let Err(e) = outgoing.send(m).await { + println!("something went bad lol: {e}"); + }; + } + }; - pin_mut!(process_incoming, send_to_browser); - futures_util::future::select(process_incoming, send_to_browser).await; + futures_util::pin_mut!(process_incoming, send_to_client); + futures_util::future::select(process_incoming, send_to_client).await; println!("{addr} disconnected"); peers.write().await.remove(&addr); -- cgit v1.2.3