Skip to content

Commit

Permalink
[DSIP-73] Add dolphinscheduler-task-executor module to unify the task…
Browse files Browse the repository at this point in the history
… execution logic
  • Loading branch information
ruanwenjun committed Nov 26, 2024
1 parent 94e39c6 commit c053789
Show file tree
Hide file tree
Showing 289 changed files with 5,874 additions and 7,409 deletions.
41 changes: 16 additions & 25 deletions docs/docs/en/architecture/configuration.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/docs/en/guide/upgrade/incompatible.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ This document records the incompatible updates between each version. You need to
* Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)

34 changes: 16 additions & 18 deletions docs/docs/zh/architecture/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,24 +303,22 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId

位置:`worker-server/conf/application.yaml`

| 参数 | 默认值 | 描述 |
|-----------------------------------------------------------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------|
| worker.listen-port | 1234 | worker监听端口 |
| worker.exec-threads | 100 | worker工作线程数量,用于限制并行的任务实例数量 |
| worker.max-heartbeat-interval | 10s | worker最大心跳间隔 |
| worker.host-weight | 100 | 派发任务时,worker主机的权重 |
| worker.tenant-auto-create | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 |
| worker.server-load-protection.enabled | true | 是否开启系统保护策略 |
| worker.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | worker最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统CPU |
| worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | worker最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的JVM CPU |
| worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | worker最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统内存 |
| worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 |
| worker.alert-listen-host | localhost | alert监听host |
| worker.alert-listen-port | 50052 | alert监听端口 |
| worker.registry-disconnect-strategy.max-waiting-time | 100s | 当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
| worker.task-execute-threads-full-policy | REJECT | 如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时, Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE, Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务 |
| worker.tenant-config.auto-create-tenant-enabled | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 |
| worker.tenant-config.default-tenant-enabled | false | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。 |
| 参数 | 默认值 | 描述 |
|-----------------------------------------------------------------------------|-----------|-----------------------------------------------------------------------------------------|
| worker.listen-port | 1234 | worker监听端口 |
| worker.max-heartbeat-interval | 10s | worker最大心跳间隔 |
| worker.host-weight | 100 | 派发任务时,worker主机的权重 |
| worker.tenant-auto-create | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 |
| worker.server-load-protection.enabled | true | 是否开启系统保护策略 |
| worker.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | worker最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统CPU |
| worker.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | worker最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的JVM CPU |
| worker.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | worker最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统内存 |
| worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 |
| worker.alert-listen-host | localhost | alert监听host |
| worker.alert-listen-port | 50052 | alert监听端口 |
| worker.physical-task-config.task-executor-thread-size | 100 | Worker中任务最大并发度 |
| worker.tenant-config.auto-create-tenant-enabled | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 |
| worker.tenant-config.default-tenant-enabled | false | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。 |

## Alert Server相关配置

Expand Down
1 change: 1 addition & 0 deletions docs/docs/zh/guide/upgrade/incompatible.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
* 废弃从 1.x 至 2.x 的升级代码 ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
*`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* 在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)

Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -302,11 +302,11 @@ public Result stopTask(User loginUser, long projectCode, Integer taskInstanceId)
}

// todo: we only support streaming task for now
final TaskInstanceKillResponse taskInstanceKillResponse = Clients
.withService(ITaskInstanceOperator.class)
final TaskExecutorKillResponse taskExecutorKillResponse = Clients
.withService(IPhysicalTaskExecutorOperator.class)
.withHost(taskInstance.getHost())
.killTask(new TaskInstanceKillRequest(taskInstanceId));
log.info("TaskInstance kill response: {}", taskInstanceKillResponse);
.killTask(TaskExecutorKillRequest.of(taskInstanceId));
log.info("TaskInstance kill response: {}", taskExecutorKillResponse);

putMsg(result, Status.SUCCESS);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand Down Expand Up @@ -131,11 +129,6 @@ public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLog
return new TaskInstanceLogPageQueryResponse();
}

@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
return new GetAppIdResponse();
}

@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
@Slf4j
public class ThreadUtils {

/**
* Create a daemon fixed thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadNameFormat, int threadsNum) {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}
Expand All @@ -43,9 +49,10 @@ public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(S
* Create a daemon scheduler thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
* @param threadsNum the number of threads in the pool
*/
public static ScheduledExecutorService newDaemonScheduledExecutorService(String threadNameFormat, int threadsNum) {
public static ScheduledExecutorService newDaemonScheduledExecutorService(final String threadNameFormat,
final int threadsNum) {
return Executors.newScheduledThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,47 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Builder;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* The abstract class of delay event, the event will be triggered after the delay time.
* <p> You can extend this class to implement your own delay event.
*/
@ToString
@SuperBuilder
public abstract class AbstractDelayEvent implements IEvent, Delayed {

private static final long DEFAULT_DELAY_TIME = 0;

protected long delayTime;

protected long triggerTimeInMillis;
@Builder.Default
protected long createTimeInNano = System.nanoTime();

public AbstractDelayEvent() {
this(0);
this(DEFAULT_DELAY_TIME);
}

public AbstractDelayEvent(final long delayTime) {
this(delayTime, System.nanoTime());
}

public AbstractDelayEvent(long delayTime) {
if (delayTime == 0) {
this.triggerTimeInMillis = System.currentTimeMillis();
} else {
this.triggerTimeInMillis = System.currentTimeMillis() + delayTime;
}
public AbstractDelayEvent(final long delayTime, final long createTimeInNano) {
this.delayTime = delayTime;
this.createTimeInNano = createTimeInNano;
}

@Override
public long getDelay(TimeUnit unit) {
long delay = triggerTimeInMillis - System.currentTimeMillis();
return unit.convert(delay, TimeUnit.MILLISECONDS);
long delay = createTimeInNano + delayTime * 1_000_000 - System.nanoTime();
return unit.convert(delay, TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(this.triggerTimeInMillis, ((AbstractDelayEvent) other).triggerTimeInMillis);
return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public Optional<T> poll() {
return Optional.ofNullable(delayEventQueue.poll());
}

@Override
public Optional<T> peek() {
return Optional.ofNullable(delayEventQueue.peek());
}

@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.remove());
}

@Override
public boolean isEmpty() {
return delayEventQueue.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ public interface IEventBus<T extends IEvent> {
*/
Optional<T> poll() throws InterruptedException;

/**
* peek the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> peek();

/**
* Remove the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> remove();

/**
* Whether the bus is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,28 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand Down
Loading

0 comments on commit c053789

Please sign in to comment.