From db246993ae0260c5cc66d76906bd8795b59c0282 Mon Sep 17 00:00:00 2001 From: saimedhi Date: Thu, 17 Oct 2024 15:16:39 -0700 Subject: [PATCH] added Signed-off-by: saimedhi --- .../ReprovisionWorkflowTransportAction.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java index 54f6a332..04ff5f5a 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java @@ -270,19 +270,39 @@ private void executeWorkflowAsync( ActionListener listener ) { try { - threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(template, workflowSequence, workflowId); }); + threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { + updateTemplate( template,workflowId); + executeWorkflow(workflowSequence, workflowId); }); } catch (Exception exception) { listener.onFailure(new FlowFrameworkException("Failed to execute workflow " + workflowId, ExceptionsHelper.status(exception))); } } /** - * Executes the given workflow sequence + * Replace template document * @param template The template to store after reprovisioning completes successfully + * @param workflowId The workflowId associated with the workflow that is executing + */ + private void updateTemplate ( Template template, String workflowId) { + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + workflowId, + template, + ActionListener.wrap(templateResponse -> { + logger.info("Updated template for {}", workflowId, State.COMPLETED); + }, exception -> { + String errorMessage = "Failed to update use case template for " + workflowId; + logger.error(errorMessage, exception); + }), + true // ignores NOT_STARTED state if request is to reprovision + ); + } + + /** + * Executes the given workflow sequence * @param workflowSequence The topologically sorted workflow to execute * @param workflowId The workflowId associated with the workflow that is executing */ - private void executeWorkflow(Template template, List workflowSequence, String workflowId) { + private void executeWorkflow(List workflowSequence, String workflowId) { String currentStepId = ""; try { Map> workflowFutureMap = new LinkedHashMap<>(); @@ -290,7 +310,7 @@ private void executeWorkflow(Template template, List workflowSequen List predecessors = processNode.predecessors(); logger.info( "Queueing process [{}].{}", - processNode.id(), + processNode.id() + processNode.workflowStep().getName(), predecessors.isEmpty() ? " Can start immediately!" : String.format( @@ -321,18 +341,6 @@ private void executeWorkflow(Template template, List workflowSequen logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED); - // Replace template document - flowFrameworkIndicesHandler.updateTemplateInGlobalContext( - workflowId, - template, - ActionListener.wrap(templateResponse -> { - logger.info("Updated template for {}", workflowId, State.COMPLETED); - }, exception -> { - String errorMessage = "Failed to update use case template for " + workflowId; - logger.error(errorMessage, exception); - }), - true // ignores NOT_STARTED state if request is to reprovision - ); }, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); }) ); } catch (Exception ex) {