From d0c5285661ebcff5c9b975926b5d5b1f09d81df4 Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Tue, 14 Jan 2025 08:02:03 +0100
Subject: [PATCH] Upgrade to Kafka 3.8

---
 .github/workflows/build-and-publish.yaml      |   3 +-
 .github/workflows/release.yaml                |   2 +-
 brute-force-connect/build.gradle.kts          |  25 +--
 .../BruteForceConverterIntegrationTest.java   | 191 +++++++++---------
 .../kafka/BruteForceConverterTest.java        |  27 +--
 brute-force-serde/build.gradle.kts            |  28 +--
 .../kafka/BruteForceDeserializerTest.java     |  94 +++++----
 build.gradle.kts                              |  17 +-
 gradle.properties                             |  14 +-
 9 files changed, 175 insertions(+), 226 deletions(-)

diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml
index ec1d95d..04e4535 100644
--- a/.github/workflows/build-and-publish.yaml
+++ b/.github/workflows/build-and-publish.yaml
@@ -8,7 +8,7 @@ on:
 jobs:
   build-and-publish:
     name: Java Gradle
-    uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.40.6
+    uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.46.3
     with:
       java-version: 17
     secrets:
@@ -19,5 +19,4 @@ jobs:
       signing-password: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
       ossrh-username: ${{ secrets.SONATYPE_OSSRH_USERNAME }}
       ossrh-password: ${{ secrets.SONATYPE_OSSRH_PASSWORD }}
-      github-username: ${{ secrets.GH_USERNAME }}
       github-token: ${{ secrets.GH_TOKEN }}
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 6a30c40..ae9bee5 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -16,7 +16,7 @@ on:
 jobs:
   java-gradle-release:
     name: Java Gradle
-    uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.40.6
+    uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.46.3
     with:
       java-version: 17
       release-type: "${{ inputs.release-type }}"
diff --git a/brute-force-connect/build.gradle.kts b/brute-force-connect/build.gradle.kts
index a82a0b2..b31f7fc 100644
--- a/brute-force-connect/build.gradle.kts
+++ b/brute-force-connect/build.gradle.kts
@@ -1,10 +1,5 @@
 description = "Kafka Connect Converter that deserializes messages of an unknown serialization format"
 
-repositories {
-    // required for kafka-streams-json-schema-serde dependency
-    maven(url = "https://jitpack.io")
-}
-
 dependencies {
     api(project(":brute-force-core"))
 
@@ -20,22 +15,24 @@
     testImplementation(group = "io.confluent", name = "kafka-connect-protobuf-converter", version = confluentVersion)
     testImplementation(group = "io.confluent", name = "kafka-connect-json-schema-converter", version = confluentVersion)
+    testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
     testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
     testImplementation(group = "io.confluent", name = "kafka-streams-json-schema-serde", version = confluentVersion)
 
     val testContainersVersion: String by project
     testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
     testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion)
-    val fluentKafkaVersion = "2.11.3"
-    testImplementation(
-        group = "com.bakdata.fluent-kafka-streams-tests",
-        name = "schema-registry-mock-junit5",
-        version = fluentKafkaVersion
-    )
     testImplementation(group = "com.bakdata.kafka", name = "large-message-serde", version = largeMessageVersion)
"kafka-junit", version = "3.5.0") { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - } testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion) + testImplementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion) + testImplementation( + group = "org.apache.kafka", + name = "connect-runtime", + version = kafkaVersion, + classifier = "test" + ) + testImplementation(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion, classifier = "test") + testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion) + testImplementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion, classifier = "test") } diff --git a/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterIntegrationTest.java b/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterIntegrationTest.java index b874cc2..0f7983f 100644 --- a/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterIntegrationTest.java +++ b/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterIntegrationTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,36 +25,35 @@ package com.bakdata.kafka; import static com.bakdata.kafka.BruteForceConverterTest.newGenericRecord; -import static net.mguenther.kafka.junit.Wait.delay; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer; +import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.Properties; +import java.util.Map; import java.util.concurrent.TimeUnit; -import net.mguenther.kafka.junit.EmbeddedConnectConfig; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.SendKeyValues; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.connect.file.FileStreamSinkConnector; import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import org.testcontainers.containers.localstack.LocalStackContainer; import 
 import org.testcontainers.containers.localstack.LocalStackContainer.Service;
 import org.testcontainers.junit.jupiter.Container;
@@ -74,6 +73,12 @@ class BruteForceConverterIntegrationTest {
     @Container
     private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
             .withServices(Service.S3);
+    private static final String SCHEMA_REGISTRY_URL = "mock://";
+    private static final String BUCKET_NAME = "testbucket";
+    private static final String TOPIC = "input";
+    private EmbeddedConnectCluster kafkaCluster;
+    @TempDir
+    private File outputDir;
 
     static S3Client getS3Client() {
         return S3Client.builder()
@@ -96,22 +101,16 @@ private static AwsBasicCredentials getCredentials() {
     private static URI getEndpointOverride() {
         return LOCAL_STACK_CONTAINER.getEndpointOverride(Service.S3);
     }
-    private static final String BUCKET_NAME = "testbucket";
-    private static final String TOPIC = "input";
-    @RegisterExtension
-    final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension();
-    private EmbeddedKafkaCluster kafkaCluster;
-    private Path outputFile;
 
-    private static Properties createS3BackedProperties() {
-        final Properties properties = new Properties();
+    private static Map<String, String> createS3BackedProperties() {
+        final Map<String, String> properties = new HashMap<>();
         final AwsBasicCredentials credentials = getCredentials();
-        properties.setProperty(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG,
+        properties.put(AbstractLargeMessageConfig.S3_ENDPOINT_CONFIG,
                 getEndpointOverride().toString());
-        properties.setProperty(AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id());
-        properties.setProperty(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId());
-        properties.setProperty(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey());
-        properties.setProperty(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
+        properties.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, getRegion().id());
+        properties.put(AbstractLargeMessageConfig.S3_ACCESS_KEY_CONFIG, credentials.accessKeyId());
+        properties.put(AbstractLargeMessageConfig.S3_SECRET_KEY_CONFIG, credentials.secretAccessKey());
+        properties.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, String.format("s3://%s/", BUCKET_NAME));
         return properties;
     }
 
@@ -119,70 +118,21 @@ private static String withValuePrefix(final Object config) {
         return ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + config;
     }
 
-    @BeforeEach
-    void setUp() throws IOException {
-        this.outputFile = Files.createTempFile("test", "temp");
-        getS3Client().createBucket(CreateBucketRequest.builder()
-                .bucket(BUCKET_NAME)
-                .build());
-        this.kafkaCluster = this.createCluster();
-        this.kafkaCluster.start();
-    }
-
-    @AfterEach
-    void tearDown() throws IOException {
-        this.kafkaCluster.stop();
-        Files.deleteIfExists(this.outputFile);
-    }
-
-    @Test
-    void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
-        this.kafkaCluster
-                .send(SendKeyValues.to(TOPIC, Collections.singletonList(new KeyValue<>("key", "toS3")))
-                        .withAll(this.createBackedStringProducerProperties(true)).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>("key", "local")))
-                .withAll(this.createBackedStringProducerProperties(false)).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>("key", "regular")))
-                .withAll(this.createStringProducerProperties()).build());
-
-        this.kafkaCluster.send(SendKeyValues
-                .to(TOPIC, Collections.singletonList(new KeyValue<>("key", newGenericRecord())))
-                .withAll(this.createAvroProducerProperties()).build());
-
-        // makes sure that all records are processed
-        delay(2, TimeUnit.SECONDS);
-        final List<String> output = Files.readAllLines(this.outputFile);
-        assertThat(output).containsExactly("toS3", "local", "regular", "Struct{id=foo}");
-    }
-
-    private EmbeddedKafkaCluster createCluster() {
-        return EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig.newClusterConfig()
-                .configure(EmbeddedConnectConfig.kafkaConnect()
-                        .deployConnector(this.config())
-                        .build())
-                .build());
-    }
-
-    private Properties config() {
-        final Properties properties = new Properties();
-        properties.setProperty(ConnectorConfig.NAME_CONFIG, "test");
-        properties.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink");
-        properties.setProperty(SinkConnector.TOPICS_CONFIG, TOPIC);
-        properties.setProperty(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString());
-        properties.setProperty(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
-        properties.setProperty(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
-        properties.setProperty(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
-                this.schemaRegistry.getUrl());
+    private static Map<String, String> config(final File file) {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
+        properties.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+        properties.put(FileStreamSinkConnector.FILE_CONFIG, file.getAbsolutePath());
+        properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, BruteForceConverter.class.getName());
+        properties.put(withValuePrefix(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG),
+                SCHEMA_REGISTRY_URL);
         createS3BackedProperties().forEach((key, value) -> properties.put(withValuePrefix(key), value));
         return properties;
     }
 
-    private Properties createBackedStringProducerProperties(final boolean shouldBack) {
-        final Properties properties = this.createBaseProducerProperties();
+    private static Map<String, Object> createBackedStringProducerProperties(final boolean shouldBack) {
+        final Map<String, Object> properties = createBaseProducerProperties();
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LargeMessageSerializer.class);
         properties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
         properties.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, shouldBack ? 0 : Integer.MAX_VALUE);
@@ -190,24 +140,79 @@ private Properties createBackedStringProducerProperties(final boolean shouldBack
         return properties;
     }
 
-    private Properties createBaseProducerProperties() {
-        final Properties properties = new Properties();
+    private static Map<String, Object> createBaseProducerProperties() {
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList());
         return properties;
     }
 
-    private Properties createStringProducerProperties() {
-        final Properties properties = this.createBaseProducerProperties();
+    private static Map<String, Object> createStringProducerProperties() {
+        final Map<String, Object> properties = createBaseProducerProperties();
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return properties;
     }
 
-    private Properties createAvroProducerProperties() {
-        final Properties properties = this.createBaseProducerProperties();
+    private static Map<String, Object> createAvroProducerProperties() {
+        final Map<String, Object> properties = createBaseProducerProperties();
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
-        properties.setProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
+        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
         return properties;
     }
+
+    @BeforeEach
+    void setUp() {
+        getS3Client().createBucket(CreateBucketRequest.builder()
+                .bucket(BUCKET_NAME)
+                .build());
+        this.kafkaCluster = new EmbeddedConnectCluster.Builder()
+                .name("test-cluster")
+                .workerProps(new HashMap<>(Map.of( // map needs to be mutable
+                        // FIXME make compatible with service discovery
+                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, HYBRID_WARN.toString()
+                )))
+                .build();
+        this.kafkaCluster.start();
+    }
+
+    @AfterEach
+    void tearDown() {
+        this.kafkaCluster.stop();
+    }
+
+    @Test
+    void shouldProcessRecordsCorrectly() throws InterruptedException, IOException {
+        this.kafkaCluster.kafka().createTopic(TOPIC);
+        final File file = new File(this.outputDir, "out");
+        this.kafkaCluster.configureConnector("test", config(file));
+
+        try (final Producer<String, String> producer = this.createProducer(
+                createBackedStringProducerProperties(true))) {
+            producer.send(new ProducerRecord<>(TOPIC, "key", "toS3"));
+        }
+
+        try (final Producer<String, String> producer = this.createProducer(
+                createBackedStringProducerProperties(false))) {
+            producer.send(new ProducerRecord<>(TOPIC, "key", "local"));
+        }
+
+        try (final Producer<String, String> producer = this.createProducer(createStringProducerProperties())) {
+            producer.send(new ProducerRecord<>(TOPIC, "key", "regular"));
+        }
+
+        try (final Producer<String, GenericRecord> producer = this.createProducer(createAvroProducerProperties())) {
+            producer.send(new ProducerRecord<>(TOPIC, "key", newGenericRecord()));
+        }
+
+        // makes sure that all records are processed
+        Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+        final List<String> output = Files.readAllLines(file.toPath());
+        assertThat(output).containsExactly("toS3", "local", "regular", "Struct{id=foo}");
+    }
+
+    @SuppressWarnings("unchecked") // Producer always uses byte[] although serializer is customizable
+    private <K, V> Producer<K, V> createProducer(final Map<String, Object> properties) {
+        return (Producer<K, V>) this.kafkaCluster.kafka()
+                .createProducer(properties);
+    }
 }
diff --git a/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterTest.java b/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterTest.java
index 0215dce..305fdd6 100644
--- a/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterTest.java
+++ b/brute-force-connect/src/test/java/com/bakdata/kafka/BruteForceConverterTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -30,16 +30,12 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
-import com.bakdata.schemaregistrymock.SchemaRegistryMock;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.DynamicMessage;
 import io.confluent.connect.avro.AvroConverter;
 import io.confluent.connect.json.JsonSchemaConverter;
 import io.confluent.connect.protobuf.ProtobufConverter;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
-import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.dynamic.DynamicSchema;
 import io.confluent.kafka.schemaregistry.protobuf.dynamic.MessageDefinition;
 import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
@@ -73,8 +69,6 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.StringConverter;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -98,11 +92,6 @@ class BruteForceConverterTest {
     @Container
     private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
             .withServices(Service.S3);
-    private final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock(List.of(
-            new AvroSchemaProvider(),
-            new JsonSchemaProvider(),
-            new ProtobufSchemaProvider()
-    ));
 
     static S3Client getS3Client() {
         return S3Client.builder()
@@ -161,7 +150,7 @@ private static DynamicMessage generateDynamicMessage() throws DescriptorValidati
         final DynamicSchema dynamicSchema = DynamicSchema.newBuilder()
                 .setName("file")
                 .addMessageDefinition(MessageDefinition.newBuilder("Test")
-                        .addField(null, "string", "testId", 1, null, null)
+                        .addField(null, "string", "testId", 1)
                         .build())
                 .build();
         final Descriptor test = dynamicSchema.getMessageDescriptor("Test");
@@ -218,16 +207,6 @@ private static <T> Stream<Arguments> generateSerializers(final Serde<T> baseSerd
                 .map(Arguments::of);
     }
 
-    @BeforeEach
-    void setUp() {
-        this.schemaRegistry.start();
-    }
-
-    @AfterEach
-    void tearDown() {
-        this.schemaRegistry.stop();
-    }
-
     @Test
     void shouldIgnoreNoMatch() {
         final byte[] value = {1, 0};
@@ -399,7 +378,7 @@ private <T> void testConversion(final SerializerFactory<T> factory, final Serial
                 .bucket(bucket)
                 .build());
         final Map<String, Object> config = new HashMap<>(originals);
-        config.put(SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistry.getUrl());
+        config.put(SCHEMA_REGISTRY_URL_CONFIG, "mock://");
bucket + "/"); config.putAll(getS3EndpointConfig()); diff --git a/brute-force-serde/build.gradle.kts b/brute-force-serde/build.gradle.kts index 1976cbf..ef52fc1 100644 --- a/brute-force-serde/build.gradle.kts +++ b/brute-force-serde/build.gradle.kts @@ -1,16 +1,8 @@ -import com.google.protobuf.gradle.protobuf -import com.google.protobuf.gradle.protoc - description = "Kafka SerDe that deserializes messages of an unknown serialization format" plugins { id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1" - id("com.google.protobuf") version "0.8.18" -} - -repositories { - // required for kafka-streams-json-schema-serde dependency - maven(url = "https://jitpack.io") + id("com.google.protobuf") version "0.9.4" } dependencies { @@ -29,30 +21,18 @@ dependencies { val testContainersVersion: String by project testImplementation(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) testImplementation(group = "org.testcontainers", name = "localstack", version = testContainersVersion) - testImplementation(group = "org.jooq", name = "jool", version = "0.9.14") + testImplementation(group = "org.jooq", name = "jool", version = "0.9.15") - val fluentKafkaVersion = "2.11.3" + val fluentKafkaVersion = "3.0.0" testImplementation( group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = fluentKafkaVersion ) - - testImplementation( - group = "com.bakdata.fluent-kafka-streams-tests", - name = "schema-registry-mock-junit5", - version = fluentKafkaVersion - ) } protobuf { protoc { - artifact = "com.google.protobuf:protoc:3.18.1" - } -} - -sourceSets { - test { - java.srcDirs("build/generated/source/proto/test/java") + artifact = "com.google.protobuf:protoc:3.25.5" } } diff --git a/brute-force-serde/src/test/java/com/bakdata/kafka/BruteForceDeserializerTest.java b/brute-force-serde/src/test/java/com/bakdata/kafka/BruteForceDeserializerTest.java index 8b1517f..e9f16e9 100644 --- a/brute-force-serde/src/test/java/com/bakdata/kafka/BruteForceDeserializerTest.java +++ b/brute-force-serde/src/test/java/com/bakdata/kafka/BruteForceDeserializerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,10 +31,6 @@ import com.bakdata.Id; import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.kafka.Test.ProtobufRecord; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer; import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; @@ -44,7 +40,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.function.Function; import java.util.stream.Stream; import lombok.AllArgsConstructor; @@ -68,7 +63,6 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; -import org.jooq.lambda.Seq; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -93,6 +87,7 @@ class BruteForceDeserializerTest {
     @Container
     private static final LocalStackContainer LOCAL_STACK_CONTAINER = new LocalStackContainer(LOCAL_STACK_IMAGE)
             .withServices(Service.S3);
+    private static final String SCHEMA_REGISTRY_URL = "mock://";
 
     static S3Client getS3Client() {
         return S3Client.builder()
@@ -163,14 +158,15 @@ private static <T> SerdeFactory<T> configured(final Serde<T> s) {
         };
     }
 
-    private static Properties createProperties(final Properties properties) {
-        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+    private static Map<String, Object> createProperties(final Map<String, Object> properties) {
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
         properties.putAll(getS3EndpointConfig());
         return properties;
     }
 
-    private static Topology createValueTopology(final Properties properties, final Class<? extends Serde> serdeClass) {
+    private static Topology createValueTopology(final Map<String, Object> properties,
+            final Class<? extends Serde> serdeClass) {
         final StreamsBuilder builder = new StreamsBuilder();
         properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, IntegerSerde.class);
         properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdeClass);
@@ -183,7 +179,8 @@
         return builder.build();
     }
 
-    private static Topology createKeyTopology(final Properties properties, final Class<? extends Serde> serdeClass) {
+    private static Topology createKeyTopology(final Map<String, Object> properties,
+            final Class<? extends Serde> serdeClass) {
         final StreamsBuilder builder = new StreamsBuilder();
         properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdeClass);
         properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, IntegerSerde.class);
@@ -240,14 +237,14 @@ void tearDown() {
 
     @Test
     void shouldReadNullKey() {
-        this.createTopology(properties -> createKeyTopology(properties, StringSerde.class), new Properties());
+        this.createTopology(properties -> createKeyTopology(properties, StringSerde.class), new HashMap<>());
         this.topology.input()
                 .withKeySerde(Serdes.String())
                 .withValueSerde(Serdes.Integer())
                 .add(null, 1);
-        final List<ProducerRecord<byte[], Integer>> records = Seq.seq(this.topology.streamOutput()
+        final List<ProducerRecord<byte[], Integer>> records = this.topology.streamOutput()
                 .withKeySerde(Serdes.ByteArray())
-                .withValueSerde(Serdes.Integer()))
+                .withValueSerde(Serdes.Integer())
                 .toList();
         assertThat(records)
                 .hasSize(1)
@@ -258,14 +255,14 @@ void shouldReadNullValue() {
-        this.createTopology(properties -> createValueTopology(properties, StringSerde.class), new Properties());
+        this.createTopology(properties -> createValueTopology(properties, StringSerde.class), new HashMap<>());
         this.topology.input()
                 .withKeySerde(Serdes.Integer())
                 .withValueSerde(Serdes.String())
                 .add(1, null);
-        final List<ProducerRecord<Integer, byte[]>> records = Seq.seq(this.topology.streamOutput()
+        final List<ProducerRecord<Integer, byte[]>> records = this.topology.streamOutput()
                 .withKeySerde(Serdes.Integer())
-                .withValueSerde(Serdes.ByteArray()))
+                .withValueSerde(Serdes.ByteArray())
                 .toList();
         assertThat(records)
                 .hasSize(1)
@@ -277,7 +274,7 @@ void shouldReadNullValue() {
     @Test
     void shouldIgnoreNoMatch() {
         final byte[] value = {1, 0};
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG, List.of());
         this.testValueTopology(configured(Serdes.ByteArray()), properties, Serdes.ByteArray(), value);
     }
@@ -285,7 +282,7 @@ void shouldIgnoreNoMatch() {
     @Test
     void shouldFailIfIgnoreNoMatchIsDisabled() {
         final byte[] value = {1, 0};
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(AbstractBruteForceConfig.IGNORE_NO_MATCH_CONFIG, false);
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG, List.of(GenericAvroSerde.class.getName()));
         final SerdeFactory<byte[]> serdeFactory = configured(Serdes.ByteArray());
@@ -304,7 +301,7 @@ void shouldFailForLargeMessageSerdeIfDisabled() {
         final SerdeFactory<GenericRecord> factory = createLargeMessageSerde(new GenericAvroSerde(), 0, true);
         final GenericAvroSerde serde = new GenericAvroSerde();
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(AbstractBruteForceConfig.LARGE_MESSAGE_ENABLED_CONFIG, false);
         properties.put(AbstractBruteForceConfig.IGNORE_NO_MATCH_CONFIG, false);
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG, List.of(GenericAvroSerde.class.getName()));
@@ -321,50 +318,50 @@
     @MethodSource("generateStringSerdes")
     void shouldReadStringValues(final SerdeFactory<String> factory) {
         final String value = "foo";
-        this.testValueTopology(factory, new Properties(), Serdes.String(), value);
+        this.testValueTopology(factory, new HashMap<>(), Serdes.String(), value);
     }
 
     @ParameterizedTest
     @MethodSource("generateStringSerdes")
     void shouldReadStringKeys(final SerdeFactory<String> factory) {
         final String value = "foo";
-        this.testKeyTopology(factory, new Properties(), Serdes.String(), value);
+        this.testKeyTopology(factory, new HashMap<>(), Serdes.String(), value);
     }
 
     @ParameterizedTest
     @MethodSource("generateSpecificAvroSerdes")
     void shouldReadSpecificAvroValues(final SerdeFactory<SpecificRecord> factory) {
         final SpecificRecord value = Id.newBuilder().setId("").build();
-        this.testValueTopology(factory, new Properties(), new SpecificAvroSerde<>(), value);
+        this.testValueTopology(factory, new HashMap<>(), new SpecificAvroSerde<>(), value);
     }
 
     @ParameterizedTest
     @MethodSource("generateSpecificAvroSerdes")
     void shouldReadSpecificAvroKeys(final SerdeFactory<SpecificRecord> factory) {
         final SpecificRecord value = Id.newBuilder().setId("").build();
-        this.testKeyTopology(factory, new Properties(), new SpecificAvroSerde<>(), value);
+        this.testKeyTopology(factory, new HashMap<>(), new SpecificAvroSerde<>(), value);
     }
 
     @ParameterizedTest
     @MethodSource("generateGenericAvroSerdes")
     void shouldReadGenericAvroValues(final SerdeFactory<GenericRecord> factory) {
         final GenericRecord value = newGenericRecord();
-        this.testValueTopology(factory, new Properties(), new GenericAvroSerde(), value);
+        this.testValueTopology(factory, new HashMap<>(), new GenericAvroSerde(), value);
     }
 
     @ParameterizedTest
     @MethodSource("generateGenericAvroSerdes")
     void shouldReadGenericAvroKeys(final SerdeFactory<GenericRecord> factory) {
         final GenericRecord value = newGenericRecord();
-        this.testKeyTopology(factory, new Properties(), new GenericAvroSerde(), value);
+        this.testKeyTopology(factory, new HashMap<>(), new GenericAvroSerde(), value);
     }
 
     @ParameterizedTest
     @MethodSource("generateByteArraySerdes")
     void shouldReadBytesValues(final SerdeFactory<byte[]> factory) {
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         // this makes StringDeserializer fail
-        properties.setProperty("value.deserializer.encoding", "missing");
+        properties.put("value.deserializer.encoding", "missing");
 
         final byte[] value = {1, 0};
         this.testValueTopology(factory, properties, Serdes.ByteArray(), value);
@@ -373,9 +370,9 @@ void shouldReadBytesValues(final SerdeFactory<byte[]> factory) {
     @ParameterizedTest
     @MethodSource("generateByteArraySerdes")
     void shouldReadBytesKeys(final SerdeFactory<byte[]> factory) {
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         // this makes StringDeserializer fail
-        properties.setProperty("key.deserializer.encoding", "missing");
+        properties.put("key.deserializer.encoding", "missing");
 
         final byte[] value = {1, 0};
         this.testKeyTopology(factory, properties, Serdes.ByteArray(), value);
@@ -385,7 +382,7 @@ void shouldReadBytesKeys(final SerdeFactory<byte[]> factory) {
     @MethodSource("generateProtobufSerdes")
     void shouldReadProtobufValues(final SerdeFactory<ProtobufRecord> factory) {
         final ProtobufRecord value = ProtobufRecord.newBuilder().setName("Test").build();
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG,
                 List.of(GenericAvroSerde.class.getName(), KafkaProtobufSerde.class.getName()));
         this.testValueTopology(factory, properties, new KafkaProtobufSerde<>(ProtobufRecord.class), value);
@@ -395,7 +392,7 @@ void shouldReadProtobufKeys(final SerdeFactory<ProtobufRecord> factory) {
         final ProtobufRecord value = ProtobufRecord.newBuilder().setName("Test").build();
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG,
                 List.of(GenericAvroSerde.class.getName(), KafkaProtobufSerde.class.getName()));
         this.testKeyTopology(factory, properties, new KafkaProtobufSerde<>(ProtobufRecord.class), value);
@@ -405,7 +402,7 @@ void shouldReadJsonValues(final SerdeFactory<JsonTestRecord> factory) {
         final JsonTestRecord value = new JsonTestRecord("test");
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG,
                 List.of(GenericAvroSerde.class.getName(), KafkaJsonSchemaSerde.class.getName()));
         this.testValueTopology(factory, properties, new KafkaJsonSchemaSerde<>(JsonTestRecord.class), value);
@@ -415,13 +412,14 @@ void shouldReadJsonKeys(final SerdeFactory<JsonTestRecord> factory) {
         final JsonTestRecord value = new JsonTestRecord("test");
-        final Properties properties = new Properties();
+        final Map<String, Object> properties = new HashMap<>();
         properties.put(BruteForceSerdeConfig.SERDES_CONFIG,
                 List.of(GenericAvroSerde.class.getName(), KafkaJsonSchemaSerde.class.getName()));
         this.testKeyTopology(factory, properties, new KafkaJsonSchemaSerde<>(JsonTestRecord.class), value);
     }
 
-    private <T> void testValueTopology(final SerdeFactory<T> factory, final Properties properties, final Serde<T> serde,
+    private <T> void testValueTopology(final SerdeFactory<T> factory, final Map<String, Object> properties,
+            final Serde<T> serde,
             final T value) {
         final String bucket = "bucket";
         getS3Client().createBucket(CreateBucketRequest.builder()
@@ -430,7 +428,7 @@ private <T> void testValueTopology(final SerdeFactory<T> factory, final Properti
         this.createTopology(p -> createValueTopology(p, serde.getClass()), properties);
 
         final Map<String, Object> config = Map.of(
-                SCHEMA_REGISTRY_URL_CONFIG, this.topology.getSchemaRegistryUrl(),
+                SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL,
                 AbstractLargeMessageConfig.BASE_PATH_CONFIG, "s3://" + bucket + "/"
         );
         final Serde<T> inputSerde = factory.create(config, false);
@@ -440,9 +438,9 @@ private <T> void testValueTopology(final SerdeFactory<T> factory, final Properti
                 .add(1, value);
 
         serde.configure(config, false);
-        final List<ProducerRecord<Integer, T>> records = Seq.seq(this.topology.streamOutput()
+        final List<ProducerRecord<Integer, T>> records = this.topology.streamOutput()
                 .withKeySerde(Serdes.Integer())
-                .withValueSerde(serde))
+                .withValueSerde(serde)
                 .toList();
         assertThat(records)
                 .hasSize(1)
@@ -450,7 +448,8 @@ private <T> void testValueTopology(final SerdeFactory<T> factory, final Properti
                 .containsExactlyInAnyOrder(value);
     }
 
-    private <T> void testKeyTopology(final SerdeFactory<T> factory, final Properties properties, final Serde<T> serde,
+    private <T> void testKeyTopology(final SerdeFactory<T> factory, final Map<String, Object> properties,
+            final Serde<T> serde,
             final T value) {
         final String bucket = "bucket";
         getS3Client().createBucket(CreateBucketRequest.builder()
@@ -459,7 +458,7 @@ private <T> void testKeyTopology(final SerdeFactory<T> factory, final Properties
         this.createTopology(p -> createKeyTopology(p, serde.getClass()), properties);
 
         final Map<String, Object> config = Map.of(
-                SCHEMA_REGISTRY_URL_CONFIG, this.topology.getSchemaRegistryUrl(),
+                SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL,
                 AbstractLargeMessageConfig.BASE_PATH_CONFIG, "s3://" + bucket + "/"
         );
         final Serde<T> inputSerde = factory.create(config, true);
@@ -469,9 +468,9 @@ private <T> void testKeyTopology(final SerdeFactory<T> factory, final Properties
                 .add(value, 1);
 
         serde.configure(config, true);
-        final List<ProducerRecord<T, Integer>> records = Seq.seq(this.topology.streamOutput()
+        final List<ProducerRecord<T, Integer>> records = this.topology.streamOutput()
                 .withKeySerde(serde)
-                .withValueSerde(Serdes.Integer()))
+                .withValueSerde(Serdes.Integer())
                 .toList();
         assertThat(records)
                 .hasSize(1)
@@ -479,12 +478,9 @@ private <T> void testKeyTopology(final SerdeFactory<T> factory, final Properties
                 .containsExactlyInAnyOrder(value);
     }
 
-    private void createTopology(final Function<? super Properties, ? extends Topology> topologyFactory,
-            final Properties properties) {
-        this.topology = new TestTopology<>(topologyFactory, createProperties(properties))
-                .withSchemaRegistryMock(new SchemaRegistryMock(List.of(
-                        new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider()
-                )));
+    private void createTopology(final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
+            final Map<String, Object> properties) {
+        this.topology = new TestTopology<>(topologyFactory, createProperties(properties));
         this.topology.start();
     }
diff --git a/build.gradle.kts b/build.gradle.kts
index 2c470eb..10f5047 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -1,8 +1,7 @@
 plugins {
-    id("net.researchgate.release") version "3.0.2"
-    id("com.bakdata.sonar") version "1.1.17"
-    id("com.bakdata.sonatype") version "1.1.14"
-    id("org.hildan.github.changelog") version "2.2.0"
+    id("com.bakdata.release") version "1.4.0"
+    id("com.bakdata.sonar") version "1.4.0"
+    id("com.bakdata.sonatype") version "1.4.1"
     id("io.freefair.lombok") version "8.4"
 }
 
@@ -17,6 +16,7 @@ allprojects {
     repositories {
         mavenCentral()
         maven(url = "https://packages.confluent.io/maven/")
+        maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
     }
 }
 
@@ -33,13 +33,6 @@ configure<SonatypeSettings> {
     }
 }
 
-configure<org.hildan.github.changelog.plugin.GitHubChangelogExtension> {
-    githubUser = "bakdata"
-    githubRepository = "kafka-brute-force-serde"
-    futureVersionTag = findProperty("changelog.releaseVersion")?.toString()
-    sinceTag = findProperty("changelog.sinceTag")?.toString()
-}
-
 subprojects {
     apply(plugin = "java-library")
     apply(plugin = "io.freefair.lombok")
@@ -58,7 +51,7 @@ subprojects {
         "testImplementation"(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
         "testImplementation"(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
         "testRuntimeOnly"(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
-        "testImplementation"(group = "org.assertj", name = "assertj-core", version = "3.25.1")
+        "testImplementation"(group = "org.assertj", name = "assertj-core", version = "3.27.2")
 
         val log4jVersion: String by project
         "testImplementation"(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
diff --git a/gradle.properties b/gradle.properties
index 9b9720e..2e2d191 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,10 +1,10 @@
 version=1.2.2-SNAPSHOT
 org.gradle.caching=true
 org.gradle.parallel=true
-kafkaVersion=3.5.2
-confluentVersion=7.5.1
-junitVersion=5.10.1
-slf4jVersion=2.0.10
-largeMessageVersion=2.6.0
-log4jVersion=2.22.1
-testContainersVersion=1.19.3
+kafkaVersion=3.8.1
+confluentVersion=7.8.0
+junitVersion=5.11.4
+slf4jVersion=2.0.16
+largeMessageVersion=2.9.1-SNAPSHOT
+log4jVersion=2.24.3
+testContainersVersion=1.20.4