(WIP) KAFKA-18616; Refactor Tools's ApiMessageFormatter #18695

Open · wants to merge 2 commits into base: trunk
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -329,6 +329,7 @@

<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<subpackage name="group">
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="kafka.api"/>
@@ -18,6 +18,9 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
@@ -31,44 +34,44 @@

import static java.nio.charset.StandardCharsets.UTF_8;

public abstract class ApiMessageFormatter implements MessageFormatter {
public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter {

private static final String TYPE = "type";
private static final String VERSION = "version";
private static final String DATA = "data";
private static final String KEY = "key";
private static final String VALUE = "value";
static final String UNKNOWN = "unknown";

@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
if (Objects.isNull(consumerRecord.key())) return;

ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
try {
CoordinatorRecord record = deserialize(
consumerRecord.key() != null ? ByteBuffer.wrap(consumerRecord.key()) : null,
Review comment (Member):
Line 47 has already checked for null, so this check is redundant.
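A minimal sketch of the suggested simplification, assuming the early null-key return above already guarantees a non-null key at this point:

    CoordinatorRecord record = deserialize(
        ByteBuffer.wrap(consumerRecord.key()),
        consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null
    );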

consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null
);
if (!shouldPrint(record.key().apiKey())) return;

byte[] key = consumerRecord.key();
if (Objects.nonNull(key)) {
short keyVersion = ByteBuffer.wrap(key).getShort();
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
json
.putObject(KEY)
.put(TYPE, record.key().apiKey())
.set(DATA, keyAsJson(record.key()));

if (dataNode instanceof NullNode) {
return;
if (Objects.nonNull(record.value())) {
json
.putObject(VALUE)
.put(VERSION, record.value().version())
.set(DATA, valueAsJson(record.value().message(), record.value().version()));
} else {
json.set(VALUE, NullNode.getInstance());
}
json.putObject(KEY)
.put(TYPE, keyVersion)
.set(DATA, dataNode);
} else {
} catch (CoordinatorLoader.UnknownRecordTypeException ex) {
return;
}

byte[] value = consumerRecord.value();
if (Objects.nonNull(value)) {
short valueVersion = ByteBuffer.wrap(value).getShort();
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value));

json.putObject(VALUE)
.put(VERSION, valueVersion)
.set(DATA, dataNode);
} else {
json.set(VALUE, NullNode.getInstance());
} catch (RuntimeException ex) {
throw new RuntimeException("Could not read record at offset " + consumerRecord.offset() +
" due to: " + ex.getMessage(), ex);
}

try {
@@ -78,6 +81,8 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
}
}

protected abstract JsonNode readToKeyJson(ByteBuffer byteBuffer);
protected abstract JsonNode readToValueJson(ByteBuffer byteBuffer);
protected abstract CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value);
protected abstract boolean shouldPrint(short recordType);
protected abstract JsonNode keyAsJson(ApiMessage message);
protected abstract JsonNode valueAsJson(ApiMessage message, short version);
}
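For context on the new contract, a minimal test-style sketch of how one of the concrete formatters is driven. It assumes the helpers already used by the existing tests (MessageUtil.toVersionPrefixedByteBuffer and the generated GroupMetadataKey/GroupMetadataValue messages); the topic name, group id and versions are illustrative:

    byte[] keyBytes = MessageUtil.toVersionPrefixedByteBuffer(
        (short) 2, new GroupMetadataKey().setGroup("group-id")).array();
    byte[] valueBytes = MessageUtil.toVersionPrefixedByteBuffer(
        (short) 0, new GroupMetadataValue()).array();

    ConsumerRecord<byte[], byte[]> record =
        new ConsumerRecord<>("__consumer_offsets", 0, 0L, keyBytes, valueBytes);

    // Prints one JSON line such as {"key":{"type":2,"data":{...}},"value":{"version":0,"data":{...}}};
    // records whose key type fails shouldPrint() produce no output.
    new GroupMetadataMessageFormatter().writeTo(record, System.out);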
@@ -16,45 +16,37 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;

public class GroupMetadataMessageFormatter extends ApiMessageFormatter {
public class GroupMetadataMessageFormatter extends CoordinatorRecordMessageFormatter {
private CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();

@Override
protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
return serde.deserialize(key, value);
}

@Override
protected boolean shouldPrint(short recordType) {
return CoordinatorRecordType.GROUP_METADATA.id() == recordType;
}

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
try {
switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
case GROUP_METADATA:
return GroupMetadataKeyJsonConverter.write(
new GroupMetadataKey(new ByteBufferAccessor(byteBuffer), (short) 0),
(short) 0
);

default:
return NullNode.getInstance();
}
} catch (UnsupportedVersionException ex) {
return NullNode.getInstance();
}
protected JsonNode keyAsJson(ApiMessage message) {
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
return GroupMetadataValueJsonConverter.write(new GroupMetadataValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
protected JsonNode valueAsJson(ApiMessage message, short version) {
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
}
}
@@ -16,50 +16,41 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;

/**
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
*/
public class OffsetsMessageFormatter extends ApiMessageFormatter {
public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter {
private CoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();
Review comment (Member):
Maybe we can move serde to CoordinatorRecordMessageFormatter like CoordinatorRecordMessageParser?
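A rough sketch of one way that could look, with the serde injected through the parent constructor (hypothetical shape, not part of this PR):

    public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter {
        private final CoordinatorRecordSerde serde;

        protected CoordinatorRecordMessageFormatter(CoordinatorRecordSerde serde) {
            this.serde = serde;
        }

        // deserialize() no longer needs to be abstract once the serde lives in the parent.
        private CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
            return serde.deserialize(key, value);
        }

        // shouldPrint / keyAsJson / valueAsJson stay abstract as in this PR.
    }

    public class OffsetsMessageFormatter extends CoordinatorRecordMessageFormatter {
        public OffsetsMessageFormatter() {
            super(new GroupCoordinatorRecordSerde());
        }
        // shouldPrint / keyAsJson / valueAsJson implementations unchanged.
    }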


@Override
protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
return serde.deserialize(key, value);
}

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
try {
switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
// We can read both record types with the offset commit one.
case LEGACY_OFFSET_COMMIT:
case OFFSET_COMMIT:
return OffsetCommitKeyJsonConverter.write(
new OffsetCommitKey(new ByteBufferAccessor(byteBuffer), (short) 0),
(short) 0
);
protected boolean shouldPrint(short recordType) {
return CoordinatorRecordType.OFFSET_COMMIT.id() == recordType ||
CoordinatorRecordType.LEGACY_OFFSET_COMMIT.id() == recordType;
}

default:
return NullNode.getInstance();
}
} catch (UnsupportedVersionException ex) {
return NullNode.getInstance();
}
@Override
protected JsonNode keyAsJson(ApiMessage message) {
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
return OffsetCommitValueJsonConverter.write(new OffsetCommitValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
protected JsonNode valueAsJson(ApiMessage message, short version) {
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
}
}
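As the Javadoc above notes, a consumer reading the internal offsets topic must set exclude.internal.topics to false. A minimal sketch of such a consumer configuration (using java.util.Properties, org.apache.kafka.clients.consumer.ConsumerConfig and org.apache.kafka.common.serialization.ByteArrayDeserializer; the broker address and group id are illustrative):

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "offsets-inspector");
    props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // Records consumed from __consumer_offsets can then be rendered with OffsetsMessageFormatter.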
@@ -16,45 +16,36 @@
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordJsonConverters;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.nio.ByteBuffer;

public class TransactionLogMessageFormatter extends ApiMessageFormatter {
public class TransactionLogMessageFormatter extends CoordinatorRecordMessageFormatter {
private CoordinatorRecordSerde serde = new TransactionCoordinatorRecordSerde();

@Override
protected CoordinatorRecord deserialize(ByteBuffer key, ByteBuffer value) {
return serde.deserialize(key, value);
}

@Override
protected boolean shouldPrint(short recordType) {
return true;
}

@Override
protected JsonNode readToKeyJson(ByteBuffer byteBuffer) {
try {
switch (CoordinatorRecordType.fromId(byteBuffer.getShort())) {
case TRANSACTION_LOG:
return TransactionLogKeyJsonConverter.write(
new TransactionLogKey(new ByteBufferAccessor(byteBuffer), (short) 0),
(short) 0
);

default:
return NullNode.getInstance();
}
} catch (UnsupportedVersionException ex) {
return NullNode.getInstance();
}
protected JsonNode keyAsJson(ApiMessage message) {
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
}

@Override
protected JsonNode readToValueJson(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
return TransactionLogValueJsonConverter.write(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version), version);
}
return new TextNode(UNKNOWN);
protected JsonNode valueAsJson(ApiMessage message, short version) {
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
}
}
@@ -30,6 +30,7 @@
import org.apache.kafka.coordinator.share.generated.ShareUpdateKeyJsonConverter;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValueJsonConverter;
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
@@ -132,7 +133,7 @@ private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion)
* as per RPC spec.
* To differentiate, we need to use the corresponding key versions. This is acceptable as
* the records will always appear in pairs (key, value). However, this means that we cannot
* extend {@link org.apache.kafka.tools.consumer.ApiMessageFormatter} as it requires overriding
* extend {@link CoordinatorRecordMessageFormatter} as it requires overriding
* readToValueJson whose signature does not allow for passing keyversion.
*
* @param byteBuffer - Represents the raw data read from the topic
@@ -73,11 +73,6 @@ public class GroupMetadataMessageFormatterTest {

private static Stream<Arguments> parameters() {
return Stream.of(
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 10, GROUP_METADATA_VALUE).array(),
""
),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
MessageUtil.toVersionPrefixedByteBuffer((short) 0, GROUP_METADATA_VALUE).array(),
@@ -126,11 +121,13 @@ private static Stream<Arguments> parameters() {
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 2, GROUP_METADATA_KEY).array(),
null,
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"),
"{\"key\":{\"type\":2,\"data\":{\"group\":\"group-id\"}},\"value\":null}"
),
Arguments.of(
null,
MessageUtil.toVersionPrefixedByteBuffer((short) 4, GROUP_METADATA_VALUE).array(),
""),
""
),
Arguments.of(null, null, ""),
Arguments.of(
MessageUtil.toVersionPrefixedByteBuffer((short) 0, OFFSET_COMMIT_KEY).array(),
@@ -142,7 +139,7 @@ private static Stream<Arguments> parameters() {

@ParameterizedTest
@MethodSource("parameters")
public void testTransactionLogMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) {
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TOPIC, 0, 0,
0L, TimestampType.CREATE_TIME, 0,