Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework thumbnailer #26

Merged
merged 4 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions pkg/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Glimesh/waveguide/pkg/orchestrator"
"github.com/Glimesh/waveguide/pkg/service"
"github.com/Glimesh/waveguide/pkg/types"
"github.com/pion/rtp"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -152,11 +153,8 @@ func (ctrl *Control) StartStream(channelID types.ChannelID) (*Stream, error) {

go ctrl.setupHeartbeat(channelID)

// Really gross, I'm sorry.
whepEndpoint := fmt.Sprintf("%s/whep/endpoint", ctrl.HTTPServerURL())
go func() {
err := stream.thumbnailer(ctx, whepEndpoint)
if err != nil {
if err := stream.Ingest(ctx); err != nil { //nolint not shadowed
stream.log.Error(err)
ctrl.StopStream(channelID)
}
Expand All @@ -180,6 +178,7 @@ func (ctrl *Control) StopStream(channelID types.ChannelID) error {
stream.Stop()
}
ctrl.metadataCollectors[channelID] <- true
ctrl.log.Debug("sent metadata collector signal")

// Make sure we send stop commands to everyone, and don't return until they've all been sent
serviceErr := ctrl.service.EndStream(stream.StreamID)
Expand Down Expand Up @@ -295,6 +294,14 @@ func (ctrl *Control) sendThumbnail(channelID types.ChannelID) (err error) {
return err
}

// signal the stream thumnailer to get me some thumbnails
select {
case stream.requestThumbnail <- struct{}{}:
default:
}

ctrl.log.Debug("thumbnail requested")

var data []byte
// Since stream.lastThumbnail is a buffered chan, let's read all values to get the newest
for len(stream.lastThumbnail) > 0 {
Expand Down Expand Up @@ -343,22 +350,24 @@ func (ctrl *Control) sendThumbnail(channelID types.ChannelID) (err error) {
}

func (ctrl *Control) newStream(channelID types.ChannelID, cancelFunc context.CancelFunc) (*Stream, error) {
stream := &Stream{
log: ctrl.log.WithField("channel_id", channelID),

cancelFunc: cancelFunc,
authenticated: true,
mediaStarted: false,
ChannelID: channelID,
stopHeartbeat: make(chan struct{}, 1),
stopThumbnailer: make(chan struct{}, 1),
// 10 keyframes in 5 seconds is probably a bit extreme
lastThumbnail: make(chan []byte, 10),
startTime: time.Now().Unix(),
totalAudioPackets: 0,
totalVideoPackets: 0,
clientVendorName: "",
clientVendorVersion: "",
stream := &Stream{ //nolint exhaustive struct
ChannelID: channelID,

log: ctrl.log.WithField("channel_id", channelID),
whepURI: ctrl.HTTPServerURL() + "/whep/endpoint/" + channelID.String(),
authenticated: true,

cancelFunc: cancelFunc,
keyframer: NewKeyframer(),
rtpIngest: make(chan *rtp.Packet),
stopHeartbeat: make(chan struct{}, 1),
stopThumbnailer: make(chan struct{}, 1),
thumbnailReceiver: make(chan *rtp.Packet, 50),
requestThumbnail: make(chan struct{}, 1),

lastThumbnail: make(chan []byte, 1),

startTime: time.Now().Unix(),
}

if _, exists := ctrl.streams[channelID]; exists {
Expand Down
8 changes: 4 additions & 4 deletions pkg/control/keyframer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ func (kf *Keyframer) Reset() {
kf.packets = make(map[uint16][]byte)
}

func (kf *Keyframer) WriteRTP(p *rtp.Packet) []byte {
func (kf *Keyframer) NewKeyframe(p *rtp.Packet) []byte {
// fmt.Printf("frameStarted=%t\n", kf.frameStarted)
// Frame has started, but timestamps don't match, continue
if kf.frameStarted && kf.timestamp != p.Timestamp {
fmt.Println("frame started but timestamps don't match:", kf.timestamp, p.Timestamp)
// kf.Reset()
return nil
}

Expand All @@ -47,6 +49,7 @@ func (kf *Keyframer) WriteRTP(p *rtp.Packet) []byte {
kf.frameStarted = true
}
if !kf.frameStarted {
// fmt.Println("keyframe did not start")
return nil
}

Expand Down Expand Up @@ -79,10 +82,7 @@ func (kf *Keyframer) WriteRTP(p *rtp.Packet) []byte {
kf.lastFullKeyframe = newFrame

return newFrame
} else {
// fmt.Println("No marker, no end")
}
// fmt.Println(p)

return nil
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/control/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/Glimesh/waveguide/pkg/types"

"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
)
Expand All @@ -24,17 +25,22 @@ type Stream struct {

// authenticated is set after the stream has successfully authed with a remote service
authenticated bool

whepURI string

// mediaStarted is set after media bytes have come in from the client
mediaStarted bool
hasSomeAudio bool
hasSomeVideo bool

stopHeartbeat chan struct{}

// channel used to signal thumbnailer to stop
stopThumbnailer chan struct{}

keyframer *Keyframer
rtpIngest chan *rtp.Packet
lastThumbnail chan []byte
// channel used to signal thumbnailer to stop
stopThumbnailer chan struct{}
stopHeartbeat chan struct{}
requestThumbnail chan struct{}
thumbnailReceiver chan *rtp.Packet

ChannelID types.ChannelID
StreamID types.StreamID
Expand Down Expand Up @@ -93,11 +99,10 @@ func (s *Stream) Stop() {

s.stopHeartbeat <- struct{}{} // not being used anywhere, is it really needed?

s.stopThumbnailer <- struct{}{}
s.log.Debug("sent stop thumbnailer signal")

s.cancelFunc()
s.stopped = true
s.log.Debug("sent stop thumbnailer signal")

s.log.Debug("canceled stream ctx")
}

Expand Down
Loading