diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..4fffb2f89cbd8f2169ce9914bd16bd43785bb368 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..ab575ff06152e7e92fcb39404329746844256c2d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "remote_events" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0.140", features = ["derive"] } +thiserror = "1.0.31" +log = "0.4.17" + +[dependencies.bevy] +version = "0.7" +default-features = false +features = [] + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +websocket = { version = "0.26.5" , default-features = false, features = ["sync", "sync-ssl", "bytes", "native-tls"]} + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-sockets = "0.2.2" diff --git a/src/desktop.rs b/src/desktop.rs new file mode 100644 index 0000000000000000000000000000000000000000..a3b5c153c1f0363926d0e4d5cc2bc95cc5a13903 --- /dev/null +++ b/src/desktop.rs @@ -0,0 +1,208 @@ +use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; +use std::time::Duration; + +use bevy::ecs::event::Events; +use bevy::prelude::{App, Commands, CoreStage, EventReader, Plugin, ResMut}; +use thiserror::Error; +use websocket::{Message, OwnedMessage}; + +use crate::events::{ + FromSocketMessage, RemoteEventResult, SocketControlEvent, SocketState, ToSocketMessage, +}; + +fn message_stack<T>() -> Arc<Mutex<Vec<T>>> { + Arc::new(Mutex::new(Vec::<T>::new())) +} + +pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + pub status: SocketState, + received_events: Arc<Mutex<Vec<ReceivedMessageType>>>, + sent_events: Arc<Mutex<Vec<SentMessageType>>>, + receiver_handle: JoinHandle<()>, + sender_handle: JoinHandle<()>, +} + +#[derive(Error, Debug)] +pub enum RemoteEventError { + #[error("{0}")] + ConnectionStringError(#[from] websocket::client::ParseError), + #[error("{0}")] + WebsocketError(#[from] websocket::WebSocketError), + #[error("{0}")] + IOError(#[from] std::io::Error), +} + +fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>( + remote_host: T, +) -> RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + let mut client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?; + let mut sent_events = message_stack(); + let mut received_events = message_stack(); + + let (mut recv_channel, mut send_channel) = client.split()?; + + let mut recv = received_events.clone(); + let receiver_handle = std::thread::spawn(move || { + for event in recv_channel.incoming_messages() { + match event { + Ok(OwnedMessage::Text(string)) => recv + .lock() + .unwrap() + .push(ReceivedMessageType::from_text(string)), + Ok(OwnedMessage::Binary(bin)) => recv + .lock() + .unwrap() + .push(ReceivedMessageType::from_binary(bin)), + Err(e) => { + log::error!("{}", e); + } + _ => {} + } + } + }); + + let mut send = sent_events.clone(); + let sender_handle = std::thread::spawn(move || loop { + { + let mut lock = send.lock().unwrap(); + if lock.len() > 0 { + let end = lock.len(); + for i in 0..end { + let to_send: SentMessageType = lock.remove(i); + send_channel.send_message(&Message::text(to_send.to_text())); + } + } + }; + + std::thread::sleep(Duration::from_millis(75)); + }); + + Ok(RemoteEventDispatcher { + status: SocketState::Connected, + sent_events, + received_events, + sender_handle, + receiver_handle, + }) +} + +pub fn socket_manager_system<SentMessageType, ReceivedMessageType>( + mut commands: Commands, + mut maybe_client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, + mut control_events: EventReader<SocketControlEvent>, +) where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + for event in control_events.iter() { + match event { + SocketControlEvent::Create(url) => { + if maybe_client.is_some() { + log::warn!( + "Trying to create a second socket connection while the first is alive" + ) + } else { + match connect_to::<SentMessageType, ReceivedMessageType, String>( + url.to_string(), + ) { + Ok(client) => { + commands.insert_resource(client); + } + Err(e) => { + log::error!("Error creating socket connection: {}", e); + } + } + } + } + SocketControlEvent::Disconnect => match maybe_client { + Some(ref client) => { + log::warn!("NOT IMPLEMENTED: Disconnect"); + } + None => { + log::warn!("There was no connection to disconnect from"); + } + }, + } + } +} + +pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>( + mut client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, + mut received_batch: ResMut<Events<ReceivedMessageType>>, + mut sent_batch: ResMut<Events<SentMessageType>>, +) where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + if let Some(mut client) = client { + if let Ok(mut list) = client.sent_events.lock() { + for message in sent_batch.drain() { + list.push(message); + } + } + + if let Ok(mut list) = client.received_events.lock() { + if list.len() > 0 { + let end = list.len(); + for i in 0..end { + let message = list.remove(i); + received_batch.send(message); + } + } + } + } +} + +#[derive(Default)] +pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + _send: PhantomData<SentMessageType>, + _recv: PhantomData<ReceivedMessageType>, +} + +impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + pub fn new() -> Self { + Self { + _recv: Default::default(), + _send: Default::default(), + } + } +} + +impl<SentMessageType, ReceivedMessageType> Plugin + for RemoteEventPlugin<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + fn build(&self, app: &mut App) { + app.add_event::<SentMessageType>() + .add_event::<ReceivedMessageType>() + .add_event::<SocketControlEvent>() + .add_system_to_stage( + CoreStage::Last, + socket_manager_system::<SentMessageType, ReceivedMessageType>, + ) + .add_system_to_stage( + CoreStage::PostUpdate, + socket_event_wrangler::<SentMessageType, ReceivedMessageType>, + ); + } +} diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000000000000000000000000000000000000..0661ba0b78df4118fa278371ef01c6a0fc3d999e --- /dev/null +++ b/src/events.rs @@ -0,0 +1,37 @@ +use std::fmt::{Display, Formatter}; + +#[derive(Clone, Debug)] +pub enum SocketControlEvent { + Create(String), + Disconnect, +} + +#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug, Default)] +pub enum SocketState { + #[default] + Pending, + Connected, + Disconnected, + Closed, +} + +impl Display for SocketState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +pub trait FromSocketMessage { + fn from_text(value: String) -> Self; + fn from_binary(value: Vec<u8>) -> Self; +} +pub trait ToSocketMessage { + fn to_text(&self) -> String; +} + +#[cfg(not(target_arch = "wasm32"))] +pub use crate::desktop::RemoteEventError; +#[cfg(target_arch = "wasm32")] +pub use crate::wasm::RemoteEventError; + +pub type RemoteEventResult<T> = Result<T, RemoteEventError>; diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..2d59537d3953f726893287d153dcc703112625fa --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,42 @@ +use std::fmt::{Debug, Display, Formatter}; + +#[cfg(not(target_arch = "wasm32"))] +mod desktop; +pub mod events; +#[cfg(target_arch = "wasm32")] +mod wasm; + +#[cfg(not(target_arch = "wasm32"))] +pub use self::desktop::RemoteEventDispatcher; +#[cfg(not(target_arch = "wasm32"))] +pub use self::desktop::RemoteEventPlugin; +#[cfg(target_arch = "wasm32")] +pub use self::wasm::RemoteEventDispatcher; +#[cfg(target_arch = "wasm32")] +pub use self::wasm::RemoteEventPlugin; + +impl<SentMessageType, ReceivedMessageType> Debug + for RemoteEventDispatcher<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: crate::events::FromSocketMessage + Send + Sync + 'static, + SentMessageType: crate::events::ToSocketMessage + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RemoteEventDispatcher") + .field("status", &self.status) + .finish() + } +} + +impl<SentMessageType, ReceivedMessageType> Display + for RemoteEventDispatcher<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: crate::events::FromSocketMessage + Send + Sync + 'static, + SentMessageType: crate::events::ToSocketMessage + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("RemoteEventDispatcher") + .field(&self.status) + .finish() + } +} diff --git a/src/wasm.rs b/src/wasm.rs new file mode 100644 index 0000000000000000000000000000000000000000..70629aee519e495166e4fd08b33bdf87c9f0dd37 --- /dev/null +++ b/src/wasm.rs @@ -0,0 +1,180 @@ +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; + +use bevy::ecs::event::Events; +use bevy::ecs::system::SystemState; +use bevy::prelude::{ + App, CoreStage, EventReader, EventWriter, ExclusiveSystemDescriptorCoercion, + IntoExclusiveSystem, NonSendMut, Plugin, ResMut, World, +}; +use thiserror::Error; +use wasm_sockets::Message; + +use crate::events::{FromSocketMessage, SocketControlEvent, SocketState, ToSocketMessage}; + +fn message_stack<T>() -> Arc<Mutex<Vec<T>>> { + Arc::new(Mutex::new(Vec::<T>::new())) +} + +#[derive(Error, Debug)] +pub enum RemoteEventError { + #[error("{0}")] + WebsocketError(#[from] wasm_sockets::WebSocketError), +} + +pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + pub status: SocketState, + received_events: Arc<Mutex<Vec<ReceivedMessageType>>>, + sent_events: Arc<Mutex<Vec<SentMessageType>>>, + client: wasm_sockets::PollingClient, +} + +fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>( + remote_host: T, +) -> crate::events::RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + let client = wasm_sockets::PollingClient::new(remote_host.to_string().as_str())?; + Ok(RemoteEventDispatcher { + client, + sent_events: message_stack(), + received_events: message_stack(), + status: SocketState::Connected, + }) +} + +pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(mut world: &mut World) +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + let mut state: SystemState<( + Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, + EventReader<SocketControlEvent>, + )> = SystemState::new(&mut world); + + let mut client_to_insert = None; + + { + let (mut maybe_client, mut reader): ( + Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, + EventReader<SocketControlEvent>, + ) = state.get_mut(&mut world); + + for event in reader.iter() { + match event { + SocketControlEvent::Create(url) => { + if maybe_client.is_some() { + log::warn!( + "Trying to create a second socket connection while the first is alive" + ) + } else { + match connect_to::<SentMessageType, ReceivedMessageType, String>( + url.to_string(), + ) { + Ok(client) => { + client_to_insert = Some(client); + } + Err(e) => { + log::error!("Error creating socket connection: {}", e); + } + } + } + } + SocketControlEvent::Disconnect => match maybe_client { + Some(ref client) => { + log::warn!("NOT IMPLEMENTED: Disconnect"); + } + None => { + log::warn!("There was no connection to disconnect from"); + } + }, + } + } + }; + + if let Some(new_client) = client_to_insert { + world.insert_non_send_resource(new_client); + } +} + +pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>( + mut client: Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, + mut incoming: ResMut<Events<SentMessageType>>, + mut outgoing: ResMut<Events<ReceivedMessageType>>, +) where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + if let Some(mut client) = client { + for event in incoming.drain() { + client.client.send_string(&event.to_text()); + } + + for event in client.client.receive() { + match event { + Message::Text(text) => { + outgoing.send(ReceivedMessageType::from_text(text)); + } + Message::Binary(bin) => { + outgoing.send(ReceivedMessageType::from_binary(bin)); + } + } + } + } +} + +#[derive(Default)] +pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + _send: PhantomData<SentMessageType>, + _recv: PhantomData<ReceivedMessageType>, +} + +impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + pub fn new() -> Self { + Self { + _recv: Default::default(), + _send: Default::default(), + } + } +} + +impl<SentMessageType, ReceivedMessageType> Plugin + for RemoteEventPlugin<SentMessageType, ReceivedMessageType> +where + ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, + SentMessageType: ToSocketMessage + Send + Sync + 'static, +{ + fn build(&self, app: &mut App) { + app.add_event::<SentMessageType>() + .add_event::<ReceivedMessageType>() + .add_event::<SocketControlEvent>() + .add_system_to_stage( + CoreStage::Last, + socket_manager_system::<SentMessageType, ReceivedMessageType> + .exclusive_system() + .at_end(), + ) + .add_system_to_stage( + CoreStage::PostUpdate, + socket_event_wrangler::<SentMessageType, ReceivedMessageType> + .exclusive_system() + .at_end(), + ); + } +}