Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-16834] Start worker node with worker group label #16875

Merged
merged 17 commits into from
Dec 6, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testDeleteWorkerGroupById() {
Assertions.assertTrue(queryAllWorkerGroupsResponse.getBody().getSuccess());
Assertions.assertTrue(workerGroupsBeforeDelete.contains("test_worker_group"));

HttpResponse deleteWorkerGroupResponse = workerGroupPage.deleteWorkerGroupById(loginUser, 1);
HttpResponse deleteWorkerGroupResponse = workerGroupPage.deleteWorkerGroupById(loginUser, 2);
Assertions.assertTrue(deleteWorkerGroupResponse.getBody().getSuccess());

queryAllWorkerGroupsResponse = workerGroupPage.queryAllWorkerGroups(loginUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker group name
group: default
server-load-protection:
enabled: true
# Worker max system cpu usage, when the worker's system cpu usage is smaller then this value, worker server can be dispatched tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import java.util.List;
import java.util.Map;

/**
* the service of project and worker group
*/
public interface ProjectWorkerGroupRelationService {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.repository.ProjectWorkerGroupDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -53,26 +53,18 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

/**
* task definition service impl
*/
@Service
@Slf4j
public class ProjectWorkerGroupRelationServiceImpl extends BaseServiceImpl
implements
ProjectWorkerGroupRelationService {

@Autowired
private ProjectWorkerGroupMapper projectWorkerGroupMapper;
private ProjectWorkerGroupDao projectWorkerGroupDao;

@Autowired
private ProjectMapper projectMapper;

@Autowired
private WorkerGroupMapper workerGroupMapper;

@Autowired
private TaskDefinitionMapper taskDefinitionMapper;

Expand All @@ -82,6 +74,9 @@ public class ProjectWorkerGroupRelationServiceImpl extends BaseServiceImpl
@Autowired
private ProjectService projectService;

@Autowired
private WorkerGroupDao workerGroupDao;

/**
* assign worker groups to a project
*
Expand All @@ -105,7 +100,12 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
}

if (CollectionUtils.isEmpty(workerGroups)) {
putMsg(result, Status.WORKER_GROUP_TO_PROJECT_IS_EMPTY);
boolean deleted = projectWorkerGroupDao.deleteByProjectCode(projectCode);
if (deleted) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR);
}
return result;
}

Expand All @@ -115,9 +115,7 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
return result;
}

Set<String> workerGroupNames =
workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect(
Collectors.toSet());
Set<String> workerGroupNames = new HashSet<>(workerGroupDao.queryAllWorkerGroupNames());

workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup());

Expand All @@ -130,12 +128,8 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
return result;
}

Set<String> projectWorkerGroupNames = projectWorkerGroupMapper.selectList(new QueryWrapper<ProjectWorkerGroup>()
.lambda()
.eq(ProjectWorkerGroup::getProjectCode, projectCode))
.stream()
.map(ProjectWorkerGroup::getWorkerGroup)
.collect(Collectors.toSet());
Set<String> projectWorkerGroupNames =
projectWorkerGroupDao.queryAssignedWorkerGroupNamesByProjectCode(projectCode);

difference = SetUtils.difference(projectWorkerGroupNames, assignedWorkerGroupNames);

Expand All @@ -147,10 +141,9 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
SetUtils.intersection(usedWorkerGroups, difference).toSet());
}

int deleted = projectWorkerGroupMapper.delete(
new QueryWrapper<ProjectWorkerGroup>().lambda().eq(ProjectWorkerGroup::getProjectCode, projectCode)
.in(ProjectWorkerGroup::getWorkerGroup, difference));
if (deleted > 0) {
boolean deleted =
projectWorkerGroupDao.deleteByProjectCodeAndWorkerGroups(projectCode, new ArrayList<>(difference));
if (deleted) {
log.info("Success to delete worker groups [{}] for the project [{}] .", difference, project.getName());
} else {
log.error("Failed to delete worker groups [{}] for the project [{}].", difference, project.getName());
Expand All @@ -161,13 +154,13 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
difference = SetUtils.difference(assignedWorkerGroupNames, projectWorkerGroupNames);
Date now = new Date();
if (CollectionUtils.isNotEmpty(difference)) {
difference.stream().forEach(workerGroupName -> {
difference.forEach(workerGroupName -> {
ProjectWorkerGroup projectWorkerGroup = new ProjectWorkerGroup();
projectWorkerGroup.setProjectCode(projectCode);
projectWorkerGroup.setWorkerGroup(workerGroupName);
projectWorkerGroup.setCreateTime(now);
projectWorkerGroup.setUpdateTime(now);
int create = projectWorkerGroupMapper.insert(projectWorkerGroup);
int create = projectWorkerGroupDao.insert(projectWorkerGroup);
if (create > 0) {
log.info("Success to add worker group [{}] for the project [{}] .", workerGroupName,
project.getName());
Expand Down Expand Up @@ -201,9 +194,8 @@ public Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long proje

Set<String> assignedWorkerGroups = getAllUsedWorkerGroups(project);

projectWorkerGroupMapper.selectList(
new QueryWrapper<ProjectWorkerGroup>().lambda().eq(ProjectWorkerGroup::getProjectCode, projectCode))
.stream().forEach(projectWorkerGroup -> assignedWorkerGroups.add(projectWorkerGroup.getWorkerGroup()));
projectWorkerGroupDao.queryByProjectCode(projectCode)
.forEach(projectWorkerGroup -> assignedWorkerGroups.add(projectWorkerGroup.getWorkerGroup()));

List<ProjectWorkerGroup> projectWorkerGroups = assignedWorkerGroups.stream().map(workerGroup -> {
ProjectWorkerGroup projectWorkerGroup = new ProjectWorkerGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkerGroupSource;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
Expand All @@ -40,10 +41,9 @@
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
Expand Down Expand Up @@ -76,7 +76,7 @@
public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGroupService {

@Autowired
private WorkerGroupMapper workerGroupMapper;
private WorkerGroupDao workerGroupDao;

@Autowired
private WorkflowInstanceMapper workflowInstanceMapper;
Expand Down Expand Up @@ -129,11 +129,12 @@ public WorkerGroup saveWorkerGroup(User loginUser,
workerGroup.setCreateTime(now);
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setSource(WorkerGroupSource.UI);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
workerGroupMapper.insert(workerGroup);
workerGroupDao.insert(workerGroup);
} else {
workerGroup = workerGroupMapper.selectById(id);
workerGroup = workerGroupDao.queryById(id);
if (workerGroup == null) {
throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST, id);
}
Expand All @@ -145,7 +146,7 @@ public WorkerGroup saveWorkerGroup(User loginUser,
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
workerGroupMapper.updateById(workerGroup);
workerGroupDao.updateById(workerGroup);
log.info("Update worker group: {} success .", workerGroup);
}
boardCastToMasterThatWorkerGroupChanged();
Expand Down Expand Up @@ -298,33 +299,19 @@ public Map<String, Object> queryAllGroup(User loginUser) {
return result;
}

/**
* get worker groups
*
* @return WorkerGroup list
*/
private List<WorkerGroup> getWorkerGroups(List<Integer> ids) {
// worker groups from database
List<WorkerGroup> workerGroups;
if (ids != null) {
workerGroups = ids.isEmpty() ? new ArrayList<>() : workerGroupMapper.selectBatchIds(ids);
workerGroups = ids.isEmpty() ? new ArrayList<>() : workerGroupDao.queryByIds(ids);
} else {
workerGroups = workerGroupMapper.queryAllWorkerGroup();
workerGroups = workerGroupDao.queryAllWorkerGroup();
}
boolean containDefaultWorkerGroups = workerGroups.stream()
.anyMatch(workerGroup -> WorkerGroupUtils.isWorkerGroupEmpty(workerGroup.getName()));
if (!containDefaultWorkerGroups) {
// there doesn't exist a default WorkerGroup, we will add all worker to the default worker group.
Set<String> activeWorkerNodes = registryClient.getServerNodeSet(RegistryNodeType.WORKER);
WorkerGroup defaultWorkerGroup = new WorkerGroup();
defaultWorkerGroup.setName(WorkerGroupUtils.getDefaultWorkerGroup());
defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes));
defaultWorkerGroup.setCreateTime(new Date());
defaultWorkerGroup.setUpdateTime(new Date());
defaultWorkerGroup.setSystemDefault(true);
workerGroups.add(defaultWorkerGroup);
}

workerGroups.forEach(workerGroup -> {
if (workerGroup.getSource().equals(WorkerGroupSource.CONFIG)) {
workerGroup.setSystemDefault(true);
}
});
return workerGroups;
}

Expand All @@ -342,7 +329,7 @@ public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
WorkerGroup workerGroup = workerGroupMapper.selectById(id);
WorkerGroup workerGroup = workerGroupDao.queryById(id);
if (workerGroup == null) {
log.error("Worker group does not exist, workerGroupId:{}.", id);
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
Expand All @@ -365,7 +352,7 @@ public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
return result;
}

workerGroupMapper.deleteById(id);
workerGroupDao.deleteById(id);

log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
Expand Down Expand Up @@ -402,7 +389,7 @@ private void boardCastToMasterThatWorkerGroupChanged() {
for (Server master : masters) {
try {
Clients.withService(IMasterContainerService.class)
.withHost(master.getHost() + ":" + master.getPort())
.withHost(master.getHost() + Constants.COLON + master.getPort())
.refreshWorkerGroup();
} catch (Exception e) {
log.error("Broadcast to master: {} that worker group changed failed", master, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private List<Server> getServerList() {
Server server = new Server();
server.setId(1);
server.setHost("127.0.0.1");
server.setZkDirectory("ws/server");
server.setServerDirectory("ws/server");
server.setPort(123);
server.setCreateTime(new Date());
server.setLastHeartbeatTime(new Date());
Expand Down
Loading
Loading