Skip to content

Commit

Permalink
add failFuturesForChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
lyl2008dsg committed Jan 4, 2025
1 parent 01d776d commit 89a1ceb
Showing 1 changed file with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -36,6 +37,7 @@
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.exception.FrameworkException;
Expand Down Expand Up @@ -91,6 +93,10 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting

protected final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();

// 辅助映射:Channel -> Set<msgId>
protected final ConcurrentHashMap<Channel, Set<Integer>> channelToRequestIds = new ConcurrentHashMap<>();


/**
* When batch sending is enabled, the message will be stored to basketMap
* Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable}
Expand Down Expand Up @@ -181,6 +187,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
}
} else {
Channel channel = clientChannelManager.acquireChannel(serverAddress);
channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId());
return super.sendSync(channel, rpcMessage, timeoutMillis);
}

Expand All @@ -193,6 +200,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti
return null;
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId());
return super.sendSync(channel, rpcMessage, this.getRpcRequestTimeout());
}

Expand All @@ -214,6 +222,7 @@ public void sendAsyncRequest(Channel channel, Object msg) {
childToParentMap.put(msgId, parentId);
}
}
channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId());
}
super.sendAsync(channel, rpcMessage);
}
Expand All @@ -222,6 +231,7 @@ public void sendAsyncRequest(Channel channel, Object msg) {
public void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) {
RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE);
Channel channel = clientChannelManager.acquireChannel(serverAddress);
channelToRequestIds.computeIfAbsent(channel, ch -> ConcurrentHashMap.newKeySet()).add(rpcMessage.getId());
super.sendAsync(channel, rpcMsg);
}

Expand Down Expand Up @@ -423,7 +433,9 @@ class ClientHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof RpcMessage) {
processMessage(ctx, (RpcMessage)msg);
RpcMessage rpcMessage = (RpcMessage)msg;
processMessage(ctx, rpcMessage);
removeRequestIdFromChannel(ctx.channel(), rpcMessage.getId());
} else {
LOGGER.error("rpcMessage type error");
}
Expand All @@ -448,6 +460,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("channel inactive: {}", ctx.channel());
}
clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));
failFuturesForChannel(ctx.channel(), new FrameworkException("Channel inactive"));
super.channelInactive(ctx);
}

Expand Down Expand Up @@ -489,6 +502,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
if (LOGGER.isInfoEnabled()) {
LOGGER.info("remove exception rm channel:{}", ctx.channel());
}
failFuturesForChannel(ctx.channel(), cause);
super.exceptionCaught(ctx, cause);
}

Expand All @@ -501,4 +515,61 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep
}
}

/**
* 遍历 futures 并标记与指定 Channel 关联的所有 MessageFuture 为失败
*
* @param channel 发生断开或异常的 Channel
* @param cause 失败原因
*/
private void failFuturesForChannel(Channel channel, Throwable cause) {
Set<Integer> requestIds = channelToRequestIds.remove(channel);
if (requestIds != null) {
for (Integer requestId : requestIds) {
MessageFuture future = futures.remove(requestId);
if (future != null) {
future.setResultMessage(cause);
}
}
}
}

/**
* 从 channelToRequestIds 映射中移除指定 Channel 和 requestId 的关联关系。
* 如果该 Channel 不再关联任何 requestId,则从映射中移除该 Channel。
*
* @param channel 发生断开或异常的 Channel
* @param requestId 要移除的 requestId
*/
private void removeRequestIdFromChannel(Channel channel, Integer requestId) {
// 参数 null 检查
if (channel == null) {
if (requestId != null) {
LOGGER.warn("Attempted to remove requestId {} from a null channel.", requestId);
} else {
LOGGER.warn("Attempted to remove a null requestId from a null channel.");
}
return;
}

if (requestId == null) {
LOGGER.warn("Attempted to remove a null requestId from channel {}.", channel);
return;
}

// 使用 computeIfPresent 确保操作的原子性
channelToRequestIds.computeIfPresent(channel, (ch, requestIds) -> {
boolean removed = requestIds.remove(requestId);
if (removed) {
LOGGER.debug("Removed requestId {} from channel {}.", requestId, ch);
} else {
LOGGER.warn("Attempted to remove non-existing requestId {} from channel {}.", requestId, ch);
}

if (requestIds.isEmpty()) {
LOGGER.debug("No more requestIds associated with channel {}. Channel removed from mapping.", ch);
return null; // 返回 null 表示移除该 Channel 的映射
}
return requestIds;
});
}
}

0 comments on commit 89a1ceb

Please sign in to comment.