diff --git a/pkg/registry/hpaaggregator/aggregation/forward/pod.go b/pkg/registry/hpaaggregator/aggregation/forward/pod.go index 683dc599..e331a9fe 100644 --- a/pkg/registry/hpaaggregator/aggregation/forward/pod.go +++ b/pkg/registry/hpaaggregator/aggregation/forward/pod.go @@ -32,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/handlers" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -40,9 +39,11 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/utils/pointer" "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" ) type PodHandler interface { @@ -141,15 +142,22 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp namespace := genericapirequest.NamespaceValue(ctx) + ctx, logger := logging.InjectLoggerValues( + ctx, + "label_selector", label, + "field_selector", options.FieldSelector, + "namespace", namespace, + ) + clusters, err := p.federatedInformerManager.GetReadyClusters() if err != nil { - klog.ErrorS(err, "Failed watching pods", "labelSelector", label, "namespace", klog.KRef("", namespace)) + logger.Error(err, "Failed to get ready clusters") return nil, fmt.Errorf("failed watching pods: %w", err) } // TODO: support cluster addition and deletion during the watch var lock sync.Mutex - watchClusters := sets.Set[string]{} + isProxyChClosed := false proxyCh := make(chan watch.Event) proxyWatcher := watch.NewProxyWatcher(proxyCh) for i := range clusters { @@ -158,15 +166,19 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp continue } watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ - LabelSelector: label.String(), - FieldSelector: options.FieldSelector.String(), + LabelSelector: label.String(), + FieldSelector: options.FieldSelector.String(), + TimeoutSeconds: pointer.Int64(1200), }) if err != nil { + logger.Error(err, "Failed watching pods") continue } - watchClusters.Insert(clusters[i].Name) go func(cluster string) { - defer watcher.Stop() + defer func() { + logger.WithValues("cluster", cluster).Info("Stopped cluster watcher") + watcher.Stop() + }() for { select { case <-proxyWatcher.StopChan(): @@ -176,9 +188,10 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp lock.Lock() defer lock.Unlock() - watchClusters.Delete(cluster) - if watchClusters.Len() == 0 { + if !isProxyChClosed { close(proxyCh) + isProxyChClosed = true + logger.WithValues("cluster", cluster).Info("Closed proxy watcher channel") } return }