Skip to content

Commit

Permalink
kafka queue factories refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Dec 30, 2024
1 parent d087bd2 commit 693db65
Show file tree
Hide file tree
Showing 19 changed files with 57 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,63 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.mqtt.broker.queue.TbQueueAdmin;
import org.thingsboard.mqtt.broker.queue.constants.QueueConstants;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.stats.ConsumerStatsManager;
import org.thingsboard.mqtt.broker.queue.stats.ProducerStatsManager;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;

import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.CLEANUP_POLICY_PROPERTY;
import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.COMPACT_POLICY;

@Slf4j
public abstract class AbstractQueueFactory {

protected final Map<String, String> requiredConsumerProperties = Map.of("auto.offset.reset", "earliest");

@Autowired
protected TbKafkaConsumerSettings consumerSettings;
@Autowired
protected TbKafkaProducerSettings producerSettings;
@Autowired
protected TbQueueAdmin queueAdmin;
@Autowired
protected TbKafkaConsumerStatsService consumerStatsService;

@Autowired(required = false)
protected ProducerStatsManager producerStatsManager;
@Autowired(required = false)
protected ConsumerStatsManager consumerStatsManager;

@Value("${queue.kafka.kafka-prefix:}")
protected String kafkaPrefix;

protected Map<String, String> validateAndConfigurePartitionsForTopic(String topicProperties, String topicName) {
var topicConfigs = QueueUtil.getConfigs(topicProperties);
String configuredPartitions = topicConfigs.get(QueueConstants.PARTITIONS);
if (configuredPartitions != null && Integer.parseInt(configuredPartitions) != 1) {
log.warn("{} topic must have only 1 partition!", topicName);
}
topicConfigs.put(QueueConstants.PARTITIONS, "1");
return topicConfigs;
}

protected Map<String, String> validateAndConfigureCleanupPolicyForTopic(String topicProperties, String topicName) {
var topicConfigs = QueueUtil.getConfigs(topicProperties);
String configuredLogCleanupPolicy = topicConfigs.get(CLEANUP_POLICY_PROPERTY);
if (configuredLogCleanupPolicy != null && !configuredLogCleanupPolicy.equals(COMPACT_POLICY)) {
log.warn("{} clean-up policy should be compact!", topicName);
}
topicConfigs.put(CLEANUP_POLICY_PROPERTY, COMPACT_POLICY);
return topicConfigs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;


import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;


import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;


import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;


import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.thingsboard.mqtt.broker.queue.provider;


import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.constants.QueueConstants;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ApplicationPersistenceMsgKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ApplicationSharedTopicMsgKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.stats.ProducerStatsManager;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;
Expand All @@ -45,27 +39,15 @@ public class KafkaApplicationPersistenceMsgQueueFactory extends AbstractQueueFac

private final Map<String, String> requiredConsumerProperties = Map.of("auto.offset.reset", "latest");

private final TbKafkaConsumerSettings consumerSettings;
private final TbKafkaProducerSettings producerSettings;
private final ApplicationPersistenceMsgKafkaSettings applicationPersistenceMsgSettings;
private final ApplicationSharedTopicMsgKafkaSettings applicationSharedTopicMsgSettings;
private final TbKafkaConsumerStatsService consumerStatsService;

@Autowired(required = false)
private ProducerStatsManager producerStatsManager;

private Map<String, String> topicConfigs;
private Map<String, String> sharedTopicConfigs;

@PostConstruct
public void init() {
this.topicConfigs = QueueUtil.getConfigs(applicationPersistenceMsgSettings.getTopicProperties());
String configuredPartitions = topicConfigs.get(QueueConstants.PARTITIONS);
if (configuredPartitions != null && Integer.parseInt(configuredPartitions) != 1) {
log.warn("Application persistent message topic must have only 1 partition.");
}
topicConfigs.put(QueueConstants.PARTITIONS, "1");

this.topicConfigs = validateAndConfigurePartitionsForTopic(applicationPersistenceMsgSettings.getTopicProperties(), "Application persistent message");
this.sharedTopicConfigs = QueueUtil.getConfigs(applicationSharedTopicMsgSettings.getTopicProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,22 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueAdmin;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ApplicationRemovedEventKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;
import java.util.Properties;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaApplicationRemovedEventQueueFactory extends AbstractQueueFactory implements ApplicationRemovedEventQueueFactory {

private final Map<String, String> requiredConsumerProperties = Map.of("auto.offset.reset", "earliest");

private final TbKafkaConsumerSettings consumerSettings;
private final TbKafkaProducerSettings producerSettings;
private final ApplicationRemovedEventKafkaSettings kafkaSettings;
private final TbQueueAdmin queueAdmin;
private final TbKafkaConsumerStatsService consumerStatsService;

@Override
public TbQueueProducer<TbProtoQueueMsg<QueueProtos.ApplicationRemovedEventProto>> createEventProducer(String serviceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,24 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueAdmin;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ClientSessionEventKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ClientSessionEventResponseKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.stats.ConsumerStatsManager;
import org.thingsboard.mqtt.broker.queue.stats.ProducerStatsManager;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaClientSessionEventQueueFactory extends AbstractQueueFactory implements ClientSessionEventQueueFactory {

private final TbKafkaConsumerSettings consumerSettings;
private final TbKafkaProducerSettings producerSettings;
private final ClientSessionEventKafkaSettings clientSessionEventSettings;
private final ClientSessionEventResponseKafkaSettings clientSessionEventResponseSettings;
private final TbQueueAdmin queueAdmin;
private final TbKafkaConsumerStatsService consumerStatsService;

@Autowired(required = false)
private ProducerStatsManager producerStatsManager;
@Autowired(required = false)
private ConsumerStatsManager consumerStatsManager;

@Override
public TbQueueProducer<TbProtoQueueMsg<QueueProtos.ClientSessionEventProto>> createEventProducer(String serviceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,32 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueAdmin;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ClientSessionKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.stats.ConsumerStatsManager;
import org.thingsboard.mqtt.broker.queue.stats.ProducerStatsManager;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;
import java.util.Properties;

import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.CLEANUP_POLICY_PROPERTY;
import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.COMPACT_POLICY;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaClientSessionQueueFactory extends AbstractQueueFactory implements ClientSessionQueueFactory {

private final Map<String, String> requiredConsumerProperties = Map.of("auto.offset.reset", "earliest");

private final TbKafkaConsumerSettings consumerSettings;
private final TbKafkaProducerSettings producerSettings;
private final ClientSessionKafkaSettings clientSessionSettings;
private final TbQueueAdmin queueAdmin;
private final TbKafkaConsumerStatsService consumerStatsService;

@Autowired(required = false)
private ProducerStatsManager producerStatsManager;
@Autowired(required = false)
private ConsumerStatsManager consumerStatsManager;

private Map<String, String> topicConfigs;

@PostConstruct
public void init() {
this.topicConfigs = QueueUtil.getConfigs(clientSessionSettings.getTopicProperties());
String configuredLogCleanupPolicy = topicConfigs.get(CLEANUP_POLICY_PROPERTY);
if (configuredLogCleanupPolicy != null && !configuredLogCleanupPolicy.equals(COMPACT_POLICY)) {
log.warn("Client session clean-up policy should be " + COMPACT_POLICY + ".");
}
topicConfigs.put(CLEANUP_POLICY_PROPERTY, COMPACT_POLICY);
this.topicConfigs = validateAndConfigureCleanupPolicyForTopic(clientSessionSettings.getTopicProperties(), "Client sessions");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,32 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueAdmin;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.settings.ClientSubscriptionsKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.stats.ConsumerStatsManager;
import org.thingsboard.mqtt.broker.queue.stats.ProducerStatsManager;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;
import java.util.Properties;

import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.CLEANUP_POLICY_PROPERTY;
import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.COMPACT_POLICY;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaClientSubscriptionsQueueFactory extends AbstractQueueFactory implements ClientSubscriptionsQueueFactory {

private final Map<String, String> requiredConsumerProperties = Map.of("auto.offset.reset", "earliest");

private final TbKafkaConsumerSettings consumerSettings;
private final TbKafkaProducerSettings producerSettings;
private final ClientSubscriptionsKafkaSettings clientSubscriptionsSettings;
private final TbQueueAdmin queueAdmin;
private final TbKafkaConsumerStatsService consumerStatsService;

@Autowired(required = false)
private ProducerStatsManager producerStatsManager;
@Autowired(required = false)
private ConsumerStatsManager consumerStatsManager;

private Map<String, String> topicConfigs;

@PostConstruct
public void init() {
this.topicConfigs = QueueUtil.getConfigs(clientSubscriptionsSettings.getTopicProperties());
String configuredLogCleanupPolicy = topicConfigs.get(CLEANUP_POLICY_PROPERTY);
if (configuredLogCleanupPolicy != null && !configuredLogCleanupPolicy.equals(COMPACT_POLICY)) {
log.warn("Client subscriptions clean-up policy should be " + COMPACT_POLICY + ".");
}
topicConfigs.put(CLEANUP_POLICY_PROPERTY, COMPACT_POLICY);
this.topicConfigs = validateAndConfigureCleanupPolicyForTopic(clientSubscriptionsSettings.getTopicProperties(), "Client subscriptions");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,14 @@
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.queue.TbQueueAdmin;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.mqtt.broker.queue.kafka.settings.DevicePersistenceMsgKafkaSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaConsumerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.settings.TbKafkaProducerSettings;
import org.thingsboard.mqtt.broker.queue.kafka.stats.TbKafkaConsumerStatsService;
import org.thingsboard.mqtt.broker.queue.stats.ConsumerStatsManager;
import org.thingsboard.mqtt.broker.queue.stats.ProducerStatsManager;
import org.thingsboard.mqtt.broker.queue.util.QueueUtil;

import java.util.Map;
Expand All @@ -43,18 +36,6 @@
@RequiredArgsConstructor
public class KafkaDevicePersistenceMsgQueueFactory extends AbstractQueueFactory implements DevicePersistenceMsgQueueFactory {

private final Map<String, String> requiredConsumerProperties = Map.of("auto.offset.reset", "earliest");

private final TbKafkaConsumerSettings consumerSettings;
private final TbKafkaProducerSettings producerSettings;
private final TbQueueAdmin queueAdmin;
private final TbKafkaConsumerStatsService consumerStatsService;

@Autowired(required = false)
private ProducerStatsManager producerStatsManager;
@Autowired(required = false)
private ConsumerStatsManager consumerStatsManager;

private final DevicePersistenceMsgKafkaSettings devicePersistenceMsgSettings;

private Map<String, String> topicConfigs;
Expand Down
Loading

0 comments on commit 693db65

Please sign in to comment.