Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancellation of thumbnailer loop on input stream close #21

Merged
merged 1 commit into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/inputs/rtmp/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,10 @@ func (h *connHandler) OnPublish(ctx *gortmp.StreamContext, timestamp uint32, cmd
}

func (h *connHandler) OnClose() {
h.log.Info("OnClose")
h.log.Info("RTMP OnClose")

h.stopMetadataCollection <- true
h.log.Debug("sent stop metadata collection signal")

// We only want to publish the stop if it's ours
// We also don't want control to stop the stream if we're respond to a stop
Expand Down
13 changes: 7 additions & 6 deletions pkg/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (ctrl *Control) Authenticate(channelID types.ChannelID, streamKey types.Str
}

func (ctrl *Control) StartStream(channelID types.ChannelID) (*Stream, error) {
stream, err := ctrl.newStream(channelID)
ctx, cancel := context.WithCancel(ctrl.Context())

stream, err := ctrl.newStream(channelID, cancel)
if err != nil {
return nil, err
}
Expand All @@ -153,7 +155,7 @@ func (ctrl *Control) StartStream(channelID types.ChannelID) (*Stream, error) {
// Really gross, I'm sorry.
whepEndpoint := fmt.Sprintf("%s/whep/endpoint", ctrl.HTTPServerURL())
go func() {
err := stream.thumbnailer(ctrl.Context(), whepEndpoint)
err := stream.thumbnailer(ctx, whepEndpoint)
if err != nil {
stream.log.Error(err)
ctrl.StopStream(channelID)
Expand All @@ -169,10 +171,8 @@ func (ctrl *Control) StopStream(channelID types.ChannelID) error {
return err
}

stream.log.Infof("Stopping stream")
stream.Stop()

stream.stopHeartbeat <- struct{}{} // not being used anywhere, is it really needed?
stream.stopThumbnailer <- struct{}{}
ctrl.metadataCollectors[channelID] <- true

// Make sure we send stop commands to everyone, and don't return until they've all been sent
Expand Down Expand Up @@ -338,10 +338,11 @@ func (ctrl *Control) sendThumbnail(channelID types.ChannelID) (err error) {
return nil
}

func (ctrl *Control) newStream(channelID types.ChannelID) (*Stream, error) {
func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.CancelFunc) (*Stream, error) {
stream := &Stream{
log: ctrl.log.WithField("channel_id", channelID),

cancelFunc: cancelFunc,
authenticated: true,
mediaStarted: false,
ChannelID: channelID,
Expand Down
15 changes: 15 additions & 0 deletions pkg/control/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package control

import (
"context"
"errors"

"github.com/Glimesh/waveguide/pkg/types"
Expand All @@ -18,6 +19,8 @@ type StreamTrack struct {
type Stream struct {
log logrus.FieldLogger

cancelFunc context.CancelFunc

// authenticated is set after the stream has successfully authed with a remote service
authenticated bool
// mediaStarted is set after media bytes have come in from the client
Expand Down Expand Up @@ -83,3 +86,15 @@ func (s *Stream) ReportMetadata(metadatas ...Metadata) error {

return nil
}

func (s *Stream) Stop() {
s.log.Infof("stopping stream")

s.stopHeartbeat <- struct{}{} // not being used anywhere, is it really needed?

s.stopThumbnailer <- struct{}{}
s.log.Debug("sent stop thumbnailer signal")

s.cancelFunc()
s.log.Debug("canceled stream ctx")
}
37 changes: 20 additions & 17 deletions pkg/control/thumbnailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,27 @@ func (s *Stream) thumbnailer(ctx context.Context, whepEndpoint string) error {

if codec.MimeType == "video/H264" {
for {
if ctx.Err() != nil {
log.Debug("OnTrack loop")
select {
case <-ctx.Done():
log.Debug("received ctx cancel signal")
return
}

// Read RTP Packets in a loop
p, _, readErr := track.ReadRTP()
if readErr != nil {
// Don't kill the thumbnailer after one weird RTP packet
continue
}

keyframe := kfer.WriteRTP(p)
if keyframe != nil {
// fmt.Printf("!!! PEER KEYFRAME !!! %s\n\n", kfer)
// saveImage(int(p.SequenceNumber), keyframe)
// os.WriteFile(fmt.Sprintf("%d-peer.h264", p.SequenceNumber), keyframe, 0666)
s.lastThumbnail <- keyframe
kfer.Reset()
default:
// Read RTP Packets in a loop
p, _, readErr := track.ReadRTP()
if readErr != nil {
// Don't kill the thumbnailer after one weird RTP packet
continue
}

keyframe := kfer.WriteRTP(p)
if keyframe != nil {
// fmt.Printf("!!! PEER KEYFRAME !!! %s\n\n", kfer)
// saveImage(int(p.SequenceNumber), keyframe)
// os.WriteFile(fmt.Sprintf("%d-peer.h264", p.SequenceNumber), keyframe, 0666)
s.lastThumbnail <- keyframe
kfer.Reset()
}
}
}
}
Expand Down