Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add brpop cmd #48

Open
wants to merge 13 commits into
base: unstable
Choose a base branch
from
79 changes: 79 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,85 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};

auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
auto& wait_list_of_this_key = it->second;
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
wait_list_of_this_key->emplace_back(expire_time, client, type);
}
}

void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};

std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}

auto& waitting_list = it->second;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto waitting_list = it->second; 这样写就行了吧, 不需要加 引用, 或者加了 引用有什么特殊作用吗

std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的迭代器 在哪里 ++ 的, 不在 for()的最后 ++conn_blocked 是出于什么考虑

auto BlockedClient = conn_blocked->GetBlockedClient();

if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的错误码能兼容redis吗

}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
}
}
Comment on lines +125 to +174
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle potential exceptions in ServeAndUnblockConns.

The method should use RAII to ensure resources are properly cleaned up if SendPacket throws an exception.

Consider using a scope guard or try-catch block:

 void BaseCmd::ServeAndUnblockConns(PClient* client) {
   kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};
   std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
   auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
   auto it = key_to_conns.find(key);
   if (it == key_to_conns.end()) {
     return;
   }
   auto& waitting_list = it->second;
   std::vector<std::string> elements;
   storage::Status s;
   for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
     auto BlockedClient = conn_blocked->GetBlockedClient();
+    try {
       if (BlockedClient->State() == ClientState::kClosed) {
         conn_blocked = waitting_list->erase(conn_blocked);
         g_kiwi->CleanBlockedNodes(BlockedClient);
         continue;
       }
       // ... rest of the implementation
       BlockedClient->SendPacket();
       conn_blocked = waitting_list->erase(conn_blocked);
       g_kiwi->CleanBlockedNodes(BlockedClient);
+    } catch (...) {
+      conn_blocked = waitting_list->erase(conn_blocked);
+      g_kiwi->CleanBlockedNodes(BlockedClient);
+      throw;
+    }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;
// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
auto BlockedClient = conn_blocked->GetBlockedClient();
if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}
switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}
if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
}
}
void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;
// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served"
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
auto BlockedClient = conn_blocked->GetBlockedClient();
try {
if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}
switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}
if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
} catch (...) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
throw;
}
}
}


bool BlockedConnNode::IsExpired(std::chrono::system_clock::time_point now) {
if (expire_time_ == 0) {
return false;
}
int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断是否过期, 从调用 IsExpired 的地方传递一个 时间戳过来, 会不会更好,或者提供一个函数重载,可以通过外部传入时间戳, 避免在多次调用时,重复获取当前时间

if (expire_time_ <= now_in_ms) {
return true;
}
return false;
}

bool BaseCmdGroup::DoInitial(PClient* client) {
client->SetSubCmdName(client->argv_[1]);
if (!subCmds_.contains(client->SubCmdName())) {
Expand Down
35 changes: 35 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ const std::string kCmdNameRPush = "rpush";
const std::string kCmdNameRPushx = "rpushx";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameBRPop = "brpop";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLTrim = "ltrim";
Expand Down Expand Up @@ -217,6 +219,22 @@ enum AclCategory {
kAclCategoryRaft = (1 << 21),
};

class BlockedConnNode {
public:
enum Type { BLPop = 0, BRPop };
virtual ~BlockedConnNode() =default;
BlockedConnNode(int64_t expire_time, std::shared_ptr<PClient> client, Type type)
: expire_time_(expire_time), client_(client), type_(type) {}
bool IsExpired(std::chrono::system_clock::time_point now = std::chrono::system_clock::now());
std::shared_ptr<PClient> GetBlockedClient() { return client_; }
Type GetCmdType() { return type_; }

private:
Type type_;
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
int64_t expire_time_;
std::shared_ptr<PClient> client_;
};

/**
* @brief Base class for all commands
* BaseCmd, as the base class for all commands, mainly implements some common functions
Expand Down Expand Up @@ -280,6 +298,11 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(PClient* client);

void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, std::shared_ptr<PClient> client,
BlockedConnNode::Type type);

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down Expand Up @@ -319,4 +342,16 @@ class BaseCmdGroup : public BaseCmd {
private:
std::map<std::string, std::unique_ptr<BaseCmd>> subCmds_;
};

struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi.
int db_id;
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
std::string key;
bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; }
};
struct BlockKeyHash {
std::size_t operator()(const BlockKey& k) const {
return std::hash<int>{}(k.db_id) ^ std::hash<std::string>{}(k.key);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里依然会出现hash碰撞的情况吧,只是概率大小的问题

}
};

} // namespace kiwi
2 changes: 2 additions & 0 deletions src/cmd_admin.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,8 @@ void SortCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
client->SetKey(store_key_);
ServeAndUnblockConns(client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down
4 changes: 2 additions & 2 deletions src/cmd_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CmdConfig : public BaseCmdGroup {
private:
// std::vector<std::string> subCmd_;

void DoCmd(PClient* client) override{};
void DoCmd(PClient* client) override {};
};

class CmdConfigGet : public BaseCmd {
Expand Down Expand Up @@ -256,7 +256,7 @@ class CmdDebug : public BaseCmdGroup {
bool DoInitial(PClient* client) override { return true; };

private:
void DoCmd(PClient* client) override{};
void DoCmd(PClient* client) override {};
};

class CmdDebugHelp : public BaseCmd {
Expand Down
1 change: 1 addition & 0 deletions src/cmd_keys.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ void RenameCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
client->SetKey(client->argv_[2]);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else {
Expand Down
62 changes: 58 additions & 4 deletions src/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cmd_list.h"
#include "pstd_string.h"
#include "src/scope_record_lock.h"
#include "store.h"

namespace kiwi {
Expand All @@ -27,6 +28,7 @@ void LPushCmd::DoCmd(PClient* client) {
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -60,10 +62,6 @@ RPoplpushCmd::RPoplpushCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool RPoplpushCmd::DoInitial(PClient* client) {
if (((arity_ > 0 && client->argv_.size() != arity_) || (arity_ < 0 && client->argv_.size() < -arity_))) {
client->SetRes(CmdRes::kWrongNum, kCmdNameRPoplpush);
return false;
}
source_ = client->argv_[1];
receiver_ = client->argv_[2];
return true;
Expand All @@ -74,6 +72,8 @@ void RPoplpushCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value);
if (s.ok()) {
client->AppendString(value);
client->SetKey(receiver_);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->AppendString("");
} else if (s.IsInvalidArgument()) {
Expand All @@ -98,6 +98,7 @@ void RPushCmd::DoCmd(PClient* client) {
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -171,6 +172,59 @@ void RPopCmd::DoCmd(PClient* client) {
}
}

BLPopCmd::BLPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BLPopCmd::DoInitial(PClient* client) { return true; }

void BLPopCmd::DoCmd(PClient* client) {}
Comment on lines +176 to +181
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement BLPopCmd functionality.

The BLPopCmd has an empty implementation. It should mirror BRPopCmd's functionality but for the left side of the list.

Would you like me to generate the implementation for BLPopCmd?


BRPopCmd::BRPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BRPopCmd::DoInitial(PClient* client) {
std::vector<std::string> keys(client->argv_.begin() + 1, client->argv_.end() - 1);
client->SetKey(keys);

int64_t timeout = 0;
if (!pstd::String2int(client->argv_.back(), &timeout)) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}
if (timeout < 0) {
client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value");
return false;
}
if (timeout > 0) {
auto now = std::chrono::system_clock::now();
expire_time_ =
std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count() + timeout * 1000;
}
return true;
}

void BRPopCmd::DoCmd(PClient* client) {
std::vector<std::string> elements;
std::vector<std::string> list_keys(client->Keys().begin(), client->Keys().end());
storage::MultiScopeRecordLock(PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys);
for (auto& list_key : list_keys) {
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPopWithoutLock(list_key, 1, &elements);
if (s.ok()) {
client->AppendArrayLen(2);
client->AppendString(list_key);
client->AppendString(elements[0]);
return;
} else if (s.IsNotFound()) {
continue;
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BRPop);
}

LRangeCmd::LRangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategoryList) {}

Expand Down
25 changes: 25 additions & 0 deletions src/cmd_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd {
private:
void DoCmd(PClient* client) override;
};

class BLPopCmd : public BaseCmd {
public:
BLPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class BRPopCmd : public BaseCmd {
public:
BRPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class LRangeCmd : public BaseCmd {
public:
LRangeCmd(const std::string& name, int16_t arity);
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(LPush, -3);
ADD_COMMAND(RPush, -3);
ADD_COMMAND(RPop, 2);
ADD_COMMAND(BLPop, -3);
ADD_COMMAND(BRPop, -3);
ADD_COMMAND(LRem, 4);
ADD_COMMAND(LRange, 4);
ADD_COMMAND(LTrim, 4);
Expand Down
55 changes: 55 additions & 0 deletions src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,57 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& cl
ClientMap::getInstance().AddClient(client->GetUniqueID(), client);
}

void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;

std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}

for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}
Comment on lines +171 to +200
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for SendPacket in ScanEvictedBlockedConnsOfBlrpop.

The method should handle potential exceptions from SendPacket to ensure proper cleanup.

 void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
   std::vector<kiwi::BlockKey> keys_need_remove;
   std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
   auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto& it : key_to_blocked_conns) {
     auto& conns_list = it.second;
     for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
       auto conn_ptr = conn_node->GetBlockedClient();
+      try {
         if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
           conn_node = conns_list->erase(conn_node);
           CleanBlockedNodes(conn_ptr);
         } else if (conn_node->IsExpired()) {
           conn_ptr->AppendString("");
           conn_ptr->SendPacket();
           conn_node = conns_list->erase(conn_node);
           CleanBlockedNodes(conn_ptr);
         } else {
           ++conn_node;
         }
+      } catch (...) {
+        conn_node = conns_list->erase(conn_node);
+        CleanBlockedNodes(conn_ptr);
+      }
     }
     if (conns_list->empty()) {
       keys_need_remove.push_back(it.first);
     }
   }
   for (auto& remove_key : keys_need_remove) {
     key_to_blocked_conns.erase(remove_key);
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;
std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}
for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;
std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
try {
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
} catch (...) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}
for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}


void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}
Comment on lines +202 to +220
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential iterator invalidation in CleanBlockedNodes.

The method modifies the list while iterating, which could lead to iterator invalidation.

 void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
   std::vector<kiwi::BlockKey> blocked_keys;
   for (auto key : client->Keys()) {
     blocked_keys.emplace_back(client->GetCurrentDB(), key);
   }
   auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto& blocked_key : blocked_keys) {
     const auto& it = key_to_blocked_conns.find(blocked_key);
     if (it != key_to_blocked_conns.end()) {
       auto& conns_list = it->second;
-      for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
+      for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
         if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
-          conns_list->erase(conn_node);
+          conn_node = conns_list->erase(conn_node);
           break;
+        } else {
+          ++conn_node;
         }
       }
     }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conn_node = conns_list->erase(conn_node);
break;
} else {
++conn_node;
}
}
}
}
}


bool KiwiDB::Init() {
char runid[kRunidSize + 1] = "";
getRandomHexChars(runid, kRunidSize);
Expand Down Expand Up @@ -228,6 +279,10 @@ bool KiwiDB::Init() {
timerTask->SetCallback([]() { PREPL.Cron(); });
event_server_->AddTimerTask(timerTask);

auto BLRPopTimerTask = std::make_shared<net::CommonTimerTask>(250);
BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this));
event_server_->AddTimerTask(BLRPopTimerTask);

time(&start_time_s_);

return true;
Expand Down
Loading
Loading