use std::fmt::Debug; use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Mutex}; use bevy::ecs::event::Events; use bevy::ecs::system::SystemState; use bevy::prelude::{ App, CoreStage, EventReader, EventWriter, ExclusiveSystemDescriptorCoercion, IntoExclusiveSystem, NonSendMut, Plugin, ResMut, World, }; use thiserror::Error; use wasm_sockets::{EventClient, Message}; use crate::events::{FromSocketMessage, SocketControlEvent, SocketState, ToSocketMessage}; fn message_stack<T>() -> Arc<Mutex<Vec<T>>> { Arc::new(Mutex::new(Vec::<T>::new())) } #[derive(Error, Debug)] pub enum RemoteEventError { #[error("{0}")] WebsocketError(#[from] wasm_sockets::WebSocketError), } pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType> where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { pub status: SocketState, received_events: Arc<Mutex<Vec<ReceivedMessageType>>>, sent_events: Arc<Mutex<Vec<SentMessageType>>>, client: wasm_sockets::PollingClient, } pub trait ClosableExt { fn close(&mut self); } impl<SentMessageType, ReceivedMessageType> Drop for RemoteEventDispatcher<SentMessageType, ReceivedMessageType> where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { fn drop(&mut self) { self.client.close(); } } fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>( remote_host: T, ) -> crate::events::RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>> where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { let client = wasm_sockets::PollingClient::new(remote_host.to_string().as_str())?; Ok(RemoteEventDispatcher { client, sent_events: message_stack(), received_events: message_stack(), status: SocketState::Connected, }) } pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(mut world: &mut World) where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { let mut state: SystemState<( Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, EventReader<SocketControlEvent>, )> = SystemState::new(&mut world); let mut client_to_insert = None; let mut remove_client = false; { let (mut maybe_client, mut reader): ( Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, EventReader<SocketControlEvent>, ) = state.get_mut(&mut world); if let Some(ref client) = maybe_client { if client.client.status.borrow().deref() == &wasm_sockets::ConnectionStatus::Disconnected { #[rustfmt::skip] // rustfmt breaks the ending "();" to put ");" on its own line... let it = world.remove_non_send_resource::<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>(); drop(it); return; } } for event in reader.iter() { match event { SocketControlEvent::Create(url) => { if maybe_client.is_some() { log::warn!( "Trying to create a second socket connection while the first is alive" ) } else { match connect_to::<SentMessageType, ReceivedMessageType, String>( url.to_string(), ) { Ok(client) => { client_to_insert = Some(client); } Err(e) => { log::error!("Error creating socket connection: {}", e); } } } } SocketControlEvent::Disconnect => match maybe_client { Some(ref client) => { remove_client = true; } None => { log::warn!("There was no connection to disconnect from"); } }, } } }; if remove_client { #[rustfmt::skip] // rustfmt breaks the ending "();" to put ");" on its own line... let it = world.remove_non_send_resource::<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>(); drop(it); } if let Some(new_client) = client_to_insert { world.insert_non_send_resource(new_client); } } pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>( mut client: Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>, mut incoming: ResMut<Events<SentMessageType>>, mut outgoing: ResMut<Events<ReceivedMessageType>>, ) where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { if let Some(mut client) = client { for event in incoming.drain() { client.client.send_string(&event.to_text()); } for event in client.client.receive() { match event { Message::Text(text) => { outgoing.send(ReceivedMessageType::from_text(text)); } Message::Binary(bin) => { outgoing.send(ReceivedMessageType::from_binary(bin)); } } } } } #[derive(Default)] pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType> where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { _send: PhantomData<SentMessageType>, _recv: PhantomData<ReceivedMessageType>, } impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType> where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { pub fn new() -> Self { Self { _recv: Default::default(), _send: Default::default(), } } } impl<SentMessageType, ReceivedMessageType> Plugin for RemoteEventPlugin<SentMessageType, ReceivedMessageType> where ReceivedMessageType: FromSocketMessage + Send + Sync + 'static, SentMessageType: ToSocketMessage + Send + Sync + 'static, { fn build(&self, app: &mut App) { app.add_event::<SentMessageType>() .add_event::<ReceivedMessageType>() .add_event::<SocketControlEvent>() .add_system_to_stage( CoreStage::Last, socket_manager_system::<SentMessageType, ReceivedMessageType> .exclusive_system() .at_end(), ) .add_system_to_stage( CoreStage::PostUpdate, socket_event_wrangler::<SentMessageType, ReceivedMessageType> .exclusive_system() .at_end(), ); } }