From c2dd49121c0c46f11c5fb07169261e603953755f Mon Sep 17 00:00:00 2001 From: Pete Savage Date: Wed, 15 Nov 2023 17:24:21 +0000 Subject: [PATCH 1/3] Fix MSK replicas --- controllers/cloud.redhat.com/providers/kafka/msk.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/controllers/cloud.redhat.com/providers/kafka/msk.go b/controllers/cloud.redhat.com/providers/kafka/msk.go index 1bfda00c2..91f53d8ac 100644 --- a/controllers/cloud.redhat.com/providers/kafka/msk.go +++ b/controllers/cloud.redhat.com/providers/kafka/msk.go @@ -200,13 +200,13 @@ type genericConfig map[string]string func (s mskProvider) connectConfig(config *apiextensions.JSON) error { connectConfig := genericConfig{ - "config.storage.replication.factor": "1", + "config.storage.replication.factor": "3", "config.storage.topic": fmt.Sprintf("%v-connect-cluster-configs", s.Env.Name), "connector.client.config.override.policy": "All", "group.id": "connect-cluster", - "offset.storage.replication.factor": "1", + "offset.storage.replication.factor": "3", "offset.storage.topic": fmt.Sprintf("%v-connect-cluster-offsets", s.Env.Name), - "status.storage.replication.factor": "1", + "status.storage.replication.factor": "3", "status.storage.topic": fmt.Sprintf("%v-connect-cluster-status", s.Env.Name), } From b5874a16d7d3ef38e75fad19af275d9f5c4e4416 Mon Sep 17 00:00:00 2001 From: Pete Savage Date: Wed, 15 Nov 2023 18:19:41 +0000 Subject: [PATCH 2/3] Fixed replicas --- .../v1alpha1/clowdenvironment_types.go | 3 +++ controllers/cloud.redhat.com/providers/kafka/msk.go | 12 +++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go b/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go index 1e29c520d..02a5b7e6c 100644 --- a/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go +++ b/apis/cloud.redhat.com/v1alpha1/clowdenvironment_types.go @@ -275,6 +275,9 @@ type KafkaConfig struct { // (Deprecated) (Unused) Suffix string `json:"suffix,omitempty"` + + // Sets the replica count for ephem-msk mode for kafka connect + KafkaConnectReplicaCount int `json:"kafkaConnectReplicaCount,omitempty"` } // DatabaseMode details the mode of operation of the Clowder Database Provider diff --git a/controllers/cloud.redhat.com/providers/kafka/msk.go b/controllers/cloud.redhat.com/providers/kafka/msk.go index 91f53d8ac..54edf0145 100644 --- a/controllers/cloud.redhat.com/providers/kafka/msk.go +++ b/controllers/cloud.redhat.com/providers/kafka/msk.go @@ -3,6 +3,7 @@ package kafka import ( "encoding/json" "fmt" + "strconv" "strings" crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1" @@ -199,14 +200,19 @@ type genericConfig map[string]string func (s mskProvider) connectConfig(config *apiextensions.JSON) error { + replicas := 3 + if s.Env.Spec.Providers.Kafka.KafkaConnectReplicaCount != 0 { + replicas = s.Env.Spec.Providers.Kafka.KafkaConnectReplicaCount + } + connectConfig := genericConfig{ - "config.storage.replication.factor": "3", + "config.storage.replication.factor": strconv.Itoa(replicas), "config.storage.topic": fmt.Sprintf("%v-connect-cluster-configs", s.Env.Name), "connector.client.config.override.policy": "All", "group.id": "connect-cluster", - "offset.storage.replication.factor": "3", + "offset.storage.replication.factor": strconv.Itoa(replicas), "offset.storage.topic": fmt.Sprintf("%v-connect-cluster-offsets", s.Env.Name), - "status.storage.replication.factor": "3", + "status.storage.replication.factor": strconv.Itoa(replicas), "status.storage.topic": fmt.Sprintf("%v-connect-cluster-status", s.Env.Name), } From ab0808766d85816bc1ae86c8bc10a0b9539c9ae0 Mon Sep 17 00:00:00 2001 From: Pete Savage Date: Wed, 15 Nov 2023 18:27:46 +0000 Subject: [PATCH 3/3] Fixed test --- config/crd/bases/cloud.redhat.com_clowdenvironments.yaml | 4 ++++ deploy-mutate.yml | 4 ++++ deploy.yml | 4 ++++ docs/antora/modules/ROOT/pages/api_reference.adoc | 1 + tests/kuttl/test-kafka-msk/03-pods.yaml | 1 + 5 files changed, 14 insertions(+) diff --git a/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml b/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml index eda968386..064475a38 100644 --- a/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml +++ b/config/crd/bases/cloud.redhat.com_clowdenvironments.yaml @@ -316,6 +316,10 @@ spec: enableLegacyStrimzi: description: EnableLegacyStrimzi disables TLS + user auth type: boolean + kafkaConnectReplicaCount: + description: Sets the replica count for ephem-msk mode for + kafka connect + type: integer managedPrefix: description: Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. diff --git a/deploy-mutate.yml b/deploy-mutate.yml index f756a402d..e9568320f 100644 --- a/deploy-mutate.yml +++ b/deploy-mutate.yml @@ -6524,6 +6524,10 @@ objects: enableLegacyStrimzi: description: EnableLegacyStrimzi disables TLS + user auth type: boolean + kafkaConnectReplicaCount: + description: Sets the replica count for ephem-msk mode for + kafka connect + type: integer managedPrefix: description: Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. diff --git a/deploy.yml b/deploy.yml index 997677bf4..d0565e5c5 100644 --- a/deploy.yml +++ b/deploy.yml @@ -6524,6 +6524,10 @@ objects: enableLegacyStrimzi: description: EnableLegacyStrimzi disables TLS + user auth type: boolean + kafkaConnectReplicaCount: + description: Sets the replica count for ephem-msk mode for + kafka connect + type: integer managedPrefix: description: Managed topic prefix for the managed cluster. Only used in (*_managed_*) mode. diff --git a/docs/antora/modules/ROOT/pages/api_reference.adoc b/docs/antora/modules/ROOT/pages/api_reference.adoc index 2e362b546..bfe7aa544 100644 --- a/docs/antora/modules/ROOT/pages/api_reference.adoc +++ b/docs/antora/modules/ROOT/pages/api_reference.adoc @@ -1009,6 +1009,7 @@ KafkaConfig configures the Clowder provider controlling the creation of Kafka in | *`connectNamespace`* __string__ | (Deprecated) The namespace that the Kafka Connect cluster is expected to reside in. This is only used in (*_app-interface_*) and (*_operator_*) modes. | *`connectClusterName`* __string__ | (Deprecated) Defines the kafka connect cluster name that is used in this environment. | *`suffix`* __string__ | (Deprecated) (Unused) +| *`kafkaConnectReplicaCount`* __integer__ | Sets the replica count for ephem-msk mode for kafka connect |=== diff --git a/tests/kuttl/test-kafka-msk/03-pods.yaml b/tests/kuttl/test-kafka-msk/03-pods.yaml index c493fd061..3a857de3c 100644 --- a/tests/kuttl/test-kafka-msk/03-pods.yaml +++ b/tests/kuttl/test-kafka-msk/03-pods.yaml @@ -20,6 +20,7 @@ spec: namespace: test-kafka-msk-sec-source clusterAnnotation: test-kafka-msk topicNamespace: test-kafka-msk-sec-source + kafkaConnectReplicaCount: 1 db: mode: none logging: