use crate::types::*;
use std::{
net::SocketAddr,
convert::Infallible,
pin::Pin,
task::Poll,
};
use hyper::{ Request, Response, Body, service::Service };
use hyper_tungstenite::{
HyperWebsocket,
tungstenite::Message,
};
use futures::{
Future,
SinkExt,
StreamExt,
TryStreamExt,
TryFuture,
TryFutureExt,
channel::mpsc::unbounded,
pin_mut,
};
/// Raw bytes of `VT323-Regular.ttf`, embedded into the binary at compile time
/// and returned as the body of `GET /font.ttf`.
const FONT_FILE: &[u8] = include_bytes!("./VT323-Regular.ttf");
/// Per-connection handler: answers plain HTTP requests (page and font) and
/// hands WebSocket upgrade requests over to `peer_connection`.
///
/// The handler is `Clone` because the `Service` implementation clones it for
/// every incoming request.
#[derive(Clone)]
pub struct PerConnHandler {
/// Shared server state; `peer_connection` reads and writes its `peers` map.
pub state: State,
/// Presumably the local address the connection was accepted on; it is not
/// read by the code visible in this file.
pub local_addr: SocketAddr,
/// Address of the remote client; used as the key into the peers map and in
/// log messages.
pub remote_addr: SocketAddr,
/// Sender used to submit `MetaMove` requests (moves, reset, dump).
pub game_cmd_tx: MovReqTx,
/// Body returned for `GET` requests to the root path.
pub page: String,
}
impl PerConnHandler {
async fn handle_req(self, req: Request
)
-> std::result::Result, Infallible>
{
if hyper_tungstenite::is_upgrade_request(&req) {
let (resp, wsocket) = hyper_tungstenite::upgrade(req, None).expect("couldn't upgrade to websocket");
tokio::spawn(async move {
peer_connection(self, wsocket).await;
});
return Ok(resp);
}
//let page = tokio::fs::read_to_string(self.page).await.map_err(errpage)?;
let mut uri_path = req.uri().path().split('/').skip(1);
let actual_path = uri_path.next();
use hyper::{ Method, StatusCode };
match (req.method(), actual_path) {
(&Method::GET, None | Some("")) => {
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(self.page))
.unwrap())
},
(&Method::GET, Some("font.ttf")) => {
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(FONT_FILE))
.unwrap())
},
_ => {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap())
}
}
}
}
impl Service> for PerConnHandler
{
type Response = Response;
type Error = Infallible;
type Future = Pin> + Send>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
let cloned_self = self.clone();
Box::pin(cloned_self.handle_req(req))
}
}
/// Drives one client's WebSocket connection until it closes.
///
/// Incoming text messages are space-separated commands:
///
/// * `register <name…>` – adds the client to the peers map (name defaults to
///   `anon`), replies with `id <n>` and requests a game dump.
/// * `pos <x> <y>` – stores the client's position and forwards
///   `pos <id> <name> <x> <y>` to every other peer.
/// * `reveal <x> <y>` / `flag <x> <y>` – submits the move to the game task.
/// * `reset` – asks the game task to reset.
///
/// Non-text messages are ignored; unknown or malformed commands are logged.
/// Messages queued on this client's channel are forwarded to the browser
/// concurrently. When either direction finishes, the client is removed from
/// the peers map and, if it had registered, every remaining peer receives
/// `logoff <id> <name>`.
///
/// Failures (handshake, channel sends, bad input) are logged, never panicked
/// on, since all of them can be triggered by the remote side.
pub async fn peer_connection(conn: PerConnHandler, socket: HyperWebsocket) {
    use crate::minesweeper::{Move, MoveType};

    /// Parses the next two fields as an `(x, y)` coordinate pair.
    fn parse_pos(mut fields: std::str::Split<'_, &str>) -> Option<(usize, usize)> {
        let x = fields.next()?.parse::<usize>().ok()?;
        let y = fields.next()?.parse::<usize>().ok()?;
        Some((x, y))
    }

    let addr = conn.remote_addr;
    let socket = match socket.await {
        Ok(socket) => socket,
        Err(e) => {
            println!("couldn't finish websocket connection with {addr}: {e}");
            return;
        }
    };
    let peers = &conn.state.peers;
    let mut cmd_tx = &conn.game_cmd_tx;
    println!("Incoming TCP connection from: {addr}");
    let mut peer_name = "unknown".to_string();

    // Channel through which other tasks queue messages for this client.
    let (tx, rx) = unbounded();
    // Write half / read half of the websocket.
    let (outgoing, mut incoming) = socket.split();

    // Scoped so both futures (and everything they borrow) are dropped before
    // the cleanup below runs.
    {
        let process_incoming = async {
            // `Ok(None)` means the stream is exhausted and `Err` means the
            // connection broke; both end the loop.
            while let Ok(Some(msg)) = incoming.try_next().await {
                if msg.is_close() {
                    println!("closing \"{peer_name}\"@{addr}");
                    break;
                }
                // if it ain't text we can't handle it
                if !msg.is_text() {
                    continue;
                }
                let cmd = match msg.to_text() {
                    Ok(text) => text,
                    Err(_) => continue,
                };
                let mut fields = cmd.split(" ");
                let cmd_name = match fields.next() {
                    Some(cmd_name) => cmd_name,
                    None => continue,
                };

                match cmd_name {
                    "pos" => {
                        let pos = match parse_pos(fields) {
                            Some(pos) => pos,
                            None => {
                                println!("bad position update from \"{peer_name}\"@{addr}");
                                continue;
                            }
                        };
                        // Update our own entry under the write lock and release
                        // it before broadcasting.
                        let registered = {
                            let mut peers = peers.write().await;
                            peers.get_mut(&addr).map(|entry| {
                                entry.position = pos;
                                (entry.name.clone(), entry.seq_id)
                            })
                        };
                        let (name, id) = match registered {
                            Some(found) => found,
                            None => {
                                // `pos` arrived before `register`: nothing to
                                // update or announce.
                                println!("position update from unregistered peer \"{peer_name}\"@{addr}");
                                continue;
                            }
                        };
                        // NOTE(review): as extracted, both arguments look like a
                        // plain space, which would make this a no-op; the
                        // replacement was presumably a non-delimiting character
                        // so the name stays a single field — confirm.
                        let sanitized_name = name.replace(" ", " ");
                        let update = format!("pos {id} {sanitized_name} {} {}", pos.0, pos.1);
                        let peers = peers.read().await;
                        for (_, peer) in peers.iter().filter(|(peer_addr, _)| **peer_addr != addr) {
                            if let Err(e) = peer.tx.unbounded_send(Message::text(update.clone())) {
                                println!("error sending pos update: {e}");
                            }
                        }
                    }
                    "reveal" | "flag" => {
                        let move_type = if cmd_name == "reveal" {
                            MoveType::Reveal
                        } else {
                            MoveType::ToggleFlag
                        };
                        match parse_pos(fields) {
                            Some(pos) => {
                                println!("{cmd} from \"{peer_name}\"@{addr}");
                                let request = MetaMove::Move(Move { t: move_type, pos }, addr);
                                if let Err(e) = cmd_tx.send(request).await {
                                    println!("couldn't process \"{peer_name}\"'s {cmd_name} command: {e}");
                                }
                            }
                            None => println!("bad {cmd_name} from \"{peer_name}\"@{addr}"),
                        }
                    }
                    "reset" => {
                        println!("{cmd} from \"{peer_name}\"@{addr}");
                        if let Err(e) = cmd_tx.send(MetaMove::Reset).await {
                            println!("couldn't send reset request for \"{peer_name}\"@{addr}: {e}");
                        }
                    }
                    "register" => {
                        let name = fields.collect::<Vec<&str>>().join(" ");
                        peer_name = if name.is_empty() { "anon".to_string() } else { name };
                        let id = {
                            let mut peers = peers.write().await;
                            // One past the highest id in use: unlike
                            // `peers.len()`, this cannot collide with a live
                            // peer after someone else has disconnected.
                            let id = peers
                                .iter()
                                .map(|(_, p)| p.seq_id)
                                .max()
                                .map_or(0, |highest| highest + 1);
                            peers.insert(addr, Peer {
                                tx: tx.clone(),
                                seq_id: id,
                                name: peer_name.clone(),
                                position: (0, 0),
                            });
                            id
                        };
                        if let Err(e) = tx.unbounded_send(Message::text(format!("id {id}"))) {
                            println!("couldn't send id to \"{peer_name}\"@{addr}: {e}");
                        }
                        if let Err(e) = cmd_tx.send(MetaMove::Dump).await {
                            println!("couldn't send game dump to \"{peer_name}\"@{addr}: {e}");
                        }
                    }
                    e => println!("unknown command {e:?} from {peer_name}@{addr}, \"{cmd}\""),
                }
            }
        };
        let send_to_browser = rx.map(Ok).forward(outgoing);
        pin_mut!(process_incoming, send_to_browser);
        futures_util::future::select(process_incoming, send_to_browser).await;
    }

    println!("{addr} disconnected");
    let mut peers = peers.write().await;
    // Only a registered client has an entry, and only then is there anything
    // to announce to the others.
    if let Some(departed) = peers.remove(&addr) {
        let notice = format!("logoff {} {}", departed.seq_id, departed.name);
        for (paddr, p) in peers.iter() {
            if let Err(e) = p.tx.unbounded_send(Message::text(notice.clone())) {
                println!("couldn't deliver logoff info to \"{}\"@{}: {}", p.name, paddr, e);
            }
        }
    }
}