Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental routing code refactoring (1) #2

Open
wants to merge 13 commits into
base: routing-refactor
Choose a base branch
from
2 changes: 1 addition & 1 deletion commons/zenoh-protocol-core/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl From<&'static str> for Encoding {
}
}

impl<'a> From<String> for Encoding {
impl From<String> for Encoding {
fn from(s: String) -> Self {
for (i, v) in consts::MIMES.iter().enumerate() {
if i != 0 && s.starts_with(v) {
Expand Down
39 changes: 21 additions & 18 deletions commons/zenoh-protocol-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub type NonZeroZInt = NonZeroU64;
pub const ZINT_MAX_BYTES: usize = 10;

// WhatAmI values
pub type WhatAmI = whatami::WhatAmI;
pub use whatami::WhatAmI;

/// Constants and helpers for zenoh `whatami` flags.
pub mod whatami;
Expand All @@ -54,14 +54,14 @@ pub use locators::Locator;
pub mod endpoints;
pub use endpoints::EndPoint;

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Property {
pub key: ZInt,
pub value: Vec<u8>,
}

/// The global unique id of a zenoh peer.
#[derive(Clone, Copy, Eq)]
#[derive(Clone, Copy)]
pub struct PeerId {
size: usize,
id: [u8; PeerId::MAX_SIZE],
Expand Down Expand Up @@ -119,10 +119,12 @@ impl FromStr for PeerId {
impl PartialEq for PeerId {
#[inline]
fn eq(&self, other: &Self) -> bool {
self.size == other.size && self.as_slice() == other.as_slice()
self.as_slice() == other.as_slice()
}
}

impl Eq for PeerId {}

impl Hash for PeerId {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_slice().hash(state);
Expand All @@ -148,7 +150,7 @@ impl From<&PeerId> for uhlc::ID {
}
}

#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Priority {
Control = 0,
Expand All @@ -170,6 +172,7 @@ impl Default for Priority {
Priority::Data
}
}

impl TryFrom<u8> for Priority {
type Error = zenoh_core::Error;

Expand All @@ -191,7 +194,7 @@ impl TryFrom<u8> for Priority {
}
}

#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Reliability {
BestEffort,
Expand All @@ -204,13 +207,13 @@ impl Default for Reliability {
}
}

#[derive(Debug, Copy, Clone, PartialEq, Default)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
pub struct Channel {
pub priority: Priority,
pub reliability: Reliability,
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConduitSnList {
Plain(ConduitSn),
QoS(Box<[ConduitSn; Priority::NUM]>),
Expand Down Expand Up @@ -248,14 +251,14 @@ impl fmt::Display for ConduitSnList {
}

/// The kind of reliability.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
pub struct ConduitSn {
pub reliable: ZInt,
pub best_effort: ZInt,
}

/// The kind of congestion control.
#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum CongestionControl {
Block,
Expand All @@ -269,7 +272,7 @@ impl Default for CongestionControl {
}

/// The subscription mode.
#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum SubMode {
Push,
Expand All @@ -284,21 +287,21 @@ impl Default for SubMode {
}

/// A time period.
#[derive(Debug, Copy, Clone, PartialEq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct Period {
pub origin: ZInt,
pub period: ZInt,
pub duration: ZInt,
}

#[derive(Debug, Clone, PartialEq, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct SubInfo {
pub reliability: Reliability,
pub mode: SubMode,
pub period: Option<Period>,
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueryableInfo {
pub complete: ZInt,
pub distance: ZInt,
Expand All @@ -320,7 +323,7 @@ pub mod queryable {
}

/// The kind of consolidation.
#[derive(Debug, Clone, PartialEq, Copy)]
#[derive(Debug, Clone, PartialEq, Copy, Eq, Hash)]
#[repr(u8)]
pub enum ConsolidationMode {
None,
Expand All @@ -330,7 +333,7 @@ pub enum ConsolidationMode {

/// The kind of consolidation that should be applied on replies to a [`get`](zenoh::Session::get)
/// at different stages of the reply process.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConsolidationStrategy {
pub first_routers: ConsolidationMode,
pub last_router: ConsolidationMode,
Expand Down Expand Up @@ -415,7 +418,7 @@ impl Default for ConsolidationStrategy {
}

/// The [`Queryable`](zenoh::queryable::Queryable)s that should be target of a [`get`](zenoh::Session::get).
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Target {
BestMatching,
All,
Expand All @@ -432,7 +435,7 @@ impl Default for Target {
}

/// The [`Queryable`](zenoh::queryable::Queryable)s that should be target of a [`get`](zenoh::Session::get).
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueryTarget {
pub kind: ZInt,
pub target: Target,
Expand Down
128 changes: 128 additions & 0 deletions io/zenoh-transport/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::protocol::io::ZBuf;
use super::protocol::proto::{DataInfo, RoutingContext};
pub use demux::*;
pub use mux::*;
use std::ops::Deref;

pub trait Primitives: Send + Sync {
fn decl_resource(&self, expr_id: ZInt, key_expr: &KeyExpr);
Expand Down Expand Up @@ -96,6 +97,133 @@ pub trait Primitives: Send + Sync {
fn send_close(&self);
}

impl<P> Primitives for P
where
P: Deref + Send + Sync,
P::Target: Primitives + Send + Sync,
{
fn decl_resource(&self, expr_id: ZInt, key_expr: &KeyExpr) {
self.deref().decl_resource(expr_id, key_expr)
}

fn forget_resource(&self, expr_id: ZInt) {
self.deref().forget_resource(expr_id)
}

fn decl_publisher(&self, key_expr: &KeyExpr, routing_context: Option<RoutingContext>) {
self.deref().decl_publisher(key_expr, routing_context)
}

fn forget_publisher(&self, key_expr: &KeyExpr, routing_context: Option<RoutingContext>) {
self.deref().forget_publisher(key_expr, routing_context)
}

fn decl_subscriber(
&self,
key_expr: &KeyExpr,
sub_info: &SubInfo,
routing_context: Option<RoutingContext>,
) {
self.deref()
.decl_subscriber(key_expr, sub_info, routing_context)
}

fn forget_subscriber(&self, key_expr: &KeyExpr, routing_context: Option<RoutingContext>) {
self.deref().forget_subscriber(key_expr, routing_context)
}

fn decl_queryable(
&self,
key_expr: &KeyExpr,
kind: ZInt,
qabl_info: &QueryableInfo,
routing_context: Option<RoutingContext>,
) {
self.deref()
.decl_queryable(key_expr, kind, qabl_info, routing_context)
}

fn forget_queryable(
&self,
key_expr: &KeyExpr,
kind: ZInt,
routing_context: Option<RoutingContext>,
) {
self.deref()
.forget_queryable(key_expr, kind, routing_context)
}

fn send_data(
&self,
key_expr: &KeyExpr,
payload: ZBuf,
channel: Channel,
cogestion_control: CongestionControl,
data_info: Option<DataInfo>,
routing_context: Option<RoutingContext>,
) {
self.deref().send_data(
key_expr,
payload,
channel,
cogestion_control,
data_info,
routing_context,
)
}

fn send_query(
&self,
key_expr: &KeyExpr,
value_selector: &str,
qid: ZInt,
target: QueryTarget,
consolidation: ConsolidationStrategy,
routing_context: Option<RoutingContext>,
) {
self.deref().send_query(
key_expr,
value_selector,
qid,
target,
consolidation,
routing_context,
)
}

fn send_reply_data(
&self,
qid: ZInt,
replier_kind: ZInt,
replier_id: PeerId,
key_expr: KeyExpr,
info: Option<DataInfo>,
payload: ZBuf,
) {
self.deref()
.send_reply_data(qid, replier_kind, replier_id, key_expr, info, payload)
}

fn send_reply_final(&self, qid: ZInt) {
self.deref().send_reply_final(qid)
}

fn send_pull(
&self,
is_final: bool,
key_expr: &KeyExpr,
pull_id: ZInt,
max_samples: &Option<ZInt>,
) {
self.deref()
.send_pull(is_final, key_expr, pull_id, max_samples)
}

fn send_close(&self) {
self.deref().send_close()
}
}

#[derive(Default)]
pub struct DummyPrimitives;

Expand Down
26 changes: 21 additions & 5 deletions zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// Contributors:
// ADLINK zenoh team, <[email protected]>
//
use super::network::LinkId;
use super::router::*;
use async_std::sync::Arc;
use std::collections::{HashMap, HashSet};
Expand All @@ -24,12 +25,28 @@ use zenoh_protocol_core::{
};
use zenoh_transport::Primitives;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct FaceId(usize);

impl FaceId {
pub fn new(id: usize) -> Self {
Self(id)
}
}

impl fmt::Display for FaceId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

pub struct FaceState {
pub(super) id: usize,
pub(super) id: FaceId,
pub(super) pid: PeerId,
pub(super) whatami: WhatAmI,
pub(super) primitives: Arc<dyn Primitives + Send + Sync>,
pub(super) link_id: usize,
pub(super) link_id: LinkId,
pub(super) local_mappings: HashMap<ZInt, ResourceTreeIndex>,
pub(super) remote_mappings: HashMap<ZInt, ResourceTreeIndex>,
pub(super) local_subs: HashSet<ResourceTreeIndex>,
Expand All @@ -42,11 +59,11 @@ pub struct FaceState {

impl FaceState {
pub(super) fn new(
id: usize,
id: FaceId,
pid: PeerId,
whatami: WhatAmI,
primitives: Arc<dyn Primitives + Send + Sync>,
link_id: usize,
link_id: LinkId,
) -> Arc<FaceState> {
Arc::new(FaceState {
id,
Expand Down Expand Up @@ -157,7 +174,6 @@ impl fmt::Display for FaceState {
}
}

#[derive(Clone)]
pub struct Face {
pub(crate) tables: Arc<RwLock<Tables>>,
pub(crate) state: Arc<FaceState>,
Expand Down
Loading