-
Louis authored
- Desktop support with websockets library. Runs a background thread to receive / send events, client does not need to block as it is send+sync - Wasm support with wasm-sockets library. Uses an exclusive polling system at the end of each frame due to client not being thread safe (+ lack of threads) - Send and receive messages
Verifiedaa7e62ad
desktop.rs 5.75 KiB
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use bevy::ecs::event::Events;
use bevy::prelude::{App, Commands, CoreStage, EventReader, Plugin, ResMut};
use thiserror::Error;
use websocket::{Message, OwnedMessage};
use crate::events::{
FromSocketMessage, RemoteEventResult, SocketControlEvent, SocketState, ToSocketMessage,
};
fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
Arc::new(Mutex::new(Vec::<T>::new()))
}
pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub status: SocketState,
received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
sent_events: Arc<Mutex<Vec<SentMessageType>>>,
receiver_handle: JoinHandle<()>,
sender_handle: JoinHandle<()>,
}
#[derive(Error, Debug)]
pub enum RemoteEventError {
#[error("{0}")]
ConnectionStringError(#[from] websocket::client::ParseError),
#[error("{0}")]
WebsocketError(#[from] websocket::WebSocketError),
#[error("{0}")]
IOError(#[from] std::io::Error),
}
fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
remote_host: T,
) -> RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
let mut client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?;
let mut sent_events = message_stack();
let mut received_events = message_stack();
let (mut recv_channel, mut send_channel) = client.split()?;
let mut recv = received_events.clone();
let receiver_handle = std::thread::spawn(move || {
for event in recv_channel.incoming_messages() {
match event {
Ok(OwnedMessage::Text(string)) => recv
.lock()
.unwrap()
.push(ReceivedMessageType::from_text(string)),
Ok(OwnedMessage::Binary(bin)) => recv
.lock()
.unwrap()
.push(ReceivedMessageType::from_binary(bin)),
Err(e) => {
log::error!("{}", e);
}
_ => {}
}
}
});
let mut send = sent_events.clone();
let sender_handle = std::thread::spawn(move || loop {
{
let mut lock = send.lock().unwrap();
if lock.len() > 0 {
let end = lock.len();
for i in 0..end {
let to_send: SentMessageType = lock.remove(i);
send_channel.send_message(&Message::text(to_send.to_text()));
}
}
};
std::thread::sleep(Duration::from_millis(75));
});
Ok(RemoteEventDispatcher {
status: SocketState::Connected,
sent_events,
received_events,
sender_handle,
receiver_handle,
})
}
pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(
mut commands: Commands,
mut maybe_client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
mut control_events: EventReader<SocketControlEvent>,
) where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
for event in control_events.iter() {
match event {
SocketControlEvent::Create(url) => {
if maybe_client.is_some() {
log::warn!(
"Trying to create a second socket connection while the first is alive"
)
} else {
match connect_to::<SentMessageType, ReceivedMessageType, String>(
url.to_string(),
) {
Ok(client) => {
commands.insert_resource(client);
}
Err(e) => {
log::error!("Error creating socket connection: {}", e);
}
}
}
}
SocketControlEvent::Disconnect => match maybe_client {
Some(ref client) => {
log::warn!("NOT IMPLEMENTED: Disconnect");
}
None => {
log::warn!("There was no connection to disconnect from");
}
},
}
}
}
pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
mut client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
mut received_batch: ResMut<Events<ReceivedMessageType>>,
mut sent_batch: ResMut<Events<SentMessageType>>,
) where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
if let Some(mut client) = client {
if let Ok(mut list) = client.sent_events.lock() {
for message in sent_batch.drain() {
list.push(message);
}
}
if let Ok(mut list) = client.received_events.lock() {
if list.len() > 0 {
let end = list.len();
for i in 0..end {
let message = list.remove(i);
received_batch.send(message);
}
}
}
}
}
#[derive(Default)]
pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
_send: PhantomData<SentMessageType>,
_recv: PhantomData<ReceivedMessageType>,
}
impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
pub fn new() -> Self {
Self {
_recv: Default::default(),
_send: Default::default(),
}
}
}
impl<SentMessageType, ReceivedMessageType> Plugin
for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
fn build(&self, app: &mut App) {
app.add_event::<SentMessageType>()
.add_event::<ReceivedMessageType>()
.add_event::<SocketControlEvent>()
.add_system_to_stage(
CoreStage::Last,
socket_manager_system::<SentMessageType, ReceivedMessageType>,
)
.add_system_to_stage(
CoreStage::PostUpdate,
socket_event_wrangler::<SentMessageType, ReceivedMessageType>,
);
}
}