Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16880] Merge worker group from config and ui and distinct display it in api #16883

Merged
merged 7 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public Map<String, Object> queryAssignedWorkerGroupsByProject(User loginUser, Lo
projectWorkerGroup.setProjectCode(projectCode);
projectWorkerGroup.setWorkerGroup(workerGroup);
return projectWorkerGroup;
}).collect(Collectors.toList());
}).distinct().collect(Collectors.toList());

result.put(Constants.DATA_LIST, projectWorkerGroups);
putMsg(result, Status.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public Map<String, Object> queryAllGroup(User loginUser) {
.map(WorkerGroup::getName)
.collect(Collectors.toList());
availableWorkerGroupList.addAll(configWorkerGroupNames);
result.put(Constants.DATA_LIST, availableWorkerGroupList);
result.put(Constants.DATA_LIST, availableWorkerGroupList.stream().distinct().collect(Collectors.toList()));
putMsg(result, Status.SUCCESS);
return result;
}
Expand Down Expand Up @@ -362,6 +362,7 @@ public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
}

workerGroupDao.deleteById(id);
boardCastToMasterThatWorkerGroupChanged();

log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
// WorkerIdentifier(workerAddress) -> worker
private final Map<String, WorkerServerMetadata> workerMapping = new ConcurrentHashMap<>();

// WorkerGroup -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> workerGroupMapping = new ConcurrentHashMap<>();
// WorkerGroup from db -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> dbWorkerGroupMapping = new ConcurrentHashMap<>();

// WorkerGroup from config -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> configWorkerGroupMapping = new ConcurrentHashMap<>();

private final List<IClustersChangeListener<WorkerServerMetadata>> workerClusterChangeListeners =
new CopyOnWriteArrayList<>();
Expand All @@ -59,27 +62,44 @@ public Optional<WorkerServerMetadata> getServer(final String address) {
return Optional.ofNullable(workerMapping.get(address));
}

public List<String> getWorkerServerAddressByGroup(String workerGroup) {
public List<String> getDbWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
}
SbloodyS marked this conversation as resolved.
Show resolved Hide resolved
return dbWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
}

public List<String> getConfigWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
}
return workerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
return configWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
}

public List<String> getNormalWorkerServerAddressByGroup(String workerGroup) {
List<String> normalWorkerAddresses = getWorkerServerAddressByGroup(workerGroup)
List<String> dbWorkerAddresses = getDbWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
List<String> configWorkerAddresses = getConfigWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
return UnmodifiableList.unmodifiableList(normalWorkerAddresses);
dbWorkerAddresses.removeAll(configWorkerAddresses);
dbWorkerAddresses.addAll(configWorkerAddresses);
return UnmodifiableList.unmodifiableList(dbWorkerAddresses);
}

public boolean containsWorkerGroup(String workerGroup) {
return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)
|| workerGroupMapping.containsKey(workerGroup);
|| dbWorkerGroupMapping.containsKey(workerGroup)
|| configWorkerGroupMapping.containsKey(workerGroup);
}

@Override
Expand All @@ -89,9 +109,9 @@ public void registerListener(IClustersChangeListener<WorkerServerMetadata> liste

@Override
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
synchronized (workerGroupMapping) {
synchronized (dbWorkerGroupMapping) {
for (WorkerGroup workerGroup : workerGroups) {
workerGroupMapping.remove(workerGroup.getName());
dbWorkerGroupMapping.remove(workerGroup.getName());
}
}
}
Expand All @@ -112,8 +132,8 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
.filter(Objects::nonNull)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
synchronized (workerGroupMapping) {
workerGroupMapping.put(workerGroup.getName(), activeWorkers);
synchronized (dbWorkerGroupMapping) {
dbWorkerGroupMapping.put(workerGroup.getName(), activeWorkers);
}
}
}
Expand All @@ -130,15 +150,15 @@ WorkerServerMetadata parseServerFromHeartbeat(String serverHeartBeatJson) {
@Override
public void onServerAdded(WorkerServerMetadata workerServer) {
workerMapping.put(workerServer.getAddress(), workerServer);
synchronized (workerGroupMapping) {
List<String> addWorkerGroupAddrList = workerGroupMapping.get(workerServer.getWorkerGroup());
synchronized (configWorkerGroupMapping) {
List<String> addWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (addWorkerGroupAddrList == null) {
List<String> newWorkerGroupAddrList = new ArrayList<>();
newWorkerGroupAddrList.add(workerServer.getAddress());
workerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList);
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList);
} else if (!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
addWorkerGroupAddrList.add(workerServer.getAddress());
workerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList);
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList);
}
}
for (IClustersChangeListener<WorkerServerMetadata> listener : workerClusterChangeListeners) {
Expand All @@ -149,12 +169,12 @@ public void onServerAdded(WorkerServerMetadata workerServer) {
@Override
public void onServerRemove(WorkerServerMetadata workerServer) {
workerMapping.remove(workerServer.getAddress(), workerServer);
synchronized (workerGroupMapping) {
List<String> removeWorkerGroupAddrList = workerGroupMapping.get(workerServer.getWorkerGroup());
synchronized (configWorkerGroupMapping) {
List<String> removeWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
SbloodyS marked this conversation as resolved.
Show resolved Hide resolved
if (removeWorkerGroupAddrList != null && removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
removeWorkerGroupAddrList.remove(workerServer.getAddress());
if (removeWorkerGroupAddrList.isEmpty()) {
workerGroupMapping.remove(workerServer.getWorkerGroup());
configWorkerGroupMapping.remove(workerServer.getWorkerGroup());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ void testOnWorkerGroupDelete() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());

workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup));
Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse();
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
}

@Test
Expand All @@ -59,7 +59,7 @@ void testOnWorkerGroupAdd() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());
}

Expand All @@ -74,15 +74,15 @@ void testOnWorkerGroupChange() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());

WorkerGroup updatedWorkerGroup = WorkerGroup.builder()
.name("flinkCluster")
.addrList("")
.build();
workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue();
}

Expand All @@ -94,7 +94,7 @@ void testOnServerAdded() {
WorkerClusters workerClusters = new WorkerClusters();
workerClusters.onServerAdded(normalWorkerServerMetadata);
workerClusters.onServerAdded(busyWorkerServerMetadata);
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), busyWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
Expand All @@ -110,7 +110,7 @@ void testOnServerRemove() {
workerClusters.onServerAdded(busyWorkerServerMetadata);
workerClusters.onServerRemove(busyWorkerServerMetadata);

assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
Expand All @@ -137,7 +137,7 @@ void testOnServerUpdate() {

workerClusters.onServerUpdate(workerServerMetadata);

assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker group name
# worker group name. If it is not set, the default value is default.
group: default
server-load-protection:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void validate(Object target, Errors errors) {

workerConfig.setWorkerRegistryPath(
RegistryNodeType.WORKER.getRegistryPath() + "/" + workerConfig.getWorkerAddress());

if (StringUtils.isEmpty(group)) {
workerConfig.setGroup("default");
}

printConfig();
}

Expand All @@ -87,6 +92,7 @@ private void printConfig() {
"\n address -> " + workerAddress +
"\n registry-path: " + workerRegistryPath +
"\n physical-task-config -> " + physicalTaskConfig +
"\n group -> " + group +
"\n****************************Worker Configuration**************************************";
log.info(config);
}
Expand Down
Loading