Skip to content

Commit

Permalink
refactor(example): added explanation about subscription and scope
Browse files Browse the repository at this point in the history
Added information about how `Subscription` and `SubscriptionSet` instance leaving the scope can
affect subscription loop.

refactor(presence): leave from all channels trigger different event

Made changes which will trigger different event when unsubscribing from all channels and groups.
  • Loading branch information
parfeon committed Jan 25, 2024
1 parent 8e18cb3 commit e21b5b6
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 44 deletions.
32 changes: 20 additions & 12 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,23 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("defined message: {:?}", message),
Ok(message) => println!("(a) defined message: {:?}", message),
Err(_) => {
println!("other message: {:?}", String::from_utf8(message.data))
println!("(a) other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("presence: {:?}", presence)
println!("(a) presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("object: {:?}", object)
println!("(a) object: {:?}", object)
}
Update::MessageAction(action) => {
println!("message action: {:?}", action)
println!("(a) message action: {:?}", action)
}
Update::File(file) => {
println!("file: {:?}", file)
println!("(a) file: {:?}", file)
}
}
}));
Expand All @@ -101,23 +101,23 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("~~~~~> defined message: {:?}", message),
Ok(message) => println!("(b) defined message: {:?}", message),
Err(_) => {
println!("other message: {:?}", String::from_utf8(message.data))
println!("(b) other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("~~~~~> presence: {:?}", presence)
println!("(b) presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("~~~~~> object: {:?}", object)
println!("(b) object: {:?}", object)
}
Update::MessageAction(action) => {
println!("~~~~~> message action: {:?}", action)
println!("(b) message action: {:?}", action)
}
Update::File(file) => {
println!("~~~~~> file: {:?}", file)
println!("(b) file: {:?}", file)
}
}
}));
Expand All @@ -131,15 +131,23 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
// You can also cancel the subscription at any time.
// subscription.unsubscribe();

println!("\nDisconnect from the real-time data stream");
client.disconnect();

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("\nReconnect to the real-time data stream");
client.reconnect(None);

// Let event engine process unsubscribe request
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

// If Subscription or Subscription will go out of scope they will unsubscribe.
// drop(subscription);
// drop(subscription_clone);

println!(
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
// Clean up before complete work with PubNub client instance.
client.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Expand Down
22 changes: 12 additions & 10 deletions src/dx/subscribe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,6 @@ where
manager.unregister_all()
}
}

#[cfg(feature = "presence")]
self.announce_left_all();
}

/// Subscription manager which maintains Subscription EE.
Expand Down Expand Up @@ -443,12 +440,12 @@ where
*slot = Some(SubscriptionManager::new(
self.subscribe_event_engine(),
#[cfg(feature = "presence")]
Arc::new(move |channels, groups| {
Arc::new(move |channels, groups, _all| {
Self::subscribe_heartbeat_call(heartbeat_self.clone(), channels, groups);
}),
#[cfg(feature = "presence")]
Arc::new(move |channels, groups| {
Self::subscribe_leave_call(leave_self.clone(), channels, groups);
Arc::new(move |channels, groups, all| {
Self::subscribe_leave_call(leave_self.clone(), channels, groups, all);
}),
));
}
Expand Down Expand Up @@ -572,11 +569,16 @@ where
client: Self,
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
all: bool,
) {
client.announce_left(
Self::presence_filtered_entries(channels),
Self::presence_filtered_entries(channel_groups),
);
if !all {
client.announce_left(
Self::presence_filtered_entries(channels),
Self::presence_filtered_entries(channel_groups),
);
} else {
client.announce_left_all()
}
}

fn emit_status(client: Self, status: &ConnectionStatus) {
Expand Down
43 changes: 33 additions & 10 deletions src/dx/subscribe/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
core::{
cmp::PartialEq,
fmt::{Debug, Formatter, Result},
ops::{Add, Deref, DerefMut},
ops::{Add, Deref, DerefMut, Drop},
},
},
subscribe::{
Expand Down Expand Up @@ -89,7 +89,10 @@ use crate::{
/// # Ok(())
/// # }
/// ```
pub struct Subscription<T: Send + Sync, D: Send + Sync> {
pub struct Subscription<
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
> {
/// Subscription reference.
pub(super) inner: Arc<SubscriptionRef<T, D>>,
}
Expand Down Expand Up @@ -234,8 +237,8 @@ where

impl<T, D> Deref for Subscription<T, D>
where
T: Send + Sync,
D: Send + Sync,
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
type Target = SubscriptionRef<T, D>;

Expand All @@ -246,8 +249,8 @@ where

impl<T, D> DerefMut for Subscription<T, D>
where
T: Send + Sync,
D: Send + Sync,
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn deref_mut(&mut self) -> &mut Self::Target {
Arc::get_mut(&mut self.inner)
Expand All @@ -257,8 +260,8 @@ where

impl<T, D> Clone for Subscription<T, D>
where
T: Send + Sync,
D: Send + Sync,
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn clone(&self) -> Self {
Self {
Expand All @@ -267,6 +270,26 @@ where
}
}

impl<T, D> Drop for Subscription<T, D>
where
T: Transport + Send + Sync + 'static,
D: Deserializer + Send + Sync + 'static,
{
fn drop(&mut self) {
// Unregistering self to clean up subscriptions list if required.
let Some(client) = self.client().upgrade().clone() else {
return;
};

if let Some(manager) = client.subscription_manager(false).write().as_mut() {
if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.unregister(&handler);
}
}
}
}

impl<T, D> Add for Subscription<T, D>
where
T: Transport + Send + Sync + 'static,
Expand All @@ -284,8 +307,8 @@ where

impl<T, D> PartialEq for Subscription<T, D>
where
T: Send + Sync,
D: Send + Sync,
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
Expand Down
27 changes: 16 additions & 11 deletions src/dx/subscribe/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{

#[cfg(feature = "presence")]
pub(in crate::dx::subscribe) type PresenceCall =
dyn Fn(Option<Vec<String>>, Option<Vec<String>>) + Send + Sync;
dyn Fn(Option<Vec<String>>, Option<Vec<String>>, bool) + Send + Sync;

/// Active subscriptions' manager.
///
Expand Down Expand Up @@ -296,12 +296,17 @@ where

#[cfg(feature = "presence")]
{
(!inputs.is_empty && removed.is_none())
.then(|| self.heartbeat_call.as_ref()(channels.clone(), channel_groups.clone()));
(!inputs.is_empty && removed.is_none()).then(|| {
self.heartbeat_call.as_ref()(channels.clone(), channel_groups.clone(), false)
});

if let Some(removed) = removed {
if !removed.is_empty {
self.leave_call.as_ref()(removed.channels(), removed.channel_groups());
self.leave_call.as_ref()(
removed.channels(),
removed.channel_groups(),
inputs.is_empty,
);
}
}
}
Expand All @@ -318,7 +323,7 @@ where

#[cfg(feature = "presence")]
if !inputs.is_empty {
self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups());
self.heartbeat_call.as_ref()(inputs.channels(), inputs.channel_groups(), false);
}

self.event_engine
Expand Down Expand Up @@ -424,12 +429,12 @@ mod should {
let mut manager = SubscriptionManager::new(
event_engine(),
#[cfg(feature = "presence")]
Arc::new(|channels, _| {
Arc::new(|channels, _, _| {
assert!(channels.is_some());
assert_eq!(channels.unwrap().len(), 1);
}),
#[cfg(feature = "presence")]
Arc::new(|_, _| {}),
Arc::new(|_, _, _| {}),
);
let channel = client.channel("test");
let subscription = channel.subscription(None);
Expand All @@ -447,9 +452,9 @@ mod should {
let mut manager = SubscriptionManager::new(
event_engine(),
#[cfg(feature = "presence")]
Arc::new(|_, _| {}),
Arc::new(|_, _, _| {}),
#[cfg(feature = "presence")]
Arc::new(|channels, _| {
Arc::new(|channels, _, _| {
assert!(channels.is_some());
assert_eq!(channels.unwrap().len(), 1);
}),
Expand All @@ -471,9 +476,9 @@ mod should {
let mut manager = SubscriptionManager::new(
event_engine(),
#[cfg(feature = "presence")]
Arc::new(|_, _| {}),
Arc::new(|_, _, _| {}),
#[cfg(feature = "presence")]
Arc::new(|_, _| {}),
Arc::new(|_, _, _| {}),
);
let cursor: SubscriptionCursor = "15800701771129796".to_string().into();
let channel = client.channel("test");
Expand Down
20 changes: 20 additions & 0 deletions src/dx/subscribe/subscription_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,26 @@ where
}
}

impl<T, D> Drop for SubscriptionSet<T, D>
where
T: Transport + Send + Sync,
D: Deserializer + Send + Sync,
{
fn drop(&mut self) {
// Unregistering self to clean up subscriptions list if required.
let Some(client) = self.client().upgrade().clone() else {
return;
};

if let Some(manager) = client.subscription_manager(false).write().as_mut() {
if let Some((_, handler)) = self.clones.read().iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.unregister(&handler);
}
}
}
}

impl<T, D> Debug for SubscriptionSet<T, D>
where
T: Transport + Send + Sync + 'static,
Expand Down
3 changes: 2 additions & 1 deletion src/dx/subscribe/traits/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
//! which is used by types to provide ability to subscribe for real-time events.
use crate::{
core::{Deserializer, Transport},
lib::alloc::vec::Vec,
subscribe::{Subscription, SubscriptionOptions},
};

/// Trait representing a subscriber.
pub trait Subscriber<T: Send + Sync, D: Send + Sync> {
pub trait Subscriber<T: Transport + Send + Sync, D: Deserializer + Send + Sync> {
/// Creates a new subscription with the specified options.
///
/// # Arguments
Expand Down

0 comments on commit e21b5b6

Please sign in to comment.