diff --git a/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java b/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java index adb18972..acb31dfb 100644 --- a/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java +++ b/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java @@ -5,6 +5,7 @@ import it.gov.pagopa.admissibility.dto.commands.QueueCommandOperationDTO; import it.gov.pagopa.admissibility.service.AdmissibilityErrorNotifierService; import it.gov.pagopa.admissibility.service.commands.operations.DeleteInitiativeService; +import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService; import it.gov.pagopa.admissibility.utils.CommandConstants; import it.gov.pagopa.common.reactive.kafka.consumer.BaseKafkaConsumer; import lombok.extern.slf4j.Slf4j; @@ -23,20 +24,29 @@ public class CommandMediatorServiceImpl extends BaseKafkaConsumer implements CommandMediatorService{ private final Duration commitDelay; + private final Duration beneficiaryRulesBuildDelayMinusCommit; private final DeleteInitiativeService deleteInitiativeService; + private final OnboardingContextHolderService onboardingContextHolderService; private final AdmissibilityErrorNotifierService admissibilityErrorNotifierService; private final ObjectReader objectReader; public CommandMediatorServiceImpl(@Value("${spring.application.name}") String applicationName, @Value("${spring.cloud.stream.kafka.bindings.consumerCommands-in-0.consumer.ackTime}") long commitMillis, + @Value("${app.beneficiary-rule.build-delay-duration}") String beneficiaryRulesBuildDelay, + DeleteInitiativeService deleteInitiativeService, - AdmissibilityErrorNotifierService admissibilityErrorNotifierService, + OnboardingContextHolderService onboardingContextHolderService, AdmissibilityErrorNotifierService admissibilityErrorNotifierService, ObjectMapper objectMapper) { super(applicationName); this.commitDelay = Duration.ofMillis(commitMillis); this.deleteInitiativeService = deleteInitiativeService; + this.onboardingContextHolderService = onboardingContextHolderService; this.admissibilityErrorNotifierService = admissibilityErrorNotifierService; this.objectReader = objectMapper.readerFor(QueueCommandOperationDTO.class); + + Duration beneficiaryRulesBuildDelayDuration = Duration.parse(beneficiaryRulesBuildDelay).minusMillis(commitMillis); + Duration defaultDurationDelay = Duration.ofMillis(2L); + this.beneficiaryRulesBuildDelayMinusCommit = defaultDurationDelay.compareTo(beneficiaryRulesBuildDelayDuration) >= 0 ? defaultDurationDelay : beneficiaryRulesBuildDelayDuration; } @Override @@ -47,6 +57,8 @@ protected Duration getCommitDelay() { @Override protected void subscribeAfterCommits(Flux> afterCommits2subscribe) { afterCommits2subscribe + .buffer(beneficiaryRulesBuildDelayMinusCommit) + .then(onboardingContextHolderService.refreshKieContainerCacheMiss()) .subscribe(r -> log.info("[ADMISSIBILITY_COMMANDS] Processed offsets committed successfully")); } diff --git a/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java b/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java index 6afd9f55..b589d41f 100644 --- a/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java +++ b/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java @@ -3,8 +3,8 @@ import it.gov.pagopa.admissibility.connector.repository.DroolsRuleRepository; import it.gov.pagopa.admissibility.connector.repository.InitiativeCountersRepository; import it.gov.pagopa.admissibility.connector.repository.OnboardingFamiliesRepository; -import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService; import it.gov.pagopa.admissibility.utils.AuditUtilities; +import it.gov.pagopa.common.reactive.utils.PerformanceLogger; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -20,7 +20,6 @@ public class DeleteInitiativeServiceImpl implements DeleteInitiativeService{ private final InitiativeCountersRepository initiativeCountersRepository; private final OnboardingFamiliesRepository onboardingFamiliesRepository; private final AuditUtilities auditUtilities; - private final OnboardingContextHolderService onboardingContextHolderService; private final int pageSize; private final long delay; @@ -28,14 +27,12 @@ public DeleteInitiativeServiceImpl(DroolsRuleRepository droolsRuleRepository, InitiativeCountersRepository initiativeCountersRepository, OnboardingFamiliesRepository onboardingFamiliesRepository, AuditUtilities auditUtilities, - OnboardingContextHolderService onboardingContextHolderService, @Value("${app.delete.paginationSize}") int pageSize, @Value("${app.delete.delayTime}") long delay) { this.droolsRuleRepository = droolsRuleRepository; this.initiativeCountersRepository = initiativeCountersRepository; this.onboardingFamiliesRepository = onboardingFamiliesRepository; this.auditUtilities = auditUtilities; - this.onboardingContextHolderService = onboardingContextHolderService; this.pageSize = pageSize; this.delay = delay; } @@ -43,19 +40,22 @@ public DeleteInitiativeServiceImpl(DroolsRuleRepository droolsRuleRepository, @Override public Mono execute(String initiativeId) { log.info("[DELETE_INITIATIVE] Starting handle delete initiative {}", initiativeId); - return deleteDroolsRule(initiativeId) - .then(deleteInitiativeCounters(initiativeId)) - .then(deleteOnboardingFamilies(initiativeId)) + return execAndLogTiming("DELETE_DROOLS_RULE", initiativeId, deleteDroolsRule(initiativeId)) + .then(execAndLogTiming("DELETE_INITIATIVE_COUNTERS", initiativeId, deleteInitiativeCounters(initiativeId))) + .then(execAndLogTiming("DELETE_ONBOARDING_FAMILIES", initiativeId, deleteOnboardingFamilies(initiativeId))) .then(Mono.just(initiativeId)); } + private Mono execAndLogTiming(String deleteFlowName, String initiativeId, Mono deleteMono) { + return PerformanceLogger.logTimingFinally(deleteFlowName, deleteMono, initiativeId); + } + private Mono deleteDroolsRule(String initiativeId) { return droolsRuleRepository.deleteById(initiativeId) .doOnSuccess(d -> { log.info("[DELETE_INITIATIVE] Deleted initiative {} from collection: beneficiary_rule", initiativeId); auditUtilities.logDeletedDroolsRule(initiativeId); }) - .then(onboardingContextHolderService.refreshKieContainerCacheMiss()) .then(); } diff --git a/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java b/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java index 1981aa02..757db668 100644 --- a/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java +++ b/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java @@ -32,18 +32,20 @@ import java.util.stream.IntStream; @TestPropertySource(properties = { + "app.beneficiary-rule.build-delay-duration=PT1S", "logging.level.it.gov.pagopa.admissibility.service.commands.CommandMediatorServiceImpl=WARN", "logging.level.it.gov.pagopa.admissibility.service.commands.operations.DeleteInitiativeServiceImpl=WARN", "logging.level.it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderServiceImpl=WARN", + "logging.level.it.gov.pagopa.common.reactive.utils.PerformanceLogger=WARN" }) class CommandConsumerConfigIntegrationTest extends BaseIntegrationTest { private final String INITIATIVEID = "INITIATIVEID_%d"; private final Set INITIATIVES_DELETED = new HashSet<>(); @SpyBean - private DroolsRuleRepository droolsRuleRepository; + private DroolsRuleRepository droolsRuleRepositorySpy; + @Autowired private InitiativeCountersRepository initiativeCountersRepository; - @Autowired private OnboardingFamiliesRepository onboardingFamiliesRepository; @@ -68,6 +70,8 @@ void test() { checkRepositories(); checkErrorsPublished(notValidMessages, maxWaitingMs, errorUseCases); + Mockito.verify(droolsRuleRepositorySpy).findAll(); + System.out.printf(""" ************************ Time spent to send %d (%d + %d) messages (from start): %d millis @@ -140,7 +144,7 @@ private void initializeDB(int bias) { .initiativeConfig(initiativeConfig) .rule("") .build(); - droolsRuleRepository.save(droolsRule).block(); + droolsRuleRepositorySpy.save(droolsRule).block(); InitiativeCounters initiativeCounters = InitiativeCounters.builder() .id(INITIATIVEID.formatted(bias)) @@ -184,7 +188,7 @@ protected Pattern getErrorUseCaseIdPatternMatch() { errorUseCases.add(Pair.of( () -> { Mockito.doThrow(new MongoException("Command error dummy")) - .when(droolsRuleRepository).deleteById(errorInitiativeId); + .when(droolsRuleRepositorySpy).deleteById(errorInitiativeId); return commandOperationErrorString; }, errorMessage -> checkErrorMessageHeaders(errorMessage, "[ADMISSIBILITY_COMMANDS] An error occurred evaluating commands", commandOperationErrorString) @@ -192,7 +196,7 @@ protected Pattern getErrorUseCaseIdPatternMatch() { } private void checkRepositories() { - Assertions.assertTrue(droolsRuleRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId()))); + Assertions.assertTrue(droolsRuleRepositorySpy.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId()))); Assertions.assertTrue(initiativeCountersRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId()))); Assertions.assertTrue(onboardingFamiliesRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getInitiativeId()))); } diff --git a/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java b/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java index 81fc91f3..08b77d76 100644 --- a/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java +++ b/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java @@ -6,13 +6,11 @@ import it.gov.pagopa.admissibility.connector.repository.OnboardingFamiliesRepository; import it.gov.pagopa.admissibility.dto.onboarding.extra.Family; import it.gov.pagopa.admissibility.model.OnboardingFamilies; -import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService; import it.gov.pagopa.admissibility.utils.AuditUtilities; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.kie.api.KieBase; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -25,7 +23,6 @@ class DeleteInitiativeServiceImplTest { @Mock private InitiativeCountersRepository initiativeCountersRepositoryMock; @Mock private OnboardingFamiliesRepository onboardingFamiliesRepositoryMock; @Mock private AuditUtilities auditUtilitiesMock; - @Mock private OnboardingContextHolderService onboardingContextHolderService; private DeleteInitiativeService deleteInitiativeService; private final static int PAGE_SIZE = 100; @@ -37,7 +34,8 @@ void setUp() { droolsRuleRepositoryMock, initiativeCountersRepositoryMock, onboardingFamiliesRepositoryMock, - auditUtilitiesMock, onboardingContextHolderService, PAGE_SIZE, 1000L); + auditUtilitiesMock, + PAGE_SIZE, 1000L); } @Test @@ -48,9 +46,6 @@ void executeOK() { Mockito.when(droolsRuleRepositoryMock.deleteById(initiativeId)) .thenReturn(Mono.just(Mockito.mock(Void.class))); - Mockito.when(onboardingContextHolderService.refreshKieContainerCacheMiss()) - .thenReturn(Mono.just(Mockito.mock(KieBase.class))); - Mockito.when(initiativeCountersRepositoryMock.deleteById(initiativeId)) .thenReturn(Mono.just(Mockito.mock(Void.class)));