diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java
index 94c9c00e0..b0734ce26 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java
@@ -152,6 +152,9 @@ public void setTasks(List<WorkflowTask> tasks) {
     @ProtoField(id = 28)
     private String expression;
 
+    @ProtoField(id = 29)
+    private boolean permissive = false;
+
     /**
      * @return the name
      */
@@ -547,6 +550,22 @@ public void setExpression(String expression) {
         this.expression = expression;
     }
 
+    /**
+     * @return whether the task is permissive. When set to true and the task fails, the workflow
+     *     does not fail fast: execution continues until a join or the end of the workflow is
+     *     reached, allowing the remaining tasks to execute idempotently.
+     */
+    public boolean isPermissive() {
+        return this.permissive;
+    }
+
+    /**
+     * @param permissive when set to true, the task is marked as permissive
+     */
+    public void setPermissive(boolean permissive) {
+        this.permissive = permissive;
+    }
+
     private Collection<List<WorkflowTask>> children() {
         Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();
 
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
index 6d7a42e70..e1bacb34d 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
@@ -207,7 +207,9 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
                     tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get());
                     executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
                     outcome.tasksToBeUpdated.add(pendingTask);
-                } else {
+                } else if (!(pendingTask.getWorkflowTask() != null
+                        && pendingTask.getWorkflowTask().isPermissive()
+                        && !pendingTask.getWorkflowTask().isOptional())) {
                     pendingTask.setStatus(COMPLETED_WITH_ERRORS);
                 }
             }
@@ -254,6 +256,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
         if (hasSuccessfulTerminateTask
                 || (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
             LOGGER.debug("Marking workflow: {} as complete.", workflow);
+            List<TaskModel> permissiveTasksTerminalNonSuccessful =
+                    workflow.getTasks().stream()
+                            .filter(t -> t.getWorkflowTask() != null)
+                            .filter(t -> t.getWorkflowTask().isPermissive())
+                            .filter(t -> !t.getWorkflowTask().isOptional())
+                            .collect(
+                                    Collectors.toMap(
+                                            TaskModel::getReferenceTaskName,
+                                            t -> t,
+                                            (t1, t2) ->
+                                                    t1.getRetryCount() > t2.getRetryCount()
+                                                            ? t1
+                                                            : t2))
+                            .values()
+                            .stream()
+                            .filter(
+                                    t ->
+                                            t.getStatus().isTerminal()
+                                                    && !t.getStatus().isSuccessful())
+                            .toList();
+            if (!permissiveTasksTerminalNonSuccessful.isEmpty()) {
+                final String errMsg =
+                        permissiveTasksTerminalNonSuccessful.stream()
+                                .map(
+                                        t ->
+                                                String.format(
+                                                        "Task %s failed with status: %s and reason: '%s'",
+                                                        t.getTaskId(),
+                                                        t.getStatus(),
+                                                        t.getReasonForIncompletion()))
+                                .collect(Collectors.joining(". "));
+                throw new TerminateWorkflowException(errMsg);
+            }
             outcome.isComplete = true;
         }
 
@@ -437,11 +472,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
             if (status == null || !status.isTerminal()) {
                 return false;
             }
-            // if we reach here, the task has been completed.
-            // Was the task successful in completion?
-            if (!status.isSuccessful()) {
-                return false;
-            }
         }
 
         boolean noPendingSchedule =
@@ -529,7 +559,8 @@ Optional<TaskModel> retry(
         if (!task.getStatus().isRetriable()
                 || TaskType.isBuiltIn(task.getTaskType())
                 || expectedRetryCount <= retryCount) {
-            if (workflowTask != null && workflowTask.isOptional()) {
+            if (workflowTask != null
+                    && (workflowTask.isOptional() || workflowTask.isPermissive())) {
                 return Optional.empty();
             }
             WorkflowModel.Status status;
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
index 84ee5e001..a25f06dd1 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java
@@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) {
         for (TaskModel task : workflow.getTasks()) {
             switch (task.getStatus()) {
                 case FAILED:
+                    if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
+                            || task.getTaskType()
+                                    .equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) {
+                        @SuppressWarnings("unchecked")
+                        List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
+                        boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow);
+                        if (joinOnFailedPermissive) {
+                            task.setStatus(IN_PROGRESS);
+                            addTaskToQueue(task);
+                            break;
+                        }
+                    }
                 case FAILED_WITH_TERMINAL_ERROR:
                 case TIMED_OUT:
                     retriableMap.put(task.getReferenceTaskName(), task);
@@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) {
 
         LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
     }
+
+    private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowModel workflow) {
+        return joinOn.stream()
+                .map(workflow::getTaskByRefName)
+                .anyMatch(
+                        t ->
+                                t.getWorkflowTask().isPermissive()
+                                        && !t.getWorkflowTask().isOptional()
+                                        && t.getStatus().equals(FAILED));
+    }
 }
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java
index fc1325e5e..9800eb451 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java
@@ -65,9 +65,20 @@ public boolean execute(
             }
 
             taskStatus = exclusiveTask.getStatus();
             foundExlusiveJoinOnTask = taskStatus.isTerminal();
-            hasFailures = !taskStatus.isSuccessful();
+            hasFailures =
+                    !taskStatus.isSuccessful()
+                            && (!exclusiveTask.getWorkflowTask().isPermissive()
+                                    || joinOn.stream()
+                                            .map(workflow::getTaskByRefName)
+                                            .allMatch(t -> t.getStatus().isTerminal()));
             if (hasFailures) {
-                failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" ");
+                final String failureReasons =
+                        joinOn.stream()
+                                .map(workflow::getTaskByRefName)
+                                .filter(t -> !t.getStatus().isSuccessful())
+                                .map(TaskModel::getReasonForIncompletion)
+                                .collect(Collectors.joining(" "));
+                failureReason.append(failureReasons);
             }
             break;
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
index a0ad5a96f..4114e39ab 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java
@@ -57,9 +57,21 @@ public boolean execute(
                 break;
             }
             TaskModel.Status taskStatus = forkedTask.getStatus();
-            hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional();
+            hasFailures =
+                    !taskStatus.isSuccessful()
+                            && !forkedTask.getWorkflowTask().isOptional()
+                            && (!forkedTask.getWorkflowTask().isPermissive()
+                                    || joinOn.stream()
+                                            .map(workflow::getTaskByRefName)
+                                            .allMatch(t -> t.getStatus().isTerminal()));
             if (hasFailures) {
-                failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
+                final String failureReasons =
+                        joinOn.stream()
+                                .map(workflow::getTaskByRefName)
+                                .filter(t -> !t.getStatus().isSuccessful())
+                                .map(TaskModel::getReasonForIncompletion)
+                                .collect(Collectors.joining(" "));
+                failureReason.append(failureReasons);
             }
             // Only add to task output if it's not empty
             if (!forkedTask.getOutputData().isEmpty()) {
diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java
index d06029020..c89a97b70 100644
--- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java
+++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java
@@ -440,6 +440,79 @@ public void testOptional() {
                 outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
     }
 
+    @Test
+    public void testPermissive() {
+        WorkflowDef def = new WorkflowDef();
+        def.setName("test-permissive");
+
+        WorkflowTask task1 = new WorkflowTask();
+        task1.setName("task0");
+        task1.setPermissive(true);
+        task1.setTaskReferenceName("t0");
+        task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
+        task1.setTaskDefinition(new TaskDef("task0"));
+
+        WorkflowTask task2 = new WorkflowTask();
+        task2.setName("task1");
+        task2.setPermissive(true);
+        task2.setTaskReferenceName("t1");
+        task2.setTaskDefinition(new TaskDef("task1"));
+
+        def.getTasks().add(task1);
+        def.getTasks().add(task2);
+        def.setSchemaVersion(2);
+
+        WorkflowModel workflow = new WorkflowModel();
+        workflow.setWorkflowDefinition(def);
+        workflow.setCreateTime(System.currentTimeMillis());
+        DeciderOutcome outcome = deciderService.decide(workflow);
+        assertNotNull(outcome);
+        assertEquals(1, outcome.tasksToBeScheduled.size());
+        assertEquals(
+                task1.getTaskReferenceName(),
+                outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
+
+        for (int i = 0; i < 3; i++) {
+            String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
+            assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));
+
+            workflow.getTasks().clear();
+            workflow.getTasks().addAll(outcome.tasksToBeScheduled);
+            workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);
+
+            outcome = deciderService.decide(workflow);
+
+            assertNotNull(outcome);
+            assertEquals(1, outcome.tasksToBeUpdated.size());
+            assertEquals(1, outcome.tasksToBeScheduled.size());
+
+            assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
+            assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
+            assertEquals(
+                    task1.getTaskReferenceName(),
+                    outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
+            assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount());
+        }
+
+        String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
+
+        workflow.getTasks().clear();
+        workflow.getTasks().addAll(outcome.tasksToBeScheduled);
+        workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);
+
+        outcome = deciderService.decide(workflow);
+
+        assertNotNull(outcome);
+        assertEquals(1, outcome.tasksToBeUpdated.size());
+        assertEquals(1, outcome.tasksToBeScheduled.size());
+
+        assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
+        assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
+        assertEquals(
+                task2.getTaskReferenceName(),
+                outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
+    }
+
     @Test
     public void testOptionalWithDynamicFork() {
         WorkflowDef def = new WorkflowDef();
diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java
index 8548d9120..b6d3a3447 100644
--- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java
+++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java
@@ -1351,6 +1351,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
         if (from.getExpression() != null) {
             to.setExpression( from.getExpression() );
         }
+        to.setPermissive( from.isPermissive() );
         return to.build();
     }
 
@@ -1396,6 +1397,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
         to.setRetryCount( from.getRetryCount() );
         to.setEvaluatorType( from.getEvaluatorType() );
         to.setExpression( from.getExpression() );
+        to.setPermissive( from.getPermissive() );
         return to;
     }
 
diff --git a/grpc/src/main/proto/model/workflowtask.proto b/grpc/src/main/proto/model/workflowtask.proto
index 8855a714f..2c35d56dd 100644
--- a/grpc/src/main/proto/model/workflowtask.proto
+++ b/grpc/src/main/proto/model/workflowtask.proto
@@ -41,4 +41,5 @@ message WorkflowTask {
     int32 retry_count = 26;
    string evaluator_type = 27;
    string expression = 28;
+    bool permissive = 29;
 }
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy
index 83407e7e8..4494874f3 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy
@@ -16,11 +16,11 @@ import org.springframework.beans.factory.annotation.Autowired
 
 import com.netflix.conductor.common.metadata.tasks.Task
 import com.netflix.conductor.common.metadata.tasks.TaskDef
+import com.netflix.conductor.common.metadata.tasks.TaskType
 import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest
 import com.netflix.conductor.common.run.Workflow
 import com.netflix.conductor.core.execution.tasks.Join
 import com.netflix.conductor.core.execution.tasks.SubWorkflow
-import com.netflix.conductor.dao.QueueDAO
 import com.netflix.conductor.test.base.AbstractSpecification
 
 import spock.lang.Shared
@@ -45,18 +45,23 @@ class ForkJoinSpec extends AbstractSpecification {
     @Shared
     def FORK_JOIN_SUB_WORKFLOW = 'integration_test_fork_join_sw'
 
+    @Shared
+    def FORK_JOIN_PERMISSIVE_WF = 'FanInOutPermissiveTest'
+
     @Autowired
     SubWorkflow subWorkflowTask
 
     def setup() {
         workflowTestUtil.registerWorkflows('fork_join_integration_test.json',
                 'fork_join_with_no_task_retry_integration_test.json',
+                'fork_join_with_no_permissive_task_retry_integration_test.json',
                 'nested_fork_join_integration_test.json',
                 'simple_workflow_1_integration_test.json',
                 'nested_fork_join_with_sub_workflow_integration_test.json',
                 'simple_one_task_sub_workflow_integration_test.json',
                 'fork_join_with_optional_sub_workflow_forks_integration_test.json',
-                'fork_join_sub_workflow.json'
+                'fork_join_sub_workflow.json',
+                'fork_join_permissive_integration_test.json',
         )
     }
 
@@ -251,6 +256,110 @@ class ForkJoinSpec extends AbstractSpecification {
         metadataService.updateTaskDef(persistedIntegrationTask2Definition)
     }
 
+    /**
+     *   start
+     *     |
+     *    fork
+     *    /  \
+     * p_task1 p_task2
+     *    |    /
+     *     \  /
+     *      \/
+     *     join
+     *      |
+     *   s_task3
+     *      |
+     *     End
+     */
+    def "Test a simple workflow with fork join permissive failure flow"() {
+        setup: "Ensure that 'integration_task_1' has a retry count of 0"
+        def persistedIntegrationTask1Definition = workflowTestUtil.getPersistedTaskDefinition('integration_task_1').get()
+        def modifiedIntegrationTask1Definition = new TaskDef(persistedIntegrationTask1Definition.name,
+                persistedIntegrationTask1Definition.description, persistedIntegrationTask1Definition.ownerEmail, 0,
+                0, persistedIntegrationTask1Definition.responseTimeoutSeconds)
+        metadataService.updateTaskDef(modifiedIntegrationTask1Definition)
+
+        when: "A fork join workflow is started"
+        def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF, 1,
+                'fanoutTest', [:],
+                null)
+
+        then: "verify that the workflow has started and the starting nodes of each fork are in scheduled state"
+        workflowInstanceId
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 4
+            tasks[0].status == Task.Status.COMPLETED
+            tasks[0].taskType == 'FORK'
+            tasks[1].workflowTask.permissive
+            tasks[1].status == Task.Status.SCHEDULED
+            tasks[1].taskType == 'integration_task_1'
+            tasks[2].workflowTask.permissive
+            tasks[2].status == Task.Status.SCHEDULED
+            tasks[2].taskType == 'integration_task_2'
+            tasks[3].status == Task.Status.IN_PROGRESS
+            tasks[3].taskType == 'JOIN'
+        }
+
+        when: "The first task of the fork is polled and failed"
+        def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId()
+        def polledAndAckTask1Try1 = workflowTestUtil.pollAndFailTask('integration_task_1', 'task1.worker', 'Failed...')
+
+        then: "verify that the 'integration_task_1' was polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1)
+
+        and: "The workflow has been updated and has all the required tasks in the right status to move forward"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 4
+            tasks[1].status == Task.Status.FAILED
+            tasks[1].taskType == 'integration_task_1'
+            tasks[2].status == Task.Status.SCHEDULED
+            tasks[2].taskType == 'integration_task_2'
+            tasks[3].status == Task.Status.IN_PROGRESS
+            tasks[3].taskType == 'JOIN'
+        }
+
+        when: "The other node of the fork is completed by completing 'integration_task_2'"
+        def polledAndAckTask2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.worker')
+
+        and: "workflow is evaluated"
+        sweep(workflowInstanceId)
+
+        then: "verify that the 'integration_task_2' was polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1)
+
+        and: "the workflow is still running, since the permissive task failure is only surfaced once the join completes"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 4
+            tasks[1].status == Task.Status.FAILED
+            tasks[1].taskType == 'integration_task_1'
+            tasks[2].status == Task.Status.COMPLETED
+            tasks[2].taskType == 'integration_task_2'
+            tasks[3].status == Task.Status.IN_PROGRESS
+            tasks[3].taskType == 'JOIN'
+        }
+
+        when: "JOIN task executed by the async executor"
+        asyncSystemTaskExecutor.execute(joinTask, joinTaskId)
+
then: "The workflow has been updated with the task status and task list" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + tasks.size() == 4 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'integration_task_1' + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + tasks[3].status == Task.Status.FAILED + tasks[3].taskType == 'JOIN' + } + + cleanup: "Restore the task definitions that were modified as part of this feature testing" + metadataService.updateTaskDef(persistedIntegrationTask1Definition) + } + def "Test retrying a failed fork join workflow"() { when: "A fork join workflow is started" @@ -384,6 +493,166 @@ class ForkJoinSpec extends AbstractSpecification { } } + def "Test retrying a failed permissive fork join workflow"() { + + when: "A fork join permissive workflow is started" + def workflowInstanceId = startWorkflow(FORK_JOIN_PERMISSIVE_WF + '_2', 1, + 'fanoutTest', [:], + null) + + then: "verify that the workflow has started and the starting nodes of the each fork are in scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 4 + tasks[0].status == Task.Status.COMPLETED + tasks[0].taskType == 'FORK' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + } + + when: "The first task of the fork is polled and completed" + def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("fanouttask_join").getTaskId() + def polledAndAckTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_1', 'task1.worker') + + then: "verify that the 'integration_task_p_0_RT_1' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask1Try1) + + and: "The workflow has been updated and has all the required tasks in the right status to move forward" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_p_task_0_RT_2' + tasks[3].status == Task.Status.IN_PROGRESS + tasks[3].taskType == 'JOIN' + tasks[4].status == Task.Status.SCHEDULED + tasks[4].taskType == 'integration_p_task_0_RT_3' + } + + when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_2'" + def polledAndAckTask2Try1 = workflowTestUtil.pollAndFailTask('integration_p_task_0_RT_2', + 'task1.worker', 'Failed....') + + and: "workflow is evaluated" + sweep(workflowInstanceId) + + then: "verify that the 'integration_p_task_0_RT_2' was polled and acknowledged" + workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try1) + + and: "the workflow is not in the failed state, until the completion of the permissive forked tasks" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 5 + tasks[1].status == Task.Status.COMPLETED + tasks[1].taskType == 'integration_p_task_0_RT_1' + tasks[2].status == Task.Status.FAILED + 
+            tasks[2].taskType == 'integration_p_task_0_RT_2'
+            tasks[3].status == Task.Status.IN_PROGRESS
+            tasks[3].taskType == 'JOIN'
+            tasks[4].status == Task.Status.SCHEDULED
+            tasks[4].taskType == 'integration_p_task_0_RT_3'
+        }
+
+        when: "The other node of the fork is failed by failing 'integration_p_task_0_RT_3'"
+        def polledAndAckTask3Try1 = workflowTestUtil.pollAndFailTask('integration_p_task_0_RT_3',
+                'task1.worker', 'Failed....')
+
+        and: "workflow is evaluated"
+        sweep(workflowInstanceId)
+
+        then: "verify that the 'integration_p_task_0_RT_3' was polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try1)
+
+        and: "JOIN task is polled and executed"
+        asyncSystemTaskExecutor.execute(joinTask, joinTaskId)
+
+        and: "the workflow is in the failed state"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.FAILED
+            tasks.size() == 5
+            tasks[1].status == Task.Status.COMPLETED
+            tasks[1].taskType == 'integration_p_task_0_RT_1'
+            tasks[2].status == Task.Status.FAILED
+            tasks[2].taskType == 'integration_p_task_0_RT_2'
+            tasks[3].status == Task.Status.FAILED
+            tasks[3].taskType == 'JOIN'
+            tasks[4].status == Task.Status.FAILED
+            tasks[4].taskType == 'integration_p_task_0_RT_3'
+        }
+
+        when: "The workflow is retried"
+        workflowExecutor.retry(workflowInstanceId, false)
+
+        then: "verify that the workflow is retried and new tasks are added in place of the failed tasks"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 7
+            tasks[1].status == Task.Status.COMPLETED
+            tasks[1].taskType == 'integration_p_task_0_RT_1'
+            tasks[2].status == Task.Status.FAILED
+            tasks[2].taskType == 'integration_p_task_0_RT_2'
+            tasks[3].status == Task.Status.IN_PROGRESS
+            tasks[3].taskType == 'JOIN'
+            tasks[4].status == Task.Status.FAILED
+            tasks[4].taskType == 'integration_p_task_0_RT_3'
+            tasks[5].status == Task.Status.SCHEDULED
+            tasks[5].taskType == 'integration_p_task_0_RT_2'
+            tasks[6].status == Task.Status.SCHEDULED
+            tasks[6].taskType == 'integration_p_task_0_RT_3'
+        }
+
+        when: "The 'integration_p_task_0_RT_3' is polled and completed"
+        def polledAndAckTask3Try2 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_3', 'task1.worker')
+
+        then: "verify that the 'integration_p_task_0_RT_3' was polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask3Try2)
+
+        when: "The other node of the fork is completed by completing 'integration_p_task_0_RT_2'"
+        def polledAndAckTask2Try2 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_2', 'task1.worker')
+
+        and: "workflow is evaluated"
+        sweep(workflowInstanceId)
+
+        then: "verify that the 'integration_p_task_0_RT_2' was polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask2Try2)
+
+        when: "JOIN task is polled and executed"
+        asyncSystemTaskExecutor.execute(joinTask, joinTaskId)
+
+        and: "The last task of the workflow, 'integration_p_task_0_RT_4', is then polled and completed"
+        def polledAndAckTask4Try1 = workflowTestUtil.pollAndCompleteTask('integration_p_task_0_RT_4', 'task1.worker')
+
+        then: "verify that the 'integration_p_task_0_RT_4' was polled and acknowledged"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(polledAndAckTask4Try1)
+
+        then: "verify that the workflow is completed and the task list of the execution is as expected"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.COMPLETED
+            tasks.size() == 8
+            tasks[1].status == Task.Status.COMPLETED
+            tasks[1].taskType == 'integration_p_task_0_RT_1'
+            tasks[2].status == Task.Status.FAILED
+            tasks[2].taskType == 'integration_p_task_0_RT_2'
+            tasks[3].status == Task.Status.COMPLETED
+            tasks[3].taskType == 'JOIN'
+            tasks[4].status == Task.Status.FAILED
+            tasks[4].taskType == 'integration_p_task_0_RT_3'
+            tasks[5].status == Task.Status.COMPLETED
+            tasks[5].taskType == 'integration_p_task_0_RT_2'
+            tasks[6].status == Task.Status.COMPLETED
+            tasks[6].taskType == 'integration_p_task_0_RT_3'
+            tasks[7].status == Task.Status.COMPLETED
+            tasks[7].taskType == 'integration_p_task_0_RT_4'
+        }
+    }
+
     def "Test nested fork join workflow success flow"() {
         given: "Input for the nested fork join workflow"
         Map input = new HashMap()
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy
index 3561665be..d594fbfeb 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/WorkflowAndTaskConfigurationSpec.groovy
@@ -45,6 +45,12 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification {
     @Shared
     def WORKFLOW_WITH_OPTIONAL_TASK = 'optional_task_wf'
 
+    @Shared
+    def WORKFLOW_WITH_PERMISSIVE_TASK = 'permissive_task_wf'
+
+    @Shared
+    def WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK = 'permissive_optional_task_wf'
+
     @Shared
     def TEST_WORKFLOW = 'integration_test_wf3'
 
@@ -52,12 +58,14 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification {
     def WAIT_TIME_OUT_WORKFLOW = 'test_wait_timeout'
 
     def setup() {
-        //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK
+        //Register LINEAR_WORKFLOW_T1_T2, TEST_WORKFLOW, RTOWF, WORKFLOW_WITH_OPTIONAL_TASK, WORKFLOW_WITH_PERMISSIVE_TASK, WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK
         workflowTestUtil.registerWorkflows(
                 'simple_workflow_1_integration_test.json',
                 'simple_workflow_1_input_template_integration_test.json',
                 'simple_workflow_3_integration_test.json',
                 'simple_workflow_with_optional_task_integration_test.json',
+                'simple_workflow_with_permissive_task_integration_test.json',
+                'simple_workflow_with_permissive_optional_task_integration_test.json',
                 'simple_wait_task_workflow_integration_test.json')
     }
 
@@ -133,6 +141,155 @@ class WorkflowAndTaskConfigurationSpec extends AbstractSpecification {
         }
     }
 
+    def "Test simple workflow which has a permissive task"() {
+
+        given: "Input parameters for a workflow with a permissive task"
+        def correlationId = 'integration_test' + UUID.randomUUID().toString()
+        def workflowInput = new HashMap()
+        workflowInput['param1'] = 'p1 value'
+        workflowInput['param2'] = 'p2 value'
+
+        when: "A permissive task workflow is started"
+        def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_TASK, 1,
+                correlationId, workflowInput,
+                null)
+
+        then: "verify that the workflow has started and the permissive task is in a scheduled state"
+        workflowInstanceId
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 1
+            tasks[0].status == Task.Status.SCHEDULED
+            tasks[0].taskType == 'task_permissive'
+        }
+
+        when: "The first permissive task is polled and failed"
permissive task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + workflowExecutor.decide(workflowInstanceId) + + then: "verify that the workflow is still running and the first permissive task has failed and the retry has kicked in" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 2 + tasks[0].status == Task.Status.FAILED + tasks[0].taskType == 'task_permissive' + tasks[1].status == Task.Status.SCHEDULED + tasks[1].taskType == 'task_permissive' + } + + when: "The first permissive task is polled and failed" + Tuple polledAndFailedTaskTry2 = workflowTestUtil.pollAndFailTask('task_permissive', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_permissive was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry2) + + workflowExecutor.decide(workflowInstanceId) + + then: "Ensure that the workflow is updated" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 3 + tasks[1].status == Task.Status.FAILED + tasks[1].taskType == 'task_permissive' + tasks[2].status == Task.Status.SCHEDULED + tasks[2].taskType == 'integration_task_2' + } + + when: "The second task 'integration_task_2' is polled and completed" + def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker') + + then: "Verify that the task was polled and acknowledged" + verifyPolledAndAcknowledgedTask(task2Try1) + + and: "Ensure that the workflow is in completed state" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.FAILED + reasonForIncompletion == "Task ${tasks[1].taskId} failed with status: FAILED and reason: 'NETWORK ERROR'" + tasks.size() == 3 + tasks[2].status == Task.Status.COMPLETED + tasks[2].taskType == 'integration_task_2' + } + } + + def "Test simple workflow which has a permissive optional task"() { + + given: "A input parameters for a workflow with a permissive optional task" + def correlationId = 'integration_test' + UUID.randomUUID().toString() + def workflowInput = new HashMap() + workflowInput['param1'] = 'p1 value' + workflowInput['param2'] = 'p2 value' + + when: "A permissive optional task workflow is started" + def workflowInstanceId = startWorkflow(WORKFLOW_WITH_PERMISSIVE_OPTIONAL_TASK, 1, + correlationId, workflowInput, + null) + + then: "verify that the workflow has started and the permissive optional task is in a scheduled state" + workflowInstanceId + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'task_optional' + } + + when: "The first permissive optional task is polled and failed" + Tuple polledAndFailedTaskTry1 = workflowTestUtil.pollAndFailTask('task_optional', + 'task1.integration.worker', 'NETWORK ERROR') + + then: "Verify that the task_optional was polled and acknowledged" + verifyPolledAndAcknowledgedTask(polledAndFailedTaskTry1) + + when: "A decide is executed on the workflow" + 
+        workflowExecutor.decide(workflowInstanceId)
+
+        then: "verify that the workflow is still running and the first permissive optional task has failed and the retry has kicked in"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 2
+            tasks[0].status == Task.Status.FAILED
+            tasks[0].taskType == 'task_optional'
+            tasks[1].status == Task.Status.SCHEDULED
+            tasks[1].taskType == 'task_optional'
+        }
+
+        when: "Poll the permissive optional task again, do not complete it, and run decide"
+        workflowExecutionService.poll('task_optional', 'task1.integration.worker')
+        Thread.sleep(5000)
+        workflowExecutor.decide(workflowInstanceId)
+
+        then: "Ensure that the workflow is updated"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 3
+            tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS
+            tasks[1].taskType == 'task_optional'
+            tasks[2].status == Task.Status.SCHEDULED
+            tasks[2].taskType == 'integration_task_2'
+        }
+
+        when: "The second task 'integration_task_2' is polled and completed"
+        def task2Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker')
+
+        then: "Verify that the task was polled and acknowledged"
+        verifyPolledAndAcknowledgedTask(task2Try1)
+
+        and: "Ensure that the workflow is in completed state"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.COMPLETED
+            tasks.size() == 3
+            tasks[2].status == Task.Status.COMPLETED
+            tasks[2].taskType == 'integration_task_2'
+        }
+    }
+
     def "test workflow with input template parsing"() {
         given: "Input parameters for a workflow with input template"
         def correlationId = 'integration_test' + UUID.randomUUID().toString()
diff --git a/test-harness/src/test/resources/fork_join_permissive_integration_test.json b/test-harness/src/test/resources/fork_join_permissive_integration_test.json
new file mode 100644
index 000000000..771230d20
--- /dev/null
+++ b/test-harness/src/test/resources/fork_join_permissive_integration_test.json
@@ -0,0 +1,111 @@
+{
+  "name": "FanInOutPermissiveTest",
+  "description": "FanInOutPermissiveTest",
+  "version": 1,
+  "tasks": [
+    {
+      "name": "fork",
+      "taskReferenceName": "fanouttask",
+      "inputParameters": {},
+      "type": "FORK_JOIN",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [
+        [
+          {
+            "name": "integration_task_1",
+            "taskReferenceName": "t1",
+            "inputParameters": {
+              "p1": "workflow.input.param1",
+              "p2": "workflow.input.param2"
+            },
+            "type": "SIMPLE",
+            "decisionCases": {},
+            "defaultCase": [],
+            "forkTasks": [],
+            "startDelay": 0,
+            "joinOn": [],
+            "optional": false,
+            "permissive": true,
+            "defaultExclusiveJoinTask": [],
+            "asyncComplete": false,
+            "loopOver": []
+          }
+        ],
+        [
+          {
+            "name": "integration_task_2",
+            "taskReferenceName": "t2",
+            "inputParameters": {
+              "tp1": "workflow.input.param1"
+            },
+            "type": "SIMPLE",
+            "decisionCases": {},
+            "defaultCase": [],
+            "forkTasks": [],
+            "startDelay": 0,
+            "joinOn": [],
+            "optional": false,
+            "permissive": true,
+            "defaultExclusiveJoinTask": [],
+            "asyncComplete": false,
+            "loopOver": []
+          }
+        ]
+      ],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": false,
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    },
+    {
+      "name": "join",
+      "taskReferenceName": "fanouttask_join",
+      "inputParameters": {},
+      "type": "JOIN",
+      "decisionCases": {},
+      "defaultCase": [],
"forkTasks": [], + "startDelay": 0, + "joinOn": [ + "t1", + "t2" + ], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + }, + { + "name": "integration_task_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [] + } + ], + "inputParameters": [ + "param1", + "param2" + ], + "outputParameters": {}, + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "ownerEmail": "test@harness.com" +} \ No newline at end of file diff --git a/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json new file mode 100644 index 000000000..ead2a678c --- /dev/null +++ b/test-harness/src/test/resources/fork_join_with_no_permissive_task_retry_integration_test.json @@ -0,0 +1,194 @@ +{ + "name": "FanInOutPermissiveTest_2", + "description": "FanInOutPermissiveTest_2", + "version": 1, + "tasks": [ + { + "name": "fork", + "taskReferenceName": "fanouttask", + "inputParameters": {}, + "type": "FORK_JOIN", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [ + [ + { + "name": "integration_p_task_0_RT_1", + "taskReferenceName": "t1", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "permissive": true, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_1", + "description": "integration_p_task_0_RT_1", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + }, + { + "name": "integration_p_task_0_RT_3", + "taskReferenceName": "t3", + "inputParameters": { + "p1": "workflow.input.param1", + "p2": "workflow.input.param2" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "permissive": true, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "taskDefinition": { + "createdBy": "integration_app", + "name": "integration_p_task_0_RT_3", + "description": "integration_p_task_0_RT_3", + "retryCount": 0, + "timeoutSeconds": 120, + "inputKeys": [], + "outputKeys": [], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 0, + "responseTimeoutSeconds": 3600, + "inputTemplate": {}, + "rateLimitPerFrequency": 0, + "rateLimitFrequencyInSeconds": 1 + } + } + ], + [ + { + "name": "integration_p_task_0_RT_2", + "taskReferenceName": "t2", + "inputParameters": { + "tp1": "workflow.input.param1" + }, + "type": "SIMPLE", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "permissive": true, + "defaultExclusiveJoinTask": [], + "asyncComplete": 
+            "loopOver": [],
+            "taskDefinition": {
+              "createdBy": "integration_app",
+              "name": "integration_p_task_0_RT_2",
+              "description": "integration_p_task_0_RT_2",
+              "retryCount": 0,
+              "timeoutSeconds": 120,
+              "inputKeys": [],
+              "outputKeys": [],
+              "timeoutPolicy": "TIME_OUT_WF",
+              "retryLogic": "FIXED",
+              "retryDelaySeconds": 0,
+              "responseTimeoutSeconds": 3600,
+              "inputTemplate": {},
+              "rateLimitPerFrequency": 0,
+              "rateLimitFrequencyInSeconds": 1
+            }
+          }
+        ]
+      ],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": false,
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    },
+    {
+      "name": "join",
+      "taskReferenceName": "fanouttask_join",
+      "inputParameters": {},
+      "type": "JOIN",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [],
+      "startDelay": 0,
+      "joinOn": [
+        "t3",
+        "t2"
+      ],
+      "optional": false,
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    },
+    {
+      "name": "integration_p_task_0_RT_4",
+      "taskReferenceName": "t4",
+      "inputParameters": {
+        "tp1": "workflow.input.param1"
+      },
+      "type": "SIMPLE",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": false,
+      "permissive": true,
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": [],
+      "taskDefinition": {
+        "createdBy": "integration_app",
+        "name": "integration_p_task_0_RT_4",
+        "description": "integration_p_task_0_RT_4",
+        "retryCount": 0,
+        "timeoutSeconds": 120,
+        "inputKeys": [],
+        "outputKeys": [],
+        "timeoutPolicy": "TIME_OUT_WF",
+        "retryLogic": "FIXED",
+        "retryDelaySeconds": 0,
+        "responseTimeoutSeconds": 3600,
+        "inputTemplate": {},
+        "rateLimitPerFrequency": 0,
+        "rateLimitFrequencyInSeconds": 1
+      }
+    }
+  ],
+  "inputParameters": [
+    "param1",
+    "param2"
+  ],
+  "outputParameters": {},
+  "schemaVersion": 2,
+  "restartable": true,
+  "workflowStatusListenerEnabled": false,
+  "timeoutPolicy": "ALERT_ONLY",
+  "timeoutSeconds": 0,
+  "ownerEmail": "test@harness.com"
+}
\ No newline at end of file
diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json
new file mode 100644
index 000000000..ac68e097c
--- /dev/null
+++ b/test-harness/src/test/resources/simple_workflow_with_permissive_optional_task_integration_test.json
@@ -0,0 +1,60 @@
+{
+  "name": "permissive_optional_task_wf",
+  "description": "permissive_optional_task_wf",
+  "version": 1,
+  "tasks": [
+    {
+      "name": "task_optional",
+      "taskReferenceName": "t1",
+      "inputParameters": {
+        "p1": "${workflow.input.param1}",
+        "p2": "${workflow.input.param2}"
+      },
+      "type": "SIMPLE",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": true,
+      "permissive": true,
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    },
+    {
+      "name": "integration_task_2",
+      "taskReferenceName": "t2",
+      "inputParameters": {
+        "tp1": "${workflow.input.param1}",
+        "tp2": "${t1.output.op}"
+      },
+      "type": "SIMPLE",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": false,
+      "permissive": true,
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    }
+  ],
+  "inputParameters": [
+    "param1",
+    "param2"
+  ],
+  "outputParameters": {
+    "o1": "${workflow.input.param1}",
+    "o2": "${t2.output.uuid}",
+    "o3": "${t1.output.op}"
+  },
+  "schemaVersion": 2,
+  "restartable": true,
+  "workflowStatusListenerEnabled": false,
+  "timeoutPolicy": "ALERT_ONLY",
+  "timeoutSeconds": 0,
+  "ownerEmail": "test@harness.com"
+}
\ No newline at end of file
diff --git a/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json
new file mode 100644
index 000000000..9893d3a26
--- /dev/null
+++ b/test-harness/src/test/resources/simple_workflow_with_permissive_task_integration_test.json
@@ -0,0 +1,94 @@
+{
+  "name": "permissive_task_wf",
+  "description": "permissive_task_wf",
+  "version": 1,
+  "tasks": [
+    {
+      "name": "task_permissive",
+      "taskReferenceName": "t1",
+      "inputParameters": {
+        "p1": "${workflow.input.param1}",
+        "p2": "${workflow.input.param2}"
+      },
+      "type": "SIMPLE",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": false,
+      "permissive": true,
+      "retryCount": 1,
+      "taskDefinition": {
+        "createdBy": "integration_app",
+        "name": "task_permissive",
+        "description": "task_permissive",
+        "retryCount": 1,
+        "timeoutSeconds": 120,
+        "inputKeys": [],
+        "outputKeys": [],
+        "timeoutPolicy": "TIME_OUT_WF",
+        "retryLogic": "FIXED",
+        "retryDelaySeconds": 0,
+        "responseTimeoutSeconds": 3600,
+        "inputTemplate": {},
+        "rateLimitPerFrequency": 0,
+        "rateLimitFrequencyInSeconds": 1
+      },
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    },
+    {
+      "name": "integration_task_2",
+      "taskReferenceName": "t2",
+      "inputParameters": {
+        "tp1": "${workflow.input.param1}",
+        "tp2": "${t1.output.op}"
+      },
+      "type": "SIMPLE",
+      "decisionCases": {},
+      "defaultCase": [],
+      "forkTasks": [],
+      "startDelay": 0,
+      "joinOn": [],
+      "optional": false,
+      "permissive": true,
+      "retryCount": 1,
+      "taskDefinition": {
+        "createdBy": "integration_app",
+        "name": "integration_task_2",
+        "description": "integration_task_2",
+        "retryCount": 1,
+        "timeoutSeconds": 120,
+        "inputKeys": [],
+        "outputKeys": [],
+        "timeoutPolicy": "TIME_OUT_WF",
+        "retryLogic": "FIXED",
+        "retryDelaySeconds": 0,
+        "responseTimeoutSeconds": 3600,
+        "inputTemplate": {},
+        "rateLimitPerFrequency": 0,
+        "rateLimitFrequencyInSeconds": 1
+      },
+      "defaultExclusiveJoinTask": [],
+      "asyncComplete": false,
+      "loopOver": []
+    }
+  ],
+  "inputParameters": [
+    "param1",
+    "param2"
+  ],
+  "outputParameters": {
+    "o1": "${workflow.input.param1}",
+    "o2": "${t2.output.uuid}",
+    "o3": "${t1.output.op}"
+  },
+  "schemaVersion": 2,
+  "restartable": true,
+  "workflowStatusListenerEnabled": false,
+  "timeoutPolicy": "ALERT_ONLY",
+  "timeoutSeconds": 0,
+  "ownerEmail": "test@harness.com"
+}
\ No newline at end of file
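
Note for reviewers: a minimal sketch of how the new flag is exercised from client code, mirroring the testPermissive case added above. The class and task names here (PermissiveTaskExample, payment_check) are illustrative only; the API calls (setPermissive, isPermissive, setTaskDefinition) are the ones introduced or touched by this change.

    import com.netflix.conductor.common.metadata.tasks.TaskDef;
    import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
    import com.netflix.conductor.common.metadata.workflow.WorkflowTask;

    public class PermissiveTaskExample {
        public static void main(String[] args) {
            // A task marked as permissive: if it fails, the decider does not fail the
            // workflow immediately; sibling tasks keep executing, and the failure is
            // only surfaced at the join or at the end of the workflow.
            WorkflowTask permissiveTask = new WorkflowTask();
            permissiveTask.setName("payment_check");            // illustrative name
            permissiveTask.setTaskReferenceName("payment_check_ref");
            permissiveTask.setPermissive(true);                 // flag added by this change
            permissiveTask.setTaskDefinition(new TaskDef("payment_check"));

            WorkflowDef def = new WorkflowDef();
            def.setName("permissive_example_wf");
            def.setSchemaVersion(2);
            def.getTasks().add(permissiveTask);

            // The JSON equivalent in a workflow definition is "permissive": true,
            // as used by the test resources added in this diff.
            System.out.println(def.getTasks().get(0).isPermissive()); // prints: true
        }
    }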