Skip to content

Commit

Permalink
RB suspension: scheduler and controller change
Browse files Browse the repository at this point in the history
Signed-off-by: Monokaix <[email protected]>
  • Loading branch information
Monokaix committed Dec 28, 2024
1 parent b0c94ec commit 208246d
Show file tree
Hide file tree
Showing 14 changed files with 541 additions and 19 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/work/v1alpha2/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ const (
// FullyApplied represents the condition that the resource referencing by ResourceBinding or ClusterResourceBinding
// has been applied to all scheduled clusters.
FullyApplied string = "FullyApplied"

// SchedulingSuspended represents the condition that the ResourceBinding or ClusterResourceBinding is suspended to schedule.
SchedulingSuspended string = "SchedulingSuspended"
)

// These are reasons for a binding's transition to a Scheduled condition.
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle
return c.removeFinalizer(ctx, binding)
}

if err := updateBindingDispatchingConditionIfNeeded(ctx, c.Client, c.EventRecorder, binding, apiextensionsv1.NamespaceScoped); err != nil {
klog.ErrorS(err, "Failed to update binding condition.", "name", klog.KObj(binding), "type", workv1alpha2.SchedulingSuspended)
return controllerruntime.Result{}, err
}

return c.syncBinding(ctx, binding)
}

Expand Down
122 changes: 122 additions & 0 deletions pkg/controllers/binding/binding_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
fakedynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -39,6 +44,7 @@ import (
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
testing2 "github.com/karmada-io/karmada/pkg/search/proxy/testing"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
Expand Down Expand Up @@ -445,3 +451,119 @@ func TestResourceBindingController_removeFinalizer(t *testing.T) {
})
}
}

func TestUpdateBindingDispatchingConditionIfNeeded(t *testing.T) {
tests := []struct {
name string
binding *workv1alpha2.ResourceBinding
expectedCondition metav1.Condition
expectedEventCount int
expectEventMessage string
}{
{
name: "Binding scheduling is suspended",
binding: newRb(true, metav1.Condition{}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
},
expectedEventCount: 1,
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SuspendedSchedulingConditionMessage),
},
{
name: "Binding scheduling is not suspended",
binding: newRb(false, metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
Reason: SuspendedSchedulingConditionReason,
Message: SuspendedSchedulingConditionMessage,
}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionFalse,
},
expectedEventCount: 1,
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SchedulingConditionMessage),
},
{
name: "Condition already matches, no update needed",
binding: newRb(true, metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
Reason: SuspendedSchedulingConditionReason,
Message: SuspendedSchedulingConditionMessage,
}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
},
},
{
name: "No SchedulingSuspended condition and scheduling is not suspended, no update needed",
binding: newRb(false, metav1.Condition{
Type: workv1alpha2.BindingReasonUnschedulable,
Status: metav1.ConditionTrue,
}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.BindingReasonUnschedulable,
Status: metav1.ConditionTrue,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventRecorder := record.NewFakeRecorder(1)
c := newResourceBindingController(tt.binding, eventRecorder)

updatedBinding := &workv1alpha2.ResourceBinding{}
assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))

err := updateBindingDispatchingConditionIfNeeded(context.Background(), c.Client, c.EventRecorder, tt.binding, apiextensionsv1.NamespaceScoped)
if err != nil {
t.Errorf("updateBindingDispatchingConditionIfNeeded() returned an error: %v", err)
}

assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))
assert.True(t, meta.IsStatusConditionPresentAndEqual(updatedBinding.Status.Conditions, tt.expectedCondition.Type, tt.expectedCondition.Status))
assert.Equal(t, tt.expectedEventCount, len(eventRecorder.Events))
if tt.expectEventMessage != "" {
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}
})
}
}

func newResourceBindingController(binding *workv1alpha2.ResourceBinding, eventRecord record.EventRecorder) ResourceBindingController {
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(binding).WithStatusSubresource(binding).WithRESTMapper(restMapper).Build()
return ResourceBindingController{
Client: fakeClient,
EventRecorder: eventRecord,
}
}

func newRb(suspended bool, condition metav1.Condition) *workv1alpha2.ResourceBinding {
return &workv1alpha2.ResourceBinding{
TypeMeta: metav1.TypeMeta{
Kind: workv1alpha2.ResourceKindResourceBinding,
APIVersion: workv1alpha2.GroupVersion.Version,
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-rb",
Namespace: "default",
UID: uuid.NewUUID(),
},
Spec: workv1alpha2.ResourceBindingSpec{
Suspension: &workv1alpha2.Suspension{
Scheduling: ptr.To(suspended),
},
},
Status: workv1alpha2.ResourceBindingStatus{
Conditions: []metav1.Condition{
condition,
},
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co
return c.removeFinalizer(ctx, clusterResourceBinding)
}

if err := updateBindingDispatchingConditionIfNeeded(ctx, c.Client, c.EventRecorder, clusterResourceBinding, apiextensionsv1.ClusterScoped); err != nil {
klog.ErrorS(err, "Failed to update binding condition.", "name", klog.KObj(clusterResourceBinding), "type", workv1alpha2.SchedulingSuspended)
return controllerruntime.Result{}, err
}

return c.syncBinding(ctx, clusterResourceBinding)
}

Expand Down
122 changes: 122 additions & 0 deletions pkg/controllers/binding/cluster_resource_binding_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
fakedynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -39,6 +44,7 @@ import (
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
testing2 "github.com/karmada-io/karmada/pkg/search/proxy/testing"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
Expand Down Expand Up @@ -439,3 +445,119 @@ func TestClusterResourceBindingController_newOverridePolicyFunc(t *testing.T) {
})
}
}

func TestUpdateClusterBindingDispatchingConditionIfNeeded(t *testing.T) {
tests := []struct {
name string
binding *workv1alpha2.ClusterResourceBinding
expectedCondition metav1.Condition
expectedEventCount int
expectEventMessage string
}{
{
name: "Binding scheduling is suspended",
binding: newCrb(true, metav1.Condition{}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
},
expectedEventCount: 1,
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SuspendedSchedulingConditionMessage),
},
{
name: "Binding scheduling is not suspended",
binding: newCrb(false, metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
Reason: SuspendedSchedulingConditionReason,
Message: SuspendedSchedulingConditionMessage,
}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionFalse,
},
expectedEventCount: 1,
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SchedulingConditionMessage),
},
{
name: "Condition already matches, no update needed",
binding: newCrb(true, metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
Reason: SuspendedSchedulingConditionReason,
Message: SuspendedSchedulingConditionMessage,
}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.SchedulingSuspended,
Status: metav1.ConditionTrue,
},
},
{
name: "No SchedulingSuspended condition and scheduling is not suspended, no update needed",
binding: newCrb(false, metav1.Condition{
Type: workv1alpha2.BindingReasonUnschedulable,
Status: metav1.ConditionTrue,
}),
expectedCondition: metav1.Condition{
Type: workv1alpha2.BindingReasonUnschedulable,
Status: metav1.ConditionTrue,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventRecorder := record.NewFakeRecorder(1)
c := newClusterResourceBindingController(tt.binding, eventRecorder)

updatedBinding := &workv1alpha2.ClusterResourceBinding{}
assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))

err := updateBindingDispatchingConditionIfNeeded(context.Background(), c.Client, c.EventRecorder, tt.binding, apiextensionsv1.ClusterScoped)
if err != nil {
t.Errorf("updateBindingDispatchingConditionIfNeeded() returned an error: %v", err)
}

assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))
assert.True(t, meta.IsStatusConditionPresentAndEqual(updatedBinding.Status.Conditions, tt.expectedCondition.Type, tt.expectedCondition.Status))
assert.Equal(t, tt.expectedEventCount, len(eventRecorder.Events))
if tt.expectEventMessage != "" {
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}
})
}
}

func newClusterResourceBindingController(binding *workv1alpha2.ClusterResourceBinding, eventRecord record.EventRecorder) ClusterResourceBindingController {
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(binding).WithStatusSubresource(binding).WithRESTMapper(restMapper).Build()
return ClusterResourceBindingController{
Client: fakeClient,
EventRecorder: eventRecord,
}
}

func newCrb(suspended bool, condition metav1.Condition) *workv1alpha2.ClusterResourceBinding {
return &workv1alpha2.ClusterResourceBinding{
TypeMeta: metav1.TypeMeta{
Kind: workv1alpha2.ResourceKindResourceBinding,
APIVersion: workv1alpha2.GroupVersion.Version,
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-rb",
Namespace: "default",
UID: uuid.NewUUID(),
},
Spec: workv1alpha2.ResourceBindingSpec{
Suspension: &workv1alpha2.Suspension{
Scheduling: ptr.To(suspended),
},
},
Status: workv1alpha2.ResourceBindingStatus{
Conditions: []metav1.Condition{
condition,
},
},
}
}
Loading

0 comments on commit 208246d

Please sign in to comment.