From 5d3ab3dcf8deb887877523e0293237bcee804b1b Mon Sep 17 00:00:00 2001 From: stale Date: Wed, 11 May 2022 07:00:55 -0300 Subject: Ported, it woooorks --- src/conn.rs | 133 +++++++++++------------------------------------------- src/main.rs | 145 +++++++++++++++++++---------------------------------------- src/types.rs | 15 ++++--- 3 files changed, 81 insertions(+), 212 deletions(-) (limited to 'src') diff --git a/src/conn.rs b/src/conn.rs index 6d08942..fda3a2b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,106 +1,19 @@ use crate::types::*; -use std::{ - net::SocketAddr, - convert::Infallible, - pin::Pin, - task::Poll, -}; -use hyper::{ Request, Response, Body, service::Service }; -use hyper_tungstenite::{ - HyperWebsocket, - tungstenite::Message, -}; -use futures::{ - Future, - SinkExt, - StreamExt, - TryStreamExt, - TryFuture, - TryFutureExt, - channel::mpsc::unbounded, - pin_mut, -}; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; +use warp::ws::Message; -const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); - -#[derive(Clone)] -pub struct PerConnHandler { - pub state: State, - pub local_addr: SocketAddr, - pub remote_addr: SocketAddr, - pub game_cmd_tx: MovReqTx, - pub page: String, -} - -impl PerConnHandler { - async fn handle_req(self, req: Request) - -> std::result::Result, Infallible> - { - if hyper_tungstenite::is_upgrade_request(&req) { - let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket"); - tokio::spawn(async move { - peer_connection(self, wsocket).await; - }); - return Ok(resp); - } - - //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?; - let mut uri_path = req.uri().path().split('/').skip(1); - let actual_path = uri_path.next(); - - use hyper::{ Method, StatusCode }; - match (req.method(), actual_path) { - (&Method::GET, None | Some("")) => { - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(self.page)) - .unwrap()) - }, - (&Method::GET, Some("font.ttf")) => { - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(FONT_FILE)) - .unwrap()) - }, - _ => { - Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap()) - } - } - } -} - -impl Service> for PerConnHandler -{ - type Response = Response; - type Error = Infallible; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let cloned_self = self.clone(); - Box::pin(cloned_self.handle_req(req)) - } -} - -pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { - let socket = socket.await.expect("couldn't finish websocket connection"); - let addr = conn.remote_addr; - let peers = &conn.state.peers; - let mut cmd_tx = &conn.game_cmd_tx; - println!("Incoming TCP connection from: {addr}"); +pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) { + let addr = conndata.remote_addr; + let peers = &conndata.peers; + let cmd_tx = conndata.cmd_tx.clone(); + println!("Incoming TCP connection from: {}", conndata.remote_addr); let mut peer_name = "unknown".to_string(); // for game -> conn comms - let (tx, rx) = unbounded(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); // for server -> client comms - let (outgoing, mut incoming) = socket.split(); + let (mut outgoing, mut incoming) = socket.split(); let process_incoming = async { while let Ok(cmd) = incoming.try_next().await { @@ -112,7 +25,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { peers.remove(&addr); } for (paddr, p) in peers.iter() { - if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) { + if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) { println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); } } @@ -120,7 +33,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { } // if it ain't text we can't handle it if !cmd.is_text() { continue; } - let cmd = cmd.to_text().unwrap(); + let cmd = cmd.to_str().to_owned().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { @@ -144,7 +57,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { { let peers = peers.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { - let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); + let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } @@ -160,7 +73,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await { + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) { println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); }; }, @@ -173,7 +86,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await { + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) { println!("couldn't process \"{peer_name}\"'s flag command: {e}"); }; }, @@ -184,7 +97,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { }, "reset" => { println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Reset).await { + if let Err(e) = cmd_tx.send(MetaMove::Reset) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, @@ -206,8 +119,8 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { position: (0,0) }); } - tx.unbounded_send(Message::text(format!("id {id}"))).unwrap(); - if let Err(e) = cmd_tx.send(MetaMove::Dump).await { + tx.send(Message::text(format!("id {id}"))).unwrap(); + if let Err(e) = cmd_tx.send(MetaMove::Dump) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, @@ -217,10 +130,16 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { } } }; - let send_to_browser = rx.map(Ok).forward(outgoing); + let send_to_client = async move { + while let Some(m) = rx.recv().await { + if let Err(e) = outgoing.send(m).await { + println!("something went bad lol: {e}"); + }; + } + }; - pin_mut!(process_incoming, send_to_browser); - futures_util::future::select(process_incoming, send_to_browser).await; + futures_util::pin_mut!(process_incoming, send_to_client); + futures_util::future::select(process_incoming, send_to_client).await; println!("{addr} disconnected"); peers.write().await.remove(&addr); diff --git a/src/main.rs b/src/main.rs index cafc9d4..b3c3cf9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,33 +1,21 @@ -#[macro_use] extern crate anyhow; - -pub use anyhow::{ Result, Error }; - use std::{ convert::Infallible, collections::HashMap, + error::Error, net::SocketAddr, - sync::Arc, }; +use warp::Filter; mod types; mod conn; mod minesweeper; use types::*; -use hyper::{ Body, Request, Server }; -use hyper::server::conn::{ AddrStream, AddrIncoming }; -use hyper::service::{make_service_fn, service_fn, Service}; use tokio::sync::RwLock; -use std::fs; - -use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt}; -use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket }; -use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor }; -use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig, - Certificate, PrivateKey}; +const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); -fn main() -> Result<()> { +fn main() -> Result<(), Box> { let conf = Config { cert_path: "./cert.pem".to_owned(), pkey_path: "./cert.rsa".to_owned(), @@ -40,91 +28,59 @@ fn main() -> Result<()> { peers: PeerMap::new(RwLock::new(HashMap::new())), }; - let cert_pem = fs::read(&state.conf.cert_path)?; - let pkey_pem = fs::read(&state.conf.pkey_path)?; + tokio_main(state) +} - // Build TLS configuration. - let tls_cfg = { - let certs: Vec = rustls_pemfile::certs(&mut &*cert_pem) - .map(|mut certs| certs.drain(..).map(Certificate).collect())?; - if certs.len() < 1 { - return Err(anyhow!("No certificates found")); - } - let mut keys: Vec = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem) - .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?; - if keys.len() < 1 { - return Err(anyhow!("No private keys found")); - } +#[tokio::main] +async fn tokio_main(state: State) -> Result<(), Box> { + // Start the temporary single lobby + let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); + let _game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); - let mut tls_cfg = ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, keys.remove(0))?; + let cmd_filter = warp::any().map(move || cmd_tx.clone()); - // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order. - tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - tls_cfg + let page_route = { + use warp::*; + get() + .and(path("font.ttf")) + .map(|| FONT_FILE) + .or(fs::file(state.conf.page_path.clone())) }; - tokio_main(state, tls_cfg) -} + let websocket_route = { + let state = state.clone(); + use warp::*; + path("ws") + .and(ws()) + .and(addr::remote()) + .and(cmd_filter) + .map(move |ws: warp::ws::Ws, saddr: Option, cmd_tx: CmdTx| { + let state = state.clone(); + println!("conn from {saddr:?}"); + ws.on_upgrade(move |socket| { + let conn_data = types::ConnData { remote_addr: saddr.unwrap(), cmd_tx: cmd_tx.clone(), peers: state.peers.clone() }; + conn::peer_connection(socket, conn_data) + }) + }) + }; + let routes = websocket_route.or(page_route); -#[tokio::main] -async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> { - let server = main_server(state.clone(), tls_conf); + let server = warp::serve(routes) + .tls() + .cert_path(state.conf.cert_path) + .key_path(state.conf.pkey_path) + .run(state.conf.socket_addr); println!("Serving on {}", state.conf.socket_addr); - server.await?; + server.await; Ok(()) } -async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> { - // Create a TCP listener, that uses TLS - let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?; - let local_addr = listener.local_addr()?; - let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf)); - let http = hyper::server::conn::Http::new(); - - // Start the temporary single lobby - let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded(); - let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); - - loop { - let (conn, remote_addr) = listener.accept().await?; - let acceptor = acceptor.clone(); - let http = http.clone(); - let cloned_state = state.clone(); - let cmd_tx = cmd_tx.clone(); - let fut = async move { - match acceptor.accept(conn).await { - Ok(stream) => { - let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await; - if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); } - - let handler = conn::PerConnHandler { - state: cloned_state, - local_addr, - remote_addr, - game_cmd_tx: cmd_tx, - page: page.unwrap(), - }; - if let Err(e) = http.serve_connection(stream, handler).await { - eprintln!("hyper error for {remote_addr}: {e}"); - } - }, - Err(e) => eprintln!("TLS error for {remote_addr}: {e}"), - } - Ok(()) - }; - tokio::spawn(fut); - } -} - // If a move is made, broadcast new board, else just send current board -async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver, peers: PeerMap) { +async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver, peers: PeerMap) { use minesweeper::*; let mut game = Game::new(Board::new(75,35), (75*35)/8); let mut latest_player_name = None; - while let Some(req) = move_rx.next().await { + while let Some(req) = move_rx.recv().await { let done = game.phase == Phase::Die || game.phase == Phase::Win; match req { MetaMove::Move(m, o) => if !done { @@ -137,6 +93,7 @@ async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver (), MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, } + use warp::ws::Message; let mut reply = vec![Message::binary(game.board.render())]; let lpname = latest_player_name.as_ref().map(|s| s.as_str()).unwrap_or("unknown player"); match game.phase { @@ -148,21 +105,11 @@ async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, err) -} - -async fn shutdown_signal() { - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); -} diff --git a/src/types.rs b/src/types.rs index eed69dd..73d51b8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -3,10 +3,8 @@ use std::{ net::SocketAddr, sync::Arc, }; +use warp::ws::Message; use tokio::sync::RwLock; -use hyper_tungstenite::tungstenite::Message; -use hyper::{ Response, Body }; -use futures::channel::mpsc::UnboundedSender; use crate::minesweeper; #[derive(Debug, Clone)] @@ -32,12 +30,17 @@ pub enum MetaMove { #[derive(Debug)] pub struct Peer { - pub tx: UnboundedSender, + pub tx: tokio::sync::mpsc::UnboundedSender, pub seq_id: usize, pub name: String, pub position: (usize, usize), } -pub type HtmlResult = Result, Response>; -pub type MovReqTx = futures::channel::mpsc::UnboundedSender; +pub struct ConnData { + pub cmd_tx: CmdTx, + pub remote_addr: SocketAddr, + pub peers: PeerMap, +} + +pub type CmdTx = tokio::sync::mpsc::UnboundedSender; pub type PeerMap = Arc>>; -- cgit v1.2.3