Skip to content

Commit

Permalink
fix: retry watch when not all clusters abnormal
Browse files Browse the repository at this point in the history
  • Loading branch information
Poor12 committed Nov 22, 2024
1 parent ab06029 commit 26c7c59
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions pkg/registry/hpaaggregator/aggregation/forward/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
var lock sync.Mutex
watchClusters := sets.Set[string]{}
proxyCh := make(chan watch.Event)
recoveryCh := make(chan string)

Check warning on line 154 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L154

Added line #L154 was not covered by tests
proxyWatcher := watch.NewProxyWatcher(proxyCh)
clusterWatchers := make(map[string]watch.Interface, len(clusters))

Check warning on line 156 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L156

Added line #L156 was not covered by tests
for i := range clusters {
client, exist := p.federatedInformerManager.GetClusterKubeClient(clusters[i].Name)
if !exist {
Expand All @@ -164,25 +166,32 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
if err != nil {
continue
}
clusterWatchers[clusters[i].Name] = watcher

Check warning on line 169 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L169

Added line #L169 was not covered by tests
watchClusters.Insert(clusters[i].Name)
go func(cluster string) {
defer watcher.Stop()
for {
clusterWatcher := clusterWatchers[cluster]

Check warning on line 173 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L173

Added line #L173 was not covered by tests
select {
case <-proxyWatcher.StopChan():
clusterWatcher.Stop()

Check warning on line 176 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L176

Added line #L176 was not covered by tests
return
case event, ok := <-watcher.ResultChan():
case event, ok := <-clusterWatcher.ResultChan():

Check warning on line 178 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L178

Added line #L178 was not covered by tests
if !ok {
klog.Infof("closed %s channel, selector %s", cluster, label.String())
clusterWatcher.Stop()

Check warning on line 181 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L180-L181

Added lines #L180 - L181 were not covered by tests
lock.Lock()
defer lock.Unlock()

watchClusters.Delete(cluster)
if watchClusters.Len() == 0 {
close(proxyCh)
close(recoveryCh)
return

Check warning on line 187 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L186-L187

Added lines #L186 - L187 were not covered by tests
}
return
recoveryCh <- cluster
lock.Unlock()
continue

Check warning on line 191 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L189-L191

Added lines #L189 - L191 were not covered by tests
}
if pod, ok := event.Object.(*corev1.Pod); ok {
klog.Infof("pod event name: %v %v", pod.Name, event.Type)

Check warning on line 194 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L194

Added line #L194 was not covered by tests
clusterobject.MakePodUnique(pod, cluster)
event.Object = pod
}
Expand All @@ -191,6 +200,28 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
}
}(clusters[i].Name)
}

go func() {
for cluster := range recoveryCh {
client, exist := p.federatedInformerManager.GetClusterKubeClient(cluster)
if !exist {
continue

Check warning on line 208 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L204-L208

Added lines #L204 - L208 were not covered by tests
}
watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: label.String(),
FieldSelector: options.FieldSelector.String(),
})
if err != nil {
continue

Check warning on line 215 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L210-L215

Added lines #L210 - L215 were not covered by tests
}

lock.Lock()
clusterWatchers[cluster] = watcher
watchClusters.Insert(cluster)
lock.Unlock()

Check warning on line 221 in pkg/registry/hpaaggregator/aggregation/forward/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/registry/hpaaggregator/aggregation/forward/pod.go#L218-L221

Added lines #L218 - L221 were not covered by tests
}
}()

return proxyWatcher, nil
}

Expand Down

0 comments on commit 26c7c59

Please sign in to comment.