[Bug] New partitions created will cause Flink job failover when partition discovery is enabled while scanning #288

Open
1 of 2 tasks
luoyuxia opened this issue Dec 30, 2024 · 5 comments
Labels
bug Something isn't working component=flink


@luoyuxia
Collaborator

luoyuxia commented Dec 30, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.5.0

Minimal reproduce step

Create a partitioned table and a Flink job that subscribes to it. After a new partition is created, the job may throw:

Caused by: com.alibaba.fluss.exception.FlussRuntimeException: Leader not found for table bucket: TableBucket{tableId=0, partitionId=60, bucket=16}
	at com.alibaba.fluss.client.metadata.MetadataUpdater.leaderFor(MetadataUpdater.java:122)
	at com.alibaba.fluss.client.admin.FlussAdmin.prepareListOffsetsRequests(FlussAdmin.java:295)
	at com.alibaba.fluss.client.admin.FlussAdmin.listOffsets(FlussAdmin.java:253)
	at com.alibaba.fluss.connector.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl.listOffsets(BucketOffsetsRetrieverImpl.java:72)
	at com.alibaba.fluss.connector.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl.latestOffsets(BucketOffsetsRetrieverImpl.java:49)
	at com.alibaba.fluss.connector.flink.source.enumerator.initializer.LatestOffsetsInitializer.getBucketOffsets(LatestOffsetsInitializer.java:38)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.getLogSplit(FlinkSourceEnumerator.java:449)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.initLogTablePartitionSplits(FlinkSourceEnumerator.java:356)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.initPartitionedSplits(FlinkSourceEnumerator.java:348)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.lambda$checkPartitionChanges$2(FlinkSourceEnumerator.java:296)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)

This causes the job to fail over.

What doesn't meet your expectations?

The exception itself is expected, since the leader may not have been elected yet even though the partition has been created. The job runs normally after failover, but we should handle this case so the job does not fail over every time a partition is created.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@luoyuxia luoyuxia added bug Something isn't working component=flink labels Dec 30, 2024
@luoyuxia
Collaborator Author

luoyuxia commented Jan 9, 2025

It seems the main branch has introduced a retry mechanism with a maximum of 5 attempts. That may still be too short to wait for the leader to become ready if the metadata update response returns quickly.

Let me check whether the problem still happens on the latest branch.
Btw, I think we should follow Kafka's design, which retries forever when LeaderNotAvailableException is thrown.
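
To make the idea concrete, here is a minimal sketch of a retry that is bounded by elapsed time rather than by a fixed number of attempts. It is not the Fluss client code: `LeaderWaiter`, `awaitLeader`, `LeaderNotReadyException` and the `lookup` supplier are hypothetical names, and `lookup` only stands in for "refresh the metadata, then read the leader of the bucket".

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper; none of these names belong to the Fluss client API. */
final class LeaderWaiter {

    /** Thrown by this sketch when no leader shows up before the deadline. */
    static final class LeaderNotReadyException extends RuntimeException {
        LeaderNotReadyException(String message) {
            super(message);
        }
    }

    /**
     * Polls {@code lookup} until it yields a leader id or {@code timeoutMs} elapses.
     * {@code lookup} is an assumed stand-in for a metadata refresh plus leader read.
     */
    static int awaitLeader(Supplier<Optional<Integer>> lookup, long timeoutMs, long pauseMs) {
        long start = System.nanoTime();
        while (true) {
            Optional<Integer> leader = lookup.get();
            if (leader.isPresent()) {
                return leader.get();
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            if (elapsedMs >= timeoutMs) {
                throw new LeaderNotReadyException(
                        "Leader not found within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                throw new LeaderNotReadyException("Interrupted while waiting for leader");
            }
        }
    }

    public static void main(String[] args) {
        // Simulated metadata: the leader only appears on the 8th lookup,
        // which a fixed limit of 5 attempts would not tolerate.
        int[] calls = {0};
        Supplier<Optional<Integer>> lookup =
                () -> ++calls[0] < 8 ? Optional.empty() : Optional.of(3);
        int leader = awaitLeader(lookup, 5_000, 10);
        System.out.println("leader=" + leader + " after " + calls[0] + " lookups");
        // prints "leader=3 after 8 lookups"
    }
}
```

With a time budget, a fast metadata response no longer burns through the attempts before the election finishes. The trade-off is that the caller blocks for up to `timeoutMs`.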

@luoyuxia
Collaborator Author

It turns out the exception can still happen:

Caused by: com.alibaba.fluss.exception.FlussRuntimeException: Leader not found after retry  5 times for table bucket: TableBucket{tableId=0, partitionId=2, bucket=23}
	at com.alibaba.fluss.client.metadata.MetadataUpdater.leaderFor(MetadataUpdater.java:135)
	at com.alibaba.fluss.client.admin.FlussAdmin.prepareListOffsetsRequests(FlussAdmin.java:295)
	at com.alibaba.fluss.client.admin.FlussAdmin.listOffsets(FlussAdmin.java:253)
	at com.alibaba.fluss.connector.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl.listOffsets(BucketOffsetsRetrieverImpl.java:72)
	at com.alibaba.fluss.connector.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl.latestOffsets(BucketOffsetsRetrieverImpl.java:49)
	at com.alibaba.fluss.connector.flink.source.enumerator.initializer.LatestOffsetsInitializer.getBucketOffsets(LatestOffsetsInitializer.java:38)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.getLogSplit(FlinkSourceEnumerator.java:449)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.initLogTablePartitionSplits(FlinkSourceEnumerator.java:356)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.initPartitionedSplits(FlinkSourceEnumerator.java:348)
	at com.alibaba.fluss.connector.flink.source.enumerator.FlinkSourceEnumerator.lambda$checkPartitionChanges$2(FlinkSourceEnumerator.java:296)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
	... 7 more

@luoyuxia
Collaborator Author

Btw, I think we should back off between retries.
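
One possible shape for such a backoff, as a sketch only: capped exponential growth with full jitter. `RetryBackoff`, its method names and the 100 ms / 5000 ms values are illustrative assumptions, not existing Fluss classes or configuration options.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper; names and values are illustrative, not Fluss configuration. */
final class RetryBackoff {
    private final long initialMs;
    private final long maxMs;

    RetryBackoff(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs || maxMs == Long.MAX_VALUE) {
            throw new IllegalArgumentException("require 0 < initialMs <= maxMs < Long.MAX_VALUE");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Upper bound of the pause before retry {@code attempt} (0-based): initialMs * 2^attempt, capped at maxMs. */
    long ceilingMs(int attempt) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            // Doubling is guarded so it can neither exceed the cap nor overflow.
            delay = delay > maxMs / 2 ? maxMs : delay * 2;
        }
        return delay;
    }

    /** Full jitter: a uniformly random pause in [0, ceilingMs(attempt)]. */
    long nextDelayMs(int attempt) {
        return ThreadLocalRandom.current().nextLong(ceilingMs(attempt) + 1);
    }

    public static void main(String[] args) {
        RetryBackoff backoff = new RetryBackoff(100, 5_000);
        StringBuilder ceilings = new StringBuilder();
        for (int attempt = 0; attempt < 8; attempt++) {
            ceilings.append(backoff.ceilingMs(attempt)).append(' ');
        }
        System.out.println(ceilings.toString().trim());
        // prints "100 200 400 800 1600 3200 5000 5000"
    }
}
```

The jitter spreads out retries from clients that all see the new partition at the same moment. The cap keeps the wait short once the leader has been elected.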

@luoyuxia
Collaborator Author

cc @swuferhong @loserwang1024

@loserwang1024
Collaborator

I wonder whether this also happens when a leader is lost and the new leader has not been decided yet? It seems Kafka added retries to ListOffsets in https://issues.apache.org/jira/browse/KAFKA-14821 in Kafka 3.5.0.
