From d60db8acd58b8895c966056b01e59fd2b79e4d4d Mon Sep 17 00:00:00 2001
From: Serhii Nosko
Date: Wed, 22 Sep 2021 12:13:12 +0300
Subject: [PATCH] MODDATAIMP-499. Spike migrate to Spring Kafka

---
 mod-source-record-manager-server/pom.xml           |  11 ++
 .../org/folio/config/ApplicationConfig.java        |  32 +++++
 .../services/ChangeEngineServiceImpl.java          |  19 ++-
 .../folio/services/KafkaProducerService.java       | 131 ++++++++++++++++++
 .../QuickMarcEventProducerServiceImpl.java         |  50 ++-----
 .../RecordsPublishingServiceImpl.java              |  23 ++-
 .../services/util/EventHandlingUtil.java           | 100 -------------
 .../verticle/AbstractConsumersVerticle.java        |  12 +-
 .../org/folio/rest/impl/AbstractRestTest.java      |  30 +++-
 .../services/ChangeEngineServiceImplTest.java      |  12 +-
 ...tDrivenChunkProcessingServiceImplTest.java      |  60 ++------
 ...ProcessedEventHandlingServiceImplTest.java      |   5 +-
 12 files changed, 249 insertions(+), 236 deletions(-)
 create mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/KafkaProducerService.java

diff --git a/mod-source-record-manager-server/pom.xml b/mod-source-record-manager-server/pom.xml
index 4b1bc2709..facbee02a 100644
--- a/mod-source-record-manager-server/pom.xml
+++ b/mod-source-record-manager-server/pom.xml
@@ -40,6 +40,11 @@
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+      <version>2.5.15.RELEASE</version>
+    </dependency>
     <dependency>
       <groupId>org.folio</groupId>
       <artifactId>postgres-testing</artifactId>
@@ -216,6 +221,12 @@
       <groupId>org.folio</groupId>
       <artifactId>folio-kafka-wrapper</artifactId>
       <version>2.4.0-SNAPSHOT</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.vertx</groupId>
+          <artifactId>vertx-kafka-client</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.folio</groupId>
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/config/ApplicationConfig.java b/mod-source-record-manager-server/src/main/java/org/folio/config/ApplicationConfig.java
index bdbd5f613..ca752fec3 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/config/ApplicationConfig.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/config/ApplicationConfig.java
@@ -1,6 +1,8 @@
 package org.folio.config;
 
 import io.vertx.core.Vertx;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
@@ -13,6 +15,12 @@
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
 
 @Configuration
 @ComponentScan(basePackages = {
@@ -40,6 +48,7 @@ public class ApplicationConfig {
   @Value("${srm.kafkacache.expiration.time.hours:3}")
   private int cachedEventExpirationTimeHours;
 
+  // TODO: remove once consumers are migrated to Spring Kafka (separate story)
   @Bean(name = "newKafkaConfig")
   public KafkaConfig kafkaConfigBean() {
     KafkaConfig kafkaConfig = KafkaConfig.builder()
@@ -80,4 +89,27 @@ public KafkaInternalCache kafkaInternalCache(KafkaConfig kafkaConfig) {
     return kafkaInternalCache;
   }
+
+  @Bean
+  public ProducerFactory<String, String> producerFactory() {
+    Map<String, Object> configProps = new HashMap<>();
+    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+      kafkaHost + ":" + kafkaPort);
+    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+      StringSerializer.class);
+    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+      StringSerializer.class);
+    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
+      maxRequestSize);
+
+    LOGGER.info("kafkaConfig: {}", configProps);
+
+    return new DefaultKafkaProducerFactory<>(configProps);
+  }
+
+  @Bean
+  public KafkaTemplate<String, String> kafkaTemplate() {
+    return new KafkaTemplate<>(producerFactory());
+  }
+
 }
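Note (illustrative, not part of the patch): with the two beans above in place, any Spring-managed component can publish by injecting the template. A minimal sketch, assuming constructor injection; the class name and topic are made up:

  // Hypothetical caller of the KafkaTemplate<String, String> bean defined above.
  @Service
  public class ExampleSender {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public ExampleSender(KafkaTemplate<String, String> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String key, String payload) {
      // Asynchronous; in spring-kafka 2.5.x this returns a ListenableFuture<SendResult<String, String>>.
      kafkaTemplate.send(topic, key, payload);
    }
  }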
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java
index 876d83f2b..a5762deba 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java
@@ -18,7 +18,6 @@
 import static org.folio.services.afterprocessing.AdditionalFieldsUtil.addFieldToMarcRecord;
 import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getControlFieldValue;
 import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getValue;
-import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -54,7 +53,6 @@
 import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
 import org.folio.dataimport.util.marc.MarcRecordType;
 import org.folio.dataimport.util.marc.RecordAnalyzer;
-import org.folio.kafka.KafkaConfig;
 import org.folio.kafka.KafkaHeaderUtils;
 import org.folio.rest.client.SourceStorageBatchClient;
 import org.folio.rest.jaxrs.model.ActionProfile;
@@ -99,10 +97,12 @@ public class ChangeEngineServiceImpl implements ChangeEngineService {
   private RecordAnalyzer marcRecordAnalyzer;
   private HrIdFieldService hrIdFieldService;
   private RecordsPublishingService recordsPublishingService;
-  private KafkaConfig kafkaConfig;
+  private KafkaProducerService kafkaProducerService;
 
   @Value("${srm.kafka.RawChunksKafkaHandler.maxDistributionNum:100}")
   private int maxDistributionNum;
+  @Value("${ENV:folio}")
+  private String envId;
   @Value("${marc.holdings.batch.size:100}")
   private int batchSize;
 
@@ -112,13 +112,13 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutionSourceChunkDao,
                                  @Autowired MarcRecordAnalyzer marcRecordAnalyzer,
                                  @Autowired HrIdFieldService hrIdFieldService,
                                  @Autowired RecordsPublishingService recordsPublishingService,
-                                 @Autowired KafkaConfig kafkaConfig) {
+                                 @Autowired KafkaProducerService kafkaProducerService) {
     this.jobExecutionSourceChunkDao = jobExecutionSourceChunkDao;
     this.jobExecutionService = jobExecutionService;
     this.marcRecordAnalyzer = marcRecordAnalyzer;
     this.hrIdFieldService = hrIdFieldService;
     this.recordsPublishingService = recordsPublishingService;
-    this.kafkaConfig = kafkaConfig;
+    this.kafkaProducerService = kafkaProducerService;
   }
 
   @Override
@@ -339,8 +339,8 @@ private void populateError(Record record, JobExecution jobExecution, OkapiConnectionParams okapiParams) {
         .withContent(record.getRawRecord().getContent())
         .withDescription(new JsonObject().put(MESSAGE_KEY, HOLDINGS_004_TAG_ERROR_MESSAGE).encode())
     );
-    sendEventToKafka(okapiParams.getTenantId(), Json.encode(eventPayload), DI_ERROR.value(),
-      KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), kafkaConfig, key)
+    kafkaProducerService.sendEvent(okapiParams.getTenantId(), envId, Json.encode(eventPayload), DI_ERROR.value(),
+      KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), key)
      .onFailure(th -> LOGGER.error("Error publishing DI_ERROR event for MARC Holdings record with id {}", record.getId(), th));
   }
@@ -467,8 +467,7 @@ private Future<List<Record>> saveRecords(OkapiConnectionParams params, JobExecution jobExecution,
     String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum);
 
-    return sendEventToKafka(params.getTenantId(), Json.encode(recordCollection), DI_RAW_RECORDS_CHUNK_PARSED.value(),
-      kafkaHeaders, kafkaConfig, key)
-      .map(parsedRecords);
+    return kafkaProducerService.sendEvent(params.getTenantId(), envId, Json.encode(recordCollection),
+      DI_RAW_RECORDS_CHUNK_PARSED.value(), kafkaHeaders, key).map(parsedRecords);
   }
 }
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/KafkaProducerService.java b/mod-source-record-manager-server/src/main/java/org/folio/services/KafkaProducerService.java
new file mode 100644
index 000000000..35ee794e1
--- /dev/null
+++ b/mod-source-record-manager-server/src/main/java/org/folio/services/KafkaProducerService.java
@@ -0,0 +1,131 @@
+package org.folio.services;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.json.Json;
+import io.vertx.kafka.client.producer.KafkaHeader;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.kafka.KafkaTopicNameHelper;
+import org.folio.processing.events.utils.ZIPArchiver;
+import org.folio.rest.jaxrs.model.Event;
+import org.folio.rest.jaxrs.model.EventMetadata;
+import org.folio.services.util.EventHandlingUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Service
+public class KafkaProducerService {
+  private static final Logger LOGGER = LogManager.getLogger();
+
+  private KafkaTemplate<String, String> kafkaTemplate;
+
+  @Autowired
+  public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
+    this.kafkaTemplate = kafkaTemplate;
+  }
+
+  /**
+   * Prepares and sends event to Kafka.
+   *
+   * @param tenantId     tenant id
+   * @param envId        env id
+   * @param eventPayload eventPayload in String representation
+   * @param eventType    eventType
+   * @param kafkaHeaders kafka headers
+   * @param key          record key
+   * @return completed future with true if event was sent successfully
+   */
+  public Future<Boolean> sendEvent(String tenantId, String envId, String eventPayload, String eventType,
+                                   List<KafkaHeader> kafkaHeaders, String key) {
+    Event event;
+    try {
+      event = createEvent(eventPayload, eventType, tenantId, true);
+    } catch (IOException e) {
+      LOGGER.error("Failed to construct an event for eventType {}", eventType, e);
+      return Future.failedFuture(e);
+    }
+
+    String topicName = createTopicName(eventType, tenantId, envId);
+
+    ProducerRecord<String, String> record = createProducerRecord(event, key, topicName, kafkaHeaders);
+
+    return sendEvent(eventType, record);
+  }
+
+  /**
+   * Sends event to Kafka.
+   *
+   * @param eventType event type
+   * @param record    producer record
+   * @return completed future with true if event was sent successfully
+   */
+  public Future<Boolean> sendEvent(String eventType, ProducerRecord<String, String> record) {
+    LOGGER.debug("Starting to send event to Kafka for eventType: {}", eventType);
+
+    Promise<Boolean> promise = Promise.promise();
+
+    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
+    future.addCallback(new ListenableFutureCallback<>() {
+
+      @Override
+      public void onSuccess(SendResult<String, String> result) {
+        Headers headers = result.getProducerRecord().headers();
+        LOGGER.info("Event with type: {} and correlationId: {} was sent to kafka", eventType, getHeaderValue(headers, "correlationId"));
+        promise.complete(true);
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        LOGGER.error("Error sending event with type {} to kafka", eventType, e);
+        promise.fail(e);
+      }
+    });
+
+    return promise.future();
+  }
+
+  private String getHeaderValue(Headers headers, String headerName) {
+    return Optional.ofNullable(headers.lastHeader(headerName))
+      .map(header -> new String(header.value()))
+      .orElse(null);
+  }
+
+  protected String createTopicName(String eventType, String tenantId, String envId) {
+    return KafkaTopicNameHelper.formatTopicName(envId, KafkaTopicNameHelper.getDefaultNameSpace(),
+      tenantId, eventType);
+  }
+
+  protected Event createEvent(String eventPayload, String eventType, String tenantId, boolean isZipped) throws IOException {
+    return new Event()
+      .withId(UUID.randomUUID().toString())
+      .withEventType(eventType)
+      .withEventPayload(isZipped ? ZIPArchiver.zip(eventPayload) : eventPayload)
+      .withEventMetadata(new EventMetadata()
+        .withTenantId(tenantId)
+        .withEventTTL(1)
+        .withPublishedBy(EventHandlingUtil.constructModuleName()));
+  }
+
+  public ProducerRecord<String, String> createProducerRecord(Event event, String key, String topicName, List<KafkaHeader> kafkaHeaders) {
+    List<Header> headers = kafkaHeaders.stream()
+      .map(header -> new RecordHeader(header.key(), header.value().getBytes()))
+      .collect(Collectors.toList());
+    return new ProducerRecord<>(topicName, null, key, Json.encode(event), headers);
+  }
+}
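Note (illustrative, not part of the patch): a typical call site for the new service, mirroring how ChangeEngineServiceImpl and RecordsPublishingServiceImpl use it; payload and key are placeholders:

  // Hypothetical usage of KafkaProducerService.sendEvent.
  kafkaProducerService
    .sendEvent(tenantId, envId, Json.encode(payload), DI_ERROR.value(), kafkaHeaders, key)
    .onSuccess(sent -> LOGGER.info("DI_ERROR published"))
    .onFailure(th -> LOGGER.error("Publishing DI_ERROR failed", th));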
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java
index ff9490a36..4bbf2440f 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/services/QuickMarcEventProducerServiceImpl.java
@@ -1,38 +1,24 @@
 package org.folio.services;
 
-import static org.folio.services.util.EventHandlingUtil.createEvent;
-import static org.folio.services.util.EventHandlingUtil.createProducer;
-import static org.folio.services.util.EventHandlingUtil.createProducerRecord;
-import static org.folio.services.util.EventHandlingUtil.createTopicName;
-import static org.folio.verticle.consumers.util.QMEventTypes.QM_COMPLETED;
-import static org.folio.verticle.consumers.util.QMEventTypes.QM_RECORD_UPDATED;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
 import io.vertx.kafka.client.producer.KafkaHeader;
-import io.vertx.kafka.client.producer.KafkaProducer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
-import org.folio.kafka.KafkaConfig;
+
+import java.util.List;
 
 @Service
 public class QuickMarcEventProducerServiceImpl implements QuickMarcEventProducerService {
 
   private static final Logger LOGGER = LogManager.getLogger();
-  private final KafkaConfig kafkaConfig;
-  private final Map<String, KafkaProducer<String, String>> kafkaProducers = new HashMap<>();
-
-  public QuickMarcEventProducerServiceImpl(KafkaConfig kafkaConfig) {
-    this.kafkaConfig = kafkaConfig;
-    kafkaProducers.put(QM_RECORD_UPDATED.name(), createProducer(QM_RECORD_UPDATED.name(), kafkaConfig));
-    kafkaProducers.put(QM_COMPLETED.name(), createProducer(QM_COMPLETED.name(), kafkaConfig));
-  }
+  @Value("${ENV:folio}")
+  private String envId;
+  @Autowired
+  private KafkaProducerService kafkaProducerService;
 
   @Override
   public Future<Boolean> sendEvent(String eventPayload, String eventType, String key, String tenantId,
@@ -50,24 +36,10 @@ private Future<Boolean> sendEventInternal(String eventPayload, String eventType,
                                             List<KafkaHeader> kafkaHeaders, boolean isZipped) {
     Promise<Boolean> promise = Promise.promise();
     try {
-      var event = createEvent(eventPayload, eventType, tenantId, isZipped);
-      var topicName = createTopicName(eventType, tenantId, kafkaConfig);
-      var record = createProducerRecord(event, key, topicName, kafkaHeaders);
-      var producer = kafkaProducers.get(eventType);
-      if (producer != null) {
-        producer.write(record)
-          .onSuccess(unused -> {
-            LOGGER.info("Event with type {} was sent to kafka", eventType);
-            promise.complete(true);
-          })
-          .onFailure(throwable -> {
-            var cause = throwable.getCause();
-            LOGGER.error("Error while send event {}: {}", eventType, cause);
-            promise.fail(cause);
-          });
-      } else {
-        promise.fail("No producer found for event: " + eventType);
-      }
+      var event = kafkaProducerService.createEvent(eventPayload, eventType, tenantId, isZipped);
+      var topicName = kafkaProducerService.createTopicName(eventType, tenantId, envId);
+      var record = kafkaProducerService.createProducerRecord(event, key, topicName, kafkaHeaders);
+      kafkaProducerService.sendEvent(eventType, record)
+        .onSuccess(promise::complete)
+        .onFailure(promise::fail);
     } catch (Exception e) {
       promise.fail(e);
     }
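Note (illustrative, not part of the patch): the pattern above, completing a Vert.x Promise from Spring's ListenableFuture, recurs inside KafkaProducerService as well. If it spreads further, a small adapter could be factored out; a sketch, assuming spring-core's SuccessCallback/FailureCallback overload of addCallback:

  // Hypothetical helper: bridge a Spring ListenableFuture to a Vert.x Future.
  static <T> io.vertx.core.Future<T> toVertxFuture(org.springframework.util.concurrent.ListenableFuture<T> future) {
    io.vertx.core.Promise<T> promise = io.vertx.core.Promise.promise();
    future.addCallback(promise::complete, promise::fail);
    return promise.future();
  }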
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java
index 77e152f7c..83e19a1f2 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/services/RecordsPublishingServiceImpl.java
@@ -7,8 +7,6 @@
 import static org.folio.rest.jaxrs.model.EntityType.MARC_BIBLIOGRAPHIC;
 import static org.folio.rest.jaxrs.model.EntityType.MARC_HOLDINGS;
 import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_AUTHORITY;
-import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_HOLDING;
-import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,7 +27,6 @@
 import org.springframework.stereotype.Service;
 
 import org.folio.dataimport.util.OkapiConnectionParams;
-import org.folio.kafka.KafkaConfig;
 import org.folio.kafka.KafkaHeaderUtils;
 import org.folio.okapi.common.GenericCompositeFuture;
 import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
@@ -52,21 +49,23 @@ public class RecordsPublishingServiceImpl implements RecordsPublishingService {
   private MappingParametersProvider mappingParametersProvider;
   private MappingRuleCache mappingRuleCache;
   private DataImportPayloadContextBuilder payloadContextBuilder;
-  private KafkaConfig kafkaConfig;
+  private KafkaProducerService kafkaProducerService;
 
   @Value("${srm.kafka.CreatedRecordsKafkaHandler.maxDistributionNum:100}")
   private int maxDistributionNum;
+  @Value("${ENV:folio}")
+  private String envId;
 
   public RecordsPublishingServiceImpl(@Autowired JobExecutionService jobExecutionService,
                                       @Autowired MappingParametersProvider mappingParametersProvider,
                                       @Autowired MappingRuleCache mappingRuleCache,
                                       @Autowired DataImportPayloadContextBuilder payloadContextBuilder,
-                                      @Autowired KafkaConfig kafkaConfig) {
+                                      @Autowired KafkaProducerService kafkaProducerService) {
     this.jobExecutionService = jobExecutionService;
     this.mappingParametersProvider = mappingParametersProvider;
     this.mappingRuleCache = mappingRuleCache;
     this.payloadContextBuilder = payloadContextBuilder;
-    this.kafkaConfig = kafkaConfig;
+    this.kafkaProducerService = kafkaProducerService;
   }
 
   @Override
@@ -96,14 +95,14 @@ private Future<Boolean> sendRecords(List<Record> createdRecords, JobExecution jobExecution,
       if (isRecordReadyToSend(record)) {
         DataImportEventPayload payload = prepareEventPayload(record, profileSnapshotWrapper, mappingRules, mappingParameters, params, eventType);
         params.getHeaders().set(CORRELATION_ID_HEADER, UUID.randomUUID().toString());
-        Future<Boolean> booleanFuture = sendEventToKafka(params.getTenantId(), Json.encode(payload),
-          eventType, KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), kafkaConfig, key);
-        futures.add(booleanFuture.onFailure(th -> sendEventWithRecordPublishingError(record, jobExecution, params, th.getMessage(), kafkaConfig, key)));
+        Future<Boolean> booleanFuture = kafkaProducerService.sendEvent(params.getTenantId(), envId, Json.encode(payload),
+          eventType, KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), key);
+        futures.add(booleanFuture.onFailure(th -> sendEventWithRecordPublishingError(record, jobExecution, params, th.getMessage(), key)));
       }
     } catch (Exception e) {
       LOGGER.error("Error publishing event with record", e);
       futures.add(Future.failedFuture(e)
-        .onFailure(th -> sendEventWithRecordPublishingError(record, jobExecution, params, th.getMessage(), kafkaConfig, key)));
+        .onFailure(th -> sendEventWithRecordPublishingError(record, jobExecution, params, th.getMessage(), key)));
     }
   }
 
@@ -118,7 +117,7 @@ private Future<Boolean> sendRecords(List<Record> createdRecords, JobExecution jobExecution,
     return promise.future();
   }
 
-  private void sendEventWithRecordPublishingError(Record record, JobExecution jobExecution, OkapiConnectionParams params, String errorMsg, KafkaConfig kafkaConfig, String key) {
+  private void sendEventWithRecordPublishingError(Record record, JobExecution jobExecution, OkapiConnectionParams params, String errorMsg, String key) {
     String sourceRecordKey = getSourceRecordKey(record);
 
     DataImportEventPayload eventPayload = new DataImportEventPayload()
@@ -133,7 +132,7 @@ private void sendEventWithRecordPublishingError(Record record, JobExecution jobExecution,
       put(ERROR_MSG_KEY, errorMsg);
     }});
 
-    sendEventToKafka(params.getTenantId(), Json.encode(eventPayload), DI_ERROR.value(), KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), kafkaConfig, key)
+    kafkaProducerService.sendEvent(params.getTenantId(), envId, Json.encode(eventPayload), DI_ERROR.value(), KafkaHeaderUtils.kafkaHeadersFromMultiMap(params.getHeaders()), key)
       .onFailure(th -> LOGGER.error("Error publishing DI_ERROR event for record with id {}", record.getId(), th));
   }
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/util/EventHandlingUtil.java b/mod-source-record-manager-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
index 879fef771..98d197a7d 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/services/util/EventHandlingUtil.java
@@ -1,115 +1,15 @@
 package org.folio.services.util;
 
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.Vertx;
-import io.vertx.core.json.Json;
-import io.vertx.kafka.client.producer.KafkaHeader;
-import io.vertx.kafka.client.producer.KafkaProducer;
-import io.vertx.kafka.client.producer.KafkaProducerRecord;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaTopicNameHelper;
 import org.folio.processing.events.utils.PomReaderUtil;
-import org.folio.processing.events.utils.ZIPArchiver;
-import org.folio.rest.jaxrs.model.Event;
-import org.folio.rest.jaxrs.model.EventMetadata;
 import org.folio.rest.tools.utils.ModuleName;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
 public final class EventHandlingUtil {
 
   private EventHandlingUtil() {
   }
 
-  private static final Logger LOGGER = LogManager.getLogger();
-
-  /**
-   * Prepares and sends event with zipped payload to kafka
-   *
-   * @param tenantId     tenant id
-   * @param eventPayload eventPayload in String representation
-   * @param eventType    eventType
-   * @param kafkaHeaders kafka headers
-   * @param kafkaConfig  kafka config
-   * @return completed future with true if event was sent successfully
-   */
-  public static Future<Boolean> sendEventToKafka(String tenantId, String eventPayload, String eventType,
-                                                 List<KafkaHeader> kafkaHeaders, KafkaConfig kafkaConfig, String key) {
-    LOGGER.debug("Starting to send event to Kafka for eventType: {}", eventType);
-    Event event;
-    try {
-      event = createEvent(eventPayload, eventType, tenantId, true);
-    } catch (IOException e) {
-      LOGGER.error("Failed to construct an event for eventType {}", eventType, e);
-      return Future.failedFuture(e);
-    }
-
-    String topicName = createTopicName(eventType, tenantId, kafkaConfig);
-
-    KafkaProducerRecord<String, String> record = createProducerRecord(event, key, topicName, kafkaHeaders);
-
-    Promise<Boolean> promise = Promise.promise();
-
-    String correlationId = extractCorrelationId(kafkaHeaders);
-    String producerName = eventType + "_Producer";
-    KafkaProducer<String, String> producer =
-      KafkaProducer.createShared(Vertx.currentContext().owner(), producerName, kafkaConfig.getProducerProps());
-    producer.write(record, war -> {
-      producer.end(ear -> producer.close());
-      if (war.succeeded()) {
-        LOGGER.info("Event with type: {} and correlationId: {} was sent to kafka", eventType, correlationId);
-        promise.complete(true);
-      } else {
-        Throwable cause = war.cause();
-        LOGGER.error("{} write error for event {}:", producerName, eventType, cause);
-        promise.fail(cause);
-      }
-    });
-    return promise.future();
-  }
-
-  private static String extractCorrelationId(List<KafkaHeader> kafkaHeaders) {
-    return kafkaHeaders.stream()
-      .filter(header -> header.key().equals("correlationId"))
-      .findFirst()
-      .map(header -> header.value().toString())
-      .orElse(null);
-  }
-
-  public static KafkaProducerRecord<String, String> createProducerRecord(Event event, String key, String topicName, List<KafkaHeader> kafkaHeaders) {
-    KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topicName, key, Json.encode(event));
-    record.addHeaders(kafkaHeaders);
-    return record;
-  }
-
-  public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) {
-    return KafkaTopicNameHelper.formatTopicName(kafkaConfig.getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(),
-      tenantId, eventType);
-  }
-
-  public static Event createEvent(String eventPayload, String eventType, String tenantId, boolean isZipped) throws IOException {
-    return new Event()
-      .withId(UUID.randomUUID().toString())
-      .withEventType(eventType)
-      .withEventPayload(isZipped ? ZIPArchiver.zip(eventPayload) : eventPayload)
-      .withEventMetadata(new EventMetadata()
-        .withTenantId(tenantId)
-        .withEventTTL(1)
-        .withPublishedBy(constructModuleName()));
-  }
-
   public static String constructModuleName() {
     return PomReaderUtil.INSTANCE.constructModuleVersionAndVersion(ModuleName.getModuleName(),
       ModuleName.getModuleVersion());
   }
-
-  public static KafkaProducer<String, String> createProducer(String eventType, KafkaConfig kafkaConfig) {
-    String producerName = eventType + "_Producer";
-    return KafkaProducer.createShared(Vertx.currentContext().owner(), producerName, kafkaConfig.getProducerProps());
-  }
 }
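Note (illustrative, not part of the patch): topic naming is unchanged by the migration. KafkaTopicNameHelper joins environment, namespace, tenant and event type, so the Spring Kafka producer and the still folio-kafka-wrapper based consumers keep meeting on the same topics. Assuming the default namespace and a diku tenant:

  // Hypothetical values; shows the expected shape of a generated topic name.
  String topic = KafkaTopicNameHelper.formatTopicName("folio",
    KafkaTopicNameHelper.getDefaultNameSpace(), "diku", "DI_ERROR");
  // expected to be "folio.Default.diku.DI_ERROR"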
diff --git a/mod-source-record-manager-server/src/main/java/org/folio/verticle/AbstractConsumersVerticle.java b/mod-source-record-manager-server/src/main/java/org/folio/verticle/AbstractConsumersVerticle.java
index 66495cea6..58a7c7377 100644
--- a/mod-source-record-manager-server/src/main/java/org/folio/verticle/AbstractConsumersVerticle.java
+++ b/mod-source-record-manager-server/src/main/java/org/folio/verticle/AbstractConsumersVerticle.java
@@ -3,13 +3,9 @@
 import io.vertx.core.AbstractVerticle;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
-import org.folio.kafka.AsyncRecordHandler;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.KafkaTopicNameHelper;
-import org.folio.kafka.SubscriptionDefinition;
+import org.folio.kafka.*;
 import org.folio.okapi.common.GenericCompositeFuture;
+import org.folio.services.util.EventHandlingUtil;
 import org.folio.spring.SpringContextUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -19,8 +15,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.folio.services.util.EventHandlingUtil.constructModuleName;
-
 public abstract class AbstractConsumersVerticle extends AbstractVerticle {
 
   //TODO: get rid of this workaround with global spring context
@@ -60,7 +54,7 @@ public void start(Promise<Void> startPromise) {
     List<Future<Void>> futures = new ArrayList<>();
     consumerWrappersList.forEach(consumerWrapper ->
       futures.add(consumerWrapper.start(getHandler(),
-        constructModuleName() + "_" + getClass().getSimpleName())));
+        EventHandlingUtil.constructModuleName() + "_" + getClass().getSimpleName())));
 
     GenericCompositeFuture.all(futures).onComplete(ar -> startPromise.complete());
   }
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java
index 481d54d72..bc32d7afe 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/rest/impl/AbstractRestTest.java
@@ -27,9 +27,10 @@
 import io.vertx.ext.unit.Async;
 import io.vertx.ext.unit.TestContext;
 import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.folio.TestUtil;
 import org.folio.kafka.KafkaTopicNameHelper;
-import org.folio.postgres.testing.PostgresTesterContainer;
 import org.folio.rest.RestVerticle;
 import org.folio.rest.client.TenantClient;
 import org.folio.rest.jaxrs.model.ActionProfile;
@@ -52,15 +53,15 @@
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 import org.testcontainers.containers.PostgreSQLContainer;
 
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
@@ -208,6 +209,25 @@ public static void tearDownClass(final TestContext context) {
     }));
   }
 
+  private ProducerFactory<String, String> producerFactory() {
+    String[] hostAndPort = kafkaCluster.getBrokerList().split(":");
+    Map<String, Object> configProps = new HashMap<>();
+    configProps.put(
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+      hostAndPort[0] + ":" + hostAndPort[1]);
+    configProps.put(
+      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+      StringSerializer.class);
+    configProps.put(
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+      StringSerializer.class);
+    return new DefaultKafkaProducerFactory<>(configProps);
+  }
+
+  protected KafkaTemplate<String, String> kafkaTemplate() {
+    return new KafkaTemplate<>(producerFactory());
+  }
+
   private static void runDatabase() throws Exception {
     PostgresClient.stopPostgresTester();
     PostgresClient.closeAllClients();
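Note (illustrative, not part of the patch): tests that need a real producer against the embedded cluster can now build one through the helper above; the topic and values here are arbitrary:

  // Hypothetical test usage of the kafkaTemplate() helper.
  KafkaTemplate<String, String> template = kafkaTemplate();
  template.send("test_topic", "key", "value");
  template.flush(); // force delivery before asserting on consumed records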
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java
index 1ddb53ef4..cd522436a 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java
@@ -24,7 +24,6 @@
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.springframework.test.util.ReflectionTestUtils;
 
@@ -32,7 +31,6 @@
 import org.folio.dataimport.util.OkapiConnectionParams;
 import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
 import org.folio.dataimport.util.marc.MarcRecordType;
-import org.folio.kafka.KafkaConfig;
 import org.folio.rest.jaxrs.model.ActionProfile;
 import org.folio.rest.jaxrs.model.InitialRecord;
 import org.folio.rest.jaxrs.model.JobExecution;
@@ -43,7 +41,6 @@
 import org.folio.rest.jaxrs.model.Record;
 import org.folio.rest.jaxrs.model.RecordsMetadata;
 import org.folio.services.afterprocessing.HrIdFieldService;
-import org.folio.services.util.EventHandlingUtil;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ChangeEngineServiceImplTest {
@@ -63,7 +60,7 @@ public class ChangeEngineServiceImplTest {
   @Mock
   private RecordsPublishingService recordsPublishingService;
   @Mock
-  private KafkaConfig kafkaConfig;
+  private KafkaProducerService kafkaProducerService;
 
   private OkapiConnectionParams okapiConnectionParams = new OkapiConnectionParams(new HashMap<>(), Vertx.vertx());
   @InjectMocks
@@ -184,10 +181,7 @@ private JobExecution getTestJobExecution() {
   private Future<List<Record>> executeWithKafkaMock(RawRecordsDto rawRecordsDto, JobExecution jobExecution,
                                                     Future<Boolean> eventSentResult) {
-    try (var mockedStatic = Mockito.mockStatic(EventHandlingUtil.class)) {
-      mockedStatic.when(() -> EventHandlingUtil.sendEventToKafka(any(), any(), any(), anyList(), any(), any()))
-        .thenReturn(eventSentResult);
-      return service.parseRawRecordsChunkForJobExecution(rawRecordsDto, jobExecution, "1", okapiConnectionParams);
-    }
+    when(kafkaProducerService.sendEvent(any(), any(), any(), any(), anyList(), any())).thenReturn(eventSentResult);
+    return service.parseRawRecordsChunkForJobExecution(rawRecordsDto, jobExecution, "1", okapiConnectionParams);
   }
 }
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java
index e9cd6a848..7c4dbf980 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java
@@ -13,30 +13,14 @@
 import io.vertx.ext.unit.junit.VertxUnitRunner;
 import org.folio.Record;
 import org.folio.TestUtil;
-import org.folio.dao.JobExecutionDaoImpl;
-import org.folio.dao.JobExecutionProgressDaoImpl;
-import org.folio.dao.JobExecutionSourceChunkDaoImpl;
-import org.folio.dao.JobMonitoringDaoImpl;
-import org.folio.dao.JournalRecordDaoImpl;
-import org.folio.dao.MappingParamsSnapshotDaoImpl;
-import org.folio.dao.MappingRuleDaoImpl;
-import org.folio.dao.MappingRulesSnapshotDaoImpl;
+import org.folio.dao.*;
 import org.folio.dao.util.PostgresClientFactory;
 import org.folio.dataimport.util.OkapiConnectionParams;
 import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
-import org.folio.kafka.KafkaConfig;
 import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
 import org.folio.rest.impl.AbstractRestTest;
-import org.folio.rest.jaxrs.model.File;
-import org.folio.rest.jaxrs.model.InitJobExecutionsRqDto;
-import org.folio.rest.jaxrs.model.InitialRecord;
-import org.folio.rest.jaxrs.model.JobExecution;
-import org.folio.rest.jaxrs.model.JobProfileInfo;
+import org.folio.rest.jaxrs.model.*;
 import org.folio.rest.jaxrs.model.JobProfileInfo.DataType;
-import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper;
-import org.folio.rest.jaxrs.model.RawRecordsDto;
-import org.folio.rest.jaxrs.model.RecordsMetadata;
-import org.folio.rest.jaxrs.model.StatusDto;
 import org.folio.services.afterprocessing.HrIdFieldServiceImpl;
 import org.folio.services.mappers.processor.MappingParametersProvider;
 import org.folio.services.progress.JobExecutionProgressServiceImpl;
@@ -45,27 +29,14 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.Spy;
+import org.mockito.*;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Optional;
-import java.util.UUID;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.created;
-import static com.github.tomakehurst.wiremock.client.WireMock.get;
-import static com.github.tomakehurst.wiremock.client.WireMock.ok;
-import static com.github.tomakehurst.wiremock.client.WireMock.post;
-import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import java.util.*;
+
 import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
 import static org.folio.dataimport.util.RestUtil.OKAPI_URL_HEADER;
 import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.JOB_PROFILE;
 import static org.folio.rest.jaxrs.model.StatusDto.Status.ERROR;
@@ -73,12 +44,8 @@
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
 import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
 
 @RunWith(VertxUnitRunner.class)
 // TODO fix in scope of MODSOURMAN-400
@@ -145,7 +112,7 @@ public class EventDrivenChunkProcessingServiceImplTest extends AbstractRestTest {
   @Spy
   private RecordsPublishingService recordsPublishingService;
 
-  private KafkaConfig kafkaConfig;
+  private KafkaProducerService kafkaProducerService;
   private MappingRuleCache mappingRuleCache;
   private ChangeEngineService changeEngineService;
   private ChunkProcessingService chunkProcessingService;
@@ -174,12 +141,6 @@ public class EventDrivenChunkProcessingServiceImplTest extends AbstractRestTest {
   public void setUp() throws IOException {
     String rules = TestUtil.readFileFromPath(RULES_PATH);
     MockitoAnnotations.openMocks(this);
-    String[] hostAndPort = kafkaCluster.getBrokerList().split(":");
-    kafkaConfig = KafkaConfig.builder()
-      .kafkaHost(hostAndPort[0])
-      .kafkaPort(hostAndPort[1])
-      .envId(KAFKA_ENV_ID)
-      .build();
     mappingRuleDao = when(mock(MappingRuleDaoImpl.class).get(any(Record.RecordType.class), anyString())).thenReturn(Future.succeededFuture(Optional.of(new JsonObject(rules)))).getMock();
     marcRecordAnalyzer = new MarcRecordAnalyzer();
@@ -187,7 +148,8 @@ public void setUp() throws IOException {
     mappingRuleService = new MappingRuleServiceImpl(mappingRuleDao, mappingRuleCache);
     mappingParametersProvider = when(mock(MappingParametersProvider.class).get(anyString(), any(OkapiConnectionParams.class))).thenReturn(Future.succeededFuture(new MappingParameters())).getMock();
 
-    changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService, recordsPublishingService, kafkaConfig);
+    kafkaProducerService = new KafkaProducerService(kafkaTemplate());
+    changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService, recordsPublishingService, kafkaProducerService);
     ReflectionTestUtils.setField(changeEngineService, "maxDistributionNum", 10);
     ReflectionTestUtils.setField(changeEngineService, "batchSize", 100);
     mappingMetadataService = new MappingMetadataServiceImpl(mappingParametersProvider, mappingRuleService, mappingRulesSnapshotDao, mappingParamsSnapshotDao);
diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java
index 19c695a00..6cf114c17 100644
--- a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java
+++ b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java
@@ -64,7 +64,6 @@
 import org.folio.dao.util.PostgresClientFactory;
 import org.folio.dataimport.util.OkapiConnectionParams;
 import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
-import org.folio.kafka.KafkaConfig;
 import org.folio.processing.events.utils.ZIPArchiver;
 import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
 import org.folio.rest.impl.AbstractRestTest;
@@ -97,7 +96,7 @@ public class RecordProcessedEventHandlingServiceImplTest extends AbstractRestTest {
   private Vertx vertx = Vertx.vertx();
 
   @Spy
-  private KafkaConfig kafkaConfig;
+  private KafkaProducerService kafkaProducerService = new KafkaProducerService(kafkaTemplate());
   @Spy
   private PostgresClientFactory postgresClientFactory = new PostgresClientFactory(vertx);
   @Spy
@@ -175,7 +174,7 @@ public void setUp() throws IOException {
     MockitoAnnotations.openMocks(this);
     mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx);
     marcRecordAnalyzer = new MarcRecordAnalyzer();
-    changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService , recordsPublishingService, kafkaConfig);
+    changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService, recordsPublishingService, kafkaProducerService);
     mappingRuleService = new MappingRuleServiceImpl(mappingRuleDao, mappingRuleCache);
     mappingRuleDao = when(mock(MappingRuleDaoImpl.class).get(any(), anyString())).thenReturn(Future.succeededFuture(Optional.of(new JsonObject(rules)))).getMock();
     mappingParametersProvider = when(mock(MappingParametersProvider.class).get(anyString(), any(OkapiConnectionParams.class))).thenReturn(Future.succeededFuture(new MappingParameters())).getMock();
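Note (illustrative, not part of the patch): the TODO in ApplicationConfig flags that consumers still go through folio-kafka-wrapper. The follow-up story would replace the consumer verticles with listener beans along these lines, a minimal sketch only, assuming @EnableKafka and a ConsumerFactory configured analogously to the ProducerFactory above; the topic and group id are made up:

  // Hypothetical listener for the follow-up consumer migration.
  @KafkaListener(topics = "folio.Default.diku.DI_RAW_RECORDS_CHUNK_PARSED", groupId = "mod-source-record-manager")
  public void listen(ConsumerRecord<String, String> consumerRecord) {
    // delegate to the existing AsyncRecordHandler-based handling logic
    LOGGER.info("Received event with key {}", consumerRecord.key());
  }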