[Bug] TableChangeWatcher will fire CreatePartitionEvent after restart although the partition has been created #333

Open
1 of 2 tasks
luoyuxia opened this issue Jan 21, 2025 · 4 comments
Labels
bug Something isn't working good first issue Good for newcomers
Milestone

Comments

@luoyuxia
Collaborator

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main

Minimal reproduce step

Create a partitioned table, then restart the coordinator server.

What doesn't meet your expectations?

After the restart, CreatePartitionEvent is fired again for partitions that already exist. CoordinatorEventProcessor processes the event a second time, which leaves the coordinator state inconsistent.

Anything else?

We use TablePathChangeListener, which implements CuratorCacheListener, to listen for ZooKeeper node changes. During a restart, however, the cache loads all existing nodes, and each loaded node triggers event(Type type, ChildData oldData, ChildData newData). We did not account for this and mistook the loaded nodes for newly created ones, so CreatePartitionEvent is fired for ZooKeeper nodes that already exist.

I suggest overriding void initialized() so the listener knows when the cache has finished initializing, and only processing events after that point.
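To illustrate the idea, here is a minimal JDK-only sketch. It does not use Curator: CacheListener and FakeCache are hypothetical stand-ins that only mimic the behavior described above (existing nodes are replayed as "added" on start, then initialized() is called). GatedListener drops everything delivered before initialized(). Curator's listener builder also has an afterInitialized() option that may give the same effect without a hand-written flag.

```java
import java.util.ArrayList;
import java.util.List;

public class InitGateDemo {
    /** Hypothetical stand-in for the two listener callbacks discussed above. */
    interface CacheListener {
        void nodeAdded(String path);
        void initialized();
    }

    /** Hypothetical cache: on start it replays existing nodes, then signals initialized(). */
    static class FakeCache {
        private final List<String> nodes;
        private final CacheListener listener;

        FakeCache(List<String> existing, CacheListener listener) {
            this.nodes = new ArrayList<>(existing);
            this.listener = listener;
        }

        void start() {
            for (String path : nodes) {
                listener.nodeAdded(path); // initial load looks like a creation
            }
            listener.initialized();
        }

        void create(String path) {
            nodes.add(path);
            listener.nodeAdded(path);
        }
    }

    /** Listener that ignores every event delivered before initialized(). */
    static class GatedListener implements CacheListener {
        private volatile boolean initialized = false;
        final List<String> createPartitionEvents = new ArrayList<>();

        @Override
        public void nodeAdded(String path) {
            if (!initialized) {
                return; // node already existed before the restart
            }
            createPartitionEvents.add(path);
        }

        @Override
        public void initialized() {
            initialized = true;
        }
    }

    static List<String> run() {
        GatedListener listener = new GatedListener();
        FakeCache cache = new FakeCache(List.of("p1", "p2"), listener);
        cache.start();      // simulates the coordinator restart
        cache.create("p3"); // a genuinely new partition
        return listener.createPartitionEvents;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [p3]
    }
}
```

Only p3 produces a create event; p1 and p2 are skipped because they arrive during the initial load. This assumes the coordinator rebuilds its state for already-existing partitions through some other path at startup, since the gate discards them here.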

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@luoyuxia luoyuxia added bug Something isn't working good first issue Good for newcomers labels Jan 21, 2025
@luoyuxia luoyuxia added this to the v0.6 milestone Jan 21, 2025
@Jeremy-Suon

Hold on a second, I will try to fix your code.

@Jeremy-Suon

Please wait a moment, thanks.

@Jeremy-Suon

// Approach 1: override the initialized() method
// Plain Curator imports are shown; the project may use a shaded package instead.
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;

public class TablePathChangeListener implements CuratorCacheListener {
    // Becomes true once the cache has finished loading the existing nodes
    private volatile boolean initialized = false;

    // Called by the cache after the initial nodes have been loaded
    @Override
    public void initialized() {
        this.initialized = true;
    }

    @Override
    public void event(Type type, ChildData oldData, ChildData newData) {
        // Ignore events fired while the cache is still being populated
        if (!initialized) {
            return;
        }

        // Normal event handling
        switch (type) {
            case NODE_CREATED:
                handleNodeAdded(newData);
                break;
            case NODE_CHANGED:
                handleNodeChanged(oldData, newData);
                break;
            case NODE_DELETED:
                handleNodeDeleted(oldData);
                break;
            default:
                break;
        }
    }

    private void handleNodeAdded(ChildData newData) {
        // Handle a newly created node
    }

    private void handleNodeChanged(ChildData oldData, ChildData newData) {
        // Handle a changed node
    }

    private void handleNodeDeleted(ChildData oldData) {
        // Handle a deleted node
    }
}

@luoyuxia
Collaborator Author

@Jeremy-Suon It seems you're interested in fixing this? If so, I can assign the ticket to you~
