Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chore] Fix JdbcRegistryTestCase might failed due to purge dead clients interval is too small #16894

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ public static void sleep(final long millis) {
log.error("Current thread sleep error", interruptedException);
}
}

public static void rethrowInterruptedException(InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException("Current thread: " + Thread.currentThread().getName() + " is interrupted",
interruptedException);
}

public static void consumeInterruptedException(InterruptedException interruptedException) {
log.info("Current thread: {} is interrupted", Thread.currentThread().getName(), interruptedException);
Thread.currentThread().interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand All @@ -48,7 +49,7 @@
@Slf4j
class NettyRemotingServer {

private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private Channel serverBootstrapChannel;

@Getter
private final String serverName;
Expand Down Expand Up @@ -87,7 +88,7 @@ class NettyRemotingServer {

void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
Expand All @@ -104,23 +105,24 @@ protected void initChannel(SocketChannel ch) {
}
});

ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
final ChannelFuture channelFuture = serverBootstrap.bind(serverConfig.getListenPort()).sync();
if (channelFuture.isSuccess()) {
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
this.serverBootstrapChannel = channelFuture.channel();
} else {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(),
serverConfig.getListenPort()),
channelFuture.cause());
}
} catch (InterruptedException it) {
ThreadUtils.rethrowInterruptedException(it);
} catch (Exception e) {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
e);
}

if (future.isSuccess()) {
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
return;
}

throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
future.cause());
}
}

Expand All @@ -144,18 +146,25 @@ void registerMethodInvoker(ServerMethodInvoker methodInvoker) {

void close() {
if (isStarted.compareAndSet(true, false)) {
log.info("{} closing", serverConfig.getServerName());
try {
if (serverBootstrapChannel != null) {
serverBootstrapChannel.close().sync();
log.info("{} stop bind at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
}
if (bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
methodInvokerExecutor.shutdown();
methodInvokerExecutor.shutdownNow();
} catch (InterruptedException it) {
ThreadUtils.consumeInterruptedException(it);
} catch (Exception ex) {
log.error("netty server close exception", ex);
log.error("{} close failed", serverConfig.getServerName(), ex);
}
log.info("netty server closed");
log.info("{} closed", serverConfig.getServerName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ class MasterRpcServerTest {
private final MasterRpcServer masterRpcServer = new MasterRpcServer(new MasterConfig());

@Test
void testStart() {
void testStartAndClose() {
Assertions.assertDoesNotThrow(masterRpcServer::start);
}

@Test
void testClose() {
Assertions.assertDoesNotThrow(masterRpcServer::close);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.registry.api;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;

Expand Down Expand Up @@ -109,4 +110,7 @@ public interface Registry extends Closeable {
* Release the lock of the prefix {@param key}
*/
boolean releaseLock(String key);

@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.dolphinscheduler.registry.api.SubscribeListener;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -172,8 +173,8 @@ public void testChildren() {
registry.put(master1, value, true);
registry.put(master2, value, true);
assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
"127.0.0.2:8080");
assertThat(registry.children("/nodes/children/childGroup1")).containsExactlyElementsIn(
Arrays.asList("127.0.0.1:8080", "127.0.0.2:8080"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public final class JdbcRegistry implements Registry {
JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryProperties, jdbcRegistryServer);
log.info("Initialize Jdbc Registry...");
this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryServer);
log.info("Initialized Jdbc Registry...");
}

@Override
Expand Down Expand Up @@ -259,13 +259,14 @@ public boolean releaseLock(String key) {

@Override
public void close() {
log.info("Closing Jdbc Registry...");
log.info("Closing JdbcRegistry...");
// remove the current Ephemeral node, if can connect to jdbc
try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
try (final JdbcRegistryClient closed1 = jdbcRegistryClient) {
// ignore
} catch (Exception e) {
log.error("Close Jdbc Registry error", e);
log.error("Close JdbcRegistry error", e);
}
log.info("Closed Jdbc Registry...");
log.info("Closed JdbcRegistry...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO;
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener;
Expand All @@ -41,14 +40,11 @@ public class JdbcRegistryClient implements IJdbcRegistryClient {

private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_" + OSUtils.getProcessID();

private final JdbcRegistryProperties jdbcRegistryProperties;

private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify;

private final IJdbcRegistryServer jdbcRegistryServer;

public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
public JdbcRegistryClient(IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClientIdentify =
new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(), DEFAULT_CLIENT_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void start() {
purgeInvalidJdbcRegistryMetadata();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::purgeInvalidJdbcRegistryMetadata,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getSessionTimeout().toMillis(),
jdbcRegistryProperties.getSessionTimeout().toMillis(),
TimeUnit.MILLISECONDS);
jdbcRegistryDataManager.start();
jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
Expand Down Expand Up @@ -149,13 +149,13 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) {
@Override
public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) {
checkNotNull(jdbcRegistryClient);
jdbcRegistryClients.remove(jdbcRegistryClient);
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());
final JdbcRegistryClientIdentify clientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify();
checkNotNull(clientIdentify);

JdbcRegistryClientIdentify jdbcRegistryClientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify();
checkNotNull(jdbcRegistryClientIdentify);
jdbcRegistryClients.removeIf(client -> clientIdentify.equals(client.getJdbcRegistryClientIdentify()));
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());

doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId()));
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(clientIdentify.getClientId()));
}

@Override
Expand Down
Loading