Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to random load balance to >1 server nodes #631

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package tchannel

import (
"container/heap"
"errors"
"strings"
"sync"
Expand Down Expand Up @@ -63,19 +62,21 @@ type Connectable interface {
type PeerList struct {
sync.RWMutex

parent *RootPeerList
peersByHostPort map[string]*peerScore
peerHeap *peerHeap
scoreCalculator ScoreCalculator
lastSelected uint64
parent *RootPeerList
peersByHostPort map[string]*peerScore
peerHeap *peerHeap
scoreCalculator ScoreCalculator
peerConnectionCount uint32
lastSelected uint64
}

func newPeerList(root *RootPeerList) *PeerList {
return &PeerList{
parent: root,
peersByHostPort: make(map[string]*peerScore),
scoreCalculator: newPreferIncomingCalculator(),
peerHeap: newPeerHeap(),
parent: root,
peersByHostPort: make(map[string]*peerScore),
scoreCalculator: newPreferIncomingCalculator(),
peerHeap: newPeerHeap(),
peerConnectionCount: 1,
}
}

Expand All @@ -91,6 +92,20 @@ func (l *PeerList) SetStrategy(sc ScoreCalculator) {
}
}

// SetPeerConnectionCount sets the number of peer connections to be used in
// combination with the ScoreCalculator to achieve a random load balancing
// of a single client node to `peerConnectionCount` number of server nodes
func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) error {
l.Lock()
defer l.Unlock()

if peerConnectionCount == 0 {
return errors.New("peer connection count must be greater than 0")
}
l.peerConnectionCount = peerConnectionCount
return nil
}

// Siblings don't share peer lists (though they take care not to double-connect
// to the same hosts).
func (l *PeerList) newSibling() *PeerList {
Expand Down Expand Up @@ -175,8 +190,8 @@ func (l *PeerList) Remove(hostPort string) error {
return nil
}
func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) *Peer {
var psPopList []*peerScore
var ps *peerScore
var chosenPSList = make([]*peerScore, 0, l.peerConnectionCount)
var poppedList = make([]*peerScore, 0, l.peerConnectionCount)

canChoosePeer := func(hostPort string) bool {
if _, ok := prevSelected[hostPort]; ok {
Expand All @@ -191,29 +206,39 @@ func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool)
}

size := l.peerHeap.Len()

var connectionCount uint32
for i := 0; i < size; i++ {
popped := l.peerHeap.popPeer()
poppedList = append(poppedList, popped)

if canChoosePeer(popped.HostPort()) {
ps = popped
break
chosenPSList = append(chosenPSList, popped)
connectionCount++
if connectionCount >= l.peerConnectionCount {
break
}
}
psPopList = append(psPopList, popped)
}

for _, p := range psPopList {
heap.Push(l.peerHeap, p)
}

if ps == nil {
for _, p := range poppedList {
l.peerHeap.pushPeer(p)
}
if len(chosenPSList) == 0 {
return nil
}

l.peerHeap.pushPeer(ps)
ps := randomSampling(chosenPSList)
ps.chosenCount.Inc()
return ps.Peer
}

func randomSampling(psList []*peerScore) *peerScore {
r := peerRng.Intn(len(psList))
return psList[r]
}

// GetOrAdd returns a peer for the given hostPort, creating one if it doesn't yet exist.
func (l *PeerList) GetOrAdd(hostPort string) *Peer {
if ps, ok := l.exists(hostPort); ok {
Expand Down
92 changes: 92 additions & 0 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,98 @@ func TestPeerSelectionRanking(t *testing.T) {
}
}

func TestZeroPeerConnectionCount(t *testing.T) {
ch := testutils.NewClient(t, nil)
defer ch.Close()
err := ch.Peers().SetPeerConnectionCount(0)
require.Error(t, err, "peerConnectionCount should not accept 0")
}

func TestPeerRandomSampling(t *testing.T) {
const numIterations = 1000

testCases := []struct {
numPeers int
peerConnectionCount uint32
distMin float64
distMax float64
}{
// the higher `peerConnectionCount` is, the smoother the impact of uneven scores
// become as we are random sampling among `peerConnectionCount` peers
{numPeers: 10, peerConnectionCount: 1, distMin: 1000, distMax: 1000},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add peerConnectionCount: 2, i imagine this will be a pretty small value normally?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed as the impact of it get's smaller with every extra connection. Added the case for 2!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed as the impact of it get's smaller with every extra connection. Added the case for 2!

{numPeers: 10, peerConnectionCount: 2, distMin: 470, distMax: 530},
{numPeers: 10, peerConnectionCount: 5, distMin: 160, distMax: 240},
{numPeers: 10, peerConnectionCount: 10, distMin: 50, distMax: 150},
{numPeers: 10, peerConnectionCount: 15, distMin: 50, distMax: 150},
}

for _, tc := range testCases {
// Selected is a map from rank -> [peer, count]
// It tracks how often a peer gets selected at a specific rank.
selected := make([]map[string]int, tc.numPeers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you make a slice here, why can't we just create a singel map in the loop for numIterations, and do the testDistribution call right there too? Makes 3 loops 1 loop, removes a slice, and simplifies the test a little.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if I follow. This is a similar test as TestPeerSelectionRanking where we are checking the distribution of the ranking after numIterations which means we have to do the checking outside of the loop. Or am I missing something here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if I follow. This is a similar test as TestPeerSelectionRanking where we are checking the distribution of the ranking after numIterations which means we have to do the checking outside of the loop. Or am I missing something here?

for i := 0; i < tc.numPeers; i++ {
selected[i] = make(map[string]int)
}

for i := 0; i < numIterations; i++ {
ch := testutils.NewClient(t, nil)
defer ch.Close()
ch.SetRandomSeed(int64(i * 100))
// Using a strategy that has uneven scores
strategy, _ := createScoreStrategy(0, 1)
ch.Peers().SetStrategy(strategy)
ch.Peers().SetPeerConnectionCount(tc.peerConnectionCount)

for i := 0; i < tc.numPeers; i++ {
hp := fmt.Sprintf("127.0.0.1:60%v", i)
ch.Peers().Add(hp)
}

for i := 0; i < tc.numPeers; i++ {
peer, err := ch.Peers().Get(nil)
require.NoError(t, err, "Peers.Get failed")
selected[i][peer.HostPort()]++
}
}

for _, m := range selected {
testDistribution(t, m, tc.distMin, tc.distMax)
}
}

}

func BenchmarkGetPeerWithPeerConnectionCount1(b *testing.B) {
doBenchmarkGetPeerWithPeerConnectionCount(b, 10, uint32(1))
}

func BenchmarkGetPeerWithPeerConnectionCount10(b *testing.B) {
doBenchmarkGetPeerWithPeerConnectionCount(b, 10, uint32(10))
}

func doBenchmarkGetPeerWithPeerConnectionCount(b *testing.B, numPeers int, peerConnectionCount uint32) {
ch := testutils.NewClient(b, nil)
defer ch.Close()
ch.SetRandomSeed(int64(100))
// Using a strategy that has uneven scores
strategy, _ := createScoreStrategy(0, 1)
ch.Peers().SetStrategy(strategy)
ch.Peers().SetPeerConnectionCount(peerConnectionCount)

for i := 0; i < numPeers; i++ {
hp := fmt.Sprintf("127.0.0.1:60%v", i)
ch.Peers().Add(hp)
}
b.ResetTimer()

for i := 0; i < b.N; i++ {
peer, _ := ch.Peers().Get(nil)
if peer == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this ever happen? If not, maybe we should do a t.Fatal instead of a println

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't, I just added it to guard against compiler optimization not to artificially lower the runtime of the benchmark. Changed it to Fatal!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't, I just added it to guard against compiler optimization not to artificially lower the runtime of the benchmark. Changed it to Fatal!

b.Fatal("Just a dummy check to guard against compiler optimization")
}
}
}

func createScoreStrategy(initial, delta int64) (calc ScoreCalculator, retCount *atomic.Uint64) {
var (
count atomic.Uint64
Expand Down