
the kafka reader got an unknown error reading partition #726

Open
nsomarouthu opened this issue Aug 25, 2021 · 16 comments
nsomarouthu commented Aug 25, 2021

First issue

We're often getting reader errors, roughly 500k/day:

```
the kafka reader got an unknown error reading partition 9 of SOME_TOPIC at offset 3: read tcp IP_ADDRESS:46406->IP_ADDRESS:9093: i/o timeout
```

kafka-go/reader.go

Lines 1366 to 1375 in a4890bd

```go
default:
	if _, ok := err.(Error); ok {
		r.sendError(ctx, err)
	} else {
		r.withErrorLogger(func(log Logger) {
			log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err)
		})
		r.stats.errors.observe(1)
		conn.Close()
		break readLoop
	}
```

Kafka-go 0.4.16

Kafka 2.5.0

Second issue

The errors occur when the Kafka reader commits a message after it has been processed successfully; the message is then re-consumed by another replica.

```
"msg": "debezium.Consumer: failed to commit message: write tcp IP_ADDRESS:49610->IP_ADDRESS:9093: use of closed network connection"
```

We see `Successfully handled message` and, a while afterward, `failed to commit message`.

@achille-roussel
Contributor

Hello @nsomarouthu, thanks for reporting the issue.

Do you know whether these errors are causing an adverse effect on your application? Or are they simply transient errors that kafka-go handles and recovers from automatically?

I/O timeouts seem like frequent problems in network systems, so I would not be too concerned about it, but let me know if this is causing any other issues!

@nsomarouthu
Author

Hello @achille-roussel, is there any setting in kafka.ReaderConfig that would reduce the number of errors we are currently experiencing with our configuration? The reader exits its loop when reporting this issue and then reconnects. This doesn't impact the reading functionality, but it seems expensive.

@chihweichang

chihweichang commented Sep 2, 2021

@achille-roussel I am on the same team at the same company as @nsomarouthu; we are working together on these two issues.
For the first issue, we log many `the kafka reader got an unknown error reading partition` errors, as well as numerous `no messages received from kafka within the allocated time for partition` messages. I believe this is because our applications set ReaderConfig.MaxWait (https://github.com/segmentio/kafka-go/blob/master/reader.go#L395) to 100ms. There are some messages that we want the application to fetch and handle as soon as possible. Do you see any concern with setting a short ReaderConfig.MaxWait? Does the ReaderConfig.MaxWait duration impact the expected message consumption rate? (We only need a short ReaderConfig.MaxWait for some of our apps, not all of them.)

For the second issue, we get a few error messages about 5 seconds after finishing handling a Kafka message and trying to commit it, such as `write tcp 100.96.92.51:52652->100.67.54.193:9093: use of closed network connection`. I believe this is related to ConsumerGroupConfig.Timeout (defaults to 5s): https://github.com/segmentio/kafka-go/blob/master/consumergroup.go#L161
Is there a way to set a different (longer) value when creating a NewReader with a ReaderConfig? It does not look like it is configurable here: https://github.com/segmentio/kafka-go/blob/master/reader.go#L704

@achille-roussel
Contributor

A MaxWait of 100ms definitely seems short. Typically, I've seen this value configured to between 250ms and a couple of seconds. Kafka is designed for batch processing: clients use long-polling to fetch batches of messages, which is what MaxWait configures. In my experience, Kafka can be complex to configure well for low latency.
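As a rough sketch of that tuning (the broker address, group ID, and topic are placeholders, and the values should be adapted to your workload), a ReaderConfig along these lines gives the long-poll more room than 100ms:

```go
package main

import (
	"time"

	"github.com/segmentio/kafka-go"
)

func newReader() *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"}, // placeholder broker
		GroupID:  "example-group",            // placeholder consumer group
		Topic:    "SOME_TOPIC",
		MinBytes: 1,    // return as soon as any data is available
		MaxBytes: 10e6, // cap each fetch at ~10 MB
		// Long-poll window per fetch; 250ms to a couple of
		// seconds is the range suggested above.
		MaxWait: 500 * time.Millisecond,
	})
}
```

Raising MinBytes as well trades a little latency for fewer, larger fetches, which is where Kafka's batch-oriented design performs best.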

For your second question, it appears we are not exposing this configuration option when creating the ConsumerGroup internally in the Reader. Would you be able to contribute a change to add this option?
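Until such a change lands, the Timeout field can be set when constructing the consumer group directly through the ConsumerGroup API instead of NewReader; a minimal sketch (the group ID, broker, and topic are placeholders):

```go
package main

import (
	"time"

	"github.com/segmentio/kafka-go"
)

func newGroup() (*kafka.ConsumerGroup, error) {
	return kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "example-group",            // placeholder consumer group ID
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Topics:  []string{"SOME_TOPIC"},
		Timeout: 15 * time.Second, // raise the 5s default network timeout
	})
}
```

Note that using ConsumerGroup directly means managing generations and offset commits yourself, which Reader otherwise handles for you.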

@chihweichang

@achille-roussel thanks for the feedback. We will test tuning MaxWait (likely in combination with MinBytes and MaxBytes) in our applications.

We can also test configuring ConsumerGroupConfig.Timeout on our end to see if that reduces the errors we are seeing. If it works, we can contribute to making this option configurable.

@achille-roussel
Contributor

Hello @chihweichang, how did your investigation go? Are there any follow ups we should discuss here?

@lujiacn

lujiacn commented Nov 11, 2021

I get the same error message when reading large data: the server-side reader has no issue, but the reader in my local dev environment cannot read the large data and fails with:
`the kafka reader got an unknown error reading partition ....: i/o timeout`
I found a workaround: reader.go contains a safetyTimeout set to 10 seconds; after updating it to 60 seconds, the issue no longer occurs.

```go
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	const safetyTimeout = 60 * time.Second // changed from 10 to 60
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)
```

@lujiacn

lujiacn commented Nov 11, 2021

Also, I'm not sure of the purpose of the code below:

```go
	const safetyTimeout = 60 * time.Second // changed from 10 to 60
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)
```

It overwrites the deadline that was set just before and cannot be configured through MaxWait in the Reader configuration. Can we remove this code? See the commented-out version below:

```go
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	//const safetyTimeout = 10 * time.Second
	//deadline := time.Now().Add(safetyTimeout)
	//conn.SetReadDeadline(deadline)

	for {
		//if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
		//deadline = now.Add(safetyTimeout)
		//conn.SetReadDeadline(deadline)
		//}

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)
	return offset, err
}
```

@chihweichang

> Hello @chihweichang, how did your investigation go? Are there any follow ups we should discuss here?

@achille-roussel sorry, I haven't had time to investigate further on this issue.

@achille-roussel
Contributor

Hello @chihweichang!

I wanted to follow up on this issue and ask if you were able to investigate it further.


@lujiacn thanks for following up with suggested changes.

The 10s safeguard is definitely very opinionated; it would be better for this value to be configurable. Would you be available to send a pull request to support tuning this timeout?

@mostafa

mostafa commented Oct 11, 2022

One of the users of my project is experiencing the same issue, and I don't know what's causing it.

@nsomarouthu @chihweichang Have you been able to pinpoint the issue here?

@moogacs You seem to have fixed the issue on your fork. Is there anything you can add here?

@moogacs
Contributor

moogacs commented Oct 11, 2022

@mostafa, the issue has been identified, and indeed making the read timeout configurable will fix it, so users of the library can adjust the read timeout based on their needs.

At the moment it's hard-coded to 10 seconds, and increasing that resolves it.

I'm now waiting for my PR to be approved and merged.

@moogacs
Contributor

moogacs commented Oct 11, 2022

@achille-roussel I think you can help in prioritizing #989.

@mostafa

mostafa commented Oct 11, 2022

@achille-roussel I saw your review on @moogacs's PR, #989. What does it take to merge that PR? Is there anything missing? Can I help in any way?

achille-roussel pushed a commit that referenced this issue Oct 14, 2022
* #726 remove reader safetyTimeout

* add configuration

* update docs and add default

* update comment

* add read deadline to every iteration

* rename to ReadBatchTimeout
@achille-roussel
Contributor

#989 has been merged, let me know if you are still experiencing the issue on the latest version of kafka-go!
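For anyone landing here later: with #989 merged, the former hard-coded 10s safety timeout is exposed as ReaderConfig.ReadBatchTimeout, so the workaround earlier in this thread no longer requires patching reader.go. A sketch (the broker and topic are placeholders):

```go
package main

import (
	"time"

	"github.com/segmentio/kafka-go"
)

func newReader() *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Topic:   "SOME_TOPIC",
		// Previously a fixed 10s safetyTimeout inside reader.read;
		// now configurable per reader.
		ReadBatchTimeout: 60 * time.Second,
	})
}
```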

@mostafa

mostafa commented Jan 12, 2023

@achille-roussel
One of the users of xk6-kafka (and kafka-go) is still experiencing the issue: mostafa/xk6-kafka#185.
