This guide explains how Spring Kafka is used in devonfw applications. It focuses on aspects that are specific to devonfw. If you want to learn about spring-kafka itself, refer to the Spring reference documentation.
There is an example of a simple Kafka implementation in the devon4j-kafka-employeeapp.
The devon4j-kafka library consists of:
- Custom message processor with retry pattern
- Monitoring support
- Tracing support
- Logging support
- Configuration support for Kafka producers, consumers, Brave tracer and message retry processing, including defaults
To use devon4j-kafka, add the required starter dependency from devon4j, which is either "starter-kafka-sender" or "starter-kafka-receiver". These two starters take care of the required Spring configuration. If you only want to produce messages, "starter-kafka-sender" is enough. For consuming messages you need "starter-kafka-receiver", which also includes "starter-kafka-sender".
To use devon4j-kafka message sender add the below dependency:
<dependency>
<groupId>com.devonfw.java.starters</groupId>
<artifactId>devon4j-starter-kafka-sender</artifactId>
</dependency>
It includes the Tracer implementations from Spring Cloud Sleuth.
To use the devon4j-kafka message receiver configuration, loggers and the message retry processor, add the following dependency:
<dependency>
<groupId>com.devonfw.java.starters</groupId>
<artifactId>devon4j-starter-kafka-receiver</artifactId>
</dependency>
As mentioned before, producer- and listener-specific configuration is done via properties classes. These classes provide useful defaults. At a minimum, the following parameters have to be configured:
messaging.kafka.common.bootstrap-servers=kafka-broker:9092
messaging.kafka.consumer.group-id=<e.g. application name>
messaging.kafka.listener.container.concurrency=<Number of listener threads for each listener container>
All the configuration beans for devon4j-kafka are annotated with @ConfigurationProperties and use common prefixes to read the property values from application.properties or application.yml.
Example:
@Bean
@ConfigurationProperties(prefix = "messaging.kafka.producer")
public KafkaProducerProperties messageKafkaProducerProperties() {
  return new KafkaProducerProperties();
}
For the producer and consumer the prefixes are messaging.kafka.producer… and messaging.kafka.consumer…, and for retry the prefix is messaging.retry…
See devon4j-kafka-employeeapp’s application.properties file for an example.
We use the same properties defined by Apache Kafka or Spring Kafka. They are simply "mapped" to the above prefixes to allow easy access from your application properties. The Javadoc of each devon4j-kafka property class explains its use and which values have to be passed.
- The Devon4j-Kafka-Producer properties are derived from the Kafka producer config.
- The Devon4j-Kafka-Consumer properties are derived from the Kafka consumer config.
- The Devon4j-Kafka-Listener container properties are derived from the Spring Kafka listener properties.
To better manage several Kafka topics in your application portfolio, we strongly advise introducing a naming scheme for your topics. The scheme may depend on the actual usage pattern of Kafka. For contexts where Kafka is used in a 1-to-1 communication scheme (not publish/subscribe), the following scheme has proven useful in practice:
<application name>-<service name>-<version>-<service-operation>
To keep things simple and prevent problems, we suggest using only lowercase letters and hyphens, and no other special characters.
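The naming scheme can be enforced with a small helper. The following is only a sketch: the class `TopicNames` and its methods are hypothetical and not part of devon4j-kafka, and allowing digits is an assumption made so that versions such as `v1` are accepted.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of devon4j-kafka. */
final class TopicNames {

  // Lowercase letters and hyphens as suggested above; digits are an added
  // assumption so that version parts such as "v1" are valid.
  private static final Pattern PART = Pattern.compile("[a-z0-9]+(-[a-z0-9]+)*");

  private TopicNames() {
  }

  /** Builds <application name>-<service name>-<version>-<service-operation>. */
  static String build(String application, String service, String version, String operation) {
    return String.join("-", check(application), check(service), check(version), check(operation));
  }

  // Rejects null, empty, uppercase or otherwise non-conforming parts.
  private static String check(String part) {
    if (part == null || !PART.matcher(part).matches()) {
      throw new IllegalArgumentException("Invalid topic name part: " + part);
    }
    return part;
  }

  public static void main(String[] args) {
    // Prints the topic name used in the listener examples of this guide.
    System.out.println(build("employeeapp", "employee", "v1", "delete"));
  }
}
```

Building names in one place keeps producers and listeners from drifting apart when a service version changes.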
As mentioned above, the 'starter-kafka-sender' dependency is required to use the MessageSender from devon4j-kafka.
<dependency>
<groupId>com.devonfw.java.starters</groupId>
<artifactId>devon4j-starter-kafka-sender</artifactId>
</dependency>
The following example shows how to use MessageSender and its methods to send messages to the Kafka broker:
@Inject
private MessageSender messageSender;

private ProducerRecord<K, V> producerRecord;

public void sendMessageToKafka() {
  this.producerRecord = new ProducerRecord<>("topic-name", "message");
  // Send the record to the Kafka broker
  this.messageSender.sendMessage(this.producerRecord);
  // Alternative: send the record and wait
  this.messageSender.sendMessageAndWait(this.producerRecord, 10);
}
There are multiple methods available from MessageSender of devon4j-kafka. The ProducerListener will log the message sent to the Kafka broker.
To receive messages you have to define a listener. The listener is normally part of the service layer.
Add the following starter-kafka-receiver dependency to use the listener configurations and loggers from devon4j-kafka.
<dependency>
<groupId>com.devonfw.java.starters</groupId>
<artifactId>devon4j-starter-kafka-receiver</artifactId>
</dependency>
The listener is defined by implementing and annotating a method as in the following example:
@KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}", containerFactory = "kafkaListenerContainerFactory")
public void consumer(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment acknowledgment) {
  // user operation
  // Acknowledge the message after processing
  acknowledgment.acknowledge();
}
The group id can be set in application.properties as a listener property.
messaging.kafka.consumer.groupId=default
If there are multiple topics and multiple listeners, we suggest specifying the topic names directly on each listener instead of reading them from the property file.
The container factory referenced in the @KafkaListener annotation is provided by KafkaListenerContainerProperties.java, which creates a default container factory with acknowledgement.
The default ack-mode is manual_immediate. It can be overridden as in the following example:
messaging.kafka.listener.container.ackMode=<ack-mode>
The other ack-mode values are listed in the Spring Kafka documentation.
The retry pattern in devon4j-kafka is invoked when a particular exception (specified by the user in the application.properties file) is thrown while processing a consumed message. The general idea is to move messages that could not be processed into dedicated retry topics. This allows fine-grained control over how processing is retried and does not block newly arriving messages. The following sections describe retry handling in more detail.
The retry pattern is included in the "starter-kafka-receiver" starter dependency.
The retry pattern is used by calling the method processMessageWithRetry(ConsumerRecord<K, V> consumerRecord, MessageProcessor<K, V> processor), as in the following example:
@Inject
private MessageRetryOperations<K, V> messageRetryOperations;

@Inject
private DeleteEmployeeMessageProcessor<K, V> deleteEmployeeMessageProcessor;

@KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}", containerFactory = "kafkaListenerContainerFactory")
public void consumer(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
  this.messageRetryOperations.processMessageWithRetry(consumerRecord, this.deleteEmployeeMessageProcessor);
  // Acknowledge the listener.
  acknowledgment.acknowledge();
}
You have to provide an implementation of the MessageProcessor interface from devon4j-kafka to process the ConsumerRecord received from the Kafka broker. Such an implementation can look like the following example:
import javax.inject.Named;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.devonfw.module.kafka.common.messaging.retry.api.client.MessageProcessor;

@Named
public class DeleteEmployeeMessageProcessor<K, V> implements MessageProcessor<K, V> {

  @Override
  public void processMessage(ConsumerRecord<K, V> message) {
    // process message
  }
}
It works as follows:
- The application gets a message from the topic.
- If an error occurs while processing the message, the message is written to the redelivery topic.
- The message is acknowledged in the topic.
- The message is processed from the redelivery topic after a delay.
- If processing of the message fails again, it is retried until the retry count is exhausted.
- When all retries have failed, the message is logged and the payload in the ProducerRecord is deleted for log compaction, which is explained below.
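The steps above can be illustrated with a small in-memory simulation. This is a sketch only and not the devon4j-kafka implementation: the class `RetryFlowSketch`, its nested `RetryMessage` and all method names are hypothetical, a `Deque` stands in for the retry topic, the delay between attempts is omitted, and every `RuntimeException` is treated as retryable.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** In-memory illustration of the retry flow. All names are hypothetical. */
final class RetryFlowSketch {

  /** A message plus the number of retry attempts already made. */
  static final class RetryMessage {
    final String key;
    final String payload;
    final int attempts;

    RetryMessage(String key, String payload, int attempts) {
      this.key = key;
      this.payload = payload;
      this.attempts = attempts;
    }
  }

  final Deque<RetryMessage> retryTopic = new ArrayDeque<>(); // stands in for the retry topic
  final List<String> finalFailures = new ArrayList<>();      // stands in for the final-failure log
  private final int retryCount;

  RetryFlowSketch(int retryCount) {
    this.retryCount = retryCount;
  }

  /** Processes one message; on failure it is re-enqueued or, once retries are exhausted, recorded as failed. */
  void onMessage(RetryMessage message, Consumer<String> processor) {
    try {
      processor.accept(message.payload);
    } catch (RuntimeException e) {
      if (message.attempts < this.retryCount) {
        // Write a new message to the retry topic instead of blocking the original topic.
        this.retryTopic.add(new RetryMessage(message.key, message.payload, message.attempts + 1));
      } else {
        // All retries failed: record the key; the payload is dropped.
        this.finalFailures.add(message.key);
      }
    }
    // Returning normally corresponds to acknowledging the message.
  }

  /** Processes messages from the retry topic until it is empty. */
  void drainRetries(Consumer<String> processor) {
    RetryMessage next;
    while ((next = this.retryTopic.poll()) != null) {
      onMessage(next, processor);
    }
  }
}
```

With `retryCount` set to 2 and a processor that always fails, the processor is invoked three times in total (the initial attempt plus two retries) before the message key ends up in `finalFailures`.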
The following properties should be added to the application.properties or application.yml file.
The retry pattern in devon4j-kafka is applied per topic, so it is mandatory to specify the properties for each topic. The following properties are an example:
# Back off policy properties for employeeapp-employee-v1-delete
messaging.retry.back-off-policy.retryReEnqueueDelay.employeeapp-employee-v1-delete=1000
messaging.retry.back-off-policy.retryDelay.employeeapp-employee-v1-delete=600000
messaging.retry.back-off-policy.retryDelayMultiplier.employeeapp-employee-v1-delete=1.0
messaging.retry.back-off-policy.retryMaxDelay.employeeapp-employee-v1-delete=600000
messaging.retry.back-off-policy.retryCount.employeeapp-employee-v1-delete=2
# Retry policy properties for employeeapp-employee-v1-delete
messaging.retry.retry-policy.retryPeriod.employeeapp-employee-v1-delete=1800
messaging.retry.retry-policy.retryableExceptions.employeeapp-employee-v1-delete=<Class names of exceptions for which a retry should be performed>
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.employeeapp-employee-v1-delete=true
# Back off policy properties for employeeapp-employee-v1-add
messaging.retry.back-off-policy.retryReEnqueueDelay.employeeapp-employee-v1-add=1000
messaging.retry.back-off-policy.retryDelay.employeeapp-employee-v1-add=600000
messaging.retry.back-off-policy.retryDelayMultiplier.employeeapp-employee-v1-add=2.0
messaging.retry.back-off-policy.retryMaxDelay.employeeapp-employee-v1-add=600000
messaging.retry.back-off-policy.retryCount.employeeapp-employee-v1-add=4
# Retry policy properties for employeeapp-employee-v1-add
messaging.retry.retry-policy.retryPeriod.employeeapp-employee-v1-add=3000
messaging.retry.retry-policy.retryableExceptions.employeeapp-employee-v1-add=<Class names of exceptions for which a retry should be performed>
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.employeeapp-employee-v1-add=true
In the above properties, the retry-policy and back-off-policy properties appear twice because there are two topics for which retries are performed with different values. The topic name is appended as the last part of the property name.
The retry is therefore performed for each topic according to its own configuration values.
If you want to provide the same (default) values for all topics, use default in place of the topic name in the above properties.
For example,
# Default back off policy properties
messaging.retry.back-off-policy.retryReEnqueueDelay.default=1000
messaging.retry.back-off-policy.retryDelay.default=600000
messaging.retry.back-off-policy.retryDelayMultiplier.default=1.0
messaging.retry.back-off-policy.retryMaxDelay.default=600000
messaging.retry.back-off-policy.retryCount.default=2
# Default retry policy properties
messaging.retry.retry-policy.retryPeriod.default=1800
messaging.retry.retry-policy.retryableExceptions.default=<Class names of exceptions for which a retry should be performed>
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.default=true
With properties like the above, the same values are used for all topics, so retries are processed in the same way for every topic.
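Conceptually, each retry setting is a map from topic name to value, with `default` as a shared key. The sketch below shows one plausible lookup. The class `TopicSettingSketch` is hypothetical, and the assumption that a topic-specific entry takes precedence over the `default` entry is not confirmed by this guide.

```java
import java.util.Map;

/** Hypothetical lookup helper; the precedence rule is an assumption. */
final class TopicSettingSketch {

  private TopicSettingSketch() {
  }

  /** Returns the value configured for the topic, or the "default" entry if the topic has none. */
  static <T> T resolve(Map<String, T> valuesByTopic, String topic) {
    T value = valuesByTopic.get(topic);
    return value != null ? value : valuesByTopic.get("default");
  }

  public static void main(String[] args) {
    // Mirrors messaging.retry.back-off-policy.retryCount.<topic>=<value>
    Map<String, Integer> retryCount = Map.of("default", 2, "employeeapp-employee-v1-add", 4);
    System.out.println(resolve(retryCount, "employeeapp-employee-v1-add"));
    System.out.println(resolve(retryCount, "employeeapp-employee-v1-delete"));
  }
}
```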
All of the above property values are mapped to the classes DefaultBackOffPolicyProperties.java and DefaultRetryPolicyProperties.java and configured by the class MessageDefaultRetryConfig.java.
The MessageRetryContext in devon4j-kafka is used to perform the retry pattern with the properties from DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.
The two main properties of MessageRetryContext are nextRetry and retryUntil. Both are of type Instant and are calculated internally from the properties given in DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.
You may change the behavior of this date calculation by providing your own implementations of MessageBackOffPolicy.java and MessageRetryPolicy.java.
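To give an idea of how nextRetry and retryUntil could be derived from the properties, here is a sketch of a typical exponential back-off calculation. It is not taken from devon4j-kafka: the class `BackOffSketch`, the formula, and the units (milliseconds for the delays, seconds for retryPeriod) are all assumptions, since this guide does not state them.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical back-off calculation; formula and units are assumptions. */
final class BackOffSketch {

  private BackOffSketch() {
  }

  /** Delay before the given attempt (1-based): retryDelay * multiplier^(attempt - 1), capped at retryMaxDelay. */
  static Duration delayFor(int attempt, long retryDelayMs, double multiplier, long retryMaxDelayMs) {
    double raw = retryDelayMs * Math.pow(multiplier, attempt - 1);
    return Duration.ofMillis((long) Math.min(raw, (double) retryMaxDelayMs));
  }

  /** Point in time at which the given attempt becomes due. */
  static Instant nextRetry(Instant now, int attempt, long retryDelayMs, double multiplier, long retryMaxDelayMs) {
    return now.plus(delayFor(attempt, retryDelayMs, multiplier, retryMaxDelayMs));
  }

  /** Point in time after which no further retries are attempted. */
  static Instant retryUntil(Instant firstFailure, long retryPeriodSeconds) {
    return firstFailure.plusSeconds(retryPeriodSeconds);
  }
}
```

Under these assumptions, a retryDelayMultiplier of 1.0 yields a constant delay, while 2.0 doubles the delay on each attempt until retryMaxDelay is reached.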
The retry topic is named after the topic the message was published to, with the suffix -retry appended once the message has been consumed and handed over to retry processing.
If no topic is found in the consumed record, the default retry topic default-message-retry is used.
Devon4j-kafka uses a separate retry topic for each topic where retries occur. By default this topic is named <topic name>-retry. You may change this behavior by providing your own implementation of KafkaRecordSupport, for which devon4j-kafka ships the default implementation DefaultKafkaRecordSupport.
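The default naming rule described above can be written down as a small function. This is a sketch with a hypothetical class `RetryTopicNames`, not the code of DefaultKafkaRecordSupport. Leaving a topic name unchanged when it already ends in `-retry` is an added assumption, so that repeated retries stay in the same retry topic.

```java
/** Hypothetical illustration of the default retry topic naming. */
final class RetryTopicNames {

  static final String DEFAULT_RETRY_TOPIC = "default-message-retry";
  static final String RETRY_SUFFIX = "-retry";

  private RetryTopicNames() {
  }

  /** Returns the retry topic for the topic a record was consumed from. */
  static String retryTopicFor(String topic) {
    if (topic == null || topic.isEmpty()) {
      // No topic found in the consumed record: fall back to the default retry topic.
      return DEFAULT_RETRY_TOPIC;
    }
    if (topic.endsWith(RETRY_SUFFIX)) {
      // Assumption: a record that already comes from a retry topic stays there.
      return topic;
    }
    return topic + RETRY_SUFFIX;
  }
}
```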
Devon4j-kafka enqueues a new message for each retry attempt. It is very important to configure your retry topics with log compaction enabled. Simplified, if log compaction is enabled, Kafka keeps only one message per message key. Since each retry message has the same key, only the message of the latest retry attempt is stored. After the last retry attempt the payload is removed from the message, so you do not keep unnecessary data in your topics.
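The effect of log compaction on retry messages can be modeled with a map keyed by message key. This is a strongly simplified sketch with a hypothetical class `CompactionSketch`. Real Kafka compaction runs asynchronously in the background, and this model simply keeps the key with an empty payload after the final attempt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of a compacted topic: only the latest value per key survives. */
final class CompactionSketch {

  private final Map<String, String> latestByKey = new LinkedHashMap<>();

  /** Appends a message; a later message with the same key replaces the earlier one. */
  void append(String key, String payload) {
    this.latestByKey.put(key, payload);
  }

  /** Number of messages that remain after compaction. */
  int size() {
    return this.latestByKey.size();
  }

  /** Payload that remains for the key; null if it was removed or the key is unknown. */
  String payloadFor(String key) {
    return this.latestByKey.get(key);
  }
}
```

Appending three retry messages with the same key, the last one with a null payload, leaves a single entry without payload in this model.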
By default, when the final retry attempt fails, the message is logged and the payload of the ProducerRecord that entered the retry pattern is deleted.
You can change this behavior by providing an implementation of the interface MessageRetryHandler.java, which has two methods: retryTimeout and retryFailedFinal.
We leverage Spring Cloud Sleuth for tracing in devon4j-kafka. It is used to trace the asynchronous process of producing and consuming Kafka messages. In an asynchronous process it is important to maintain an id that is the same across all steps of the process. devon already uses its own correlation-id (UUID) to track the process. In addition, devon4j-kafka uses the Brave Tracer as a tracing protocol.
This is part of both starter dependencies, starter-kafka-receiver and starter-kafka-sender.
Two important properties are logged automatically: trace-id and span-id. The trace-id is the same for the whole asynchronous process, while the span-id is unique for each step of it.
We inject the trace-id and span-id into the headers of the ProducerRecord that is published to the Kafka broker.
They are injected with the header key traceId for the trace-id and spanId for the span-id.
In addition, the correlation-id (UUID) is injected into the record headers with the key correlationId.
When you consume a record from the Kafka broker, these values can be found in the headers of the consumed record under these keys.
This makes it easy to track the asynchronous process of consuming the messages.
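The header handling can be sketched without a broker by modeling the record headers as a map from header key to bytes. The class `TracingHeadersSketch` is hypothetical, and encoding the values as UTF-8 strings is an assumption. With the Kafka client API, the corresponding lookup on a consumed record would go through `consumerRecord.headers().lastHeader(key)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical model of the tracing headers; the UTF-8 encoding is an assumption. */
final class TracingHeadersSketch {

  static final String TRACE_ID = "traceId";
  static final String SPAN_ID = "spanId";
  static final String CORRELATION_ID = "correlationId";

  private TracingHeadersSketch() {
  }

  /** Producer side: builds the headers that travel with the record. */
  static Map<String, byte[]> inject(String traceId, String spanId, UUID correlationId) {
    Map<String, byte[]> headers = new HashMap<>();
    headers.put(TRACE_ID, traceId.getBytes(StandardCharsets.UTF_8));
    headers.put(SPAN_ID, spanId.getBytes(StandardCharsets.UTF_8));
    headers.put(CORRELATION_ID, correlationId.toString().getBytes(StandardCharsets.UTF_8));
    return headers;
  }

  /** Consumer side: reads a header value, or returns null if the header is absent. */
  static String read(Map<String, byte[]> headers, String key) {
    byte[] value = headers.get(key);
    return value == null ? null : new String(value, StandardCharsets.UTF_8);
  }
}
```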
devon4j-kafka provides multiple support classes to log the published and the consumed messages.
- The class ProducerLoggingListener, which implements ProducerListener<K,V> from Spring Kafka, logs the message as soon as it is published to the Kafka broker.
- The aspect class MessageListenerLoggingAspect is annotated with @Aspect and has a method logMessageprocessing, which is annotated with @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)&&args(kafkaRecord,..)"). It intercepts the methods annotated with @KafkaListener and logs the message as soon as it is consumed.
- The class MessageLoggingSupport has multiple methods to log different types of events like MessageReceived, MessageSent, MessageProcessed, MessageNotProcessed.
- The class LoggingErrorHandler, which implements ErrorHandler from spring-kafka, logs the message when an error occurs while consuming messages. You may change this behavior by creating your own implementation class for the ErrorHandler.
The Spring config class MessageCommonConfig automatically provides a Spring health indicator bean for Kafka if the property management.endpoint.health.enabled is set to true. For all topics listed in messaging.kafka.health.topics-tocheck, the health indicator checks whether a leader is available. If this property is missing, only the broker connection is checked. The timeout for the check (default 60s) may be changed via the property messaging.kafka.health.timeout.
If an application uses multiple brokers (or broker clusters), a dedicated health indicator bean has to be configured in the Spring config for each broker (or cluster).
The properties for the devon4j-kafka health check should be given as in the following example:
management.endpoint.health.enabled=<true or false>
messaging.kafka.health.timeout=<the health check timeout seconds>
messaging.kafka.health.topicsToCheck=employeeapp-employee-v1-delete,employeeapp-employee-v1-add
These properties have default values, except for topicsToCheck. The health check only takes place when management.endpoint.health.enabled=true.
devon4j-kafka supports authentication via JSON Web Tokens (JWT) out-of-the-box. To use it add a dependency to the devon4j-starter-security-jwt:
<dependency>
<groupId>com.devonfw.java.starters</groupId>
<artifactId>devon4j-starter-security-jwt</artifactId>
</dependency>
The authentication via JWT needs some configuration, e.g. a keystore to verify the token signature. This is explained in the JWT documentation.
To secure a message listener with JWT, add the @JwtAuthentication annotation:
@JwtAuthentication
@KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}")
public void consumer(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
  ...
}
With this annotation in place, each message is checked for a valid JWT in a message header named Authorization. If a valid token is found, the Spring Security context is initialized with the user roles, and "normal" authorization, e.g. with @RolesAllowed, may be used. This is also demonstrated in the Kafka sample application.
Apart from using Kafka as a "communication channel", it is sometimes helpful to use Kafka internally for parallel processing.
The example is a payment service that accepts a list of receipt IDs for payment. We assume that the payment itself takes a long time and should be done asynchronously and in parallel. The general idea is to put one message per receipt to be paid into a topic. This is done as a first step in the use case implementation when a REST call arrives. A listener that consumes the messages is also part of the use case. For each message (i.e. each payment to do) a processor is called, which performs the actual payment via the use case. Since Kafka easily supports concurrency for listeners, the payments are also processed in parallel. All features of devon4j-kafka, such as retry handling, can be used as well.
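The structure of this approach can be sketched without Kafka by letting a queue stand in for the topic and a thread pool stand in for the concurrent listeners. Everything in this sketch is hypothetical (the class `ParallelPaymentSketch`, the method `payAll`, the 10 second timeout), and the payment itself is reduced to recording the receipt ID.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the "one message per receipt" pattern; not Kafka code. */
final class ParallelPaymentSketch {

  private ParallelPaymentSketch() {
  }

  /** Enqueues one message per receipt and lets the given number of workers pay them in parallel. */
  static Set<Long> payAll(List<Long> receiptIds, int concurrency) {
    BlockingQueue<Long> topic = new LinkedBlockingQueue<>(receiptIds); // one message per receipt
    Set<Long> paid = ConcurrentHashMap.newKeySet();
    ExecutorService listeners = Executors.newFixedThreadPool(concurrency);
    for (int i = 0; i < concurrency; i++) {
      listeners.submit(() -> {
        Long receiptId;
        // Each worker consumes messages until the queue is empty.
        while ((receiptId = topic.poll()) != null) {
          paid.add(receiptId); // placeholder for the long-running payment
        }
      });
    }
    listeners.shutdown();
    try {
      if (!listeners.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Payments did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for payments", e);
    }
    return paid;
  }
}
```

In the Kafka-based variant, the number of workers corresponds to the listener concurrency configured via messaging.kafka.listener.container.concurrency.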