use std::{ error::Error, sync::{ Arc, atomic::{ self, AtomicUsize }}, net::SocketAddr, convert::Infallible, collections::HashMap, }; mod minesweeper; use minesweeper::*; //use std::convert::{ TryFrom, TryInto }; use hyper::{ Method, StatusCode, Body, Request, Response, Server }; use hyper::service::{make_service_fn, service_fn}; use tokio::sync::{ RwLock, mpsc, }; type HtmlResult = Result, Response>; use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::fs; use tokio_tungstenite::tungstenite::protocol::Message; type Tx = UnboundedSender; type MovReqTx = mpsc::UnboundedSender; type PeerMap = Arc>>; type PeerInfo = (PeerMap, Arc::); #[derive(Debug)] enum MetaMove { Move(Move), Dump, Reset, } const PAGE_RELPATH: &str = "./page.html"; const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf"); #[tokio::main] async fn main() { let sequential_id = Arc::new(AtomicUsize::new(0)); let peers = PeerMap::new(RwLock::new(HashMap::new())); let peer_info: PeerInfo = (peers.clone(), sequential_id.clone()); let http_addr = SocketAddr::from(([0, 0, 0, 0], 31235)); println!("Http on {}", http_addr); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone())); // need to await this one at some point let conn_l = tokio::spawn(conn_listener(peer_info.clone(), cmd_tx.clone())); let http_serv = make_service_fn(|_| { async move { Ok::<_, Infallible>(service_fn(move |req: Request| { async move { Ok::<_,Infallible>(match handle_http_req(req).await { Ok(r) => r, Err(r) => r, }) } })) } }); let server = Server::bind(&http_addr) .serve(http_serv) .with_graceful_shutdown(shutdown_signal()); if let Err(e) = server.await { eprint!("server error: {}", e); } } // If a move is made, broadcast new board, else just send current board async fn gameloop(mut move_rx: mpsc::UnboundedReceiver, peers: PeerMap) { let mut game = Game::new(Board::new(75,35), (75*35)/8); while let Some(req) = move_rx.recv().await { let mut done = game.phase == Phase::Die || game.phase == Phase::Win; match req { MetaMove::Move(m) => if !done { game = game.act(m)}, MetaMove::Dump => (), MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); done = false;}, } if !done { let mut reply = vec![]; match game.phase { Phase::Win => { reply.push(Message::text("win")); game.board = game.board.grade(); }, Phase::Die => { reply.push(Message::text("lose")); game.board = game.board.grade(); }, _ => (), } reply.push(Message::binary(game.board.render())); { let peers = peers.read().await; for (_, (tx, _, _, _)) in peers.iter() { for r in reply.iter() { tx.unbounded_send(r.clone()).unwrap(); } } } } } } async fn conn_listener(peer_info: PeerInfo, cmd_tx: MovReqTx) { let ws_addr = SocketAddr::from(([0, 0, 0, 0], 31236)); let ws_socket = TcpListener::bind(&ws_addr).await; let ws_listener = ws_socket.expect("Failed to bind"); // Let's spawn the handling of each connection in a separate task. println!("Websocket on {}", ws_addr); while let Ok((stream, addr)) = ws_listener.accept().await { tokio::spawn(peer_connection(peer_info.clone(), cmd_tx.clone(), stream, addr)); } } async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, raw_stream: TcpStream, addr: SocketAddr) { println!("Incoming TCP connection from: {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream) .await .expect("Error during the websocket handshake occurred"); println!("WebSocket connection established: {}", addr); let peer_map = peer_info.0; let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel); // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); let (outgoing, mut incoming) = ws_stream.split(); let process_incoming = async { while let Some(cmd) = incoming.try_next().await.unwrap() { let cmd = cmd.to_text().unwrap(); let mut fields = cmd.split(" ").skip(1); if cmd.starts_with("pos") { let pos = (fields.next().unwrap().parse::().unwrap(), fields.next().unwrap().parse::().unwrap()); let (name, id) = { let mut peers = peer_map.write().await; let mut entry = peers.get_mut(&addr).unwrap(); entry.3 = pos.clone(); (entry.2.clone(), entry.1) }; { let peers = peer_map.read().await; for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) { peer_tx.unbounded_send(Message::text(format!("pos {} {} {} {}", id, name, pos.0, pos.1))).unwrap(); } } } else if cmd.starts_with("reveal") { println!("got {} from {}", cmd, addr); let pos = (fields.next().unwrap().parse::().unwrap(), fields.next().unwrap().parse::().unwrap()); cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos })).unwrap(); } else if cmd.starts_with("flag") { println!("got {} from {}", cmd, addr); let pos = (fields.next().unwrap().parse::().unwrap(), fields.next().unwrap().parse::().unwrap()); cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos })).unwrap(); } else if cmd.starts_with("reset") { println!("got {} from {}", cmd, addr); cmd_tx.send(MetaMove::Reset).unwrap(); } else if cmd.starts_with("register") { let name = fields.next().unwrap(); { // new scope cuz paranoid bout deadlocks peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, name.to_string(), (0,0))); } tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap(); cmd_tx.send(MetaMove::Dump).unwrap(); } } }; let send_to_browser = rx.map(Ok).forward(outgoing); pin_mut!(process_incoming, send_to_browser); future::select(process_incoming, send_to_browser).await; println!("{} disconnected", &addr); peer_map.write().await.remove(&addr); } async fn handle_http_req(request: Request) -> HtmlResult { let page = fs::read_to_string(PAGE_RELPATH).await.unwrap(); let mut uri_path = request.uri().path().split('/').skip(1); let actual_path = uri_path.next(); match (request.method(), actual_path) { (&Method::GET, None | Some("")) => { Response::builder() .status(StatusCode::OK) .body(Body::from(page)) .map_err(errpage) }, (&Method::GET, Some("saddr")) => { Response::builder() .status(StatusCode::OK) .body(Body::from("placeholder")) .map_err(errpage) }, (&Method::GET, Some("font.ttf")) => { Response::builder() .status(StatusCode::OK) .body(Body::from(FONT_FILE_FUCKIT)) .map_err(errpage) }, _ => { Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) .header("ALLOW", "GET, POST") .body(Body::empty()).map_err(errpage) } } } fn errpage(e: T) -> Response { Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(e.to_string().into()) .unwrap() } async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); }