diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index 8b808efc2146..a889702aea29 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -53,6 +53,7 @@ import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider; import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.kerberos.SelfContainedKerberosUserService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; @@ -70,6 +71,7 @@ import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; +import static org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE; @@ -146,6 +148,14 @@ public class Kafka3ConnectionService extends AbstractControllerService implement ) .build(); + public static final PropertyDescriptor SELF_CONTAINED_KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-user-service") + .displayName("Kerberos User Service") + .description("Service supporting user authentication with Kerberos") + .identifiesControllerService(SelfContainedKerberosUserService.class) + .required(false) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("Service supporting SSL communication with Kafka brokers") @@ -220,6 +230,8 @@ public class Kafka3ConnectionService extends AbstractControllerService implement SASL_MECHANISM, SASL_USERNAME, SASL_PASSWORD, + SELF_CONTAINED_KERBEROS_USER_SERVICE, + KERBEROS_SERVICE_NAME, SSL_CONTEXT_SERVICE, TRANSACTION_ISOLATION_LEVEL, MAX_POLL_RECORDS, @@ -272,8 +284,7 @@ public KafkaConsumerService getConsumerService(final PollingContext pollingConte consumer.subscribe(topics); } - final Kafka3ConsumerService consumerService = new Kafka3ConsumerService(getLogger(), consumer, subscription, pollingContext.getMaxUncommittedTime()); - return consumerService; + return new Kafka3ConsumerService(getLogger(), consumer, subscription, pollingContext.getMaxUncommittedTime()); } private Subscription createSubscription(final PollingContext pollingContext) {