diff options
| -rw-r--r-- | Cargo.lock | 64 | ||||
| -rw-r--r-- | Cargo.toml | 2 | ||||
| -rw-r--r-- | src/conn.rs | 227 | ||||
| -rw-r--r-- | src/main.rs | 356 | ||||
| -rw-r--r-- | src/tls_stuff.rs | 159 | ||||
| -rw-r--r-- | src/types.rs | 43 | 
6 files changed, 431 insertions, 420 deletions
| @@ -3,6 +3,12 @@  version = 3  [[package]] +name = "anyhow" +version = "1.0.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" + +[[package]]  name = "autocfg"  version = "1.1.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -125,12 +131,28 @@ dependencies = [  ]  [[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]]  name = "futures-channel"  version = "0.3.21"  source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"  dependencies = [   "futures-core", + "futures-sink",  ]  [[package]] @@ -140,6 +162,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"  [[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]]  name = "futures-sink"  version = "0.3.21"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -157,9 +207,13 @@ version = "0.3.21"  source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"  dependencies = [ + "futures-channel",   "futures-core", + "futures-io", + "futures-macro",   "futures-sink",   "futures-task", + "memchr",   "pin-project-lite",   "pin-utils",   "slab", @@ -487,9 +541,9 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"  [[package]]  name = "proc-macro2" -version = "1.0.37" +version = "1.0.38"  source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" +checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa"  dependencies = [   "unicode-xid",  ] @@ -721,9 +775,9 @@ dependencies = [  [[package]]  name = "tokio-rustls" -version = "0.23.3" +version = "0.23.4"  source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"  dependencies = [   "rustls",   "tokio", @@ -976,7 +1030,9 @@ dependencies = [  name = "websweeper"  version = "1.0.0"  dependencies = [ + "anyhow",   "format-bytes", + "futures",   "futures-channel",   "futures-util",   "hyper", @@ -13,6 +13,8 @@ tokio-rustls = "0.23"  rustls-pemfile = "1"  hyper-tungstenite = "0.8"  rand = "0.8" +futures = "0.3"  futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }  futures-channel = "0.3"  format-bytes = "0.3" +anyhow = "1.0" diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 0000000..6d08942 --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,227 @@ +use crate::types::*; +use std::{ +    net::SocketAddr, +    convert::Infallible, +    pin::Pin, +    task::Poll, +}; +use hyper::{ Request, Response, Body, service::Service }; +use hyper_tungstenite::{ +    HyperWebsocket, +    tungstenite::Message, +}; +use futures::{ +    Future, +    SinkExt, +    StreamExt, +    TryStreamExt, +    TryFuture, +    TryFutureExt, +    channel::mpsc::unbounded, +    pin_mut, +}; + +const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); + +#[derive(Clone)] +pub struct PerConnHandler { +    pub state: State, +    pub local_addr: SocketAddr, +    pub remote_addr: SocketAddr, +    pub game_cmd_tx: MovReqTx, +    pub page: String, +} + +impl PerConnHandler { +    async fn handle_req(self, req: Request<Body>) +        -> std::result::Result<Response<Body>, Infallible> +    { +        if hyper_tungstenite::is_upgrade_request(&req) { +            let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket"); +            tokio::spawn(async move { +                peer_connection(self, wsocket).await; +            }); +            return Ok(resp); +        } + +        //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?; +        let mut uri_path = req.uri().path().split('/').skip(1); +        let actual_path = uri_path.next(); + +        use hyper::{ Method, StatusCode }; +        match (req.method(), actual_path) { +            (&Method::GET, None | Some("")) => { +                Ok(Response::builder() +                    .status(StatusCode::OK) +                    .body(Body::from(self.page)) +                    .unwrap()) +            }, +            (&Method::GET, Some("font.ttf")) => { +                Ok(Response::builder() +                    .status(StatusCode::OK) +                    .body(Body::from(FONT_FILE)) +                    .unwrap()) +            }, +            _ => { +                Ok(Response::builder() +                    .status(StatusCode::NOT_FOUND) +                    .body(Body::empty()) +                    .unwrap()) +            } +        } +    } +} + +impl Service<Request<Body>> for PerConnHandler +{ +    type Response = Response<Body>; +    type Error = Infallible; +    type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response,Self::Error>> + Send>>; + +    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { +        Poll::Ready(Ok(())) +    } + +    fn call(&mut self, req: Request<Body>) -> Self::Future { +        let cloned_self = self.clone(); +        Box::pin(cloned_self.handle_req(req)) +    } +} + +pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { +    let socket = socket.await.expect("couldn't finish websocket connection"); +    let addr = conn.remote_addr; +    let peers = &conn.state.peers; +    let mut cmd_tx = &conn.game_cmd_tx; +    println!("Incoming TCP connection from: {addr}"); + +    let mut peer_name = "unknown".to_string(); + +    // for game -> conn comms +    let (tx, rx) = unbounded(); +    // for server -> client comms +    let (outgoing, mut incoming) = socket.split(); + +    let process_incoming = async { +        while let Ok(cmd) = incoming.try_next().await { +            if let Some(cmd) = cmd { +                if cmd.is_close() { +                    println!("closing \"{peer_name}\"@{addr}"); +                    let mut peers = peers.write().await; +                    if let Some(_) = peers.get(&addr) { +                        peers.remove(&addr); +                    } +                    for (paddr, p) in peers.iter() { +                        if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) { +                            println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); +                        } +                    } +                    break; +                } +                // if it ain't text we can't handle it +                if !cmd.is_text() { continue; } +                let cmd = cmd.to_text().unwrap(); + +                let mut fields = cmd.split(" "); +                let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { +                    let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); +                    let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); +                    x.zip(y) +                }; +                if let Some(cmd_name) = fields.next() { +                    use crate::minesweeper::{Move,MoveType}; +                    match cmd_name { +                        "pos" => { +                            match parse_pos(fields) { +                                Some(pos) => { +                                    let (name, id) = { +                                        let mut peers = peers.write().await; +                                        let mut entry = peers.get_mut(&addr).unwrap(); +                                        entry.position = pos.clone(); +                                        (entry.name.clone(), entry.seq_id) +                                    }; +                                    let sanitized_name = name.replace(" ", " ").to_string(); +                                    { +                                        let peers = peers.read().await; +                                        for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { +                                            let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); +                                            if let Err(e) = r { +                                                println!("error sending pos update: {e}"); +                                            } +                                        } +                                    } +                                }, +                                None => { +                                    println!("bad position update from \"{peer_name}@{addr}\""); +                                }, +                            } +                        }, +                        "reveal" => { +                            match parse_pos(fields) { +                                Some(pos) => { +                                    println!("{cmd} from \"{peer_name}\"@{addr}"); +                                    if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await { +                                        println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); +                                    }; +                                }, +                                None => { +                                    println!("bad reveal from \"{peer_name}\"@{addr}"); +                                } +                            } +                        }, +                        "flag" => { +                            match parse_pos(fields) { +                                Some(pos) => { +                                    println!("{cmd} from \"{peer_name}\"@{addr}"); +                                    if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await { +                                        println!("couldn't process \"{peer_name}\"'s flag command: {e}"); +                                    }; +                                }, +                                None => { +                                    println!("bad flag from \"{peer_name}\"@{addr}"); +                                } +                            } +                        }, +                        "reset" => { +                            println!("{cmd} from \"{peer_name}\"@{addr}"); +                            if let Err(e) = cmd_tx.send(MetaMove::Reset).await { +                                println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); +                            } +                        }, +                        "register" => { +                            let name = fields.collect::<Vec<&str>>().join(&" "); +                            let id; +                            if name.is_empty() { +                                peer_name = "anon".to_string(); +                            } else { +                                peer_name = name; +                            } +                            {   // new scope cuz paranoid bout deadlocks +                                let mut peers = peers.write().await; +                                id = peers.len(); +                                peers.insert(addr, Peer { +                                    tx: tx.clone(), +                                    seq_id: id, +                                    name: peer_name.clone(), +                                    position: (0,0) +                                }); +                            } +                            tx.unbounded_send(Message::text(format!("id {id}"))).unwrap(); +                            if let Err(e) = cmd_tx.send(MetaMove::Dump).await { +                                println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); +                            } +                        }, +                        e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), +                    } +                } +            } +        } +    }; +    let send_to_browser = rx.map(Ok).forward(outgoing); + +    pin_mut!(process_incoming, send_to_browser); +    futures_util::future::select(process_incoming, send_to_browser).await; + +    println!("{addr} disconnected"); +    peers.write().await.remove(&addr); +} diff --git a/src/main.rs b/src/main.rs index 1367c60..cafc9d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,117 +1,130 @@ +#[macro_use] extern crate anyhow; + +pub use anyhow::{ Result, Error }; +  use std::{ -    error::Error, -    sync::{ Arc, atomic::{ self, AtomicUsize }}, -    net::SocketAddr,      convert::Infallible,      collections::HashMap, +    net::SocketAddr, +    sync::Arc,  }; +mod types; +mod conn;  mod minesweeper; -mod tls_stuff; -use minesweeper::*; -use tls_stuff::*; +use types::*; -use hyper::{ Method, StatusCode, Body, Request, Response, Server }; +use hyper::{ Body, Request, Server };  use hyper::server::conn::{ AddrStream, AddrIncoming }; -use hyper::service::{make_service_fn, service_fn}; -use tokio::sync::{ -    RwLock, -    mpsc, -}; +use hyper::service::{make_service_fn, service_fn, Service}; +use tokio::sync::RwLock; +use std::fs; -type HtmlResult = Result<Response<Body>, Response<Body>>; -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt}; +use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt}; -use tokio::fs;  use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket }; -use tokio_rustls::{ rustls, Accept, server::TlsStream }; - -type Tx = UnboundedSender<Message>; -type MovReqTx = mpsc::UnboundedSender<MetaMove>; -type PeerMap = Arc<RwLock<HashMap<SocketAddr, (Tx, usize, String, (usize, usize))>>>; -type PeerInfo = (PeerMap, Arc::<AtomicUsize>); - -#[derive(Debug)] -enum MetaMove { -    Move(Move,SocketAddr), -    Dump, -    Reset, -} +use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor }; +use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig, +             Certificate, PrivateKey}; + +fn main() -> Result<()> { +    let conf = Config { +        cert_path: "./cert.pem".to_owned(), +        pkey_path: "./cert.rsa".to_owned(), +        page_path: "./page.html".to_owned(), +        socket_addr: ([0,0,0,0],31235).into(), +    }; -const PAGE_RELPATH: &str = "./page.html"; -const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf"); +    let state = State { +        conf, +        peers: PeerMap::new(RwLock::new(HashMap::new())), +    }; -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { -    let sequential_id = Arc::new(AtomicUsize::new(0)); -    let peers = PeerMap::new(RwLock::new(HashMap::new())); -    let peer_info: PeerInfo = (peers.clone(), sequential_id.clone()); -    let addr = SocketAddr::from(([0, 0, 0, 0], 31235)); +    let cert_pem = fs::read(&state.conf.cert_path)?; +    let pkey_pem = fs::read(&state.conf.pkey_path)?;      // Build TLS configuration.      let tls_cfg = { -        // Load public certificate. -        let certs = load_certs("cert.pem")?; -        // Load private key. -        let key = load_private_key("cert.rsa")?; -        // Do not use client certificate authentication. -        let mut cfg = rustls::ServerConfig::builder() +        let certs: Vec<Certificate> = rustls_pemfile::certs(&mut &*cert_pem) +            .map(|mut certs| certs.drain(..).map(Certificate).collect())?; +        if certs.len() < 1 { +            return Err(anyhow!("No certificates found")); +        } +        let mut keys: Vec<PrivateKey> = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem) +            .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?; +        if keys.len() < 1 { +            return Err(anyhow!("No private keys found")); +        } + +        let mut tls_cfg = ServerConfig::builder()              .with_safe_defaults()              .with_no_client_auth() -            .with_single_cert(certs, key) -            .map_err(|e| error(format!("{}", e)))?; +            .with_single_cert(certs, keys.remove(0))?; +          // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order. -        cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; -        Arc::new(cfg) +        tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; +        tls_cfg      }; -    // Create a TCP listener via tokio. -    let incoming = AddrIncoming::bind(&addr)?; +    tokio_main(state, tls_cfg) +} -    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); -    let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone())); +#[tokio::main] +async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> { +    let server = main_server(state.clone(), tls_conf); +    println!("Serving on {}", state.conf.socket_addr); +    server.await?; +    Ok(()) +} -    let serv = make_service_fn(|socket: &tls_stuff::TlsStream| { -        let addr = { -            if let State::Streaming(ref s) = &socket.state { -                s.get_ref().0.remote_addr() -            } else { -                std::net::SocketAddr::new(std::net::Ipv4Addr::new(0,0,0,0).into(),0) +async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> { +    // Create a TCP listener, that uses TLS +    let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?; +    let local_addr = listener.local_addr()?; +    let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf)); +    let http = hyper::server::conn::Http::new(); + +    // Start the temporary single lobby +    let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded(); +    let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); + +    loop { +        let (conn, remote_addr) = listener.accept().await?; +        let acceptor = acceptor.clone(); +        let http = http.clone(); +        let cloned_state = state.clone(); +        let cmd_tx = cmd_tx.clone(); +        let fut = async move { +            match acceptor.accept(conn).await { +                Ok(stream) => { +                    let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await; +                    if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); } + +                    let handler = conn::PerConnHandler { +                        state: cloned_state, +                        local_addr, +                        remote_addr, +                        game_cmd_tx: cmd_tx, +                        page: page.unwrap(), +                    }; +                    if let Err(e) = http.serve_connection(stream, handler).await { +                        eprintln!("hyper error for {remote_addr}: {e}"); +                    } +                }, +                Err(e) => eprintln!("TLS error for {remote_addr}: {e}"),              } +            Ok(())          }; -        let peer_info = peer_info.clone(); -        let cmd_tx = cmd_tx.clone(); -        async move { -            Ok::<_, Infallible>(service_fn(move |req: Request<Body>| { -                let peer_info = peer_info.clone(); -                let cmd_tx = cmd_tx.clone(); -                async move { -                    Ok::<_,Infallible>(match handle_req(req, peer_info.clone(), cmd_tx.clone(), addr).await { -                        Ok(r) => r, -                        Err(r) => r, -                    }) -                } -            })) -        } -    }); - -    // Run the future, keep going until an error occurs. -    println!("Starting to serve on https://{}.", addr); -    let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming)) -        .serve(serv) -        .with_graceful_shutdown(shutdown_signal()); -    if let Err(e) = server.await { -        eprint!("server error: {}", e); +        tokio::spawn(fut);      } -    Ok(())  }  // If a move is made, broadcast new board, else just send current board -async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) { +async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) { +    use minesweeper::*;      let mut game = Game::new(Board::new(75,35), (75*35)/8);      let mut latest_player_name = None; -    while let Some(req) = move_rx.recv().await { +    while let Some(req) = move_rx.next().await {          let done = game.phase == Phase::Die || game.phase == Phase::Win;          match req {              MetaMove::Move(m, o) => if !done { @@ -119,7 +132,7 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap                  if game.phase == Phase::Win || game.phase == Phase::Die {                      game.board = game.board.grade();                  } -                latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone()); +                latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone());              },              MetaMove::Dump => (),              MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, @@ -133,9 +146,9 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap          }          {              let peers = peers.read().await; -            for (addr, (tx, _, _, _)) in peers.iter() { +            for (addr, p) in peers.iter() {                  for r in reply.iter() { -                    if let Err(e) = tx.unbounded_send(r.clone()) { +                    if let Err(e) = p.tx.unbounded_send(r.clone()) {                          println!("couldn't send game update {r} to {addr}: {e}");                      }                  } @@ -144,177 +157,6 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap      }  } -async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, socket: HyperWebsocket, addr: SocketAddr) { -    let socket = socket.await.unwrap(); // FIXME error handling -    println!("Incoming TCP connection from: {}", addr); - -    let peer_map = peer_info.0; -    let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel); -    let mut peer_name = "unknown".to_string(); - -    let (tx, rx) = unbounded(); - -    let (outgoing, mut incoming) = socket.split(); - -    let process_incoming = async { -        while let Ok(cmd) = incoming.try_next().await { -            if let Some(cmd) = cmd { -                if cmd.is_close() { -                    println!("closing \"{peer_name}\"@{addr}"); -                    let mut peers = peer_map.write().await; -                    if let Some(_) = peers.get(&addr) { -                        peers.remove(&addr); -                    } -                    for (paddr, (tx, _, pname, _)) in peers.iter() { -                        if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) { -                            println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}"); -                        } -                    } -                    break; -                } -                // if it ain't text we can't handle it -                if !cmd.is_text() { continue; } -                let cmd = cmd.to_text().unwrap(); - -                let mut fields = cmd.split(" "); -                let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { -                    let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); -                    let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); -                    x.zip(y) -                }; -                if let Some(cmd_name) = fields.next() { -                    match cmd_name { -                        "pos" => { -                            match parse_pos(fields) { -                                Some(pos) => { -                                    let (name, id) = { -                                        let mut peers = peer_map.write().await; -                                        let mut entry = peers.get_mut(&addr).unwrap(); -                                        entry.3 = pos.clone(); -                                        (entry.2.clone(), entry.1) -                                    }; -                                    let sanitized_name = name.replace(" ", " ").to_string(); -                                    { -                                        let peers = peer_map.read().await; -                                        for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) { -                                            let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); -                                            if let Err(e) = r { -                                                println!("error sending pos update: {e}"); -                                            } -                                        } -                                    } -                                }, -                                None => { -                                    println!("bad position update from \"{peer_name}@{addr}\""); -                                }, -                            } -                        }, -                        "reveal" => { -                            match parse_pos(fields) { -                                Some(pos) => { -                                    println!("{cmd} from \"{peer_name}\"@{addr}"); -                                    cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap(); -                                }, -                                None => { -                                    println!("bad reveal from \"{peer_name}\"@{addr}"); -                                } -                            } -                        }, -                        "flag" => { -                            match parse_pos(fields) { -                                Some(pos) => { -                                    println!("{cmd} from \"{peer_name}\"@{addr}"); -                                    cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap(); -                                }, -                                None => { -                                    println!("bad flag from \"{peer_name}\"@{addr}"); -                                } -                            } -                        }, -                        "reset" => { -                            println!("{cmd} from \"{peer_name}\"@{addr}"); -                            if let Err(e) = cmd_tx.send(MetaMove::Reset) { -                                println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); -                            } -                        }, -                        "register" => { -                            let name = fields.collect::<Vec<&str>>().join(&" "); -                            if name.is_empty() { -                                peer_name = "anon".to_string(); -                            } else { -                                peer_name = name; -                            } -                            {   // new scope cuz paranoid bout deadlocks -                                peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0))); -                            } -                            tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap(); -                            if let Err(e) = cmd_tx.send(MetaMove::Dump) { -                                println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); -                            } -                        }, -                        e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), -                    } -                } -            } -        } -    }; -    let send_to_browser = rx.map(Ok).forward(outgoing); - -    pin_mut!(process_incoming, send_to_browser); -    future::select(process_incoming, send_to_browser).await; - -    println!("{} disconnected", &addr); -    peer_map.write().await.remove(&addr); -} - -async fn handle_req(mut request: Request<Body>, peer_info: PeerInfo, cmd_tx: MovReqTx, addr: SocketAddr) -> HtmlResult { -    if hyper_tungstenite::is_upgrade_request(&request) { -        let (resp, wsocket) = hyper_tungstenite::upgrade(&mut request, None).expect("couldn't upgrade to websocket"); -        tokio::spawn(async move { -            peer_connection(peer_info.clone(), cmd_tx.clone(), wsocket, addr).await; -        }); -        return Ok(resp); -    } - -    let page = fs::read_to_string(PAGE_RELPATH).await.unwrap(); -    let mut uri_path = request.uri().path().split('/').skip(1); -    let actual_path = uri_path.next(); - -    match (request.method(), actual_path) { -        (&Method::GET, None | Some("")) => { -            Response::builder() -                .status(StatusCode::OK) -                .body(Body::from(page)) -                .map_err(errpage) -        }, -        (&Method::GET, Some("saddr")) => { -            Response::builder() -                .status(StatusCode::OK) -                .body(Body::from("placeholder")) -                .map_err(errpage) -        }, -        (&Method::GET, Some("font.ttf")) => { -            Response::builder() -                .status(StatusCode::OK) -                .body(Body::from(FONT_FILE_FUCKIT)) -                .map_err(errpage) -        }, -        _ => { -            Response::builder() -                .status(StatusCode::METHOD_NOT_ALLOWED) -                .header("ALLOW", "GET, POST") -                .body(Body::empty()).map_err(errpage) -        } -    } -} - -fn errpage<T: Error>(e: T) -> Response<Body> { -    Response::builder() -        .status(StatusCode::INTERNAL_SERVER_ERROR) -        .body(e.to_string().into()) -        .unwrap() -} -  fn error(err: String) -> std::io::Error {      std::io::Error::new(std::io::ErrorKind::Other, err)  } diff --git a/src/tls_stuff.rs b/src/tls_stuff.rs deleted file mode 100644 index 83c0489..0000000 --- a/src/tls_stuff.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! Simple HTTPS echo service based on hyper-rustls -//! -//! First parameter is the mandatory port to use. -//! Certificate and private key are hardcoded to sample files. -//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2, -//! otherwise HTTP/1.1 will be used. -use core::task::{Context, Poll}; -use futures_util::ready; -use hyper::server::accept::Accept; -use hyper::server::conn::{AddrIncoming, AddrStream}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::vec::Vec; -use std::{fs, io}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_rustls::rustls::{self, ServerConfig}; - -fn error(err: String) -> io::Error { -    io::Error::new(io::ErrorKind::Other, err) -} - -pub enum State { -    Handshaking(tokio_rustls::Accept<AddrStream>), -    Streaming(tokio_rustls::server::TlsStream<AddrStream>), -} - -// tokio_rustls::server::TlsStream doesn't expose constructor methods, -// so we have to TlsAcceptor::accept and handshake to have access to it -// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first -pub struct TlsStream { -    pub state: State, -} - -impl TlsStream { -    pub fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream { -        let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream); -        TlsStream { -            state: State::Handshaking(accept), -        } -    } -} - -impl AsyncRead for TlsStream { -    fn poll_read( -        self: Pin<&mut Self>, -        cx: &mut Context, -        buf: &mut ReadBuf, -    ) -> Poll<io::Result<()>> { -        let pin = self.get_mut(); -        match pin.state { -            State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) { -                Ok(mut stream) => { -                    let result = Pin::new(&mut stream).poll_read(cx, buf); -                    pin.state = State::Streaming(stream); -                    result -                } -                Err(err) => Poll::Ready(Err(err)), -            }, -            State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf), -        } -    } -} - -impl AsyncWrite for TlsStream { -    fn poll_write( -        self: Pin<&mut Self>, -        cx: &mut Context<'_>, -        buf: &[u8], -    ) -> Poll<io::Result<usize>> { -        let pin = self.get_mut(); -        match pin.state { -            State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) { -                Ok(mut stream) => { -                    let result = Pin::new(&mut stream).poll_write(cx, buf); -                    pin.state = State::Streaming(stream); -                    result -                } -                Err(err) => Poll::Ready(Err(err)), -            }, -            State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf), -        } -    } - -    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { -        match self.state { -            State::Handshaking(_) => Poll::Ready(Ok(())), -            State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), -        } -    } - -    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { -        match self.state { -            State::Handshaking(_) => Poll::Ready(Ok(())), -            State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), -        } -    } -} - -pub struct TlsAcceptor { -    config: Arc<ServerConfig>, -    incoming: AddrIncoming, -} - -impl TlsAcceptor { -    pub fn new(config: Arc<ServerConfig>, incoming: AddrIncoming) -> TlsAcceptor { -        TlsAcceptor { config, incoming } -    } -} - -impl Accept for TlsAcceptor { -    type Conn = TlsStream; -    type Error = io::Error; - -    fn poll_accept( -        self: Pin<&mut Self>, -        cx: &mut Context<'_>, -    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> { -        let pin = self.get_mut(); -        match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) { -            Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))), -            Some(Err(e)) => Poll::Ready(Some(Err(e))), -            None => Poll::Ready(None), -        } -    } -} - -// Load public certificate from file. -pub fn load_certs(filename: &str) -> io::Result<Vec<rustls::Certificate>> { -    // Open certificate file. -    let certfile = fs::File::open(filename) -        .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; -    let mut reader = io::BufReader::new(certfile); - -    // Load and return certificate. -    let certs = rustls_pemfile::certs(&mut reader) -        .map_err(|_| error("failed to load certificate".into()))?; -    Ok(certs -        .into_iter() -        .map(rustls::Certificate) -        .collect()) -} - -// Load private key from file. -pub fn load_private_key(filename: &str) -> io::Result<rustls::PrivateKey> { -    // Open keyfile. -    let keyfile = fs::File::open(filename) -        .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; -    let mut reader = io::BufReader::new(keyfile); - -    // Load and return a single private key. -    let keys = rustls_pemfile::rsa_private_keys(&mut reader) -        .map_err(|_| error("failed to load private key".into()))?; -    if keys.len() != 1 { -        return Err(error("expected a single private key".into())); -    } - -    Ok(rustls::PrivateKey(keys[0].clone())) -} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..eed69dd --- /dev/null +++ b/src/types.rs @@ -0,0 +1,43 @@ +use std::{ +    collections::HashMap, +    net::SocketAddr, +    sync::Arc, +}; +use tokio::sync::RwLock; +use hyper_tungstenite::tungstenite::Message; +use hyper::{ Response, Body }; +use futures::channel::mpsc::UnboundedSender; +use crate::minesweeper; + +#[derive(Debug, Clone)] +pub struct Config { +    pub cert_path: String, +    pub pkey_path: String, +    pub page_path: String, +    pub socket_addr: SocketAddr, +} + +#[derive(Debug, Clone)] +pub struct State { +    pub conf: Config, +    pub peers: PeerMap, +} + +#[derive(Debug)] +pub enum MetaMove { +    Move(minesweeper::Move,SocketAddr), +    Dump, +    Reset, +} + +#[derive(Debug)] +pub struct Peer { +    pub tx: UnboundedSender<Message>, +    pub seq_id: usize, +    pub name: String, +    pub position: (usize, usize), +} + +pub type HtmlResult = Result<Response<Body>, Response<Body>>; +pub type MovReqTx = futures::channel::mpsc::UnboundedSender<MetaMove>; +pub type PeerMap = Arc<RwLock<HashMap<SocketAddr, Peer>>>; | 
