Skip to content
Snippets Groups Projects
  • Louis's avatar
    Basic Impl · aa7e62ad
    Louis authored
    - Desktop support with websockets library. Runs a background thread to
      receive / send events, client does not need to block as it is send+sync
    
    - Wasm support with wasm-sockets library. Uses an exclusive polling system
      at the end of each frame due to client not being thread safe (+ lack of threads)
    
    - Send and receive messages
    Verified
    aa7e62ad
desktop.rs 5.75 KiB
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;

use bevy::ecs::event::Events;
use bevy::prelude::{App, Commands, CoreStage, EventReader, Plugin, ResMut};
use thiserror::Error;
use websocket::{Message, OwnedMessage};

use crate::events::{
	FromSocketMessage, RemoteEventResult, SocketControlEvent, SocketState, ToSocketMessage,
};

fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
	Arc::new(Mutex::new(Vec::<T>::new()))
}

pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	pub status: SocketState,
	received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
	sent_events: Arc<Mutex<Vec<SentMessageType>>>,
	receiver_handle: JoinHandle<()>,
	sender_handle: JoinHandle<()>,
}

#[derive(Error, Debug)]
pub enum RemoteEventError {
	#[error("{0}")]
	ConnectionStringError(#[from] websocket::client::ParseError),
	#[error("{0}")]
	WebsocketError(#[from] websocket::WebSocketError),
	#[error("{0}")]
	IOError(#[from] std::io::Error),
}

fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
	remote_host: T,
) -> RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	let mut client = websocket::ClientBuilder::new(&remote_host.to_string())?.connect_insecure()?;
	let mut sent_events = message_stack();
	let mut received_events = message_stack();

	let (mut recv_channel, mut send_channel) = client.split()?;

	let mut recv = received_events.clone();
	let receiver_handle = std::thread::spawn(move || {
		for event in recv_channel.incoming_messages() {
			match event {
				Ok(OwnedMessage::Text(string)) => recv
					.lock()
					.unwrap()
					.push(ReceivedMessageType::from_text(string)),
				Ok(OwnedMessage::Binary(bin)) => recv
					.lock()
					.unwrap()
					.push(ReceivedMessageType::from_binary(bin)),
				Err(e) => {
					log::error!("{}", e);
				}
				_ => {}
			}
		}
	});

	let mut send = sent_events.clone();
	let sender_handle = std::thread::spawn(move || loop {
		{
			let mut lock = send.lock().unwrap();
			if lock.len() > 0 {
				let end = lock.len();
				for i in 0..end {
					let to_send: SentMessageType = lock.remove(i);
					send_channel.send_message(&Message::text(to_send.to_text()));
				}
			}
		};

		std::thread::sleep(Duration::from_millis(75));
	});

	Ok(RemoteEventDispatcher {
		status: SocketState::Connected,
		sent_events,
		received_events,
		sender_handle,
		receiver_handle,
	})
}

pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(
	mut commands: Commands,
	mut maybe_client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
	mut control_events: EventReader<SocketControlEvent>,
) where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	for event in control_events.iter() {
		match event {
			SocketControlEvent::Create(url) => {
				if maybe_client.is_some() {
					log::warn!(
						"Trying to create a second socket connection while the first is alive"
					)
				} else {
					match connect_to::<SentMessageType, ReceivedMessageType, String>(
						url.to_string(),
					) {
						Ok(client) => {
							commands.insert_resource(client);
						}
						Err(e) => {
							log::error!("Error creating socket connection: {}", e);
						}
					}
				}
			}
			SocketControlEvent::Disconnect => match maybe_client {
				Some(ref client) => {
					log::warn!("NOT IMPLEMENTED: Disconnect");
				}
				None => {
					log::warn!("There was no connection to disconnect from");
				}
			},
		}
	}
}

pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
	mut client: Option<ResMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
	mut received_batch: ResMut<Events<ReceivedMessageType>>,
	mut sent_batch: ResMut<Events<SentMessageType>>,
) where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	if let Some(mut client) = client {
		if let Ok(mut list) = client.sent_events.lock() {
			for message in sent_batch.drain() {
				list.push(message);
			}
		}

		if let Ok(mut list) = client.received_events.lock() {
			if list.len() > 0 {
				let end = list.len();
				for i in 0..end {
					let message = list.remove(i);
					received_batch.send(message);
				}
			}
		}
	}
}

#[derive(Default)]
pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	_send: PhantomData<SentMessageType>,
	_recv: PhantomData<ReceivedMessageType>,
}

impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	pub fn new() -> Self {
		Self {
			_recv: Default::default(),
			_send: Default::default(),
		}
	}
}

impl<SentMessageType, ReceivedMessageType> Plugin
	for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	fn build(&self, app: &mut App) {
		app.add_event::<SentMessageType>()
			.add_event::<ReceivedMessageType>()
			.add_event::<SocketControlEvent>()
			.add_system_to_stage(
				CoreStage::Last,
				socket_manager_system::<SentMessageType, ReceivedMessageType>,
			)
			.add_system_to_stage(
				CoreStage::PostUpdate,
				socket_event_wrangler::<SentMessageType, ReceivedMessageType>,
			);
	}
}