diff --git a/.grype.yaml b/.grype.yaml index 57006e14..70db95e6 100644 --- a/.grype.yaml +++ b/.grype.yaml @@ -1,6 +1,6 @@ ignore: # false positive match on reactor-netty packages due to a bug on grype: https://github.com/anchore/grype/issues/431 - # Actually we are using netty 4.1.94 + # Actually we are using netty 4.1.100 - vulnerability: CVE-2014-3488 # solved in netty 3.9.2 - vulnerability: CVE-2015-2156 # solved in netty 4.1.42 - vulnerability: CVE-2019-16869 # solved in netty 4.1.42 @@ -15,3 +15,4 @@ ignore: - vulnerability: CVE-2022-24823 # solved in netty 4.1.77 - vulnerability: CVE-2022-41881 # solved in netty 4.1.86 - vulnerability: CVE-2023-34462 # solved in netty 4.1.94 + - vulnerability: CVE-2023-44487 # solved in netty 4.1.100 diff --git a/Dockerfile b/Dockerfile index ac1d0682..ff038b9a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ # # Build # -FROM maven:3.9.4-amazoncorretto-17-al2023@sha256:c7719f952f62e301c6c24b86ef9a2ea1cd0a314a862ed12e51f0ffbc3fbb96b5 AS buildtime +FROM maven:3.9.5-amazoncorretto-17-al2023@sha256:b7f94a5f1b6582a045692e31c2c97ef6f0ed867961669a0adbc2d5f0bbf8bc85 AS buildtime WORKDIR /build COPY . . @@ -11,7 +11,7 @@ RUN mvn clean package -DskipTests # # Docker RUNTIME # -FROM amazoncorretto:17.0.8-alpine3.18@sha256:34650d7c653af234dad21cd2d89d2f0dbdb1bad54041014932e51b3492e0dec5 AS runtime +FROM amazoncorretto:17.0.9-alpine3.18@sha256:df48bf2e183230040890460ddb4359a10aa6c7aad24bd88899482c52053c7e17 AS runtime RUN apk add shadow RUN useradd --uid 10000 runner @@ -21,7 +21,7 @@ WORKDIR /app COPY --from=buildtime /build/target/*.jar /app/app.jar # The agent is enabled at runtime via JAVA_TOOL_OPTIONS. -ADD https://github.com/microsoft/ApplicationInsights-Java/releases/download/3.4.16/applicationinsights-agent-3.4.16.jar /app/applicationinsights-agent.jar +ADD https://github.com/microsoft/ApplicationInsights-Java/releases/download/3.4.17/applicationinsights-agent-3.4.17.jar /app/applicationinsights-agent.jar RUN chown -R runner:runner /app diff --git a/pom.xml b/pom.xml index 3af88fa5..ba6dd1fd 100644 --- a/pom.xml +++ b/pom.xml @@ -4,17 +4,18 @@ org.springframework.boot spring-boot-starter-parent - 3.1.3 - - + 3.1.5 + it.gov.pagopa idpay-admissibility-assessor - 1.2.1 idpay-admissibility-assessor + 1.2.1 + 17 + @@ -67,41 +68,41 @@ com.azure.spring spring-cloud-azure-stream-binder-servicebus - 5.4.0 + 5.5.0 jakarta.activation jakarta.activation-api - 2.1.0 + 2.1.2 jakarta.xml.soap jakarta.xml.soap-api - 3.0.0 + 3.0.1 com.sun.xml.ws jaxws-rt - 4.0.1 + 4.0.2 jakarta.xml.ws jakarta.xml.ws-api - 4.0.0 + 4.0.1 jakarta.xml.bind jakarta.xml.bind-api - 4.0.0 + 4.0.1 org.projectlombok lombok - 1.18.28 + 1.18.30 org.apache.commons @@ -116,7 +117,7 @@ com.google.guava guava - 32.1.2-jre + 32.1.3-jre org.openapitools @@ -131,7 +132,7 @@ org.springdoc springdoc-openapi-starter-webflux-ui - 2.1.0 + 2.2.0 org.apache.commons @@ -143,14 +144,14 @@ org.yaml snakeyaml - - 2.0 + + 2.2 org.xerial.snappy snappy-java - - 1.1.10.3 + + 1.1.10.5 test @@ -163,13 +164,11 @@ org.junit.jupiter junit-jupiter-engine - 5.8.2 test org.junit.platform junit-platform-launcher - 1.8.2 test @@ -180,7 +179,7 @@ org.springframework.cloud spring-cloud-contract-wiremock - 4.0.2 + 4.0.4 test @@ -197,7 +196,7 @@ de.flapdoodle.embed de.flapdoodle.embed.mongo.spring30x - 4.7.0 + 4.9.3 test @@ -229,6 +228,15 @@ pom import + + + io.netty + netty-bom + + 4.1.100.Final + pom + import + diff --git a/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java b/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java index adb18972..acb31dfb 100644 --- a/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java +++ b/src/main/java/it/gov/pagopa/admissibility/service/commands/CommandMediatorServiceImpl.java @@ -5,6 +5,7 @@ import it.gov.pagopa.admissibility.dto.commands.QueueCommandOperationDTO; import it.gov.pagopa.admissibility.service.AdmissibilityErrorNotifierService; import it.gov.pagopa.admissibility.service.commands.operations.DeleteInitiativeService; +import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService; import it.gov.pagopa.admissibility.utils.CommandConstants; import it.gov.pagopa.common.reactive.kafka.consumer.BaseKafkaConsumer; import lombok.extern.slf4j.Slf4j; @@ -23,20 +24,29 @@ public class CommandMediatorServiceImpl extends BaseKafkaConsumer implements CommandMediatorService{ private final Duration commitDelay; + private final Duration beneficiaryRulesBuildDelayMinusCommit; private final DeleteInitiativeService deleteInitiativeService; + private final OnboardingContextHolderService onboardingContextHolderService; private final AdmissibilityErrorNotifierService admissibilityErrorNotifierService; private final ObjectReader objectReader; public CommandMediatorServiceImpl(@Value("${spring.application.name}") String applicationName, @Value("${spring.cloud.stream.kafka.bindings.consumerCommands-in-0.consumer.ackTime}") long commitMillis, + @Value("${app.beneficiary-rule.build-delay-duration}") String beneficiaryRulesBuildDelay, + DeleteInitiativeService deleteInitiativeService, - AdmissibilityErrorNotifierService admissibilityErrorNotifierService, + OnboardingContextHolderService onboardingContextHolderService, AdmissibilityErrorNotifierService admissibilityErrorNotifierService, ObjectMapper objectMapper) { super(applicationName); this.commitDelay = Duration.ofMillis(commitMillis); this.deleteInitiativeService = deleteInitiativeService; + this.onboardingContextHolderService = onboardingContextHolderService; this.admissibilityErrorNotifierService = admissibilityErrorNotifierService; this.objectReader = objectMapper.readerFor(QueueCommandOperationDTO.class); + + Duration beneficiaryRulesBuildDelayDuration = Duration.parse(beneficiaryRulesBuildDelay).minusMillis(commitMillis); + Duration defaultDurationDelay = Duration.ofMillis(2L); + this.beneficiaryRulesBuildDelayMinusCommit = defaultDurationDelay.compareTo(beneficiaryRulesBuildDelayDuration) >= 0 ? defaultDurationDelay : beneficiaryRulesBuildDelayDuration; } @Override @@ -47,6 +57,8 @@ protected Duration getCommitDelay() { @Override protected void subscribeAfterCommits(Flux> afterCommits2subscribe) { afterCommits2subscribe + .buffer(beneficiaryRulesBuildDelayMinusCommit) + .then(onboardingContextHolderService.refreshKieContainerCacheMiss()) .subscribe(r -> log.info("[ADMISSIBILITY_COMMANDS] Processed offsets committed successfully")); } diff --git a/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java b/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java index 6afd9f55..b589d41f 100644 --- a/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java +++ b/src/main/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImpl.java @@ -3,8 +3,8 @@ import it.gov.pagopa.admissibility.connector.repository.DroolsRuleRepository; import it.gov.pagopa.admissibility.connector.repository.InitiativeCountersRepository; import it.gov.pagopa.admissibility.connector.repository.OnboardingFamiliesRepository; -import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService; import it.gov.pagopa.admissibility.utils.AuditUtilities; +import it.gov.pagopa.common.reactive.utils.PerformanceLogger; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -20,7 +20,6 @@ public class DeleteInitiativeServiceImpl implements DeleteInitiativeService{ private final InitiativeCountersRepository initiativeCountersRepository; private final OnboardingFamiliesRepository onboardingFamiliesRepository; private final AuditUtilities auditUtilities; - private final OnboardingContextHolderService onboardingContextHolderService; private final int pageSize; private final long delay; @@ -28,14 +27,12 @@ public DeleteInitiativeServiceImpl(DroolsRuleRepository droolsRuleRepository, InitiativeCountersRepository initiativeCountersRepository, OnboardingFamiliesRepository onboardingFamiliesRepository, AuditUtilities auditUtilities, - OnboardingContextHolderService onboardingContextHolderService, @Value("${app.delete.paginationSize}") int pageSize, @Value("${app.delete.delayTime}") long delay) { this.droolsRuleRepository = droolsRuleRepository; this.initiativeCountersRepository = initiativeCountersRepository; this.onboardingFamiliesRepository = onboardingFamiliesRepository; this.auditUtilities = auditUtilities; - this.onboardingContextHolderService = onboardingContextHolderService; this.pageSize = pageSize; this.delay = delay; } @@ -43,19 +40,22 @@ public DeleteInitiativeServiceImpl(DroolsRuleRepository droolsRuleRepository, @Override public Mono execute(String initiativeId) { log.info("[DELETE_INITIATIVE] Starting handle delete initiative {}", initiativeId); - return deleteDroolsRule(initiativeId) - .then(deleteInitiativeCounters(initiativeId)) - .then(deleteOnboardingFamilies(initiativeId)) + return execAndLogTiming("DELETE_DROOLS_RULE", initiativeId, deleteDroolsRule(initiativeId)) + .then(execAndLogTiming("DELETE_INITIATIVE_COUNTERS", initiativeId, deleteInitiativeCounters(initiativeId))) + .then(execAndLogTiming("DELETE_ONBOARDING_FAMILIES", initiativeId, deleteOnboardingFamilies(initiativeId))) .then(Mono.just(initiativeId)); } + private Mono execAndLogTiming(String deleteFlowName, String initiativeId, Mono deleteMono) { + return PerformanceLogger.logTimingFinally(deleteFlowName, deleteMono, initiativeId); + } + private Mono deleteDroolsRule(String initiativeId) { return droolsRuleRepository.deleteById(initiativeId) .doOnSuccess(d -> { log.info("[DELETE_INITIATIVE] Deleted initiative {} from collection: beneficiary_rule", initiativeId); auditUtilities.logDeletedDroolsRule(initiativeId); }) - .then(onboardingContextHolderService.refreshKieContainerCacheMiss()) .then(); } diff --git a/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspect.java b/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspect.java index 559cbcee..0f5d2be5 100644 --- a/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspect.java +++ b/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspect.java @@ -1,6 +1,7 @@ package it.gov.pagopa.common.reactive.mongo.retry; import it.gov.pagopa.common.reactive.web.ReactiveRequestContextHolder; +import lombok.Generated; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -47,6 +48,12 @@ public MongoRequestRateTooLargeAutomaticRetryAspect( this.maxMillisElapsedBatch = maxMillisElapsedBatch; } + @Generated + @Pointcut("execution(* org.springframework.data.mongodb.repository.*MongoRepository+.*(..))") + public void inSpringRepositoryClass() { + } + + @Generated @Pointcut("within(*..*Repository*)") public void inRepositoryClass() { } @@ -59,35 +66,35 @@ public void returnMono() { public void returnFlux() { } - @Around("inRepositoryClass() && returnMono()") + @Around("(inRepositoryClass() or inSpringRepositoryClass()) && returnMono()") public Object decorateMonoRepositoryMethods(ProceedingJoinPoint pjp) throws Throwable { Mono out = (Mono) pjp.proceed(); - - return Mono.deferContextual(ctx -> decorateMethod(out, ctx)); + String flowName = pjp.getSignature().toShortString(); + return Mono.deferContextual(ctx -> decorateMethod(flowName, out, ctx)); } - @Around("inRepositoryClass() && returnFlux()") + @Around("(inRepositoryClass() or inSpringRepositoryClass()) && returnFlux()") public Object decorateFluxRepositoryMethods(ProceedingJoinPoint pjp) throws Throwable { @SuppressWarnings("unchecked") // only with Flux the compiler return error when using wildcard, so here we are using Object Flux out = (Flux) pjp.proceed(); - - return Flux.deferContextual(ctx -> decorateMethod(out, ctx)); + String flowName = pjp.getSignature().toShortString(); + return Flux.deferContextual(ctx -> decorateMethod(flowName, out, ctx)); } - private > T decorateMethod(T out, ContextView ctx) { + private > T decorateMethod(String flowName, T out, ContextView ctx) { Optional serverWebExchange = ctx.getOrEmpty(ReactiveRequestContextHolder.CONTEXT_KEY); if (serverWebExchange.isEmpty()) { if(enabledBatch) { - return invokeWithRetry(out, maxRetryBatch, maxMillisElapsedBatch); + return invokeWithRetry(flowName, out, maxRetryBatch, maxMillisElapsedBatch); }else { return out; } } else { MongoRequestRateTooLargeApiRetryable apiRetryableConfig = getRequestRateTooLargeApiRetryableConfig(serverWebExchange.get()); if(apiRetryableConfig!=null){ - return invokeWithRetry(out, apiRetryableConfig.maxRetry(), apiRetryableConfig.maxMillisElapsed()); + return invokeWithRetry(flowName, out, apiRetryableConfig.maxRetry(), apiRetryableConfig.maxMillisElapsed()); } else if(enabledApi){ - return invokeWithRetry(out, maxRetryApi, maxMillisElapsedApi); + return invokeWithRetry(flowName, out, maxRetryApi, maxMillisElapsedApi); }else { return out; } @@ -95,12 +102,12 @@ private > T decorateMethod(T out, ContextView ctx) { } @SuppressWarnings("unchecked") - private > T invokeWithRetry(T out, long maxRetry, long maxMillisElapsed) { + private > T invokeWithRetry(String flowName, T out, long maxRetry, long maxMillisElapsed) { if(out instanceof Mono mono) { - return (T) MongoRequestRateTooLargeRetryer.withRetry(mono, maxRetry, + return (T) MongoRequestRateTooLargeRetryer.withRetry(flowName, mono, maxRetry, maxMillisElapsed); } else { - return (T) MongoRequestRateTooLargeRetryer.withRetry((Flux) out, maxRetry, + return (T) MongoRequestRateTooLargeRetryer.withRetry(flowName, (Flux) out, maxRetry, maxMillisElapsed); } } diff --git a/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryer.java b/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryer.java index 67050059..606574e2 100644 --- a/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryer.java +++ b/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryer.java @@ -1,11 +1,7 @@ package it.gov.pagopa.common.reactive.mongo.retry; import it.gov.pagopa.common.reactive.mongo.retry.exception.MongoRequestRateTooLargeRetryExpiredException; -import java.time.Duration; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.springframework.dao.DataAccessException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,6 +9,10 @@ import reactor.util.retry.Retry.RetrySignal; import reactor.util.retry.RetrySpec; +import java.time.Duration; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + @Slf4j public class MongoRequestRateTooLargeRetryer { @@ -21,15 +21,15 @@ private MongoRequestRateTooLargeRetryer() { private static final Pattern RETRY_AFTER_MS_PATTERN = Pattern.compile("RetryAfterMs=(\\d+)"); - public static Mono withRetry(Mono publisher, long maxRetry, long maxMillisElapsed){ - return publisher.retryWhen( buildRetry(maxRetry, maxMillisElapsed, System.currentTimeMillis())); + public static Mono withRetry(String flowName, Mono publisher, long maxRetry, long maxMillisElapsed){ + return publisher.retryWhen( buildRetry(flowName, maxRetry, maxMillisElapsed, System.currentTimeMillis())); } - public static Flux withRetry(Flux publisher, long maxRetry, long maxMillisElapsed){ - return publisher.retryWhen( buildRetry(maxRetry, maxMillisElapsed, System.currentTimeMillis())); + public static Flux withRetry(String flowName, Flux publisher, long maxRetry, long maxMillisElapsed){ + return publisher.retryWhen( buildRetry(flowName, maxRetry, maxMillisElapsed, System.currentTimeMillis())); } - private static Retry buildRetry(long maxRetry, long maxMillisElapsed, long startTime) { + private static Retry buildRetry(String flowName, long maxRetry, long maxMillisElapsed, long startTime) { Long[] retryAfterMs = {null}; return buildBaseRetry(maxRetry) .filter(MongoRequestRateTooLargeRetryer::isRequestRateTooLargeException) @@ -43,32 +43,30 @@ private static Retry buildRetry(long maxRetry, long maxMillisElapsed, long start } if (maxMillisElapsed > 0 && millisElapsed > maxMillisElapsed){ - return Mono.error(buildMongoRequestRateTooLargeRetryExpiredException(maxRetry, e, + return Mono.error(buildMongoRequestRateTooLargeRetryExpiredException(flowName, maxRetry, e, maxMillisElapsed, millisElapsed, retryAfterMs[0])); } if (retryAfterMs[0] != null) { log.info( - "[REQUEST_RATE_TOO_LARGE_RETRY] Retrying after {} ms due to RequestRateTooLargeException: attempt {} of {} after {} ms of max {} ms", - retryAfterMs[0], counter, maxRetry, millisElapsed, maxMillisElapsed); + "[REQUEST_RATE_TOO_LARGE_RETRY][{}] Retrying after {} ms due to RequestRateTooLargeException: attempt {} of {} after {} ms of max {} ms", + flowName, retryAfterMs[0], counter, maxRetry, millisElapsed, maxMillisElapsed); return Mono.delay(Duration.ofMillis(retryAfterMs[0])).then(); }else { log.info( - "[REQUEST_RATE_TOO_LARGE_RETRY] Retrying for RequestRateTooLargeException: attempt {} of {} after {} ms of max {} ms", - counter, maxRetry, millisElapsed, maxMillisElapsed); + "[REQUEST_RATE_TOO_LARGE_RETRY][{}] Retrying for RequestRateTooLargeException: attempt {} of {} after {} ms of max {} ms", + flowName, counter, maxRetry, millisElapsed, maxMillisElapsed); return Mono.empty(); } }) .onRetryExhaustedThrow((r, e) -> buildMongoRequestRateTooLargeRetryExpiredException( - maxRetry, e, maxMillisElapsed, + flowName, maxRetry, e, maxMillisElapsed, System.currentTimeMillis() - startTime, retryAfterMs[0])); } - - @NotNull private static MongoRequestRateTooLargeRetryExpiredException buildMongoRequestRateTooLargeRetryExpiredException( - long maxRetry, RetrySignal e, long maxMillisElapsed, long startTime, Long retryAfterMs) { - return new MongoRequestRateTooLargeRetryExpiredException(maxRetry, e.totalRetries() + 1, + String flowName, long maxRetry, RetrySignal e, long maxMillisElapsed, long startTime, Long retryAfterMs) { + return new MongoRequestRateTooLargeRetryExpiredException(flowName, maxRetry, e.totalRetries() + 1, maxMillisElapsed, startTime, retryAfterMs, e.failure()); } @@ -88,7 +86,7 @@ public static Long getRetryAfterMs(Throwable ex) { } public static boolean isRequestRateTooLargeException(Throwable ex) { - return ex instanceof DataAccessException && ex.getMessage().contains("TooManyRequests"); + return ex instanceof DataAccessException && (ex.getMessage().contains("TooManyRequests") || ex.getMessage().contains("Error=16500,")); } } \ No newline at end of file diff --git a/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/exception/MongoRequestRateTooLargeRetryExpiredException.java b/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/exception/MongoRequestRateTooLargeRetryExpiredException.java index 185e73da..7d0492f6 100644 --- a/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/exception/MongoRequestRateTooLargeRetryExpiredException.java +++ b/src/main/java/it/gov/pagopa/common/reactive/mongo/retry/exception/MongoRequestRateTooLargeRetryExpiredException.java @@ -12,10 +12,10 @@ public class MongoRequestRateTooLargeRetryExpiredException extends RuntimeExcept private final Long retryAfterMs; - public MongoRequestRateTooLargeRetryExpiredException(long maxRetry, long counter, + public MongoRequestRateTooLargeRetryExpiredException(String flowName, long maxRetry, long counter, long maxMillisElapsed, long millisElapsed, Long retryAfterMs, Throwable cause) { - super("[REQUEST_RATE_TOO_LARGE_RETRY_EXPIRED] Expired retry for RequestRateTooLargeException: attempt %d of %d after %d ms of max %d ms, suggested retry after %s ms" - .formatted(counter, maxRetry, millisElapsed, maxMillisElapsed, String.valueOf(retryAfterMs)), + super("[REQUEST_RATE_TOO_LARGE_RETRY_EXPIRED][%s] Expired retry for RequestRateTooLargeException: attempt %d of %d after %d ms of max %d ms, suggested retry after %s ms" + .formatted(flowName, counter, maxRetry, millisElapsed, maxMillisElapsed, String.valueOf(retryAfterMs)), cause); this.maxRetry = maxRetry; this.counter = counter; diff --git a/src/main/java/it/gov/pagopa/common/stream/StreamsHealthIndicator.java b/src/main/java/it/gov/pagopa/common/stream/StreamsHealthIndicator.java index 483bf8f4..412d47ef 100644 --- a/src/main/java/it/gov/pagopa/common/stream/StreamsHealthIndicator.java +++ b/src/main/java/it/gov/pagopa/common/stream/StreamsHealthIndicator.java @@ -1,7 +1,6 @@ package it.gov.pagopa.common.stream; import lombok.NonNull; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; @@ -23,11 +22,14 @@ @GlobalChannelInterceptor public class StreamsHealthIndicator extends AbstractHealthIndicator implements ChannelInterceptor { - @Autowired - private ApplicationContext applicationContext; + private final ApplicationContext applicationContext; private final Set disconnectedSubscribers = new HashSet<>(); + public StreamsHealthIndicator(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + @Override protected void doHealthCheck(Health.Builder builder) { Map publisherSubscriptionCounts = applicationContext.getBeansOfType(DirectWithAttributesChannel.class).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSubscriberCount())); diff --git a/src/main/java/it/gov/pagopa/common/web/exception/MongoExceptionHandler.java b/src/main/java/it/gov/pagopa/common/web/exception/MongoExceptionHandler.java index d055abcc..6b5c0b8f 100644 --- a/src/main/java/it/gov/pagopa/common/web/exception/MongoExceptionHandler.java +++ b/src/main/java/it/gov/pagopa/common/web/exception/MongoExceptionHandler.java @@ -4,8 +4,6 @@ import it.gov.pagopa.common.reactive.mongo.retry.exception.MongoRequestRateTooLargeRetryExpiredException; import it.gov.pagopa.common.web.dto.ErrorDTO; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.dao.DataAccessException; @@ -23,8 +21,11 @@ @Order(Ordered.HIGHEST_PRECEDENCE) public class MongoExceptionHandler { - @Autowired - private ErrorManager errorManager; + private final ErrorManager errorManager; + + public MongoExceptionHandler(ErrorManager errorManager) { + this.errorManager = errorManager; + } @ExceptionHandler(DataAccessException.class) protected ResponseEntity handleDataAccessException( @@ -46,7 +47,6 @@ protected ResponseEntity handleMongoRequestRateTooLargeRetryExpiredExc return getErrorDTOResponseEntity(ex, request, ex.getRetryAfterMs()); } - @NotNull private ResponseEntity getErrorDTOResponseEntity(Exception ex, ServerWebExchange request, Long retryAfterMs) { String message = ex.getMessage(); diff --git a/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java b/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java index 1981aa02..757db668 100644 --- a/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java +++ b/src/test/java/it/gov/pagopa/admissibility/connector/event/consumer/CommandConsumerConfigIntegrationTest.java @@ -32,18 +32,20 @@ import java.util.stream.IntStream; @TestPropertySource(properties = { + "app.beneficiary-rule.build-delay-duration=PT1S", "logging.level.it.gov.pagopa.admissibility.service.commands.CommandMediatorServiceImpl=WARN", "logging.level.it.gov.pagopa.admissibility.service.commands.operations.DeleteInitiativeServiceImpl=WARN", "logging.level.it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderServiceImpl=WARN", + "logging.level.it.gov.pagopa.common.reactive.utils.PerformanceLogger=WARN" }) class CommandConsumerConfigIntegrationTest extends BaseIntegrationTest { private final String INITIATIVEID = "INITIATIVEID_%d"; private final Set INITIATIVES_DELETED = new HashSet<>(); @SpyBean - private DroolsRuleRepository droolsRuleRepository; + private DroolsRuleRepository droolsRuleRepositorySpy; + @Autowired private InitiativeCountersRepository initiativeCountersRepository; - @Autowired private OnboardingFamiliesRepository onboardingFamiliesRepository; @@ -68,6 +70,8 @@ void test() { checkRepositories(); checkErrorsPublished(notValidMessages, maxWaitingMs, errorUseCases); + Mockito.verify(droolsRuleRepositorySpy).findAll(); + System.out.printf(""" ************************ Time spent to send %d (%d + %d) messages (from start): %d millis @@ -140,7 +144,7 @@ private void initializeDB(int bias) { .initiativeConfig(initiativeConfig) .rule("") .build(); - droolsRuleRepository.save(droolsRule).block(); + droolsRuleRepositorySpy.save(droolsRule).block(); InitiativeCounters initiativeCounters = InitiativeCounters.builder() .id(INITIATIVEID.formatted(bias)) @@ -184,7 +188,7 @@ protected Pattern getErrorUseCaseIdPatternMatch() { errorUseCases.add(Pair.of( () -> { Mockito.doThrow(new MongoException("Command error dummy")) - .when(droolsRuleRepository).deleteById(errorInitiativeId); + .when(droolsRuleRepositorySpy).deleteById(errorInitiativeId); return commandOperationErrorString; }, errorMessage -> checkErrorMessageHeaders(errorMessage, "[ADMISSIBILITY_COMMANDS] An error occurred evaluating commands", commandOperationErrorString) @@ -192,7 +196,7 @@ protected Pattern getErrorUseCaseIdPatternMatch() { } private void checkRepositories() { - Assertions.assertTrue(droolsRuleRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId()))); + Assertions.assertTrue(droolsRuleRepositorySpy.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId()))); Assertions.assertTrue(initiativeCountersRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId()))); Assertions.assertTrue(onboardingFamiliesRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getInitiativeId()))); } diff --git a/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java b/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java index 81fc91f3..08b77d76 100644 --- a/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java +++ b/src/test/java/it/gov/pagopa/admissibility/service/commands/operations/DeleteInitiativeServiceImplTest.java @@ -6,13 +6,11 @@ import it.gov.pagopa.admissibility.connector.repository.OnboardingFamiliesRepository; import it.gov.pagopa.admissibility.dto.onboarding.extra.Family; import it.gov.pagopa.admissibility.model.OnboardingFamilies; -import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService; import it.gov.pagopa.admissibility.utils.AuditUtilities; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.kie.api.KieBase; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -25,7 +23,6 @@ class DeleteInitiativeServiceImplTest { @Mock private InitiativeCountersRepository initiativeCountersRepositoryMock; @Mock private OnboardingFamiliesRepository onboardingFamiliesRepositoryMock; @Mock private AuditUtilities auditUtilitiesMock; - @Mock private OnboardingContextHolderService onboardingContextHolderService; private DeleteInitiativeService deleteInitiativeService; private final static int PAGE_SIZE = 100; @@ -37,7 +34,8 @@ void setUp() { droolsRuleRepositoryMock, initiativeCountersRepositoryMock, onboardingFamiliesRepositoryMock, - auditUtilitiesMock, onboardingContextHolderService, PAGE_SIZE, 1000L); + auditUtilitiesMock, + PAGE_SIZE, 1000L); } @Test @@ -48,9 +46,6 @@ void executeOK() { Mockito.when(droolsRuleRepositoryMock.deleteById(initiativeId)) .thenReturn(Mono.just(Mockito.mock(Void.class))); - Mockito.when(onboardingContextHolderService.refreshKieContainerCacheMiss()) - .thenReturn(Mono.just(Mockito.mock(KieBase.class))); - Mockito.when(initiativeCountersRepositoryMock.deleteById(initiativeId)) .thenReturn(Mono.just(Mockito.mock(Void.class))); diff --git a/src/test/java/it/gov/pagopa/common/reactive/mongo/BaseReactiveMongoRepositoryIntegrationTest.java b/src/test/java/it/gov/pagopa/common/reactive/mongo/BaseReactiveMongoRepositoryIntegrationTest.java index 52ca5533..49f68059 100644 --- a/src/test/java/it/gov/pagopa/common/reactive/mongo/BaseReactiveMongoRepositoryIntegrationTest.java +++ b/src/test/java/it/gov/pagopa/common/reactive/mongo/BaseReactiveMongoRepositoryIntegrationTest.java @@ -7,7 +7,6 @@ import it.gov.pagopa.common.mongo.MongoTestUtilitiesService; import it.gov.pagopa.common.mongo.config.MongoConfig; import it.gov.pagopa.common.reactive.mongo.config.ReactiveMongoConfig; -import lombok.Data; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -18,26 +17,12 @@ import org.springframework.boot.autoconfigure.mongo.MongoClientSettingsBuilderCustomizer; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.data.annotation.Id; -import org.springframework.data.mapping.model.CamelCaseAbbreviatingFieldNamingStrategy; -import org.springframework.data.mapping.model.Property; -import org.springframework.data.mapping.model.SimpleTypeHolder; -import org.springframework.data.mongodb.core.ReactiveMongoOperations; -import org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity; -import org.springframework.data.mongodb.core.mapping.BasicMongoPersistentProperty; -import org.springframework.data.mongodb.core.mapping.Document; -import org.springframework.data.mongodb.repository.ReactiveMongoRepository; -import org.springframework.data.mongodb.repository.query.MongoEntityInformation; -import org.springframework.data.mongodb.repository.support.MappingMongoEntityInformation; -import org.springframework.data.util.TypeInformation; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import java.util.List; import java.util.Map; -import java.util.Set; @TestPropertySource(properties = { "de.flapdoodle.mongodb.embedded.version=4.0.21", @@ -59,13 +44,6 @@ class BaseReactiveMongoRepositoryIntegrationTest { ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.INFO); } - @Document("beneficiary_rule") - @Data - public static class TestCollection { - @Id - private String id; - } - @TestConfiguration static class TestMongoRepositoryConfig extends MongoConfig { @Autowired @@ -78,30 +56,10 @@ public MongoClientSettingsBuilderCustomizer customizer(MongoDbCustomProperties m builder.addCommandListener(mongoMetricsCommandListener); }; } - - @Bean - public TestRepository configureTestRepository(ReactiveMongoOperations mongoOperations) throws NoSuchFieldException { - TypeInformation testTypeInformation = TypeInformation.of(TestCollection.class); - BasicMongoPersistentEntity testPersistentEntity = new BasicMongoPersistentEntity<>(testTypeInformation); - testPersistentEntity.addPersistentProperty(new BasicMongoPersistentProperty( - Property.of(testTypeInformation, TestCollection.class.getDeclaredField("id")), - testPersistentEntity, - new SimpleTypeHolder(Set.of(TestCollection.class), true), - new CamelCaseAbbreviatingFieldNamingStrategy() - )); - return new TestRepository(new MappingMongoEntityInformation<>(testPersistentEntity), mongoOperations); - } - } - - static class TestRepository extends ReactiveMongoRepositoryImpl implements ReactiveMongoRepository { - - public TestRepository(MongoEntityInformation entityInformation, ReactiveMongoOperations mongoOperations) { - super(entityInformation, mongoOperations); - } } @Autowired - private TestRepository repository; + private DummySpringRepository repository; private static final List ID_TEST_ENTITIES = List.of("ID", "ID2"); @@ -111,7 +69,7 @@ void initTestData(){ } private void storeTestData(String idTestEntity) { - TestCollection testData = new TestCollection(); + DummySpringRepository.DummyMongoCollection testData = new DummySpringRepository.DummyMongoCollection(); testData.setId(idTestEntity); repository.save(testData).block(); } @@ -125,7 +83,7 @@ void clearTestData(){ void testFindById() { MongoTestUtilitiesService.startMongoCommandListener(); - TestCollection result = repository.findById(ID_TEST_ENTITIES.get(0)).block(); + DummySpringRepository.DummyMongoCollection result = repository.findById(ID_TEST_ENTITIES.get(0)).block(); Assertions.assertNotNull(result); Assertions.assertEquals(ID_TEST_ENTITIES.get(0), result.getId()); diff --git a/src/test/java/it/gov/pagopa/common/reactive/mongo/DummySpringRepository.java b/src/test/java/it/gov/pagopa/common/reactive/mongo/DummySpringRepository.java new file mode 100644 index 00000000..2ea5be8b --- /dev/null +++ b/src/test/java/it/gov/pagopa/common/reactive/mongo/DummySpringRepository.java @@ -0,0 +1,20 @@ +package it.gov.pagopa.common.reactive.mongo; + +import lombok.Data; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DummySpringRepository extends ReactiveMongoRepository { + Mono findByIdOrderById(String id); + Flux findByIdOrderByIdDesc(String id); + + @Document("beneficiary_rule") + @Data + class DummyMongoCollection { + @Id + private String id; + } +} diff --git a/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspectTest.java b/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspectTest.java index 4b4c4aca..81083ea0 100644 --- a/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspectTest.java +++ b/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeAutomaticRetryAspectTest.java @@ -2,7 +2,7 @@ import it.gov.pagopa.common.reactive.web.ReactiveRequestContextHolder; import org.aspectj.lang.ProceedingJoinPoint; -import org.jetbrains.annotations.NotNull; +import org.aspectj.lang.Signature; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -140,7 +140,6 @@ void testApiFluxDisabledBatchEnabled() throws Throwable { } //endregion - @NotNull private MongoRequestRateTooLargeAutomaticRetryAspect buildMongoRequestRateTooLargeAutomaticRetryAspect( boolean enabledApi, boolean enabledBatch) { return new MongoRequestRateTooLargeAutomaticRetryAspect( @@ -151,10 +150,14 @@ private void configureRetryMockMono() throws Throwable { Mockito.doAnswer(i -> Mono.deferContextual(ctx -> { if (counter[0]++ < maxRetry){ - return Mono.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException()); + return Mono.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException_whenReading()); } return expectedMonoResult; })).when(pjpMock).proceed(); + + Signature signatureMock = Mockito.mock(Signature.class); + Mockito.lenient().when(signatureMock.toShortString()).thenReturn("ClassName.jointPointName(..)"); + Mockito.lenient().when(pjpMock.getSignature()).thenReturn(signatureMock); } private void checkRetryBehaviourMono(MongoRequestRateTooLargeAutomaticRetryAspect aspect, boolean isBatch) throws Throwable { @@ -174,7 +177,7 @@ private void checkExceptionMono(MongoRequestRateTooLargeAutomaticRetryAspect asp configureRetryMockMono(); Mono mono = buildContextMono(aspect, isBatch); UncategorizedMongoDbException uncategorizedMongoDbException = Assertions.assertThrows(UncategorizedMongoDbException.class, mono::block); - Assertions.assertEquals( MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException().getMessage() ,uncategorizedMongoDbException.getMessage()); + Assertions.assertEquals( MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException_whenReading().getMessage() ,uncategorizedMongoDbException.getMessage()); } private Mono buildContextMono(MongoRequestRateTooLargeAutomaticRetryAspect aspect, boolean isBatch) @@ -191,10 +194,14 @@ private void configureRetryMockFlux() throws Throwable { Mockito.doAnswer(i -> Flux.deferContextual(ctx -> { if (counter[0]++ < maxRetry){ - return Flux.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException()); + return Flux.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException_whenReading()); } return expectedFluxResult; })).when(pjpMock).proceed(); + + Signature signatureMock = Mockito.mock(Signature.class); + Mockito.lenient().when(signatureMock.toShortString()).thenReturn("ClassName.jointPointName(..)"); + Mockito.lenient().when(pjpMock.getSignature()).thenReturn(signatureMock); } private void checkRetryBehaviourFlux(MongoRequestRateTooLargeAutomaticRetryAspect aspect, boolean isBatch) throws Throwable { @@ -214,7 +221,7 @@ private void checkExceptionFlux(MongoRequestRateTooLargeAutomaticRetryAspect asp configureRetryMockFlux(); Flux flux = buildContextFlux(aspect, isBatch); UncategorizedMongoDbException uncategorizedMongoDbException = Assertions.assertThrows(UncategorizedMongoDbException.class, flux::blockLast); - Assertions.assertEquals( MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException().getMessage() ,uncategorizedMongoDbException.getMessage()); + Assertions.assertEquals( MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException_whenReading().getMessage() ,uncategorizedMongoDbException.getMessage()); } private Flux buildContextFlux(MongoRequestRateTooLargeAutomaticRetryAspect aspect, boolean isBatch) diff --git a/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryIntegrationTest.java b/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryIntegrationTest.java index 88ab63ac..c294ba09 100644 --- a/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryIntegrationTest.java +++ b/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryIntegrationTest.java @@ -1,5 +1,8 @@ package it.gov.pagopa.common.reactive.mongo.retry; +import it.gov.pagopa.common.mongo.config.MongoConfig; +import it.gov.pagopa.common.reactive.mongo.DummySpringRepository; +import it.gov.pagopa.common.reactive.mongo.config.ReactiveMongoConfig; import it.gov.pagopa.common.reactive.mongo.retry.exception.MongoRequestRateTooLargeRetryExpiredException; import it.gov.pagopa.common.reactive.web.ReactiveRequestContextFilter; import it.gov.pagopa.common.reactive.web.ReactiveRequestContextHolder; @@ -10,13 +13,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.web.bind.annotation.GetMapping; @@ -27,17 +34,33 @@ import java.time.Duration; import java.time.LocalDateTime; +@TestPropertySource( + properties = { + "de.flapdoodle.mongodb.embedded.version=4.0.21", + + "spring.data.mongodb.database=idpay", + "spring.data.mongodb.config.connectionPool.maxSize: 100", + "spring.data.mongodb.config.connectionPool.minSize: 0", + "spring.data.mongodb.config.connectionPool.maxWaitTimeMS: 120000", + "spring.data.mongodb.config.connectionPool.maxConnectionLifeTimeMS: 0", + "spring.data.mongodb.config.connectionPool.maxConnectionIdleTimeMS: 120000", + "spring.data.mongodb.config.connectionPool.maxConnecting: 2", + }) @ExtendWith(SpringExtension.class) @ContextConfiguration(classes = { ReactiveRequestContextFilter.class, MongoRequestRateTooLargeAutomaticRetryAspect.class, ErrorManager.class, MongoExceptionHandler.class, + MongoConfig.class, + ReactiveMongoConfig.class, MongoRequestRateTooLargeRetryIntegrationTest.TestController.class, MongoRequestRateTooLargeRetryIntegrationTest.TestRepository.class }) @WebFluxTest +@AutoConfigureDataMongo +@EnableAutoConfiguration class MongoRequestRateTooLargeRetryIntegrationTest { @Value("${mongo.request-rate-too-large.batch.max-retry:3}") @@ -49,6 +72,11 @@ class MongoRequestRateTooLargeRetryIntegrationTest { @SpyBean private TestRepository testRepositorySpy; + @Autowired + private DummySpringRepository dummySpringRepository; + + @SpyBean + private MongoRequestRateTooLargeAutomaticRetryAspect automaticRetryAspectSpy; private static int[] counter; @@ -128,14 +156,14 @@ static class TestRepository { public Mono testMono() { return Mono.defer(() -> { counter[0]++; - return Mono.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException()); + return Mono.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException_whenReading()); }); } public Flux testFlux() { return Flux.defer(() -> { counter[0]++; - return Flux.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException()); + return Flux.error(MongoRequestRateTooLargeRetryerTest.buildRequestRateTooLargeMongodbException_whenReading()); }); } } @@ -210,4 +238,15 @@ private void testNoController(Mono mono) { Assertions.assertEquals(maxRetry + 1, counter[0]); } + + @Test + void testSpringRepositoryInterceptor() throws Throwable { + // When + dummySpringRepository.findByIdOrderById("ID"); + dummySpringRepository.findByIdOrderByIdDesc("ID2"); + + // Then + Mockito.verify(automaticRetryAspectSpy).decorateMonoRepositoryMethods(Mockito.argThat(i -> i.getArgs()[0].equals("ID"))); + Mockito.verify(automaticRetryAspectSpy).decorateFluxRepositoryMethods(Mockito.argThat(i -> i.getArgs()[0].equals("ID2"))); + } } \ No newline at end of file diff --git a/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryerTest.java b/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryerTest.java index 269698a0..42781f3f 100644 --- a/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryerTest.java +++ b/src/test/java/it/gov/pagopa/common/reactive/mongo/retry/MongoRequestRateTooLargeRetryerTest.java @@ -1,23 +1,28 @@ package it.gov.pagopa.common.reactive.mongo.retry; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - import ch.qos.logback.classic.LoggerContext; import com.mongodb.MongoQueryException; +import com.mongodb.MongoWriteException; import com.mongodb.ServerAddress; +import com.mongodb.WriteError; import it.gov.pagopa.common.reactive.mongo.retry.exception.MongoRequestRateTooLargeRetryExpiredException; import it.gov.pagopa.common.utils.MemoryAppender; import org.bson.BsonDocument; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.LoggerFactory; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.DataIntegrityViolationException; import org.springframework.data.mongodb.UncategorizedMongoDbException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; + class MongoRequestRateTooLargeRetryerTest { @@ -36,48 +41,97 @@ public void setup() { memoryAppender.start(); } + public static Stream buildRetriableExceptions(){ + return Stream.of( + buildRequestRateTooLargeMongodbException_whenReading(), + buildRequestRateTooLargeMongodbException_whenWriting(), + buildRequestRateTooLargeMongodbException_whenBulkWriting() + ); + } + + /** Exception thrown when 429 occurs while reading */ + public static UncategorizedMongoDbException buildRequestRateTooLargeMongodbException_whenReading() { + String mongoFullErrorResponse = """ + {"ok": 0.0, "errmsg": "Error=16500, RetryAfterMs=34,\s + Details='Response status code does not indicate success: TooManyRequests (429) Substatus: 3200 ActivityId: 46ba3855-bc3b-4670-8609-17e1c2c87778 Reason:\s + (\\r\\nErrors : [\\r\\n \\"Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: + http://aka.ms/cosmosdb-error-429\\"\\r\\n]\\r\\n) ", "code": 16500, "codeName": "RequestRateTooLarge"} + """; + + MongoQueryException mongoQueryException = new MongoQueryException( + BsonDocument.parse(mongoFullErrorResponse), new ServerAddress()); + return new UncategorizedMongoDbException(mongoQueryException.getMessage(), mongoQueryException); + } + + /** Exception thrown when 429 occurs while writing */ + public static DataIntegrityViolationException buildRequestRateTooLargeMongodbException_whenWriting() { + String writeErrorMessage = """ + Error=16500, RetryAfterMs=34, Details='Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: 822d212d-5aac-4f5d-a2d4-76d6da7b619e; Reason: ( + Errors : [ + "Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: http://aka.ms/cosmosdb-error-429" + ] + ); + """; + final MongoWriteException mongoWriteException = new MongoWriteException( + new WriteError(16500, writeErrorMessage, BsonDocument.parse("{}")), new ServerAddress()); + return new DataIntegrityViolationException(mongoWriteException.getMessage(), mongoWriteException); + } + + /** Exception thrown when 429 occurs while bulk writing */ + public static DataIntegrityViolationException buildRequestRateTooLargeMongodbException_whenBulkWriting() { + String writeErrorMessage = """ + Error=16500, RetryAfterMs=34, Details='Batch write error.' + """; + final MongoWriteException mongoWriteException = new MongoWriteException( + new WriteError(16500, writeErrorMessage, BsonDocument.parse("{}")), new ServerAddress()); + return new DataIntegrityViolationException(mongoWriteException.getMessage(), mongoWriteException); + } + //region Mono ops - @Test - void testMonoWithRetryMaxRetry_resultOk(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testMonoWithRetryMaxRetry_resultOk(DataAccessException retriableException){ int[] counter = {0}; Mono testPublisher = Mono.fromSupplier(() -> counter[0]++) .map(x -> { if (counter[0] <= REQUEST_RATE_TOO_LARGE_MAX_RETRY) { - return throwRequestRateTooLargeMongodbException(); + throw retriableException; } return "OK"; }); - assertLogForMaxRetryTest(MongoRequestRateTooLargeRetryer.withRetry(testPublisher, + assertLogForMaxRetryTest(MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher, REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0).block(), counter[0]); } - @Test - void testMonoWithRetryMaxMillisElapsed_resultOk(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testMonoWithRetryMaxMillisElapsed_resultOk(DataAccessException retriableException){ int[] counter = {0}; long startTime = System.currentTimeMillis(); Mono testPublisher = Mono.fromSupplier(() -> counter[0]++) .map(x -> { if (System.currentTimeMillis() - startTime < (REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED / 2)) { - return throwRequestRateTooLargeMongodbException(); + throw retriableException; } return "OK"; }); - assertLogForMaxMillisElapsedTest(MongoRequestRateTooLargeRetryer.withRetry(testPublisher, + assertLogForMaxMillisElapsedTest(MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher, 0, REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED).block()); } - @Test - void testMonoWithRetryMaxRetry_resultKo(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testMonoWithRetryMaxRetry_resultKo(DataAccessException retriableException){ int[] counter = {0}; Mono testPublisher = Mono.fromSupplier(() -> counter[0]++) - .map(x -> throwRequestRateTooLargeMongodbException()); - Mono mono = MongoRequestRateTooLargeRetryer.withRetry(testPublisher + .map(x -> {throw retriableException;}); + Mono mono = MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0); try { @@ -90,13 +144,14 @@ void testMonoWithRetryMaxRetry_resultKo(){ assertLogForMaxRetryTest(null, counter[0]); } - @Test - void testMonoWithRetryMaxMillisElapsed_resultKo(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testMonoWithRetryMaxMillisElapsed_resultKo(DataAccessException retriableException){ int[] counter = {0}; Mono testPublisher = Mono.fromSupplier(() -> counter[0]++) - .map(x -> throwRequestRateTooLargeMongodbException()); - Mono mono = MongoRequestRateTooLargeRetryer.withRetry(testPublisher + .map(x -> {throw retriableException;}); + Mono mono = MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , 0, REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED); try { @@ -119,7 +174,7 @@ void testMonoUncategorizedMongoDbExceptionNotRequestRateTooLarge() { .map(x -> { throw new UncategorizedMongoDbException("not Request Rate Too Large Exception", new Throwable()); }); - Mono mono = MongoRequestRateTooLargeRetryer.withRetry(testPublisher + Mono mono = MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0); try { @@ -142,7 +197,7 @@ void testMonoRequestRateTooLargeRetryAfterMsNull() { } return "OK"; }); - MongoRequestRateTooLargeRetryer.withRetry(testPublisher + MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0).block(); assertLogForRequestRateTooLargeRetryAfterMsNull(); @@ -151,48 +206,51 @@ void testMonoRequestRateTooLargeRetryAfterMsNull() { //endregion //region Flux ops - @Test - void testFluxWithRetryMaxRetry_resultOk(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testFluxWithRetryMaxRetry_resultOk(DataAccessException retriableException){ int[] counter = {0}; Flux testPublisher = Flux.defer(() -> Flux.just(counter[0]++)) .map(x -> { if (counter[0] <= REQUEST_RATE_TOO_LARGE_MAX_RETRY) { - return throwRequestRateTooLargeMongodbException(); + throw retriableException; } return "OK"; }); - assertLogForMaxRetryTest(MongoRequestRateTooLargeRetryer.withRetry(testPublisher, + assertLogForMaxRetryTest(MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher, REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0).blockLast(), counter[0]); } - @Test - void testFluxWithRetryMaxMillisElapsed_resultOk(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testFluxWithRetryMaxMillisElapsed_resultOk(DataAccessException retriableException){ int[] counter = {0}; long startTime = System.currentTimeMillis(); Flux testPublisher = Flux.defer(() -> Flux.just(counter[0]++)) .map(x -> { if (System.currentTimeMillis() - startTime < (REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED / 2)) { - return throwRequestRateTooLargeMongodbException(); + throw retriableException; } return "OK"; }); - assertLogForMaxMillisElapsedTest(MongoRequestRateTooLargeRetryer.withRetry(testPublisher, + assertLogForMaxMillisElapsedTest(MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher, 0, REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED).blockLast()); } - @Test - void testFluxWithRetryMaxRetry_resultKo(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testFluxWithRetryMaxRetry_resultKo(DataAccessException retriableException){ int[] counter = {0}; Flux testPublisher = Flux.defer(() -> Flux.just(counter[0]++)) - .map(x -> throwRequestRateTooLargeMongodbException()); - Flux flux = MongoRequestRateTooLargeRetryer.withRetry(testPublisher + .map(x -> {throw retriableException;}); + Flux flux = MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0); try { @@ -205,13 +263,14 @@ void testFluxWithRetryMaxRetry_resultKo(){ assertLogForMaxRetryTest(null, counter[0]); } - @Test - void testFluxWithRetryMaxMillisElapsed_resultKo(){ + @ParameterizedTest + @MethodSource("buildRetriableExceptions") + void testFluxWithRetryMaxMillisElapsed_resultKo(DataAccessException retriableException){ int[] counter = {0}; Flux testPublisher = Flux.defer(() -> Flux.just(counter[0]++)) - .map(x -> throwRequestRateTooLargeMongodbException()); - Flux flux = MongoRequestRateTooLargeRetryer.withRetry(testPublisher + .map(x -> {throw retriableException;}); + Flux flux = MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , 0, REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED); try { @@ -234,7 +293,7 @@ void testFluxUncategorizedMongoDbExceptionNotRequestRateTooLarge() { .map(x -> { throw new UncategorizedMongoDbException("not Request Rate Too Large Exception", new Throwable()); }); - Flux flux = MongoRequestRateTooLargeRetryer.withRetry(testPublisher + Flux flux = MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0); try { @@ -257,7 +316,7 @@ void testFluxRequestRateTooLargeRetryAfterMsNull() { } return "OK"; }); - MongoRequestRateTooLargeRetryer.withRetry(testPublisher + MongoRequestRateTooLargeRetryer.withRetry("FLOWNAME", testPublisher , REQUEST_RATE_TOO_LARGE_MAX_RETRY, 0).blockLast(); assertLogForRequestRateTooLargeRetryAfterMsNull(); @@ -265,26 +324,6 @@ void testFluxRequestRateTooLargeRetryAfterMsNull() { } //endregion - @NotNull - public static UncategorizedMongoDbException buildRequestRateTooLargeMongodbException() { - String mongoFullErrorResponse = """ - {"ok": 0.0, "errmsg": "Error=16500, RetryAfterMs=34,\s - Details='Response status code does not indicate success: TooManyRequests (429) Substatus: 3200 ActivityId: 46ba3855-bc3b-4670-8609-17e1c2c87778 Reason:\s - (\\r\\nErrors : [\\r\\n \\"Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later. Learn more: - http://aka.ms/cosmosdb-error-429\\"\\r\\n]\\r\\n) ", "code": 16500, "codeName": "RequestRateTooLarge"} - """; - - MongoQueryException mongoQueryException = new MongoQueryException( - BsonDocument.parse(mongoFullErrorResponse), new ServerAddress()); - return new UncategorizedMongoDbException(mongoQueryException.getMessage(), mongoQueryException); - } - - @NotNull - public static UncategorizedMongoDbException throwRequestRateTooLargeMongodbException() { - - throw buildRequestRateTooLargeMongodbException(); - } - private void assertLogMessage(String expectedMessage, long maxRetryOrMaxMillisElapsed) { for (int i = 0; i < memoryAppender.getLoggedEvents().size(); i++) { @@ -297,7 +336,7 @@ private void assertLogMessage(String expectedMessage, long maxRetryOrMaxMillisEl } private void assertLogForRequestRateTooLargeRetryAfterMsNull() { - String expectedMessage = "\\[REQUEST_RATE_TOO_LARGE_RETRY] Retrying for RequestRateTooLargeException: attempt %d of %d after .*"; + String expectedMessage = "\\[REQUEST_RATE_TOO_LARGE_RETRY]\\[FLOWNAME] Retrying for RequestRateTooLargeException: attempt %d of %d after .*"; assertEquals(REQUEST_RATE_TOO_LARGE_MAX_RETRY, memoryAppender.getLoggedEvents().size()); assertLogMessage(expectedMessage, REQUEST_RATE_TOO_LARGE_MAX_RETRY); } @@ -306,7 +345,7 @@ private void assertLogForMaxMillisElapsedTest(Object testPublisher) { if(testPublisher!=null) { assertEquals("OK", testPublisher); } - String message = "\\[REQUEST_RATE_TOO_LARGE_RETRY] Retrying after 34 ms due to RequestRateTooLargeException: attempt %d of \\d+ after \\d+ ms of max %d ms"; + String message = "\\[REQUEST_RATE_TOO_LARGE_RETRY]\\[FLOWNAME] Retrying after 34 ms due to RequestRateTooLargeException: attempt %d of \\d+ after \\d+ ms of max %d ms"; assertLogMessage(message, REQUEST_RATE_TOO_LARGE_MAX_MILLIS_ELAPSED); } @@ -315,7 +354,7 @@ private void assertLogForMaxRetryTest(Object testPublisher, int counter) { assertEquals("OK", testPublisher); } assertEquals(REQUEST_RATE_TOO_LARGE_MAX_RETRY + 1, counter); - String expectedMessage = "\\[REQUEST_RATE_TOO_LARGE_RETRY] Retrying after 34 ms due to RequestRateTooLargeException: attempt %d of %d after .*"; + String expectedMessage = "\\[REQUEST_RATE_TOO_LARGE_RETRY]\\[FLOWNAME] Retrying after 34 ms due to RequestRateTooLargeException: attempt %d of %d after .*"; assertLogMessage(expectedMessage, REQUEST_RATE_TOO_LARGE_MAX_RETRY); } diff --git a/src/test/java/it/gov/pagopa/common/reactive/web/exception/MongoExceptionHandlerTest.java b/src/test/java/it/gov/pagopa/common/reactive/web/exception/MongoExceptionHandlerTest.java index e810755d..5c5076e6 100644 --- a/src/test/java/it/gov/pagopa/common/reactive/web/exception/MongoExceptionHandlerTest.java +++ b/src/test/java/it/gov/pagopa/common/reactive/web/exception/MongoExceptionHandlerTest.java @@ -123,7 +123,7 @@ void handleUncategorizedMongoDbExceptionNotRequestRateTooLarge() { @Test void handleMongoRequestRateTooLargeRetryExpiredException() { - doThrow(new MongoRequestRateTooLargeRetryExpiredException(3,3,0,100,34L,new Exception())) + doThrow(new MongoRequestRateTooLargeRetryExpiredException("FLOWNAME",3,3,0,100,34L,new Exception())) .when(testControllerSpy).testEndpoint(); ErrorDTO expectedErrorDefault = new ErrorDTO("TOO_MANY_REQUESTS","TOO_MANY_REQUESTS");