diff --git a/client_stream_processor_fmp4.go b/client_stream_processor_fmp4.go index d50b8fc..cbb6c09 100644 --- a/client_stream_processor_fmp4.go +++ b/client_stream_processor_fmp4.go @@ -162,6 +162,7 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg dts := p.timeConv.convert(int64(leadingPartTrack.BaseTime), leadingPartTrackProc.track.track.ClockRate) p.timeConv.setNTP(*seg.dateTime, dts, leadingPartTrackProc.track.track.ClockRate) } + p.timeConv.setLeadingNTPReceived() } partTrackCount := 0 @@ -174,7 +175,7 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg } dts := p.timeConv.convert(int64(partTrack.BaseTime), trackProc.track.track.ClockRate) - ntp := p.timeConv.getNTP(dts, trackProc.track.track.ClockRate) + ntp := p.timeConv.getNTP(ctx, dts, trackProc.track.track.ClockRate) err := trackProc.push(ctx, &procEntryFMP4{ partTrack: partTrack, diff --git a/client_stream_processor_mpegts.go b/client_stream_processor_mpegts.go index 4fa0b22..c17ede2 100644 --- a/client_stream_processor_mpegts.go +++ b/client_stream_processor_mpegts.go @@ -214,9 +214,10 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs if p.curSegment.dateTime != nil { p.timeConv.setNTP(*p.curSegment.dateTime, dts) } + p.timeConv.setLeadingNTPReceived() } - ntp := p.timeConv.getNTP(dts) + ntp := p.timeConv.getNTP(ctx, dts) return trackProc.push(ctx, &procEntryMPEGTS{ pts: pts, diff --git a/client_time_conv_fmp4.go b/client_time_conv_fmp4.go index 11f406d..4e79017 100644 --- a/client_time_conv_fmp4.go +++ b/client_time_conv_fmp4.go @@ -1,6 +1,7 @@ package gohlslib import ( + "context" "sync" "time" ) @@ -14,9 +15,12 @@ type clientTimeConvFMP4 struct { ntpValue time.Time ntpTimestamp int64 ntpClockRate int + + chLeadingNTPReceived chan struct{} } func (ts *clientTimeConvFMP4) initialize() { + ts.chLeadingNTPReceived = make(chan struct{}) } func (ts *clientTimeConvFMP4) convert(v int64, clockRate int) int64 { @@ -33,7 +37,22 @@ func (ts *clientTimeConvFMP4) setNTP(value time.Time, timestamp int64, clockRate ts.ntpClockRate = clockRate } -func (ts *clientTimeConvFMP4) getNTP(timestamp int64, clockRate int) *time.Time { +func (ts *clientTimeConvFMP4) setLeadingNTPReceived() { + select { + case <-ts.chLeadingNTPReceived: + return + default: + } + close(ts.chLeadingNTPReceived) +} + +func (ts *clientTimeConvFMP4) getNTP(ctx context.Context, timestamp int64, clockRate int) *time.Time { + select { + case <-ts.chLeadingNTPReceived: + case <-ctx.Done(): + return nil + } + ts.mutex.Lock() defer ts.mutex.Unlock() diff --git a/client_time_conv_mpegts.go b/client_time_conv_mpegts.go index 9162c68..b3a7fd6 100644 --- a/client_time_conv_mpegts.go +++ b/client_time_conv_mpegts.go @@ -1,6 +1,7 @@ package gohlslib import ( + "context" "sync" "time" @@ -15,11 +16,14 @@ type clientTimeConvMPEGTS struct { ntpAvailable bool ntpValue time.Time ntpTimestamp int64 + + chLeadingNTPReceived chan struct{} } func (ts *clientTimeConvMPEGTS) initialize() { ts.td = mpegts.NewTimeDecoder2() ts.td.Decode(ts.startDTS) + ts.chLeadingNTPReceived = make(chan struct{}) } func (ts *clientTimeConvMPEGTS) convert(v int64) int64 { @@ -38,7 +42,22 @@ func (ts *clientTimeConvMPEGTS) setNTP(value time.Time, timestamp int64) { ts.ntpTimestamp = timestamp } -func (ts *clientTimeConvMPEGTS) getNTP(timestamp int64) *time.Time { +func (ts *clientTimeConvMPEGTS) setLeadingNTPReceived() { + select { + case <-ts.chLeadingNTPReceived: + return + default: + } + close(ts.chLeadingNTPReceived) +} + +func (ts *clientTimeConvMPEGTS) getNTP(ctx context.Context, timestamp int64) *time.Time { + select { + case <-ts.chLeadingNTPReceived: + case <-ctx.Done(): + return nil + } + ts.mutex.Lock() defer ts.mutex.Unlock()