summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs356
1 files changed, 99 insertions, 257 deletions
diff --git a/src/main.rs b/src/main.rs
index 1367c60..cafc9d4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,117 +1,130 @@
+#[macro_use] extern crate anyhow;
+
+pub use anyhow::{ Result, Error };
+
use std::{
- error::Error,
- sync::{ Arc, atomic::{ self, AtomicUsize }},
- net::SocketAddr,
convert::Infallible,
collections::HashMap,
+ net::SocketAddr,
+ sync::Arc,
};
+mod types;
+mod conn;
mod minesweeper;
-mod tls_stuff;
-use minesweeper::*;
-use tls_stuff::*;
+use types::*;
-use hyper::{ Method, StatusCode, Body, Request, Response, Server };
+use hyper::{ Body, Request, Server };
use hyper::server::conn::{ AddrStream, AddrIncoming };
-use hyper::service::{make_service_fn, service_fn};
-use tokio::sync::{
- RwLock,
- mpsc,
-};
+use hyper::service::{make_service_fn, service_fn, Service};
+use tokio::sync::RwLock;
+use std::fs;
-type HtmlResult = Result<Response<Body>, Response<Body>>;
-use futures_channel::mpsc::{unbounded, UnboundedSender};
-use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt};
+use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt};
-use tokio::fs;
use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket };
-use tokio_rustls::{ rustls, Accept, server::TlsStream };
-
-type Tx = UnboundedSender<Message>;
-type MovReqTx = mpsc::UnboundedSender<MetaMove>;
-type PeerMap = Arc<RwLock<HashMap<SocketAddr, (Tx, usize, String, (usize, usize))>>>;
-type PeerInfo = (PeerMap, Arc::<AtomicUsize>);
-
-#[derive(Debug)]
-enum MetaMove {
- Move(Move,SocketAddr),
- Dump,
- Reset,
-}
+use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor };
+use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig,
+ Certificate, PrivateKey};
+
+fn main() -> Result<()> {
+ let conf = Config {
+ cert_path: "./cert.pem".to_owned(),
+ pkey_path: "./cert.rsa".to_owned(),
+ page_path: "./page.html".to_owned(),
+ socket_addr: ([0,0,0,0],31235).into(),
+ };
-const PAGE_RELPATH: &str = "./page.html";
-const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf");
+ let state = State {
+ conf,
+ peers: PeerMap::new(RwLock::new(HashMap::new())),
+ };
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- let sequential_id = Arc::new(AtomicUsize::new(0));
- let peers = PeerMap::new(RwLock::new(HashMap::new()));
- let peer_info: PeerInfo = (peers.clone(), sequential_id.clone());
- let addr = SocketAddr::from(([0, 0, 0, 0], 31235));
+ let cert_pem = fs::read(&state.conf.cert_path)?;
+ let pkey_pem = fs::read(&state.conf.pkey_path)?;
// Build TLS configuration.
let tls_cfg = {
- // Load public certificate.
- let certs = load_certs("cert.pem")?;
- // Load private key.
- let key = load_private_key("cert.rsa")?;
- // Do not use client certificate authentication.
- let mut cfg = rustls::ServerConfig::builder()
+ let certs: Vec<Certificate> = rustls_pemfile::certs(&mut &*cert_pem)
+ .map(|mut certs| certs.drain(..).map(Certificate).collect())?;
+ if certs.len() < 1 {
+ return Err(anyhow!("No certificates found"));
+ }
+ let mut keys: Vec<PrivateKey> = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem)
+ .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?;
+ if keys.len() < 1 {
+ return Err(anyhow!("No private keys found"));
+ }
+
+ let mut tls_cfg = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
- .with_single_cert(certs, key)
- .map_err(|e| error(format!("{}", e)))?;
+ .with_single_cert(certs, keys.remove(0))?;
+
// Configure ALPN to accept HTTP/2, HTTP/1.1 in that order.
- cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
- Arc::new(cfg)
+ tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
+ tls_cfg
};
- // Create a TCP listener via tokio.
- let incoming = AddrIncoming::bind(&addr)?;
+ tokio_main(state, tls_cfg)
+}
- let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
- let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone()));
+#[tokio::main]
+async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> {
+ let server = main_server(state.clone(), tls_conf);
+ println!("Serving on {}", state.conf.socket_addr);
+ server.await?;
+ Ok(())
+}
- let serv = make_service_fn(|socket: &tls_stuff::TlsStream| {
- let addr = {
- if let State::Streaming(ref s) = &socket.state {
- s.get_ref().0.remote_addr()
- } else {
- std::net::SocketAddr::new(std::net::Ipv4Addr::new(0,0,0,0).into(),0)
+async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> {
+ // Create a TCP listener, that uses TLS
+ let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?;
+ let local_addr = listener.local_addr()?;
+ let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf));
+ let http = hyper::server::conn::Http::new();
+
+ // Start the temporary single lobby
+ let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded();
+ let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone()));
+
+ loop {
+ let (conn, remote_addr) = listener.accept().await?;
+ let acceptor = acceptor.clone();
+ let http = http.clone();
+ let cloned_state = state.clone();
+ let cmd_tx = cmd_tx.clone();
+ let fut = async move {
+ match acceptor.accept(conn).await {
+ Ok(stream) => {
+ let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await;
+ if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); }
+
+ let handler = conn::PerConnHandler {
+ state: cloned_state,
+ local_addr,
+ remote_addr,
+ game_cmd_tx: cmd_tx,
+ page: page.unwrap(),
+ };
+ if let Err(e) = http.serve_connection(stream, handler).await {
+ eprintln!("hyper error for {remote_addr}: {e}");
+ }
+ },
+ Err(e) => eprintln!("TLS error for {remote_addr}: {e}"),
}
+ Ok(())
};
- let peer_info = peer_info.clone();
- let cmd_tx = cmd_tx.clone();
- async move {
- Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
- let peer_info = peer_info.clone();
- let cmd_tx = cmd_tx.clone();
- async move {
- Ok::<_,Infallible>(match handle_req(req, peer_info.clone(), cmd_tx.clone(), addr).await {
- Ok(r) => r,
- Err(r) => r,
- })
- }
- }))
- }
- });
-
- // Run the future, keep going until an error occurs.
- println!("Starting to serve on https://{}.", addr);
- let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming))
- .serve(serv)
- .with_graceful_shutdown(shutdown_signal());
- if let Err(e) = server.await {
- eprint!("server error: {}", e);
+ tokio::spawn(fut);
}
- Ok(())
}
// If a move is made, broadcast new board, else just send current board
-async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
+async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
+ use minesweeper::*;
let mut game = Game::new(Board::new(75,35), (75*35)/8);
let mut latest_player_name = None;
- while let Some(req) = move_rx.recv().await {
+ while let Some(req) = move_rx.next().await {
let done = game.phase == Phase::Die || game.phase == Phase::Win;
match req {
MetaMove::Move(m, o) => if !done {
@@ -119,7 +132,7 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap
if game.phase == Phase::Win || game.phase == Phase::Die {
game.board = game.board.grade();
}
- latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone());
+ latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone());
},
MetaMove::Dump => (),
MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); },
@@ -133,9 +146,9 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap
}
{
let peers = peers.read().await;
- for (addr, (tx, _, _, _)) in peers.iter() {
+ for (addr, p) in peers.iter() {
for r in reply.iter() {
- if let Err(e) = tx.unbounded_send(r.clone()) {
+ if let Err(e) = p.tx.unbounded_send(r.clone()) {
println!("couldn't send game update {r} to {addr}: {e}");
}
}
@@ -144,177 +157,6 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap
}
}
-async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, socket: HyperWebsocket, addr: SocketAddr) {
- let socket = socket.await.unwrap(); // FIXME error handling
- println!("Incoming TCP connection from: {}", addr);
-
- let peer_map = peer_info.0;
- let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel);
- let mut peer_name = "unknown".to_string();
-
- let (tx, rx) = unbounded();
-
- let (outgoing, mut incoming) = socket.split();
-
- let process_incoming = async {
- while let Ok(cmd) = incoming.try_next().await {
- if let Some(cmd) = cmd {
- if cmd.is_close() {
- println!("closing \"{peer_name}\"@{addr}");
- let mut peers = peer_map.write().await;
- if let Some(_) = peers.get(&addr) {
- peers.remove(&addr);
- }
- for (paddr, (tx, _, pname, _)) in peers.iter() {
- if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) {
- println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}");
- }
- }
- break;
- }
- // if it ain't text we can't handle it
- if !cmd.is_text() { continue; }
- let cmd = cmd.to_text().unwrap();
-
- let mut fields = cmd.split(" ");
- let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
- let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
- let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
- x.zip(y)
- };
- if let Some(cmd_name) = fields.next() {
- match cmd_name {
- "pos" => {
- match parse_pos(fields) {
- Some(pos) => {
- let (name, id) = {
- let mut peers = peer_map.write().await;
- let mut entry = peers.get_mut(&addr).unwrap();
- entry.3 = pos.clone();
- (entry.2.clone(), entry.1)
- };
- let sanitized_name = name.replace(" ", "&nbsp").to_string();
- {
- let peers = peer_map.read().await;
- for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) {
- let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
- if let Err(e) = r {
- println!("error sending pos update: {e}");
- }
- }
- }
- },
- None => {
- println!("bad position update from \"{peer_name}@{addr}\"");
- },
- }
- },
- "reveal" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap();
- },
- None => {
- println!("bad reveal from \"{peer_name}\"@{addr}");
- }
- }
- },
- "flag" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap();
- },
- None => {
- println!("bad flag from \"{peer_name}\"@{addr}");
- }
- }
- },
- "reset" => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Reset) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- "register" => {
- let name = fields.collect::<Vec<&str>>().join(&" ");
- if name.is_empty() {
- peer_name = "anon".to_string();
- } else {
- peer_name = name;
- }
- { // new scope cuz paranoid bout deadlocks
- peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0)));
- }
- tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap();
- if let Err(e) = cmd_tx.send(MetaMove::Dump) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
- }
- }
- }
- }
- };
- let send_to_browser = rx.map(Ok).forward(outgoing);
-
- pin_mut!(process_incoming, send_to_browser);
- future::select(process_incoming, send_to_browser).await;
-
- println!("{} disconnected", &addr);
- peer_map.write().await.remove(&addr);
-}
-
-async fn handle_req(mut request: Request<Body>, peer_info: PeerInfo, cmd_tx: MovReqTx, addr: SocketAddr) -> HtmlResult {
- if hyper_tungstenite::is_upgrade_request(&request) {
- let (resp, wsocket) = hyper_tungstenite::upgrade(&mut request, None).expect("couldn't upgrade to websocket");
- tokio::spawn(async move {
- peer_connection(peer_info.clone(), cmd_tx.clone(), wsocket, addr).await;
- });
- return Ok(resp);
- }
-
- let page = fs::read_to_string(PAGE_RELPATH).await.unwrap();
- let mut uri_path = request.uri().path().split('/').skip(1);
- let actual_path = uri_path.next();
-
- match (request.method(), actual_path) {
- (&Method::GET, None | Some("")) => {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(page))
- .map_err(errpage)
- },
- (&Method::GET, Some("saddr")) => {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::from("placeholder"))
- .map_err(errpage)
- },
- (&Method::GET, Some("font.ttf")) => {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(FONT_FILE_FUCKIT))
- .map_err(errpage)
- },
- _ => {
- Response::builder()
- .status(StatusCode::METHOD_NOT_ALLOWED)
- .header("ALLOW", "GET, POST")
- .body(Body::empty()).map_err(errpage)
- }
- }
-}
-
-fn errpage<T: Error>(e: T) -> Response<Body> {
- Response::builder()
- .status(StatusCode::INTERNAL_SERVER_ERROR)
- .body(e.to_string().into())
- .unwrap()
-}
-
fn error(err: String) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, err)
}