Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at writing the library #1

Merged
merged 8 commits into from
Feb 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,19 @@ edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
# TODO: these should not be the default
default = ["async-tungstenite", "cynic"]
obmarg marked this conversation as resolved.
Show resolved Hide resolved

[dependencies]
async_executors = { version = "0.4" }
futures = { version = "0.3"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
uuid = { version = "0.8", features = ["v4"] }

async-tungstenite = { version = "0.12", optional = true }
obmarg marked this conversation as resolved.
Show resolved Hide resolved

#cynic = { version = "0.11", optional = true }
cynic = { path = "/Users/graeme/src/cynic/cynic", optional = true }
obmarg marked this conversation as resolved.
Show resolved Hide resolved
317 changes: 317 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
use std::{collections::HashMap, marker::PhantomData, sync::Arc};

use async_executors::{JoinHandle, SpawnHandle, SpawnHandleExt};
use futures::{
channel::{mpsc, oneshot},
lock::Mutex,
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use uuid::Uuid;

use super::{
graphql::{self, GraphqlOperation},
protocol::{ConnectionAck, ConnectionInit, Event, Message},
websockets::WebsocketMessage,
};

// TODO: Make this customisable somehow.
const SUBSCRIPTION_BUFFER_SIZE: usize = 5;
obmarg marked this conversation as resolved.
Show resolved Hide resolved

/// A websocket client
///
// TODO: Is this a good name?
// TODO: example
obmarg marked this conversation as resolved.
Show resolved Hide resolved
pub struct AsyncWebsocketClient<GraphqlClient, WsMessage>
where
GraphqlClient: graphql::GraphqlClient,
{
inner: Arc<ClientInner<GraphqlClient>>,
sender_sink: mpsc::Sender<WsMessage>,
phantom: PhantomData<*const GraphqlClient>,
}

// TODO: Better error (somehow)
#[derive(thiserror::Error, Debug)]
#[error("Something went wrong")]
pub struct Error {}
obmarg marked this conversation as resolved.
Show resolved Hide resolved

// TODO: Docstrings, book etc.
obmarg marked this conversation as resolved.
Show resolved Hide resolved

impl<GraphqlClient, WsMessage> AsyncWebsocketClient<GraphqlClient, WsMessage>
where
WsMessage: WebsocketMessage + Send + 'static,
GraphqlClient: crate::graphql::GraphqlClient + 'static,
{
/// Constructs an AsyncWebsocketClient
///
/// Accepts a stream and a sink for the underlying websocket connection,
/// and an `async_executors::SpawnHandle` that tells the client which
/// async runtime to use.
///
/// TODO: example
pub async fn new(
mut websocket_stream: impl Stream<Item = Result<WsMessage, WsMessage::Error>>
+ Unpin
+ Send
+ 'static,
mut websocket_sink: impl Sink<WsMessage, Error = WsMessage::Error> + Unpin + Send + 'static,
runtime: impl SpawnHandle<()>,
) -> Result<Self, Error> {
// TODO: actual error handling, ditch unwraps
obmarg marked this conversation as resolved.
Show resolved Hide resolved

websocket_sink
.send(json_message(ConnectionInit::new()).unwrap())
.await
.map_err(|_| Error {})?;

match websocket_stream.next().await {
None => todo!(),
Some(Err(_)) => todo!(),
Some(Ok(data)) => {
decode_message::<ConnectionAck<()>, _>(data).unwrap();
println!("Connection acked");
}
}

let (shutdown_sender, shutdown_receiver) = oneshot::channel();

let operations = Arc::new(Mutex::new(HashMap::new()));

// TODO: Don't force the graphql error type here to be ()
obmarg marked this conversation as resolved.
Show resolved Hide resolved
let receiver_handle = runtime
.spawn_handle(receiver_loop::<_, _, GraphqlClient>(
websocket_stream,
Arc::clone(&operations),
shutdown_sender,
))
.unwrap();

let (sender_sink, sender_stream) = mpsc::channel(1);

let sender_handle = runtime
.spawn_handle(sender_loop(
sender_stream,
websocket_sink,
Arc::clone(&operations),
shutdown_receiver,
))
.unwrap();

Ok(AsyncWebsocketClient {
inner: Arc::new(ClientInner {
receiver_handle,
operations,
sender_handle,
}),
sender_sink,
phantom: PhantomData,
})
}

/*
pub async fn operation<'a, T: 'a>(&self, _op: Operation<'a, T>) -> Result<(), ()> {
todo!()
// Probably hook into streaming operation and do a take 1 -> into_future
}*/
obmarg marked this conversation as resolved.
Show resolved Hide resolved

/// Starts a streaming operation on this client.
///
/// Returns a `Stream` of responses.
pub async fn streaming_operation<Operation>(
&mut self,
op: Operation,
) -> impl Stream<Item = Operation::Response>
where
Operation: GraphqlOperation<GenericResponse = GraphqlClient::Response>,
{
// TODO: no unwraps
let id = Uuid::new_v4();
let (sender, receiver) = mpsc::channel(SUBSCRIPTION_BUFFER_SIZE);

// TODO: THink I need to move operations map into the futures
self.inner.operations.lock().await.insert(id, sender);

let msg = json_message(Message::Subscribe {
id: id.to_string(),
payload: &op,
})
.unwrap();

self.sender_sink.send(msg).await.unwrap();

// TODO: This needs to return a type that
// has close & some sort of status func on it.
// Have the receiver send details and have that intercepted
// by this type and stored.
obmarg marked this conversation as resolved.
Show resolved Hide resolved
receiver.map(move |response| op.decode(response).unwrap())
}
}

type OperationSender<GenericResponse> = mpsc::Sender<GenericResponse>;

type OperationMap<GenericResponse> = Arc<Mutex<HashMap<Uuid, OperationSender<GenericResponse>>>>;

// TODO: Think about whether there's actually some Arc cycles here
// that I need to care about
obmarg marked this conversation as resolved.
Show resolved Hide resolved

async fn receiver_loop<S, WsMessage, GraphqlClient>(
mut receiver: S,
operations: OperationMap<GraphqlClient::Response>,
shutdown: oneshot::Sender<()>,
) where
S: Stream<Item = Result<WsMessage, WsMessage::Error>> + Unpin,
WsMessage: WebsocketMessage,
GraphqlClient: crate::graphql::GraphqlClient,
{
// TODO: Ok, so basically need a oneshot from here -> sender that
// tells the sender to stop. It can close it's incoming, drain it's stream
// and then close the streams in the HashMap.
obmarg marked this conversation as resolved.
Show resolved Hide resolved
while let Some(msg) = receiver.next().await {
println!("Received message: {:?}", msg);
if handle_message::<WsMessage, GraphqlClient>(msg, &operations)
.await
.is_err()
{
println!("Error happened, killing things");
break;
}
}

shutdown.send(()).expect("Couldn't shutdown sender");
}

type BoxError = Box<dyn std::error::Error>;

async fn handle_message<WsMessage, GraphqlClient>(
msg: Result<WsMessage, WsMessage::Error>,
operations: &OperationMap<GraphqlClient::Response>,
) -> Result<(), BoxError>
where
WsMessage: WebsocketMessage,
GraphqlClient: crate::graphql::GraphqlClient,
{
let event = decode_message::<Event<GraphqlClient::Response>, WsMessage>(msg?)?;

if event.is_none() {
return Ok(());
}
let event = event.unwrap();

let id = &Uuid::parse_str(event.id())?;
match event {
Event::Next { payload, .. } => {
let mut sink = operations
.lock()
.await
.get(&id)
.ok_or("Received message for unknown subscription")?
.clone();

sink.send(payload).await?
}
Event::Complete { .. } => {
println!("Stream complete");
operations.lock().await.remove(&id);
}
Event::Error { payload, .. } => {
let mut sink = operations
.lock()
.await
.remove(&id)
.ok_or("Received error for unknown subscription")?;

// TODO: Guess I need a way of constructing a GQL response here
// to send it via the sink?
// Could just define our own GraphQLResponse type?
obmarg marked this conversation as resolved.
Show resolved Hide resolved
sink.send(GraphqlClient::error_response(payload)?).await?;
}
}

Ok(())
}

async fn sender_loop<M, S, E, GenericResponse>(
// TODO: Maybe don't use Message or M here - have this func transform
// so the type param doesn't escape this func
obmarg marked this conversation as resolved.
Show resolved Hide resolved
message_stream: mpsc::Receiver<M>,
mut ws_sender: S,
operations: OperationMap<GenericResponse>,
shutdown: oneshot::Receiver<()>,
) where
M: WebsocketMessage,
S: Sink<M, Error = E> + Unpin,
E: std::error::Error,
{
use futures::{future::FutureExt, select};

let mut message_stream = message_stream.fuse();
let mut shutdown = shutdown.fuse();

loop {
select! {
// TODO: Could use select_next_some here?
obmarg marked this conversation as resolved.
Show resolved Hide resolved
msg = message_stream.next() => {
if let Some(msg) = msg {
println!("Sending message: {:?}", msg);
ws_sender.send(msg).await.unwrap();
} else {
// TODO: Do I need to indicate errors in here to the rest of the system?
obmarg marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
_ = shutdown => {
// Shutdown the incoming message stream
let mut message_stream = message_stream.into_inner();
message_stream.close();
while message_stream.next().await.is_some() {}

// Clear out any operations
operations.lock().await.clear();

return;
}
}
}
}

struct ClientInner<GraphqlClient>
where
GraphqlClient: crate::graphql::GraphqlClient,
{
#[allow(dead_code)]
receiver_handle: JoinHandle<()>,
#[allow(dead_code)]
sender_handle: JoinHandle<()>,
operations: OperationMap<GraphqlClient::Response>,
}

fn json_message<M: WebsocketMessage>(payload: impl serde::Serialize) -> Result<M, BoxError> {
Ok(M::new(serde_json::to_string(&payload)?))
}

fn decode_message<T: serde::de::DeserializeOwned, WsMessage: WebsocketMessage>(
message: WsMessage,
) -> Result<Option<T>, BoxError> {
if message.is_ping() || message.is_pong() {
// TODO: logging
Ok(None)
} else if message.is_close() {
todo!()
} else if let Some(s) = message.text() {
println!("Received {}", s);
Ok(Some(serde_json::from_str::<T>(&s)?))
} else {
// TODO: logging
Ok(None)
obmarg marked this conversation as resolved.
Show resolved Hide resolved
}
}

#[cfg(test)]
mod tests {
// TODO: Need to allow the client to just use stream & sink directly.
// That way I can impl tests for it indepdendant of tungsten stuff...

// TODO: tests of shutdown behaviour etc would be good.
// also mocked tests and what not
obmarg marked this conversation as resolved.
Show resolved Hide resolved
}
Loading