Welcome to the kafka_connect Puppet module!
- Description
- Setup - The basics of getting started with kafka_connect
- Usage - Configuration options and additional functionality
- Reference - An under-the-hood peek at what the module is doing and how
- Limitations - OS compatibility, etc.
- Development - Guide for contributing to the module
Manages the setup of Kafka Connect.
Supports installation from either the Confluent package or the Apache .tgz archive.
Includes a Type, Provider, and helper class for management of individual KC connectors.
- Manages the KC installation, configuration, and system service.
- Manages the individual state of running connectors, plus their config and secret files.
For a basic Kafka Connect system setup with the default settings, declare the kafka_connect class:
class { 'kafka_connect': }
See the manifest documentation for various examples.
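Since the empty declaration above relies entirely on defaults, the class can equivalently be pulled in with include:

include kafka_connect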
For a typical distributed mode deployment, most of the default settings should be fine. However, a normal setup will involve connecting to a cluster of Kafka brokers, and the replication factor values for the internal storage topics should be increased accordingly. Here is a real-world example that also pins the package version and includes the Confluent JDBC plugin:
class { 'kafka_connect':
config_storage_replication_factor => 3,
offset_storage_replication_factor => 3,
status_storage_replication_factor => 3,
bootstrap_servers => [
"kafka-01.${facts['networking']['domain']}:9092",
"kafka-02.${facts['networking']['domain']}:9092",
"kafka-03.${facts['networking']['domain']}:9092",
"kafka-04.${facts['networking']['domain']}:9092",
"kafka-05.${facts['networking']['domain']}:9092"
],
confluent_hub_plugins => [ 'confluentinc/kafka-connect-jdbc:10.7.4' ],
package_ensure => '7.5.2-1',
repo_version => '7.5',
}
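If configuration is driven from hiera instead, the same example could be expressed through automatic parameter lookup along these lines (a sketch; the broker hostnames are placeholders):

kafka_connect::config_storage_replication_factor: 3
kafka_connect::offset_storage_replication_factor: 3
kafka_connect::status_storage_replication_factor: 3
kafka_connect::bootstrap_servers:
  - 'kafka-01.example.com:9092'
  - 'kafka-02.example.com:9092'
  - 'kafka-03.example.com:9092'
kafka_connect::confluent_hub_plugins:
  - 'confluentinc/kafka-connect-jdbc:10.7.4'
kafka_connect::package_ensure: '7.5.2-1'
kafka_connect::repo_version: '7.5'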
The helper class is designed to work with connector data defined in hiera. The main class still needs to be included or declared; if only the connector management functionality is desired, there's a flag to exclude the standard setup:
class { 'kafka_connect':
manage_connectors_only => true,
}
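The same flag can also be set in hiera data:

kafka_connect::manage_connectors_only: true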
The following sections provide examples of specific functionality through hiera data.
The connector config data should be added to hiera with the following layout:
kafka_connect::connectors:
my-connector.json:
name: 'my-kc-connector'
config:
my.config.key: "value"
my.other.config: "other_value"
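For reference, this maps onto the standard Connect REST API connector format; the rendered connector config file should look roughly like the following (a sketch, with exact rendering handled by the module):

{
  "name": "my-kc-connector",
  "config": {
    "my.config.key": "value",
    "my.other.config": "other_value"
  }
}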
To update a connector, simply make changes to the connector config hash as needed:
kafka_connect::connectors:
my-connector.json:
name: 'my-kc-connector'
config:
my.config.key: "new_value"
my.other.config: "other_new_value"
Deleting connectors is controlled by the enable_delete parameter, which defaults to false and must first be overridden to support this. Then use the optional ensure key in the connector data hash and set it to 'absent':
kafka_connect::enable_delete: true
kafka_connect::connectors:
my-connector.json:
name: 'my-jdbc-connector'
ensure: 'absent'
NOTE: be sure to also remove the connector from the secrets connectors array, if present.
The provider supports ensuring the connector state is either running (the default) or paused. Similar to removing, use the ensure key in the connector data hash and set it to 'paused':
kafka_connect::connectors:
my-connector.json:
name: 'my-jdbc-connector'
ensure: 'paused'
config:
my.config.key: "value"
my.other.config: "other_value"
Remove the ensure line or set it to 'running' to unpause/resume.
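For example, to explicitly resume the same connector:

kafka_connect::connectors:
  my-connector.json:
    name: 'my-jdbc-connector'
    ensure: 'running'
    config:
      my.config.key: "value"
      my.other.config: "other_value"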
Support for Externalized Secrets is provided through kafka_connect::secrets. This enables things like database passwords to be kept separate from the normal config and only loaded into memory when the connector starts.
The following is a basic DB connection example defined in YAML:
kafka_connect::connectors:
my-connector.json:
name: 'my-jdbc-connector'
config:
connection.url: "jdbc:postgresql://some-host.example.com:5432/db"
connection.user: "my-user"
connection.password: "${file:/etc/kafka-connect/my-jdbc-secret-file.properties:jdbc-sink-connection-password}"
The password is then added, preferably via EYAML, using the same file and variable names referenced in the config:
kafka_connect::secrets:
my-jdbc-secret-file.properties:
connectors:
- 'my-jdbc-connector'
key: 'jdbc-sink-connection-password'
value: 'ENC[PKCS7,encrypted-passwd-value]'
To add multiple secrets to a single file, use the kv_data hash. Continuing with the example above, to instead have individual secret vars for each of the connection configs:
kafka_connect::secrets:
my-jdbc-secret-file.properties:
connectors:
- 'my-jdbc-connector'
kv_data:
jdbc-sink-connection-url: 'ENC[PKCS7,encrypted-url-value]'
jdbc-sink-connection-user: 'ENC[PKCS7,encrypted-user-value]'
jdbc-sink-connection-password: 'ENC[PKCS7,encrypted-passwd-value]'
The connectors array should contain a list of all connector names that reference the secret file in their config. This allows for an automatic update/refresh (via a REST API restart POST) if a secret value is changed.
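For reference, the refresh the provider performs corresponds to a restart call against the Connect REST API, along these lines (illustrative only; the host and port here are assumptions):

# Restart the connector so it re-reads the secret file
curl -X POST http://localhost:8083/connectors/my-jdbc-connector/restart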
To later remove unused files, use the optional ensure hash key and set it to 'absent':
kafka_connect::secrets:
my-old-jdbc-secret-file.properties:
ensure: 'absent'
In release v2.0.0 the type and provider were renamed from manage_connector to kc_connector. Usage and functionality remain the same.
Ensure a connector exists and the running config matches the file config:
kc_connector { 'some-kc-connector-name':
ensure => 'present',
config_file => '/etc/kafka-connect/some-kc-connector.properties.json',
port => 8084,
}
To pause:
kc_connector { 'some-kc-connector-name':
connector_state_ensure => 'PAUSED',
}
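To resume, the state can presumably be set back the same way (a sketch, assuming 'RUNNING' is accepted as the counterpart to 'PAUSED'):

kc_connector { 'some-kc-connector-name':
  connector_state_ensure => 'RUNNING',
}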
To remove:
kc_connector { 'some-kc-connector-name':
ensure => 'absent',
enable_delete => true,
}
Command to remove through the Puppet RAL:
$ puppet resource kc_connector some-kc-connector-name ensure=absent enable_delete=true
Tested with Confluent Platform 7.x and Apache Kafka 3.8.0 on the Operating Systems noted in metadata.json.
In certain situations, for example when a connector is set to absent while the enable_delete parameter is false (the default), Puppet will report a system state change when actually none has occurred (i.e., it lies). Warnings are output along with the change notices in these scenarios.
The project is hosted on GitHub. Issue reports and pull requests are welcome.