From 2cf01539dc50b03d947ca34c8042b427a788082b Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 5 Dec 2024 20:13:43 +0800 Subject: [PATCH] Add MasterCoordinatorListener --- .../master/engine/MasterCoordinator.java | 35 +++++++++-------- .../master/engine/TaskGroupCoordinator.java | 2 +- .../server/master/engine/WorkflowEngine.java | 9 +---- .../src/test/resources/logback.xml | 2 +- .../registry/api/ha/AbstractHAServer.java | 38 +++++++++++++------ .../api/ha/ServerStatusChangeListener.java | 2 +- 6 files changed, 47 insertions(+), 41 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java index 65e96101ba8e..50dc14af5527 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java @@ -43,19 +43,7 @@ public MasterCoordinator(final Registry registry, final MasterConfig masterConfi registry, RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(), masterConfig.getMasterAddress()); - - addServerStatusChangeListener(new AbstractServerStatusChangeListener() { - - @Override - public void changeToActive() { - onActive(); - } - - @Override - public void changeToStandBy() { - onStandBy(); - } - }); + addServerStatusChangeListener(new MasterCoordinatorListener(taskGroupCoordinator)); } @Override @@ -70,12 +58,23 @@ public void close() { log.info("MasterCoordinator shutdown..."); } - private void onActive() { - taskGroupCoordinator.start(); - } + public static class MasterCoordinatorListener extends AbstractServerStatusChangeListener { - private void onStandBy() { - taskGroupCoordinator.close(); + private final TaskGroupCoordinator taskGroupCoordinator; + + public MasterCoordinatorListener(TaskGroupCoordinator taskGroupCoordinator) { + this.taskGroupCoordinator = taskGroupCoordinator; + } + + @Override + public void changeToActive() { + taskGroupCoordinator.start(); + } + + @Override + public void changeToStandBy() { + taskGroupCoordinator.close(); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java index d0b6e555c2fc..8896eede78fe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java @@ -489,7 +489,7 @@ private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) { @Override public synchronized void close() { if (!flag) { - log.error("TaskGroupCoordinator is already closed"); + log.warn("TaskGroupCoordinator is already closed"); return; } flag = false; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java index 75bc6b4527ef..6639fc553b35 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEngine.java @@ -30,9 +30,6 @@ @Component public class WorkflowEngine implements AutoCloseable { - @Autowired - private TaskGroupCoordinator taskGroupCoordinator; - @Autowired private WorkflowEventBusCoordinator workflowEventBusCoordinator; @@ -47,8 +44,6 @@ public class WorkflowEngine implements AutoCloseable { public void start() { - taskGroupCoordinator.start(); - workflowEventBusCoordinator.start(); commandEngine.start(); @@ -65,9 +60,7 @@ public void close() throws Exception { try ( final CommandEngine ignore1 = commandEngine; final WorkflowEventBusCoordinator ignore2 = workflowEventBusCoordinator; - final GlobalTaskDispatchWaitingQueueLooper ignore3 = - globalTaskDispatchWaitingQueueLooper; - final TaskGroupCoordinator ignore4 = taskGroupCoordinator; + final GlobalTaskDispatchWaitingQueueLooper ignore3 = globalTaskDispatchWaitingQueueLooper; final LogicTaskEngineDelegator ignore5 = logicTaskEngineDelegator) { // closed the resource } diff --git a/dolphinscheduler-master/src/test/resources/logback.xml b/dolphinscheduler-master/src/test/resources/logback.xml index 5debebf4eedc..2af91b3daa58 100644 --- a/dolphinscheduler-master/src/test/resources/logback.xml +++ b/dolphinscheduler-master/src/test/resources/logback.xml @@ -68,7 +68,7 @@ - + diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java index 822b4d93f4a8..c87f341a30a9 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Registry; @@ -41,6 +42,10 @@ public abstract class AbstractHAServer implements HAServer { private final List serverStatusChangeListeners; + private static final long DEFAULT_RETRY_INTERVAL = 5_000; + + private static final int DEFAULT_MAX_RETRY_TIMES = 20; + public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) { this.registry = registry; this.selectorPath = checkNotNull(selectorPath); @@ -78,21 +83,30 @@ public boolean isActive() { @Override public boolean participateElection() { final String electionLock = selectorPath + "-lock"; - try { - if (registry.acquireLock(electionLock)) { - if (!registry.exists(selectorPath)) { - registry.put(selectorPath, serverIdentify, true); - return true; + // If meet exception during participate election, will retry. + // This can avoid the situation that the server is not elected as leader due to network jitter. + for (int i = 0; i < DEFAULT_MAX_RETRY_TIMES; i++) { + try { + try { + if (registry.acquireLock(electionLock)) { + if (!registry.exists(selectorPath)) { + registry.put(selectorPath, serverIdentify, true); + return true; + } + return serverIdentify.equals(registry.get(selectorPath)); + } + return false; + } finally { + registry.releaseLock(electionLock); } - return serverIdentify.equals(registry.get(selectorPath)); + } catch (Exception e) { + log.error("Participate election error, meet an exception, will retry after {}ms", + DEFAULT_RETRY_INTERVAL, e); + ThreadUtils.sleep(DEFAULT_RETRY_INTERVAL); } - return false; - } catch (Exception e) { - log.error("participate election error", e); - return false; - } finally { - registry.releaseLock(electionLock); } + throw new IllegalStateException( + "Participate election failed after retry " + DEFAULT_MAX_RETRY_TIMES + " times"); } @Override diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java index af109228e230..18fbed8c794a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java @@ -19,6 +19,6 @@ public interface ServerStatusChangeListener { - void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus); + void change(final HAServer.ServerStatus originStatus, final HAServer.ServerStatus currentStatus); }