Skip to content

Commit

Permalink
Extract the resource event handler functions into a separate file, an…
Browse files Browse the repository at this point in the history
…d add unit test making sure the add/update/delete functions are called, and that in particular the update function is _not_ called when updating a Link status.
  • Loading branch information
alpeb committed Jan 22, 2025
1 parent d0f85c4 commit 207bdf2
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 63 deletions.
9 changes: 9 additions & 0 deletions controller/k8s/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
ewinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload/v1beta1"
linkinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/link/v1alpha2"
srvinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server/v1beta3"
spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/pkg/k8s"
Expand Down Expand Up @@ -52,6 +53,7 @@ type API struct {
es discoveryinformers.EndpointSliceInformer
ew ewinformers.ExternalWorkloadInformer
job batchv1informers.JobInformer
link linkinformers.LinkInformer
mwc arinformers.MutatingWebhookConfigurationInformer
ns coreinformers.NamespaceInformer
pod coreinformers.PodInformer
Expand Down Expand Up @@ -248,6 +250,13 @@ func newAPI(
api.job = sharedInformers.Batch().V1().Jobs()
api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer())
case Link:
if l5dCrdSharedInformers == nil {
panic("Linkerd CRD shared informer not configured")
}
api.link = l5dCrdSharedInformers.Link().V1alpha2().Links()
api.syncChecks = append(api.syncChecks, api.link.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Link, informerLabels, api.link.Informer())
case MWC:
api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations()
api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced)
Expand Down
1 change: 1 addition & 0 deletions controller/k8s/api_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
ES // EndpointSlice resource
ExtWorkload
Job
Link
MWC
NS
Pod
Expand Down
1 change: 1 addition & 0 deletions controller/k8s/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrd
DS,
Endpoint,
Job,
Link,
MWC,
NS,
Pod,
Expand Down
64 changes: 1 addition & 63 deletions multicluster/cmd/service-mirror/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"os"
"os/signal"
"reflect"
"syscall"
"time"

Expand All @@ -22,7 +21,6 @@ import (
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
Expand Down Expand Up @@ -150,67 +148,7 @@ func Main(args []string) {
log.Infof("Starting Link informer")
informerFactory.Start(ctx.Done())

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
link, ok := obj.(*v1alpha2.Link)
if !ok {
log.Errorf("object is not a Link: %+v", obj)
return
}
if link.GetName() == linkName {
select {
case results <- link:
default:
log.Errorf("Link update dropped (queue full): %s", link.GetName())
}
}
},
UpdateFunc: func(oldObj, currentObj interface{}) {
oldLink, ok := oldObj.(*v1alpha2.Link)
if !ok {
log.Errorf("object is not a Link: %+v", oldObj)
return
}
currentLink, ok := currentObj.(*v1alpha2.Link)
if !ok {
log.Errorf("object is not a Link: %+v", currentObj)
return
}
if reflect.DeepEqual(oldLink.Spec, currentLink.Spec) {
log.Debugf("Link update ignored (only status changed): %s", currentLink.GetName())
return
}
if currentLink.GetName() == linkName {
select {
case results <- currentLink:
default:
log.Errorf("Link update dropped (queue full): %s", currentLink.GetName())
}
}
},
DeleteFunc: func(obj interface{}) {
link, ok := obj.(*v1alpha2.Link)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
return
}
link, ok = tombstone.Obj.(*v1alpha2.Link)
if !ok {
log.Errorf("DeletedFinalStateUnknown contained object that is not a Link %#v", obj)
return
}
}
if link.GetName() == linkName {
select {
case results <- nil: // nil indicates the link was deleted
default:
log.Errorf("Link delete dropped (queue full): %s", link.GetName())
}
}
},
})
_, err := informer.AddEventHandler(servicemirror.GetLinkHandlers(results, linkName))
if err != nil {
log.Fatalf("Failed to add event handler to Link informer: %s", err)
}
Expand Down
73 changes: 73 additions & 0 deletions multicluster/service-mirror/link_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package servicemirror

import (
"reflect"

"github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha2"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
)

func GetLinkHandlers(results chan<- *v1alpha2.Link, linkName string) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
link, ok := obj.(*v1alpha2.Link)
if !ok {
log.Errorf("object is not a Link: %+v", obj)
return
}
if link.GetName() == linkName {
select {
case results <- link:
default:
log.Errorf("Link update dropped (queue full): %s", link.GetName())
}
}
},
UpdateFunc: func(oldObj, currentObj interface{}) {
oldLink, ok := oldObj.(*v1alpha2.Link)
if !ok {
log.Errorf("object is not a Link: %+v", oldObj)
return
}
currentLink, ok := currentObj.(*v1alpha2.Link)
if !ok {
log.Errorf("object is not a Link: %+v", currentObj)
return
}
if reflect.DeepEqual(oldLink.Spec, currentLink.Spec) {
log.Debugf("Link update ignored (only status changed): %s", currentLink.GetName())
return
}
if currentLink.GetName() == linkName {
select {
case results <- currentLink:
default:
log.Errorf("Link update dropped (queue full): %s", currentLink.GetName())
}
}
},
DeleteFunc: func(obj interface{}) {
link, ok := obj.(*v1alpha2.Link)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
return
}
link, ok = tombstone.Obj.(*v1alpha2.Link)
if !ok {
log.Errorf("DeletedFinalStateUnknown contained object that is not a Link %#v", obj)
return
}
}
if link.GetName() == linkName {
select {
case results <- nil: // nil indicates the link was deleted
default:
log.Errorf("Link delete dropped (queue full): %s", link.GetName())
}
}
},
}
}
141 changes: 141 additions & 0 deletions multicluster/service-mirror/link_handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package servicemirror

import (
"context"
"encoding/json"
"log"
"testing"
"time"

"github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha2"
l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
"github.com/linkerd/linkerd2/controller/k8s"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

const nsName = "ns1"
const linkName = "linkName"

func TestLinkHandlers(t *testing.T) {
k8sAPI, l5dAPI, err := k8s.NewFakeAPIWithL5dClient()
if err != nil {
t.Fatal(err)
}
k8sAPI.Sync(nil)

informerFactory := l5dcrdinformer.NewSharedInformerFactoryWithOptions(
l5dAPI,
k8s.ResyncTime,
l5dcrdinformer.WithNamespace(nsName),
)
informer := informerFactory.Link().V1alpha2().Links().Informer()
informerFactory.Start(context.Background().Done())

results := make(chan *v1alpha2.Link, 100)
_, err = informer.AddEventHandler(GetLinkHandlers(results, linkName))
if err != nil {
t.Fatal(err)
}

// test that a message is received when a link is created
_, err = k8sAPI.Client.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}

link := &v1alpha2.Link{
ObjectMeta: metav1.ObjectMeta{
Name: linkName,
Namespace: nsName,
},
Spec: v1alpha2.LinkSpec{ProbeSpec: v1alpha2.ProbeSpec{Timeout: "30s"}},
}
_, err = l5dAPI.LinkV1alpha2().Links(nsName).Create(context.Background(), link, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case link := <-results:
if link.GetName() != linkName {
t.Fatalf("Expected LinkName, got %s", link.GetName())
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}

// test that a message is received when a link spec is updated
patch := map[string]any{
"spec": map[string]any{
"probeSpec": map[string]any{
"timeout": "60s",
},
},
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Fatalf("Failed to marshal patch: %v", err)
}
_, err = l5dAPI.LinkV1alpha2().Links(nsName).Patch(
context.Background(),
linkName,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{},
)
if err != nil {
t.Fatalf("Failed to patch link: %s", err)
}

select {
case link := <-results:
if link.GetName() != linkName {
t.Fatalf("Expected LinkName, got %s", link.GetName())
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}

// test that a message is _not_ received when a link status is updated
patch = map[string]any{
"status": map[string]any{
"foo": "bar",
},
}
patchBytes, err = json.Marshal(patch)
if err != nil {
log.Fatalf("Failed to marshal patch: %v", err)
}
_, err = l5dAPI.LinkV1alpha2().Links(nsName).Patch(
context.Background(),
linkName,
types.MergePatchType,
patchBytes,
metav1.PatchOptions{},
"status",
)
if err != nil {
t.Fatalf("Failed to patch link: %s", err)
}

select {
case link := <-results:
t.Fatalf("Received unexpected message: %v", link)
case <-time.After(time.Second):
}

// test that a nil message is received when a link is deleted
if err := l5dAPI.LinkV1alpha2().Links(nsName).Delete(context.Background(), linkName, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete link: %s", err)
}
select {
case link := <-results:
if link != nil {
t.Fatalf("Expected nil, got %v", link)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
}
1 change: 1 addition & 0 deletions pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
EndpointSlices = "endpointslices"
ExtWorkload = "externalworkload"
Job = "job"
Link = "link"
MeshTLSAuthentication = "meshtlsauthentication"
MutatingWebhookConfig = "mutatingwebhookconfig"
Namespace = "namespace"
Expand Down

0 comments on commit 207bdf2

Please sign in to comment.