From 013a98f3c6898c9ee18edcf3999df7dd2136757e Mon Sep 17 00:00:00 2001
From: David Jacot
Date: Fri, 24 Jan 2025 17:18:26 +0100
Subject: [PATCH 1/2] KAFKA-18616; Refactor Tools's ApiMessageFormatter

---
 checkstyle/import-control.xml                 |  1 +
 ...=> CoordinatorRecordMessageFormatter.java} | 57 ++++++++++---------
 .../GroupMetadataMessageFormatter.java        | 52 +++++++----------
 .../consumer/OffsetsMessageFormatter.java     | 53 +++++++----------
 .../TransactionLogMessageFormatter.java       | 53 +++++++----------
 .../ShareGroupStateMessageFormatter.java      |  3 +-
 .../GroupMetadataMessageFormatterTest.java    | 13 ++---
 .../consumer/OffsetMessageFormatterTest.java  | 56 +++++++++---------
 .../TransactionLogMessageFormatterTest.java   | 15 +++--
 9 files changed, 143 insertions(+), 160 deletions(-)
 rename tools/src/main/java/org/apache/kafka/tools/consumer/{ApiMessageFormatter.java => CoordinatorRecordMessageFormatter.java} (53%)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 921db8162e33a..9938e0046c016 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -329,6 +329,7 @@
 
+    <allow pkg="org.apache.kafka.coordinator.common.runtime" />
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
similarity index 53%
rename from tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java
rename to tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
index c3c529265e1f2..f94685180e326 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ApiMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
@@ -23,6 +23,9 @@
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 
 import java.io.IOException;
 import java.io.PrintStream;
@@ -31,44 +34,44 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-public abstract class ApiMessageFormatter implements MessageFormatter {
+public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter {
 
     private static final String TYPE = "type";
     private static final String VERSION = "version";
     private static final String DATA = "data";
     private static final String KEY = "key";
     private static final String VALUE = "value";
-    static final String UNKNOWN = "unknown";
 
     @Override
     public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+        if (Objects.isNull(consumerRecord.key())) return;
+
         ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
 
+        try {
+            CoordinatorRecord record = deserialize(
+                ByteBuffer.wrap(consumerRecord.key()),
+                consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null
+            );
+            if (!shouldPrint(record.key().apiKey())) return;
 
-        byte[] key = consumerRecord.key();
-        if (Objects.nonNull(key)) {
-            short keyVersion = ByteBuffer.wrap(key).getShort();
-            JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
+            json
+                .putObject(KEY)
+                .put(TYPE, record.key().apiKey())
+                .set(DATA, keyAsJson(record.key()));
 
-            if (dataNode instanceof NullNode) {
-                return;
+            if (Objects.nonNull(record.value())) {
+                json
+                    .putObject(VALUE)
+                    .put(VERSION, record.value().version())
+                    .set(DATA, valueAsJson(record.value().message(), record.value().version()));
+            } else {
+                json.set(VALUE, NullNode.getInstance());
             }
-            json.putObject(KEY)
-                .put(TYPE, keyVersion)
-                .set(DATA, dataNode);
-        } else {
+        } catch (CoordinatorLoader.UnknownRecordTypeException ex) {
             return;
-        }
-
-        byte[] value = consumerRecord.value();
-        if (Objects.nonNull(value)) {
-            short valueVersion = ByteBuffer.wrap(value).getShort();
-            JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));
-
-            json.putObject(VALUE)
-                .put(VERSION, valueVersion)
-                .set(DATA, dataNode);
-        } else {
-            json.set(VALUE, NullNode.getInstance());
+        } catch (RuntimeException ex) {
+            throw new RuntimeException("Could not read record at offset " + consumerRecord.offset() +
+                " due to: " + ex.getMessage(), ex);
         }
 
         try {
@@ -78,6 +81,8 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
         }
     }
 
-    protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
-    protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
+    protected abstract CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value);
+    protected abstract boolean shouldPrint(short recordType);
+    protected abstract JsonNode keyAsJson(ApiMessage message);
+    protected abstract JsonNode valueAsJson(ApiMessage message, short version);
 }
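Note: writeTo is now a template method: deserialize the record with a coordinator serde, filter by record type, then delegate JSON rendering to the generated converters. As a rough sketch of the new contract (FooRecordSerde and FooJsonConverters are hypothetical placeholders, not part of this patch), a future formatter reduces to:

    public class FooMessageFormatter extends CoordinatorRecordMessageFormatter {
        // Hypothetical serde; the real formatters plug in e.g. GroupCoordinatorRecordSerde.
        private final CoordinatorRecordSerde serde = new FooRecordSerde();

        @Override
        protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
            // Throws CoordinatorLoader.UnknownRecordTypeException for foreign record
            // types; the base class catches it and silently skips the record.
            return serde.deserialize(key, value);
        }

        @Override
        protected boolean shouldPrint(short recordType) {
            return true; // or match a specific CoordinatorRecordType id
        }

        @Override
        protected JsonNode keyAsJson(ApiMessage message) {
            return FooJsonConverters.writeRecordKeyAsJson(message);           // hypothetical
        }

        @Override
        protected JsonNode valueAsJson(ApiMessage message, short version) {
            return FooJsonConverters.writeRecordValueAsJson(message, version); // hypothetical
        }
    }

This trades per-formatter ByteBuffer parsing for a single shared error-handling path in the base class.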
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
index e26f4d103d551..8e017e1b84c6a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatter.java
@@ -16,45 +16,37 @@
  */
 package org.apache.kafka.tools.consumer;
 
-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
 import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
-import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.NullNode;
-import com.fasterxml.jackson.databind.node.TextNode;
 
 import java.nio.ByteBuffer;
 
-public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
+public class GroupMetadataMessageFormatter extends CoordinatorRecordMessageFormatter {
+    private final CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();
+
+    @Override
+    protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
+        return serde.deserialize(key, value);
+    }
+
+    @Override
+    protected boolean shouldPrint(short recordType) {
+        return CoordinatorRecordType.GROUP_METADATA.id() == recordType;
+    }
+
     @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
-        try {
-            switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
-                case GROUP_METADATA:
-                    return GroupMetadataKeyJsonConverter.write(
-                        new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0),
-                        (short) 0
-                    );
-
-                default:
-                    return NullNode.getInstance();
-            }
-        } catch (UnsupportedVersionException ex) {
-            return NullNode.getInstance();
-        }
+    protected JsonNode keyAsJson(ApiMessage message) {
+        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
     }
 
     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
-        short version = byteBuffer.getShort();
-        if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
-            return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
-        }
-        return new TextNode(UNKNOWN);
+    protected JsonNode valueAsJson(ApiMessage message, short version) {
+        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
    }
 }
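Note: driven the same way the tests below drive it, the formatter emits one JSON object per consumer record, with key and value carried as version-prefixed message bytes. A minimal usage sketch (topic name and output stream are arbitrary; the expected line matches the test fixtures):

    GroupMetadataMessageFormatter formatter = new GroupMetadataMessageFormatter();
    byte[] key = MessageUtil.toVersionPrefixedByteBuffer(
        (short) 2, new GroupMetadataKey().setGroup("group-id")).array();
    formatter.writeTo(
        new ConsumerRecord<>("__consumer_offsets", 0, 0L, key, null), System.out);
    // -> {"key":{"type":2,"data":{"group":"group-id"}},"value":null}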
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
index 3cc715b934832..42ec4768920b7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/OffsetsMessageFormatter.java
@@ -16,50 +16,41 @@
  */
 package org.apache.kafka.tools.consumer;
 
-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
 import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
-import org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.NullNode;
-import com.fasterxml.jackson.databind.node.TextNode;
 
 import java.nio.ByteBuffer;
 
 /**
  * Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
  */
-public class OffsetsMessageFormatter extends ApiMessageFormatter {
+public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter {
+    private final CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();
+
+    @Override
+    protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
+        return serde.deserialize(key, value);
+    }
+
     @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
-        try {
-            switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
-                // We can read both record types with the offset commit one.
-                case LEGACY_OFFSET_COMMIT:
-                case OFFSET_COMMIT:
-                    return OffsetCommitKeyJsonConverter.write(
-                        new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0),
-                        (short) 0
-                    );
+    protected boolean shouldPrint(short recordType) {
+        return CoordinatorRecordType.OFFSET_COMMIT.id() == recordType ||
+            CoordinatorRecordType.LEGACY_OFFSET_COMMIT.id() == recordType;
+    }
 
-                default:
-                    return NullNode.getInstance();
-            }
-        } catch (UnsupportedVersionException ex) {
-            return NullNode.getInstance();
-        }
+    @Override
+    protected JsonNode keyAsJson(ApiMessage message) {
+        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
     }
 
     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
-        short version = byteBuffer.getShort();
-        if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
-            return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
-        }
-        return new TextNode(UNKNOWN);
+    protected JsonNode valueAsJson(ApiMessage message, short version) {
+        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
     }
 }
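Note: shouldPrint replaces the old switch-to-NullNode pattern for skipping foreign record types on the offsets topic. For example, a GROUP_METADATA record now deserializes cleanly but produces no output, matching the "" expectations in the tests below (a sketch; the bytes are built as in the fixtures):

    byte[] groupMetadataKey = MessageUtil.toVersionPrefixedByteBuffer(
        (short) 2, new GroupMetadataKey().setGroup("group-id")).array();
    new OffsetsMessageFormatter().writeTo(
        new ConsumerRecord<>("__consumer_offsets", 0, 0L, groupMetadataKey, null), System.out);
    // deserialize() succeeds; shouldPrint(GROUP_METADATA.id()) is false, so nothing is printed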
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
index 574ce0ce63e6f..bcf79c997fcd7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
@@ -16,45 +16,36 @@
  */
 package org.apache.kafka.tools.consumer;
 
-import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ByteBufferAccessor;
-import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
-import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.NullNode;
-import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters;
+import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
 
 import java.nio.ByteBuffer;
 
-public class TransactionLogMessageFormatter extends ApiMessageFormatter {
+public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter {
+    private final CoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();
+
+    @Override
+    protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
+        return serde.deserialize(key, value);
+    }
+
+    @Override
+    protected boolean shouldPrint(short recordType) {
+        return true;
+    }
+
     @Override
-    protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
-        try {
-            switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
-                case TRANSACTION_LOG:
-                    return TransactionLogKeyJsonConverter.write(
-                        new TransactionLogKey(new ByteBufferAccessor(byteBuffer), (short) 0),
-                        (short) 0
-                    );
-
-                default:
-                    return NullNode.getInstance();
-            }
-        } catch (UnsupportedVersionException ex) {
-            return NullNode.getInstance();
-        }
+    protected JsonNode keyAsJson(ApiMessage message) {
+        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
     }
 
     @Override
-    protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
-        short version = byteBuffer.getShort();
-        if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
-            return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
-        }
-        return new TextNode(UNKNOWN);
+    protected JsonNode valueAsJson(ApiMessage message, short version) {
+        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java
index c5358b62d1b6c..38cf53be729e4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java
@@ -36,6 +36,7 @@
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
 
 import java.io.IOException;
 import java.io.PrintStream;
@@ -132,7 +133,7 @@ private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersio
      * as per RPC spec.
      * To differentiate, we need to use the corresponding key versions. This is acceptable as
      * the records will always appear in pairs (key, value). However, this means that we cannot
-     * extend {@link org.apache.kafka.tools.consumer.ApiMessageFormatter} as it requires overriding
+     * extend {@link CoordinatorRecordMessageFormatter} as it requires overriding
      * readToValueJson whose signature does not allow for passing keyVersion.
      *
     * @param byteBuffer - Represents the raw data read from the topic
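Note: the Javadoc above is the reason this formatter keeps its hand-rolled writeTo instead of extending the new base class: ShareSnapshotValue and ShareUpdateValue overlap in value versions, so the right value converter can only be chosen after inspecting the key's version. A sketch of that coupling (the 0/1 key ids and this helper's shape are illustrative assumptions, not code from this patch):

    // The value schema is selected by the KEY's version, which the base class's
    // valueAsJson(ApiMessage, short) hook cannot see.
    private static JsonNode valueAsJson(short keyVersion, ByteBuffer value) {
        short valueVersion = value.getShort();
        ByteBufferAccessor accessor = new ByteBufferAccessor(value);
        if (keyVersion == 0) {        // assumed ShareSnapshotKey id
            return ShareSnapshotValueJsonConverter.write(
                new ShareSnapshotValue(accessor, valueVersion), valueVersion);
        } else if (keyVersion == 1) { // assumed ShareUpdateKey id
            return ShareUpdateValueJsonConverter.write(
                new ShareUpdateValue(accessor, valueVersion), valueVersion);
        }
        return NullNode.getInstance();
    }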
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
index 62ae43bd09c35..4a894c22fbb53 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/GroupMetadataMessageFormatterTest.java
@@ -73,11 +73,6 @@ public class GroupMetadataMessageFormatterTest {
 
     private static Stream<Arguments> parameters() {
         return Stream.of(
-            Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
-                ""
-            ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
@@ -126,11 +121,13 @@ private static Stream<Arguments> parameters() {
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
                 null,
-                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
+                "{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"
+            ),
             Arguments.of(
                 null,
                 MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
-                ""),
+                ""
+            ),
             Arguments.of(null, null, ""),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
@@ -142,7 +139,7 @@ private static Stream<Arguments> parameters() {
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
+    public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
             TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME,
             0,
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
index 684c681a17573..f1546c23d5679 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java
@@ -50,7 +50,7 @@ public class OffsetMessageFormatterTest {
         .setLeaderEpoch(10)
         .setMetadata("metadata")
         .setCommitTimestamp(1234L)
-        .setExpireTimestamp(-1L);
+        .setExpireTimestamp(5678L);
     private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey().setGroup("group-id");
     private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue()
         .setProtocolType("consumer")
@@ -62,11 +62,6 @@ public class OffsetMessageFormatterTest {
 
     private static Stream<Arguments> parameters() {
         return Stream.of(
-            Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 10, OFFSET_COMMIT_VALUE).array(),
-                ""
-            ),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
@@ -75,53 +70,58 @@ private static Stream<Arguments> parameters() {
                 "\"commitTimestamp\":1234}}}"
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_VALUE).array(),
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":0,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
+                    "\"commitTimestamp\":1234}}}"
+            ),
+            Arguments.of(
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
                     "\"value\":{\"version\":1,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
-                    "\"commitTimestamp\":1234,\"expireTimestamp\":-1}}}"
+                    "\"commitTimestamp\":1234,\"expireTimestamp\":5678}}}"
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
                     "\"value\":{\"version\":2,\"data\":{\"offset\":100,\"metadata\":\"metadata\"," +
                     "\"commitTimestamp\":1234}}}"
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 3, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
                     "\"value\":{\"version\":3,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
                     "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
                     "\"value\":{\"version\":4,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
                     "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_KEY).array(),
-                MessageUtil.toVersionPrefixedByteBuffer((short) 4, OFFSET_COMMIT_VALUE).array(),
-                ""
-            ),
-            Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 MessageUtil.toVersionPrefixedByteBuffer((short) 5, OFFSET_COMMIT_VALUE).array(),
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":{\"version\":5,\"data\":{\"offset\":100,\"leaderEpoch\":10," +
+                    "\"metadata\":\"metadata\",\"commitTimestamp\":1234}}}"
             ),
             Arguments.of(
-                MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
+                MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_KEY).array(),
                 null,
-                "{\"key\":{\"type\":0,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
-                    "\"value\":null}"),
+                "{\"key\":{\"type\":1,\"data\":{\"group\":\"group-id\",\"topic\":\"foo\",\"partition\":1}}," +
+                    "\"value\":null}"
+            ),
             Arguments.of(
                 null,
                 MessageUtil.toVersionPrefixedByteBuffer((short) 1, OFFSET_COMMIT_VALUE).array(),
-                ""),
+                ""
+            ),
             Arguments.of(null, null, ""),
             Arguments.of(
                 MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
@@ -133,7 +133,7 @@ private static Stream<Arguments> parameters() {
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
+    public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
             TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME,
             0,
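Note: all of these fixtures lean on MessageUtil.toVersionPrefixedByteBuffer, which prepends the 2-byte version (the record-type id for keys, the schema version for values) that CoordinatorRecordSerde reads back first. A hand-rolled equivalent, for illustration only:

    OffsetCommitKey key = new OffsetCommitKey()
        .setGroup("group-id").setTopic("foo").setPartition(1);
    ObjectSerializationCache cache = new ObjectSerializationCache();
    ByteBuffer buffer = ByteBuffer.allocate(2 + key.size(cache, (short) 1));
    buffer.putShort((short) 1); // version prefix: the OFFSET_COMMIT record type id
    key.write(new ByteBufferAccessor(buffer), cache, (short) 1);
    buffer.flip();
    // buffer now matches MessageUtil.toVersionPrefixedByteBuffer((short) 1, key)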
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
index dc7946d5fa007..78ed1d3078e47 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatterTest.java
@@ -70,28 +70,33 @@ private static Stream<Arguments> parameters() {
             MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
             MessageUtil.toVersionPrefixedByteBuffer((short) 5, TXN_LOG_VALUE).array(),
             "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
-                "\"value\":{\"version\":5,\"data\":\"unknown\"}}"
+                "\"value\":{\"version\":5,\"data\":{\"producerId\":100,\"producerEpoch\":50," +
+                "\"transactionTimeoutMs\":500,\"transactionStatus\":4,\"transactionPartitions\":[]," +
+                "\"transactionLastUpdateTimestampMs\":1000,\"transactionStartTimestampMs\":750}}}"
         ),
         Arguments.of(
             MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_KEY).array(),
             MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
-            ""),
+            ""
+        ),
         Arguments.of(
             MessageUtil.toVersionPrefixedByteBuffer((short) 0, TXN_LOG_KEY).array(),
             null,
             "{\"key\":{\"type\":0,\"data\":{\"transactionalId\":\"TXNID\"}}," +
-                "\"value\":null}"),
+                "\"value\":null}"
+        ),
         Arguments.of(
             null,
             MessageUtil.toVersionPrefixedByteBuffer((short) 1, TXN_LOG_VALUE).array(),
-            ""),
+            ""
+        ),
         Arguments.of(null, null, "")
     );
 }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
+    public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
             TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME,
             0,
From 6c49bfb358c6e4ec0bd9fd795981c7fec72de77d Mon Sep 17 00:00:00 2001
From: David Jacot
Date: Fri, 24 Jan 2025 17:27:51 +0100
Subject: [PATCH 2/2] fix imports

---
 .../tools/consumer/CoordinatorRecordMessageFormatter.java | 6 +++---
 .../tools/consumer/TransactionLogMessageFormatter.java    | 4 ++--
 .../group/share/ShareGroupStateMessageFormatter.java      | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
index f94685180e326..4c65b08d325a9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java
@@ -18,14 +18,14 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 
 import java.io.IOException;
 import java.io.PrintStream;
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
index bcf79c997fcd7..c4ee1d997c935 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java
@@ -19,10 +19,10 @@
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
+import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
+import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters;
-import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
 
 import java.nio.ByteBuffer;
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java
index 38cf53be729e4..d695e17d0442e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java
@@ -30,13 +30,13 @@
 import org.apache.kafka.coordinator.share.generated.ShareUpdateKeyJsonConverter;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
 import org.apache.kafka.coordinator.share.generated.ShareUpdateValueJsonConverter;
+import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
-import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
 
 import java.io.IOException;
 import java.io.PrintStream;