diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index d3542f7d..9dcef71a 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -43,6 +43,7 @@ import kafdrop.util.DefaultMessageDeserializer; import kafdrop.util.DefaultMessageSerializer; import kafdrop.util.Deserializers; +import kafdrop.util.IntMessageDeserializer; import kafdrop.util.KeyFormat; import kafdrop.util.MessageDeserializer; import kafdrop.util.MessageFormat; @@ -435,6 +436,8 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form deserializer = new ProtobufSchemaRegistryMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth); } else if (format == MessageFormat.MSGPACK) { deserializer = new MsgPackMessageDeserializer(); + } else if (format == MessageFormat.INT) { + deserializer = new IntMessageDeserializer(); } else { deserializer = new DefaultMessageDeserializer(); } diff --git a/src/main/java/kafdrop/util/ByteUtils.java b/src/main/java/kafdrop/util/ByteUtils.java index 6239832f..1cd9219d 100644 --- a/src/main/java/kafdrop/util/ByteUtils.java +++ b/src/main/java/kafdrop/util/ByteUtils.java @@ -12,6 +12,10 @@ static String readString(ByteBuffer buffer) { return new String(readBytes(buffer), StandardCharsets.UTF_8); } + static String readInt(ByteBuffer buffer) { + return String.valueOf(buffer.getInt()); + } + private static byte[] readBytes(ByteBuffer buffer) { return readBytes(buffer, buffer.limit()); } diff --git a/src/main/java/kafdrop/util/IntMessageDeserializer.java b/src/main/java/kafdrop/util/IntMessageDeserializer.java new file mode 100644 index 00000000..ff3696fe --- /dev/null +++ b/src/main/java/kafdrop/util/IntMessageDeserializer.java @@ -0,0 +1,12 @@ +package kafdrop.util; + +import java.nio.ByteBuffer; + +public class IntMessageDeserializer implements MessageDeserializer { + @Override + public String deserializeMessage(ByteBuffer buffer) { + return ByteUtils.readInt(buffer); + } + + +} diff --git a/src/main/java/kafdrop/util/KeyFormat.java b/src/main/java/kafdrop/util/KeyFormat.java index 9d38d71c..dbb60815 100644 --- a/src/main/java/kafdrop/util/KeyFormat.java +++ b/src/main/java/kafdrop/util/KeyFormat.java @@ -1,5 +1,5 @@ package kafdrop.util; public enum KeyFormat { - DEFAULT, AVRO + DEFAULT, INT, AVRO } diff --git a/src/main/java/kafdrop/util/MessageFormat.java b/src/main/java/kafdrop/util/MessageFormat.java index 7b64f5cf..b5176c60 100644 --- a/src/main/java/kafdrop/util/MessageFormat.java +++ b/src/main/java/kafdrop/util/MessageFormat.java @@ -1,5 +1,5 @@ package kafdrop.util; public enum MessageFormat { - DEFAULT, AVRO, PROTOBUF, MSGPACK + DEFAULT, INT, AVRO, PROTOBUF, MSGPACK }