Skip to content

Commit

Permalink
Implement gRPC log stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk committed Jan 19, 2025
1 parent 1bd445a commit cc6e789
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 183 deletions.
16 changes: 16 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub enum DataSource {
/// over `rerun://` gRPC interface.
#[cfg(feature = "grpc")]
RerunGrpcUrl { url: String },

/// A stream of messages over gRPC.
#[cfg(feature = "grpc")]
MessageProxy { url: String },
}

impl DataSource {
Expand Down Expand Up @@ -95,6 +99,11 @@ impl DataSource {
return Self::RerunGrpcUrl { url: uri };
}

#[cfg(feature = "grpc")]
if uri.starts_with("temp://") {
return Self::MessageProxy { url: uri };
}

if uri.starts_with("file://") || path.exists() {
Self::FilePath(file_source, path)
} else if uri.starts_with("http://")
Expand Down Expand Up @@ -138,6 +147,8 @@ impl DataSource {
Self::Stdin => None,
#[cfg(feature = "grpc")]
Self::RerunGrpcUrl { .. } => None, // TODO(jleibs): This needs to come from the server.
#[cfg(feature = "grpc")]
Self::MessageProxy { .. } => None,
}
}

Expand Down Expand Up @@ -249,6 +260,11 @@ impl DataSource {
Self::RerunGrpcUrl { url } => {
re_grpc_client::stream_from_redap(url, on_msg).map_err(|err| err.into())
}

#[cfg(feature = "grpc")]
Self::MessageProxy { url } => {
re_grpc_client::message_proxy::stream(url, on_msg).map_err(|err| err.into())
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ re_arrow_util.workspace = true
re_chunk.workspace = true
re_error.workspace = true
re_log.workspace = true
re_log_encoding.workspace = true
re_log_encoding = { workspace = true, features = ["encoder", "decoder"] }
re_log_types.workspace = true
re_protos.workspace = true
re_smart_channel.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub enum StreamError {
#[error(transparent)]
ChunkError(#[from] re_chunk::ChunkError),

#[error(transparent)]
DecodeError(#[from] re_log_encoding::decoder::DecodeError),

#[error("Invalid URI: {0}")]
InvalidUri(String),
}
Expand Down Expand Up @@ -176,8 +179,7 @@ async fn stream_recording_async(
#[cfg(target_arch = "wasm32")]
let tonic_client = tonic_web_wasm_client::Client::new_with_options(
redap_endpoint.to_string(),
tonic_web_wasm_client::options::FetchOptions::new()
.mode(tonic_web_wasm_client::options::Mode::Cors), // I'm not 100% sure this is needed, but it felt right.
tonic_web_wasm_client::options::FetchOptions::new(),
);

#[cfg(not(target_arch = "wasm32"))]
Expand Down
176 changes: 4 additions & 172 deletions crates/store/re_grpc_client/src/message_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,173 +1,5 @@
use std::thread;
use std::thread::JoinHandle;
pub mod read;
pub mod write;

use re_log_encoding::Compression;
use re_log_types::LogMsg;
use re_protos::sdk_comms::v0::message_proxy_client::MessageProxyClient;
use tokio::runtime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tonic::transport::Endpoint;

enum Cmd {
LogMsg(LogMsg),
Flush(oneshot::Sender<()>),
}

#[derive(Clone)]
pub struct Options {
compression: Compression,
}

impl Default for Options {
fn default() -> Self {
Self {
compression: Compression::LZ4,
}
}
}

pub struct Client {
thread: Option<JoinHandle<()>>,
cmd_tx: UnboundedSender<Cmd>,
shutdown_tx: Sender<()>,
}

impl Client {
#[expect(clippy::needless_pass_by_value)]
pub fn new(addr: impl Into<String>, options: Options) -> Self {
let addr: String = addr.into();
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

let thread = thread::Builder::new()
.name("message_proxy_client".to_owned())
.spawn(move || {
let mut runtime = runtime::Builder::new_current_thread();
runtime.enable_all();
runtime
.build()
.expect("Failed to build tokio runtime")
.block_on(message_proxy_client(
addr,
cmd_rx,
shutdown_rx,
options.compression,
));
})
.expect("Failed to spawn message proxy client thread");

Self {
thread: Some(thread),
cmd_tx,
shutdown_tx,
}
}

pub fn send(&self, msg: LogMsg) {
self.cmd_tx.send(Cmd::LogMsg(msg)).ok();
}

pub fn flush(&self) {
let (tx, rx) = oneshot::channel();
if self.cmd_tx.send(Cmd::Flush(tx)).is_err() {
re_log::debug!("Flush failed: already shut down.");
return;
};

match rx.blocking_recv() {
Ok(_) => {
re_log::debug!("Flush complete");
}
Err(_) => {
re_log::debug!("Flush failed, not all messages were sent");
}
}
}
}

impl Drop for Client {
fn drop(&mut self) {
re_log::debug!("Shutting down message proxy client");
// Wait for flush
self.flush();
// Quit immediately after that - no messages are left in the channel
self.shutdown_tx.try_send(()).ok();
// Wait for the shutdown
self.thread.take().map(|t| t.join().ok());
re_log::debug!("Message proxy client has shut down");
}
}

async fn message_proxy_client(
addr: String,
mut cmd_rx: UnboundedReceiver<Cmd>,
mut shutdown_rx: Receiver<()>,
compression: Compression,
) {
let endpoint = match Endpoint::from_shared(addr) {
Ok(endpoint) => endpoint,
Err(err) => {
re_log::error!("Failed to connect to message proxy server: {err}");
return;
}
};
let channel = match endpoint.connect().await {
Ok(channel) => channel,
Err(err) => {
re_log::error!("Failed to connect to message proxy server: {err}");
return;
}
};
let mut client = MessageProxyClient::new(channel);

let stream = async_stream::stream! {
loop {
tokio::select! {
cmd = cmd_rx.recv() => {
match cmd {
Some(Cmd::LogMsg(msg)) => {
let msg = match re_log_encoding::protobuf_conversions::log_msg_to_proto(msg, compression) {
Ok(msg) => msg,
Err(err) => {
re_log::error!("Failed to encode message: {err}");
break;
}
};

yield msg;
}

Some(Cmd::Flush(tx)) => {
// Messages are received in order, so once we receive a `flush`
// we know we've sent all messages before that flush through already.
re_log::debug!("Flush requested");
if tx.send(()).is_err() {
re_log::debug!("Failed to respond to flush: channel is closed");
return;
};
}

None => {
re_log::debug!("Channel closed");
break;
}
}
}

_ = shutdown_rx.recv() => {
re_log::debug!("Shutting down without flush");
return;
}
}
}
};

if let Err(err) = client.write_messages(stream).await {
re_log::error!("Write messages call failed: {err}");
};
}
pub use read::stream;
pub use write::Client;
140 changes: 140 additions & 0 deletions crates/store/re_grpc_client/src/message_proxy/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::fmt::Display;

use re_log_encoding::protobuf_conversions::log_msg_from_proto;
use re_log_types::LogMsg;
use re_protos::sdk_comms::v0::message_proxy_client::MessageProxyClient;
use re_protos::sdk_comms::v0::Empty;
use tokio_stream::StreamExt;
use url::Url;

use crate::StreamError;
use crate::TonicStatusError;

pub fn stream(
url: String,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<re_smart_channel::Receiver<LogMsg>, InvalidMessageProxyAddress> {
re_log::debug!("Loading {url} via gRPC…");

let parsed_url = MessageProxyAddress::parse(&url)?;

let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::MessageProxy { url: url.clone() },
re_smart_channel::SmartChannelSource::MessageProxy { url: url.clone() },
);

crate::spawn_future(async move {
if let Err(err) = stream_async(parsed_url, tx, on_msg).await {
re_log::error!(
"Error while streaming from {url}: {}",
re_error::format_ref(&err)
);
}
});

Ok(rx)
}

struct MessageProxyAddress(String);

impl MessageProxyAddress {
fn parse(url: &str) -> Result<Self, InvalidMessageProxyAddress> {
let mut parsed = Url::parse(url).map_err(|err| InvalidMessageProxyAddress {
url: url.to_owned(),
msg: err.to_string(),
})?;

if !parsed.scheme().starts_with("temp") {
return Err(InvalidMessageProxyAddress {
url: url.to_owned(),
msg: format!(
"Invalid scheme {:?}, expected {:?}",
parsed.scheme(),
"temp"
),
});
}

parsed.set_scheme("http").ok();

Ok(Self(parsed.to_string()))
}

fn to_http(&self) -> String {
self.0.clone()
}
}

impl Display for MessageProxyAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}

#[derive(Debug, thiserror::Error)]
#[error("invalid message proxy address {url:?}: {msg}")]
pub struct InvalidMessageProxyAddress {
pub url: String,
pub msg: String,
}

async fn stream_async(
url: MessageProxyAddress,
tx: re_smart_channel::Sender<LogMsg>,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<(), StreamError> {
let mut client = {
let url = url.to_http();

#[cfg(target_arch = "wasm32")]
let tonic_client = {
tonic_web_wasm_client::Client::new_with_options(
url,
tonic_web_wasm_client::options::FetchOptions::new(),
)
};

#[cfg(not(target_arch = "wasm32"))]
let tonic_client = { tonic::transport::Endpoint::new(url)?.connect().await? };

// TODO(#8411): figure out the right size for this
MessageProxyClient::new(tonic_client).max_decoding_message_size(usize::MAX)
};

re_log::debug!("Streaming messages from gRPC endpoint {url}");

let stream = client
.read_messages(Empty {})
.await
.map_err(TonicStatusError)?
.into_inner();
tokio::pin!(stream);

loop {
match stream.try_next().await {
Ok(Some(msg)) => {
let msg = log_msg_from_proto(msg)?;
if tx.send(msg).is_err() {
re_log::debug!("gRPC stream smart channel closed");
break;
}
if let Some(on_msg) = &on_msg {
on_msg();
}
}

// Stream closed
Ok(None) => {
re_log::debug!("gRPC stream disconnected");
break;
}

Err(_) => {
re_log::debug!("gRPC stream timed out");
break;
}
}
}

Ok(())
}
Loading

0 comments on commit cc6e789

Please sign in to comment.