From 23b4ee1cdc307f6c67b6697df3599b6e1ef71762 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 11 Dec 2024 23:13:10 +0800 Subject: [PATCH] [DSIP-87] Remove cache configuration of task --- docs/docs/en/faq.md | 16 -- docs/docs/en/guide/task/appendix.md | 1 - .../test/resources/workflow-json/test.json | 1 - .../controller/TaskInstanceController.java | 23 -- .../TaskInstanceRemoveCacheResponse.java | 44 ---- .../dolphinscheduler/api/enums/Status.java | 3 - .../api/service/TaskInstanceService.java | 10 - .../service/impl/TaskInstanceServiceImpl.java | 29 --- .../api/service/TaskInstanceServiceTest.java | 26 --- .../dolphinscheduler/common/enums/Flag.java | 1 - .../src/test/resources/sql/mysql_ddl.sql | 3 - .../dao/entity/TaskDefinition.java | 6 - .../dao/entity/TaskDefinitionLog.java | 1 - .../dao/entity/TaskInstance.java | 6 - .../dao/mapper/TaskInstanceMapper.java | 4 - .../dao/repository/TaskInstanceDao.java | 16 -- .../repository/impl/TaskInstanceDaoImpl.java | 20 -- .../dao/utils/TaskCacheUtils.java | 213 ------------------ .../dao/mapper/TaskDefinitionLogMapper.xml | 6 +- .../dao/mapper/TaskDefinitionMapper.xml | 8 +- .../dao/mapper/TaskInstanceMapper.xml | 16 +- .../resources/sql/dolphinscheduler_h2.sql | 4 - .../resources/sql/dolphinscheduler_mysql.sql | 5 - .../sql/dolphinscheduler_postgresql.sql | 5 - .../mysql/dolphinscheduler_ddl.sql | 115 ++++++++++ .../postgresql/dolphinscheduler_ddl.sql | 114 ++++++++++ .../dao/utils/TaskCacheUtilsTest.java | 204 ----------------- .../runnable/AbstractTaskInstanceFactory.java | 2 - .../runnable/RetryTaskInstanceFactory.java | 1 - .../service/model/TaskNode.java | 10 - .../service/process/ProcessServiceImpl.java | 1 - .../service/process/ProcessServiceTest.java | 2 - .../src/locales/en_US/project.ts | 2 - .../src/locales/zh_CN/project.ts | 2 - .../service/modules/task-instances/index.ts | 10 - .../task/components/node/fields/index.ts | 1 - .../task/components/node/fields/use-cache.ts | 29 --- .../components/node/fields/use-run-flag.ts | 3 +- .../task/components/node/format-data.ts | 2 - .../node/tasks/use-aliyun-serverless-spark.ts | 1 - .../task/components/node/tasks/use-chunjun.ts | 1 - .../components/node/tasks/use-data-factory.ts | 1 - .../components/node/tasks/use-datasync.ts | 1 - .../task/components/node/tasks/use-datax.ts | 1 - .../task/components/node/tasks/use-dinky.ts | 1 - .../task/components/node/tasks/use-dms.ts | 1 - .../task/components/node/tasks/use-dvc.ts | 1 - .../task/components/node/tasks/use-emr.ts | 1 - .../components/node/tasks/use-flink-stream.ts | 1 - .../task/components/node/tasks/use-flink.ts | 1 - .../components/node/tasks/use-hive-cli.ts | 1 - .../task/components/node/tasks/use-http.ts | 1 - .../task/components/node/tasks/use-java.ts | 1 - .../task/components/node/tasks/use-jupyter.ts | 1 - .../task/components/node/tasks/use-k8s.ts | 1 - .../components/node/tasks/use-kubeflow.ts | 1 - .../task/components/node/tasks/use-linkis.ts | 1 - .../task/components/node/tasks/use-mlflow.ts | 1 - .../task/components/node/tasks/use-mr.ts | 1 - .../components/node/tasks/use-openmldb.ts | 1 - .../components/node/tasks/use-procedure.ts | 1 - .../task/components/node/tasks/use-python.ts | 1 - .../task/components/node/tasks/use-pytorch.ts | 1 - .../components/node/tasks/use-remote-shell.ts | 1 - .../components/node/tasks/use-sagemaker.ts | 1 - .../components/node/tasks/use-sea-tunnel.ts | 1 - .../task/components/node/tasks/use-shell.ts | 1 - .../task/components/node/tasks/use-spark.ts | 1 - .../task/components/node/tasks/use-sql.ts | 1 - .../task/components/node/tasks/use-sqoop.ts | 1 - .../components/node/tasks/use-zeppelin.ts | 1 - .../projects/task/components/node/types.ts | 4 +- .../components/dag/dag-context-menu.tsx | 13 -- .../workflow/components/dag/index.tsx | 8 - 74 files changed, 240 insertions(+), 782 deletions(-) delete mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java delete mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java delete mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java delete mode 100644 dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts diff --git a/docs/docs/en/faq.md b/docs/docs/en/faq.md index 7ebc9dc811d2..52a77f01f564 100644 --- a/docs/docs/en/faq.md +++ b/docs/docs/en/faq.md @@ -752,20 +752,4 @@ start API server. If you want disabled when Python gateway service you could cha --- -## Q:How to determine whether a task has been cached when the cache is executed, that is, how to determine whether a task can use the running result of another task? - -A: For the task identified as `Cache Execution`, when the task starts, a cache key will be generated, and the key is composed of the following fields and hashed: - -- task definition: the id of the task definition corresponding to the task instance -- task version: the version of the task definition corresponding to the task instance -- task input parameters: including the parameters passed in by the upstream node and the global parameter, the parameters referenced by the parameter list of the task definition and the parameters used by the task definition using `${}` -- environment configuration: the actual configuration content of the environment configuration under the environment name, that is, the actual configuration content in the `security` - `environment management` - -If the task with cache identification runs, it will find whether there is data with the same cache key in the database, - -- If there is, copy the task instance and update the corresponding data -- If not, the task runs as usual, and the task instance data is stored in the cache when the task is completed - -If you do not need to cache, you can right-click the node to run `Clear cache` in the workflow instance to clear the cache, which will clear the cache data of the current input parameters under this version. - We will collect more FAQ later diff --git a/docs/docs/en/guide/task/appendix.md b/docs/docs/en/guide/task/appendix.md index 456921661796..57cb5dbc3106 100644 --- a/docs/docs/en/guide/task/appendix.md +++ b/docs/docs/en/guide/task/appendix.md @@ -8,7 +8,6 @@ DolphinScheduler task plugins share some common default parameters. Each type of |--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Node Name | The name of the task. Node names within the same workflow must be unique. | | Run Flag | Indicating whether to schedule the task. If you do not need to execute the task, you can turn on the `Prohibition execution` switch. | -| Cache Execution | Indicating whether this node needs to be cached. If it is cached, the same identifier (same task version, same task definition, same parameter input) task is cached. When the task has been cached, it will not be executed again, and the result will be reused directly. | | Description | Describing the function of this node. | | Task Priority | When the number of the worker threads is insufficient, the worker executes task according to the priority. When two tasks have the same priority, the worker will execute them in `first come first served` fashion. | | Worker Group | Machines which execute the tasks. If you choose `default`, scheduler will send the task to a random worker. | diff --git a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/workflow-json/test.json b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/workflow-json/test.json index 9601a93a9beb..c3cb8a5741e6 100644 --- a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/workflow-json/test.json +++ b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/workflow-json/test.json @@ -53,7 +53,6 @@ "taskParamList" : [ ], "taskParamMap" : null, "flag" : "YES", - "isCache" : "NO", "taskPriority" : "MEDIUM", "userName" : null, "projectName" : null, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index 777da9bc2186..d5f41c6aa17e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -19,13 +19,11 @@ import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR; import org.apache.dolphinscheduler.api.audit.OperatorLog; import org.apache.dolphinscheduler.api.audit.enums.AuditType; -import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.utils.Result; @@ -37,7 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -202,24 +199,4 @@ public Result stopTask(@Parameter(hidden = true) @RequestAttribute(value return taskInstanceService.stopTask(loginUser, projectCode, id); } - /** - * remove task instance cache - * - * @param loginUser login user - * @param projectCode project code - * @param id task instance id - * @return the result code and msg - */ - @Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE") - @Parameters({ - @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12")) - }) - @DeleteMapping(value = "/{id}/remove-cache") - @ResponseStatus(HttpStatus.OK) - @ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR) - public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "id") Integer id) { - return taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id); - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java deleted file mode 100644 index 34a8218304be..000000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.dto.taskInstance; - -import org.apache.dolphinscheduler.api.utils.Result; - -import lombok.Data; - -/** - * task instance success response - */ -@Data -public class TaskInstanceRemoveCacheResponse extends Result { - - private String cacheKey; - - public TaskInstanceRemoveCacheResponse(Result result) { - super(); - this.setCode(result.getCode()); - this.setMsg(result.getMsg()); - } - - public TaskInstanceRemoveCacheResponse(Result result, String cacheKey) { - super(); - this.setCode(result.getCode()); - this.setMsg(result.getMsg()); - this.cacheKey = cacheKey; - } -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index c9f091272395..8e129f943698 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -314,8 +314,6 @@ public enum Status { "资源文件已授权其他用户[{0}],后缀不允许修改"), RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"), - REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"), - ILLEGAL_RESOURCE_PATH(20020, "Resource file [{0}] is illegal", "非法的资源路径[{0}]"), USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"), @@ -520,7 +518,6 @@ public enum Status { CLOSE_TASK_GROUP_ERROR(130011, "close task group error", "关闭任务组错误"), START_TASK_GROUP_ERROR(130012, "start task group error", "启动任务组错误"), QUERY_TASK_GROUP_QUEUE_LIST_ERROR(130013, "query task group queue list error", "查询任务组队列列表错误"), - TASK_GROUP_CACHE_START_FAILED(130014, "cache start failed", "任务组相关的缓存启动失败"), ENVIRONMENT_WORKER_GROUPS_IS_INVALID(130015, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"), UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(130016, "You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]", diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index af7c650f34b2..3690aa118c6f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -101,14 +100,5 @@ void forceTaskSuccess(User loginUser, */ TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId); - /** - * remove task instance cache - * @param loginUser - * @param projectCode - * @param taskInstanceId - * @return - */ - TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId); - void deleteByWorkflowInstanceId(Integer workflowInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 372ee35b6d72..b1518e3cc70d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -18,10 +18,8 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; -import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -43,7 +41,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; -import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator; @@ -56,7 +53,6 @@ import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import java.util.Date; import java.util.HashSet; @@ -325,31 +321,6 @@ public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long return taskInstance; } - @Override - public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, - Integer taskInstanceId) { - Result result = new Result(); - - Project project = projectMapper.queryByCode(projectCode); - projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE); - - TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId); - if (taskInstance == null) { - log.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, - taskInstanceId); - putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); - return new TaskInstanceRemoveCacheResponse(result); - } - String tagCacheKey = taskInstance.getCacheKey(); - Pair taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey); - String cacheKey = taskIdAndCacheKey.getRight(); - if (StringUtils.isNotEmpty(cacheKey)) { - taskInstanceDao.clearCacheByCacheKey(cacheKey); - } - putMsg(result, Status.SUCCESS); - return new TaskInstanceRemoveCacheResponse(result, cacheKey); - } - @Override public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { List needToDeleteTaskInstances = diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 224111ba8047..5a8dabaaae04 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; -import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; @@ -410,29 +409,4 @@ public void testForceTaskSuccess_withTaskInstanceNotFinished() { () -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId())); } - @Test - public void testRemoveTaskInstanceCache() { - User user = getAdminUser(); - long projectCode = 1L; - Project project = getProject(projectCode); - int taskId = 1; - TaskInstance task = getTaskInstance(); - String cacheKey = "950311f3597f9198976cd3fd69e208e5b9ba6750"; - task.setCacheKey(cacheKey); - - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(taskInstanceMapper.selectById(1)).thenReturn(task); - when(taskInstanceDao.queryByCacheKey(cacheKey)).thenReturn(task, null); - when(taskInstanceDao.updateById(task)).thenReturn(true); - - TaskInstanceRemoveCacheResponse response = - taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId); - Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode()); - - when(taskInstanceMapper.selectById(1)).thenReturn(null); - TaskInstanceRemoveCacheResponse responseNotFoundTask = - taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId); - Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), responseNotFoundTask.getCode()); - - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java index 68fdb2a96f01..6466b2168794 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java @@ -26,7 +26,6 @@ * have_arr_variables * have_map_variables * have_alert - * is_cache */ public enum Flag { diff --git a/dolphinscheduler-common/src/test/resources/sql/mysql_ddl.sql b/dolphinscheduler-common/src/test/resources/sql/mysql_ddl.sql index c4be0bed7fd7..feb99080424f 100644 --- a/dolphinscheduler-common/src/test/resources/sql/mysql_ddl.sql +++ b/dolphinscheduler-common/src/test/resources/sql/mysql_ddl.sql @@ -36,9 +36,6 @@ ALTER TABLE `t_ds_plugin_define` AUTO_INCREMENT 2; ALTER TABLE `t_ds_workflow_instance` MODIFY COLUMN `state_history` text NULL COMMENT 'state history desc'; ALTER TABLE `t_ds_project` MODIFY COLUMN `description` varchar(255) NULL; ALTER TABLE `t_ds_task_group` MODIFY COLUMN `description` varchar(255) NULL; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `app_link` text NULL COMMENT 'yarn app id', MODIFY COLUMN `cache_key` varchar(200) NULL COMMENT 'cache_key', MODIFY COLUMN `executor_name` varchar(64) NULL; ALTER TABLE `t_ds_worker_group` MODIFY COLUMN `description` text NULL COMMENT 'description'; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `cache_key` varchar(200) NULL COMMENT 'cache_key', MODIFY COLUMN `executor_name` varchar(64) NULL; ALTER TABLE `t_ds_fav_task` MODIFY COLUMN `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id'; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `cache_key` varchar(200) NULL COMMENT 'cache_key', MODIFY COLUMN `executor_name` varchar(64) NULL; SET FOREIGN_KEY_CHECKS = 1; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 02323d29b048..064082e7410e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -122,11 +122,6 @@ public class TaskDefinition { */ private Flag flag; - /** - * task is cache: yes/no - */ - private Flag isCache; - /** * task priority */ @@ -291,7 +286,6 @@ public boolean equals(Object o) { && Objects.equals(taskType, that.taskType) && Objects.equals(taskParams, that.taskParams) && flag == that.flag - && isCache == that.isCache && taskPriority == that.taskPriority && Objects.equals(workerGroup, that.workerGroup) && timeoutFlag == that.timeoutFlag diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index 21c4dc899f03..ce47df869a84 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -69,7 +69,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) { this.setFailRetryInterval(taskDefinition.getFailRetryInterval()); this.setFailRetryTimes(taskDefinition.getFailRetryTimes()); this.setFlag(taskDefinition.getFlag()); - this.setIsCache(taskDefinition.getIsCache()); this.setModifyBy(taskDefinition.getModifyBy()); this.setCpuQuota(taskDefinition.getCpuQuota()); this.setMemoryMax(taskDefinition.getMemoryMax()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index ca0ee90c9cfb..5dc6443f8dba 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -27,7 +27,6 @@ import lombok.Data; -import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -95,11 +94,6 @@ public class TaskInstance implements Serializable { private Flag flag; - private Flag isCache; - - @TableField(updateStrategy = FieldStrategy.IGNORED) - private String cacheKey; - @TableField(exist = false) private String duration; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index e2979c14e005..ca370e90926a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -46,10 +46,6 @@ List findValidTaskListByWorkflowInstanceId(@Param("workflowInstanc TaskInstance queryByInstanceIdAndCode(@Param("workflowInstanceId") int workflowInstanceId, @Param("taskCode") Long taskCode); - TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey); - - Boolean clearCacheByCacheKey(@Param("cacheKey") String cacheKey); - List queryByWorkflowInstanceIdsAndTaskCodes(@Param("workflowInstanceIds") List workflowInstanceIds, @Param("taskCodes") List taskCodes); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index be5d6a1fe57f..0518759d7820 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -79,22 +79,6 @@ public interface TaskInstanceDao extends IDao { */ List queryPreviousTaskListByWorkflowInstanceId(Integer workflowInstanceId); - /** - * find task instance by cache_key - * - * @param cacheKey cache key - * @return task instance - */ - TaskInstance queryByCacheKey(String cacheKey); - - /** - * clear task instance cache by cache_key - * - * @param cacheKey cache key - * @return task instance - */ - Boolean clearCacheByCacheKey(String cacheKey); - void deleteByWorkflowInstanceId(int workflowInstanceId); List queryByWorkflowInstanceId(Integer workflowInstanceId); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 8c0f9f9cca43..00eb08eb8f63 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import java.util.Date; import java.util.List; @@ -155,25 +154,6 @@ public List queryPreviousTaskListByWorkflowInstanceId(Integer work workflowInstance.getTestFlag()); } - @Override - public TaskInstance queryByCacheKey(String cacheKey) { - if (StringUtils.isEmpty(cacheKey)) { - return null; - } - return mybatisMapper.queryByCacheKey(cacheKey); - } - - @Override - public Boolean clearCacheByCacheKey(String cacheKey) { - try { - mybatisMapper.clearCacheByCacheKey(cacheKey); - return true; - } catch (Exception e) { - log.error("clear cache by cacheKey failed", e); - return false; - } - } - @Override public void deleteByWorkflowInstanceId(int workflowInstanceId) { mybatisMapper.deleteByWorkflowInstanceId(workflowInstanceId); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java deleted file mode 100644 index 90b648386b9e..000000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.dao.utils; - -import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX; - -import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; -import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; - -import java.io.FileInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import lombok.extern.slf4j.Slf4j; - -import com.fasterxml.jackson.databind.JsonNode; - -@Slf4j -public class TaskCacheUtils { - - private TaskCacheUtils() { - throw new IllegalStateException("Utility class"); - } - - public static final String MERGE_TAG = "-"; - - /** - * generate cache key for task instance - * the follow message will be used to generate cache key - * 2. task version - * 3. task is cache - * 4. input VarPool, from upstream task and workflow global parameters - * @param taskInstance task instance - * @param taskExecutionContext taskExecutionContext - * @param storageOperator storageOperate - * @return cache key - */ - public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext, - StorageOperator storageOperator) { - List keyElements = new ArrayList<>(); - keyElements.add(String.valueOf(taskInstance.getTaskCode())); - keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion())); - keyElements.add(String.valueOf(taskInstance.getIsCache().getCode())); - keyElements.add(String.valueOf(taskInstance.getEnvironmentConfig())); - keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperator)); - String data = StringUtils.join(keyElements, "_"); - return DigestUtils.sha256Hex(data); - } - - /** - * generate cache key for task instance which is cache execute - * this key will record which cache task instance will be copied, and cache key will be used - * tagCacheKey = sourceTaskId + "-" + cacheKey - * @param sourceTaskId source task id - * @param cacheKey cache key - * @return tagCacheKey - */ - public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) { - return sourceTaskId + MERGE_TAG + cacheKey; - } - - /** - * revert cache key tag to source task id and cache key - * @param tagCacheKey cache key - * @return Pair, first is source task id, second is cache key - */ - public static Pair revertCacheKey(String tagCacheKey) { - Pair taskIdAndCacheKey; - if (tagCacheKey == null) { - taskIdAndCacheKey = Pair.of(-1, ""); - return taskIdAndCacheKey; - } - if (tagCacheKey.contains(MERGE_TAG)) { - String[] split = tagCacheKey.split(MERGE_TAG); - if (split.length == 2) { - taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]); - } else { - taskIdAndCacheKey = Pair.of(-1, ""); - } - return taskIdAndCacheKey; - } else { - return Pair.of(-1, tagCacheKey); - } - } - - /** - * get hash data of task input var pool - * there are two parts of task input var pool: from upstream task and workflow global parameters - * @param taskInstance task instance - * taskExecutionContext taskExecutionContext - */ - public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context, - StorageOperator storageOperator) { - JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams()); - - // The set of input values considered from localParams in the taskParams - Set propertyInSet = JSONUtils.toList(taskParams.get("localParams").toString(), Property.class).stream() - .filter(property -> property.getDirect().equals(Direct.IN)) - .map(Property::getProp).collect(Collectors.toSet()); - - // The set of input values considered from `${var}` form task definition - propertyInSet.addAll(getScriptVarInSet(taskInstance)); - - // var pool value from upstream task - List varPool = JSONUtils.toList(taskInstance.getVarPool(), Property.class); - - Map fileCheckSumMap = new HashMap<>(); - List fileInput = varPool.stream().filter(property -> property.getType().equals(DataType.FILE)) - .collect(Collectors.toList()); - fileInput.forEach( - property -> fileCheckSumMap.put(property.getProp(), - getValCheckSum(property, context, storageOperator))); - - // var pool value from workflow global parameters - if (context.getPrepareParamsMap() != null) { - Set taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet()); - List globalContextVarPool = context.getPrepareParamsMap().entrySet().stream() - .filter(entry -> !taskVarPoolSet.contains(entry.getKey())) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); - varPool.addAll(globalContextVarPool); - } - - // only consider var pool value which is in propertyInSet - varPool = varPool.stream() - .filter(property -> property.getDirect().equals(Direct.IN)) - .filter(property -> propertyInSet.contains(property.getProp())) - .sorted(Comparator.comparing(Property::getProp)) - .collect(Collectors.toList()); - - varPool.forEach(property -> { - if (property.getType() == DataType.FILE) { - property.setValue(fileCheckSumMap.get(property.getValue())); - } - }); - return JSONUtils.toJsonString(varPool); - } - - /** - * get checksum from crc32 file of file property in varPool - * cache can be used if content of upstream output files are the same - * @param fileProperty - * @param context - * @param storageOperator - */ - public static String getValCheckSum(Property fileProperty, TaskExecutionContext context, - StorageOperator storageOperator) { - String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX; - String resourceCRCWholePath = - storageOperator.getStorageFileAbsolutePath(context.getTenantCode(), resourceCRCPath); - String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath); - log.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath); - String crcString = ""; - try { - storageOperator.download(resourceCRCWholePath, targetPath, true); - crcString = FileUtils.readFile2Str(new FileInputStream(targetPath)); - fileProperty.setValue(crcString); - } catch (IOException e) { - log.error("Replace checksum failed for file property {}.", fileProperty.getProp()); - } - return crcString; - } - - /** - * get var in set from task definition - * @param taskInstance task instance - * @return var in set - */ - public static List getScriptVarInSet(TaskInstance taskInstance) { - Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}"); - Matcher matcher = pattern.matcher(taskInstance.getTaskParams()); - - List varInSet = new ArrayList<>(); - while (matcher.find()) { - varInSet.add(matcher.group(1)); - } - return varInSet; - } - -} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index 14c71f83c5bb..2dfa96a92920 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -19,7 +19,7 @@ - id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority, + id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type @@ -50,14 +50,14 @@ insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id, - task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, + task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type) values (#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description}, #{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams}, - #{taskDefinitionLog.flag},#{taskDefinitionLog.isCache},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode}, + #{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode}, #{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy}, #{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime}, #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index fe154bd6c239..5dab4f5fb803 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -19,13 +19,13 @@ - id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority, + id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max, task_execute_type ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id, - ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.is_cache, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, + ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout, ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id, ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max, ${alias}.task_execute_type @@ -86,12 +86,12 @@ insert into t_ds_task_definition (code, name, version, description, project_code, user_id, - task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, + task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time,task_group_id, task_execute_type) values (#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description}, - #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},#{taskDefinition.isCache}, + #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag}, #{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes}, #{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout}, #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 04b6bec241e4..5f7ba908b995 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -21,13 +21,13 @@ id, name, task_type, workflow_instance_id, workflow_instance_name, project_code, task_code, task_definition_version, state, submit_time, start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, - flag, is_cache, cache_key, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, executor_name, + flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, executor_name, first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.workflow_instance_id, ${alias}.state, ${alias}.submit_time, ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, - ${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, + ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type - - - update t_ds_task_instance - set cache_key = null - where cache_key = #{cacheKey} -