Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: better format of raft command #198

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 113 additions & 137 deletions src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,83 +5,68 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

#include "braft/configuration.h"
#include "client.h"
#include "cmd_raft.h"
#include "event_loop.h"
#include "log.h"
#include "pikiwidb.h"
#include "praft.h"
#include "pstd_status.h"
#include "pstd_string.h"

#define VALID_NODE_ID(x) ((x) > 0)

namespace pikiwidb {

RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {}

bool RaftNodeCmd::DoInitial(PClient* client) { return true; }

/* RAFT.NODE ADD [id] [address:port]
* Add a new node to the cluster. The [id] can be an explicit non-zero value,
* or zero to let the cluster choose one.
* Reply:
* -NOCLUSTER ||
* -LOADING ||
* -CLUSTERDOWN ||
* -MOVED <slot> <addr>:<port> ||
* *2
* :<new node id>
* :<dbid>
*
* RAFT.NODE REMOVE [id]
* Remove an existing node from the cluster.
* Reply:
* -NOCLUSTER ||
* -LOADING ||
* -CLUSTERDOWN ||
* -MOVED <slot> <addr>:<port> ||
* +OK
*/
void RaftNodeCmd::DoCmd(PClient* client) {
// Check whether it is a leader. If it is not a leader, return the leader information
if (!PRAFT.IsLeader()) {
return client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderId());
}

auto cmd = client->argv_[1];
if (!strcasecmp(cmd.c_str(), "ADD")) {
if (client->argv_.size() != 4) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}
pstd::StringToUpper(cmd);
if (cmd == kAddCmd) {
DoCmdAdd(client);
} else if (cmd == kRemoveCmd) {
DoCmdRemove(client);
} else {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD/REMOVE only");
}
}

// RedisRaft has nodeid, but in Braft, NodeId is IP:Port.
// So we do not need to parse and use nodeid like redis;
auto s = PRAFT.AddPeer(client->argv_[3]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else {
client->SetRes(CmdRes::kErrOther);
}
} else if (!strcasecmp(cmd.c_str(), "REMOVE")) {
if (client->argv_.size() != 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}
void RaftNodeCmd::DoCmdAdd(PClient* client) {
if (client->argv_.size() != 4) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

// (KKorpse)TODO: Redirect to leader if not leader.
auto s = PRAFT.RemovePeer(client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else {
client->SetRes(CmdRes::kErrOther);
}
// RedisRaft has nodeid, but in Braft, NodeId is IP:Port.
// So we do not need to parse and use nodeid like redis;
auto s = PRAFT.AddPeer(client->argv_[3]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else {
client->SetRes(CmdRes::kErrOther, fmt::format("Failed to add peer: {}", s.error_str()));
}
}

void RaftNodeCmd::DoCmdRemove(PClient* client) {
if (client->argv_.size() != 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

// (KKorpse)TODO: Redirect to leader if not leader.
auto s = PRAFT.RemovePeer(client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else {
client->SetRes(CmdRes::kErrOther, "ERR RAFT.NODE supports ADD / REMOVE only");
client->SetRes(CmdRes::kErrOther, fmt::format("Failed to remove peer: {}", s.error_str()));
}
}

Expand All @@ -90,109 +75,100 @@ RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity)

bool RaftClusterCmd::DoInitial(PClient* client) { return true; }

// The endpoint must be in the league format of ip:port
std::string GetIpFromEndPoint(std::string& endpoint) {
auto pos = endpoint.find(':');
if (pos == std::string::npos) {
return "";
void RaftClusterCmd::DoCmd(PClient* client) {
// parse arguments
if (client->argv_.size() < 2) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}
auto cmd = client->argv_[1];

if (PRAFT.IsInitialized()) {
return client->SetRes(CmdRes::kErrOther, "Already cluster member");
}

pstd::StringToUpper(cmd);
if (cmd == kInitCmd) {
DoCmdInit(client);
} else if (cmd == kJoinCmd) {
DoCmdJoin(client);
} else {
client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only");
}

return endpoint.substr(0, pos);
}

// The endpoint must be in the league format of ip:port
int GetPortFromEndPoint(std::string& endpoint) {
void RaftClusterCmd::DoCmdInit(PClient* client) {
if (client->argv_.size() != 2 && client->argv_.size() != 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

std::string cluster_id;
if (client->argv_.size() == 3) {
cluster_id = client->argv_[2];
if (cluster_id.size() != RAFT_DBID_LEN) {
return client->SetRes(CmdRes::kInvalidParameter,
"Cluster id must be " + std::to_string(RAFT_DBID_LEN) + " characters");
}
} else {
cluster_id = pstd::RandomHexChars(RAFT_DBID_LEN);
}
auto s = PRAFT.Init(cluster_id, false);
if (!s.ok()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str()));
}
client->SetRes(CmdRes::kOK);
}

static inline std::optional<std::pair<std::string, int32_t>> GetIpAndPortFromEndPoint(const std::string& endpoint) {
auto pos = endpoint.find(':');
if (pos == std::string::npos) {
return 0;
return std::nullopt;
}

int ret = 0;
int32_t ret = 0;
pstd::String2int(endpoint.substr(pos + 1), &ret);
return ret;
return {{endpoint.substr(0, pos), ret}};
}

/* RAFT.CLUSTER INIT <id>
* Initializes a new Raft cluster.
* <id> is an optional 32 character string, if set, cluster will use it for the id
* Reply:
* +OK [dbid]
*
* RAFT.CLUSTER JOIN [addr:port]
* Join an existing cluster.
* The operation is asynchronous and may take place/retry in the background.
* Reply:
* +OK
*/
void RaftClusterCmd::DoCmd(PClient* client) {
if (client->argv_.size() < 2) {
void RaftClusterCmd::DoCmdJoin(PClient* client) {
if (client->argv_.size() < 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

if (PRAFT.IsInitialized()) {
return client->SetRes(CmdRes::kErrOther, "ERR Already cluster member");
// (KKorpse)TODO: Support multiple nodes join at the same time.
if (client->argv_.size() > 3) {
return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments");
}

auto cmd = client->argv_[1];
if (!strcasecmp(cmd.c_str(), "INIT")) {
if (client->argv_.size() != 2 && client->argv_.size() != 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

std::string cluster_id;
if (client->argv_.size() == 3) {
cluster_id = client->argv_[2];
if (cluster_id.size() != RAFT_DBID_LEN) {
return client->SetRes(CmdRes::kInvalidParameter,
"ERR cluster id must be " + std::to_string(RAFT_DBID_LEN) + " characters");
}
} else {
cluster_id = pstd::RandomHexChars(RAFT_DBID_LEN);
}
auto s = PRAFT.Init(cluster_id, false);
if (!s.ok()) {
return client->SetRes(CmdRes::kErrOther, s.error_str());
}
client->SetRes(CmdRes::kOK);
} else if (!strcasecmp(cmd.c_str(), "JOIN")) {
if (client->argv_.size() < 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

// (KKorpse)TODO: Support multiple nodes join at the same time.
if (client->argv_.size() > 3) {
return client->SetRes(CmdRes::kInvalidParameter, "ERR too many arguments");
}

auto addr = client->argv_[2];
if (braft::PeerId(addr).is_empty()) {
return client->SetRes(CmdRes::kInvalidParameter, "ERR invalid ip::port: " + addr);
}
auto addr = client->argv_[2];
if (braft::PeerId(addr).is_empty()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr));
}

auto on_new_conn = [](TcpConnection* obj) {
if (g_pikiwidb) {
g_pikiwidb->OnNewConnection(obj);
}
};
auto fail_cb = [&](EventLoop* loop, const char* peer_ip, int port) {
PRAFT.OnJoinCmdConnectionFailed(loop, peer_ip, port);
};

auto loop = EventLoop::Self();
auto peer_ip = GetIpFromEndPoint(addr);
auto port = GetPortFromEndPoint(addr);
// FIXME: The client here is not smart pointer, may cause undefined behavior.
// should use shared_ptr in DoCmd() rather than raw pointer.
auto ret = PRAFT.GetJoinCtx().Set(client, peer_ip, port);
if (!ret) { // other clients have joined
client->SetRes(CmdRes::kErrOther, "other clients have joined");
} else {
loop->Connect(peer_ip.c_str(), port, on_new_conn, fail_cb);
// Not reply any message here, we will reply after the connection is established.
client->Clear();
auto on_new_conn = [](TcpConnection* obj) {
if (g_pikiwidb) {
g_pikiwidb->OnNewConnection(obj);
}
} else {
client->SetRes(CmdRes::kErrOther, "ERR RAFT.CLUSTER supports INIT / JOIN only");
};
auto on_fail = [&](EventLoop* loop, const char* peer_ip, int port) {
PRAFT.OnJoinCmdConnectionFailed(loop, peer_ip, port);
};

auto loop = EventLoop::Self();
auto ip_port = GetIpAndPortFromEndPoint(addr);
if (!ip_port.has_value()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr));
}
auto& [peer_ip, port] = *ip_port;
// FIXME: The client here is not smart pointer, may cause undefined behavior.
// should use shared_ptr in DoCmd() rather than raw pointer.
auto ret = PRAFT.GetJoinCtx().Set(client, peer_ip, port);
if (!ret) { // other clients have joined
return client->SetRes(CmdRes::kErrOther, "Other clients have joined");
}
loop->Connect(peer_ip.c_str(), port, on_new_conn, on_fail);
INFO("Sent join request to leader successfully");
// Not reply any message here, we will reply after the connection is established.
client->Clear();
}
} // namespace pikiwidb

} // namespace pikiwidb
47 changes: 45 additions & 2 deletions src/cmd_raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,33 @@

#pragma once

#include "braft/raft.h"
#include "brpc/server.h"
#include <string_view>

#include "base_cmd.h"

namespace pikiwidb {

/* RAFT.NODE ADD [id] [address:port]
* Add a new node to the cluster. The [id] can be an explicit non-zero value,
* or zero to let the cluster choose one.
* Reply:
* -NOCLUSTER ||
* -LOADING ||
* -CLUSTERDOWN ||
* -MOVED <slot> <addr>:<port> ||
* *2
* :<new node id>
* :<dbid>
*
* RAFT.NODE REMOVE [id]
* Remove an existing node from the cluster.
* Reply:
* -NOCLUSTER ||
* -LOADING ||
* -CLUSTERDOWN ||
* -MOVED <slot> <addr>:<port> ||
* +OK
*/
class RaftNodeCmd : public BaseCmd {
public:
RaftNodeCmd(const std::string &name, int16_t arity);
Expand All @@ -22,8 +43,25 @@ class RaftNodeCmd : public BaseCmd {

private:
void DoCmd(PClient *client) override;
void DoCmdAdd(PClient *client);
void DoCmdRemove(PClient *client);

static constexpr std::string_view kAddCmd = "ADD";
static constexpr std::string_view kRemoveCmd = "REMOVE";
};

/* RAFT.CLUSTER INIT <id>
* Initializes a new Raft cluster.
* <id> is an optional 32 character string, if set, cluster will use it for the id
* Reply:
* +OK [dbid]
*
* RAFT.CLUSTER JOIN [addr:port]
* Join an existing cluster.
* The operation is asynchronous and may take place/retry in the background.
* Reply:
* +OK
*/
class RaftClusterCmd : public BaseCmd {
public:
RaftClusterCmd(const std::string &name, int16_t arity);
Expand All @@ -33,6 +71,11 @@ class RaftClusterCmd : public BaseCmd {

private:
void DoCmd(PClient *client) override;
void DoCmdInit(PClient *client);
void DoCmdJoin(PClient *client);

static constexpr std::string_view kInitCmd = "INIT";
static constexpr std::string_view kJoinCmd = "JOIN";
};

} // namespace pikiwidb
Loading