From 9313ac06606c7078505195e0722696db43c809e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=AF=93?= Date: Mon, 6 Nov 2023 16:06:56 +0800 Subject: [PATCH] feat(follower): allow hpa to be used as follower resource for distribution --- cmd/controller-manager/app/core.go | 2 +- pkg/controllers/common/constants.go | 6 + pkg/controllers/federatedhpa/controller.go | 90 ++++--- pkg/controllers/federatedhpa/util.go | 238 ++++-------------- pkg/controllers/follower/controller.go | 31 ++- .../override/overridepolicy_controller.go | 12 +- pkg/controllers/scheduler/scheduler.go | 16 +- pkg/util/bijection/bijection.go | 5 +- 8 files changed, 147 insertions(+), 253 deletions(-) diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index 86e7fb3f..a1d516be 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -252,6 +252,7 @@ func startFollowerController( controllerCtx.KubeClientset, controllerCtx.FedClientset, controllerCtx.InformerManager, + controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(), controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(), controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(), controllerCtx.Metrics, @@ -329,7 +330,6 @@ func startFederatedHPAController( controllerCtx.DynamicClientset, controllerCtx.InformerManager, controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(), - controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(), controllerCtx.FedInformerFactory.Core().V1alpha1().PropagationPolicies(), controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(), controllerCtx.Metrics, diff --git a/pkg/controllers/common/constants.go b/pkg/controllers/common/constants.go index d14dfa90..3bd228d4 100644 --- a/pkg/controllers/common/constants.go +++ b/pkg/controllers/common/constants.go @@ -190,3 +190,9 @@ var ( // //nolint:lll const MaxFederatedObjectNameLength = 253 + +// HPAScaleTargetRefPath defines the fed hpa annotations and labels +const ( + HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path" + FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled" +) diff --git a/pkg/controllers/federatedhpa/controller.go b/pkg/controllers/federatedhpa/controller.go index 445c51aa..63ace05c 100644 --- a/pkg/controllers/federatedhpa/controller.go +++ b/pkg/controllers/federatedhpa/controller.go @@ -95,7 +95,6 @@ func NewFederatedHPAController( dynamicClient dynamic.Interface, informerManager informermanager.InformerManager, fedObjectInformer fedcorev1a1informers.FederatedObjectInformer, - clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer, propagationPolicyInformer fedcorev1a1informers.PropagationPolicyInformer, clusterPropagationPolicyInformer fedcorev1a1informers.ClusterPropagationPolicyInformer, metrics stats.Metrics, @@ -150,11 +149,6 @@ func NewFederatedHPAController( ); err != nil { return nil, err } - if _, err := clusterFedObjectInformer.Informer().AddEventHandler( - eventhandlers.NewTriggerOnAllChanges(f.enqueueFedHPAObjectsForFederatedObjects), - ); err != nil { - return nil, err - } if _, err := propagationPolicyInformer.Informer().AddEventHandler( eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAObjectsForPropagationPolicy), @@ -186,7 +180,7 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFederatedObjects(fedObje return } - if f.isHPAType(fedObject) { + if f.isHPAType(key.gvk) { f.worker.EnqueueWithDelay(key, 3*time.Second) return } @@ -215,7 +209,7 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForPropagationPolicy(policy func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) { logger := f.logger.WithValues("ftc", ftc.GetName()) - if scaleTargetRefPath, ok := ftc.GetAnnotations()[HPAScaleTargetRefPath]; ok { + if scaleTargetRefPath, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath]; ok { f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = scaleTargetRefPath } else { delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK()) @@ -225,7 +219,8 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.Fed logger.V(2).Info("Enqueue federated objects for FTC") allObjects := []fedcorev1a1.GenericFederatedObject{} - fedObjects, err := f.fedObjectInformer.Lister().List(labels.Everything()) + labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} + fedObjects, err := f.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet)) if err != nil { logger.Error(err, "Failed to enqueue FederatedObjects for policy") return @@ -235,19 +230,12 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.Fed } for _, obj := range allObjects { - templateMetadata, err := obj.GetSpec().GetTemplateMetadata() + sourceObjectResource, err := fedObjectToSourceObjectResource(obj) if err != nil { - logger.Error(err, "Failed to get source GVK from FederatedObject, will not enqueue") + logger.Error(err, "Failed to get source Resource from FederatedObject, will not enqueue") continue } - if templateMetadata.GroupVersionKind() == ftc.GetSourceTypeGVK() { - sourceObjectResource, err := fedObjectToSourceObjectResource(obj) - if err != nil { - logger.Error(err, "Failed to get source Resource from FederatedObject, will not enqueue") - continue - } - f.worker.Enqueue(sourceObjectResource) - } + f.worker.Enqueue(sourceObjectResource) } } @@ -297,7 +285,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s } hpaGVR := hpaFTC.GetSourceTypeGVR() - ctx, logger = logging.InjectLoggerValues(ctx, "hpa-ftc", hpaFTC.Name, "hpa-gvr", hpaGVR) + ctx, logger = logging.InjectLoggerValues(ctx, "hpa-ftc", hpaFTC.Name) lister, hasSynced, exists := f.informerManager.GetResourceLister(key.gvk) if !exists { @@ -393,17 +381,16 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s } } + var isHPAObjectUpdated, isFedHPAObjectUpdated bool switch hpaObject.GetLabels()[FederatedHPAMode] { case FederatedHPAModeFederation: - if res := f.addFedHPAPendingController(ctx, fedHPAObject); res != worker.StatusAllOK { + if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject); err != nil { return worker.StatusError } if !workloadExist || isPropagationPolicyDividedMode(pp) { - if res := f.removeFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason); res != worker.StatusAllOK { - return worker.StatusError - } - return f.addHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey, common.AnnotationValueTrue) + isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason) + isHPAObjectUpdated = isHPAObjectUpdated || addHPALabel(hpaObject, common.FedHPAEnableKey, common.AnnotationValueTrue) } else { hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp)) f.eventRecorder.Eventf( @@ -414,25 +401,21 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s hpaNotWorkReason, ) - if res := f.addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason, hpaNotWorkReason); res != worker.StatusAllOK { - return worker.StatusError - } - return f.removeHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey) + isHPAObjectUpdated = addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason) + isHPAObjectUpdated = isHPAObjectUpdated || removeHPALabel(hpaObject, common.FedHPAEnableKey) } case FederatedHPAModeDistributed, FederatedHPAModeDefault: - if res := f.removeHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey); res != worker.StatusAllOK { - return worker.StatusError - } + isHPAObjectUpdated = removeHPALabel(hpaObject, common.FedHPAEnableKey) if !workloadExist || isPropagationPolicyDuplicateMode(pp) && isPropagationPolicyFollowerEnabled(pp) && isWorkloadRetainReplicas(fedWorkload) && isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) { - if res := f.removeFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason); res != worker.StatusAllOK { + isHPAObjectUpdated = isHPAObjectUpdated || removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason) + if isFedHPAObjectUpdated, err = removePendingController(ctx, hpaFTC, fedHPAObject); err != nil { return worker.StatusError } - return f.removePendingController(ctx, hpaFTC, fedHPAObject) } else { hpaNotWorkReason := generateDistributedHPANotWorkReason( isPropagationPolicyExist(pp), @@ -448,10 +431,45 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s hpaNotWorkReason, ) - if res := f.addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason, hpaNotWorkReason); res != worker.StatusAllOK { + isHPAObjectUpdated = isHPAObjectUpdated || addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason) + if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject); err != nil { return worker.StatusError } - return f.addFedHPAPendingController(ctx, fedHPAObject) + } + } + + if isHPAObjectUpdated { + logger.V(1).Info("Updating hpa object") + _, err := f.dynamicClient.Resource(hpaGVR).Namespace(hpaObject.GetNamespace()). + UpdateStatus(ctx, hpaObject, metav1.UpdateOptions{}) + if err != nil { + errMsg := "Failed to update hpa object" + logger.Error(err, errMsg) + f.eventRecorder.Eventf(hpaObject, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, + errMsg+" %v, err: %v, retry later", key, err) + if apierrors.IsConflict(err) { + return worker.StatusConflict + } + return worker.StatusError + } + } + + if isFedHPAObjectUpdated { + logger.V(1).Info("Updating fed hpa object") + if _, err = fedobjectadapters.Update( + ctx, + f.fedClient.CoreV1alpha1(), + fedHPAObject, + metav1.UpdateOptions{}, + ); err != nil { + errMsg := "Failed to update fed hpa object" + logger.Error(err, errMsg) + f.eventRecorder.Eventf(fedHPAObject, corev1.EventTypeWarning, EventReasonUpdateHPAFedObject, + errMsg+" %v, err: %v, retry later", fedHPAObject, err) + if apierrors.IsConflict(err) { + return worker.StatusConflict + } + return worker.StatusError } } diff --git a/pkg/controllers/federatedhpa/util.go b/pkg/controllers/federatedhpa/util.go index 0810f933..d6910929 100644 --- a/pkg/controllers/federatedhpa/util.go +++ b/pkg/controllers/federatedhpa/util.go @@ -23,8 +23,6 @@ import ( "github.com/pkg/errors" autoscalingv1 "k8s.io/api/autoscaling/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -35,19 +33,14 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/follower" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler" - "github.com/kubewharf/kubeadmiral/pkg/util/fedobjectadapters" "github.com/kubewharf/kubeadmiral/pkg/util/pendingcontrollers" - "github.com/kubewharf/kubeadmiral/pkg/util/worker" ) const ( PropagationPolicyKind = "PropagationPolicy" ClusterPropagationPolicyKind = "ClusterPropagationPolicy" - HPAScaleTargetRefPath = "hpa.kubeadmiral.io/scale-target-ref-path" - FedHPANotWorkReason = "hpa.kubeadmiral.io/fed-hpa-not-work-reason" - - HPAEnableKey = "fed-hpa-enabled" + FedHPANotWorkReason = common.DefaultPrefix + "fed-hpa-not-work-reason" EventReasonUpdateHPASourceObject = "UpdateHPASourceObject" EventReasonUpdateHPAFedObject = "UpdateHPAFedObject" @@ -141,7 +134,7 @@ func generateDistributedHPANotWorkReason( } func isHPAFTCAnnoChanged(lastObserved, latest *fedcorev1a1.FederatedTypeConfig) bool { - return lastObserved.GetAnnotations()[HPAScaleTargetRefPath] != latest.GetAnnotations()[HPAScaleTargetRefPath] + return lastObserved.GetAnnotations()[common.HPAScaleTargetRefPath] != latest.GetAnnotations()[common.HPAScaleTargetRefPath] } func isPropagationPolicyExist(pp fedcorev1a1.GenericPropagationPolicy) bool { @@ -186,29 +179,25 @@ func isHPAFollowTheWorkload( }) } -func (f *FederatedHPAController) isHPAType(fedObject metav1.Object) bool { - if federatedObject, ok := fedObject.(*fedcorev1a1.FederatedObject); !ok { +func (f *FederatedHPAController) isHPAType(resourceGVK schema.GroupVersionKind) bool { + ftc, exists := f.informerManager.GetResourceFTC(resourceGVK) + if !exists { return false - } else { - metadata, _ := federatedObject.Spec.GetTemplateMetadata() - ftc, exists := f.informerManager.GetResourceFTC(metadata.GroupVersionKind()) - if !exists { - return false - } + } - // HPA gvk has already been stored - if _, ok := f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()]; ok { - return true - } + // HPA gvk has already been stored + if _, ok := f.scaleTargetRefMapping[resourceGVK]; ok { + return true + } - if path, ok := ftc.Annotations[HPAScaleTargetRefPath]; ok { - f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = path - return true - } else { - delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK()) - return false - } + if path, ok := ftc.Annotations[common.HPAScaleTargetRefPath]; ok { + f.scaleTargetRefMapping[resourceGVK] = path + return true + } else { + delete(f.scaleTargetRefMapping, resourceGVK) + return false } + } func isWorkloadRetainReplicas(fedObj metav1.Object) bool { @@ -248,130 +237,76 @@ func scaleTargetRefToResource( } func getPropagationPolicyResourceFromFedWorkload(workload fedcorev1a1.GenericFederatedObject) *Resource { - if policyName, exists := workload.GetLabels()[scheduler.ClusterPropagationPolicyNameLabel]; exists { + if policyName, exists := workload.GetLabels()[scheduler.PropagationPolicyNameLabel]; exists { return &Resource{ gvk: schema.GroupVersionKind{ Group: fedcorev1a1.SchemeGroupVersion.Group, Version: fedcorev1a1.SchemeGroupVersion.Version, - Kind: ClusterPropagationPolicyKind, + Kind: PropagationPolicyKind, }, - name: policyName, + name: policyName, + namespace: workload.GetNamespace(), } } - if policyName, exists := workload.GetLabels()[scheduler.PropagationPolicyNameLabel]; exists { + if policyName, exists := workload.GetLabels()[scheduler.ClusterPropagationPolicyNameLabel]; exists { return &Resource{ gvk: schema.GroupVersionKind{ Group: fedcorev1a1.SchemeGroupVersion.Group, Version: fedcorev1a1.SchemeGroupVersion.Version, - Kind: PropagationPolicyKind, + Kind: ClusterPropagationPolicyKind, }, - name: policyName, - namespace: workload.GetNamespace(), + name: policyName, } } return nil } -func (f *FederatedHPAController) addHPALabel( - ctx context.Context, - uns *unstructured.Unstructured, - gvr schema.GroupVersionResource, - key, value string, -) worker.Result { - logger := klog.FromContext(ctx) - +func addHPALabel(uns *unstructured.Unstructured, key, value string) bool { labels := uns.GetLabels() if labels == nil { labels = make(map[string]string, 1) } if oldValue, ok := labels[key]; ok && oldValue == value { - return worker.StatusAllOK + return false } labels[key] = value uns.SetLabels(labels) - logger.V(1).Info("Adding fed hpa label of source object") - _, err := f.dynamicClient.Resource(gvr).Namespace(uns.GetNamespace()). - UpdateStatus(ctx, uns, metav1.UpdateOptions{}) - if err != nil { - errMsg := "Failed to add fed hpa annotation" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(uns, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, - errMsg+" %v, err: %v, retry later", key, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } - - return worker.StatusAllOK + return true } -func (f *FederatedHPAController) removeHPALabel( - ctx context.Context, - uns *unstructured.Unstructured, - gvr schema.GroupVersionResource, - key string, -) worker.Result { - logger := klog.FromContext(ctx) - +func removeHPALabel(uns *unstructured.Unstructured, key string) bool { labels := uns.GetLabels() - if labels == nil { - return worker.StatusAllOK - } if _, ok := labels[key]; !ok { - return worker.StatusAllOK + return false } delete(labels, key) uns.SetLabels(labels) - logger.V(1).Info("Removing fed hpa label of source object") - _, err := f.dynamicClient.Resource(gvr).Namespace(uns.GetNamespace()). - UpdateStatus(ctx, uns, metav1.UpdateOptions{}) - if err != nil { - errMsg := "Failed to remove fed hpa label" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(uns, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, - errMsg+" %v, err: %v, retry later", key, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } - - return worker.StatusAllOK + return true } -func (f *FederatedHPAController) addFedHPAPendingController( - ctx context.Context, - fedObject fedcorev1a1.GenericFederatedObject, -) worker.Result { +func addFedHPAPendingController(ctx context.Context, fedObject fedcorev1a1.GenericFederatedObject) (bool, error) { logger := klog.FromContext(ctx) pendControllers, err := pendingcontrollers.GetPendingControllers(fedObject) if err != nil { logger.Error(err, "Failed to get pending controllers") - return worker.StatusError + return false, err } - var hpaControllerExist bool for _, controllers := range pendControllers { for _, controller := range controllers { if controller == PrefixedFederatedHPAControllerName { - hpaControllerExist = true - break + return false, nil } } } - if hpaControllerExist { - return worker.StatusAllOK - } - if pendControllers == nil { pendControllers = [][]string{{PrefixedFederatedHPAControllerName}} } else { @@ -381,34 +316,17 @@ func (f *FederatedHPAController) addFedHPAPendingController( _, err = pendingcontrollers.SetPendingControllers(fedObject, pendControllers) if err != nil { logger.Error(err, "Failed to set pending controllers") - return worker.StatusError - } - - logger.V(1).Info("Adding pending controller") - if _, err = fedobjectadapters.Update( - ctx, - f.fedClient.CoreV1alpha1(), - fedObject, - metav1.UpdateOptions{}, - ); err != nil { - errMsg := "Failed to add pending controller" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(fedObject, corev1.EventTypeWarning, EventReasonUpdateHPAFedObject, - errMsg+" %v, err: %v, retry later", fedObject, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError + return false, err } - return worker.StatusAllOK + return true, nil } -func (f *FederatedHPAController) removePendingController( +func removePendingController( ctx context.Context, ftc *fedcorev1a1.FederatedTypeConfig, fedObject fedcorev1a1.GenericFederatedObject, -) worker.Result { +) (bool, error) { logger := klog.FromContext(ctx) updated, err := pendingcontrollers.UpdatePendingControllers( @@ -419,89 +337,35 @@ func (f *FederatedHPAController) removePendingController( ) if err != nil { logger.Error(err, "Failed to update pending controllers") - return worker.StatusError - } - - if updated { - logger.V(1).Info("Removing pending controller") - if _, err = fedobjectadapters.Update( - ctx, - f.fedClient.CoreV1alpha1(), - fedObject, - metav1.UpdateOptions{}, - ); err != nil { - errMsg := "Failed to remove pending controller" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(fedObject, corev1.EventTypeWarning, EventReasonUpdateHPAFedObject, - errMsg+" %v, err: %v, retry later", fedObject, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } + return false, err } - return worker.StatusAllOK + return updated, nil } -func (f *FederatedHPAController) addFedHPANotWorkReasonAnno( - ctx context.Context, - uns *unstructured.Unstructured, - gvr schema.GroupVersionResource, - key string, - value string, -) worker.Result { - logger := klog.FromContext(ctx) - +func addFedHPANotWorkReasonAnno(uns *unstructured.Unstructured, key string, value string) bool { annotations := uns.GetAnnotations() if annotations == nil { annotations = make(map[string]string, 1) } + if oldValue, ok := annotations[key]; ok && oldValue == value { + return false + } + annotations[key] = value uns.SetAnnotations(annotations) - logger.V(1).Info("Adding fed hpa not work reason annotation") - _, err := f.dynamicClient.Resource(gvr).Namespace(uns.GetNamespace()). - UpdateStatus(ctx, uns, metav1.UpdateOptions{}) - if err != nil { - errMsg := "Failed to add fed hpa not work reason annotation" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(uns, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, - errMsg+" %v, err: %v, retry later", key, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } - - return worker.StatusAllOK + return true } -func (f *FederatedHPAController) removeFedHPANotWorkReasonAnno( - ctx context.Context, - uns *unstructured.Unstructured, - gvr schema.GroupVersionResource, - key string, -) worker.Result { - logger := klog.FromContext(ctx) - +func removeFedHPANotWorkReasonAnno(uns *unstructured.Unstructured, key string) bool { annotations := uns.GetAnnotations() + if _, exists := annotations[key]; !exists { + return false + } + delete(annotations, key) uns.SetAnnotations(annotations) - logger.V(1).Info("Removing fed hpa not work reason annotation") - _, err := f.dynamicClient.Resource(gvr).Namespace(uns.GetNamespace()). - UpdateStatus(ctx, uns, metav1.UpdateOptions{}) - if err != nil { - errMsg := "Failed to remove fed hpa not work reason annotation" - logger.Error(err, errMsg) - f.eventRecorder.Eventf(uns, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject, - errMsg+" %v, err: %v, retry later", key, err) - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - return worker.StatusError - } - - return worker.StatusAllOK + return true } diff --git a/pkg/controllers/follower/controller.go b/pkg/controllers/follower/controller.go index feb6f0f3..624ca354 100644 --- a/pkg/controllers/follower/controller.go +++ b/pkg/controllers/follower/controller.go @@ -32,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/scale/scheme/autoscalingv1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" @@ -74,7 +73,6 @@ var ( {Group: "", Kind: common.PodKind}: "", } - // todo: dynamic add custom hpa supportedFollowerTypes = sets.New( schema.GroupKind{Group: "", Kind: common.ConfigMapKind}, schema.GroupKind{Group: "", Kind: common.SecretKind}, @@ -82,7 +80,6 @@ var ( schema.GroupKind{Group: "", Kind: common.ServiceAccountKind}, schema.GroupKind{Group: "", Kind: common.ServiceKind}, schema.GroupKind{Group: networkingv1.GroupName, Kind: common.IngressKind}, - schema.GroupKind{Group: autoscalingv1.GroupName, Kind: common.HorizontalPodAutoscalerKind}, ) ) @@ -105,6 +102,7 @@ type Controller struct { worker worker.ReconcileWorker[objectGroupKindKey] informerManager informermanager.InformerManager + ftcInformer fedcorev1a1informers.FederatedTypeConfigInformer fedObjectInformer fedcorev1a1informers.FederatedObjectInformer clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer @@ -122,6 +120,7 @@ func NewFollowerController( kubeClient kubernetes.Interface, fedClient fedclient.Interface, informerManager informermanager.InformerManager, + ftcInformer fedcorev1a1informers.FederatedTypeConfigInformer, fedObjectInformer fedcorev1a1informers.FederatedObjectInformer, clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer, metrics stats.Metrics, @@ -131,6 +130,7 @@ func NewFollowerController( c := &Controller{ gkToFTCName: make(map[schema.GroupKind]string), informerManager: informerManager, + ftcInformer: ftcInformer, fedObjectInformer: fedObjectInformer, clusterFedObjectInformer: clusterFedObjectInformer, cacheObservedFromLeaders: newBidirectionalCache[fedcorev1a1.LeaderReference, FollowerReference](), @@ -214,7 +214,8 @@ func (c *Controller) enqueueSupportedType(object interface{}) { templateGK := template.GroupVersionKind().GroupKind() _, isLeader := leaderPodSpecPaths[templateGK] - isFollower := supportedFollowerTypes.Has(templateGK) + isHPAType, _ := c.isHPAType(templateGK) + isFollower := supportedFollowerTypes.Has(templateGK) || isHPAType if isLeader || isFollower { c.worker.Enqueue(objectGroupKindKey{ sourceGK: templateGK, @@ -230,7 +231,12 @@ func (c *Controller) reconcile(ctx context.Context, key objectGroupKindKey) (sta return c.reconcileLeader(ctx, key) } - if supportedFollowerTypes.Has(key.sourceGK) { + isHPAType, err := c.isHPAType(key.sourceGK) + if err != nil { + return worker.StatusError + } + + if supportedFollowerTypes.Has(key.sourceGK) || isHPAType { return c.reconcileFollower(ctx, key) } @@ -558,3 +564,18 @@ func (c *Controller) leaderPlacementUnion( return clusters, nil } + +func (c *Controller) isHPAType(gk schema.GroupKind) (bool, error) { + c.gkToFTCLock.RLock() + defer c.gkToFTCLock.RUnlock() + + ftcName := c.gkToFTCName[gk] + ftc, err := c.ftcInformer.Lister().Get(ftcName) + if err != nil { + c.logger.Error(err, "Failed to get ftc") + return false, err + } + + _, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath] + return ok, nil +} diff --git a/pkg/controllers/override/overridepolicy_controller.go b/pkg/controllers/override/overridepolicy_controller.go index b10be234..cb5a22a4 100644 --- a/pkg/controllers/override/overridepolicy_controller.go +++ b/pkg/controllers/override/overridepolicy_controller.go @@ -176,7 +176,8 @@ func (c *Controller) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedTyp logger.V(2).Info("Enqueue federated objects for FTC") allObjects := []fedcorev1a1.GenericFederatedObject{} - fedObjects, err := c.fedObjectInformer.Lister().List(labels.Everything()) + labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} + fedObjects, err := c.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet)) if err != nil { c.logger.Error(err, "Failed to enqueue FederatedObjects for override policy") return @@ -194,14 +195,7 @@ func (c *Controller) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedTyp } for _, obj := range allObjects { - sourceMetadata, err := obj.GetSpec().GetTemplateMetadata() - if err != nil { - c.logger.Error(err, "Failed to get source metadata from FederatedObject, will not enqueue") - continue - } - if sourceMetadata.GroupVersionKind() == ftc.GetSourceTypeGVK() { - c.worker.Enqueue(common.NewQualifiedName(obj)) - } + c.worker.Enqueue(common.NewQualifiedName(obj)) } } diff --git a/pkg/controllers/scheduler/scheduler.go b/pkg/controllers/scheduler/scheduler.go index ff6697ef..06c561cf 100644 --- a/pkg/controllers/scheduler/scheduler.go +++ b/pkg/controllers/scheduler/scheduler.go @@ -864,7 +864,8 @@ func (s *Scheduler) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedType logger.V(2).Info("Enqueue federated objects for FTC") allObjects := []fedcorev1a1.GenericFederatedObject{} - fedObjects, err := s.fedObjectInformer.Lister().List(labels.Everything()) + labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind} + fedObjects, err := s.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet)) if err != nil { logger.Error(err, "Failed to enqueue FederatedObjects for policy") return @@ -882,16 +883,9 @@ func (s *Scheduler) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedType } for _, obj := range allObjects { - templateMetadata, err := obj.GetSpec().GetTemplateMetadata() - if err != nil { - logger.Error(err, "Failed to get source GVK from FederatedObject, will not enqueue") - continue - } - if templateMetadata.GroupVersionKind() == ftc.GetSourceTypeGVK() { - s.worker.Enqueue(common.NewQualifiedName(obj)) - s.metrics.Counter(utilmetrics.QueueIncomingFederatedObjectTotal, 1, - stats.Tag{Name: "event", Value: FTCChanged}) - } + s.worker.Enqueue(common.NewQualifiedName(obj)) + s.metrics.Counter(utilmetrics.QueueIncomingFederatedObjectTotal, 1, + stats.Tag{Name: "event", Value: FTCChanged}) } } diff --git a/pkg/util/bijection/bijection.go b/pkg/util/bijection/bijection.go index 7a7eff66..9d42f1cd 100644 --- a/pkg/util/bijection/bijection.go +++ b/pkg/util/bijection/bijection.go @@ -130,11 +130,8 @@ func (o *OneToManyRelation[T1, T2]) LookupByT1(key T1) (value sets.Set[T2], exis defer o.lock.RUnlock() val, exists := o.t1ToT2Map[key] - if !exists { - return nil, false - } - return val, true + return val, exists } func (o *OneToManyRelation[T1, T2]) LookupByT2(key T2) (value T1, exists bool) {