From 02fd16059b49ea7fb0847428bffddf7db8a07c22 Mon Sep 17 00:00:00 2001 From: Kamran Massoudi Date: Fri, 16 Jun 2017 10:52:33 -0700 Subject: [PATCH 1/3] Add option to random load balance to >1 server nodes Adding possibility to set `peerConnectionCount` so that a single client node can do a random load balancing among `peerConnectionCount` top nodes using the score calculator and peer heap. --- peer.go | 63 ++++++++++++++++++++++++++++++++++++---------------- peer_test.go | 43 +++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 19 deletions(-) diff --git a/peer.go b/peer.go index 47ef67356..6e330fcc4 100644 --- a/peer.go +++ b/peer.go @@ -21,7 +21,6 @@ package tchannel import ( - "container/heap" "errors" "strings" "sync" @@ -63,19 +62,21 @@ type Connectable interface { type PeerList struct { sync.RWMutex - parent *RootPeerList - peersByHostPort map[string]*peerScore - peerHeap *peerHeap - scoreCalculator ScoreCalculator - lastSelected uint64 + parent *RootPeerList + peersByHostPort map[string]*peerScore + peerHeap *peerHeap + scoreCalculator ScoreCalculator + peerConnectionCount uint32 + lastSelected uint64 } func newPeerList(root *RootPeerList) *PeerList { return &PeerList{ - parent: root, - peersByHostPort: make(map[string]*peerScore), - scoreCalculator: newPreferIncomingCalculator(), - peerHeap: newPeerHeap(), + parent: root, + peersByHostPort: make(map[string]*peerScore), + scoreCalculator: newPreferIncomingCalculator(), + peerHeap: newPeerHeap(), + peerConnectionCount: 1, } } @@ -91,6 +92,16 @@ func (l *PeerList) SetStrategy(sc ScoreCalculator) { } } +// SetPeerConnectionCount sets the number of peer connections to be used in +// combination with the ScoreCalculator to achieve a random load balancing +// of a single client node to `peerConnectionCount` number of server nodes +func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) { + l.Lock() + defer l.Unlock() + + l.peerConnectionCount = peerConnectionCount +} + // Siblings don't share peer lists (though they take care not to double-connect // to the same hosts). func (l *PeerList) newSibling() *PeerList { @@ -175,8 +186,8 @@ func (l *PeerList) Remove(hostPort string) error { return nil } func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) *Peer { - var psPopList []*peerScore - var ps *peerScore + var chosenPSList []*peerScore + var poppedList []*peerScore canChoosePeer := func(hostPort string) bool { if _, ok := prevSelected[hostPort]; ok { @@ -191,29 +202,43 @@ func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) } size := l.peerHeap.Len() + + var connectionCount uint32 for i := 0; i < size; i++ { popped := l.peerHeap.popPeer() + poppedList = append(poppedList, popped) if canChoosePeer(popped.HostPort()) { - ps = popped - break + chosenPSList = append(chosenPSList, popped) + connectionCount++ + if connectionCount >= l.peerConnectionCount { + break + } } - psPopList = append(psPopList, popped) + } - for _, p := range psPopList { - heap.Push(l.peerHeap, p) + for _, p := range poppedList { + l.peerHeap.pushPeer(p) + } + if len(chosenPSList) == 0 { + return nil } + ps := randomSampling(chosenPSList) if ps == nil { return nil } - - l.peerHeap.pushPeer(ps) ps.chosenCount.Inc() return ps.Peer } +func randomSampling(psList []*peerScore) *peerScore { + peerRand := trand.NewSeeded() + r := peerRand.Intn(len(psList)) + return psList[r] +} + // GetOrAdd returns a peer for the given hostPort, creating one if it doesn't yet exist. 
func (l *PeerList) GetOrAdd(hostPort string) *Peer { if ps, ok := l.exists(hostPort); ok { diff --git a/peer_test.go b/peer_test.go index b465c8685..c4471fc5e 100644 --- a/peer_test.go +++ b/peer_test.go @@ -697,6 +697,49 @@ func TestPeerSelectionRanking(t *testing.T) { } } +func TestPeerRandomSampling(t *testing.T) { + const numPeers = 10 + const numIterations = 1000 + // Using `numPeers + 1` should just do a random load balancing among `numPeers` + // as we only have `numPeers` of server nodes + const peerConnectionCount = numPeers + 1 + + // Selected is a map from rank -> [peer, count] + // It tracks how often a peer gets selected at a specific rank. + selected := make([]map[string]int, numPeers) + for i := 0; i < numPeers; i++ { + selected[i] = make(map[string]int) + } + + for i := 0; i < numIterations; i++ { + ch := testutils.NewClient(t, nil) + defer ch.Close() + ch.SetRandomSeed(int64(i * 100)) + // Using a strategy that has uneven scores + strategy, _ := createScoreStrategy(0, 1) + ch.Peers().SetStrategy(strategy) + // `peerConnectionCount > 1` load balances among the top candidates + // so with `peerConnectionCount` == `numPeers`, the score strategy + // shouldn't have any effect + ch.Peers().SetPeerConnectionCount(peerConnectionCount) + + for i := 0; i < numPeers; i++ { + hp := fmt.Sprintf("127.0.0.1:60%v", i) + ch.Peers().Add(hp) + } + + for i := 0; i < numPeers; i++ { + peer, err := ch.Peers().Get(nil) + require.NoError(t, err, "Peers.Get failed") + selected[i][peer.HostPort()]++ + } + } + + for _, m := range selected { + testDistribution(t, m, 50, 150) + } +} + func createScoreStrategy(initial, delta int64) (calc ScoreCalculator, retCount *atomic.Uint64) { var ( count atomic.Uint64 From 072d525bbe3e2b770fd27cd078044daafe4db4d7 Mon Sep 17 00:00:00 2001 From: Kamran Massoudi Date: Thu, 29 Jun 2017 20:04:19 -0700 Subject: [PATCH 2/3] Address comments --- peer.go | 19 ++++---- peer_test.go | 127 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 109 insertions(+), 37 deletions(-) diff --git a/peer.go b/peer.go index 6e330fcc4..a41c2b9fd 100644 --- a/peer.go +++ b/peer.go @@ -47,6 +47,9 @@ var ( // ErrNoNewPeers indicates that no previously unselected peer is available. ErrNoNewPeers = errors.New("no new peer available") + // ErrZeroPeerConnectionCount indicates that the peer connection count is set to zero. 
+ ErrZeroPeerConnectionCount = errors.New("peer connection count must be greater than 0") + peerRng = trand.NewSeeded() ) @@ -95,11 +98,15 @@ func (l *PeerList) SetStrategy(sc ScoreCalculator) { // SetPeerConnectionCount sets the number of peer connections to be used in // combination with the ScoreCalculator to achieve a random load balancing // of a single client node to `peerConnectionCount` number of server nodes -func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) { +func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) error { l.Lock() defer l.Unlock() + if peerConnectionCount == 0 { + return ErrZeroPeerConnectionCount + } l.peerConnectionCount = peerConnectionCount + return nil } // Siblings don't share peer lists (though they take care not to double-connect @@ -186,8 +193,8 @@ func (l *PeerList) Remove(hostPort string) error { return nil } func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) *Peer { - var chosenPSList []*peerScore - var poppedList []*peerScore + var chosenPSList = make([]*peerScore, 0, l.peerConnectionCount) + var poppedList = make([]*peerScore, 0, l.peerConnectionCount) canChoosePeer := func(hostPort string) bool { if _, ok := prevSelected[hostPort]; ok { @@ -226,16 +233,12 @@ func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) } ps := randomSampling(chosenPSList) - if ps == nil { - return nil - } ps.chosenCount.Inc() return ps.Peer } func randomSampling(psList []*peerScore) *peerScore { - peerRand := trand.NewSeeded() - r := peerRand.Intn(len(psList)) + r := peerRng.Intn(len(psList)) return psList[r] } diff --git a/peer_test.go b/peer_test.go index c4471fc5e..addb9b015 100644 --- a/peer_test.go +++ b/peer_test.go @@ -697,46 +697,115 @@ func TestPeerSelectionRanking(t *testing.T) { } } +func TestZeroPeerConnectionCount(t *testing.T) { + ch := testutils.NewClient(t, nil) + defer ch.Close() + err := ch.Peers().SetPeerConnectionCount(0) + require.Error(t, err, "peerConnectionCount should not accept 0") +} + func TestPeerRandomSampling(t *testing.T) { - const numPeers = 10 const numIterations = 1000 - // Using `numPeers + 1` should just do a random load balancing among `numPeers` - // as we only have `numPeers` of server nodes - const peerConnectionCount = numPeers + 1 - // Selected is a map from rank -> [peer, count] - // It tracks how often a peer gets selected at a specific rank. 
- selected := make([]map[string]int, numPeers) - for i := 0; i < numPeers; i++ { - selected[i] = make(map[string]int) + testCases := []struct { + numPeers int + peerConnectionCount uint32 + distMin float64 + distMax float64 + }{ + // the higher `peerConnectionCount` is, the smoother the impact of uneven scores + // become as we are random sampling among `peerConnectionCount` peers + {numPeers: 10, peerConnectionCount: 1, distMin: 1000, distMax: 1000}, + {numPeers: 10, peerConnectionCount: 5, distMin: 160, distMax: 240}, + {numPeers: 10, peerConnectionCount: 10, distMin: 50, distMax: 150}, + {numPeers: 10, peerConnectionCount: 15, distMin: 50, distMax: 150}, } - for i := 0; i < numIterations; i++ { - ch := testutils.NewClient(t, nil) - defer ch.Close() - ch.SetRandomSeed(int64(i * 100)) - // Using a strategy that has uneven scores - strategy, _ := createScoreStrategy(0, 1) - ch.Peers().SetStrategy(strategy) - // `peerConnectionCount > 1` load balances among the top candidates - // so with `peerConnectionCount` == `numPeers`, the score strategy - // shouldn't have any effect - ch.Peers().SetPeerConnectionCount(peerConnectionCount) + for _, tc := range testCases { + // Selected is a map from rank -> [peer, count] + // It tracks how often a peer gets selected at a specific rank. + selected := make([]map[string]int, tc.numPeers) + for i := 0; i < tc.numPeers; i++ { + selected[i] = make(map[string]int) + } - for i := 0; i < numPeers; i++ { - hp := fmt.Sprintf("127.0.0.1:60%v", i) - ch.Peers().Add(hp) + for i := 0; i < numIterations; i++ { + ch := testutils.NewClient(t, nil) + defer ch.Close() + ch.SetRandomSeed(int64(i * 100)) + // Using a strategy that has uneven scores + strategy, _ := createScoreStrategy(0, 1) + ch.Peers().SetStrategy(strategy) + ch.Peers().SetPeerConnectionCount(tc.peerConnectionCount) + + for i := 0; i < tc.numPeers; i++ { + hp := fmt.Sprintf("127.0.0.1:60%v", i) + ch.Peers().Add(hp) + } + + for i := 0; i < tc.numPeers; i++ { + peer, err := ch.Peers().Get(nil) + require.NoError(t, err, "Peers.Get failed") + selected[i][peer.HostPort()]++ + } } - for i := 0; i < numPeers; i++ { - peer, err := ch.Peers().Get(nil) - require.NoError(t, err, "Peers.Get failed") - selected[i][peer.HostPort()]++ + for _, m := range selected { + testDistribution(t, m, tc.distMin, tc.distMax) } } - for _, m := range selected { - testDistribution(t, m, 50, 150) +} + +func BenchmarkGetPeerWithPeerConnectionCount1(b *testing.B) { + numPeers := 10 + peerConnectionCount := uint32(1) + + ch := testutils.NewClient(b, nil) + defer ch.Close() + ch.SetRandomSeed(int64(100)) + // Using a strategy that has uneven scores + strategy, _ := createScoreStrategy(0, 1) + ch.Peers().SetStrategy(strategy) + ch.Peers().SetPeerConnectionCount(peerConnectionCount) + + for i := 0; i < numPeers; i++ { + hp := fmt.Sprintf("127.0.0.1:60%v", i) + ch.Peers().Add(hp) + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + peer, _ := ch.Peers().Get(nil) + if peer == nil { + fmt.Println("Just a dummy check to guard against compiler optimization") + } + } +} + +func BenchmarkGetPeerWithPeerConnectionCount10(b *testing.B) { + numPeers := 10 + peerConnectionCount := uint32(10) + + ch := testutils.NewClient(b, nil) + defer ch.Close() + ch.SetRandomSeed(int64(100)) + // Using a strategy that has uneven scores + strategy, _ := createScoreStrategy(0, 1) + ch.Peers().SetStrategy(strategy) + ch.Peers().SetPeerConnectionCount(peerConnectionCount) + + for i := 0; i < numPeers; i++ { + hp := fmt.Sprintf("127.0.0.1:60%v", i) + ch.Peers().Add(hp) 
+ } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + peer, _ := ch.Peers().Get(nil) + if peer == nil { + fmt.Println("Just a dummy check to guard against compiler optimization") + } } } From f8f21396d99e6a3d536df7e4c4bf882c2ae80789 Mon Sep 17 00:00:00 2001 From: Kamran Massoudi Date: Mon, 7 Aug 2017 23:40:59 -0700 Subject: [PATCH 3/3] Address comments --- peer.go | 5 +---- peer_test.go | 32 ++++++-------------------------- 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/peer.go b/peer.go index a41c2b9fd..43ca61fb9 100644 --- a/peer.go +++ b/peer.go @@ -47,9 +47,6 @@ var ( // ErrNoNewPeers indicates that no previously unselected peer is available. ErrNoNewPeers = errors.New("no new peer available") - // ErrZeroPeerConnectionCount indicates that the peer connection count is set to zero. - ErrZeroPeerConnectionCount = errors.New("peer connection count must be greater than 0") - peerRng = trand.NewSeeded() ) @@ -103,7 +100,7 @@ func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) error { defer l.Unlock() if peerConnectionCount == 0 { - return ErrZeroPeerConnectionCount + return errors.New("peer connection count must be greater than 0") } l.peerConnectionCount = peerConnectionCount return nil diff --git a/peer_test.go b/peer_test.go index addb9b015..aa2761f16 100644 --- a/peer_test.go +++ b/peer_test.go @@ -716,6 +716,7 @@ func TestPeerRandomSampling(t *testing.T) { // the higher `peerConnectionCount` is, the smoother the impact of uneven scores // become as we are random sampling among `peerConnectionCount` peers {numPeers: 10, peerConnectionCount: 1, distMin: 1000, distMax: 1000}, + {numPeers: 10, peerConnectionCount: 2, distMin: 470, distMax: 530}, {numPeers: 10, peerConnectionCount: 5, distMin: 160, distMax: 240}, {numPeers: 10, peerConnectionCount: 10, distMin: 50, distMax: 150}, {numPeers: 10, peerConnectionCount: 15, distMin: 50, distMax: 150}, @@ -758,35 +759,14 @@ func TestPeerRandomSampling(t *testing.T) { } func BenchmarkGetPeerWithPeerConnectionCount1(b *testing.B) { - numPeers := 10 - peerConnectionCount := uint32(1) - - ch := testutils.NewClient(b, nil) - defer ch.Close() - ch.SetRandomSeed(int64(100)) - // Using a strategy that has uneven scores - strategy, _ := createScoreStrategy(0, 1) - ch.Peers().SetStrategy(strategy) - ch.Peers().SetPeerConnectionCount(peerConnectionCount) - - for i := 0; i < numPeers; i++ { - hp := fmt.Sprintf("127.0.0.1:60%v", i) - ch.Peers().Add(hp) - } - b.ResetTimer() - - for i := 0; i < b.N; i++ { - peer, _ := ch.Peers().Get(nil) - if peer == nil { - fmt.Println("Just a dummy check to guard against compiler optimization") - } - } + doBenchmarkGetPeerWithPeerConnectionCount(b, 10, uint32(1)) } func BenchmarkGetPeerWithPeerConnectionCount10(b *testing.B) { - numPeers := 10 - peerConnectionCount := uint32(10) + doBenchmarkGetPeerWithPeerConnectionCount(b, 10, uint32(10)) +} +func doBenchmarkGetPeerWithPeerConnectionCount(b *testing.B, numPeers int, peerConnectionCount uint32) { ch := testutils.NewClient(b, nil) defer ch.Close() ch.SetRandomSeed(int64(100)) @@ -804,7 +784,7 @@ func BenchmarkGetPeerWithPeerConnectionCount10(b *testing.B) { for i := 0; i < b.N; i++ { peer, _ := ch.Peers().Get(nil) if peer == nil { - fmt.Println("Just a dummy check to guard against compiler optimization") + b.Fatal("Just a dummy check to guard against compiler optimization") } } }