Skip to content

Commit

Permalink
feat(follower): allow hpa to be used as follower resource for distrib…
Browse files Browse the repository at this point in the history
…ution
  • Loading branch information
wy-lucky committed Nov 6, 2023
1 parent 7b45b52 commit 9313ac0
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 253 deletions.
2 changes: 1 addition & 1 deletion cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func startFollowerController(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(),
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.Metrics,
Expand Down Expand Up @@ -329,7 +330,6 @@ func startFederatedHPAController(
controllerCtx.DynamicClientset,
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().PropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(),
controllerCtx.Metrics,
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,9 @@ var (
//
//nolint:lll
const MaxFederatedObjectNameLength = 253

// HPAScaleTargetRefPath defines the fed hpa annotations and labels
const (
HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled"
)
90 changes: 54 additions & 36 deletions pkg/controllers/federatedhpa/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func NewFederatedHPAController(
dynamicClient dynamic.Interface,
informerManager informermanager.InformerManager,
fedObjectInformer fedcorev1a1informers.FederatedObjectInformer,
clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer,
propagationPolicyInformer fedcorev1a1informers.PropagationPolicyInformer,
clusterPropagationPolicyInformer fedcorev1a1informers.ClusterPropagationPolicyInformer,
metrics stats.Metrics,
Expand Down Expand Up @@ -150,11 +149,6 @@ func NewFederatedHPAController(
); err != nil {
return nil, err
}
if _, err := clusterFedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChanges(f.enqueueFedHPAObjectsForFederatedObjects),
); err != nil {
return nil, err
}

if _, err := propagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAObjectsForPropagationPolicy),
Expand Down Expand Up @@ -186,7 +180,7 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFederatedObjects(fedObje
return
}

if f.isHPAType(fedObject) {
if f.isHPAType(key.gvk) {
f.worker.EnqueueWithDelay(key, 3*time.Second)
return
}
Expand Down Expand Up @@ -215,7 +209,7 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForPropagationPolicy(policy
func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
logger := f.logger.WithValues("ftc", ftc.GetName())

if scaleTargetRefPath, ok := ftc.GetAnnotations()[HPAScaleTargetRefPath]; ok {
if scaleTargetRefPath, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath]; ok {
f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
} else {
delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK())
Expand All @@ -225,7 +219,8 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.Fed
logger.V(2).Info("Enqueue federated objects for FTC")

allObjects := []fedcorev1a1.GenericFederatedObject{}
fedObjects, err := f.fedObjectInformer.Lister().List(labels.Everything())
labelsSet := labels.Set{ftc.GetSourceTypeGVK().GroupVersion().String(): ftc.GetSourceTypeGVK().Kind}
fedObjects, err := f.fedObjectInformer.Lister().List(labels.SelectorFromSet(labelsSet))
if err != nil {
logger.Error(err, "Failed to enqueue FederatedObjects for policy")
return
Expand All @@ -235,19 +230,12 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.Fed
}

for _, obj := range allObjects {
templateMetadata, err := obj.GetSpec().GetTemplateMetadata()
sourceObjectResource, err := fedObjectToSourceObjectResource(obj)
if err != nil {
logger.Error(err, "Failed to get source GVK from FederatedObject, will not enqueue")
logger.Error(err, "Failed to get source Resource from FederatedObject, will not enqueue")
continue
}
if templateMetadata.GroupVersionKind() == ftc.GetSourceTypeGVK() {
sourceObjectResource, err := fedObjectToSourceObjectResource(obj)
if err != nil {
logger.Error(err, "Failed to get source Resource from FederatedObject, will not enqueue")
continue
}
f.worker.Enqueue(sourceObjectResource)
}
f.worker.Enqueue(sourceObjectResource)
}
}

Expand Down Expand Up @@ -297,7 +285,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
}

hpaGVR := hpaFTC.GetSourceTypeGVR()
ctx, logger = logging.InjectLoggerValues(ctx, "hpa-ftc", hpaFTC.Name, "hpa-gvr", hpaGVR)
ctx, logger = logging.InjectLoggerValues(ctx, "hpa-ftc", hpaFTC.Name)

lister, hasSynced, exists := f.informerManager.GetResourceLister(key.gvk)
if !exists {
Expand Down Expand Up @@ -393,17 +381,16 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
}
}

var isHPAObjectUpdated, isFedHPAObjectUpdated bool
switch hpaObject.GetLabels()[FederatedHPAMode] {
case FederatedHPAModeFederation:
if res := f.addFedHPAPendingController(ctx, fedHPAObject); res != worker.StatusAllOK {
if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject); err != nil {
return worker.StatusError
}

if !workloadExist || isPropagationPolicyDividedMode(pp) {
if res := f.removeFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason); res != worker.StatusAllOK {
return worker.StatusError
}
return f.addHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey, common.AnnotationValueTrue)
isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || addHPALabel(hpaObject, common.FedHPAEnableKey, common.AnnotationValueTrue)
} else {
hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp))
f.eventRecorder.Eventf(
Expand All @@ -414,25 +401,21 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
hpaNotWorkReason,
)

if res := f.addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason, hpaNotWorkReason); res != worker.StatusAllOK {
return worker.StatusError
}
return f.removeHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey)
isHPAObjectUpdated = addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason)
isHPAObjectUpdated = isHPAObjectUpdated || removeHPALabel(hpaObject, common.FedHPAEnableKey)
}

case FederatedHPAModeDistributed, FederatedHPAModeDefault:
if res := f.removeHPALabel(ctx, hpaObject, hpaGVR, HPAEnableKey); res != worker.StatusAllOK {
return worker.StatusError
}
isHPAObjectUpdated = removeHPALabel(hpaObject, common.FedHPAEnableKey)

if !workloadExist || isPropagationPolicyDuplicateMode(pp) &&
isPropagationPolicyFollowerEnabled(pp) &&
isWorkloadRetainReplicas(fedWorkload) &&
isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
if res := f.removeFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason); res != worker.StatusAllOK {
isHPAObjectUpdated = isHPAObjectUpdated || removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason)
if isFedHPAObjectUpdated, err = removePendingController(ctx, hpaFTC, fedHPAObject); err != nil {
return worker.StatusError
}
return f.removePendingController(ctx, hpaFTC, fedHPAObject)
} else {
hpaNotWorkReason := generateDistributedHPANotWorkReason(
isPropagationPolicyExist(pp),
Expand All @@ -448,10 +431,45 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
hpaNotWorkReason,
)

if res := f.addFedHPANotWorkReasonAnno(ctx, hpaObject, hpaGVR, FedHPANotWorkReason, hpaNotWorkReason); res != worker.StatusAllOK {
isHPAObjectUpdated = isHPAObjectUpdated || addFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason, hpaNotWorkReason)
if isFedHPAObjectUpdated, err = addFedHPAPendingController(ctx, fedHPAObject); err != nil {
return worker.StatusError
}
return f.addFedHPAPendingController(ctx, fedHPAObject)
}
}

if isHPAObjectUpdated {
logger.V(1).Info("Updating hpa object")
_, err := f.dynamicClient.Resource(hpaGVR).Namespace(hpaObject.GetNamespace()).
UpdateStatus(ctx, hpaObject, metav1.UpdateOptions{})
if err != nil {
errMsg := "Failed to update hpa object"
logger.Error(err, errMsg)
f.eventRecorder.Eventf(hpaObject, corev1.EventTypeWarning, EventReasonUpdateHPASourceObject,
errMsg+" %v, err: %v, retry later", key, err)
if apierrors.IsConflict(err) {
return worker.StatusConflict
}
return worker.StatusError
}
}

if isFedHPAObjectUpdated {
logger.V(1).Info("Updating fed hpa object")
if _, err = fedobjectadapters.Update(
ctx,
f.fedClient.CoreV1alpha1(),
fedHPAObject,
metav1.UpdateOptions{},
); err != nil {
errMsg := "Failed to update fed hpa object"
logger.Error(err, errMsg)
f.eventRecorder.Eventf(fedHPAObject, corev1.EventTypeWarning, EventReasonUpdateHPAFedObject,
errMsg+" %v, err: %v, retry later", fedHPAObject, err)
if apierrors.IsConflict(err) {
return worker.StatusConflict
}
return worker.StatusError
}
}

Expand Down
Loading

0 comments on commit 9313ac0

Please sign in to comment.