Skip to content

Commit

Permalink
LeaderWorkerSet integration.
Browse files Browse the repository at this point in the history
Co-authored-by: vladikkuzn <[email protected]>
  • Loading branch information
mbobrovskyi and vladikkuzn committed Jan 2, 2025
1 parent a8d4958 commit 9e45939
Show file tree
Hide file tree
Showing 17 changed files with 1,171 additions and 354 deletions.
8 changes: 8 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ rules:
- list
- update
- watch
- apiGroups:
- leaderworkerset.x-k8s.io
resources:
- leaderworkersets
verbs:
- get
- list
- watch
- apiGroups:
- node.k8s.io
resources:
Expand Down
40 changes: 40 additions & 0 deletions charts/kueue/templates/webhook/webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,26 @@ webhooks:
resources:
- xgboostjobs
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: '{{ include "kueue.fullname" . }}-webhook-service'
namespace: '{{ .Release.Namespace }}'
path: /mutate-leaderworkerset-x-k8s-io-v1-leaderworkerset
failurePolicy: Fail
name: mleaderworkerset.kb.io
rules:
- apiGroups:
- leaderworkerset.x-k8s.io
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- leaderworkersets
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down Expand Up @@ -562,6 +582,26 @@ webhooks:
resources:
- xgboostjobs
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: '{{ include "kueue.fullname" . }}-webhook-service'
namespace: '{{ .Release.Namespace }}'
path: /validate-leaderworkerset-x-k8s-io-v1-leaderworkerset
failurePolicy: Fail
name: vleaderworkerset.kb.io
rules:
- apiGroups:
- leaderworkerset.x-k8s.io
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- leaderworkersets
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down
8 changes: 8 additions & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ rules:
- list
- update
- watch
- apiGroups:
- leaderworkerset.x-k8s.io
resources:
- leaderworkersets
verbs:
- get
- list
- watch
- apiGroups:
- node.k8s.io
resources:
Expand Down
1 change: 1 addition & 0 deletions config/components/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ webhooks:
- v1
operations:
- CREATE
- UPDATE
resources:
- leaderworkersets
sideEffects: None
Expand Down
5 changes: 4 additions & 1 deletion hack/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ function prepare_docker_images {
if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then
docker pull "${KUBEFLOW_MPI_IMAGE}"
fi
if [[ -n ${LEADERWORKERSET_VERSION:-} ]]; then
docker pull "${LEADERWORKERSET_IMAGE}"
fi
}

# $1 cluster
Expand Down Expand Up @@ -142,7 +145,7 @@ function install_mpi {
}

#$1 - cluster name
function install_leaderworkerset {
function install_lws {
cluster_kind_load_image "${1}" "${LEADERWORKERSET_IMAGE/#v}"
kubectl config use-context "kind-${1}"
kubectl apply --server-side -f "${LEADERWORKERSET_MANIFEST}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
func init() {
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{
SetupIndexes: SetupIndexes,
NewReconciler: jobframework.NewNoopReconcilerFactory(gvk),
NewReconciler: NewPodReconciler,
SetupWebhook: SetupWebhook,
JobType: &leaderworkersetv1.LeaderWorkerSet{},
AddToScheme: leaderworkersetv1.AddToScheme,
Expand All @@ -54,11 +54,11 @@ func fromObject(o runtime.Object) *LeaderWorkerSet {
return (*LeaderWorkerSet)(o.(*leaderworkersetv1.LeaderWorkerSet))
}

func (d *LeaderWorkerSet) Object() client.Object {
return (*leaderworkersetv1.LeaderWorkerSet)(d)
func (lws *LeaderWorkerSet) Object() client.Object {
return (*leaderworkersetv1.LeaderWorkerSet)(lws)
}

func (d *LeaderWorkerSet) GVK() schema.GroupVersionKind {
func (lws *LeaderWorkerSet) GVK() schema.GroupVersionKind {
return gvk
}

Expand Down
170 changes: 170 additions & 0 deletions pkg/controller/jobs/leaderworkerset/leaderworkerset_pod_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package leaderworkerset

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"

"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
utilpod "sigs.k8s.io/kueue/pkg/util/pod"
)

type PodReconciler struct {
client client.Client
}

func NewPodReconciler(client client.Client, _ record.EventRecorder, _ ...jobframework.Option) jobframework.JobReconcilerInterface {
return &PodReconciler{client: client}
}

var _ jobframework.JobReconcilerInterface = (*PodReconciler)(nil)

func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
ctrl.Log.V(3).Info("Setting up Pod reconciler for LeaderWorkerSet")
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Named("leaderworkerset-pod").
WithEventFilter(r).
Complete(r)
}

// +kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch

func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
pod := &corev1.Pod{}
err := r.client.Get(ctx, req.NamespacedName, pod)
if err != nil {
// we'll ignore not-found errors, since there is nothing to do.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(pod))
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling LeaderWorkerSet Pod")

if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
err = client.IgnoreNotFound(clientutil.Patch(ctx, r.client, pod, true, func() (bool, error) {
removed := controllerutil.RemoveFinalizer(pod, podcontroller.PodFinalizer)
if removed {
log.V(3).Info(
"Finalizing leaderworkerset pod in group",
"leaderworkerset", pod.Labels[leaderworkersetv1.SetNameLabelKey],
"pod", klog.KObj(pod),
"group", pod.Labels[podcontroller.GroupNameLabel],
)
}
return removed, nil
}))
} else {
err = client.IgnoreNotFound(clientutil.Patch(ctx, r.client, pod, true, func() (bool, error) {
updated, err := r.setDefault(ctx, pod)
if err != nil {
return false, err
}
if updated {
log.V(3).Info("Updating pod in group", "pod", klog.KObj(pod), "group", pod.Labels[podcontroller.GroupNameLabel])
}
return updated, nil
}))
}

return ctrl.Result{}, err
}

func (r *PodReconciler) setDefault(ctx context.Context, pod *corev1.Pod) (bool, error) {
// If queue label already exist nothing to update.
if _, ok := pod.Labels[constants.QueueLabel]; ok {
return false, nil
}

// We should wait for GroupIndexLabelKey.
if _, ok := pod.Labels[leaderworkersetv1.GroupIndexLabelKey]; !ok {
return false, nil
}

lws := &leaderworkersetv1.LeaderWorkerSet{}
err := r.client.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Labels[leaderworkersetv1.SetNameLabelKey]}, lws)
if err != nil {
return false, client.IgnoreNotFound(err)
}

queueName := jobframework.QueueNameForObject(lws)
// Ignore LeaderWorkerSet without queue name.
if queueName == "" {
return false, nil
}

groupName, err := GetWorkloadName(lws, pod.Labels[leaderworkersetv1.GroupIndexLabelKey])
if err != nil {
return false, err
}

pod.Labels[constants.QueueLabel] = queueName
pod.Labels[podcontroller.GroupNameLabel] = groupName
pod.Annotations[podcontroller.GroupTotalCountAnnotation] = fmt.Sprint(ptr.Deref(lws.Spec.LeaderWorkerTemplate.Size, 1))

hash, err := utilpod.GenerateShape(pod.Spec)
if err != nil {
return false, err
}
pod.Annotations[podcontroller.RoleHashAnnotation] = hash

return true, nil
}

var _ predicate.Predicate = (*PodReconciler)(nil)

func (r *PodReconciler) Generic(event.GenericEvent) bool {
return false
}

func (r *PodReconciler) Create(e event.CreateEvent) bool {
return r.handle(e.Object)
}

func (r *PodReconciler) Update(e event.UpdateEvent) bool {
return r.handle(e.ObjectNew)
}

func (r *PodReconciler) Delete(event.DeleteEvent) bool {
return false
}

func (r *PodReconciler) handle(obj client.Object) bool {
pod, isPod := obj.(*corev1.Pod)
if !isPod {
return false
}
// Handle only leaderworkerset pods.
return pod.Annotations[podcontroller.SuspendedByParentAnnotation] == FrameworkName
}
Loading

0 comments on commit 9e45939

Please sign in to comment.