From 1bf9a43ae2a8716aac44001f86021366ee9da4d9 Mon Sep 17 00:00:00 2001 From: shentiecheng Date: Mon, 13 Jan 2025 18:00:47 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9Asupport=20svc=20and=20eps=20list?= =?UTF-8?q?=20and=20watch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/hpaaggregatorapiserver/apiserver.go | 4 + .../pkg/printers/internalversion/printers.go | 204 +++++++- .../aggregation/forward/common.go | 34 +- .../aggregation/forward/endpointSlice.go | 208 +++++++++ .../hpaaggregator/aggregation/forward/pod.go | 12 +- .../aggregation/forward/service.go | 208 +++++++++ .../hpaaggregator/aggregation/rest.go | 35 +- .../metrics/resource/pod_test.go | 437 ------------------ .../hpaaggregator/metrics/resource_metrics.go | 2 +- pkg/util/aggregatedlister/endpointSlice.go | 145 ++++++ pkg/util/aggregatedlister/pod.go | 5 +- pkg/util/aggregatedlister/service.go | 145 ++++++ 12 files changed, 987 insertions(+), 452 deletions(-) create mode 100644 pkg/registry/hpaaggregator/aggregation/forward/endpointSlice.go create mode 100644 pkg/registry/hpaaggregator/aggregation/forward/service.go delete mode 100644 pkg/registry/hpaaggregator/metrics/resource/pod_test.go create mode 100644 pkg/util/aggregatedlister/endpointSlice.go create mode 100644 pkg/util/aggregatedlister/service.go diff --git a/pkg/hpaaggregatorapiserver/apiserver.go b/pkg/hpaaggregatorapiserver/apiserver.go index 8c618395..63b414b7 100644 --- a/pkg/hpaaggregatorapiserver/apiserver.go +++ b/pkg/hpaaggregatorapiserver/apiserver.go @@ -149,11 +149,15 @@ func (c completedConfig) New() (*Server, error) { apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(hpaaggregatorapi.GroupName, Scheme, ParameterCodec, Codecs) podLister := aggregatedlister.NewPodLister(c.ExtraConfig.FederatedInformerManager) + serviceLister := aggregatedlister.NewServiceLister(c.ExtraConfig.FederatedInformerManager) + endpointSliceLister := aggregatedlister.NewEndpointSliceLister(c.ExtraConfig.FederatedInformerManager) v1alpha1storage := map[string]rest.Storage{} aggregationAPI, err := aggregation.NewREST( c.ExtraConfig.FederatedInformerManager, podLister, + serviceLister, + endpointSliceLister, c.ExtraConfig.RestConfig, time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second, klog.Background().WithValues("api", "aggregations"), diff --git a/pkg/lifted/kubernetes/pkg/printers/internalversion/printers.go b/pkg/lifted/kubernetes/pkg/printers/internalversion/printers.go index 107474aa..ef8fb4fa 100644 --- a/pkg/lifted/kubernetes/pkg/printers/internalversion/printers.go +++ b/pkg/lifted/kubernetes/pkg/printers/internalversion/printers.go @@ -27,12 +27,16 @@ package internalversion import ( "fmt" "strconv" + "strings" "time" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/duration" + "k8s.io/apimachinery/pkg/util/sets" "github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/printers" ) @@ -41,7 +45,10 @@ import ( // on the node it is (was) running. // // taken from https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/util/node/node.go#L37 -const NodeUnreachablePodReason = "NodeLost" +const ( + NodeUnreachablePodReason = "NodeLost" + loadBalancerWidth = 16 +) // AddHandlers adds print handlers for default Kubernetes types dealing with internal versions. // TODO: handle errors from Handler @@ -61,6 +68,28 @@ func AddHandlers(h printers.PrintHandler) { } _ = h.TableHandler(podColumnDefinitions, printPodList) _ = h.TableHandler(podColumnDefinitions, printPod) + + serviceColumnDefinitions := []metav1.TableColumnDefinition{ + {Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]}, + {Name: "Type", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["type"]}, + {Name: "Cluster-IP", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["clusterIP"]}, + {Name: "External-IP", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["externalIPs"]}, + {Name: "Port(s)", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["ports"]}, + {Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]}, + {Name: "Selector", Type: "string", Priority: 1, Description: corev1.ServiceSpec{}.SwaggerDoc()["selector"]}, + } + _ = h.TableHandler(serviceColumnDefinitions, printService) + _ = h.TableHandler(serviceColumnDefinitions, printServiceList) + + endpointSliceColumnDefinitions := []metav1.TableColumnDefinition{ + {Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]}, + {Name: "AddressType", Type: "string", Description: discoveryv1.EndpointSlice{}.SwaggerDoc()["addressType"]}, + {Name: "Ports", Type: "string", Description: discoveryv1.EndpointSlice{}.SwaggerDoc()["ports"]}, + {Name: "Endpoints", Type: "string", Description: discoveryv1.EndpointSlice{}.SwaggerDoc()["endpoints"]}, + {Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]}, + } + _ = h.TableHandler(endpointSliceColumnDefinitions, printEndpointSlice) + _ = h.TableHandler(endpointSliceColumnDefinitions, printEndpointSliceList) } // translateTimestampSince returns the elapsed time since timestamp in @@ -91,7 +120,7 @@ func printPodList(podList *corev1.PodList, options printers.GenerateOptions) ([] return rows, nil } -//nolint +// nolint func printPod(pod *corev1.Pod, options printers.GenerateOptions) ([]metav1.TableRow, error) { restarts := 0 totalContainers := len(pod.Spec.Containers) @@ -246,6 +275,177 @@ func printPod(pod *corev1.Pod, options printers.GenerateOptions) ([]metav1.Table return []metav1.TableRow{row}, nil } +func printService(obj *corev1.Service, options printers.GenerateOptions) ([]metav1.TableRow, error) { + row := metav1.TableRow{ + Object: runtime.RawExtension{Object: obj}, + } + svcType := obj.Spec.Type + internalIP := "" + if len(obj.Spec.ClusterIPs) > 0 { + internalIP = obj.Spec.ClusterIPs[0] + } + + externalIP := getServiceExternalIP(obj, options.Wide) + svcPorts := makePortString(obj.Spec.Ports) + if len(svcPorts) == 0 { + svcPorts = "" + } + + row.Cells = append(row.Cells, obj.Name, string(svcType), internalIP, externalIP, svcPorts, translateTimestampSince(obj.CreationTimestamp)) + if options.Wide { + row.Cells = append(row.Cells, labels.FormatLabels(obj.Spec.Selector)) + } + + return []metav1.TableRow{row}, nil +} + +func getServiceExternalIP(svc *corev1.Service, wide bool) string { + switch svc.Spec.Type { + case corev1.ServiceTypeClusterIP: + if len(svc.Spec.ExternalIPs) > 0 { + return strings.Join(svc.Spec.ExternalIPs, ",") + } + return "" + case corev1.ServiceTypeNodePort: + if len(svc.Spec.ExternalIPs) > 0 { + return strings.Join(svc.Spec.ExternalIPs, ",") + } + return "" + case corev1.ServiceTypeLoadBalancer: + lbIps := loadBalancerStatusStringer(svc.Status.LoadBalancer, wide) + if len(svc.Spec.ExternalIPs) > 0 { + results := []string{} + if len(lbIps) > 0 { + results = append(results, strings.Split(lbIps, ",")...) + } + results = append(results, svc.Spec.ExternalIPs...) + return strings.Join(results, ",") + } + if len(lbIps) > 0 { + return lbIps + } + return "" + case corev1.ServiceTypeExternalName: + return svc.Spec.ExternalName + } + return "" +} + +// loadBalancerStatusStringer behaves mostly like a string interface and converts the given status to a string. +// `wide` indicates whether the returned value is meant for --o=wide output. If not, it's clipped to 16 bytes. +func loadBalancerStatusStringer(s corev1.LoadBalancerStatus, wide bool) string { + ingress := s.Ingress + result := sets.NewString() + for i := range ingress { + if ingress[i].IP != "" { + result.Insert(ingress[i].IP) + } else if ingress[i].Hostname != "" { + result.Insert(ingress[i].Hostname) + } + } + + r := strings.Join(result.List(), ",") + if !wide && len(r) > loadBalancerWidth { + r = r[0:(loadBalancerWidth-3)] + "..." + } + return r +} + +func makePortString(ports []corev1.ServicePort) string { + pieces := make([]string, len(ports)) + for ix := range ports { + port := &ports[ix] + pieces[ix] = fmt.Sprintf("%d/%s", port.Port, port.Protocol) + if port.NodePort > 0 { + pieces[ix] = fmt.Sprintf("%d:%d/%s", port.Port, port.NodePort, port.Protocol) + } + } + return strings.Join(pieces, ",") +} + +func printServiceList(list *corev1.ServiceList, options printers.GenerateOptions) ([]metav1.TableRow, error) { + rows := make([]metav1.TableRow, 0, len(list.Items)) + for i := range list.Items { + r, err := printService(&list.Items[i], options) + if err != nil { + return nil, err + } + rows = append(rows, r...) + } + return rows, nil +} + +func printEndpointSlice(obj *discoveryv1.EndpointSlice, options printers.GenerateOptions) ([]metav1.TableRow, error) { + row := metav1.TableRow{ + Object: runtime.RawExtension{Object: obj}, + } + row.Cells = append(row.Cells, obj.Name, string(obj.AddressType), formatDiscoveryPorts(obj.Ports), formatDiscoveryEndpoints(obj.Endpoints), translateTimestampSince(obj.CreationTimestamp)) + return []metav1.TableRow{row}, nil +} + +func printEndpointSliceList(list *discoveryv1.EndpointSliceList, options printers.GenerateOptions) ([]metav1.TableRow, error) { + rows := make([]metav1.TableRow, 0, len(list.Items)) + for i := range list.Items { + r, err := printEndpointSlice(&list.Items[i], options) + if err != nil { + return nil, err + } + rows = append(rows, r...) + } + return rows, nil +} + +func formatDiscoveryPorts(ports []discoveryv1.EndpointPort) string { + list := []string{} + max := 3 + more := false + count := 0 + for _, port := range ports { + if len(list) < max { + portNum := "*" + if port.Port != nil { + portNum = strconv.Itoa(int(*port.Port)) + } else if port.Name != nil { + portNum = *port.Name + } + list = append(list, portNum) + } else if len(list) == max { + more = true + } + count++ + } + return listWithMoreString(list, more, count, max) +} + +func listWithMoreString(list []string, more bool, count, max int) string { + ret := strings.Join(list, ",") + if more { + return fmt.Sprintf("%s + %d more...", ret, count-max) + } + if ret == "" { + ret = "" + } + return ret +} + +func formatDiscoveryEndpoints(endpoints []discoveryv1.Endpoint) string { + list := []string{} + max := 3 + more := false + count := 0 + for _, endpoint := range endpoints { + for _, address := range endpoint.Addresses { + if len(list) < max { + list = append(list, address) + } else if len(list) == max { + more = true + } + count++ + } + } + return listWithMoreString(list, more, count, max) +} + func hasPodReadyCondition(conditions []corev1.PodCondition) bool { for _, condition := range conditions { if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { diff --git a/pkg/registry/hpaaggregator/aggregation/forward/common.go b/pkg/registry/hpaaggregator/aggregation/forward/common.go index 8d159858..4842ea2e 100644 --- a/pkg/registry/hpaaggregator/aggregation/forward/common.go +++ b/pkg/registry/hpaaggregator/aggregation/forward/common.go @@ -21,6 +21,7 @@ import ( "errors" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +39,12 @@ import ( printerstorage "github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/printers/storage" ) +const ( + getVerb = "get" + listVerb = "list" + watchVerb = "watch" +) + var ( scheme = runtime.NewScheme() codecs = serializer.NewCodecFactory(scheme) @@ -55,7 +62,7 @@ var ( tableConvertor = printerstorage.TableConvertor{ TableGenerator: printers.NewTableGenerator().With(internalversion.AddHandlers), } - scope = &handlers.RequestScope{ + podScope = &handlers.RequestScope{ Namer: &handlers.ContextBasedNaming{ Namer: runtime.Namer(meta.NewAccessor()), ClusterScoped: false, @@ -67,11 +74,36 @@ var ( MetaGroupVersion: metav1.SchemeGroupVersion, Resource: common.PodGVR, } + serviceScope = &handlers.RequestScope{ + Namer: &handlers.ContextBasedNaming{ + Namer: runtime.Namer(meta.NewAccessor()), + ClusterScoped: false, + }, + Serializer: codecs, + Kind: common.ServiceGVK, + TableConvertor: tableConvertor, + Convertor: scheme, + MetaGroupVersion: metav1.SchemeGroupVersion, + Resource: common.ServiceGVR, + } + endpointSliceScope = &handlers.RequestScope{ + Namer: &handlers.ContextBasedNaming{ + Namer: runtime.Namer(meta.NewAccessor()), + ClusterScoped: false, + }, + Serializer: codecs, + Kind: common.EndpointSliceGVK, + TableConvertor: tableConvertor, + Convertor: scheme, + MetaGroupVersion: metav1.SchemeGroupVersion, + Resource: common.EndpointSliceGVR, + } ) func init() { metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) utilruntime.Must(corev1.AddToScheme(scheme)) + utilruntime.Must(discoveryv1.AddToScheme(scheme)) utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion)) scheme.AddUnversionedTypes(unversionedVersion, unversionedTypes...) diff --git a/pkg/registry/hpaaggregator/aggregation/forward/endpointSlice.go b/pkg/registry/hpaaggregator/aggregation/forward/endpointSlice.go new file mode 100644 index 00000000..4749e027 --- /dev/null +++ b/pkg/registry/hpaaggregator/aggregation/forward/endpointSlice.go @@ -0,0 +1,208 @@ +/* +Copyright 2025 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +//nolint:dupl + +package forward + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" + + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/handlers" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" + "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" +) + +type EndpointSliceHandler interface { + Handler(requestInfo *genericapirequest.RequestInfo) (http.Handler, error) +} + +type EndpointSliceREST struct { + endpointSliceLister aggregatedlister.AggregatedLister + federatedInformerManager informermanager.FederatedInformerManager + minRequestTimeout time.Duration +} + +var ( + _ rest.Getter = &EndpointSliceREST{} + _ rest.Lister = &EndpointSliceREST{} + _ rest.Watcher = &EndpointSliceREST{} + _ EndpointSliceHandler = &EndpointSliceREST{} +) + +func NewEndpointSliceREST( + f informermanager.FederatedInformerManager, + endpointSliceLister aggregatedlister.AggregatedLister, + minRequestTimeout time.Duration, +) *EndpointSliceREST { + return &EndpointSliceREST{ + federatedInformerManager: f, + endpointSliceLister: endpointSliceLister, + minRequestTimeout: minRequestTimeout, + } +} + +func (e *EndpointSliceREST) Handler(requestInfo *genericapirequest.RequestInfo) (http.Handler, error) { + switch requestInfo.Verb { + case getVerb: + return handlers.GetResource(e, endpointSliceScope), nil + case listVerb, watchVerb: + return handlers.ListResource(e, e, endpointSliceScope, false, e.minRequestTimeout), nil + default: + return nil, apierrors.NewMethodNotSupported(schema.GroupResource{ + Group: requestInfo.APIGroup, + Resource: requestInfo.Resource, + }, requestInfo.Verb) + } +} + +func (e *EndpointSliceREST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) { + namespace := genericapirequest.NamespaceValue(ctx) + obj, err := e.endpointSliceLister.ByNamespace(namespace).Get(ctx, name, *opts) + if err != nil { + if apierrors.IsNotFound(err) { + // return not-found errors directly + return nil, err + } + klog.ErrorS(err, "Failed getting endpointSlice", "endpointSlice", klog.KRef(namespace, name)) + return nil, fmt.Errorf("failed getting endpointSlice: %w", err) + } + return obj, nil +} + +func (e *EndpointSliceREST) NewList() runtime.Object { + return &discoveryv1.EndpointSliceList{} +} + +func (e *EndpointSliceREST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { + namespace := genericapirequest.NamespaceValue(ctx) + objs, err := e.endpointSliceLister.ByNamespace(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: options.LabelSelector.String(), + FieldSelector: options.FieldSelector.String(), + ResourceVersion: options.ResourceVersion, + }) + if err != nil { + klog.ErrorS(err, "Failed listing endpointSlices", "namespace", klog.KRef("", namespace)) + return nil, fmt.Errorf("failed listing endpointSlices: %w", err) + } + return objs, nil +} + +func (e *EndpointSliceREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { + return tableConvertor.ConvertToTable(ctx, object, tableOptions) +} + +func (e *EndpointSliceREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + grv := aggregatedlister.NewGlobalResourceVersionFromString(options.ResourceVersion) + retGrv := grv.Clone() + label := labels.Everything() + if options != nil && options.LabelSelector != nil { + label = options.LabelSelector + } + + namespace := genericapirequest.NamespaceValue(ctx) + + ctx, logger := logging.InjectLoggerValues( + ctx, + "label_selector", label, + "field_selector", options.FieldSelector, + "namespace", namespace, + ) + + clusters, err := e.federatedInformerManager.GetReadyClusters() + if err != nil { + logger.Error(err, "Failed to get ready clusters") + return nil, fmt.Errorf("failed watching endpointSlices: %w", err) + } + + // TODO: support cluster addition and deletion during the watch + var lock sync.Mutex + isProxyChClosed := false + proxyCh := make(chan watch.Event) + proxyWatcher := watch.NewProxyWatcher(proxyCh) + for i := range clusters { + client, exist := e.federatedInformerManager.GetClusterKubeClient(clusters[i].Name) + if !exist { + continue + } + watcher, err := client.DiscoveryV1().EndpointSlices(namespace).Watch(ctx, metav1.ListOptions{ + LabelSelector: label.String(), + FieldSelector: options.FieldSelector.String(), + TimeoutSeconds: pointer.Int64(1200), + ResourceVersion: grv.Get(clusters[i].Name), + }) + if err != nil { + logger.Error(err, "Failed watching endpointSlices") + continue + } + go func(cluster string) { + defer func() { + logger.WithValues("cluster", cluster).Info("Stopped cluster watcher") + watcher.Stop() + }() + for { + select { + case <-proxyWatcher.StopChan(): + return + case event, ok := <-watcher.ResultChan(): + if !ok { + lock.Lock() + if !isProxyChClosed { + close(proxyCh) + isProxyChClosed = true + logger.WithValues("cluster", cluster).Info("Closed proxy watcher channel") + } + lock.Unlock() + + return + } + if eps, ok := event.Object.(*discoveryv1.EndpointSlice); ok { + clusterobject.MakeObjectUnique(eps, cluster) + retGrv.Set(cluster, eps.ResourceVersion) + eps.SetResourceVersion(retGrv.String()) + event.Object = eps + } + + lock.Lock() + if !isProxyChClosed { + proxyCh <- event + } + lock.Unlock() + } + } + }(clusters[i].Name) + } + return proxyWatcher, nil +} diff --git a/pkg/registry/hpaaggregator/aggregation/forward/pod.go b/pkg/registry/hpaaggregator/aggregation/forward/pod.go index 8664251b..cefa8a43 100644 --- a/pkg/registry/hpaaggregator/aggregation/forward/pod.go +++ b/pkg/registry/hpaaggregator/aggregation/forward/pod.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:dupl + package forward import ( @@ -23,7 +25,6 @@ import ( "sync" "time" - "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" @@ -38,6 +39,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/pointer" + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" "github.com/kubewharf/kubeadmiral/pkg/util/logging" @@ -74,10 +76,10 @@ func NewPodREST( func (p *PodREST) Handler(requestInfo *genericapirequest.RequestInfo) (http.Handler, error) { switch requestInfo.Verb { - case "get": - return handlers.GetResource(p, scope), nil - case "list", "watch": - return handlers.ListResource(p, p, scope, false, p.minRequestTimeout), nil + case getVerb: + return handlers.GetResource(p, podScope), nil + case listVerb, watchVerb: + return handlers.ListResource(p, p, podScope, false, p.minRequestTimeout), nil default: return nil, apierrors.NewMethodNotSupported(schema.GroupResource{ Group: requestInfo.APIGroup, diff --git a/pkg/registry/hpaaggregator/aggregation/forward/service.go b/pkg/registry/hpaaggregator/aggregation/forward/service.go new file mode 100644 index 00000000..5e2769a2 --- /dev/null +++ b/pkg/registry/hpaaggregator/aggregation/forward/service.go @@ -0,0 +1,208 @@ +/* +Copyright 2025 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +//nolint:dupl + +package forward + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/handlers" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" + "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" +) + +type ServiceHandler interface { + Handler(requestInfo *genericapirequest.RequestInfo) (http.Handler, error) +} + +type ServiceREST struct { + serviceLister aggregatedlister.AggregatedLister + federatedInformerManager informermanager.FederatedInformerManager + minRequestTimeout time.Duration +} + +var ( + _ rest.Getter = &ServiceREST{} + _ rest.Lister = &ServiceREST{} + _ rest.Watcher = &ServiceREST{} + _ ServiceHandler = &ServiceREST{} +) + +func NewServiceREST( + f informermanager.FederatedInformerManager, + serviceLister aggregatedlister.AggregatedLister, + minRequestTimeout time.Duration, +) *ServiceREST { + return &ServiceREST{ + federatedInformerManager: f, + serviceLister: serviceLister, + minRequestTimeout: minRequestTimeout, + } +} + +func (s *ServiceREST) Handler(requestInfo *genericapirequest.RequestInfo) (http.Handler, error) { + switch requestInfo.Verb { + case getVerb: + return handlers.GetResource(s, serviceScope), nil + case listVerb, watchVerb: + return handlers.ListResource(s, s, serviceScope, false, s.minRequestTimeout), nil + default: + return nil, apierrors.NewMethodNotSupported(schema.GroupResource{ + Group: requestInfo.APIGroup, + Resource: requestInfo.Resource, + }, requestInfo.Verb) + } +} + +func (s *ServiceREST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) { + namespace := genericapirequest.NamespaceValue(ctx) + obj, err := s.serviceLister.ByNamespace(namespace).Get(ctx, name, *opts) + if err != nil { + if apierrors.IsNotFound(err) { + // return not-found errors directly + return nil, err + } + klog.ErrorS(err, "Failed getting service", "service", klog.KRef(namespace, name)) + return nil, fmt.Errorf("failed getting service: %w", err) + } + return obj, nil +} + +func (s *ServiceREST) NewList() runtime.Object { + return &corev1.ServiceList{} +} + +func (s *ServiceREST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { + namespace := genericapirequest.NamespaceValue(ctx) + objs, err := s.serviceLister.ByNamespace(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: options.LabelSelector.String(), + FieldSelector: options.FieldSelector.String(), + ResourceVersion: options.ResourceVersion, + }) + if err != nil { + klog.ErrorS(err, "Failed listing services", "namespace", klog.KRef("", namespace)) + return nil, fmt.Errorf("failed listing services: %w", err) + } + return objs, nil +} + +func (s *ServiceREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { + return tableConvertor.ConvertToTable(ctx, object, tableOptions) +} + +func (s *ServiceREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + grv := aggregatedlister.NewGlobalResourceVersionFromString(options.ResourceVersion) + retGrv := grv.Clone() + label := labels.Everything() + if options != nil && options.LabelSelector != nil { + label = options.LabelSelector + } + + namespace := genericapirequest.NamespaceValue(ctx) + + ctx, logger := logging.InjectLoggerValues( + ctx, + "label_selector", label, + "field_selector", options.FieldSelector, + "namespace", namespace, + ) + + clusters, err := s.federatedInformerManager.GetReadyClusters() + if err != nil { + logger.Error(err, "Failed to get ready clusters") + return nil, fmt.Errorf("failed watching services: %w", err) + } + + // TODO: support cluster addition and deletion during the watch + var lock sync.Mutex + isProxyChClosed := false + proxyCh := make(chan watch.Event) + proxyWatcher := watch.NewProxyWatcher(proxyCh) + for i := range clusters { + client, exist := s.federatedInformerManager.GetClusterKubeClient(clusters[i].Name) + if !exist { + continue + } + watcher, err := client.CoreV1().Services(namespace).Watch(ctx, metav1.ListOptions{ + LabelSelector: label.String(), + FieldSelector: options.FieldSelector.String(), + TimeoutSeconds: pointer.Int64(1200), + ResourceVersion: grv.Get(clusters[i].Name), + }) + if err != nil { + logger.Error(err, "Failed watching services") + continue + } + go func(cluster string) { + defer func() { + logger.WithValues("cluster", cluster).Info("Stopped cluster watcher") + watcher.Stop() + }() + for { + select { + case <-proxyWatcher.StopChan(): + return + case event, ok := <-watcher.ResultChan(): + if !ok { + lock.Lock() + if !isProxyChClosed { + close(proxyCh) + isProxyChClosed = true + logger.WithValues("cluster", cluster).Info("Closed proxy watcher channel") + } + lock.Unlock() + + return + } + if svc, ok := event.Object.(*corev1.Service); ok { + clusterobject.MakeObjectUnique(svc, cluster) + retGrv.Set(cluster, svc.ResourceVersion) + svc.SetResourceVersion(retGrv.String()) + event.Object = svc + } + + lock.Lock() + if !isProxyChClosed { + proxyCh <- event + } + lock.Unlock() + } + } + }(clusters[i].Name) + } + return proxyWatcher, nil +} diff --git a/pkg/registry/hpaaggregator/aggregation/rest.go b/pkg/registry/hpaaggregator/aggregation/rest.go index a5aaf2df..ca2329b5 100644 --- a/pkg/registry/hpaaggregator/aggregation/rest.go +++ b/pkg/registry/hpaaggregator/aggregation/rest.go @@ -24,7 +24,6 @@ import ( "path" "time" - "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -37,6 +36,7 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/apis/hpaaggregator/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/registry/hpaaggregator/aggregation/forward" + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" ) @@ -46,8 +46,9 @@ type REST struct { resolver genericapirequest.RequestInfoResolver - podLister aggregatedlister.AggregatedLister - podHandler forward.PodHandler + podHandler forward.PodHandler + serviceHandler forward.ServiceHandler + endpointSliceHandler forward.EndpointSliceHandler forwardHandler forward.ForwardHandler @@ -66,6 +67,8 @@ var proxyMethods = []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OP func NewREST( federatedInformerManager informermanager.FederatedInformerManager, podLister aggregatedlister.AggregatedLister, + serviceLister aggregatedlister.AggregatedLister, + endpointSliceLister aggregatedlister.AggregatedLister, config *restclient.Config, minRequestTimeout time.Duration, logger klog.Logger, @@ -76,6 +79,18 @@ func NewREST( minRequestTimeout, ) + serviceHandler := forward.NewServiceREST( + federatedInformerManager, + serviceLister, + minRequestTimeout, + ) + + endpointSliceHandler := forward.NewEndpointSliceREST( + federatedInformerManager, + endpointSliceLister, + minRequestTimeout, + ) + forwardHandler := forward.NewForwardHandler(config) resolver := &genericapirequest.RequestInfoFactory{ @@ -87,8 +102,9 @@ func NewREST( restConfig: config, federatedInformerManager: federatedInformerManager, resolver: resolver, - podLister: podLister, podHandler: podHandler, + serviceHandler: serviceHandler, + endpointSliceHandler: endpointSliceHandler, forwardHandler: forwardHandler, logger: logger, }, nil @@ -149,6 +165,10 @@ func (r *REST) Connect(ctx context.Context, _ string, _ runtime.Object, resp res err = errors.New("can't proxy to self") case isRequestForPod(proxyInfo): proxyHandler, err = r.podHandler.Handler(proxyInfo) + case isRequestForService(proxyInfo): + proxyHandler, err = r.serviceHandler.Handler(proxyInfo) + case isRequestForEndpointSlice(proxyInfo): + proxyHandler, err = r.endpointSliceHandler.Handler(proxyInfo) default: // TODO: if we provide an API for ResourceMetrics or CustomMetrics, we can serve it directly without proxy proxyHandler, err = r.forwardHandler.Handler(proxyInfo, r.isRequestForHPA(proxyInfo)) @@ -195,3 +215,10 @@ func (r *REST) isRequestForHPA(request *genericapirequest.RequestInfo) bool { } return false } + +func isRequestForService(request *genericapirequest.RequestInfo) bool { + return request.APIGroup == "" && request.APIVersion == "v1" && request.Resource == "services" +} +func isRequestForEndpointSlice(request *genericapirequest.RequestInfo) bool { + return request.APIGroup == "discovery.k8s.io" && request.APIVersion == "v1" && request.Resource == "endpointslices" +} diff --git a/pkg/registry/hpaaggregator/metrics/resource/pod_test.go b/pkg/registry/hpaaggregator/metrics/resource/pod_test.go deleted file mode 100644 index 64579817..00000000 --- a/pkg/registry/hpaaggregator/metrics/resource/pod_test.go +++ /dev/null @@ -1,437 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -This file may have been modified by The KubeAdmiral Authors -("KubeAdmiral Modifications"). All KubeAdmiral Modifications -are Copyright 2023 The KubeAdmiral Authors. -*/ - -package resource - -import ( - "fmt" - "reflect" - "strings" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - apitypes "k8s.io/apimachinery/pkg/types" - genericapirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/client-go/tools/cache" - "k8s.io/component-base/metrics/testutil" - "k8s.io/metrics/pkg/apis/metrics" -) - -func TestPodList(t *testing.T) { - tcs := []struct { - name string - listOptions *metainternalversion.ListOptions - listerError error - wantPods []apitypes.NamespacedName - wantError bool - }{ - { - name: "Normal", - wantPods: []apitypes.NamespacedName{ - {Name: "pod1", Namespace: "other"}, - {Name: "pod2", Namespace: "other"}, - {Name: "pod3", Namespace: "testValue"}, - }, - }, - { - name: "Empty response", - listOptions: &metainternalversion.ListOptions{ - FieldSelector: fields.SelectorFromSet(map[string]string{ - "metadata.namespace": "unknown", - }), - }, - }, - { - name: "With FieldOptions", - listOptions: &metainternalversion.ListOptions{ - FieldSelector: fields.SelectorFromSet(map[string]string{ - "metadata.namespace": "testValue", - }), - }, - wantPods: []apitypes.NamespacedName{{Name: "pod3", Namespace: "testValue"}}, - }, - { - name: "With Label selectors", - listOptions: &metainternalversion.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{ - "labelKey": "labelValue", - }), - }, - wantPods: []apitypes.NamespacedName{{Name: "pod1", Namespace: "other"}}, - }, - { - name: "With both fields and label selectors", - listOptions: &metainternalversion.ListOptions{ - FieldSelector: fields.SelectorFromSet(map[string]string{ - "metadata.name": "pod3", - }), - LabelSelector: labels.SelectorFromSet(map[string]string{ - "labelKey": "otherValue", - }), - }, - wantPods: []apitypes.NamespacedName{{Name: "pod3", Namespace: "testValue"}}, - }, - { - name: "Lister error", - listerError: fmt.Errorf("lister error"), - wantPods: []apitypes.NamespacedName{}, - wantError: true, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - for _, partial := range []bool{true, false} { - // setup - r := NewPodTestStorage(tc.listerError, partial) - - // execute - got, err := r.List(genericapirequest.NewContext(), tc.listOptions) - - // assert - if (err != nil) != tc.wantError { - t.Fatalf("Unexpected error: %v", err) - } - res := got.(*metrics.PodMetricsList) - - if len(res.Items) != len(tc.wantPods) { - t.Fatalf("len(res.Items) != %d, got: %d", len(tc.wantPods), len(res.Items)) - } - for i := range res.Items { - testPod(t, res.Items[i], tc.wantPods[i]) - } - } - }) - } -} - -func TestPodGet(t *testing.T) { - tcs := []struct { - name string - pods *corev1.Pod - get apitypes.NamespacedName - listerError error - wantPod apitypes.NamespacedName - wantError bool - }{ - { - name: "Normal", - pods: createTestPods()[0], - get: apitypes.NamespacedName{Name: "pod1", Namespace: "other"}, - wantPod: apitypes.NamespacedName{Name: "pod1", Namespace: "other"}, - }, - { - name: "Lister error", - get: apitypes.NamespacedName{Name: "pod1", Namespace: "other"}, - listerError: fmt.Errorf("lister error"), - wantError: true, - }, - { - name: "Pod without metrics", - get: apitypes.NamespacedName{Name: "pod4", Namespace: "testValue"}, - wantError: true, - }, - { - name: "Pod doesn't exist", - get: apitypes.NamespacedName{Name: "pod5", Namespace: "other"}, - wantError: true, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - for _, partial := range []bool{true, false} { - // setup - r := NewPodTestStorage(tc.listerError, partial) - - // execute - got, err := r.Get(genericapirequest.NewContext(), tc.get.Name, nil) - - // assert - if (err != nil) != tc.wantError { - t.Fatalf("Unexpected error: %v", err) - } - if tc.wantError { - return - } - res := got.(*metrics.PodMetrics) - testPod(t, *res, tc.wantPod) - } - }) - } -} - -func TestPodList_Monitoring(t *testing.T) { - c := &fakeClock{} - myClock = c - - metricFreshness.Create(nil) - metricFreshness.Reset() - - r := NewPodTestStorage(nil, false) - c.now = c.now.Add(10 * time.Second) - _, err := r.List(genericapirequest.NewContext(), nil) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - err = testutil.CollectAndCompare(metricFreshness, strings.NewReader(` - # HELP metrics_server_api_metric_freshness_seconds [ALPHA] Freshness of metrics exported - # TYPE metrics_server_api_metric_freshness_seconds histogram - metrics_server_api_metric_freshness_seconds_bucket{le="1"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="1.364"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="1.8604960000000004"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="2.5377165440000007"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="3.4614453660160014"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="4.721411479245826"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="6.440005257691307"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="8.784167171490942"} 0 - metrics_server_api_metric_freshness_seconds_bucket{le="11.981604021913647"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="16.342907885890217"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="22.291726356354257"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="30.405914750067208"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="41.47366771909167"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="56.57008276884105"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="77.16159289669919"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="105.2484127110977"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="143.55883493793726"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="195.81425085534644"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="267.09063816669254"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="364.31163045936864"} 3 - metrics_server_api_metric_freshness_seconds_bucket{le="+Inf"} 3 - metrics_server_api_metric_freshness_seconds_sum 30 - metrics_server_api_metric_freshness_seconds_count 3 - `), "metrics_server_api_metric_freshness_seconds") - if err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -// fakes both PodLister and PodNamespaceLister at once -type fakePodLister struct { - data []*corev1.Pod - err error - - partialResult bool -} - -func (pl fakePodLister) List(selector labels.Selector) (ret []runtime.Object, err error) { - if pl.err != nil { - return nil, pl.err - } - res := []runtime.Object{} - for _, pod := range pl.data { - if selector.Matches(labels.Set(pod.Labels)) { - if pl.partialResult { - res = append(res, &metav1.PartialObjectMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - Labels: pod.Labels, - }, - }) - } else { - res = append(res, pod) - } - } - } - return res, nil -} - -func (pl fakePodLister) Get(name string) (runtime.Object, error) { - if pl.err != nil { - return nil, pl.err - } - for _, pod := range pl.data { - if pod.Name == name { - if !pl.partialResult { - return pod, nil - } - return &metav1.PartialObjectMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - Labels: pod.Labels, - }, - }, nil - } - } - return nil, nil -} - -func (pl fakePodLister) ByNamespace(namespace string) cache.GenericNamespaceLister { - return pl -} - -type fakePodMetricsGetter struct { - now time.Time -} - -var _ PodMetricsGetter = (*fakePodMetricsGetter)(nil) - -//nolint:goconst -func (mp fakePodMetricsGetter) GetPodMetrics(pods ...*metav1.PartialObjectMetadata) ([]metrics.PodMetrics, error) { - ms := make([]metrics.PodMetrics, 0, len(pods)) - for _, pod := range pods { - switch { - case pod.Name == "pod1" && pod.Namespace == "other": - ms = append(ms, metrics.PodMetrics{ - ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace, Labels: pod.Labels}, - Timestamp: metav1.Time{Time: mp.now}, - Window: metav1.Duration{Duration: 1000}, - Containers: []metrics.ContainerMetrics{ - {Name: "metric1", Usage: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("10m")}}, - {Name: "metric1-b", Usage: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("5Mi")}}, - }, - }) - case pod.Name == "pod2" && pod.Namespace == "other": - ms = append(ms, metrics.PodMetrics{ - ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace, Labels: pod.Labels}, - Timestamp: metav1.Time{Time: mp.now}, - Window: metav1.Duration{Duration: 2000}, - Containers: []metrics.ContainerMetrics{ - {Name: "metric2", Usage: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("20m"), - corev1.ResourceMemory: resource.MustParse("15Mi"), - }}, - }, - }) - case pod.Name == "pod3" && pod.Namespace == "testValue": - ms = append(ms, metrics.PodMetrics{ - ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace, Labels: pod.Labels}, - Timestamp: metav1.Time{Time: mp.now}, - Window: metav1.Duration{Duration: 3000}, - Containers: []metrics.ContainerMetrics{ - {Name: "metric3", Usage: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("20m"), - corev1.ResourceMemory: resource.MustParse("25Mi"), - }}, - }, - }) - } - } - return ms, nil -} - -func NewPodTestStorage(listerError error, partial bool) *PodMetrics { - return &PodMetrics{ - podLister: fakePodLister{data: createTestPods(), err: listerError, partialResult: partial}, - metrics: fakePodMetricsGetter{now: myClock.Now()}, - } -} - -func testPod(t *testing.T, got metrics.PodMetrics, want apitypes.NamespacedName) { - t.Helper() - if got.Name != want.Name { - t.Errorf(`Name != "%s", got: %+v`, want.Name, got.Name) - } - if got.Namespace != want.Namespace { - t.Errorf(`Namespace != "%s", got: %+v`, want.Namespace, got.Namespace) - } - wantLabels := podLabels(want.Name, want.Namespace) - if diff := cmp.Diff(got.Labels, wantLabels); diff != "" { - t.Errorf(`Labels != %+v, diff: %s`, wantLabels, diff) - } -} - -func createTestPods() []*corev1.Pod { - pod1 := &corev1.Pod{} - pod1.Namespace = "other" - pod1.Name = "pod1" - pod1.Status.Phase = corev1.PodRunning - pod1.Labels = podLabels(pod1.Name, pod1.Namespace) - pod2 := &corev1.Pod{} - pod2.Namespace = "other" - pod2.Name = "pod2" - pod2.Status.Phase = corev1.PodRunning - pod2.Labels = podLabels(pod2.Name, pod2.Namespace) - pod3 := &corev1.Pod{} - pod3.Namespace = "testValue" - pod3.Name = "pod3" - pod3.Status.Phase = corev1.PodRunning - pod3.Labels = podLabels(pod3.Name, pod3.Namespace) - pod4 := &corev1.Pod{} - pod4.Namespace = "other" - pod4.Name = "pod4" - pod4.Status.Phase = corev1.PodRunning - pod4.Labels = podLabels(pod4.Name, pod4.Namespace) - return []*corev1.Pod{pod1, pod2, pod3, pod4} -} - -func podLabels(name, namespace string) map[string]string { - var labels map[string]string - switch { - case name == "pod1" && namespace == "other": - labels = map[string]string{ - "labelKey": "labelValue", - } - case name == "pod2" && namespace == "other": - labels = map[string]string{ - "otherKey": "labelValue", - } - case name == "pod3" && namespace == "testValue": - labels = map[string]string{ - "labelKey": "otherValue", - } - case name == "pod4" && namespace == "testValue": - labels = map[string]string{ - "otherKey": "otherValue", - } - } - return labels -} - -func Test_NewPodMetrics(t *testing.T) { - podLister := fakePodLister{ - data: createTestPods(), - } - getter := fakePodMetricsGetter{now: myClock.Now()} - - podMetrics := NewPodMetrics(schema.GroupResource{}, getter, podLister) - - got := podMetrics.New() - want := &metrics.PodMetrics{} - if !reflect.DeepEqual(got, want) { - t.Errorf(`want = %+v, got: %+v`, want, got) - } - - gotList := podMetrics.NewList() - wantList := &metrics.PodMetricsList{} - if !reflect.DeepEqual(gotList, wantList) { - t.Errorf(`want = %+v, got: %+v`, wantList, gotList) - } - - if podMetrics.Kind() != "PodMetrics" { - t.Errorf(`want = NodeMetrics, got: %v`, podMetrics.Kind()) - } - - if !podMetrics.NamespaceScoped() { - t.Errorf(`want = true, got: %v`, podMetrics.NamespaceScoped()) - } - - podMetrics.Destroy() -} diff --git a/pkg/registry/hpaaggregator/metrics/resource_metrics.go b/pkg/registry/hpaaggregator/metrics/resource_metrics.go index 7486e3d6..39488b3f 100644 --- a/pkg/registry/hpaaggregator/metrics/resource_metrics.go +++ b/pkg/registry/hpaaggregator/metrics/resource_metrics.go @@ -17,7 +17,6 @@ limitations under the License. package metrics import ( - "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -28,6 +27,7 @@ import ( metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" "github.com/kubewharf/kubeadmiral/pkg/registry/hpaaggregator/metrics/resource" + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" ) // BuildResourceMetrics constructs APIGroupInfo the metrics.k8s.io API group using the given getters. diff --git a/pkg/util/aggregatedlister/endpointSlice.go b/pkg/util/aggregatedlister/endpointSlice.go new file mode 100644 index 00000000..07690d34 --- /dev/null +++ b/pkg/util/aggregatedlister/endpointSlice.go @@ -0,0 +1,145 @@ +/* +Copyright 2025 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregatedlister + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" +) + +type EndpointSliceLister struct { + federatedInformerManager informermanager.FederatedInformerManager +} + +type EndpointSliceNamespaceLister struct { + namespace string + federatedInformerManager informermanager.FederatedInformerManager +} + +func NewEndpointSliceLister(informer informermanager.FederatedInformerManager) *EndpointSliceLister { + return &EndpointSliceLister{federatedInformerManager: informer} +} + +func (e *EndpointSliceLister) ByNamespace(namespace string) AggregatedNamespaceLister { + return &EndpointSliceNamespaceLister{federatedInformerManager: e.federatedInformerManager, namespace: namespace} +} + +func (e *EndpointSliceNamespaceLister) List(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + grv := NewGlobalResourceVersionFromString(opts.ResourceVersion) + retGrv := grv.Clone() + clusters, err := e.federatedInformerManager.GetReadyClusters() + if err != nil { + return nil, err + } + var resultObject runtime.Object + items := make([]runtime.Object, 0) + for _, cluster := range clusters { + client, exists := e.federatedInformerManager.GetClusterKubeClient(cluster.Name) + if !exists { + continue + } + + endpointSliceList, err := client.DiscoveryV1().EndpointSlices(e.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: opts.LabelSelector, + FieldSelector: opts.FieldSelector, + ResourceVersion: grv.Get(cluster.Name), + }) + if err != nil { + continue + } + endpointSlices := endpointSliceList.Items + + list, err := meta.ListAccessor(endpointSliceList) + if err != nil { + continue + } + + if resultObject == nil { + resultObject = endpointSliceList + } + + for _, eps := range endpointSlices { + clusterobject.MakeObjectUnique(&eps, cluster.Name) + epsObj := eps.DeepCopyObject() + items = append(items, epsObj) + } + + retGrv.Set(cluster.Name, list.GetResourceVersion()) + } + + if resultObject == nil { + resultObject = &metav1.List{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "List", + }, + ListMeta: metav1.ListMeta{}, + Items: []runtime.RawExtension{}, + } + } + + err = meta.SetList(resultObject, items) + if err != nil { + return nil, err + } + accessor, err := meta.ListAccessor(resultObject) + if err != nil { + return nil, err + } + accessor.SetResourceVersion(retGrv.String()) + return resultObject, nil +} + +func (e *EndpointSliceNamespaceLister) Get(ctx context.Context, name string, opts metav1.GetOptions) (runtime.Object, error) { + clusters, err := e.federatedInformerManager.GetReadyClusters() + if err != nil { + return nil, err + } + + for _, cluster := range clusterobject.GetPossibleClusters(clusters, name) { + client, exists := e.federatedInformerManager.GetClusterKubeClient(cluster) + if !exists { + continue + } + + endpointSliceList, err := client.DiscoveryV1().EndpointSlices(e.namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + continue + } + endpointSlices := endpointSliceList.Items + + for i := range endpointSlices { + if name == clusterobject.GenUniqueName(cluster, endpointSlices[i].Name) { + eps := endpointSlices[i].DeepCopy() + clusterobject.MakeObjectUnique(eps, cluster) + grv := NewGlobalResourceVersionWithCapacity(1) + grv.Set(cluster, eps.GetResourceVersion()) + eps.SetResourceVersion(grv.String()) + return eps, nil + } + } + } + return nil, apierrors.NewNotFound(corev1.Resource("endpointSlice"), name) +} diff --git a/pkg/util/aggregatedlister/pod.go b/pkg/util/aggregatedlister/pod.go index 7d76305e..c8513df9 100644 --- a/pkg/util/aggregatedlister/pod.go +++ b/pkg/util/aggregatedlister/pod.go @@ -19,13 +19,14 @@ package aggregatedlister import ( "context" - "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" - "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" ) type PodLister struct { diff --git a/pkg/util/aggregatedlister/service.go b/pkg/util/aggregatedlister/service.go new file mode 100644 index 00000000..cc7712e4 --- /dev/null +++ b/pkg/util/aggregatedlister/service.go @@ -0,0 +1,145 @@ +/* +Copyright 2025 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregatedlister + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" +) + +type ServiceLister struct { + federatedInformerManager informermanager.FederatedInformerManager +} + +type ServiceNamespaceLister struct { + namespace string + federatedInformerManager informermanager.FederatedInformerManager +} + +func NewServiceLister(informer informermanager.FederatedInformerManager) *ServiceLister { + return &ServiceLister{federatedInformerManager: informer} +} + +func (s *ServiceLister) ByNamespace(namespace string) AggregatedNamespaceLister { + return &ServiceNamespaceLister{federatedInformerManager: s.federatedInformerManager, namespace: namespace} +} + +func (s *ServiceNamespaceLister) List(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + grv := NewGlobalResourceVersionFromString(opts.ResourceVersion) + retGrv := grv.Clone() + clusters, err := s.federatedInformerManager.GetReadyClusters() + if err != nil { + return nil, err + } + var resultObject runtime.Object + items := make([]runtime.Object, 0) + for _, cluster := range clusters { + client, exists := s.federatedInformerManager.GetClusterKubeClient(cluster.Name) + if !exists { + continue + } + + serviceList, err := client.CoreV1().Services(s.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: opts.LabelSelector, + FieldSelector: opts.FieldSelector, + ResourceVersion: grv.Get(cluster.Name), + }) + if err != nil { + continue + } + services := serviceList.Items + + list, err := meta.ListAccessor(serviceList) + if err != nil { + continue + } + + if resultObject == nil { + resultObject = serviceList + } + + for _, svc := range services { + clusterobject.MakeObjectUnique(&svc, cluster.Name) + svcObj := svc.DeepCopyObject() + items = append(items, svcObj) + } + + retGrv.Set(cluster.Name, list.GetResourceVersion()) + } + + if resultObject == nil { + resultObject = &metav1.List{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "List", + }, + ListMeta: metav1.ListMeta{}, + Items: []runtime.RawExtension{}, + } + } + + err = meta.SetList(resultObject, items) + if err != nil { + return nil, err + } + accessor, err := meta.ListAccessor(resultObject) + if err != nil { + return nil, err + } + accessor.SetResourceVersion(retGrv.String()) + return resultObject, nil +} + +func (s *ServiceNamespaceLister) Get(ctx context.Context, name string, opts metav1.GetOptions) (runtime.Object, error) { + clusters, err := s.federatedInformerManager.GetReadyClusters() + if err != nil { + return nil, err + } + + for _, cluster := range clusterobject.GetPossibleClusters(clusters, name) { + client, exists := s.federatedInformerManager.GetClusterKubeClient(cluster) + if !exists { + continue + } + + serviceList, err := client.CoreV1().Services(s.namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + continue + } + services := serviceList.Items + + for i := range services { + if name == clusterobject.GenUniqueName(cluster, services[i].Name) { + service := services[i].DeepCopy() + clusterobject.MakeObjectUnique(service, cluster) + grv := NewGlobalResourceVersionWithCapacity(1) + grv.Set(cluster, service.GetResourceVersion()) + service.SetResourceVersion(grv.String()) + return service, nil + } + } + } + return nil, apierrors.NewNotFound(corev1.Resource("service"), name) +}