diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index 2bae5e12b97f..f9432da84f6d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -117,6 +117,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -2122,6 +2123,24 @@ protected void doBatchOperateWorkflowDefinition(User loginUser, long taskCode = CodeGenerateUtils.genCode(); taskCodeMap.put(taskDefinitionLog.getCode(), taskCode); taskDefinitionLog.setCode(taskCode); + if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) { + final String taskParams = taskDefinitionLog.getTaskParams(); + final SwitchParameters switchParameters = + JSONUtils.parseObject(taskParams, SwitchParameters.class); + if (switchParameters == null) { + throw new IllegalArgumentException( + "Switch task params: " + taskParams + " is invalid."); + } + SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); + switchResult.getDependTaskList().forEach(switchResultVo -> { + switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode())); + }); + if (switchResult.getNextNode() != null) { + switchResult.setNextNode( + taskCodeMap.get(switchResult.getNextNode())); + } + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters)); + } } catch (CodeGenerateException e) { log.error("Generate task definition code error, projectCode:{}.", targetProjectCode, e); putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java index 5fc60f9561b6..15064f738543 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java @@ -207,6 +207,85 @@ public void before() { user = loginUser; } + @Test() + public void testCopyworkflowLogicalNodeSwitch() { + long projectCode = 128645169571296L; + String codes = "128645230604768"; + long targetProjectCode = 128645169571296L; + long shellTaskDefinitionCode = 128645175846368L; + long switchTaskDefinitionCode = 128645191546336L; + + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS); + + Set definitionCodes = new HashSet<>(); + for (String code : String.valueOf(codes).split(Constants.COMMA)) { + try { + long parse = Long.parseLong(code); + definitionCodes.add(parse); + } catch (NumberFormatException e) { + Assertions.fail(); + } + } + + WorkflowDefinition workflowDefinition = new WorkflowDefinition(); + workflowDefinition.setId(1); + workflowDefinition.setCode(Long.parseLong(codes)); + workflowDefinition.setName("workflow_switch"); + workflowDefinition.setDescription(""); + workflowDefinition.setVersion(1); + workflowDefinition.setReleaseState(ReleaseState.OFFLINE); + workflowDefinition.setProjectCode(projectCode); + workflowDefinition.setUserId(user.getId()); + List workflowDefinitionList = new ArrayList<>(); + workflowDefinitionList.add(workflowDefinition); + + Project project = new Project(); + project.setCode(projectCode); + project.setId(1); + project.setName("project_switch"); + project.setUserId(user.getId()); + + List workflowTaskRelations = new ArrayList<>(); + WorkflowTaskRelation workflowTaskRelationShell = getWorkflowTaskRelation(1, 1, projectCode, + workflowDefinition.getCode(), 0L, 1, switchTaskDefinitionCode, 1); + WorkflowTaskRelation workflowTaskRelationSwitch = getWorkflowTaskRelation(2, 1, projectCode, + workflowDefinition.getCode(), switchTaskDefinitionCode, 1, shellTaskDefinitionCode, 1); + + workflowTaskRelations.add(workflowTaskRelationShell); + workflowTaskRelations.add(workflowTaskRelationSwitch); + + String taskDefinitionLogJson = + "[{\"id\":1,\"code\":128645175846368,\"name\":\"shellA\",\"version\":1,\"description\":\"\",\"projectCode\":128645169571296,\"userId\":1,\"taskType\":\"SHELL\"," + + + "\"taskParams\":{\"localParams\":[],\"rawScript\":\"echo 'A'\",\"resourceList\":[]},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"environmentCode\":-1," + + + "\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\",\"timeout\":0,\"delayTime\":0,\"createTime\":\"2024-12-25 01:15:08\",\"updateTime\":\"2024-12-25 01:15:08\"," + + + "\"taskGroupId\":0,\"taskGroupPriority\":0,\"cpuQuota\":-1,\"memoryMax\":-1,\"taskExecuteType\":\"BATCH\"},{\"id\":2,\"code\":128645191546336,\"name\":\"switchA\",\"version\":1," + + + "\"description\":\"\",\"projectCode\":128645169571296,\"userId\":1,\"taskType\":\"SWITCH\"," + + "\"taskParams\":{\"localParams\":[],\"rawScript\":\"\",\"resourceList\":[],\"switchResult\":{\"dependTaskList\":[{\"condition\":\"${value} == 'A'\",\"nextNode\":128645175846368}]," + + + "\"nextNode\":128645175846368}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"environmentCode\":-1,\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\"," + + + "\"timeout\":0,\"delayTime\":0,\"createTime\":\"2024-12-25 01:15:08\",\"updateTime\":\"2024-12-25 01:15:08\",\"taskGroupId\":0,\"taskGroupPriority\":0,\"cpuQuota\":-1,\"memoryMax\":-1,\"taskExecuteType\":\"BATCH\"}]"; + + List taskDefinitionLogs = JSONUtils.toList(taskDefinitionLogJson, TaskDefinitionLog.class); + + when(projectMapper.queryByCode(projectCode)).thenReturn(project); + when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_BATCH_COPY)).thenReturn(result); + when(workflowDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(workflowDefinitionList); + when(workflowTaskRelationMapper.queryByWorkflowDefinitionCode(Long.parseLong(codes))) + .thenReturn(workflowTaskRelations); + when(taskDefinitionLogDao.queryTaskDefineLogList(workflowTaskRelations)).thenReturn(taskDefinitionLogs); + when(processService.saveTaskDefine(user, projectCode, taskDefinitionLogs, true)).thenReturn(1); + when(processService.saveWorkflowDefine(user, workflowDefinition, true, true)).thenReturn(1); + Map successRes = + processDefinitionService.batchCopyWorkflowDefinition(user, projectCode, codes, targetProjectCode); + Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + + } @Test public void testQueryWorkflowDefinitionList() { when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); @@ -1122,6 +1201,22 @@ private List getProcessTaskRelation() { return workflowTaskRelations; } + private WorkflowTaskRelation getWorkflowTaskRelation(int id, int workflowDefinitionVersion, long projectCode, + long workflowDefinitionCode, long preTaskCode, + int preTaskVersion, + long postTaskCode, int postTaskVersion) { + WorkflowTaskRelation workflowTaskRelation = new WorkflowTaskRelation(); + workflowTaskRelation.setId(id); + workflowTaskRelation.setWorkflowDefinitionVersion(workflowDefinitionVersion); + workflowTaskRelation.setProjectCode(projectCode); + workflowTaskRelation.setWorkflowDefinitionCode(workflowDefinitionCode); + workflowTaskRelation.setPreTaskCode(preTaskCode); + workflowTaskRelation.setPreTaskVersion(preTaskVersion); + workflowTaskRelation.setPostTaskCode(postTaskCode); + workflowTaskRelation.setPostTaskVersion(postTaskVersion); + return workflowTaskRelation; + } + /** * get mock schedule *