diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 356 |
1 files changed, 99 insertions, 257 deletions
diff --git a/src/main.rs b/src/main.rs index 1367c60..cafc9d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,117 +1,130 @@ +#[macro_use] extern crate anyhow; + +pub use anyhow::{ Result, Error }; + use std::{ - error::Error, - sync::{ Arc, atomic::{ self, AtomicUsize }}, - net::SocketAddr, convert::Infallible, collections::HashMap, + net::SocketAddr, + sync::Arc, }; +mod types; +mod conn; mod minesweeper; -mod tls_stuff; -use minesweeper::*; -use tls_stuff::*; +use types::*; -use hyper::{ Method, StatusCode, Body, Request, Response, Server }; +use hyper::{ Body, Request, Server }; use hyper::server::conn::{ AddrStream, AddrIncoming }; -use hyper::service::{make_service_fn, service_fn}; -use tokio::sync::{ - RwLock, - mpsc, -}; +use hyper::service::{make_service_fn, service_fn, Service}; +use tokio::sync::RwLock; +use std::fs; -type HtmlResult = Result<Response<Body>, Response<Body>>; -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt}; +use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt}; -use tokio::fs; use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket }; -use tokio_rustls::{ rustls, Accept, server::TlsStream }; - -type Tx = UnboundedSender<Message>; -type MovReqTx = mpsc::UnboundedSender<MetaMove>; -type PeerMap = Arc<RwLock<HashMap<SocketAddr, (Tx, usize, String, (usize, usize))>>>; -type PeerInfo = (PeerMap, Arc::<AtomicUsize>); - -#[derive(Debug)] -enum MetaMove { - Move(Move,SocketAddr), - Dump, - Reset, -} +use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor }; +use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig, + Certificate, PrivateKey}; + +fn main() -> Result<()> { + let conf = Config { + cert_path: "./cert.pem".to_owned(), + pkey_path: "./cert.rsa".to_owned(), + page_path: "./page.html".to_owned(), + socket_addr: ([0,0,0,0],31235).into(), + }; -const PAGE_RELPATH: &str = "./page.html"; -const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf"); + let state = State { + conf, + peers: PeerMap::new(RwLock::new(HashMap::new())), + }; -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { - let sequential_id = Arc::new(AtomicUsize::new(0)); - let peers = PeerMap::new(RwLock::new(HashMap::new())); - let peer_info: PeerInfo = (peers.clone(), sequential_id.clone()); - let addr = SocketAddr::from(([0, 0, 0, 0], 31235)); + let cert_pem = fs::read(&state.conf.cert_path)?; + let pkey_pem = fs::read(&state.conf.pkey_path)?; // Build TLS configuration. let tls_cfg = { - // Load public certificate. - let certs = load_certs("cert.pem")?; - // Load private key. - let key = load_private_key("cert.rsa")?; - // Do not use client certificate authentication. - let mut cfg = rustls::ServerConfig::builder() + let certs: Vec<Certificate> = rustls_pemfile::certs(&mut &*cert_pem) + .map(|mut certs| certs.drain(..).map(Certificate).collect())?; + if certs.len() < 1 { + return Err(anyhow!("No certificates found")); + } + let mut keys: Vec<PrivateKey> = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem) + .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?; + if keys.len() < 1 { + return Err(anyhow!("No private keys found")); + } + + let mut tls_cfg = ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() - .with_single_cert(certs, key) - .map_err(|e| error(format!("{}", e)))?; + .with_single_cert(certs, keys.remove(0))?; + // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order. - cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - Arc::new(cfg) + tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + tls_cfg }; - // Create a TCP listener via tokio. - let incoming = AddrIncoming::bind(&addr)?; + tokio_main(state, tls_cfg) +} - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone())); +#[tokio::main] +async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> { + let server = main_server(state.clone(), tls_conf); + println!("Serving on {}", state.conf.socket_addr); + server.await?; + Ok(()) +} - let serv = make_service_fn(|socket: &tls_stuff::TlsStream| { - let addr = { - if let State::Streaming(ref s) = &socket.state { - s.get_ref().0.remote_addr() - } else { - std::net::SocketAddr::new(std::net::Ipv4Addr::new(0,0,0,0).into(),0) +async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> { + // Create a TCP listener, that uses TLS + let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?; + let local_addr = listener.local_addr()?; + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf)); + let http = hyper::server::conn::Http::new(); + + // Start the temporary single lobby + let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded(); + let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone())); + + loop { + let (conn, remote_addr) = listener.accept().await?; + let acceptor = acceptor.clone(); + let http = http.clone(); + let cloned_state = state.clone(); + let cmd_tx = cmd_tx.clone(); + let fut = async move { + match acceptor.accept(conn).await { + Ok(stream) => { + let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await; + if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); } + + let handler = conn::PerConnHandler { + state: cloned_state, + local_addr, + remote_addr, + game_cmd_tx: cmd_tx, + page: page.unwrap(), + }; + if let Err(e) = http.serve_connection(stream, handler).await { + eprintln!("hyper error for {remote_addr}: {e}"); + } + }, + Err(e) => eprintln!("TLS error for {remote_addr}: {e}"), } + Ok(()) }; - let peer_info = peer_info.clone(); - let cmd_tx = cmd_tx.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req: Request<Body>| { - let peer_info = peer_info.clone(); - let cmd_tx = cmd_tx.clone(); - async move { - Ok::<_,Infallible>(match handle_req(req, peer_info.clone(), cmd_tx.clone(), addr).await { - Ok(r) => r, - Err(r) => r, - }) - } - })) - } - }); - - // Run the future, keep going until an error occurs. - println!("Starting to serve on https://{}.", addr); - let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming)) - .serve(serv) - .with_graceful_shutdown(shutdown_signal()); - if let Err(e) = server.await { - eprint!("server error: {}", e); + tokio::spawn(fut); } - Ok(()) } // If a move is made, broadcast new board, else just send current board -async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) { +async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) { + use minesweeper::*; let mut game = Game::new(Board::new(75,35), (75*35)/8); let mut latest_player_name = None; - while let Some(req) = move_rx.recv().await { + while let Some(req) = move_rx.next().await { let done = game.phase == Phase::Die || game.phase == Phase::Win; match req { MetaMove::Move(m, o) => if !done { @@ -119,7 +132,7 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap if game.phase == Phase::Win || game.phase == Phase::Die { game.board = game.board.grade(); } - latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone()); + latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone()); }, MetaMove::Dump => (), MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, @@ -133,9 +146,9 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap } { let peers = peers.read().await; - for (addr, (tx, _, _, _)) in peers.iter() { + for (addr, p) in peers.iter() { for r in reply.iter() { - if let Err(e) = tx.unbounded_send(r.clone()) { + if let Err(e) = p.tx.unbounded_send(r.clone()) { println!("couldn't send game update {r} to {addr}: {e}"); } } @@ -144,177 +157,6 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap } } -async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, socket: HyperWebsocket, addr: SocketAddr) { - let socket = socket.await.unwrap(); // FIXME error handling - println!("Incoming TCP connection from: {}", addr); - - let peer_map = peer_info.0; - let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel); - let mut peer_name = "unknown".to_string(); - - let (tx, rx) = unbounded(); - - let (outgoing, mut incoming) = socket.split(); - - let process_incoming = async { - while let Ok(cmd) = incoming.try_next().await { - if let Some(cmd) = cmd { - if cmd.is_close() { - println!("closing \"{peer_name}\"@{addr}"); - let mut peers = peer_map.write().await; - if let Some(_) = peers.get(&addr) { - peers.remove(&addr); - } - for (paddr, (tx, _, pname, _)) in peers.iter() { - if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) { - println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}"); - } - } - break; - } - // if it ain't text we can't handle it - if !cmd.is_text() { continue; } - let cmd = cmd.to_text().unwrap(); - - let mut fields = cmd.split(" "); - let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { - let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok()); - let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok()); - x.zip(y) - }; - if let Some(cmd_name) = fields.next() { - match cmd_name { - "pos" => { - match parse_pos(fields) { - Some(pos) => { - let (name, id) = { - let mut peers = peer_map.write().await; - let mut entry = peers.get_mut(&addr).unwrap(); - entry.3 = pos.clone(); - (entry.2.clone(), entry.1) - }; - let sanitized_name = name.replace(" ", " ").to_string(); - { - let peers = peer_map.read().await; - for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) { - let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); - if let Err(e) = r { - println!("error sending pos update: {e}"); - } - } - } - }, - None => { - println!("bad position update from \"{peer_name}@{addr}\""); - }, - } - }, - "reveal" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap(); - }, - None => { - println!("bad reveal from \"{peer_name}\"@{addr}"); - } - } - }, - "flag" => { - match parse_pos(fields) { - Some(pos) => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap(); - }, - None => { - println!("bad flag from \"{peer_name}\"@{addr}"); - } - } - }, - "reset" => { - println!("{cmd} from \"{peer_name}\"@{addr}"); - if let Err(e) = cmd_tx.send(MetaMove::Reset) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - "register" => { - let name = fields.collect::<Vec<&str>>().join(&" "); - if name.is_empty() { - peer_name = "anon".to_string(); - } else { - peer_name = name; - } - { // new scope cuz paranoid bout deadlocks - peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0))); - } - tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap(); - if let Err(e) = cmd_tx.send(MetaMove::Dump) { - println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); - } - }, - e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), - } - } - } - } - }; - let send_to_browser = rx.map(Ok).forward(outgoing); - - pin_mut!(process_incoming, send_to_browser); - future::select(process_incoming, send_to_browser).await; - - println!("{} disconnected", &addr); - peer_map.write().await.remove(&addr); -} - -async fn handle_req(mut request: Request<Body>, peer_info: PeerInfo, cmd_tx: MovReqTx, addr: SocketAddr) -> HtmlResult { - if hyper_tungstenite::is_upgrade_request(&request) { - let (resp, wsocket) = hyper_tungstenite::upgrade(&mut request, None).expect("couldn't upgrade to websocket"); - tokio::spawn(async move { - peer_connection(peer_info.clone(), cmd_tx.clone(), wsocket, addr).await; - }); - return Ok(resp); - } - - let page = fs::read_to_string(PAGE_RELPATH).await.unwrap(); - let mut uri_path = request.uri().path().split('/').skip(1); - let actual_path = uri_path.next(); - - match (request.method(), actual_path) { - (&Method::GET, None | Some("")) => { - Response::builder() - .status(StatusCode::OK) - .body(Body::from(page)) - .map_err(errpage) - }, - (&Method::GET, Some("saddr")) => { - Response::builder() - .status(StatusCode::OK) - .body(Body::from("placeholder")) - .map_err(errpage) - }, - (&Method::GET, Some("font.ttf")) => { - Response::builder() - .status(StatusCode::OK) - .body(Body::from(FONT_FILE_FUCKIT)) - .map_err(errpage) - }, - _ => { - Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .header("ALLOW", "GET, POST") - .body(Body::empty()).map_err(errpage) - } - } -} - -fn errpage<T: Error>(e: T) -> Response<Body> { - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(e.to_string().into()) - .unwrap() -} - fn error(err: String) -> std::io::Error { std::io::Error::new(std::io::ErrorKind::Other, err) } |