From 1f6e35cb95b01be158a3e0ddb076f83c2062631c Mon Sep 17 00:00:00 2001 From: Louis Capitanchik <contact@louiscap.co> Date: Sat, 30 Jul 2022 15:39:57 +0100 Subject: [PATCH] More graceful cleanup on desktop, support user initiated disconnect --- Cargo.toml | 2 + README.md | 3 +- src/desktop.rs | 134 ++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 110 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ab575ff..87b68ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ name = "remote_events" version = "0.1.0" edition = "2021" +license = "BSD-3-Clause" + [dependencies] serde = { version = "1.0.140", features = ["derive"] } thiserror = "1.0.31" diff --git a/README.md b/README.md index a4ba0e0..9b01bf5 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ This library is quite immature; many features still need to be added for it to b - [x] WS Connection from desktop - [x] WS Connection from wasm - [x] Frame independent event buffering -- [ ] User initiated disconnect +- [x] User initiated disconnect on desktop +- [ ] User initiated disconnect on wasm - [ ] Automatic reconnection - [x] Map received events to user defined types - [x] Send events of a user defined type diff --git a/src/desktop.rs b/src/desktop.rs index a3b5c15..b54300c 100644 --- a/src/desktop.rs +++ b/src/desktop.rs @@ -1,4 +1,7 @@ +use std::collections::VecDeque; +use std::io::ErrorKind; use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::time::Duration; @@ -6,14 +9,14 @@ use std::time::Duration; use bevy::ecs::event::Events; use bevy::prelude::{App, Commands, CoreStage, EventReader, Plugin, ResMut}; use thiserror::Error; -use websocket::{Message, OwnedMessage}; +use websocket::{Message, OwnedMessage, WebSocketError}; use crate::events::{ FromSocketMessage, RemoteEventResult, SocketControlEvent, SocketState, ToSocketMessage, }; -fn message_stack<T>() -> Arc<Mutex<Vec<T>>> { - Arc::new(Mutex::new(Vec::<T>::new())) +fn message_stack<T>() -> Arc<Mutex<VecDeque<T>>> { + Arc::new(Mutex::new(VecDeque::<T>::new())) } pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType> @@ -22,10 +25,23 @@ where SentMessageType: ToSocketMessage + Send + Sync + 'static, { pub status: SocketState, - received_events: Arc<Mutex<Vec<ReceivedMessageType>>>, - sent_events: Arc<Mutex<Vec<SentMessageType>>>, + received_events: Arc<Mutex<VecDeque<ReceivedMessageType>>>, + sent_events: Arc<Mutex<VecDeque<SentMessageType>>>, + control_events: Arc<Mutex<VecDeque<SocketControlEvent>>>, receiver_handle: JoinHandle<()>, sender_handle: JoinHandle<()>, + kill_sig: Arc<AtomicBool>, +} + +impl<SentMessageType, ReceivedMessageType> + RemoteEventDispatcher<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + pub fn shutdown(&mut self) { + self.kill_sig.store(true, Ordering::Relaxed); + } } #[derive(Error, Debug)] @@ -45,45 +61,77 @@ where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { - let mut client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?; - let mut sent_events = message_stack(); - let mut received_events = message_stack(); + let client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?; + let sent_events = message_stack::<SentMessageType>(); + let received_events = message_stack(); + let control_events = message_stack(); + + let mut kill_sig = Arc::new(AtomicBool::new(false)); let (mut recv_channel, mut send_channel) = client.split()?; let mut recv = received_events.clone(); + let mut recv_control = control_events.clone(); let receiver_handle = std::thread::spawn(move || { for event in recv_channel.incoming_messages() { match event { Ok(OwnedMessage::Text(string)) => recv .lock() .unwrap() - .push(ReceivedMessageType::from_text(string)), + .push_back(ReceivedMessageType::from_text(string)), Ok(OwnedMessage::Binary(bin)) => recv .lock() .unwrap() - .push(ReceivedMessageType::from_binary(bin)), + .push_back(ReceivedMessageType::from_binary(bin)), Err(e) => { - log::error!("{}", e); + match e { + WebSocketError::NoDataAvailable => {} + _ => log::error!("Socket Error: {}", e), + } + break; } _ => {} } } + if let Err(ref err) = recv_channel.shutdown_all() { + match &err.kind() { + ErrorKind::NotConnected => {} + _ => log::error!("{}", err), + } + } + + recv_control + .lock() + .unwrap() + .push_front(SocketControlEvent::Disconnect); }); let mut send = sent_events.clone(); + let mut send_control = control_events.clone(); + let send_kill_sig = kill_sig.clone(); let sender_handle = std::thread::spawn(move || loop { { let mut lock = send.lock().unwrap(); - if lock.len() > 0 { - let end = lock.len(); - for i in 0..end { - let to_send: SentMessageType = lock.remove(i); - send_channel.send_message(&Message::text(to_send.to_text())); - } + while let Some(to_send) = lock.pop_front() { + if let Err(e) = send_channel.send_message(&Message::text(to_send.to_text())) { + log::error!("{}", e); + }; } }; + if send_kill_sig.load(Ordering::Relaxed) { + if let Err(ref err) = send_channel.shutdown_all() { + match &err.kind() { + ErrorKind::NotConnected => {} + _ => log::error!("{}", err), + } + } + + send_control + .lock() + .unwrap() + .push_front(SocketControlEvent::Disconnect); + } std::thread::sleep(Duration::from_millis(75)); }); @@ -91,8 +139,10 @@ where status: SocketState::Connected, sent_events, received_events, + control_events, sender_handle, receiver_handle, + kill_sig, }) } @@ -104,6 +154,36 @@ pub fn socket_manager_system<SentMessageType, ReceivedMessageType>( ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { + if let Some(ref mut client) = maybe_client { + let mut reboot_client = None; + if let Ok(mut control_events) = client.control_events.lock() { + while let Some(event) = control_events.pop_front() { + match event { + SocketControlEvent::Disconnect => { + #[rustfmt::skip] + // rustfmt breaks the ending "();" to put ");" on its own line... + commands.remove_resource::<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>(); + } + SocketControlEvent::Create(url) => { + reboot_client = Some(url); + } + } + } + } + + if let Some(url) = reboot_client { + client.shutdown(); + match connect_to::<SentMessageType, ReceivedMessageType, String>(url.to_string()) { + Ok(client) => { + commands.insert_resource(client); + } + Err(e) => { + log::error!("Error creating socket connection: {}", e); + } + } + } + } + for event in control_events.iter() { match event { SocketControlEvent::Create(url) => { @@ -125,8 +205,11 @@ pub fn socket_manager_system<SentMessageType, ReceivedMessageType>( } } SocketControlEvent::Disconnect => match maybe_client { - Some(ref client) => { - log::warn!("NOT IMPLEMENTED: Disconnect"); + Some(ref mut client) => { + client.shutdown(); + #[rustfmt::skip] + // rustfmt breaks the ending "();" to put ");" on its own line... + commands.remove_resource::<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>(); } None => { log::warn!("There was no connection to disconnect from"); @@ -147,18 +230,13 @@ pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>( if let Some(mut client) = client { if let Ok(mut list) = client.sent_events.lock() { for message in sent_batch.drain() { - list.push(message); + list.push_back(message); } } - if let Ok(mut list) = client.received_events.lock() { - if list.len() > 0 { - let end = list.len(); - for i in 0..end { - let message = list.remove(i); - received_batch.send(message); - } - } + let mut lock = client.received_events.lock().unwrap(); + while let Some(message) = lock.pop_front() { + received_batch.send(message); } } } -- GitLab