use std::{ error::Error, sync::{ Arc, atomic::{ self, AtomicUsize }}, net::SocketAddr, convert::Infallible, collections::HashMap, }; mod minesweeper; use minesweeper::*; //use std::convert::{ TryFrom, TryInto }; use hyper::{ Method, StatusCode, Body, Request, Response, Server }; use hyper::service::{make_service_fn, service_fn}; use tokio::sync::{ RwLock, mpsc, }; type HtmlResult = Result, Response>; use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::fs; use tokio_tungstenite::tungstenite::protocol::Message; type Tx = UnboundedSender; type MovReqTx = mpsc::UnboundedSender; type PeerMap = Arc>>; type PeerInfo = (PeerMap, Arc::); #[derive(Debug)] enum MetaMove { Move(Move,SocketAddr), Dump, Reset, } const PAGE_RELPATH: &str = "./page.html"; const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf"); #[tokio::main] async fn main() { let sequential_id = Arc::new(AtomicUsize::new(0)); let peers = PeerMap::new(RwLock::new(HashMap::new())); let peer_info: PeerInfo = (peers.clone(), sequential_id.clone()); let http_addr = SocketAddr::from(([0, 0, 0, 0], 31235)); println!("Http on {}", http_addr); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone())); // need to await this one at some point let conn_l = tokio::spawn(conn_listener(peer_info.clone(), cmd_tx.clone())); let http_serv = make_service_fn(|_| { async move { Ok::<_, Infallible>(service_fn(move |req: Request| { async move { Ok::<_,Infallible>(match handle_http_req(req).await { Ok(r) => r, Err(r) => r, }) } })) } }); let server = Server::bind(&http_addr) .serve(http_serv) .with_graceful_shutdown(shutdown_signal()); if let Err(e) = server.await { eprint!("server error: {}", e); } } // If a move is made, broadcast new board, else just send current board async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap) { let mut game = Game::new(Board::new(75,35), (75*35)/8); let mut latest_player_name = None; while let Some(req) = move_rx.recv().await { let done = game.phase == Phase::Die || game.phase == Phase::Win; match req { MetaMove::Move(m, o) => if !done { game = game.act(m); if game.phase == Phase::Win || game.phase == Phase::Die { game.board = game.board.grade(); } latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone()); }, MetaMove::Dump => (), MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); }, } let mut reply = vec![Message::binary(game.board.render())]; let lpname = latest_player_name.as_ref().map(|s| s.as_str()).unwrap_or("unknown player"); match game.phase { Phase::Win => { reply.push(Message::text(format!("win {lpname}"))); }, Phase::Die => { reply.push(Message::text(format!("lose {lpname}"))); }, _ => (), } { let peers = peers.read().await; for (addr, (tx, _, _, _)) in peers.iter() { for r in reply.iter() { if let Err(e) = tx.unbounded_send(r.clone()) { println!("couldn't send game update {r} to {addr}: {e}"); } } } } } } async fn conn_listener(peer_info: PeerInfo, cmd_tx: MovReqTx) { let ws_addr = SocketAddr::from(([0, 0, 0, 0], 31236)); let ws_socket = TcpListener::bind(&ws_addr).await; let ws_listener = ws_socket.expect("Failed to bind"); // Let's spawn the handling of each connection in a separate task. println!("Websocket on {}", ws_addr); while let Ok((stream, addr)) = ws_listener.accept().await { tokio::spawn(peer_connection(peer_info.clone(), cmd_tx.clone(), stream, addr)); } } async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, raw_stream: TcpStream, addr: SocketAddr) { println!("Incoming TCP connection from: {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream) .await .expect("Error during the websocket handshake occurred"); println!("WebSocket connection established: {}", addr); let peer_map = peer_info.0; let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel); let mut peer_name = "unknown".to_string(); let (tx, rx) = unbounded(); let (outgoing, mut incoming) = ws_stream.split(); let process_incoming = async { while let Ok(cmd) = incoming.try_next().await { if let Some(cmd) = cmd { if cmd.is_close() { println!("closing \"{peer_name}\"@{addr}"); let mut peers = peer_map.write().await; if let Some(_) = peers.get(&addr) { peers.remove(&addr); } for (paddr, (tx, _, pname, _)) in peers.iter() { if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) { println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}"); } } break; } // if it ain't text we can't handle it if !cmd.is_text() { continue; } let cmd = cmd.to_text().unwrap(); let mut fields = cmd.split(" "); let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> { let x = fields.next().and_then(|xstr| xstr.parse::().ok()); let y = fields.next().and_then(|ystr| ystr.parse::().ok()); x.zip(y) }; if let Some(cmd_name) = fields.next() { match cmd_name { "pos" => { match parse_pos(fields) { Some(pos) => { let (name, id) = { let mut peers = peer_map.write().await; let mut entry = peers.get_mut(&addr).unwrap(); entry.3 = pos.clone(); (entry.2.clone(), entry.1) }; let sanitized_name = name.replace(" ", " ").to_string(); { let peers = peer_map.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) { let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1))); if let Err(e) = r { println!("error sending pos update: {e}"); } } } }, None => { println!("bad position update from \"{peer_name}@{addr}\""); }, } }, "reveal" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap(); }, None => { println!("bad reveal from \"{peer_name}\"@{addr}"); } } }, "flag" => { match parse_pos(fields) { Some(pos) => { println!("{cmd} from \"{peer_name}\"@{addr}"); cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap(); }, None => { println!("bad flag from \"{peer_name}\"@{addr}"); } } }, "reset" => { println!("{cmd} from \"{peer_name}\"@{addr}"); if let Err(e) = cmd_tx.send(MetaMove::Reset) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, "register" => { let name = fields.collect::>().join(&" "); if name.is_empty() { peer_name = "anon".to_string(); } else { peer_name = name; } { // new scope cuz paranoid bout deadlocks peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0))); } tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap(); if let Err(e) = cmd_tx.send(MetaMove::Dump) { println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}"); } }, e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""), } } } } }; let send_to_browser = rx.map(Ok).forward(outgoing); pin_mut!(process_incoming, send_to_browser); future::select(process_incoming, send_to_browser).await; println!("{} disconnected", &addr); peer_map.write().await.remove(&addr); } async fn handle_http_req(request: Request) -> HtmlResult { let page = fs::read_to_string(PAGE_RELPATH).await.unwrap(); let mut uri_path = request.uri().path().split('/').skip(1); let actual_path = uri_path.next(); match (request.method(), actual_path) { (&Method::GET, None | Some("")) => { Response::builder() .status(StatusCode::OK) .body(Body::from(page)) .map_err(errpage) }, (&Method::GET, Some("saddr")) => { Response::builder() .status(StatusCode::OK) .body(Body::from("placeholder")) .map_err(errpage) }, (&Method::GET, Some("font.ttf")) => { Response::builder() .status(StatusCode::OK) .body(Body::from(FONT_FILE_FUCKIT)) .map_err(errpage) }, _ => { Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) .header("ALLOW", "GET, POST") .body(Body::empty()).map_err(errpage) } } } fn errpage(e: T) -> Response { Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(e.to_string().into()) .unwrap() } async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); }