From 9cf841e6a5989a0bb06bc2e9768ddd0dee939b49 Mon Sep 17 00:00:00 2001 From: lyl2008dsg Date: Wed, 25 Dec 2024 15:36:53 +0800 Subject: [PATCH 1/6] fast fail when channel is null --- .../core/rpc/netty/AbstractNettyRemotingClient.java | 2 +- .../seata/core/rpc/netty/RmNettyClientTest.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index bbbab50faa5..8618c3030b4 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -200,7 +200,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti public void sendAsyncRequest(Channel channel, Object msg) { if (channel == null) { LOGGER.warn("sendAsyncRequest nothing, caused by null channel."); - return; + throw new FrameworkException(new Throwable("throw"), "frameworkException", FrameworkErrorCode.ChannelIsNotWritable); } RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java index 1709246e884..08151d58219 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java @@ -26,6 +26,7 @@ import org.apache.seata.config.ConfigurationCache; import org.apache.seata.core.model.Resource; import org.apache.seata.core.model.ResourceManager; +import org.apache.seata.core.protocol.HeartbeatMessage; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -37,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * Rm RPC client test. @@ -96,4 +98,13 @@ private AtomicBoolean getInitializeStatus(final RmNettyRemotingClient rmNettyRem throw new RuntimeException(ex.getMessage()); } } + + @Test + public void testSendAsyncRequestWithNullChannelLogsWarning() { + RmNettyRemotingClient remotingClient = RmNettyRemotingClient.getInstance(); + Object message = HeartbeatMessage.PING; + assertThrows(FrameworkException.class, () -> { + remotingClient.sendAsyncRequest(null, message); + }); + } } From 01d776dd5ad786e26a02796ccc229f83826f693d Mon Sep 17 00:00:00 2001 From: lyl2008dsg Date: Wed, 25 Dec 2024 15:46:39 +0800 Subject: [PATCH 2/6] add optimize changes --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + 2 files changed, 2 insertions(+) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 4f725b4c96b..73676753a75 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -14,6 +14,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6828](https://github.com/apache/incubator-seata/pull/6828)] spring boot compatible with file.conf and registry.conf - [[#7012](https://github.com/apache/incubator-seata/pull/7012)] When the number of primary keys exceeds 1000, use union to concatenate the SQL +- [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index dc2be119679..7e3d9a5938c 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -14,6 +14,7 @@ - [[#6828](https://github.com/apache/incubator-seata/pull/6828)] seata-spring-boot-starter兼容file.conf和registry.conf - [[#7012](https://github.com/apache/incubator-seata/pull/7012)] 当主键超过1000个时,使用union拼接sql,可以使用索引 +- [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待 ### security: From 89a1ceb4fdc055359e7cbcbd298bb3c4daae4251 Mon Sep 17 00:00:00 2001 From: lyl2008dsg Date: Sat, 4 Jan 2025 19:45:26 +0800 Subject: [PATCH 3/6] add failFuturesForChannel --- .../netty/AbstractNettyRemotingClient.java | 73 ++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 8618c3030b4..257dbc7cab0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -36,6 +37,7 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutorGroup; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; @@ -91,6 +93,10 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting protected final Map childToParentMap = new ConcurrentHashMap<>(); + // 辅助映射:Channel -> Set + protected final ConcurrentHashMap> channelToRequestIds = new ConcurrentHashMap<>(); + + /** * When batch sending is enabled, the message will be stored to basketMap * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} @@ -181,6 +187,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException { } } else { Channel channel = clientChannelManager.acquireChannel(serverAddress); + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); return super.sendSync(channel, rpcMessage, timeoutMillis); } @@ -193,6 +200,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti return null; } RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); return super.sendSync(channel, rpcMessage, this.getRpcRequestTimeout()); } @@ -214,6 +222,7 @@ public void sendAsyncRequest(Channel channel, Object msg) { childToParentMap.put(msgId, parentId); } } + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); } super.sendAsync(channel, rpcMessage); } @@ -222,6 +231,7 @@ public void sendAsyncRequest(Channel channel, Object msg) { public void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) { RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE); Channel channel = clientChannelManager.acquireChannel(serverAddress); + channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId()); super.sendAsync(channel, rpcMsg); } @@ -423,7 +433,9 @@ class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RpcMessage) { - processMessage(ctx, (RpcMessage)msg); + RpcMessage rpcMessage = (RpcMessage)msg; + processMessage(ctx, rpcMessage); + removeRequestIdFromChannel(ctx.channel(), rpcMessage.getId()); } else { LOGGER.error("rpcMessage type error"); } @@ -448,6 +460,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("channel inactive: {}", ctx.channel()); } clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress())); + failFuturesForChannel(ctx.channel(), new FrameworkException("Channel inactive")); super.channelInactive(ctx); } @@ -489,6 +502,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (LOGGER.isInfoEnabled()) { LOGGER.info("remove exception rm channel:{}", ctx.channel()); } + failFuturesForChannel(ctx.channel(), cause); super.exceptionCaught(ctx, cause); } @@ -501,4 +515,61 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep } } + /** + * 遍历 futures 并标记与指定 Channel 关联的所有 MessageFuture 为失败 + * + * @param channel 发生断开或异常的 Channel + * @param cause 失败原因 + */ + private void failFuturesForChannel(Channel channel, Throwable cause) { + Set requestIds = channelToRequestIds.remove(channel); + if (requestIds != null) { + for (Integer requestId : requestIds) { + MessageFuture future = futures.remove(requestId); + if (future != null) { + future.setResultMessage(cause); + } + } + } + } + + /** + * 从 channelToRequestIds 映射中移除指定 Channel 和 requestId 的关联关系。 + * 如果该 Channel 不再关联任何 requestId,则从映射中移除该 Channel。 + * + * @param channel 发生断开或异常的 Channel + * @param requestId 要移除的 requestId + */ + private void removeRequestIdFromChannel(Channel channel, Integer requestId) { + // 参数 null 检查 + if (channel == null) { + if (requestId != null) { + LOGGER.warn("Attempted to remove requestId {} from a null channel.", requestId); + } else { + LOGGER.warn("Attempted to remove a null requestId from a null channel."); + } + return; + } + + if (requestId == null) { + LOGGER.warn("Attempted to remove a null requestId from channel {}.", channel); + return; + } + + // 使用 computeIfPresent 确保操作的原子性 + channelToRequestIds.computeIfPresent(channel, (ch, requestIds) -> { + boolean removed = requestIds.remove(requestId); + if (removed) { + LOGGER.debug("Removed requestId {} from channel {}.", requestId, ch); + } else { + LOGGER.warn("Attempted to remove non-existing requestId {} from channel {}.", requestId, ch); + } + + if (requestIds.isEmpty()) { + LOGGER.debug("No more requestIds associated with channel {}. Channel removed from mapping.", ch); + return null; // 返回 null 表示移除该 Channel 的映射 + } + return requestIds; + }); + } } From c3c81faf5f15a4bd5c8364e20258e58f82e51e13 Mon Sep 17 00:00:00 2001 From: lyl2008dsg Date: Sat, 4 Jan 2025 20:03:58 +0800 Subject: [PATCH 4/6] add change logs --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + 2 files changed, 2 insertions(+) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index b1afbb58d16..a75a9b71caa 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -18,6 +18,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null - [[#7089](https://github.com/apache/incubator-seata/pull/7089)] support instance registration to the registry center - [[#7093](https://github.com/apache/incubator-seata/pull/7093)] add a test workflow for JDK 21 +- [[#7095](https://github.com/apache/incubator-seata/pull/7095)] support fast fail when channel disconnect after message sent for RemotingClient ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 0cf918bf87a..ebddf90da1b 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -18,6 +18,7 @@ - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待 - [[#7089](https://github.com/apache/incubator-seata/pull/7089)] 新增instance注册到注册中心的接口 - [[#7093](https://github.com/apache/incubator-seata/pull/7093)] 增加jdk21的工作流测试 +- [[#7095](https://github.com/apache/incubator-seata/pull/7095)] 当消息发送后通道中断,快速失败,以便于减少不必要的等待 ### security: From f738f3ed475448784be276cb09089ab7fe676b21 Mon Sep 17 00:00:00 2001 From: lyl2008dsg Date: Sat, 4 Jan 2025 20:08:11 +0800 Subject: [PATCH 5/6] add comment --- .../netty/AbstractNettyRemotingClient.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 257dbc7cab0..f65336ad8a5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -37,7 +37,6 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutorGroup; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; @@ -93,10 +92,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting protected final Map childToParentMap = new ConcurrentHashMap<>(); - // 辅助映射:Channel -> Set + // Channel -> Set protected final ConcurrentHashMap> channelToRequestIds = new ConcurrentHashMap<>(); - /** * When batch sending is enabled, the message will be stored to basketMap * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} @@ -516,10 +514,10 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep } /** - * 遍历 futures 并标记与指定 Channel 关联的所有 MessageFuture 为失败 + * Iterates over futures and marks all MessageFutures associated with the specified Channel as failed. * - * @param channel 发生断开或异常的 Channel - * @param cause 失败原因 + * @param channel The Channel that has been disconnected or encountered an exception. + * @param cause The reason for the failure. */ private void failFuturesForChannel(Channel channel, Throwable cause) { Set requestIds = channelToRequestIds.remove(channel); @@ -534,14 +532,13 @@ private void failFuturesForChannel(Channel channel, Throwable cause) { } /** - * 从 channelToRequestIds 映射中移除指定 Channel 和 requestId 的关联关系。 - * 如果该 Channel 不再关联任何 requestId,则从映射中移除该 Channel。 + * Removes the association between the specified Channel and requestId from the channelToRequestIds mapping. + * If the Channel no longer has any associated requestId, the Channel will be removed from the mapping. * - * @param channel 发生断开或异常的 Channel - * @param requestId 要移除的 requestId + * @param channel The Channel that has been disconnected or encountered an exception. + * @param requestId The requestId to be removed. */ private void removeRequestIdFromChannel(Channel channel, Integer requestId) { - // 参数 null 检查 if (channel == null) { if (requestId != null) { LOGGER.warn("Attempted to remove requestId {} from a null channel.", requestId); @@ -556,7 +553,6 @@ private void removeRequestIdFromChannel(Channel channel, Integer requestId) { return; } - // 使用 computeIfPresent 确保操作的原子性 channelToRequestIds.computeIfPresent(channel, (ch, requestIds) -> { boolean removed = requestIds.remove(requestId); if (removed) { @@ -567,7 +563,7 @@ private void removeRequestIdFromChannel(Channel channel, Integer requestId) { if (requestIds.isEmpty()) { LOGGER.debug("No more requestIds associated with channel {}. Channel removed from mapping.", ch); - return null; // 返回 null 表示移除该 Channel 的映射 + return null; } return requestIds; }); From 4ba162106e1277788890264ceb1680accbd8f769 Mon Sep 17 00:00:00 2001 From: lyl2008dsg Date: Sat, 4 Jan 2025 20:11:01 +0800 Subject: [PATCH 6/6] add ChannelIsNotWritable --- .../seata/core/rpc/netty/AbstractNettyRemotingClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index f65336ad8a5..db671cb4ce2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -458,7 +458,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("channel inactive: {}", ctx.channel()); } clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress())); - failFuturesForChannel(ctx.channel(), new FrameworkException("Channel inactive")); + failFuturesForChannel(ctx.channel(), new FrameworkException(FrameworkErrorCode.ChannelIsNotWritable)); super.channelInactive(ctx); }