Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add brpop cmd #48

Open
wants to merge 13 commits into
base: unstable
Choose a base branch
from
87 changes: 86 additions & 1 deletion src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

#include "common.h"
#include "config.h"
#include "log.h"
#include "kiwi.h"
#include "log.h"
#include "praft/praft.h"

namespace kiwi {
Expand Down Expand Up @@ -106,6 +106,91 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, PClient* client,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockThisClientToWaitLRPush 这个方法没有看到哪里有调用

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看 pika 那边用到了这个方法, 可以补齐

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@浩林

BlockedConnNode::Type type) {
std::unique_lock<std::shared_mutex> latch(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
std::shared_ptr<std::atomic<bool>> is_done = std::make_shared<std::atomic<bool>>(false);
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};

auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
auto& wait_list_of_this_key = it->second;
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
wait_list_of_this_key->emplace_back(expire_time, client, type, is_done);
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
}
}

void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};

std::shared_lock<std::shared_mutex> read_latch(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
read_latch.unlock();
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved

std::unique_lock<std::shared_mutex> write_lock(g_kiwi->GetBlockMtx());
auto& waitting_list = it->second;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto waitting_list = it->second; 这样写就行了吧, 不需要加 引用, 或者加了 引用有什么特殊作用吗

std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的迭代器 在哪里 ++ 的, 不在 for()的最后 ++conn_blocked 是出于什么考虑

if (conn_blocked->is_done_->exchange(true)) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

PClient* BlockedClient = (*conn_blocked).GetBlockedClient();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

指针强转用 reinterpret_cast


if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的错误码能兼容redis吗

}
BlockedClient->SendPacket();
conn_blocked = waitting_list->erase(conn_blocked); // remove this conn from current waiting list
}
}

bool BlockedConnNode::IsExpired() {
if (expire_time_ == 0) {
return false;
}
auto now = std::chrono::system_clock::now();
int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断是否过期, 从调用 IsExpired 的地方传递一个 时间戳过来, 会不会更好,或者提供一个函数重载,可以通过外部传入时间戳, 避免在多次调用时,重复获取当前时间

if (expire_time_ <= now_in_ms) {
return true;
}
return false;
}

bool BaseCmdGroup::DoInitial(PClient* client) {
client->SetSubCmdName(client->argv_[1]);
if (!subCmds_.contains(client->SubCmdName())) {
Expand Down
36 changes: 36 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ const std::string kCmdNameRPush = "rpush";
const std::string kCmdNameRPushx = "rpushx";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameBRPop = "brpop";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLTrim = "ltrim";
Expand Down Expand Up @@ -210,6 +212,23 @@ enum AclCategory {
kAclCategoryRaft = (1 << 21),
};

class BlockedConnNode {
public:
enum Type { BLPop = 0, BRPop };
virtual ~BlockedConnNode() {}
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
BlockedConnNode(int64_t expire_time, PClient* client, Type type, std::shared_ptr<std::atomic<bool>> is_done)
: expire_time_(expire_time), client_(client), type_(type), is_done_(is_done) {}
bool IsExpired();
PClient* GetBlockedClient() { return client_; }
std::shared_ptr<std::atomic<bool>> is_done_;
Type GetCmdType() { return type_; }

private:
Type type_;
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
int64_t expire_time_;
PClient* client_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

尽量使用 std::week_ptr 因为在 client由网络层持有, 并且管理声明周期, 如果client被释放,但是这里还保存了原来的地址, 这里的 client* 就成了野指针, 调用会出错.

};

/**
* @brief Base class for all commands
* BaseCmd, as the base class for all commands, mainly implements some common functions
Expand Down Expand Up @@ -273,6 +292,11 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(PClient* client);

void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, PClient* client,
BlockedConnNode::Type type);

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down Expand Up @@ -312,4 +336,16 @@ class BaseCmdGroup : public BaseCmd {
private:
std::map<std::string, std::unique_ptr<BaseCmd>> subCmds_;
};

struct BlockKey { // this data struct is made for the scenario of multi dbs in pika.
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
int db_id;
yeyeye2333 marked this conversation as resolved.
Show resolved Hide resolved
std::string key;
bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; }
};
struct BlockKeyHash {
std::size_t operator()(const BlockKey& k) const {
return std::hash<int>{}(k.db_id) ^ std::hash<std::string>{}(k.key);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里依然会出现hash碰撞的情况吧,只是概率大小的问题

}
};

} // namespace kiwi
2 changes: 2 additions & 0 deletions src/cmd_admin.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ void SortCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
client->SetKey(store_key_);
ServeAndUnblockConns(client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_keys.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ void RenameCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
client->SetKey(client->argv_[2]);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else {
Expand Down
4 changes: 4 additions & 0 deletions src/cmd_list.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void LPushCmd::DoCmd(PClient* client) {
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -74,6 +75,8 @@ void RPoplpushCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value);
if (s.ok()) {
client->AppendString(value);
client->SetKey(receiver_);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->AppendStringLen(-1);
} else if (s.IsInvalidArgument()) {
Expand All @@ -98,6 +101,7 @@ void RPushCmd::DoCmd(PClient* client) {
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down
25 changes: 25 additions & 0 deletions src/cmd_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd {
private:
void DoCmd(PClient* client) override;
};

class BLPopCmd : public BaseCmd {
public:
BLPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class BRPopCmd : public BaseCmd {
public:
BRPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class LRangeCmd : public BaseCmd {
public:
LRangeCmd(const std::string& name, int16_t arity);
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(LPush, -3);
ADD_COMMAND(RPush, -3);
ADD_COMMAND(RPop, 2);
ADD_COMMAND(BLPop, -3);
ADD_COMMAND(BRPop, -3);
ADD_COMMAND(LRem, 4);
ADD_COMMAND(LRange, 4);
ADD_COMMAND(LTrim, 4);
Expand Down
26 changes: 25 additions & 1 deletion src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& cl
client->OnConnect();
}

void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::unique_lock<std::shared_mutex> latch(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
if (conn_node->is_done_->exchange(true) || conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
} else if (conn_node->IsExpired()) {
PClient* conn_ptr = conn_node->GetBlockedClient();
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
} else {
conn_node++;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++conn_node; 会不会更好, 还有上面两个 if 分支里 不需要 ++ 吗

}
}
}
}

bool KiwiDB::Init() {
char runid[kRunidSize + 1] = "";
getRandomHexChars(runid, kRunidSize);
Expand Down Expand Up @@ -201,7 +221,7 @@ bool KiwiDB::Init() {
PREPL.SetMasterAddr(g_config.master_ip.ToString().c_str(), g_config.master_port.load());
}

event_server_ =std::make_unique<net::EventServer<std::shared_ptr<PClient>>>(num);
event_server_ = std::make_unique<net::EventServer<std::shared_ptr<PClient>>>(num);

event_server_->SetRwSeparation(true);

Expand Down Expand Up @@ -232,6 +252,10 @@ bool KiwiDB::Init() {
timerTask->SetCallback([]() { PREPL.Cron(); });
event_server_->AddTimerTask(timerTask);

auto BLRPopTimerTask = std::make_shared<net::CommonTimerTask>(250);
BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this));
event_server_->AddTimerTask(BLRPopTimerTask);

time(&start_time_s_);

return true;
Expand Down
27 changes: 24 additions & 3 deletions src/kiwi.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ class KiwiDB final {
event_server_->SendPacket(client, std::move(msg));
}

std::unordered_map<kiwi::BlockKey, std::unique_ptr<std::list<kiwi::BlockedConnNode>>, kiwi::BlockKeyHash>&
GetMapFromKeyToConns() {
return key_to_blocked_conns_;
}

std::shared_mutex& GetBlockMtx() { return block_mtx_; };

void ScanEvictedBlockedConnsOfBlrpop();
inline void SendPacket2Client(const std::shared_ptr<kiwi::PClient>& client, std::string&& msg) {
event_server_->SendPacket(client, std::move(msg));
}

inline void CloseConnection(const std::shared_ptr<kiwi::PClient>& client) {
event_server_->CloseConnection(client);
}
inline void CloseConnection(const std::shared_ptr<kiwi::PClient>& client) { event_server_->CloseConnection(client); }

void TCPConnect(
const net::SocketAddr& addr,
Expand Down Expand Up @@ -88,6 +94,21 @@ class KiwiDB final {
uint32_t cmd_id_ = 0;

time_t start_time_s_ = 0;

/*
* Blpop/BRpop used
*/
/* key_to_blocked_conns_:
* mapping from key to a list that stored the nodes of client-connections that
* were blocked by command blpop/brpop with key.
*/
std::unordered_map<kiwi::BlockKey, std::unique_ptr<std::list<kiwi::BlockedConnNode>>, kiwi::BlockKeyHash>
key_to_blocked_conns_;

/*
* latch of above map.
*/
std::shared_mutex block_mtx_;
};

extern std::unique_ptr<KiwiDB> g_kiwi;
Loading