Skip to content

Commit

Permalink
Simplify stream capture and remove thread (#213)
Browse files Browse the repository at this point in the history
* simplify stream capture and remove thread

* remove mutable self methods

* try_pull_sample timeout ZERO
  • Loading branch information
edgarriba authored Jan 3, 2025
1 parent 6eeb7e2 commit 746f5fb
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 77 deletions.
86 changes: 15 additions & 71 deletions crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::sync::{Arc, Mutex};

use crate::stream::error::StreamCaptureError;
use gst::prelude::*;
use kornia_image::{Image, ImageSize};
use kornia_image::Image;

/// Represents a stream capture pipeline using GStreamer.
pub struct StreamCapture {
pipeline: gst::Pipeline,
last_frame: Arc<Mutex<Option<Image<u8, 3>>>>,
running: bool,
handle: Option<std::thread::JoinHandle<()>>,
appsink: gst_app::AppSink,
}

impl StreamCapture {
Expand All @@ -35,64 +31,20 @@ impl StreamCapture {
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

let last_frame = Arc::new(Mutex::new(None));

appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let last_frame = last_frame.clone();
move |sink| match Self::extract_image_frame(sink) {
Ok(frame) => {
// SAFETY: we have a lock on the last_frame
*last_frame.lock().unwrap() = Some(frame);
Ok(gst::FlowSuccess::Ok)
}
Err(_) => Err(gst::FlowError::Error),
}
})
.build(),
);

Ok(Self {
pipeline,
last_frame,
running: false,
handle: None,
})
Ok(Self { pipeline, appsink })
}

/// Starts the stream capture pipeline and processes messages on the bus.
pub fn start(&mut self) -> Result<(), StreamCaptureError> {
pub fn start(&self) -> Result<(), StreamCaptureError> {
self.pipeline.set_state(gst::State::Playing)?;
self.running = true;

let bus = self
.pipeline
.bus()
.ok_or_else(|| StreamCaptureError::BusError)?;

let handle = std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
break;
}
MessageView::Error(err) => {
eprintln!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
break;
}
_ => (),
}
}
});

self.handle = Some(handle);
// handle bus messages
bus.set_sync_handler(|_bus, _msg| gst::BusSyncReply::Pass);

Ok(())
}
Expand All @@ -103,42 +55,34 @@ impl StreamCapture {
///
/// An Option containing the last captured Image or None if no image has been captured yet.
pub fn grab(&self) -> Result<Option<Image<u8, 3>>, StreamCaptureError> {
if !self.running {
return Err(StreamCaptureError::PipelineNotRunning);
}

// SAFETY: we have a lock on the last_frame
Ok(self.last_frame.lock().unwrap().take())
self.appsink
.try_pull_sample(gst::ClockTime::ZERO)
.map(Self::extract_image_frame)
.transpose()
}

/// Closes the stream capture pipeline.
pub fn close(&mut self) -> Result<(), StreamCaptureError> {
pub fn close(&self) -> Result<(), StreamCaptureError> {
let res = self.pipeline.send_event(gst::event::Eos::new());
if !res {
return Err(StreamCaptureError::SendEosError);
}

if let Some(handle) = self.handle.take() {
handle.join().expect("Failed to join thread");
}

self.pipeline.set_state(gst::State::Null)?;
self.running = false;

Ok(())
}

/// Extracts an image frame from the AppSink.
///
/// # Arguments
///
/// * `appsink` - The AppSink to extract the frame from.
/// * `sample` - The sample to extract the frame from.
///
/// # Returns
///
/// A Result containing the extracted Image or a StreamCaptureError.
fn extract_image_frame(appsink: &gst_app::AppSink) -> Result<Image<u8, 3>, StreamCaptureError> {
let sample = appsink.pull_sample()?;

fn extract_image_frame(sample: gst::Sample) -> Result<Image<u8, 3>, StreamCaptureError> {
let caps = sample
.caps()
.ok_or_else(|| StreamCaptureError::GetCapsError)?;
Expand All @@ -160,7 +104,7 @@ impl StreamCapture {
.ok_or_else(|| StreamCaptureError::GetBufferError)?
.map_readable()?;

Image::<u8, 3>::new(ImageSize { width, height }, buffer.as_slice().to_vec())
Image::<u8, 3>::new([width, height].into(), buffer.to_owned())
.map_err(|_| StreamCaptureError::CreateImageFrameError)
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/features/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;
let webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/filters/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new().with_size(size).build()?;
let webcam = V4L2CameraConfig::new().with_size(size).build()?;

// start the background pipeline
webcam.start()?;
Expand Down
2 changes: 1 addition & 1 deletion examples/rtspcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let rec = rerun::RecordingStreamBuilder::new("Kornia Rtsp Stream Capture App").spawn()?;

//// create a stream capture object
let mut capture = RTSPCameraConfig::new()
let capture = RTSPCameraConfig::new()
.with_settings(
&args.username,
&args.password,
Expand Down
2 changes: 1 addition & 1 deletion examples/video_write/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new()
let webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps as u32)
.with_size(frame_size)
Expand Down
2 changes: 1 addition & 1 deletion examples/webcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// create a webcam capture object with camera id 0
// and force the image size to 640x480
let mut webcam = V4L2CameraConfig::new()
let webcam = V4L2CameraConfig::new()
.with_camera_id(args.camera_id)
.with_fps(args.fps)
.with_size(ImageSize {
Expand Down
2 changes: 1 addition & 1 deletion kornia-viz/src/bin/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct KorniaApp {

impl Default for KorniaApp {
fn default() -> Self {
let mut capture = kornia::io::stream::V4L2CameraConfig::new()
let capture = kornia::io::stream::V4L2CameraConfig::new()
.with_camera_id(0)
.build()
.unwrap();
Expand Down

0 comments on commit 746f5fb

Please sign in to comment.