Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add brpop cmd #48

Open
wants to merge 13 commits into
base: unstable
Choose a base branch
from
Open

feat: add brpop cmd #48

wants to merge 13 commits into from

Conversation

SamuelSze1
Copy link
Collaborator

@SamuelSze1 SamuelSze1 commented Sep 21, 2024

Summary by CodeRabbit

  • New Features

    • Added blocking list pop commands (BLPOP and BRPOP)
    • Introduced mechanism for managing blocked client connections
    • Implemented periodic cleanup of expired blocked connections
  • Improvements

    • Enhanced concurrency control for list operations
    • Added thread-safe connection management
    • Improved lock management across storage and command handling
    • Introduced a method for popping elements from a list without acquiring a lock
  • Bug Fixes

    • Refined key management during list and rename operations
    • Added expiration checks for blocked connections

Copy link

coderabbitai bot commented Sep 21, 2024

Walkthrough

This pull request introduces significant enhancements to the handling of blocking list operations in the KiwiDB system, specifically implementing the BLPOP and BRPOP commands. The changes encompass multiple files, adding new classes, methods, and data structures to manage blocked connections, handle expiration, and ensure thread-safe operations. Key functionalities include methods for blocking clients, serving and unblocking connections, and cleaning up expired nodes, all aimed at improving the efficiency and robustness of list operations in a multi-threaded environment.

Changes

File Changes
src/base_cmd.h - Added BlockedConnNode class
- Added BlockKey and BlockKeyHash structs
- Added kCmdNameBLPop and kCmdNameBRPop constants
src/base_cmd.cc - Implemented BlockThisClientToWaitLRPush
- Implemented ServeAndUnblockConns
- Added IsExpired method to BlockedConnNode
src/cmd_list.h - Added BLPopCmd and BRPopCmd classes
src/cmd_list.cc - Implemented blocking list pop command logic
- Added ServeAndUnblockConns calls to various list commands
src/kiwi.h - Added key_to_blocked_conns_ map
- Added block_mtx_ shared mutex
- Added methods for managing blocked connections
src/kiwi.cc - Implemented ScanEvictedBlockedConnsOfBlrpop
- Implemented CleanBlockedNodes
- Added timer task for periodic blocked connection cleanup
src/cmd_table_manager.cc - Registered BLPop and BRPop commands
src/storage/include/storage/storage.h - Added GetLockMgr method
- Added lock_mgr_ member variable
src/storage/src/redis.cc - Updated Redis constructor to accept lock_mgr parameter
src/storage/src/redis.h - Updated Redis constructor signature
- Added RPopWithoutLock method
src/storage/src/redis_lists.cc - Added RPopWithoutLock method
src/storage/src/storage.cc - Added RPopWithoutLock method
- Updated Open method to include lock_mgr_

Possibly related PRs

  • style: some keywords begins with 'k' #155: This PR is unrelated to the main PR as it focuses on style changes in the CMake configuration and does not involve any modifications to the functionality of the BaseCmd or BlockedConnNode classes.

Poem

🐰 In the land of lists, we hop and play,
Blocking and popping, in a thread-safe way.
With timers and mutexes, we keep it neat,
A rabbit's delight, where connections meet!
Hooray for the changes, let the data flow,
In KiwiDB's garden, watch our features grow! 🌱✨

Finishing Touches

  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@SamuelSze1
Copy link
Collaborator Author

增加blpop和brpop指令,步骤为下
1.首先尝试直接lpop或rpop,如果list中有数据就直接返回,否则进入下一步
2.调用BlockThisClientToWaitLRPush将client放入key_to_blocked_conns_中对应的key的阻塞队列中
3.在执行向list中添加元素的cmd后调用ServeAndUnblockConns,将list的元素根据key_to_blocked_conns_中对应的key的阻塞队列的顺序执行对应的lpop或rpop并把结果发回给client
4.注册了一个线程每隔250ms调用ScanEvictedBlockedConnsOfBlrpop,遍历所有key_to_blocked_conns_并删除已经超时的或被执行的链接并返回给client
5.因为blpop只能可以输入多个list作为参数,但最后只能返回一个list的元素,所以在BlockedConnNode中有一个std::shared_ptr<std::atomic>,在BlockThisClientToWaitLRPush中会给对应同一个bpop指令的BlockedConnNode一个指向同一个原子变量的指针,当一个BlockedConnNode被从队列中取出来的时候会把这个变量设为true,阻止其他BlockedConnNode被执行

src/base_cmd.cc Outdated
@@ -106,6 +106,91 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, PClient* client,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockThisClientToWaitLRPush 这个方法没有看到哪里有调用

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看 pika 那边用到了这个方法, 可以补齐

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@浩林

@luky116 luky116 changed the title feat: Add bpop cmd feat: add bpop cmd Nov 16, 2024
Copy link
Collaborator

@lqxhub lqxhub left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为PClient相关代码改动了, 所以这里提示冲突了,需要解决一下冲突

src/base_cmd.cc Outdated
continue;
}

PClient* BlockedClient = (*conn_blocked).GetBlockedClient();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

指针强转用 reinterpret_cast

return false;
}
auto now = std::chrono::system_clock::now();
int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断是否过期, 从调用 IsExpired 的地方传递一个 时间戳过来, 会不会更好,或者提供一个函数重载,可以通过外部传入时间戳, 避免在多次调用时,重复获取当前时间

src/base_cmd.h Outdated
private:
Type type_;
int64_t expire_time_;
PClient* client_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

尽量使用 std::week_ptr 因为在 client由网络层持有, 并且管理声明周期, 如果client被释放,但是这里还保存了原来的地址, 这里的 client* 就成了野指针, 调用会出错.

src/base_cmd.h Outdated Show resolved Hide resolved
src/kiwi.cc Outdated
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
} else {
conn_node++;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++conn_node; 会不会更好, 还有上面两个 if 分支里 不需要 ++ 吗

// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的错误码能兼容redis吗

src/base_cmd.h Outdated Show resolved Hide resolved
};
struct BlockKeyHash {
std::size_t operator()(const BlockKey& k) const {
return std::hash<int>{}(k.db_id) ^ std::hash<std::string>{}(k.key);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里依然会出现hash碰撞的情况吧,只是概率大小的问题

read_latch.unlock();

std::unique_lock<std::shared_mutex> write_lock(g_kiwi->GetBlockMtx());
auto& waitting_list = it->second;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto waitting_list = it->second; 这样写就行了吧, 不需要加 引用, 或者加了 引用有什么特殊作用吗

src/base_cmd.cc Outdated Show resolved Hide resolved
@luky116 luky116 changed the title feat: add bpop cmd feat: add brpop cmd Nov 30, 2024
src/base_cmd.cc Outdated Show resolved Hide resolved
src/base_cmd.h Outdated Show resolved Hide resolved
src/base_cmd.h Outdated Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (10)
src/base_cmd.h (3)

222-236: Document thread safety guarantees.

The BlockedConnNode class manages shared state across threads. Please add documentation about thread safety guarantees and synchronization requirements.


301-304: Add documentation for new methods.

Please add documentation for ServeAndUnblockConns and BlockThisClientToWaitLRPush methods describing:

  • Purpose and functionality
  • Parameter requirements
  • Thread safety guarantees
  • Error handling behavior

346-354: Consider improving hash function to reduce collisions.

The current hash implementation using XOR can lead to collisions when keys have similar patterns. Consider using a better combining function like boost::hash_combine or a more sophisticated hash combination method.

struct BlockKeyHash {
  std::size_t operator()(const BlockKey& k) const {
    std::size_t seed = std::hash<int>{}(k.db_id);
    // Similar to boost::hash_combine
    seed ^= std::hash<std::string>{}(k.key) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
    return seed;
  }
};
src/base_cmd.cc (4)

109-123: Consider optimizing for multiple keys.

The current implementation acquires and releases the lock for each key in the loop. Consider collecting all new keys first and then performing a single lock operation.

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
                                         std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
  std::vector<kiwi::BlockKey> block_keys;
  block_keys.reserve(keys.size());
  for (const auto& key : keys) {
    block_keys.emplace_back(kiwi::BlockKey{client->GetCurrentDB(), key});
  }
  
  std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
  auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
  for (const auto& block_key : block_keys) {
    auto it = key_to_conns.find(block_key);
    if (it == key_to_conns.end()) {
      key_to_conns.emplace(block_key, std::make_unique<std::list<BlockedConnNode>>());
      it = key_to_conns.find(block_key);
    }
    it->second->emplace_back(expire_time, client, type);
  }
}

141-141: Improve iterator increment logic.

The iterator increment is not consistently handled in the loop. Consider moving it to the loop statement for better readability and maintenance.

for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end(); ) {
  // ... existing code ...
  conn_blocked = waitting_list->erase(conn_blocked);
  // Remove the manual increment as it's handled in the loop
}

167-167: Use specific error codes.

The error handling uses a generic error message. Consider using specific error codes or messages that better describe the failure condition.

BlockedClient->SetRes(CmdRes::kErrOther, fmt::format("Failed to serve blocked connection: {}", s.ToString()));

176-185: Optimize time comparison.

The current implementation converts time points to milliseconds for comparison. Consider comparing time points directly for better efficiency.

bool BlockedConnNode::IsExpired(std::chrono::system_clock::time_point now) {
  if (expire_time_ == 0) {
    return false;
  }
  auto expire_point = std::chrono::system_clock::time_point(std::chrono::milliseconds(expire_time_));
  return now >= expire_point;
}
src/storage/src/redis_lists.cc (1)

749-794: Implementation matches RPop but requires external locking.

The implementation correctly follows the same logic as RPop but skips internal locking. This is suitable for scenarios where locking is handled at a higher level.

Consider adding documentation about locking requirements.

Add a comment explaining that this method requires external locking to prevent race conditions.

+// RPopWithoutLock removes and returns elements from the right end of the list without acquiring internal locks.
+// IMPORTANT: This method requires external locking to be thread-safe.
 Status Redis::RPopWithoutLock(const Slice& key, int64_t count, std::vector<std::string>* elements) {
src/storage/src/storage.cc (1)

149-151: Consider making lock manager parameters configurable.

The lock manager is initialized with hardcoded values (1000 buckets, 0 timeout). Consider making these configurable through StorageOptions.

 struct StorageOptions {
   // ... existing fields ...
+  size_t lock_mgr_buckets = 1000;
+  uint32_t lock_mgr_timeout_ms = 0;
 };

 Status Storage::Open(const StorageOptions& storage_options, const std::string& db_path) {
   // ... existing code ...
-  lock_mgr_ = std::make_shared<LockMgr>(1000, 0, std::make_shared<MutexFactoryImpl>());
+  lock_mgr_ = std::make_shared<LockMgr>(storage_options.lock_mgr_buckets,
+                                       storage_options.lock_mgr_timeout_ms,
+                                       std::make_shared<MutexFactoryImpl>());
src/kiwi.h (1)

66-71: Consider using encapsulation for internal state management.

The public accessor methods directly expose internal data structures. Consider providing more controlled interfaces that encapsulate the implementation details.

-  std::unordered_map<kiwi::BlockKey, std::unique_ptr<std::list<kiwi::BlockedConnNode>>, kiwi::BlockKeyHash>&
-  GetMapFromKeyToConns() {
-    return key_to_blocked_conns_;
-  }
-
-  std::shared_mutex& GetBlockMtx() { return block_mtx_; };
+  // Add a connection to the blocked list for a key
+  void AddBlockedConnection(const BlockKey& key, std::unique_ptr<BlockedConnNode> node);
+  
+  // Remove a connection from the blocked list
+  void RemoveBlockedConnection(const BlockKey& key, const BlockedConnNode& node);
+  
+  // Get all blocked connections for a key
+  std::vector<BlockedConnNode*> GetBlockedConnections(const BlockKey& key);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4b3acdc and 5b0332f.

📒 Files selected for processing (14)
  • src/base_cmd.cc (1 hunks)
  • src/base_cmd.h (4 hunks)
  • src/cmd_admin.cc (1 hunks)
  • src/cmd_keys.cc (1 hunks)
  • src/cmd_list.cc (5 hunks)
  • src/cmd_list.h (1 hunks)
  • src/cmd_table_manager.cc (1 hunks)
  • src/kiwi.cc (2 hunks)
  • src/kiwi.h (2 hunks)
  • src/storage/include/storage/storage.h (4 hunks)
  • src/storage/src/redis.cc (1 hunks)
  • src/storage/src/redis.h (2 hunks)
  • src/storage/src/redis_lists.cc (1 hunks)
  • src/storage/src/storage.cc (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/cmd_keys.cc
  • src/cmd_table_manager.cc
  • src/storage/src/redis.cc
  • src/cmd_list.h
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/cmd_list.cc

[performance] 101-101: Variable 'downSince' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


[performance] 203-203: Variable 'func_' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

🔇 Additional comments (13)
src/base_cmd.h (1)

145-146: LGTM!

The new command name constants follow the existing naming convention and are properly placed in the list commands section.

src/storage/src/redis.h (2)

38-38: Dependency injection of lock manager improves flexibility.

The constructor now accepts a shared lock manager instance, enabling better control over concurrency management and resource sharing.


254-254: Verify thread safety of RPopWithoutLock usage.

The method allows bypassing internal locking mechanisms. Ensure proper synchronization is handled by the caller to prevent race conditions.

Consider documenting the thread-safety requirements in the method comment.

✅ Verification successful

Thread safety properly handled in RPopWithoutLock usage

The method is safely used in BRPopCmd::DoCmd where proper synchronization is ensured through MultiScopeRecordLock before calling RPopWithoutLock. The locking scope covers all list keys, preventing potential race conditions.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all usages of RPopWithoutLock to verify proper lock handling
rg -A 5 "RPopWithoutLock"

Length of output: 2525


Script:

#!/bin/bash
# Get more context around the usage in cmd_list.cc
rg -B 10 -A 5 "RPopWithoutLock.*elements" src/cmd_list.cc

# Look for any documentation about thread safety
rg -i "thread.*safe|concurrency|synchronization" README.md docs/

Length of output: 781

src/storage/include/storage/storage.h (3)

31-31: LGTM!

The include directive is correctly placed with other internal includes.


579-580: LGTM!

The method declaration is consistent with the implementation in redis_lists.cc.


1120-1121: LGTM!

The lock manager is appropriately stored as a shared pointer, allowing safe sharing across Redis instances.

src/storage/src/storage.cc (1)

909-914: LGTM!

The implementation correctly delegates to the appropriate Redis instance after clearing the elements vector.

src/kiwi.h (1)

107-120: Well-documented member variables with clear purposes.

The documentation clearly explains the purpose of key_to_blocked_conns_ and its associated mutex, making the code more maintainable.

src/kiwi.cc (2)

282-284: Well-implemented timer task for periodic cleanup.

The timer task is correctly set up with an appropriate interval and proper binding to the member function.


163-192: ⚠️ Potential issue

Fix potential race condition in map access.

The method accesses GetMapFromKeyToConns() after acquiring the lock, which could lead to a race condition if the map is modified between these operations.

 void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
   std::vector<kiwi::BlockKey> keys_need_remove;
 
   std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
-  auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
-  for (auto& it : key_to_blocked_conns) {
+  for (auto& it : key_to_blocked_conns_) {
     auto& conns_list = it.second;
     // ... rest of the loop
   }
 
   for (auto& remove_key : keys_need_remove) {
-    key_to_blocked_conns.erase(remove_key);
+    key_to_blocked_conns_.erase(remove_key);
   }
 }

Likely invalid or redundant comment.

src/cmd_list.cc (2)

185-204: Well-implemented timeout validation in BRPopCmd.

The timeout validation is thorough and handles all edge cases:

  • Validates timeout value format
  • Checks for negative values
  • Correctly converts to milliseconds
🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 203-203: Variable 'func_' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


206-226: Well-structured blocking operation implementation.

The implementation follows a good pattern:

  1. Tries non-blocking operation first
  2. Uses proper locking with MultiScopeRecordLock
  3. Falls back to blocking operation if needed
src/cmd_admin.cc (1)

596-597: Consistent implementation of key setting and connection unblocking.

The implementation follows the established pattern of setting the key and unblocking connections after successful operations.

Comment on lines +175 to +180
BLPopCmd::BLPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BLPopCmd::DoInitial(PClient* client) { return true; }

void BLPopCmd::DoCmd(PClient* client) {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement BLPopCmd functionality.

The BLPopCmd has an empty implementation. It should mirror BRPopCmd's functionality but for the left side of the list.

Would you like me to generate the implementation for BLPopCmd?

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
src/cmd_list.cc (1)

176-181: ⚠️ Potential issue

Implement BLPopCmd functionality.

The BLPopCmd has an empty implementation. It should mirror BRPopCmd's functionality but for the left side of the list.

Here's the implementation:

 bool BLPopCmd::DoInitial(PClient* client) {
+  std::vector<std::string> keys(client->argv_.begin() + 1, client->argv_.end() - 1);
+  client->SetKey(keys);
+
+  int64_t timeout = 0;
+  if (!pstd::String2int(client->argv_.back(), &timeout)) {
+    client->SetRes(CmdRes::kInvalidInt);
+    return false;
+  }
+  if (timeout < 0) {
+    client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value");
+    return false;
+  }
+  if (timeout > 0) {
+    auto now = std::chrono::system_clock::now();
+    expire_time_ =
+        std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count() + timeout * 1000;
+  }
+  return true;
 }

 void BLPopCmd::DoCmd(PClient* client) {
+  std::vector<std::string> elements;
+  std::vector<std::string> list_keys(client->Keys().begin(), client->Keys().end());
+  storage::MultiScopeRecordLock(PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys);
+  for (auto& list_key : list_keys) {
+    storage::Status s =
+        PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPopWithoutLock(list_key, 1, &elements);
+    if (s.ok()) {
+      client->AppendArrayLen(2);
+      client->AppendString(list_key);
+      client->AppendString(elements[0]);
+      return;
+    } else if (s.IsNotFound()) {
+      continue;
+    } else {
+      client->SetRes(CmdRes::kErrOther, s.ToString());
+      return;
+    }
+  }
+  BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BLPop);
 }
🧹 Nitpick comments (7)
src/kiwi.h (1)

66-71: Consider encapsulating the blocking connection management.

The public getters GetMapFromKeyToConns() and GetBlockMtx() expose internal implementation details. This could lead to thread safety issues if not used carefully.

Consider providing higher-level methods that encapsulate the blocking operations instead of exposing the raw map and mutex:

-  std::unordered_map<kiwi::BlockKey, std::unique_ptr<std::list<kiwi::BlockedConnNode>>, kiwi::BlockKeyHash>&
-  GetMapFromKeyToConns() {
-    return key_to_blocked_conns_;
-  }
-  std::shared_mutex& GetBlockMtx() { return block_mtx_; };
+  void AddBlockedConnection(const BlockKey& key, std::unique_ptr<BlockedConnNode> node);
+  void RemoveBlockedConnection(const BlockKey& key, const BlockedConnNode& node);
+  std::vector<BlockedConnNode> GetBlockedConnections(const BlockKey& key);
src/base_cmd.cc (2)

109-123: Add input validation for expire_time.

The BlockThisClientToWaitLRPush method should validate the expire_time parameter to ensure it's not in the past.

Add validation before blocking the client:

 void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
                                           std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
+  if (expire_time < 0) {
+    return;
+  }
+  if (expire_time > 0) {
+    int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(
+                      std::chrono::system_clock::now())
+                      .time_since_epoch()
+                      .count();
+    if (expire_time <= now) {
+      return;
+    }
+  }
   std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
   // ... rest of the implementation

176-185: Optimize time point creation in IsExpired.

The now parameter should be passed by const reference to avoid unnecessary copies.

-bool BlockedConnNode::IsExpired(std::chrono::system_clock::time_point now) {
+bool BlockedConnNode::IsExpired(const std::chrono::system_clock::time_point& now) {
   if (expire_time_ == 0) {
     return false;
   }
   int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
   if (expire_time_ <= now_in_ms) {
     return true;
   }
   return false;
 }
src/storage/src/redis_lists.cc (1)

749-794: Implementation looks solid but consider adding documentation.

The implementation correctly handles:

  • Memory management
  • Error handling
  • Type validation
  • Atomic batch operations
  • Resource cleanup
  • Statistics tracking

Consider adding documentation to explain:

  • The purpose of this non-locking variant
  • When it should be used vs the locking version
  • Thread-safety implications
+// RPopWithoutLock removes and returns elements from the right end of a list without acquiring locks.
+// This method is designed to be used in conjunction with blocking list operations where locks are
+// managed externally. Not thread-safe - caller must ensure proper synchronization.
 Status Redis::RPopWithoutLock(const Slice& key, int64_t count, std::vector<std::string>* elements) {
src/storage/src/storage.cc (1)

149-149: Consider making lock manager parameters configurable.

The lock manager is initialized with hardcoded values (1000 locks, 0 timeout). Consider making these configurable through StorageOptions.

-  lock_mgr_ = std::make_shared<LockMgr>(1000, 0, std::make_shared<MutexFactoryImpl>());
+  lock_mgr_ = std::make_shared<LockMgr>(
+      storage_options.lock_manager_size,
+      storage_options.lock_manager_timeout,
+      std::make_shared<MutexFactoryImpl>());
src/base_cmd.h (2)

303-306: Consider adding documentation for the new methods.

These methods play a crucial role in the blocking operations. Adding documentation would help other developers understand their purpose and usage.

Add documentation like this:

+  /**
+   * @brief Serve blocked connections and unblock them
+   * @param client The client that triggered the unblocking
+   */
   void ServeAndUnblockConns(PClient* client);

+  /**
+   * @brief Block a client waiting for LPUSH/RPUSH operations
+   * @param keys List of keys to wait on
+   * @param expire_time Expiration time in milliseconds
+   * @param client The client to be blocked
+   * @param type Type of blocking operation (BLPOP/BRPOP)
+   */
   void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
                                   std::shared_ptr<PClient> client, BlockedConnNode::Type type);

354-356: Consider using a better hash combining function.

The current XOR-based hash function can lead to hash collisions when db_id and key have similar patterns. Consider using a more robust hash combining function.

Here's an improved implementation:

   std::size_t operator()(const BlockKey& k) const {
-    return std::hash<int>{}(k.db_id) ^ std::hash<std::string>{}(k.key);
+    std::size_t h1 = std::hash<int>{}(k.db_id);
+    std::size_t h2 = std::hash<std::string>{}(k.key);
+    return h1 + 0x9e3779b9 + (h2 << 6) + (h2 >> 2);
   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b0332f and 94b9a33.

📒 Files selected for processing (13)
  • src/base_cmd.cc (1 hunks)
  • src/base_cmd.h (4 hunks)
  • src/cmd_admin.cc (1 hunks)
  • src/cmd_keys.cc (1 hunks)
  • src/cmd_list.cc (5 hunks)
  • src/cmd_table_manager.cc (1 hunks)
  • src/kiwi.cc (2 hunks)
  • src/kiwi.h (2 hunks)
  • src/storage/include/storage/storage.h (4 hunks)
  • src/storage/src/redis.cc (1 hunks)
  • src/storage/src/redis.h (2 hunks)
  • src/storage/src/redis_lists.cc (1 hunks)
  • src/storage/src/storage.cc (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/cmd_admin.cc
  • src/cmd_keys.cc
  • src/cmd_table_manager.cc
  • src/storage/src/redis.cc
  • src/storage/include/storage/storage.h
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/cmd_list.cc

[performance] 203-203: Variable 'func_' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)

🔇 Additional comments (10)
src/kiwi.h (1)

114-120: LGTM: Well-documented member variables with clear thread safety considerations.

The member variables are well documented and use appropriate data structures. The use of shared_mutex ensures thread-safe access to the blocked connections map.

src/kiwi.cc (1)

290-293: LGTM: Well-configured timer task.

The timer task is properly configured with a reasonable interval and correct use of std::bind.

src/storage/src/redis.h (2)

38-38: LGTM: Constructor updated to support thread-safe operations.

The addition of lock_mgr parameter enables proper synchronization for blocking operations.


254-254: LGTM: Added RPopWithoutLock method to support non-blocking list operations.

This method complements the blocking list commands implementation by providing a way to pop elements without acquiring locks.

src/storage/src/storage.cc (2)

151-151: LGTM: Proper initialization of Redis instances with lock manager.

The lock manager is correctly passed to each Redis instance.


909-914: LGTM: Clean implementation of RPopWithoutLock.

The method properly:

  1. Clears the output vector
  2. Gets the correct Redis instance
  3. Delegates to the instance's RPopWithoutLock
src/base_cmd.h (1)

224-238: LGTM! Well-structured BlockedConnNode class.

The class properly uses std::shared_ptr for client management and includes all necessary fields for blocking operations.

src/cmd_list.cc (3)

31-31: LGTM! Proper placement of ServeAndUnblockConns calls.

The ServeAndUnblockConns calls are correctly placed after successful push operations to unblock waiting clients.

Also applies to: 77-77, 102-102


186-205: LGTM! Proper timeout validation in BRPopCmd::DoInitial.

The implementation correctly:

  • Validates timeout value
  • Handles negative timeout
  • Calculates expiration time
🧰 Tools
🪛 cppcheck (2.10-2)

[performance] 203-203: Variable 'func_' is assigned in constructor body. Consider performing initialization in initialization list.

(useInitializationList)


207-227: LGTM! Well-implemented blocking mechanism in BRPopCmd::DoCmd.

The implementation correctly:

  • Uses proper locking mechanism
  • Tries all keys before blocking
  • Properly formats the response
  • Blocks the client if no data is available

Comment on lines +125 to +174
void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};

std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}

auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
auto BlockedClient = conn_blocked->GetBlockedClient();

if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle potential exceptions in ServeAndUnblockConns.

The method should use RAII to ensure resources are properly cleaned up if SendPacket throws an exception.

Consider using a scope guard or try-catch block:

 void BaseCmd::ServeAndUnblockConns(PClient* client) {
   kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};
   std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
   auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
   auto it = key_to_conns.find(key);
   if (it == key_to_conns.end()) {
     return;
   }
   auto& waitting_list = it->second;
   std::vector<std::string> elements;
   storage::Status s;
   for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
     auto BlockedClient = conn_blocked->GetBlockedClient();
+    try {
       if (BlockedClient->State() == ClientState::kClosed) {
         conn_blocked = waitting_list->erase(conn_blocked);
         g_kiwi->CleanBlockedNodes(BlockedClient);
         continue;
       }
       // ... rest of the implementation
       BlockedClient->SendPacket();
       conn_blocked = waitting_list->erase(conn_blocked);
       g_kiwi->CleanBlockedNodes(BlockedClient);
+    } catch (...) {
+      conn_blocked = waitting_list->erase(conn_blocked);
+      g_kiwi->CleanBlockedNodes(BlockedClient);
+      throw;
+    }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;
// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
auto BlockedClient = conn_blocked->GetBlockedClient();
if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}
switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}
if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
}
}
void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;
// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served"
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
auto BlockedClient = conn_blocked->GetBlockedClient();
try {
if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}
switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}
if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
} catch (...) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
throw;
}
}
}

Comment on lines +202 to +220
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential iterator invalidation in CleanBlockedNodes.

The method modifies the list while iterating, which could lead to iterator invalidation.

 void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
   std::vector<kiwi::BlockKey> blocked_keys;
   for (auto key : client->Keys()) {
     blocked_keys.emplace_back(client->GetCurrentDB(), key);
   }
   auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto& blocked_key : blocked_keys) {
     const auto& it = key_to_blocked_conns.find(blocked_key);
     if (it != key_to_blocked_conns.end()) {
       auto& conns_list = it->second;
-      for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
+      for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
         if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
-          conns_list->erase(conn_node);
+          conn_node = conns_list->erase(conn_node);
           break;
+        } else {
+          ++conn_node;
         }
       }
     }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conn_node = conns_list->erase(conn_node);
break;
} else {
++conn_node;
}
}
}
}
}

Comment on lines +171 to +200
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;

std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}

for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for SendPacket in ScanEvictedBlockedConnsOfBlrpop.

The method should handle potential exceptions from SendPacket to ensure proper cleanup.

 void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
   std::vector<kiwi::BlockKey> keys_need_remove;
   std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
   auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto& it : key_to_blocked_conns) {
     auto& conns_list = it.second;
     for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
       auto conn_ptr = conn_node->GetBlockedClient();
+      try {
         if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
           conn_node = conns_list->erase(conn_node);
           CleanBlockedNodes(conn_ptr);
         } else if (conn_node->IsExpired()) {
           conn_ptr->AppendString("");
           conn_ptr->SendPacket();
           conn_node = conns_list->erase(conn_node);
           CleanBlockedNodes(conn_ptr);
         } else {
           ++conn_node;
         }
+      } catch (...) {
+        conn_node = conns_list->erase(conn_node);
+        CleanBlockedNodes(conn_ptr);
+      }
     }
     if (conns_list->empty()) {
       keys_need_remove.push_back(it.first);
     }
   }
   for (auto& remove_key : keys_need_remove) {
     key_to_blocked_conns.erase(remove_key);
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;
std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}
for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;
std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
try {
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
} catch (...) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}
for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants