summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorstale <redkugelblitzin@gmail.com>2022-05-07 21:38:25 -0300
committerstale <redkugelblitzin@gmail.com>2022-05-07 21:38:25 -0300
commitac938b0d57d2e2630bf05668fec6b3d6c18d0450 (patch)
treed591b5a48b6c4974854c578bc157a7a8ffa2414c
parent88cc92f64dfc4a241410f67b20dec4680461a344 (diff)
this don't work no good
-rw-r--r--Cargo.lock64
-rw-r--r--Cargo.toml2
-rw-r--r--src/conn.rs227
-rw-r--r--src/main.rs356
-rw-r--r--src/tls_stuff.rs159
-rw-r--r--src/types.rs43
6 files changed, 431 insertions, 420 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 74011a6..00bb05a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3,6 +3,12 @@
version = 3
[[package]]
+name = "anyhow"
+version = "1.0.57"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
+
+[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -125,12 +131,28 @@ dependencies = [
]
[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-channel"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
+ "futures-sink",
]
[[package]]
@@ -140,6 +162,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
+name = "futures-executor"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -157,9 +207,13 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
+ "futures-channel",
"futures-core",
+ "futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
+ "memchr",
"pin-project-lite",
"pin-utils",
"slab",
@@ -487,9 +541,9 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro2"
-version = "1.0.37"
+version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
+checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa"
dependencies = [
"unicode-xid",
]
@@ -721,9 +775,9 @@ dependencies = [
[[package]]
name = "tokio-rustls"
-version = "0.23.3"
+version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e"
+checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio",
@@ -976,7 +1030,9 @@ dependencies = [
name = "websweeper"
version = "1.0.0"
dependencies = [
+ "anyhow",
"format-bytes",
+ "futures",
"futures-channel",
"futures-util",
"hyper",
diff --git a/Cargo.toml b/Cargo.toml
index 9d4c83c..d54c86d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,8 @@ tokio-rustls = "0.23"
rustls-pemfile = "1"
hyper-tungstenite = "0.8"
rand = "0.8"
+futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
futures-channel = "0.3"
format-bytes = "0.3"
+anyhow = "1.0"
diff --git a/src/conn.rs b/src/conn.rs
new file mode 100644
index 0000000..6d08942
--- /dev/null
+++ b/src/conn.rs
@@ -0,0 +1,227 @@
+use crate::types::*;
+use std::{
+ net::SocketAddr,
+ convert::Infallible,
+ pin::Pin,
+ task::Poll,
+};
+use hyper::{ Request, Response, Body, service::Service };
+use hyper_tungstenite::{
+ HyperWebsocket,
+ tungstenite::Message,
+};
+use futures::{
+ Future,
+ SinkExt,
+ StreamExt,
+ TryStreamExt,
+ TryFuture,
+ TryFutureExt,
+ channel::mpsc::unbounded,
+ pin_mut,
+};
+
+const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf");
+
+#[derive(Clone)]
+pub struct PerConnHandler {
+ pub state: State,
+ pub local_addr: SocketAddr,
+ pub remote_addr: SocketAddr,
+ pub game_cmd_tx: MovReqTx,
+ pub page: String,
+}
+
+impl PerConnHandler {
+ async fn handle_req(self, req: Request<Body>)
+ -> std::result::Result<Response<Body>, Infallible>
+ {
+ if hyper_tungstenite::is_upgrade_request(&req) {
+ let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket");
+ tokio::spawn(async move {
+ peer_connection(self, wsocket).await;
+ });
+ return Ok(resp);
+ }
+
+ //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?;
+ let mut uri_path = req.uri().path().split('/').skip(1);
+ let actual_path = uri_path.next();
+
+ use hyper::{ Method, StatusCode };
+ match (req.method(), actual_path) {
+ (&Method::GET, None | Some("")) => {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(self.page))
+ .unwrap())
+ },
+ (&Method::GET, Some("font.ttf")) => {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(FONT_FILE))
+ .unwrap())
+ },
+ _ => {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::empty())
+ .unwrap())
+ }
+ }
+ }
+}
+
+impl Service<Request<Body>> for PerConnHandler
+{
+ type Response = Response<Body>;
+ type Error = Infallible;
+ type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response,Self::Error>> + Send>>;
+
+ fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let cloned_self = self.clone();
+ Box::pin(cloned_self.handle_req(req))
+ }
+}
+
+pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
+ let socket = socket.await.expect("couldn't finish websocket connection");
+ let addr = conn.remote_addr;
+ let peers = &conn.state.peers;
+ let mut cmd_tx = &conn.game_cmd_tx;
+ println!("Incoming TCP connection from: {addr}");
+
+ let mut peer_name = "unknown".to_string();
+
+ // for game -> conn comms
+ let (tx, rx) = unbounded();
+ // for server -> client comms
+ let (outgoing, mut incoming) = socket.split();
+
+ let process_incoming = async {
+ while let Ok(cmd) = incoming.try_next().await {
+ if let Some(cmd) = cmd {
+ if cmd.is_close() {
+ println!("closing \"{peer_name}\"@{addr}");
+ let mut peers = peers.write().await;
+ if let Some(_) = peers.get(&addr) {
+ peers.remove(&addr);
+ }
+ for (paddr, p) in peers.iter() {
+ if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) {
+ println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
+ }
+ }
+ break;
+ }
+ // if it ain't text we can't handle it
+ if !cmd.is_text() { continue; }
+ let cmd = cmd.to_text().unwrap();
+
+ let mut fields = cmd.split(" ");
+ let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
+ let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
+ let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
+ x.zip(y)
+ };
+ if let Some(cmd_name) = fields.next() {
+ use crate::minesweeper::{Move,MoveType};
+ match cmd_name {
+ "pos" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ let (name, id) = {
+ let mut peers = peers.write().await;
+ let mut entry = peers.get_mut(&addr).unwrap();
+ entry.position = pos.clone();
+ (entry.name.clone(), entry.seq_id)
+ };
+ let sanitized_name = name.replace(" ", "&nbsp").to_string();
+ {
+ let peers = peers.read().await;
+ for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) {
+ let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
+ if let Err(e) = r {
+ println!("error sending pos update: {e}");
+ }
+ }
+ }
+ },
+ None => {
+ println!("bad position update from \"{peer_name}@{addr}\"");
+ },
+ }
+ },
+ "reveal" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from \"{peer_name}\"@{addr}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await {
+ println!("couldn't process \"{peer_name}\"'s reveal command: {e}");
+ };
+ },
+ None => {
+ println!("bad reveal from \"{peer_name}\"@{addr}");
+ }
+ }
+ },
+ "flag" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from \"{peer_name}\"@{addr}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await {
+ println!("couldn't process \"{peer_name}\"'s flag command: {e}");
+ };
+ },
+ None => {
+ println!("bad flag from \"{peer_name}\"@{addr}");
+ }
+ }
+ },
+ "reset" => {
+ println!("{cmd} from \"{peer_name}\"@{addr}");
+ if let Err(e) = cmd_tx.send(MetaMove::Reset).await {
+ println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
+ }
+ },
+ "register" => {
+ let name = fields.collect::<Vec<&str>>().join(&" ");
+ let id;
+ if name.is_empty() {
+ peer_name = "anon".to_string();
+ } else {
+ peer_name = name;
+ }
+ { // new scope cuz paranoid bout deadlocks
+ let mut peers = peers.write().await;
+ id = peers.len();
+ peers.insert(addr, Peer {
+ tx: tx.clone(),
+ seq_id: id,
+ name: peer_name.clone(),
+ position: (0,0)
+ });
+ }
+ tx.unbounded_send(Message::text(format!("id {id}"))).unwrap();
+ if let Err(e) = cmd_tx.send(MetaMove::Dump).await {
+ println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
+ }
+ },
+ e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
+ }
+ }
+ }
+ }
+ };
+ let send_to_browser = rx.map(Ok).forward(outgoing);
+
+ pin_mut!(process_incoming, send_to_browser);
+ futures_util::future::select(process_incoming, send_to_browser).await;
+
+ println!("{addr} disconnected");
+ peers.write().await.remove(&addr);
+}
diff --git a/src/main.rs b/src/main.rs
index 1367c60..cafc9d4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,117 +1,130 @@
+#[macro_use] extern crate anyhow;
+
+pub use anyhow::{ Result, Error };
+
use std::{
- error::Error,
- sync::{ Arc, atomic::{ self, AtomicUsize }},
- net::SocketAddr,
convert::Infallible,
collections::HashMap,
+ net::SocketAddr,
+ sync::Arc,
};
+mod types;
+mod conn;
mod minesweeper;
-mod tls_stuff;
-use minesweeper::*;
-use tls_stuff::*;
+use types::*;
-use hyper::{ Method, StatusCode, Body, Request, Response, Server };
+use hyper::{ Body, Request, Server };
use hyper::server::conn::{ AddrStream, AddrIncoming };
-use hyper::service::{make_service_fn, service_fn};
-use tokio::sync::{
- RwLock,
- mpsc,
-};
+use hyper::service::{make_service_fn, service_fn, Service};
+use tokio::sync::RwLock;
+use std::fs;
-type HtmlResult = Result<Response<Body>, Response<Body>>;
-use futures_channel::mpsc::{unbounded, UnboundedSender};
-use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt, sink::SinkExt};
+use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt};
-use tokio::fs;
use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket };
-use tokio_rustls::{ rustls, Accept, server::TlsStream };
-
-type Tx = UnboundedSender<Message>;
-type MovReqTx = mpsc::UnboundedSender<MetaMove>;
-type PeerMap = Arc<RwLock<HashMap<SocketAddr, (Tx, usize, String, (usize, usize))>>>;
-type PeerInfo = (PeerMap, Arc::<AtomicUsize>);
-
-#[derive(Debug)]
-enum MetaMove {
- Move(Move,SocketAddr),
- Dump,
- Reset,
-}
+use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor };
+use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig,
+ Certificate, PrivateKey};
+
+fn main() -> Result<()> {
+ let conf = Config {
+ cert_path: "./cert.pem".to_owned(),
+ pkey_path: "./cert.rsa".to_owned(),
+ page_path: "./page.html".to_owned(),
+ socket_addr: ([0,0,0,0],31235).into(),
+ };
-const PAGE_RELPATH: &str = "./page.html";
-const FONT_FILE_FUCKIT: &[u8] = include_bytes!("./VT323-Regular.ttf");
+ let state = State {
+ conf,
+ peers: PeerMap::new(RwLock::new(HashMap::new())),
+ };
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- let sequential_id = Arc::new(AtomicUsize::new(0));
- let peers = PeerMap::new(RwLock::new(HashMap::new()));
- let peer_info: PeerInfo = (peers.clone(), sequential_id.clone());
- let addr = SocketAddr::from(([0, 0, 0, 0], 31235));
+ let cert_pem = fs::read(&state.conf.cert_path)?;
+ let pkey_pem = fs::read(&state.conf.pkey_path)?;
// Build TLS configuration.
let tls_cfg = {
- // Load public certificate.
- let certs = load_certs("cert.pem")?;
- // Load private key.
- let key = load_private_key("cert.rsa")?;
- // Do not use client certificate authentication.
- let mut cfg = rustls::ServerConfig::builder()
+ let certs: Vec<Certificate> = rustls_pemfile::certs(&mut &*cert_pem)
+ .map(|mut certs| certs.drain(..).map(Certificate).collect())?;
+ if certs.len() < 1 {
+ return Err(anyhow!("No certificates found"));
+ }
+ let mut keys: Vec<PrivateKey> = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem)
+ .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?;
+ if keys.len() < 1 {
+ return Err(anyhow!("No private keys found"));
+ }
+
+ let mut tls_cfg = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
- .with_single_cert(certs, key)
- .map_err(|e| error(format!("{}", e)))?;
+ .with_single_cert(certs, keys.remove(0))?;
+
// Configure ALPN to accept HTTP/2, HTTP/1.1 in that order.
- cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
- Arc::new(cfg)
+ tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
+ tls_cfg
};
- // Create a TCP listener via tokio.
- let incoming = AddrIncoming::bind(&addr)?;
+ tokio_main(state, tls_cfg)
+}
- let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
- let game_t = tokio::spawn(gameloop(cmd_rx, peers.clone()));
+#[tokio::main]
+async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> {
+ let server = main_server(state.clone(), tls_conf);
+ println!("Serving on {}", state.conf.socket_addr);
+ server.await?;
+ Ok(())
+}
- let serv = make_service_fn(|socket: &tls_stuff::TlsStream| {
- let addr = {
- if let State::Streaming(ref s) = &socket.state {
- s.get_ref().0.remote_addr()
- } else {
- std::net::SocketAddr::new(std::net::Ipv4Addr::new(0,0,0,0).into(),0)
+async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> {
+ // Create a TCP listener, that uses TLS
+ let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?;
+ let local_addr = listener.local_addr()?;
+ let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf));
+ let http = hyper::server::conn::Http::new();
+
+ // Start the temporary single lobby
+ let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded();
+ let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone()));
+
+ loop {
+ let (conn, remote_addr) = listener.accept().await?;
+ let acceptor = acceptor.clone();
+ let http = http.clone();
+ let cloned_state = state.clone();
+ let cmd_tx = cmd_tx.clone();
+ let fut = async move {
+ match acceptor.accept(conn).await {
+ Ok(stream) => {
+ let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await;
+ if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); }
+
+ let handler = conn::PerConnHandler {
+ state: cloned_state,
+ local_addr,
+ remote_addr,
+ game_cmd_tx: cmd_tx,
+ page: page.unwrap(),
+ };
+ if let Err(e) = http.serve_connection(stream, handler).await {
+ eprintln!("hyper error for {remote_addr}: {e}");
+ }
+ },
+ Err(e) => eprintln!("TLS error for {remote_addr}: {e}"),
}
+ Ok(())
};
- let peer_info = peer_info.clone();
- let cmd_tx = cmd_tx.clone();
- async move {
- Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
- let peer_info = peer_info.clone();
- let cmd_tx = cmd_tx.clone();
- async move {
- Ok::<_,Infallible>(match handle_req(req, peer_info.clone(), cmd_tx.clone(), addr).await {
- Ok(r) => r,
- Err(r) => r,
- })
- }
- }))
- }
- });
-
- // Run the future, keep going until an error occurs.
- println!("Starting to serve on https://{}.", addr);
- let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming))
- .serve(serv)
- .with_graceful_shutdown(shutdown_signal());
- if let Err(e) = server.await {
- eprint!("server error: {}", e);
+ tokio::spawn(fut);
}
- Ok(())
}
// If a move is made, broadcast new board, else just send current board
-async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
+async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
+ use minesweeper::*;
let mut game = Game::new(Board::new(75,35), (75*35)/8);
let mut latest_player_name = None;
- while let Some(req) = move_rx.recv().await {
+ while let Some(req) = move_rx.next().await {
let done = game.phase == Phase::Die || game.phase == Phase::Win;
match req {
MetaMove::Move(m, o) => if !done {
@@ -119,7 +132,7 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap
if game.phase == Phase::Win || game.phase == Phase::Die {
game.board = game.board.grade();
}
- latest_player_name = peers.read().await.get(&o).map(|(_,_,n,_)| n.clone());
+ latest_player_name = peers.read().await.get(&o).map(|p| p.name.clone());
},
MetaMove::Dump => (),
MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); },
@@ -133,9 +146,9 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap
}
{
let peers = peers.read().await;
- for (addr, (tx, _, _, _)) in peers.iter() {
+ for (addr, p) in peers.iter() {
for r in reply.iter() {
- if let Err(e) = tx.unbounded_send(r.clone()) {
+ if let Err(e) = p.tx.unbounded_send(r.clone()) {
println!("couldn't send game update {r} to {addr}: {e}");
}
}
@@ -144,177 +157,6 @@ async fn gameloop(mut move_rx: mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap
}
}
-async fn peer_connection(peer_info: PeerInfo, cmd_tx: MovReqTx, socket: HyperWebsocket, addr: SocketAddr) {
- let socket = socket.await.unwrap(); // FIXME error handling
- println!("Incoming TCP connection from: {}", addr);
-
- let peer_map = peer_info.0;
- let peer_seqid = peer_info.1.fetch_add(1, atomic::Ordering::AcqRel);
- let mut peer_name = "unknown".to_string();
-
- let (tx, rx) = unbounded();
-
- let (outgoing, mut incoming) = socket.split();
-
- let process_incoming = async {
- while let Ok(cmd) = incoming.try_next().await {
- if let Some(cmd) = cmd {
- if cmd.is_close() {
- println!("closing \"{peer_name}\"@{addr}");
- let mut peers = peer_map.write().await;
- if let Some(_) = peers.get(&addr) {
- peers.remove(&addr);
- }
- for (paddr, (tx, _, pname, _)) in peers.iter() {
- if let Err(e) = tx.unbounded_send(Message::text("logoff {peer_seqid} {peer_name}")) {
- println!("couldn't deliver logoff info to \"{pname}\"@{paddr}: {e}");
- }
- }
- break;
- }
- // if it ain't text we can't handle it
- if !cmd.is_text() { continue; }
- let cmd = cmd.to_text().unwrap();
-
- let mut fields = cmd.split(" ");
- let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
- let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
- let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
- x.zip(y)
- };
- if let Some(cmd_name) = fields.next() {
- match cmd_name {
- "pos" => {
- match parse_pos(fields) {
- Some(pos) => {
- let (name, id) = {
- let mut peers = peer_map.write().await;
- let mut entry = peers.get_mut(&addr).unwrap();
- entry.3 = pos.clone();
- (entry.2.clone(), entry.1)
- };
- let sanitized_name = name.replace(" ", "&nbsp").to_string();
- {
- let peers = peer_map.read().await;
- for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,(peer_tx,_,_,_))| peer_tx) {
- let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
- if let Err(e) = r {
- println!("error sending pos update: {e}");
- }
- }
- }
- },
- None => {
- println!("bad position update from \"{peer_name}@{addr}\"");
- },
- }
- },
- "reveal" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).unwrap();
- },
- None => {
- println!("bad reveal from \"{peer_name}\"@{addr}");
- }
- }
- },
- "flag" => {
- match parse_pos(fields) {
- Some(pos) => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).unwrap();
- },
- None => {
- println!("bad flag from \"{peer_name}\"@{addr}");
- }
- }
- },
- "reset" => {
- println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Reset) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- "register" => {
- let name = fields.collect::<Vec<&str>>().join(&" ");
- if name.is_empty() {
- peer_name = "anon".to_string();
- } else {
- peer_name = name;
- }
- { // new scope cuz paranoid bout deadlocks
- peer_map.write().await.insert(addr, (tx.clone(), peer_seqid, peer_name.clone(), (0,0)));
- }
- tx.unbounded_send(Message::text(format!("id {}", peer_seqid))).unwrap();
- if let Err(e) = cmd_tx.send(MetaMove::Dump) {
- println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
- }
- },
- e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
- }
- }
- }
- }
- };
- let send_to_browser = rx.map(Ok).forward(outgoing);
-
- pin_mut!(process_incoming, send_to_browser);
- future::select(process_incoming, send_to_browser).await;
-
- println!("{} disconnected", &addr);
- peer_map.write().await.remove(&addr);
-}
-
-async fn handle_req(mut request: Request<Body>, peer_info: PeerInfo, cmd_tx: MovReqTx, addr: SocketAddr) -> HtmlResult {
- if hyper_tungstenite::is_upgrade_request(&request) {
- let (resp, wsocket) = hyper_tungstenite::upgrade(&mut request, None).expect("couldn't upgrade to websocket");
- tokio::spawn(async move {
- peer_connection(peer_info.clone(), cmd_tx.clone(), wsocket, addr).await;
- });
- return Ok(resp);
- }
-
- let page = fs::read_to_string(PAGE_RELPATH).await.unwrap();
- let mut uri_path = request.uri().path().split('/').skip(1);
- let actual_path = uri_path.next();
-
- match (request.method(), actual_path) {
- (&Method::GET, None | Some("")) => {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(page))
- .map_err(errpage)
- },
- (&Method::GET, Some("saddr")) => {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::from("placeholder"))
- .map_err(errpage)
- },
- (&Method::GET, Some("font.ttf")) => {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(FONT_FILE_FUCKIT))
- .map_err(errpage)
- },
- _ => {
- Response::builder()
- .status(StatusCode::METHOD_NOT_ALLOWED)
- .header("ALLOW", "GET, POST")
- .body(Body::empty()).map_err(errpage)
- }
- }
-}
-
-fn errpage<T: Error>(e: T) -> Response<Body> {
- Response::builder()
- .status(StatusCode::INTERNAL_SERVER_ERROR)
- .body(e.to_string().into())
- .unwrap()
-}
-
fn error(err: String) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, err)
}
diff --git a/src/tls_stuff.rs b/src/tls_stuff.rs
deleted file mode 100644
index 83c0489..0000000
--- a/src/tls_stuff.rs
+++ /dev/null
@@ -1,159 +0,0 @@
-//! Simple HTTPS echo service based on hyper-rustls
-//!
-//! First parameter is the mandatory port to use.
-//! Certificate and private key are hardcoded to sample files.
-//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2,
-//! otherwise HTTP/1.1 will be used.
-use core::task::{Context, Poll};
-use futures_util::ready;
-use hyper::server::accept::Accept;
-use hyper::server::conn::{AddrIncoming, AddrStream};
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::vec::Vec;
-use std::{fs, io};
-use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
-use tokio_rustls::rustls::{self, ServerConfig};
-
-fn error(err: String) -> io::Error {
- io::Error::new(io::ErrorKind::Other, err)
-}
-
-pub enum State {
- Handshaking(tokio_rustls::Accept<AddrStream>),
- Streaming(tokio_rustls::server::TlsStream<AddrStream>),
-}
-
-// tokio_rustls::server::TlsStream doesn't expose constructor methods,
-// so we have to TlsAcceptor::accept and handshake to have access to it
-// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first
-pub struct TlsStream {
- pub state: State,
-}
-
-impl TlsStream {
- pub fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream {
- let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream);
- TlsStream {
- state: State::Handshaking(accept),
- }
- }
-}
-
-impl AsyncRead for TlsStream {
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut ReadBuf,
- ) -> Poll<io::Result<()>> {
- let pin = self.get_mut();
- match pin.state {
- State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
- Ok(mut stream) => {
- let result = Pin::new(&mut stream).poll_read(cx, buf);
- pin.state = State::Streaming(stream);
- result
- }
- Err(err) => Poll::Ready(Err(err)),
- },
- State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf),
- }
- }
-}
-
-impl AsyncWrite for TlsStream {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- let pin = self.get_mut();
- match pin.state {
- State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
- Ok(mut stream) => {
- let result = Pin::new(&mut stream).poll_write(cx, buf);
- pin.state = State::Streaming(stream);
- result
- }
- Err(err) => Poll::Ready(Err(err)),
- },
- State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf),
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- match self.state {
- State::Handshaking(_) => Poll::Ready(Ok(())),
- State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx),
- }
- }
-
- fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- match self.state {
- State::Handshaking(_) => Poll::Ready(Ok(())),
- State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx),
- }
- }
-}
-
-pub struct TlsAcceptor {
- config: Arc<ServerConfig>,
- incoming: AddrIncoming,
-}
-
-impl TlsAcceptor {
- pub fn new(config: Arc<ServerConfig>, incoming: AddrIncoming) -> TlsAcceptor {
- TlsAcceptor { config, incoming }
- }
-}
-
-impl Accept for TlsAcceptor {
- type Conn = TlsStream;
- type Error = io::Error;
-
- fn poll_accept(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
- let pin = self.get_mut();
- match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) {
- Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))),
- Some(Err(e)) => Poll::Ready(Some(Err(e))),
- None => Poll::Ready(None),
- }
- }
-}
-
-// Load public certificate from file.
-pub fn load_certs(filename: &str) -> io::Result<Vec<rustls::Certificate>> {
- // Open certificate file.
- let certfile = fs::File::open(filename)
- .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
- let mut reader = io::BufReader::new(certfile);
-
- // Load and return certificate.
- let certs = rustls_pemfile::certs(&mut reader)
- .map_err(|_| error("failed to load certificate".into()))?;
- Ok(certs
- .into_iter()
- .map(rustls::Certificate)
- .collect())
-}
-
-// Load private key from file.
-pub fn load_private_key(filename: &str) -> io::Result<rustls::PrivateKey> {
- // Open keyfile.
- let keyfile = fs::File::open(filename)
- .map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
- let mut reader = io::BufReader::new(keyfile);
-
- // Load and return a single private key.
- let keys = rustls_pemfile::rsa_private_keys(&mut reader)
- .map_err(|_| error("failed to load private key".into()))?;
- if keys.len() != 1 {
- return Err(error("expected a single private key".into()));
- }
-
- Ok(rustls::PrivateKey(keys[0].clone()))
-}
diff --git a/src/types.rs b/src/types.rs
new file mode 100644
index 0000000..eed69dd
--- /dev/null
+++ b/src/types.rs
@@ -0,0 +1,43 @@
+use std::{
+ collections::HashMap,
+ net::SocketAddr,
+ sync::Arc,
+};
+use tokio::sync::RwLock;
+use hyper_tungstenite::tungstenite::Message;
+use hyper::{ Response, Body };
+use futures::channel::mpsc::UnboundedSender;
+use crate::minesweeper;
+
+#[derive(Debug, Clone)]
+pub struct Config {
+ pub cert_path: String,
+ pub pkey_path: String,
+ pub page_path: String,
+ pub socket_addr: SocketAddr,
+}
+
+#[derive(Debug, Clone)]
+pub struct State {
+ pub conf: Config,
+ pub peers: PeerMap,
+}
+
+#[derive(Debug)]
+pub enum MetaMove {
+ Move(minesweeper::Move,SocketAddr),
+ Dump,
+ Reset,
+}
+
+#[derive(Debug)]
+pub struct Peer {
+ pub tx: UnboundedSender<Message>,
+ pub seq_id: usize,
+ pub name: String,
+ pub position: (usize, usize),
+}
+
+pub type HtmlResult = Result<Response<Body>, Response<Body>>;
+pub type MovReqTx = futures::channel::mpsc::UnboundedSender<MetaMove>;
+pub type PeerMap = Arc<RwLock<HashMap<SocketAddr, Peer>>>;