From 93cc36d57b200d5a4d900ba48e1b35f5cdb6152b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=AF=93?= Date: Tue, 7 Nov 2023 11:31:16 +0800 Subject: [PATCH] feat(sync): sync to none cluster when object is fed hpa --- config/sample/host/01-ftc.yaml | 2 +- pkg/controllers/common/constants.go | 14 +- pkg/controllers/federatedhpa/controller.go | 277 ++++++++++-------- pkg/controllers/federatedhpa/util.go | 155 ++++++---- pkg/controllers/federatedhpa/util_test.go | 267 +++++++++++++++++ pkg/controllers/follower/controller.go | 3 +- .../override/overridepolicy_controller.go | 17 +- pkg/controllers/scheduler/scheduler.go | 17 +- pkg/controllers/sync/controller.go | 12 +- pkg/controllers/sync/util.go | 42 +++ pkg/controllers/sync/util_test.go | 112 +++++++ pkg/util/fedobjectadapters/adapters.go | 31 ++ 12 files changed, 739 insertions(+), 210 deletions(-) create mode 100644 pkg/controllers/federatedhpa/util_test.go create mode 100644 pkg/controllers/sync/util.go create mode 100644 pkg/controllers/sync/util_test.go diff --git a/config/sample/host/01-ftc.yaml b/config/sample/host/01-ftc.yaml index 956c4ead..6de1df3a 100644 --- a/config/sample/host/01-ftc.yaml +++ b/config/sample/host/01-ftc.yaml @@ -414,7 +414,7 @@ spec: kind: HorizontalPodAutoscaler pluralName: horizontalpodautoscalers scope: Namespaced - version: v1 + version: v2beta2 controllers: - - kubeadmiral.io/federatedhpa-controller - - kubeadmiral.io/global-scheduler diff --git a/pkg/controllers/common/constants.go b/pkg/controllers/common/constants.go index 3bd228d4..81b9c1c1 100644 --- a/pkg/controllers/common/constants.go +++ b/pkg/controllers/common/constants.go @@ -116,6 +116,14 @@ const ( TemplateGeneratorMergePatchAnnotation = FederateControllerPrefix + "template-generator-merge-patch" LatestReplicasetDigestsAnnotation = DefaultPrefix + "latest-replicaset-digests" + + HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path" +) + +// The following consts are labels key-values used by Kubeadmiral controllers. 
+ +const ( + FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled" ) // PropagatedAnnotationKeys and PropagatedLabelKeys are used to store the keys of annotations and labels that are present @@ -190,9 +198,3 @@ var ( // //nolint:lll const MaxFederatedObjectNameLength = 253 - -// HPAScaleTargetRefPath defines the fed hpa annotations and labels -const ( - HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path" - FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled" -) diff --git a/pkg/controllers/federatedhpa/controller.go b/pkg/controllers/federatedhpa/controller.go index cd80fac6..290772ad 100644 --- a/pkg/controllers/federatedhpa/controller.go +++ b/pkg/controllers/federatedhpa/controller.go @@ -19,6 +19,7 @@ package federatedhpa import ( "context" "fmt" + "sync" "time" "github.com/pkg/errors" @@ -26,7 +27,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -80,9 +80,10 @@ type FederatedHPAController struct { worker worker.ReconcileWorker[Resource] cacheSyncRateLimiter workqueue.RateLimiter - scaleTargetRefMapping map[schema.GroupVersionKind]string - workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource] - ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource] + gvkToScaleTargetRefLock sync.RWMutex + gvkToScaleTargetRef map[schema.GroupVersionKind]string + workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource] + ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource] metrics stats.Metrics logger klog.Logger @@ -114,9 +115,9 @@ func NewFederatedHPAController( cacheSyncRateLimiter: workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), - scaleTargetRefMapping: map[schema.GroupVersionKind]string{}, - workloadHPAMapping: bijection.NewOneToManyRelation[Resource, Resource](), - ppWorkloadMapping: bijection.NewOneToManyRelation[Resource, Resource](), + gvkToScaleTargetRef: map[schema.GroupVersionKind]string{}, + workloadHPAMapping: bijection.NewOneToManyRelation[Resource, Resource](), + ppWorkloadMapping: bijection.NewOneToManyRelation[Resource, Resource](), metrics: metrics, logger: logger.WithValues("controller", FederatedHPAControllerName), @@ -210,24 +211,19 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.Fed logger := f.logger.WithValues("ftc", ftc.GetName()) if scaleTargetRefPath, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath]; ok { - f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = scaleTargetRefPath + f.setGVKToScaleTargetRef(ftc.GetSourceTypeGVK(), scaleTargetRefPath) } else { - delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK()) + f.deleteGVKToScaleTargetRef(ftc.GetSourceTypeGVK()) return } logger.V(2).Info("Enqueue federated objects for FTC") - allObjects := []fedcorev1a1.GenericFederatedObject{} - labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} - fedObjects, err := f.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet)) + allObjects, err := fedobjectadapters.ListAllFedObjsForFTC(ftc, f.fedObjectInformer, nil) if err != nil { - logger.Error(err, "Failed to enqueue FederatedObjects for policy") + f.logger.Error(err, "Failed to enqueue Objects for fed hpa") return } - for _, obj := range fedObjects { - allObjects = append(allObjects, obj) - } for _, obj 
:= range allObjects { sourceObjectResource, err := fedObjectToSourceObjectResource(obj) @@ -278,23 +274,120 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s logger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcile") }() + ctx, handleResp, res := f.handleCache(ctx, key) + if handleResp == nil { + return res + } + + logger = klog.FromContext(ctx) + + hpaFTC, hpaGVR, hpaObject, fedHPAObject, fedWorkloadObject, ppObject, newPPResource := handleResp.hpaFTC, handleResp.hpaGVR, + handleResp.hpaObject, handleResp.fedHPAObject, handleResp.fedWorkloadObject, handleResp.ppObject, handleResp.newPPResource + + var err error + var isHPAObjectUpdated, isFedHPAObjectUpdated bool + switch hpaObject.GetLabels()[FederatedHPAMode] { + case FederatedHPAModeFederation: + if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject, hpaFTC); err != nil { + return worker.StatusError + } + + if fedWorkloadObject == nil || isPropagationPolicyExist(ppObject) && isPropagationPolicyDividedMode(ppObject) { + isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(ctx, hpaObject) + isHPAObjectUpdated = addFedHPAEnableLabel(ctx, hpaObject) || isHPAObjectUpdated + } else { + hpaNotWorkReason := generateFederationHPANotWorkReason(newPPResource, ppObject) + f.eventRecorder.Eventf( + hpaObject, + corev1.EventTypeWarning, + EventReasonFederationHPANotWork, + "Federation HPA not work: %s", + hpaNotWorkReason, + ) + + isHPAObjectUpdated = addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaNotWorkReason) + isHPAObjectUpdated = removeFedHPAEnableLabel(ctx, hpaObject) || isHPAObjectUpdated + } + + case FederatedHPAModeDistributed, FederatedHPAModeDefault: + isHPAObjectUpdated = removeFedHPAEnableLabel(ctx, hpaObject) + + if fedWorkloadObject == nil || isPropagationPolicyExist(ppObject) && + isPropagationPolicyDuplicateMode(ppObject) && + isPropagationPolicyFollowerEnabled(ppObject) && + isWorkloadRetainReplicas(fedWorkloadObject) && + isHPAFollowTheWorkload(ctx, hpaObject, fedWorkloadObject) { + isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(ctx, hpaObject) || isHPAObjectUpdated + if isFedHPAObjectUpdated, err = removePendingController(ctx, hpaFTC, fedHPAObject); err != nil { + return worker.StatusError + } + } else { + hpaNotWorkReason := generateDistributedHPANotWorkReason(ctx, newPPResource, ppObject, fedWorkloadObject, hpaObject) + f.eventRecorder.Eventf( + hpaObject, + corev1.EventTypeWarning, + EventReasonDistributedHPANotWork, + "Distributed HPA not work: %s", + hpaNotWorkReason, + ) + + isHPAObjectUpdated = addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaNotWorkReason) || isHPAObjectUpdated + if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject, hpaFTC); err != nil { + return worker.StatusError + } + } + } + + if isHPAObjectUpdated { + logger.V(1).Info("Updating hpa object") + _, err := f.dynamicClient.Resource(hpaGVR).Namespace(hpaObject.GetNamespace()).Update(ctx, hpaObject, metav1.UpdateOptions{}) + if err != nil { + errMsg := "Failed to update hpa object" + logger.Error(err, errMsg) + f.eventRecorder.Eventf(hpaObject, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, + errMsg+" %v, err: %v, retry later", key, err) + if apierrors.IsConflict(err) { + return worker.StatusConflict + } + return worker.StatusError + } + } + + if isFedHPAObjectUpdated { + logger.V(1).Info("Updating fed hpa object") + if _, err = fedobjectadapters.Update(ctx, f.fedClient.CoreV1alpha1(), fedHPAObject, 
metav1.UpdateOptions{}); err != nil { + errMsg := "Failed to update fed hpa object" + logger.Error(err, errMsg) + f.eventRecorder.Eventf(fedHPAObject, corev1.EventTypeWarning, EventReasonUpdateHPAFedObject, + errMsg+" %v, err: %v, retry later", fedHPAObject, err) + if apierrors.IsConflict(err) { + return worker.StatusConflict + } + return worker.StatusError + } + } + + return worker.StatusAllOK +} + +func (f *FederatedHPAController) handleCache(ctx context.Context, key Resource) (context.Context, *handleCacheResp, worker.Result) { hpaFTC, exists := f.informerManager.GetResourceFTC(key.gvk) if !exists { // Waiting for func enqueueFedHPAObjectsForFTC enqueue it again. - return worker.StatusAllOK + return ctx, nil, worker.StatusAllOK } hpaGVR := hpaFTC.GetSourceTypeGVR() - ctx, logger = logging.InjectLoggerValues(ctx, "hpa-ftc", hpaFTC.Name) + ctx, logger := logging.InjectLoggerValues(ctx, "hpa-ftc", hpaFTC.Name) lister, hasSynced, exists := f.informerManager.GetResourceLister(key.gvk) if !exists { - return worker.StatusAllOK + return ctx, nil, worker.StatusAllOK } if !hasSynced() { // If lister is not yet synced, simply reenqueue after a short delay. logger.V(3).Info("Lister for source hpa type not yet synced, will reenqueue") - return worker.Result{ + return ctx, nil, worker.Result{ Success: true, RequeueAfter: pointer.Duration(f.cacheSyncRateLimiter.When(key)), } @@ -304,7 +397,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s hpaUns, err := lister.Get(key.QualifiedName().String()) if err != nil { logger.Error(err, "Failed to get source hpa object from store") - return worker.StatusAllOK + return ctx, nil, worker.StatusAllOK } hpaObject := hpaUns.(*unstructured.Unstructured).DeepCopy() @@ -319,21 +412,20 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s ) if err != nil { logger.Error(err, "Failed to get federated hpa object from store") - return worker.StatusError + return ctx, nil, worker.StatusError } if ok, err := pendingcontrollers.ControllerDependenciesFulfilled(fedHPAObject, PrefixedFederatedHPAControllerName); err != nil { logger.Error(err, "Failed to check controller dependencies") - return worker.StatusError + return ctx, nil, worker.StatusError } else if !ok { - return worker.StatusAllOK + return ctx, nil, worker.StatusAllOK } - scaleTargetRef := f.scaleTargetRefMapping[key.gvk] - newWorkloadResource, err := scaleTargetRefToResource(hpaObject, scaleTargetRef) + newWorkloadResource, err := f.scaleTargetRefToResource(key.gvk, hpaObject) if err != nil { logger.Error(err, "Failed to get workload resource from hpa") - return worker.StatusError + return ctx, nil, worker.StatusError } ctx, logger = logging.InjectLoggerValues(ctx, @@ -347,21 +439,20 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s } if err := f.workloadHPAMapping.Add(newWorkloadResource, key); err != nil { logger.Error(err, "Failed to add workload and hpa mapping") - return worker.StatusError + return ctx, nil, worker.StatusError } - workloadExist := true - fedWorkload, err := f.getFedWorkLoadFromResource(newWorkloadResource) + fedWorkloadObject, err := f.getFedWorkLoadFromResource(newWorkloadResource) if err != nil { if !apierrors.IsNotFound(err) { logger.Error(err, "Failed to get fed workload from fed workload resource") - return worker.StatusError + return ctx, nil, worker.StatusError } - workloadExist = false } - var pp fedcorev1a1.GenericPropagationPolicy - if workloadExist { - newPPResource := 
getPropagationPolicyResourceFromFedWorkload(fedWorkload) + var ppObject fedcorev1a1.GenericPropagationPolicy + var newPPResource *Resource + if fedWorkloadObject != nil { + newPPResource = getPropagationPolicyResourceFromFedWorkload(fedWorkloadObject) if newPPResource != nil { _, exist = f.ppWorkloadMapping.LookupByT2(newWorkloadResource) if exist { @@ -369,105 +460,49 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s } if err := f.ppWorkloadMapping.Add(*newPPResource, newWorkloadResource); err != nil { - logger.Error(err, "Failed to add workload and pp mapping") - return worker.StatusError + logger.Error(err, "Failed to add workload and ppObject mapping") + return ctx, nil, worker.StatusError } - pp, err = f.getPropagationPolicyFromResource(newPPResource) + ppObject, err = f.getPropagationPolicyFromResource(newPPResource) if err != nil && !apierrors.IsNotFound(err) { - logger.Error(err, "Failed to get pp from pp resource") - return worker.StatusError + logger.Error(err, "Failed to get ppObject from ppObject resource") + return ctx, nil, worker.StatusError } } } - var isHPAObjectUpdated, isFedHPAObjectUpdated bool - switch hpaObject.GetLabels()[FederatedHPAMode] { - case FederatedHPAModeFederation: - if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject, hpaFTC); err != nil { - return worker.StatusError - } - - if !workloadExist || isPropagationPolicyDividedMode(pp) { - isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason) - isHPAObjectUpdated = isHPAObjectUpdated || addHPALabel(hpaObject, common.FedHPAEnableKey, common.AnnotationValueTrue) - } else { - hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp)) - f.eventRecorder.Eventf( - hpaObject, - corev1.EventTypeWarning, - EventReasonFederationHPANotWork, - "Federation HPA not work: %s", - hpaNotWorkReason, - ) - - isHPAObjectUpdated = addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason) - isHPAObjectUpdated = isHPAObjectUpdated || removeHPALabel(hpaObject, common.FedHPAEnableKey) - } + return ctx, &handleCacheResp{ + hpaGVR: hpaGVR, + hpaFTC: hpaFTC, + hpaObject: hpaObject, + fedHPAObject: fedHPAObject, + fedWorkloadObject: fedWorkloadObject, + ppObject: ppObject, + newPPResource: newPPResource, + }, worker.StatusAllOK +} - case FederatedHPAModeDistributed, FederatedHPAModeDefault: - isHPAObjectUpdated = removeHPALabel(hpaObject, common.FedHPAEnableKey) +func (f *FederatedHPAController) getGVKToScaleTargetRef(gvk schema.GroupVersionKind) (string, bool) { + f.gvkToScaleTargetRefLock.RLock() + defer f.gvkToScaleTargetRefLock.RUnlock() - if !workloadExist || isPropagationPolicyDuplicateMode(pp) && - isPropagationPolicyFollowerEnabled(pp) && - isWorkloadRetainReplicas(fedWorkload) && - isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) { - isHPAObjectUpdated = isHPAObjectUpdated || removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason) - if isFedHPAObjectUpdated, err = removePendingController(ctx, hpaFTC, fedHPAObject); err != nil { - return worker.StatusError - } - } else { - hpaNotWorkReason := generateDistributedHPANotWorkReason( - isPropagationPolicyExist(pp), - isPropagationPolicyDuplicateMode(pp), - isPropagationPolicyFollowerEnabled(pp), - isWorkloadRetainReplicas(fedWorkload), - isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload)) - f.eventRecorder.Eventf( - hpaObject, - corev1.EventTypeWarning, - EventReasonDistributedHPANotWork, - "Distributed HPA not 
work: %s", - hpaNotWorkReason, - ) + val, exists := f.gvkToScaleTargetRef[gvk] + return val, exists +} - isHPAObjectUpdated = isHPAObjectUpdated || addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason) - if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject, hpaFTC); err != nil { - return worker.StatusError - } - } - } +func (f *FederatedHPAController) setGVKToScaleTargetRef(gvk schema.GroupVersionKind, val string) { + f.gvkToScaleTargetRefLock.Lock() + defer f.gvkToScaleTargetRefLock.Unlock() - if isHPAObjectUpdated { - logger.V(1).Info("Updating hpa object") - _, err := f.dynamicClient.Resource(hpaGVR).Namespace(hpaObject.GetNamespace()).UpdateStatus(ctx, hpaObject, metav1.UpdateOptions{}) - if err != nil { - errMsg := "Failed to update hpa object" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(hpaObject, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, - errMsg+" %v, err: %v, retry later", key, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } - } + f.gvkToScaleTargetRef[gvk] = val +} - if isFedHPAObjectUpdated { - logger.V(1).Info("Updating fed hpa object") - if _, err = fedobjectadapters.Update(ctx, f.fedClient.CoreV1alpha1(), fedHPAObject, metav1.UpdateOptions{}); err != nil { - errMsg := "Failed to update fed hpa object" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(fedHPAObject, corev1.EventTypeWarning, EventReasonUpdateHPAFedObject, - errMsg+" %v, err: %v, retry later", fedHPAObject, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } - } +func (f *FederatedHPAController) deleteGVKToScaleTargetRef(gvk schema.GroupVersionKind) { + f.gvkToScaleTargetRefLock.Lock() + defer f.gvkToScaleTargetRefLock.Unlock() - return worker.StatusAllOK + delete(f.gvkToScaleTargetRef, gvk) } func (f *FederatedHPAController) getFedWorkLoadFromResource(workload Resource) (fedcorev1a1.GenericFederatedObject, error) { diff --git a/pkg/controllers/federatedhpa/util.go b/pkg/controllers/federatedhpa/util.go index 79ed6d33..b3c2f0d2 100644 --- a/pkg/controllers/federatedhpa/util.go +++ b/pkg/controllers/federatedhpa/util.go @@ -52,6 +52,16 @@ type Resource struct { name string } +type handleCacheResp struct { + hpaFTC *fedcorev1a1.FederatedTypeConfig + hpaGVR schema.GroupVersionResource + hpaObject *unstructured.Unstructured + fedHPAObject fedcorev1a1.GenericFederatedObject + fedWorkloadObject fedcorev1a1.GenericFederatedObject + ppObject fedcorev1a1.GenericPropagationPolicy + newPPResource *Resource +} + func (r Resource) QualifiedName() common.QualifiedName { return common.QualifiedName{ Namespace: r.namespace, @@ -73,58 +83,83 @@ func fedObjectToSourceObjectResource(object metav1.Object) (Resource, error) { } func policyObjectToResource(object metav1.Object) Resource { - if cpp, ok := object.(*fedcorev1a1.ClusterPropagationPolicy); ok { + if pp, ok := object.(*fedcorev1a1.PropagationPolicy); ok { return Resource{ - name: cpp.GetName(), - namespace: cpp.GetNamespace(), - gvk: cpp.GroupVersionKind(), + name: pp.GetName(), + namespace: pp.GetNamespace(), + gvk: schema.GroupVersionKind{ + Group: fedcorev1a1.SchemeGroupVersion.Group, + Version: fedcorev1a1.SchemeGroupVersion.Version, + Kind: PropagationPolicyKind, + }, } } - if pp, ok := object.(*fedcorev1a1.PropagationPolicy); ok { + if cpp, ok := object.(*fedcorev1a1.ClusterPropagationPolicy); ok { return Resource{ - name: pp.GetName(), - namespace: pp.GetNamespace(), - gvk: 
pp.GroupVersionKind(), + name: cpp.GetName(), + namespace: cpp.GetNamespace(), + gvk: schema.GroupVersionKind{ + Group: fedcorev1a1.SchemeGroupVersion.Group, + Version: fedcorev1a1.SchemeGroupVersion.Version, + Kind: ClusterPropagationPolicyKind, + }, } } + return Resource{} } -func generateFederationHPANotWorkReason(isPropagationPolicyExist, isPropagationPolicyDividedMode bool) string { +func generateFederationHPANotWorkReason(newPPResource *Resource, pp fedcorev1a1.GenericPropagationPolicy) string { var reasons []string - if !isPropagationPolicyExist { - reasons = append(reasons, "PropagationPolicy is not exist.") + if newPPResource == nil { + reasons = append(reasons, "The workload is not bound to any propagationPolicy.") + return fmt.Sprintf("%v", reasons) + } + + ppKind, ppName := newPPResource.gvk.Kind, newPPResource.name + if !isPropagationPolicyExist(pp) { + reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload does not exist.", ppKind, ppName)) + return fmt.Sprintf("%v", reasons) } - if isPropagationPolicyExist && !isPropagationPolicyDividedMode { - reasons = append(reasons, "PropagationPolicy is not divide.") + if !isPropagationPolicyDividedMode(pp) { + reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not Divided mode.", ppKind, ppName)) } return fmt.Sprintf("%v", reasons) } func generateDistributedHPANotWorkReason( - isPropagationPolicyExist, - isPropagationPolicyDuplicateMode, - isPropagationPolicyFollowerEnabled, - isWorkloadRetainReplicas, - isHPAFollowTheWorkload bool, + ctx context.Context, + newPPResource *Resource, + pp fedcorev1a1.GenericPropagationPolicy, + fedWorkload fedcorev1a1.GenericFederatedObject, + hpaObject *unstructured.Unstructured, ) string { var reasons []string - if !isPropagationPolicyExist { - reasons = append(reasons, "PropagationPolicy is not exist.") + + if !isWorkloadRetainReplicas(fedWorkload) { + reasons = append(reasons, "The workload is not enabled for retain replicas.") } - if !isPropagationPolicyDuplicateMode { - reasons = append(reasons, "PropagationPolicy is not Duplicate.") + if !isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) { + reasons = append(reasons, "The hpa is not follow the workload.") } - if !isPropagationPolicyFollowerEnabled { - reasons = append(reasons, "PropagationPolicy follower is not enable.") + + if newPPResource == nil { + reasons = append(reasons, "The workload is not bound to any propagationPolicy.") + return fmt.Sprintf("%v", reasons) + } + + ppKind, ppName := newPPResource.gvk.Kind, newPPResource.name + if !isPropagationPolicyExist(pp) { + reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload does not exist.", ppKind, ppName)) + return fmt.Sprintf("%v", reasons) } - if !isWorkloadRetainReplicas { - reasons = append(reasons, "Workload is not retain replicas.") + if !isPropagationPolicyDuplicateMode(pp) { + reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not Duplicate mode.", ppKind, ppName)) } - if !isHPAFollowTheWorkload { - reasons = append(reasons, "Hpa is not follow the workload.") + if !isPropagationPolicyFollowerEnabled(pp) { + reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not enabled for follower scheduling.", ppKind, ppName)) } return fmt.Sprintf("%v", reasons) @@ -139,11 +174,11 @@ func isPropagationPolicyExist(pp fedcorev1a1.GenericPropagationPolicy) bool { } func isPropagationPolicyDividedMode(pp fedcorev1a1.GenericPropagationPolicy) bool { - return pp != nil && pp.GetSpec().SchedulingMode == 
fedcorev1a1.SchedulingModeDivide + return pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDivide } func isPropagationPolicyDuplicateMode(pp fedcorev1a1.GenericPropagationPolicy) bool { - return pp != nil && pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDuplicate + return pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDuplicate } func isPropagationPolicyFollowerEnabled(pp fedcorev1a1.GenericPropagationPolicy) bool { @@ -179,15 +214,15 @@ func (f *FederatedHPAController) isHPAType(resourceGVK schema.GroupVersionKind) } // HPA gvk has already been stored - if _, ok := f.scaleTargetRefMapping[resourceGVK]; ok { + if _, ok := f.getGVKToScaleTargetRef(resourceGVK); ok { return true } if path, ok := ftc.Annotations[common.HPAScaleTargetRefPath]; ok { - f.scaleTargetRefMapping[resourceGVK] = path + f.setGVKToScaleTargetRef(resourceGVK, path) return true } else { - delete(f.scaleTargetRefMapping, resourceGVK) + f.deleteGVKToScaleTargetRef(resourceGVK) return false } } @@ -196,8 +231,13 @@ func isWorkloadRetainReplicas(fedObj metav1.Object) bool { return fedObj.GetAnnotations()[common.RetainReplicasAnnotation] == common.AnnotationValueTrue } -func scaleTargetRefToResource(hpaUns *unstructured.Unstructured, scaleTargetRef string) (Resource, error) { - fieldVal, found, err := unstructured.NestedFieldCopy(hpaUns.Object, strings.Split(scaleTargetRef, ".")...) +func (f *FederatedHPAController) scaleTargetRefToResource(gvk schema.GroupVersionKind, uns *unstructured.Unstructured) (Resource, error) { + scaleTargetRef, exists := f.getGVKToScaleTargetRef(gvk) + if !exists { + return Resource{}, errors.New("Failed to get gvk to ScaleTargetRef") + } + + fieldVal, found, err := unstructured.NestedFieldCopy(uns.Object, strings.Split(scaleTargetRef, ".")...) 
if err != nil || !found { if err != nil { return Resource{}, errors.New(fmt.Sprintf("%s: %s", scaleTargetRef, err.Error())) @@ -220,8 +260,8 @@ func scaleTargetRefToResource(hpaUns *unstructured.Unstructured, scaleTargetRef return Resource{ name: targetResource.Name, - gvk: schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: targetResource.Kind}, - namespace: hpaUns.GetNamespace(), + gvk: gv.WithKind(targetResource.Kind), + namespace: uns.GetNamespace(), }, nil } @@ -252,28 +292,34 @@ func getPropagationPolicyResourceFromFedWorkload(workload fedcorev1a1.GenericFed return nil } -func addHPALabel(uns *unstructured.Unstructured, key, value string) bool { +func addFedHPAEnableLabel(ctx context.Context, uns *unstructured.Unstructured) bool { + logger := klog.FromContext(ctx) + labels := uns.GetLabels() if labels == nil { labels = make(map[string]string, 1) } - if oldValue, ok := labels[key]; ok && oldValue == value { + if oldValue, ok := labels[common.FedHPAEnableKey]; ok && oldValue == common.AnnotationValueTrue { return false } - labels[key] = value + logger.V(3).Info("Adding fed hpa enable label") + labels[common.FedHPAEnableKey] = common.AnnotationValueTrue uns.SetLabels(labels) return true } -func removeHPALabel(uns *unstructured.Unstructured, key string) bool { +func removeFedHPAEnableLabel(ctx context.Context, uns *unstructured.Unstructured) bool { + logger := klog.FromContext(ctx) + labels := uns.GetLabels() - if _, ok := labels[key]; !ok { + if _, ok := labels[common.FedHPAEnableKey]; !ok { return false } - delete(labels, key) + logger.V(3).Info("Removing fed hpa enable label") + delete(labels, common.FedHPAEnableKey) uns.SetLabels(labels) return true @@ -300,6 +346,8 @@ func addFedHPAPendingController( } } + logger.V(3).Info(fmt.Sprintf("Setting pending controllers %v", ftc.GetControllers())) + // TODO: By default, fed-hpa controller is the first controller. // Otherwise, this code needs to be modified. 
_, err = pendingcontrollers.SetPendingControllers(fedObject, ftc.GetControllers()) @@ -328,32 +376,41 @@ func removePendingController( logger.Error(err, "Failed to update pending controllers") return false, err } + if updated { + logger.V(3).Info(fmt.Sprintf("Removing pending controller %s", PrefixedFederatedHPAControllerName)) + } return updated, nil } -func addFedHPANotWorkReasonAnno(uns *unstructured.Unstructured, key string, value string) bool { +func addFedHPANotWorkReasonAnno(ctx context.Context, uns *unstructured.Unstructured, value string) bool { + logger := klog.FromContext(ctx) + annotations := uns.GetAnnotations() if annotations == nil { annotations = make(map[string]string, 1) } - if oldValue, ok := annotations[key]; ok && oldValue == value { + if oldValue, ok := annotations[FedHPANotWorkReason]; ok && oldValue == value { return false } - annotations[key] = value + logger.V(3).Info(fmt.Sprintf("Adding fed hpa not work reason annotation %s", value)) + annotations[FedHPANotWorkReason] = value uns.SetAnnotations(annotations) return true } -func removeFedHPANotWorkReasonAnno(uns *unstructured.Unstructured, key string) bool { +func removeFedHPANotWorkReasonAnno(ctx context.Context, uns *unstructured.Unstructured) bool { + logger := klog.FromContext(ctx) + annotations := uns.GetAnnotations() - if _, exists := annotations[key]; !exists { + if _, exists := annotations[FedHPANotWorkReason]; !exists { return false } - delete(annotations, key) + logger.V(3).Info("Removing fed hpa not work annotation") + delete(annotations, FedHPANotWorkReason) uns.SetAnnotations(annotations) return true diff --git a/pkg/controllers/federatedhpa/util_test.go b/pkg/controllers/federatedhpa/util_test.go new file mode 100644 index 00000000..3e41b387 --- /dev/null +++ b/pkg/controllers/federatedhpa/util_test.go @@ -0,0 +1,267 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package federatedhpa + +import ( + "context" + "encoding/json" + "testing" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/common" +) + +func TestFedObjectToSourceObjectResource(t *testing.T) { + hpaObject := &autoscalingv1.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v1", + Kind: "HorizontalPodAutoscaler", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + } + hpaJSON, err := json.Marshal(hpaObject) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Create a fedcorev1a1.GenericFederatedObject + fedObj := &fedcorev1a1.FederatedObject{ + Spec: fedcorev1a1.GenericFederatedObjectSpec{ + Template: apiextensionsv1.JSON{ + Raw: hpaJSON, + }, + }, + } + + // Call the fedObjectToSourceObjectResource function + resource, err := fedObjectToSourceObjectResource(fedObj) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Assert the expected values of the Resource fields + expectedResource := Resource{ + name: "test", + namespace: "default", + gvk: schema.GroupVersionKind{Group: "autoscaling", Version: "v1", Kind: "HorizontalPodAutoscaler"}, + } + if resource.name != expectedResource.name { + t.Errorf("Expected name %q, but got %q", expectedResource.name, resource.name) + } + if resource.namespace != expectedResource.namespace { + t.Errorf("Expected namespace %q, but got %q", expectedResource.namespace, resource.namespace) + } +} + +func TestPolicyObjectToResource(t *testing.T) { + // Create a dummy fedcorev1a1.PropagationPolicy object + propagationPolicy := &fedcorev1a1.PropagationPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppObject", + Namespace: "default", + }, + } + + // Call the policyObjectToResource function + resource := policyObjectToResource(propagationPolicy) + + // Assert the expected values of the Resource fields + expectedResource := Resource{ + name: "ppObject", + namespace: "default", + gvk: schema.GroupVersionKind{ + Group: fedcorev1a1.SchemeGroupVersion.Group, + Version: fedcorev1a1.SchemeGroupVersion.Version, + Kind: PropagationPolicyKind, + }, + } + if resource.name != expectedResource.name { + t.Errorf("Expected name %q, but got %q", expectedResource.name, resource.name) + } + if resource.namespace != expectedResource.namespace { + t.Errorf("Expected namespace %q, but got %q", expectedResource.namespace, resource.namespace) + } +} + +func TestPolicyObjectToResource_ClusterPropagationPolicy(t *testing.T) { + // Create a dummy fedcorev1a1.ClusterPropagationPolicy object + clusterPropagationPolicy := &fedcorev1a1.ClusterPropagationPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cpp", + }, + } + + // Call the policyObjectToResource function + resource := policyObjectToResource(clusterPropagationPolicy) + + // Assert the expected values of the Resource fields + expectedResource := Resource{ + name: "cpp", + namespace: "", + gvk: schema.GroupVersionKind{ + Group: fedcorev1a1.SchemeGroupVersion.Group, + Version: fedcorev1a1.SchemeGroupVersion.Version, + Kind: PropagationPolicyKind, + }, + } + if resource.name != expectedResource.name { + t.Errorf("Expected name %q, but got %q", 
expectedResource.name, resource.name) + } + if resource.namespace != expectedResource.namespace { + t.Errorf("Expected namespace %q, but got %q", expectedResource.namespace, resource.namespace) + } +} + +func TestGenerateFederationHPANotWorkReason(t *testing.T) { + // Test case 1: newPPResource is nil + reasons1 := generateFederationHPANotWorkReason(nil, &fedcorev1a1.PropagationPolicy{}) + expectedResult1 := "[The workload is not bound to any propagationPolicy.]" + if reasons1 != expectedResult1 { + t.Errorf("Expected %q, but got %q", expectedResult1, reasons1) + } + + // Test case 2: PropagationPolicy does not exist + reasons2 := generateFederationHPANotWorkReason(&Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp1"}, nil) + expectedResult2 := "[The Policy pp1 bound to the workload does not exist.]" + if reasons2 != expectedResult2 { + t.Errorf("Expected %q, but got %q", expectedResult2, reasons2) + } + + // Test case 3: PropagationPolicy exists but not in Divided mode + reasons3 := generateFederationHPANotWorkReason(&Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp2"}, + &fedcorev1a1.PropagationPolicy{ + Spec: fedcorev1a1.PropagationPolicySpec{SchedulingMode: fedcorev1a1.SchedulingModeDuplicate}, + }) + expectedResult3 := "[The Policy pp2 bound to the workload is not Divided mode.]" + if reasons3 != expectedResult3 { + t.Errorf("Expected %q, but got %q", expectedResult3, reasons3) + } +} + +func TestGenerateDistributedHPANotWorkReason(t *testing.T) { + ctx := context.TODO() + + retainObj := &fedcorev1a1.FederatedObject{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + common.RetainReplicasAnnotation: common.AnnotationValueTrue, + }, + Name: "retain", + Namespace: "default", + }, + } + followObj := &fedcorev1a1.FederatedObject{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + common.FollowersAnnotation: `[{"group": "autoscaling", "kind": "HorizontalPodAutoscaler", "name": "test"}]`, + }, + Name: "follow", + Namespace: "default", + }, + } + retainAndFollowObj := &fedcorev1a1.FederatedObject{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + common.FollowersAnnotation: `[{"group": "autoscaling", "kind": "HorizontalPodAutoscaler", "name": "test"}]`, + common.RetainReplicasAnnotation: common.AnnotationValueTrue, + }, + Name: "retainAndFollow", + Namespace: "default", + }, + } + + hpaObj := &autoscalingv1.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v1", + Kind: "HorizontalPodAutoscaler", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + } + hpaUnsMap, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(hpaObj) + hpaUns := &unstructured.Unstructured{Object: hpaUnsMap} + + // Test case 1: Workload is not enabled for retain replicas + reasons5 := generateDistributedHPANotWorkReason(ctx, &Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp4"}, + &fedcorev1a1.PropagationPolicy{ + Spec: fedcorev1a1.PropagationPolicySpec{SchedulingMode: fedcorev1a1.SchedulingModeDuplicate}, + }, followObj, hpaUns) + expectedResult5 := "[The workload is not enabled for retain replicas.]" + if reasons5 != expectedResult5 { + t.Errorf("Expected %q, but got %q", expectedResult5, reasons5) + } + + // Test case 2: HPA does not follow the workload + reasons6 := generateDistributedHPANotWorkReason(ctx, &Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp5"}, + &fedcorev1a1.PropagationPolicy{ + Spec: 
fedcorev1a1.PropagationPolicySpec{SchedulingMode: fedcorev1a1.SchedulingModeDuplicate}, + }, retainObj, hpaUns) + expectedResult6 := "[The hpa is not follow the workload.]" + if reasons6 != expectedResult6 { + t.Errorf("Expected %q, but got %q", expectedResult6, reasons6) + } + + // Test case 3: newPPResource is nil + reasons1 := generateDistributedHPANotWorkReason(ctx, nil, nil, retainAndFollowObj, hpaUns) + expectedResult1 := "[The workload is not bound to any propagationPolicy.]" + if reasons1 != expectedResult1 { + t.Errorf("Expected %q, but got %q", expectedResult1, reasons1) + } + + // Test case 4: PropagationPolicy does not exist + reasons2 := generateDistributedHPANotWorkReason(ctx, &Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp1"}, + nil, retainAndFollowObj, hpaUns) + expectedResult2 := "[The Policy pp1 bound to the workload does not exist.]" + if reasons2 != expectedResult2 { + t.Errorf("Expected %q, but got %q", expectedResult2, reasons2) + } + + // Test case 5: PropagationPolicy is not in Duplicate mode + reasons3 := generateDistributedHPANotWorkReason(ctx, &Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp2"}, + &fedcorev1a1.PropagationPolicy{ + Spec: fedcorev1a1.PropagationPolicySpec{SchedulingMode: fedcorev1a1.SchedulingModeDivide}, + }, retainAndFollowObj, hpaUns) + expectedResult3 := "[The Policy pp2 bound to the workload is not Duplicate mode.]" + if reasons3 != expectedResult3 { + t.Errorf("Expected %q, but got %q", expectedResult3, reasons3) + } + + // Test case 6: PropagationPolicy is not enabled for follower scheduling + reasons4 := generateDistributedHPANotWorkReason(ctx, &Resource{gvk: schema.GroupVersionKind{Kind: "Policy"}, name: "pp3"}, + &fedcorev1a1.PropagationPolicy{ + Spec: fedcorev1a1.PropagationPolicySpec{ + SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + DisableFollowerScheduling: true, + }, + }, retainAndFollowObj, hpaUns) + expectedResult4 := "[The Policy pp3 bound to the workload is not enabled for follower scheduling.]" + if reasons4 != expectedResult4 { + t.Errorf("Expected %q, but got %q", expectedResult4, reasons4) + } +} diff --git a/pkg/controllers/follower/controller.go b/pkg/controllers/follower/controller.go index 624ca354..4eccece7 100644 --- a/pkg/controllers/follower/controller.go +++ b/pkg/controllers/follower/controller.go @@ -197,7 +197,8 @@ func (c *Controller) Run(ctx context.Context) { } func (c *Controller) HasSynced() bool { - return c.fedObjectInformer.Informer().HasSynced() && + return c.ftcInformer.Informer().HasSynced() && + c.fedObjectInformer.Informer().HasSynced() && c.clusterFedObjectInformer.Informer().HasSynced() } diff --git a/pkg/controllers/override/overridepolicy_controller.go b/pkg/controllers/override/overridepolicy_controller.go index cb5a22a4..14b668c5 100644 --- a/pkg/controllers/override/overridepolicy_controller.go +++ b/pkg/controllers/override/overridepolicy_controller.go @@ -175,24 +175,11 @@ func (c *Controller) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedTyp logger.V(2).Info("Enqueue federated objects for FTC") - allObjects := []fedcorev1a1.GenericFederatedObject{} - labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} - fedObjects, err := c.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet)) + allObjects, err := fedobjectadapters.ListAllFedObjsForFTC(ftc, c.fedObjectInformer, c.clusterFedObjectInformer) if err != nil { - c.logger.Error(err, "Failed to enqueue FederatedObjects for override 
policy") + c.logger.Error(err, "Failed to enqueue Objects for override policy") return } - for _, obj := range fedObjects { - allObjects = append(allObjects, obj) - } - clusterFedObjects, err := c.clusterFedObjectInformer.Lister().List(labels.Everything()) - if err != nil { - c.logger.Error(err, "Failed to enqueue ClusterFederatedObjects for override policy") - return - } - for _, obj := range clusterFedObjects { - allObjects = append(allObjects, obj) - } for _, obj := range allObjects { c.worker.Enqueue(common.NewQualifiedName(obj)) diff --git a/pkg/controllers/scheduler/scheduler.go b/pkg/controllers/scheduler/scheduler.go index 06c561cf..54442abd 100644 --- a/pkg/controllers/scheduler/scheduler.go +++ b/pkg/controllers/scheduler/scheduler.go @@ -863,24 +863,11 @@ func (s *Scheduler) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedType logger.V(2).Info("Enqueue federated objects for FTC") - allObjects := []fedcorev1a1.GenericFederatedObject{} - labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} - fedObjects, err := s.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet)) + allObjects, err := fedobjectadapters.ListAllFedObjsForFTC(ftc, s.fedObjectInformer, s.clusterFedObjectInformer) if err != nil { - logger.Error(err, "Failed to enqueue FederatedObjects for policy") + s.logger.Error(err, "Failed to enqueue Objects for override policy") return } - for _, obj := range fedObjects { - allObjects = append(allObjects, obj) - } - clusterFedObjects, err := s.clusterFedObjectInformer.Lister().List(labels.Everything()) - if err != nil { - logger.Error(err, "Failed to enqueue ClusterFederatedObjects for policy") - return - } - for _, obj := range clusterFedObjects { - allObjects = append(allObjects, obj) - } for _, obj := range allObjects { s.worker.Enqueue(common.NewQualifiedName(obj)) diff --git a/pkg/controllers/sync/controller.go b/pkg/controllers/sync/controller.go index ab90598a..52a01862 100644 --- a/pkg/controllers/sync/controller.go +++ b/pkg/controllers/sync/controller.go @@ -409,7 +409,11 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua keyedLogger.Error(err, "Failed to get pending controllers") return worker.StatusError } - if len(pendingControllers) > 0 { + isFedHPAObject, err := isFedHPAObject(ctx, fedResource.Object()) + if err != nil { + return worker.StatusError + } + if len(pendingControllers) > 0 && !isFedHPAObject { // upstream controllers have not finished processing, we wait for our turn return worker.StatusAllOK } @@ -422,7 +426,7 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer")) return worker.StatusError } - clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource) + clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource, isFedHPAObject) if err != nil { fedResource.RecordError("PrepareToSyncError", errors.Wrap(err, "Failed to prepare to sync")) return worker.StatusError @@ -440,6 +444,7 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua func (s *SyncController) prepareToSync( ctx context.Context, fedResource FederatedResource, + isFedHPAObject bool, ) ( requireSync []*fedcorev1a1.FederatedCluster, selectedClusters sets.Set[string], @@ -465,6 +470,9 @@ func (s *SyncController) prepareToSync( } selectedClusterNames := fedResource.ComputePlacement(clusters) + if isFedHPAObject { 
+ selectedClusterNames = nil + } pendingCreateClusters := selectedClusterNames.Clone() status := fedResource.Object().GetStatus() for _, cluster := range status.Clusters { diff --git a/pkg/controllers/sync/util.go b/pkg/controllers/sync/util.go new file mode 100644 index 00000000..e787d1b2 --- /dev/null +++ b/pkg/controllers/sync/util.go @@ -0,0 +1,42 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + "context" + + "k8s.io/klog/v2" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/common" +) + +func isFedHPAObject(ctx context.Context, fedObject fedcorev1a1.GenericFederatedObject) (bool, error) { + logger := klog.FromContext(ctx) + + templateMetadata, err := fedObject.GetSpec().GetTemplateMetadata() + if err != nil { + logger.Error(err, "Failed to get TemplateMetadata") + return false, err + } + + if value, ok := templateMetadata.GetLabels()[common.FedHPAEnableKey]; ok && value == common.AnnotationValueTrue { + return true, nil + } + + return false, nil +} diff --git a/pkg/controllers/sync/util_test.go b/pkg/controllers/sync/util_test.go new file mode 100644 index 00000000..b8ada4f3 --- /dev/null +++ b/pkg/controllers/sync/util_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sync + +import ( + "context" + "encoding/json" + "testing" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/common" +) + +func Test_isFedHPAObject(t *testing.T) { + type args struct { + fedObject fedcorev1a1.GenericFederatedObject + } + + fedHPAObject := autoscalingv1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + common.FedHPAEnableKey: common.AnnotationValueTrue, + }, + }, + } + fedHPAObjectJSON, err := json.Marshal(fedHPAObject) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + notFedHPAObject := autoscalingv1.HorizontalPodAutoscaler{} + notFedHPAObjectJSON, err := json.Marshal(notFedHPAObject) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + tests := []struct { + name string + args args + want bool + wantErr bool + }{ + { + "is fed hpa object", + args{ + &fedcorev1a1.FederatedObject{ + Spec: fedcorev1a1.GenericFederatedObjectSpec{ + Template: apiextensionsv1.JSON{ + Raw: fedHPAObjectJSON, + }, + }, + }, + }, + true, + false, + }, + { + "is not fed hpa object", + args{ + &fedcorev1a1.FederatedObject{ + Spec: fedcorev1a1.GenericFederatedObjectSpec{ + Template: apiextensionsv1.JSON{ + Raw: notFedHPAObjectJSON, + }, + }, + }, + }, + false, + false, + }, + { + "get fed object Spec TemplateMetadata failed", + args{ + &fedcorev1a1.FederatedObject{ + Spec: fedcorev1a1.GenericFederatedObjectSpec{}, + }, + }, + false, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := isFedHPAObject(context.TODO(), tt.args.fedObject) + if (err != nil) != tt.wantErr { + t.Errorf("isFedHPAObject() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("isFedHPAObject() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/util/fedobjectadapters/adapters.go b/pkg/util/fedobjectadapters/adapters.go index da7b0b5e..c1e73dfa 100644 --- a/pkg/util/fedobjectadapters/adapters.go +++ b/pkg/util/fedobjectadapters/adapters.go @@ -21,9 +21,11 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" fedcorev1a1client "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned/typed/core/v1alpha1" + fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" fedcorev1a1listers "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1" ) @@ -154,3 +156,32 @@ func Delete( return fedv1a1Client.FederatedObjects(namespace).Delete(ctx, name, opts) } } + +func ListAllFedObjsForFTC( + ftc *fedcorev1a1.FederatedTypeConfig, + fedObjectLister fedcorev1a1informers.FederatedObjectInformer, + clusterFedObjectLister fedcorev1a1informers.ClusterFederatedObjectInformer, +) ([]fedcorev1a1.GenericFederatedObject, error) { + allObjects := []fedcorev1a1.GenericFederatedObject{} + labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} + + if ftc.GetSourceType().Namespaced { + fedObjects, err := fedObjectLister.Lister().List(labels.SelectorFromSet(labelsSet)) + if err != nil { + return nil, err + } + for _, obj := range fedObjects { + allObjects = append(allObjects, obj) + } + } else { + clusterFedObjects, err 
:= clusterFedObjectLister.Lister().List(labels.SelectorFromSet(labelsSet)) + if err != nil { + return nil, err + } + for _, obj := range clusterFedObjects { + allObjects = append(allObjects, obj) + } + } + + return allObjects, nil +}
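
For reference, a minimal usage sketch of the new fedobjectadapters.ListAllFedObjsForFTC helper, in the shape the override controller and scheduler call it after this change. The Controller receiver and its fedObjectInformer, clusterFedObjectInformer, logger, and worker fields are assumed for illustration; ListAllFedObjsForFTC and common.NewQualifiedName are the functions introduced or used by the patch.

    // Enqueue every federated object whose source type matches the given FTC.
    // ListAllFedObjsForFTC lists FederatedObjects for namespaced source types and
    // ClusterFederatedObjects for cluster-scoped ones, selecting by the
    // "<group/version>: <Kind>" label that KubeAdmiral stamps on federated objects.
    func (c *Controller) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
        allObjects, err := fedobjectadapters.ListAllFedObjsForFTC(ftc, c.fedObjectInformer, c.clusterFedObjectInformer)
        if err != nil {
            c.logger.Error(err, "Failed to list federated objects for FTC")
            return
        }
        for _, obj := range allObjects {
            c.worker.Enqueue(common.NewQualifiedName(obj))
        }
    }

Because HorizontalPodAutoscaler is a namespaced source type, the federated HPA controller passes nil for the ClusterFederatedObject informer; callers that may own cluster-scoped types should pass both informers, since the helper consults the cluster-scoped lister for non-namespaced FTCs.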
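
The sync-controller side of the change boils down to one decision: if the federated object's template metadata carries the common.FedHPAEnableKey label, the object is still reconciled (even with pending upstream controllers) but its computed placement is cleared, so it is dispatched to no member cluster. A small sketch of that decision, assuming it lives in the sync package alongside isFedHPAObject; the placementFor helper name is invented for illustration, while isFedHPAObject and ComputePlacement are the calls used by the patch.

    // placementFor is an illustrative helper, not part of the patch: it returns the
    // set of clusters a federated resource should be dispatched to, returning nil
    // for fed HPA objects so that they are synced to no member cluster.
    func placementFor(
        ctx context.Context,
        fedResource FederatedResource,
        clusters []*fedcorev1a1.FederatedCluster,
    ) (sets.Set[string], error) {
        isHPA, err := isFedHPAObject(ctx, fedResource.Object())
        if err != nil {
            return nil, err
        }
        if isHPA {
            // The template carries common.FedHPAEnableKey, so no cluster is selected.
            return nil, nil
        }
        return fedResource.ComputePlacement(clusters), nil
    }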