Skip to content

Commit

Permalink
[Fix-16900][api] fix when the copied workflow, the logical node switc…
Browse files Browse the repository at this point in the history
…h loses the flow relationship (#16911)
  • Loading branch information
llllkid authored Dec 30, 2024
1 parent 47d0f22 commit adee537
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
Expand Down Expand Up @@ -2122,6 +2123,24 @@ protected void doBatchOperateWorkflowDefinition(User loginUser,
long taskCode = CodeGenerateUtils.genCode();
taskCodeMap.put(taskDefinitionLog.getCode(), taskCode);
taskDefinitionLog.setCode(taskCode);
if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) {
final String taskParams = taskDefinitionLog.getTaskParams();
final SwitchParameters switchParameters =
JSONUtils.parseObject(taskParams, SwitchParameters.class);
if (switchParameters == null) {
throw new IllegalArgumentException(
"Switch task params: " + taskParams + " is invalid.");
}
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
switchResult.getDependTaskList().forEach(switchResultVo -> {
switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
});
if (switchResult.getNextNode() != null) {
switchResult.setNextNode(
taskCodeMap.get(switchResult.getNextNode()));
}
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
}
} catch (CodeGenerateException e) {
log.error("Generate task definition code error, projectCode:{}.", targetProjectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,85 @@ public void before() {
user = loginUser;
}

@Test()
public void testCopyworkflowLogicalNodeSwitch() {
long projectCode = 128645169571296L;
String codes = "128645230604768";
long targetProjectCode = 128645169571296L;
long shellTaskDefinitionCode = 128645175846368L;
long switchTaskDefinitionCode = 128645191546336L;

Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);

Set<Long> definitionCodes = new HashSet<>();
for (String code : String.valueOf(codes).split(Constants.COMMA)) {
try {
long parse = Long.parseLong(code);
definitionCodes.add(parse);
} catch (NumberFormatException e) {
Assertions.fail();
}
}

WorkflowDefinition workflowDefinition = new WorkflowDefinition();
workflowDefinition.setId(1);
workflowDefinition.setCode(Long.parseLong(codes));
workflowDefinition.setName("workflow_switch");
workflowDefinition.setDescription("");
workflowDefinition.setVersion(1);
workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
workflowDefinition.setProjectCode(projectCode);
workflowDefinition.setUserId(user.getId());
List<WorkflowDefinition> workflowDefinitionList = new ArrayList<>();
workflowDefinitionList.add(workflowDefinition);

Project project = new Project();
project.setCode(projectCode);
project.setId(1);
project.setName("project_switch");
project.setUserId(user.getId());

List<WorkflowTaskRelation> workflowTaskRelations = new ArrayList<>();
WorkflowTaskRelation workflowTaskRelationShell = getWorkflowTaskRelation(1, 1, projectCode,
workflowDefinition.getCode(), 0L, 1, switchTaskDefinitionCode, 1);
WorkflowTaskRelation workflowTaskRelationSwitch = getWorkflowTaskRelation(2, 1, projectCode,
workflowDefinition.getCode(), switchTaskDefinitionCode, 1, shellTaskDefinitionCode, 1);

workflowTaskRelations.add(workflowTaskRelationShell);
workflowTaskRelations.add(workflowTaskRelationSwitch);

String taskDefinitionLogJson =
"[{\"id\":1,\"code\":128645175846368,\"name\":\"shellA\",\"version\":1,\"description\":\"\",\"projectCode\":128645169571296,\"userId\":1,\"taskType\":\"SHELL\","
+
"\"taskParams\":{\"localParams\":[],\"rawScript\":\"echo 'A'\",\"resourceList\":[]},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"environmentCode\":-1,"
+
"\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\",\"timeout\":0,\"delayTime\":0,\"createTime\":\"2024-12-25 01:15:08\",\"updateTime\":\"2024-12-25 01:15:08\","
+
"\"taskGroupId\":0,\"taskGroupPriority\":0,\"cpuQuota\":-1,\"memoryMax\":-1,\"taskExecuteType\":\"BATCH\"},{\"id\":2,\"code\":128645191546336,\"name\":\"switchA\",\"version\":1,"
+
"\"description\":\"\",\"projectCode\":128645169571296,\"userId\":1,\"taskType\":\"SWITCH\"," +
"\"taskParams\":{\"localParams\":[],\"rawScript\":\"\",\"resourceList\":[],\"switchResult\":{\"dependTaskList\":[{\"condition\":\"${value} == 'A'\",\"nextNode\":128645175846368}],"
+
"\"nextNode\":128645175846368}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"environmentCode\":-1,\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+
"\"timeout\":0,\"delayTime\":0,\"createTime\":\"2024-12-25 01:15:08\",\"updateTime\":\"2024-12-25 01:15:08\",\"taskGroupId\":0,\"taskGroupPriority\":0,\"cpuQuota\":-1,\"memoryMax\":-1,\"taskExecuteType\":\"BATCH\"}]";

List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionLogJson, TaskDefinitionLog.class);

when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_BATCH_COPY)).thenReturn(result);
when(workflowDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(workflowDefinitionList);
when(workflowTaskRelationMapper.queryByWorkflowDefinitionCode(Long.parseLong(codes)))
.thenReturn(workflowTaskRelations);
when(taskDefinitionLogDao.queryTaskDefineLogList(workflowTaskRelations)).thenReturn(taskDefinitionLogs);
when(processService.saveTaskDefine(user, projectCode, taskDefinitionLogs, true)).thenReturn(1);
when(processService.saveWorkflowDefine(user, workflowDefinition, true, true)).thenReturn(1);
Map<String, Object> successRes =
processDefinitionService.batchCopyWorkflowDefinition(user, projectCode, codes, targetProjectCode);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));

}
@Test
public void testQueryWorkflowDefinitionList() {
when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Expand Down Expand Up @@ -1122,6 +1201,22 @@ private List<WorkflowTaskRelation> getProcessTaskRelation() {
return workflowTaskRelations;
}

private WorkflowTaskRelation getWorkflowTaskRelation(int id, int workflowDefinitionVersion, long projectCode,
long workflowDefinitionCode, long preTaskCode,
int preTaskVersion,
long postTaskCode, int postTaskVersion) {
WorkflowTaskRelation workflowTaskRelation = new WorkflowTaskRelation();
workflowTaskRelation.setId(id);
workflowTaskRelation.setWorkflowDefinitionVersion(workflowDefinitionVersion);
workflowTaskRelation.setProjectCode(projectCode);
workflowTaskRelation.setWorkflowDefinitionCode(workflowDefinitionCode);
workflowTaskRelation.setPreTaskCode(preTaskCode);
workflowTaskRelation.setPreTaskVersion(preTaskVersion);
workflowTaskRelation.setPostTaskCode(postTaskCode);
workflowTaskRelation.setPostTaskVersion(postTaskVersion);
return workflowTaskRelation;
}

/**
* get mock schedule
*
Expand Down

0 comments on commit adee537

Please sign in to comment.