-
-
Notifications
You must be signed in to change notification settings - Fork 389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delegate Future implementation to Next struct #1372
base: main
Are you sure you want to change the base?
Conversation
Previously, reading values out of Subscriber asynchronously would involve awaiting an &mut Subscriber, which causes re-polling the same future after it yields a value. This is considered 'improper' for a Future, and more closely maps to the Stream trait. Unfortunately, Stream has not been standardized and so implementing that trait would require pulling in the 'futures' dependency, which is not yet 1.0 and subject to change. Instead, this implements a Stream-like poll_next method on Subscriber, and exposes that from an implementation of Future for a new Next<'_> type, which can be constructed by calling '.next()' on a Subscriber. This allows for a more traditional interaction with futures that mimics how other libraries produce values. Note that it is still possible to indefinitely poll on an &mut Next<'_> type to pull every value out of the subscriber, but it is less intuitive to perform that operation and it likely won't occur in downstream projects.
fn poll_next( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Option<Event>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API matches the current definition of the Stream
trait in Futures 0.3, so if that ends up stabilizing as-is then a Stream implementation will be "free"
Otherwise some changes will need to be made later down the line, but it's a private method anyway
src/subscriber.rs
Outdated
@@ -125,6 +125,44 @@ impl Drop for Subscriber { | |||
} | |||
|
|||
impl Subscriber { | |||
/// Creates a future that resolves to the next value of the | |||
/// subscriber, or None if the backing `Db` shuts down | |||
pub fn next(&mut self) -> impl Future<Output = Option<Event>> + '_ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although this is returning a concrete Next<'_> type in the implementation, we don't need to leak the new type in the public API in the event it needs to change later for some reason
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Realizing now that next
here conflicts with Iterator's next
Maybe naming this something else would be better
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Self::Output> { | ||
Pin::new(&mut *self.subscriber).poll_next(cx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.subscriber
is an &mut Subscriber
but it can't move, so &mut *self.subscriber
to make it move-able.
&self, | ||
prefix: &[u8] | ||
) -> Subscriber { | ||
pub(crate) fn register(&self, prefix: &[u8]) -> Subscriber { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unintentional formatting change. I can put this back if you want
Inspired by #1368
I don't know if this is something you care to merge or not, but I've done the work to implement a more "standard" way to interact with Subscriber asynchronously
Previously, reading values out of Subscriber asynchronously would involve awaiting an
&mut Subscriber
, which causes re-polling the same future after it yields a value. This is considered 'improper' for a Future, and more closely maps to the Stream trait. Unfortunately,Stream
has not been standardized and so implementing that trait would require pulling in thefutures-core
dependency, which is not yet 1.0 and subject to change. Instead, this implements a Stream-likepoll_next
method onSubscriber
, and exposes that from an implementation of Future for a newNext<'_>
type, which can be constructed by calling.next()
on a Subscriber. This allows for a more traditional interaction with futures that mimics how other libraries produce values.Note that it is still possible to indefinitely poll on an
&mut Next<'_>
type to pull every value out of the subscriber, but it is less intuitive to perform that operation and it likely won't occur in downstream projects.