diff --git a/crates/kornia-io/Cargo.toml b/crates/kornia-io/Cargo.toml index ccfbccf6..891875df 100644 --- a/crates/kornia-io/Cargo.toml +++ b/crates/kornia-io/Cargo.toml @@ -20,8 +20,8 @@ log = { workspace = true } thiserror = { workspace = true } # optional dependencies -gst = { version = "0.23.0", package = "gstreamer", optional = true } -gst-app = { version = "0.23.0", package = "gstreamer-app", optional = true } +gst = { version = "0.23.4", package = "gstreamer", optional = true } +gst-app = { version = "0.23.4", package = "gstreamer-app", optional = true } memmap2 = "0.9.4" turbojpeg = { version = "1.0.0", optional = true } diff --git a/crates/kornia-io/src/stream/capture.rs b/crates/kornia-io/src/stream/capture.rs index 93ebc317..852d8507 100644 --- a/crates/kornia-io/src/stream/capture.rs +++ b/crates/kornia-io/src/stream/capture.rs @@ -1,11 +1,20 @@ +use std::sync::{Arc, Mutex}; + use crate::stream::error::StreamCaptureError; use gst::prelude::*; use kornia_image::Image; +// utility struct to store the frame buffer +struct FrameBuffer { + buffer: gst::MappedBuffer, + width: usize, + height: usize, +} + /// Represents a stream capture pipeline using GStreamer. pub struct StreamCapture { pipeline: gst::Pipeline, - appsink: gst_app::AppSink, + last_frame: Arc>>, } impl StreamCapture { @@ -31,7 +40,33 @@ impl StreamCapture { .dynamic_cast::() .map_err(StreamCaptureError::DowncastPipelineError)?; - Ok(Self { pipeline, appsink }) + let last_frame = Arc::new(Mutex::new(None)); + + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample({ + let last_frame = last_frame.clone(); + move |sink| { + last_frame + .lock() + .map_err(|_| gst::FlowError::Error) + .and_then(|mut guard| { + Self::extract_frame_buffer(sink) + .map(|frame_buffer| { + guard.replace(frame_buffer); + gst::FlowSuccess::Ok + }) + .map_err(|_| gst::FlowError::Error) + }) + } + }) + .build(), + ); + + Ok(Self { + pipeline, + last_frame, + }) } /// Starts the stream capture pipeline and processes messages on the bus. @@ -54,11 +89,23 @@ impl StreamCapture { /// # Returns /// /// An Option containing the last captured Image or None if no image has been captured yet. - pub fn grab(&self) -> Result>, StreamCaptureError> { - self.appsink - .try_pull_sample(gst::ClockTime::ZERO) - .map(Self::extract_image_frame) - .transpose() + pub fn grab(&mut self) -> Result>, StreamCaptureError> { + let mut last_frame = self + .last_frame + .lock() + .map_err(|_| StreamCaptureError::LockError)?; + + last_frame.take().map_or(Ok(None), |frame_buffer| { + // TODO: solve the zero copy issue + // https://discourse.gstreamer.org/t/zero-copy-video-frames/3856/2 + let img = Image::::new( + [frame_buffer.width, frame_buffer.height].into(), + frame_buffer.buffer.to_owned(), + ) + .map_err(|_| StreamCaptureError::CreateImageFrameError)?; + + Ok(Some(img)) + }) } /// Closes the stream capture pipeline. @@ -73,16 +120,18 @@ impl StreamCapture { Ok(()) } - /// Extracts an image frame from the AppSink. + /// Extracts a frame buffer from the AppSink. /// /// # Arguments /// - /// * `sample` - The sample to extract the frame from. + /// * `appsink` - The AppSink to extract the frame buffer from. /// /// # Returns /// - /// A Result containing the extracted Image or a StreamCaptureError. - fn extract_image_frame(sample: gst::Sample) -> Result, StreamCaptureError> { + /// A Result containing the extracted FrameBuffer or a StreamCaptureError. + fn extract_frame_buffer(appsink: &gst_app::AppSink) -> Result { + let sample = appsink.pull_sample()?; + let caps = sample .caps() .ok_or_else(|| StreamCaptureError::GetCapsError)?; @@ -100,12 +149,18 @@ impl StreamCapture { .map_err(|_| StreamCaptureError::GetWidthError)? as usize; let buffer = sample - .buffer() + .buffer_owned() .ok_or_else(|| StreamCaptureError::GetBufferError)? - .map_readable()?; + .into_mapped_buffer_readable() + .map_err(|_| StreamCaptureError::GetBufferError)?; + + let frame_buffer = FrameBuffer { + buffer, + width, + height, + }; - Image::::new([width, height].into(), buffer.to_owned()) - .map_err(|_| StreamCaptureError::CreateImageFrameError) + Ok(frame_buffer) } } diff --git a/crates/kornia-io/src/stream/error.rs b/crates/kornia-io/src/stream/error.rs index 6b811e39..30f150eb 100644 --- a/crates/kornia-io/src/stream/error.rs +++ b/crates/kornia-io/src/stream/error.rs @@ -79,6 +79,10 @@ pub enum StreamCaptureError { /// An error occurred when the pipeline is not running. #[error("Pipeline is not running")] PipelineNotRunning, + + /// An error occurred when the allocator is not found. + #[error("Cannot lock the last frame")] + LockError, } // ensure that can be sent over threads diff --git a/examples/features/src/main.rs b/examples/features/src/main.rs index d5310e69..2e13e709 100644 --- a/examples/features/src/main.rs +++ b/examples/features/src/main.rs @@ -18,7 +18,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let webcam = V4L2CameraConfig::new().with_size(size).build()?; + let mut webcam = V4L2CameraConfig::new().with_size(size).build()?; // start the background pipeline webcam.start()?; diff --git a/examples/filters/src/main.rs b/examples/filters/src/main.rs index dd8b918a..eae37c2d 100644 --- a/examples/filters/src/main.rs +++ b/examples/filters/src/main.rs @@ -45,7 +45,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let webcam = V4L2CameraConfig::new().with_size(size).build()?; + let mut webcam = V4L2CameraConfig::new().with_size(size).build()?; // start the background pipeline webcam.start()?; diff --git a/examples/rtspcam/src/main.rs b/examples/rtspcam/src/main.rs index c5e2612a..9c2db100 100644 --- a/examples/rtspcam/src/main.rs +++ b/examples/rtspcam/src/main.rs @@ -34,7 +34,7 @@ fn main() -> Result<(), Box> { let rec = rerun::RecordingStreamBuilder::new("Kornia Rtsp Stream Capture App").spawn()?; //// create a stream capture object - let capture = RTSPCameraConfig::new() + let mut capture = RTSPCameraConfig::new() .with_settings( &args.username, &args.password, diff --git a/examples/video_write/src/main.rs b/examples/video_write/src/main.rs index 651fca75..90f2ef95 100644 --- a/examples/video_write/src/main.rs +++ b/examples/video_write/src/main.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let webcam = V4L2CameraConfig::new() + let mut webcam = V4L2CameraConfig::new() .with_camera_id(args.camera_id) .with_fps(args.fps as u32) .with_size(frame_size) diff --git a/examples/webcam/src/main.rs b/examples/webcam/src/main.rs index 1c2b801b..ff3af0c6 100644 --- a/examples/webcam/src/main.rs +++ b/examples/webcam/src/main.rs @@ -30,7 +30,7 @@ fn main() -> Result<(), Box> { // create a webcam capture object with camera id 0 // and force the image size to 640x480 - let webcam = V4L2CameraConfig::new() + let mut webcam = V4L2CameraConfig::new() .with_camera_id(args.camera_id) .with_fps(args.fps) .with_size(ImageSize {