HTTP Kafka producer-proxy based on librdkafka & warp
In CityMobil we heavily rely on data-processing. It is widely used for pricing, statistics collection etc. Most of this data is processed from php-fpm.
For producing data to kafka internal librdkafka thread-pool is explicitly created. It is quite heavy operation for every php-fpm worker.
One of the easiest solutions is to use some external http-proxy. This is where 'kafka-proxy' (or kprf) comes to play.
rustc 1.51.0
is required to build kafka-proxy.
- To build debug version of kafka-proxy:
make # builds rust-debug version
Built debug binary is stored at target/debug/kprf
- To build release version of kafka-proxy:
make build_release
Built release binary is stored at target/release/kprf
- To run kafka-proxy(release version is recommended to use in production environment):
target/release/kprf --config=config_example.yaml
Following examples work if kafka-proxy is set up and running.
# Try asynchronous producing
curl 'http://127.0.0.1:4242/push' -H 'Content-Type: application/json' -d '{"records": [{"topic": "SOME_TOPIC", "data": "{"\a"\: "\b"\}"}],
"wait_for_send": false}'
# Possible success response:
{"status": "ok", "errors": []}
# Possible erroring response:
{"status": "error", "errors": [{"status": "error", "error": "some_message"}, {"status": "ok", "error":""}]}
JSON Fields:
records
- describes records for further producing.records[i].topic
- describes some kafka topic for producing.records[i].data
- describes some string data. Can be JSON, XML or anything.wait_for_send
- describes if http-producer client has to wait for delivery result or not. If false, message is produced asynchronously. Otherwise, synchronously. Default value isfalse
At this moment, this options from librdkafka are supported:
kafka.brokers
- alias forbootstrap.servers
from librdkafka. Default value is empty array.kafka.user
- alias forsasl.username
from librdkafka. Default value is empty string.kafka.password
- alias forsasl.password
from librdkafka. Default value is empty string.kafka.message_max_bytes
- alias formessage.max.bytes
from librdkafka. Default value is1 MiB
.kafka.queue_buffering_max_messages
- alias forqueue.buffering.max.messages
from librdkafka. Default value is100000
.kafka.queue_buffering_max_ms
- alias forqueue.buffering.max.ms
from librdkafka. Default value is10
.kafka.queue_buffering_max_kbytes
- alias forqueue.buffering.max.kbytes
from librdkafka. Default value is1048576
.kafka.retries
- alias forretries
from librdkafka. Default value is3
.kafka.message_timeout_ms
- alias formessage.timeout.ms
from librdkafka. Default value is2000
.kafka.request_timeout_ms
- alias forrequest.timeout.ms
from librdkafka. Default value is30000
.kafka.request_required_acks
- alias forrequest.required.acks
from librdkafka. Default value is-1
.http.port
- port for HTTP server for producing messages. Default value is4242
output_file
- output file for logging. Default value is/dev/stdout
kafka:
brokers:
- '127.0.0.1:9092'
request_required_acks: 1
queue_buffering_max_ms: 20
queue_buffering_max_kbytes: 2048 # 2 MiB
http:
port: 4242
metrics_port: 8088
output_file: "/dev/stdout"
ratelimit: # ratelimit settings.
enabled: true # enables or disables ratelimiting.
rules: # defines list of rules for concrete topics.
- topic_name: "some_topic_name" # concrete topic name.
max_requests_per_minute: 42 # maximum requests per minute allowed for concrete topic.
We managed to get such results with wrk
on v0.0.1-alpha version with 4 KiB
message size.
External host configuration:
CPU: 2-core Intel Core Processor (Broadwell) 2095 MHz.
RAM: 4 GiB DDR4
Virtualization: VT-x
Hypervisor vendor: KVM.
Kafka-proxy configuration:
kafka:
brokers:
- "some_kafka_broker_1.some_external_server"
- "some_kafka_broker_2.some_external_server"
- "some_kafka_broker_3.some_external_server"
- "some_kafka_broker_4.some_external_server"
- "some_kafka_broker_5.some_external_server"
request_required_acks: 1
queue_buffering_max_ms: 10
queue_buffering_max_kbytes: 2048 # 2 MiB
http:
port: 4242
- Asynchronous producing
wrk -s wrk.lua -t256 -c256 -d120s 'http://some_external_host:4242/push'
Running 2m test @ http://some_external_host:4242/push
256 threads and 256 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 20.72ms 11.53ms 145.52ms 72.16%
Req/Sec 49.57 21.97 1.02k 87.15%
864241 requests in 1.14m, 108.79MB read
Requests/sec: 12672.49
Transfer/sec: 1.60MB
- Synchronous producing
256 threads and 256 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 31.41ms 88.92ms 1.01s 96.82%
Req/Sec 59.35 13.91 111.00 74.87%
1765717 requests in 2.00m, 222.28MB read
Requests/sec: 14701.92
Transfer/sec: 1.85MB
All metrics are available in OpenMetrics(prometheus) format on :8088
port(non-configurable at v0.0.1-alpha).
At this moment(v0.1.0) these metrics are available:
http_requests_duration
- Histogram of HTTP requests. Labelpush/sync
is set when message produced asynchronously, otherwise,push/async
is set.code
label is set according to HTTP response status.kafka_internal_queue_size
- Gauge of internal kafka-queue size, per topic.kafka_message_send_duration
- Histogram of kafka message duration before delivery result callback is received, per topic.kafka_sent_messages
- Counter of total kafka messages sent, per topic.kafka_errors_count
- Counter of total kafka errors, per topic.ratelimit_messages_count
- Counter of total ratelimited messages, per topic.
Kafka librdkafka metrics:
kafka_producer_reply_queue_size
- Operations (callbacks, events, etc.) waiting in queue.kafka_producer_current_messages_in_queue
- Current number of messages in producer queues.kafka_producer_current_messages_in_queue_bytes
- Current total size of messages in producer queues.kafka_producer_total_requests_count
- Total number of requests sent to brokerskafka_producer_total_bytes_sent
- Total number of bytes transmitted to brokerskafka_producer_total_responses_received
- Total number of responses received from brokerskafka_producer_total_bytes_received
- Total number of bytes received from brokerskafka_producer_total_messages_sent
- Total number of messages transmitted (produced) to brokerskafka_producer_total_messages_bytes_sent
- Total number of bytes transmitted (produced) to brokerskafka_producer_metadata_cache_topics_count
- Number of topics in the metadata cachekafka_producer_broker_state
- Broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE).kafka_producer_broker_state_age
- The time since the last broker state change, in microsecondskafka_producer_broker_outbuf_count
- Number of requests awaiting transmission to the brokerkafka_producer_broker_outbuf_msg_count
- Number of messages awaiting transmission to the brokerkafka_producer_broker_waitresp_count
- Number of requests in-flight to the broker that are awaiting responsekafka_producer_broker_waitresp_msg_count
- Number of messages awaiting transmission to the brokerkafka_producer_broker_requests_sent
- Total number of requests sent to the brokerkafka_producer_broker_requests_sent_bytes
- Total number of bytes sent to the brokerkafka_producer_broker_transmission_errors
- Total number of transmission errorskafka_producer_broker_request_retries
- Total number of request retrieskafka_producer_request_timeouts
- Total number of requests that timed outkafka_producer_broker_responses_count
- Total number of responses received from the brokerkafka_producer_broker_bytes_received
- Total number of bytes received from the brokerkafka_producer_broker_errors_count
- Total number of received errorskafka_producer_topic_metadata_age
- The age of the client's metadata for this topic, in millisecondskafka_producer_topic_batchsize_avg
- Rolling window statistics for batch sizes, in byteskafka_producer_topic_batchcount_avg
- Rolling window statistics for batch message counts
Write-Ahead-Log
. Write-Ahead-Log can be a good improvement if pattern of usage is asynchronous producing. Client does not know if his records were successfully sent to kafka. AddingWAL
can improve durability.