Skip to content

Commit

Permalink
Run eventPool heartbeat to wake up waiters in the slow path (#685)
Browse files Browse the repository at this point in the history
Run eventPool heartbeat to wake up waiters in the slow path
  • Loading branch information
vadimalekseev authored Oct 30, 2024
1 parent 2da508d commit a90df8b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 4 deletions.
43 changes: 39 additions & 4 deletions pipeline/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,21 @@ type eventPool struct {

getMu *sync.Mutex
getCond *sync.Cond

stopped *atomic.Bool
runHeartbeatOnce *sync.Once
slowWaiters *atomic.Int64
}

func newEventPool(capacity, avgEventSize int) *eventPool {
eventPool := &eventPool{
avgEventSize: avgEventSize,
capacity: capacity,
getMu: &sync.Mutex{},
backCounter: *atomic.NewInt64(int64(capacity)),
avgEventSize: avgEventSize,
capacity: capacity,
getMu: &sync.Mutex{},
backCounter: *atomic.NewInt64(int64(capacity)),
runHeartbeatOnce: &sync.Once{},
stopped: atomic.NewBool(false),
slowWaiters: atomic.NewInt64(0),
}

eventPool.getCond = sync.NewCond(eventPool.getMu)
Expand Down Expand Up @@ -270,10 +277,17 @@ func (p *eventPool) get() *Event {
// slow path
runtime.Gosched()
} else {
p.runHeartbeatOnce.Do(func() {
// Run heartbeat to periodically wake up goroutines that are waiting.
go p.wakeupWaiters()
})

// slowest path
p.slowWaiters.Inc()
p.getMu.Lock()
p.getCond.Wait()
p.getMu.Unlock()
p.slowWaiters.Dec()
tries = 0
}
}
Expand Down Expand Up @@ -317,6 +331,27 @@ func (p *eventPool) back(event *Event) {
p.getCond.Broadcast()
}

func (p *eventPool) wakeupWaiters() {
for {
if p.stopped.Load() {
return
}

const wakeupInterval = 5 * time.Second
time.Sleep(wakeupInterval)
waiters := p.slowWaiters.Load()
eventsAvailable := p.inUseEvents.Load() < int64(p.capacity)
if waiters > 0 && eventsAvailable {
// There are events in the pool, wake up waiting goroutines.
p.getCond.Broadcast()
}
}
}

func (p *eventPool) stop() {
p.stopped.Store(true)
}

func (p *eventPool) dump() string {
out := logger.Cond(len(p.events) == 0, logger.Header("no events"), func() string {
o := logger.Header("events")
Expand Down
22 changes: 22 additions & 0 deletions pipeline/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,25 @@ func BenchmarkEventPoolSlowestPath(b *testing.B) {
wg.Wait()
}
}

func TestSlowPath(t *testing.T) {
t.Parallel()
const (
poolCapacity = 256
concurrency = 5_000
)
pool := newEventPool(poolCapacity, DefaultAvgInputEventSize)
for i := 0; i < 1_000; i++ {
wg := new(sync.WaitGroup)
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
e := pool.get()
runtime.Gosched()
pool.back(e)
}()
}
wg.Wait()
}
}
2 changes: 2 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ func (p *Pipeline) Stop() {
p.output.Stop()

p.shouldStop.Store(true)

p.eventPool.stop()
}

func (p *Pipeline) SetInput(info *InputPluginInfo) {
Expand Down

0 comments on commit a90df8b

Please sign in to comment.