Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Gstreamer Buffer map in StreamCapture #216

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/kornia-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ log = { workspace = true }
thiserror = { workspace = true }

# optional dependencies
gst = { version = "0.23.0", package = "gstreamer", optional = true }
gst-app = { version = "0.23.0", package = "gstreamer-app", optional = true }
gst = { version = "0.23.4", package = "gstreamer", optional = true }
gst-app = { version = "0.23.4", package = "gstreamer-app", optional = true }
memmap2 = "0.9.4"
turbojpeg = { version = "1.0.0", optional = true }

Expand Down
85 changes: 70 additions & 15 deletions crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
use std::sync::{Arc, Mutex};

use crate::stream::error::StreamCaptureError;
use gst::prelude::*;
use kornia_image::Image;

// utility struct to store the frame buffer
struct FrameBuffer {
buffer: gst::MappedBuffer<gst::buffer::Readable>,
width: usize,
height: usize,
}

/// Represents a stream capture pipeline using GStreamer.
pub struct StreamCapture {
pipeline: gst::Pipeline,
appsink: gst_app::AppSink,
last_frame: Arc<Mutex<Option<FrameBuffer>>>,
}

impl StreamCapture {
Expand All @@ -31,7 +40,33 @@ impl StreamCapture {
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

Ok(Self { pipeline, appsink })
let last_frame = Arc::new(Mutex::new(None));

appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let last_frame = last_frame.clone();
move |sink| {
last_frame
.lock()
.map_err(|_| gst::FlowError::Error)
.and_then(|mut guard| {
Self::extract_frame_buffer(sink)
.map(|frame_buffer| {
guard.replace(frame_buffer);
gst::FlowSuccess::Ok
})
.map_err(|_| gst::FlowError::Error)
})
}
})
.build(),
);

Ok(Self {
pipeline,
last_frame,
})
}

/// Starts the stream capture pipeline and processes messages on the bus.
Expand All @@ -54,11 +89,23 @@ impl StreamCapture {
/// # Returns
///
/// An Option containing the last captured Image or None if no image has been captured yet.
pub fn grab(&self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
self.appsink
.try_pull_sample(gst::ClockTime::ZERO)
.map(Self::extract_image_frame)
.transpose()
pub fn grab(&mut self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
let mut last_frame = self
.last_frame
.lock()
.map_err(|_| StreamCaptureError::LockError)?;

last_frame.take().map_or(Ok(None), |frame_buffer| {
// TODO: solve the zero copy issue
// https://discourse.gstreamer.org/t/zero-copy-video-frames/3856/2
let img = Image::<u8, 3>::new(
[frame_buffer.width, frame_buffer.height].into(),
frame_buffer.buffer.to_owned(),
)
.map_err(|_| StreamCaptureError::CreateImageFrameError)?;

Ok(Some(img))
})
}

/// Closes the stream capture pipeline.
Expand All @@ -73,16 +120,18 @@ impl StreamCapture {
Ok(())
}

/// Extracts an image frame from the AppSink.
/// Extracts a frame buffer from the AppSink.
///
/// # Arguments
///
/// * `sample` - The sample to extract the frame from.
/// * `appsink` - The AppSink to extract the frame buffer from.
///
/// # Returns
///
/// A Result containing the extracted Image or a StreamCaptureError.
fn extract_image_frame(sample: gst::Sample) -> Result<Image<u8, 3>, StreamCaptureError> {
/// A Result containing the extracted FrameBuffer or a StreamCaptureError.
fn extract_frame_buffer(appsink: &gst_app::AppSink) -> Result<FrameBuffer, StreamCaptureError> {
let sample = appsink.pull_sample()?;

let caps = sample
.caps()
.ok_or_else(|| StreamCaptureError::GetCapsError)?;
Expand All @@ -100,12 +149,18 @@ impl StreamCapture {
.map_err(|_| StreamCaptureError::GetWidthError)? as usize;

let buffer = sample
.buffer()
.buffer_owned()
.ok_or_else(|| StreamCaptureError::GetBufferError)?
.map_readable()?;
.into_mapped_buffer_readable()
.map_err(|_| StreamCaptureError::GetBufferError)?;

let frame_buffer = FrameBuffer {
buffer,
width,
height,
};

Image::<u8, 3>::new([width, height].into(), buffer.to_owned())
.map_err(|_| StreamCaptureError::CreateImageFrameError)
Ok(frame_buffer)
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/kornia-io/src/stream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub enum StreamCaptureError {
/// An error occurred when the pipeline is not running.
#[error("Pipeline is not running")]
PipelineNotRunning,

/// An error occurred when the allocator is not found.
#[error("Cannot lock the last frame")]
LockError,
}

// ensure that can be sent over threads
Expand Down
2 changes: 1 addition & 1 deletion examples/features/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new().with_size(size).build()?;
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/filters/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new().with_size(size).build()?;
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/rtspcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("Kornia Rtsp Stream Capture App").spawn()?;

//// create a stream capture object
let capture = RTSPCameraConfig::new()
let mut capture = RTSPCameraConfig::new()
.with_settings(
&args.username,
&args.password,
Expand Down
2 changes: 1 addition & 1 deletion examples/video_write/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new()
let mut webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps as u32)
.with_size(frame_size)
Expand Down
2 changes: 1 addition & 1 deletion examples/webcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let webcam = V4L2CameraConfig::new()
let mut webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps)
.with_size(ImageSize {
Expand Down
Loading