Skip to content

Commit

Permalink
Use shared create_tonic_channel_from_advertised_address utility
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Jan 6, 2025
1 parent 446bd56 commit 60dbc4e
Show file tree
Hide file tree
Showing 18 changed files with 84 additions and 170 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/cli-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ comfy-table = { workspace = true }
crossterm = { workspace = true }
dialoguer = { workspace = true }
dotenvy = { version = "0.15" }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tracing = { workspace = true }
tracing-log = { version = "0.2.0" }
tracing-subscriber = { workspace = true }
unicode-width = { version = "0.1.11" }

restate-core = { workspace = true }
2 changes: 1 addition & 1 deletion crates/cli-util/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static GLOBAL_CLI_CONTEXT: OnceLock<ArcSwap<CliContext>> = OnceLock::new();
pub struct CliContext {
confirm_mode: ConfirmMode,
ui: UiOpts,
network: NetworkOpts,
pub network: NetworkOpts,
colors_enabled: bool,
loaded_dotenv: Option<PathBuf>,
}
Expand Down
24 changes: 23 additions & 1 deletion crates/cli-util/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::time::Duration;

use clap::{Args, ValueEnum};
use clap_verbosity_flag::LogLevel;
use cling::Collect;

use restate_core::network::net_util::CommonClientConnectionOptions;

const DEFAULT_CONNECT_TIMEOUT: u64 = 5_000;
const DEFAULT_REQUEST_TIMEOUT: u64 = 13_000;

Expand Down Expand Up @@ -63,7 +67,7 @@ pub(crate) struct ConfirmMode {
}

#[derive(Args, Clone, Default)]
pub(crate) struct NetworkOpts {
pub struct NetworkOpts {
/// Connection timeout for network calls, in milliseconds.
#[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT, global = true)]
pub connect_timeout: u64,
Expand All @@ -72,6 +76,24 @@ pub(crate) struct NetworkOpts {
pub request_timeout: u64,
}

impl CommonClientConnectionOptions for NetworkOpts {
fn connect_timeout(&self) -> Duration {
Duration::from_millis(self.connect_timeout)
}

fn keep_alive_interval(&self) -> Duration {
Duration::from_secs(60)
}

fn keep_alive_timeout(&self) -> Duration {
Duration::from_millis(self.request_timeout)
}

fn http2_adaptive_window(&self) -> bool {
true
}
}

#[derive(Args, Collect, Clone, Default)]
pub struct CommonOpts {
#[clap(flatten)]
Expand Down
66 changes: 37 additions & 29 deletions server/tests/trim_gap_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,25 @@ use std::time::Duration;
use enumset::enum_set;
use futures_util::StreamExt;
use googletest::fail;
use hyper_util::rt::TokioIo;
use tempfile::TempDir;
use tokio::io;
use tokio::net::UnixStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;
use tonic::transport::Channel;
use tracing::{error, info};
use url::Url;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest,
};
use restate_core::network::net_util::{
create_tonic_channel_from_advertised_address, CommonClientConnectionOptions,
};
use restate_local_cluster_runner::{
cluster::Cluster,
node::{BinarySource, Node},
};
use restate_types::config::{LogFormat, MetadataStoreClient};
use restate_types::logs::metadata::ProviderKind::Replicated;
use restate_types::net::AdvertisedAddress;
use restate_types::protobuf::cluster::node_state::State;
use restate_types::protobuf::cluster::RunMode;
use restate_types::retries::RetryPolicy;
Expand Down Expand Up @@ -84,9 +82,11 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?;
tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?;

let mut client =
ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?)
.accept_compressed(CompressionEncoding::Gzip);
let mut client = ClusterCtrlSvcClient::new(create_tonic_channel_from_advertised_address(
cluster.nodes[0].node_address().clone(),
&TestNetworkOptions::default(),
))
.accept_compressed(CompressionEncoding::Gzip);

any_partition_active(&mut client, Duration::from_secs(5)).await?;

Expand Down Expand Up @@ -329,26 +329,34 @@ async fn applied_lsn_converged(
Ok(())
}

async fn grpc_connect(address: AdvertisedAddress) -> Result<Channel, tonic::transport::Error> {
match address {
AdvertisedAddress::Uds(uds_path) => {
// dummy endpoint required to specify an uds connector, it is not used anywhere
Endpoint::try_from("http://127.0.0.1")
.expect("/ should be a valid Uri")
.connect_with_connector(service_fn(move |_: Uri| {
let uds_path = uds_path.clone();
async move {
Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
}
})).await
}
AdvertisedAddress::Http(uri) => {
Channel::builder(uri)
.connect_timeout(Duration::from_secs(2))
.timeout(Duration::from_secs(2))
.http2_adaptive_window(true)
.connect()
.await
struct TestNetworkOptions {
connect_timeout: u64,
request_timeout: u64,
}

impl Default for TestNetworkOptions {
fn default() -> Self {
Self {
connect_timeout: 1000,
request_timeout: 5000,
}
}
}

impl CommonClientConnectionOptions for TestNetworkOptions {
fn connect_timeout(&self) -> Duration {
Duration::from_millis(self.connect_timeout)
}

fn keep_alive_interval(&self) -> Duration {
Duration::from_secs(60)
}

fn keep_alive_timeout(&self) -> Duration {
Duration::from_millis(self.request_timeout)
}

fn http2_adaptive_window(&self) -> bool {
true
}
}
10 changes: 1 addition & 9 deletions tools/restatectl/src/commands/cluster/config/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::Context;
use clap::Parser;
use cling::{Collect, Run};
use tonic::{codec::CompressionEncoding, Code};
Expand All @@ -27,14 +26,7 @@ use crate::{
pub struct ConfigGetOpts {}

async fn config_get(connection: &ConnectionInfo, _get_opts: &ConfigGetOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down
9 changes: 1 addition & 8 deletions tools/restatectl/src/commands/cluster/config/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,7 @@ pub struct ConfigSetOpts {
}

async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down
9 changes: 1 addition & 8 deletions tools/restatectl/src/commands/log/describe_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,7 @@ async fn describe_logs(
connection: &ConnectionInfo,
opts: &DescribeLogIdOpts,
) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());

let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
Expand Down
10 changes: 1 addition & 9 deletions tools/restatectl/src/commands/log/find_tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::Context;
use cling::prelude::*;
use restate_cli_util::_comfy_table::{Cell, Color, Table};
use restate_cli_util::ui::console::StyledTable;
Expand All @@ -32,14 +31,7 @@ pub struct FindTailOpts {
}

async fn find_tail(connection: &ConnectionInfo, opts: &FindTailOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down
10 changes: 1 addition & 9 deletions tools/restatectl/src/commands/log/list_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use std::collections::BTreeMap;

use anyhow::Context;
use cling::prelude::*;
use tonic::codec::CompressionEncoding;

Expand All @@ -34,14 +33,7 @@ use crate::util::grpc_connect;
pub struct ListLogsOpts {}

pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down
9 changes: 1 addition & 8 deletions tools/restatectl/src/commands/log/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,7 @@ pub struct ReconfigureOpts {
}

async fn reconfigure(connection: &ConnectionInfo, opts: &ReconfigureOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());

let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
Expand Down
9 changes: 1 addition & 8 deletions tools/restatectl/src/commands/log/trim_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,7 @@ pub struct TrimLogOpts {
}

async fn trim_log(connection: &ConnectionInfo, opts: &TrimLogOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down
14 changes: 3 additions & 11 deletions tools/restatectl/src/commands/node/list_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use std::collections::{BTreeMap, HashMap};
use std::time::Duration;

use anyhow::Context;
use chrono::TimeDelta;
use cling::prelude::*;
use itertools::Itertools;
Expand Down Expand Up @@ -47,14 +46,7 @@ pub struct ListNodesOpts {
}

pub async fn list_nodes(connection: &ConnectionInfo, opts: &ListNodesOpts) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down Expand Up @@ -154,9 +146,9 @@ async fn fetch_extra_info(
for (node_id, node_config) in nodes_configuration.iter() {
let address = node_config.address.clone();
let get_ident = async move {
let node_channel = grpc_connect(address).await?;
let channel = grpc_connect(address);
let mut node_ctl_svc_client =
NodeCtlSvcClient::new(node_channel).accept_compressed(CompressionEncoding::Gzip);
NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Ok(node_ctl_svc_client
.get_ident(())
Expand Down
10 changes: 1 addition & 9 deletions tools/restatectl/src/commands/partition/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use std::cmp::PartialOrd;
use std::collections::{BTreeMap, HashMap};

use anyhow::Context;
use cling::prelude::*;
use itertools::Itertools;
use tonic::codec::CompressionEncoding;
Expand Down Expand Up @@ -72,14 +71,7 @@ pub async fn list_partitions(
connection: &ConnectionInfo,
opts: &ListPartitionsOpts,
) -> anyhow::Result<()> {
let channel = grpc_connect(connection.cluster_controller.clone())
.await
.with_context(|| {
format!(
"cannot connect to cluster controller at {}",
connection.cluster_controller
)
})?;
let channel = grpc_connect(connection.cluster_controller.clone());
let mut client =
ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

Expand Down
Loading

0 comments on commit 60dbc4e

Please sign in to comment.