Skip to content

Commit

Permalink
Add paused_purpose to INFO CLIENTS
Browse files Browse the repository at this point in the history
In valkey-io#1519, we added paused_actions and paused_timeout_milliseconds.
It would be helpful to also add paused_purpose, since users also
want to know the purpose of the pause.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jan 15, 2025
1 parent c5a1585 commit cf2a680
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 7 deletions.
22 changes: 20 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4520,12 +4520,30 @@ void flushReplicasOutputBuffers(void) {
}
}

mstime_t getPausedActionTimeout(uint32_t action) {
/* Translate a pause purpose into the label reported as the paused_purpose
 * value in INFO output.
 *
 * NUM_PAUSE_PURPOSES is not a real purpose; it is accepted here as the
 * "nothing is paused" marker and maps to "none". Any value outside the known
 * set yields a generic fallback label. The returned pointer refers to a
 * string literal and must not be modified or freed. */
char *getPausedPurposeString(pause_purpose purpose) {
    if (purpose == PAUSE_BY_CLIENT_COMMAND) return "client_command";
    if (purpose == PAUSE_DURING_SHUTDOWN) return "during_shutdown";
    if (purpose == PAUSE_DURING_FAILOVER) return "during_failover";
    if (purpose == NUM_PAUSE_PURPOSES) return "none";
    return "Unknown pause purpose";
}

/* Return the longest remaining pause time, in milliseconds, among all pause
 * purposes whose paused_actions include any bit of 'action'. Returns 0 when
 * no such purpose has time remaining.
 *
 * 'purpose', when not NULL, receives the purpose that owns that longest
 * remaining time, or NUM_PAUSE_PURPOSES when 0 is returned. Callers that only
 * need the timeout may pass NULL. */
mstime_t getPausedActionTimeout(uint32_t action, pause_purpose *purpose) {
    mstime_t timeout = 0;
    pause_purpose longest = NUM_PAUSE_PURPOSES;

    for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
        pause_event *p = &(server.client_pause_per_purpose[i]);
        if (!(p->paused_actions & action)) continue;

        /* Compute the remaining time once. Only a strictly larger value
         * replaces the current winner, so ties keep the lower-indexed
         * purpose. */
        mstime_t remaining = p->end - server.mstime;
        if (remaining > timeout) {
            timeout = remaining;
            longest = i;
        }
    }

    if (purpose) *purpose = longest;
    return timeout;
}
Expand Down
9 changes: 7 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -5669,14 +5669,18 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
getExpansiveClientsInfo(&maxin, &maxout);
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys);

pause_purpose purpose;
char *paused_purpose = "none";
char *paused_actions = "none";
long long paused_timeout = 0;
if (server.paused_actions & PAUSE_ACTION_CLIENT_ALL) {
paused_actions = "all";
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_ALL);
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_ALL, &purpose);
paused_purpose = getPausedPurposeString(purpose);
} else if (server.paused_actions & PAUSE_ACTION_CLIENT_WRITE) {
paused_actions = "write";
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_WRITE);
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_WRITE, &purpose);
paused_purpose = getPausedPurposeString(purpose);
}

if (sections++) info = sdscat(info, "\r\n");
Expand All @@ -5696,6 +5700,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"total_watched_keys:%lu\r\n", watched_keys,
"total_blocking_keys:%lu\r\n", blocking_keys,
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey,
"paused_purpose:%s\r\n", paused_purpose,
"paused_actions:%s\r\n", paused_actions,
"paused_timeout_milliseconds:%lld\r\n", paused_timeout));
}
Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2714,7 +2714,8 @@ void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions);
void unpauseActions(pause_purpose purpose);
uint32_t isPausedActions(uint32_t action_bitmask);
uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask);
mstime_t getPausedActionTimeout(uint32_t action);
/* Map a pause purpose to its string-literal name (e.g. "client_command"). */
char *getPausedPurposeString(pause_purpose purpose);
/* Return the longest remaining pause time for 'action' and store the purpose
 * that owns it in '*purpose' (NUM_PAUSE_PURPOSES when there is none). */
mstime_t getPausedActionTimeout(uint32_t action, pause_purpose *purpose);
void updatePausedActions(void);
void unblockPostponedClients(void);
void processEventsWhileBlocked(void);
Expand Down
26 changes: 24 additions & 2 deletions tests/unit/pause.tcl
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
start_server {tags {"pause network"}} {
test "Test check paused_actions in info stats" {
test "Test check paused info in info clients" {
assert_equal [s paused_purpose] "none"
assert_equal [s paused_actions] "none"
assert_equal [s paused_timeout_milliseconds] 0

r client PAUSE 10000 WRITE
assert_equal [s paused_purpose] "client_command"
assert_equal [s paused_actions] "write"
after 1000
set timeout [s paused_timeout_milliseconds]
Expand All @@ -13,9 +15,14 @@ start_server {tags {"pause network"}} {
r multi
r client PAUSE 1000 ALL
r info clients
assert_match "*paused_actions:all*" [r exec]
set res [r exec]
assert_match "*paused_purpose:client_command*" $res
assert_match "*paused_actions:all*" $res

r client unpause
assert_equal [s paused_purpose] "none"
assert_equal [s paused_actions] "none"
assert_equal [s paused_timeout_milliseconds] 0
}

test "Test read commands are not blocked by client pause" {
Expand Down Expand Up @@ -408,3 +415,18 @@ start_server {tags {"pause network"}} {
# Make sure we unpause at the end
r client unpause
}

# Cluster scenario: check the paused_* fields reported by node 0 before and
# after a manual failover is requested through node 1.
# NOTE(review): "1 1" presumably means one primary and one replica - confirm
# against the start_cluster helper.
start_cluster 1 1 {tags {"external:skip cluster pause network"}} {
test "Test check paused info during the cluster failover in info clients" {
# Baseline: nothing is paused on node 0 yet.
assert_equal [s 0 paused_purpose] "none"
assert_equal [s 0 paused_actions] "none"
assert_equal [s 0 paused_timeout_milliseconds] 0

# Request a manual failover via node 1, then wait until node 0 has logged the
# request so the assertions below run after node 0 has seen it.
R 1 cluster failover
wait_for_log_messages 0 {"*Manual failover requested by replica*"} 0 10 1000

# Node 0 is now expected to report a write pause attributed to the failover,
# with some time still remaining.
assert_equal [s 0 paused_purpose] "during_failover"
assert_equal [s 0 paused_actions] "write"
assert_morethan [s 0 paused_timeout_milliseconds] 0
}
}

0 comments on commit cf2a680

Please sign in to comment.