summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/conn.rs133
-rw-r--r--src/main.rs145
-rw-r--r--src/types.rs15
3 files changed, 81 insertions, 212 deletions
diff --git a/src/conn.rs b/src/conn.rs
index 6d08942..fda3a2b 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -1,106 +1,19 @@
use crate::types::*;
-use std::{
- net::SocketAddr,
- convert::Infallible,
- pin::Pin,
- task::Poll,
-};
-use hyper::{ Request, Response, Body, service::Service };
-use hyper_tungstenite::{
- HyperWebsocket,
- tungstenite::Message,
-};
-use futures::{
- Future,
- SinkExt,
- StreamExt,
- TryStreamExt,
- TryFuture,
- TryFutureExt,
- channel::mpsc::unbounded,
- pin_mut,
-};
+use futures_util::{SinkExt, StreamExt, TryStreamExt};
+use warp::ws::Message;
-const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf");
-
-#[derive(Clone)]
-pub struct PerConnHandler {
- pub state: State,
- pub local_addr: SocketAddr,
- pub remote_addr: SocketAddr,
- pub game_cmd_tx: MovReqTx,
- pub page: String,
-}
-
-impl PerConnHandler {
- async fn handle_req(self, req: Request<Body>)
- -> std::result::Result<Response<Body>, Infallible>
- {
- if hyper_tungstenite::is_upgrade_request(&req) {
- let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket");
- tokio::spawn(async move {
- peer_connection(self, wsocket).await;
- });
- return Ok(resp);
- }
-
- //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?;
- let mut uri_path = req.uri().path().split('/').skip(1);
- let actual_path = uri_path.next();
-
- use hyper::{ Method, StatusCode };
- match (req.method(), actual_path) {
- (&Method::GET, None | Some("")) => {
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(self.page))
- .unwrap())
- },
- (&Method::GET, Some("font.ttf")) => {
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(FONT_FILE))
- .unwrap())
- },
- _ => {
- Ok(Response::builder()
- .status(StatusCode::NOT_FOUND)
- .body(Body::empty())
- .unwrap())
- }
- }
- }
-}
-
-impl Service<Request<Body>> for PerConnHandler
-{
- type Response = Response<Body>;
- type Error = Infallible;
- type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response,Self::Error>> + Send>>;
-
- fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let cloned_self = self.clone();
- Box::pin(cloned_self.handle_req(req))
- }
-}
-
-pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
- let socket = socket.await.expect("couldn't finish websocket connection");
- let addr = conn.remote_addr;
- let peers = &conn.state.peers;
- let mut cmd_tx = &conn.game_cmd_tx;
- println!("Incoming TCP connection from: {addr}");
+pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) {
+ let addr = conndata.remote_addr;
+ let peers = &conndata.peers;
+ let cmd_tx = conndata.cmd_tx.clone();
+ println!("Incoming TCP connection from: {}", conndata.remote_addr);
let mut peer_name = "unknown".to_string();
// for game -> conn comms
- let (tx, rx) = unbounded();
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// for server -> client comms
- let (outgoing, mut incoming) = socket.split();
+ let (mut outgoing, mut incoming) = socket.split();
let process_incoming = async {
while let Ok(cmd) = incoming.try_next().await {
@@ -112,7 +25,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
peers.remove(&addr);
}
for (paddr, p) in peers.iter() {
- if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) {
+ if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) {
println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
}
}
@@ -120,7 +33,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
}
// if it ain't text we can't handle it
if !cmd.is_text() { continue; }
- let cmd = cmd.to_text().unwrap();
+ let cmd = cmd.to_str().to_owned().unwrap();
let mut fields = cmd.split(" ");
let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
@@ -144,7 +57,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
{
let peers = peers.read().await;
for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) {
- let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
+ let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
if let Err(e) = r {
println!("error sending pos update: {e}");
}
@@ -160,7 +73,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
match parse_pos(fields) {
Some(pos) => {
println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await {
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
println!("couldn't process \"{peer_name}\"'s reveal command: {e}");
};
},
@@ -173,7 +86,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
match parse_pos(fields) {
Some(pos) => {
println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await {
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
println!("couldn't process \"{peer_name}\"'s flag command: {e}");
};
},
@@ -184,7 +97,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
},
"reset" => {
println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Reset).await {
+ if let Err(e) = cmd_tx.send(MetaMove::Reset) {
println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
}
},
@@ -206,8 +119,8 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
position: (0,0)
});
}
- tx.unbounded_send(Message::text(format!("id {id}"))).unwrap();
- if let Err(e) = cmd_tx.send(MetaMove::Dump).await {
+ tx.send(Message::text(format!("id {id}"))).unwrap();
+ if let Err(e) = cmd_tx.send(MetaMove::Dump) {
println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
}
},
@@ -217,10 +130,16 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
}
}
};
- let send_to_browser = rx.map(Ok).forward(outgoing);
+ let send_to_client = async move {
+ while let Some(m) = rx.recv().await {
+ if let Err(e) = outgoing.send(m).await {
+ println!("something went bad lol: {e}");
+ };
+ }
+ };
- pin_mut!(process_incoming, send_to_browser);
- futures_util::future::select(process_incoming, send_to_browser).await;
+ futures_util::pin_mut!(process_incoming, send_to_client);
+ futures_util::future::select(process_incoming, send_to_client).await;
println!("{addr} disconnected");
peers.write().await.remove(&addr);
diff --git a/src/main.rs b/src/main.rs
index cafc9d4..b3c3cf9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,33 +1,21 @@
-#[macro_use] extern crate anyhow;
-
-pub use anyhow::{ Result, Error };
-
use std::{
convert::Infallible,
collections::HashMap,
+ error::Error,
net::SocketAddr,
- sync::Arc,
};
+use warp::Filter;
mod types;
mod conn;
mod minesweeper;
use types::*;
-use hyper::{ Body, Request, Server };
-use hyper::server::conn::{ AddrStream, AddrIncoming };
-use hyper::service::{make_service_fn, service_fn, Service};
use tokio::sync::RwLock;
-use std::fs;
-
-use futures_util::{future, pin_mut, ready, stream::TryStreamExt, StreamExt, sink::SinkExt, TryFutureExt};
-use hyper_tungstenite::{ tungstenite::protocol::Message, HyperWebsocket };
-use tokio_rustls::{ rustls, Accept, server::TlsStream, TlsAcceptor };
-use rustls::{ServerConnection, SupportedCipherSuite, ProtocolVersion, ServerConfig,
- Certificate, PrivateKey};
+const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf");
-fn main() -> Result<()> {
+fn main() -> Result<(), Box<dyn Error>> {
let conf = Config {
cert_path: "./cert.pem".to_owned(),
pkey_path: "./cert.rsa".to_owned(),
@@ -40,91 +28,59 @@ fn main() -> Result<()> {
peers: PeerMap::new(RwLock::new(HashMap::new())),
};
- let cert_pem = fs::read(&state.conf.cert_path)?;
- let pkey_pem = fs::read(&state.conf.pkey_path)?;
+ tokio_main(state)
+}
- // Build TLS configuration.
- let tls_cfg = {
- let certs: Vec<Certificate> = rustls_pemfile::certs(&mut &*cert_pem)
- .map(|mut certs| certs.drain(..).map(Certificate).collect())?;
- if certs.len() < 1 {
- return Err(anyhow!("No certificates found"));
- }
- let mut keys: Vec<PrivateKey> = rustls_pemfile::rsa_private_keys(&mut &*pkey_pem)
- .map(|mut keys| keys.drain(..).map(PrivateKey).collect())?;
- if keys.len() < 1 {
- return Err(anyhow!("No private keys found"));
- }
+#[tokio::main]
+async fn tokio_main(state: State) -> Result<(), Box<dyn Error>> {
+ // Start the temporary single lobby
+ let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
+ let _game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone()));
- let mut tls_cfg = ServerConfig::builder()
- .with_safe_defaults()
- .with_no_client_auth()
- .with_single_cert(certs, keys.remove(0))?;
+ let cmd_filter = warp::any().map(move || cmd_tx.clone());
- // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order.
- tls_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
- tls_cfg
+ let page_route = {
+ use warp::*;
+ get()
+ .and(path("font.ttf"))
+ .map(|| FONT_FILE)
+ .or(fs::file(state.conf.page_path.clone()))
};
- tokio_main(state, tls_cfg)
-}
+ let websocket_route = {
+ let state = state.clone();
+ use warp::*;
+ path("ws")
+ .and(ws())
+ .and(addr::remote())
+ .and(cmd_filter)
+ .map(move |ws: warp::ws::Ws, saddr: Option<SocketAddr>, cmd_tx: CmdTx| {
+ let state = state.clone();
+ println!("conn from {saddr:?}");
+ ws.on_upgrade(move |socket| {
+ let conn_data = types::ConnData { remote_addr: saddr.unwrap(), cmd_tx: cmd_tx.clone(), peers: state.peers.clone() };
+ conn::peer_connection(socket, conn_data)
+ })
+ })
+ };
+ let routes = websocket_route.or(page_route);
-#[tokio::main]
-async fn tokio_main(state: State, tls_conf: ServerConfig) -> Result<()> {
- let server = main_server(state.clone(), tls_conf);
+ let server = warp::serve(routes)
+ .tls()
+ .cert_path(state.conf.cert_path)
+ .key_path(state.conf.pkey_path)
+ .run(state.conf.socket_addr);
println!("Serving on {}", state.conf.socket_addr);
- server.await?;
+ server.await;
Ok(())
}
-async fn main_server(state: State, tls_conf: ServerConfig) -> Result<()> {
- // Create a TCP listener, that uses TLS
- let listener = tokio::net::TcpListener::bind(&state.conf.socket_addr).await?;
- let local_addr = listener.local_addr()?;
- let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_conf));
- let http = hyper::server::conn::Http::new();
-
- // Start the temporary single lobby
- let (cmd_tx, cmd_rx) = futures::channel::mpsc::unbounded();
- let game_t = tokio::spawn(gameloop(cmd_rx, state.peers.clone()));
-
- loop {
- let (conn, remote_addr) = listener.accept().await?;
- let acceptor = acceptor.clone();
- let http = http.clone();
- let cloned_state = state.clone();
- let cmd_tx = cmd_tx.clone();
- let fut = async move {
- match acceptor.accept(conn).await {
- Ok(stream) => {
- let page = tokio::fs::read_to_string(&cloned_state.conf.page_path).await;
- if let Err(e) = page { return Err(anyhow!("couldn't read page file: {e}")); }
-
- let handler = conn::PerConnHandler {
- state: cloned_state,
- local_addr,
- remote_addr,
- game_cmd_tx: cmd_tx,
- page: page.unwrap(),
- };
- if let Err(e) = http.serve_connection(stream, handler).await {
- eprintln!("hyper error for {remote_addr}: {e}");
- }
- },
- Err(e) => eprintln!("TLS error for {remote_addr}: {e}"),
- }
- Ok(())
- };
- tokio::spawn(fut);
- }
-}
-
// If a move is made, broadcast new board, else just send current board
-async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
+async fn gameloop(mut move_rx: tokio::sync::mpsc::UnboundedReceiver<MetaMove>, peers: PeerMap) {
use minesweeper::*;
let mut game = Game::new(Board::new(75,35), (75*35)/8);
let mut latest_player_name = None;
- while let Some(req) = move_rx.next().await {
+ while let Some(req) = move_rx.recv().await {
let done = game.phase == Phase::Die || game.phase == Phase::Win;
match req {
MetaMove::Move(m, o) => if !done {
@@ -137,6 +93,7 @@ async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMov
MetaMove::Dump => (),
MetaMove::Reset => { game = Game::new(Board::new(75,35), (75*35)/8); },
}
+ use warp::ws::Message;
let mut reply = vec![Message::binary(game.board.render())];
let lpname = latest_player_name.as_ref().map(|s| s.as_str()).unwrap_or("unknown player");
match game.phase {
@@ -148,21 +105,11 @@ async fn gameloop(mut move_rx: futures::channel::mpsc::UnboundedReceiver<MetaMov
let peers = peers.read().await;
for (addr, p) in peers.iter() {
for r in reply.iter() {
- if let Err(e) = p.tx.unbounded_send(r.clone()) {
- println!("couldn't send game update {r} to {addr}: {e}");
+ if let Err(e) = p.tx.send(r.clone()) {
+ println!("couldn't send game update {r:?} to {addr}: {e}");
}
}
}
}
}
}
-
-fn error(err: String) -> std::io::Error {
- std::io::Error::new(std::io::ErrorKind::Other, err)
-}
-
-async fn shutdown_signal() {
- tokio::signal::ctrl_c()
- .await
- .expect("failed to install CTRL+C signal handler");
-}
diff --git a/src/types.rs b/src/types.rs
index eed69dd..73d51b8 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -3,10 +3,8 @@ use std::{
net::SocketAddr,
sync::Arc,
};
+use warp::ws::Message;
use tokio::sync::RwLock;
-use hyper_tungstenite::tungstenite::Message;
-use hyper::{ Response, Body };
-use futures::channel::mpsc::UnboundedSender;
use crate::minesweeper;
#[derive(Debug, Clone)]
@@ -32,12 +30,17 @@ pub enum MetaMove {
#[derive(Debug)]
pub struct Peer {
- pub tx: UnboundedSender<Message>,
+ pub tx: tokio::sync::mpsc::UnboundedSender<Message>,
pub seq_id: usize,
pub name: String,
pub position: (usize, usize),
}
-pub type HtmlResult = Result<Response<Body>, Response<Body>>;
-pub type MovReqTx = futures::channel::mpsc::UnboundedSender<MetaMove>;
+pub struct ConnData {
+ pub cmd_tx: CmdTx,
+ pub remote_addr: SocketAddr,
+ pub peers: PeerMap,
+}
+
+pub type CmdTx = tokio::sync::mpsc::UnboundedSender<MetaMove>;
pub type PeerMap = Arc<RwLock<HashMap<SocketAddr, Peer>>>;