[Merged by Bors] - sync: enable rate limiting for servers #5151

Closed · 33 commits
57f8202
integrate peers tracker
dshulyak Oct 11, 2023
3f5b776
Merge branch 'develop' into sync/priotize-fast
dshulyak Oct 11, 2023
a102583
add coverage
dshulyak Oct 12, 2023
2ac58c8
add doc
dshulyak Oct 12, 2023
e0e3cfa
Merge branch 'develop' into sync/priotize-fast
dshulyak Oct 12, 2023
379aee2
add benchmark to select 10 peers from 10000
dshulyak Oct 12, 2023
138b8ec
review
dshulyak Oct 13, 2023
1c93431
store rate on struct
dshulyak Oct 13, 2023
0fbcb58
add changelog
dshulyak Oct 13, 2023
4cb8f33
Merge branch 'develop' into sync/priotize-fast
dshulyak Oct 13, 2023
b2701a2
first version
dshulyak Oct 13, 2023
b0c679d
import
dshulyak Oct 13, 2023
a7372b3
support configuration for queue and rps
dshulyak Oct 13, 2023
d605cfe
add a sanity test
dshulyak Oct 13, 2023
5c8a818
add metrics
dshulyak Oct 13, 2023
44910ee
debug and configure metrics
dshulyak Oct 13, 2023
f332bc5
Merge branch 'develop' into sync/rate-limit-server
dshulyak Oct 13, 2023
b814cde
set hard limit for goroutines
dshulyak Oct 16, 2023
3517793
set defaults
dshulyak Oct 16, 2023
679ee80
Merge branch 'develop' into sync/rate-limit-server
dshulyak Oct 17, 2023
3cf0214
linter
dshulyak Oct 17, 2023
bb0975f
go mod tidyy
dshulyak Oct 17, 2023
89852b7
await eg in test
dshulyak Oct 19, 2023
c7a4175
switch for metrics and remove shared ctx
dshulyak Oct 19, 2023
b371d85
rework client latency tracking
dshulyak Oct 20, 2023
e239cbd
Merge branch 'develop' into sync/rate-limit-server
dshulyak Oct 20, 2023
9a052a2
linnter
dshulyak Oct 20, 2023
9fa0feb
enable sync metrics
dshulyak Oct 21, 2023
e668c58
dont prune too earlyy
dshulyak Oct 21, 2023
d9ffcc7
wait for protocols correctly and compute rps
dshulyak Oct 21, 2023
b1942d2
Merge branch 'develop' into sync/rate-limit-server
dshulyak Oct 21, 2023
9622d85
adjust test
dshulyak Oct 21, 2023
e2b4fff
Revert "dont prune too earlyy"
dshulyak Oct 22, 2023
110 changes: 82 additions & 28 deletions fetch/fetch.go
@@ -78,24 +78,72 @@
     return m
 }
 
+type ServerConfig struct {
+    Queue    int           `mapstructure:"queue"`
+    Requests int           `mapstructure:"requests"`
+    Interval time.Duration `mapstructure:"interval"`
+}
+
+func (s ServerConfig) toOpts() []server.Opt {
+    opts := []server.Opt{}
+    if s.Queue != 0 {
+        opts = append(opts, server.WithQueueSize(s.Queue))
+    }
+    if s.Requests != 0 && s.Interval != 0 {
+        opts = append(opts, server.WithRequestsPerInterval(s.Requests, s.Interval))
+    }
+    return opts
+}
+
 // Config is the configuration file of the Fetch component.
 type Config struct {
-    BatchTimeout         time.Duration // in milliseconds
+    BatchTimeout         time.Duration
     BatchSize, QueueSize int
-    RequestTimeout       time.Duration // in seconds
+    RequestTimeout       time.Duration
     MaxRetriesForRequest int
-    PeersRateThreshold   float64 `mapstructure:"peers-rate-threshold"`
+    EnableServesMetrics  bool                    `mapstructure:"servers-metrics"`
+    ServersConfig        map[string]ServerConfig `mapstructure:"servers"`
+    PeersRateThreshold   float64                 `mapstructure:"peers-rate-threshold"`
 }
 
+func (c Config) getServerConfig(protocol string) ServerConfig {
+    cfg, exists := c.ServersConfig[protocol]
+    if exists {
+        return cfg
+    }
+    return ServerConfig{
+        Queue:    10000,
+        Requests: 100,
+        Interval: time.Second,
+    }
+}
+
 // DefaultConfig is the default config for the fetch component.
 func DefaultConfig() Config {
     return Config{
-        BatchTimeout:         time.Millisecond * time.Duration(50),
+        BatchTimeout:         50 * time.Millisecond,
         QueueSize:            20,
         BatchSize:            20,
-        RequestTimeout:       time.Second * time.Duration(10),
+        RequestTimeout:       10 * time.Second,
         MaxRetriesForRequest: 100,
-        PeersRateThreshold:   0.02,
+        ServersConfig: map[string]ServerConfig{
+            // serves 1 MB of data
+            atxProtocol: {Queue: 10, Requests: 1, Interval: time.Second},
+            // serves 1 KB of data
+            lyrDataProtocol: {Queue: 1000, Requests: 100, Interval: time.Second},
+            // serves atxs, ballots, active sets
+            // atx - 1 KB
+            // ballots > 300 bytes
+            // often queried after receiving gossip message
+            hashProtocol: {Queue: 2000, Requests: 200, Interval: time.Second},
+            // serves at most 100 hashes - 3KB
+            meshHashProtocol: {Queue: 1000, Requests: 100, Interval: time.Second},
+            // serves all malicious ids (id - 32 byte) - 10KB
+            malProtocol: {Queue: 100, Requests: 10, Interval: time.Second},
+            // 64 bytes
+            OpnProtocol: {Queue: 10000, Requests: 1000, Interval: time.Second},
+        },
+        PeersRateThreshold: 0.02,
     }
 }
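Two behaviors in this hunk are easy to miss: getServerConfig falls back to a generous 10000-queue, 100 requests/s entry for any protocol missing from ServersConfig, and toOpts skips zero-valued fields, so an all-zero entry adds no limiting options at all. A minimal sketch of overriding the defaults from code, assuming it sits in the fetch package; exampleServerConfig is a hypothetical helper, not part of the PR:

```go
// exampleServerConfig is a hypothetical helper showing how the new
// per-protocol limits compose; it is not part of this PR.
func exampleServerConfig() Config {
	cfg := DefaultConfig()

	// Tighten the hash protocol: at most 50 requests/s, 500 queued.
	cfg.ServersConfig[hashProtocol] = ServerConfig{
		Queue:    500,
		Requests: 50,
		Interval: time.Second,
	}

	// Zero values are skipped by toOpts, so this entry adds neither
	// WithQueueSize nor WithRequestsPerInterval and the server keeps
	// its built-in behavior for malProtocol.
	cfg.ServersConfig[malProtocol] = ServerConfig{}
	return cfg
}
```

The mapstructure tags suggest the same overrides can also come from the node config file under the servers key, with per-protocol queue, requests, and interval fields.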

@@ -220,34 +268,34 @@
     }
 
     f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
-    srvOpts := []server.Opt{
-        server.WithTimeout(f.cfg.RequestTimeout),
-        server.WithLog(f.logger),
-    }
     if len(f.servers) == 0 {
         h := newHandler(cdb, bs, msh, b, f.logger)
-        f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
-        f.servers[lyrDataProtocol] = server.New(
-            host,
-            lyrDataProtocol,
-            h.handleLayerDataReq,
-            srvOpts...)
-        f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
-        f.servers[meshHashProtocol] = server.New(
-            host,
-            meshHashProtocol,
-            h.handleMeshHashReq,
-            srvOpts...)
-        f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
-        f.servers[OpnProtocol] = server.New(
-            host,
-            OpnProtocol,
-            h.handleLayerOpinionsReq2,
-            srvOpts...)
+        f.registerServer(host, atxProtocol, h.handleEpochInfoReq)
+        f.registerServer(host, lyrDataProtocol, h.handleLayerDataReq)
+        f.registerServer(host, hashProtocol, h.handleHashReq)
+        f.registerServer(host, meshHashProtocol, h.handleMeshHashReq)
+        f.registerServer(host, malProtocol, h.handleMaliciousIDsReq)
+        f.registerServer(host, OpnProtocol, h.handleLayerOpinionsReq2)
     }
     return f
 }
 
+func (f *Fetch) registerServer(
+    host *p2p.Host,
+    protocol string,
+    handler server.Handler,
+) {
+    opts := []server.Opt{
+        server.WithTimeout(f.cfg.RequestTimeout),
+        server.WithLog(f.logger),
+    }
+    if f.cfg.EnableServesMetrics {
+        opts = append(opts, server.WithMetrics())
+    }
+    opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
+    f.servers[protocol] = server.New(host, protocol, handler, opts...)
+}
+
 type dataValidators struct {
     atx  SyncValidator
     poet SyncValidator

[Codecov: added lines fetch/fetch.go#L293-L294 were not covered by tests.]
@@ -295,6 +343,12 @@
             f.loop()
             return nil
         })
+        for _, srv := range f.servers {
+            srv := srv
+            f.eg.Go(func() error {
+                return srv.Run(f.shutdownCtx)
+            })
+        }
     })
     return nil
 }
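One detail in the Start hunk above deserves a note: the shadowing `srv := srv` is the standard pre-Go-1.22 idiom for capturing a per-iteration copy of a range variable; without it, the launched goroutines could all observe whichever server the loop variable pointed at last. A minimal sketch of the pattern in isolation, built on the requester interface from fetch/interface.go; runServers is a hypothetical extraction, not the PR's code:

```go
package fetch

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// runServers is a hypothetical extraction of the Start hunk above.
func runServers(ctx context.Context, eg *errgroup.Group, servers map[string]requester) {
	for _, srv := range servers {
		srv := srv // capture a fresh copy; the range variable is reused per iteration
		eg.Go(func() error {
			return srv.Run(ctx)
		})
	}
}
```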
12 changes: 11 additions & 1 deletion fetch/fetch_test.go
@@ -10,6 +10,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.uber.org/mock/gomock"
+    "golang.org/x/sync/errgroup"
 
     "github.com/spacemeshos/go-spacemesh/codec"
     "github.com/spacemeshos/go-spacemesh/common/types"
@@ -65,6 +66,9 @@ func createFetch(tb testing.TB) *testFetch {
         mTxProposalH: mocks.NewMockSyncValidator(ctrl),
         mPoetH:       mocks.NewMockSyncValidator(ctrl),
     }
+    for _, srv := range []*mocks.Mockrequester{tf.mMalS, tf.mAtxS, tf.mLyrS, tf.mHashS, tf.mMHashS, tf.mOpn2S} {
+        srv.EXPECT().Run(gomock.Any()).AnyTimes()
+    }
     cfg := Config{
         BatchTimeout: 2 * time.Second, // make sure we never hit the batch timeout
         BatchSize:    3,
@@ -373,7 +377,13 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
         }
         return result, nil
     }
-    server.New(badPeerHost, hashProtocol, badPeerHandler)
+    badsrv := server.New(badPeerHost, hashProtocol, badPeerHandler)
+    var eg errgroup.Group
+    eg.Go(func() error {
+        badsrv.Run(ctx)
+        return nil
+    })
+    defer eg.Wait()
 
     fetcher := NewFetch(datastore.NewCachedDB(sql.InMemory(), lg), nil, nil, h,
         WithContext(ctx),
1 change: 1 addition & 0 deletions fetch/interface.go
@@ -11,6 +11,7 @@ import (
 //go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go
 
 type requester interface {
+    Run(context.Context) error
     Request(context.Context, p2p.Peer, []byte, func([]byte), func(error)) error
 }

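Because requester now includes Run, every implementation and test double must provide it, which is why createFetch stubs Run on all six mocks above. A hand-written fake satisfying the extended interface, as a sketch; fakeRequester is hypothetical and for illustration only:

```go
// fakeRequester is a hypothetical no-op implementation of requester.
type fakeRequester struct{}

// Run blocks until shutdown, mirroring a long-running server loop.
func (fakeRequester) Run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// Request invokes the success callback immediately with an empty response.
func (fakeRequester) Request(
	_ context.Context,
	_ p2p.Peer,
	_ []byte,
	okCB func([]byte),
	_ func(error),
) error {
	okCB(nil)
	return nil
}
```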
38 changes: 38 additions & 0 deletions fetch/mocks/mocks.go

Generated file; diff not rendered.

2 changes: 1 addition & 1 deletion go.mod
@@ -52,6 +52,7 @@ require (
     go.uber.org/zap v1.26.0
     golang.org/x/exp v0.0.0-20231006140011-7918f672742d
     golang.org/x/sync v0.4.0
+    golang.org/x/time v0.3.0
     google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b
     google.golang.org/grpc v1.59.0
     google.golang.org/protobuf v1.31.0
@@ -206,7 +207,6 @@ require (
     golang.org/x/sys v0.13.0 // indirect
     golang.org/x/term v0.13.0 // indirect
     golang.org/x/text v0.13.0 // indirect
-    golang.org/x/time v0.3.0 // indirect
     golang.org/x/tools v0.14.0 // indirect
     golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
     gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
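golang.org/x/time is promoted from an indirect to a direct dependency, which suggests the new server.WithRequestsPerInterval option is backed by rate.Limiter. The actual wiring lives in p2p/server and is not shown in this section, so the following is a sketch of the mechanism, not the PR's implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 100 requests per second, matching the shape of
	// server.WithRequestsPerInterval(100, time.Second).
	requests, interval := 100, time.Second
	limiter := rate.NewLimiter(rate.Every(interval/time.Duration(requests)), requests)

	ctx := context.Background()
	for i := 0; i < 5; i++ {
		// Wait blocks until a token is available or ctx is canceled,
		// smoothing bursts down to the configured rate.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("canceled:", err)
			return
		}
		fmt.Println("request", i, "admitted")
	}
}
```

rate.Every converts a per-token interval into a rate; with burst equal to one interval's worth of requests, short spikes are absorbed while the sustained rate stays at the configured requests-per-interval.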
78 changes: 78 additions & 0 deletions p2p/server/metrics.go
@@ -0,0 +1,78 @@
+package server
+
+import (
+    "github.com/prometheus/client_golang/prometheus"
+
+    "github.com/spacemeshos/go-spacemesh/metrics"
+)
+
+const (
+    namespace  = "server"
+    protoLabel = "protocol"
+)
+
+var (
+    targetQueue = metrics.NewGauge(
+        "target_queue",
+        namespace,
+        "target size of the queue",
+        []string{protoLabel},
+    )
+    queue = metrics.NewGauge(
+        "queue",
+        namespace,
+        "actual size of the queue",
+        []string{protoLabel},
+    )
+    targetRps = metrics.NewGauge(
+        "rps",
+        namespace,
+        "target requests per second",
+        []string{protoLabel},
+    )
+    requests = metrics.NewCounter(
+        "requests",
+        namespace,
+        "requests counter",
+        []string{protoLabel, "state"},
+    )
+    clientLatency = metrics.NewHistogramWithBuckets(
+        "client_latency_seconds",
+        namespace,
+        "latency since initiating a request",
+        []string{protoLabel, "result"},
+        prometheus.ExponentialBuckets(0.01, 2, 10),
+    )
+    serverLatency = metrics.NewHistogramWithBuckets(
+        "server_latency_seconds",
+        namespace,
+        "latency since accepting new stream",
+        []string{protoLabel},
+        prometheus.ExponentialBuckets(0.01, 2, 10),
+    )
+)
+
+func newTracker(protocol string) *tracker {
+    return &tracker{
+        targetQueue:          targetQueue.WithLabelValues(protocol),
+        queue:                queue.WithLabelValues(protocol),
+        targetRps:            targetRps.WithLabelValues(protocol),
+        completed:            requests.WithLabelValues(protocol, "completed"),
+        accepted:             requests.WithLabelValues(protocol, "accepted"),
+        dropped:              requests.WithLabelValues(protocol, "dropped"),
+        serverLatency:        serverLatency.WithLabelValues(protocol),
+        clientLatency:        clientLatency.WithLabelValues(protocol, "success"),
+        clientLatencyFailure: clientLatency.WithLabelValues(protocol, "failure"),
+    }
+}
+
+type tracker struct {
+    targetQueue                         prometheus.Gauge
+    queue                               prometheus.Gauge
+    targetRps                           prometheus.Gauge
+    completed                           prometheus.Counter
+    accepted                            prometheus.Counter
+    dropped                             prometheus.Counter
+    serverLatency                       prometheus.Observer
+    clientLatency, clientLatencyFailure prometheus.Observer
+}
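The tracker bundles pre-labeled collectors so the request hot path never has to format label values. The intended lifecycle can be read off the label names ("accepted", "dropped", "completed", "success", "failure"); the two methods below are hypothetical sketches of that lifecycle in package server (with "time" imported), not code from the PR:

```go
// handle is a hypothetical illustration: count a request as dropped, or as
// accepted and then completed, recording server-side latency in seconds.
func (t *tracker) handle(queueLen, queueCap int, process func() error) {
	if queueLen >= queueCap {
		t.dropped.Inc() // queue full: reject before doing any work
		return
	}
	t.accepted.Inc()
	t.queue.Set(float64(queueLen + 1))

	start := time.Now()
	err := process()
	t.serverLatency.Observe(time.Since(start).Seconds())
	if err == nil {
		t.completed.Inc()
	}
}

// observeClient splits round-trip latency between the "success" and
// "failure" histograms, matching the "result" label above.
func (t *tracker) observeClient(start time.Time, err error) {
	if err != nil {
		t.clientLatencyFailure.Observe(time.Since(start).Seconds())
		return
	}
	t.clientLatency.Observe(time.Since(start).Seconds())
}
```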