From 4cd3ae73cab489bbf72703473010920799649d03 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 20 Oct 2021 00:37:40 +0800 Subject: [PATCH 1/4] poll_read_at_aligned Signed-off-by: Ruihang Xia --- glommio/src/io/dma_file.rs | 44 ++++++++++++++++++++++++++++++++++++++ glommio/src/sys/source.rs | 11 +++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index 19c5bac90..a6802ace3 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -17,10 +17,13 @@ use futures_lite::{Stream, StreamExt}; use nix::sys::statfs::*; use std::{ cell::Ref, + future::Future, io, os::unix::io::{AsRawFd, RawFd}, path::Path, + pin::Pin, rc::Rc, + task::{Context, Poll}, }; pub(super) type Result = crate::Result; @@ -283,6 +286,21 @@ impl DmaFile { )) } + /// Poll-based version of [`Self::read_at_aligned`] + pub fn poll_read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAligned<'_> { + let source = self.file.reactor.upgrade().unwrap().read_dma( + self.as_raw_fd(), + pos, + size, + self.pollable, + self.file.scheduler.borrow().as_ref(), + ); + PollDmaReadAligned { + source: Some(source), + file: &self.file, + } + } + /// Submit many reads and process the results in a stream-like fashion via a /// [`ReadManyResult`]. /// @@ -424,6 +442,32 @@ impl DmaFile { } } +#[doc(hidden)] +#[derive(Debug)] +#[must_use = "future has no effect unless you .await or poll it"] +pub struct PollDmaReadAligned<'a> { + source: Option, + file: &'a GlommioFile, +} + +impl Future for PollDmaReadAligned<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let read_size = self + .source + .as_ref() + .expect("Polling a finished task") + .poll_collect_rw(cx) + .map(|read_size| enhanced_try!(read_size, "Reading", self.file))?; + + read_size.map(|size| { + let source = self.get_mut().source.take().unwrap(); + Ok(ReadResult::from_sliced_buffer(source, 0, size)) + }) + } +} + #[cfg(test)] pub(crate) mod test { use super::*; diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index 47cc3db38..946f16b29 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -32,7 +32,7 @@ use std::{ os::unix::io::RawFd, path::PathBuf, rc::Rc, - task::{Poll, Waker}, + task::{Context, Poll, Waker}, time::Duration, }; @@ -267,6 +267,15 @@ impl Source { }) .await } + + pub(crate) fn poll_collect_rw(&self, cx: &mut Context<'_>) -> Poll> { + if let Some(result) = self.result() { + return Poll::Ready(result); + } + + self.add_waiter_many(cx.waker().clone()); + Poll::Pending + } } impl Drop for Source { From ce174786241a4e0c962ceb01377bc0223b269f69 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 20 Oct 2021 23:33:14 +0800 Subject: [PATCH 2/4] replace async api with poll-based Signed-off-by: Ruihang Xia --- glommio/src/io/dma_file.rs | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index a6802ace3..9d1a002c1 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -245,7 +245,7 @@ impl DmaFile { /// /// The position must be aligned to for Direct I/O. In most platforms /// that means 512 bytes. - pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result { + pub fn read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAligned<'_> { let source = self.file.reactor.upgrade().unwrap().read_dma( self.as_raw_fd(), pos, @@ -253,8 +253,10 @@ impl DmaFile { self.pollable, self.file.scheduler.borrow().as_ref(), ); - let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?; - Ok(ReadResult::from_sliced_buffer(source, 0, read_size)) + PollDmaReadAligned { + source: Some(source), + file: &self.file, + } } /// Reads into buffer in buf from a specific position in the file. @@ -286,21 +288,6 @@ impl DmaFile { )) } - /// Poll-based version of [`Self::read_at_aligned`] - pub fn poll_read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAligned<'_> { - let source = self.file.reactor.upgrade().unwrap().read_dma( - self.as_raw_fd(), - pos, - size, - self.pollable, - self.file.scheduler.borrow().as_ref(), - ); - PollDmaReadAligned { - source: Some(source), - file: &self.file, - } - } - /// Submit many reads and process the results in a stream-like fashion via a /// [`ReadManyResult`]. /// @@ -442,7 +429,7 @@ impl DmaFile { } } -#[doc(hidden)] +/// Future of [`DmaFile::read_at_aligned`]. #[derive(Debug)] #[must_use = "future has no effect unless you .await or poll it"] pub struct PollDmaReadAligned<'a> { From 1d7ee1d35eea4c58fd37df170874cfb6fc0e6f79 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 24 Oct 2021 15:13:06 +0800 Subject: [PATCH 3/4] impl poll version of read_at() Signed-off-by: Ruihang Xia --- glommio/src/io/dma_file.rs | 61 ++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index 9d1a002c1..ed1169294 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -245,7 +245,12 @@ impl DmaFile { /// /// The position must be aligned to for Direct I/O. In most platforms /// that means 512 bytes. - pub fn read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAligned<'_> { + /// + /// Equal to + /// ```ignore + /// pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result; + /// ``` + pub fn read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAtAligned<'_> { let source = self.file.reactor.upgrade().unwrap().read_dma( self.as_raw_fd(), pos, @@ -253,7 +258,7 @@ impl DmaFile { self.pollable, self.file.scheduler.borrow().as_ref(), ); - PollDmaReadAligned { + PollDmaReadAtAligned { source: Some(source), file: &self.file, } @@ -267,7 +272,12 @@ impl DmaFile { /// /// If you can guarantee proper alignment, prefer [`Self::read_at_aligned`] /// instead - pub async fn read_at(&self, pos: u64, size: usize) -> Result { + /// + /// /// Equal to + /// ```ignore + /// pub async fn read_at(&self, pos: u64, size: usize) -> Result; + /// ``` + pub fn read_at(&self, pos: u64, size: usize) -> PollDmaReadAt<'_> { let eff_pos = self.align_down(pos); let b = (pos - eff_pos) as usize; @@ -279,13 +289,12 @@ impl DmaFile { self.pollable, self.file.scheduler.borrow().as_ref(), ); - - let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?; - Ok(ReadResult::from_sliced_buffer( - source, - b, - std::cmp::min(read_size, size), - )) + PollDmaReadAt { + source: Some(source), + file: &self.file, + begin: b, + size, + } } /// Submit many reads and process the results in a stream-like fashion via a @@ -432,12 +441,12 @@ impl DmaFile { /// Future of [`DmaFile::read_at_aligned`]. #[derive(Debug)] #[must_use = "future has no effect unless you .await or poll it"] -pub struct PollDmaReadAligned<'a> { +pub struct PollDmaReadAtAligned<'a> { source: Option, file: &'a GlommioFile, } -impl Future for PollDmaReadAligned<'_> { +impl Future for PollDmaReadAtAligned<'_> { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -455,6 +464,34 @@ impl Future for PollDmaReadAligned<'_> { } } +#[derive(Debug)] +#[must_use = "future has no effect unless you .await or poll it"] +pub struct PollDmaReadAt<'a> { + source: Option, + file: &'a GlommioFile, + begin: usize, + size: usize, +} +impl Future for PollDmaReadAt<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let read_size = self + .source + .as_ref() + .expect("Polling a finished task") + .poll_collect_rw(cx) + .map(|read_size| enhanced_try!(read_size, "Reading", self.file))?; + + read_size.map(|read_size| { + let offset = self.begin; + let len = self.size.min(read_size); + let source = self.get_mut().source.take().unwrap(); + Ok(ReadResult::from_sliced_buffer(source, offset, len)) + }) + } +} + #[cfg(test)] pub(crate) mod test { use super::*; From 58b79b514a4c62e2f02253cbf4296110a0ad609b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 24 Oct 2021 15:21:56 +0800 Subject: [PATCH 4/4] change ImmutableFile's read_at method Signed-off-by: Ruihang Xia --- glommio/src/io/dma_file.rs | 5 +++-- glommio/src/io/immutable_file.rs | 11 ++++++++--- glommio/src/io/mod.rs | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index ed1169294..27a776841 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -246,7 +246,7 @@ impl DmaFile { /// The position must be aligned to for Direct I/O. In most platforms /// that means 512 bytes. /// - /// Equal to + /// Equals to /// ```ignore /// pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result; /// ``` @@ -273,7 +273,7 @@ impl DmaFile { /// If you can guarantee proper alignment, prefer [`Self::read_at_aligned`] /// instead /// - /// /// Equal to + /// Equals to /// ```ignore /// pub async fn read_at(&self, pos: u64, size: usize) -> Result; /// ``` @@ -464,6 +464,7 @@ impl Future for PollDmaReadAtAligned<'_> { } } +/// Future of [`DmaFile::read_at`]. #[derive(Debug)] #[must_use = "future has no effect unless you .await or poll it"] pub struct PollDmaReadAt<'a> { diff --git a/glommio/src/io/immutable_file.rs b/glommio/src/io/immutable_file.rs index cf1663b36..5371b27e8 100644 --- a/glommio/src/io/immutable_file.rs +++ b/glommio/src/io/immutable_file.rs @@ -10,8 +10,8 @@ use crate::io::{ DmaStreamWriter, DmaStreamWriterBuilder, IoVec, + PollDmaReadAt, ReadManyResult, - ReadResult, ScheduledSource, }; use futures_lite::{future::poll_fn, io::AsyncWrite, Stream}; @@ -372,8 +372,13 @@ impl ImmutableFile { /// It is not necessary to respect the `O_DIRECT` alignment of the file, and /// this API will internally convert the positions and sizes to match, /// at a cost. - pub async fn read_at(&self, pos: u64, size: usize) -> Result { - self.stream_builder.file.read_at(pos, size).await + /// + /// Equals to + /// ```ignore + /// pub async fn read_at(&self, pos: u64, size: usize) -> Result; + /// ``` + pub fn read_at(&self, pos: u64, size: usize) -> PollDmaReadAt<'_> { + self.stream_builder.file.read_at(pos, size) } /// Submit many reads and process the results in a stream-like fashion via a diff --git a/glommio/src/io/mod.rs b/glommio/src/io/mod.rs index 647b8d3a2..5562d8c8a 100644 --- a/glommio/src/io/mod.rs +++ b/glommio/src/io/mod.rs @@ -163,7 +163,7 @@ pub use self::{ }, bulk_io::{IoVec, ReadManyResult}, directory::Directory, - dma_file::{CloseResult, DmaFile}, + dma_file::{CloseResult, DmaFile, PollDmaReadAt, PollDmaReadAtAligned}, dma_file_stream::{ DmaStreamReader, DmaStreamReaderBuilder,