Skip to content

Commit

Permalink
optimize: RM TM startup connect server fail fast (#6004)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbxyyx authored Jan 4, 2024
1 parent 6011c68 commit 2363715
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 41 deletions.
1 change: 0 additions & 1 deletion changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ The version is updated as follows:
- [[#6002](https://github.com/seata/seata/pull/6002)] remove fst serialization
- [[#6045](https://github.com/seata/seata/pull/6045)] optimize derivative product check base on mysql


### security:
- [[#5642](https://github.com/seata/seata/pull/5642)] add Hessian Serializer WhiteDenyList
- [[#5694](https://github.com/seata/seata/pull/5694)] fix several node.js security vulnerabilities
Expand Down
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6201](https://github.com/apache/incubator-seata/pull/6201)] restore required_status_checks kept to remove context validation
- [[#6218](https://github.com/apache/incubator-seata/pull/6218)] remove Seata-Docker link
- [[#6227](https://github.com/apache/incubator-seata/pull/6227)] validate that the primary key is free of illegal characters
- [[#6004](https://github.com/seata/seata/pull/6004)] optimize RM and TM startup to fail fast when connecting to the server fails

### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities
Expand Down Expand Up @@ -80,5 +81,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [PeppaO](https://github.com/PeppaO)
- [AlbumenJ](https://github.com/AlbumenJ)
- [dreamskyvision](https://github.com/dreamskyvision)
- [jsbxyyx](https://github.com/jsbxyyx)

Also, we receive many valuable issues, questions and advice from our community. Thanks to you all.
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
- [[#6201](https://github.com/apache/incubator-seata/pull/6201)] 恢复required_status_checks但去除context校验
- [[#6218](https://github.com/apache/incubator-seata/pull/6218)] 移除Seata-Docker链接
- [[#6227](https://github.com/apache/incubator-seata/pull/6227)] 校验pk中不含逗号
- [[#6004](https://github.com/seata/seata/pull/6004)] 优化RM,TM连接server快速失败

### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞
Expand Down Expand Up @@ -79,5 +80,6 @@
- [PeppaO](https://github.com/PeppaO)
- [AlbumenJ](https://github.com/AlbumenJ)
- [dreamskyvision](https://github.com/dreamskyvision)
- [jsbxyyx](https://github.com/jsbxyyx)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
10 changes: 10 additions & 0 deletions common/src/main/java/io/seata/common/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,16 @@ public interface ConfigurationKeys {
*/
String ENABLE_TM_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableTmClientBatchSendRequest";

/**
* The constant ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST

This comment has been minimized.

Copy link
@lcxw

lcxw Jan 4, 2024

maybe the comment is The constant ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST

*/
String ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST = TRANSPORT_PREFIX + "enableTmClientChannelCheckFailFast";

/**
* The constant ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST
*/
String ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST = TRANSPORT_PREFIX + "enableRmClientChannelCheckFailFast";

/**
* The constant ENABLE_RM_CLIENT_BATCH_SEND_REQUEST
*/
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public interface DefaultValues {
boolean DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST = true;
boolean DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE = false;

boolean DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST = true;

String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss";
String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker";
String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler";
Expand Down
28 changes: 28 additions & 0 deletions common/src/main/java/io/seata/common/util/StringUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -399,4 +400,31 @@ public static boolean hasUpperCase(String str) {
return false;
}

/**
 * Concatenates the string form of every element produced by {@code iterator}, placing
 * {@code separator} between consecutive elements.
 *
 * <p>A {@code null} element contributes no text, although the separator next to it is still
 * written. A {@code null} separator is treated as "no separator".
 *
 * @param iterator  supplies the elements to join; may be {@code null}
 * @param separator text inserted between elements; may be {@code null}
 * @return {@code null} when {@code iterator} is {@code null}, {@code EMPTY} when it yields no
 *         elements, otherwise the joined text
 */
public static String join(Iterator<?> iterator, String separator) {
    if (iterator == null) {
        return null;
    }
    if (!iterator.hasNext()) {
        return EMPTY;
    }
    Object first = iterator.next();
    if (!iterator.hasNext()) {
        // Single element: no separator is needed, so skip the builder entirely.
        return first == null ? "" : first.toString();
    }
    StringBuilder builder = new StringBuilder(256);
    if (first != null) {
        builder.append(first);
    }
    while (iterator.hasNext()) {
        if (separator != null) {
            builder.append(separator);
        }
        Object obj = iterator.next();
        if (obj != null) {
            builder.append(obj);
        }
    }
    return builder.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
public abstract class AbstractNettyRemoting implements Disposable {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemoting.class);

/**
* The Timer executor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting

@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
timerExecutor.scheduleAtFixedRate(() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,73 @@ void destroyChannel(String serverAddress, Channel channel) {
}
}

/**
* Reconnect to remote server of current transaction service group.
* <p>
* Delegates to {@code doReconnect(transactionServiceGroup, false)}, i.e. with fail-fast
* disabled: the lookup and connection failures handled there are logged instead of being
* rethrown as fail-fast exceptions.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
doReconnect(transactionServiceGroup, false);
}

/**
* Init reconnect to remote server of current transaction service group.
* <p>
* Delegates to {@code doReconnect(transactionServiceGroup, failFast)} so that the caller
* decides whether a failed lookup or connection attempt is raised or only logged.
*
* @param transactionServiceGroup transaction service group
* @param failFast                {@code true} to have failures thrown to the caller,
*                                {@code false} to have them logged only
*/
void initReconnect(String transactionServiceGroup, boolean failFast) {
doReconnect(transactionServiceGroup, failFast);
}

/**
 * Looks up the available servers of the given transaction service group and (re)connects
 * to them.
 *
 * <p>With {@code failFast} set to {@code false}, the failures handled here (server lookup
 * error, missing cluster mapping, empty server list, connection error) are logged and the
 * method returns normally. With {@code failFast} set to {@code true}, the same failures are
 * raised to the caller instead.
 *
 * @param transactionServiceGroup transaction service group
 * @param failFast                {@code true} to throw on failure, {@code false} to log only
 * @throws FrameworkException in fail-fast mode when the server list cannot be obtained, no
 *                            cluster name is mapped to the group, or no server is available;
 *                            exceptions raised while connecting are rethrown unchanged
 */
void doReconnect(String transactionServiceGroup, boolean failFast) {
    List<String> availList;
    try {
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        if (failFast) {
            // Keep the lookup failure as the cause so a startup abort stays diagnosable.
            throw new FrameworkException(e, "Failed to get available servers");
        }
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                transactionServiceGroup);
            throwFailFastException(failFast, "can not get cluster name in registry config.");
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        // Name the cluster in the exception so the message is usable without the log line.
        throwFailFastException(failFast, "no available service found in cluster '" + clusterName + "'.");
        return;
    }
    try {
        doReconnect(availList, transactionServiceGroup);
    } catch (Exception e) {
        if (failFast) {
            throw e;
        }
        LOGGER.error("connect server failed. {}", e.getMessage(), e);
    }
}

/**
* Reconnect to remote server of current transaction service group.
*
* @param availList avail list
* @param transactionServiceGroup transaction service group
*/
void reconnect(List<String> availList, String transactionServiceGroup) {
void doReconnect(List<String> availList, String transactionServiceGroup) {
Set<String> channelAddress = new HashSet<>(availList.size());
Map<String, Exception> failedMap = new HashMap<>();
try {
Expand All @@ -178,12 +238,17 @@ void reconnect(List<String> availList, String transactionServiceGroup) {
}
if (failedMap.size() > 0) {
if (LOGGER.isInfoEnabled()) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), failedMap.keySet(), failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
failedMap.keySet(),
failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
} else if (LOGGER.isDebugEnabled()) {
failedMap.forEach((key, value) -> {
LOGGER.error("{} can not connect to {} cause:{} trace information:{}", FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
LOGGER.error("{} can not connect to {} cause:{} trace information:{}",
FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
});
}
String invalidAddress = StringUtils.join(failedMap.keySet().iterator(), ", ");
throw new FrameworkException("can not connect to [" + invalidAddress + "]");
}
} finally {
if (CollectionUtils.isNotEmpty(channelAddress)) {
Expand All @@ -199,38 +264,6 @@ void reconnect(List<String> availList, String transactionServiceGroup) {
}
}

/**
* Reconnect to remote server of current transaction service group.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
List<String> availList;
try {
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);

if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
return;
}

if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
return;
}
reconnect(availList, transactionServiceGroup);
}

void invalidateObject(final String serverAddress, final Channel channel) throws Exception {
nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);
}
Expand Down Expand Up @@ -297,5 +330,12 @@ private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {
}
return null;
}

/**
 * Throws a {@link FrameworkException} carrying {@code message} when fail-fast is enabled;
 * does nothing otherwise.
 *
 * @param failFast whether the failure should be raised to the caller
 * @param message  the exception message
 */
private void throwFailFastException(boolean failFast, String message) {
    if (!failFast) {
        return;
    }
    throw new FrameworkException(message);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -455,4 +455,5 @@ public String getRmDispatchThreadPrefix() {
public static boolean isEnableClientBatchSendRequest() {
// Exposes the static ENABLE_CLIENT_BATCH_SEND_REQUEST flag unchanged.
return ENABLE_CLIENT_BATCH_SEND_REQUEST;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ public void init() {
if (resourceManager != null
&& !resourceManager.getManagedResources().isEmpty()
&& StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
boolean failFast = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
}
}
}
Expand Down Expand Up @@ -214,7 +217,10 @@ public void registerResource(String resourceGroupId, String resourceId) {
}

if (getClientChannelManager().getChannels().isEmpty()) {
getClientChannelManager().reconnect(transactionServiceGroup);
boolean failFast = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
return;
}
synchronized (getClientChannelManager().getChannels()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ private String getExtraData() {
}

/**
 * Performs the initial connection attempt for this client's transaction service group.
 *
 * <p>The fail-fast flag is read from {@code ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST}
 * (falling back to {@code DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST}) and handed to the
 * channel manager's {@code initReconnect}.
 */
private void initConnection() {
    final boolean tmFailFast = ConfigurationFactory.getInstance()
        .getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
            DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
    getClientChannelManager().initReconnect(transactionServiceGroup, tmFailFast);
}

}
50 changes: 50 additions & 0 deletions core/src/test/java/io/seata/core/rpc/netty/RmNettyClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,27 @@
*/
package io.seata.core.rpc.netty;

import io.seata.common.ConfigurationKeys;
import io.seata.common.DefaultValues;
import io.seata.common.exception.FrameworkException;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationChangeEvent;
import io.seata.config.ConfigurationChangeListener;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.Resource;
import io.seata.core.model.ResourceManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -31,6 +48,20 @@
*/
class RmNettyClientTest {

Logger logger = LoggerFactory.getLogger(getClass());

/**
* Runs once before the tests of this class: destroys the RM client returned by
* {@code RmNettyRemotingClient.getInstance()} and sets the RM fail-fast system property
* to "true".
*/
@BeforeAll
public static void beforeAll() {
RmNettyRemotingClient.getInstance().destroy();
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true");
}

/**
* Runs once after the tests of this class: destroys the RM client returned by
* {@code RmNettyRemotingClient.getInstance()} and sets the RM fail-fast system property
* to "false".
*/
@AfterAll
public static void afterAll() {
RmNettyRemotingClient.getInstance().destroy();
// NOTE(review): the property is set to "false" rather than cleared — presumably so that
// later tests in the same JVM do not fail fast without a running server; confirm.
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false");
}

@Test
public void assertGetInstanceAfterDestroy() {
RmNettyRemotingClient oldClient = RmNettyRemotingClient.getInstance("ap", "group");
Expand All @@ -47,6 +78,25 @@ public void assertGetInstanceAfterDestroy() {
assertTrue(initialized.get());
newClient.destroy();
}

/**
* Expects {@code init()} on an RM client with one managed resource to throw a
* {@link FrameworkException} while the RM fail-fast property is "true".
* NOTE(review): presumably relies on no Seata server being reachable for
* "default_tx_group" in the test environment — confirm.
*/
@Test
public void testCheckFailFast() throws Exception {
RmNettyRemotingClient newClient = RmNettyRemotingClient.getInstance("fail_fast", "default_tx_group");

// Stub a resource manager that reports exactly one managed resource.
// NOTE(review): presumably needed because init() only attempts the initial connection
// when managed resources exist — verify against RmNettyRemotingClient.init().
ResourceManager resourceManager = Mockito.mock(ResourceManager.class);
Resource mockResource = Mockito.mock(Resource.class);
Map<String, Resource> resourceMap = new HashMap<>();
resourceMap.put("jdbc:xx://localhost/test", mockResource);
Mockito.when(resourceManager.getManagedResources()).thenReturn(resourceMap);
newClient.setResourceManager(resourceManager);
// NOTE(review): this system property is never restored after the test — confirm that
// other tests do not depend on its previous value.
System.setProperty("file.listener.enabled", "true");
// The listener only logs changes of the fail-fast key.
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
event -> logger.info("dataId:{}, value: {}, oldValue: {}", event.getDataId(), event.getNewValue(),
event.getOldValue()));
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true");
// NOTE(review): presumably gives the configuration listener time to observe the property
// before init() reads it — confirm; a fixed 2 s sleep slows the suite.
Thread.sleep(2000);
Assertions.assertThrows(FrameworkException.class, newClient::init);
}

private AtomicBoolean getInitializeStatus(final RmNettyRemotingClient rmNettyRemotingClient) {
try {
Expand Down
Loading

0 comments on commit 2363715

Please sign in to comment.