Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: RM TM startup connect server fail fast #6004

Merged
merged 40 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
69e4497
optimize: RM TM startup connect server fail fast
jsbxyyx Nov 7, 2023
d089244
add change
jsbxyyx Nov 7, 2023
89186d2
judge thread name
jsbxyyx Nov 9, 2023
7b01e32
Merge branch '2.x' of github.com:seata/seata into check2
jsbxyyx Nov 10, 2023
3e20f87
do reconnect
jsbxyyx Nov 10, 2023
34be22e
revert
jsbxyyx Nov 10, 2023
99562b7
Merge branch '2.x' into check2
funky-eyes Nov 11, 2023
9203558
Merge branch '2.x' into check2
jsbxyyx Nov 13, 2023
303e60e
Merge branch '2.x' into check2
funky-eyes Nov 27, 2023
849d767
Merge branch '2.x' into check2
funky-eyes Nov 28, 2023
232f187
Merge branch '2.x' into check2
funky-eyes Nov 29, 2023
e6a7e01
Merge branch '2.x' of github.com:seata/seata into check2
jsbxyyx Dec 1, 2023
2197068
add error log
jsbxyyx Dec 1, 2023
f9e88ff
Merge branch '2.x' into check2
funky-eyes Dec 5, 2023
2789bb1
Merge branch '2.x' into check2
jsbxyyx Dec 7, 2023
acc2ba6
fix: reviews
jsbxyyx Dec 7, 2023
0c62533
fix: available list empty
jsbxyyx Dec 11, 2023
16a2837
Merge branch '2.x' into check2
funky-eyes Dec 16, 2023
b7fd7af
fix reviews
jsbxyyx Dec 18, 2023
525e38e
Merge branch 'check2' of github.com:jsbxyyx/seata into check2
jsbxyyx Dec 18, 2023
e549797
Merge branch '2.x' into check2
funky-eyes Dec 18, 2023
ad32f72
fail fast default true
jsbxyyx Dec 22, 2023
04ae7d3
Merge branch '2.x' into check2
jsbxyyx Dec 22, 2023
a6e5276
fix: test
jsbxyyx Dec 22, 2023
583b257
Merge branch 'check2' of github.com:jsbxyyx/seata into check2
jsbxyyx Dec 22, 2023
a144b0c
fix test
jsbxyyx Dec 22, 2023
16bfd7f
add test
jsbxyyx Jan 4, 2024
f4d33a2
test
funky-eyes Jan 4, 2024
6814bc8
test
funky-eyes Jan 4, 2024
8fde8ec
test
funky-eyes Jan 4, 2024
8c32c3f
test
funky-eyes Jan 4, 2024
a78e7b1
test
funky-eyes Jan 4, 2024
c9ca778
test
funky-eyes Jan 4, 2024
7a5886d
test
funky-eyes Jan 4, 2024
e054f4e
update changes md
jsbxyyx Jan 4, 2024
c9a3357
Merge branch '2.x' into check2
jsbxyyx Jan 4, 2024
9062be8
Update changes/en-us/2.x.md
funky-eyes Jan 4, 2024
e5bb5ee
Update core/src/test/java/io/seata/core/rpc/netty/TmNettyClientTest.java
funky-eyes Jan 4, 2024
af9db4d
Update core/src/test/java/io/seata/core/rpc/netty/RmNettyClientTest.java
funky-eyes Jan 4, 2024
0893fca
Update core/src/test/java/io/seata/core/rpc/netty/TmNettyClientTest.java
funky-eyes Jan 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ The version is updated as follows:
- [[#6002](https://github.com/seata/seata/pull/6002)] remove fst serialization
- [[#6045](https://github.com/seata/seata/pull/6045)] optimize derivative product check base on mysql


### security:
- [[#5642](https://github.com/seata/seata/pull/5642)] add Hessian Serializer WhiteDenyList
- [[#5694](https://github.com/seata/seata/pull/5694)] fix several node.js security vulnerabilities
Expand Down
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6201](https://github.com/apache/incubator-seata/pull/6201)] restore required_status_checks kept to remove context validation
- [[#6218](https://github.com/apache/incubator-seata/pull/6218)] remove Seata-Docker link
- [[#6227](https://github.com/apache/incubator-seata/pull/6227)] validate that the primary key is free of illegal characters
- [[#6004](https://github.com/seata/seata/pull/6004)] optimize RM TM startup connect server fail fast

### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities
Expand Down Expand Up @@ -80,5 +81,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [PeppaO](https://github.com/PeppaO)
- [AlbumenJ](https://github.com/AlbumenJ)
- [dreamskyvision](https://github.com/dreamskyvision)
- [jsbxyyx](https://github.com/jsbxyyx)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
- [[#6201](https://github.com/apache/incubator-seata/pull/6201)] 恢复required_status_checks但去除context校验
- [[#6218](https://github.com/apache/incubator-seata/pull/6218)] 移除Seata-Docker链接
- [[#6227](https://github.com/apache/incubator-seata/pull/6227)] 校验pk中不含逗号
- [[#6004](https://github.com/seata/seata/pull/6004)] 优化RM,TM连接server快速失败

### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞
Expand Down Expand Up @@ -79,5 +80,6 @@
- [PeppaO](https://github.com/PeppaO)
- [AlbumenJ](https://github.com/AlbumenJ)
- [dreamskyvision](https://github.com/dreamskyvision)
- [jsbxyyx](https://github.com/jsbxyyx)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
10 changes: 10 additions & 0 deletions common/src/main/java/io/seata/common/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,16 @@ public interface ConfigurationKeys {
*/
String ENABLE_TM_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableTmClientBatchSendRequest";

/**
* The constant ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST
*/
String ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST = TRANSPORT_PREFIX + "enableTmClientChannelCheckFailFast";

/**
* The constant ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST
*/
String ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST = TRANSPORT_PREFIX + "enableRmClientChannelCheckFailFast";

/**
* The constant ENABLE_RM_CLIENT_BATCH_SEND_REQUEST
*/
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public interface DefaultValues {
boolean DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST = true;
boolean DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE = false;

boolean DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST = true;

String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss";
String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker";
String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler";
Expand Down
28 changes: 28 additions & 0 deletions common/src/main/java/io/seata/common/util/StringUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -399,4 +400,31 @@ public static boolean hasUpperCase(String str) {
return false;
}

public static String join(Iterator iterator, String separator) {
if (iterator == null) {
return null;
}
if (!iterator.hasNext()) {
return EMPTY;
}
Object first = iterator.next();
if (!iterator.hasNext()) {
return first == null ? "" : first.toString();
}
StringBuilder builder = new StringBuilder(256);
if (first != null) {
builder.append(first);
}
while (iterator.hasNext()) {
if (separator != null) {
builder.append(separator);
}
Object obj = iterator.next();
if (obj != null) {
builder.append(obj);
}
}
return builder.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
public abstract class AbstractNettyRemoting implements Disposable {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemoting.class);

/**
* The Timer executor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting

@Override
public void init() {
jsbxyyx marked this conversation as resolved.
Show resolved Hide resolved
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
timerExecutor.scheduleAtFixedRate(() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,73 @@ void destroyChannel(String serverAddress, Channel channel) {
}
}

/**
* Reconnect to remote server of current transaction service group.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
doReconnect(transactionServiceGroup, false);
}

/**
* Init reconnect to remote server of current transaction service group.
* @param transactionServiceGroup
* @param failFast
*/
void initReconnect(String transactionServiceGroup, boolean failFast) {
doReconnect(transactionServiceGroup, failFast);
}

/**
* reconnect to remote server of current transaction service group.
* @param transactionServiceGroup
* @param failFast
*/
void doReconnect(String transactionServiceGroup, boolean failFast) {
List<String> availList;
try {
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
throwFailFastException(failFast, "Failed to get available servers");
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);

if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
throwFailFastException(failFast, "can not get cluster name in registry config.");
return;
}

if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
throwFailFastException(failFast, "no available service found in cluster.");
return;
}
try {
doReconnect(availList, transactionServiceGroup);
} catch (Exception e) {
if (failFast) {
throw e;
}
LOGGER.error("connect server failed. {}", e.getMessage(), e);
}
}

/**
* Reconnect to remote server of current transaction service group.
*
* @param availList avail list
* @param transactionServiceGroup transaction service group
*/
void reconnect(List<String> availList, String transactionServiceGroup) {
void doReconnect(List<String> availList, String transactionServiceGroup) {
Set<String> channelAddress = new HashSet<>(availList.size());
Map<String, Exception> failedMap = new HashMap<>();
try {
Expand All @@ -178,12 +238,17 @@ void reconnect(List<String> availList, String transactionServiceGroup) {
}
if (failedMap.size() > 0) {
if (LOGGER.isInfoEnabled()) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), failedMap.keySet(), failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
failedMap.keySet(),
failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
} else if (LOGGER.isDebugEnabled()) {
failedMap.forEach((key, value) -> {
LOGGER.error("{} can not connect to {} cause:{} trace information:{}", FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
LOGGER.error("{} can not connect to {} cause:{} trace information:{}",
FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
});
}
String invalidAddress = StringUtils.join(failedMap.keySet().iterator(), ", ");
throw new FrameworkException("can not connect to [" + invalidAddress + "]");
}
} finally {
if (CollectionUtils.isNotEmpty(channelAddress)) {
Expand All @@ -199,38 +264,6 @@ void reconnect(List<String> availList, String transactionServiceGroup) {
}
}

/**
* Reconnect to remote server of current transaction service group.
*
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
List<String> availList;
try {
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);

if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
return;
}

if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
return;
}
reconnect(availList, transactionServiceGroup);
}

void invalidateObject(final String serverAddress, final Channel channel) throws Exception {
nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);
}
Expand Down Expand Up @@ -297,5 +330,12 @@ private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {
}
return null;
}

private void throwFailFastException(boolean failFast, String message) {
if (failFast) {
throw new FrameworkException(message);
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -455,4 +455,5 @@ public String getRmDispatchThreadPrefix() {
public static boolean isEnableClientBatchSendRequest() {
return ENABLE_CLIENT_BATCH_SEND_REQUEST;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ public void init() {
if (resourceManager != null
&& !resourceManager.getManagedResources().isEmpty()
&& StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
boolean failFast = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
}
}
}
Expand Down Expand Up @@ -214,7 +217,10 @@ public void registerResource(String resourceGroupId, String resourceId) {
}

if (getClientChannelManager().getChannels().isEmpty()) {
getClientChannelManager().reconnect(transactionServiceGroup);
boolean failFast = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
return;
}
synchronized (getClientChannelManager().getChannels()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ private String getExtraData() {
}

private void initConnection() {
getClientChannelManager().reconnect(transactionServiceGroup);
boolean failFast = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ENABLE_TM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
}

}
50 changes: 50 additions & 0 deletions core/src/test/java/io/seata/core/rpc/netty/RmNettyClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,27 @@
*/
package io.seata.core.rpc.netty;

import io.seata.common.ConfigurationKeys;
import io.seata.common.DefaultValues;
import io.seata.common.exception.FrameworkException;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationChangeEvent;
import io.seata.config.ConfigurationChangeListener;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.Resource;
import io.seata.core.model.ResourceManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -31,6 +48,20 @@
*/
class RmNettyClientTest {

Logger logger = LoggerFactory.getLogger(getClass());

@BeforeAll
public static void beforeAll() {
RmNettyRemotingClient.getInstance().destroy();
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true");
}

@AfterAll
public static void afterAll() {
RmNettyRemotingClient.getInstance().destroy();
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "false");
}

@Test
public void assertGetInstanceAfterDestroy() {
RmNettyRemotingClient oldClient = RmNettyRemotingClient.getInstance("ap", "group");
Expand All @@ -47,6 +78,25 @@ public void assertGetInstanceAfterDestroy() {
assertTrue(initialized.get());
newClient.destroy();
}

@Test
public void testCheckFailFast() throws Exception {
RmNettyRemotingClient newClient = RmNettyRemotingClient.getInstance("fail_fast", "default_tx_group");

ResourceManager resourceManager = Mockito.mock(ResourceManager.class);
Resource mockResource = Mockito.mock(Resource.class);
Map<String, Resource> resourceMap = new HashMap<>();
resourceMap.put("jdbc:xx://localhost/test", mockResource);
Mockito.when(resourceManager.getManagedResources()).thenReturn(resourceMap);
newClient.setResourceManager(resourceManager);
System.setProperty("file.listener.enabled", "true");
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
event -> logger.info("dataId:{}, value: {}, oldValue: {}", event.getDataId(), event.getNewValue(),
event.getOldValue()));
System.setProperty(ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST, "true");
Thread.sleep(2000);
Assertions.assertThrows(FrameworkException.class, newClient::init);
funky-eyes marked this conversation as resolved.
Show resolved Hide resolved
}

private AtomicBoolean getInitializeStatus(final RmNettyRemotingClient rmNettyRemotingClient) {
try {
Expand Down
Loading
Loading