From aa7e62ad3f13bfde01415fbaf362295af7022f9f Mon Sep 17 00:00:00 2001
From: Louis Capitanchik <contact@louiscap.co>
Date: Fri, 29 Jul 2022 23:46:02 +0100
Subject: [PATCH] Basic Impl

- Desktop support with websockets library. Runs a background thread to
  receive / send events, client does not need to block as it is send+sync

- Wasm support with wasm-sockets library. Uses an exclusive polling system
  at the end of each frame due to client not being thread safe (+ lack of threads)

- Send and receive messages
---
 .gitignore     |   2 +
 Cargo.toml     |  20 +++++
 src/desktop.rs | 208 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/events.rs  |  37 +++++++++
 src/lib.rs     |  42 ++++++++++
 src/wasm.rs    | 180 ++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 489 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 src/desktop.rs
 create mode 100644 src/events.rs
 create mode 100644 src/lib.rs
 create mode 100644 src/wasm.rs

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4fffb2f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target
+/Cargo.lock
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..ab575ff
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "remote_events"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+serde = { version = "1.0.140", features = ["derive"] }
+thiserror = "1.0.31"
+log = "0.4.17"
+
+[dependencies.bevy]
+version = "0.7"
+default-features = false
+features = []
+
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
+websocket = { version = "0.26.5" , default-features = false, features = ["sync", "sync-ssl", "bytes", "native-tls"]}
+
+[target.'cfg(target_arch = "wasm32")'.dependencies]
+wasm-sockets = "0.2.2"
diff --git a/src/desktop.rs b/src/desktop.rs
new file mode 100644
index 0000000..a3b5c15
--- /dev/null
+++ b/src/desktop.rs
@@ -0,0 +1,208 @@
+use std::marker::PhantomData;
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+use std::time::Duration;
+
+use bevy::ecs::event::Events;
+use bevy::prelude::{App, Commands, CoreStage, EventReader, Plugin, ResMut};
+use thiserror::Error;
+use websocket::{Message, OwnedMessage};
+
+use crate::events::{
+	FromSocketMessage, RemoteEventResult, SocketControlEvent, SocketState, ToSocketMessage,
+};
+
+fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
+	Arc::new(Mutex::new(Vec::<T>::new()))
+}
+
+pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	pub status: SocketState,
+	received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
+	sent_events: Arc<Mutex<Vec<SentMessageType>>>,
+	receiver_handle: JoinHandle<()>,
+	sender_handle: JoinHandle<()>,
+}
+
+#[derive(Error, Debug)]
+pub enum RemoteEventError {
+	#[error("{0}")]
+	ConnectionStringError(#[from] websocket::client::ParseError),
+	#[error("{0}")]
+	WebsocketError(#[from] websocket::WebSocketError),
+	#[error("{0}")]
+	IOError(#[from] std::io::Error),
+}
+
+fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
+	remote_host: T,
+) -> RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	let mut client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?;
+	let mut sent_events = message_stack();
+	let mut received_events = message_stack();
+
+	let (mut recv_channel, mut send_channel) = client.split()?;
+
+	let mut recv = received_events.clone();
+	let receiver_handle = std::thread::spawn(move || {
+		for event in recv_channel.incoming_messages() {
+			match event {
+				Ok(OwnedMessage::Text(string)) => recv
+					.lock()
+					.unwrap()
+					.push(ReceivedMessageType::from_text(string)),
+				Ok(OwnedMessage::Binary(bin)) => recv
+					.lock()
+					.unwrap()
+					.push(ReceivedMessageType::from_binary(bin)),
+				Err(e) => {
+					log::error!("{}", e);
+				}
+				_ => {}
+			}
+		}
+	});
+
+	let mut send = sent_events.clone();
+	let sender_handle = std::thread::spawn(move || loop {
+		{
+			let mut lock = send.lock().unwrap();
+			if lock.len() > 0 {
+				let end = lock.len();
+				for i in 0..end {
+					let to_send: SentMessageType = lock.remove(i);
+					send_channel.send_message(&Message::text(to_send.to_text()));
+				}
+			}
+		};
+
+		std::thread::sleep(Duration::from_millis(75));
+	});
+
+	Ok(RemoteEventDispatcher {
+		status: SocketState::Connected,
+		sent_events,
+		received_events,
+		sender_handle,
+		receiver_handle,
+	})
+}
+
+pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(
+	mut commands: Commands,
+	mut maybe_client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
+	mut control_events: EventReader<SocketControlEvent>,
+) where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	for event in control_events.iter() {
+		match event {
+			SocketControlEvent::Create(url) => {
+				if maybe_client.is_some() {
+					log::warn!(
+						"Trying to create a second socket connection while the first is alive"
+					)
+				} else {
+					match connect_to::<SentMessageType, ReceivedMessageType, String>(
+						url.to_string(),
+					) {
+						Ok(client) => {
+							commands.insert_resource(client);
+						}
+						Err(e) => {
+							log::error!("Error creating socket connection: {}", e);
+						}
+					}
+				}
+			}
+			SocketControlEvent::Disconnect => match maybe_client {
+				Some(ref client) => {
+					log::warn!("NOT IMPLEMENTED: Disconnect");
+				}
+				None => {
+					log::warn!("There was no connection to disconnect from");
+				}
+			},
+		}
+	}
+}
+
+pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
+	mut client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
+	mut received_batch: ResMut<Events<ReceivedMessageType>>,
+	mut sent_batch: ResMut<Events<SentMessageType>>,
+) where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	if let Some(mut client) = client {
+		if let Ok(mut list) = client.sent_events.lock() {
+			for message in sent_batch.drain() {
+				list.push(message);
+			}
+		}
+
+		if let Ok(mut list) = client.received_events.lock() {
+			if list.len() > 0 {
+				let end = list.len();
+				for i in 0..end {
+					let message = list.remove(i);
+					received_batch.send(message);
+				}
+			}
+		}
+	}
+}
+
+#[derive(Default)]
+pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	_send: PhantomData<SentMessageType>,
+	_recv: PhantomData<ReceivedMessageType>,
+}
+
+impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	pub fn new() -> Self {
+		Self {
+			_recv: Default::default(),
+			_send: Default::default(),
+		}
+	}
+}
+
+impl<SentMessageType, ReceivedMessageType> Plugin
+	for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	fn build(&self, app: &mut App) {
+		app.add_event::<SentMessageType>()
+			.add_event::<ReceivedMessageType>()
+			.add_event::<SocketControlEvent>()
+			.add_system_to_stage(
+				CoreStage::Last,
+				socket_manager_system::<SentMessageType, ReceivedMessageType>,
+			)
+			.add_system_to_stage(
+				CoreStage::PostUpdate,
+				socket_event_wrangler::<SentMessageType, ReceivedMessageType>,
+			);
+	}
+}
diff --git a/src/events.rs b/src/events.rs
new file mode 100644
index 0000000..0661ba0
--- /dev/null
+++ b/src/events.rs
@@ -0,0 +1,37 @@
+use std::fmt::{Display, Formatter};
+
+#[derive(Clone, Debug)]
+pub enum SocketControlEvent {
+	Create(String),
+	Disconnect,
+}
+
+#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug, Default)]
+pub enum SocketState {
+	#[default]
+	Pending,
+	Connected,
+	Disconnected,
+	Closed,
+}
+
+impl Display for SocketState {
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		write!(f, "{:?}", self)
+	}
+}
+
+pub trait FromSocketMessage {
+	fn from_text(value: String) -> Self;
+	fn from_binary(value: Vec<u8>) -> Self;
+}
+pub trait ToSocketMessage {
+	fn to_text(&self) -> String;
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+pub use crate::desktop::RemoteEventError;
+#[cfg(target_arch = "wasm32")]
+pub use crate::wasm::RemoteEventError;
+
+pub type RemoteEventResult<T> = Result<T, RemoteEventError>;
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..2d59537
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,42 @@
+use std::fmt::{Debug, Display, Formatter};
+
+#[cfg(not(target_arch = "wasm32"))]
+mod desktop;
+pub mod events;
+#[cfg(target_arch = "wasm32")]
+mod wasm;
+
+#[cfg(not(target_arch = "wasm32"))]
+pub use self::desktop::RemoteEventDispatcher;
+#[cfg(not(target_arch = "wasm32"))]
+pub use self::desktop::RemoteEventPlugin;
+#[cfg(target_arch = "wasm32")]
+pub use self::wasm::RemoteEventDispatcher;
+#[cfg(target_arch = "wasm32")]
+pub use self::wasm::RemoteEventPlugin;
+
+impl<SentMessageType, ReceivedMessageType> Debug
+	for RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: crate::events::FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: crate::events::ToSocketMessage + Send + Sync + 'static,
+{
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		f.debug_struct("RemoteEventDispatcher")
+			.field("status", &self.status)
+			.finish()
+	}
+}
+
+impl<SentMessageType, ReceivedMessageType> Display
+	for RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: crate::events::FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: crate::events::ToSocketMessage + Send + Sync + 'static,
+{
+	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+		f.debug_tuple("RemoteEventDispatcher")
+			.field(&self.status)
+			.finish()
+	}
+}
diff --git a/src/wasm.rs b/src/wasm.rs
new file mode 100644
index 0000000..70629ae
--- /dev/null
+++ b/src/wasm.rs
@@ -0,0 +1,180 @@
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::{Arc, Mutex};
+
+use bevy::ecs::event::Events;
+use bevy::ecs::system::SystemState;
+use bevy::prelude::{
+	App, CoreStage, EventReader, EventWriter, ExclusiveSystemDescriptorCoercion,
+	IntoExclusiveSystem, NonSendMut, Plugin, ResMut, World,
+};
+use thiserror::Error;
+use wasm_sockets::Message;
+
+use crate::events::{FromSocketMessage, SocketControlEvent, SocketState, ToSocketMessage};
+
+fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
+	Arc::new(Mutex::new(Vec::<T>::new()))
+}
+
+#[derive(Error, Debug)]
+pub enum RemoteEventError {
+	#[error("{0}")]
+	WebsocketError(#[from] wasm_sockets::WebSocketError),
+}
+
+pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	pub status: SocketState,
+	received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
+	sent_events: Arc<Mutex<Vec<SentMessageType>>>,
+	client: wasm_sockets::PollingClient,
+}
+
+fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
+	remote_host: T,
+) -> crate::events::RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	let client = wasm_sockets::PollingClient::new(remote_host.to_string().as_str())?;
+	Ok(RemoteEventDispatcher {
+		client,
+		sent_events: message_stack(),
+		received_events: message_stack(),
+		status: SocketState::Connected,
+	})
+}
+
+pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(mut world: &mut World)
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	let mut state: SystemState<(
+		Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
+		EventReader<SocketControlEvent>,
+	)> = SystemState::new(&mut world);
+
+	let mut client_to_insert = None;
+
+	{
+		let (mut maybe_client, mut reader): (
+			Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
+			EventReader<SocketControlEvent>,
+		) = state.get_mut(&mut world);
+
+		for event in reader.iter() {
+			match event {
+				SocketControlEvent::Create(url) => {
+					if maybe_client.is_some() {
+						log::warn!(
+							"Trying to create a second socket connection while the first is alive"
+						)
+					} else {
+						match connect_to::<SentMessageType, ReceivedMessageType, String>(
+							url.to_string(),
+						) {
+							Ok(client) => {
+								client_to_insert = Some(client);
+							}
+							Err(e) => {
+								log::error!("Error creating socket connection: {}", e);
+							}
+						}
+					}
+				}
+				SocketControlEvent::Disconnect => match maybe_client {
+					Some(ref client) => {
+						log::warn!("NOT IMPLEMENTED: Disconnect");
+					}
+					None => {
+						log::warn!("There was no connection to disconnect from");
+					}
+				},
+			}
+		}
+	};
+
+	if let Some(new_client) = client_to_insert {
+		world.insert_non_send_resource(new_client);
+	}
+}
+
+pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
+	mut client: Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
+	mut incoming: ResMut<Events<SentMessageType>>,
+	mut outgoing: ResMut<Events<ReceivedMessageType>>,
+) where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	if let Some(mut client) = client {
+		for event in incoming.drain() {
+			client.client.send_string(&event.to_text());
+		}
+
+		for event in client.client.receive() {
+			match event {
+				Message::Text(text) => {
+					outgoing.send(ReceivedMessageType::from_text(text));
+				}
+				Message::Binary(bin) => {
+					outgoing.send(ReceivedMessageType::from_binary(bin));
+				}
+			}
+		}
+	}
+}
+
+#[derive(Default)]
+pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	_send: PhantomData<SentMessageType>,
+	_recv: PhantomData<ReceivedMessageType>,
+}
+
+impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	pub fn new() -> Self {
+		Self {
+			_recv: Default::default(),
+			_send: Default::default(),
+		}
+	}
+}
+
+impl<SentMessageType, ReceivedMessageType> Plugin
+	for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
+where
+	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
+	SentMessageType: ToSocketMessage + Send + Sync + 'static,
+{
+	fn build(&self, app: &mut App) {
+		app.add_event::<SentMessageType>()
+			.add_event::<ReceivedMessageType>()
+			.add_event::<SocketControlEvent>()
+			.add_system_to_stage(
+				CoreStage::Last,
+				socket_manager_system::<SentMessageType, ReceivedMessageType>
+					.exclusive_system()
+					.at_end(),
+			)
+			.add_system_to_stage(
+				CoreStage::PostUpdate,
+				socket_event_wrangler::<SentMessageType, ReceivedMessageType>
+					.exclusive_system()
+					.at_end(),
+			);
+	}
+}
-- 
GitLab