Skip to content

Commit

Permalink
Merge pull request kubewharf#341 from wy-lucky/main
Browse files Browse the repository at this point in the history
fix: close host cluster proxy watch when member clustr watch closed
  • Loading branch information
mrlihanbo authored Nov 27, 2024
2 parents ab06029 + 2234da6 commit eea8b07
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions pkg/registry/hpaaggregator/aggregation/forward/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

"github.com/kubewharf/kubeadmiral/pkg/util/clusterobject"
"github.com/kubewharf/kubeadmiral/pkg/util/informermanager"
"github.com/kubewharf/kubeadmiral/pkg/util/logging"
)

type PodHandler interface {
Expand Down Expand Up @@ -141,15 +142,22 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp

namespace := genericapirequest.NamespaceValue(ctx)

ctx, logger := logging.InjectLoggerValues(
ctx,
"label_selector", label,
"field_selector", options.FieldSelector,
"namespace", namespace,
)

clusters, err := p.federatedInformerManager.GetReadyClusters()
if err != nil {
klog.ErrorS(err, "Failed watching pods", "labelSelector", label, "namespace", klog.KRef("", namespace))
logger.Error(err, "Failed to get ready clusters")
return nil, fmt.Errorf("failed watching pods: %w", err)
}

// TODO: support cluster addition and deletion during the watch
var lock sync.Mutex
watchClusters := sets.Set[string]{}
isProxyChClosed := false
proxyCh := make(chan watch.Event)
proxyWatcher := watch.NewProxyWatcher(proxyCh)
for i := range clusters {
Expand All @@ -158,15 +166,19 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
continue
}
watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: label.String(),
FieldSelector: options.FieldSelector.String(),
LabelSelector: label.String(),
FieldSelector: options.FieldSelector.String(),
TimeoutSeconds: pointer.Int64(1200),
})
if err != nil {
logger.Error(err, "Failed watching pods")
continue
}
watchClusters.Insert(clusters[i].Name)
go func(cluster string) {
defer watcher.Stop()
defer func() {
logger.WithValues("cluster", cluster).Info("Stopped cluster watcher")
watcher.Stop()
}()
for {
select {
case <-proxyWatcher.StopChan():
Expand All @@ -176,9 +188,10 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
lock.Lock()
defer lock.Unlock()

watchClusters.Delete(cluster)
if watchClusters.Len() == 0 {
if !isProxyChClosed {
close(proxyCh)
isProxyChClosed = true
logger.WithValues("cluster", cluster).Info("Closed proxy watcher channel")
}
return
}
Expand Down

0 comments on commit eea8b07

Please sign in to comment.