Skip to content

Commit

Permalink
Add MasterCoordinatorListener
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Dec 5, 2024
1 parent 1c92f7e commit 2cf0153
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,7 @@ public MasterCoordinator(final Registry registry, final MasterConfig masterConfi
registry,
RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
masterConfig.getMasterAddress());

addServerStatusChangeListener(new AbstractServerStatusChangeListener() {

@Override
public void changeToActive() {
onActive();
}

@Override
public void changeToStandBy() {
onStandBy();
}
});
addServerStatusChangeListener(new MasterCoordinatorListener(taskGroupCoordinator));
}

@Override
Expand All @@ -70,12 +58,23 @@ public void close() {
log.info("MasterCoordinator shutdown...");
}

private void onActive() {
taskGroupCoordinator.start();
}
public static class MasterCoordinatorListener extends AbstractServerStatusChangeListener {

private void onStandBy() {
taskGroupCoordinator.close();
private final TaskGroupCoordinator taskGroupCoordinator;

public MasterCoordinatorListener(TaskGroupCoordinator taskGroupCoordinator) {
this.taskGroupCoordinator = taskGroupCoordinator;
}

@Override
public void changeToActive() {
taskGroupCoordinator.start();
}

@Override
public void changeToStandBy() {
taskGroupCoordinator.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
@Override
public synchronized void close() {
if (!flag) {
log.error("TaskGroupCoordinator is already closed");
log.warn("TaskGroupCoordinator is already closed");
return;
}
flag = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
@Component
public class WorkflowEngine implements AutoCloseable {

@Autowired
private TaskGroupCoordinator taskGroupCoordinator;

@Autowired
private WorkflowEventBusCoordinator workflowEventBusCoordinator;

Expand All @@ -47,8 +44,6 @@ public class WorkflowEngine implements AutoCloseable {

public void start() {

taskGroupCoordinator.start();

workflowEventBusCoordinator.start();

commandEngine.start();
Expand All @@ -65,9 +60,7 @@ public void close() throws Exception {
try (
final CommandEngine ignore1 = commandEngine;
final WorkflowEventBusCoordinator ignore2 = workflowEventBusCoordinator;
final GlobalTaskDispatchWaitingQueueLooper ignore3 =
globalTaskDispatchWaitingQueueLooper;
final TaskGroupCoordinator ignore4 = taskGroupCoordinator;
final GlobalTaskDispatchWaitingQueueLooper ignore3 = globalTaskDispatchWaitingQueueLooper;
final LogicTaskEngineDelegator ignore5 = logicTaskEngineDelegator) {
// closed the resource
}
Expand Down
2 changes: 1 addition & 1 deletion dolphinscheduler-master/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
</appender>

<!-- We use OFF here to avoid too many exception log in CI -->
<root level="INFO">
<root level="OFF">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;

Expand All @@ -41,6 +42,10 @@ public abstract class AbstractHAServer implements HAServer {

private final List<ServerStatusChangeListener> serverStatusChangeListeners;

private static final long DEFAULT_RETRY_INTERVAL = 5_000;

private static final int DEFAULT_MAX_RETRY_TIMES = 20;

public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) {
this.registry = registry;
this.selectorPath = checkNotNull(selectorPath);
Expand Down Expand Up @@ -78,21 +83,30 @@ public boolean isActive() {
@Override
public boolean participateElection() {
final String electionLock = selectorPath + "-lock";
try {
if (registry.acquireLock(electionLock)) {
if (!registry.exists(selectorPath)) {
registry.put(selectorPath, serverIdentify, true);
return true;
// If meet exception during participate election, will retry.
// This can avoid the situation that the server is not elected as leader due to network jitter.
for (int i = 0; i < DEFAULT_MAX_RETRY_TIMES; i++) {
try {
try {
if (registry.acquireLock(electionLock)) {
if (!registry.exists(selectorPath)) {
registry.put(selectorPath, serverIdentify, true);
return true;
}
return serverIdentify.equals(registry.get(selectorPath));
}
return false;
} finally {
registry.releaseLock(electionLock);
}
return serverIdentify.equals(registry.get(selectorPath));
} catch (Exception e) {
log.error("Participate election error, meet an exception, will retry after {}ms",
DEFAULT_RETRY_INTERVAL, e);
ThreadUtils.sleep(DEFAULT_RETRY_INTERVAL);
}
return false;
} catch (Exception e) {
log.error("participate election error", e);
return false;
} finally {
registry.releaseLock(electionLock);
}
throw new IllegalStateException(
"Participate election failed after retry " + DEFAULT_MAX_RETRY_TIMES + " times");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@

public interface ServerStatusChangeListener {

void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus);
void change(final HAServer.ServerStatus originStatus, final HAServer.ServerStatus currentStatus);

}

0 comments on commit 2cf0153

Please sign in to comment.