Skip to content

Commit

Permalink
Merge pull request #77 from fuweid/add-runner-flowcontrol
Browse files: browse the repository at this point in the history
*: introduce --runner-flowcontrol to rg run
  • Loading branch information
fuweid authored Jan 30, 2024
2 parents 1f8cb74 + 3147c73 commit d5b7088
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 17 deletions.
31 changes: 30 additions & 1 deletion cmd/kperf/commands/runnergroup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package runnergroup
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/cmd/kperf/commands/utils"
Expand Down Expand Up @@ -32,6 +34,11 @@ var runCommand = cli.Command{
// Right now, we need to set image manually.
Required: true,
},
cli.StringFlag{
Name: "runner-flowcontrol",
Usage: "Apply flowcontrol to runner group. (FORMAT: PriorityLevel:MatchingPrecedence)",
Value: "workload-low:1000",
},
cli.StringSliceFlag{
Name: "affinity",
Usage: "Deploy server to the node with a specific labels (FORMAT: KEY=VALUE[,VALUE])",
Expand All @@ -48,6 +55,11 @@ var runCommand = cli.Command{
return fmt.Errorf("failed to parse affinity: %w", err)
}

priorityLevel, matchingPrecedence, err := parseFlowControl(cliCtx.String("runner-flowcontrol"))
if err != nil {
return fmt.Errorf("failed to parse runner-flowcontrol: %w", err)
}

specs, err := loadRunnerGroupSpec(cliCtx)
if err != nil {
return fmt.Errorf("failed to load runner group spec: %w", err)
Expand All @@ -61,7 +73,8 @@ var runCommand = cli.Command{
kubeCfgPath,
imgRef,
specs[0],
affinityLabels,
runner.WithRunCmdServerNodeSelectorsOpt(affinityLabels),
runner.WithRunCmdRunnerGroupFlowControl(priorityLevel, matchingPrecedence),
)
},
}
Expand All @@ -86,3 +99,19 @@ func loadRunnerGroupSpec(cliCtx *cli.Context) ([]*types.RunnerGroupSpec, error)
}
return specs, nil
}

// parseFlowControl parses a "PriorityLevel:MatchingPrecedence" value, such as
// "workload-low:1000", into its two components.
//
// The value is split at the first ":". Both sides must be non-empty and the
// right-hand side must be a base-10 integer within [1, 10000], which is the
// range Kubernetes accepts for a FlowSchema's matchingPrecedence. Checking the
// range here reports a bad flag value before anything is deployed.
//
// On error, priorityLevel is "" and matchingPrecedence is 0.
func parseFlowControl(value string) (priorityLevel string, matchingPrecedence int, err error) {
	const (
		minMatchingPrecedence = 1
		maxMatchingPrecedence = 10000
	)

	level, rawPrecedence, ok := strings.Cut(value, ":")
	if !ok || level == "" || rawPrecedence == "" {
		return "", 0, fmt.Errorf("expected PriorityLevel:MatchingPrecedence format, but got %s", value)
	}

	precedence, err := strconv.Atoi(rawPrecedence)
	if err != nil {
		return "", 0, fmt.Errorf("failed to parse matchingPrecedence into int: %w", err)
	}

	if precedence < minMatchingPrecedence || precedence > maxMatchingPrecedence {
		return "", 0, fmt.Errorf("matchingPrecedence must be in range [%d, %d], but got %d",
			minMatchingPrecedence, maxMatchingPrecedence, precedence)
	}
	return level, precedence, nil
}
27 changes: 27 additions & 0 deletions manifests/runnergroup/server/templates/flowcontrol.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# FlowSchema for the runner group.
#
# It matches requests made by the ServiceAccount named .Values.name in the
# release namespace, for any API group, resource, verb and namespace, and
# assigns them to the priority level named by
# .Values.flowcontrol.priorityLevelConfiguration, with the precedence given by
# .Values.flowcontrol.matchingPrecedence.
#
# NOTE(review): FlowSchema is presumably a cluster-scoped resource, so the
# metadata.namespace field below may have no effect - confirm.
apiVersion: flowcontrol.apiserver.k8s.io/v1beta3
kind: FlowSchema
metadata:
name: {{ .Values.name }}
namespace: {{ .Release.Namespace }}
spec:
distinguisherMethod:
type: ByUser
matchingPrecedence: {{ .Values.flowcontrol.matchingPrecedence }}
priorityLevelConfiguration:
name: {{ .Values.flowcontrol.priorityLevelConfiguration }}
rules:
- resourceRules:
- apiGroups:
- '*'
clusterScope: true
namespaces:
- '*'
resources:
- '*'
verbs:
- '*'
subjects:
- kind: ServiceAccount
serviceAccount:
name: {{ .Values.name }}
namespace: {{ .Release.Namespace }}
3 changes: 3 additions & 0 deletions manifests/runnergroup/server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ image: ""
# TODO(weifu): need https://github.com/Azure/kperf/issues/25 to support list
runnerGroupSpec: ""
nodeSelectors: {}
flowcontrol:
priorityLevelConfiguration: workload-low
matchingPrecedence: 1000
88 changes: 72 additions & 16 deletions runner/runnergroup_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,19 @@ import (
"github.com/Azure/kperf/helmcli"
"github.com/Azure/kperf/manifests"

"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)

// defaultRunCmdCfg is the baseline configuration of the run command, used
// before any RunCmdOpt is applied. Its flowcontrol values match the defaults
// in manifests/runnergroup/server/values.yaml.
var defaultRunCmdCfg = func() runCmdConfig {
	var cfg runCmdConfig
	cfg.runnerGroupFlowcontrol.priorityLevel = "workload-low"
	cfg.runnerGroupFlowcontrol.matchingPrecedence = 1000
	return cfg
}()

// CreateRunnerGroupServer creates a long running server to deploy runner groups.
Expand All @@ -21,21 +33,21 @@ func CreateRunnerGroupServer(ctx context.Context,
kubeconfigPath string,
runnerImage string,
rgSpec *types.RunnerGroupSpec,
nodeSelectors map[string][]string,
opts ...RunCmdOpt,
) error {
specInStr, err := tweakAndMarshalSpec(rgSpec)
if err != nil {
return err
}

nodeSelectorsInYAML, err := renderNodeSelectors(nodeSelectors)
if err != nil {
return err
cfg := defaultRunCmdCfg
for _, opt := range opts {
opt(&cfg)
}

nodeSelectorsAppiler, err := helmcli.YAMLValuesApplier(nodeSelectorsInYAML)
appiler, err := cfg.toServerHelmValuesAppiler()
if err != nil {
return fmt.Errorf("failed to prepare YAML value applier for nodeSelectors: %w", err)
return err
}

getCli, err := helmcli.NewGetCli(kubeconfigPath, runnerGroupReleaseNamespace)
Expand Down Expand Up @@ -64,7 +76,7 @@ func CreateRunnerGroupServer(ctx context.Context,
"image="+runnerImage,
"runnerGroupSpec="+specInStr,
),
nodeSelectorsAppiler,
appiler,
)
if err != nil {
return fmt.Errorf("failed to create helm release client: %w", err)
Expand All @@ -88,16 +100,60 @@ func tweakAndMarshalSpec(spec *types.RunnerGroupSpec) (string, error) {
return string(data), nil
}

// renderNodeSelectors renders labels into YAML string.
func renderNodeSelectors(labels map[string][]string) (string, error) {
// NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml.
target := map[string]interface{}{
"nodeSelectors": labels,
// runCmdConfig collects the optional settings of the run command.
type runCmdConfig struct {
	// serverNodeSelectors restricts scheduling of the server to nodes that
	// carry the given labels.
	serverNodeSelectors map[string][]string

	// runnerGroupFlowcontrol holds the flowcontrol settings applied to
	// runners.
	//
	// NOTE: Keep in sync with ../manifests/runnergroup/server/values.yaml.
	//
	// FIXME(weifu): before v1.0.0, we should define type in ../manifests.
	runnerGroupFlowcontrol struct {
		priorityLevel      string
		matchingPrecedence int
	}

	// TODO(weifu): merge name/image/specs into this
}

// RunCmdOpt is a functional option that modifies the run command's
// configuration in place, overriding its default settings.
type RunCmdOpt func(*runCmdConfig)

// WithRunCmdServerNodeSelectorsOpt returns a RunCmdOpt that sets the server's
// node selector labels. The map is stored as given, without copying.
func WithRunCmdServerNodeSelectorsOpt(labels map[string][]string) RunCmdOpt {
	setNodeSelectors := func(c *runCmdConfig) {
		c.serverNodeSelectors = labels
	}
	return setNodeSelectors
}

// WithRunCmdRunnerGroupFlowControl returns a RunCmdOpt that sets the runner
// groups' flowcontrol priority level and matching precedence.
func WithRunCmdRunnerGroupFlowControl(priorityLevel string, matchingPrecedence int) RunCmdOpt {
	return func(c *runCmdConfig) {
		fc := &c.runnerGroupFlowcontrol
		fc.priorityLevel, fc.matchingPrecedence = priorityLevel, matchingPrecedence
	}
}

// toServerHelmValuesAppiler creates ValuesApplier.
//
// NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml.
func (cfg *runCmdConfig) toServerHelmValuesAppiler() (helmcli.ValuesApplier, error) {
values := map[string]interface{}{
"nodeSelectors": cfg.serverNodeSelectors,
"flowcontrol": map[string]interface{}{
"priorityLevelConfiguration": cfg.runnerGroupFlowcontrol.priorityLevel,
"matchingPrecedence": cfg.runnerGroupFlowcontrol.matchingPrecedence,
},
}

rawData, err := yaml.Marshal(values)
if err != nil {
return nil, fmt.Errorf("failed to render run command config into YAML: %w", err)
}

rawData, err := yaml.Marshal(target)
appiler, err := helmcli.YAMLValuesApplier(string(rawData))
if err != nil {
return "", fmt.Errorf("failed to render nodeSelectors: %w", err)
return nil, fmt.Errorf("failed to prepare value appiler for run command config: %w", err)
}
return string(rawData), nil
return appiler, nil
}

0 comments on commit d5b7088

Please sign in to comment.