-
Louis authored
- Desktop support with websockets library. Runs a background thread to receive / send events, client does not need to block as it is send+sync - Wasm support with wasm-sockets library. Uses an exclusive polling system at the end of each frame due to client not being thread safe (+ lack of threads) - Send and receive messages
Verifiedaa7e62ad
wasm.rs 5.11 KiB
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use bevy::ecs::event::Events;
use bevy::ecs::system::SystemState;
use bevy::prelude::{
App, CoreStage, EventReader, EventWriter, ExclusiveSystemDescriptorCoercion,
IntoExclusiveSystem, NonSendMut, Plugin, ResMut, World,
};
use thiserror::Error;
use wasm_sockets::Message;
use crate::events::{FromSocketMessage, SocketControlEvent, SocketState, ToSocketMessage};
fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
Arc::new(Mutex::new(Vec::<T>::new()))
}
#[derive(Error, Debug)]
pub enum RemoteEventError {
#[error("{0}")]
WebsocketError(#[from] wasm_sockets::WebSocketError),
}
pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub status: SocketState,
received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
sent_events: Arc<Mutex<Vec<SentMessageType>>>,
client: wasm_sockets::PollingClient,
}
fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
remote_host: T,
) -> crate::events::RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
let client = wasm_sockets::PollingClient::new(remote_host.to_string().as_str())?;
Ok(RemoteEventDispatcher {
client,
sent_events: message_stack(),
received_events: message_stack(),
status: SocketState::Connected,
})
}
pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(mut world: &mut World)
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
let mut state: SystemState<(
Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
EventReader<SocketControlEvent>,
)> = SystemState::new(&mut world);
let mut client_to_insert = None;
{
let (mut maybe_client, mut reader): (
Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
EventReader<SocketControlEvent>,
) = state.get_mut(&mut world);
for event in reader.iter() {
match event {
SocketControlEvent::Create(url) => {
if maybe_client.is_some() {
log::warn!(
"Trying to create a second socket connection while the first is alive"
)
} else {
match connect_to::<SentMessageType, ReceivedMessageType, String>(
url.to_string(),
) {
Ok(client) => {
client_to_insert = Some(client);
}
Err(e) => {
log::error!("Error creating socket connection: {}", e);
}
}
}
}
SocketControlEvent::Disconnect => match maybe_client {
Some(ref client) => {
log::warn!("NOT IMPLEMENTED: Disconnect");
}
None => {
log::warn!("There was no connection to disconnect from");
}
},
}
}
};
if let Some(new_client) = client_to_insert {
world.insert_non_send_resource(new_client);
}
}
pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
mut client: Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
mut incoming: ResMut<Events<SentMessageType>>,
mut outgoing: ResMut<Events<ReceivedMessageType>>,
) where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
if let Some(mut client) = client {
for event in incoming.drain() {
client.client.send_string(&event.to_text());
}
for event in client.client.receive() {
match event {
Message::Text(text) => {
outgoing.send(ReceivedMessageType::from_text(text));
}
Message::Binary(bin) => {
outgoing.send(ReceivedMessageType::from_binary(bin));
}
}
}
}
}
#[derive(Default)]
pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
_send: PhantomData<SentMessageType>,
_recv: PhantomData<ReceivedMessageType>,
}
impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub fn new() -> Self {
Self {
_recv: Default::default(),
_send: Default::default(),
}
}
}
impl<SentMessageType, ReceivedMessageType> Plugin
for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
fn build(&self, app: &mut App) {
app.add_event::<SentMessageType>()
.add_event::<ReceivedMessageType>()
.add_event::<SocketControlEvent>()
.add_system_to_stage(
CoreStage::Last,
socket_manager_system::<SentMessageType, ReceivedMessageType>
.exclusive_system()
.at_end(),
)
.add_system_to_stage(
CoreStage::PostUpdate,
socket_event_wrangler::<SentMessageType, ReceivedMessageType>
.exclusive_system()
.at_end(),
);
}
}