summaryrefslogtreecommitdiff
path: root/src/conn.rs
diff options
context:
space:
mode:
authorstale <redkugelblitzin@gmail.com>2022-05-11 07:00:55 -0300
committerstale <redkugelblitzin@gmail.com>2022-05-11 07:00:55 -0300
commit5d3ab3dcf8deb887877523e0293237bcee804b1b (patch)
treead854cc97ae5558ac08a4417cb4b10f49a498a9c /src/conn.rs
parentac938b0d57d2e2630bf05668fec6b3d6c18d0450 (diff)
Ported, it woooorks
Diffstat (limited to 'src/conn.rs')
-rw-r--r--src/conn.rs133
1 files changed, 26 insertions, 107 deletions
diff --git a/src/conn.rs b/src/conn.rs
index 6d08942..fda3a2b 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -1,106 +1,19 @@
use crate::types::*;
-use std::{
- net::SocketAddr,
- convert::Infallible,
- pin::Pin,
- task::Poll,
-};
-use hyper::{ Request, Response, Body, service::Service };
-use hyper_tungstenite::{
- HyperWebsocket,
- tungstenite::Message,
-};
-use futures::{
- Future,
- SinkExt,
- StreamExt,
- TryStreamExt,
- TryFuture,
- TryFutureExt,
- channel::mpsc::unbounded,
- pin_mut,
-};
+use futures_util::{SinkExt, StreamExt, TryStreamExt};
+use warp::ws::Message;
-const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf");
-
-#[derive(Clone)]
-pub struct PerConnHandler {
- pub state: State,
- pub local_addr: SocketAddr,
- pub remote_addr: SocketAddr,
- pub game_cmd_tx: MovReqTx,
- pub page: String,
-}
-
-impl PerConnHandler {
- async fn handle_req(self, req: Request<Body>)
- -> std::result::Result<Response<Body>, Infallible>
- {
- if hyper_tungstenite::is_upgrade_request(&req) {
- let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket");
- tokio::spawn(async move {
- peer_connection(self, wsocket).await;
- });
- return Ok(resp);
- }
-
- //let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?;
- let mut uri_path = req.uri().path().split('/').skip(1);
- let actual_path = uri_path.next();
-
- use hyper::{ Method, StatusCode };
- match (req.method(), actual_path) {
- (&Method::GET, None | Some("")) => {
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(self.page))
- .unwrap())
- },
- (&Method::GET, Some("font.ttf")) => {
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(FONT_FILE))
- .unwrap())
- },
- _ => {
- Ok(Response::builder()
- .status(StatusCode::NOT_FOUND)
- .body(Body::empty())
- .unwrap())
- }
- }
- }
-}
-
-impl Service<Request<Body>> for PerConnHandler
-{
- type Response = Response<Body>;
- type Error = Infallible;
- type Future = Pin<Box<dyn Future<Output = std::result::Result<Self::Response,Self::Error>> + Send>>;
-
- fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let cloned_self = self.clone();
- Box::pin(cloned_self.handle_req(req))
- }
-}
-
-pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
- let socket = socket.await.expect("couldn't finish websocket connection");
- let addr = conn.remote_addr;
- let peers = &conn.state.peers;
- let mut cmd_tx = &conn.game_cmd_tx;
- println!("Incoming TCP connection from: {addr}");
+pub async fn peer_connection(socket: warp::ws::WebSocket, conndata: ConnData) {
+ let addr = conndata.remote_addr;
+ let peers = &conndata.peers;
+ let cmd_tx = conndata.cmd_tx.clone();
+ println!("Incoming TCP connection from: {}", conndata.remote_addr);
let mut peer_name = "unknown".to_string();
// for game -> conn comms
- let (tx, rx) = unbounded();
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// for server -> client comms
- let (outgoing, mut incoming) = socket.split();
+ let (mut outgoing, mut incoming) = socket.split();
let process_incoming = async {
while let Ok(cmd) = incoming.try_next().await {
@@ -112,7 +25,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
peers.remove(&addr);
}
for (paddr, p) in peers.iter() {
- if let Err(e) = tx.unbounded_send(Message::text("logoff {p.seqid} {p.name}")) {
+ if let Err(e) = tx.send(Message::text("logoff {p.seqid} {p.name}")) {
println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
}
}
@@ -120,7 +33,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
}
// if it ain't text we can't handle it
if !cmd.is_text() { continue; }
- let cmd = cmd.to_text().unwrap();
+ let cmd = cmd.to_str().to_owned().unwrap();
let mut fields = cmd.split(" ");
let parse_pos = |mut fields: std::str::Split<&str>| -> Option<(usize, usize)> {
@@ -144,7 +57,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
{
let peers = peers.read().await;
for peer_tx in peers.iter().filter(|(s, _)| **s != addr).map(|(_,p)| &p.tx) {
- let r = peer_tx.unbounded_send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
+ let r = peer_tx.send(Message::text(format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1)));
if let Err(e) = r {
println!("error sending pos update: {e}");
}
@@ -160,7 +73,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
match parse_pos(fields) {
Some(pos) => {
println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)).await {
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::Reveal, pos }, addr)) {
println!("couldn't process \"{peer_name}\"'s reveal command: {e}");
};
},
@@ -173,7 +86,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
match parse_pos(fields) {
Some(pos) => {
println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)).await {
+ if let Err(e) = cmd_tx.send(MetaMove::Move(Move { t: MoveType::ToggleFlag, pos }, addr)) {
println!("couldn't process \"{peer_name}\"'s flag command: {e}");
};
},
@@ -184,7 +97,7 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
},
"reset" => {
println!("{cmd} from \"{peer_name}\"@{addr}");
- if let Err(e) = cmd_tx.send(MetaMove::Reset).await {
+ if let Err(e) = cmd_tx.send(MetaMove::Reset) {
println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
}
},
@@ -206,8 +119,8 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
position: (0,0)
});
}
- tx.unbounded_send(Message::text(format!("id {id}"))).unwrap();
- if let Err(e) = cmd_tx.send(MetaMove::Dump).await {
+ tx.send(Message::text(format!("id {id}"))).unwrap();
+ if let Err(e) = cmd_tx.send(MetaMove::Dump) {
println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
}
},
@@ -217,10 +130,16 @@ pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
}
}
};
- let send_to_browser = rx.map(Ok).forward(outgoing);
+ let send_to_client = async move {
+ while let Some(m) = rx.recv().await {
+ if let Err(e) = outgoing.send(m).await {
+ println!("something went bad lol: {e}");
+ };
+ }
+ };
- pin_mut!(process_incoming, send_to_browser);
- futures_util::future::select(process_incoming, send_to_browser).await;
+ futures_util::pin_mut!(process_incoming, send_to_client);
+ futures_util::future::select(process_incoming, send_to_client).await;
println!("{addr} disconnected");
peers.write().await.remove(&addr);