From a7d296f423a9d4519c4e6a3d230818c5f1b32a64 Mon Sep 17 00:00:00 2001 From: SbloodyS <460888207@qq.com> Date: Fri, 6 Dec 2024 14:22:14 +0800 Subject: [PATCH 1/4] improvement 16880 --- ...ProjectWorkerGroupRelationServiceImpl.java | 2 +- .../service/impl/WorkerGroupServiceImpl.java | 3 +- .../server/master/cluster/WorkerClusters.java | 56 +++++++++++++------ .../master/cluster/WorkerClustersTest.java | 16 +++--- 4 files changed, 49 insertions(+), 28 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java index 3e6e4617f55d..94c6eec53bc4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java @@ -187,7 +187,7 @@ public Map queryAssignedWorkerGroupsByProject(User loginUser, Lo projectWorkerGroup.setProjectCode(projectCode); projectWorkerGroup.setWorkerGroup(workerGroup); return projectWorkerGroup; - }).collect(Collectors.toList()); + }).distinct().collect(Collectors.toList()); result.put(Constants.DATA_LIST, projectWorkerGroups); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 52db3bb7b0af..eefe30767026 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -303,7 +303,7 @@ public Map queryAllGroup(User loginUser) { .map(WorkerGroup::getName) .collect(Collectors.toList()); availableWorkerGroupList.addAll(configWorkerGroupNames); - result.put(Constants.DATA_LIST, availableWorkerGroupList); + result.put(Constants.DATA_LIST, availableWorkerGroupList.stream().distinct().collect(Collectors.toList())); putMsg(result, Status.SUCCESS); return result; } @@ -362,6 +362,7 @@ public Map deleteWorkerGroupById(User loginUser, Integer id) { } workerGroupDao.deleteById(id); + boardCastToMasterThatWorkerGroupChanged(); log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName()); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java index 509c54f3fba4..1716e78b0719 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java @@ -43,8 +43,11 @@ public class WorkerClusters extends AbstractClusterSubscribeListener worker private final Map workerMapping = new ConcurrentHashMap<>(); - // WorkerGroup -> WorkerIdentifier(workerAddress) - private final Map> workerGroupMapping = new ConcurrentHashMap<>(); + // WorkerGroup from db -> WorkerIdentifier(workerAddress) + private final Map> dbWorkerGroupMapping = new ConcurrentHashMap<>(); + + // WorkerGroup from config -> WorkerIdentifier(workerAddress) + private final Map> configWorkerGroupMapping = new ConcurrentHashMap<>(); private final List> workerClusterChangeListeners = new CopyOnWriteArrayList<>(); @@ -59,27 +62,44 @@ public Optional getServer(final String address) { return Optional.ofNullable(workerMapping.get(address)); } - public List getWorkerServerAddressByGroup(String workerGroup) { + public List getDbWorkerServerAddressByGroup(String workerGroup) { + if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) { + return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet())); + } + return dbWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList()); + } + + public List getConfigWorkerServerAddressByGroup(String workerGroup) { if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) { return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet())); } - return workerGroupMapping.getOrDefault(workerGroup, Collections.emptyList()); + return configWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList()); } public List getNormalWorkerServerAddressByGroup(String workerGroup) { - List normalWorkerAddresses = getWorkerServerAddressByGroup(workerGroup) + List dbWorkerAddresses = getDbWorkerServerAddressByGroup(workerGroup) + .stream() + .map(workerMapping::get) + .filter(Objects::nonNull) + .filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL) + .map(WorkerServerMetadata::getAddress) + .collect(Collectors.toList()); + List configWorkerAddresses = getConfigWorkerServerAddressByGroup(workerGroup) .stream() .map(workerMapping::get) .filter(Objects::nonNull) .filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL) .map(WorkerServerMetadata::getAddress) .collect(Collectors.toList()); - return UnmodifiableList.unmodifiableList(normalWorkerAddresses); + dbWorkerAddresses.removeAll(configWorkerAddresses); + dbWorkerAddresses.addAll(configWorkerAddresses); + return UnmodifiableList.unmodifiableList(dbWorkerAddresses); } public boolean containsWorkerGroup(String workerGroup) { return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup) - || workerGroupMapping.containsKey(workerGroup); + || dbWorkerGroupMapping.containsKey(workerGroup) + || configWorkerGroupMapping.containsKey(workerGroup); } @Override @@ -89,9 +109,9 @@ public void registerListener(IClustersChangeListener liste @Override public void onWorkerGroupDelete(List workerGroups) { - synchronized (workerGroupMapping) { + synchronized (dbWorkerGroupMapping) { for (WorkerGroup workerGroup : workerGroups) { - workerGroupMapping.remove(workerGroup.getName()); + dbWorkerGroupMapping.remove(workerGroup.getName()); } } } @@ -112,8 +132,8 @@ public void onWorkerGroupChange(List workerGroups) { .filter(Objects::nonNull) .map(WorkerServerMetadata::getAddress) .collect(Collectors.toList()); - synchronized (workerGroupMapping) { - workerGroupMapping.put(workerGroup.getName(), activeWorkers); + synchronized (dbWorkerGroupMapping) { + dbWorkerGroupMapping.put(workerGroup.getName(), activeWorkers); } } } @@ -130,15 +150,15 @@ WorkerServerMetadata parseServerFromHeartbeat(String serverHeartBeatJson) { @Override public void onServerAdded(WorkerServerMetadata workerServer) { workerMapping.put(workerServer.getAddress(), workerServer); - synchronized (workerGroupMapping) { - List addWorkerGroupAddrList = workerGroupMapping.get(workerServer.getWorkerGroup()); + synchronized (configWorkerGroupMapping) { + List addWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup()); if (addWorkerGroupAddrList == null) { List newWorkerGroupAddrList = new ArrayList<>(); newWorkerGroupAddrList.add(workerServer.getAddress()); - workerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList); + configWorkerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList); } else if (!addWorkerGroupAddrList.contains(workerServer.getAddress())) { addWorkerGroupAddrList.add(workerServer.getAddress()); - workerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList); + configWorkerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList); } } for (IClustersChangeListener listener : workerClusterChangeListeners) { @@ -149,12 +169,12 @@ public void onServerAdded(WorkerServerMetadata workerServer) { @Override public void onServerRemove(WorkerServerMetadata workerServer) { workerMapping.remove(workerServer.getAddress(), workerServer); - synchronized (workerGroupMapping) { - List removeWorkerGroupAddrList = workerGroupMapping.get(workerServer.getWorkerGroup()); + synchronized (configWorkerGroupMapping) { + List removeWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup()); if (removeWorkerGroupAddrList != null && removeWorkerGroupAddrList.contains(workerServer.getAddress())) { removeWorkerGroupAddrList.remove(workerServer.getAddress()); if (removeWorkerGroupAddrList.isEmpty()) { - workerGroupMapping.remove(workerServer.getWorkerGroup()); + configWorkerGroupMapping.remove(workerServer.getWorkerGroup()); } } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java index 045e493d57fe..0be794c49742 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java @@ -40,12 +40,12 @@ void testOnWorkerGroupDelete() { .addrList(normalWorkerServerMetadata.getAddress()) .build(); workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup)); - assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")) + assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")) .containsExactly(normalWorkerServerMetadata.getAddress()); workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup)); Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse(); - assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty(); + assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty(); } @Test @@ -59,7 +59,7 @@ void testOnWorkerGroupAdd() { .addrList(normalWorkerServerMetadata.getAddress()) .build(); workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup)); - assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")) + assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")) .containsExactly(normalWorkerServerMetadata.getAddress()); } @@ -74,7 +74,7 @@ void testOnWorkerGroupChange() { .addrList(normalWorkerServerMetadata.getAddress()) .build(); workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup)); - assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")) + assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")) .containsExactly(normalWorkerServerMetadata.getAddress()); WorkerGroup updatedWorkerGroup = WorkerGroup.builder() @@ -82,7 +82,7 @@ void testOnWorkerGroupChange() { .addrList("") .build(); workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup)); - assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty(); + assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty(); assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue(); } @@ -94,7 +94,7 @@ void testOnServerAdded() { WorkerClusters workerClusters = new WorkerClusters(); workerClusters.onServerAdded(normalWorkerServerMetadata); workerClusters.onServerAdded(busyWorkerServerMetadata); - assertThat(workerClusters.getWorkerServerAddressByGroup("default")) + assertThat(workerClusters.getDbWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress(), busyWorkerServerMetadata.getAddress()); assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress()); @@ -110,7 +110,7 @@ void testOnServerRemove() { workerClusters.onServerAdded(busyWorkerServerMetadata); workerClusters.onServerRemove(busyWorkerServerMetadata); - assertThat(workerClusters.getWorkerServerAddressByGroup("default")) + assertThat(workerClusters.getDbWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress()); assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress()); @@ -137,7 +137,7 @@ void testOnServerUpdate() { workerClusters.onServerUpdate(workerServerMetadata); - assertThat(workerClusters.getWorkerServerAddressByGroup("default")) + assertThat(workerClusters.getDbWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress()); assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default")) .containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress()); From 399771ef9b40e1911dc7eb52c00e64c888c9cd32 Mon Sep 17 00:00:00 2001 From: SbloodyS <460888207@qq.com> Date: Mon, 9 Dec 2024 09:13:21 +0800 Subject: [PATCH 2/4] fix comment --- .../src/main/resources/application.yaml | 2 +- .../server/worker/config/WorkerConfig.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 31896727dfce..4f78b91c0431 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -194,7 +194,7 @@ worker: max-heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 - # worker group name + # worker group name. If it is not set, the default value is default. group: default server-load-protection: enabled: true diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index c92caa79c0f7..49806b111443 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -56,6 +56,13 @@ public class WorkerConfig implements Validator { private PhysicalTaskConfig physicalTaskConfig = new PhysicalTaskConfig(); + public String getGroup() { + if (StringUtils.isEmpty(group)) { + return "default"; + } + return group; + } + @Override public boolean supports(Class clazz) { return WorkerConfig.class.isAssignableFrom(clazz); From 0f05c505a4cb3a7033b4bb28474db1d72ea13b4f Mon Sep 17 00:00:00 2001 From: SbloodyS <460888207@qq.com> Date: Mon, 9 Dec 2024 09:21:41 +0800 Subject: [PATCH 3/4] fix comment --- .../server/worker/config/WorkerConfig.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 49806b111443..8047a7eeb16d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -56,13 +56,6 @@ public class WorkerConfig implements Validator { private PhysicalTaskConfig physicalTaskConfig = new PhysicalTaskConfig(); - public String getGroup() { - if (StringUtils.isEmpty(group)) { - return "default"; - } - return group; - } - @Override public boolean supports(Class clazz) { return WorkerConfig.class.isAssignableFrom(clazz); @@ -80,6 +73,11 @@ public void validate(Object target, Errors errors) { workerConfig.setWorkerRegistryPath( RegistryNodeType.WORKER.getRegistryPath() + "/" + workerConfig.getWorkerAddress()); + + if (StringUtils.isEmpty(group)) { + workerConfig.setGroup("default"); + } + printConfig(); } From 847de7dfd61e1f7dd66101ac9b20543b03cb159b Mon Sep 17 00:00:00 2001 From: SbloodyS <460888207@qq.com> Date: Mon, 9 Dec 2024 09:35:06 +0800 Subject: [PATCH 4/4] fix comment --- .../dolphinscheduler/server/worker/config/WorkerConfig.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 8047a7eeb16d..5f3d3cddf682 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -92,6 +92,7 @@ private void printConfig() { "\n address -> " + workerAddress + "\n registry-path: " + workerRegistryPath + "\n physical-task-config -> " + physicalTaskConfig + + "\n group -> " + group + "\n****************************Worker Configuration**************************************"; log.info(config); }