Skip to content

Commit

Permalink
feat(fed-hpa): init controller frame
Browse files Browse the repository at this point in the history
  • Loading branch information
wy-lucky committed Oct 30, 2023
1 parent 60a002b commit eff31b1
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
SyncControllerName = "sync"
AutoMigrationControllerName = "auto-migration"
StatusAggregatorControllerName = "status-aggregator"
FederatedHPAControllerName = "federated-hpa"
)

var knownControllers = map[string]controllermanager.StartControllerFunc{
Expand All @@ -60,6 +61,7 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
FollowerControllerName: startFollowerController,
AutoMigrationControllerName: startAutoMigrationController,
StatusAggregatorControllerName: startStatusAggregatorController,
FederatedHPAControllerName: startFederatedHPAController,
}

var controllersDisabledByDefault = sets.New[string]()
Expand Down
27 changes: 27 additions & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federate"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedhpa"
"github.com/kubewharf/kubeadmiral/pkg/controllers/follower"
"github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop"
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
Expand Down Expand Up @@ -317,3 +318,29 @@ func startStatusAggregatorController(

return statusAggregatorController, nil
}

func startFederatedHPAController(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
federatedHPAController, err := federatedhpa.NewFederatedHPAController(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.DynamicClientset,
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().PropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(),
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating status-aggregator controller: %w", err)
}

go federatedHPAController.Run(ctx)

return federatedHPAController, nil
}
303 changes: 303 additions & 0 deletions pkg/controllers/federatedhpa/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
package federatedhpa

Check failure on line 1 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/kubewharf/kubeadmiral/pkg/controllers/federatedhpa

import (
"context"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned"
fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/util/bijection"
"github.com/kubewharf/kubeadmiral/pkg/util/eventhandlers"
"github.com/kubewharf/kubeadmiral/pkg/util/eventsink"
"github.com/kubewharf/kubeadmiral/pkg/util/informermanager"
"github.com/kubewharf/kubeadmiral/pkg/util/logging"
"github.com/kubewharf/kubeadmiral/pkg/util/worker"
)

const (
FederatedHPAControllerName = "federated-hpa"
)

type Resource struct {
GVK schema.GroupVersionKind
Namespace string
Name string
}

type FederatedHPAController struct {
name string

informerManager informermanager.InformerManager
fedObjectInformer fedcorev1a1informers.FederatedObjectInformer
propagationPolicyInformer fedcorev1a1informers.PropagationPolicyInformer
clusterPropagationPolicyInformer fedcorev1a1informers.ClusterPropagationPolicyInformer

kubeClient kubernetes.Interface
fedClient fedclient.Interface
dynamicClient dynamic.Interface

worker worker.ReconcileWorker[Resource]

scaleTargetRefMapping map[schema.GroupVersionKind]string // hpa 的 scaleTargetRef 的路径
workloadHPAMapping *bijection.OneToManyRelation[Resource, Resource] // workload 和 HPA 的1对多映射【多个hpa可以指向同一workload,尽管会冲突,但是无法拦截】
ppWorkloadMapping *bijection.OneToManyRelation[Resource, Resource] // PP 和 HPA有关联的workload 的1对多映射【多个workload可以被同一个PP管理】

eventRecorder record.EventRecorder

metrics stats.Metrics
logger klog.Logger
}

func NewFederatedHPAController(
kubeClient kubernetes.Interface,
fedClient fedclient.Interface,
dynamicClient dynamic.Interface,
informerManager informermanager.InformerManager,
fedObjectInformer fedcorev1a1informers.FederatedObjectInformer,
clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer,
propagationPolicyInformer fedcorev1a1informers.PropagationPolicyInformer,
clusterPropagationPolicyInformer fedcorev1a1informers.ClusterPropagationPolicyInformer,
metrics stats.Metrics,
logger klog.Logger,
workerCount int,
) (*FederatedHPAController, error) {
f := &FederatedHPAController{
name: FederatedHPAControllerName,

informerManager: informerManager,
fedObjectInformer: fedObjectInformer,
propagationPolicyInformer: propagationPolicyInformer,
clusterPropagationPolicyInformer: clusterPropagationPolicyInformer,

kubeClient: kubeClient,
fedClient: fedClient,
dynamicClient: dynamicClient,

metrics: metrics,
logger: logger.WithValues("controller", FederatedHPAControllerName),
eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, FederatedHPAControllerName, 6),
}

f.worker = worker.NewReconcileWorker[Resource](
"fed-hpa-controller",
f.reconcile,
worker.RateLimiterOptions{},
workerCount,
metrics,
)

predicate := func(old, cur metav1.Object) bool {
oldPP, ok := old.(fedcorev1a1.GenericPropagationPolicy)
if !ok {
return false
}
newPP, ok := cur.(fedcorev1a1.GenericPropagationPolicy)
if !ok {
return false
}
return oldPP.GetSpec().SchedulingMode != newPP.GetSpec().SchedulingMode ||
oldPP.GetSpec().DisableFollowerScheduling != newPP.GetSpec().DisableFollowerScheduling
}

if _, err := fedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnGenerationChanges(f.enqueueFedHPAForFederatedObjects),
); err != nil {
return nil, err
}
if _, err := clusterFedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnGenerationChanges(f.enqueueFedHPAForFederatedObjects),
); err != nil {
return nil, err
}

if _, err := propagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAForPropagationPolicy),
); err != nil {
return nil, err
}
if _, err := clusterPropagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnChanges(predicate, f.enqueueFedHPAForPropagationPolicy),
); err != nil {
return nil, err
}

if err := informerManager.AddFTCUpdateHandler(func(lastObserved, latest *fedcorev1a1.FederatedTypeConfig) {
f.handleFTCUpdate(latest)
}); err != nil {
return nil, err
}

return f, nil
}

func (f *FederatedHPAController) HasSynced() bool {
return f.informerManager.HasSynced() && f.fedObjectInformer.Informer().HasSynced() &&
f.clusterPropagationPolicyInformer.Informer().HasSynced() &&
f.propagationPolicyInformer.Informer().HasSynced()
}

func (f *FederatedHPAController) IsControllerReady() bool {
return f.HasSynced()
}

func (c *FederatedHPAController) Run(ctx context.Context) {
ctx, logger := logging.InjectLogger(ctx, c.logger)

logger.Info("Starting controller")
defer logger.Info("Stopping controller")

if !cache.WaitForNamedCacheSync(FederatedHPAControllerName, ctx.Done(), c.HasSynced) {
logger.Error(nil, "Timed out waiting for cache sync")
return
}

logger.Info("Caches are synced")

c.worker.Run(ctx)
<-ctx.Done()
}

func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (status worker.Result) {
// 获取联邦层的 hpa
hpa, err := f.getTargetHPA(key)

Check failure on line 172 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.getTargetHPA undefined (type *FederatedHPAController has no field or method getTargetHPA)

Check failure on line 172 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

f.getTargetHPA undefined (type *FederatedHPAController has no field or method getTargetHPA)

Check failure on line 172 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.getTargetHPA undefined (type *FederatedHPAController has no field or method getTargetHPA)
hpaFO, err := f.getFOFromResource(key)

Check failure on line 173 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.getFOFromResource undefined (type *FederatedHPAController has no field or method getFOFromResource)

Check failure on line 173 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

f.getFOFromResource undefined (type *FederatedHPAController has no field or method getFOFromResource)

Check warning on line 173 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"FO" should be "OF" or "FOR" or "DO" or "GO" or "TO".

Check warning on line 173 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"FO" should be "OF" or "FOR" or "DO" or "GO" or "TO".

Check failure on line 173 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.getFOFromResource undefined (type *FederatedHPAController has no field or method getFOFromResource)

// 获取hpa指向的workload
scaleTargetRef, err := f.getScaleTargetRef(hpa)

Check failure on line 176 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.getScaleTargetRef undefined (type *FederatedHPAController has no field or method getScaleTargetRef)

Check failure on line 176 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

f.getScaleTargetRef undefined (type *FederatedHPAController has no field or method getScaleTargetRef)

Check failure on line 176 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.getScaleTargetRef undefined (type *FederatedHPAController has no field or method getScaleTargetRef)
// 查看hpa指向的workload是否改变
newWrokload := scaleTargetRefToResource(scaleTargetRef, key.Namespace)

Check failure on line 178 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

undefined: scaleTargetRefToResource

Check failure on line 178 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

undefined: scaleTargetRefToResource

Check warning on line 178 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"Wrokload" should be "Workload".

Check failure on line 178 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

undefined: scaleTargetRefToResource
oldWorkload, exist := f.workloadHPAMapping.LookupByT2(key)
if exist {
f.workloadHPAMapping.DeleteT2(key) // 老的workload存在则删除老的
f.ppWorkloadMapping.DeleteT2(oldWorkload) // 同样删除老workload及其关联的PP
}
// 添加新的workload关系
if err := f.workloadHPAMapping.Add(newWrokload, key); err != nil {

Check warning on line 185 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"Wrokload" should be "Workload".
return worker.StatusError
}

// 获取workload的 fo

Check warning on line 189 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"fo" should be "of" or "for" or "do" or "go" or "to".
workloadFO, err := f.getFOFromResource(newWrokload)

Check failure on line 190 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.getFOFromResource undefined (type *FederatedHPAController has no field or method getFOFromResource)

Check failure on line 190 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

f.getFOFromResource undefined (type *FederatedHPAController has no field or method getFOFromResource)

Check warning on line 190 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"FO" should be "OF" or "FOR" or "DO" or "GO" or "TO".

Check warning on line 190 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"FO" should be "OF" or "FOR" or "DO" or "GO" or "TO".

Check warning on line 190 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"Wrokload" should be "Workload".

Check failure on line 190 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.getFOFromResource undefined (type *FederatedHPAController has no field or method getFOFromResource)
workloadExist := true
if err != nil {
if !errors.IsNotFound(err) {
return worker.StatusError
}
workloadExist = false
}
// 从workload的fo上获取pp

Check warning on line 198 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"fo" should be "of" or "for" or "do" or "go" or "to".
var pp fedcorev1a1.GenericPropagationPolicy
if workloadExist {
newPPResource := getPPFromFo(workloadFO) // 当前workload关联的pp

Check failure on line 201 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

undefined: getPPFromFo

Check failure on line 201 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

undefined: getPPFromFo

Check warning on line 201 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / typos

"Fo" should be "Of" or "For" or "Do" or "Go" or "To".

Check failure on line 201 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

undefined: getPPFromFo
_, exist := f.ppWorkloadMapping.LookupByT2(newWrokload) //workload之前关联的PP
if exist { // 存在老的PP则清除,因为可能已经过时
f.ppWorkloadMapping.DeleteT2(newWrokload)
}
if newPPResource != nil {
// 存在新PP则写入新关系
if err := f.ppWorkloadMapping.Add(newPPResource, newWrokload); err != nil {
return worker.StatusError
}

pp, err = f.getPP(newPPResource) // 获取pp内容

Check failure on line 212 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.getPP undefined (type *FederatedHPAController has no field or method getPP)

Check failure on line 212 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

f.getPP undefined (type *FederatedHPAController has no field or method getPP)

Check failure on line 212 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.getPP undefined (type *FederatedHPAController has no field or method getPP)
if err != nil {
if errors.IsNotFound(err) {
return worker.StatusAllOK
}
return worker.StatusError
}
}
}

switch hpa.Labels["hpa-mode"] {
case "federation":
if err := f.addPendingController(hpaFO, "fed-hpa-controller"); err != nil {

Check failure on line 224 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.addPendingController undefined (type *FederatedHPAController has no field or method addPendingController)

Check failure on line 224 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

f.addPendingController undefined (type *FederatedHPAController has no field or method addPendingController)

Check failure on line 224 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.addPendingController undefined (type *FederatedHPAController has no field or method addPendingController)
return worker.StatusError
}
if !workloadExist || isDividedPP(pp) {

Check failure on line 227 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

undefined: isDividedPP

Check failure on line 227 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / lint

undefined: isDividedPP

Check failure on line 227 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

undefined: isDividedPP
if err := f.addFedHPAAnnotation(hpa, "fed-hpa-enabled", "true"); err != nil {

Check failure on line 228 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

f.addFedHPAAnnotation undefined (type *FederatedHPAController has no field or method addFedHPAAnnotation)

Check failure on line 228 in pkg/controllers/federatedhpa/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

f.addFedHPAAnnotation undefined (type *FederatedHPAController has no field or method addFedHPAAnnotation)
return worker.StatusError
}
return worker.StatusAllOK
}
if err := f.removeFedHPAAnnotation(hpa, "fed-hpa-enabled"); err != nil {
return worker.StatusError
}
return worker.StatusAllOK

case "distributed", "":
if err := f.removeFedHPAAnnotation(hpa, "fed-hpa-enabled"); err != nil {
return worker.StatusError
}
if !workloadExist || !isDividedPP(pp) &&
isFollowerEnabled(pp) &&
isWorkloadRetained(workloadFO) &&
doseHPAFollowTheWorkload(hpa, workloadFO) {
if err := f.removeFedHPAAnnotation(hpa, "fed-hpa-enabled"); err != nil {
return worker.StatusError
}
if err := f.removePendingController(hpaFO, "fed-hpa-controller"); err != nil {
return worker.StatusError
}
return worker.StatusAllOK
}
if err := f.addPendingController(hpaFO, "fed-hpa-controller"); err != nil {
return worker.StatusError
}
return worker.StatusAllOK
}

return worker.StatusAllOK
}

func (f *FederatedHPAController) enqueueFedHPAForFederatedObjects(fo metav1.Object) {
key := ObjectToResource(fo)
if f.isHPAType(fo) {
f.worker.Enqueue(key)
return
}

if hpas, exist := f.workloadHPAMapping.LookupByT1(key); exist {
for hpa := range hpas {
f.worker.Enqueue(hpa)
}
}
}

func (f *FederatedHPAController) enqueueFedHPAForPropagationPolicy(policy metav1.Object) {
key := ObjectToResource(policy)

if workloads, exist := f.ppWorkloadMapping.LookupByT1(key); exist {
for workload := range workloads {
if hpas, exist := f.workloadHPAMapping.LookupByT1(workload); exist {
for hpa := range hpas {
f.worker.Enqueue(hpa)
}
}
}
}
}

func (f *FederatedHPAController) handleFTCUpdate(ftc *fedcorev1a1.FederatedTypeConfig) {
resourceGVK := schema.GroupVersionKind{
Group: ftc.Spec.SourceType.Group,
Version: ftc.Spec.SourceType.Version,
Kind: ftc.Spec.SourceType.Kind,
}

if ftc == nil {
delete(f.scaleTargetRefMapping, resourceGVK)
} else {
// todo: 解析scaleTargetRef
}
}
23 changes: 23 additions & 0 deletions pkg/controllers/federatedhpa/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package federatedhpa

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
)

func ObjectToResource(object metav1.Object) Resource {
uns := object.(*unstructured.Unstructured)
return Resource{
Name: uns.GetName(),
Namespace: uns.GetNamespace(),
GVK: uns.GroupVersionKind(),
}
}

func (f *FederatedHPAController) isHPAType(fo metav1.Object) bool {
federatedObject := fo.(*fedcorev1a1.FederatedObject)
_, ok := f.scaleTargetRefMapping[federatedObject.GroupVersionKind()]
return ok
}
Loading

0 comments on commit eff31b1

Please sign in to comment.