Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: zset cmd zremrangebyrank, zrange, zscore, zrangebylex, zrevrangebylex #187

Merged
merged 12 commits into from
Mar 26, 2024
3 changes: 3 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ const std::string kCmdNameLInsert = "linsert";
const std::string kCmdNameZAdd = "zadd";
const std::string kCmdNameZRevrange = "zrevrange";
const std::string kCmdNameZRangebyscore = "zrangebyscore";
const std::string kCmdNameZRemRangeByRank = "zremrangebyrank";
const std::string kCmdNameZCard = "zcard";
const std::string kCmdNameZScore = "zscore";
const std::string kCmdNameZRange = "zrange";

enum CmdFlags {
kCmdFlagsWrite = (1 << 0), // May modify the dataset
Expand Down
3 changes: 3 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(ZAdd, -4);
ADD_COMMAND(ZRevrange, -4);
ADD_COMMAND(ZRangebyscore, -4);
ADD_COMMAND(ZRemRangeByRank, 4);
ADD_COMMAND(ZCard, 2);
ADD_COMMAND(ZScore, 3);
ADD_COMMAND(ZRange, -4);
}

std::pair<BaseCmd*, CmdRes::CmdRet> CmdTableManager::GetCommand(const std::string& cmdName, PClient* client) {
Expand Down
170 changes: 169 additions & 1 deletion src/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,172 @@ void ZCardCmd::DoCmd(PClient* client) {
}
client->AppendInteger(reply_Num);
}
} // namespace pikiwidb

ZRemRangeByRankCmd::ZRemRangeByRankCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryString) {}

bool ZRemRangeByRankCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}

void ZRemRangeByRankCmd::DoCmd(PClient* client) {
int32_t ret = 0;
int32_t start = 0;
int32_t end = 0;

if (pstd::String2int(client->argv_[2], &start) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
if (pstd::String2int(client->argv_[3], &end) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}

storage::Status s;
s = PSTORE.GetBackend(client->GetCurrentDB())->ZRemrangebyrank(client->Key(), start, end, &ret);
if (s.ok() || s.IsNotFound()) {
client->AppendInteger(ret);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZRangeCmd::ZRangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySortedSet) {}

bool ZRangeCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}

void ZRangeCmd::DoCmd(PClient* client) {
double start = 0;
double stop = 0;
int64_t count = -1;
int64_t offset = 0;
bool with_scores = false;
bool by_score = false;
bool by_lex = false;
bool left_close = false;
bool right_close = false;
bool is_rev = false;
int32_t ret = DoScoreStrRange(client->argv_[2], client->argv_[3], &left_close, &right_close, &start, &stop);
if (ret == -1) {
client->SetRes(CmdRes::kErrOther, "start or stop is not a float");
return;
}
size_t argc = client->argv_.size();
if (argc >= 5) {
size_t index = 4;
while (index < argc) {
if (strcasecmp(client->argv_[index].data(), "byscore") == 0) {
by_score = true;
} else if (strcasecmp(client->argv_[index].data(), "bylex") == 0) {
by_lex = true;
} else if (strcasecmp(client->argv_[index].data(), "rev") == 0) {
is_rev = true;
} else if (strcasecmp(client->argv_[index].data(), "withscores") == 0) {
with_scores = true;
} else if (strcasecmp(client->argv_[index].data(), "limit") == 0) {
if (index + 3 > argc) {
client->SetRes(CmdRes::kSyntaxErr);
return;
}
index++;
if (pstd::String2int(client->argv_[index].data(), client->argv_[index].size(), &offset) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
index++;
if (pstd::String2int(client->argv_[index].data(), client->argv_[index].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
} else {
client->SetRes(CmdRes::kSyntaxErr);
return;
}
index++;
}
}
if (by_score && by_lex) {
client->SetRes(CmdRes::kSyntaxErr);
return;
}

std::vector<storage::ScoreMember> score_members;
std::vector<std::string> members;
storage::Status s;
if (!is_rev) {
if (by_score) {
s = PSTORE.GetBackend(client->GetCurrentDB())
->ZRangebyscore(client->Key(), start, stop, left_close, right_close, count, offset, &score_members);
} else if (by_lex) {
s = PSTORE.GetBackend(client->GetCurrentDB())
->ZRangebylex(client->Key(), client->argv_[2], client->argv_[3], left_close, right_close, &members);
} else {
s = PSTORE.GetBackend(client->GetCurrentDB())->ZRange(client->Key(), start, stop, &score_members);
}
} else {
if (by_score) {
s = PSTORE.GetBackend(client->GetCurrentDB())
->ZRevrangebyscore(client->Key(), start, stop, left_close, right_close, count, offset, &score_members);
} else if (by_lex) {
s = PSTORE.GetBackend(client->GetCurrentDB())
->ZRevrangebylex(client->Key(), client->argv_[2], client->argv_[3], left_close, right_close, count,
offset, &members);
} else {
s = PSTORE.GetBackend(client->GetCurrentDB())->ZRevrange(client->Key(), start, stop, &score_members);
}
}
if (!s.ok() && !s.IsNotFound()) {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
}
// TODO(taota csx) bylex cmd's return.
FitLimit(count, offset, static_cast<int64_t>(score_members.size()));
size_t m_start = offset;
size_t m_end = offset + count;
if (with_scores) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(count * 2);
for (; m_start < m_end; m_start++) {
client->AppendStringLenUint64(score_members[m_start].member.size());
client->AppendContent(score_members[m_start].member);
len = pstd::D2string(buf, sizeof(buf), score_members[m_start].score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->AppendArrayLen(count);
for (; m_start < m_end; m_start++) {
client->AppendStringLenUint64(score_members[m_start].member.size());
client->AppendContent(score_members[m_start].member);
}
}
}

ZScoreCmd::ZScoreCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategoryString) {}

bool ZScoreCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}

void ZScoreCmd::DoCmd(PClient* client) {
double score = 0;

storage::Status s;
s = PSTORE.GetBackend(client->GetCurrentDB())->ZScore(client->Key(), client->argv_[2], &score);
if (s.ok() || s.IsNotFound()) {
client->AppendString(std::to_string(score));
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

} // namespace pikiwidb
35 changes: 34 additions & 1 deletion src/cmd_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ class ZRangebyscoreCmd : public BaseCmd {
void DoCmd(PClient *client) override;
};

class ZRemRangeByRankCmd : public BaseCmd {
callme-taota marked this conversation as resolved.
Show resolved Hide resolved
public:
ZRemRangeByRankCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZCardCmd : public BaseCmd {
public:
ZCardCmd(const std::string &name, int16_t arity);
Expand All @@ -56,4 +67,26 @@ class ZCardCmd : public BaseCmd {
void DoCmd(PClient *client) override;
};

} // namespace pikiwidb
class ZRangeCmd : public BaseCmd {
callme-taota marked this conversation as resolved.
Show resolved Hide resolved
public:
ZRangeCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZScoreCmd : public BaseCmd {
callme-taota marked this conversation as resolved.
Show resolved Hide resolved
public:
ZScoreCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

} // namespace pikiwidb
3 changes: 3 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ class Storage {
Status ZRevrangebyscore(const Slice& key, double min, double max, bool left_close, bool right_close, int64_t count,
int64_t offset, std::vector<ScoreMember>* score_members);

Status ZRevrangebylex(const Slice& key, const Slice& min, const Slice& max, bool left_close, bool right_close,
int64_t count, int64_t offset, std::vector<std::string>* members);

// Returns the rank of member in the sorted set stored at key, with the scores
// ordered from high to low. The rank (or index) is 0-based, which means that
// the member with the highest score has rank 0.
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ class Redis {
Status ZRevrange(const Slice& key, int32_t start, int32_t stop, std::vector<ScoreMember>* score_members);
Status ZRevrangebyscore(const Slice& key, double min, double max, bool left_close, bool right_close, int64_t count,
int64_t offset, std::vector<ScoreMember>* score_members);
Status ZRevrangebylex(const Slice& key, const Slice& min, const Slice& max, bool left_close, bool right_close,
int64_t count, int64_t offset, std::vector<std::string>* members);
Status ZRevrank(const Slice& key, const Slice& member, int32_t* rank);
Status ZScore(const Slice& key, const Slice& member, double* score);
Status ZGetAll(const Slice& key, double weight, std::map<std::string, double>* value_to_dest);
Expand Down
58 changes: 58 additions & 0 deletions src/storage/src/redis_zsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,64 @@ Status Redis::ZRevrangebyscore(const Slice& key, double min, double max, bool le
return s;
}

Status Redis::ZRevrangebylex(const Slice& key, const Slice& min, const Slice& max, bool left_close, bool right_close,
int64_t count, int64_t offset, std::vector<std::string>* members) {
members->clear();
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot = nullptr;

std::string meta_value;
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;

bool left_no_limit = min.compare("-") == 0;
bool right_not_limit = max.compare("+") == 0;

BaseMetaKey base_meta_key(key);
Status s = db_->Get(read_options, handles_[kZsetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
ParsedZSetsMetaValue parsed_zsets_meta_value(&meta_value);
if (parsed_zsets_meta_value.IsStale() || parsed_zsets_meta_value.Count() == 0) {
return Status::NotFound();
} else {
uint64_t version = parsed_zsets_meta_value.Version();
int32_t left = parsed_zsets_meta_value.Count();
int64_t skipped = 0;
ZSetsMemberKey zsets_member_key(key, version, Slice());
KeyStatisticsDurationGuard guard(this, DataType::kZSets, key.ToString());
rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[kZsetsDataCF]);
for (iter->SeekForPrev(zsets_member_key.Encode()); iter->Valid() && left > 0; iter->Prev(), --left) {
bool left_pass = false;
bool right_pass = false;
ParsedZSetsMemberKey parsed_zsets_member_key(iter->key());
Slice member = parsed_zsets_member_key.member();
if (left_no_limit || (left_close && min.compare(member) >= 0) || (!left_close && min.compare(member) > 0)) {
left_pass = true;
}
if (right_not_limit || (right_close && max.compare(member) <= 0) || (!right_close && max.compare(member) < 0)) {
right_pass = true;
}
if (left_pass && right_pass) {
// skip offset
if (skipped < offset) {
++skipped;
continue;
}
members->push_back(member.ToString());
if (count > 0 && members->size() == static_cast<size_t>(count)) {
break;
}
}
if (!right_pass) {
break;
}
}
delete iter;
}
}
return s;
}

Status Redis::ZRevrank(const Slice& key, const Slice& member, int32_t* rank) {
*rank = -1;
rocksdb::ReadOptions read_options;
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,13 @@ Status Storage::ZRangebylex(const Slice& key, const Slice& min, const Slice& max
return inst->ZRangebylex(key, min, max, left_close, right_close, members);
}

Status Storage::ZRevrangebylex(const Slice& key, const Slice& min, const Slice& max, bool left_close, bool right_close,
int64_t count, int64_t offset, std::vector<std::string>* members) {
members->clear();
auto& inst = GetDBInstance(key);
return inst->ZRevrangebylex(key, min, max, left_close, right_close, count, offset, members);
}

Status Storage::ZLexcount(const Slice& key, const Slice& min, const Slice& max, bool left_close, bool right_close,
int32_t* ret) {
auto& inst = GetDBInstance(key);
Expand Down
Loading
Loading