Skip to content

Commit

Permalink
Upgrade to Kafka 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 14, 2025
1 parent 5a5467a commit 1a5e5d6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.connect.storage.StringConverterConfig.ENCODING_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

Expand Down Expand Up @@ -289,15 +288,21 @@ void shouldConvertAvroKeys(final SerializerFactory<GenericRecord> factory) {
@MethodSource("generateByteArraySerializers")
void shouldConvertByteValues(final SerializerFactory<byte[]> factory) {
final byte[] value = {1, 0};
final Map<String, Object> config = Map.of(ENCODING_CONFIG, "missing");
final Map<String, Object> config = Map.of(
BruteForceConverterConfig.CONVERTER_CONFIG,
List.of(AvroConverter.class.getName(), ByteArrayConverter.class.getName())
);
this.testValueConversion(factory, new ByteArraySerializer(), value, config, new ByteArrayConverter());
}

@ParameterizedTest
@MethodSource("generateByteArraySerializers")
void shouldConvertByteKeys(final SerializerFactory<byte[]> factory) {
final byte[] value = {1, 0};
final Map<String, Object> config = Map.of(ENCODING_CONFIG, "missing");
final Map<String, Object> config = Map.of(
BruteForceConverterConfig.CONVERTER_CONFIG,
List.of(AvroConverter.class.getName(), ByteArrayConverter.class.getName())
);
this.testKeyConversion(factory, new ByteArraySerializer(), value, config, new ByteArrayConverter());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
import org.apache.kafka.common.serialization.Serdes.IntegerSerde;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -360,8 +361,8 @@ void shouldReadGenericAvroKeys(final SerdeFactory<GenericRecord> factory) {
@MethodSource("generateByteArraySerdes")
void shouldReadBytesValues(final SerdeFactory<byte[]> factory) {
final Map<String, Object> properties = new HashMap<>();
// this makes StringDeserializer fail
properties.put("value.deserializer.encoding", "missing");
properties.put(BruteForceSerdeConfig.SERDES_CONFIG,
List.of(GenericAvroSerde.class.getName(), ByteArraySerde.class.getName()));

final byte[] value = {1, 0};
this.testValueTopology(factory, properties, Serdes.ByteArray(), value);
Expand All @@ -371,8 +372,8 @@ void shouldReadBytesValues(final SerdeFactory<byte[]> factory) {
@MethodSource("generateByteArraySerdes")
void shouldReadBytesKeys(final SerdeFactory<byte[]> factory) {
final Map<String, Object> properties = new HashMap<>();
// this makes StringDeserializer fail
properties.put("key.deserializer.encoding", "missing");
properties.put(BruteForceSerdeConfig.SERDES_CONFIG,
List.of(GenericAvroSerde.class.getName(), ByteArraySerde.class.getName()));

final byte[] value = {1, 0};
this.testKeyTopology(factory, properties, Serdes.ByteArray(), value);
Expand Down

0 comments on commit 1a5e5d6

Please sign in to comment.