Skip to content
Snippets Groups Projects
wasm.rs 6.09 KiB
Newer Older
Louis's avatar
Louis committed
use std::fmt::Debug;
use std::marker::PhantomData;
Louis's avatar
Louis committed
use std::ops::Deref;
Louis's avatar
Louis committed
use std::sync::{Arc, Mutex};

use bevy::ecs::event::Events;
use bevy::ecs::system::SystemState;
use bevy::prelude::{
	App, CoreStage, EventReader, EventWriter, ExclusiveSystemDescriptorCoercion,
	IntoExclusiveSystem, NonSendMut, Plugin, ResMut, World,
};
use thiserror::Error;
Louis's avatar
Louis committed
use wasm_sockets::{EventClient, Message};
Louis's avatar
Louis committed

use crate::events::{FromSocketMessage, SocketControlEvent, SocketState, ToSocketMessage};

/// Builds a fresh, empty message buffer behind an `Arc<Mutex<..>>` so it can be
/// shared between owners and mutated through the lock.
fn message_stack<T>() -> Arc<Mutex<Vec<T>>> {
	let storage = Mutex::new(Vec::new());
	Arc::new(storage)
}

/// Errors surfaced by the wasm socket backend.
#[derive(Error, Debug)]
pub enum RemoteEventError {
	/// Wraps an error reported by `wasm_sockets`. The `Display` output is the inner error's
	/// own message (`"{0}"`), and `#[from]` provides the `From` conversion used by `?`.
	#[error("{0}")]
	WebsocketError(#[from] wasm_sockets::WebSocketError),
}

/// Non-send resource that owns the websocket client for one connection.
///
/// `SentMessageType` is the event type serialized and written to the socket;
/// `ReceivedMessageType` is the event type built from messages read off the socket.
pub struct RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	/// Connection state as tracked by this crate.
	pub status: SocketState,
	// NOTE(review): the two buffers below are initialized on connect but are not read or
	// written by any code visible in this part of the file — confirm they are still needed.
	received_events: Arc<Mutex<Vec<ReceivedMessageType>>>,
	sent_events: Arc<Mutex<Vec<SentMessageType>>>,
	/// Polling websocket client used for sending and receiving messages.
	client: wasm_sockets::PollingClient,
}

Louis's avatar
Louis committed
/// Extension trait for values that can be explicitly closed.
///
/// NOTE(review): no implementation of this trait is visible in this part of the file.
pub trait ClosableExt {
	/// Closes the underlying resource.
	fn close(&mut self);
}

/// Closes the socket client when the dispatcher is dropped, so removing the resource from the
/// world is enough to tear the connection down.
impl<SentMessageType, ReceivedMessageType> Drop
	for RemoteEventDispatcher<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	fn drop(&mut self) {
		// NOTE(review): whether this resolves to an inherent `PollingClient::close` or to a
		// `ClosableExt` impl is not visible here; any value it returns is discarded.
		self.client.close();
	}
}

Louis's avatar
Louis committed
/// Opens a polling websocket client to `remote_host` and wraps it in a new
/// [`RemoteEventDispatcher`] with empty message buffers.
///
/// # Errors
/// Propagates (via `?`) the error returned by `wasm_sockets::PollingClient::new`.
fn connect_to<SentMessageType, ReceivedMessageType, T: ToString>(
	remote_host: T,
) -> crate::events::RemoteEventResult<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	let url = remote_host.to_string();
	let client = wasm_sockets::PollingClient::new(&url)?;

	// NOTE(review): the status is recorded as `Connected` as soon as the client object exists;
	// confirm this is intended even if the handshake has not finished yet.
	let dispatcher = RemoteEventDispatcher {
		status: SocketState::Connected,
		received_events: message_stack(),
		sent_events: message_stack(),
		client,
	};
	Ok(dispatcher)
}

pub fn socket_manager_system<SentMessageType, ReceivedMessageType>(mut world: &mut World)
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	let mut state: SystemState<(
		Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
		EventReader<SocketControlEvent>,
	)> = SystemState::new(&mut world);

	let mut client_to_insert = None;
Louis's avatar
Louis committed
	let mut remove_client = false;
Louis's avatar
Louis committed

	{
		let (mut maybe_client, mut reader): (
			Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
			EventReader<SocketControlEvent>,
		) = state.get_mut(&mut world);

Louis's avatar
Louis committed
		if let Some(ref client) = maybe_client {
			if client.client.status.borrow().deref()
				== &wasm_sockets::ConnectionStatus::Disconnected
			{
				#[rustfmt::skip]
				// rustfmt breaks the ending "();" to put ");" on its own line...
				let it = world.remove_non_send_resource::<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>();
				drop(it);
				return;
			}
		}

Louis's avatar
Louis committed
		for event in reader.iter() {
			match event {
				SocketControlEvent::Create(url) => {
					if maybe_client.is_some() {
						log::warn!(
							"Trying to create a second socket connection while the first is alive"
						)
					} else {
						match connect_to::<SentMessageType, ReceivedMessageType, String>(
							url.to_string(),
						) {
							Ok(client) => {
								client_to_insert = Some(client);
							}
							Err(e) => {
								log::error!("Error creating socket connection: {}", e);
							}
						}
					}
				}
				SocketControlEvent::Disconnect => match maybe_client {
					Some(ref client) => {
Louis's avatar
Louis committed
						remove_client = true;
Louis's avatar
Louis committed
					}
					None => {
						log::warn!("There was no connection to disconnect from");
					}
				},
			}
		}
	};

Louis's avatar
Louis committed
	if remove_client {
		#[rustfmt::skip]
		// rustfmt breaks the ending "();" to put ");" on its own line...
		let it = world.remove_non_send_resource::<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>();
		drop(it);
	}

Louis's avatar
Louis committed
	if let Some(new_client) = client_to_insert {
		world.insert_non_send_resource(new_client);
	}
}

pub fn socket_event_wrangler<SentMessageType, ReceivedMessageType>(
	mut client: Option<NonSendMut<RemoteEventDispatcher<SentMessageType, ReceivedMessageType>>>,
	mut incoming: ResMut<Events<SentMessageType>>,
	mut outgoing: ResMut<Events<ReceivedMessageType>>,
) where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	if let Some(mut client) = client {
		for event in incoming.drain() {
			client.client.send_string(&event.to_text());
		}

		for event in client.client.receive() {
			match event {
				Message::Text(text) => {
					outgoing.send(ReceivedMessageType::from_text(text));
				}
				Message::Binary(bin) => {
					outgoing.send(ReceivedMessageType::from_binary(bin));
				}
			}
		}
	}
}

/// Bevy plugin that wires up socket events and systems for one pair of message types.
///
/// The struct stores no data; the type parameters only select which event types are used.
/// Because `Default` is derived, it is only available when both message types implement
/// `Default` themselves.
#[derive(Default)]
pub struct RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	// Zero-sized markers tying the plugin to its message types.
	_send: PhantomData<SentMessageType>,
	_recv: PhantomData<ReceivedMessageType>,
}

impl<SentMessageType, ReceivedMessageType> RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	pub fn new() -> Self {
		Self {
			_recv: Default::default(),
			_send: Default::default(),
		}
	}
}

impl<SentMessageType, ReceivedMessageType> Plugin
	for RemoteEventPlugin<SentMessageType, ReceivedMessageType>
where
	ReceivedMessageType: FromSocketMessage + Send + Sync + 'static,
	SentMessageType: ToSocketMessage + Send + Sync + 'static,
{
	/// Registers the three event types and schedules both socket systems as exclusive systems
	/// at the end of their stages: the manager in `CoreStage::Last`, the event wrangler in
	/// `CoreStage::PostUpdate`.
	fn build(&self, app: &mut App) {
		app.add_event::<SentMessageType>();
		app.add_event::<ReceivedMessageType>();
		app.add_event::<SocketControlEvent>();

		// Connection lifecycle: create / remove the dispatcher resource.
		let manager = socket_manager_system::<SentMessageType, ReceivedMessageType>
			.exclusive_system()
			.at_end();
		app.add_system_to_stage(CoreStage::Last, manager);

		// Message pump between the ECS event queues and the socket.
		let wrangler = socket_event_wrangler::<SentMessageType, ReceivedMessageType>
			.exclusive_system()
			.at_end();
		app.add_system_to_stage(CoreStage::PostUpdate, wrangler);
	}
}