Skip to content

Commit

Permalink
Add the track of the completion of the topology refresh cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
thachlp committed Jan 2, 2025
1 parent 83a470e commit 2f38c3c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,33 @@ protected void activateTopologyRefreshIfNeeded() {
/**
* Suspend (cancel) periodic topology refresh.
*/
public void suspendTopologyRefresh() {
public CompletableFuture<Void> suspendTopologyRefresh() {
CompletableFuture<Void> completionFuture = new CompletableFuture<>();

if (clusterTopologyRefreshActivated.compareAndSet(true, false)) {

ScheduledFuture<?> scheduledFuture = clusterTopologyRefreshFuture.get();

try {
scheduledFuture.cancel(false);
clusterTopologyRefreshFuture.set(null);
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
clusterTopologyRefreshFuture.set(null);
}
completionFuture.complete(null);
} catch (Exception e) {
logger.debug("Could not cancel Cluster topology refresh", e);
completionFuture.completeExceptionally(e);
}
} else {
completionFuture.complete(null);
}

return completionFuture;
}

public boolean isTopologyRefreshInProgress() {
return clusterTopologyRefreshTask.get();
}

public CompletableFuture<Void> getTopologyRefreshCompletionFuture() {
return clusterTopologyRefreshTask.getCompletionFuture();
}

@Override
public void run() {

Expand Down Expand Up @@ -322,16 +326,10 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements

private final Supplier<CompletionStage<?>> reloadTopologyAsync;

private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

ClusterTopologyRefreshTask(Supplier<CompletionStage<?>> reloadTopologyAsync) {
this.reloadTopologyAsync = reloadTopologyAsync;
}

public CompletableFuture<Void> getCompletionFuture() {
return completionFuture;
}

public void run() {

if (compareAndSet(false, true)) {
Expand All @@ -354,16 +352,12 @@ void doRun() {

if (throwable != null) {
logger.warn("Cannot refresh Redis Cluster topology", throwable);
completionFuture.completeExceptionally(throwable);
} else {
completionFuture.complete(null);
}

set(false);
});
} catch (Exception e) {
logger.warn("Cannot refresh Redis Cluster topology", e);
completionFuture.completeExceptionally(e);
}
}

Expand Down
14 changes: 3 additions & 11 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,8 @@ public CompletionStage<Void> refreshPartitionsAsync() {
*
* @since 6.3
*/
public void suspendTopologyRefresh() {
topologyRefreshScheduler.suspendTopologyRefresh();
public CompletableFuture<Void> suspendTopologyRefresh() {
return topologyRefreshScheduler.suspendTopologyRefresh();
}

/**
Expand Down Expand Up @@ -1151,15 +1151,7 @@ public void setPartitions(Partitions partitions) {
@Override
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {

suspendTopologyRefresh();

CompletableFuture<Void> topologyRefreshFuture = topologyRefreshScheduler.getTopologyRefreshCompletionFuture();

return topologyRefreshFuture.thenCompose(ignore -> super.shutdownAsync(quietPeriod, timeout, timeUnit))
.exceptionally(ex -> {
System.err.println("Error during topology refresh or shutdown: " + ex.getMessage());
return null;
});
return suspendTopologyRefresh().thenCompose(voidResult -> super.shutdownAsync(quietPeriod, timeout, timeUnit));
}

// -------------------------------------------------------------------------
Expand Down

0 comments on commit 2f38c3c

Please sign in to comment.