summaryrefslogtreecommitdiff
path: root/src/conn.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/conn.rs')
-rw-r--r--src/conn.rs227
1 files changed, 227 insertions, 0 deletions
diff --git a/src/conn.rs b/src/conn.rs
new file mode 100644
index 0000000..6d08942
--- /dev/null
+++ b/src/conn.rs
@@ -0,0 +1,227 @@
+use crate::types::*;
+use std::{
+ net::SocketAddr,
+ convert::Infallible,
+ pin::Pin,
+ task::Poll,
+};
+use hyper::{ Request, Response, Body, service::Service };
+use hyper_tungstenite::{
+ HyperWebsocket,
+ tungstenite::Message,
+};
+use futures::{
+ Future,
+ SinkExt,
+ StreamExt,
+ TryStreamExt,
+ TryFuture,
+ TryFutureExt,
+ channel::mpsc::unbounded,
+ pin_mut,
+};
+
+const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf");
+
+#[derive(Clone)]
+pub struct PerConnHandler {
+ pub state: State,
+ pub local_addr: SocketAddr,
+ pub remote_addr: SocketAddr,
+ pub game_cmd_tx: MovReqTx,
+ pub page: String,
+}
+
+impl PerConnHandler {
+ async fn handle_req(self, req: Request<Body>)
+ -> std::result::Result<Response<Body>, Infallible>
+ {
+ if hyper_tungstenite::is_upgrade_request(&req) {
+ let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket");
+ tokio::spawn(async move {
+ peer_connection(self, wsocket).await;
+ });
+ return Ok(resp);
+ }
+
+ //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?;
+ let mut uri_path = req.uri().path().split('/').skip(1);
+ let actual_path = uri_path.next();
+
+ use hyper::{ Method, StatusCode };
+ match (req.method(), actual_path) {
+ (&Method::GET, None | Some("")) => {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(self.page))
+ .unwrap())
+ },
+ (&Method::GET, Some("font.ttf")) => {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(FONT_FILE))
+ .unwrap())
+ },
+ _ => {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::empty())
+ .unwrap())
+ }
+ }
+ }
+}
+
+impl Service<Request<Body>> for PerConnHandler
+{
+ type Response = Response<Body>;
+ type Error = Infallible;
+ type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response,Self::Error>> + Send>>;
+
+ fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let cloned_self = self.clone();
+ Box::pin(cloned_self.handle_req(req))
+ }
+}
+
+pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
+ let socket = socket.await.expect("couldn't finish websocket connection");
+ let addr = conn.remote_addr;
+ let peers = &conn.state.peers;
+ let mut cmd_tx = &conn.game_cmd_tx;
+ println!("Incoming TCP connection from: {addr}");
+
+ let mut peer_name = "unknown".to_string();
+
+ // for game -> conn comms
+ let (tx, rx) = unbounded();
+ // for server -> client comms
+ let (outgoing, mut incoming) = socket.split();
+
+ let process_incoming = async {
+ while let Ok(cmd) = incoming.try_next().await {
+ if let Some(cmd) = cmd {
+ if cmd.is_close() {
+ println!("closing \"{peer_name}\"@{addr}");
+ let mut peers = peers.write().await;
+ if let Some(_) = peers.get(&addr) {
+ peers.remove(&addr);
+ }
+ for (paddr, p) in peers.iter() {
+ if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) {
+ println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
+ }
+ }
+ break;
+ }
+ // if it ain't text we can't handle it
+ if !cmd.is_text() { continue; }
+ let cmd = cmd.to_text().unwrap();
+
+ let mut fields = cmd.split(" ");
+ let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
+ let x = fields.next().and_then(|xstr| xstr.parse::<usize>().ok());
+ let y = fields.next().and_then(|ystr| ystr.parse::<usize>().ok());
+ x.zip(y)
+ };
+ if let Some(cmd_name) = fields.next() {
+ use crate::minesweeper::{Move,MoveType};
+ match cmd_name {
+ "pos" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ let (name, id) = {
+ let mut peers = peers.write().await;
+ let mut entry = peers.get_mut(&addr).unwrap();
+ entry.position = pos.clone();
+ (entry.name.clone(), entry.seq_id)
+ };
+ let sanitized_name = name.replace(" ", "&nbsp").to_string();
+ {
+ let peers = peers.read().await;
+ for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) {
+ let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
+ if let Err(e) = r {
+ println!("error sending pos update: {e}");
+ }
+ }
+ }
+ },
+ None => {
+ println!("bad position update from \"{peer_name}@{addr}\"");
+ },
+ }
+ },
+ "reveal" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from \"{peer_name}\"@{addr}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await {
+ println!("couldn't process \"{peer_name}\"'s reveal command: {e}");
+ };
+ },
+ None => {
+ println!("bad reveal from \"{peer_name}\"@{addr}");
+ }
+ }
+ },
+ "flag" => {
+ match parse_pos(fields) {
+ Some(pos) => {
+ println!("{cmd} from \"{peer_name}\"@{addr}");
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await {
+ println!("couldn't process \"{peer_name}\"'s flag command: {e}");
+ };
+ },
+ None => {
+ println!("bad flag from \"{peer_name}\"@{addr}");
+ }
+ }
+ },
+ "reset" => {
+ println!("{cmd} from \"{peer_name}\"@{addr}");
+ if let Err(e) = cmd_tx.send(MetaMove::Reset).await {
+ println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
+ }
+ },
+ "register" => {
+ let name = fields.collect::<Vec<&str>>().join(&" ");
+ let id;
+ if name.is_empty() {
+ peer_name = "anon".to_string();
+ } else {
+ peer_name = name;
+ }
+ { // new scope cuz paranoid bout deadlocks
+ let mut peers = peers.write().await;
+ id = peers.len();
+ peers.insert(addr, Peer {
+ tx: tx.clone(),
+ seq_id: id,
+ name: peer_name.clone(),
+ position: (0,0)
+ });
+ }
+ tx.unbounded_send(Message::text(format!("id {id}"))).unwrap();
+ if let Err(e) = cmd_tx.send(MetaMove::Dump).await {
+ println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
+ }
+ },
+ e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
+ }
+ }
+ }
+ }
+ };
+ let send_to_browser = rx.map(Ok).forward(outgoing);
+
+ pin_mut!(process_incoming, send_to_browser);
+ futures_util::future::select(process_incoming, send_to_browser).await;
+
+ println!("{addr} disconnected");
+ peers.write().await.remove(&addr);
+}