Skip to content

Commit

Permalink
feat:support svc and eps list and watch
Browse files Browse the repository at this point in the history
  • Loading branch information
Poor12 committed Jan 15, 2025
1 parent 83b9df2 commit f024694
Show file tree
Hide file tree
Showing 15 changed files with 1,001 additions and 547 deletions.
4 changes: 4 additions & 0 deletions pkg/hpaaggregatorapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,15 @@ func (c completedConfig) New() (*Server, error) {

apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(hpaaggregatorapi.GroupName, Scheme, ParameterCodec, Codecs)
podLister := aggregatedlister.NewPodLister(c.ExtraConfig.FederatedInformerManager)
serviceLister := aggregatedlister.NewServiceLister(c.ExtraConfig.FederatedInformerManager)
endpointSliceLister := aggregatedlister.NewEndpointSliceLister(c.ExtraConfig.FederatedInformerManager)

v1alpha1storage := map[string]rest.Storage{}
aggregationAPI, err := aggregation.NewREST(
c.ExtraConfig.FederatedInformerManager,
podLister,
serviceLister,
endpointSliceLister,
c.ExtraConfig.RestConfig,
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
klog.Background().WithValues("api", "aggregations"),
Expand Down
205 changes: 203 additions & 2 deletions pkg/lifted/kubernetes/pkg/printers/internalversion/printers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ package internalversion
import (
"fmt"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/printers"
)
Expand All @@ -41,7 +45,10 @@ import (
// on the node it is (was) running.
//
// taken from https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/util/node/node.go#L37
const NodeUnreachablePodReason = "NodeLost"
const (
NodeUnreachablePodReason = "NodeLost"
loadBalancerWidth = 16
)

// AddHandlers adds print handlers for default Kubernetes types dealing with internal versions.
// TODO: handle errors from Handler
Expand All @@ -61,6 +68,28 @@ func AddHandlers(h printers.PrintHandler) {
}
_ = h.TableHandler(podColumnDefinitions, printPodList)
_ = h.TableHandler(podColumnDefinitions, printPod)

serviceColumnDefinitions := []metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
{Name: "Type", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["type"]},
{Name: "Cluster-IP", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["clusterIP"]},
{Name: "External-IP", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["externalIPs"]},
{Name: "Port(s)", Type: "string", Description: corev1.ServiceSpec{}.SwaggerDoc()["ports"]},
{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
{Name: "Selector", Type: "string", Priority: 1, Description: corev1.ServiceSpec{}.SwaggerDoc()["selector"]},
}
_ = h.TableHandler(serviceColumnDefinitions, printService)
_ = h.TableHandler(serviceColumnDefinitions, printServiceList)

endpointSliceColumnDefinitions := []metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
{Name: "AddressType", Type: "string", Description: discoveryv1.EndpointSlice{}.SwaggerDoc()["addressType"]},
{Name: "Ports", Type: "string", Description: discoveryv1.EndpointSlice{}.SwaggerDoc()["ports"]},
{Name: "Endpoints", Type: "string", Description: discoveryv1.EndpointSlice{}.SwaggerDoc()["endpoints"]},
{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
}
_ = h.TableHandler(endpointSliceColumnDefinitions, printEndpointSlice)
_ = h.TableHandler(endpointSliceColumnDefinitions, printEndpointSliceList)
}

// translateTimestampSince returns the elapsed time since timestamp in
Expand Down Expand Up @@ -91,7 +120,7 @@ func printPodList(podList *corev1.PodList, options printers.GenerateOptions) ([]
return rows, nil
}

//nolint
// nolint
func printPod(pod *corev1.Pod, options printers.GenerateOptions) ([]metav1.TableRow, error) {
restarts := 0
totalContainers := len(pod.Spec.Containers)
Expand Down Expand Up @@ -246,6 +275,178 @@ func printPod(pod *corev1.Pod, options printers.GenerateOptions) ([]metav1.Table
return []metav1.TableRow{row}, nil
}

func printService(obj *corev1.Service, options printers.GenerateOptions) ([]metav1.TableRow, error) {
row := metav1.TableRow{
Object: runtime.RawExtension{Object: obj},
}
svcType := obj.Spec.Type
internalIP := "<none>"
if len(obj.Spec.ClusterIPs) > 0 {
internalIP = obj.Spec.ClusterIPs[0]
}

externalIP := getServiceExternalIP(obj, options.Wide)
svcPorts := makePortString(obj.Spec.Ports)
if len(svcPorts) == 0 {
svcPorts = "<none>"
}

row.Cells = append(row.Cells, obj.Name, string(svcType), internalIP, externalIP, svcPorts, translateTimestampSince(obj.CreationTimestamp))
if options.Wide {
row.Cells = append(row.Cells, labels.FormatLabels(obj.Spec.Selector))
}

return []metav1.TableRow{row}, nil
}

func getServiceExternalIP(svc *corev1.Service, wide bool) string {
switch svc.Spec.Type {
case corev1.ServiceTypeClusterIP:
if len(svc.Spec.ExternalIPs) > 0 {
return strings.Join(svc.Spec.ExternalIPs, ",")
}
return "<none>"
case corev1.ServiceTypeNodePort:
if len(svc.Spec.ExternalIPs) > 0 {
return strings.Join(svc.Spec.ExternalIPs, ",")
}
return "<none>"
case corev1.ServiceTypeLoadBalancer:
lbIps := loadBalancerStatusStringer(svc.Status.LoadBalancer, wide)
if len(svc.Spec.ExternalIPs) > 0 {
results := []string{}
if len(lbIps) > 0 {
results = append(results, strings.Split(lbIps, ",")...)
}
results = append(results, svc.Spec.ExternalIPs...)
return strings.Join(results, ",")
}
if len(lbIps) > 0 {
return lbIps
}
return "<pending>"
case corev1.ServiceTypeExternalName:
return svc.Spec.ExternalName
}
return "<unknown>"
}

// loadBalancerStatusStringer behaves mostly like a string interface and converts the given status to a string.
// `wide` indicates whether the returned value is meant for --o=wide output. If not, it's clipped to 16 bytes.
func loadBalancerStatusStringer(s corev1.LoadBalancerStatus, wide bool) string {
ingress := s.Ingress
result := sets.NewString()
for i := range ingress {
if ingress[i].IP != "" {
result.Insert(ingress[i].IP)
} else if ingress[i].Hostname != "" {
result.Insert(ingress[i].Hostname)
}
}

r := strings.Join(result.List(), ",")
if !wide && len(r) > loadBalancerWidth {
r = r[0:(loadBalancerWidth-3)] + "..."
}
return r
}

func makePortString(ports []corev1.ServicePort) string {
pieces := make([]string, len(ports))
for ix := range ports {
port := &ports[ix]
pieces[ix] = fmt.Sprintf("%d/%s", port.Port, port.Protocol)
if port.NodePort > 0 {
pieces[ix] = fmt.Sprintf("%d:%d/%s", port.Port, port.NodePort, port.Protocol)
}
}
return strings.Join(pieces, ",")
}

func printServiceList(list *corev1.ServiceList, options printers.GenerateOptions) ([]metav1.TableRow, error) {
rows := make([]metav1.TableRow, 0, len(list.Items))
for i := range list.Items {
r, err := printService(&list.Items[i], options)
if err != nil {
return nil, err
}
rows = append(rows, r...)
}
return rows, nil
}

func printEndpointSlice(obj *discoveryv1.EndpointSlice, options printers.GenerateOptions) ([]metav1.TableRow, error) {
row := metav1.TableRow{
Object: runtime.RawExtension{Object: obj},
}
row.Cells = append(row.Cells, obj.Name, string(obj.AddressType), formatDiscoveryPorts(obj.Ports),
formatDiscoveryEndpoints(obj.Endpoints), translateTimestampSince(obj.CreationTimestamp))
return []metav1.TableRow{row}, nil
}

func printEndpointSliceList(list *discoveryv1.EndpointSliceList, options printers.GenerateOptions) ([]metav1.TableRow, error) {
rows := make([]metav1.TableRow, 0, len(list.Items))
for i := range list.Items {
r, err := printEndpointSlice(&list.Items[i], options)
if err != nil {
return nil, err
}
rows = append(rows, r...)
}
return rows, nil
}

func formatDiscoveryPorts(ports []discoveryv1.EndpointPort) string {
list := []string{}
max := 3
more := false
count := 0
for _, port := range ports {
if len(list) < max {
portNum := "*"
if port.Port != nil {
portNum = strconv.Itoa(int(*port.Port))
} else if port.Name != nil {
portNum = *port.Name
}
list = append(list, portNum)
} else if len(list) == max {
more = true
}
count++
}
return listWithMoreString(list, more, count, max)
}

func listWithMoreString(list []string, more bool, count, max int) string {
ret := strings.Join(list, ",")
if more {
return fmt.Sprintf("%s + %d more...", ret, count-max)
}
if ret == "" {
ret = "<unset>"
}
return ret
}

func formatDiscoveryEndpoints(endpoints []discoveryv1.Endpoint) string {
list := []string{}
max := 3
more := false
count := 0
for _, endpoint := range endpoints {
for _, address := range endpoint.Addresses {
if len(list) < max {
list = append(list, address)
} else if len(list) == max {
more = true
}
count++
}
}
return listWithMoreString(list, more, count, max)
}

func hasPodReadyCondition(conditions []corev1.PodCondition) bool {
for _, condition := range conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
Expand Down
34 changes: 33 additions & 1 deletion pkg/registry/hpaaggregator/aggregation/forward/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -38,6 +39,12 @@ import (
printerstorage "github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/printers/storage"
)

const (
getVerb = "get"
listVerb = "list"
watchVerb = "watch"
)

var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
Expand All @@ -55,7 +62,7 @@ var (
tableConvertor = printerstorage.TableConvertor{
TableGenerator: printers.NewTableGenerator().With(internalversion.AddHandlers),
}
scope = &handlers.RequestScope{
podScope = &handlers.RequestScope{
Namer: &handlers.ContextBasedNaming{
Namer: runtime.Namer(meta.NewAccessor()),
ClusterScoped: false,
Expand All @@ -67,11 +74,36 @@ var (
MetaGroupVersion: metav1.SchemeGroupVersion,
Resource: common.PodGVR,
}
serviceScope = &handlers.RequestScope{
Namer: &handlers.ContextBasedNaming{
Namer: runtime.Namer(meta.NewAccessor()),
ClusterScoped: false,
},
Serializer: codecs,
Kind: common.ServiceGVK,
TableConvertor: tableConvertor,
Convertor: scheme,
MetaGroupVersion: metav1.SchemeGroupVersion,
Resource: common.ServiceGVR,
}
endpointSliceScope = &handlers.RequestScope{
Namer: &handlers.ContextBasedNaming{
Namer: runtime.Namer(meta.NewAccessor()),
ClusterScoped: false,
},
Serializer: codecs,
Kind: common.EndpointSliceGVK,
TableConvertor: tableConvertor,
Convertor: scheme,
MetaGroupVersion: metav1.SchemeGroupVersion,
Resource: common.EndpointSliceGVR,
}
)

func init() {
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(discoveryv1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))

scheme.AddUnversionedTypes(unversionedVersion, unversionedTypes...)
Expand Down
Loading

0 comments on commit f024694

Please sign in to comment.