Skip to content

Commit

Permalink
Fix data race in watch and log resources
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Jan 3, 2025
1 parent c26274b commit 5a243a5
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions test/e2e/sacura_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -101,29 +102,53 @@ func TestSacuraBrokerJob(t *testing.T) {
})
}

// Event pairs a watch.Event with the GroupVersionResource of the watcher
// that produced it, so events from multiple resource watches can be
// funneled into a single channel and logged with their origin.
type Event struct {
GVR schema.GroupVersionResource
Event watch.Event
}

func runSacuraTest(t *testing.T, config SacuraTestConfig) {

c := testlib.Setup(t, false)
defer testlib.TearDown(c)

ctx := context.Background()

watchUserFacingResource := watchResource(t, ctx, c.Dynamic, config.Namespace, config.ConsumerResourceGVR)
out := make(chan Event)

watchUserFacingResource := watchResource(t, ctx, c.Dynamic, config.Namespace, config.ConsumerResourceGVR, out)
t.Cleanup(watchUserFacingResource.Stop)

watchConsumerGroups := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumergroups"))
watchConsumerGroups := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumergroups"), out)
t.Cleanup(watchConsumerGroups.Stop)

watchConsumer := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumers"))
watchConsumer := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumers"), out)
t.Cleanup(watchConsumer.Stop)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

for e := range out {
bytes, _ := json.MarshalIndent(e, "", " ")
t.Logf("Resource %q changed:\n%s\n\n", e.GVR.String(), string(bytes))
}
}()

jobPollError := wait.Poll(pollInterval, pollTimeout, func() (done bool, err error) {
job, err := c.Kube.BatchV1().Jobs(config.Namespace).Get(ctx, app, metav1.GetOptions{})
assert.Nil(t, err)

return isJobSucceeded(job)
})

watchConsumer.Stop()
watchConsumerGroups.Stop()
watchUserFacingResource.Stop()
close(out)
wg.Wait()

pkgtesting.LogJobOutput(t, ctx, c.Kube, config.Namespace, app)

if jobPollError != nil {
Expand Down Expand Up @@ -215,7 +240,7 @@ func getKafkaSubscriptionConsumerGroup(ctx context.Context, c dynamic.Interface,
}
}

func watchResource(t *testing.T, ctx context.Context, dynamic dynamic.Interface, ns string, gvr schema.GroupVersionResource) watch.Interface {
func watchResource(t *testing.T, ctx context.Context, dynamic dynamic.Interface, ns string, gvr schema.GroupVersionResource, out chan<- Event) watch.Interface {

w, err := dynamic.Resource(gvr).
Namespace(ns).
Expand All @@ -226,8 +251,10 @@ func watchResource(t *testing.T, ctx context.Context, dynamic dynamic.Interface,

go func() {
for e := range w.ResultChan() {
bytes, _ := json.MarshalIndent(e, "", " ")
t.Logf("Resource %q changed:\n%s\n\n", gvr.String(), string(bytes))
out <- Event{
GVR: gvr,
Event: e,
}
}
}()

Expand Down

0 comments on commit 5a243a5

Please sign in to comment.