Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kamran-m committed Jun 30, 2017
1 parent 61b1142 commit f1d327f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 37 deletions.
19 changes: 11 additions & 8 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ var (
// ErrNoNewPeers indicates that no previously unselected peer is available.
ErrNoNewPeers = errors.New("no new peer available")

// ErrZeroPeerConnectionCount indicates that the peer connection count is set to zero.
ErrZeroPeerConnectionCount = errors.New("peer connection count must be greater than 0")

peerRng = trand.NewSeeded()
)

Expand Down Expand Up @@ -95,11 +98,15 @@ func (l *PeerList) SetStrategy(sc ScoreCalculator) {
// SetPeerConnectionCount sets the number of peer connections to be used in
// combination with the ScoreCalculator to achieve a random load balancing
// of a single client node to `peerConnectionCount` number of server nodes
func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) {
func (l *PeerList) SetPeerConnectionCount(peerConnectionCount uint32) error {
l.Lock()
defer l.Unlock()

if peerConnectionCount == 0 {
return ErrZeroPeerConnectionCount
}
l.peerConnectionCount = peerConnectionCount
return nil
}

// Siblings don't share peer lists (though they take care not to double-connect
Expand Down Expand Up @@ -186,8 +193,8 @@ func (l *PeerList) Remove(hostPort string) error {
return nil
}
func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool) *Peer {
var chosenPSList []*peerScore
var poppedList []*peerScore
var chosenPSList = make([]*peerScore, 0, l.peerConnectionCount)
var poppedList = make([]*peerScore, 0, l.peerConnectionCount)

canChoosePeer := func(hostPort string) bool {
if _, ok := prevSelected[hostPort]; ok {
Expand Down Expand Up @@ -226,16 +233,12 @@ func (l *PeerList) choosePeer(prevSelected map[string]struct{}, avoidHost bool)
}

ps := randomSampling(chosenPSList)
if ps == nil {
return nil
}
ps.chosenCount.Inc()
return ps.Peer
}

func randomSampling(psList []*peerScore) *peerScore {
peerRand := trand.NewSeeded()
r := peerRand.Intn(len(psList))
r := peerRng.Intn(len(psList))
return psList[r]
}

Expand Down
127 changes: 98 additions & 29 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,46 +697,115 @@ func TestPeerSelectionRanking(t *testing.T) {
}
}

func TestZeroPeerConnectionCount(t *testing.T) {
ch := testutils.NewClient(t, nil)
defer ch.Close()
err := ch.Peers().SetPeerConnectionCount(0)
require.Error(t, err, "peerConnectionCount should not accept 0")
}

func TestPeerRandomSampling(t *testing.T) {
const numPeers = 10
const numIterations = 1000
// Using `numPeers + 1` should just do a random load balancing among `numPeers`
// as we only have `numPeers` of server nodes
const peerConnectionCount = numPeers + 1

// Selected is a map from rank -> [peer, count]
// It tracks how often a peer gets selected at a specific rank.
selected := make([]map[string]int, numPeers)
for i := 0; i < numPeers; i++ {
selected[i] = make(map[string]int)
testCases := []struct {
numPeers int
peerConnectionCount uint32
distMin float64
distMax float64
}{
// the higher `peerConnectionCount` is, the smoother the impact of uneven scores
// become as we are random sampling among `peerConnectionCount` peers
{numPeers: 10, peerConnectionCount: 1, distMin: 1000, distMax: 1000},
{numPeers: 10, peerConnectionCount: 5, distMin: 160, distMax: 240},
{numPeers: 10, peerConnectionCount: 10, distMin: 50, distMax: 150},
{numPeers: 10, peerConnectionCount: 15, distMin: 50, distMax: 150},
}

for i := 0; i < numIterations; i++ {
ch := testutils.NewClient(t, nil)
defer ch.Close()
ch.SetRandomSeed(int64(i * 100))
// Using a strategy that has uneven scores
strategy, _ := createScoreStrategy(0, 1)
ch.Peers().SetStrategy(strategy)
// `peerConnectionCount > 1` load balances among the top candidates
// so with `peerConnectionCount` == `numPeers`, the score strategy
// shouldn't have any effect
ch.Peers().SetPeerConnectionCount(peerConnectionCount)
for _, tc := range testCases {
// Selected is a map from rank -> [peer, count]
// It tracks how often a peer gets selected at a specific rank.
selected := make([]map[string]int, tc.numPeers)
for i := 0; i < tc.numPeers; i++ {
selected[i] = make(map[string]int)
}

for i := 0; i < numPeers; i++ {
hp := fmt.Sprintf("127.0.0.1:60%v", i)
ch.Peers().Add(hp)
for i := 0; i < numIterations; i++ {
ch := testutils.NewClient(t, nil)
defer ch.Close()
ch.SetRandomSeed(int64(i * 100))
// Using a strategy that has uneven scores
strategy, _ := createScoreStrategy(0, 1)
ch.Peers().SetStrategy(strategy)
ch.Peers().SetPeerConnectionCount(tc.peerConnectionCount)

for i := 0; i < tc.numPeers; i++ {
hp := fmt.Sprintf("127.0.0.1:60%v", i)
ch.Peers().Add(hp)
}

for i := 0; i < tc.numPeers; i++ {
peer, err := ch.Peers().Get(nil)
require.NoError(t, err, "Peers.Get failed")
selected[i][peer.HostPort()]++
}
}

for i := 0; i < numPeers; i++ {
peer, err := ch.Peers().Get(nil)
require.NoError(t, err, "Peers.Get failed")
selected[i][peer.HostPort()]++
for _, m := range selected {
testDistribution(t, m, tc.distMin, tc.distMax)
}
}

for _, m := range selected {
testDistribution(t, m, 50, 150)
}

func BenchmarkGetPeerWithPeerConnectionCount1(b *testing.B) {
numPeers := 10
peerConnectionCount := uint32(1)

ch := testutils.NewClient(b, nil)
defer ch.Close()
ch.SetRandomSeed(int64(100))
// Using a strategy that has uneven scores
strategy, _ := createScoreStrategy(0, 1)
ch.Peers().SetStrategy(strategy)
ch.Peers().SetPeerConnectionCount(peerConnectionCount)

for i := 0; i < numPeers; i++ {
hp := fmt.Sprintf("127.0.0.1:60%v", i)
ch.Peers().Add(hp)
}
b.ResetTimer()

for i := 0; i < b.N; i++ {
peer, _ := ch.Peers().Get(nil)
if peer == nil {
fmt.Println("Just a dummy check to guard against compiler optimization")
}
}
}

func BenchmarkGetPeerWithPeerConnectionCount10(b *testing.B) {
numPeers := 10
peerConnectionCount := uint32(10)

ch := testutils.NewClient(b, nil)
defer ch.Close()
ch.SetRandomSeed(int64(100))
// Using a strategy that has uneven scores
strategy, _ := createScoreStrategy(0, 1)
ch.Peers().SetStrategy(strategy)
ch.Peers().SetPeerConnectionCount(peerConnectionCount)

for i := 0; i < numPeers; i++ {
hp := fmt.Sprintf("127.0.0.1:60%v", i)
ch.Peers().Add(hp)
}
b.ResetTimer()

for i := 0; i < b.N; i++ {
peer, _ := ch.Peers().Get(nil)
if peer == nil {
fmt.Println("Just a dummy check to guard against compiler optimization")
}
}
}

Expand Down

0 comments on commit f1d327f

Please sign in to comment.