Upgrade to Kafka 3.8
philipp94831 committed Jan 14, 2025
1 parent 829aa13 commit d0c5285
Showing 9 changed files with 175 additions and 226 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/build-and-publish.yaml
@@ -8,7 +8,7 @@ on:
jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.40.6
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.46.3
with:
java-version: 17
secrets:
@@ -19,5 +19,4 @@ jobs:
signing-password: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
ossrh-username: ${{ secrets.SONATYPE_OSSRH_USERNAME }}
ossrh-password: ${{ secrets.SONATYPE_OSSRH_PASSWORD }}
github-username: ${{ secrets.GH_USERNAME }}
github-token: ${{ secrets.GH_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -16,7 +16,7 @@ on:
jobs:
java-gradle-release:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.40.6
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.46.3
with:
java-version: 17
release-type: "${{ inputs.release-type }}"
25 changes: 11 additions & 14 deletions brute-force-connect/build.gradle.kts
@@ -1,10 +1,5 @@
description = "Kafka Connect Converter that deserializes messages of an unknown serialization format"

repositories {
// required for kafka-streams-json-schema-serde dependency
maven(url = "https://jitpack.io")
}

dependencies {
api(project(":brute-force-core"))

@@ -20,22 +15,24 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-connect-protobuf-converter", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-connect-json-schema-converter", version = confluentVersion)

testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-json-schema-serde", version = confluentVersion)

val testContainersVersion: String by project
testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion)
val fluentKafkaVersion = "2.11.3"
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)

testImplementation(group = "com.bakdata.kafka", name = "large-message-serde", version = largeMessageVersion)
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.5.0") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion)
testImplementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion)
testImplementation(
group = "org.apache.kafka",
name = "connect-runtime",
version = kafkaVersion,
classifier = "test"
)
testImplementation(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion, classifier = "test")
testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion)
testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion, classifier = "test")
}
brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterIntegrationTest.java
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,36 +25,35 @@
package com.bakdata.kafka;

import static com.bakdata.kafka.BruteForceConverterTest.newGenericRecord;
import static net.mguenther.kafka.junit.Wait.delay;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.mguenther.kafka.junit.EmbeddedConnectConfig;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.SendKeyValues;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.file.FileStreamSinkConnector;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
@@ -74,6 +73,12 @@ class BruteForceConverterIntegrationTest {
@Container
private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
.withServices(Service.S3);
private static final String SCHEMA_REGISTRY_URL = "mock://";
private static final String BUCKET_NAME = "testbucket";
private static final String TOPIC = "input";
private EmbeddedConnectCluster kafkaCluster;
@TempDir
private File outputDir;
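
The mock:// pseudo-URL replaces the removed SchemaRegistryMockExtension: Confluent serializers and converters handed a schema.registry.url starting with mock:// back themselves with an in-process MockSchemaRegistryClient instead of an HTTP client, so no registry process or mock server is needed. A minimal sketch of that behavior, reusing names from this test:

    // Sketch: a Confluent serializer configured with a mock:// URL talks to an
    // in-memory MockSchemaRegistryClient (shared per scope name after "mock://").
    final Map<String, Object> props = Map.of(
            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    try (final GenericAvroSerializer serializer = new GenericAvroSerializer()) {
        serializer.configure(props, false); // false = configure as value serializer
        final byte[] avroBytes = serializer.serialize(TOPIC, newGenericRecord());
    }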

static S3Client getS3Client() {
return S3Client.builder()
@@ -96,118 +101,118 @@ private static AwsBasicCredentials getCredentials() {
private static URI getEndpointOverride() {
return LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3);
}
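
The collapsed hunk above hides most of the S3 helper bodies. Purely as an illustration (not the hidden code), an AWS SDK v2 client pointed at the LocalStack container is typically assembled from exactly these helpers:

    // Illustrative sketch only -- the real method body is collapsed in this diff.
    static S3Client localStackS3Client() {
        return S3Client.builder()
                .endpointOverride(getEndpointOverride())
                .credentialsProvider(StaticCredentialsProvider.create(getCredentials()))
                .region(getRegion())
                .build();
    }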
private static final String BUCKET_NAME = "testbucket";
private static final String TOPIC = "input";
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension();
private EmbeddedKafkaCluster kafkaCluster;
private Path outputFile;

private static Properties createS3BackedProperties() {
final Properties properties = new Properties();
private static Map<String, String> createS3BackedProperties() {
final Map<String, String> properties = new HashMap<>();
final AwsBasicCredentials credentials = getCredentials();
properties.setProperty(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG,
properties.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG,
getEndpointOverride().toString());
properties.setProperty(AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id());
properties.setProperty(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId());
properties.setProperty(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey());
properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
properties.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id());
properties.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId());
properties.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey());
properties.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
return properties;
}
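
These properties point large-message-serde at the LocalStack S3 endpoint: values larger than the configured maximum byte size are uploaded under BASE_PATH_CONFIG and the Kafka record carries only a small reference. A standalone sketch, assuming LargeMessageSerializer follows the standard Kafka Serializer contract (as its use further down suggests):

    // Sketch: max byte size 0 forces every value to S3, mirroring the "toS3"
    // case of the test below; Integer.MAX_VALUE would keep all values inline.
    final Map<String, Object> props = new HashMap<>(createS3BackedProperties());
    props.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
    props.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0);
    try (final Serializer<String> serializer = new LargeMessageSerializer<>()) {
        serializer.configure(props, false);
        final byte[] pointer = serializer.serialize(TOPIC, "toS3"); // payload lands in S3
    }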

private static String withValuePrefix(final Object config) {
return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + config;
}
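
withValuePrefix relies on Kafka Connect's converter config pass-through: every connector property prefixed with value.converter. has the prefix stripped and is handed to the value converter's configure method. For example:

    // ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG is the literal "value.converter",
    // so this builds "value.converter.schema.registry.url"; Connect strips the
    // prefix again before passing the setting to BruteForceConverter.configure(...).
    final String key = withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);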

@BeforeEach
void setUp() throws IOException {
this.outputFile = Files.createTempFile("test", "temp");
getS3Client().createBucket(CreateBucketRequest.builder()
.bucket(BUCKET_NAME)
.build());
this.kafkaCluster = this.createCluster();
this.kafkaCluster.start();
}

@AfterEach
void tearDown() throws IOException {
this.kafkaCluster.stop();
Files.deleteIfExists(this.outputFile);
}

@Test
void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
this.kafkaCluster
.send(SendKeyValues.to(TOPIC, Collections.singletonList(new KeyValue<>("key", "toS3")))
.withAll(this.createBackedStringProducerProperties(true)).build());

this.kafkaCluster.send(SendKeyValues
.to(TOPIC, Collections.singletonList(new KeyValue<>("key", "local")))
.withAll(this.createBackedStringProducerProperties(false)).build());

this.kafkaCluster.send(SendKeyValues
.to(TOPIC, Collections.singletonList(new KeyValue<>("key", "regular")))
.withAll(this.createStringProducerProperties()).build());

this.kafkaCluster.send(SendKeyValues
.to(TOPIC, Collections.singletonList(new KeyValue<>("key", newGenericRecord())))
.withAll(this.createAvroProducerProperties()).build());

// makes sure that all records are processed
delay(2, TimeUnit.SECONDS);
final List<String> output = Files.readAllLines(this.outputFile);
assertThat(output).containsExactly("toS3", "local", "regular", "Struct{id=foo}");
}

private EmbeddedKafkaCluster createCluster() {
return EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig.newClusterConfig()
.configure(EmbeddedConnectConfig.kafkaConnect()
.deployConnector(this.config())
.build())
.build());
}

private Properties config() {
final Properties properties = new Properties();
properties.setProperty(ConnectorConfig.NAME_CONFIG, "test");
properties.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
properties.setProperty(SinkConnector.TOPICS_CONFIG, TOPIC);
properties.setProperty(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
properties.setProperty(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.setProperty(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
properties.setProperty(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
this.schemaRegistry.getUrl());
private static Map<String, String> config(final File file) {
final Map<String, String> properties = new HashMap<>();
properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
properties.put(FileStreamSinkConnector.FILE_CONFIG, file.getAbsolutePath());
properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
properties.put(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
SCHEMA_REGISTRY_URL);
createS3BackedProperties().forEach((key, value) -> properties.put(withValuePrefix(key), value));
return properties;
}

private Properties createBackedStringProducerProperties(final boolean shouldBack) {
final Properties properties = this.createBaseProducerProperties();
private static Map<String, Object> createBackedStringProducerProperties(final boolean shouldBack) {
final Map<String, Object> properties = createBaseProducerProperties();
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, shouldBack ? 0 : Integer.MAX_VALUE);
properties.putAll(createS3BackedProperties());
return properties;
}

private Properties createBaseProducerProperties() {
final Properties properties = new Properties();
private static Map<String, Object> createBaseProducerProperties() {
final Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
return properties;
}

private Properties createStringProducerProperties() {
final Properties properties = this.createBaseProducerProperties();
private static Map<String, Object> createStringProducerProperties() {
final Map<String, Object> properties = createBaseProducerProperties();
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return properties;
}

private Properties createAvroProducerProperties() {
final Properties properties = this.createBaseProducerProperties();
private static Map<String, Object> createAvroProducerProperties() {
final Map<String, Object> properties = createBaseProducerProperties();
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
properties.setProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
return properties;
}

@BeforeEach
void setUp() {
getS3Client().createBucket(CreateBucketRequest.builder()
.bucket(BUCKET_NAME)
.build());
this.kafkaCluster = new EmbeddedConnectCluster.Builder()
.name("test-cluster")
.workerProps(new HashMap<>(Map.of( // map needs to be mutable
// FIXME make compatible with service discovery
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, HYBRID_WARN.toString()
)))
.build();
this.kafkaCluster.start();
}

@AfterEach
void tearDown() {
this.kafkaCluster.stop();
}

@Test
void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
this.kafkaCluster.kafka().createTopic(TOPIC);
final File file = new File(this.outputDir, "out");
this.kafkaCluster.configureConnector("test", config(file));

try (final Producer<String, String> producer = this.createProducer(
createBackedStringProducerProperties(true))) {
producer.send(new ProducerRecord<>(TOPIC, "key", "toS3"));
}

try (final Producer<String, String> producer = this.createProducer(
createBackedStringProducerProperties(false))) {
producer.send(new ProducerRecord<>(TOPIC, "key", "local"));
}

try (final Producer<String, String> producer = this.createProducer(createStringProducerProperties())) {
producer.send(new ProducerRecord<>(TOPIC, "key", "regular"));
}

try (final Producer<String, GenericRecord> producer = this.createProducer(createAvroProducerProperties())) {
producer.send(new ProducerRecord<>(TOPIC, "key", newGenericRecord()));
}

// makes sure that all records are processed
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
final List<String> output = Files.readAllLines(file.toPath());
assertThat(output).containsExactly("toS3", "local", "regular", "Struct{id=foo}");
}
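
Why the Avro record is asserted as "Struct{id=foo}": the Avro converter turns the record into a Connect Struct, and the file sink writes each value's toString() as one line of the output file. A small illustration of that toString format:

    // A Connect Struct with a single string field prints as "Struct{id=foo}".
    final Schema schema = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA).build();
    final Struct struct = new Struct(schema).put("id", "foo");
    System.out.println(struct); // -> Struct{id=foo}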

@SuppressWarnings("unchecked") // Producer always uses byte[] although serializer is customizable
private <K, V> Producer<K, V> createProducer(final Map<String, Object> properties) {
return (Producer<K, V>) this.kafkaCluster.kafka()
.createProducer(properties);
}
}
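
Taken together, the test feeds the topic four differently serialized values (S3-backed string, inline large-message string, plain string, Avro record) and expects the converter to decode all of them without being told the format. As a schematic only (not BruteForceConverter's actual implementation), the brute-force idea amounts to trying candidate converters in order until one succeeds:

    // Schematic of the brute-force approach -- not the library's real code.
    SchemaAndValue convert(final String topic, final byte[] value, final List<Converter> candidates) {
        for (final Converter converter : candidates) {
            try {
                return converter.toConnectData(topic, value);
            } catch (final RuntimeException e) {
                // this format did not match; try the next converter
            }
        }
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value); // fall back to raw bytes
    }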
