Skip to content

Commit

Permalink
[DSIP-73] Add dolphinscheduler-task-executor module to unify the task…
Browse files Browse the repository at this point in the history
… execution logic
  • Loading branch information
ruanwenjun committed Nov 20, 2024
1 parent 76a962f commit d1fa92b
Show file tree
Hide file tree
Showing 272 changed files with 5,586 additions and 7,164 deletions.
67 changes: 29 additions & 38 deletions docs/docs/en/architecture/configuration.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/docs/en/guide/upgrade/incompatible.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ This document records the incompatible updates between each version. You need to
* Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)

60 changes: 29 additions & 31 deletions docs/docs/zh/architecture/configuration.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/docs/zh/guide/upgrade/incompatible.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
* 废弃从 1.x 至 2.x 的升级代码 ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
*`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* 在worker`application.yaml`中移除exec-threads,使用physical-task-config替代;在master`application.yaml`中移除master-async-task-executor-thread-pool-size使用logic-task-config替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)

Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest;
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -302,11 +302,11 @@ public Result stopTask(User loginUser, long projectCode, Integer taskInstanceId)
}

// todo: we only support streaming task for now
final TaskInstanceKillResponse taskInstanceKillResponse = Clients
.withService(ITaskInstanceOperator.class)
final TaskExecutorKillResponse taskExecutorKillResponse = Clients
.withService(IPhysicalTaskExecutorOperator.class)
.withHost(taskInstance.getHost())
.killTask(new TaskInstanceKillRequest(taskInstanceId));
log.info("TaskInstance kill response: {}", taskInstanceKillResponse);
.killTask(TaskExecutorKillRequest.of(taskInstanceId));
log.info("TaskInstance kill response: {}", taskExecutorKillResponse);

putMsg(result, Status.SUCCESS);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand Down Expand Up @@ -131,11 +129,6 @@ public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLog
return new TaskInstanceLogPageQueryResponse();
}

@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
return new GetAppIdResponse();
}

@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
@Slf4j
public class ThreadUtils {

/**
* Create a daemon fixed thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadNameFormat, int threadsNum) {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}
Expand All @@ -43,9 +49,10 @@ public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(S
* Create a daemon scheduler thread pool, the thread name will be formatted with the given name.
*
* @param threadNameFormat the thread name format, e.g. "DemonThread-%d"
* @param threadsNum the number of threads in the pool
* @param threadsNum the number of threads in the pool
*/
public static ScheduledExecutorService newDaemonScheduledExecutorService(String threadNameFormat, int threadsNum) {
public static ScheduledExecutorService newDaemonScheduledExecutorService(final String threadNameFormat,
final int threadsNum) {
return Executors.newScheduledThreadPool(threadsNum, newDaemonThreadFactory(threadNameFormat));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,45 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Builder;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

/**
* The abstract class of delay event, the event will be triggered after the delay time.
* <p> You can extend this class to implement your own delay event.
*/
@ToString
@SuperBuilder
public abstract class AbstractDelayEvent implements IEvent, Delayed {

protected long delayTime;

protected long triggerTimeInMillis;
@Builder.Default
protected long createTimeInNano = System.nanoTime();

public AbstractDelayEvent() {
this(0);
}

public AbstractDelayEvent(long delayTime) {
if (delayTime == 0) {
this.triggerTimeInMillis = System.currentTimeMillis();
} else {
this.triggerTimeInMillis = System.currentTimeMillis() + delayTime;
}
public AbstractDelayEvent(final long delayTime) {
this(delayTime, System.nanoTime());
}

public AbstractDelayEvent(final long delayTime, final long createTimeInNano) {
this.delayTime = delayTime;
this.createTimeInNano = createTimeInNano;
}

@Override
public long getDelay(TimeUnit unit) {
long delay = triggerTimeInMillis - System.currentTimeMillis();
return unit.convert(delay, TimeUnit.MILLISECONDS);
long delay = createTimeInNano + delayTime * 1_000_000 - System.nanoTime();
return unit.convert(delay, TimeUnit.NANOSECONDS);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(this.triggerTimeInMillis, ((AbstractDelayEvent) other).triggerTimeInMillis);
return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public Optional<T> poll() {
return Optional.ofNullable(delayEventQueue.poll());
}

@Override
public Optional<T> peek() {
return Optional.ofNullable(delayEventQueue.peek());
}

@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.remove());
}

@Override
public boolean isEmpty() {
return delayEventQueue.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ public interface IEventBus<T extends IEvent> {
*/
Optional<T> poll() throws InterruptedException;

/**
* peek the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> peek();

/**
* Remove the head event from the bus. This method will not block if the event bus is empty will return empty optional.
*/
Optional<T> remove();

/**
* Whether the bus is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,28 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,19 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.TimeZone;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

/**
* json serialize or deserialize
*/
@Slf4j
public class JsonSerializer {

Expand All @@ -60,13 +57,6 @@ private JsonSerializer() {

}

/**
* serialize to byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
if (obj == null) {
return null;
Expand All @@ -79,44 +69,14 @@ public static <T> byte[] serialize(T obj) {
}
}

/**
* serialize to string
*
* @param obj object
* @param <T> object type
* @return string
*/
public static <T> String serializeToString(T obj) {
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("serializeToString exception!", e);
}

return json;
}

/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
@SneakyThrows
public static <T> T deserialize(byte[] src, Class<T> clazz) {
if (src == null) {
return null;
}

String json = new String(src, StandardCharsets.UTF_8);
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
log.error("deserialize exception!", e);
return null;
}
return objectMapper.readValue(json, clazz);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ class NettyRemotingServer {
this.serverConfig = serverConfig;
this.serverName = serverConfig.getServerName();
this.methodInvokerExecutor = ThreadUtils.newDaemonFixedThreadExecutor(
serverName + "MethodInvoker-%d", Runtime.getRuntime().availableProcessors() * 2 + 1);
serverName + "-methodInvoker-%d", Runtime.getRuntime().availableProcessors() * 2 + 1);
this.channelHandler = new JdkDynamicServerHandler(methodInvokerExecutor);
ThreadFactory bossThreadFactory =
ThreadUtils.newDaemonThreadFactory(serverName + "BossThread-%d");
ThreadUtils.newDaemonThreadFactory(serverName + "-boss-%d");
ThreadFactory workerThreadFactory =
ThreadUtils.newDaemonThreadFactory(serverName + "WorkerThread-%d");
ThreadUtils.newDaemonThreadFactory(serverName + "-worker-%d");
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
<version>dev-SNAPSHOT</version>
</parent>

<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-common</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
Expand All @@ -35,9 +33,6 @@ public interface ILogService {
@RpcMethod
TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest);

@RpcMethod
GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest);

@RpcMethod
void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);

Expand Down
12 changes: 9 additions & 3 deletions dolphinscheduler-extract/dolphinscheduler-extract-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,27 @@
<artifactId>dolphinscheduler-extract-master</artifactId>

<dependencies>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-extract-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-executor</artifactId>
</dependency>

</dependencies>

</project>
Loading

0 comments on commit d1fa92b

Please sign in to comment.