From e3a5cdbe2e938dcf7474665dfd8da4a78414d2e0 Mon Sep 17 00:00:00 2001 From: Muhammad Hassan Date: Thu, 16 Mar 2023 19:14:18 +0500 Subject: [PATCH] Fix cancellation of thumbnailer loop on input stream close --- internal/inputs/rtmp/rtmp.go | 3 ++- pkg/control/control.go | 13 +++++++------ pkg/control/stream.go | 15 +++++++++++++++ pkg/control/thumbnailer.go | 37 +++++++++++++++++++----------------- 4 files changed, 44 insertions(+), 24 deletions(-) diff --git a/internal/inputs/rtmp/rtmp.go b/internal/inputs/rtmp/rtmp.go index 361ff25..eda4176 100644 --- a/internal/inputs/rtmp/rtmp.go +++ b/internal/inputs/rtmp/rtmp.go @@ -208,9 +208,10 @@ func (h *connHandler) OnPublish(ctx *gortmp.StreamContext, timestamp uint32, cmd } func (h *connHandler) OnClose() { - h.log.Info("OnClose") + h.log.Info("RTMP OnClose") h.stopMetadataCollection <- true + h.log.Debug("sent stop metadata collection signal") // We only want to publish the stop if it's ours // We also don't want control to stop the stream if we're respond to a stop diff --git a/pkg/control/control.go b/pkg/control/control.go index 8d1f9e1..9143405 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -128,7 +128,9 @@ func (ctrl *Control) Authenticate(channelID types.ChannelID, streamKey types.Str } func (ctrl *Control) StartStream(channelID types.ChannelID) (*Stream, error) { - stream, err := ctrl.newStream(channelID) + ctx, cancel := context.WithCancel(ctrl.Context()) + + stream, err := ctrl.newStream(channelID, cancel) if err != nil { return nil, err } @@ -153,7 +155,7 @@ func (ctrl *Control) StartStream(channelID types.ChannelID) (*Stream, error) { // Really gross, I'm sorry. whepEndpoint := fmt.Sprintf("%s/whep/endpoint", ctrl.HTTPServerURL()) go func() { - err := stream.thumbnailer(ctrl.Context(), whepEndpoint) + err := stream.thumbnailer(ctx, whepEndpoint) if err != nil { stream.log.Error(err) ctrl.StopStream(channelID) @@ -169,10 +171,8 @@ func (ctrl *Control) StopStream(channelID types.ChannelID) error { return err } - stream.log.Infof("Stopping stream") + stream.Stop() - stream.stopHeartbeat <- struct{}{} // not being used anywhere, is it really needed? - stream.stopThumbnailer <- struct{}{} ctrl.metadataCollectors[channelID] <- true // Make sure we send stop commands to everyone, and don't return until they've all been sent @@ -338,10 +338,11 @@ func (ctrl *Control) sendThumbnail(channelID types.ChannelID) (err error) { return nil } -func (ctrl *Control) newStream(channelID types.ChannelID) (*Stream, error) { +func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.CancelFunc) (*Stream, error) { stream := &Stream{ log: ctrl.log.WithField("channel_id", channelID), + cancelFunc: cancelFunc, authenticated: true, mediaStarted: false, ChannelID: channelID, diff --git a/pkg/control/stream.go b/pkg/control/stream.go index e24a16e..730834f 100644 --- a/pkg/control/stream.go +++ b/pkg/control/stream.go @@ -1,6 +1,7 @@ package control import ( + "context" "errors" "github.com/Glimesh/waveguide/pkg/types" @@ -18,6 +19,8 @@ type StreamTrack struct { type Stream struct { log logrus.FieldLogger + cancelFunc context.CancelFunc + // authenticated is set after the stream has successfully authed with a remote service authenticated bool // mediaStarted is set after media bytes have come in from the client @@ -83,3 +86,15 @@ func (s *Stream) ReportMetadata(metadatas ...Metadata) error { return nil } + +func (s *Stream) Stop() { + s.log.Infof("stopping stream") + + s.stopHeartbeat <- struct{}{} // not being used anywhere, is it really needed? + + s.stopThumbnailer <- struct{}{} + s.log.Debug("sent stop thumbnailer signal") + + s.cancelFunc() + s.log.Debug("canceled stream ctx") +} diff --git a/pkg/control/thumbnailer.go b/pkg/control/thumbnailer.go index fa2007d..c0d7bd2 100644 --- a/pkg/control/thumbnailer.go +++ b/pkg/control/thumbnailer.go @@ -29,24 +29,27 @@ func (s *Stream) thumbnailer(ctx context.Context, whepEndpoint string) error { if codec.MimeType == "video/H264" { for { - if ctx.Err() != nil { + log.Debug("OnTrack loop") + select { + case <-ctx.Done(): + log.Debug("received ctx cancel signal") return - } - - // Read RTP Packets in a loop - p, _, readErr := track.ReadRTP() - if readErr != nil { - // Don't kill the thumbnailer after one weird RTP packet - continue - } - - keyframe := kfer.WriteRTP(p) - if keyframe != nil { - // fmt.Printf("!!! PEER KEYFRAME !!! %s\n\n", kfer) - // saveImage(int(p.SequenceNumber), keyframe) - // os.WriteFile(fmt.Sprintf("%d-peer.h264", p.SequenceNumber), keyframe, 0666) - s.lastThumbnail <- keyframe - kfer.Reset() + default: + // Read RTP Packets in a loop + p, _, readErr := track.ReadRTP() + if readErr != nil { + // Don't kill the thumbnailer after one weird RTP packet + continue + } + + keyframe := kfer.WriteRTP(p) + if keyframe != nil { + // fmt.Printf("!!! PEER KEYFRAME !!! %s\n\n", kfer) + // saveImage(int(p.SequenceNumber), keyframe) + // os.WriteFile(fmt.Sprintf("%d-peer.h264", p.SequenceNumber), keyframe, 0666) + s.lastThumbnail <- keyframe + kfer.Reset() + } } } }