Skip to content
Snippets Groups Projects
Verified Commit aa7e62ad authored by Louis's avatar Louis :fire:
Browse files

Basic Impl

- Desktop support with websockets library. Runs a background thread to
  receive / send events, client does not need to block as it is send+sync

- Wasm support with wasm-sockets library. Uses an exclusive polling system
  at the end of each frame due to client not being thread safe (+ lack of threads)

- Send and receive messages
parent c7903121
No related branches found
No related tags found
No related merge requests found
/target
/Cargo.lock
[package]
name = "remote_events"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0.140", features = ["derive"] }
thiserror = "1.0.31"
log = "0.4.17"
[dependencies.bevy]
version = "0.7"
default-features = false
features = []
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
websocket = { version = "0.26.5" , default-features = false, features = ["sync", "sync-ssl", "bytes", "native-tls"]}
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-sockets = "0.2.2"
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use bevy::ecs::event::Events;
use bevy::prelude::{App, Commands, CoreStage, EventReader, Plugin, ResMut};
use thiserror::Error;
use websocket::{Message, OwnedMessage};
use crate::events::{
FromSocketMessage, RemoteEventResult, SocketControlEvent, SocketState, ToSocketMessage,
};
fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
Arc::new(Mutex::new(Vec::<T>::new()))
}
pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub status: SocketState,
received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
sent_events: Arc<Mutex<Vec<SentMessageType>>>,
receiver_handle: JoinHandle<()>,
sender_handle: JoinHandle<()>,
}
#[derive(Error, Debug)]
pub enum RemoteEventError {
#[error("{0}")]
ConnectionStringError(#[from] websocket::client::ParseError),
#[error("{0}")]
WebsocketError(#[from] websocket::WebSocketError),
#[error("{0}")]
IOError(#[from] std::io::Error),
}
fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
remote_host: T,
) -> RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
let mut client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?;
let mut sent_events = message_stack();
let mut received_events = message_stack();
let (mut recv_channel, mut send_channel) = client.split()?;
let mut recv = received_events.clone();
let receiver_handle = std::thread::spawn(move || {
for event in recv_channel.incoming_messages() {
match event {
Ok(OwnedMessage::Text(string)) => recv
.lock()
.unwrap()
.push(ReceivedMessageType::from_text(string)),
Ok(OwnedMessage::Binary(bin)) => recv
.lock()
.unwrap()
.push(ReceivedMessageType::from_binary(bin)),
Err(e) => {
log::error!("{}", e);
}
_ => {}
}
}
});
let mut send = sent_events.clone();
let sender_handle = std::thread::spawn(move || loop {
{
let mut lock = send.lock().unwrap();
if lock.len() > 0 {
let end = lock.len();
for i in 0..end {
let to_send: SentMessageType = lock.remove(i);
send_channel.send_message(&Message::text(to_send.to_text()));
}
}
};
std::thread::sleep(Duration::from_millis(75));
});
Ok(RemoteEventDispatcher {
status: SocketState::Connected,
sent_events,
received_events,
sender_handle,
receiver_handle,
})
}
pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(
mut commands: Commands,
mut maybe_client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
mut control_events: EventReader<SocketControlEvent>,
) where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
for event in control_events.iter() {
match event {
SocketControlEvent::Create(url) => {
if maybe_client.is_some() {
log::warn!(
"Trying to create a second socket connection while the first is alive"
)
} else {
match connect_to::<SentMessageType, ReceivedMessageType, String>(
url.to_string(),
) {
Ok(client) => {
commands.insert_resource(client);
}
Err(e) => {
log::error!("Error creating socket connection: {}", e);
}
}
}
}
SocketControlEvent::Disconnect => match maybe_client {
Some(ref client) => {
log::warn!("NOT IMPLEMENTED: Disconnect");
}
None => {
log::warn!("There was no connection to disconnect from");
}
},
}
}
}
pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
mut client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
mut received_batch: ResMut<Events<ReceivedMessageType>>,
mut sent_batch: ResMut<Events<SentMessageType>>,
) where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
if let Some(mut client) = client {
if let Ok(mut list) = client.sent_events.lock() {
for message in sent_batch.drain() {
list.push(message);
}
}
if let Ok(mut list) = client.received_events.lock() {
if list.len() > 0 {
let end = list.len();
for i in 0..end {
let message = list.remove(i);
received_batch.send(message);
}
}
}
}
}
#[derive(Default)]
pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
_send: PhantomData<SentMessageType>,
_recv: PhantomData<ReceivedMessageType>,
}
impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub fn new() -> Self {
Self {
_recv: Default::default(),
_send: Default::default(),
}
}
}
impl<SentMessageType, ReceivedMessageType> Plugin
for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
fn build(&self, app: &mut App) {
app.add_event::<SentMessageType>()
.add_event::<ReceivedMessageType>()
.add_event::<SocketControlEvent>()
.add_system_to_stage(
CoreStage::Last,
socket_manager_system::<SentMessageType, ReceivedMessageType>,
)
.add_system_to_stage(
CoreStage::PostUpdate,
socket_event_wrangler::<SentMessageType, ReceivedMessageType>,
);
}
}
use std::fmt::{Display, Formatter};
#[derive(Clone, Debug)]
pub enum SocketControlEvent {
Create(String),
Disconnect,
}
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug, Default)]
pub enum SocketState {
#[default]
Pending,
Connected,
Disconnected,
Closed,
}
impl Display for SocketState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
pub trait FromSocketMessage {
fn from_text(value: String) -> Self;
fn from_binary(value: Vec<u8>) -> Self;
}
pub trait ToSocketMessage {
fn to_text(&self) -> String;
}
#[cfg(not(target_arch = "wasm32"))]
pub use crate::desktop::RemoteEventError;
#[cfg(target_arch = "wasm32")]
pub use crate::wasm::RemoteEventError;
pub type RemoteEventResult<T> = Result<T, RemoteEventError>;
use std::fmt::{Debug, Display, Formatter};
#[cfg(not(target_arch = "wasm32"))]
mod desktop;
pub mod events;
#[cfg(target_arch = "wasm32")]
mod wasm;
#[cfg(not(target_arch = "wasm32"))]
pub use self::desktop::RemoteEventDispatcher;
#[cfg(not(target_arch = "wasm32"))]
pub use self::desktop::RemoteEventPlugin;
#[cfg(target_arch = "wasm32")]
pub use self::wasm::RemoteEventDispatcher;
#[cfg(target_arch = "wasm32")]
pub use self::wasm::RemoteEventPlugin;
impl<SentMessageType, ReceivedMessageType> Debug
for RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: crate::events::FromSocketMessage + Send + Sync + 'static,
SentMessageType: crate::events::ToSocketMessage + Send + Sync + 'static,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteEventDispatcher")
.field("status", &self.status)
.finish()
}
}
impl<SentMessageType, ReceivedMessageType> Display
for RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: crate::events::FromSocketMessage + Send + Sync + 'static,
SentMessageType: crate::events::ToSocketMessage + Send + Sync + 'static,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RemoteEventDispatcher")
.field(&self.status)
.finish()
}
}
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use bevy::ecs::event::Events;
use bevy::ecs::system::SystemState;
use bevy::prelude::{
App, CoreStage, EventReader, EventWriter, ExclusiveSystemDescriptorCoercion,
IntoExclusiveSystem, NonSendMut, Plugin, ResMut, World,
};
use thiserror::Error;
use wasm_sockets::Message;
use crate::events::{FromSocketMessage, SocketControlEvent, SocketState, ToSocketMessage};
fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
Arc::new(Mutex::new(Vec::<T>::new()))
}
#[derive(Error, Debug)]
pub enum RemoteEventError {
#[error("{0}")]
WebsocketError(#[from] wasm_sockets::WebSocketError),
}
pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub status: SocketState,
received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
sent_events: Arc<Mutex<Vec<SentMessageType>>>,
client: wasm_sockets::PollingClient,
}
fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
remote_host: T,
) -> crate::events::RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
let client = wasm_sockets::PollingClient::new(remote_host.to_string().as_str())?;
Ok(RemoteEventDispatcher {
client,
sent_events: message_stack(),
received_events: message_stack(),
status: SocketState::Connected,
})
}
pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(mut world: &mut World)
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
let mut state: SystemState<(
Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
EventReader<SocketControlEvent>,
)> = SystemState::new(&mut world);
let mut client_to_insert = None;
{
let (mut maybe_client, mut reader): (
Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
EventReader<SocketControlEvent>,
) = state.get_mut(&mut world);
for event in reader.iter() {
match event {
SocketControlEvent::Create(url) => {
if maybe_client.is_some() {
log::warn!(
"Trying to create a second socket connection while the first is alive"
)
} else {
match connect_to::<SentMessageType, ReceivedMessageType, String>(
url.to_string(),
) {
Ok(client) => {
client_to_insert = Some(client);
}
Err(e) => {
log::error!("Error creating socket connection: {}", e);
}
}
}
}
SocketControlEvent::Disconnect => match maybe_client {
Some(ref client) => {
log::warn!("NOT IMPLEMENTED: Disconnect");
}
None => {
log::warn!("There was no connection to disconnect from");
}
},
}
}
};
if let Some(new_client) = client_to_insert {
world.insert_non_send_resource(new_client);
}
}
pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
mut client: Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
mut incoming: ResMut<Events<SentMessageType>>,
mut outgoing: ResMut<Events<ReceivedMessageType>>,
) where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
if let Some(mut client) = client {
for event in incoming.drain() {
client.client.send_string(&event.to_text());
}
for event in client.client.receive() {
match event {
Message::Text(text) => {
outgoing.send(ReceivedMessageType::from_text(text));
}
Message::Binary(bin) => {
outgoing.send(ReceivedMessageType::from_binary(bin));
}
}
}
}
}
#[derive(Default)]
pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
_send: PhantomData<SentMessageType>,
_recv: PhantomData<ReceivedMessageType>,
}
impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub fn new() -> Self {
Self {
_recv: Default::default(),
_send: Default::default(),
}
}
}
impl<SentMessageType, ReceivedMessageType> Plugin
for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
fn build(&self, app: &mut App) {
app.add_event::<SentMessageType>()
.add_event::<ReceivedMessageType>()
.add_event::<SocketControlEvent>()
.add_system_to_stage(
CoreStage::Last,
socket_manager_system::<SentMessageType, ReceivedMessageType>
.exclusive_system()
.at_end(),
)
.add_system_to_stage(
CoreStage::PostUpdate,
socket_event_wrangler::<SentMessageType, ReceivedMessageType>
.exclusive_system()
.at_end(),
);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment