Skip to content

Commit

Permalink
*: refactor request pkg
Browse files Browse the repository at this point in the history
* Update LoadProfileSpec with ContentType and DisableHTTP2
* Introduce option pattern to tweak client-go/rest' config
* Add log to show setting in schedule.go
* Add --disable-http2 flag to runner

Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Jan 25, 2024
1 parent 46d3c09 commit 6588f79
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 38 deletions.
33 changes: 33 additions & 0 deletions api/types/load_traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,26 @@ package types

import "fmt"

// ContentType represents the format of response.
type ContentType string

const (
// ContentTypeJSON means the format is json.
ContentTypeJSON ContentType = "json"
// ContentTypeProtobuffer means the format is protobuf.
ContentTypeProtobuffer = "protobuf"
)

// Validate returns error if ContentType is not supported.
func (ct ContentType) Validate() error {
switch ct {
case ContentTypeJSON, ContentTypeProtobuffer:
return nil
default:
return fmt.Errorf("unsupported content type %s", ct)
}
}

// LoadProfile defines how to create load traffic from one host to kube-apiserver.
type LoadProfile struct {
// Version defines the version of this object.
Expand All @@ -22,6 +42,10 @@ type LoadProfileSpec struct {
Conns int `json:"conns" yaml:"conns"`
// Client defines total number of HTTP clients.
Client int `json:"client" yaml:"client"`
// ContentType defines response's content type.
ContentType ContentType `json:"contentType" yaml:"contentType"`
// DisableHTTP2 means client will use HTTP/1.1 protocol if it's true.
DisableHTTP2 bool `json:"disableHTTP2" yaml:"disableHTTP2"`
// Requests defines the different kinds of requests with weights.
// The executor should randomly pick by weight.
Requests []*WeightedRequest
Expand Down Expand Up @@ -118,6 +142,15 @@ func (spec LoadProfileSpec) Validate() error {
return fmt.Errorf("total requires > 0: %v", spec.Total)
}

if spec.Client <= 0 {
return fmt.Errorf("client requires > 0: %v", spec.Client)
}

err := spec.ContentType.Validate()
if err != nil {
return err
}

for idx, req := range spec.Requests {
if err := req.Validate(); err != nil {
return fmt.Errorf("idx: %v request: %v", idx, err)
Expand Down
38 changes: 25 additions & 13 deletions cmd/kperf/commands/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var runCommand = cli.Command{
},
cli.StringFlag{
Name: "content-type",
Usage: "Content type (json or protobuf)",
Value: "json",
Usage: fmt.Sprintf("Content type (%v or %v)", types.ContentTypeJSON, types.ContentTypeProtobuffer),
Value: string(types.ContentTypeJSON),
},
cli.Float64Flag{
Name: "rate",
Expand All @@ -66,6 +66,10 @@ var runCommand = cli.Command{
Name: "user-agent",
Usage: "User Agent",
},
cli.BoolFlag{
Name: "disable-http2",
Usage: "Disable HTTP2 protocol",
},
cli.StringFlag{
Name: "result",
Usage: "Path to the file which stores results",
Expand All @@ -76,31 +80,32 @@ var runCommand = cli.Command{
},
},
Action: func(cliCtx *cli.Context) error {
kubeCfgPath := cliCtx.String("kubeconfig")

profileCfg, err := loadConfig(cliCtx)
if err != nil {
return err
}

// Get the content type from the command-line flag
contentType := cliCtx.String("content-type")
kubeCfgPath := cliCtx.String("kubeconfig")
userAgent := cliCtx.String("user-agent")
outputFilePath := cliCtx.String("result")
rawDataFlagIncluded := cliCtx.Bool("raw-data")

conns := profileCfg.Spec.Conns
rate := profileCfg.Spec.Rate
restClis, err := request.NewClients(kubeCfgPath, conns, userAgent, rate, contentType)
clientNum := profileCfg.Spec.Conns
restClis, err := request.NewClients(kubeCfgPath,
clientNum,
request.WithClientUserAgentOpt(cliCtx.String("user-agent")),
request.WithClientQPSOpt(profileCfg.Spec.Rate),
request.WithClientContentTypeOpt(profileCfg.Spec.ContentType),
request.WithClientDisableHTTP2Opt(profileCfg.Spec.DisableHTTP2),
)
if err != nil {
return err
}
stats, err := request.Schedule(context.TODO(), &profileCfg.Spec, restClis)

stats, err := request.Schedule(context.TODO(), &profileCfg.Spec, restClis)
if err != nil {
return err
}

var f *os.File = os.Stdout
outputFilePath := cliCtx.String("result")
if outputFilePath != "" {
outputFileDir := filepath.Dir(outputFilePath)

Expand All @@ -119,6 +124,7 @@ var runCommand = cli.Command{
defer f.Close()
}

rawDataFlagIncluded := cliCtx.Bool("raw-data")
err = printResponseStats(f, rawDataFlagIncluded, stats)
if err != nil {
return fmt.Errorf("error while printing response stats: %w", err)
Expand Down Expand Up @@ -156,6 +162,12 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) {
if v := "total"; cliCtx.IsSet(v) || profileCfg.Spec.Total == 0 {
profileCfg.Spec.Total = cliCtx.Int(v)
}
if v := "content-type"; cliCtx.IsSet(v) || profileCfg.Spec.ContentType == "" {
profileCfg.Spec.ContentType = types.ContentType(cliCtx.String(v))
}
if v := "disable-http2"; cliCtx.IsSet(v) {
profileCfg.Spec.DisableHTTP2 = cliCtx.Bool(v)
}

if err := profileCfg.Validate(); err != nil {
return nil, err
Expand Down
104 changes: 85 additions & 19 deletions request/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"math"

"github.com/Azure/kperf/api/types"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubectl/pkg/scheme"
Expand All @@ -15,42 +17,106 @@ import (
//
// 1. Is it possible to build one http2 client with multiple connections?
// 2. How to monitor HTTP2 GOAWAY frame?
func NewClients(kubeCfgPath string, ConnsNum int, userAgent string, qps float64, contentType string) ([]rest.Interface, error) {
func NewClients(kubeCfgPath string, connsNum int, opts ...ClientCfgOpt) ([]rest.Interface, error) {
var cfg = defaultClientCfg
for _, opt := range opts {
opt(&cfg)
}

restCfg, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath)
if err != nil {
return nil, err
}
restCfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

if qps == 0 {
qps = float64(math.MaxInt32)
err = cfg.apply(restCfg)
if err != nil {
return nil, err
}
restCfg.QPS = float32(qps)
restCfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

restCfg.UserAgent = userAgent
restClients := make([]rest.Interface, 0, connsNum)
for i := 0; i < connsNum; i++ {
cfgShallowCopy := *restCfg

restCli, err := rest.UnversionedRESTClientFor(&cfgShallowCopy)
if err != nil {
return nil, err
}
restClients = append(restClients, restCli)
}
return restClients, nil
}

// defaultClientCfg is default setting for http client.
var defaultClientCfg = clientCfg{
qps: float64(math.MaxInt32),
contentType: types.ContentTypeJSON,
}

type clientCfg struct {
userAgent string
qps float64
contentType types.ContentType
disableHTTP2 bool
}

// apply sets value to k8s.io/client-go/rest.Config.
func (cfg *clientCfg) apply(restCfg *rest.Config) error {
// set qps
restCfg.QPS = float32(cfg.qps)

// set user agent
restCfg.UserAgent = cfg.userAgent
if restCfg.UserAgent == "" {
restCfg.UserAgent = rest.DefaultKubernetesUserAgent()
}

// Set the content type
switch contentType {
case "json":
// set the content type
switch cfg.contentType {
case types.ContentTypeJSON:
restCfg.ContentType = "application/json"
case "protobuf":
case types.ContentTypeProtobuffer:
restCfg.ContentType = "application/vnd.kubernetes.protobuf"
default:
return nil, fmt.Errorf("invalid content type: %s", contentType)
return fmt.Errorf("invalid content type: %s", cfg.contentType)
}

restClients := make([]rest.Interface, 0, ConnsNum)
for i := 0; i < ConnsNum; i++ {
cfgShallowCopy := *restCfg
// disable HTTP2
if cfg.disableHTTP2 {
restCfg.NextProtos = []string{"http/1.1"}
}
return nil
}

restCli, err := rest.UnversionedRESTClientFor(&cfgShallowCopy)
if err != nil {
return nil, err
// ClientCfgOpt is used to update default client setting.
type ClientCfgOpt func(*clientCfg)

// WithClientQPSOpt updates QPS value.
func WithClientQPSOpt(qps float64) ClientCfgOpt {
return func(cfg *clientCfg) {
if qps > 0 {
cfg.qps = qps
}
restClients = append(restClients, restCli)
}
return restClients, nil
}

// WithClientUserAgentOpt updates user agent.
func WithClientUserAgentOpt(ua string) ClientCfgOpt {
return func(cfg *clientCfg) {
cfg.userAgent = ua
}
}

// WithClientContentTypeOpt updates content type of response.
func WithClientContentTypeOpt(ct types.ContentType) ClientCfgOpt {
return func(cfg *clientCfg) {
cfg.contentType = ct
}
}

// WithClientDisableHTTP2Opt disables HTTP2 protocol.
func WithClientDisableHTTP2Opt(b bool) ClientCfgOpt {
return func(cfg *clientCfg) {
cfg.disableHTTP2 = b
}
}
24 changes: 19 additions & 5 deletions request/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,16 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I
}
limiter := rate.NewLimiter(rate.Limit(qps), 1)

clients := spec.Client
if clients == 0 {
clients = spec.Conns
}

reqBuilderCh := rndReqs.Chan()
var wg sync.WaitGroup

respMetric := metrics.NewResponseMetric()
for i := 0; i < spec.Client; i++ {
for i := 0; i < clients; i++ {
// reuse connection if clients > conns
cli := restCli[i%len(restCli)]
wg.Add(1)
Expand All @@ -56,14 +61,14 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I
for builder := range reqBuilderCh {
_, req := builder.Build(cli)

klog.V(9).Infof("Request URL: %s", req.URL())

if err := limiter.Wait(ctx); err != nil {
klog.V(9).Infof("Rate limiter wait failed: %v", err)
klog.V(5).Infof("Rate limiter wait failed: %v", err)
cancel()
return
}

klog.V(5).Infof("Request URL: %s", req.URL())

req = req.Timeout(defaultTimeout)
func() {
start := time.Now()
Expand All @@ -81,13 +86,22 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I

if err != nil {
respMetric.ObserveFailure(err)
klog.V(9).Infof("Request stream failed: %v", err)
klog.V(5).Infof("Request stream failed: %v", err)
}
}()
}
}(cli)
}

klog.V(2).InfoS("Setting",
"clients", clients,
"connections", len(restCli),
"rate", qps,
"total", spec.Total,
"http2", !spec.DisableHTTP2,
"content-type", spec.ContentType,
)

start := time.Now()

rndReqs.Run(ctx, spec.Total)
Expand Down
2 changes: 1 addition & 1 deletion scripts/run_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -euo pipefail

result_file=/data/${POD_NAMESPACE}-${POD_NAME}-${POD_UID}.json

/kperf runner run --config=/config/load_profile.yaml \
/kperf -v=2 runner run --config=/config/load_profile.yaml \
--user-agent=${POD_NAME} \
--result=${result_file} \
--raw-data
Expand Down

0 comments on commit 6588f79

Please sign in to comment.