From 508c9fb8e7ca51a8ff9fbc6bf9c70fe9f0aa21c4 Mon Sep 17 00:00:00 2001 From: boneys Date: Mon, 18 Dec 2023 23:48:35 -0800 Subject: [PATCH] Add the task update API by reference name --- .../core/dal/ExecutionDAOFacade.java | 6 +++- .../conductor/service/ExecutionService.java | 30 +++++++++++++++---- .../conductor/service/TaskService.java | 7 +++++ .../conductor/service/TaskServiceImpl.java | 21 +++++++++++++ .../rest/controllers/TaskResource.java | 12 ++++++++ 5 files changed, 69 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index 92255bc4d..24d0a93b9 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -439,11 +439,15 @@ public List createTasks(List tasks) { } public List getTasksForWorkflow(String workflowId) { - return executionDAO.getTasksForWorkflow(workflowId).stream() + return getTaskModelsForWorkflow(workflowId).stream() .map(TaskModel::toTask) .collect(Collectors.toList()); } + public List getTaskModelsForWorkflow(String workflowId) { + return executionDAO.getTasksForWorkflow(workflowId); + } + public TaskModel getTaskModel(String taskId) { TaskModel taskModel = getTaskFromDatastore(taskId); if (taskModel != null) { diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index a4e411b4e..7a70eda72 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -14,6 +14,7 @@ import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -27,6 +28,7 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.common.utils.ExternalPayloadStorage.Operation; import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType; +import com.netflix.conductor.common.utils.TaskUtils; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.events.queue.Message; @@ -252,12 +254,28 @@ public Task getTask(String taskId) { } public Task getPendingTaskForWorkflow(String taskReferenceName, String workflowId) { - return executionDAOFacade.getTasksForWorkflow(workflowId).stream() - .filter(task -> !task.getStatus().isTerminal()) - .filter(task -> task.getReferenceTaskName().equals(taskReferenceName)) - .findFirst() // There can only be one task by a given reference name running at a - // time. - .orElse(null); + List tasks = executionDAOFacade.getTaskModelsForWorkflow(workflowId); + Stream taskStream = + tasks.stream().filter(task -> !task.getStatus().isTerminal()); + Optional found = + taskStream + .filter(task -> task.getReferenceTaskName().equals(taskReferenceName)) + .findFirst(); + if (found.isPresent()) { + return found.get().toTask(); + } + // If no task is found, let's check if there is one inside an iteration + found = + tasks.stream() + .filter(task -> !task.getStatus().isTerminal()) + .filter( + task -> + TaskUtils.removeIterationFromTaskRefName( + task.getReferenceTaskName()) + .equals(taskReferenceName)) + .findFirst(); + + return found.map(TaskModel::toTask).orElse(null); } /** diff --git a/core/src/main/java/com/netflix/conductor/service/TaskService.java b/core/src/main/java/com/netflix/conductor/service/TaskService.java index d89772659..e3d18e08e 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskService.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskService.java @@ -250,4 +250,11 @@ SearchResult search( */ ExternalStorageLocation getExternalStorageLocation( String path, String operation, String payloadType); + + String updateTask( + String workflowId, + String taskRefName, + TaskResult.Status status, + String workerId, + Map output); } diff --git a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java index 5c07d5cff..2f70487a4 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java @@ -139,6 +139,27 @@ public String updateTask(TaskResult taskResult) { return taskResult.getTaskId(); } + @Override + public String updateTask( + String workflowId, + String taskRefName, + TaskResult.Status status, + String workerId, + Map output) { + Task pending = getPendingTaskForWorkflow(workflowId, taskRefName); + if (pending == null) { + return null; + } + + TaskResult taskResult = new TaskResult(pending); + taskResult.setStatus(status); + taskResult.getOutputData().putAll(output); + if (StringUtils.isNotBlank(workerId)) { + taskResult.setWorkerId(workerId); + } + return updateTask(taskResult); + } + /** * Ack Task is received. * diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java index d9f818f44..500bed4a6 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java @@ -83,6 +83,18 @@ public String updateTask(@RequestBody TaskResult taskResult) { return taskService.updateTask(taskResult); } + @PostMapping(value = "/{workflowId}/{taskRefName}/{status}", produces = TEXT_PLAIN_VALUE) + @Operation(summary = "Update a task By Ref Name") + public String updateTask( + @PathVariable("workflowId") String workflowId, + @PathVariable("taskRefName") String taskRefName, + @PathVariable("status") TaskResult.Status status, + @RequestParam(value = "workerid", required = false) String workerId, + @RequestBody Map output) { + + return taskService.updateTask(workflowId, taskRefName, status, workerId, output); + } + @PostMapping("/{taskId}/log") @Operation(summary = "Log Task Execution Details") public void log(@PathVariable("taskId") String taskId, @RequestBody String log) {