Skip to content

Commit

Permalink
fix mongo cursor closed exception
Browse files Browse the repository at this point in the history
  • Loading branch information
antonio.torre committed Oct 26, 2023
1 parent 5e5ebbd commit 3158f93
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import it.gov.pagopa.admissibility.dto.commands.QueueCommandOperationDTO;
import it.gov.pagopa.admissibility.service.AdmissibilityErrorNotifierService;
import it.gov.pagopa.admissibility.service.commands.operations.DeleteInitiativeService;
import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService;
import it.gov.pagopa.admissibility.utils.CommandConstants;
import it.gov.pagopa.common.reactive.kafka.consumer.BaseKafkaConsumer;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -23,20 +24,29 @@
public class CommandMediatorServiceImpl extends BaseKafkaConsumer<QueueCommandOperationDTO, String> implements CommandMediatorService{

private final Duration commitDelay;
private final Duration beneficiaryRulesBuildDelayMinusCommit;
private final DeleteInitiativeService deleteInitiativeService;
private final OnboardingContextHolderService onboardingContextHolderService;
private final AdmissibilityErrorNotifierService admissibilityErrorNotifierService;
private final ObjectReader objectReader;

public CommandMediatorServiceImpl(@Value("${spring.application.name}") String applicationName,
@Value("${spring.cloud.stream.kafka.bindings.consumerCommands-in-0.consumer.ackTime}") long commitMillis,
@Value("${app.beneficiary-rule.build-delay-duration}") String beneficiaryRulesBuildDelay,

DeleteInitiativeService deleteInitiativeService,
AdmissibilityErrorNotifierService admissibilityErrorNotifierService,
OnboardingContextHolderService onboardingContextHolderService, AdmissibilityErrorNotifierService admissibilityErrorNotifierService,
ObjectMapper objectMapper) {
super(applicationName);
this.commitDelay = Duration.ofMillis(commitMillis);
this.deleteInitiativeService = deleteInitiativeService;
this.onboardingContextHolderService = onboardingContextHolderService;
this.admissibilityErrorNotifierService = admissibilityErrorNotifierService;
this.objectReader = objectMapper.readerFor(QueueCommandOperationDTO.class);

Duration beneficiaryRulesBuildDelayDuration = Duration.parse(beneficiaryRulesBuildDelay).minusMillis(commitMillis);
Duration defaultDurationDelay = Duration.ofMillis(2L);
this.beneficiaryRulesBuildDelayMinusCommit = defaultDurationDelay.compareTo(beneficiaryRulesBuildDelayDuration) >= 0 ? defaultDurationDelay : beneficiaryRulesBuildDelayDuration;
}

@Override
Expand All @@ -47,6 +57,8 @@ protected Duration getCommitDelay() {
@Override
protected void subscribeAfterCommits(Flux<List<String>> afterCommits2subscribe) {
afterCommits2subscribe
.buffer(beneficiaryRulesBuildDelayMinusCommit)
.then(onboardingContextHolderService.refreshKieContainerCacheMiss())
.subscribe(r -> log.info("[ADMISSIBILITY_COMMANDS] Processed offsets committed successfully"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import it.gov.pagopa.admissibility.connector.repository.DroolsRuleRepository;
import it.gov.pagopa.admissibility.connector.repository.InitiativeCountersRepository;
import it.gov.pagopa.admissibility.connector.repository.OnboardingFamiliesRepository;
import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService;
import it.gov.pagopa.admissibility.utils.AuditUtilities;
import it.gov.pagopa.common.reactive.utils.PerformanceLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
Expand All @@ -20,42 +20,42 @@ public class DeleteInitiativeServiceImpl implements DeleteInitiativeService{
private final InitiativeCountersRepository initiativeCountersRepository;
private final OnboardingFamiliesRepository onboardingFamiliesRepository;
private final AuditUtilities auditUtilities;
private final OnboardingContextHolderService onboardingContextHolderService;
private final int pageSize;
private final long delay;

public DeleteInitiativeServiceImpl(DroolsRuleRepository droolsRuleRepository,
InitiativeCountersRepository initiativeCountersRepository,
OnboardingFamiliesRepository onboardingFamiliesRepository,
AuditUtilities auditUtilities,
OnboardingContextHolderService onboardingContextHolderService,
@Value("${app.delete.paginationSize}") int pageSize,
@Value("${app.delete.delayTime}") long delay) {
this.droolsRuleRepository = droolsRuleRepository;
this.initiativeCountersRepository = initiativeCountersRepository;
this.onboardingFamiliesRepository = onboardingFamiliesRepository;
this.auditUtilities = auditUtilities;
this.onboardingContextHolderService = onboardingContextHolderService;
this.pageSize = pageSize;
this.delay = delay;
}

@Override
public Mono<String> execute(String initiativeId) {
log.info("[DELETE_INITIATIVE] Starting handle delete initiative {}", initiativeId);
return deleteDroolsRule(initiativeId)
.then(deleteInitiativeCounters(initiativeId))
.then(deleteOnboardingFamilies(initiativeId))
return execAndLogTiming("DELETE_DROOLS_RULE", initiativeId, deleteDroolsRule(initiativeId))
.then(execAndLogTiming("DELETE_INITIATIVE_COUNTERS", initiativeId, deleteInitiativeCounters(initiativeId)))
.then(execAndLogTiming("DELETE_ONBOARDING_FAMILIES", initiativeId, deleteOnboardingFamilies(initiativeId)))
.then(Mono.just(initiativeId));
}

private Mono<?> execAndLogTiming(String deleteFlowName, String initiativeId, Mono<?> deleteMono) {
return PerformanceLogger.logTimingFinally(deleteFlowName, deleteMono, initiativeId);
}

private Mono<Void> deleteDroolsRule(String initiativeId) {
return droolsRuleRepository.deleteById(initiativeId)
.doOnSuccess(d -> {
log.info("[DELETE_INITIATIVE] Deleted initiative {} from collection: beneficiary_rule", initiativeId);
auditUtilities.logDeletedDroolsRule(initiativeId);
})
.then(onboardingContextHolderService.refreshKieContainerCacheMiss())
.then();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@
import java.util.stream.IntStream;

@TestPropertySource(properties = {
"app.beneficiary-rule.build-delay-duration=PT1S",
"logging.level.it.gov.pagopa.admissibility.service.commands.CommandMediatorServiceImpl=WARN",
"logging.level.it.gov.pagopa.admissibility.service.commands.operations.DeleteInitiativeServiceImpl=WARN",
"logging.level.it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderServiceImpl=WARN",
"logging.level.it.gov.pagopa.common.reactive.utils.PerformanceLogger=WARN"
})
class CommandConsumerConfigIntegrationTest extends BaseIntegrationTest {
private final String INITIATIVEID = "INITIATIVEID_%d";
private final Set<String> INITIATIVES_DELETED = new HashSet<>();
@SpyBean
private DroolsRuleRepository droolsRuleRepository;
private DroolsRuleRepository droolsRuleRepositorySpy;

@Autowired
private InitiativeCountersRepository initiativeCountersRepository;

@Autowired
private OnboardingFamiliesRepository onboardingFamiliesRepository;

Expand All @@ -68,6 +70,8 @@ void test() {
checkRepositories();
checkErrorsPublished(notValidMessages, maxWaitingMs, errorUseCases);

Mockito.verify(droolsRuleRepositorySpy).findAll();

System.out.printf("""
************************
Time spent to send %d (%d + %d) messages (from start): %d millis
Expand Down Expand Up @@ -140,7 +144,7 @@ private void initializeDB(int bias) {
.initiativeConfig(initiativeConfig)
.rule("")
.build();
droolsRuleRepository.save(droolsRule).block();
droolsRuleRepositorySpy.save(droolsRule).block();

InitiativeCounters initiativeCounters = InitiativeCounters.builder()
.id(INITIATIVEID.formatted(bias))
Expand Down Expand Up @@ -184,15 +188,15 @@ protected Pattern getErrorUseCaseIdPatternMatch() {
errorUseCases.add(Pair.of(
() -> {
Mockito.doThrow(new MongoException("Command error dummy"))
.when(droolsRuleRepository).deleteById(errorInitiativeId);
.when(droolsRuleRepositorySpy).deleteById(errorInitiativeId);
return commandOperationErrorString;
},
errorMessage -> checkErrorMessageHeaders(errorMessage, "[ADMISSIBILITY_COMMANDS] An error occurred evaluating commands", commandOperationErrorString)
));
}

private void checkRepositories() {
Assertions.assertTrue(droolsRuleRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId())));
Assertions.assertTrue(droolsRuleRepositorySpy.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId())));
Assertions.assertTrue(initiativeCountersRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getId())));
Assertions.assertTrue(onboardingFamiliesRepository.findAll().toStream().noneMatch(ri -> INITIATIVES_DELETED.contains(ri.getInitiativeId())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
import it.gov.pagopa.admissibility.connector.repository.OnboardingFamiliesRepository;
import it.gov.pagopa.admissibility.dto.onboarding.extra.Family;
import it.gov.pagopa.admissibility.model.OnboardingFamilies;
import it.gov.pagopa.admissibility.service.onboarding.OnboardingContextHolderService;
import it.gov.pagopa.admissibility.utils.AuditUtilities;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.api.KieBase;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -25,7 +23,6 @@ class DeleteInitiativeServiceImplTest {
@Mock private InitiativeCountersRepository initiativeCountersRepositoryMock;
@Mock private OnboardingFamiliesRepository onboardingFamiliesRepositoryMock;
@Mock private AuditUtilities auditUtilitiesMock;
@Mock private OnboardingContextHolderService onboardingContextHolderService;

private DeleteInitiativeService deleteInitiativeService;
private final static int PAGE_SIZE = 100;
Expand All @@ -37,7 +34,8 @@ void setUp() {
droolsRuleRepositoryMock,
initiativeCountersRepositoryMock,
onboardingFamiliesRepositoryMock,
auditUtilitiesMock, onboardingContextHolderService, PAGE_SIZE, 1000L);
auditUtilitiesMock,
PAGE_SIZE, 1000L);
}

@Test
Expand All @@ -48,9 +46,6 @@ void executeOK() {
Mockito.when(droolsRuleRepositoryMock.deleteById(initiativeId))
.thenReturn(Mono.just(Mockito.mock(Void.class)));

Mockito.when(onboardingContextHolderService.refreshKieContainerCacheMiss())
.thenReturn(Mono.just(Mockito.mock(KieBase.class)));

Mockito.when(initiativeCountersRepositoryMock.deleteById(initiativeId))
.thenReturn(Mono.just(Mockito.mock(Void.class)));

Expand Down

0 comments on commit 3158f93

Please sign in to comment.