From ac938b0d57d2e2630bf05668fec6b3d6c18d0450 Mon Sep 17 00:00:00 2001 From: stale Date: Sat, 7 May 2022 21:38:25 -0300 Subject: this don't work no good --- Cargo.lock | 64 +++++++++- Cargo.toml | 2 + src/conn.rs | 227 +++++++++++++++++++++++++++++++++++ src/main.rs | 356 ++++++++++++++++--------------------------------------- src/tls_stuff.rs | 159 ------------------------- src/types.rs | 43 +++++++ 6 files changed, 431 insertions(+), 420 deletions(-) create mode 100644 src/conn.rs delete mode 100644 src/tls_stuff.rs create mode 100644 src/types.rs diff --git a/Cargo.lock b/Cargo.lock index 74011a6..00bb05a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "anyhow" +version = "1.0.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" + [[package]] name = "autocfg" version = "1.1.0" @@ -124,6 +130,21 @@ dependencies = [ "syn", ] +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.21" @@ -131,6 +152,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -139,6 +161,34 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.21" @@ -157,9 +207,13 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -487,9 +541,9 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "proc-macro2" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" +checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa" dependencies = [ "unicode-xid", ] @@ -721,9 +775,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.23.3" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls", "tokio", @@ -976,7 +1030,9 @@ dependencies = [ name = "websweeper" version = "1.0.0" dependencies = [ + "anyhow", "format-bytes", + "futures", "futures-channel", "futures-util", "hyper", diff --git a/Cargo.toml b/Cargo.toml index 9d4c83c..d54c86d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,8 @@ tokio-rustls = "0.23" rustls-pemfile = "1" hyper-tungstenite = "0.8" rand = "0.8" +futures = "0.3" futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } futures-channel = "0.3" format-bytes = "0.3" +anyhow = "1.0" diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 0000000..6d08942 --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,227 @@ +use crate::types::*; +use std::{ + net::SocketAddr, + convert::Infallible, + pin::Pin, + task::Poll, +}; +use hyper::{ Request, Response, Body, service::Service }; +use hyper_tungstenite::{ + HyperWebsocket, + tungstenite::Message, +}; +use futures::{ + Future, + SinkExt, + StreamExt, + TryStreamExt, + TryFuture, + TryFutureExt, + channel::mpsc::unbounded, + pin_mut, +}; + +const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf"); + +#[derive(Clone)] +pub struct PerConnHandler { + pub state: State, + pub local_addr: SocketAddr, + pub remote_addr: SocketAddr, + pub game_cmd_tx: MovReqTx, + pub page: String, +} + +impl PerConnHandler { + async fn handle_req(self, req: Request) + -> std::result::Result, Infallible> + { + if hyper_tungstenite::is_upgrade_request(&req) { + let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket"); + tokio::spawn(async move { + peer_connection(self, wsocket).await; + }); + return Ok(resp); + } + + //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?; + let mut uri_path = req.uri().path().split('/').skip(1); + let actual_path = uri_path.next(); + + use hyper::{ Method, StatusCode }; + match (req.method(), actual_path) { + (&Method::GET, None | Some("")) => { + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(self.page)) + .unwrap()) + }, + (&Method::GET, Some("font.ttf")) => { + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(FONT_FILE)) + .unwrap()) + }, + _ => { + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap()) + } + } + } +} + +impl Service> for PerConnHandler +{ + type Response = Response; + type Error = Infallible; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + let cloned_self = self.clone(); + Box::pin(cloned_self.handle_req(req)) + } +} + +pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) { + let socket = socket.await.expect("couldn't finish websocket connection"); + let addr = conn.remote_addr; + let peers = &conn.state.peers; + let mut cmd_tx = &conn.game_cmd_tx; + println!("Incoming TCP connection from: {addr}"); + + let mut peer_name = "unknown".to_string(); + + // for game -> conn comms + let (tx, rx) = unbounded(); + // for server -> client comms + let (outgoing, mut incoming) = socket.split(); + + let process_incoming = async { + while let Ok(cmd) = incoming.try_next().await { + if let Some(cmd) = cmd { + if cmd.is_close() { + println!("closing \"{peer_name}\"@{addr}"); + let mut peers = peers.write().await; + if let Some(_) = peers.get(&addr) { + peers.remove(&addr); + } + for (paddr, p) in peers.iter() { + if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) { + println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e); + } + } + break; + } + // if it ain't text we can't handle it + if !cmd.is_text() { continue; } + let cmd = cmd.to_text().unwrap(); + + let mut fields = cmd.split(" "); + let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { + let x = fields.next().and_then(|xstr| xstr.parse::().ok()); + let y = fields.next().and_then(|ystr| ystr.parse::().ok()); + x.zip(y) + }; + if let Some(cmd_name) = fields.next() { + use crate::minesweeper::{Move,MoveType}; + match cmd_name { + "pos" => { + match parse_pos(fields) { + Some(pos) => { + let (name, id) = { + let mut peers = peers.write().await; + let mut entry = peers.get_mut(&addr).unwrap(); + entry.position = pos.clone(); + (entry.name.clone(), entry.seq_id) + }; + let sanitized_name = name.replace(" ", " ").to_string(); + { + let peers = peers.read().await; + for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) { + let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); + if let Err(e) = r { + println!("error sending pos update: {e}"); + } + } + } + }, + None => { + println!("bad position update from \"{peer_name}@{addr}\""); + }, + } + }, + "reveal" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from \"{peer_name}\"@{addr}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await { + println!("couldn't process \"{peer_name}\"'s reveal command: {e}"); + }; + }, + None => { + println!("bad reveal from \"{peer_name}\"@{addr}"); + } + } + }, + "flag" => { + match parse_pos(fields) { + Some(pos) => { + println!("{cmd} from \"{peer_name}\"@{addr}"); + if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await { + println!("couldn't process \"{peer_name}\"'s flag command: {e}"); + }; + }, + None => { + println!("bad flag from \"{peer_name}\"@{addr}"); + } + } + }, + "reset" => { + println!("{cmd} from \"{peer_name}\"@{addr}"); + if let Err(e) = cmd_tx.send(MetaMove::Reset).await { + println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); + } + }, + "register" => { + let name = fields.collect::>().join(&" "); + let id; + if name.is_empty() { + peer_name = "anon".to_string(); + } else { + peer_name = name; + } + { // new scope cuz paranoid bout deadlocks + let mut peers = peers.write().await; + id = peers.len(); + peers.insert(addr, Peer { + tx: tx.clone(), + seq_id: id, + name: peer_name.clone(), + position: (0,0) + }); + } + tx.unbounded_send(Message::text(format!("id {id}"))).unwrap(); + if let Err(e) = cmd_tx.send(MetaMove::Dump).await { + println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); + } + }, + e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), + } + } + } + } + }; + let send_to_browser = rx.map(Ok).forward(outgoing); + + pin_mut!(process_incoming, send_to_browser); + futures_util::future::select(process_incoming, send_to_browser).await; + + println!("{addr} disconnected"); + peers.write().await.remove(&addr); +} diff --git a/src/main.rs b/src/main.rs index 1367c60..cafc9d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,117 +1,130 @@ +#[macro_use] extern crate anyhow; + +pub use anyhow::{ Result, Error }; + use std::{ - error::Error, - sync::{ Arc, atomic::{ self, AtomicUsize }}, - net::SocketAddr, convert::Infallible, collections::HashMap, + net::SocketAddr, + sync::Arc, }; +mod types; +mod conn; mod minesweeper; -mod tls_stuff; -use minesweeper::*; -use tls_stuff::*; +use types::*; -use hyper::{ Method, StatusCode, Body, Request, Response, Server }; +use hyper::{ Body, Request, Server }; use hyper::server::conn::{ AddrStream, AddrIncoming }; -use hyper::service::{make_service_fn, service_fn}; -use tokio::sync::{ - RwLock, - mpsc, -}; +use hyper::service::{make_service_fn, service_fn, Service}; +use tokio::sync::RwLock; +use std::fs; -type HtmlResult = Result, Response>; -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt}; +use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt}; -use tokio::fs; use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket }; -use tokio_rustls::{ rustls, Accept, server::TlsStream }; - -type Tx = UnboundedSender; -type MovReqTx = mpsc::UnboundedSender; -type PeerMap = Arc>>; -type PeerInfo = (PeerMap, Arc::); - -#[derive(Debug)] -enum MetaMove { - Move(Move,SocketAddr), - Dump, - Reset, -} +use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor }; +use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig, + Certificate, PrivateKey}; + +fn main() -> Result<()> { + let conf = Config { + cert_path: "./cert.pem".to_owned(), + pkey_path: "./cert.rsa".to_owned(), + page_path: "./page.html".to_owned(), + socket_addr: ([0,0,0,0],31235).into(), + }; -const PAGE_RELPATH: &str = "./page.html"; -const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf"); + let state = State { + conf, + peers: PeerMap::new(RwLock::new(HashMap::new())), + }; -#[tokio::main] -async fn main() -> Result<(), Box> { - let sequential_id = Arc::new(AtomicUsize::new(0)); - let peers = PeerMap::new(RwLock::new(HashMap::new())); - let peer_info: PeerInfo = (peers.clone(), sequential_id.clone()); - let addr = SocketAddr::from(([0, 0, 0, 0], 31235)); + let cert_pem = fs::read(&state.conf.cert_path)?; + let pkey_pem = fs::read(&state.conf.pkey_path)?; // Build TLS configuration. let tls_cfg = { - // Load public certificate. - let certs = load_certs("cert.pem")?; - // Load private key. - let key = load_private_key("cert.rsa")?; - // Do not use client certificate authentication. - let mut cfg = rustls::ServerConfig::builder() + let certs: Vec = rustls_pemfile::certs(&mut &*cert_pem) + .map(|mut certs| certs.drain(..).map(Certificate).collect())?; + if certs.len() < 1 { + return Err(anyhow!("No certificates found")); + } + let mut keys: Vec = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem) + .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?; + if keys.len() < 1 { + return Err(anyhow!("No private keys found")); + } + + let mut tls_cfg = ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() - .with_single_cert(certs, key) - .map_err(|e| error(format!("{}", e)))?; + .with_single_cert(certs, keys.remove(0))?; + // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order. - cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - Arc::new(cfg) + tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + tls_cfg }; - // Create a TCP listener via tokio. - let incoming = AddrIncoming::bind(&addr)?; + tokio_main(state, tls_cfg) +} - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone())); +#[tokio::main] +async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> { + let server = main_server(state.clone(), tls_conf); + println!("Serving on {}", state.conf.socket_addr); + server.await?; + Ok(()) +} - let serv = make_service_fn(|socket: &tls_stuff::TlsStream| { - let addr = { - if let State::Streaming(ref s) = &socket.state { - s.get_ref().0.remote_addr() - } else { - std::net::SocketAddr::new(std::net::Ipv4Addr::new(0,0,0,0).into(),0) +async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> { + // Create a TCP listener, that uses TLS + let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?; + let local_addr = listener.local_addr()?; + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf)); + let http = hyper::server::conn::Http::new(); + + // Start the temporary single lobby + let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded(); + let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); + + loop { + let (conn, remote_addr) = listener.accept().await?; + let acceptor = acceptor.clone(); + let http = http.clone(); + let cloned_state = state.clone(); + let cmd_tx = cmd_tx.clone(); + let fut = async move { + match acceptor.accept(conn).await { + Ok(stream) => { + let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await; + if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); } + + let handler = conn::PerConnHandler { + state: cloned_state, + local_addr, + remote_addr, + game_cmd_tx: cmd_tx, + page: page.unwrap(), + }; + if let Err(e) = http.serve_connection(stream, handler).await { + eprintln!("hyper error for {remote_addr}: {e}"); + } + }, + Err(e) => eprintln!("TLS error for {remote_addr}: {e}"), } + Ok(()) }; - let peer_info = peer_info.clone(); - let cmd_tx = cmd_tx.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req: Request| { - let peer_info = peer_info.clone(); - let cmd_tx = cmd_tx.clone(); - async move { - Ok::<_,Infallible>(match handle_req(req, peer_info.clone(), cmd_tx.clone(), addr).await { - Ok(r) => r, - Err(r) => r, - }) - } - })) - } - }); - - // Run the future, keep going until an error occurs. - println!("Starting to serve on https://{}.", addr); - let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming)) - .serve(serv) - .with_graceful_shutdown(shutdown_signal()); - if let Err(e) = server.await { - eprint!("server error: {}", e); + tokio::spawn(fut); } - Ok(()) } // If a move is made, broadcast new board, else just send current board -async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap) { +async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver, peers: PeerMap) { + use minesweeper::*; let mut game = Game::new(Board::new(75,35), (75*35)/8); let mut latest_player_name = None; - while let Some(req) = move_rx.recv().await { + while let Some(req) = move_rx.next().await { let done = game.phase == Phase::Die || game.phase == Phase::Win; match req { MetaMove::Move(m, o) => if !done { @@ -119,7 +132,7 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap if game.phase == Phase::Win || game.phase == Phase::Die { game.board = game.board.grade(); } - latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone()); + latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone()); }, MetaMove::Dump => (), MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, @@ -133,9 +146,9 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap } { let peers = peers.read().await; - for (addr, (tx, _, _, _)) in peers.iter() { + for (addr, p) in peers.iter() { for r in reply.iter() { - if let Err(e) = tx.unbounded_send(r.clone()) { + if let Err(e) = p.tx.unbounded_send(r.clone()) { println!("couldn't send game update {r} to {addr}: {e}"); } } @@ -144,177 +157,6 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap } } -async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, socket: HyperWebsocket, addr: SocketAddr) { - let socket = socket.await.unwrap(); // FIXME error handling - println!("Incoming TCP connection from: {}", addr); - - let peer_map = peer_info.0; - let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel); - let mut peer_name = "unknown".to_string(); - - let (tx, rx) = unbounded(); - - let (outgoing, mut incoming) = socket.split(); - - let process_incoming = async { - while let Ok(cmd) = incoming.try_next().await { - if let Some(cmd) = cmd { - if cmd.is_close() { - println!("closing \"{peer_name}\"@{addr}"); - let mut peers = peer_map.write().await; - if let Some(_) = peers.get(&addr) { - peers.remove(&addr); - } - for (paddr, (tx, _, pname, _)) in peers.iter() { - if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) { - println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}"); - } - } - break; - } - // if it ain't text we can't handle it - if !cmd.is_text() { continue; } - let cmd = cmd.to_text().unwrap(); - - let mut fields = cmd.split(" "); - let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { - let x = fields.next().and_then(|xstr| xstr.parse::().ok()); - let y = fields.next().and_then(|ystr| ystr.parse::().ok()); - x.zip(y) - }; - if let Some(cmd_name) = fields.next() { - match cmd_name { - "pos" => { - match parse_pos(fields) { - Some(pos) => { - let (name, id) = { - let mut peers = peer_map.write().await; - let mut entry = peers.get_mut(&addr).unwrap(); - entry.3 = pos.clone(); - (entry.2.clone(), entry.1) - }; - let sanitized_name = name.replace(" ", " ").to_string(); - { - let peers = peer_map.read().await; - for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) { - let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); - if let Err(e) = r { - println!("error sending pos update: {e}"); - } - } - } - }, - None => { - println!("bad position update from \"{peer_name}@{addr}\""); - }, - } - }, - "reveal" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap(); - }, - None => { - println!("bad reveal from \"{peer_name}\"@{addr}"); - } - } - }, - "flag" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap(); - }, - None => { - println!("bad flag from \"{peer_name}\"@{addr}"); - } - } - }, - "reset" => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Reset) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - "register" => { - let name = fields.collect::>().join(&" "); - if name.is_empty() { - peer_name = "anon".to_string(); - } else { - peer_name = name; - } - { // new scope cuz paranoid bout deadlocks - peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0))); - } - tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap(); - if let Err(e) = cmd_tx.send(MetaMove::Dump) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), - } - } - } - } - }; - let send_to_browser = rx.map(Ok).forward(outgoing); - - pin_mut!(process_incoming, send_to_browser); - future::select(process_incoming, send_to_browser).await; - - println!("{} disconnected", &addr); - peer_map.write().await.remove(&addr); -} - -async fn handle_req(mut request: Request, peer_info: PeerInfo, cmd_tx: MovReqTx, addr: SocketAddr) -> HtmlResult { - if hyper_tungstenite::is_upgrade_request(&request) { - let (resp, wsocket) = hyper_tungstenite::upgrade(&mut request, None).expect("couldn't upgrade to websocket"); - tokio::spawn(async move { - peer_connection(peer_info.clone(), cmd_tx.clone(), wsocket, addr).await; - }); - return Ok(resp); - } - - let page = fs::read_to_string(PAGE_RELPATH).await.unwrap(); - let mut uri_path = request.uri().path().split('/').skip(1); - let actual_path = uri_path.next(); - - match (request.method(), actual_path) { - (&Method::GET, None | Some("")) => { - Response::builder() - .status(StatusCode::OK) - .body(Body::from(page)) - .map_err(errpage) - }, - (&Method::GET, Some("saddr")) => { - Response::builder() - .status(StatusCode::OK) - .body(Body::from("placeholder")) - .map_err(errpage) - }, - (&Method::GET, Some("font.ttf")) => { - Response::builder() - .status(StatusCode::OK) - .body(Body::from(FONT_FILE_FUCKIT)) - .map_err(errpage) - }, - _ => { - Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .header("ALLOW", "GET, POST") - .body(Body::empty()).map_err(errpage) - } - } -} - -fn errpage(e: T) -> Response { - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(e.to_string().into()) - .unwrap() -} - fn error(err: String) -> std::io::Error { std::io::Error::new(std::io::ErrorKind::Other, err) } diff --git a/src/tls_stuff.rs b/src/tls_stuff.rs deleted file mode 100644 index 83c0489..0000000 --- a/src/tls_stuff.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! Simple HTTPS echo service based on hyper-rustls -//! -//! First parameter is the mandatory port to use. -//! Certificate and private key are hardcoded to sample files. -//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2, -//! otherwise HTTP/1.1 will be used. -use core::task::{Context, Poll}; -use futures_util::ready; -use hyper::server::accept::Accept; -use hyper::server::conn::{AddrIncoming, AddrStream}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::vec::Vec; -use std::{fs, io}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_rustls::rustls::{self, ServerConfig}; - -fn error(err: String) -> io::Error { - io::Error::new(io::ErrorKind::Other, err) -} - -pub enum State { - Handshaking(tokio_rustls::Accept), - Streaming(tokio_rustls::server::TlsStream), -} - -// tokio_rustls::server::TlsStream doesn't expose constructor methods, -// so we have to TlsAcceptor::accept and handshake to have access to it -// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first -pub struct TlsStream { - pub state: State, -} - -impl TlsStream { - pub fn new(stream: AddrStream, config: Arc) -> TlsStream { - let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream); - TlsStream { - state: State::Handshaking(accept), - } - } -} - -impl AsyncRead for TlsStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf, - ) -> Poll> { - let pin = self.get_mut(); - match pin.state { - State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) { - Ok(mut stream) => { - let result = Pin::new(&mut stream).poll_read(cx, buf); - pin.state = State::Streaming(stream); - result - } - Err(err) => Poll::Ready(Err(err)), - }, - State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf), - } - } -} - -impl AsyncWrite for TlsStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let pin = self.get_mut(); - match pin.state { - State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) { - Ok(mut stream) => { - let result = Pin::new(&mut stream).poll_write(cx, buf); - pin.state = State::Streaming(stream); - result - } - Err(err) => Poll::Ready(Err(err)), - }, - State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.state { - State::Handshaking(_) => Poll::Ready(Ok(())), - State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.state { - State::Handshaking(_) => Poll::Ready(Ok(())), - State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), - } - } -} - -pub struct TlsAcceptor { - config: Arc, - incoming: AddrIncoming, -} - -impl TlsAcceptor { - pub fn new(config: Arc, incoming: AddrIncoming) -> TlsAcceptor { - TlsAcceptor { config, incoming } - } -} - -impl Accept for TlsAcceptor { - type Conn = TlsStream; - type Error = io::Error; - - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let pin = self.get_mut(); - match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) { - Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))), - Some(Err(e)) => Poll::Ready(Some(Err(e))), - None => Poll::Ready(None), - } - } -} - -// Load public certificate from file. -pub fn load_certs(filename: &str) -> io::Result> { - // Open certificate file. - let certfile = fs::File::open(filename) - .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; - let mut reader = io::BufReader::new(certfile); - - // Load and return certificate. - let certs = rustls_pemfile::certs(&mut reader) - .map_err(|_| error("failed to load certificate".into()))?; - Ok(certs - .into_iter() - .map(rustls::Certificate) - .collect()) -} - -// Load private key from file. -pub fn load_private_key(filename: &str) -> io::Result { - // Open keyfile. - let keyfile = fs::File::open(filename) - .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; - let mut reader = io::BufReader::new(keyfile); - - // Load and return a single private key. - let keys = rustls_pemfile::rsa_private_keys(&mut reader) - .map_err(|_| error("failed to load private key".into()))?; - if keys.len() != 1 { - return Err(error("expected a single private key".into())); - } - - Ok(rustls::PrivateKey(keys[0].clone())) -} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..eed69dd --- /dev/null +++ b/src/types.rs @@ -0,0 +1,43 @@ +use std::{ + collections::HashMap, + net::SocketAddr, + sync::Arc, +}; +use tokio::sync::RwLock; +use hyper_tungstenite::tungstenite::Message; +use hyper::{ Response, Body }; +use futures::channel::mpsc::UnboundedSender; +use crate::minesweeper; + +#[derive(Debug, Clone)] +pub struct Config { + pub cert_path: String, + pub pkey_path: String, + pub page_path: String, + pub socket_addr: SocketAddr, +} + +#[derive(Debug, Clone)] +pub struct State { + pub conf: Config, + pub peers: PeerMap, +} + +#[derive(Debug)] +pub enum MetaMove { + Move(minesweeper::Move,SocketAddr), + Dump, + Reset, +} + +#[derive(Debug)] +pub struct Peer { + pub tx: UnboundedSender, + pub seq_id: usize, + pub name: String, + pub position: (usize, usize), +} + +pub type HtmlResult = Result, Response>; +pub type MovReqTx = futures::channel::mpsc::UnboundedSender; +pub type PeerMap = Arc>>; -- cgit v1.2.3