diff --git a/src/subscriber.rs b/src/subscriber.rs index 49d6aae23..e34ca9f5d 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -107,9 +107,9 @@ type Senders = Map, SyncSender>>)>; /// ``` /// Aynchronous, non-blocking subscriber: /// -/// `Subscription` implements `Future>`. +/// `Subscription` provides a `next` method which returns an `impl Future>`. /// -/// `while let Some(event) = (&mut subscriber).await { /* use it */ }` +/// `while let Some(event) = subscriber.next_event().await { /* use it */ }` pub struct Subscriber { id: usize, rx: Receiver>>, @@ -125,6 +125,44 @@ impl Drop for Subscriber { } impl Subscriber { + /// Creates a future that resolves to the next value of the + /// subscriber, or None if the backing `Db` shuts down + pub fn next_event(&mut self) -> impl Future> + '_ { + Next { subscriber: self } + } + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + match self.rx.try_recv() { + Ok(future_rx) => future_rx, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Poll::Ready(None) + } + } + }; + + match Future::poll(Pin::new(&mut future_rx), cx) { + Poll::Ready(Some(event)) => return Poll::Ready(event), + Poll::Ready(None) => continue, + Poll::Pending => { + self.existing = Some(future_rx); + return Poll::Pending; + } + } + } + let mut home = self.home.write(); + let entry = home.get_mut(&self.id).unwrap(); + entry.0 = Some(cx.waker().clone()); + Poll::Pending + } + /// Attempts to wait for a value on this `Subscriber`, returning /// an error if no event arrives within the provided `Duration` /// or if the backing `Db` shuts down. @@ -165,42 +203,6 @@ impl Subscriber { } } -impl Future for Subscriber { - type Output = Option; - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll { - loop { - let mut future_rx = if let Some(future_rx) = self.existing.take() { - future_rx - } else { - match self.rx.try_recv() { - Ok(future_rx) => future_rx, - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - return Poll::Ready(None) - } - } - }; - - match Future::poll(Pin::new(&mut future_rx), cx) { - Poll::Ready(Some(event)) => return Poll::Ready(event), - Poll::Ready(None) => continue, - Poll::Pending => { - self.existing = Some(future_rx); - return Poll::Pending; - } - } - } - let mut home = self.home.write(); - let entry = home.get_mut(&self.id).unwrap(); - entry.0 = Some(cx.waker().clone()); - Poll::Pending - } -} - impl Iterator for Subscriber { type Item = Event; @@ -216,6 +218,23 @@ impl Iterator for Subscriber { } } +#[doc(hidden)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +struct Next<'a> { + subscriber: &'a mut Subscriber, +} + +impl<'a> Future for Next<'a> { + type Output = Option; + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + Pin::new(&mut *self.subscriber).poll_next(cx) + } +} + #[derive(Debug, Default)] pub(crate) struct Subscribers { watched: RwLock, Arc>>>, @@ -239,10 +258,7 @@ impl Drop for Subscribers { } impl Subscribers { - pub(crate) fn register( - &self, - prefix: &[u8] - ) -> Subscriber { + pub(crate) fn register(&self, prefix: &[u8]) -> Subscriber { self.ever_used.store(true, Relaxed); let r_mu = { let r_mu = self.watched.read(); diff --git a/src/tree.rs b/src/tree.rs index f472e9ef7..02fda1f36 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -891,7 +891,7 @@ impl Tree { /// # let config = sled::Config::new().temporary(true); /// # let db = config.open().unwrap(); /// # let mut subscriber = db.watch_prefix(vec![]); - /// while let Some(event) = (&mut subscriber).await { + /// while let Some(event) = subscriber.next_event().await { /// /* use it */ /// } /// # }