Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: force strict mode for patch for safe concurrent writes #3912

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
log.Error(err, "couldn't get the parent job workload")
return ctrl.Result{}, err
} else if parentWorkload == nil || !workload.IsAdmitted(parentWorkload) {
if err := clientutil.Patch(ctx, r.client, object, true, func() (bool, error) {
if err := clientutil.Patch(ctx, r.client, object, func() (bool, error) {
job.Suspend()
return true, nil
}); err != nil {
Expand Down Expand Up @@ -855,7 +855,7 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
return err
}
} else {
if err := clientutil.Patch(ctx, r.client, object, true, func() (bool, error) {
if err := clientutil.Patch(ctx, r.client, object, func() (bool, error) {
return true, job.RunWithPodSetsInfo(info)
}); err != nil {
return err
Expand Down Expand Up @@ -899,7 +899,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
return nil
}

if err := clientutil.Patch(ctx, r.client, object, true, func() (bool, error) {
if err := clientutil.Patch(ctx, r.client, object, func() (bool, error) {
job.Suspend()
if info != nil {
job.RestorePodSetsInfo(info)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.Po
stoppedNow := false

if !j.IsSuspended() {
if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) {
if err := clientutil.Patch(ctx, c, object, func() (bool, error) {
j.Suspend()
if j.ObjectMeta.Annotations == nil {
j.ObjectMeta.Annotations = map[string]string{}
Expand All @@ -194,7 +194,7 @@ func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.Po
}
}

if err := clientutil.Patch(ctx, c, object, true, func() (bool, error) {
if err := clientutil.Patch(ctx, c, object, func() (bool, error) {
j.RestorePodSetsInfo(podSetsInfo)
delete(j.ObjectMeta.Annotations, StoppingAnnotation)
return true, nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (p *Pod) Run(ctx context.Context, c client.Client, podSetsInfo []podset.Pod
return nil
}

if err := clientutil.Patch(ctx, c, &p.pod, true, func() (bool, error) {
if err := clientutil.Patch(ctx, c, &p.pod, func() (bool, error) {
ungate(&p.pod)
return true, podset.Merge(&p.pod.ObjectMeta, &p.pod.Spec, podSetsInfo[0])
}); err != nil {
Expand All @@ -260,7 +260,7 @@ func (p *Pod) Run(ctx context.Context, c client.Client, podSetsInfo []podset.Pod
return nil
}

if err := clientutil.Patch(ctx, c, pod, true, func() (bool, error) {
if err := clientutil.Patch(ctx, c, pod, func() (bool, error) {
ungate(pod)

roleHash, err := getRoleHash(*pod)
Expand Down Expand Up @@ -511,7 +511,7 @@ func (p *Pod) Finalize(ctx context.Context, c client.Client) error {

return parallelize.Until(ctx, len(podsInGroup.Items), func(i int) error {
pod := &podsInGroup.Items[i]
return clientutil.Patch(ctx, c, pod, false, func() (bool, error) {
return clientutil.Patch(ctx, c, pod, func() (bool, error) {
return controllerutil.RemoveFinalizer(pod, PodFinalizer), nil
})
})
Expand Down Expand Up @@ -851,7 +851,7 @@ func (p *Pod) removeExcessPods(ctx context.Context, c client.Client, r record.Ev
// Finalize and delete the active pods created last
err := parallelize.Until(ctx, len(extraPods), func(i int) error {
pod := extraPods[i]
if err := clientutil.Patch(ctx, c, &pod, false, func() (bool, error) {
if err := clientutil.Patch(ctx, c, &pod, func() (bool, error) {
removed := controllerutil.RemoveFinalizer(&pod, PodFinalizer)
log.V(3).Info("Finalizing excess pod in group", "excessPod", klog.KObj(&pod))
return removed, nil
Expand Down Expand Up @@ -891,7 +891,7 @@ func (p *Pod) finalizePods(ctx context.Context, c client.Client, extraPods []cor
err := parallelize.Until(ctx, len(extraPods), func(i int) error {
pod := extraPods[i]
var removed bool
if err := clientutil.Patch(ctx, c, &pod, false, func() (bool, error) {
if err := clientutil.Patch(ctx, c, &pod, func() (bool, error) {
removed = controllerutil.RemoveFinalizer(&pod, PodFinalizer)
log.V(3).Info("Finalizing pod in group", "Pod", klog.KObj(&pod))
return removed, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/statefulset/statefulset_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *Reconciler) finalizePods(ctx context.Context, pods []corev1.Pod) error
if p.Status.Phase != corev1.PodSucceeded && p.Status.Phase != corev1.PodFailed {
return nil
}
err := clientutil.Patch(ctx, r.client, p, true, func() (bool, error) {
err := clientutil.Patch(ctx, r.client, p, func() (bool, error) {
removed := controllerutil.RemoveFinalizer(p, pod.PodFinalizer)
if removed {
log.V(3).Info("Finalizing pod in group", "pod", klog.KObj(p), "group", p.Labels[pod.GroupNameLabel])
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/tas/topology_ungater.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request)
err = parallelize.Until(ctx, len(allToUngate), func(i int) error {
podWithUngateInfo := &allToUngate[i]
var ungated bool
e := utilclient.Patch(ctx, r.client, podWithUngateInfo.pod, true, func() (bool, error) {
e := utilclient.Patch(ctx, r.client, podWithUngateInfo.pod, func() (bool, error) {
log.V(3).Info("ungating pod", "pod", klog.KObj(podWithUngateInfo.pod), "nodeLabels", podWithUngateInfo.nodeLabels)
ungated = utilpod.Ungate(podWithUngateInfo.pod, kueuealpha.TopologySchedulingGate)
if podWithUngateInfo.pod.Spec.NodeSelector == nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func CreatePatch(before, after client.Object) (client.Patch, error) {
}

// Patch applies the merge patch of client.Object.
// If strict is true, the resourceVersion will be part of the patch, make this call fail if
// The resourceVersion will be part of the patch, making this call fail if
// client.Object was changed.
func Patch(ctx context.Context, c client.Client, obj client.Object, strict bool, update func() (bool, error)) error {
func Patch(ctx context.Context, c client.Client, obj client.Object, update func() (bool, error)) error {
objOriginal := obj.DeepCopyObject().(client.Object)
if strict {
// Clearing ResourceVersion from the original object to make sure it is included in the generated patch.
objOriginal.SetResourceVersion("")
}

// Clearing ResourceVersion from the original object to make sure it is included in the generated patch.
objOriginal.SetResourceVersion("")

updated, err := update()
if err != nil || !updated {
return err
Expand Down