Skip to content

Commit

Permalink
Fixed issue with exiting subscribe mode
Browse files Browse the repository at this point in the history
Signed-off-by: Nikhil Manglore <[email protected]>
  • Loading branch information
Nikhil-Manglore committed Dec 12, 2024
1 parent 3eb8314 commit c21bf9d
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/valkey-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ static struct config {
int shutdown;
int monitor_mode;
int pubsub_mode;
int pubsub_channel_count;
int pubsub_pattern_count;
int pubsub_shard_count;
int pubsub_total_count;
int blocking_state_aborted; /* used to abort monitor_mode and pubsub_mode. */
int latency_mode;
int latency_dist_mode;
Expand Down Expand Up @@ -2224,6 +2228,31 @@ static int cliReadReply(int output_raw_strings) {
fflush(stdout);
sdsfree(out);
}

/* Handle pubsub mode */
if (config.pubsub_mode) {
if (isPubsubPush(reply)) {
if (reply->elements >= 3) {
char *cmd = reply->element[0]->str;
int count = reply->element[2]->integer;

if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0) {
config.pubsub_channel_count = count;
} else if (strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) {
config.pubsub_pattern_count = count;
} else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) {
config.pubsub_shard_count = count;
}
config.pubsub_total_count = config.pubsub_channel_count + config.pubsub_pattern_count + config.pubsub_shard_count;

if (config.pubsub_total_count == 0) {
config.pubsub_mode = 0;
cliRefreshPrompt();
}
}
}
}

return REDIS_OK;
}

Expand Down Expand Up @@ -9493,6 +9522,10 @@ int main(int argc, char **argv) {
config.shutdown = 0;
config.monitor_mode = 0;
config.pubsub_mode = 0;
config.pubsub_channel_count = 0;
config.pubsub_pattern_count = 0;
config.pubsub_shard_count = 0;
config.pubsub_total_count = 0;
config.blocking_state_aborted = 0;
config.latency_mode = 0;
config.latency_dist_mode = 0;
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/pubsub.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -514,4 +514,79 @@ start_server {tags {"pubsub network"}} {
assert_equal [r read] {message foo vaz}
} {} {resp3}

test "SUBSCRIBE and UNSUBSCRIBE with multiple channels" {
# Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels.
set rd1 [valkey_deferring_client]

assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
assert_equal {chan1 1 chan2 1 chan3 1} [r pubsub numsub chan1 chan2 chan3]
assert_equal {2} [unsubscribe $rd1 {chan2}]
assert_equal {chan1 1 chan2 0 chan3 1} [r pubsub numsub chan1 chan2 chan3]
unsubscribe $rd1

set unsub1 [$rd1 read]
set unsub2 [$rd1 read]

assert {[lindex $unsub1 0] eq "unsubscribe" && [lindex $unsub2 0] eq "unsubscribe"}
assert {([lindex $unsub1 1] eq "chan1" && [lindex $unsub2 1] eq "chan3") ||
([lindex $unsub1 1] eq "chan3" && [lindex $unsub2 1] eq "chan1")}
assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0}
assert_equal {chan1 0 chan2 0 chan3 0} [r pubsub numsub chan1 chan2 chan3]

$rd1 ping
assert_equal {PONG} [$rd1 read]

$rd1 close
}

test "PSUBSCRIBE and PUNSUBSCRIBE with multiple patterns" {
# Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels.
set rd1 [valkey_deferring_client]

assert_equal {1 2 3} [psubscribe $rd1 {chan1.* chan2.* chan3.*}]
assert_equal 3 [r pubsub numpat]
assert_equal {2} [punsubscribe $rd1 {chan2.*}]
assert_equal 2 [r pubsub numpat]
punsubscribe $rd1

set unsub1 [$rd1 read]
set unsub2 [$rd1 read]

assert {[lindex $unsub1 0] eq "punsubscribe" && [lindex $unsub2 0] eq "punsubscribe"}
assert {([lindex $unsub1 1] eq "chan1.*" && [lindex $unsub2 1] eq "chan3.*") ||
([lindex $unsub1 1] eq "chan3.*" && [lindex $unsub2 1] eq "chan1.*")}
assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0}
assert_equal 0 [r pubsub numpat]

$rd1 ping
assert_equal {PONG} [$rd1 read]

$rd1 close
}

test "SSUBSCRIBE and SUNSUBSCRIBE with multiple shard channels" {
# Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels.
set rd1 [valkey_deferring_client]

assert_equal {1 2 3} [ssubscribe $rd1 {schan1 schan2 schan3}]
assert_equal {schan1 1 schan2 1 schan3 1} [r pubsub shardnumsub schan1 schan2 schan3]
assert_equal {2} [sunsubscribe $rd1 {schan2}]
assert_equal {schan1 1 schan2 0 schan3 1} [r pubsub shardnumsub schan1 schan2 schan3]
sunsubscribe $rd1

set unsub1 [$rd1 read]
set unsub2 [$rd1 read]

assert {[lindex $unsub1 0] eq "sunsubscribe" && [lindex $unsub2 0] eq "sunsubscribe"}
assert {([lindex $unsub1 1] eq "schan1" && [lindex $unsub2 1] eq "schan3") ||
([lindex $unsub1 1] eq "schan3" && [lindex $unsub2 1] eq "schan1")}
assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0}

assert_equal {schan1 0 schan2 0 schan3 0} [r pubsub shardnumsub schan1 schan2 schan3]

$rd1 ping
assert_equal {PONG} [$rd1 read]

$rd1 close
}
}

0 comments on commit c21bf9d

Please sign in to comment.