From 6588f792b6250346243f9f28c621cbde72f99432 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 25 Jan 2024 05:09:01 +0000 Subject: [PATCH] *: refactor request pkg * Update LoadProfileSpec with ContentType and DisableHTTP2 * Introduce option pattern to tweak client-go/rest' config * Add log to show setting in schedule.go * Add --disable-http2 flag to runner Signed-off-by: Wei Fu --- api/types/load_traffic.go | 33 +++++++++ cmd/kperf/commands/runner/runner.go | 38 ++++++---- request/client.go | 104 +++++++++++++++++++++++----- request/schedule.go | 24 +++++-- scripts/run_runner.sh | 2 +- 5 files changed, 163 insertions(+), 38 deletions(-) diff --git a/api/types/load_traffic.go b/api/types/load_traffic.go index f544d22..0d61661 100644 --- a/api/types/load_traffic.go +++ b/api/types/load_traffic.go @@ -2,6 +2,26 @@ package types import "fmt" +// ContentType represents the format of response. +type ContentType string + +const ( + // ContentTypeJSON means the format is json. + ContentTypeJSON ContentType = "json" + // ContentTypeProtobuffer means the format is protobuf. + ContentTypeProtobuffer = "protobuf" +) + +// Validate returns error if ContentType is not supported. +func (ct ContentType) Validate() error { + switch ct { + case ContentTypeJSON, ContentTypeProtobuffer: + return nil + default: + return fmt.Errorf("unsupported content type %s", ct) + } +} + // LoadProfile defines how to create load traffic from one host to kube-apiserver. type LoadProfile struct { // Version defines the version of this object. @@ -22,6 +42,10 @@ type LoadProfileSpec struct { Conns int `json:"conns" yaml:"conns"` // Client defines total number of HTTP clients. Client int `json:"client" yaml:"client"` + // ContentType defines response's content type. + ContentType ContentType `json:"contentType" yaml:"contentType"` + // DisableHTTP2 means client will use HTTP/1.1 protocol if it's true. + DisableHTTP2 bool `json:"disableHTTP2" yaml:"disableHTTP2"` // Requests defines the different kinds of requests with weights. // The executor should randomly pick by weight. Requests []*WeightedRequest @@ -118,6 +142,15 @@ func (spec LoadProfileSpec) Validate() error { return fmt.Errorf("total requires > 0: %v", spec.Total) } + if spec.Client <= 0 { + return fmt.Errorf("client requires > 0: %v", spec.Client) + } + + err := spec.ContentType.Validate() + if err != nil { + return err + } + for idx, req := range spec.Requests { if err := req.Validate(); err != nil { return fmt.Errorf("idx: %v request: %v", idx, err) diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 0968a68..74dae83 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -50,8 +50,8 @@ var runCommand = cli.Command{ }, cli.StringFlag{ Name: "content-type", - Usage: "Content type (json or protobuf)", - Value: "json", + Usage: fmt.Sprintf("Content type (%v or %v)", types.ContentTypeJSON, types.ContentTypeProtobuffer), + Value: string(types.ContentTypeJSON), }, cli.Float64Flag{ Name: "rate", @@ -66,6 +66,10 @@ var runCommand = cli.Command{ Name: "user-agent", Usage: "User Agent", }, + cli.BoolFlag{ + Name: "disable-http2", + Usage: "Disable HTTP2 protocol", + }, cli.StringFlag{ Name: "result", Usage: "Path to the file which stores results", @@ -76,31 +80,32 @@ var runCommand = cli.Command{ }, }, Action: func(cliCtx *cli.Context) error { + kubeCfgPath := cliCtx.String("kubeconfig") + profileCfg, err := loadConfig(cliCtx) if err != nil { return err } - // Get the content type from the command-line flag - contentType := cliCtx.String("content-type") - kubeCfgPath := cliCtx.String("kubeconfig") - userAgent := cliCtx.String("user-agent") - outputFilePath := cliCtx.String("result") - rawDataFlagIncluded := cliCtx.Bool("raw-data") - - conns := profileCfg.Spec.Conns - rate := profileCfg.Spec.Rate - restClis, err := request.NewClients(kubeCfgPath, conns, userAgent, rate, contentType) + clientNum := profileCfg.Spec.Conns + restClis, err := request.NewClients(kubeCfgPath, + clientNum, + request.WithClientUserAgentOpt(cliCtx.String("user-agent")), + request.WithClientQPSOpt(profileCfg.Spec.Rate), + request.WithClientContentTypeOpt(profileCfg.Spec.ContentType), + request.WithClientDisableHTTP2Opt(profileCfg.Spec.DisableHTTP2), + ) if err != nil { return err } - stats, err := request.Schedule(context.TODO(), &profileCfg.Spec, restClis) + stats, err := request.Schedule(context.TODO(), &profileCfg.Spec, restClis) if err != nil { return err } var f *os.File = os.Stdout + outputFilePath := cliCtx.String("result") if outputFilePath != "" { outputFileDir := filepath.Dir(outputFilePath) @@ -119,6 +124,7 @@ var runCommand = cli.Command{ defer f.Close() } + rawDataFlagIncluded := cliCtx.Bool("raw-data") err = printResponseStats(f, rawDataFlagIncluded, stats) if err != nil { return fmt.Errorf("error while printing response stats: %w", err) @@ -156,6 +162,12 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { if v := "total"; cliCtx.IsSet(v) || profileCfg.Spec.Total == 0 { profileCfg.Spec.Total = cliCtx.Int(v) } + if v := "content-type"; cliCtx.IsSet(v) || profileCfg.Spec.ContentType == "" { + profileCfg.Spec.ContentType = types.ContentType(cliCtx.String(v)) + } + if v := "disable-http2"; cliCtx.IsSet(v) { + profileCfg.Spec.DisableHTTP2 = cliCtx.Bool(v) + } if err := profileCfg.Validate(); err != nil { return nil, err diff --git a/request/client.go b/request/client.go index d8438c3..261c07e 100644 --- a/request/client.go +++ b/request/client.go @@ -4,6 +4,8 @@ import ( "fmt" "math" + "github.com/Azure/kperf/api/types" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/kubectl/pkg/scheme" @@ -15,42 +17,106 @@ import ( // // 1. Is it possible to build one http2 client with multiple connections? // 2. How to monitor HTTP2 GOAWAY frame? -func NewClients(kubeCfgPath string, ConnsNum int, userAgent string, qps float64, contentType string) ([]rest.Interface, error) { +func NewClients(kubeCfgPath string, connsNum int, opts ...ClientCfgOpt) ([]rest.Interface, error) { + var cfg = defaultClientCfg + for _, opt := range opts { + opt(&cfg) + } + restCfg, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) if err != nil { return nil, err } + restCfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion() - if qps == 0 { - qps = float64(math.MaxInt32) + err = cfg.apply(restCfg) + if err != nil { + return nil, err } - restCfg.QPS = float32(qps) - restCfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion() - restCfg.UserAgent = userAgent + restClients := make([]rest.Interface, 0, connsNum) + for i := 0; i < connsNum; i++ { + cfgShallowCopy := *restCfg + + restCli, err := rest.UnversionedRESTClientFor(&cfgShallowCopy) + if err != nil { + return nil, err + } + restClients = append(restClients, restCli) + } + return restClients, nil +} + +// defaultClientCfg is default setting for http client. +var defaultClientCfg = clientCfg{ + qps: float64(math.MaxInt32), + contentType: types.ContentTypeJSON, +} + +type clientCfg struct { + userAgent string + qps float64 + contentType types.ContentType + disableHTTP2 bool +} + +// apply sets value to k8s.io/client-go/rest.Config. +func (cfg *clientCfg) apply(restCfg *rest.Config) error { + // set qps + restCfg.QPS = float32(cfg.qps) + + // set user agent + restCfg.UserAgent = cfg.userAgent if restCfg.UserAgent == "" { restCfg.UserAgent = rest.DefaultKubernetesUserAgent() } - // Set the content type - switch contentType { - case "json": + // set the content type + switch cfg.contentType { + case types.ContentTypeJSON: restCfg.ContentType = "application/json" - case "protobuf": + case types.ContentTypeProtobuffer: restCfg.ContentType = "application/vnd.kubernetes.protobuf" default: - return nil, fmt.Errorf("invalid content type: %s", contentType) + return fmt.Errorf("invalid content type: %s", cfg.contentType) } - restClients := make([]rest.Interface, 0, ConnsNum) - for i := 0; i < ConnsNum; i++ { - cfgShallowCopy := *restCfg + // disable HTTP2 + if cfg.disableHTTP2 { + restCfg.NextProtos = []string{"http/1.1"} + } + return nil +} - restCli, err := rest.UnversionedRESTClientFor(&cfgShallowCopy) - if err != nil { - return nil, err +// ClientCfgOpt is used to update default client setting. +type ClientCfgOpt func(*clientCfg) + +// WithClientQPSOpt updates QPS value. +func WithClientQPSOpt(qps float64) ClientCfgOpt { + return func(cfg *clientCfg) { + if qps > 0 { + cfg.qps = qps } - restClients = append(restClients, restCli) } - return restClients, nil +} + +// WithClientUserAgentOpt updates user agent. +func WithClientUserAgentOpt(ua string) ClientCfgOpt { + return func(cfg *clientCfg) { + cfg.userAgent = ua + } +} + +// WithClientContentTypeOpt updates content type of response. +func WithClientContentTypeOpt(ct types.ContentType) ClientCfgOpt { + return func(cfg *clientCfg) { + cfg.contentType = ct + } +} + +// WithClientDisableHTTP2Opt disables HTTP2 protocol. +func WithClientDisableHTTP2Opt(b bool) ClientCfgOpt { + return func(cfg *clientCfg) { + cfg.disableHTTP2 = b + } } diff --git a/request/schedule.go b/request/schedule.go index cda2d61..c2621e2 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -42,11 +42,16 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I } limiter := rate.NewLimiter(rate.Limit(qps), 1) + clients := spec.Client + if clients == 0 { + clients = spec.Conns + } + reqBuilderCh := rndReqs.Chan() var wg sync.WaitGroup respMetric := metrics.NewResponseMetric() - for i := 0; i < spec.Client; i++ { + for i := 0; i < clients; i++ { // reuse connection if clients > conns cli := restCli[i%len(restCli)] wg.Add(1) @@ -56,14 +61,14 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I for builder := range reqBuilderCh { _, req := builder.Build(cli) - klog.V(9).Infof("Request URL: %s", req.URL()) - if err := limiter.Wait(ctx); err != nil { - klog.V(9).Infof("Rate limiter wait failed: %v", err) + klog.V(5).Infof("Rate limiter wait failed: %v", err) cancel() return } + klog.V(5).Infof("Request URL: %s", req.URL()) + req = req.Timeout(defaultTimeout) func() { start := time.Now() @@ -81,13 +86,22 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I if err != nil { respMetric.ObserveFailure(err) - klog.V(9).Infof("Request stream failed: %v", err) + klog.V(5).Infof("Request stream failed: %v", err) } }() } }(cli) } + klog.V(2).InfoS("Setting", + "clients", clients, + "connections", len(restCli), + "rate", qps, + "total", spec.Total, + "http2", !spec.DisableHTTP2, + "content-type", spec.ContentType, + ) + start := time.Now() rndReqs.Run(ctx, spec.Total) diff --git a/scripts/run_runner.sh b/scripts/run_runner.sh index 050707d..930fe04 100755 --- a/scripts/run_runner.sh +++ b/scripts/run_runner.sh @@ -4,7 +4,7 @@ set -euo pipefail result_file=/data/${POD_NAMESPACE}-${POD_NAME}-${POD_UID}.json -/kperf runner run --config=/config/load_profile.yaml \ +/kperf -v=2 runner run --config=/config/load_profile.yaml \ --user-agent=${POD_NAME} \ --result=${result_file} \ --raw-data