Skip to content

Commit

Permalink
WorkflowTask permissive property added, so various task types
Browse files Browse the repository at this point in the history
can be permissive.
  • Loading branch information
ivakoleva committed Dec 19, 2023
1 parent 92f06bf commit fdf5f5e
Show file tree
Hide file tree
Showing 18 changed files with 52 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public enum TaskType {
DYNAMIC,
FORK_JOIN,
FORK_JOIN_DYNAMIC,
PERMISSIVE,
DECISION,
SWITCH,
JOIN,
Expand Down Expand Up @@ -71,7 +70,6 @@ public enum TaskType {
public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM";
public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE";
public static final String TASK_TYPE_FORK = "FORK";
public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE";
public static final String TASK_TYPE_NOOP = "NOOP";

private static final Set<String> BUILT_IN_TASKS = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ public void setTasks(List<WorkflowTask> tasks) {
@ProtoField(id = 28)
private String expression;

@ProtoField(id = 29)
private boolean permissive = false;

/**
* @return the name
*/
Expand Down Expand Up @@ -548,6 +551,22 @@ public void setExpression(String expression) {
this.expression = expression;
}

/**
* @return If the task is permissive. When set to true, and the task is in failed status,
* fail-fast does not occur. The workflow execution continues until reaching join or end of
* workflow, allowing idempotent execution of other tasks.
*/
public boolean isPermissive() {
return this.permissive;
}

/**
* @param permissive when set to true, the task is marked as permissive
*/
public void setPermissive(boolean permissive) {
this.permissive = permissive;
}

private Collection<List<WorkflowTask>> children() {
Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED;
import static com.netflix.conductor.model.TaskModel.Status.*;
Expand Down Expand Up @@ -209,9 +208,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
outcome.tasksToBeUpdated.add(pendingTask);
} else if (!(pendingTask.getWorkflowTask() != null
&& TaskType.PERMISSIVE
.name()
.equals(pendingTask.getWorkflowTask().getType())
&& pendingTask.getWorkflowTask().isPermissive()
&& !pendingTask.getWorkflowTask().isOptional())) {
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
}
Expand Down Expand Up @@ -262,7 +259,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
List<TaskModel> permissiveTasksTerminalNonSuccessful =
workflow.getTasks().stream()
.filter(t -> t.getWorkflowTask() != null)
.filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType()))
.filter(t -> t.getWorkflowTask().isPermissive())
.filter(t -> !t.getWorkflowTask().isOptional())
.collect(
Collectors.toMap(
Expand Down Expand Up @@ -563,8 +560,7 @@ Optional<TaskModel> retry(
|| TaskType.isBuiltIn(task.getTaskType())
|| expectedRetryCount <= retryCount) {
if (workflowTask != null
&& (workflowTask.isOptional()
|| TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) {
&& (workflowTask.isOptional() || workflowTask.isPermissive())) {
return Optional.empty();
}
WorkflowModel.Status status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,7 @@ private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowMod
.map(workflow::getTaskByRefName)
.anyMatch(
t ->
TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType())
t.getWorkflowTask().isPermissive()
&& !t.getWorkflowTask().isOptional()
&& t.getStatus().equals(FAILED));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN;

@Component(TASK_TYPE_EXCLUSIVE_JOIN)
Expand Down Expand Up @@ -68,7 +67,7 @@ public boolean execute(
foundExlusiveJoinOnTask = taskStatus.isTerminal();
hasFailures =
!taskStatus.isSuccessful()
&& (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType())
&& (!exclusiveTask.getWorkflowTask().isPermissive()
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;

@Component(TASK_TYPE_JOIN)
Expand Down Expand Up @@ -61,7 +60,7 @@ public boolean execute(
hasFailures =
!taskStatus.isSuccessful()
&& !forkedTask.getWorkflowTask().isOptional()
&& (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType())
&& (!forkedTask.getWorkflowTask().isPermissive()
|| joinOn.stream()
.map(workflow::getTaskByRefName)
.allMatch(t -> t.getStatus().isTerminal()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,22 +440,21 @@ public void testOptional() {
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
}

/** Similar to {@link #testOptional} */
@Test
public void testPermissive() {
WorkflowDef def = new WorkflowDef();
def.setName("test-permissive");

WorkflowTask task1 = new WorkflowTask();
task1.setName("task0");
task1.setType("PERMISSIVE");
task1.setPermissive(true);
task1.setTaskReferenceName("t0");
task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
task1.setTaskDefinition(new TaskDef("task0"));

WorkflowTask task2 = new WorkflowTask();
task2.setName("task1");
task2.setType("PERMISSIVE");
task2.setPermissive(true);
task2.setTaskReferenceName("t1");
task2.setTaskDefinition(new TaskDef("task1"));

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
if (from.getExpression() != null) {
to.setExpression( from.getExpression() );
}
to.setPermissive( from.isPermissive() );
return to.build();
}

Expand Down Expand Up @@ -1396,6 +1397,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
to.setRetryCount( from.getRetryCount() );
to.setEvaluatorType( from.getEvaluatorType() );
to.setExpression( from.getExpression() );
to.setPermissive( from.getPermissive() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/workflowtask.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ message WorkflowTask {
int32 retry_count = 26;
string evaluator_type = 27;
string expression = 28;
bool permissive = 29;
}
Loading

0 comments on commit fdf5f5e

Please sign in to comment.