diff --git a/.gitignore b/.gitignore index 0bfa83c..7aebfc0 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ bin/ # Go workspace file go.work + +temp/ +cmd/kperf/commands/virtualcluster \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7ce80ae --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.20 AS build-stage + +WORKDIR /gomod +COPY go.mod go.sum ./ +RUN go mod download + +RUN mkdir -p /output + +WORKDIR /kperf-build +RUN --mount=source=./,target=/kperf-build,rw make build && PREFIX=/output make install + +FROM gcr.io/distroless/static-debian12:nonroot AS release-stage + +WORKDIR / + +COPY --from=build-stage /output/bin/kperf /kperf + +USER nonroot:nonroot + +ENTRYPOINT ["/kperf"] diff --git a/Makefile b/Makefile index 8c0049b..b357424 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,15 @@ COMMANDS=kperf +# PREFIX is base path to install. +PREFIX ?= /usr/local + +GO_BUILDTAGS = -tags "osusergo netgo static_build" + +# IMAGE_REPO is default repo for image-build recipe. +IMAGE_REPO ?= localhost:5000 +IMAGE_TAG ?= latest +IMAGE_NAME = $(IMAGE_REPO)/kperf:$(IMAGE_TAG) + BINARIES=$(addprefix bin/,$(COMMANDS)) # default recipe is build @@ -9,11 +19,23 @@ BINARIES=$(addprefix bin/,$(COMMANDS)) ALWAYS: bin/%: cmd/% ALWAYS - @go build -o $@ ./$< + @GO_ENABLED=0 go build -o $@ ${GO_BUILDTAGS} ./$< build: $(BINARIES) ## build binaries @echo "$@" +install: ## install binaries + @install -d $(PREFIX)/bin + @install $(BINARIES) $(PREFIX)/bin + +image-build: ## build image + @echo building ${IMAGE_NAME} + @docker build . -t ${IMAGE_NAME} + +image-push: image-build ## push image + @echo pushing ${IMAGE_NAME} + @docker push ${IMAGE_NAME} + test: ## run test @go test -v ./... diff --git a/api/types/load_traffic.go b/api/types/load_traffic.go index a395c3a..4abf8e8 100644 --- a/api/types/load_traffic.go +++ b/api/types/load_traffic.go @@ -1,5 +1,7 @@ package types +import "fmt" + // LoadProfile defines how to create load traffic from one host to kube-apiserver. type LoadProfile struct { // Version defines the version of this object. @@ -23,12 +25,14 @@ type LoadProfileSpec struct { Requests []*WeightedRequest } -// KubeTypeMeta represents metadata of kubernetes object. -type KubeTypeMeta struct { - // Kind is a string value representing the REST resource the object represents. - Kind string `json:"kind" yaml:"kind"` - // APIVersion defines the versioned schema of the representation of an object. - APIVersion string `json:"apiVersion" yaml:"apiVersion"` +// KubeGroupVersionResource identifies the resource URI. +type KubeGroupVersionResource struct { + // Group is the name about a collection of related functionality. + Group string `json:"group" yaml:"group"` + // Version is a version of that group. + Version string `json:"version" yaml:"version"` + // Resource is a type in that versioned group APIs. + Resource string `json:"resource" yaml:"resource"` } // WeightedRequest represents request with weight. @@ -50,8 +54,8 @@ type WeightedRequest struct { // RequestGet defines GET request for target object. type RequestGet struct { - // KubeTypeMeta represents object's resource type. - KubeTypeMeta `yaml:",inline"` + // KubeGroupVersionResource identifies the resource URI. + KubeGroupVersionResource `yaml:",inline"` // Namespace is object's namespace. Namespace string `json:"namespace" yaml:"namespace"` // Name is object's name. @@ -60,8 +64,8 @@ type RequestGet struct { // RequestList defines LIST request for target objects. type RequestList struct { - // KubeTypeMeta represents object's resource type. - KubeTypeMeta `yaml:",inline"` + // KubeGroupVersionResource identifies the resource URI. + KubeGroupVersionResource `yaml:",inline"` // Namespace is object's namespace. Namespace string `json:"namespace" yaml:"namespace"` // Limit defines the page size. @@ -72,14 +76,14 @@ type RequestList struct { // RequestPut defines PUT request for target resource type. type RequestPut struct { - // KubeTypeMeta represents object's resource type. + // KubeGroupVersionResource identifies the resource URI. // // NOTE: Currently, it should be configmap or secrets because we can // generate random bytes as blob for it. However, for the pod resource, // we need to ensure a lot of things are ready, for instance, volumes, // resource capacity. It's not easy to generate it randomly. Maybe we // can introduce pod template in the future. - KubeTypeMeta `yaml:",inline"` + KubeGroupVersionResource `yaml:",inline"` // Namespace is object's namespace. Namespace string `json:"namespace" yaml:"namespace"` // Name is object's prefix name. @@ -89,3 +93,110 @@ type RequestPut struct { // ValueSize is the object's size in bytes. ValueSize int `json:"valueSize" yaml:"valueSize"` } + +// Validate verifies fields of LoadProfile. +func (lp LoadProfile) Validate() error { + if lp.Version != 1 { + return fmt.Errorf("version should be 1") + } + return lp.Spec.Validate() +} + +// Validate verifies fields of LoadProfileSpec. +func (spec LoadProfileSpec) Validate() error { + if spec.Conns <= 0 { + return fmt.Errorf("conns requires > 0: %v", spec.Conns) + } + + if spec.Rate < 0 { + return fmt.Errorf("rate requires >= 0: %v", spec.Rate) + } + + if spec.Total <= 0 { + return fmt.Errorf("total requires > 0: %v", spec.Total) + } + + for idx, req := range spec.Requests { + if err := req.Validate(); err != nil { + return fmt.Errorf("idx: %v request: %v", idx, err) + } + } + return nil +} + +// Validate verifies fields of WeightedRequest. +func (r WeightedRequest) Validate() error { + if r.Shares < 0 { + return fmt.Errorf("shares(%v) requires >= 0", r.Shares) + } + + switch { + case r.StaleList != nil: + return r.StaleList.Validate() + case r.QuorumList != nil: + return r.QuorumList.Validate() + case r.StaleGet != nil: + return r.StaleGet.Validate() + case r.QuorumGet != nil: + return r.QuorumGet.Validate() + case r.Put != nil: + return r.Put.Validate() + default: + return fmt.Errorf("empty request value") + } +} + +// RequestList validates RequestList type. +func (r *RequestList) Validate() error { + if err := r.KubeGroupVersionResource.Validate(); err != nil { + return fmt.Errorf("kube metadata: %v", err) + } + + if r.Limit < 0 { + return fmt.Errorf("limit must >= 0") + } + return nil +} + +// Validate validates RequestGet type. +func (r *RequestGet) Validate() error { + if err := r.KubeGroupVersionResource.Validate(); err != nil { + return fmt.Errorf("kube metadata: %v", err) + } + + if r.Name == "" { + return fmt.Errorf("name is required") + } + return nil +} + +// Validate validates RequestPut type. +func (r *RequestPut) Validate() error { + if err := r.KubeGroupVersionResource.Validate(); err != nil { + return fmt.Errorf("kube metadata: %v", err) + } + + // TODO: check resource type + if r.Name == "" { + return fmt.Errorf("name pattern is required") + } + if r.KeySpaceSize <= 0 { + return fmt.Errorf("keySpaceSize must > 0") + } + if r.ValueSize <= 0 { + return fmt.Errorf("valueSize must > 0") + } + return nil +} + +// Validate validates KubeGroupVersionResource. +func (m *KubeGroupVersionResource) Validate() error { + if m.Version == "" { + return fmt.Errorf("version is required") + } + + if m.Resource == "" { + return fmt.Errorf("resource is required") + } + return nil +} diff --git a/api/types/load_traffic_test.go b/api/types/load_traffic_test.go index 16ea503..05e9135 100644 --- a/api/types/load_traffic_test.go +++ b/api/types/load_traffic_test.go @@ -18,34 +18,39 @@ spec: conns: 2 requests: - staleGet: - kind: pods - apiVersion: v1 + group: core + version: v1 + resource: pods namespace: default name: x1 shares: 100 - quorumGet: - kind: configmap - apiVersion: v1 + group: core + version: v1 + resource: configmaps namespace: default name: x2 shares: 150 - staleList: - kind: pods - apiVersion: v1 + group: core + version: v1 + resource: pods namespace: default limit: 10000 seletor: app=x2 shares: 200 - quorumList: - kind: configmap - apiVersion: v1 + group: core + version: v1 + resource: configmaps namespace: default limit: 10000 seletor: app=x3 shares: 400 - put: - kind: configmap - apiVersion: v1 + group: core + version: v1 + resource: configmaps namespace: kperf name: kperf- keySpaceSize: 1000 @@ -64,8 +69,9 @@ spec: assert.Equal(t, 100, target.Spec.Requests[0].Shares) assert.NotNil(t, target.Spec.Requests[0].StaleGet) - assert.Equal(t, "pods", target.Spec.Requests[0].StaleGet.Kind) - assert.Equal(t, "v1", target.Spec.Requests[0].StaleGet.APIVersion) + assert.Equal(t, "pods", target.Spec.Requests[0].StaleGet.Resource) + assert.Equal(t, "v1", target.Spec.Requests[0].StaleGet.Version) + assert.Equal(t, "core", target.Spec.Requests[0].StaleGet.Group) assert.Equal(t, "default", target.Spec.Requests[0].StaleGet.Namespace) assert.Equal(t, "x1", target.Spec.Requests[0].StaleGet.Name) @@ -74,8 +80,9 @@ spec: assert.Equal(t, 200, target.Spec.Requests[2].Shares) assert.NotNil(t, target.Spec.Requests[2].StaleList) - assert.Equal(t, "pods", target.Spec.Requests[2].StaleList.Kind) - assert.Equal(t, "v1", target.Spec.Requests[2].StaleList.APIVersion) + assert.Equal(t, "pods", target.Spec.Requests[2].StaleList.Resource) + assert.Equal(t, "v1", target.Spec.Requests[2].StaleList.Version) + assert.Equal(t, "core", target.Spec.Requests[0].StaleGet.Group) assert.Equal(t, "default", target.Spec.Requests[2].StaleList.Namespace) assert.Equal(t, 10000, target.Spec.Requests[2].StaleList.Limit) assert.Equal(t, "app=x2", target.Spec.Requests[2].StaleList.Selector) @@ -85,10 +92,95 @@ spec: assert.Equal(t, 1000, target.Spec.Requests[4].Shares) assert.NotNil(t, target.Spec.Requests[4].Put) - assert.Equal(t, "configmap", target.Spec.Requests[4].Put.Kind) - assert.Equal(t, "v1", target.Spec.Requests[4].Put.APIVersion) + assert.Equal(t, "configmaps", target.Spec.Requests[4].Put.Resource) + assert.Equal(t, "v1", target.Spec.Requests[4].Put.Version) + assert.Equal(t, "core", target.Spec.Requests[0].StaleGet.Group) assert.Equal(t, "kperf", target.Spec.Requests[4].Put.Namespace) assert.Equal(t, "kperf-", target.Spec.Requests[4].Put.Name) assert.Equal(t, 1000, target.Spec.Requests[4].Put.KeySpaceSize) assert.Equal(t, 1024, target.Spec.Requests[4].Put.ValueSize) } + +func TestWeightedRequest(t *testing.T) { + for _, r := range []struct { + name string + req *WeightedRequest + hasErr bool + }{ + { + name: "shares < 0", + req: &WeightedRequest{Shares: -1}, + hasErr: true, + }, + { + name: "no request setting", + req: &WeightedRequest{Shares: 10}, + hasErr: true, + }, + { + name: "empty version", + req: &WeightedRequest{ + Shares: 10, + StaleGet: &RequestGet{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Resource: "pods", + }, + }, + }, + hasErr: true, + }, + { + name: "empty resource", + req: &WeightedRequest{ + Shares: 10, + StaleGet: &RequestGet{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Group: "core", + Version: "v1", + }, + }, + }, + hasErr: true, + }, + { + name: "wrong limit", + req: &WeightedRequest{ + Shares: 10, + StaleList: &RequestList{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Group: "core", + Version: "v1", + Resource: "pods", + }, + Limit: -1, + }, + }, + hasErr: true, + }, + { + name: "no error", + req: &WeightedRequest{ + Shares: 10, + StaleGet: &RequestGet{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Group: "core", + Version: "v1", + Resource: "pods", + }, + Namespace: "default", + Name: "testing", + }, + }, + }, + } { + r := r + t.Run(r.name, func(t *testing.T) { + err := r.req.Validate() + if r.hasErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/api/types/metric.go b/api/types/metric.go new file mode 100644 index 0000000..d6eb2c1 --- /dev/null +++ b/api/types/metric.go @@ -0,0 +1,21 @@ +package types + +import "time" + +// ResponseStats is the report about benchmark result. +type ResponseStats struct { + // Total represents total number of requests. + Total int + // Failures represents number of failure request. + Failures int + // Duration means the time of benchmark. + Duration time.Duration + // PercentileLatencies represents the latency distribution in seconds. + // + // NOTE: The key represents quantile. + PercentileLatencies map[float64]float64 + // TODO: + // 1. Support total read/upload bytes + // 2. Support failures partitioned by http code and verb + // 3. Support to dump all latency data +} diff --git a/cmd/kperf/commands/root.go b/cmd/kperf/commands/root.go index 6bac055..144f08d 100644 --- a/cmd/kperf/commands/root.go +++ b/cmd/kperf/commands/root.go @@ -3,6 +3,7 @@ package commands import ( "github.com/Azure/kperf/cmd/kperf/commands/multirunners" "github.com/Azure/kperf/cmd/kperf/commands/runner" + "github.com/Azure/kperf/cmd/kperf/commands/virtualcluster" "github.com/urfave/cli" ) @@ -15,6 +16,7 @@ func App() *cli.App { Commands: []cli.Command{ runner.Command, multirunners.Command, + virtualcluster.Command, }, } } diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 72ab406..00f741d 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -1,40 +1,145 @@ package runner import ( + "context" "fmt" + "os" + "sort" + + "github.com/Azure/kperf/api/types" + "github.com/Azure/kperf/request" "github.com/urfave/cli" + "gopkg.in/yaml.v2" ) -// Command represents runner sub-command. -// -// Subcommand runner is to create request load to apiserver. -// -// NOTE: It can work with subcommand multirunners. The multirunners subcommand -// will deploy subcommand runner in pod. Details in ../multirunners. -// -// Command line interface: -// -// kperf runner --help -// -// Options: -// -// --kubeconfig PATH (default: empty_string, use token if it's empty) -// --load-config PATH (default: empty_string, required, the config defined in api/types/load_traffic.go) -// --conns INT (default: 1, Total number of connections. It can override corresponding value defined by --load-config) -// --rate INT (default: 0, Maximum requests per second. It can override corresponding value defined by --load-config) -// --total INT (default: 1000, Total number of request. It can override corresponding value defined by --load-config) +// Command represents runner subcommand. var Command = cli.Command{ Name: "runner", - Usage: "run a load test to kube-apiserver", - Flags: []cli.Flag{}, + Usage: "Setup benchmark to kube-apiserver from one endpoint", + Subcommands: []cli.Command{ + runCommand, + }, +} + +var runCommand = cli.Command{ + Name: "run", + Usage: "run a benchmark test to kube-apiserver", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "kubeconfig", + Usage: "Path to the kubeconfig file", + }, + cli.IntFlag{ + Name: "client", + Usage: "Total number of HTTP clients", + Value: 1, + }, + cli.StringFlag{ + Name: "config", + Usage: "Path to the configuration file", + Required: true, + }, + cli.IntFlag{ + Name: "conns", + Usage: "Total number of connections. It can override corresponding value defined by --config", + Value: 1, + }, + cli.StringFlag{ + Name: "content-type", + Usage: "Content type (json or protobuf)", + Value: "json", + }, + cli.IntFlag{ + Name: "rate", + Usage: "Maximum requests per second (Zero means no limitation). It can override corresponding value defined by --config", + }, + cli.IntFlag{ + Name: "total", + Usage: "Total number of requests. It can override corresponding value defined by --config", + Value: 1000, + }, + cli.StringFlag{ + Name: "user-agent", + Usage: "User Agent", + }, + }, Action: func(cliCtx *cli.Context) error { - // 1. Parse options - // 2. Setup producer-consumer goroutines - // 2.1 Use go limter to generate request - // 2.2 Use client-go's client to file requests - // 3. Build progress tracker to track failure number and P99/P95/P90 latencies. - // 4. Export summary in stdout. - return fmt.Errorf("runner - not implemented") + profileCfg, err := loadConfig(cliCtx) + if err != nil { + return err + } + + // Get the content type from the command-line flag + contentType := cliCtx.String("content-type") + kubeCfgPath := cliCtx.String("kubeconfig") + userAgent := cliCtx.String("user-agent") + + conns := profileCfg.Spec.Conns + client := profileCfg.Spec.Conns + rate := profileCfg.Spec.Rate + restClis, err := request.NewClients(kubeCfgPath, conns, userAgent, rate, contentType) + if err != nil { + return err + } + stats, err := request.Schedule(context.TODO(), client, &profileCfg.Spec, restClis) + + if err != nil { + return err + } + printResponseStats(stats) + return nil }, } + +// loadConfig loads and validates the config. +func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { + var profileCfg types.LoadProfile + + cfgPath := cliCtx.String("config") + + cfgInRaw, err := os.ReadFile(cfgPath) + if err != nil { + return nil, fmt.Errorf("failed to read file %s: %w", cfgPath, err) + } + + if err := yaml.Unmarshal(cfgInRaw, &profileCfg); err != nil { + return nil, fmt.Errorf("failed to unmarshal %s from yaml format: %w", cfgPath, err) + } + + // override value by flags + if v := "rate"; cliCtx.IsSet(v) { + profileCfg.Spec.Rate = cliCtx.Int(v) + } + if v := "conns"; cliCtx.IsSet(v) || profileCfg.Spec.Conns == 0 { + profileCfg.Spec.Conns = cliCtx.Int(v) + } + if v := "total"; cliCtx.IsSet(v) || profileCfg.Spec.Total == 0 { + profileCfg.Spec.Total = cliCtx.Int(v) + } + + if err := profileCfg.Validate(); err != nil { + return nil, err + } + return &profileCfg, nil +} + +// printResponseStats prints ResponseStats into stdout. +func printResponseStats(stats *types.ResponseStats) { + fmt.Println("Response stat:") + fmt.Printf(" Total: %v\n", stats.Total) + fmt.Printf(" Failures: %v\n", stats.Failures) + fmt.Printf(" Duration: %v\n", stats.Duration) + fmt.Printf(" Requests/sec: %.2f\n", float64(stats.Total)/stats.Duration.Seconds()) + + fmt.Println(" Latency Distribution:") + keys := make([]float64, 0, len(stats.PercentileLatencies)) + for q := range stats.PercentileLatencies { + keys = append(keys, q) + } + sort.Float64s(keys) + + for _, q := range keys { + fmt.Printf(" [%.2f] %.3fs\n", q/100.0, stats.PercentileLatencies[q]) + } +} diff --git a/go.mod b/go.mod index 11cb7c0..d414d58 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,40 @@ go 1.20 require ( github.com/stretchr/testify v1.8.4 github.com/urfave/cli v1.22.14 + golang.org/x/time v0.3.0 gopkg.in/yaml.v2 v2.4.0 + k8s.io/apimachinery v0.28.4 + k8s.io/client-go v0.28.4 + k8s.io/kubectl v0.28.4 ) require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/gofuzz v1.2.0 // indirect + github.com/imdario/mergo v0.3.6 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/oauth2 v0.8.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.28.4 // indirect + k8s.io/klog/v2 v2.100.1 // indirect + k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect + sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index 2e3b51b..e21baab 100644 --- a/go.sum +++ b/go.sum @@ -4,23 +4,132 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= +github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= +github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= +github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= +golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.28.4 h1:8ZBrLjwosLl/NYgv1P7EQLqoO8MGQApnbgH8tu3BMzY= +k8s.io/api v0.28.4/go.mod h1:axWTGrY88s/5YE+JSt4uUi6NMM+gur1en2REMR7IRj0= +k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8= +k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= +k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY= +k8s.io/client-go v0.28.4/go.mod h1:0VDZFpgoZfelyP5Wqu0/r/TRYcLYuJ2U1KEeoaPa1N4= +k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= +k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= +k8s.io/kubectl v0.28.4 h1:gWpUXW/T7aFne+rchYeHkyB8eVDl5UZce8G4X//kjUQ= +k8s.io/kubectl v0.28.4/go.mod h1:CKOccVx3l+3MmDbkXtIUtibq93nN2hkDR99XDCn7c/c= +k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= +k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= +sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/metrics/request.go b/metrics/request.go new file mode 100644 index 0000000..374c929 --- /dev/null +++ b/metrics/request.go @@ -0,0 +1,81 @@ +package metrics + +import ( + "container/list" + "math" + "sort" + "sync" + "sync/atomic" +) + +// ResponseMetric is a measurement related to http response. +type ResponseMetric interface { + // ObserveLatency observes latency. + ObserveLatency(seconds float64) + // ObserveFailure observes failure response. + ObserveFailure() + // Gather returns the summary. + Gather() (latencies []float64, percentileLatencies map[float64]float64, failure int) +} + +type responseMetricImpl struct { + mu sync.Mutex + failureCount int64 + latencies *list.List +} + +func NewResponseMetric() ResponseMetric { + return &responseMetricImpl{ + latencies: list.New(), + } +} + +// ObserveLatency implements ResponseMetric. +func (m *responseMetricImpl) ObserveLatency(seconds float64) { + m.mu.Lock() + defer m.mu.Unlock() + m.latencies.PushBack(seconds) +} + +// ObserveFailure implements ResponseMetric. +func (m *responseMetricImpl) ObserveFailure() { + atomic.AddInt64(&m.failureCount, 1) +} + +// Gather implements ResponseMetric. +func (m *responseMetricImpl) Gather() ([]float64, map[float64]float64, int) { + latencies := m.dumpLatencies() + + return latencies, buildPercentileLatencies(latencies), int(atomic.LoadInt64(&m.failureCount)) +} + +func (m *responseMetricImpl) dumpLatencies() []float64 { + m.mu.Lock() + defer m.mu.Unlock() + res := make([]float64, 0, m.latencies.Len()) + for e := m.latencies.Front(); e != nil; e = e.Next() { + res = append(res, e.Value.(float64)) + } + return res +} + +var percentiles = []float64{0, 50, 90, 95, 99, 100} + +func buildPercentileLatencies(latencies []float64) map[float64]float64 { + if len(latencies) == 0 { + return nil + } + + res := make(map[float64]float64, len(percentiles)) + + n := len(latencies) + sort.Float64s(latencies) + for _, p := range percentiles { + idx := int(math.Ceil(float64(n) * p / 100)) + if idx > 0 { + idx-- + } + res[p] = latencies[idx] + } + return res +} diff --git a/metrics/request_test.go b/metrics/request_test.go new file mode 100644 index 0000000..2cd8e15 --- /dev/null +++ b/metrics/request_test.go @@ -0,0 +1,48 @@ +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBuildPercentileLatencies(t *testing.T) { + ls := make([]float64, 100) + ls[0] = 50 + ls[1] = 49 + ls[2] = 1 + res := buildPercentileLatencies(ls) + assert.Equal(t, float64(0), res[0]) + assert.Equal(t, float64(0), res[50]) + assert.Equal(t, float64(0), res[90]) + assert.Equal(t, float64(0), res[95]) + assert.Equal(t, float64(49), res[99]) + assert.Equal(t, float64(50), res[100]) + + ls = make([]float64, 1000) + ls[0] = 50 + ls[1] = 49 + ls[2] = -1 + res = buildPercentileLatencies(ls) + assert.Equal(t, float64(-1), res[0]) + assert.Equal(t, float64(0), res[50]) + assert.Equal(t, float64(0), res[90]) + assert.Equal(t, float64(0), res[95]) + assert.Equal(t, float64(0), res[99]) + assert.Equal(t, float64(50), res[100]) +} + +func TestResponseMetric(t *testing.T) { + c := NewResponseMetric() + for i := 100; i > 0; i-- { + c.ObserveLatency(float64(i)) + } + + _, res, _ := c.Gather() + assert.Equal(t, float64(1), res[0]) + assert.Equal(t, float64(50), res[50]) + assert.Equal(t, float64(90), res[90]) + assert.Equal(t, float64(95), res[95]) + assert.Equal(t, float64(99), res[99]) + assert.Equal(t, float64(100), res[100]) +} diff --git a/random.go b/random.go deleted file mode 100644 index b5807f9..0000000 --- a/random.go +++ /dev/null @@ -1,6 +0,0 @@ -package kperf - -// WeightedRandomPick returns index randomly based on weights. -func WeightedRandomPick(_ []int) (_index int) { - panic("not implemented") -} diff --git a/request/client.go b/request/client.go new file mode 100644 index 0000000..cab0618 --- /dev/null +++ b/request/client.go @@ -0,0 +1,61 @@ +package request + +import ( + "fmt" + "math" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubectl/pkg/scheme" +) + +// NewClients creates N rest.Interface. +// +// FIXME(weifu): +// +// 1. Is it possible to build one http2 client with multiple connections? +// 2. How to monitor HTTP2 GOAWAY frame? +// 3. Support Protobuf as accepted content +func NewClients(kubeCfgPath string, ConnsNum int, userAgent string, qps int, contentType string) ([]rest.Interface, error) { + restCfg, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) + + if err != nil { + return nil, err + } + + if qps == 0 { + qps = math.MaxInt32 + } + + restCfg.QPS = float32(qps) + restCfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + + restCfg.UserAgent = userAgent + + if restCfg.UserAgent == "" { + restCfg.UserAgent = rest.DefaultKubernetesUserAgent() + } + + // Set the content type + switch contentType { + case "json": + restCfg.ContentType = "application/json" + case "protobuf": + restCfg.ContentType = "application/vnd.kubernetes.protobuf" + default: + return nil, fmt.Errorf("invalid content type: %s", contentType) + } + + restClients := make([]rest.Interface, 0, ConnsNum) + for i := 0; i < ConnsNum; i++ { + cfgShallowCopy := *restCfg + + restCli, err := rest.UnversionedRESTClientFor(&cfgShallowCopy) + if err != nil { + fmt.Printf("Failed to create rest client: %v\n", err) + return nil, err + } + restClients = append(restClients, restCli) + } + return restClients, nil +} diff --git a/request/random.go b/request/random.go new file mode 100644 index 0000000..8bd061f --- /dev/null +++ b/request/random.go @@ -0,0 +1,219 @@ +package request + +import ( + "context" + "crypto/rand" + "fmt" + "math/big" + "sync" + + "github.com/Azure/kperf/api/types" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +// WeightedRandomRequests is used to generate requests based on LoadProfileSpec. +type WeightedRandomRequests struct { + once sync.Once + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + reqBuilderCh chan RESTRequestBuilder + + shares []int + reqBuilders []RESTRequestBuilder +} + +// NewWeightedRandomRequests creates new instance of WeightedRandomRequests. +func NewWeightedRandomRequests(spec *types.LoadProfileSpec) (*WeightedRandomRequests, error) { + if err := spec.Validate(); err != nil { + return nil, fmt.Errorf("invalid load profile spec: %v", err) + } + + shares := make([]int, 0, len(spec.Requests)) + reqBuilders := make([]RESTRequestBuilder, 0, len(spec.Requests)) + for _, r := range spec.Requests { + shares = append(shares, r.Shares) + + var builder RESTRequestBuilder + switch { + case r.StaleList != nil: + builder = newRequestListBuilder(r.StaleList, "0") + case r.QuorumList != nil: + builder = newRequestListBuilder(r.QuorumList, "") + case r.StaleGet != nil: + builder = newRequestGetBuilder(r.StaleGet, "0") + case r.QuorumGet != nil: + builder = newRequestGetBuilder(r.QuorumGet, "") + default: + return nil, fmt.Errorf("not implement for PUT yet") + } + reqBuilders = append(reqBuilders, builder) + } + + ctx, cancel := context.WithCancel(context.Background()) + return &WeightedRandomRequests{ + ctx: ctx, + cancel: cancel, + reqBuilderCh: make(chan RESTRequestBuilder), + shares: shares, + reqBuilders: reqBuilders, + }, nil +} + +// Run starts to random pick request. +func (r *WeightedRandomRequests) Run(ctx context.Context, total int) { + defer r.wg.Done() + r.wg.Add(1) + + sum := 0 + for sum < total { + builder := r.randomPick() + select { + case r.reqBuilderCh <- builder: + sum++ + case <-r.ctx.Done(): + return + case <-ctx.Done(): + return + } + } +} + +// Chan returns channel to get random request. +func (r *WeightedRandomRequests) Chan() chan RESTRequestBuilder { + return r.reqBuilderCh +} + +func (r *WeightedRandomRequests) randomPick() RESTRequestBuilder { + sum := 0 + for _, s := range r.shares { + sum += s + } + + rndInt, err := rand.Int(rand.Reader, big.NewInt(int64(sum))) + if err != nil { + panic(err) + } + + rnd := rndInt.Int64() + for i := range r.shares { + s := int64(r.shares[i]) + if rnd < s { + return r.reqBuilders[i] + } + rnd -= s + } + panic("unreachable") +} + +// Stop stops request generator. +func (r *WeightedRandomRequests) Stop() { + r.once.Do(func() { + r.cancel() + r.wg.Wait() + close(r.reqBuilderCh) + }) +} + +// RESTRequestBuilder is used to build rest.Request. +type RESTRequestBuilder interface { + Build(cli rest.Interface) (method string, _ *rest.Request) +} + +type requestGetBuilder struct { + version schema.GroupVersion + resource string + namespace string + name string + resourceVersion string +} + +func newRequestGetBuilder(src *types.RequestGet, resourceVersion string) *requestGetBuilder { + return &requestGetBuilder{ + version: schema.GroupVersion{ + Group: src.Group, + Version: src.Version, + }, + resource: src.Resource, + namespace: src.Namespace, + name: src.Name, + resourceVersion: resourceVersion, + } +} + +// Build implements RequestBuilder.Build. +func (b *requestGetBuilder) Build(cli rest.Interface) (string, *rest.Request) { + // https://kubernetes.io/docs/reference/using-api/#api-groups + apiPath := "apis" + if b.version.Group == "" { + apiPath = "api" + } + + comps := make([]string, 2, 5) + comps[0], comps[1] = apiPath, b.version.Version + if b.namespace != "" { + comps = append(comps, "namespaces", b.namespace) + } + comps = append(comps, b.resource, b.name) + + return "GET", cli.Get().AbsPath(comps...). + SpecificallyVersionedParams( + &metav1.GetOptions{ResourceVersion: b.resourceVersion}, + scheme.ParameterCodec, + schema.GroupVersion{Version: "v1"}, + ) +} + +type requestListBuilder struct { + version schema.GroupVersion + resource string + namespace string + limit int64 + labelSelector string + resourceVersion string +} + +func newRequestListBuilder(src *types.RequestList, resourceVersion string) *requestListBuilder { + return &requestListBuilder{ + version: schema.GroupVersion{ + Group: src.Group, + Version: src.Version, + }, + resource: src.Resource, + namespace: src.Namespace, + limit: int64(src.Limit), + labelSelector: src.Selector, + resourceVersion: resourceVersion, + } +} + +// Build implements RequestBuilder.Build. +func (b *requestListBuilder) Build(cli rest.Interface) (string, *rest.Request) { + // https://kubernetes.io/docs/reference/using-api/#api-groups + apiPath := "apis" + if b.version.Group == "" { + apiPath = "api" + } + + comps := make([]string, 2, 5) + comps[0], comps[1] = apiPath, b.version.Version + if b.namespace != "" { + comps = append(comps, "namespaces", b.namespace) + } + comps = append(comps, b.resource) + + return "LIST", cli.Get().AbsPath(comps...). + SpecificallyVersionedParams( + &metav1.ListOptions{ + LabelSelector: b.labelSelector, + ResourceVersion: b.resourceVersion, + Limit: b.limit, + }, + scheme.ParameterCodec, + schema.GroupVersion{Version: "v1"}, + ) +} diff --git a/request/schedule.go b/request/schedule.go new file mode 100644 index 0000000..c94e307 --- /dev/null +++ b/request/schedule.go @@ -0,0 +1,91 @@ +package request + +import ( + "context" + "io" + "math" + "sync" + "time" + + "github.com/Azure/kperf/api/types" + "github.com/Azure/kperf/metrics" + + "golang.org/x/time/rate" + "k8s.io/client-go/rest" +) + +const defaultTimeout = 60 * time.Second + +// Schedule files requests to apiserver based on LoadProfileSpec. +func Schedule(ctx context.Context, clientNum int, spec *types.LoadProfileSpec, restCli []rest.Interface) (*types.ResponseStats, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + rndReqs, err := NewWeightedRandomRequests(spec) + if err != nil { + return nil, err + } + + qps := spec.Rate + if qps == 0 { + qps = math.MaxInt32 + } + limiter := rate.NewLimiter(rate.Limit(qps), 10) + + reqBuilderCh := rndReqs.Chan() + var wg sync.WaitGroup + + respMetric := metrics.NewResponseMetric() + for i := 0; i < clientNum; i++ { + //reuse connection if client > conns + cli := restCli[i%len(restCli)] + wg.Add(1) + go func(cli rest.Interface) { + defer wg.Done() + + for builder := range reqBuilderCh { + _, req := builder.Build(cli) + + if err := limiter.Wait(ctx); err != nil { + cancel() + return + } + + req = req.Timeout(defaultTimeout) + func() { + start := time.Now() + defer func() { + respMetric.ObserveLatency(time.Since(start).Seconds()) + }() + + respBody, err := req.Stream(context.Background()) + if err == nil { + defer respBody.Close() + // NOTE: It's to reduce memory usage because + // we don't need that unmarshal object. + _, err = io.Copy(io.Discard, respBody) + } + if err != nil { + respMetric.ObserveFailure() + } + }() + } + }(cli) + } + + start := time.Now() + + rndReqs.Run(ctx, spec.Total) + rndReqs.Stop() + wg.Wait() + + totalDuration := time.Since(start) + + _, percentileLatencies, failures := respMetric.Gather() + return &types.ResponseStats{ + Total: spec.Total, + Failures: failures, + Duration: totalDuration, + PercentileLatencies: percentileLatencies, + }, nil +}