Skip to content

Commit

Permalink
Share Kubernetes watches between Typha Syncers.
Browse files Browse the repository at this point in the history
Introduce a pool of watcher syncers, reuse existing one if
if has exact same configuration.
  • Loading branch information
fasaxc committed Jan 2, 2025
1 parent b88f571 commit a3cfbc0
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 15 deletions.
6 changes: 5 additions & 1 deletion libcalico-go/lib/backend/syncersv1/bgpsyncer/bgpsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (
// KDD. An optional node name may be supplied. If set, the syncer only watches
// the specified node rather than all nodes.
func New(client api.Client, callbacks api.SyncerCallbacks, node string, cfg apiconfig.CalicoAPIConfigSpec) api.Syncer {
return NewFromProvider(watchersyncer.NewWatcherCacheFactory(client), callbacks, node, cfg)
}

func NewFromProvider(watcherCacheProvider watchersyncer.WatcherCacheProvider, callbacks api.SyncerCallbacks, node string, cfg apiconfig.CalicoAPIConfigSpec) api.Syncer {
// Create ResourceTypes required for BGP.
resourceTypes := []watchersyncer.ResourceType{
{
Expand Down Expand Up @@ -57,5 +61,5 @@ func New(client api.Client, callbacks api.SyncerCallbacks, node string, cfg apic
})
}

return watchersyncer.New(client, resourceTypes, callbacks)
return watchersyncer.NewFromProvider(watcherCacheProvider, resourceTypes, callbacks)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

// New creates a new Felix v1 Syncer.
func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks, isLeader bool) api.Syncer {
return NewFromProvider(watchersyncer.NewWatcherCacheFactory(client), cfg, callbacks, isLeader)
}

func NewFromProvider(watcherCacheProvider watchersyncer.WatcherCacheProvider, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks, isLeader bool) api.Syncer {
// Felix always needs ClusterInformation and FelixConfiguration resources.
resourceTypes := []watchersyncer.ResourceType{
{
Expand Down Expand Up @@ -118,8 +122,8 @@ func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.Syn
resourceTypes = append(resourceTypes, additionalTypes...)
}

return watchersyncer.New(
client,
return watchersyncer.NewFromProvider(
watcherCacheProvider,
resourceTypes,
callbacks,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ import (

// New creates a new CalicoNodeStatus v1 Syncer.
func New(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
return NewFromProvider(watchersyncer.NewWatcherCacheFactory(client), callbacks)
}

func NewFromProvider(watcherCacheProvider watchersyncer.WatcherCacheProvider, callbacks api.SyncerCallbacks) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindCalicoNodeStatus},
},
}

return watchersyncer.New(client, resourceTypes, callbacks)
return watchersyncer.NewFromProvider(watcherCacheProvider, resourceTypes, callbacks)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
// New creates a new tunnel IP allocation v1 Syncer. An optional node name may be supplied. If set, the syncer only
// watches the specified node rather than all nodes.
func New(client api.Client, callbacks api.SyncerCallbacks, node string) api.Syncer {
return NewFromProvider(watchersyncer.NewWatcherCacheFactory(client), callbacks, node)
}

func NewFromProvider(watcherCacheProvider watchersyncer.WatcherCacheProvider, callbacks api.SyncerCallbacks, node string) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindIPPool},
Expand All @@ -37,5 +41,5 @@ func New(client api.Client, callbacks api.SyncerCallbacks, node string) api.Sync
},
}

return watchersyncer.New(client, resourceTypes, callbacks)
return watchersyncer.NewFromProvider(watcherCacheProvider, resourceTypes, callbacks)
}
33 changes: 33 additions & 0 deletions libcalico-go/lib/backend/watchersyncer/watchercache_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2024 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package watchersyncer

import (
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
)

// WatcherCacheFactory is the simplest WatcherCacheProvider implementation.
// It creates a new WatcherCache for each call to WatcherCache.
type WatcherCacheFactory struct {
client api.Client
}

func (w WatcherCacheFactory) WatcherCache(resourceType ResourceType, results chan interface{}) WatcherCacheIface {
return newWatcherCache(w.client, resourceType, results)
}

func NewWatcherCacheFactory(client api.Client) WatcherCacheProvider {
return &WatcherCacheFactory{client: client}
}
136 changes: 136 additions & 0 deletions libcalico-go/lib/backend/watchersyncer/watchercache_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) 2024 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package watchersyncer

import (
"reflect"
"sync"

"github.com/sirupsen/logrus"
"golang.org/x/net/context"

"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
)

type resourceTypeToCacheMapping struct {
resourceType ResourceType

realWatcherCache *watcherCache
fanout *watcherCacheFanout
}

// WatcherCachePool is the simplest WatcherCacheProvider implementation.
// It creates a new WatcherCache for each call to WatcherCache.
type WatcherCachePool struct {
client api.Client

caches []*resourceTypeToCacheMapping
}

func NewWatcherCachePool(client api.Client) WatcherCacheProvider {
return &WatcherCachePool{
client: client,
}
}

func (w *WatcherCachePool) WatcherCache(resourceType ResourceType, results chan interface{}) WatcherCacheIface {
mapping := w.getOrCreateCache(resourceType)
mapping.fanout.addOutput(results)
return mapping.fanout
}

func (w *WatcherCachePool) getOrCreateCache(resourceType ResourceType) *resourceTypeToCacheMapping {
// ResourceType may not be comparable, so we need to do a scan...
for _, mapping := range w.caches {
if reflect.DeepEqual(mapping.resourceType, resourceType) {
return mapping
}
}
results := make(chan interface{})
cache := newWatcherCache(w.client, resourceType, results)
fo := &watcherCacheFanout{
resourceType: resourceType,
upstreamCache: cache,
input: results,
}
w.caches = append(w.caches, &resourceTypeToCacheMapping{
resourceType: resourceType,
realWatcherCache: cache,
fanout: fo,
})
return w.caches[len(w.caches)-1]
}

type watcherCacheFanout struct {
resourceType ResourceType
input <-chan any

lock sync.Mutex
wg sync.WaitGroup
outputs []chan<- any
started bool
upstreamCache *watcherCache
}

func (f *watcherCacheFanout) run(ctx context.Context) {
f.lock.Lock()
if f.started {
<-ctx.Done()
return // FIXME should we make intermediate object to avoid this?
}
f.started = true
f.lock.Unlock()

if len(f.outputs) > 1 {
logrus.WithFields(logrus.Fields{
"outputs": len(f.outputs),
"resourceType": f.resourceType,
}).Info("Sharing watcher cache for resource type.")
} else {
logrus.WithFields(logrus.Fields{
"resourceType": f.resourceType,
}).Info("Not sharing watcher cache for resource type.")
}

// FIXME we can be called multiple times so we don't want to stop for
// the individual context.
go f.upstreamCache.run(context.TODO())
go f.loopFanningOut(context.TODO())

<-ctx.Done()
}

func (f *watcherCacheFanout) addOutput(output chan<- any) {
f.lock.Lock()
defer f.lock.Unlock()

if f.started {
logrus.Panic("Cannot add output after starting fanout.")
}
f.outputs = append(f.outputs, output)
}

func (f *watcherCacheFanout) loopFanningOut(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case result := <-f.input:
for _, output := range f.outputs {
output <- result
}
}
}
}
20 changes: 16 additions & 4 deletions libcalico-go/lib/backend/watchersyncer/watchersyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,35 @@ type SyncerUpdateProcessor interface {
OnSyncerStarting()
}

type WatcherCacheProvider interface {
WatcherCache(resourceType ResourceType, results chan interface{}) WatcherCacheIface
}

// New creates a new multiple Watcher-backed api.Syncer.
func New(client api.Client, resourceTypes []ResourceType, callbacks api.SyncerCallbacks) api.Syncer {
return NewFromProvider(NewWatcherCacheFactory(client), resourceTypes, callbacks)
}

func NewFromProvider(watcherCacheProvider WatcherCacheProvider, resourceTypes []ResourceType, callbacks api.SyncerCallbacks) api.Syncer {
rs := &watcherSyncer{
watcherCaches: make([]*watcherCache, len(resourceTypes)),
watcherCaches: make([]WatcherCacheIface, len(resourceTypes)),
results: make(chan interface{}, 2000),
callbacks: callbacks,
}
for i, r := range resourceTypes {
rs.watcherCaches[i] = newWatcherCache(client, r, rs.results)
rs.watcherCaches[i] = watcherCacheProvider.WatcherCache(r, rs.results)
}
return rs
}

type WatcherCacheIface interface {
run(ctx context.Context)
}

// watcherSyncer implements the api.Syncer interface.
type watcherSyncer struct {
status api.SyncStatus
watcherCaches []*watcherCache
watcherCaches []WatcherCacheIface
results chan interface{}
numSynced int
callbacks api.SyncerCallbacks
Expand Down Expand Up @@ -139,7 +151,7 @@ func (ws *watcherSyncer) run(ctx context.Context) {
ws.sendStatusUpdate(api.WaitForDatastore)
for _, wc := range ws.watcherCaches {
// no need for ws.wgwc.Add(1), been set already
go func(wc *watcherCache) {
go func(wc WatcherCacheIface) {
defer ws.wgwc.Done()
wc.run(ctx)
log.Debug("Watcher cache run completed")
Expand Down
19 changes: 13 additions & 6 deletions typha/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/projectcalico/calico/libcalico-go/lib/backend/syncersv1/felixsyncer"
"github.com/projectcalico/calico/libcalico-go/lib/backend/syncersv1/nodestatussyncer"
"github.com/projectcalico/calico/libcalico-go/lib/backend/syncersv1/tunnelipsyncer"
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
"github.com/projectcalico/calico/libcalico-go/lib/clientv3"
"github.com/projectcalico/calico/libcalico-go/lib/debugserver"
"github.com/projectcalico/calico/libcalico-go/lib/health"
Expand Down Expand Up @@ -118,7 +119,12 @@ func New() *TyphaDaemon {
if err != nil {
return nil, err
}
return ClientV3Shim{client.(RealClientV3), config}, nil
realClient := client.(RealClientV3)
return ClientV3Shim{
RealClientV3: realClient,
config: config,
watcherCacheProvider: watchersyncer.NewWatcherCachePool(realClient.Backend()),
}, nil
},
ConfigureEarlyLogging: logutils.ConfigureEarlyLogging,
ConfigureLogging: logutils.ConfigureLogging,
Expand Down Expand Up @@ -490,23 +496,24 @@ func (t *TyphaDaemon) WaitAndShutDown(cxt context.Context) {
// ClientV3Shim wraps a real client, allowing its syncer to be mocked.
type ClientV3Shim struct {
RealClientV3
config apiconfig.CalicoAPIConfig
config apiconfig.CalicoAPIConfig
watcherCacheProvider watchersyncer.WatcherCacheProvider
}

func (s ClientV3Shim) FelixSyncerByIface(callbacks bapi.SyncerCallbacks) bapi.Syncer {
return felixsyncer.New(s.Backend(), s.config.Spec, callbacks, true)
return felixsyncer.NewFromProvider(s.watcherCacheProvider, s.config.Spec, callbacks, true)
}

func (s ClientV3Shim) BGPSyncerByIface(callbacks bapi.SyncerCallbacks) bapi.Syncer {
return bgpsyncer.New(s.Backend(), callbacks, "", s.config.Spec)
return bgpsyncer.NewFromProvider(s.watcherCacheProvider, callbacks, "", s.config.Spec)
}

func (s ClientV3Shim) TunnelIPAllocationSyncerByIface(callbacks bapi.SyncerCallbacks) bapi.Syncer {
return tunnelipsyncer.New(s.Backend(), callbacks, "")
return tunnelipsyncer.NewFromProvider(s.watcherCacheProvider, callbacks, "")
}

func (s ClientV3Shim) NodeStatusSyncerByIface(callbacks bapi.SyncerCallbacks) bapi.Syncer {
return nodestatussyncer.New(s.Backend(), callbacks)
return nodestatussyncer.NewFromProvider(s.watcherCacheProvider, callbacks)
}

// DatastoreClient is our interface to the datastore, used for mocking in the UTs.
Expand Down

0 comments on commit a3cfbc0

Please sign in to comment.