Skip to content

Commit

Permalink
Try #5151:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored Oct 21, 2023
2 parents b168b41 + e668c58 commit 2150285
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 113 deletions.
110 changes: 82 additions & 28 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,72 @@ func (b *batchInfo) toMap() map[types.Hash32]RequestMessage {
return m
}

type ServerConfig struct {
Queue int `mapstructure:"queue"`
Requests int `mapstructure:"requests"`
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
}
if s.Requests != 0 && s.Interval != 0 {
opts = append(opts, server.WithRequestsPerInterval(s.Requests, s.Interval))
}
return opts
}

// Config is the configuration file of the Fetch component.
type Config struct {
BatchTimeout time.Duration // in milliseconds
BatchTimeout time.Duration
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
RequestTimeout time.Duration
MaxRetriesForRequest int
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
EnableServesMetrics bool `mapstructure:"servers-metrics"`
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
}

func (c Config) getServerConfig(protocol string) ServerConfig {
cfg, exists := c.ServersConfig[protocol]
if exists {
return cfg
}
return ServerConfig{
Queue: 10000,
Requests: 100,
Interval: time.Second,
}
}

// DefaultConfig is the default config for the fetch component.
func DefaultConfig() Config {
return Config{
BatchTimeout: time.Millisecond * time.Duration(50),
BatchTimeout: 50 * time.Millisecond,
QueueSize: 20,
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
RequestTimeout: 10 * time.Second,
MaxRetriesForRequest: 100,
PeersRateThreshold: 0.02,
ServersConfig: map[string]ServerConfig{
// serves 1 MB of data
atxProtocol: {Queue: 10, Requests: 1, Interval: time.Second},
// serves 1 KB of data
lyrDataProtocol: {Queue: 1000, Requests: 100, Interval: time.Second},
// serves atxs, ballots, active sets
// atx - 1 KB
// ballots > 300 bytes
// often queried after receiving gossip message
hashProtocol: {Queue: 2000, Requests: 200, Interval: time.Second},
// serves at most 100 hashes - 3KB
meshHashProtocol: {Queue: 1000, Requests: 100, Interval: time.Second},
// serves all malicious ids (id - 32 byte) - 10KB
malProtocol: {Queue: 100, Requests: 10, Interval: time.Second},
// 64 bytes
OpnProtocol: {Queue: 10000, Requests: 1000, Interval: time.Second},
},
PeersRateThreshold: 0.02,
}
}

Expand Down Expand Up @@ -220,34 +268,34 @@ func NewFetch(
}

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
srvOpts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithLog(f.logger),
}
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(
host,
lyrDataProtocol,
h.handleLayerDataReq,
srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(
host,
meshHashProtocol,
h.handleMeshHashReq,
srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[OpnProtocol] = server.New(
host,
OpnProtocol,
h.handleLayerOpinionsReq2,
srvOpts...)
f.registerServer(host, atxProtocol, h.handleEpochInfoReq)
f.registerServer(host, lyrDataProtocol, h.handleLayerDataReq)
f.registerServer(host, hashProtocol, h.handleHashReq)
f.registerServer(host, meshHashProtocol, h.handleMeshHashReq)
f.registerServer(host, malProtocol, h.handleMaliciousIDsReq)
f.registerServer(host, OpnProtocol, h.handleLayerOpinionsReq2)
}
return f
}

func (f *Fetch) registerServer(
host *p2p.Host,
protocol string,
handler server.Handler,
) {
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithLog(f.logger),
}
if f.cfg.EnableServesMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

type dataValidators struct {
atx SyncValidator
poet SyncValidator
Expand Down Expand Up @@ -295,6 +343,12 @@ func (f *Fetch) Start() error {
f.loop()
return nil
})
for _, srv := range f.servers {
srv := srv
f.eg.Go(func() error {
return srv.Run(f.shutdownCtx)
})
}
})
return nil
}
Expand Down
12 changes: 11 additions & 1 deletion fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -65,6 +66,9 @@ func createFetch(tb testing.TB) *testFetch {
mTxProposalH: mocks.NewMockSyncValidator(ctrl),
mPoetH: mocks.NewMockSyncValidator(ctrl),
}
for _, srv := range []*mocks.Mockrequester{tf.mMalS, tf.mAtxS, tf.mLyrS, tf.mHashS, tf.mMHashS, tf.mOpn2S} {
srv.EXPECT().Run(gomock.Any()).AnyTimes()
}
cfg := Config{
BatchTimeout: 2 * time.Second, // make sure we never hit the batch timeout
BatchSize: 3,
Expand Down Expand Up @@ -373,7 +377,13 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
}
return result, nil
}
server.New(badPeerHost, hashProtocol, badPeerHandler)
badsrv := server.New(badPeerHost, hashProtocol, badPeerHandler)
var eg errgroup.Group
eg.Go(func() error {
badsrv.Run(ctx)
return nil
})
defer eg.Wait()

fetcher := NewFetch(datastore.NewCachedDB(sql.InMemory(), lg), nil, nil, h,
WithContext(ctx),
Expand Down
1 change: 1 addition & 0 deletions fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go

type requester interface {
Run(context.Context) error
Request(context.Context, p2p.Peer, []byte, func([]byte), func(error)) error
}

Expand Down
38 changes: 38 additions & 0 deletions fetch/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.4.0
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand Down Expand Up @@ -206,7 +207,6 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
78 changes: 78 additions & 0 deletions p2p/server/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package server

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/spacemeshos/go-spacemesh/metrics"
)

const (
namespace = "server"
protoLabel = "protocol"
)

var (
targetQueue = metrics.NewGauge(
"target_queue",
namespace,
"target size of the queue",
[]string{protoLabel},
)
queue = metrics.NewGauge(
"queue",
namespace,
"actual size of the queue",
[]string{protoLabel},
)
targetRps = metrics.NewGauge(
"rps",
namespace,
"target requests per second",
[]string{protoLabel},
)
requests = metrics.NewCounter(
"requests",
namespace,
"requests counter",
[]string{protoLabel, "state"},
)
clientLatency = metrics.NewHistogramWithBuckets(
"client_latency_seconds",
namespace,
"latency since initiating a request",
[]string{protoLabel, "result"},
prometheus.ExponentialBuckets(0.01, 2, 10),
)
serverLatency = metrics.NewHistogramWithBuckets(
"server_latency_seconds",
namespace,
"latency since accepting new stream",
[]string{protoLabel},
prometheus.ExponentialBuckets(0.01, 2, 10),
)
)

func newTracker(protocol string) *tracker {
return &tracker{
targetQueue: targetQueue.WithLabelValues(protocol),
queue: queue.WithLabelValues(protocol),
targetRps: targetRps.WithLabelValues(protocol),
completed: requests.WithLabelValues(protocol, "completed"),
accepted: requests.WithLabelValues(protocol, "accepted"),
dropped: requests.WithLabelValues(protocol, "dropped"),
serverLatency: serverLatency.WithLabelValues(protocol),
clientLatency: clientLatency.WithLabelValues(protocol, "success"),
clientLatencyFailure: clientLatency.WithLabelValues(protocol, "failure"),
}
}

type tracker struct {
targetQueue prometheus.Gauge
queue prometheus.Gauge
targetRps prometheus.Gauge
completed prometheus.Counter
accepted prometheus.Counter
dropped prometheus.Counter
serverLatency prometheus.Observer
clientLatency, clientLatencyFailure prometheus.Observer
}
Loading

0 comments on commit 2150285

Please sign in to comment.