MODDATAIMP-499. Spike migrate to Spring Kafka #492

Draft · wants to merge 1 commit into base: master
11 changes: 11 additions & 0 deletions mod-source-record-manager-server/pom.xml
@@ -40,6 +40,11 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.15.RELEASE</version>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>postgres-testing</artifactId>
@@ -216,6 +221,12 @@
<groupId>org.folio</groupId>
<artifactId>folio-kafka-wrapper</artifactId>
<version>2.4.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.folio</groupId>
@@ -1,6 +1,8 @@
package org.folio.config;

import io.vertx.core.Vertx;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
@@ -13,6 +15,12 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@ComponentScan(basePackages = {
@@ -40,6 +48,7 @@ public class ApplicationConfig {
@Value("${srm.kafkacache.expiration.time.hours:3}")
private int cachedEventExpirationTimeHours;

// TODO: remove this bean after the follow-up story to migrate consumers to Spring Kafka
@Bean(name = "newKafkaConfig")
public KafkaConfig kafkaConfigBean() {
KafkaConfig kafkaConfig = KafkaConfig.builder()
@@ -80,4 +89,27 @@ public KafkaInternalCache kafkaInternalCache(KafkaConfig kafkaConfig) {

return kafkaInternalCache;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaHost + ":" + kafkaPort);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
maxRequestSize);

LOGGER.info("kafkaConfig: " + configProps);

return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

}
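
For context only (not part of this diff): a minimal sketch of how the new producer configuration and KafkaTemplate could be exercised. The class name, broker address, topic, key, and payload below are made-up placeholders, and it assumes spring-kafka 2.5.x on the classpath and a reachable broker.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Editor's sketch: mirrors the producerFactory()/kafkaTemplate() beans above and
// sends one message. Broker address, topic, key, and payload are placeholders.
public class KafkaTemplateSketch {
  public static void main(String[] args) {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    KafkaTemplate<String, String> template =
      new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
    template.send("folio.Default.diku.DI_RAW_RECORDS_CHUNK_PARSED", "0", "{\"records\":[]}")
      .addCallback(
        result -> System.out.println("Sent to " + result.getRecordMetadata().topic()),
        ex -> ex.printStackTrace());
    template.flush();
  }
}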
@@ -18,7 +18,6 @@
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.addFieldToMarcRecord;
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getControlFieldValue;
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getValue;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;

import java.util.ArrayList;
import java.util.Collection;
@@ -54,7 +53,6 @@
import org.folio.dataimport.util.marc.MarcRecordAnalyzer;
import org.folio.dataimport.util.marc.MarcRecordType;
import org.folio.dataimport.util.marc.RecordAnalyzer;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaHeaderUtils;
import org.folio.rest.client.SourceStorageBatchClient;
import org.folio.rest.jaxrs.model.ActionProfile;
@@ -99,10 +97,12 @@ public class ChangeEngineServiceImpl implements ChangeEngineService {
private RecordAnalyzer marcRecordAnalyzer;
private HrIdFieldService hrIdFieldService;
private RecordsPublishingService recordsPublishingService;
private KafkaConfig kafkaConfig;
private KafkaProducerService kafkaProducerService;

@Value("${srm.kafka.RawChunksKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;
@Value("${ENV:folio}")
private String envId;

@Value("${marc.holdings.batch.size:100}")
private int batchSize;
@@ -112,13 +112,13 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio
@Autowired MarcRecordAnalyzer marcRecordAnalyzer,
@Autowired HrIdFieldService hrIdFieldService,
@Autowired RecordsPublishingService recordsPublishingService,
@Autowired KafkaConfig kafkaConfig) {
@Autowired KafkaProducerService kafkaProducerService) {
this.jobExecutionSourceChunkDao = jobExecutionSourceChunkDao;
this.jobExecutionService = jobExecutionService;
this.marcRecordAnalyzer = marcRecordAnalyzer;
this.hrIdFieldService = hrIdFieldService;
this.recordsPublishingService = recordsPublishingService;
this.kafkaConfig = kafkaConfig;
this.kafkaProducerService = kafkaProducerService;
}

@Override
@@ -339,8 +339,8 @@ private void populateError(Record record, JobExecution jobExecution, OkapiConnec
.withContent(record.getRawRecord().getContent())
.withDescription(new JsonObject().put(MESSAGE_KEY, HOLDINGS_004_TAG_ERROR_MESSAGE).encode())
);
sendEventToKafka(okapiParams.getTenantId(), Json.encode(eventPayload), DI_ERROR.value(),
KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), kafkaConfig, key)
kafkaProducerService.sendEvent(okapiParams.getTenantId(), envId, Json.encode(eventPayload), DI_ERROR.value(),
KafkaHeaderUtils.kafkaHeadersFromMultiMap(okapiParams.getHeaders()), key)
.onFailure(th -> LOGGER.error("Error publishing DI_ERROR event for MARC Holdings record with id {}", record.getId(), th));
}

@@ -467,8 +467,7 @@ private Future<List<Record>> saveRecords(OkapiConnectionParams params, JobExecut

String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum);

return sendEventToKafka(params.getTenantId(), Json.encode(recordCollection), DI_RAW_RECORDS_CHUNK_PARSED.value(),
kafkaHeaders, kafkaConfig, key)
.map(parsedRecords);
return kafkaProducerService.sendEvent(params.getTenantId(), envId, Json.encode(recordCollection),
DI_RAW_RECORDS_CHUNK_PARSED.value(), kafkaHeaders, key).map(parsedRecords);
}
}
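
Side note on the record key in saveRecords() above (the keying itself is unchanged by this PR): the key cycles through 0..maxDistributionNum-1, which spreads successive chunks across partitions of the target topic while keeping each individual chunk on a single partition. A minimal sketch of the same pattern, with illustrative names:

import java.util.concurrent.atomic.AtomicInteger;

public class KeyDistributionSketch {
  public static void main(String[] args) {
    // Mirrors the keying scheme in saveRecords(): an incrementing counter modulo
    // maxDistributionNum yields keys "1", "2", ..., "99", "0", "1", ... so chunks
    // are spread over partitions; 100 is the documented default of
    // srm.kafka.RawChunksKafkaHandler.maxDistributionNum.
    AtomicInteger indexer = new AtomicInteger();
    int maxDistributionNum = 100;
    for (int i = 0; i < 5; i++) {
      String key = String.valueOf(indexer.incrementAndGet() % maxDistributionNum);
      System.out.println("chunk " + i + " -> key " + key);
    }
  }
}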
@@ -0,0 +1,131 @@
package org.folio.services;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.KafkaHeader;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.processing.events.utils.ZIPArchiver;
import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.EventMetadata;
import org.folio.services.util.EventHandlingUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

@Service
public class KafkaProducerService {
private static final Logger LOGGER = LogManager.getLogger();

private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

/**
* Prepares and sends an event to Kafka.
*
* @param tenantId tenant id
* @param envId env id
* @param eventPayload eventPayload in String representation
* @param eventType eventType
* @param kafkaHeaders kafka headers
* @param key record key
* @return future completed with true if the event was sent successfully
*/
public Future<Boolean> sendEvent(String tenantId, String envId, String eventPayload, String eventType,
List<KafkaHeader> kafkaHeaders, String key) {

Event event;
try {
event = createEvent(eventPayload, eventType, tenantId, true);
} catch (IOException e) {
LOGGER.error("Failed to construct an event for eventType {}", eventType, e);
Promise<Boolean> promise = Promise.promise();
promise.fail(e);
return promise.future();
}

String topicName = createTopicName(eventType, tenantId, envId);

ProducerRecord<String, String> record = createProducerRecord(event, key, topicName, kafkaHeaders);

return sendEvent(eventType, record);
}

/**
* Sends event to kafka.
*
* @param eventType event type
* @param record producer record
* @return completed future with true if event was sent successfully
*/
public Future<Boolean> sendEvent(String eventType, ProducerRecord<String, String> record) {
LOGGER.debug("Starting to send event to Kafka for eventType: {}", eventType);

Promise<Boolean> promise = Promise.promise();

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<>() {

@Override
public void onSuccess(SendResult<String, String> result) {
Headers headers = result.getProducerRecord().headers();
LOGGER.info("Event with type: {} and correlationId: {} was sent to kafka", eventType, getHeaderValue(headers, "correlationId"));
promise.complete(true);
}

@Override
public void onFailure(Throwable e) {
LOGGER.error("Next chunk has failed with errors", e);
promise.fail(e);
}
});

return promise.future();
}

private String getHeaderValue(Headers headers, String headerName) {
return Optional.ofNullable(headers.lastHeader(headerName)).map(header -> new String(header.value())).orElse(null);
}

protected String createTopicName(String eventType, String tenantId, String envId) {
return KafkaTopicNameHelper.formatTopicName(envId, KafkaTopicNameHelper.getDefaultNameSpace(),
tenantId, eventType);
}

protected Event createEvent(String eventPayload, String eventType, String tenantId, boolean isZipped) throws IOException {
return new Event()
.withId(UUID.randomUUID().toString())
.withEventType(eventType)
.withEventPayload(isZipped ? ZIPArchiver.zip(eventPayload) : eventPayload)
.withEventMetadata(new EventMetadata()
.withTenantId(tenantId)
.withEventTTL(1)
.withPublishedBy(EventHandlingUtil.constructModuleName()));
}


public ProducerRecord<String, String> createProducerRecord(Event event, String key, String topicName, List<KafkaHeader> kafkaHeaders) {
List<Header> headers = kafkaHeaders.stream()
.map(header -> new RecordHeader(header.key(), header.value().getBytes()))
.collect(Collectors.toList());
return new ProducerRecord<>(topicName, null, key, Json.encode(event), headers);
}
}
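
For reviewers, a minimal caller-side sketch of the new service. The tenant, env, header, key, and payload values are hypothetical; the error handling mirrors the existing call sites in ChangeEngineServiceImpl, and the service instance would normally be injected by Spring.

import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.List;
import org.folio.services.KafkaProducerService;

public class KafkaProducerServiceUsageSketch {
  // Editor's sketch: publishes a hypothetical DI_ERROR event for tenant "diku" in env "folio".
  public static void publishSampleEvent(KafkaProducerService kafkaProducerService) {
    List<KafkaHeader> headers = List.of(KafkaHeader.header("correlationId", "abc-123"));
    kafkaProducerService
      .sendEvent("diku", "folio", "{\"recordId\":\"...\"}", "DI_ERROR", headers, "0")
      .onSuccess(sent -> System.out.println("DI_ERROR event published"))
      .onFailure(th -> th.printStackTrace());
  }
}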
@@ -1,38 +1,24 @@
package org.folio.services;

import static org.folio.services.util.EventHandlingUtil.createEvent;
import static org.folio.services.util.EventHandlingUtil.createProducer;
import static org.folio.services.util.EventHandlingUtil.createProducerRecord;
import static org.folio.services.util.EventHandlingUtil.createTopicName;
import static org.folio.verticle.consumers.util.QMEventTypes.QM_COMPLETED;
import static org.folio.verticle.consumers.util.QMEventTypes.QM_RECORD_UPDATED;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import org.folio.kafka.KafkaConfig;
import java.util.List;

@Service
public class QuickMarcEventProducerServiceImpl implements QuickMarcEventProducerService {

private static final Logger LOGGER = LogManager.getLogger();
private final KafkaConfig kafkaConfig;
private final Map<String, KafkaProducer<String, String>> kafkaProducers = new HashMap<>();

public QuickMarcEventProducerServiceImpl(KafkaConfig kafkaConfig) {
this.kafkaConfig = kafkaConfig;
kafkaProducers.put(QM_RECORD_UPDATED.name(), createProducer(QM_RECORD_UPDATED.name(), kafkaConfig));
kafkaProducers.put(QM_COMPLETED.name(), createProducer(QM_COMPLETED.name(), kafkaConfig));
}
@Value("${ENV:folio}")
private String envId;
@Autowired
private KafkaProducerService kafkaProducerService;

@Override
public Future<Boolean> sendEvent(String eventPayload, String eventType, String key, String tenantId,
@@ -50,24 +36,10 @@ private Future<Boolean> sendEventInternal(String eventPayload, String eventType,
List<KafkaHeader> kafkaHeaders, boolean isZipped) {
Promise<Boolean> promise = Promise.promise();
try {
var event = createEvent(eventPayload, eventType, tenantId, isZipped);
var topicName = createTopicName(eventType, tenantId, kafkaConfig);
var record = createProducerRecord(event, key, topicName, kafkaHeaders);
var producer = kafkaProducers.get(eventType);
if (producer != null) {
producer.write(record)
.onSuccess(unused -> {
LOGGER.info("Event with type {} was sent to kafka", eventType);
promise.complete(true);
})
.onFailure(throwable -> {
var cause = throwable.getCause();
LOGGER.error("Error while send event {}: {}", eventType, cause);
promise.fail(cause);
});
} else {
promise.fail("No producer found for event: " + eventType);
}
var event = kafkaProducerService.createEvent(eventPayload, eventType, tenantId, isZipped);
var topicName = kafkaProducerService.createTopicName(eventType, tenantId, envId);
var record = kafkaProducerService.createProducerRecord(event, key, topicName, kafkaHeaders);
kafkaProducerService.sendEvent(eventType, record).onComplete(promise);
} catch (Exception e) {
promise.fail(e);
}
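
Not part of this diff: the TODO in ApplicationConfig refers to a follow-up story to migrate the consumers to Spring Kafka as well. For orientation only, a listener under that follow-up could look roughly like the sketch below; the class name, topic, and group id are placeholders, and the @EnableKafka/ConsumerFactory wiring it relies on is an assumption, not something this PR adds.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class RawRecordsChunkListenerSketch {
  // Hypothetical consumer for the follow-up story; topic and groupId are placeholders,
  // and a ConsumerFactory plus @EnableKafka configuration would still have to be defined.
  @KafkaListener(topics = "folio.Default.diku.DI_RAW_RECORDS_CHUNK_PARSED",
                 groupId = "mod-source-record-manager")
  public void handleRawRecordsChunkParsed(String eventJson) {
    // deserialize the Event envelope and hand the payload to the existing chunk processing
  }
}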