Home
The documentation on ingesting JSON documents into a ksqlDB stream is kept here, rather than on the main page, to keep things organised.
With JSON data, you can choose which keys to ingest into the ksqlDB stream instead of ingesting every field. When you create the stream, you declare only the JSON keys you are interested in. Special syntax is required if your JSON contains an array or a struct.
Example of JSON struct
"chunk-header": {
"timestamp": 1688299607,
"engine-type": "avenger",
"engine-version": "0.0.3.1",
"node-id": "xxxxxx3b-be74-57f2-91cd-50609364yyyy"
},
Example of JSON array
"dns-message": {
"id": 33018,
"rcode": "noerror",
"opcode": "query",
"flags": ["qr", "rd", "ra"],
"question": [
{
"name": "www.akamai.com.",
"rdclass": "IN",
"rdtype": "A"
}
],
To ingest the "timestamp" key inside the JSON struct, declare the parent key as a STRUCT and list the field inside the angle brackets, for example `chunk-header` STRUCT<timestamp BIGINT> (BIGINT is assumed here because the sample value is an integer). The same rule applies as elsewhere: a hyphen "-" is not allowed in an unquoted ksqlDB identifier, so a key such as chunk-header must be wrapped in back quotes ``. Remember to close every opening bracket < of the struct with a matching >.
For the "name" key inside the array, define it with `dns-message` STRUCT<question ARRAY<STRUCT<name VARCHAR >>>. This extracts the "name" field of each array element so that it can be used in the stream.
An example of a statement that creates a stream named dnsquery:
create stream dnsquery (`start-time` BIGINT, `dns-message` STRUCT<question ARRAY<STRUCT<name VARCHAR >>>, `client-address` VARCHAR) with (KAFKA_TOPIC='dns-base', VALUE_FORMAT='json');
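As a rough illustration of what this declaration keeps, the Python sketch below projects one sample record onto the three declared columns. It is not ksqlDB code: the helper name `project_record` and the shortened sample record are assumptions made for the example. Keys that are not declared, such as "client-port", are dropped, which mirrors how the stream only carries the columns in its definition.

```python
import json

# Shortened version of the sample record used on this page.
SAMPLE = (
    '{"start-time":1688307712430112,"client-port":0,'
    '"dns-message":{"id":27830,"rcode":"noerror",'
    '"question":[{"name":"www.cloudflare.com.","rdclass":"IN","rdtype":"A"}]},'
    '"client-address":"192.168.0.100"}'
)


def project_record(line):
    """Keep only the keys declared in the dnsquery stream (hypothetical helper)."""
    record = json.loads(line)
    questions = record.get("dns-message", {}).get("question", [])
    return {
        "start-time": record.get("start-time"),
        # Only the "name" field of each question entry is declared.
        "dns-message": {"question": [{"name": q.get("name")} for q in questions]},
        "client-address": record.get("client-address"),
    }


if __name__ == "__main__":
    print(project_record(SAMPLE))
```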
Note:
- If you ingest a JSON text file with the kafka-console-producer.sh utility, make sure each JSON record sits on a single line, with no pretty-printing spaces, line breaks or carriage returns. Otherwise the stream cannot parse the records and queries return empty records. An example of a working JSON record with no spacing or formatting:
{"start-time":1688307712430112,"flags":["rd","res","simulated"],"client-port":0,"address-family":4,"elapsed-time":73958,"dns-message":{"id":27830,"rcode":"noerror","opcode":"query","flags":["qr","rd","ra"],"question":[{"name":"www.cloudflare.com.","rdclass":"IN","rdtype":"A"}],"answer":[],"authority":[],"additional":[]},"client-address":"192.168.0.100","server-address":"127.0.0.1","server-port":9434,"view":"world","resolver":"world","request-length":36,"response-length":68,"core-domain":"cloudflare.com.","chunk-header":{"timestamp":1688307712,"engine-type":"cacheserve","engine-version":"7.6.3.1","node-id":"6962303b-be74-57f2-91cd-506093640287"},"chunk-offset":0,"kafka-partition":5,"kafka-offset":0,"kafka-timestamp":1688307712.723}
- If you do not have a Kafka producer client, you can ingest the file with the kafka-console-producer.sh utility: kafka-console-producer.sh --bootstrap-server [broker IP:port] --topic [topic name] < [path and filename of JSON file]
Example: kafka-console-producer.sh --bootstrap-server 192.168.1.1:9092 --topic mydns < dns.json
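If the JSON file is pretty-printed, it has to be flattened to one record per line before it is piped into kafka-console-producer.sh, because the console producer sends each input line as a separate message. A minimal Python sketch of that step follows; the helper name `compact_json` is an assumption, and it expects its input to hold a single JSON object.

```python
import json


def compact_json(text):
    """Return one JSON document as a single line with no extra whitespace."""
    # separators=(",", ":") removes the spaces json.dumps adds by default.
    return json.dumps(json.loads(text), separators=(",", ":"))


if __name__ == "__main__":
    pretty = """
    {
        "start-time": 1688307712430112,
        "dns-message": {
            "question": [
                {"name": "www.cloudflare.com.", "rdclass": "IN", "rdtype": "A"}
            ]
        },
        "client-address": "192.168.0.100"
    }
    """
    print(compact_json(pretty))
```

Writing one such line per record to the input file gives the producer data in the same shape as the working record shown in the first note.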
After the stream has been created in ksqlDB and data has been produced to the Kafka topic, you can run SQL commands to view the records.
1. List all created streams
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
BOTNET | webserver | DELIMITED | DELIMITED | false
DNSQUERY | dns-base | KAFKA | JSON | false
EXAMPLE | honey | DELIMITED | DELIMITED | false
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
------------------------------------------------------------------------------------------
2. To view all records in the dnsquery stream
ksql> select * from dnsquery;
+-----------------------------------+-----------------------------------+-----------------------------------+
|start-time |dns-message |client-address |
+-----------------------------------+-----------------------------------+-----------------------------------+
|1688299229271183 |{QUESTION=[{NAME=www.akamai.com.}]}|192.168.0.100 |
|1688307751758335 |{QUESTION=[{NAME=www.cloudflare.com|192.168.0.200 |
| |.}]} | |
|1688307720400095 |{QUESTION=[{NAME=www.amazon.com.}]}|192.168.0.100 |
|1688294041192363 |{QUESTION=[{NAME=www.google.com.}]}|192.168.0.11 |
|1688307739486080 |{QUESTION=[{NAME=www.akamai.com.}]}|192.168.0.200 |
|1688307712430112 |{QUESTION=[{NAME=www.cloudflare.com|192.168.0.100 |
| |.}]} | |
|1688299607364724 |{QUESTION=[{NAME=www.akamai.com.}]}|192.168.0.100 |
Query Completed
Query terminated
3. To tidy the output and strip the '{' and '[' wrappers, select the nested field directly with
ksql> select `start-time`, `dns-message`->question[1]->name, `client-address` from dnsquery;
+-----------------------------------+-----------------------------------+-----------------------------------+
|start-time |NAME |client-address |
+-----------------------------------+-----------------------------------+-----------------------------------+
|1688299229271183 |www.akamai.com. |192.168.0.100 |
|1688307751758335 |www.cloudflare.com. |192.168.0.200 |
|1688307720400095 |www.amazon.com. |192.168.0.100 |
|1688294041192363 |www.google.com. |192.168.0.11 |
|1688307739486080 |www.akamai.com. |192.168.0.200 |
|1688307712430112 |www.cloudflare.com. |192.168.0.100 |
To make this result permanent, you can use the select statement with emit changes when you create a ksqlDB table, so that the result is written to a materialised table.
All other SQL statements work on both the Kafka stream and the materialised table.
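The arrow and index syntax used in step 3 can be compared with ordinary dictionary and list access. The Python sketch below is only an analogy, built around a hypothetical helper `first_question_name`. The point to note is that question[1] in the ksqlDB query returns the first array entry, which is index 0 in Python.

```python
def first_question_name(record):
    """Mimic `dns-message`->question[1]->name for one parsed record."""
    questions = record.get("dns-message", {}).get("question", [])
    # ksqlDB's question[1] is the first element, which is index 0 in Python.
    # Returning None for an empty array is a choice made for this sketch.
    return questions[0].get("name") if questions else None


if __name__ == "__main__":
    record = {
        "start-time": 1688299229271183,
        "dns-message": {"question": [{"name": "www.akamai.com."}]},
        "client-address": "192.168.0.100",
    }
    print(record["start-time"], first_question_name(record), record["client-address"])
```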