Unable to send messages to the kafka topics #185

Closed
chandana194 opened this issue Dec 26, 2022 · 40 comments
Labels: 🐛 Bug Something isn't working · ❓ Question Further information is requested

chandana194 commented Dec 26, 2022

Hi,
I am trying to send messages to existing Kafka topics. I can successfully connect to the Kafka broker and list the existing topics, but when I try to produce and consume messages, k6 reports no error and the messages never reach the topic. Could you please help me with this?

Script snippet:

import { check } from "k6";
//import {fs} from 'fs';
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
  SCHEMA_TYPE_STRING,
  SASL_SCRAM_SHA512,
} from "k6/x/kafka"; // import kafka extension

export const options = {
  vus: 1,
  // Note: k6 expects "iterations"; this field name triggers the
  // "unknown field" warning visible in the run logs below.
  iteration: 1,
};

const brokers = ["bootstrap-server-name:portnumber"];
const topic = "topic-name";

const saslConfig = {
  username: "<username>",
  password: "<pwd>",
  algorithm: SASL_SCRAM_SHA512,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",
};
const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: true,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",
};

const offset = 0;
// partition and groupId are mutually exclusive
const partition = 0;
const numPartitions = 1;
const replicationFactor = 1;

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  sasl: saslConfig,
  tls: tlsConfig,
});
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  //partition: partition,
  //maxWait: 300,
  offset: offset,
  sasl: saslConfig,
  tls: tlsConfig,
});
const connection = new Connection({
  address: brokers[0],
  sasl: saslConfig,
  tls: tlsConfig,
});
const schemaRegistry = new SchemaRegistry();



if (__VU == 0) {
  console.log(
    "Existing topics: ",
    connection.listTopics(saslConfig, tlsConfig)
  );
}

export default function () {
  for (let index = 0; index < 2; index++) {
    let messages = [
      {
        key: schemaRegistry.serialize({
          data: "4096525_541",
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: {
            "eventId" : "22358d8b-9826-4bb0-b209-cd13a0ae1ff", //random UUI
            "itemNumber" : 476183, 
            "storeNumber": 4105, 
            "retailPricePriorAmount": 10.99,
            "retailPriceAmount": 10.99,
            "labelPrice": 8.77,
            "sellingPriceAmount" : 8.77,
            "winningLabelAdv" : "WAS_PP",
            "priceTypeCode": 2,
            "priceDisplayCode": 3,
            "advEvents": [{
                "eventType": "WAS_PP",
                "eventDescription" : "WAS NCF ITEMS ALICIA",
                "eventStartDate" : "2022-12-21T00:00:00Z",
                "eventEndDate" : "2023-07-01T23:59:59.999Z",
                "dollarOffPrice": 8.77,
                "percentOff" : 0,
                "sourceId" : "CORP"
            }],
            "nlpIndicator": "N",
            "mgrPriceIndicator": "N",
            "updateId": "PEWINCMP",
            "createdTS": "2022-11-09T06:07:16.733+0000",
            "updatedTS": "2022-11-09T06:07:16.733+0000"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ];
//console.log(new Date())
    writer.produce({ messages: messages });
  }

  let messages = reader.consume({ limit: 2 });
  check(messages, {
    "2 messages returned": (msgs) => msgs.length == 2,
  });
 
}

export function teardown(data) {
  writer.close();
  reader.close();
  connection.close();
}

k6 run logs:

./k6 run -v '/Users/cnagara/Documents/kafkaTesting/spexKafkaTesting.js'                 
DEBU[0000] Logger format: TEXT                          
DEBU[0000] k6 version: v0.41.0 ((devel), go1.18.3, darwin/amd64) 

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

DEBU[0000] Resolving and reading test '/Users//Documents/kafkaTesting/spexKafkaTesting.js'... 
DEBU[0000] Loading...                                    moduleSpecifier="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js" originalModuleSpecifier=/Users//Documents/kafkaTesting/spexKafkaTesting.js
DEBU[0000] '/Users//Documents/kafkaTesting/spexKafkaTesting.js' resolved to 'file:///Users/cnagara/Documents/kafkaTesting/spexKafkaTesting.js' and successfully loaded 5165 bytes! 
DEBU[0000] Gathering k6 runtime options...              
DEBU[0000] Initializing k6 runner for '/Users//Documents/kafkaTesting/spexKafkaTesting.js' (file:///Users//Documents/kafkaTesting/spexKafkaTesting.js)... 
DEBU[0000] Detecting test type for...                    test_path="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js"
DEBU[0000] Trying to load as a JS test...                test_path="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js"
DEBU[0000] Babel: Transformed                            t=127.309875ms
INFO[0005] Existing topics:  ["sync","pendingprice"]  source=console
WARN[0005] There were unknown fields in the options exported in the script  error="json: unknown field \"iteration\""
DEBU[0005] Runner successfully initialized!             
DEBU[0005] Parsing CLI flags...                         
DEBU[0005] Consolidating config layers...               
DEBU[0005] Parsing thresholds and validating config...  
DEBU[0005] Initializing the execution scheduler...      
DEBU[0005] Starting 1 outputs...                         component=output-manager
DEBU[0005] Starting...                                   component=metrics-engine-ingester
DEBU[0005] Started!                                      component=metrics-engine-ingester
  execution: local
     script: /Users/Documents/kafkaTesting/spexKafkaTesting.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

DEBU[0005] Starting the REST API server on localhost:6565 
DEBU[0005] Initialization starting...                    component=engine
DEBU[0005] Starting emission of VUs and VUsMax metrics... 
DEBU[0005] Start of initialization                       executorsCount=1 neededVUs=1 phase=local-execution-scheduler-init
DEBU[0010] Initialized VU #1                             phase=local-execution-scheduler-init
DEBU[0010] Finished initializing needed VUs, start initializing executors...  phase=local-execution-scheduler-init
DEBU[0010] Initialized executor default                  phase=local-execution-scheduler-init
DEBU[0010] Initialization completed                      phase=local-execution-scheduler-init
DEBU[0010] Execution scheduler starting...               component=engine
DEBU[0010] Start of test run                             executorsCount=1 phase=local-execution-scheduler-run
DEBU[0010] Running setup()                               phase=local-execution-scheduler-run
DEBU[0010] Metrics processing started...                 component=engine
INFO[0015] Existing topics:  ["pendingprice","promocentral"]  source=console
DEBU[0015] Start all executors...                        phase=local-execution-scheduler-run
DEBU[0015] Starting executor                             executor=default startTime=0s type=per-vu-iterations
DEBU[0015] Starting executor run...                      executor=per-vu-iterations iterations=1 maxDuration=10m0s scenario=default type=per-vu-iterations vus=1
INFO[0015] "2022-12-26T08:15:14.116Z"                    source=console
INFO[0020] "2022-12-26T08:15:18.657Z"                    source=console
^CDEBU[0190] Stopping k6 in response to signal...          sig=interrupt
DEBU[0190] Engine: Thresholds terminated                 component=engine
ERRO[0190] Unable to read messages.                      error="Unable to read messages."
DEBU[0190] run: context expired; exiting...              component=engine
DEBU[0190] Metrics emission of VUs and VUsMax metrics stopped 
DEBU[0190] Executor finished successfully                executor=default startTime=0s type=per-vu-iterations
DEBU[0190] Running teardown()                            phase=local-execution-scheduler-run
INFO[0194] Existing topics:  ["promocentral","pendingprice"]  source=console
DEBU[0194] Execution scheduler terminated                component=engine error="<nil>"
DEBU[0194] Processing metrics and thresholds after the test run has ended...  component=engine
DEBU[0194] Stopping...                                   component=metrics-engine-ingester
DEBU[0194] Stopped!                                      component=metrics-engine-ingester
DEBU[0194] Engine run terminated cleanly                

running (03m04.2s), 0/1 VUs, 0 complete and 1 interrupted iterations
default ✗ [--------------------------------------] 1 VUs  02m54.5s/10m0s  0/1 iters, 1 per VU
WARN[0194] No script iterations finished, consider making the test duration longer 
INFO[0199] Existing topics:  ["promocentral","pendingprice"]  source=console

     █ teardown

     data_received................: 0 B    0 B/s
     data_sent....................: 0 B    0 B/s
     iteration_duration...........: avg=63.75µs min=63.75µs med=63.75µs max=63.75µs p(90)=63.75µs p(95)=63.75µs
     kafka_writer_acks_required...: 0      min=0                  max=0 
     kafka_writer_async...........: 0.00%  ✓ 02   
     kafka_writer_attempts_max....: 0      min=0                  max=0 
     kafka_writer_batch_bytes.....: 1.3 kB 7.1105804965559365 B/s
     kafka_writer_batch_max.......: 1      min=1                  max=1 
     kafka_writer_batch_size......: 2      0.010856/s
     kafka_writer_batch_timeout...: 0s     min=0s                 max=0s
     kafka_writer_error_count.....: 0      0/s
     kafka_writer_message_bytes...: 1.3 kB 7.1105804965559365 B/s
     kafka_writer_message_count...: 2      0.010856/s
     kafka_writer_read_timeout....: 0s     min=0s                 max=0s
     kafka_writer_retries_count...: 0      0/s
     kafka_writer_wait_seconds....: avg=0s      min=0s      med=0s      max=0s      p(90)=0s      p(95)=0s     
     kafka_writer_write_count.....: 2      0.010856/s
     kafka_writer_write_seconds...: avg=2.15s   min=1.9s    med=2.15s   max=2.4s    p(90)=2.35s   p(95)=2.37s  
     kafka_writer_write_timeout...: 0s     min=0s                 max=0s
     vus..........................: 1      min=0                  max=1 
     vus_max......................: 1      min=0                  max=1 

DEBU[0199] Waiting for engine processes to finish...    
DEBU[0199] Metrics processing winding down...            component=engine
DEBU[0199] Everything has finished, exiting k6!         
DEBU[0199] Stopping 1 outputs...                         component=output-manager
DEBU[0199] Stopping...                                   component=metrics-engine-ingester
DEBU[0199] Stopped!                                      component=metrics-engine-ingester
mostafa (Owner) commented Jan 2, 2023

Hey @chandana194,

Can you use duration instead of iteration to see what happens? From what I can see, your connection to Kafka works: it successfully lists your topics, and the writer is reporting metrics correctly, which means the messages are being produced. However, the reader seems to be malfunctioning somehow, which is evident from the reader metrics not being printed to the console.

chandana194 (Author)

Hi @mostafa, thank you for your response.

Yes, as you mentioned, k6 says the messages are produced successfully, but they are not reaching the Kafka topics. We have verified all the logs on the Kafka side.

I will try changing it to duration and see how it behaves.

chandana194 (Author) commented Jan 3, 2023

@mostafa Tried changing iteration to duration; observed the same behaviour:

  1. k6 produced messages successfully, but they are not reaching the Kafka topics
  2. ERRO[0055] Unable to read messages. error="Unable to read messages."
     **data_received................: 0 B    0 B/s**
     **data_sent....................: 0 B    0 B/s**
     iteration_duration...........: avg=143.29µs min=143.29µs med=143.29µs max=143.29µs p(90)=143.29µs p(95)=143.29µs
     kafka_writer_acks_required...: 0      min=0      max=0 
     kafka_writer_async...........: 0.00%  ✓ 0        ✗ 2   
     kafka_writer_attempts_max....: 0      min=0      max=0 
     kafka_writer_batch_bytes.....: 1.3 kB 27 B/s
     kafka_writer_batch_max.......: 1      min=1      max=1 
     kafka_writer_batch_size......: 2      0.040419/s
     kafka_writer_batch_timeout...: 0s     min=0s     max=0s
     kafka_writer_error_count.....: 1      0.02021/s
     kafka_writer_message_bytes...: 2.0 kB 40 B/s
     kafka_writer_message_count...: 3      0.060629/s
     kafka_writer_read_timeout....: 0s     min=0s     max=0s
     kafka_writer_retries_count...: 1      0.02021/s
     kafka_writer_wait_seconds....: avg=0s       min=0s       med=0s       max=0s       p(90)=0s       p(95)=0s      
     kafka_writer_write_count.....: 3      0.060629/s
     kafka_writer_write_seconds...: avg=3.17s    min=2.13s    med=3.17s    max=4.22s    p(90)=4.01s    p(95)=4.11s   
     kafka_writer_write_timeout...: 0s     min=0s     max=0s
     vus..........................: 0      min=0      max=1 
     vus_max......................: 1      min=0      max=1

mostafa (Owner) commented Jan 3, 2023

@chandana194

  1. As shown by the kafka_writer_error_count metric, you have an error. It also retried once (kafka_writer_retries_count), but to no avail. So there's something wrong with your configuration.
  2. The data_received and data_sent metrics you highlighted are k6's built-in HTTP metrics and won't be updated by xk6-kafka.
  3. ERRO[0055] Unable to read messages. error="Unable to read messages." means there is a configuration issue.

chandana194 (Author)

@mostafa Is there a way to print that error message?

mostafa (Owner) commented Jan 3, 2023

@chandana194 Absolutely!

Just add the connectLogger parameter to the writer and reader configs and set it to true, so you can see the internal logs of the kafka-go library.
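For example, a minimal sketch (broker and topic values are placeholders):

const writer = new Writer({
  brokers: ["broker:9092"],
  topic: "my-topic",
  connectLogger: true, // print kafka-go's internal logs
});
const reader = new Reader({
  brokers: ["broker:9092"],
  topic: "my-topic",
  connectLogger: true,
});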

mostafa added the ❓ Question (Further information is requested) label Jan 3, 2023
chandana194 (Author) commented Jan 4, 2023

Thank you @mostafa, that helped me get the logs. I am getting a timeout issue!

Could you please let me know how I can specify both the security protocol (SASL_SSL) and the SASL mechanism (sasl.mechanism=SCRAM-SHA-512)?

Currently I am using only the config below:

const saslConfig = {
  username: "<username>",
  password: "<pwd>",
  algorithm: SASL_SCRAM_SHA512,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",  
};
INFO[0016] writing 1 messages to pendingprice (partition: 0) 
INFO[0018] initializing kafka reader for partition 0 of  pendingprice starting at offset first offset 
INFO[0023] the kafka reader for partition 0 of  pendingprice is seeking to offset 10238021 
INFO[0023] looking up offset of kafka reader for partition 0 of  pendingprice: first offset 
INFO[0024] the kafka reader got an unknown error reading partition 0 of .pendingprice at offset 10238021: read tcp 172.31.230.127:57108->172.16.49.27:443: i/o timeout 
DEBU[0024] Regular duration is done, waiting for iterations to gracefully finish  executor=constant-vus gracefulStop=30s scenario=default
INFO[0024] initializing kafka reader for partition 0 of  pendingprice starting at offset 10238021 

mostafa (Owner) commented Jan 4, 2023

@chandana194

AFAIK, when you're using SCRAM, you must enable SSL (via the TLS config), since SCRAM is an authentication mechanism that requires SSL. Also, remove clientCertPem if you don't need mutual TLS auth (mTLS); otherwise you need to provide clientKeyPem as well.
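For reference, a minimal SCRAM-over-TLS sketch (paths and credentials are placeholders; add clientCertPem/clientKeyPem only if your broker requires mTLS):

const saslConfig = {
  username: "<username>",
  password: "<pwd>",
  algorithm: SASL_SCRAM_SHA512, // SASL mechanism: SCRAM-SHA-512
};
const tlsConfig = {
  enableTls: true, // SASL_SSL: SCRAM runs over TLS
  serverCaPem: "pathto/serverCa.pem",
};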

chandana194 (Author) commented Jan 4, 2023

@mostafa Tried enabling SSL through the TLS config and removed clientCertPem, but I'm still stuck on the timeout issue below!

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: false,
  minVersion: TLS_1_2,
  serverCaPem: "/Users/cnagara/Downloads/serverCa.pem",
};

I think there is no issue with the SASL and TLS config, as k6 is able to connect to the broker and list the existing topics!

console.log(
    "Existing topics: ",
    connection.listTopics(saslConfig, tlsConfig)
  );
INFO[0014] Existing topics:  ["","."]  source=console

INFO[0057] the kafka reader for partition 0 of topic is seeking to offset 10238021 
INFO[0057] the kafka reader got an unknown error reading partition 0 of topic at offset 10238021: **read tcp 172.31.230.127:64955->172.16.49.27:443: i/o timeout** 

mostafa (Owner) commented Jan 4, 2023

@chandana194
Are you sure about the offset (10238021)?

Honestly, this is hard to reproduce locally, and since we don't have server-side logs, it's even harder. In the meantime, I added #187.

chandana194 (Author) commented Jan 4, 2023

@mostafa I even tried a different offset, but by default it keeps using the same offset.
FYI: these topics are Strimzi Kafka topics, not Confluent Kafka topics.

INFO[0014] looking up offset of kafka reader for partition 0 of pexintegration.pendingprice: 18238021
INFO[0016] writing 1 messages to pexintegration.pendingprice (partition: 0) 
INFO[0017] initializing kafka reader for partition 0 of pexintegration.pendingprice starting at offset 18238021 
INFO[0022] the kafka reader for partition 0 of pexintegration.pendingprice is seeking to offset 10238021
INFO[0023] the kafka reader got an unknown error reading partition 0 of pexintegration.pendingprice at offset 10238021: read tcp 172.31.230.127:51358->172.16.49.27:443: i/o timeout 
INFO[0023] initializing kafka reader for partition 0 of pexintegration.pendingprice starting at offset 10238021 

chandana194 (Author) commented Jan 5, 2023

@mostafa I am able to produce the messages without any issues. The read tcp timeout error comes up only when I try to consume the messages.

Tried with both a Confluent topic (no SASL/SSL enabled) and a Strimzi topic (SASL/SSL enabled).

INFO[0005] writing 1 messages to e.upload.stg (partition: 0) 
INFO[0005] initializing kafka reader for partition 0 of upload.stg starting at offset first offset 
DEBU[0006] running thresholds component=metrics-engine metric_name=kafka_writer_error_count 
INFO[0007] the kafka reader for partition 0 of upload.stg is seeking to offset 1330230 
INFO[0007] looking up offset of kafka reader for partition 0 of upload.stg: first offset 
INFO[0007] the kafka reader got an unknown error reading partition 0 of upload.stg at offset 1330230: read tcp 172.31.230.127:65003->10.148.34.189:9092: i/o timeout 
DEBU[0008] running thresholds component=metrics-engine metric_name=kafka_writer_error_count

chandana194 (Author) commented Jan 5, 2023

Below is the code I have used to test the Confluent topic consumer (the producer is working fine):


import { check } from "k6";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  CODEC_SNAPPY,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka"; // import kafka extension

const brokers = ["<brokerIP>:9092"];
const topic = "topic";

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  autoCreateTopic: true,
  compression: CODEC_SNAPPY,
  connectLogger: true,
});
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  connectLogger: true,
});
const connection = new Connection({
  address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();

if (__VU == 0) {
  
}


export const options = {
  insecureSkipTlsVerify: true,
  thresholds: {
    kafka_writer_error_count: ["count == 0"],
    kafka_reader_error_count: ["count == 0"],
  },
};

export default function () {
  for (let index = 0; index < 1; index++) {
    let messages = [
      {
        // The data type of the key is JSON
        key: schemaRegistry.serialize({
          data: {
            "txId": "test-0628-EIP-new94653",
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        // The data type of the value is JSON
        value: schemaRegistry.serialize({
          data: {"txId":"test-0628-EIP-new94653","authorId":"3736253","authorType":"oca","source":"EIP","asset":{"name":"test.jpeg","type":"Image","size":"15123","locUrl":"gs://dev/1/test.jpeg"}},
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ];

    writer.produce({ messages: messages });
  }

  // Read 1 message only
  let messages = reader.consume({ limit: 1 });

  check(messages, {
    "1 message is received": (messages) => messages.length == 1,
  });
}

export function teardown(data) {
  if (__VU == 0) {
  
  }
  writer.close();
  reader.close();
  connection.close();
}

mostafa (Owner) commented Jan 5, 2023

@chandana194

I think I have an idea why this happens. 💡

Currently, the maxWait value of the ReaderConfig is set to 200ms by default. This means the reader gives up looking for new messages after 200ms, and your log says it is an i/o timeout. Consider increasing it to see if that helps.

chandana194 (Author) commented Jan 5, 2023


@mostafa Tried the config below, but no luck! Still seeing the same error:

const reader = new Reader({
  brokers: brokers,
  topic: topic,
  maxWait: 30000,
  connectLogger: true,
});

mostafa (Owner) commented Jan 5, 2023

@chandana194
I just remembered that someone reported a similar issue:

At that time I tried to make a quick fix by adding a readBatchTimeout option to ReaderConfig, which was newly introduced in kafka-go. I rebased the branch and opened PR #188. Can you please fetch that code, rebuild your binary, and test again using that option? For this, follow this guide, and use the GitHub CLI to quickly fetch the changes in the PR, or just switch to the branch using git checkout add-read-batch-timeout before building the binary.
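The build steps look something like this (a sketch assuming Go, xk6, and the GitHub CLI are installed; the local-path --with syntax is xk6's standard way of building against a checkout):

git clone https://github.com/mostafa/xk6-kafka.git && cd xk6-kafka
gh pr checkout 188            # or: git checkout add-read-batch-timeout
xk6 build --with github.com/mostafa/xk6-kafka=.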

chandana194 (Author) commented Jan 12, 2023

@mostafa I have tried building the new binary as suggested above, and I still see the issue below.

Reader config:

const reader = new Reader({
  brokers: brokers,
  topic: topic,
  partition: 0,
  offset: -1,
  sasl: saslConfig,
  tls: tlsConfig,
  MaxWait:900000 ,
  groupID: "stage-test-v18",
  ReadBatchTimeout:3000000,
  connectLogger: true
});

One more observation: even though I point the partition to 0, it's trying to read from all partitions!

INFO[0005] entering loop for consumer group, stage-test-v18 
INFO[0005] using 'range' balancer to assign group, stage-test-v18
INFO[0005] found member: k6@cnaga (github.com/segmentio/kafka-go)-8a987d62-9c57-4357-b1da-2c9d792b233f/[]byte(nil) 
INFO[0005] found topic/partition: topic.pendingprice/0 
INFO[0005] found topic/partition: topic.pendingprice/5 
INFO[0005] found topic/partition: topic.pendingprice/10 
INFO[0005] found topic/partition: topic.pendingprice/13 --listed all partitions
INFO[0005] assigned member/topic/partitions k6@cnagara-dfb77 (github.com/segmentio/kafka-go)-8a987d62-9c57-4357-b1da-2c9d792b233f/topic.pendingprice/[0 5 10 13 8 2 12 14 9 11 4 1 6 7 3] 
INFO[0005] joinGroup succeeded for response, stage-test-v18.  generationID=1, memberID=k6@cnaga (github.com/segmentio/kafka-go)-8a987d62-9c57-4357-b1da-2c9d792b233f 
INFO[0005] Joined group stage-test-v18 as member k6@cnaga (github.com/segmentio/kafka-go)-8a987d62-9c57-4357-b1da-2c9d792b233f in generation 1 
INFO[0005] Syncing 1 assignments for generation 1 as member k6@cnaga (github.com/segmentio/kafka-go)-8a987d62-9c57-4357-b1da-2c9d792b233f 
INFO[0005] sync group finished for group, stage-test-v18 
INFO[0005] subscribed to topics and partitions: map[{topic:topic.pendingprice partition:0}:-2 {topic:topic.pendingprice partition:1}:-2 {topic:topic.pendingprice partition:2}:-2 {topic:topic.pendingprice partition:3}:-2 {topic:topic.pendingprice partition:4}:-2 {topic:topic.pendingprice partition:5}:-2 {topic:topic.pendingprice partition:6}:-2 {topic:topic.pendingprice partition:7}:-2 {topic:topic.pendingprice partition:8}:-2 {topic:topic.pendingprice partition:9}:-2 {topic:topic.pendingprice partition:10}:-2 {topic:topic.pendingprice partition:11}:-2 {topic:topic.pendingprice partition:12}:-2 {topic:topic.pendingprice partition:13}:-2 {topic:topic.pendingprice partition:14}:-2] 
INFO[0005] started heartbeat for group, stage-test-v18 [3s] 
INFO[0005] started commit for group stage-test-v18 
INFO[0005] initializing kafka reader for partition 5 of topic.pendingprice starting at offset first offset 
INFO[0005] initializing kafka reader for partition 2 of topic.pendingprice starting at offset first offset 
INFO[0005] initializing kafka reader for partition 10 of topic.pendingprice starting at offset first offset 
INFO[0005] initializing kafka reader for partition 9 of topic.pendingprice starting at offset first offset 
INFO[0010] the kafka reader for partition 10 of topic.pendingprice is seeking to offset 2928535 
INFO[0010] the kafka reader for partition 7 of topic.pendingprice is seeking to offset 7170991 
INFO[0010] the kafka reader for partition 14 of topic.pendingprice is seeking to offset 2931028 
.....
INFO[0011] the kafka reader got an unknown error reading partition 7 of topic.pendingprice at offset 7170991: read tcp <ip>:49551-><ip>:443: i/o timeout 
INFO[0011] the kafka reader got an unknown error reading partition 10 of topic.pendingprice at offset 2928535: read tcp <ip>:49550-><ip>:443: i/o timeout 
.
.
.
ERRO[0021] Unable to read messages.                      error="Unable to read messages."

mostafa (Owner) commented Jan 12, 2023

@chandana194
There are 2 mistakes in your config:

  1. MaxWait and ReadBatchTimeout should be camel case, that is, maxWait and readBatchTimeout.
  2. groupID and partition are mutually exclusive, that is, you should specify only one of them. If you have one partition, you must specify the partition. Otherwise, if you have a consumer group, you must specify the groupID.

As far as I can see, the reader seems to initialize and seek to the last offset on certain partitions. However, you set offset -1, which causes the reader to hang, as explained by @andxr in #15 (comment).

chandana194 (Author) commented Jan 12, 2023

@mostafa Thank you for the quick response. I made the changes as suggested and tried the options below:

  1. passing the offset as -2 along with groupID, without partition
  2. passing the exact offset along with a single partition
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  partition: 1,
  //maxWait: 300,
  offset: 10508826,
  sasl: saslConfig,
  tls: tlsConfig,
  maxWait:900000 ,
  //groupID: "stage-test-v18",
  readBatchTimeout:3000000,
})

In both cases I observed the same timeout issue!

INFO[0005] setting the offset of the kafka reader for partition 1 of topic.pendingprice from first offset to 10508826 
INFO[0005] looking up offset of kafka reader for partition 1 of topic.pendingprice: 10508826 
INFO[0009] initializing kafka reader for partition 1 of topic.pendingprice starting at offset 10508826 
INFO[0009] looking up offset of kafka reader for partition 1 of topic.pendingprice: 10508826 
INFO[0014] the kafka reader for partition 1 of topic.pendingprice is seeking to offset 10508826 
INFO[0014] the kafka reader got an unknown error reading partition 1 of topic.pendingprice at offset 10508826: read tcp <ip>:51643-><ip>:443: i/o timeout 

mostafa (Owner) commented Jan 12, 2023

@chandana194

Also, if you want to consume from a consumer group, you need to set groupID and groupTopics, and unset partition and topic (see the sketch below). I am currently investigating how configuration values are passed to kafka-go, to see if I missed setting any default values.
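For reference, a minimal consumer-group reader would look along these lines (group and topic names are placeholders):

const reader = new Reader({
  brokers: brokers,
  groupID: "my-consumer-group", // consumer group instead of...
  groupTopics: ["my-topic"],    // ...a fixed topic/partition pair
  connectLogger: true,
});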

mostafa (Owner) commented Jan 12, 2023

@chandana194

Can you please test the changes in this PR?

Update:
I tested the changes myself and added a working script in scripts/test_consumer_group.js. I observed this issue while running the test script: when there is no data to consume, reader.consume keeps waiting for messages on each partition of the topic and fails with "Unable to read messages." when it times out.

chandana194 (Author)

Sure, will try this and update you. FYI, when I was testing earlier, I made sure that there were enough messages to consume.

mostafa added the 🐛 Bug (Something isn't working) label Jan 12, 2023
mostafa self-assigned this Jan 12, 2023
mostafa moved this to Doing in xk6-kafka Jan 12, 2023
chandana194 (Author) commented Jan 17, 2023


@mostafa Have tried with this branch, but no luck... getting the same error!

mostafa (Owner) commented Jan 17, 2023

@chandana194
Have you looked at the example script I added to the PR? I managed to make it work with that script.

manishsaini7 commented Jan 17, 2023

Hey @mostafa, I am also getting the same error.
My script is:

import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_STRING,
  SASL_PLAIN,
  TLS_1_2,
} from "k6/x/kafka"; // import kafka extension

const brokers = ["uri:25073"];
const topic = "xk6_kafka_json_topic";

// SASL config 
const saslConfig = {
  username: "username",
  password: "password",
  algorithm: SASL_PLAIN,
};

// TLS config 
const tlsConfig = {
  // Enable/disable TLS (default: false)
  enableTls: true,
  // Skip TLS verification if the certificate is invalid or self-signed (default: false)
  insecureSkipTlsVerify: true,

  minVersion: TLS_1_2,

  // Only needed if you have a custom or self-signed certificate and keys
  // clientCertPem: "/dbaas/kafka/ca-certificate.crt",
  // clientKeyPem: "/path/to/your/client-key.pem",
   serverCaPem: "dbaas/kafka/ca-certificate.crt",
};

const offset = 0;
const partition = 0;

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  sasl: saslConfig,
  tls: tlsConfig,
  connectLogger: true,
});
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  partition: partition,
  offset: offset,
  sasl: saslConfig,
  tls: tlsConfig,
  connectLogger: true,
  maxWait: 900000,
});
const connection = new Connection({
  address: brokers[0],
  sasl: saslConfig,
  tls: tlsConfig,
});

const schemaRegistry = new SchemaRegistry();
// Can accept a SchemaRegistryConfig object

if (__VU == 0) {
  // Create a topic on initialization (before producing messages)
  connection.createTopic({
    // TopicConfig object
    topic: topic,
  });
}

export default function () {
    // Fetch the list of all topics
    const topics = connection.listTopics();
    console.log(topics); // list of topics
  
    // Produces message to Kafka
    writer.produce({
      // ProduceConfig object
      messages: [
        // Message object(s)
        {
          key: schemaRegistry.serialize({
            data: "my-key",
            schemaType: SCHEMA_TYPE_STRING,
          }),
          value: schemaRegistry.serialize({
            data: "my-value",
            schemaType: SCHEMA_TYPE_STRING,
          }),
        },
      ],
    });
  
    // Consume messages from Kafka
    let messages = reader.consume({
      // ConsumeConfig object
      limit: 10,
    });
  
    // your messages
    console.log(messages);
    let deserializedValue = schemaRegistry.deserialize({
      data: messages[0].value,
      schemaType: SCHEMA_TYPE_STRING,
    });
  }

export function teardown(data) {
  if (__VU == 0) {
    // Delete the topic
    connection.deleteTopic(topic);
  }
  writer.close();
  reader.close();
  connection.close();
}

Logs from k6

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: /kafka/produce_consume_message.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 40s max duration (incl. graceful stop):
           * default: 1 looping VUs for 10s (gracefulStop: 30s)

INFO[0013] ["xk6_kafka_json_topic"]                      source=console
INFO[0015] writing 1 messages to xk6_kafka_json_topic (partition: 0) 
INFO[0017] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset first offset 
INFO[0022] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0022] looking up offset of kafka reader for partition 0 of xk6_kafka_json_topic: first offset 
INFO[0022] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51727->x.x.x.x:25073: i/o timeout 
INFO[0022] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 
INFO[0027] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0028] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51729->x.x.x.x:25073: i/o timeout 
INFO[0028] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 
INFO[0032] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0033] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51731->x.x.x.x:25073: i/o timeout 
INFO[0033] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 
INFO[0037] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0038] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51737->x.x.x.x:25073: i/o timeout 
INFO[0038] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 
INFO[0042] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0043] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51739->x.x.x.x:25073: i/o timeout 
INFO[0043] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 
INFO[0048] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0048] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51741->x.x.x.x:25073: i/o timeout 
INFO[0049] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 
ERRO[0053] Unable to read messages.                      error="Unable to read messages."
INFO[0053] the kafka reader for partition 0 of xk6_kafka_json_topic is seeking to offset 0 
INFO[0054] the kafka reader got an unknown error reading partition 0 of xk6_kafka_json_topic at offset 0: read tcp 10.254.15.12:51743->x.x.x.x:25073: i/o timeout 
INFO[0054] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 

running (48.8s), 0/1 VUs, 0 complete and 1 interrupted iterations
default ✓ [======================================] 1 VUs  10s
WARN[0057] No script iterations finished, consider making the test duration longer 
INFO[0058] error initializing the kafka reader for partition 0 of xk6_kafka_json_topic: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker 
INFO[0058] initializing kafka reader for partition 0 of xk6_kafka_json_topic starting at offset 0 

     █ teardown

     data_received................: 0 B   0 B/s
     data_sent....................: 0 B   0 B/s
     iteration_duration...........: avg=281.79ms min=281.79ms med=281.79ms max=281.79ms p(90)=281.79ms p(95)=281.79ms
     kafka_writer_acks_required...: 0     min=0                  max=0 
     kafka_writer_async...........: 0.00% ✓ 0                    ✗ 1   
     kafka_writer_attempts_max....: 0     min=0                  max=0 
     kafka_writer_batch_bytes.....: 36 B  0.7369566955647858 B/s
     kafka_writer_batch_max.......: 1     min=1                  max=1 
     kafka_writer_batch_size......: 1     0.020471/s
     kafka_writer_batch_timeout...: 0s    min=0s                 max=0s
     kafka_writer_error_count.....: 0     0/s
     kafka_writer_message_bytes...: 36 B  0.7369566955647858 B/s
     kafka_writer_message_count...: 1     0.020471/s
     kafka_writer_read_timeout....: 0s    min=0s                 max=0s
     kafka_writer_retries_count...: 0     0/s
     kafka_writer_wait_seconds....: avg=0s       min=0s       med=0s       max=0s       p(90)=0s       p(95)=0s      
     kafka_writer_write_count.....: 1     0.020471/s
     kafka_writer_write_seconds...: avg=1.68s    min=1.68s    med=1.68s    max=1.68s    p(90)=1.68s    p(95)=1.68s   
     kafka_writer_write_timeout...: 0s    min=0s                 max=0s
     vus..........................: 0     min=0                  max=1 
     vus_max......................: 1     min=0                  max=1 

mostafa (Owner) commented Jan 17, 2023

Hey @manishsaini7,

Have you tried compiling and using the changes in PR #189? Also, you're producing just a single message, yet waiting to receive 10 messages, which causes the consumer to wait forever until it times out.
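In other words, the consume limit should match what is actually available on the topic, e.g. (sketch):

writer.produce({ messages: messages }); // produces 1 message
let received = reader.consume({ limit: 1 }); // so wait for exactly 1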

chandana194 (Author)


@mostafa Tried with the latest script you shared. I am able to consume the messages now :) but one observation below:

INFO[0019] committed offsets for group stage-test-v18: 
    topic: pexintegration.pendingprice
        partition 11: 2998881
INFO[0020] committed offsets for group stage-test-v18: 
    topic: pexintegration.pendingprice
        partition 11: 2998882

There are multiple partitions, and I tried to consume only 2 messages.
From the above logs I can say that the messages were consumed from partition 11 at the above offsets, but in the k6 results the offset value is out of sync (highlighted below). Please correct me if my understanding is wrong!

✓ 2 messages is received

     █ teardown

     checks.........................: 100.00% ✓ 1         ✗ 0        
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=10.79s   min=9.99s    med=10.79s   max=11.6s    p(90)=11.44s   p(95)=11.52s  
     iterations.....................: 1       0.039389/s
     kafka_reader_dial_count........: 15      0.590836/s
     kafka_reader_dial_seconds......: avg=3.94s    min=3.94s    med=3.94s    max=3.94s    p(90)=3.94s    p(95)=3.94s   
   ✓ kafka_reader_error_count.......: 0       0/s
     kafka_reader_fetch_bytes.......: 7.0 kB  276 B/s
     kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000
     kafka_reader_fetch_bytes_min...: 1       min=1       max=1      
     kafka_reader_fetch_size........: 3       0.118167/s
     kafka_reader_fetch_wait_max....: 10s     min=10s     max=10s    
     kafka_reader_fetches_count.....: 30      1.181671/s
     kafka_reader_lag...............: 0       min=0       max=0      
     kafka_reader_message_bytes.....: 33 kB   1.3 kB/s
     kafka_reader_message_count.....: 51      2.008841/s
     **kafka_reader_offset............: 7240251 min=7240251 max=7240251**
     kafka_reader_queue_capacity....: 100     min=100     max=100    
     kafka_reader_queue_length......: 49      min=49      max=49     
     kafka_reader_read_seconds......: avg=74.03µs  min=74.03µs  med=74.03µs  max=74.03µs  p(90)=74.03µs  p(95)=74.03µs 
     kafka_reader_rebalance_count...: 1       0.039389/s
     kafka_reader_timeouts_count....: 0       0/s
     kafka_reader_wait_seconds......: avg=255.59ms min=255.59ms med=255.59ms max=255.59ms p(90)=255.59ms p(95)=255.59ms

mostafa (Owner) commented Jan 18, 2023

@chandana194
Good catch! The offset should be set to -1 when consuming from a group; its current value seems to be the last offset of the partition. 🤔 Also, you should have received an error with this message: "Unable to set offset, yet returning the reader."

chandana194 (Author)

Unable to set offset, yet returning the reader

I have just gone through the complete logs and don't see the above error message!

mostafa (Owner) commented Jan 18, 2023

@chandana194 Are you consuming from a consumer group, that is, using a group ID?

chandana194 (Author)

Yes, I am consuming using a group ID:
const brokers = [":443"];
const topic = "topic-name";
const groupID = "stage-test-v18";

mostafa (Owner) commented Jan 18, 2023

@chandana194

Then you shouldn't care about that offset. I'll fix it. And I think the issue is fixed by now.

chandana194 (Author)

@mostafa Could you please let me know if there is an ETA for releasing this fix to the main branch?

mostafa (Owner) commented Jan 23, 2023

@chandana194 I can release it right away if you confirm it works.

chandana194 (Author)

Yes @mostafa, consuming messages through group IDs is working fine now.

mostafa (Owner) commented Jan 23, 2023

@chandana194 Awesome!

The changes are released in v0.16.1. I'll close this ticket, as I consider it fixed. If you have further questions, feel free to reopen this issue or create a new one.

mostafa closed this as completed Jan 23, 2023
github-project-automation bot moved this from Doing to Test in xk6-kafka Jan 23, 2023
chandana194 (Author)

@mostafa One issue I have observed with the latest version: when I test with a larger number of users (like 50 VUs for 30 seconds), the k6 summary report does not print the message count/metrics properly. On the backend I see that more than 10K messages are produced, but the summary shows a much smaller number - kafka_writer_message_count...: 1931 78.300207/s

running (24.7s), 00/50 VUs, 50 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1s

     █ teardown

     data_received................: 0 B    0 B/s
     data_sent....................: 0 B    0 B/s
     iteration_duration...........: avg=12.15s min=6.93s med=12.36s max=13.15s p(90)=13s   p(95)=13.01s
     iterations...................: 50     2.027452/s
   ✓ kafka_reader_error_count.....: 0      0/s
     kafka_writer_acks_required...: 0      min=0       max=0 
     kafka_writer_async...........: 0.00%  ✓ 0         ✗ 50  
     kafka_writer_attempts_max....: 0      min=0       max=0 
     kafka_writer_batch_bytes.....: 33 kB  1.3 kB/s
     kafka_writer_batch_max.......: 1      min=1       max=1 
     kafka_writer_batch_size......: 50     2.027452/s
     kafka_writer_batch_timeout...: 0s     min=0s      max=0s
   ✗ kafka_writer_error_count.....: 431    17.476639/s
     kafka_writer_message_bytes...: 1.3 MB 52 kB/s
     kafka_writer_message_count...: 1931   78.300207/s
     kafka_writer_read_timeout....: 0s     min=0s      max=0s
     kafka_writer_retries_count...: 431    17.476639/s
     kafka_writer_wait_seconds....: avg=0s     min=0s    med=0s     max=0s     p(90)=0s    p(95)=0s    
     kafka_writer_write_count.....: 1931   78.300207/s
     kafka_writer_write_seconds...: avg=2.83s  min=2.54s med=2.85s  max=3.1s   p(90)=3.01s p(95)=3.04s 
     kafka_writer_write_timeout...: 0s     min=0s      max=0s
     vus..........................: 0      min=0       max=50
     vus_max......................: 50     min=0       max=50

mostafa (Owner) commented Jan 25, 2023

@chandana194 You have too many errors:
✗ kafka_writer_error_count.....: 431 17.476639/s

chandana194 (Author) commented Jan 25, 2023


@mostafa Neither the failed count nor the success count matches the message count produced on the backend; there is a huge difference!
As I mentioned, the test produced >10K messages, which I can validate on the Kafka side.

Also, I would like to know if there is a way to control the number of messages produced per second.

mostafa (Owner) commented Jan 25, 2023

@chandana194
That's strange! Of course you can control it. I just used for-loops to demonstrate that you can generate lots of messages before sending them to Kafka, but that stuck as the de facto way to write xk6-kafka scripts. You can simply generate one message per VU per iteration, and that's totally fine.
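If you want a hard cap on throughput, you can also pace iterations with k6's constant-arrival-rate executor (standard k6 scenario config; the numbers below are illustrative):

export const options = {
  scenarios: {
    produce: {
      executor: "constant-arrival-rate",
      rate: 100,           // start 100 iterations per second...
      timeUnit: "1s",      // ...i.e. ~100 messages/s at 1 message per iteration
      duration: "30s",
      preAllocatedVUs: 10, // VUs pre-allocated to sustain the rate
    },
  },
};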
