Skip to content

Commit

Permalink
Merge branch 'dev' into dsip_72
Browse files Browse the repository at this point in the history
  • Loading branch information
SbloodyS authored Dec 5, 2024
2 parents afef203 + 84f6a0f commit 0f12131
Show file tree
Hide file tree
Showing 22 changed files with 271 additions and 214 deletions.
10 changes: 1 addition & 9 deletions docs/docs/en/architecture/load-balance.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,7 @@ Each worker has two weights parameters, weight (which remains constant after war

### Linear Weighting (Default Algorithm)

This algorithm reports its own load information to the registry at regular intervals. Make decision on two main pieces of information:

- load average (default is the number of CPU cores * 2)
- available physical memory (default is 0.3, in G)
This algorithm reports its own load information to the registry at regular intervals. We mainly judge by CPU usage, memory usage and worker slot usage.

If either of these is lower than the configured item, then this worker will not participate in the load. (no traffic will be allocated)

You can customise the configuration by changing the following properties in worker.properties

- worker.max.cpu.load.avg=-1 (worker max cpu load avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2)
- worker.reserved.memory=0.3 (worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is percentage)

1 change: 1 addition & 0 deletions docs/docs/en/guide/upgrade/incompatible.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ This document records the incompatible updates between each version. You need to
* Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
* Drop unused column `other_params_json` in `t_ds_worker_group` ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
* Remove the `Dynamic` from the `Task Plugin` ([#16482])(https://github.com/apache/dolphinscheduler/pull/16842)

12 changes: 2 additions & 10 deletions docs/docs/zh/architecture/load-balance.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,7 @@ eg:master.host.selector=random(不区分大小写)

#### 线性加权(默认算法)

该算法每隔一段时间会向注册中心上报自己的负载信息。我们主要根据两个信息来进行判断
该算法每隔一段时间会向注册中心上报自己的负载信息。我们主要根据CPU使用率、内存使用率以及 worker slot 使用情况来进行判断

* load 平均值(默认是 CPU 核数 *2)
* 可用物理内存(默认是 0.3,单位是 G)

如果两者任何一个低于配置项,那么这台 worker 将不参与负载。(即不分配流量)

你可以在 worker.properties 修改下面的属性来自定义配置

* worker.max.cpu.load.avg=-1 (worker最大cpu load均值,只有高于系统cpu load均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2)
* worker.reserved.memory=0.3 (worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为百分比)
如果任何一个低于配置项,那么这台 worker 将不参与负载。(即不分配流量)

3 changes: 2 additions & 1 deletion docs/docs/zh/guide/upgrade/incompatible.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* 废弃从 1.x 至 2.x 的升级代码 ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
*`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* 在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
*`worker``application.yaml` 中移除 `exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
*`t_ds_worker_group` 表中移除 无用的 `other_params_json` 字段 ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
*`任务插件` 中移除 `Dynamic` 类型 ([#16482])(https://github.com/apache/dolphinscheduler/pull/16842)

Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ public static void cleanup() {
@Test
@Order(1)
public void testSaveWorkerGroup() {
HttpResponse saveWorkerGroupHttpResponse = workerGroupPage
.saveWorkerGroup(loginUser, 1, "test_worker_group", "10.5.0.5:1234", "test", null);
HttpResponse saveWorkerGroupHttpResponse = workerGroupPage.saveWorkerGroup(
loginUser,
0,
"test_worker_group",
"10.5.0.5:1234",
"test");
Assertions.assertTrue(saveWorkerGroupHttpResponse.getBody().getSuccess());

HttpResponse queryAllWorkerGroupsResponse = workerGroupPage.queryAllWorkerGroups(loginUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ public class WorkerGroupPage {

private String sessionId;

public HttpResponse saveWorkerGroup(User loginUser, int id, String name, String addrList, String description,
String otherParamsJson) {
public HttpResponse saveWorkerGroup(User loginUser, int id, String name, String addrList, String description) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
params.put("id", id);
params.put("name", name);
params.put("addrList", addrList);
params.put("description", description);
params.put("otherParamsJson", otherParamsJson);

Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;

import java.util.Map;
Expand All @@ -51,16 +52,13 @@
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;

/**
* worker group controller
*/
@Tag(name = "WORKER_GROUP_TAG")
@RestController
@RequestMapping("/worker-groups")
public class WorkerGroupController extends BaseController {

@Autowired
WorkerGroupService workerGroupService;
private WorkerGroupService workerGroupService;

/**
* create or update a worker group
Expand All @@ -77,21 +75,18 @@ public class WorkerGroupController extends BaseController {
@Parameter(name = "name", description = "WORKER_GROUP_NAME", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "addrList", description = "WORKER_ADDR_LIST", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "description", description = "WORKER_DESC", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "otherParamsJson", description = "WORKER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class)),
})
@PostMapping()
@ResponseStatus(HttpStatus.OK)
@ApiException(SAVE_ERROR)
@OperatorLog(auditType = AuditType.WORKER_GROUP_CREATE)
public Result saveWorkerGroup(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "addrList") String addrList,
@RequestParam(value = "description", required = false, defaultValue = "") String description,
@RequestParam(value = "otherParamsJson", required = false, defaultValue = "") String otherParamsJson) {
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson);
return returnDataList(result);
public Result<WorkerGroup> saveWorkerGroup(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "addrList") String addrList,
@RequestParam(value = "description", required = false, defaultValue = "") String description) {
final WorkerGroup workerGroup = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description);
return Result.success(workerGroup);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;

import java.util.List;
import java.util.Map;
Expand All @@ -33,11 +34,9 @@ public interface WorkerGroupService {
* @param name worker group name
* @param addrList addr list
* @param description description
* @param otherParamsJson otherParamsJson
* @return create or update result code
*/
Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description,
String otherParamsJson);
WorkerGroup saveWorkerGroup(User loginUser, int id, String name, String addrList, String description);

/**
* Query worker group paging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.Schedule;
Expand All @@ -42,9 +44,10 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -55,14 +58,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -85,9 +87,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper;

@Autowired
private ProcessService processService;

@Autowired
private ScheduleMapper scheduleMapper;

Expand All @@ -107,89 +106,53 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return create or update result code
*/
@Override
@Transactional
public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description,
String otherParamsJson) {
public WorkerGroup saveWorkerGroup(User loginUser,
int id,
String name,
String addrList,
String description) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser, null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
// todo: add permission exception
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if (StringUtils.isEmpty(name)) {
log.warn("Parameter name can ot be null.");
putMsg(result, Status.NAME_NULL);
return result;
throw new ServiceException(Status.NAME_NULL);
}
Date now = new Date();
WorkerGroup workerGroup = null;
if (id != 0) {
workerGroup = workerGroupMapper.selectById(id);
if (Objects.nonNull(workerGroup) && !workerGroup.getName().equals(name)) {
if (checkWorkerGroupDependencies(workerGroup, result)) {
return result;
checkWorkerGroupAddrList(addrList);
final Date now = new Date();
final WorkerGroup workerGroup;
try {
if (id == 0) {
// insert
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
workerGroupMapper.insert(workerGroup);
} else {
workerGroup = workerGroupMapper.selectById(id);
if (workerGroup == null) {
throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST, id);
}
// todo: Can we update the worker name?
if (!workerGroup.getName().equals(name)) {
checkWorkerGroupDependencies(workerGroup, result);
}
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
workerGroupMapper.updateById(workerGroup);
log.info("Update worker group: {} success .", workerGroup);
}
boardCastToMasterThatWorkerGroupChanged();
return workerGroup;
} catch (DuplicateKeyException duplicateKeyException) {
throw new ServiceException(Status.NAME_EXIST, name);
}
if (workerGroup == null) {
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}

workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);

if (checkWorkerGroupNameExists(workerGroup)) {
log.warn("Worker group with the same name already exists, name:{}.", workerGroup.getName());
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
String invalidAddr = checkWorkerGroupAddrList(workerGroup);
if (invalidAddr != null) {
log.warn("Worker group address is invalid, invalidAddr:{}.", invalidAddr);
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}

handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
log.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, workerGroup);
return result;
}

protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser,
String otherParamsJson) {
if (workerGroup.getId() != null) {
workerGroupMapper.updateById(workerGroup);
} else {
workerGroupMapper.insert(workerGroup);
}
}

/**
* check worker group name exists
*
* @param workerGroup worker group
* @return boolean
*/
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
// check database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
if (CollectionUtils.isNotEmpty(workerGroupList)) {
// create group, the same group name exists in the database
if (workerGroup.getId() == null) {
return true;
}
// update group, the database exists with the same group name except itself
Optional<WorkerGroup> sameNameWorkGroupOptional = workerGroupList.stream()
.filter(group -> !Objects.equals(group.getId(), workerGroup.getId())).findFirst();
if (sameNameWorkGroupOptional.isPresent()) {
return true;
}
}
return false;
}

/**
Expand Down Expand Up @@ -240,23 +203,16 @@ private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String
return false;
}

/**
* check worker group addr list
*
* @param workerGroup worker group
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
private void checkWorkerGroupAddrList(String workerGroupAddress) {
if (Strings.isNullOrEmpty(workerGroupAddress)) {
return;
}
Map<String, String> serverMaps = registryClient.getServerMaps(RegistryNodeType.WORKER);
for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
for (String addr : workerGroupAddress.split(Constants.COMMA)) {
if (!serverMaps.containsKey(addr)) {
return addr;
throw new ServiceException(Status.WORKER_ADDRESS_INVALID);
}
}
return null;
}

/**
Expand Down Expand Up @@ -438,4 +394,20 @@ public Map<Long, String> queryWorkerGroupByWorkflowDefinitionCodes(List<Long> wo
Schedule::getWorkerGroup));
}

private void boardCastToMasterThatWorkerGroupChanged() {
final List<Server> masters = registryClient.getServerList(RegistryNodeType.MASTER);
if (CollectionUtils.isEmpty(masters)) {
return;
}
for (Server master : masters) {
try {
Clients.withService(IMasterContainerService.class)
.withHost(master.getHost() + ":" + master.getPort())
.refreshWorkerGroup();
} catch (Exception e) {
log.error("Broadcast to master: {} that worker group changed failed", master, e);
}
}
}

}
Loading

0 comments on commit 0f12131

Please sign in to comment.