use std::{ error::Error, sync::{ Arc, atomic::{ self, AtomicUsize }}, net::SocketAddr, convert::Infallible, collections::HashMap, }; mod minesweeper; mod tls_stuff; use minesweeper::*; use tls_stuff::*; use hyper::{ Method, StatusCode, Body, Request, Response, Server }; use hyper::server::conn::{ AddrStream, AddrIncoming }; use hyper::service::{make_service_fn, service_fn}; use tokio::sync::{ RwLock, mpsc, }; type HtmlResult = Result, Response>; use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt}; use tokio::fs; use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket }; use tokio_rustls::{ rustls, Accept, server::TlsStream }; type Tx = UnboundedSender; type MovReqTx = mpsc::UnboundedSender; type PeerMap = Arc>>; type PeerInfo = (PeerMap, Arc::); #[derive(Debug)] enum MetaMove { Move(Move,SocketAddr), Dump, Reset, } const PAGE_RELPATH: &str = "./page.html"; const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf"); #[tokio::main] async fn main() -> Result<(), Box> { let sequential_id = Arc::new(AtomicUsize::new(0)); let peers = PeerMap::new(RwLock::new(HashMap::new())); let peer_info: PeerInfo = (peers.clone(), sequential_id.clone()); let addr = SocketAddr::from(([0, 0, 0, 0], 31235)); // Build TLS configuration. let tls_cfg = { // Load public certificate. let certs = load_certs("cert.pem")?; // Load private key. let key = load_private_key("cert.rsa")?; // Do not use client certificate authentication. let mut cfg = rustls::ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() .with_single_cert(certs, key) .map_err(|e| error(format!("{}", e)))?; // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order. cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; Arc::new(cfg) }; // Create a TCP listener via tokio. let incoming = AddrIncoming::bind(&addr)?; let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone())); let serv = make_service_fn(|socket: &tls_stuff::TlsStream| { let addr = { if let State::Streaming(ref s) = &socket.state { s.get_ref().0.remote_addr() } else { std::net::SocketAddr::new(std::net::Ipv4Addr::new(0,0,0,0).into(),0) } }; let peer_info = peer_info.clone(); let cmd_tx = cmd_tx.clone(); async move { Ok::<_, Infallible>(service_fn(move |req: Request| { let peer_info = peer_info.clone(); let cmd_tx = cmd_tx.clone(); async move { Ok::<_,Infallible>(match handle_req(req, peer_info.clone(), cmd_tx.clone(), addr).await { Ok(r) => r, Err(r) => r, }) } })) } }); // Run the future, keep going until an error occurs. println!("Starting to serve on https://{}.", addr); let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming)) .serve(serv) .with_graceful_shutdown(shutdown_signal()); if let Err(e) = server.await { eprint!("server error: {}", e); } Ok(()) } // If a move is made, broadcast new board, else just send current board async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap) { let mut game = Game::new(Board::new(75,35), (75*35)/8); let mut latest_player_name = None; while let Some(req) = move_rx.recv().await { let done = game.phase == Phase::Die || game.phase == Phase::Win; match req { MetaMove::Move(m, o) => if !done { game = game.act(m); if game.phase == Phase::Win || game.phase == Phase::Die { game.board = game.board.grade(); } latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone()); }, MetaMove::Dump => (), MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, } let mut reply = vec![Message::binary(game.board.render())]; let lpname = latest_player_name.as_ref().map(|s| s.as_str()).unwrap_or("unknown player"); match game.phase { Phase::Win => { reply.push(Message::text(format!("win {lpname}"))); }, Phase::Die => { reply.push(Message::text(format!("lose {lpname}"))); }, _ => (), } { let peers = peers.read().await; for (addr, (tx, _, _, _)) in peers.iter() { for r in reply.iter() { if let Err(e) = tx.unbounded_send(r.clone()) { println!("couldn't send game update {r} to {addr}: {e}"); } } } } } } async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, socket: HyperWebsocket, addr: SocketAddr) { let socket = socket.await.unwrap(); // FIXME error handling println!("Incoming TCP connection from: {}", addr); let peer_map = peer_info.0; let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel); let mut peer_name = "unknown".to_string(); let (tx, rx) = unbounded(); let (outgoing, mut incoming) = socket.split(); let process_incoming = async { while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { if cmd.is_close() { println!("closing \"{peer_name}\"@{addr}"); let mut peers = peer_map.write().await; if let Some(_) = peers.get(&addr) { peers.remove(&addr); } for (paddr, (tx, _, pname, _)) in peers.iter() { if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) { println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}"); } } break; } // if it ain't text we can't handle it if !cmd.is_text() { continue; } let cmd = cmd.to_text().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { let x = fields.next().and_then(|xstr| xstr.parse::().ok()); let y = fields.next().and_then(|ystr| ystr.parse::().ok()); x.zip(y) }; if let Some(cmd_name) = fields.next() { match cmd_name { "pos" => { match parse_pos(fields) { Some(pos) => { let (name, id) = { let mut peers = peer_map.write().await; let mut entry = peers.get_mut(&addr).unwrap(); entry.3 = pos.clone(); (entry.2.clone(), entry.1) }; let sanitized_name = name.replace(" ", " ").to_string(); { let peers = peer_map.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) { let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } } } }, None => { println!("bad position update from \"{peer_name}@{addr}\""); }, } }, "reveal" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap(); }, None => { println!("bad reveal from \"{peer_name}\"@{addr}"); } } }, "flag" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap(); }, None => { println!("bad flag from \"{peer_name}\"@{addr}"); } } }, "reset" => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Reset) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, "register" => { let name = fields.collect::>().join(&" "); if name.is_empty() { peer_name = "anon".to_string(); } else { peer_name = name; } { // new scope cuz paranoid bout deadlocks peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0))); } tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap(); if let Err(e) = cmd_tx.send(MetaMove::Dump) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), } } } } }; let send_to_browser = rx.map(Ok).forward(outgoing); pin_mut!(process_incoming, send_to_browser); future::select(process_incoming, send_to_browser).await; println!("{} disconnected", &addr); peer_map.write().await.remove(&addr); } async fn handle_req(mut request: Request, peer_info: PeerInfo, cmd_tx: MovReqTx, addr: SocketAddr) -> HtmlResult { if hyper_tungstenite::is_upgrade_request(&request) { let (resp, wsocket) = hyper_tungstenite::upgrade(&mut request, None).expect("couldn't upgrade to websocket"); tokio::spawn(async move { peer_connection(peer_info.clone(), cmd_tx.clone(), wsocket, addr).await; }); return Ok(resp); } let page = fs::read_to_string(PAGE_RELPATH).await.unwrap(); let mut uri_path = request.uri().path().split('/').skip(1); let actual_path = uri_path.next(); match (request.method(), actual_path) { (&Method::GET, None | Some("")) => { Response::builder() .status(StatusCode::OK) .body(Body::from(page)) .map_err(errpage) }, (&Method::GET, Some("saddr")) => { Response::builder() .status(StatusCode::OK) .body(Body::from("placeholder")) .map_err(errpage) }, (&Method::GET, Some("font.ttf")) => { Response::builder() .status(StatusCode::OK) .body(Body::from(FONT_FILE_FUCKIT)) .map_err(errpage) }, _ => { Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) .header("ALLOW", "GET, POST") .body(Body::empty()).map_err(errpage) } } } fn errpage(e: T) -> Response { Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(e.to_string().into()) .unwrap() } fn error(err: String) -> std::io::Error { std::io::Error::new(std::io::ErrorKind::Other, err) } async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); }