diff --git a/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java b/src/main/java/io/lettuce/core/context/AutoBatchFlushEndPointContext.java similarity index 61% rename from src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java rename to src/main/java/io/lettuce/core/context/AutoBatchFlushEndPointContext.java index 1939c62a55..32dc7fd241 100644 --- a/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java +++ b/src/main/java/io/lettuce/core/context/AutoBatchFlushEndPointContext.java @@ -15,9 +15,9 @@ /** * @author chenxiaofan */ -public class BatchFlushEndPointContext { +public class AutoBatchFlushEndPointContext { - private static final InternalLogger logger = InternalLoggerFactory.getInstance(BatchFlushEndPointContext.class); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutoBatchFlushEndPointContext.class); public static class HasOngoingSendLoop { @@ -51,14 +51,14 @@ public void exit() { } - BatchFlushEndPointContext() { + AutoBatchFlushEndPointContext() { } /** - * Tasks that failed to send (probably due to connection errors) + * Commands that failed to send (probably due to connection errors) */ @Nullable - Deque> retryableFailedToSendTasks = null; + Deque> retryableFailedToSendCommands = null; Throwable firstDiscontinueReason = null; @@ -66,11 +66,11 @@ public Throwable getFirstDiscontinueReason() { return firstDiscontinueReason; } - private int flyingTaskNum; + private int flyingCmdNum; @SuppressWarnings("unused") - public int getFlyingTaskNum() { - return flyingTaskNum; + public int getFlyingCmdNum() { + return flyingCmdNum; } private int total = 0; @@ -83,47 +83,48 @@ public int getTotal() { public void add(int n) { this.total += n; - this.flyingTaskNum += n; + this.flyingCmdNum += n; } - public @Nullable Deque> getAndClearRetryableFailedToSendTasks() { - final Deque> old = this.retryableFailedToSendTasks; - // don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendTask() afterwards - this.retryableFailedToSendTasks = UnmodifiableDeque.emptyDeque(); + public @Nullable Deque> getAndClearRetryableFailedToSendCommands() { + final Deque> old = this.retryableFailedToSendCommands; + // don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendCommand() + // afterwards + this.retryableFailedToSendCommands = UnmodifiableDeque.emptyDeque(); return old; } public void done(int n) { - this.flyingTaskNum -= n; + this.flyingCmdNum -= n; } public boolean isDone() { - if (this.flyingTaskNum < 0) { - logger.error("[unexpected] flyingTaskNum < 0, flyingTaskNum: {}, total: {}", this.flyingTaskNum, this.total); + if (this.flyingCmdNum < 0) { + logger.error("[unexpected] flyingCmdNum < 0, flyingCmdNum: {}, total: {}", this.flyingCmdNum, this.total); return true; } - return this.flyingTaskNum == 0; + return this.flyingCmdNum == 0; } - public boolean hasRetryableFailedToSendTasks() { - return retryableFailedToSendTasks != null; + public boolean hasRetryableFailedToSendCommands() { + return retryableFailedToSendCommands != null; } /** - * @param retryableTask retryable task + * @param retryableCommand retryable command * @param cause fail reason - * @return true if this is the first retryable failed task + * @return true if this is the first retryable failed command */ - public boolean addRetryableFailedToSendTask(RedisCommand retryableTask, @Nonnull Throwable cause) { - if (retryableFailedToSendTasks == null) { - retryableFailedToSendTasks = new ArrayDeque<>(); - retryableFailedToSendTasks.add(retryableTask); + public boolean addRetryableFailedToSendCommand(RedisCommand retryableCommand, @Nonnull Throwable cause) { + if (retryableFailedToSendCommands == null) { + retryableFailedToSendCommands = new ArrayDeque<>(); + retryableFailedToSendCommands.add(retryableCommand); firstDiscontinueReason = cause; return true; } - retryableFailedToSendTasks.add(retryableTask); + retryableFailedToSendCommands.add(retryableCommand); return false; } diff --git a/src/main/java/io/lettuce/core/context/ConnectionContext.java b/src/main/java/io/lettuce/core/context/ConnectionContext.java index 796674eafe..74ce4e9534 100644 --- a/src/main/java/io/lettuce/core/context/ConnectionContext.java +++ b/src/main/java/io/lettuce/core/context/ConnectionContext.java @@ -67,11 +67,11 @@ public boolean isConnected() { public final State initialState; - public final BatchFlushEndPointContext batchFlushEndPointContext; + public final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext; public ConnectionContext(State initialState) { this.initialState = initialState; - this.batchFlushEndPointContext = new BatchFlushEndPointContext(); + this.autoBatchFlushEndPointContext = new AutoBatchFlushEndPointContext(); } /* below fields must be accessed by the event loop thread only */ @@ -92,10 +92,6 @@ public boolean isChannelInactiveEventFired() { private boolean channelQuiescent = false; - public boolean isChannelQuiescent() { - return channelQuiescent; - } - public boolean setChannelQuiescentOnce() { if (channelQuiescent) { return false; diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 8f5ee0f8fa..de6cf0f0e1 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -96,7 +96,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private final Endpoint endpoint; - private final boolean supportsBatchFlush; + private final boolean supportsAutoBatchFlush; private final ArrayDeque> stack = new ArrayDeque<>(); @@ -154,7 +154,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc this.clientOptions = clientOptions; this.clientResources = clientResources; this.endpoint = endpoint; - this.supportsBatchFlush = endpoint instanceof AutoBatchFlushEndpoint; + this.supportsAutoBatchFlush = endpoint instanceof AutoBatchFlushEndpoint; this.commandLatencyRecorder = clientResources.commandLatencyRecorder(); this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled(); this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE; @@ -377,9 +377,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { setState(LifecycleState.DEACTIVATING); endpoint.notifyChannelInactive(ctx.channel()); - Deque> batchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque(); - if (supportsBatchFlush) { - batchFlushRetryableDrainQueuedCommands = drainStack(); + Deque> autoBatchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque(); + if (supportsAutoBatchFlush) { + autoBatchFlushRetryableDrainQueuedCommands = drainStack(); } else { endpoint.notifyDrainQueuedCommands(this); } @@ -397,10 +397,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - if (supportsBatchFlush) { + if (supportsAutoBatchFlush) { // Needs decision of watchdog ((AutoBatchFlushEndpoint) endpoint).notifyChannelInactiveAfterWatchdogDecision(ctx.channel(), - batchFlushRetryableDrainQueuedCommands); + autoBatchFlushRetryableDrainQueuedCommands); } } diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index dc0f222084..82bb679bb6 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -84,7 +84,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private final String epid; - private final boolean useBatchFlushEndpoint; + private final boolean useAutoBatchFlushEndpoint; private final Endpoint endpoint; @@ -149,7 +149,7 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI); this.epid = endpoint.getId(); this.endpoint = endpoint; - this.useBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint; + this.useAutoBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint; Mono wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr) .onErrorResume(t -> { @@ -226,7 +226,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } doReconnectOnEndpointQuiescence = this::scheduleReconnect; - if (!useBatchFlushEndpoint) { + if (!useAutoBatchFlushEndpoint) { doReconnectOnEndpointQuiescence.run(); } // otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence @@ -307,7 +307,7 @@ private void notifyEndpointFailedToConnectIfNeeded() { } private void notifyEndpointFailedToConnectIfNeeded(Exception e) { - if (useBatchFlushEndpoint) { + if (useAutoBatchFlushEndpoint) { ((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e); } } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index a8f2494cb6..9075d4ca27 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -41,7 +41,7 @@ import io.lettuce.core.RedisException; import io.lettuce.core.api.push.PushListener; import io.lettuce.core.constant.DummyContextualChannelInstances; -import io.lettuce.core.context.BatchFlushEndPointContext; +import io.lettuce.core.context.AutoBatchFlushEndPointContext; import io.lettuce.core.context.ConnectionContext; import io.lettuce.core.datastructure.queue.offerfirst.UnboundedMpscOfferFirstQueue; import io.lettuce.core.datastructure.queue.offerfirst.impl.JcToolsUnboundedMpscOfferFirstQueue; @@ -612,7 +612,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { return; } - if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { + if (chan.context.autoBatchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get): // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls // Avg latency: 3.2956217278663s @@ -633,19 +633,20 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { private void loopSend(final ContextualChannel chan, boolean entered) { final ConnectionContext connectionContext = chan.context; - final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext; - if (connectionContext.isChannelInactiveEventFired() || batchFlushEndPointContext.hasRetryableFailedToSendTasks()) { + final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = connectionContext.autoBatchFlushEndPointContext; + if (connectionContext.isChannelInactiveEventFired() + || autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) { return; } LettuceAssert.assertState(channel == chan, "unexpected: channel not match but closeStatus == null"); - loopSend0(batchFlushEndPointContext, chan, writeSpinCount, entered); + loopSend0(autoBatchFlushEndPointContext, chan, writeSpinCount, entered); } - private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan, + private void loopSend0(final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext, final ContextualChannel chan, int remainingSpinnCount, final boolean entered) { do { - final int count = pollBatch(batchFlushEndPointContext, chan); + final int count = pollBatch(autoBatchFlushEndPointContext, chan); if (count < 0) { return; } @@ -664,15 +665,15 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext if (entered) { // The send loop will be triggered later when a new task is added, // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call. - batchFlushEndPointContext.hasOngoingSendLoop.exit(); + autoBatchFlushEndPointContext.hasOngoingSendLoop.exit(); // // Guarantee thread-safety: no dangling tasks in the queue. - loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false); + loopSend0(autoBatchFlushEndPointContext, chan, remainingSpinnCount, false); // chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100, // TimeUnit.NANOSECONDS); } } - private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext, ContextualChannel chan) { + private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext, ContextualChannel chan) { int count = 0; while (count < batchSize) { final Object o = this.taskQueue.poll(); @@ -695,10 +696,10 @@ private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext, } if (count > 0) { - batchFlushEndPointContext.add(count); + autoBatchFlushEndPointContext.add(count); channelFlush(chan); - if (batchFlushEndPointContext.hasRetryableFailedToSendTasks()) { + if (autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) { // Wait for onConnectionClose event() return -1; } @@ -711,12 +712,12 @@ private void trySetEndpointQuiescence(ContextualChannel chan) { final ConnectionContext connectionContext = chan.context; final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus(); - final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext; - if (batchFlushEndPointContext.isDone() && closeStatus != null) { + final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = connectionContext.autoBatchFlushEndPointContext; + if (autoBatchFlushEndPointContext.isDone() && closeStatus != null) { if (closeStatus.isWillReconnect()) { - onWillReconnect(closeStatus, batchFlushEndPointContext); + onWillReconnect(closeStatus, autoBatchFlushEndPointContext); } else { - onWontReconnect(closeStatus, batchFlushEndPointContext); + onWontReconnect(closeStatus, autoBatchFlushEndPointContext); } if (chan.context.setChannelQuiescentOnce()) { @@ -744,15 +745,15 @@ private void onEndpointQuiescence() { } private void onWillReconnect(@Nonnull final ConnectionContext.CloseStatus closeStatus, - final BatchFlushEndPointContext batchFlushEndPointContext) { - final @Nullable Deque> retryableFailedToSendTasks = batchFlushEndPointContext - .getAndClearRetryableFailedToSendTasks(); + final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext) { + final @Nullable Deque> retryableFailedToSendTasks = autoBatchFlushEndPointContext + .getAndClearRetryableFailedToSendCommands(); if (retryableFailedToSendTasks != null) { // Save retryable failed tasks logger.info( "[onWillReconnect][{}] compensate {} retryableFailedToSendTasks (write failure) for retrying on reconnecting, first write error: {}", logPrefix(), retryableFailedToSendTasks.size(), - batchFlushEndPointContext.getFirstDiscontinueReason().getMessage()); + autoBatchFlushEndPointContext.getFirstDiscontinueReason().getMessage()); offerFirstAll(retryableFailedToSendTasks); } @@ -773,15 +774,15 @@ private void onWillReconnect(@Nonnull final ConnectionContext.CloseStatus closeS } private void onWontReconnect(@Nonnull final ConnectionContext.CloseStatus closeStatus, - final BatchFlushEndPointContext batchFlushEndPointContext) { + final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext) { // No need to use syncAfterTerminated() since we are already in the event loop. if (isClosed()) { onEndpointClosed(closeStatus.getAndClearRetryablePendingCommands(), - batchFlushEndPointContext.getAndClearRetryableFailedToSendTasks()); + autoBatchFlushEndPointContext.getAndClearRetryableFailedToSendCommands()); } else { fulfillCommands("onConnectionClose called and won't reconnect", it -> it.completeExceptionally(closeStatus.getErr()), closeStatus.getAndClearRetryablePendingCommands(), - batchFlushEndPointContext.getAndClearRetryableFailedToSendTasks()); + autoBatchFlushEndPointContext.getAndClearRetryableFailedToSendCommands()); } } @@ -978,7 +979,7 @@ protected WrittenToChannel newObject(Recycler.Handle handle) { private DefaultAutoBatchFlushEndpoint endpoint; - private RedisCommand command; + private RedisCommand cmd; private ContextualChannel chan; @@ -998,22 +999,22 @@ static WrittenToChannel newInstance(DefaultAutoBatchFlushEndpoint endpoint, Cont entry.endpoint = endpoint; entry.chan = chan; - entry.command = command; + entry.cmd = command; return entry; } @Override public void operationComplete(Future future) { - final BatchFlushEndPointContext batchFlushEndPointContext = chan.context.batchFlushEndPointContext; + final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext = chan.context.autoBatchFlushEndPointContext; try { QUEUE_SIZE.decrementAndGet(endpoint); - batchFlushEndPointContext.done(1); + autoBatchFlushEndPointContext.done(1); - final Throwable retryableErr = checkSendResult(future, chan, command); - if (retryableErr != null && batchFlushEndPointContext.addRetryableFailedToSendTask(command, retryableErr)) { + final Throwable retryableErr = checkSendResult(future); + if (retryableErr != null && autoBatchFlushEndPointContext.addRetryableFailedToSendCommand(cmd, retryableErr)) { // Close connection on first transient write failure - internalCloseConnectionIfNeeded(chan, retryableErr); + internalCloseConnectionIfNeeded(retryableErr); } endpoint.trySetEndpointQuiescence(chan); @@ -1026,21 +1027,18 @@ public void operationComplete(Future future) { * Check write result. * * @param sendFuture The future to check. - * @param contextualChannel The channel instance associated with the future. - * @param cmd The task. * @return The cause of the failure if is a retryable failed task, otherwise null. */ - private Throwable checkSendResult(Future sendFuture, ContextualChannel contextualChannel, - RedisCommand cmd) { + private Throwable checkSendResult(Future sendFuture) { if (cmd.isDone()) { ExceptionUtils.logUnexpectedDone(logger, endpoint.logPrefix(), cmd); return null; } - final ConnectionContext.CloseStatus closeStatus = contextualChannel.context.getCloseStatus(); + final ConnectionContext.CloseStatus closeStatus = chan.context.getCloseStatus(); if (closeStatus != null) { logger.warn("[checkSendResult][interesting][{}] callback called after onClose() event, close status: {}", - endpoint.logPrefix(), contextualChannel.context.getCloseStatus()); + endpoint.logPrefix(), chan.context.getCloseStatus()); final Throwable err = sendFuture.isSuccess() ? closeStatus.getErr() : sendFuture.cause(); if (!closeStatus.isWillReconnect() || shouldNotRetry(err, cmd)) { cmd.completeExceptionally(err); @@ -1056,7 +1054,7 @@ private Throwable checkSendResult(Future sendFuture, ContextualChannel contex final Throwable cause = sendFuture.cause(); ExceptionUtils.maybeLogSendError(logger, cause); - if (shouldNotRetry(cause, cmd)) { + if (shouldNotRetry(cause)) { cmd.completeExceptionally(cause); return null; } @@ -1064,22 +1062,22 @@ private Throwable checkSendResult(Future sendFuture, ContextualChannel contex return cause; } - private boolean shouldNotRetry(Throwable cause, RedisCommand cmd) { + private boolean shouldNotRetry(Throwable cause) { return endpoint.reliability == Reliability.AT_MOST_ONCE || ActivationCommand.isActivationCommand(cmd) || ExceptionUtils.oneOf(cause, SHOULD_NOT_RETRY_EXCEPTION_TYPES); } - private void internalCloseConnectionIfNeeded(ContextualChannel toCloseChan, Throwable reason) { - if (toCloseChan.context.isChannelInactiveEventFired() || !toCloseChan.isActive()) { + private void internalCloseConnectionIfNeeded(Throwable reason) { + if (chan.context.isChannelInactiveEventFired() || !chan.isActive()) { return; } logger.error( "[internalCloseConnectionIfNeeded][interesting][{}] close the connection due to write error, reason: '{}'", endpoint.logPrefix(), reason.getMessage(), reason); - toCloseChan.eventLoop().schedule(() -> { - if (toCloseChan.isActive()) { - toCloseChan.close(); + chan.eventLoop().schedule(() -> { + if (chan.isActive()) { + chan.close(); } }, 1, TimeUnit.SECONDS); } @@ -1087,7 +1085,7 @@ private void internalCloseConnectionIfNeeded(ContextualChannel toCloseChan, Thro private void recycle() { this.endpoint = null; this.chan = null; - this.command = null; + this.cmd = null; handle.recycle(this); } diff --git a/src/main/java/io/lettuce/core/utils/ExceptionUtils.java b/src/main/java/io/lettuce/core/utils/ExceptionUtils.java index 4072d81b3e..49ed6e548c 100644 --- a/src/main/java/io/lettuce/core/utils/ExceptionUtils.java +++ b/src/main/java/io/lettuce/core/utils/ExceptionUtils.java @@ -34,18 +34,6 @@ public static void maybeLogSendError(InternalLogger logger, Throwable cause) { } } - public static T castTo(Throwable throwable, Class clazz, Function supplier) { - if (clazz.isInstance(throwable)) { - return clazz.cast(throwable); - } - return supplier.apply(throwable); - } - - public static T clearStackTrace(T throwable) { - throwable.setStackTrace(new StackTraceElement[0]); - return throwable; - } - /** * Returns whether the throwable is one of the exception types or one of the cause in the cause chain is one of the * exception types