Adds support for Kafka Keys, and handles nil for both keys and values… #39

Open · wants to merge 9 commits into main
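In short: the connector now forwards the Kafka record key to the HTTP handler as a KEDA-Message-Key header and flags nil-valued records (tombstones) with KEDA-Message-Tombstone: true; a handler can return those same headers to key, or tombstone, the record produced to the response and error topics. Below is a minimal sketch of a handler using that contract — the handler, route, and port are illustrative, not part of this PR:

    package main

    import (
    	"io"
    	"log"
    	"net/http"
    )

    // Illustrative handler for the header contract in this PR: the connector
    // sends the record key as "KEDA-Message-Key" and marks a nil value with
    // "KEDA-Message-Tombstone: true"; echoing those headers back keys (or
    // tombstones) the record the connector produces on the response topic.
    func handle(w http.ResponseWriter, r *http.Request) {
    	if key := r.Header.Get("KEDA-Message-Key"); key != "" {
    		w.Header().Set("KEDA-Message-Key", key) // keep the same record key
    	}
    	if r.Header.Get("KEDA-Message-Tombstone") == "true" {
    		// Propagate the deletion: an empty body plus this marker makes
    		// the connector produce a record with a nil value.
    		w.Header().Set("KEDA-Message-Tombstone", "true")
    		return
    	}
    	body, _ := io.ReadAll(r.Body)
    	w.Write(body) // echo the value back to the response topic
    }

    func main() {
    	http.HandleFunc("/", handle)
    	log.Fatal(http.ListenAndServe(":8080", nil))
    }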
94 changes: 77 additions & 17 deletions kafka-http-connector/main.go
@@ -179,6 +179,28 @@ func getConfig(metadata kafkaMetadata) (*sarama.Config, error) {
return config, nil
}

// extractControlHeaders pulls the KEDA control headers out of a record's
// headers, returning the message key, whether the record is a tombstone,
// and the remaining headers with the control headers removed.
func extractControlHeaders(headers []sarama.RecordHeader) ([]byte, bool, []sarama.RecordHeader) {
	var key []byte
	tombstone := false
	cleaned := make([]sarama.RecordHeader, 0, len(headers))
	for _, header := range headers {
		if strings.ToLower(string(header.Key)) == "keda-message-key" {
			key = header.Value
			continue
		}

		if strings.ToLower(string(header.Key)) == "keda-message-tombstone" {
			tombstone = true
			continue
		}
		cleaned = append(cleaned, header)
	}

	return key, tombstone, cleaned
}

// kafkaConnector represents a Sarama consumer group consumer
type kafkaConnector struct {
ready chan bool
@@ -206,7 +228,7 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
conn.logger.Info(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic))
conn.logger.Info(fmt.Sprintf("Message claimed: key = %s, value = %s, timestamp = %v, topic = %s", string(message.Key), string(message.Value), message.Timestamp, message.Topic))
msg := string(message.Value)

headers := http.Header{
@@ -217,6 +239,17 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
"KEDA-Source-Name": {conn.connectorData.SourceName},
}

// Add the message key, if it's been set.
if message.Key != nil {
headers.Add("KEDA-Message-Key", string(message.Key))
}

		// Indicate that this is a tombstone, not an empty message;
		// normally indicative of a deletion request.
if message.Value == nil {
headers.Add("KEDA-Message-Tombstone", "true")
}

		// Set the headers that came from the Kafka record.
for _, h := range message.Headers {
if utf8.ValidString(string(h.Value)) {
@@ -226,21 +259,14 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl

resp, err := common.HandleHTTPRequest(msg, headers, conn.connectorData, conn.logger)
if err != nil {
-			conn.errorHandler(err)
conn.errorHandler(resp, err)
} else {
body, err := io.ReadAll(resp.Body)
if err != nil {
-				conn.errorHandler(err)
conn.errorHandler(nil, err)
} else {
-				// Generate Kafka record headers
-				var kafkaRecordHeaders []sarama.RecordHeader
-
-				for k, v := range resp.Header {
-					// One key may have multiple values
-					for _, v := range v {
-						kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
-					}
-				}
kafkaRecordHeaders := mapHeaders(resp)
if success := conn.responseHandler(string(body), kafkaRecordHeaders); success {
session.MarkMessage(message, "")
}
@@ -254,12 +280,33 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
return nil
}

-func (conn *kafkaConnector) errorHandler(err error) {
// mapHeaders converts HTTP response headers into Kafka record headers,
// emitting one record header per value for multi-valued keys.
func mapHeaders(resp *http.Response) []sarama.RecordHeader {
	var kafkaRecordHeaders []sarama.RecordHeader

	for k, values := range resp.Header {
		// One key may have multiple values.
		for _, v := range values {
			kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
		}
	}
	return kafkaRecordHeaders
}

func (conn *kafkaConnector) errorHandler(resp *http.Response, err error) {
if len(conn.connectorData.ErrorTopic) > 0 {
-		_, _, e := conn.producer.SendMessage(&sarama.ProducerMessage{
message := &sarama.ProducerMessage{
Topic: conn.connectorData.ErrorTopic,
Value: sarama.StringEncoder(err.Error()),
-		})
}
		if resp != nil {
			// Preserve the message key and strip the control headers
			// before publishing to the error topic.
			kafkaRecordHeaders := mapHeaders(resp)
			key, _, headers := extractControlHeaders(kafkaRecordHeaders)
			message.Headers = headers
			if key != nil {
				message.Key = sarama.StringEncoder(key)
			}
		}
		_, _, e := conn.producer.SendMessage(message)
if e != nil {
conn.logger.Error("failed to publish message to error topic",
zap.Error(e),
@@ -277,12 +324,25 @@ func (conn *kafkaConnector) errorHandler(err error) {
}

func (conn *kafkaConnector) responseHandler(msg string, headers []sarama.RecordHeader) bool {

	// Extract the key and tombstone marker, should they exist.
key, tombstone, headers := extractControlHeaders(headers)

if len(conn.connectorData.ResponseTopic) > 0 {
-		_, _, err := conn.producer.SendMessage(&sarama.ProducerMessage{
message := &sarama.ProducerMessage{
Topic: conn.connectorData.ResponseTopic,
-			Value: sarama.StringEncoder(msg),
Headers: headers,
-		})
}

if key != nil {
message.Key = sarama.StringEncoder(key)
}

		// Leave Value unset only for an empty tombstone, so that a real
		// tombstone record (nil value) is produced.
		if len(msg) > 0 || !tombstone {
message.Value = sarama.StringEncoder(msg)
}

_, _, err := conn.producer.SendMessage(message)
Review comment (Member): Would it make sense for us to do the same for error topic as well?

Reply (Author): Sure, will add that.

if err != nil {
conn.logger.Warn("failed to publish response body from http request to topic",
zap.Error(err),
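For reference, a standalone sketch of how the new extractControlHeaders helper behaves — the snippet assumes the surrounding package's sarama import, and the sample header values are made up:

    headers := []sarama.RecordHeader{
    	{Key: []byte("Keda-Message-Key"), Value: []byte("order-42")},
    	{Key: []byte("Keda-Message-Tombstone"), Value: []byte("true")},
    	{Key: []byte("Content-Type"), Value: []byte("application/json")},
    }
    key, tombstone, cleaned := extractControlHeaders(headers)
    // key == []byte("order-42"), tombstone == true, and cleaned now holds
    // only the Content-Type header; matching is case-insensitive, so Go's
    // canonical header casing round-trips correctly.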