Skip to content

Commit

Permalink
fix:add class DB & sharedmutex (#218)
Browse files Browse the repository at this point in the history
* add class DB & mutex
  • Loading branch information
dingxiaoshuai123 authored Mar 21, 2024
1 parent 6c6eb71 commit d187d65
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 106 deletions.
9 changes: 9 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,19 @@ bool BaseCmd::CheckArg(size_t num) const {
std::vector<std::string> BaseCmd::CurrentKey(PClient* client) const { return std::vector<std::string>{client->Key()}; }

void BaseCmd::Execute(PClient* client) {
auto dbIndex = client->GetCurrentDB();
if (!isExclusive()) {
PSTORE.GetBackend(dbIndex)->LockShared();
}

if (!DoInitial(client)) {
return;
}
DoCmd(client);

if (!isExclusive()) {
PSTORE.GetBackend(dbIndex)->UnLockShared();
}
}

std::string BaseCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum,
Expand Down
4 changes: 4 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <vector>

#include "client.h"
#include "store.h"

namespace pikiwidb {

Expand Down Expand Up @@ -137,6 +138,7 @@ enum CmdFlags {
kCmdFlagsProtected = (1 << 12), // Don't accept in scripts
kCmdFlagsModuleNoCluster = (1 << 13), // No cluster mode support
kCmdFlagsNoMulti = (1 << 14), // Cannot be pipelined
kCmdFlagsExclusive = (1 << 15), // May change Storage pointer, like pika's kCmdFlagsSuspend
};

enum AclCategory {
Expand Down Expand Up @@ -272,6 +274,8 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdId() const;

bool isExclusive() { return static_cast<bool>(flag_ & kCmdFlagsExclusive); }

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down
33 changes: 19 additions & 14 deletions src/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void HSetCmd::DoCmd(PClient* client) {
auto value = client->argv_[i + 1];
int32_t temp = 0;
// TODO(century): current bw doesn't support multiple fvs, fix it when necessary
s = PSTORE.GetBackend(client->GetCurrentDB())->HSet(client->Key(), field, value, &temp);
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HSet(client->Key(), field, value, &temp);
if (s.ok()) {
ret += temp;
} else {
Expand All @@ -61,7 +61,7 @@ bool HGetCmd::DoInitial(PClient* client) {
void HGetCmd::DoCmd(PClient* client) {
PString value;
auto field = client->argv_[2];
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->HGet(client->Key(), field, &value);
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HGet(client->Key(), field, &value);
if (s.ok()) {
client->AppendString(value);
} else if (s.IsNotFound()) {
Expand All @@ -82,7 +82,7 @@ bool HDelCmd::DoInitial(PClient* client) {
void HDelCmd::DoCmd(PClient* client) {
int32_t res{};
std::vector<std::string> fields(client->argv_.begin() + 2, client->argv_.end());
auto s = PSTORE.GetBackend(client->GetCurrentDB())->HDel(client->Key(), fields, &res);
auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HDel(client->Key(), fields, &res);
if (!s.ok() && !s.IsNotFound()) {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
Expand All @@ -104,7 +104,7 @@ bool HMSetCmd::DoInitial(PClient* client) {
}

void HMSetCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->HMSet(client->Key(), client->Fvs());
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HMSet(client->Key(), client->Fvs());
if (s.ok()) {
client->SetRes(CmdRes::kOK);
} else {
Expand All @@ -125,7 +125,7 @@ bool HMGetCmd::DoInitial(PClient* client) {

void HMGetCmd::DoCmd(PClient* client) {
std::vector<storage::ValueStatus> vss;
auto s = PSTORE.GetBackend(client->GetCurrentDB())->HMGet(client->Key(), client->Fields(), &vss);
auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HMGet(client->Key(), client->Fields(), &vss);
if (s.ok() || s.IsNotFound()) {
client->AppendArrayLenUint64(vss.size());
for (size_t i = 0; i < vss.size(); ++i) {
Expand Down Expand Up @@ -160,6 +160,7 @@ void HGetAllCmd::DoCmd(PClient* client) {
do {
fvs.clear();
s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->HScan(client->Key(), cursor, "*", PIKIWIDB_SCAN_STEP_LENGTH, &fvs, &next_cursor);
if (!s.ok()) {
raw.clear();
Expand Down Expand Up @@ -199,7 +200,7 @@ bool HKeysCmd::DoInitial(PClient* client) {

void HKeysCmd::DoCmd(PClient* client) {
std::vector<std::string> fields;
auto s = PSTORE.GetBackend(client->GetCurrentDB())->HKeys(client->Key(), &fields);
auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HKeys(client->Key(), &fields);
if (s.ok() || s.IsNotFound()) {
client->AppendArrayLenUint64(fields.size());
for (const auto& field : fields) {
Expand All @@ -223,7 +224,7 @@ bool HLenCmd::DoInitial(PClient* client) {

void HLenCmd::DoCmd(PClient* client) {
int32_t len = 0;
auto s = PSTORE.GetBackend(client->GetCurrentDB())->HLen(client->Key(), &len);
auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HLen(client->Key(), &len);
if (s.ok() || s.IsNotFound()) {
client->AppendInteger(len);
} else {
Expand All @@ -241,7 +242,7 @@ bool HStrLenCmd::DoInitial(PClient* client) {

void HStrLenCmd::DoCmd(PClient* client) {
int32_t len = 0;
auto s = PSTORE.GetBackend(client->GetCurrentDB())->HStrlen(client->Key(), client->argv_[2], &len);
auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HStrlen(client->Key(), client->argv_[2], &len);
if (s.ok() || s.IsNotFound()) {
client->AppendInteger(len);
} else {
Expand Down Expand Up @@ -288,8 +289,9 @@ void HScanCmd::DoCmd(PClient* client) {
// execute command
std::vector<storage::FieldValue> fvs;
int64_t next_cursor{};
auto status =
PSTORE.GetBackend(client->GetCurrentDB())->HScan(client->Key(), cursor, pattern, count, &fvs, &next_cursor);
auto status = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->HScan(client->Key(), cursor, pattern, count, &fvs, &next_cursor);
if (!status.ok() && !status.IsNotFound()) {
client->SetRes(CmdRes::kErrOther, status.ToString());
return;
Expand All @@ -315,7 +317,7 @@ bool HValsCmd::DoInitial(PClient* client) {

void HValsCmd::DoCmd(PClient* client) {
std::vector<std::string> valueVec;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->HVals(client->Key(), &valueVec);
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HVals(client->Key(), &valueVec);
if (s.ok() || s.IsNotFound()) {
client->AppendStringVector(valueVec);
} else {
Expand Down Expand Up @@ -344,6 +346,7 @@ void HIncrbyFloatCmd::DoCmd(PClient* client) {
}
std::string newValue;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->HIncrbyfloat(client->Key(), client->argv_[2], client->argv_[3], &newValue);
if (s.ok() || s.IsNotFound()) {
client->AppendString(newValue);
Expand All @@ -363,7 +366,9 @@ bool HSetNXCmd::DoInitial(PClient* client) {
void HSetNXCmd::DoCmd(PClient* client) {
int32_t temp = 0;
storage::Status s;
s = PSTORE.GetBackend(client->GetCurrentDB())->HSetnx(client->Key(), client->argv_[2], client->argv_[3], &temp);
s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->HSetnx(client->Key(), client->argv_[2], client->argv_[3], &temp);
if (s.ok()) {
client->AppendInteger(temp);
} else {
Expand All @@ -389,7 +394,7 @@ void HIncrbyCmd::DoCmd(PClient* client) {

int64_t temp = 0;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->HIncrby(client->Key(), client->argv_[2], int_by, &temp);
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HIncrby(client->Key(), client->argv_[2], int_by, &temp);
if (s.ok() || s.IsNotFound()) {
client->AppendInteger(temp);
} else {
Expand Down Expand Up @@ -431,7 +436,7 @@ void HRandFieldCmd::DoCmd(PClient* client) {

// execute command
std::vector<std::string> res;
auto s = PSTORE.GetBackend(client->GetCurrentDB())->HRandField(client->Key(), count, with_values, &res);
auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HRandField(client->Key(), count, with_values, &res);
if (s.IsNotFound()) {
client->AppendString("");
return;
Expand Down
12 changes: 6 additions & 6 deletions src/cmd_keys.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bool DelCmd::DoInitial(PClient* client) {
}

void DelCmd::DoCmd(PClient* client) {
int64_t count = PSTORE.GetBackend(client->GetCurrentDB())->Del(client->Keys());
int64_t count = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Del(client->Keys());
if (count >= 0) {
client->AppendInteger(count);
} else {
Expand All @@ -41,7 +41,7 @@ bool ExistsCmd::DoInitial(PClient* client) {
}

void ExistsCmd::DoCmd(PClient* client) {
int64_t count = PSTORE.GetBackend(client->GetCurrentDB())->Exists(client->Keys());
int64_t count = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Exists(client->Keys());
if (count >= 0) {
client->AppendInteger(count);
// if (PSTORE.ExistsKey(client->Key())) {
Expand All @@ -68,7 +68,7 @@ void PExpireCmd::DoCmd(PClient* client) {
client->SetRes(CmdRes ::kInvalidInt);
return;
}
auto res = PSTORE.GetBackend(client->GetCurrentDB())->Expire(client->Key(), msec / 1000);
auto res = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Expire(client->Key(), msec / 1000);
if (res != -1) {
client->AppendInteger(res);
} else {
Expand All @@ -90,7 +90,7 @@ void ExpireatCmd::DoCmd(PClient* client) {
client->SetRes(CmdRes ::kInvalidInt);
return;
}
auto res = PSTORE.GetBackend(client->GetCurrentDB())->Expireat(client->Key(), time_stamp);
auto res = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Expireat(client->Key(), time_stamp);
if (res != -1) {
client->AppendInteger(res);
} else {
Expand All @@ -113,7 +113,7 @@ void PExpireatCmd::DoCmd(PClient* client) {
client->SetRes(CmdRes ::kInvalidInt);
return;
}
auto res = PSTORE.GetBackend(client->GetCurrentDB())->Expireat(client->Key(), time_stamp_ms / 1000);
auto res = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Expireat(client->Key(), time_stamp_ms / 1000);
if (res != -1) {
client->AppendInteger(res);
} else {
Expand All @@ -131,7 +131,7 @@ bool PersistCmd::DoInitial(PClient* client) {

void PersistCmd::DoCmd(PClient* client) {
std::map<storage::DataType, storage::Status> type_status;
auto res = PSTORE.GetBackend(client->GetCurrentDB())->Persist(client->Key(), &type_status);
auto res = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Persist(client->Key(), &type_status);
if (res != -1) {
client->AppendInteger(res);
} else {
Expand Down
Loading

0 comments on commit d187d65

Please sign in to comment.