diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d4b21d0..1ee5340d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) - Incrementally remove resources from workflow state during deprovisioning ([#898](https://github.com/opensearch-project/flow-framework/pull/898)) ### Bug Fixes +- Fixed Template Update Location and Improved Logger Statements in ReprovisionWorkflowTransportAction ([#918](https://github.com/opensearch-project/flow-framework/pull/918)) + ### Infrastructure ### Documentation - Add knowledge base alert agent into sample templates ([#874](https://github.com/opensearch-project/flow-framework/pull/874)) diff --git a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java index 54f6a332..867c61f6 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java @@ -270,12 +270,28 @@ private void executeWorkflowAsync( ActionListener listener ) { try { - threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { executeWorkflow(template, workflowSequence, workflowId); }); + threadPool.executor(PROVISION_WORKFLOW_THREAD_POOL).execute(() -> { + updateTemplate(template, workflowId); + executeWorkflow(template, workflowSequence, workflowId); + }); } catch (Exception exception) { listener.onFailure(new FlowFrameworkException("Failed to execute workflow " + workflowId, ExceptionsHelper.status(exception))); } } + /** + * Replace template document + * @param template The template to store after reprovisioning completes successfully + * @param workflowId The workflowId associated with the workflow that is executing + */ + private void updateTemplate(Template template, String workflowId) { + flowFrameworkIndicesHandler.updateTemplateInGlobalContext(workflowId, template, ActionListener.wrap(templateResponse -> { + logger.info("Updated template for {}", workflowId); + }, exception -> { logger.error("Failed to update use case template for {}", workflowId, exception); }), + true // ignores NOT_STARTED state if request is to reprovision + ); + } + /** * Executes the given workflow sequence * @param template The template to store after reprovisioning completes successfully @@ -289,8 +305,9 @@ private void executeWorkflow(Template template, List workflowSequen for (ProcessNode processNode : workflowSequence) { List predecessors = processNode.predecessors(); logger.info( - "Queueing process [{}].{}", + "Queueing Process [{} (type: {})].{}", processNode.id(), + processNode.workflowStep().getName(), predecessors.isEmpty() ? " Can start immediately!" : String.format( @@ -321,18 +338,6 @@ private void executeWorkflow(Template template, List workflowSequen logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED); - // Replace template document - flowFrameworkIndicesHandler.updateTemplateInGlobalContext( - workflowId, - template, - ActionListener.wrap(templateResponse -> { - logger.info("Updated template for {}", workflowId, State.COMPLETED); - }, exception -> { - String errorMessage = "Failed to update use case template for " + workflowId; - logger.error(errorMessage, exception); - }), - true // ignores NOT_STARTED state if request is to reprovision - ); }, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); }) ); } catch (Exception ex) {