Skip to content

Commit

Permalink
feat(hpa): add unit test and add event
Browse files Browse the repository at this point in the history
  • Loading branch information
wy-lucky committed Nov 6, 2023
1 parent b840080 commit 7b45b52
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func startFederatedHPAController(
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating status-aggregator controller: %w", err)
return nil, fmt.Errorf("error creating federated-hpa controller: %w", err)
}

go federatedHPAController.Run(ctx)
Expand Down
23 changes: 23 additions & 0 deletions config/sample/host/01-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,26 @@ spec:
fields:
- metadata.creationTimestamp
- status
---
apiVersion: core.kubeadmiral.io/v1alpha1
kind: FederatedTypeConfig
metadata:
annotations:
hpa.kubeadmiral.io/scale-target-ref-path: spec.scaleTargetRef
name: horizontalpodautoscalers.autoscaling
spec:
sourceType:
group: autoscaling
kind: HorizontalPodAutoscaler
pluralName: horizontalpodautoscalers
scope: Namespaced
version: v1
controllers:
- - kubeadmiral.io/federatedhpa-controller
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
statusCollection:
enabled: true
fields:
- metadata.creationTimestamp
- status
2 changes: 1 addition & 1 deletion hack/make-rules/dev-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ MEMBER_CLUSTER_NAME=${MEMBER_CLUSTER_NAME:-"kubeadmiral-member"}
MANIFEST_DIR=${MANIFEST_DIR:-"${REPO_ROOT}/config/crds"}
CONFIG_DIR=${CONFIG_DIR:-"${REPO_ROOT}/config/sample/host"}
NUM_MEMBER_CLUSTERS=${NUM_MEMBER_CLUSTERS:-"3"}
CLUSTER_PROVIDER=${CLUSTER_PROVIDER:-"kind"}
CLUSTER_PROVIDER=${CLUSTER_PROVIDER:-"kwok"}
KUBE_VERSION=${KUBE_VERSION:="v1.20.15"}

mkdir -p "$(dirname "${KUBECONFIG_DIR}")"
Expand Down
31 changes: 16 additions & 15 deletions pkg/controllers/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,22 @@ const (
PodResource = "pods"
ReplicaSetResource = "replicasets"

NamespaceKind = "Namespace"
DeploymentKind = "Deployment"
StatefulSetKind = "StatefulSet"
DaemonSetKind = "DaemonSet"
JobKind = "Job"
CronJobKind = "CronJob"
ConfigMapKind = "ConfigMap"
SecretKind = "Secret"
ServiceKind = "Service"
ServiceAccountKind = "ServiceAccount"
IngressKind = "Ingress"
PersistentVolumeKind = "PersistentVolume"
PersistentVolumeClaimKind = "PersistentVolumeClaim"
PodKind = "Pod"
ReplicaSetKind = "ReplicaSet"
NamespaceKind = "Namespace"
DeploymentKind = "Deployment"
StatefulSetKind = "StatefulSet"
DaemonSetKind = "DaemonSet"
JobKind = "Job"
CronJobKind = "CronJob"
ConfigMapKind = "ConfigMap"
SecretKind = "Secret"
ServiceKind = "Service"
ServiceAccountKind = "ServiceAccount"
IngressKind = "Ingress"
PersistentVolumeKind = "PersistentVolume"
PersistentVolumeClaimKind = "PersistentVolumeClaim"
PodKind = "Pod"
ReplicaSetKind = "ReplicaSet"
HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler"
)

var (
Expand Down
101 changes: 78 additions & 23 deletions pkg/controllers/federatedhpa/controller.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2023 The KubeAdmiral Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package federatedhpa

import (
Expand All @@ -6,6 +22,7 @@ import (
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -40,6 +57,9 @@ const (
FederatedHPAControllerName = "federatedhpa-controller"
PrefixedFederatedHPAControllerName = common.DefaultPrefix + FederatedHPAControllerName

EventReasonFederationHPANotWork = "FederationHPANotWork"
EventReasonDistributedHPANotWork = "DistributedHPANotWork"

FederatedHPAMode = "hpa-mode"
FederatedHPAModeFederation = "federation"
FederatedHPAModeDistributed = "distributed"
Expand Down Expand Up @@ -126,31 +146,31 @@ func NewFederatedHPAController(
}

if _, err := fedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChanges(f.enqueueFedHPAForFederatedObjects),
eventhandlers.NewTriggerOnAllChanges(f.enqueueFedHPAObjectsForFederatedObjects),
); err != nil {
return nil, err
}
if _, err := clusterFedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChanges(f.enqueueFedHPAForFederatedObjects),
eventhandlers.NewTriggerOnAllChanges(f.enqueueFedHPAObjectsForFederatedObjects),
); err != nil {
return nil, err
}

if _, err := propagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAForPropagationPolicy),
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAObjectsForPropagationPolicy),
); err != nil {
return nil, err
}
if _, err := clusterPropagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAForPropagationPolicy),
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAObjectsForPropagationPolicy),
); err != nil {
return nil, err
}

if err := informerManager.AddFTCUpdateHandler(func(lastObserved, latest *fedcorev1a1.FederatedTypeConfig) {
if lastObserved == nil && latest != nil ||
lastObserved != nil && latest != nil && isHPAFTCAnnoChanged(lastObserved, latest) {
f.enqueueFederatedObjectsForFTC(latest)
f.enqueueFedHPAObjectsForFTC(latest)
}
}); err != nil {
return nil, err
Expand All @@ -159,14 +179,14 @@ func NewFederatedHPAController(
return f, nil
}

func (f *FederatedHPAController) enqueueFedHPAForFederatedObjects(fo metav1.Object) {
key, err := fedObjectToSourceObjectResource(fo)
func (f *FederatedHPAController) enqueueFedHPAObjectsForFederatedObjects(fedObject metav1.Object) {
key, err := fedObjectToSourceObjectResource(fedObject)
if err != nil {
f.logger.Error(err, "Failed to get source object resource from fed object")
return
}

if f.isHPAType(fo) {
if f.isHPAType(fedObject) {
f.worker.EnqueueWithDelay(key, 3*time.Second)
return
}
Expand All @@ -178,7 +198,7 @@ func (f *FederatedHPAController) enqueueFedHPAForFederatedObjects(fo metav1.Obje
}
}

func (f *FederatedHPAController) enqueueFedHPAForPropagationPolicy(policy metav1.Object) {
func (f *FederatedHPAController) enqueueFedHPAObjectsForPropagationPolicy(policy metav1.Object) {
key := policyObjectToResource(policy)

if workloads, exist := f.ppWorkloadMapping.LookupByT1(key); exist {
Expand All @@ -192,7 +212,7 @@ func (f *FederatedHPAController) enqueueFedHPAForPropagationPolicy(policy metav1
}
}

func (f *FederatedHPAController) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
logger := f.logger.WithValues("ftc", ftc.GetName())

if scaleTargetRefPath, ok := ftc.GetAnnotations()[HPAScaleTargetRefPath]; ok {
Expand Down Expand Up @@ -272,7 +292,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s

hpaFTC, exists := f.informerManager.GetResourceFTC(key.gvk)
if !exists {
// Waiting for func enqueueFederatedObjectsForFTC enqueue it again.
// Waiting for func enqueueFedHPAObjectsForFTC enqueue it again.
return worker.StatusAllOK
}

Expand Down Expand Up @@ -328,7 +348,9 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusError
}

ctx, logger = logging.InjectLoggerValues(ctx, "workload-object", newWorkloadResource.QualifiedName(), "workload-gvk", newWorkloadResource.gvk)
ctx, logger = logging.InjectLoggerValues(ctx,
"workload-object", newWorkloadResource.QualifiedName(),
"workload-gvk", newWorkloadResource.gvk)

oldWorkloadResource, exist := f.workloadHPAMapping.LookupByT2(key)
if exist {
Expand All @@ -351,7 +373,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s

var pp fedcorev1a1.GenericPropagationPolicy
if workloadExist {
newPPResource := getPPResourceFromFedWorkload(fedWorkload)
newPPResource := getPropagationPolicyResourceFromFedWorkload(fedWorkload)
if newPPResource != nil {
_, exist = f.ppWorkloadMapping.LookupByT2(newWorkloadResource)
if exist {
Expand All @@ -363,7 +385,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusError
}

pp, err = f.getPPFromResource(newPPResource)
pp, err = f.getPropagationPolicyFromResource(newPPResource)
if err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to get pp from pp resource")
return worker.StatusError
Expand All @@ -377,24 +399,58 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
return worker.StatusError
}

if !workloadExist || isPPExist(pp) && isPPDivided(pp) {
return f.addFedHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey, common.AnnotationValueTrue)
if !workloadExist || isPropagationPolicyDividedMode(pp) {
if res := f.removeFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason); res != worker.StatusAllOK {
return worker.StatusError
}
return f.addHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey, common.AnnotationValueTrue)
} else {
return f.removeFedHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey)
hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp))
f.eventRecorder.Eventf(
hpaObject,
corev1.EventTypeWarning,
EventReasonFederationHPANotWork,
"Federation HPA not work: %s",
hpaNotWorkReason,
)

if res := f.addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason, hpaNotWorkReason); res != worker.StatusAllOK {
return worker.StatusError
}
return f.removeHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey)
}

case FederatedHPAModeDistributed, FederatedHPAModeDefault:
if res := f.removeFedHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey); res != worker.StatusAllOK {
if res := f.removeHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey); res != worker.StatusAllOK {
return worker.StatusError
}

if !workloadExist || isPPExist(pp) &&
!isPPDivided(pp) &&
isPPFollowerEnabled(pp) &&
if !workloadExist || isPropagationPolicyDuplicateMode(pp) &&
isPropagationPolicyFollowerEnabled(pp) &&
isWorkloadRetainReplicas(fedWorkload) &&
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
if res := f.removeFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason); res != worker.StatusAllOK {
return worker.StatusError
}
return f.removePendingController(ctx, hpaFTC, fedHPAObject)
} else {
hpaNotWorkReason := generateDistributedHPANotWorkReason(
isPropagationPolicyExist(pp),
isPropagationPolicyDuplicateMode(pp),
isPropagationPolicyFollowerEnabled(pp),
isWorkloadRetainReplicas(fedWorkload),
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload))
f.eventRecorder.Eventf(
hpaObject,
corev1.EventTypeWarning,
EventReasonDistributedHPANotWork,
"Distributed HPA not work: %s",
hpaNotWorkReason,
)

if res := f.addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason, hpaNotWorkReason); res != worker.StatusAllOK {
return worker.StatusError
}
return f.addFedHPAPendingController(ctx, fedHPAObject)
}
}
Expand All @@ -410,7 +466,6 @@ func (f *FederatedHPAController) getFedWorkLoadFromResource(workload Resource) (

fedObjectName := naming.GenerateFederatedObjectName(workload.name, workloadFTC.Name)

// get fed hpa object
fedObject, err := fedobjectadapters.GetFromLister(
f.fedObjectInformer.Lister(),
nil,
Expand All @@ -424,7 +479,7 @@ func (f *FederatedHPAController) getFedWorkLoadFromResource(workload Resource) (
return fedObject, nil
}

func (f *FederatedHPAController) getPPFromResource(resource *Resource) (fedcorev1a1.GenericPropagationPolicy, error) {
func (f *FederatedHPAController) getPropagationPolicyFromResource(resource *Resource) (fedcorev1a1.GenericPropagationPolicy, error) {
if resource.gvk.Kind == PropagationPolicyKind {
pp, err := f.propagationPolicyInformer.Lister().PropagationPolicies(resource.namespace).Get(resource.name)
if err != nil {
Expand Down
Loading

0 comments on commit 7b45b52

Please sign in to comment.