Skip to content

Commit

Permalink
Merge pull request #76 from fuweid/rg-server-support-affinity
Browse files Browse the repository at this point in the history
*: add --affinity flag to rg run
  • Loading branch information
fuweid authored Jan 30, 2024
2 parents 4a07483 + 02f3113 commit 1f8cb74
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 5 deletions.
11 changes: 11 additions & 0 deletions cmd/kperf/commands/runnergroup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/cmd/kperf/commands/utils"
"github.com/Azure/kperf/runner"
runnergroup "github.com/Azure/kperf/runner/group"

Expand All @@ -31,13 +32,22 @@ var runCommand = cli.Command{
// Right now, we need to set image manually.
Required: true,
},
cli.StringSliceFlag{
Name: "affinity",
Usage: "Deploy server to the node with a specific labels (FORMAT: KEY=VALUE[,VALUE])",
},
},
Action: func(cliCtx *cli.Context) error {
imgRef := cliCtx.String("runner-image")
if len(imgRef) == 0 {
return fmt.Errorf("required valid runner image")
}

affinityLabels, err := utils.KeyValuesMap(cliCtx.StringSlice("affinity"))
if err != nil {
return fmt.Errorf("failed to parse affinity: %w", err)
}

specs, err := loadRunnerGroupSpec(cliCtx)
if err != nil {
return fmt.Errorf("failed to load runner group spec: %w", err)
Expand All @@ -51,6 +61,7 @@ var runCommand = cli.Command{
kubeCfgPath,
imgRef,
specs[0],
affinityLabels,
)
},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package virtualcluster
package utils

import (
"fmt"
"strings"
)

// stringSliceToMap converts key=value[,value] into map[string][]string.
func stringSliceToMap(strs []string) (map[string][]string, error) {
// KeyValuesMap converts key=value[,value] into map[string][]string.
func KeyValuesMap(strs []string) (map[string][]string, error) {
res := make(map[string][]string, len(strs))
for _, str := range strs {
key, valuesInStr, ok := strings.Cut(str, "=")
Expand Down
3 changes: 2 additions & 1 deletion cmd/kperf/commands/virtualcluster/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/Azure/kperf/cmd/kperf/commands/utils"
"github.com/Azure/kperf/virtualcluster"

"github.com/urfave/cli"
Expand Down Expand Up @@ -62,7 +63,7 @@ var nodepoolAddCommand = cli.Command{

kubeCfgPath := cliCtx.String("kubeconfig")

affinityLabels, err := stringSliceToMap(cliCtx.StringSlice("affinity"))
affinityLabels, err := utils.KeyValuesMap(cliCtx.StringSlice("affinity"))
if err != nil {
return fmt.Errorf("failed to parse affinity: %w", err)
}
Expand Down
15 changes: 15 additions & 0 deletions manifests/runnergroup/server/templates/pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ metadata:
name: {{ .Values.name }}
namespace: {{ .Release.Namespace }}
spec:
{{- if .Values.nodeSelectors }}
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
{{- range $key, $values := .Values.nodeSelectors }}
- key: "{{ $key }}"
operator: In
values:
{{- range $values }}
- {{ . }}
{{- end }}
{{- end }}
{{- end }}
containers:
- name: server
command:
Expand Down
1 change: 1 addition & 0 deletions manifests/runnergroup/server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ name: ""
image: ""
# TODO(weifu): need https://github.com/Azure/kperf/issues/25 to support list
runnerGroupSpec: ""
nodeSelectors: {}
32 changes: 31 additions & 1 deletion runner/runnergroup_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@ import (
// TODO:
// 1. create a new package to define ErrNotFound, ErrAlreadyExists, ... errors.
// 2. support configurable timeout.
func CreateRunnerGroupServer(ctx context.Context, kubeconfigPath string, runnerImage string, rgSpec *types.RunnerGroupSpec) error {
func CreateRunnerGroupServer(ctx context.Context,
kubeconfigPath string,
runnerImage string,
rgSpec *types.RunnerGroupSpec,
nodeSelectors map[string][]string,
) error {
specInStr, err := tweakAndMarshalSpec(rgSpec)
if err != nil {
return err
}

nodeSelectorsInYAML, err := renderNodeSelectors(nodeSelectors)
if err != nil {
return err
}

nodeSelectorsAppiler, err := helmcli.YAMLValuesApplier(nodeSelectorsInYAML)
if err != nil {
return fmt.Errorf("failed to prepare YAML value applier for nodeSelectors: %w", err)
}

getCli, err := helmcli.NewGetCli(kubeconfigPath, runnerGroupReleaseNamespace)
if err != nil {
return fmt.Errorf("failed to create helm get client: %w", err)
Expand All @@ -49,6 +64,7 @@ func CreateRunnerGroupServer(ctx context.Context, kubeconfigPath string, runnerI
"image="+runnerImage,
"runnerGroupSpec="+specInStr,
),
nodeSelectorsAppiler,
)
if err != nil {
return fmt.Errorf("failed to create helm release client: %w", err)
Expand All @@ -71,3 +87,17 @@ func tweakAndMarshalSpec(spec *types.RunnerGroupSpec) (string, error) {
}
return string(data), nil
}

// renderNodeSelectors renders labels into YAML string.
func renderNodeSelectors(labels map[string][]string) (string, error) {
// NOTE: It should be aligned with ../manifests/runnergroup/server/values.yaml.
target := map[string]interface{}{
"nodeSelectors": labels,
}

rawData, err := yaml.Marshal(target)
if err != nil {
return "", fmt.Errorf("failed to render nodeSelectors: %w", err)
}
return string(rawData), nil
}

0 comments on commit 1f8cb74

Please sign in to comment.